LCOV - code coverage report
Current view: top level - safekeeper/src - send_wal.rs (source / functions)
Test:      691a4c28fe7169edd60b367c52d448a0a6605f1f.info
Test Date: 2024-05-10 13:18:37

                 Coverage   Total   Hit
   Lines:          19.2 %     452    87
   Functions:       9.0 %     111    10

            Line data    Source code
       1              : //! This module implements the streaming side of replication protocol, starting
       2              : //! with the "START_REPLICATION" message, and registry of walsenders.
       3              : 
       4              : use crate::handler::SafekeeperPostgresHandler;
       5              : use crate::metrics::RECEIVED_PS_FEEDBACKS;
       6              : use crate::receive_wal::WalReceivers;
       7              : use crate::safekeeper::{Term, TermLsn};
       8              : use crate::timeline::Timeline;
       9              : use crate::wal_service::ConnectionId;
      10              : use crate::wal_storage::WalReader;
      11              : use crate::GlobalTimelines;
      12              : use anyhow::{bail, Context as AnyhowContext};
      13              : use bytes::Bytes;
      14              : use parking_lot::Mutex;
      15              : use postgres_backend::PostgresBackend;
      16              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
      17              : use postgres_ffi::get_current_timestamp;
      18              : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
      19              : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
      20              : use serde::{Deserialize, Serialize};
      21              : use tokio::io::{AsyncRead, AsyncWrite};
      22              : use utils::failpoint_support;
      23              : use utils::id::TenantTimelineId;
      24              : use utils::pageserver_feedback::PageserverFeedback;
      25              : 
      26              : use std::cmp::min;
      27              : use std::net::SocketAddr;
      28              : use std::str;
      29              : use std::sync::Arc;
      30              : use std::time::Duration;
      31              : use tokio::sync::watch::Receiver;
      32              : use tokio::time::timeout;
      33              : use tracing::*;
      34              : use utils::{bin_ser::BeSer, lsn::Lsn};
      35              : 
      36              : // See: https://www.postgresql.org/docs/13/protocol-replication.html
      37              : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
      38              : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
       39              : // Neon extension of the replication protocol
      40              : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
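                       : // Each of these tags is the first byte of a CopyData feedback payload;
                       : // they are matched in ReplyReader::handle_feedback below.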
      41              : 
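                       : // A 64-bit transaction id: Postgres combines the 32-bit xid with its
                       : // epoch so that, unlike a plain xid, the value never wraps around.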
      42              : type FullTransactionId = u64;
      43              : 
      44              : /// Hot standby feedback received from replica
      45            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      46              : pub struct HotStandbyFeedback {
      47              :     pub ts: TimestampTz,
      48              :     pub xmin: FullTransactionId,
      49              :     pub catalog_xmin: FullTransactionId,
      50              : }
      51              : 
      52              : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
      53              : 
      54              : impl HotStandbyFeedback {
      55        17976 :     pub fn empty() -> HotStandbyFeedback {
      56        17976 :         HotStandbyFeedback {
      57        17976 :             ts: 0,
      58        17976 :             xmin: 0,
      59        17976 :             catalog_xmin: 0,
      60        17976 :         }
      61        17976 :     }
      62              : }
      63              : 
      64              : /// Standby status update
      65            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      66              : pub struct StandbyReply {
      67              :     pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
      68              :     pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
      69              :     pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
      70              :     pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
      71              :     pub reply_requested: bool,
      72              : }
      73              : 
      74              : impl StandbyReply {
      75            8 :     fn empty() -> Self {
      76            8 :         StandbyReply {
      77            8 :             write_lsn: Lsn::INVALID,
      78            8 :             flush_lsn: Lsn::INVALID,
      79            8 :             apply_lsn: Lsn::INVALID,
      80            8 :             reply_ts: 0,
      81            8 :             reply_requested: false,
      82            8 :         }
      83            8 :     }
      84              : }
      85              : 
      86            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      87              : pub struct StandbyFeedback {
      88              :     reply: StandbyReply,
      89              :     hs_feedback: HotStandbyFeedback,
      90              : }
      91              : 
      92              : /// WalSenders registry. Timeline holds it (wrapped in Arc).
      93              : pub struct WalSenders {
      94              :     mutex: Mutex<WalSendersShared>,
      95              :     walreceivers: Arc<WalReceivers>,
      96              : }
      97              : 
      98              : impl WalSenders {
      99            0 :     pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
     100            0 :         Arc::new(WalSenders {
     101            0 :             mutex: Mutex::new(WalSendersShared::new()),
     102            0 :             walreceivers,
     103            0 :         })
     104            0 :     }
     105              : 
     106              :     /// Register new walsender. Returned guard provides access to the slot and
     107              :     /// automatically deregisters in Drop.
     108            0 :     fn register(
     109            0 :         self: &Arc<WalSenders>,
     110            0 :         ttid: TenantTimelineId,
     111            0 :         addr: SocketAddr,
     112            0 :         conn_id: ConnectionId,
     113            0 :         appname: Option<String>,
     114            0 :     ) -> WalSenderGuard {
     115            0 :         let slots = &mut self.mutex.lock().slots;
     116            0 :         let walsender_state = WalSenderState {
     117            0 :             ttid,
     118            0 :             addr,
     119            0 :             conn_id,
     120            0 :             appname,
     121            0 :             feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
     122            0 :         };
     123              :         // find empty slot or create new one
     124            0 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
     125            0 :             slots[pos] = Some(walsender_state);
     126            0 :             pos
     127              :         } else {
     128            0 :             let pos = slots.len();
     129            0 :             slots.push(Some(walsender_state));
     130            0 :             pos
     131              :         };
     132            0 :         WalSenderGuard {
     133            0 :             id: pos,
     134            0 :             walsenders: self.clone(),
     135            0 :         }
     136            0 :     }
     137              : 
     138              :     /// Get state of all walsenders.
     139            0 :     pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
     140            0 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     141            0 :     }
     142              : 
     143              :     /// Get LSN of the most lagging pageserver receiver. Return None if there are no
     144              :     /// active walsenders.
     145            0 :     pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
     146            0 :         self.mutex
     147            0 :             .lock()
     148            0 :             .slots
     149            0 :             .iter()
     150            0 :             .flatten()
     151            0 :             .filter_map(|s| match s.feedback {
     152            0 :                 ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
     153            0 :                 ReplicationFeedback::Standby(_) => None,
     154            0 :             })
     155            0 :             .min()
     156            0 :     }
     157              : 
      158              :     /// Returns the total count of pageserver feedbacks received, and the last feedback.
     159            0 :     pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
     160            0 :         let shared = self.mutex.lock();
     161            0 :         (shared.ps_feedback_counter, shared.last_ps_feedback)
     162            0 :     }
     163              : 
     164              :     /// Get aggregated hot standby feedback (we send it to compute).
     165            0 :     pub fn get_hotstandby(self: &Arc<WalSenders>) -> HotStandbyFeedback {
     166            0 :         self.mutex.lock().agg_hs_feedback
     167            0 :     }
     168              : 
     169              :     /// Record new pageserver feedback, update aggregated values.
     170            0 :     fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
     171            0 :         let mut shared = self.mutex.lock();
     172            0 :         shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
     173            0 :         shared.last_ps_feedback = *feedback;
     174            0 :         shared.ps_feedback_counter += 1;
     175            0 :         drop(shared);
     176            0 : 
     177            0 :         RECEIVED_PS_FEEDBACKS.inc();
     178            0 : 
     179            0 :         // send feedback to connected walproposers
     180            0 :         self.walreceivers.broadcast_pageserver_feedback(*feedback);
     181            0 :     }
     182              : 
     183              :     /// Record standby reply.
     184            0 :     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
     185            0 :         let mut shared = self.mutex.lock();
     186            0 :         let slot = shared.get_slot_mut(id);
     187            0 :         match &mut slot.feedback {
     188            0 :             ReplicationFeedback::Standby(sf) => sf.reply = *reply,
     189              :             ReplicationFeedback::Pageserver(_) => {
     190            0 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     191            0 :                     reply: *reply,
     192            0 :                     hs_feedback: HotStandbyFeedback::empty(),
     193            0 :                 })
     194              :             }
     195              :         }
     196            0 :     }
     197              : 
     198              :     /// Record hot standby feedback, update aggregated value.
     199            0 :     fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
     200            0 :         let mut shared = self.mutex.lock();
     201            0 :         let slot = shared.get_slot_mut(id);
     202            0 :         match &mut slot.feedback {
     203            0 :             ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
     204              :             ReplicationFeedback::Pageserver(_) => {
     205            0 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     206            0 :                     reply: StandbyReply::empty(),
     207            0 :                     hs_feedback: *feedback,
     208            0 :                 })
     209              :             }
     210              :         }
     211            0 :         shared.update_hs_feedback();
     212            0 :     }
     213              : 
     214              :     /// Get remote_consistent_lsn reported by the pageserver. Returns None if
      215              :     /// the client is not a pageserver.
     216            0 :     fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
     217            0 :         let shared = self.mutex.lock();
     218            0 :         let slot = shared.get_slot(id);
     219            0 :         match slot.feedback {
     220            0 :             ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
     221            0 :             _ => None,
     222              :         }
     223            0 :     }
     224              : 
     225              :     /// Unregister walsender.
     226            0 :     fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
     227            0 :         let mut shared = self.mutex.lock();
     228            0 :         shared.slots[id] = None;
     229            0 :         shared.update_hs_feedback();
     230            0 :     }
     231              : }
     232              : 
     233              : struct WalSendersShared {
      234              :     // hot standby feedback value aggregated over all walsenders
     235              :     agg_hs_feedback: HotStandbyFeedback,
     236              :     // last feedback ever received from any pageserver, empty if none
     237              :     last_ps_feedback: PageserverFeedback,
     238              :     // total counter of pageserver feedbacks received
     239              :     ps_feedback_counter: u64,
     240              :     slots: Vec<Option<WalSenderState>>,
     241              : }
     242              : 
     243              : impl WalSendersShared {
     244            4 :     fn new() -> Self {
     245            4 :         WalSendersShared {
     246            4 :             agg_hs_feedback: HotStandbyFeedback::empty(),
     247            4 :             last_ps_feedback: PageserverFeedback::empty(),
     248            4 :             ps_feedback_counter: 0,
     249            4 :             slots: Vec::new(),
     250            4 :         }
     251            4 :     }
     252              : 
      253              :     /// Get the content of the slot with the provided id; it must exist.
     254            0 :     fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
     255            0 :         self.slots[id].as_ref().expect("walsender doesn't exist")
     256            0 :     }
     257              : 
      258              :     /// Get mutable access to the slot with the provided id; it must exist.
     259            0 :     fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
     260            0 :         self.slots[id].as_mut().expect("walsender doesn't exist")
     261            0 :     }
     262              : 
      263              :     /// Update aggregated hot standby feedback. We just take min of valid xmins
     264              :     /// and ts.
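                       :     /// Taking the min is the conservative choice: the aggregated xmin we
                       :     /// hand to compute must hold back vacuum for the oldest snapshot still
                       :     /// in use on any standby.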
     265            4 :     fn update_hs_feedback(&mut self) {
     266            4 :         let mut agg = HotStandbyFeedback::empty();
     267            8 :         for ws_state in self.slots.iter().flatten() {
     268            8 :             if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
     269            8 :                 let hs_feedback = standby_feedback.hs_feedback;
     270            8 :                 // doing Option math like op1.iter().chain(op2.iter()).min()
     271            8 :                 // would be nicer, but we serialize/deserialize this struct
     272            8 :                 // directly, so leave as is for now
     273            8 :                 if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
     274            4 :                     if agg.xmin != INVALID_FULL_TRANSACTION_ID {
     275            2 :                         agg.xmin = min(agg.xmin, hs_feedback.xmin);
     276            2 :                     } else {
     277            2 :                         agg.xmin = hs_feedback.xmin;
     278            2 :                     }
     279            4 :                     agg.ts = min(agg.ts, hs_feedback.ts);
     280            4 :                 }
     281            8 :                 if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     282            0 :                     if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     283            0 :                         agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
     284            0 :                     } else {
     285            0 :                         agg.catalog_xmin = hs_feedback.catalog_xmin;
     286            0 :                     }
     287            0 :                     agg.ts = min(agg.ts, hs_feedback.ts);
     288            8 :                 }
     289            0 :             }
     290              :         }
     291            4 :         self.agg_hs_feedback = agg;
     292            4 :     }
     293              : }
     294              : 
      295              : // Serialization is used only for pretty-printing as JSON.
     296            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     297              : pub struct WalSenderState {
     298              :     ttid: TenantTimelineId,
     299              :     addr: SocketAddr,
     300              :     conn_id: ConnectionId,
     301              :     // postgres application_name
     302              :     appname: Option<String>,
     303              :     feedback: ReplicationFeedback,
     304              : }
     305              : 
      306              : // The receiver is either a pageserver or a regular standby; they send
      307              : // different kinds of feedback.
     308            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     309              : enum ReplicationFeedback {
     310              :     Pageserver(PageserverFeedback),
     311              :     Standby(StandbyFeedback),
     312              : }
     313              : 
      314              : // Id of the occupied slot in WalSenders, used to access it (and saved in the
      315              : // WalSenderGuard). We could hand out an Arc pointing directly at the slot,
      316              : // but there is little point: the aggregation performed on each feedback
      317              : // receipt iterates over all walsenders anyway.
     318              : pub type WalSenderId = usize;
     319              : 
     320              : /// Scope guard to access slot in WalSenders registry and unregister from it in
     321              : /// Drop.
     322              : pub struct WalSenderGuard {
     323              :     id: WalSenderId,
     324              :     walsenders: Arc<WalSenders>,
     325              : }
     326              : 
     327              : impl Drop for WalSenderGuard {
     328            0 :     fn drop(&mut self) {
     329            0 :         self.walsenders.unregister(self.id);
     330            0 :     }
     331              : }
     332              : 
     333              : impl SafekeeperPostgresHandler {
      334              :     /// Wrapper around handle_start_replication_guts that handles its result.
      335              :     /// The error is handled here while we're still in the walsender ttid span;
      336              :     /// with an API extension, this could probably be moved into postgres_backend.
     337            0 :     pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
     338            0 :         &mut self,
     339            0 :         pgb: &mut PostgresBackend<IO>,
     340            0 :         start_pos: Lsn,
     341            0 :         term: Option<Term>,
     342            0 :     ) -> Result<(), QueryError> {
     343            0 :         if let Err(end) = self
     344            0 :             .handle_start_replication_guts(pgb, start_pos, term)
     345            0 :             .await
     346              :         {
     347              :             // Log the result and probably send it to the client, closing the stream.
     348            0 :             pgb.handle_copy_stream_end(end).await;
     349            0 :         }
     350            0 :         Ok(())
     351            0 :     }
     352              : 
     353            0 :     pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     354            0 :         &mut self,
     355            0 :         pgb: &mut PostgresBackend<IO>,
     356            0 :         start_pos: Lsn,
     357            0 :         term: Option<Term>,
     358            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     359            0 :         let appname = self.appname.clone();
     360            0 :         let tli =
     361            0 :             GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
     362              : 
     363              :         // Use a guard object to remove our entry from the timeline when we are done.
     364            0 :         let ws_guard = Arc::new(tli.get_walsenders().register(
     365            0 :             self.ttid,
     366            0 :             *pgb.get_peer_addr(),
     367            0 :             self.conn_id,
     368            0 :             self.appname.clone(),
     369            0 :         ));
     370              : 
      371              :         // Walsender can operate in one of two modes, selected by whether a
      372              :         // term is supplied: stream only committed WAL (used by pageserver), or
      373              :         // all existing WAL up to flush_lsn (used by walproposer or peer
      374              :         // recovery). The second mode is always driven by a consensus leader,
      375              :         // whose term must be supplied.
     376            0 :         let end_watch = if term.is_some() {
     377            0 :             EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
     378              :         } else {
     379            0 :             EndWatch::Commit(tli.get_commit_lsn_watch_rx())
     380              :         };
     381              :         // we don't check term here; it will be checked on first waiting/WAL reading anyway.
     382            0 :         let end_pos = end_watch.get();
     383            0 : 
     384            0 :         if end_pos < start_pos {
     385            0 :             warn!(
     386            0 :                 "requested start_pos {} is ahead of available WAL end_pos {}",
     387              :                 start_pos, end_pos
     388              :             );
     389            0 :         }
     390              : 
     391            0 :         info!(
     392            0 :             "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
     393              :             start_pos,
     394              :             end_pos,
     395            0 :             matches!(end_watch, EndWatch::Flush(_)),
     396              :             appname
     397              :         );
     398              : 
     399              :         // switch to copy
     400            0 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     401              : 
     402            0 :         let (_, persisted_state) = tli.get_state().await;
     403            0 :         let wal_reader = WalReader::new(
     404            0 :             self.conf.workdir.clone(),
     405            0 :             self.conf.timeline_dir(&tli.ttid),
     406            0 :             &persisted_state,
     407            0 :             start_pos,
     408            0 :             self.conf.is_wal_backup_enabled(),
     409            0 :         )?;
     410              : 
     411              :         // Split to concurrently receive and send data; replies are generally
     412              :         // not synchronized with sends, so this avoids deadlocks.
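                       :         // (The two halves are rejoined with pgb.unsplit() below once either
                       :         // task finishes.)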
     413            0 :         let reader = pgb.split().context("START_REPLICATION split")?;
     414              : 
     415            0 :         let mut sender = WalSender {
     416            0 :             pgb,
     417            0 :             tli: tli.clone(),
     418            0 :             appname,
     419            0 :             start_pos,
     420            0 :             end_pos,
     421            0 :             term,
     422            0 :             end_watch,
     423            0 :             ws_guard: ws_guard.clone(),
     424            0 :             wal_reader,
     425            0 :             send_buf: [0; MAX_SEND_SIZE],
     426            0 :         };
     427            0 :         let mut reply_reader = ReplyReader {
     428            0 :             reader,
     429            0 :             ws_guard: ws_guard.clone(),
     430            0 :             tli,
     431            0 :         };
     432              : 
     433            0 :         let res = tokio::select! {
     434              :             // todo: add read|write .context to these errors
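                       :             // Both arms run until error (CopyStreamHandlerEnd), so whichever
                       :             // half stops first ends the streaming session.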
     435              :             r = sender.run() => r,
     436              :             r = reply_reader.run() => r,
     437              :         };
     438              : 
     439            0 :         let ws_state = ws_guard
     440            0 :             .walsenders
     441            0 :             .mutex
     442            0 :             .lock()
     443            0 :             .get_slot(ws_guard.id)
     444            0 :             .clone();
     445            0 :         info!(
     446            0 :             "finished streaming to {}, feedback={:?}",
     447              :             ws_state.addr, ws_state.feedback,
     448              :         );
     449              : 
      450              :         // Join the pg backend halves back together.
     451            0 :         pgb.unsplit(reply_reader.reader)?;
     452              : 
     453            0 :         res
     454            0 :     }
     455              : }
     456              : 
     457              : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
     458              : /// given term (recovery by walproposer or peer safekeeper).
     459              : enum EndWatch {
     460              :     Commit(Receiver<Lsn>),
     461              :     Flush(Receiver<TermLsn>),
     462              : }
     463              : 
     464              : impl EndWatch {
     465              :     /// Get current end of WAL.
     466            0 :     fn get(&self) -> Lsn {
     467            0 :         match self {
     468            0 :             EndWatch::Commit(r) => *r.borrow(),
     469            0 :             EndWatch::Flush(r) => r.borrow().lsn,
     470              :         }
     471            0 :     }
     472              : 
     473              :     /// Wait for the update.
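                       :     /// Errors only if the sender side of the watch channel was dropped.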
     474            0 :     async fn changed(&mut self) -> anyhow::Result<()> {
     475            0 :         match self {
     476            0 :             EndWatch::Commit(r) => r.changed().await?,
     477            0 :             EndWatch::Flush(r) => r.changed().await?,
     478              :         }
     479            0 :         Ok(())
     480            0 :     }
     481              : }
     482              : 
      483              : /// The half of the split backend that drives sending WAL.
     484              : struct WalSender<'a, IO> {
     485              :     pgb: &'a mut PostgresBackend<IO>,
     486              :     tli: Arc<Timeline>,
     487              :     appname: Option<String>,
      488              :     // Position from which we are sending the next chunk.
     489              :     start_pos: Lsn,
     490              :     // WAL up to this position is known to be locally available.
     491              :     // Usually this is the same as the latest commit_lsn, but in case of
     492              :     // walproposer recovery, this is flush_lsn.
     493              :     //
     494              :     // We send this LSN to the receiver as wal_end, so that it knows how much
     495              :     // WAL this safekeeper has. This LSN should be as fresh as possible.
     496              :     end_pos: Lsn,
      497              :     /// When streaming the uncommitted part, the term in which the client acts
      498              :     /// as the leader. Streaming is stopped if the local term changes to a
      499              :     /// different (higher) value.
     500              :     term: Option<Term>,
     501              :     /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
     502              :     end_watch: EndWatch,
     503              :     ws_guard: Arc<WalSenderGuard>,
     504              :     wal_reader: WalReader,
      505              :     // buffer WAL is read into before sending it
     506              :     send_buf: [u8; MAX_SEND_SIZE],
     507              : }
     508              : 
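                       : // How long wait_for_lsn blocks before timing out; on timeout the caller sends a keepalive.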
     509              : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
     510              : 
     511              : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
     512              :     /// Send WAL until
     513              :     /// - an error occurs
      514              :     /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
     515              :     ///
     516              :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     517              :     /// convenience.
     518            0 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     519              :         loop {
      520              :             // Wait for the next portion if it is not there yet, or just
      521              :             // update our end of available WAL; either way, we communicate
      522              :             // the new end position to the receiver.
     523            0 :             self.wait_wal().await?;
     524            0 :             assert!(
     525            0 :                 self.end_pos > self.start_pos,
     526            0 :                 "nothing to send after waiting for WAL"
     527              :             );
     528              : 
     529              :             // try to send as much as available, capped by MAX_SEND_SIZE
     530            0 :             let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
      531            0 :             // if we went beyond the available WAL, back off
     532            0 :             if chunk_end_pos >= self.end_pos {
     533            0 :                 chunk_end_pos = self.end_pos;
     534            0 :             } else {
      535            0 :                 // If not sending up to end_pos, round down to a page boundary,
      536            0 :                 // so that a WAL record is never split except at a page boundary,
      537            0 :                 // as the protocol demands. See walsender.c (XLogSendPhysical).
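                       :                 // For example, assuming the standard 8 KiB WAL page
                       :                 // (XLOG_BLCKSZ = 0x2000): 0x12345 has block_offset 0x345
                       :                 // and rounds down to 0x12000.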
     538            0 :                 chunk_end_pos = chunk_end_pos
     539            0 :                     .checked_sub(chunk_end_pos.block_offset())
     540            0 :                     .unwrap();
     541            0 :             }
     542            0 :             let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
     543            0 :             let send_buf = &mut self.send_buf[..send_size];
     544              :             let send_size: usize;
     545              :             {
     546              :                 // If uncommitted part is being pulled, check that the term is
     547              :                 // still the expected one.
     548            0 :                 let _term_guard = if let Some(t) = self.term {
     549            0 :                     Some(self.tli.acquire_term(t).await?)
     550              :                 } else {
     551            0 :                     None
     552              :                 };
      553              :                 // Read WAL into the buffer. send_size can be additionally
      554              :                 // capped to a segment boundary here.
     555            0 :                 send_size = self.wal_reader.read(send_buf).await?
     556              :             };
     557            0 :             let send_buf = &send_buf[..send_size];
     558            0 : 
     559            0 :             // and send it
     560            0 :             self.pgb
     561            0 :                 .write_message(&BeMessage::XLogData(XLogDataBody {
     562            0 :                     wal_start: self.start_pos.0,
     563            0 :                     wal_end: self.end_pos.0,
     564            0 :                     timestamp: get_current_timestamp(),
     565            0 :                     data: send_buf,
     566            0 :                 }))
     567            0 :                 .await?;
     568              : 
     569            0 :             if let Some(appname) = &self.appname {
     570            0 :                 if appname == "replica" {
     571            0 :                     failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
     572            0 :                 }
     573            0 :             }
     574            0 :             trace!(
     575            0 :                 "sent {} bytes of WAL {}-{}",
     576            0 :                 send_size,
     577            0 :                 self.start_pos,
     578            0 :                 self.start_pos + send_size as u64
     579              :             );
     580            0 :             self.start_pos += send_size as u64;
     581              :         }
     582            0 :     }
     583              : 
      584              :     /// Wait until we have WAL to stream, sending keepalives and checking for
      585              :     /// exit in the meantime.
     586            0 :     async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     587              :         loop {
     588            0 :             self.end_pos = self.end_watch.get();
     589            0 :             let have_something_to_send = (|| {
     590            0 :                 fail::fail_point!(
     591            0 :                     "sk-pause-send",
     592            0 :                     self.appname.as_deref() != Some("pageserver"),
     593            0 :                     |_| { false }
     594              :                 );
     595            0 :                 self.end_pos > self.start_pos
     596            0 :             })();
     597            0 : 
     598            0 :             if have_something_to_send {
     599            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     600            0 :                 return Ok(());
     601            0 :             }
     602              : 
     603              :             // Wait for WAL to appear, now self.end_pos == self.start_pos.
     604            0 :             if let Some(lsn) = self.wait_for_lsn().await? {
     605            0 :                 self.end_pos = lsn;
     606            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     607            0 :                 return Ok(());
     608            0 :             }
     609            0 : 
      610            0 :             // Timed out waiting for WAL: check for termination and send a keepalive.
     611            0 :             // Check for termination only if we are streaming up to commit_lsn
     612            0 :             // (to pageserver).
     613            0 :             if let EndWatch::Commit(_) = self.end_watch {
     614            0 :                 if let Some(remote_consistent_lsn) = self
     615            0 :                     .ws_guard
     616            0 :                     .walsenders
     617            0 :                     .get_ws_remote_consistent_lsn(self.ws_guard.id)
     618              :                 {
     619            0 :                     if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     620              :                         // Terminate if there is nothing more to send.
     621              :                         // Note that "ending streaming" part of the string is used by
     622              :                         // pageserver to identify WalReceiverError::SuccessfulCompletion,
     623              :                         // do not change this string without updating pageserver.
     624            0 :                         return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     625            0 :                         "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     626            0 :                         self.appname, self.start_pos,
     627            0 :                     )));
     628            0 :                     }
     629            0 :                 }
     630            0 :             }
     631              : 
     632            0 :             self.pgb
     633            0 :                 .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     634            0 :                     wal_end: self.end_pos.0,
     635            0 :                     timestamp: get_current_timestamp(),
     636            0 :                     request_reply: true,
     637            0 :                 }))
     638            0 :                 .await?;
     639              :         }
     640            0 :     }
     641              : 
      642              :     /// Wait until we have available WAL > start_pos or the timeout expires. Returns
      643              :     /// - Ok(Some(end_pos)) if the needed LSN is successfully observed;
      644              :     /// - Ok(None) if the timeout expired;
      645              :     /// - Err in case of error -- only if 1) the term changed while fetching in
      646              :     ///   recovery mode, or 2) the watch channel closed, which must never happen.
     647            0 :     async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
     648            0 :         let fp = (|| {
     649            0 :             fail::fail_point!(
     650            0 :                 "sk-pause-send",
     651            0 :                 self.appname.as_deref() != Some("pageserver"),
     652            0 :                 |_| { true }
     653              :             );
     654            0 :             false
     655            0 :         })();
     656            0 :         if fp {
     657            0 :             tokio::time::sleep(POLL_STATE_TIMEOUT).await;
     658            0 :             return Ok(None);
     659            0 :         }
     660              : 
     661            0 :         let res = timeout(POLL_STATE_TIMEOUT, async move {
     662              :             loop {
     663            0 :                 let end_pos = self.end_watch.get();
     664            0 :                 if end_pos > self.start_pos {
     665            0 :                     return Ok(end_pos);
     666            0 :                 }
     667            0 :                 if let EndWatch::Flush(rx) = &self.end_watch {
     668            0 :                     let curr_term = rx.borrow().term;
     669            0 :                     if let Some(client_term) = self.term {
     670            0 :                         if curr_term != client_term {
     671            0 :                             bail!("term changed: requested {}, now {}", client_term, curr_term);
     672            0 :                         }
     673            0 :                     }
     674            0 :                 }
     675            0 :                 self.end_watch.changed().await?;
     676              :             }
     677            0 :         })
     678            0 :         .await;
     679              : 
     680            0 :         match res {
     681              :             // success
     682            0 :             Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
     683              :             // error inside closure
     684            0 :             Ok(Err(err)) => Err(err),
     685              :             // timeout
     686            0 :             Err(_) => Ok(None),
     687              :         }
     688            0 :     }
     689              : }
     690              : 
      691              : /// The half of the split backend that drives receiving replies.
     692              : struct ReplyReader<IO> {
     693              :     reader: PostgresBackendReader<IO>,
     694              :     ws_guard: Arc<WalSenderGuard>,
     695              :     tli: Arc<Timeline>,
     696              : }
     697              : 
     698              : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
     699            0 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     700              :         loop {
     701            0 :             let msg = self.reader.read_copy_message().await?;
     702            0 :             self.handle_feedback(&msg).await?
     703              :         }
     704            0 :     }
     705              : 
     706            0 :     async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
     707            0 :         match msg.first().cloned() {
     708            0 :             Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
      709            0 :                 // Note: deserializing is on msg[1..] because we skip the tag byte.
     710            0 :                 let hs_feedback = HotStandbyFeedback::des(&msg[1..])
     711            0 :                     .context("failed to deserialize HotStandbyFeedback")?;
     712            0 :                 self.ws_guard
     713            0 :                     .walsenders
     714            0 :                     .record_hs_feedback(self.ws_guard.id, &hs_feedback);
     715              :             }
     716            0 :             Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
     717            0 :                 let reply =
     718            0 :                     StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
     719            0 :                 self.ws_guard
     720            0 :                     .walsenders
     721            0 :                     .record_standby_reply(self.ws_guard.id, &reply);
     722              :             }
     723              :             Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
      724              :                 // The pageserver sends this.
      725              :                 // Note: deserializing is on msg[9..] because we skip the tag byte and len bytes.
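                       :                 // (1 tag byte + 8 length bytes = 9.)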
     726            0 :                 let buf = Bytes::copy_from_slice(&msg[9..]);
     727            0 :                 let ps_feedback = PageserverFeedback::parse(buf);
     728            0 : 
     729            0 :                 trace!("PageserverFeedback is {:?}", ps_feedback);
     730            0 :                 self.ws_guard
     731            0 :                     .walsenders
     732            0 :                     .record_ps_feedback(self.ws_guard.id, &ps_feedback);
     733            0 :                 self.tli
     734            0 :                     .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
     735            0 :                     .await;
      736              :                 // In principle a new remote_consistent_lsn could allow us to
      737              :                 // deactivate the timeline, but we check that regularly through
      738              :                 // broker updates, so there is no need to do it here.
     739              :             }
     740            0 :             _ => warn!("unexpected message {:?}", msg),
     741              :         }
     742            0 :         Ok(())
     743            0 :     }
     744              : }
     745              : 
     746              : #[cfg(test)]
     747              : mod tests {
     748              :     use utils::id::{TenantId, TimelineId};
     749              : 
     750              :     use super::*;
     751              : 
     752            8 :     fn mock_ttid() -> TenantTimelineId {
     753            8 :         TenantTimelineId {
     754            8 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     755            8 :             timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
     756            8 :         }
     757            8 :     }
     758              : 
     759            8 :     fn mock_addr() -> SocketAddr {
     760            8 :         "127.0.0.1:8080".parse().unwrap()
     761            8 :     }
     762              : 
      763              :     // add the specified feedback to wss, setting the other fields to dummy values
     764            8 :     fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
     765            8 :         let walsender_state = WalSenderState {
     766            8 :             ttid: mock_ttid(),
     767            8 :             addr: mock_addr(),
     768            8 :             conn_id: 1,
     769            8 :             appname: None,
     770            8 :             feedback,
     771            8 :         };
     772            8 :         wss.slots.push(Some(walsender_state))
     773            8 :     }
     774              : 
     775              :     // form standby feedback with given hot standby feedback ts/xmin and the
     776              :     // rest set to dummy values.
     777            8 :     fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
     778            8 :         ReplicationFeedback::Standby(StandbyFeedback {
     779            8 :             reply: StandbyReply::empty(),
     780            8 :             hs_feedback: HotStandbyFeedback {
     781            8 :                 ts,
     782            8 :                 xmin,
     783            8 :                 catalog_xmin: 0,
     784            8 :             },
     785            8 :         })
     786            8 :     }
     787              : 
     788              :     // test that hs aggregation works as expected
     789              :     #[test]
     790            2 :     fn test_hs_feedback_no_valid() {
     791            2 :         let mut wss = WalSendersShared::new();
     792            2 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     793            2 :         wss.update_hs_feedback();
     794            2 :         assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
     795            2 :     }
     796              : 
     797              :     #[test]
     798            2 :     fn test_hs_feedback() {
     799            2 :         let mut wss = WalSendersShared::new();
     800            2 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     801            2 :         push_feedback(&mut wss, hs_feedback(1, 42));
     802            2 :         push_feedback(&mut wss, hs_feedback(1, 64));
     803            2 :         wss.update_hs_feedback();
     804            2 :         assert_eq!(wss.agg_hs_feedback.xmin, 42);
     805            2 :     }
     806              : }
        

Generated by: LCOV version 2.1-beta