LCOV - code coverage report
Current view: top level - safekeeper/src - send_wal.rs (source / functions) Coverage Total Hit
Test: bb45db3982713bfd5bec075773079136e362195e.info Lines: 19.3 % 565 109
Test Date: 2024-12-11 15:53:32 Functions: 9.3 % 118 11

            Line data    Source code
       1              : //! This module implements the streaming side of replication protocol, starting
       2              : //! with the "START_REPLICATION" message, and registry of walsenders.
       3              : 
       4              : use crate::handler::SafekeeperPostgresHandler;
       5              : use crate::metrics::RECEIVED_PS_FEEDBACKS;
       6              : use crate::receive_wal::WalReceivers;
       7              : use crate::safekeeper::{Term, TermLsn};
       8              : use crate::send_interpreted_wal::InterpretedWalSender;
       9              : use crate::timeline::WalResidentTimeline;
      10              : use crate::wal_reader_stream::WalReaderStreamBuilder;
      11              : use crate::wal_service::ConnectionId;
      12              : use crate::wal_storage::WalReader;
      13              : use anyhow::{bail, Context as AnyhowContext};
      14              : use bytes::Bytes;
      15              : use futures::future::Either;
      16              : use parking_lot::Mutex;
      17              : use postgres_backend::PostgresBackend;
      18              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
      19              : use postgres_ffi::get_current_timestamp;
      20              : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
      21              : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
      22              : use serde::{Deserialize, Serialize};
      23              : use tokio::io::{AsyncRead, AsyncWrite};
      24              : use utils::failpoint_support;
      25              : use utils::id::TenantTimelineId;
      26              : use utils::pageserver_feedback::PageserverFeedback;
      27              : use utils::postgres_client::PostgresClientProtocol;
      28              : 
      29              : use std::cmp::{max, min};
      30              : use std::net::SocketAddr;
      31              : use std::str;
      32              : use std::sync::Arc;
      33              : use std::time::Duration;
      34              : use tokio::sync::watch::Receiver;
      35              : use tokio::time::timeout;
      36              : use tracing::*;
      37              : use utils::{bin_ser::BeSer, lsn::Lsn};
      38              : 
      39              : // See: https://www.postgresql.org/docs/13/protocol-replication.html
      40              : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
      41              : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
      42              : // neon extension of replication protocol
      43              : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
      44              : 
      45              : type FullTransactionId = u64;
      46              : 
      47              : /// Hot standby feedback received from replica
      48            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      49              : pub struct HotStandbyFeedback {
      50              :     pub ts: TimestampTz,
      51              :     pub xmin: FullTransactionId,
      52              :     pub catalog_xmin: FullTransactionId,
      53              : }
      54              : 
      55              : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
      56              : 
      57              : impl HotStandbyFeedback {
      58         3038 :     pub fn empty() -> HotStandbyFeedback {
      59         3038 :         HotStandbyFeedback {
      60         3038 :             ts: 0,
      61         3038 :             xmin: 0,
      62         3038 :             catalog_xmin: 0,
      63         3038 :         }
      64         3038 :     }
      65              : }
      66              : 
      67              : /// Standby status update
      68            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      69              : pub struct StandbyReply {
      70              :     pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
      71              :     pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
      72              :     pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
      73              :     pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
      74              :     pub reply_requested: bool,
      75              : }
      76              : 
      77              : impl StandbyReply {
      78            8 :     fn empty() -> Self {
      79            8 :         StandbyReply {
      80            8 :             write_lsn: Lsn::INVALID,
      81            8 :             flush_lsn: Lsn::INVALID,
      82            8 :             apply_lsn: Lsn::INVALID,
      83            8 :             reply_ts: 0,
      84            8 :             reply_requested: false,
      85            8 :         }
      86            8 :     }
      87              : }
      88              : 
      89            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      90              : pub struct StandbyFeedback {
      91              :     pub reply: StandbyReply,
      92              :     pub hs_feedback: HotStandbyFeedback,
      93              : }
      94              : 
      95              : impl StandbyFeedback {
      96            2 :     pub fn empty() -> Self {
      97            2 :         StandbyFeedback {
      98            2 :             reply: StandbyReply::empty(),
      99            2 :             hs_feedback: HotStandbyFeedback::empty(),
     100            2 :         }
     101            2 :     }
     102              : }
     103              : 
     104              : /// WalSenders registry. Timeline holds it (wrapped in Arc).
     105              : pub struct WalSenders {
     106              :     mutex: Mutex<WalSendersShared>,
     107              :     walreceivers: Arc<WalReceivers>,
     108              : }
     109              : 
     110              : impl WalSenders {
     111            0 :     pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
     112            0 :         Arc::new(WalSenders {
     113            0 :             mutex: Mutex::new(WalSendersShared::new()),
     114            0 :             walreceivers,
     115            0 :         })
     116            0 :     }
     117              : 
     118              :     /// Register new walsender. Returned guard provides access to the slot and
     119              :     /// automatically deregisters in Drop.
     120            0 :     fn register(
     121            0 :         self: &Arc<WalSenders>,
     122            0 :         ttid: TenantTimelineId,
     123            0 :         addr: SocketAddr,
     124            0 :         conn_id: ConnectionId,
     125            0 :         appname: Option<String>,
     126            0 :     ) -> WalSenderGuard {
     127            0 :         let slots = &mut self.mutex.lock().slots;
     128            0 :         let walsender_state = WalSenderState {
     129            0 :             ttid,
     130            0 :             addr,
     131            0 :             conn_id,
     132            0 :             appname,
     133            0 :             feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
     134            0 :         };
     135              :         // find empty slot or create new one
     136            0 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
     137            0 :             slots[pos] = Some(walsender_state);
     138            0 :             pos
     139              :         } else {
     140            0 :             let pos = slots.len();
     141            0 :             slots.push(Some(walsender_state));
     142            0 :             pos
     143              :         };
     144            0 :         WalSenderGuard {
     145            0 :             id: pos,
     146            0 :             walsenders: self.clone(),
     147            0 :         }
     148            0 :     }
     149              : 
     150              :     /// Get state of all walsenders.
     151            0 :     pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
     152            0 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     153            0 :     }
     154              : 
     155              :     /// Get LSN of the most lagging pageserver receiver. Return None if there are no
     156              :     /// active walsenders.
     157            0 :     pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
     158            0 :         self.mutex
     159            0 :             .lock()
     160            0 :             .slots
     161            0 :             .iter()
     162            0 :             .flatten()
     163            0 :             .filter_map(|s| match s.feedback {
     164            0 :                 ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
     165            0 :                 ReplicationFeedback::Standby(_) => None,
     166            0 :             })
     167            0 :             .min()
     168            0 :     }
     169              : 
     170              :     /// Returns total counter of pageserver feedbacks received and last feedback.
     171            0 :     pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
     172            0 :         let shared = self.mutex.lock();
     173            0 :         (shared.ps_feedback_counter, shared.last_ps_feedback)
     174            0 :     }
     175              : 
     176              :     /// Get aggregated hot standby feedback (we send it to compute).
     177            0 :     pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
     178            0 :         self.mutex.lock().agg_standby_feedback
     179            0 :     }
     180              : 
     181              :     /// Record new pageserver feedback, update aggregated values.
     182            0 :     fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
     183            0 :         let mut shared = self.mutex.lock();
     184            0 :         shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
     185            0 :         shared.last_ps_feedback = *feedback;
     186            0 :         shared.ps_feedback_counter += 1;
     187            0 :         drop(shared);
     188            0 : 
     189            0 :         RECEIVED_PS_FEEDBACKS.inc();
     190            0 : 
     191            0 :         // send feedback to connected walproposers
     192            0 :         self.walreceivers.broadcast_pageserver_feedback(*feedback);
     193            0 :     }
     194              : 
     195              :     /// Record standby reply.
     196            0 :     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
     197            0 :         let mut shared = self.mutex.lock();
     198            0 :         let slot = shared.get_slot_mut(id);
     199            0 :         debug!(
     200            0 :             "Record standby reply: ts={} apply_lsn={}",
     201              :             reply.reply_ts, reply.apply_lsn
     202              :         );
     203            0 :         match &mut slot.feedback {
     204            0 :             ReplicationFeedback::Standby(sf) => sf.reply = *reply,
     205              :             ReplicationFeedback::Pageserver(_) => {
     206            0 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     207            0 :                     reply: *reply,
     208            0 :                     hs_feedback: HotStandbyFeedback::empty(),
     209            0 :                 })
     210              :             }
     211              :         }
     212            0 :     }
     213              : 
     214              :     /// Record hot standby feedback, update aggregated value.
     215            0 :     fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
     216            0 :         let mut shared = self.mutex.lock();
     217            0 :         let slot = shared.get_slot_mut(id);
     218            0 :         match &mut slot.feedback {
     219            0 :             ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
     220              :             ReplicationFeedback::Pageserver(_) => {
     221            0 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     222            0 :                     reply: StandbyReply::empty(),
     223            0 :                     hs_feedback: *feedback,
     224            0 :                 })
     225              :             }
     226              :         }
     227            0 :         shared.update_reply_feedback();
     228            0 :     }
     229              : 
     230              :     /// Get remote_consistent_lsn reported by the pageserver. Returns None if
     231              :     /// client is not pageserver.
     232            0 :     pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
     233            0 :         let shared = self.mutex.lock();
     234            0 :         let slot = shared.get_slot(id);
     235            0 :         match slot.feedback {
     236            0 :             ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
     237            0 :             _ => None,
     238              :         }
     239            0 :     }
     240              : 
     241              :     /// Unregister walsender.
     242            0 :     fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
     243            0 :         let mut shared = self.mutex.lock();
     244            0 :         shared.slots[id] = None;
     245            0 :         shared.update_reply_feedback();
     246            0 :     }
     247              : }
     248              : 
     249              : struct WalSendersShared {
     250              :     // aggregated over all walsenders value
     251              :     agg_standby_feedback: StandbyFeedback,
     252              :     // last feedback ever received from any pageserver, empty if none
     253              :     last_ps_feedback: PageserverFeedback,
     254              :     // total counter of pageserver feedbacks received
     255              :     ps_feedback_counter: u64,
     256              :     slots: Vec<Option<WalSenderState>>,
     257              : }
     258              : 
     259              : impl WalSendersShared {
     260            2 :     fn new() -> Self {
     261            2 :         WalSendersShared {
     262            2 :             agg_standby_feedback: StandbyFeedback::empty(),
     263            2 :             last_ps_feedback: PageserverFeedback::empty(),
     264            2 :             ps_feedback_counter: 0,
     265            2 :             slots: Vec::new(),
     266            2 :         }
     267            2 :     }
     268              : 
     269              :     /// Get content of provided id slot, it must exist.
     270            0 :     fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
     271            0 :         self.slots[id].as_ref().expect("walsender doesn't exist")
     272            0 :     }
     273              : 
     274              :     /// Get mut content of provided id slot, it must exist.
     275            0 :     fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
     276            0 :         self.slots[id].as_mut().expect("walsender doesn't exist")
     277            0 :     }
     278              : 
     279              :     /// Update aggregated hot standy and normal reply feedbacks. We just take min of valid xmins
     280              :     /// and ts.
     281            2 :     fn update_reply_feedback(&mut self) {
     282            2 :         let mut agg = HotStandbyFeedback::empty();
     283            2 :         let mut reply_agg = StandbyReply::empty();
     284            4 :         for ws_state in self.slots.iter().flatten() {
     285            4 :             if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
     286            4 :                 let hs_feedback = standby_feedback.hs_feedback;
     287            4 :                 // doing Option math like op1.iter().chain(op2.iter()).min()
     288            4 :                 // would be nicer, but we serialize/deserialize this struct
     289            4 :                 // directly, so leave as is for now
     290            4 :                 if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
     291            2 :                     if agg.xmin != INVALID_FULL_TRANSACTION_ID {
     292            1 :                         agg.xmin = min(agg.xmin, hs_feedback.xmin);
     293            1 :                     } else {
     294            1 :                         agg.xmin = hs_feedback.xmin;
     295            1 :                     }
     296            2 :                     agg.ts = max(agg.ts, hs_feedback.ts);
     297            2 :                 }
     298            4 :                 if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     299            0 :                     if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     300            0 :                         agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
     301            0 :                     } else {
     302            0 :                         agg.catalog_xmin = hs_feedback.catalog_xmin;
     303            0 :                     }
     304            0 :                     agg.ts = max(agg.ts, hs_feedback.ts);
     305            4 :                 }
     306            4 :                 let reply = standby_feedback.reply;
     307            4 :                 if reply.write_lsn != Lsn::INVALID {
     308            0 :                     if reply_agg.write_lsn != Lsn::INVALID {
     309            0 :                         reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
     310            0 :                     } else {
     311            0 :                         reply_agg.write_lsn = reply.write_lsn;
     312            0 :                     }
     313            4 :                 }
     314            4 :                 if reply.flush_lsn != Lsn::INVALID {
     315            0 :                     if reply_agg.flush_lsn != Lsn::INVALID {
     316            0 :                         reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
     317            0 :                     } else {
     318            0 :                         reply_agg.flush_lsn = reply.flush_lsn;
     319            0 :                     }
     320            4 :                 }
     321            4 :                 if reply.apply_lsn != Lsn::INVALID {
     322            0 :                     if reply_agg.apply_lsn != Lsn::INVALID {
     323            0 :                         reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
     324            0 :                     } else {
     325            0 :                         reply_agg.apply_lsn = reply.apply_lsn;
     326            0 :                     }
     327            4 :                 }
     328            4 :                 if reply.reply_ts != 0 {
     329            0 :                     if reply_agg.reply_ts != 0 {
     330            0 :                         reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
     331            0 :                     } else {
     332            0 :                         reply_agg.reply_ts = reply.reply_ts;
     333            0 :                     }
     334            4 :                 }
     335            0 :             }
     336              :         }
     337            2 :         self.agg_standby_feedback = StandbyFeedback {
     338            2 :             reply: reply_agg,
     339            2 :             hs_feedback: agg,
     340            2 :         };
     341            2 :     }
     342              : }
     343              : 
     344              : // Serialized is used only for pretty printing in json.
     345            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     346              : pub struct WalSenderState {
     347              :     ttid: TenantTimelineId,
     348              :     addr: SocketAddr,
     349              :     conn_id: ConnectionId,
     350              :     // postgres application_name
     351              :     appname: Option<String>,
     352              :     feedback: ReplicationFeedback,
     353              : }
     354              : 
     355              : // Receiver is either pageserver or regular standby, which have different
     356              : // feedbacks.
     357            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     358              : enum ReplicationFeedback {
     359              :     Pageserver(PageserverFeedback),
     360              :     Standby(StandbyFeedback),
     361              : }
     362              : 
     363              : // id of the occupied slot in WalSenders to access it (and save in the
     364              : // WalSenderGuard). We could give Arc directly to the slot, but there is not
     365              : // much sense in that as values aggregation which is performed on each feedback
     366              : // receival iterates over all walsenders.
     367              : pub type WalSenderId = usize;
     368              : 
     369              : /// Scope guard to access slot in WalSenders registry and unregister from it in
     370              : /// Drop.
     371              : pub struct WalSenderGuard {
     372              :     id: WalSenderId,
     373              :     walsenders: Arc<WalSenders>,
     374              : }
     375              : 
     376              : impl WalSenderGuard {
     377            0 :     pub fn id(&self) -> WalSenderId {
     378            0 :         self.id
     379            0 :     }
     380              : 
     381            0 :     pub fn walsenders(&self) -> &Arc<WalSenders> {
     382            0 :         &self.walsenders
     383            0 :     }
     384              : }
     385              : 
     386              : impl Drop for WalSenderGuard {
     387            0 :     fn drop(&mut self) {
     388            0 :         self.walsenders.unregister(self.id);
     389            0 :     }
     390              : }
     391              : 
     392              : impl SafekeeperPostgresHandler {
     393              :     /// Wrapper around handle_start_replication_guts handling result. Error is
     394              :     /// handled here while we're still in walsender ttid span; with API
     395              :     /// extension, this can probably be moved into postgres_backend.
     396            0 :     pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
     397            0 :         &mut self,
     398            0 :         pgb: &mut PostgresBackend<IO>,
     399            0 :         start_pos: Lsn,
     400            0 :         term: Option<Term>,
     401            0 :     ) -> Result<(), QueryError> {
     402            0 :         let tli = self
     403            0 :             .global_timelines
     404            0 :             .get(self.ttid)
     405            0 :             .map_err(|e| QueryError::Other(e.into()))?;
     406            0 :         let residence_guard = tli.wal_residence_guard().await?;
     407              : 
     408            0 :         if let Err(end) = self
     409            0 :             .handle_start_replication_guts(pgb, start_pos, term, residence_guard)
     410            0 :             .await
     411              :         {
     412            0 :             let info = tli.get_safekeeper_info(&self.conf).await;
     413              :             // Log the result and probably send it to the client, closing the stream.
     414            0 :             pgb.handle_copy_stream_end(end)
     415            0 :             .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.flush_lsn)))
     416            0 :             .await;
     417            0 :         }
     418            0 :         Ok(())
     419            0 :     }
     420              : 
     421            0 :     pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     422            0 :         &mut self,
     423            0 :         pgb: &mut PostgresBackend<IO>,
     424            0 :         start_pos: Lsn,
     425            0 :         term: Option<Term>,
     426            0 :         tli: WalResidentTimeline,
     427            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     428            0 :         let appname = self.appname.clone();
     429            0 : 
     430            0 :         // Use a guard object to remove our entry from the timeline when we are done.
     431            0 :         let ws_guard = Arc::new(tli.get_walsenders().register(
     432            0 :             self.ttid,
     433            0 :             *pgb.get_peer_addr(),
     434            0 :             self.conn_id,
     435            0 :             self.appname.clone(),
     436            0 :         ));
     437              : 
     438              :         // Walsender can operate in one of two modes which we select by
     439              :         // application_name: give only committed WAL (used by pageserver) or all
     440              :         // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
     441              :         // The second case is always driven by a consensus leader which term
     442              :         // must be supplied.
     443            0 :         let end_watch = if term.is_some() {
     444            0 :             EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
     445              :         } else {
     446            0 :             EndWatch::Commit(tli.get_commit_lsn_watch_rx())
     447              :         };
     448              :         // we don't check term here; it will be checked on first waiting/WAL reading anyway.
     449            0 :         let end_pos = end_watch.get();
     450            0 : 
     451            0 :         if end_pos < start_pos {
     452            0 :             warn!(
     453            0 :                 "requested start_pos {} is ahead of available WAL end_pos {}",
     454              :                 start_pos, end_pos
     455              :             );
     456            0 :         }
     457              : 
     458            0 :         info!(
     459            0 :             "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}",
     460              :             start_pos,
     461              :             end_pos,
     462            0 :             matches!(end_watch, EndWatch::Flush(_)),
     463              :             appname,
     464            0 :             self.protocol(),
     465              :         );
     466              : 
     467              :         // switch to copy
     468            0 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     469              : 
     470            0 :         let wal_reader = tli.get_walreader(start_pos).await?;
     471              : 
     472              :         // Split to concurrently receive and send data; replies are generally
     473              :         // not synchronized with sends, so this avoids deadlocks.
     474            0 :         let reader = pgb.split().context("START_REPLICATION split")?;
     475              : 
     476            0 :         let send_fut = match self.protocol() {
     477              :             PostgresClientProtocol::Vanilla => {
     478            0 :                 let sender = WalSender {
     479            0 :                     pgb,
     480            0 :                     // should succeed since we're already holding another guard
     481            0 :                     tli: tli.wal_residence_guard().await?,
     482            0 :                     appname,
     483            0 :                     start_pos,
     484            0 :                     end_pos,
     485            0 :                     term,
     486            0 :                     end_watch,
     487            0 :                     ws_guard: ws_guard.clone(),
     488            0 :                     wal_reader,
     489            0 :                     send_buf: vec![0u8; MAX_SEND_SIZE],
     490            0 :                 };
     491            0 : 
     492            0 :                 Either::Left(sender.run())
     493              :             }
     494              :             PostgresClientProtocol::Interpreted {
     495            0 :                 format,
     496            0 :                 compression,
     497              :             } => {
     498            0 :                 let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
     499            0 :                 let end_watch_view = end_watch.view();
     500            0 :                 let wal_stream_builder = WalReaderStreamBuilder {
     501            0 :                     tli: tli.wal_residence_guard().await?,
     502            0 :                     start_pos,
     503            0 :                     end_pos,
     504            0 :                     term,
     505            0 :                     end_watch,
     506            0 :                     wal_sender_guard: ws_guard.clone(),
     507            0 :                 };
     508            0 : 
     509            0 :                 let sender = InterpretedWalSender {
     510            0 :                     format,
     511            0 :                     compression,
     512            0 :                     pgb,
     513            0 :                     wal_stream_builder,
     514            0 :                     end_watch_view,
     515            0 :                     shard: self.shard.unwrap(),
     516            0 :                     pg_version,
     517            0 :                     appname,
     518            0 :                 };
     519            0 : 
     520            0 :                 Either::Right(sender.run())
     521              :             }
     522              :         };
     523              : 
     524            0 :         let tli_cancel = tli.cancel.clone();
     525            0 : 
     526            0 :         let mut reply_reader = ReplyReader {
     527            0 :             reader,
     528            0 :             ws_guard: ws_guard.clone(),
     529            0 :             tli,
     530            0 :         };
     531              : 
     532            0 :         let res = tokio::select! {
     533              :             // todo: add read|write .context to these errors
     534            0 :             r = send_fut => r,
     535            0 :             r = reply_reader.run() => r,
     536            0 :             _ = tli_cancel.cancelled() => {
     537            0 :                 return Err(CopyStreamHandlerEnd::Cancelled);
     538              :             }
     539              :         };
     540              : 
     541            0 :         let ws_state = ws_guard
     542            0 :             .walsenders
     543            0 :             .mutex
     544            0 :             .lock()
     545            0 :             .get_slot(ws_guard.id)
     546            0 :             .clone();
     547            0 :         info!(
     548            0 :             "finished streaming to {}, feedback={:?}",
     549              :             ws_state.addr, ws_state.feedback,
     550              :         );
     551              : 
     552              :         // Join pg backend back.
     553            0 :         pgb.unsplit(reply_reader.reader)?;
     554              : 
     555            0 :         res
     556            0 :     }
     557              : }
     558              : 
     559              : /// TODO(vlad): maybe lift this instead
     560              : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
     561              : /// given term (recovery by walproposer or peer safekeeper).
     562              : #[derive(Clone)]
     563              : pub(crate) enum EndWatch {
     564              :     Commit(Receiver<Lsn>),
     565              :     Flush(Receiver<TermLsn>),
     566              : }
     567              : 
     568              : impl EndWatch {
     569            0 :     pub(crate) fn view(&self) -> EndWatchView {
     570            0 :         EndWatchView(self.clone())
     571            0 :     }
     572              : 
     573              :     /// Get current end of WAL.
     574            0 :     pub(crate) fn get(&self) -> Lsn {
     575            0 :         match self {
     576            0 :             EndWatch::Commit(r) => *r.borrow(),
     577            0 :             EndWatch::Flush(r) => r.borrow().lsn,
     578              :         }
     579            0 :     }
     580              : 
     581              :     /// Wait for the update.
     582            0 :     pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
     583            0 :         match self {
     584            0 :             EndWatch::Commit(r) => r.changed().await?,
     585            0 :             EndWatch::Flush(r) => r.changed().await?,
     586              :         }
     587            0 :         Ok(())
     588            0 :     }
     589              : 
     590            0 :     pub(crate) async fn wait_for_lsn(
     591            0 :         &mut self,
     592            0 :         lsn: Lsn,
     593            0 :         client_term: Option<Term>,
     594            0 :     ) -> anyhow::Result<Lsn> {
     595              :         loop {
     596            0 :             let end_pos = self.get();
     597            0 :             if end_pos > lsn {
     598            0 :                 return Ok(end_pos);
     599            0 :             }
     600            0 :             if let EndWatch::Flush(rx) = &self {
     601            0 :                 let curr_term = rx.borrow().term;
     602            0 :                 if let Some(client_term) = client_term {
     603            0 :                     if curr_term != client_term {
     604            0 :                         bail!("term changed: requested {}, now {}", client_term, curr_term);
     605            0 :                     }
     606            0 :                 }
     607            0 :             }
     608            0 :             self.changed().await?;
     609              :         }
     610            0 :     }
     611              : }
     612              : 
     613              : pub(crate) struct EndWatchView(EndWatch);
     614              : 
     615              : impl EndWatchView {
     616            0 :     pub(crate) fn get(&self) -> Lsn {
     617            0 :         self.0.get()
     618            0 :     }
     619              : }
     620              : /// A half driving sending WAL.
     621              : struct WalSender<'a, IO> {
     622              :     pgb: &'a mut PostgresBackend<IO>,
     623              :     tli: WalResidentTimeline,
     624              :     appname: Option<String>,
     625              :     // Position since which we are sending next chunk.
     626              :     start_pos: Lsn,
     627              :     // WAL up to this position is known to be locally available.
     628              :     // Usually this is the same as the latest commit_lsn, but in case of
     629              :     // walproposer recovery, this is flush_lsn.
     630              :     //
     631              :     // We send this LSN to the receiver as wal_end, so that it knows how much
     632              :     // WAL this safekeeper has. This LSN should be as fresh as possible.
     633              :     end_pos: Lsn,
     634              :     /// When streaming uncommitted part, the term the client acts as the leader
     635              :     /// in. Streaming is stopped if local term changes to a different (higher)
     636              :     /// value.
     637              :     term: Option<Term>,
     638              :     /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
     639              :     end_watch: EndWatch,
     640              :     ws_guard: Arc<WalSenderGuard>,
     641              :     wal_reader: WalReader,
     642              :     // buffer for readling WAL into to send it
     643              :     send_buf: Vec<u8>,
     644              : }
     645              : 
     646              : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
     647              : 
     648              : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
     649              :     /// Send WAL until
     650              :     /// - an error occurs
     651              :     /// - receiver is caughtup and there is no computes (if streaming up to commit_lsn)
     652              :     /// - timeline's cancellation token fires
     653              :     ///
     654              :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     655              :     /// convenience.
     656            0 :     async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
     657              :         loop {
     658              :             // Wait for the next portion if it is not there yet, or just
     659              :             // update our end of WAL available for sending value, we
     660              :             // communicate it to the receiver.
     661            0 :             self.wait_wal().await?;
     662            0 :             assert!(
     663            0 :                 self.end_pos > self.start_pos,
     664            0 :                 "nothing to send after waiting for WAL"
     665              :             );
     666              : 
     667              :             // try to send as much as available, capped by MAX_SEND_SIZE
     668            0 :             let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
     669            0 :             // if we went behind available WAL, back off
     670            0 :             if chunk_end_pos >= self.end_pos {
     671            0 :                 chunk_end_pos = self.end_pos;
     672            0 :             } else {
     673            0 :                 // If sending not up to end pos, round down to page boundary to
     674            0 :                 // avoid breaking WAL record not at page boundary, as protocol
     675            0 :                 // demands. See walsender.c (XLogSendPhysical).
     676            0 :                 chunk_end_pos = chunk_end_pos
     677            0 :                     .checked_sub(chunk_end_pos.block_offset())
     678            0 :                     .unwrap();
     679            0 :             }
     680            0 :             let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
     681            0 :             let send_buf = &mut self.send_buf[..send_size];
     682              :             let send_size: usize;
     683              :             {
     684              :                 // If uncommitted part is being pulled, check that the term is
     685              :                 // still the expected one.
     686            0 :                 let _term_guard = if let Some(t) = self.term {
     687            0 :                     Some(self.tli.acquire_term(t).await?)
     688              :                 } else {
     689            0 :                     None
     690              :                 };
     691              :                 // Read WAL into buffer. send_size can be additionally capped to
     692              :                 // segment boundary here.
     693            0 :                 send_size = self.wal_reader.read(send_buf).await?
     694              :             };
     695            0 :             let send_buf = &send_buf[..send_size];
     696            0 : 
     697            0 :             // and send it, while respecting Timeline::cancel
     698            0 :             let msg = BeMessage::XLogData(XLogDataBody {
     699            0 :                 wal_start: self.start_pos.0,
     700            0 :                 wal_end: self.end_pos.0,
     701            0 :                 timestamp: get_current_timestamp(),
     702            0 :                 data: send_buf,
     703            0 :             });
     704            0 :             self.pgb.write_message(&msg).await?;
     705              : 
     706            0 :             if let Some(appname) = &self.appname {
     707            0 :                 if appname == "replica" {
     708            0 :                     failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
     709            0 :                 }
     710            0 :             }
     711            0 :             trace!(
     712            0 :                 "sent {} bytes of WAL {}-{}",
     713            0 :                 send_size,
     714            0 :                 self.start_pos,
     715            0 :                 self.start_pos + send_size as u64
     716              :             );
     717            0 :             self.start_pos += send_size as u64;
     718              :         }
     719            0 :     }
     720              : 
     721              :     /// wait until we have WAL to stream, sending keepalives and checking for
     722              :     /// exit in the meanwhile
     723            0 :     async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     724              :         loop {
     725            0 :             self.end_pos = self.end_watch.get();
     726            0 :             let have_something_to_send = (|| {
     727            0 :                 fail::fail_point!(
     728            0 :                     "sk-pause-send",
     729            0 :                     self.appname.as_deref() != Some("pageserver"),
     730            0 :                     |_| { false }
     731            0 :                 );
     732            0 :                 self.end_pos > self.start_pos
     733            0 :             })();
     734            0 : 
     735            0 :             if have_something_to_send {
     736            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     737            0 :                 return Ok(());
     738            0 :             }
     739              : 
     740              :             // Wait for WAL to appear, now self.end_pos == self.start_pos.
     741            0 :             if let Some(lsn) = self.wait_for_lsn().await? {
     742            0 :                 self.end_pos = lsn;
     743            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     744            0 :                 return Ok(());
     745            0 :             }
     746            0 : 
     747            0 :             // Timed out waiting for WAL, check for termination and send KA.
     748            0 :             // Check for termination only if we are streaming up to commit_lsn
     749            0 :             // (to pageserver).
     750            0 :             if let EndWatch::Commit(_) = self.end_watch {
     751            0 :                 if let Some(remote_consistent_lsn) = self
     752            0 :                     .ws_guard
     753            0 :                     .walsenders
     754            0 :                     .get_ws_remote_consistent_lsn(self.ws_guard.id)
     755              :                 {
     756            0 :                     if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     757              :                         // Terminate if there is nothing more to send.
     758              :                         // Note that "ending streaming" part of the string is used by
     759              :                         // pageserver to identify WalReceiverError::SuccessfulCompletion,
     760              :                         // do not change this string without updating pageserver.
     761            0 :                         return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     762            0 :                         "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     763            0 :                         self.appname, self.start_pos,
     764            0 :                     )));
     765            0 :                     }
     766            0 :                 }
     767            0 :             }
     768              : 
     769            0 :             let msg = BeMessage::KeepAlive(WalSndKeepAlive {
     770            0 :                 wal_end: self.end_pos.0,
     771            0 :                 timestamp: get_current_timestamp(),
     772            0 :                 request_reply: true,
     773            0 :             });
     774            0 : 
     775            0 :             self.pgb.write_message(&msg).await?;
     776              :         }
     777            0 :     }
     778              : 
     779              :     /// Wait until we have available WAL > start_pos or timeout expires. Returns
     780              :     /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
     781              :     /// - Ok(None) if timeout expired;
     782              :     /// - Err in case of error -- only if 1) term changed while fetching in recovery
     783              :     ///   mode 2) watch channel closed, which must never happen.
     784            0 :     async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
     785            0 :         let fp = (|| {
     786            0 :             fail::fail_point!(
     787            0 :                 "sk-pause-send",
     788            0 :                 self.appname.as_deref() != Some("pageserver"),
     789            0 :                 |_| { true }
     790            0 :             );
     791            0 :             false
     792            0 :         })();
     793            0 :         if fp {
     794            0 :             tokio::time::sleep(POLL_STATE_TIMEOUT).await;
     795            0 :             return Ok(None);
     796            0 :         }
     797              : 
     798            0 :         let res = timeout(POLL_STATE_TIMEOUT, async move {
     799              :             loop {
     800            0 :                 let end_pos = self.end_watch.get();
     801            0 :                 if end_pos > self.start_pos {
     802            0 :                     return Ok(end_pos);
     803            0 :                 }
     804            0 :                 if let EndWatch::Flush(rx) = &self.end_watch {
     805            0 :                     let curr_term = rx.borrow().term;
     806            0 :                     if let Some(client_term) = self.term {
     807            0 :                         if curr_term != client_term {
     808            0 :                             bail!("term changed: requested {}, now {}", client_term, curr_term);
     809            0 :                         }
     810            0 :                     }
     811            0 :                 }
     812            0 :                 self.end_watch.changed().await?;
     813              :             }
     814            0 :         })
     815            0 :         .await;
     816              : 
     817            0 :         match res {
     818              :             // success
     819            0 :             Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
     820              :             // error inside closure
     821            0 :             Ok(Err(err)) => Err(err),
     822              :             // timeout
     823            0 :             Err(_) => Ok(None),
     824              :         }
     825            0 :     }
     826              : }
     827              : 
     828              : /// A half driving receiving replies.
     829              : struct ReplyReader<IO> {
     830              :     reader: PostgresBackendReader<IO>,
     831              :     ws_guard: Arc<WalSenderGuard>,
     832              :     tli: WalResidentTimeline,
     833              : }
     834              : 
     835              : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
     836            0 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     837              :         loop {
     838            0 :             let msg = self.reader.read_copy_message().await?;
     839            0 :             self.handle_feedback(&msg).await?
     840              :         }
     841            0 :     }
     842              : 
     843            0 :     async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
     844            0 :         match msg.first().cloned() {
     845              :             Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
     846              :                 // Note: deserializing is on m[1..] because we skip the tag byte.
     847            0 :                 let mut hs_feedback = HotStandbyFeedback::des(&msg[1..])
     848            0 :                     .context("failed to deserialize HotStandbyFeedback")?;
     849              :                 // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way:
     850              :                 // pq_sendint32(&reply_message, xmin);
     851              :                 // pq_sendint32(&reply_message, xmin_epoch);
     852              :                 // So it is two big endian 32-bit words in low endian order!
     853            0 :                 hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
     854            0 :                 hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
     855            0 :                 self.ws_guard
     856            0 :                     .walsenders
     857            0 :                     .record_hs_feedback(self.ws_guard.id, &hs_feedback);
     858              :             }
     859              :             Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
     860            0 :                 let reply =
     861            0 :                     StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
     862            0 :                 self.ws_guard
     863            0 :                     .walsenders
     864            0 :                     .record_standby_reply(self.ws_guard.id, &reply);
     865              :             }
     866              :             Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
     867              :                 // pageserver sends this.
     868              :                 // Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
     869            0 :                 let buf = Bytes::copy_from_slice(&msg[9..]);
     870            0 :                 let ps_feedback = PageserverFeedback::parse(buf);
     871            0 : 
     872            0 :                 trace!("PageserverFeedback is {:?}", ps_feedback);
     873            0 :                 self.ws_guard
     874            0 :                     .walsenders
     875            0 :                     .record_ps_feedback(self.ws_guard.id, &ps_feedback);
     876            0 :                 self.tli
     877            0 :                     .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
     878            0 :                     .await;
     879              :                 // in principle new remote_consistent_lsn could allow to
     880              :                 // deactivate the timeline, but we check that regularly through
     881              :                 // broker updated, not need to do it here
     882              :             }
     883            0 :             _ => warn!("unexpected message {:?}", msg),
     884              :         }
     885            0 :         Ok(())
     886            0 :     }
     887              : }
     888              : 
     889              : #[cfg(test)]
     890              : mod tests {
     891              :     use utils::id::{TenantId, TimelineId};
     892              : 
     893              :     use super::*;
     894              : 
     895            4 :     fn mock_ttid() -> TenantTimelineId {
     896            4 :         TenantTimelineId {
     897            4 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     898            4 :             timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
     899            4 :         }
     900            4 :     }
     901              : 
     902            4 :     fn mock_addr() -> SocketAddr {
     903            4 :         "127.0.0.1:8080".parse().unwrap()
     904            4 :     }
     905              : 
     906              :     // add to wss specified feedback setting other fields to dummy values
     907            4 :     fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
     908            4 :         let walsender_state = WalSenderState {
     909            4 :             ttid: mock_ttid(),
     910            4 :             addr: mock_addr(),
     911            4 :             conn_id: 1,
     912            4 :             appname: None,
     913            4 :             feedback,
     914            4 :         };
     915            4 :         wss.slots.push(Some(walsender_state))
     916            4 :     }
     917              : 
     918              :     // form standby feedback with given hot standby feedback ts/xmin and the
     919              :     // rest set to dummy values.
     920            4 :     fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
     921            4 :         ReplicationFeedback::Standby(StandbyFeedback {
     922            4 :             reply: StandbyReply::empty(),
     923            4 :             hs_feedback: HotStandbyFeedback {
     924            4 :                 ts,
     925            4 :                 xmin,
     926            4 :                 catalog_xmin: 0,
     927            4 :             },
     928            4 :         })
     929            4 :     }
     930              : 
     931              :     // test that hs aggregation works as expected
     932              :     #[test]
     933            1 :     fn test_hs_feedback_no_valid() {
     934            1 :         let mut wss = WalSendersShared::new();
     935            1 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     936            1 :         wss.update_reply_feedback();
     937            1 :         assert_eq!(
     938            1 :             wss.agg_standby_feedback.hs_feedback.xmin,
     939            1 :             INVALID_FULL_TRANSACTION_ID
     940            1 :         );
     941            1 :     }
     942              : 
     943              :     #[test]
     944            1 :     fn test_hs_feedback() {
     945            1 :         let mut wss = WalSendersShared::new();
     946            1 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     947            1 :         push_feedback(&mut wss, hs_feedback(1, 42));
     948            1 :         push_feedback(&mut wss, hs_feedback(1, 64));
     949            1 :         wss.update_reply_feedback();
     950            1 :         assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
     951            1 :     }
     952              : }
        

Generated by: LCOV version 2.1-beta