LCOV - code coverage report
Current view: top level - safekeeper/src - send_wal.rs (source / functions)
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info
Test Date: 2025-07-31 15:59:03

              Coverage    Total    Hit
Lines:        17.7 %      648      115
Functions:    25.4 %       63       16

            Line data    Source code
       1              : //! This module implements the streaming side of the replication protocol,
       2              : //! starting with the "START_REPLICATION" message, and the registry of walsenders.
       3              : 
       4              : use std::cmp::{max, min};
       5              : use std::net::SocketAddr;
       6              : use std::sync::Arc;
       7              : use std::time::Duration;
       8              : 
       9              : use anyhow::{Context as AnyhowContext, bail};
      10              : use bytes::Bytes;
      11              : use futures::FutureExt;
      12              : use itertools::Itertools;
      13              : use parking_lot::Mutex;
      14              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend, PostgresBackendReader, QueryError};
      15              : use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, get_current_timestamp};
      16              : use postgres_ffi_types::TimestampTz;
      17              : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
      18              : use safekeeper_api::Term;
      19              : use safekeeper_api::models::{
      20              :     HotStandbyFeedback, INVALID_FULL_TRANSACTION_ID, ReplicationFeedback, StandbyFeedback,
      21              :     StandbyReply,
      22              : };
      23              : use tokio::io::{AsyncRead, AsyncWrite};
      24              : use tokio::sync::watch::Receiver;
      25              : use tokio::time::timeout;
      26              : use tracing::*;
      27              : use utils::bin_ser::BeSer;
      28              : use utils::failpoint_support;
      29              : use utils::lsn::Lsn;
      30              : use utils::pageserver_feedback::PageserverFeedback;
      31              : use utils::postgres_client::PostgresClientProtocol;
      32              : 
      33              : use crate::handler::SafekeeperPostgresHandler;
      34              : use crate::metrics::{RECEIVED_PS_FEEDBACKS, WAL_READERS};
      35              : use crate::receive_wal::WalReceivers;
      36              : use crate::safekeeper::TermLsn;
      37              : use crate::send_interpreted_wal::{
      38              :     Batch, InterpretedWalReader, InterpretedWalReaderHandle, InterpretedWalSender,
      39              : };
      40              : use crate::timeline::WalResidentTimeline;
      41              : use crate::wal_reader_stream::StreamingWalReader;
      42              : use crate::wal_storage::WalReader;
      43              : 
      44              : // See: https://www.postgresql.org/docs/13/protocol-replication.html
      45              : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
      46              : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
      47              : // Neon extension of the replication protocol
      48              : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
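// A minimal dispatch sketch (hypothetical helper, not part of the real reply
// path): replies arrive as CopyData messages whose first byte is one of the
// tags above; body parsing is elided here.
#[allow(dead_code)]
fn dispatch_reply_tag(buf: &[u8]) -> anyhow::Result<()> {
    match buf.first() {
        Some(&HOT_STANDBY_FEEDBACK_TAG_BYTE) => { /* deserialize HotStandbyFeedback */ }
        Some(&STANDBY_STATUS_UPDATE_TAG_BYTE) => { /* deserialize StandbyReply */ }
        Some(&NEON_STATUS_UPDATE_TAG_BYTE) => { /* deserialize PageserverFeedback */ }
        other => anyhow::bail!("unexpected reply tag byte: {other:?}"),
    }
    Ok(())
}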
      49              : 
      50              : /// WalSenders registry. Timeline holds it (wrapped in Arc).
      51              : pub struct WalSenders {
      52              :     mutex: Mutex<WalSendersShared>,
      53              :     walreceivers: Arc<WalReceivers>,
      54              : }
      55              : 
      56              : pub struct WalSendersTimelineMetricValues {
      57              :     pub ps_feedback_counter: u64,
      58              :     pub ps_corruption_detected: bool,
      59              :     pub last_ps_feedback: PageserverFeedback,
      60              :     pub interpreted_wal_reader_tasks: usize,
      61              : }
      62              : 
      63              : impl WalSenders {
      64            5 :     pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
      65            5 :         Arc::new(WalSenders {
      66            5 :             mutex: Mutex::new(WalSendersShared::new()),
      67            5 :             walreceivers,
      68            5 :         })
      69            5 :     }
      70              : 
      71              :     /// Register new walsender. Returned guard provides access to the slot and
      72              :     /// automatically deregisters in Drop.
      73            0 :     fn register(self: &Arc<WalSenders>, walsender_state: WalSenderState) -> WalSenderGuard {
      74            0 :         let slots = &mut self.mutex.lock().slots;
      75              :         // find empty slot or create new one
      76            0 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
      77            0 :             slots[pos] = Some(walsender_state);
      78            0 :             pos
      79              :         } else {
      80            0 :             let pos = slots.len();
      81            0 :             slots.push(Some(walsender_state));
      82            0 :             pos
      83              :         };
      84            0 :         WalSenderGuard {
      85            0 :             id: pos,
      86            0 :             walsenders: self.clone(),
      87            0 :         }
      88            0 :     }
      89              : 
      90            0 :     fn create_or_update_interpreted_reader<
      91            0 :         FUp: FnOnce(&Arc<InterpretedWalReaderHandle>) -> anyhow::Result<()>,
      92            0 :         FNew: FnOnce() -> InterpretedWalReaderHandle,
      93            0 :     >(
      94            0 :         self: &Arc<WalSenders>,
      95            0 :         id: WalSenderId,
      96            0 :         start_pos: Lsn,
      97            0 :         max_delta_for_fanout: Option<u64>,
      98            0 :         update: FUp,
      99            0 :         create: FNew,
     100            0 :     ) -> anyhow::Result<()> {
     101            0 :         let state = &mut self.mutex.lock();
     102              : 
     103            0 :         let mut selected_interpreted_reader = None;
     104            0 :         for slot in state.slots.iter().flatten() {
     105            0 :             if let WalSenderState::Interpreted(slot_state) = slot {
     106            0 :                 if let Some(ref interpreted_reader) = slot_state.interpreted_wal_reader {
     107            0 :                     let select = match (interpreted_reader.current_position(), max_delta_for_fanout)
     108              :                     {
     109            0 :                         (Some(pos), Some(max_delta)) => {
     110            0 :                             let delta = pos.0.abs_diff(start_pos.0);
     111            0 :                             delta <= max_delta
     112              :                         }
     113              :                         // Reader is not active
     114            0 :                         (None, _) => false,
     115              :                         // Gating fanout by max delta is disabled.
     116              :                         // Attach to any active reader.
     117            0 :                         (_, None) => true,
     118              :                     };
     119              : 
     120            0 :                     if select {
     121            0 :                         selected_interpreted_reader = Some(interpreted_reader.clone());
     122            0 :                         break;
     123            0 :                     }
     124            0 :                 }
     125            0 :             }
     126              :         }
     127              : 
     128            0 :         let slot = state.get_slot_mut(id);
     129            0 :         let slot_state = match slot {
     130            0 :             WalSenderState::Interpreted(s) => s,
     131            0 :             WalSenderState::Vanilla(_) => unreachable!(),
     132              :         };
     133              : 
     134            0 :         let selected_or_new = match selected_interpreted_reader {
     135            0 :             Some(selected) => {
     136            0 :                 update(&selected)?;
     137            0 :                 selected
     138              :             }
     139            0 :             None => Arc::new(create()),
     140              :         };
     141              : 
     142            0 :         slot_state.interpreted_wal_reader = Some(selected_or_new);
     143              : 
     144            0 :         Ok(())
     145            0 :     }
     146              : 
     147              :     /// Get state of all walsenders.
     148            0 :     pub fn get_all_public(self: &Arc<WalSenders>) -> Vec<safekeeper_api::models::WalSenderState> {
     149            0 :         self.mutex
     150            0 :             .lock()
     151            0 :             .slots
     152            0 :             .iter()
     153            0 :             .flatten()
     154            0 :             .map(|state| match state {
     155            0 :                 WalSenderState::Vanilla(s) => {
     156            0 :                     safekeeper_api::models::WalSenderState::Vanilla(s.clone())
     157              :                 }
     158            0 :                 WalSenderState::Interpreted(s) => {
     159            0 :                     safekeeper_api::models::WalSenderState::Interpreted(s.public_state.clone())
     160              :                 }
     161            0 :             })
     162            0 :             .collect()
     163            0 :     }
     164              : 
     165              :     /// Get LSN of the most lagging pageserver receiver. Return None if there are no
     166              :     /// active walsenders.
     167            0 :     pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
     168            0 :         self.mutex
     169            0 :             .lock()
     170            0 :             .slots
     171            0 :             .iter()
     172            0 :             .flatten()
     173            0 :             .filter_map(|s| match s.get_feedback() {
     174            0 :                 ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
     175            0 :                 ReplicationFeedback::Standby(_) => None,
     176            0 :             })
     177            0 :             .min()
     178            0 :     }
     179              : 
     180              :     /// Returns the total counter of pageserver feedbacks received, the last feedback, and related metric values.
     181            0 :     pub fn info_for_metrics(self: &Arc<WalSenders>) -> WalSendersTimelineMetricValues {
     182            0 :         let shared = self.mutex.lock();
     183              : 
     184            0 :         let interpreted_wal_reader_tasks = shared
     185            0 :             .slots
     186            0 :             .iter()
     187            0 :             .filter_map(|ss| match ss {
     188            0 :                 Some(WalSenderState::Interpreted(int)) => int.interpreted_wal_reader.as_ref(),
     189            0 :                 Some(WalSenderState::Vanilla(_)) => None,
     190            0 :                 None => None,
     191            0 :             })
     192            0 :             .unique_by(|reader| Arc::as_ptr(reader))
     193            0 :             .count();
     194              : 
     195            0 :         WalSendersTimelineMetricValues {
     196            0 :             ps_feedback_counter: shared.ps_feedback_counter,
     197            0 :             ps_corruption_detected: shared.ps_corruption_detected,
     198            0 :             last_ps_feedback: shared.last_ps_feedback,
     199            0 :             interpreted_wal_reader_tasks,
     200            0 :         }
     201            0 :     }
     202              : 
     203              :     /// Get aggregated hot standby feedback (we send it to compute).
     204          620 :     pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
     205          620 :         self.mutex.lock().agg_standby_feedback
     206          620 :     }
     207              : 
     208              :     /// Record new pageserver feedback, update aggregated values.
     209            0 :     fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
     210            0 :         let mut shared = self.mutex.lock();
     211            0 :         *shared.get_slot_mut(id).get_mut_feedback() = ReplicationFeedback::Pageserver(*feedback);
     212            0 :         shared.last_ps_feedback = *feedback;
     213            0 :         shared.ps_feedback_counter += 1;
     214            0 :         if feedback.corruption_detected {
     215            0 :             shared.ps_corruption_detected = true;
     216            0 :         }
     217            0 :         drop(shared);
     218              : 
     219            0 :         RECEIVED_PS_FEEDBACKS.inc();
     220              : 
     221              :         // send feedback to connected walproposers
     222            0 :         self.walreceivers.broadcast_pageserver_feedback(*feedback);
     223            0 :     }
     224              : 
     225              :     /// Record standby reply.
     226            0 :     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
     227            0 :         let mut shared = self.mutex.lock();
     228            0 :         let slot = shared.get_slot_mut(id);
     229            0 :         debug!(
     230            0 :             "Record standby reply: ts={} apply_lsn={}",
     231              :             reply.reply_ts, reply.apply_lsn
     232              :         );
     233            0 :         match &mut slot.get_mut_feedback() {
     234            0 :             ReplicationFeedback::Standby(sf) => sf.reply = *reply,
     235              :             ReplicationFeedback::Pageserver(_) => {
     236            0 :                 *slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback {
     237            0 :                     reply: *reply,
     238            0 :                     hs_feedback: HotStandbyFeedback::empty(),
     239            0 :                 })
     240              :             }
     241              :         }
     242            0 :     }
     243              : 
     244              :     /// Record hot standby feedback, update aggregated value.
     245            0 :     fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
     246            0 :         let mut shared = self.mutex.lock();
     247            0 :         let slot = shared.get_slot_mut(id);
     248            0 :         match &mut slot.get_mut_feedback() {
     249            0 :             ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
     250              :             ReplicationFeedback::Pageserver(_) => {
     251            0 :                 *slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback {
     252            0 :                     reply: StandbyReply::empty(),
     253            0 :                     hs_feedback: *feedback,
     254            0 :                 })
     255              :             }
     256              :         }
     257            0 :         shared.update_reply_feedback();
     258            0 :     }
     259              : 
     260              :     /// Get remote_consistent_lsn reported by the pageserver. Returns None if
     261              :     /// client is not pageserver.
     262            0 :     pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
     263            0 :         let shared = self.mutex.lock();
     264            0 :         let slot = shared.get_slot(id);
     265            0 :         match slot.get_feedback() {
     266            0 :             ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
     267            0 :             _ => None,
     268              :         }
     269            0 :     }
     270              : 
     271              :     /// Unregister walsender.
     272            0 :     fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
     273            0 :         let mut shared = self.mutex.lock();
     274            0 :         shared.slots[id] = None;
     275            0 :         shared.update_reply_feedback();
     276            0 :     }
     277              : }
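// Fanout gating sketch (illustrative helper mirroring the check in
// create_or_update_interpreted_reader above): a new walsender attaches to an
// existing interpreted reader only if that reader's current position is within
// max_delta_for_fanout bytes of the requested start LSN.
#[allow(dead_code)]
fn within_fanout_delta(reader_pos: Lsn, start_pos: Lsn, max_delta: u64) -> bool {
    // e.g. a reader at 0/5000 and a start at 0/4C00 give a delta of 0x400 bytes
    reader_pos.0.abs_diff(start_pos.0) <= max_delta
}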
     278              : 
     279              : struct WalSendersShared {
     280              :     // value aggregated over all walsenders
     281              :     agg_standby_feedback: StandbyFeedback,
     282              :     // last feedback ever received from any pageserver, empty if none
     283              :     last_ps_feedback: PageserverFeedback,
     284              :     // total counter of pageserver feedbacks received
     285              :     ps_feedback_counter: u64,
     286              :     // Hadron: true iff we received a pageserver feedback that indicated
     287              :     // data corruption in the timeline
     288              :     ps_corruption_detected: bool,
     289              :     slots: Vec<Option<WalSenderState>>,
     290              : }
     291              : 
     292              : /// Safekeeper internal definitions of wal sender state
     293              : ///
     294              : /// As opposed to [`safekeeper_api::models::WalSenderState`], these structs may
     295              : /// include state that we do not wish to expose in the public API.
     296              : #[derive(Debug, Clone)]
     297              : pub(crate) enum WalSenderState {
     298              :     Vanilla(VanillaWalSenderInternalState),
     299              :     Interpreted(InterpretedWalSenderInternalState),
     300              : }
     301              : 
     302              : type VanillaWalSenderInternalState = safekeeper_api::models::VanillaWalSenderState;
     303              : 
     304              : #[derive(Debug, Clone)]
     305              : pub(crate) struct InterpretedWalSenderInternalState {
     306              :     public_state: safekeeper_api::models::InterpretedWalSenderState,
     307              :     interpreted_wal_reader: Option<Arc<InterpretedWalReaderHandle>>,
     308              : }
     309              : 
     310              : impl WalSenderState {
     311            0 :     fn get_addr(&self) -> &SocketAddr {
     312            0 :         match self {
     313            0 :             WalSenderState::Vanilla(state) => &state.addr,
     314            0 :             WalSenderState::Interpreted(state) => &state.public_state.addr,
     315              :         }
     316            0 :     }
     317              : 
     318            4 :     fn get_feedback(&self) -> &ReplicationFeedback {
     319            4 :         match self {
     320            4 :             WalSenderState::Vanilla(state) => &state.feedback,
     321            0 :             WalSenderState::Interpreted(state) => &state.public_state.feedback,
     322              :         }
     323            4 :     }
     324              : 
     325            0 :     fn get_mut_feedback(&mut self) -> &mut ReplicationFeedback {
     326            0 :         match self {
     327            0 :             WalSenderState::Vanilla(state) => &mut state.feedback,
     328            0 :             WalSenderState::Interpreted(state) => &mut state.public_state.feedback,
     329              :         }
     330            0 :     }
     331              : }
     332              : 
     333              : impl WalSendersShared {
     334            7 :     fn new() -> Self {
     335            7 :         WalSendersShared {
     336            7 :             agg_standby_feedback: StandbyFeedback::empty(),
     337            7 :             last_ps_feedback: PageserverFeedback::empty(),
     338            7 :             ps_feedback_counter: 0,
     339            7 :             ps_corruption_detected: false,
     340            7 :             slots: Vec::new(),
     341            7 :         }
     342            7 :     }
     343              : 
     344              :     /// Get content of provided id slot, it must exist.
     345            0 :     fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
     346            0 :         self.slots[id].as_ref().expect("walsender doesn't exist")
     347            0 :     }
     348              : 
     349              :     /// Get mut content of provided id slot, it must exist.
     350            0 :     fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
     351            0 :         self.slots[id].as_mut().expect("walsender doesn't exist")
     352            0 :     }
     353              : 
     354              :     /// Update aggregated hot standby and normal reply feedbacks. We just take the min of valid xmins
     355              :     /// and ts.
     356            2 :     fn update_reply_feedback(&mut self) {
     357            2 :         let mut agg = HotStandbyFeedback::empty();
     358            2 :         let mut reply_agg = StandbyReply::empty();
     359            4 :         for ws_state in self.slots.iter().flatten() {
     360            4 :             if let ReplicationFeedback::Standby(standby_feedback) = ws_state.get_feedback() {
     361            4 :                 let hs_feedback = standby_feedback.hs_feedback;
     362              :                 // doing Option math like op1.iter().chain(op2.iter()).min()
     363              :                 // would be nicer, but we serialize/deserialize this struct
     364              :                 // directly, so leave as is for now
     365            4 :                 if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
     366            2 :                     if agg.xmin != INVALID_FULL_TRANSACTION_ID {
     367            1 :                         agg.xmin = min(agg.xmin, hs_feedback.xmin);
     368            1 :                     } else {
     369            1 :                         agg.xmin = hs_feedback.xmin;
     370            1 :                     }
     371            2 :                     agg.ts = max(agg.ts, hs_feedback.ts);
     372            2 :                 }
     373            4 :                 if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     374            0 :                     if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     375            0 :                         agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
     376            0 :                     } else {
     377            0 :                         agg.catalog_xmin = hs_feedback.catalog_xmin;
     378            0 :                     }
     379            0 :                     agg.ts = max(agg.ts, hs_feedback.ts);
     380            4 :                 }
     381            4 :                 let reply = standby_feedback.reply;
     382            4 :                 if reply.write_lsn != Lsn::INVALID {
     383            0 :                     if reply_agg.write_lsn != Lsn::INVALID {
     384            0 :                         reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
     385            0 :                     } else {
     386            0 :                         reply_agg.write_lsn = reply.write_lsn;
     387            0 :                     }
     388            4 :                 }
     389            4 :                 if reply.flush_lsn != Lsn::INVALID {
     390            0 :                     if reply_agg.flush_lsn != Lsn::INVALID {
     391            0 :                         reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
     392            0 :                     } else {
     393            0 :                         reply_agg.flush_lsn = reply.flush_lsn;
     394            0 :                     }
     395            4 :                 }
     396            4 :                 if reply.apply_lsn != Lsn::INVALID {
     397            0 :                     if reply_agg.apply_lsn != Lsn::INVALID {
     398            0 :                         reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
     399            0 :                     } else {
     400            0 :                         reply_agg.apply_lsn = reply.apply_lsn;
     401            0 :                     }
     402            4 :                 }
     403            4 :                 if reply.reply_ts != 0 {
     404            0 :                     if reply_agg.reply_ts != 0 {
     405            0 :                         reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
     406            0 :                     } else {
     407            0 :                         reply_agg.reply_ts = reply.reply_ts;
     408            0 :                     }
     409            4 :                 }
     410            0 :             }
     411              :         }
     412            2 :         self.agg_standby_feedback = StandbyFeedback {
     413            2 :             reply: reply_agg,
     414            2 :             hs_feedback: agg,
     415            2 :         };
     416            2 :     }
     417              : }
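// The Option-math variant alluded to in update_reply_feedback, as a sketch:
// treating INVALID_FULL_TRANSACTION_ID as None collapses the min-aggregation
// into a chain of iterators. Kept out of the real code because the feedback
// structs are serialized/deserialized directly. Assumes xmin is a plain u64.
#[allow(dead_code)]
fn min_valid_xmin(a: u64, b: u64) -> u64 {
    let valid = |x: u64| (x != INVALID_FULL_TRANSACTION_ID).then_some(x);
    valid(a)
        .into_iter()
        .chain(valid(b))
        .min()
        .unwrap_or(INVALID_FULL_TRANSACTION_ID)
}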
     418              : 
     419              : // Id of the occupied slot in WalSenders, used to access it (and saved in the
     420              : // WalSenderGuard). We could hand out an Arc to the slot directly, but there is
     421              : // little point: the aggregation of values performed on each feedback receipt
     422              : // iterates over all walsenders anyway.
     423              : pub type WalSenderId = usize;
     424              : 
     425              : /// Scope guard to access slot in WalSenders registry and unregister from it in
     426              : /// Drop.
     427              : pub struct WalSenderGuard {
     428              :     id: WalSenderId,
     429              :     walsenders: Arc<WalSenders>,
     430              : }
     431              : 
     432              : impl WalSenderGuard {
     433            0 :     pub fn id(&self) -> WalSenderId {
     434            0 :         self.id
     435            0 :     }
     436              : 
     437            0 :     pub fn walsenders(&self) -> &Arc<WalSenders> {
     438            0 :         &self.walsenders
     439            0 :     }
     440              : }
     441              : 
     442              : impl Drop for WalSenderGuard {
     443            0 :     fn drop(&mut self) {
     444            0 :         self.walsenders.unregister(self.id);
     445            0 :     }
     446              : }
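// Registration is RAII: a usage sketch (assuming a WalReceivers handle and a
// prepared WalSenderState) showing that dropping the guard frees its slot.
#[allow(dead_code)]
fn guard_lifecycle(walreceivers: Arc<WalReceivers>, state: WalSenderState) {
    let senders = WalSenders::new(walreceivers);
    let guard = senders.register(state); // occupies slot 0 in the empty registry
    assert_eq!(guard.id(), 0);
    drop(guard); // Drop runs unregister(), clearing the slot
    assert!(senders.get_all_public().is_empty());
}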
     447              : 
     448              : impl SafekeeperPostgresHandler {
     449              :     /// Wrapper around handle_start_replication_guts handling result. Error is
     450              :     /// handled here while we're still in the walsender ttid span; with an API
     451              :     /// extension, this can probably be moved into postgres_backend.
     452            0 :     pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin + Send>(
     453            0 :         &mut self,
     454            0 :         pgb: &mut PostgresBackend<IO>,
     455            0 :         start_pos: Lsn,
     456            0 :         term: Option<Term>,
     457            0 :     ) -> Result<(), QueryError> {
     458            0 :         let tli = self
     459            0 :             .global_timelines
     460            0 :             .get(self.ttid)
     461            0 :             .map_err(|e| QueryError::Other(e.into()))?;
     462            0 :         let residence_guard = tli.wal_residence_guard().await?;
     463              : 
     464            0 :         if let Err(end) = self
     465            0 :             .handle_start_replication_guts(pgb, start_pos, term, residence_guard)
     466            0 :             .await
     467              :         {
     468            0 :             let info = tli.get_safekeeper_info(&self.conf).await;
     469              :             // Log the result and probably send it to the client, closing the stream.
     470            0 :             pgb.handle_copy_stream_end(end)
     471            0 :             .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.flush_lsn)))
     472            0 :             .await;
     473            0 :         }
     474            0 :         Ok(())
     475            0 :     }
     476              : 
     477            0 :     pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin + Send>(
     478            0 :         &mut self,
     479            0 :         pgb: &mut PostgresBackend<IO>,
     480            0 :         start_pos: Lsn,
     481            0 :         term: Option<Term>,
     482            0 :         tli: WalResidentTimeline,
     483            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     484            0 :         let appname = self.appname.clone();
     485              : 
     486              :         // Use a guard object to remove our entry from the timeline when we are done.
     487            0 :         let ws_guard = match self.protocol() {
     488            0 :             PostgresClientProtocol::Vanilla => Arc::new(tli.get_walsenders().register(
     489            0 :                 WalSenderState::Vanilla(VanillaWalSenderInternalState {
     490            0 :                     ttid: self.ttid,
     491            0 :                     addr: *pgb.get_peer_addr(),
     492            0 :                     conn_id: self.conn_id,
     493            0 :                     appname: self.appname.clone(),
     494            0 :                     feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
     495            0 :                 }),
     496              :             )),
     497            0 :             PostgresClientProtocol::Interpreted { .. } => Arc::new(tli.get_walsenders().register(
     498            0 :                 WalSenderState::Interpreted(InterpretedWalSenderInternalState {
     499            0 :                     public_state: safekeeper_api::models::InterpretedWalSenderState {
     500            0 :                         ttid: self.ttid,
     501            0 :                         shard: self.shard.unwrap(),
     502            0 :                         addr: *pgb.get_peer_addr(),
     503            0 :                         conn_id: self.conn_id,
     504            0 :                         appname: self.appname.clone(),
     505            0 :                         feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
     506            0 :                     },
     507            0 :                     interpreted_wal_reader: None,
     508            0 :                 }),
     509              :             )),
     510              :         };
     511              : 
     512              :         // Walsender can operate in one of two modes which we select by
     513              :         // application_name: give only committed WAL (used by pageserver) or all
     514              :         // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
     515              :         // The second case is always driven by a consensus leader which term
     516              :         // The second case is always driven by a consensus leader, whose term
     517            0 :         let end_watch = if term.is_some() {
     518            0 :             EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
     519              :         } else {
     520            0 :             EndWatch::Commit(tli.get_commit_lsn_watch_rx())
     521              :         };
     522              :         // We don't check the term here; it will be checked on the first wait/WAL read anyway.
     523            0 :         let end_pos = end_watch.get();
     524              : 
     525            0 :         if end_pos < start_pos {
     526            0 :             info!(
     527            0 :                 "requested start_pos {} is ahead of available WAL end_pos {}",
     528              :                 start_pos, end_pos
     529              :             );
     530            0 :         }
     531              : 
     532            0 :         info!(
     533            0 :             "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}",
     534              :             start_pos,
     535              :             end_pos,
     536            0 :             matches!(end_watch, EndWatch::Flush(_)),
     537              :             appname,
     538            0 :             self.protocol(),
     539              :         );
     540              : 
     541              :         // switch to copy
     542            0 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     543              : 
     544            0 :         let wal_reader = tli.get_walreader(start_pos).await?;
     545              : 
     546              :         // Split to concurrently receive and send data; replies are generally
     547              :         // not synchronized with sends, so this avoids deadlocks.
     548            0 :         let reader = pgb.split().context("START_REPLICATION split")?;
     549              : 
     550            0 :         let send_fut = match self.protocol() {
     551              :             PostgresClientProtocol::Vanilla => {
     552            0 :                 let sender = WalSender {
     553            0 :                     pgb,
     554              :                     // should succeed since we're already holding another guard
     555            0 :                     tli: tli.wal_residence_guard().await?,
     556            0 :                     appname: appname.clone(),
     557            0 :                     start_pos,
     558            0 :                     end_pos,
     559            0 :                     term,
     560            0 :                     end_watch,
     561            0 :                     ws_guard: ws_guard.clone(),
     562            0 :                     wal_reader,
     563            0 :                     send_buf: vec![0u8; MAX_SEND_SIZE],
     564              :                 };
     565              : 
     566            0 :                 FutureExt::boxed(sender.run())
     567              :             }
     568              :             PostgresClientProtocol::Interpreted {
     569            0 :                 format,
     570            0 :                 compression,
     571              :             } => {
     572            0 :                 let pg_version =
     573            0 :                     PgMajorVersion::try_from(tli.tli.get_state().await.1.server.pg_version)
     574            0 :                         .unwrap();
     575            0 :                 let end_watch_view = end_watch.view();
     576            0 :                 let wal_residence_guard = tli.wal_residence_guard().await?;
     577            0 :                 let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(2);
     578            0 :                 let shard = self.shard.unwrap();
     579              : 
     580            0 :                 if self.conf.wal_reader_fanout && !shard.is_unsharded() {
     581            0 :                     let ws_id = ws_guard.id();
     582            0 :                     ws_guard.walsenders().create_or_update_interpreted_reader(
     583            0 :                         ws_id,
     584            0 :                         start_pos,
     585            0 :                         self.conf.max_delta_for_fanout,
     586              :                         {
     587            0 :                             let tx = tx.clone();
     588            0 :                             |reader| {
     589            0 :                                 tracing::info!(
     590            0 :                                     "Fanning out interpreted wal reader at {}",
     591              :                                     start_pos
     592              :                                 );
     593            0 :                                 reader
     594            0 :                                     .fanout(shard, tx, start_pos)
     595            0 :                                     .with_context(|| "Failed to fan out reader")
     596            0 :                             }
     597              :                         },
     598            0 :                         || {
     599            0 :                             tracing::info!("Spawning interpreted wal reader at {}", start_pos);
     600              : 
     601            0 :                             let wal_stream = StreamingWalReader::new(
     602            0 :                                 wal_residence_guard,
     603            0 :                                 term,
     604            0 :                                 start_pos,
     605            0 :                                 end_pos,
     606            0 :                                 end_watch,
     607              :                                 MAX_SEND_SIZE,
     608              :                             );
     609              : 
     610            0 :                             InterpretedWalReader::spawn(
     611            0 :                                 wal_stream, start_pos, tx, shard, pg_version, &appname,
     612              :                             )
     613            0 :                         },
     614            0 :                     )?;
     615              : 
     616            0 :                     let sender = InterpretedWalSender {
     617            0 :                         format,
     618            0 :                         compression,
     619            0 :                         appname,
     620            0 :                         tli: tli.wal_residence_guard().await?,
     621            0 :                         start_lsn: start_pos,
     622            0 :                         pgb,
     623            0 :                         end_watch_view,
     624            0 :                         wal_sender_guard: ws_guard.clone(),
     625            0 :                         rx,
     626              :                     };
     627              : 
     628            0 :                     FutureExt::boxed(sender.run())
     629              :                 } else {
     630            0 :                     let wal_reader = StreamingWalReader::new(
     631            0 :                         wal_residence_guard,
     632            0 :                         term,
     633            0 :                         start_pos,
     634            0 :                         end_pos,
     635            0 :                         end_watch,
     636              :                         MAX_SEND_SIZE,
     637              :                     );
     638              : 
     639            0 :                     let reader = InterpretedWalReader::new(
     640            0 :                         wal_reader, start_pos, tx, shard, pg_version, None,
     641              :                     );
     642              : 
     643            0 :                     let sender = InterpretedWalSender {
     644            0 :                         format,
     645            0 :                         compression,
     646            0 :                         appname: appname.clone(),
     647            0 :                         tli: tli.wal_residence_guard().await?,
     648            0 :                         start_lsn: start_pos,
     649            0 :                         pgb,
     650            0 :                         end_watch_view,
     651            0 :                         wal_sender_guard: ws_guard.clone(),
     652            0 :                         rx,
     653              :                     };
     654              : 
     655            0 :                     FutureExt::boxed(async move {
     656              :                         // Sender returns an Err on all code paths.
     657              :                         // If the sender finishes first, we will drop the reader future.
     658              :                         // If the reader finishes first, the sender will finish too since
     659              :                         // the reader's tx end of the channel has dropped.
     660            0 :                         let res = tokio::try_join!(sender.run(), reader.run(start_pos, &appname));
     661            0 :                         match res.map(|_| ()) {
     662            0 :                             Ok(_) => unreachable!("sender finishes with Err by convention"),
     663            0 :                             err_res => err_res,
     664              :                         }
     665            0 :                     })
     666              :                 }
     667              :             }
     668              :         };
     669              : 
     670            0 :         let tli_cancel = tli.cancel.clone();
     671              : 
     672            0 :         let mut reply_reader = ReplyReader {
     673            0 :             reader,
     674            0 :             ws_guard: ws_guard.clone(),
     675            0 :             tli,
     676            0 :         };
     677              : 
     678            0 :         let res = tokio::select! {
     679              :             // todo: add read|write .context to these errors
     680            0 :             r = send_fut => r,
     681            0 :             r = reply_reader.run() => r,
     682            0 :             _ = tli_cancel.cancelled() => {
     683            0 :                 return Err(CopyStreamHandlerEnd::Cancelled);
     684              :             }
     685              :         };
     686              : 
     687            0 :         let ws_state = ws_guard
     688            0 :             .walsenders
     689            0 :             .mutex
     690            0 :             .lock()
     691            0 :             .get_slot(ws_guard.id)
     692            0 :             .clone();
     693            0 :         info!(
     694            0 :             "finished streaming to {}, feedback={:?}",
     695            0 :             ws_state.get_addr(),
     696            0 :             ws_state.get_feedback(),
     697              :         );
     698              : 
     699              :         // Join pg backend back.
     700            0 :         pgb.unsplit(reply_reader.reader)?;
     701              : 
     702            0 :         res
     703            0 :     }
     704              : }
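// Shape of the select above, as a standalone sketch (hypothetical helper; the
// CancellationToken type comes from tokio_util, which backs tli.cancel): the
// split halves race each other, and timeline cancellation preempts both.
#[allow(dead_code)]
async fn race_stream_halves<S, R>(
    send_fut: S,
    reply_fut: R,
    cancel: tokio_util::sync::CancellationToken,
) -> Result<(), CopyStreamHandlerEnd>
where
    S: std::future::Future<Output = Result<(), CopyStreamHandlerEnd>>,
    R: std::future::Future<Output = Result<(), CopyStreamHandlerEnd>>,
{
    tokio::select! {
        r = send_fut => r,
        r = reply_fut => r,
        _ = cancel.cancelled() => Err(CopyStreamHandlerEnd::Cancelled),
    }
}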
     705              : 
     706              : /// TODO(vlad): maybe lift this instead
     707              : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
     708              : /// given term (recovery by walproposer or peer safekeeper).
     709              : #[derive(Clone)]
     710              : pub(crate) enum EndWatch {
     711              :     Commit(Receiver<Lsn>),
     712              :     Flush(Receiver<TermLsn>),
     713              : }
     714              : 
     715              : impl EndWatch {
     716            0 :     pub(crate) fn view(&self) -> EndWatchView {
     717            0 :         EndWatchView(self.clone())
     718            0 :     }
     719              : 
     720              :     /// Get current end of WAL.
     721           12 :     pub(crate) fn get(&self) -> Lsn {
     722           12 :         match self {
     723           12 :             EndWatch::Commit(r) => *r.borrow(),
     724            0 :             EndWatch::Flush(r) => r.borrow().lsn,
     725              :         }
     726           12 :     }
     727              : 
     728              :     /// Wait for the update.
     729            7 :     pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
     730            7 :         match self {
     731            7 :             EndWatch::Commit(r) => r.changed().await?,
     732            0 :             EndWatch::Flush(r) => r.changed().await?,
     733              :         }
     734            3 :         Ok(())
     735            3 :     }
     736              : 
     737            4 :     pub(crate) async fn wait_for_lsn(
     738            4 :         &mut self,
     739            4 :         lsn: Lsn,
     740            4 :         client_term: Option<Term>,
     741            4 :     ) -> anyhow::Result<Lsn> {
     742              :         loop {
     743            7 :             let end_pos = self.get();
     744            7 :             if end_pos > lsn {
     745            0 :                 return Ok(end_pos);
     746            7 :             }
     747            7 :             if let EndWatch::Flush(rx) = &self {
     748            0 :                 let curr_term = rx.borrow().term;
     749            0 :                 if let Some(client_term) = client_term {
     750            0 :                     if curr_term != client_term {
     751            0 :                         bail!("term changed: requested {}, now {}", client_term, curr_term);
     752            0 :                     }
     753            0 :                 }
     754            7 :             }
     755            7 :             self.changed().await?;
     756              :         }
     757            0 :     }
     758              : }
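// Usage sketch for the Commit variant (assuming a commit_lsn watch channel
// receiver `rx` obtained from the timeline): block until the committed WAL
// passes `target`, with no term check since this is not a recovery stream.
#[allow(dead_code)]
async fn wait_commit_past(rx: Receiver<Lsn>, target: Lsn) -> anyhow::Result<Lsn> {
    let mut end_watch = EndWatch::Commit(rx);
    // Returns the first observed end of WAL strictly greater than `target`.
    end_watch.wait_for_lsn(target, None).await
}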
     759              : 
     760              : pub(crate) struct EndWatchView(EndWatch);
     761              : 
     762              : impl EndWatchView {
     763            0 :     pub(crate) fn get(&self) -> Lsn {
     764            0 :         self.0.get()
     765            0 :     }
     766              : }
     767              : /// The half of the split connection that drives WAL sending.
     768              : struct WalSender<'a, IO> {
     769              :     pgb: &'a mut PostgresBackend<IO>,
     770              :     tli: WalResidentTimeline,
     771              :     appname: Option<String>,
     772              :     // Position since which we are sending next chunk.
     773              :     start_pos: Lsn,
     774              :     // WAL up to this position is known to be locally available.
     775              :     // Usually this is the same as the latest commit_lsn, but in case of
     776              :     // walproposer recovery, this is flush_lsn.
     777              :     //
     778              :     // We send this LSN to the receiver as wal_end, so that it knows how much
     779              :     // WAL this safekeeper has. This LSN should be as fresh as possible.
     780              :     end_pos: Lsn,
     781              :     /// When streaming the uncommitted part, the term in which the client acts
     782              :     /// as the leader. Streaming is stopped if the local term changes to a
     783              :     /// different (higher) value.
     784              :     term: Option<Term>,
     785              :     /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
     786              :     end_watch: EndWatch,
     787              :     ws_guard: Arc<WalSenderGuard>,
     788              :     wal_reader: WalReader,
     789              :     // buffer for reading WAL into before sending it
     790              :     send_buf: Vec<u8>,
     791              : }
     792              : 
     793              : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
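// Chunk sizing sketch (hypothetical helper mirroring the arithmetic in run()
// below): a send is capped at MAX_SEND_SIZE and, when it stops short of
// end_pos, rounded down to a WAL page boundary so that a record is never cut
// at a non-page boundary, as the protocol demands (see XLogSendPhysical).
#[allow(dead_code)]
fn cap_chunk_end(start_pos: Lsn, end_pos: Lsn) -> Lsn {
    let capped = start_pos + MAX_SEND_SIZE as u64;
    if capped >= end_pos {
        end_pos
    } else {
        // block_offset() is the offset of the LSN within its WAL page.
        capped.checked_sub(capped.block_offset()).unwrap()
    }
}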
     794              : 
     795              : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
     796              :     /// Send WAL until
     797              :     /// - an error occurs
     798              :     /// - receiver is caught up and there are no computes (if streaming up to commit_lsn)
     799              :     /// - timeline's cancellation token fires
     800              :     ///
     801              :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     802              :     /// convenience.
     803            0 :     async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
     804            0 :         let metric = WAL_READERS
     805            0 :             .get_metric_with_label_values(&[
     806            0 :                 "future",
     807            0 :                 self.appname.as_deref().unwrap_or("safekeeper"),
     808            0 :             ])
     809            0 :             .unwrap();
     810              : 
     811            0 :         metric.inc();
     812            0 :         scopeguard::defer! {
     813              :             metric.dec();
     814              :         }
     815              : 
     816              :         loop {
     817              :             // Wait for the next portion if it is not there yet, or just
     818              :             // update our end of WAL available for sending value, we
     819              :             // communicate it to the receiver.
     820            0 :             self.wait_wal().await?;
     821            0 :             assert!(
     822            0 :                 self.end_pos > self.start_pos,
     823            0 :                 "nothing to send after waiting for WAL"
     824              :             );
     825              : 
     826              :             // try to send as much as available, capped by MAX_SEND_SIZE
     827            0 :             let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
     828              :             // if we went beyond available WAL, back off
     829            0 :             if chunk_end_pos >= self.end_pos {
     830            0 :                 chunk_end_pos = self.end_pos;
     831            0 :             } else {
     832            0 :                 // If sending not up to end pos, round down to page boundary to
     833            0 :                 // avoid breaking WAL record not at page boundary, as protocol
     834            0 :                 // demands. See walsender.c (XLogSendPhysical).
     835            0 :                 chunk_end_pos = chunk_end_pos
     836            0 :                     .checked_sub(chunk_end_pos.block_offset())
     837            0 :                     .unwrap();
     838            0 :             }
     839            0 :             let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
     840            0 :             let send_buf = &mut self.send_buf[..send_size];
     841              :             let send_size: usize;
     842              :             {
     843              :                 // If uncommitted part is being pulled, check that the term is
     844              :                 // still the expected one.
     845            0 :                 let _term_guard = if let Some(t) = self.term {
     846            0 :                     Some(self.tli.acquire_term(t).await?)
     847              :                 } else {
     848            0 :                     None
     849              :                 };
     850              :                 // Read WAL into buffer. send_size can be additionally capped to
     851              :                 // segment boundary here.
     852            0 :                 send_size = self.wal_reader.read(send_buf).await?
     853              :             };
     854            0 :             let send_buf = &send_buf[..send_size];
     855              : 
     856              :             // and send it, while respecting Timeline::cancel
     857            0 :             let msg = BeMessage::XLogData(XLogDataBody {
     858            0 :                 wal_start: self.start_pos.0,
     859            0 :                 wal_end: self.end_pos.0,
     860            0 :                 timestamp: get_current_timestamp(),
     861            0 :                 data: send_buf,
     862            0 :             });
     863            0 :             self.pgb.write_message(&msg).await?;
     864              : 
     865            0 :             if let Some(appname) = &self.appname {
     866            0 :                 if appname == "replica" {
     867            0 :                     failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
     868            0 :                 }
     869            0 :             }
     870            0 :             trace!(
     871            0 :                 "sent {} bytes of WAL {}-{}",
     872              :                 send_size,
     873              :                 self.start_pos,
     874            0 :                 self.start_pos + send_size as u64
     875              :             );
     876            0 :             self.start_pos += send_size as u64;
     877              :         }
     878            0 :     }
     879              : 
     880              :     /// Wait until we have WAL to stream, sending keepalives and checking for
     881              :     /// exit in the meantime.
     882            0 :     async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     883              :         loop {
     884            0 :             self.end_pos = self.end_watch.get();
     885            0 :             let have_something_to_send = (|| {
     886            0 :                 fail::fail_point!(
     887            0 :                     "sk-pause-send",
     888            0 :                     self.appname.as_deref() != Some("pageserver"),
     889            0 :                     |_| { false }
     890              :                 );
     891            0 :                 self.end_pos > self.start_pos
     892              :             })();
     893              : 
     894            0 :             if have_something_to_send {
     895            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     896            0 :                 return Ok(());
     897            0 :             }
     898              : 
     899              :             // Wait for WAL to appear, now self.end_pos == self.start_pos.
     900            0 :             if let Some(lsn) = self.wait_for_lsn().await? {
     901            0 :                 self.end_pos = lsn;
     902            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     903            0 :                 return Ok(());
     904            0 :             }
     905              : 
     906              :             // Timed out waiting for WAL, check for termination and send KA.
     907              :             // Check for termination only if we are streaming up to commit_lsn
     908              :             // (to pageserver).
     909            0 :             if let EndWatch::Commit(_) = self.end_watch {
     910            0 :                 if let Some(remote_consistent_lsn) = self
     911            0 :                     .ws_guard
     912            0 :                     .walsenders
     913            0 :                     .get_ws_remote_consistent_lsn(self.ws_guard.id)
     914              :                 {
     915            0 :                     if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     916              :                         // Terminate if there is nothing more to send.
     917              :                         // Note that the "ending streaming" part of the string is used by
     918              :                         // the pageserver to identify WalReceiverError::SuccessfulCompletion;
     919              :                         // do not change it without updating the pageserver.
     920            0 :                         return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     921            0 :                             "ending streaming to {:?} at {}, receiver is caught up and there are no computes",
     922            0 :                             self.appname, self.start_pos,
     923            0 :                         )));
     924            0 :                     }
     925            0 :                 }
     926            0 :             }
     927              : 
     928            0 :             let msg = BeMessage::KeepAlive(WalSndKeepAlive {
     929            0 :                 wal_end: self.end_pos.0,
     930            0 :                 timestamp: get_current_timestamp(),
     931            0 :                 request_reply: true,
     932            0 :             });
     933              : 
     934            0 :             self.pgb.write_message(&msg).await?;
     935              :         }
     936            0 :     }
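                      :     // A minimal sketch of how a sender loop is expected to drive wait_wal
                      :     // (the `send_burst` helper is hypothetical; the real caller lives
                      :     // elsewhere in this module):
                      :     //
                      :     //     loop {
                      :     //         sender.wait_wal().await?;   // returns once end_pos > start_pos,
                      :     //                                     // sending keepalives while waiting
                      :     //         sender.send_burst().await?; // ships WAL and advances start_pos
                      :     //     }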
     937              : 
     938              :     /// Wait until WAL > start_pos becomes available or the timeout expires. Returns
     939              :     /// - Ok(Some(end_pos)) if the needed LSN is successfully observed;
     940              :     /// - Ok(None) if the timeout expired;
     941              :     /// - Err in case of error -- only if 1) the term changed while fetching in
     942              :     ///   recovery mode, or 2) the watch channel closed, which must never happen.
     943            0 :     async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
     944            0 :         let fp = (|| {
     945            0 :             fail::fail_point!(
     946            0 :                 "sk-pause-send",
     947            0 :                 self.appname.as_deref() != Some("pageserver"),
     948            0 :                 |_| { true }
     949              :             );
     950            0 :             false
     951              :         })();
     952            0 :         if fp {
     953            0 :             tokio::time::sleep(POLL_STATE_TIMEOUT).await;
     954            0 :             return Ok(None);
     955            0 :         }
     956              : 
     957            0 :         let res = timeout(POLL_STATE_TIMEOUT, async move {
     958              :             loop {
     959            0 :                 let end_pos = self.end_watch.get();
     960            0 :                 if end_pos > self.start_pos {
     961            0 :                     return Ok(end_pos);
     962            0 :                 }
     963            0 :                 if let EndWatch::Flush(rx) = &self.end_watch {
     964            0 :                     let curr_term = rx.borrow().term;
     965            0 :                     if let Some(client_term) = self.term {
     966            0 :                         if curr_term != client_term {
     967            0 :                             bail!("term changed: requested {}, now {}", client_term, curr_term);
     968            0 :                         }
     969            0 :                     }
     970            0 :                 }
     971            0 :                 self.end_watch.changed().await?;
     972              :             }
     973            0 :         })
     974            0 :         .await;
     975              : 
     976            0 :         match res {
     977              :             // success
     978            0 :             Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
     979              :             // error inside closure
     980            0 :             Ok(Err(err)) => Err(err),
     981              :             // timeout
     982            0 :             Err(_) => Ok(None),
     983              :         }
     984            0 :     }
     985              : }
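                      : // The nested match in wait_for_lsn follows from tokio's timeout contract:
                      : // `timeout(d, fut)` yields `Err(Elapsed)` if the timer fires first, and
                      : // `Ok` wrapping the future's own output otherwise. A self-contained sketch
                      : // (illustrative only, not part of this module):
                      : //
                      : //     async fn demo() -> anyhow::Result<Option<u64>> {
                      : //         use std::time::Duration;
                      : //         let fut = async { anyhow::Ok(42u64) };
                      : //         match tokio::time::timeout(Duration::from_secs(1), fut).await {
                      : //             Ok(Ok(v)) => Ok(Some(v)), // inner future succeeded
                      : //             Ok(Err(e)) => Err(e),     // inner future failed
                      : //             Err(_) => Ok(None),       // timed out
                      : //         }
                      : //     }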
     986              : 
     987              : /// The connection half that receives and handles replies (feedback messages).
     988              : struct ReplyReader<IO> {
     989              :     reader: PostgresBackendReader<IO>,
     990              :     ws_guard: Arc<WalSenderGuard>,
     991              :     tli: WalResidentTimeline,
     992              : }
     993              : 
     994              : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
     995            0 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     996              :         loop {
     997            0 :             let msg = self.reader.read_copy_message().await?;
     998            0 :             self.handle_feedback(&msg).await?
     999              :         }
    1000            0 :     }
    1001              : 
    1002            0 :     async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
    1003            0 :         match msg.first().cloned() {
    1004              :             Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
    1005              :                 // Note: we deserialize from msg[1..] because we skip the tag byte.
    1006            0 :                 let mut hs_feedback = HotStandbyFeedback::des(&msg[1..])
    1007            0 :                     .context("failed to deserialize HotStandbyFeedback")?;
    1008              :                 // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way:
    1009              :                 // pq_sendint32(&reply_message, xmin);
    1010              :                 // pq_sendint32(&reply_message, xmin_epoch);
    1011              :                 // So it is two big-endian 32-bit words in little-endian word order!
    1012            0 :                 hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
    1013            0 :                 hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
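                      :                 // Worked example of the swap (illustrative value): a u64 received
                      :                 // as 0x0000_0002_0000_0001 has its two 32-bit halves exchanged by
                      :                 // rotate_left(32):
                      :                 //   assert_eq!(0x0000_0002_0000_0001u64.rotate_left(32),
                      :                 //              0x0000_0001_0000_0002);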
    1014            0 :                 self.ws_guard
    1015            0 :                     .walsenders
    1016            0 :                     .record_hs_feedback(self.ws_guard.id, &hs_feedback);
    1017              :             }
    1018              :             Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
    1019            0 :                 let reply =
    1020            0 :                     StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
    1021            0 :                 self.ws_guard
    1022            0 :                     .walsenders
    1023            0 :                     .record_standby_reply(self.ws_guard.id, &reply);
    1024              :             }
    1025              :             Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
    1026              :                 // pageserver sends this.
    1027              :                 // Note: we deserialize from msg[9..] because we skip the tag byte and length bytes.
    1028            0 :                 let buf = Bytes::copy_from_slice(&msg[9..]);
    1029            0 :                 let ps_feedback = PageserverFeedback::parse(buf);
    1030              : 
    1031            0 :                 trace!("PageserverFeedback is {:?}", ps_feedback);
    1032            0 :                 self.ws_guard
    1033            0 :                     .walsenders
    1034            0 :                     .record_ps_feedback(self.ws_guard.id, &ps_feedback);
    1035            0 :                 self.tli
    1036            0 :                     .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
    1037            0 :                     .await;
    1038              :                 // In principle a new remote_consistent_lsn could allow deactivating
    1039              :                 // the timeline, but we check that regularly through broker updates;
    1040              :                 // no need to do it here.
    1041              :             }
    1042            0 :             _ => warn!("unexpected message {:?}", msg),
    1043              :         }
    1044            0 :         Ok(())
    1045            0 :     }
    1046              : }
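                      : // A hedged, self-contained sketch of the tag dispatch implemented by
                      : // `handle_feedback` above; `Feedback` and `classify` are illustrative
                      : // names, not part of this module's API:
                      : //
                      : //     enum Feedback<'a> {
                      : //         HotStandby(&'a [u8]),
                      : //         StandbyStatus(&'a [u8]),
                      : //         Neon(&'a [u8]),
                      : //     }
                      : //
                      : //     fn classify(msg: &[u8]) -> Option<Feedback<'_>> {
                      : //         match *msg.first()? {
                      : //             HOT_STANDBY_FEEDBACK_TAG_BYTE => msg.get(1..).map(Feedback::HotStandby),
                      : //             STANDBY_STATUS_UPDATE_TAG_BYTE => msg.get(1..).map(Feedback::StandbyStatus),
                      : //             // skip the tag byte plus 8 header bytes, as handle_feedback does
                      : //             NEON_STATUS_UPDATE_TAG_BYTE => msg.get(9..).map(Feedback::Neon),
                      : //             _ => None,
                      : //         }
                      : //     }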
    1047              : 
    1048              : #[cfg(test)]
    1049              : mod tests {
    1050              :     use safekeeper_api::models::FullTransactionId;
    1051              :     use utils::id::{TenantId, TenantTimelineId, TimelineId};
    1052              : 
    1053              :     use super::*;
    1054              : 
    1055            4 :     fn mock_ttid() -> TenantTimelineId {
    1056            4 :         TenantTimelineId {
    1057            4 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
    1058            4 :             timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
    1059            4 :         }
    1060            4 :     }
    1061              : 
    1062            4 :     fn mock_addr() -> SocketAddr {
    1063            4 :         "127.0.0.1:8080".parse().unwrap()
    1064            4 :     }
    1065              : 
    1066              :     // Add the specified feedback to wss, setting other fields to dummy values.
    1067            4 :     fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
    1068            4 :         let walsender_state = WalSenderState::Vanilla(VanillaWalSenderInternalState {
    1069            4 :             ttid: mock_ttid(),
    1070            4 :             addr: mock_addr(),
    1071            4 :             conn_id: 1,
    1072            4 :             appname: None,
    1073            4 :             feedback,
    1074            4 :         });
    1075            4 :         wss.slots.push(Some(walsender_state))
    1076            4 :     }
    1077              : 
    1078              :     // Form standby feedback with the given hot standby feedback ts/xmin and
    1079              :     // the rest set to dummy values.
    1080            4 :     fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
    1081            4 :         ReplicationFeedback::Standby(StandbyFeedback {
    1082            4 :             reply: StandbyReply::empty(),
    1083            4 :             hs_feedback: HotStandbyFeedback {
    1084            4 :                 ts,
    1085            4 :                 xmin,
    1086            4 :                 catalog_xmin: 0,
    1087            4 :             },
    1088            4 :         })
    1089            4 :     }
    1090              : 
    1091              :     // Test that hot standby feedback aggregation works as expected.
    1092              :     #[test]
    1093            1 :     fn test_hs_feedback_no_valid() {
    1094            1 :         let mut wss = WalSendersShared::new();
    1095            1 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
    1096            1 :         wss.update_reply_feedback();
    1097            1 :         assert_eq!(
    1098              :             wss.agg_standby_feedback.hs_feedback.xmin,
    1099              :             INVALID_FULL_TRANSACTION_ID
    1100              :         );
    1101            1 :     }
    1102              : 
    1103              :     #[test]
    1104            1 :     fn test_hs_feedback() {
    1105            1 :         let mut wss = WalSendersShared::new();
    1106            1 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
    1107            1 :         push_feedback(&mut wss, hs_feedback(1, 42));
    1108            1 :         push_feedback(&mut wss, hs_feedback(1, 64));
    1109            1 :         wss.update_reply_feedback();
    1110            1 :         assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
    1111            1 :     }
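                      : 
                      :     // Note on the aggregation rule exercised above: the invalid sentinel
                      :     // xmin is ignored and the minimum valid value wins (min(42, 64) == 42),
                      :     // since the oldest standby snapshot is the one that must stay protected.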
    1112              : }
        

Generated by: LCOV version 2.1-beta