LCOV - code coverage report
Current view: top level - safekeeper/src - send_wal.rs (source / functions)
Test:        32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date:   2024-02-07 07:37:29

             Coverage    Total    Hit
Lines:       93.5 %      507      474
Functions:   45.3 %      161      73

            Line data    Source code
       1              : //! This module implements the streaming side of replication protocol, starting
       2              : //! with the "START_REPLICATION" message, and registry of walsenders.
       3              : 
       4              : use crate::handler::SafekeeperPostgresHandler;
       5              : use crate::safekeeper::{Term, TermLsn};
       6              : use crate::timeline::Timeline;
       7              : use crate::wal_service::ConnectionId;
       8              : use crate::wal_storage::WalReader;
       9              : use crate::GlobalTimelines;
      10              : use anyhow::{bail, Context as AnyhowContext};
      11              : use bytes::Bytes;
      12              : use parking_lot::Mutex;
      13              : use postgres_backend::PostgresBackend;
      14              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
      15              : use postgres_ffi::get_current_timestamp;
      16              : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
      17              : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
      18              : use serde::{Deserialize, Serialize};
      19              : use tokio::io::{AsyncRead, AsyncWrite};
      20              : use utils::failpoint_support;
      21              : use utils::id::TenantTimelineId;
      22              : use utils::pageserver_feedback::PageserverFeedback;
      23              : 
      24              : use std::cmp::{max, min};
      25              : use std::net::SocketAddr;
      26              : use std::str;
      27              : use std::sync::Arc;
      28              : use std::time::Duration;
      29              : use tokio::sync::watch::Receiver;
      30              : use tokio::time::timeout;
      31              : use tracing::*;
      32              : use utils::{bin_ser::BeSer, lsn::Lsn};
      33              : 
      34              : // See: https://www.postgresql.org/docs/13/protocol-replication.html
      35              : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
      36              : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
       37              : // Neon extension of the replication protocol
      38              : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
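                      : // For reference (a sketch, per the PostgreSQL docs linked above): each
                      : // feedback arrives as the payload of a CopyData message, and its first byte
                      : // selects the variant dispatched on in ReplyReader::handle_feedback below:
                      : //   'h' -> HotStandbyFeedback, 'r' -> StandbyReply, 'z' -> PageserverFeedback.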
      39              : 
      40              : type FullTransactionId = u64;
      41              : 
      42              : /// Hot standby feedback received from replica
      43           21 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      44              : pub struct HotStandbyFeedback {
      45              :     pub ts: TimestampTz,
      46              :     pub xmin: FullTransactionId,
      47              :     pub catalog_xmin: FullTransactionId,
      48              : }
      49              : 
      50              : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
      51              : 
      52              : impl HotStandbyFeedback {
      53      1644387 :     pub fn empty() -> HotStandbyFeedback {
      54      1644387 :         HotStandbyFeedback {
      55      1644387 :             ts: 0,
      56      1644387 :             xmin: 0,
      57      1644387 :             catalog_xmin: 0,
      58      1644387 :         }
      59      1644387 :     }
      60              : }
      61              : 
      62              : /// Standby status update
      63        49159 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      64              : pub struct StandbyReply {
      65              :     pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
      66              :     pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
      67              :     pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
      68              :     pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
      69              :     pub reply_requested: bool,
      70              : }
      71              : 
      72              : impl StandbyReply {
      73         1068 :     fn empty() -> Self {
      74         1068 :         StandbyReply {
      75         1068 :             write_lsn: Lsn::INVALID,
      76         1068 :             flush_lsn: Lsn::INVALID,
      77         1068 :             apply_lsn: Lsn::INVALID,
      78         1068 :             reply_ts: 0,
      79         1068 :             reply_requested: false,
      80         1068 :         }
      81         1068 :     }
      82              : }
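                      : // These structs travel as big-endian, field-order encodings via BeSer from
                      : // utils::bin_ser. A minimal round-trip sketch; des() is used below in this
                      : // file, and ser() is assumed to be its serializing counterpart:
                      : //
                      : //     use utils::bin_ser::BeSer;
                      : //     let reply = StandbyReply::empty();
                      : //     let bytes = reply.ser().unwrap(); // 8+8+8+8+1 = 33 bytes after the 'r' tag
                      : //     let back = StandbyReply::des(&bytes).unwrap();
                      : //     assert_eq!(back.reply_ts, reply.reply_ts);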
      83              : 
      84            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      85              : pub struct StandbyFeedback {
      86              :     pub reply: StandbyReply,
      87              :     pub hs_feedback: HotStandbyFeedback,
      88              : }
      89              : 
      90              : impl StandbyFeedback {
      91          622 :     pub fn empty() -> Self {
      92          622 :         StandbyFeedback {
      93          622 :             reply: StandbyReply::empty(),
      94          622 :             hs_feedback: HotStandbyFeedback::empty(),
      95          622 :         }
      96          622 :     }
      97              : }
      98              : 
      99              : /// WalSenders registry. Timeline holds it (wrapped in Arc).
     100              : pub struct WalSenders {
     101              :     mutex: Mutex<WalSendersShared>,
     102              : }
     103              : 
     104              : impl WalSenders {
     105          616 :     pub fn new() -> Arc<WalSenders> {
     106          616 :         Arc::new(WalSenders {
     107          616 :             mutex: Mutex::new(WalSendersShared::new()),
     108          616 :         })
     109          616 :     }
     110              : 
     111              :     /// Register new walsender. Returned guard provides access to the slot and
     112              :     /// automatically deregisters in Drop.
     113          768 :     fn register(
     114          768 :         self: &Arc<WalSenders>,
     115          768 :         ttid: TenantTimelineId,
     116          768 :         addr: SocketAddr,
     117          768 :         conn_id: ConnectionId,
     118          768 :         appname: Option<String>,
     119          768 :     ) -> WalSenderGuard {
     120          768 :         let slots = &mut self.mutex.lock().slots;
     121          768 :         let walsender_state = WalSenderState {
     122          768 :             ttid,
     123          768 :             addr,
     124          768 :             conn_id,
     125          768 :             appname,
     126          768 :             feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
     127          768 :         };
     128              :         // find empty slot or create new one
     129          768 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
     130          248 :             slots[pos] = Some(walsender_state);
     131          248 :             pos
     132              :         } else {
     133          520 :             let pos = slots.len();
     134          520 :             slots.push(Some(walsender_state));
     135          520 :             pos
     136              :         };
     137          768 :         WalSenderGuard {
     138          768 :             id: pos,
     139          768 :             walsenders: self.clone(),
     140          768 :         }
     141          768 :     }
     142              : 
     143              :     /// Get state of all walsenders.
     144          257 :     pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
     145          257 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     146          257 :     }
     147              : 
     148              :     /// Get aggregated pageserver feedback.
     149           51 :     pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
     150           51 :         self.mutex.lock().agg_ps_feedback
     151           51 :     }
     152              : 
     153              :     /// Get aggregated pageserver and hot standby feedback (we send them to compute).
     154      1643317 :     pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, StandbyFeedback) {
     155      1643317 :         let shared = self.mutex.lock();
     156      1643317 :         (shared.agg_ps_feedback, shared.agg_standby_feedback)
     157      1643317 :     }
     158              : 
     159              :     /// Record new pageserver feedback, update aggregated values.
     160       795424 :     fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
     161       795424 :         let mut shared = self.mutex.lock();
     162       795424 :         shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
     163       795424 :         shared.update_ps_feedback();
     164       795424 :     }
     165              : 
     166              :     /// Record standby reply.
     167        49159 :     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
     168        49159 :         let mut shared = self.mutex.lock();
     169        49159 :         let slot = shared.get_slot_mut(id);
     170        49159 :         info!(
     171        49159 :             "Record standby reply: ts={} apply_lsn={}",
     172        49159 :             reply.reply_ts, reply.apply_lsn
     173        49159 :         );
     174        49159 :         match &mut slot.feedback {
     175        49153 :             ReplicationFeedback::Standby(sf) => sf.reply = *reply,
     176              :             ReplicationFeedback::Pageserver(_) => {
     177            6 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     178            6 :                     reply: *reply,
     179            6 :                     hs_feedback: HotStandbyFeedback::empty(),
     180            6 :                 })
     181              :             }
     182              :         }
     183        49159 :     }
     184              : 
     185              :     /// Record hot standby feedback, update aggregated value.
     186           21 :     fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
     187           21 :         let mut shared = self.mutex.lock();
     188           21 :         let slot = shared.get_slot_mut(id);
     189           21 :         match &mut slot.feedback {
     190           21 :             ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
     191              :             ReplicationFeedback::Pageserver(_) => {
     192            0 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     193            0 :                     reply: StandbyReply::empty(),
     194            0 :                     hs_feedback: *feedback,
     195            0 :                 })
     196              :             }
     197              :         }
     198           21 :         shared.update_reply_feedback();
     199           21 :     }
     200              : 
     201              :     /// Get remote_consistent_lsn reported by the pageserver. Returns None if
     202              :     /// client is not pageserver.
     203         3356 :     fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
     204         3356 :         let shared = self.mutex.lock();
     205         3356 :         let slot = shared.get_slot(id);
     206         3356 :         match slot.feedback {
     207         3203 :             ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
     208          153 :             _ => None,
     209              :         }
     210         3356 :     }
     211              : 
     212              :     /// Unregister walsender.
     213          413 :     fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
     214          413 :         let mut shared = self.mutex.lock();
     215          413 :         shared.slots[id] = None;
     216          413 :         shared.update_reply_feedback();
     217          413 :     }
     218              : }
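                      : // Registration sketch (hypothetical in-module caller; register() is private):
                      : // the returned guard unregisters in Drop, so a slot cannot leak even on early
                      : // error returns.
                      : //
                      : //     let walsenders = WalSenders::new();
                      : //     {
                      : //         let _guard = walsenders.register(ttid, addr, conn_id, None);
                      : //         assert_eq!(walsenders.get_all().len(), 1);
                      : //     } // _guard dropped -> unregister() runs, the slot is set back to None
                      : //     assert_eq!(walsenders.get_all().len(), 0);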
     219              : 
     220              : struct WalSendersShared {
      221              :     // value aggregated over all walsenders
      222              :     agg_standby_feedback: StandbyFeedback,
      223              :     // value aggregated over all walsenders
     224              :     agg_ps_feedback: PageserverFeedback,
     225              :     slots: Vec<Option<WalSenderState>>,
     226              : }
     227              : 
     228              : impl WalSendersShared {
     229          622 :     fn new() -> Self {
     230          622 :         WalSendersShared {
     231          622 :             agg_standby_feedback: StandbyFeedback::empty(),
     232          622 :             agg_ps_feedback: PageserverFeedback::empty(),
     233          622 :             slots: Vec::new(),
     234          622 :         }
     235          622 :     }
     236              : 
      237              :     /// Get the content of the given id's slot; it must exist.
     238         3356 :     fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
     239         3356 :         self.slots[id].as_ref().expect("walsender doesn't exist")
     240         3356 :     }
     241              : 
      242              :     /// Get mutable content of the given id's slot; it must exist.
     243       844604 :     fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
     244       844604 :         self.slots[id].as_mut().expect("walsender doesn't exist")
     245       844604 :     }
     246              : 
      247              :     /// Update aggregated hot standby feedback (min of valid xmins, max of ts)
      248              :     /// and aggregated standby replies (min of valid LSNs and reply timestamps).
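                      :     /// E.g. (sketch): xmins {invalid, 42, 64} from three standbys aggregate
                      :     /// to 42; see test_hs_feedback below.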
     249          438 :     fn update_reply_feedback(&mut self) {
     250          438 :         let mut agg = HotStandbyFeedback::empty();
     251          438 :         let mut reply_agg = StandbyReply::empty();
     252          438 :         for ws_state in self.slots.iter().flatten() {
     253          168 :             if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
     254           29 :                 let hs_feedback = standby_feedback.hs_feedback;
     255           29 :                 // doing Option math like op1.iter().chain(op2.iter()).min()
     256           29 :                 // would be nicer, but we serialize/deserialize this struct
     257           29 :                 // directly, so leave as is for now
     258           29 :                 if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
     259           25 :                     if agg.xmin != INVALID_FULL_TRANSACTION_ID {
     260            2 :                         agg.xmin = min(agg.xmin, hs_feedback.xmin);
     261           23 :                     } else {
     262           23 :                         agg.xmin = hs_feedback.xmin;
     263           23 :                     }
     264           25 :                     agg.ts = max(agg.ts, hs_feedback.ts);
     265            4 :                 }
     266           29 :                 if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     267            0 :                     if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     268            0 :                         agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
     269            0 :                     } else {
     270            0 :                         agg.catalog_xmin = hs_feedback.catalog_xmin;
     271            0 :                     }
     272            0 :                     agg.ts = max(agg.ts, hs_feedback.ts);
     273           29 :                 }
     274           29 :                 let reply = standby_feedback.reply;
     275           29 :                 if reply.write_lsn != Lsn::INVALID {
     276           21 :                     if reply_agg.write_lsn != Lsn::INVALID {
     277            0 :                         reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
     278           21 :                     } else {
     279           21 :                         reply_agg.write_lsn = reply.write_lsn;
     280           21 :                     }
     281            8 :                 }
     282           29 :                 if reply.flush_lsn != Lsn::INVALID {
     283           21 :                     if reply_agg.flush_lsn != Lsn::INVALID {
     284            0 :                         reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
     285           21 :                     } else {
     286           21 :                         reply_agg.flush_lsn = reply.flush_lsn;
     287           21 :                     }
     288            8 :                 }
     289           29 :                 if reply.apply_lsn != Lsn::INVALID {
     290           21 :                     if reply_agg.apply_lsn != Lsn::INVALID {
     291            0 :                         reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
     292           21 :                     } else {
     293           21 :                         reply_agg.apply_lsn = reply.apply_lsn;
     294           21 :                     }
     295            8 :                 }
     296           29 :                 if reply.reply_ts != 0 {
     297           21 :                     if reply_agg.reply_ts != 0 {
     298            0 :                         reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
     299           21 :                     } else {
     300           21 :                         reply_agg.reply_ts = reply.reply_ts;
     301           21 :                     }
     302            8 :                 }
     303          139 :             }
     304              :         }
     305          438 :         self.agg_standby_feedback = StandbyFeedback {
     306          438 :             reply: reply_agg,
     307          438 :             hs_feedback: agg,
     308          438 :         };
     309          438 :     }
     310              : 
     311              :     /// Update aggregated pageserver feedback. LSNs (last_received,
     312              :     /// disk_consistent, remote_consistent) and reply timestamp are just
      313              :     /// maximized; timeline_size is taken from the feedback with the highest
     314              :     /// last_received lsn. This is generally reasonable, but we might want to
     315              :     /// implement other policies once multiple pageservers start to be actively
     316              :     /// used.
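                      :     /// E.g. (sketch): feedbacks {size 8, last_received_lsn 42} and
                      :     /// {size 4, last_received_lsn 84} aggregate to {size 4, last_received_lsn 84};
                      :     /// see test_ps_feedback below.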
     317       795426 :     fn update_ps_feedback(&mut self) {
     318       795426 :         let init = PageserverFeedback::empty();
     319       795426 :         let acc =
     320       795426 :             self.slots
     321       795426 :                 .iter()
     322       795426 :                 .flatten()
     323      1173112 :                 .fold(init, |mut acc, ws_state| match ws_state.feedback {
     324      1152277 :                     ReplicationFeedback::Pageserver(feedback) => {
     325      1152277 :                         if feedback.last_received_lsn > acc.last_received_lsn {
     326       858316 :                             acc.current_timeline_size = feedback.current_timeline_size;
     327       858316 :                         }
     328      1152277 :                         acc.last_received_lsn =
     329      1152277 :                             max(feedback.last_received_lsn, acc.last_received_lsn);
     330      1152277 :                         acc.disk_consistent_lsn =
     331      1152277 :                             max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
     332      1152277 :                         acc.remote_consistent_lsn =
     333      1152277 :                             max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
     334      1152277 :                         acc.replytime = max(feedback.replytime, acc.replytime);
     335      1152277 :                         acc
     336              :                     }
     337        20835 :                     ReplicationFeedback::Standby(_) => acc,
     338      1173112 :                 });
     339       795426 :         self.agg_ps_feedback = acc;
     340       795426 :     }
     341              : }
     342              : 
      343              : // Serialization is used only for pretty-printing in JSON.
     344          137 : #[derive(Debug, Clone, Serialize, Deserialize)]
     345              : pub struct WalSenderState {
     346              :     ttid: TenantTimelineId,
     347              :     addr: SocketAddr,
     348              :     conn_id: ConnectionId,
     349              :     // postgres application_name
     350              :     appname: Option<String>,
     351              :     feedback: ReplicationFeedback,
     352              : }
     353              : 
      354              : // The receiver is either a pageserver or a regular standby; the two send
      355              : // different kinds of feedback.
     356          137 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     357              : enum ReplicationFeedback {
     358              :     Pageserver(PageserverFeedback),
     359              :     Standby(StandbyFeedback),
     360              : }
     361              : 
      362              : // Id of the occupied slot in WalSenders, used to access it (and saved in
      363              : // WalSenderGuard). We could hand out an Arc to the slot directly, but there is
      364              : // little point: the aggregation performed on each feedback receipt iterates
      365              : // over all walsenders anyway.
     366              : pub type WalSenderId = usize;
     367              : 
     368              : /// Scope guard to access slot in WalSenders registry and unregister from it in
     369              : /// Drop.
     370              : pub struct WalSenderGuard {
     371              :     id: WalSenderId,
     372              :     walsenders: Arc<WalSenders>,
     373              : }
     374              : 
     375              : impl Drop for WalSenderGuard {
     376          413 :     fn drop(&mut self) {
     377          413 :         self.walsenders.unregister(self.id);
     378          413 :     }
     379              : }
     380              : 
     381              : impl SafekeeperPostgresHandler {
      382              :     /// Wrapper around handle_start_replication_guts that handles its result.
      383              :     /// The error is handled here while we're still in the walsender ttid span;
      384              :     /// with an API extension, this could probably be moved into postgres_backend.
     385          768 :     pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
     386          768 :         &mut self,
     387          768 :         pgb: &mut PostgresBackend<IO>,
     388          768 :         start_pos: Lsn,
     389          768 :         term: Option<Term>,
     390          768 :     ) -> Result<(), QueryError> {
     391          768 :         if let Err(end) = self
     392          768 :             .handle_start_replication_guts(pgb, start_pos, term)
     393      3213768 :             .await
     394              :         {
     395              :             // Log the result and probably send it to the client, closing the stream.
     396          413 :             pgb.handle_copy_stream_end(end).await;
     397            0 :         }
     398          413 :         Ok(())
     399          413 :     }
     400              : 
     401          768 :     pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     402          768 :         &mut self,
     403          768 :         pgb: &mut PostgresBackend<IO>,
     404          768 :         start_pos: Lsn,
     405          768 :         term: Option<Term>,
     406          768 :     ) -> Result<(), CopyStreamHandlerEnd> {
     407          768 :         let appname = self.appname.clone();
     408          768 :         let tli =
     409          768 :             GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
     410              : 
     411              :         // Use a guard object to remove our entry from the timeline when we are done.
     412          768 :         let ws_guard = Arc::new(tli.get_walsenders().register(
     413          768 :             self.ttid,
     414          768 :             *pgb.get_peer_addr(),
     415          768 :             self.conn_id,
     416          768 :             self.appname.clone(),
     417          768 :         ));
     418              : 
      419              :         // Walsender can operate in one of two modes: sending only committed WAL
      420              :         // (used by pageserver) or all existing WAL up to flush_lsn (used by
      421              :         // walproposer or peer recovery). The second mode is selected by
      422              :         // supplying a term: it is always driven by a consensus leader, whose
      423              :         // term must be given.
     424          768 :         let end_watch = if term.is_some() {
     425           18 :             EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
     426              :         } else {
     427          750 :             EndWatch::Commit(tli.get_commit_lsn_watch_rx())
     428              :         };
     429              :         // we don't check term here; it will be checked on first waiting/WAL reading anyway.
     430          768 :         let end_pos = end_watch.get();
     431          768 : 
     432          768 :         if end_pos < start_pos {
     433           14 :             warn!(
     434           14 :                 "requested start_pos {} is ahead of available WAL end_pos {}",
     435           14 :                 start_pos, end_pos
     436           14 :             );
     437          754 :         }
     438              : 
     439          768 :         info!(
     440          768 :             "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
     441          768 :             start_pos,
     442          768 :             end_pos,
     443          768 :             matches!(end_watch, EndWatch::Flush(_)),
     444          768 :             appname
     445          768 :         );
     446              : 
     447              :         // switch to copy
     448          768 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     449              : 
     450          768 :         let (_, persisted_state) = tli.get_state().await;
     451          768 :         let wal_reader = WalReader::new(
     452          768 :             self.conf.workdir.clone(),
     453          768 :             self.conf.timeline_dir(&tli.ttid),
     454          768 :             &persisted_state,
     455          768 :             start_pos,
     456          768 :             self.conf.is_wal_backup_enabled(),
     457          768 :         )?;
     458              : 
     459              :         // Split to concurrently receive and send data; replies are generally
     460              :         // not synchronized with sends, so this avoids deadlocks.
     461          768 :         let reader = pgb.split().context("START_REPLICATION split")?;
     462              : 
     463          768 :         let mut sender = WalSender {
     464          768 :             pgb,
     465          768 :             tli: tli.clone(),
     466          768 :             appname,
     467          768 :             start_pos,
     468          768 :             end_pos,
     469          768 :             term,
     470          768 :             end_watch,
     471          768 :             ws_guard: ws_guard.clone(),
     472          768 :             wal_reader,
     473          768 :             send_buf: [0; MAX_SEND_SIZE],
     474          768 :         };
     475          768 :         let mut reply_reader = ReplyReader {
     476          768 :             reader,
     477          768 :             ws_guard,
     478          768 :             tli,
     479          768 :         };
     480              : 
     481          768 :         let res = tokio::select! {
     482              :             // todo: add read|write .context to these errors
     483           64 :             r = sender.run() => r,
     484          349 :             r = reply_reader.run() => r,
     485              :         };
     486              :         // Join pg backend back.
     487          413 :         pgb.unsplit(reply_reader.reader)?;
     488              : 
     489          413 :         res
     490          413 :     }
     491              : }
     492              : 
     493              : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
     494              : /// given term (recovery by walproposer or peer safekeeper).
     495              : enum EndWatch {
     496              :     Commit(Receiver<Lsn>),
     497              :     Flush(Receiver<TermLsn>),
     498              : }
     499              : 
     500              : impl EndWatch {
     501              :     /// Get current end of WAL.
     502      3395561 :     fn get(&self) -> Lsn {
     503      3395561 :         match self {
     504      3393410 :             EndWatch::Commit(r) => *r.borrow(),
     505         2151 :             EndWatch::Flush(r) => r.borrow().lsn,
     506              :         }
     507      3395561 :     }
     508              : 
     509              :     /// Wait for the update.
     510      1856158 :     async fn changed(&mut self) -> anyhow::Result<()> {
     511      1856158 :         match self {
     512      2380874 :             EndWatch::Commit(r) => r.changed().await?,
     513           32 :             EndWatch::Flush(r) => r.changed().await?,
     514              :         }
     515      1852111 :         Ok(())
     516      1852111 :     }
     517              : }
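                      : // The tokio watch-channel pattern EndWatch wraps, in isolation (a sketch;
                      : // Lsn is constructed from a u64 as in the tests below, async context assumed):
                      : //
                      : //     use tokio::sync::watch;
                      : //     let (tx, mut rx) = watch::channel(Lsn(0));
                      : //     tx.send(Lsn(0x2000)).unwrap(); // publisher: end of WAL advanced
                      : //     rx.changed().await.unwrap();   // subscriber wakes on the update
                      : //     let end = *rx.borrow();        // read the latest published value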
     518              : 
      519              : /// The half of the split connection that drives WAL sending.
     520              : struct WalSender<'a, IO> {
     521              :     pgb: &'a mut PostgresBackend<IO>,
     522              :     tli: Arc<Timeline>,
     523              :     appname: Option<String>,
      524              :     // Position from which we are sending the next chunk.
     525              :     start_pos: Lsn,
     526              :     // WAL up to this position is known to be locally available.
     527              :     // Usually this is the same as the latest commit_lsn, but in case of
     528              :     // walproposer recovery, this is flush_lsn.
     529              :     //
     530              :     // We send this LSN to the receiver as wal_end, so that it knows how much
     531              :     // WAL this safekeeper has. This LSN should be as fresh as possible.
     532              :     end_pos: Lsn,
     533              :     /// When streaming uncommitted part, the term the client acts as the leader
     534              :     /// in. Streaming is stopped if local term changes to a different (higher)
     535              :     /// value.
     536              :     term: Option<Term>,
     537              :     /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
     538              :     end_watch: EndWatch,
     539              :     ws_guard: Arc<WalSenderGuard>,
     540              :     wal_reader: WalReader,
      541              :     // buffer for reading WAL into before sending it
     542              :     send_buf: [u8; MAX_SEND_SIZE],
     543              : }
     544              : 
     545              : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
     546              :     /// Send WAL until
     547              :     /// - an error occurs
      548              :     /// - receiver is caught up and there are no computes (if streaming up to commit_lsn)
     549              :     ///
     550              :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     551              :     /// convenience.
     552          768 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     553              :         loop {
      554              :             // Wait for the next portion if it is not there yet, or just
      555              :             // update the end of WAL available for sending, which we
      556              :             // communicate to the receiver.
     557      2380947 :             self.wait_wal().await?;
     558              :             assert!(
     559       818193 :                 self.end_pos > self.start_pos,
     560            0 :                 "nothing to send after waiting for WAL"
     561              :             );
     562              : 
     563              :             // try to send as much as available, capped by MAX_SEND_SIZE
     564       818193 :             let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
      565       818193 :             // if we went beyond available WAL, cap to the end
     566       818193 :             if chunk_end_pos >= self.end_pos {
     567       736354 :                 chunk_end_pos = self.end_pos;
     568       736354 :             } else {
     569        81839 :                 // If sending not up to end pos, round down to page boundary to
     570        81839 :                 // avoid breaking WAL record not at page boundary, as protocol
     571        81839 :                 // demands. See walsender.c (XLogSendPhysical).
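                      :                 // E.g. (a sketch, assuming 8 KiB WAL pages): a
                      :                 // tentative chunk_end_pos of 0x2A345 has block_offset()
                      :                 // 0x345 and rounds down to the page boundary 0x2A000.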
     572        81839 :                 chunk_end_pos = chunk_end_pos
     573        81839 :                     .checked_sub(chunk_end_pos.block_offset())
     574        81839 :                     .unwrap();
     575        81839 :             }
     576       818193 :             let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
     577       818193 :             let send_buf = &mut self.send_buf[..send_size];
     578              :             let send_size: usize;
     579              :             {
     580              :                 // If uncommitted part is being pulled, check that the term is
     581              :                 // still the expected one.
     582       818193 :                 let _term_guard = if let Some(t) = self.term {
     583         2085 :                     Some(self.tli.acquire_term(t).await?)
     584              :                 } else {
     585       816108 :                     None
     586              :                 };
     587              :                 // Read WAL into buffer. send_size can be additionally capped to
     588              :                 // segment boundary here.
     589       818192 :                 send_size = self.wal_reader.read(send_buf).await?
     590              :             };
     591       818191 :             let send_buf = &send_buf[..send_size];
     592       818191 : 
     593       818191 :             // and send it
     594       818191 :             self.pgb
     595       818191 :                 .write_message(&BeMessage::XLogData(XLogDataBody {
     596       818191 :                     wal_start: self.start_pos.0,
     597       818191 :                     wal_end: self.end_pos.0,
     598       818191 :                     timestamp: get_current_timestamp(),
     599       818191 :                     data: send_buf,
     600       818191 :                 }))
     601        23382 :                 .await?;
     602              : 
     603       818170 :             if let Some(appname) = &self.appname {
     604       816226 :                 if appname == "replica" {
     605            0 :                     failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
     606       795175 :                 }
     607         1944 :             }
     608            0 :             trace!(
     609            0 :                 "sent {} bytes of WAL {}-{}",
     610            0 :                 send_size,
     611            0 :                 self.start_pos,
     612            0 :                 self.start_pos + send_size as u64
     613            0 :             );
     614       818170 :             self.start_pos += send_size as u64;
     615              :         }
     616           64 :     }
     617              : 
      618              :     /// Wait until we have WAL to stream, sending keepalives and checking for
      619              :     /// exit in the meantime.
     620       818938 :     async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     621              :         loop {
     622       822240 :             self.end_pos = self.end_watch.get();
     623       822240 :             if self.end_pos > self.start_pos {
     624              :                 // We have something to send.
     625            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     626       101798 :                 return Ok(());
     627       720442 :             }
     628              : 
     629              :             // Wait for WAL to appear, now self.end_pos == self.start_pos.
     630      2380903 :             if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
     631       716395 :                 self.end_pos = lsn;
     632            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     633       716395 :                 return Ok(());
     634         3356 :             }
     635         3356 : 
     636         3356 :             // Timed out waiting for WAL, check for termination and send KA.
     637         3356 :             // Check for termination only if we are streaming up to commit_lsn
     638         3356 :             // (to pageserver).
     639         3356 :             if let EndWatch::Commit(_) = self.end_watch {
     640         3356 :                 if let Some(remote_consistent_lsn) = self
     641         3356 :                     .ws_guard
     642         3356 :                     .walsenders
     643         3356 :                     .get_ws_remote_consistent_lsn(self.ws_guard.id)
     644              :                 {
     645         3203 :                     if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     646              :                         // Terminate if there is nothing more to send.
     647              :                         // Note that "ending streaming" part of the string is used by
     648              :                         // pageserver to identify WalReceiverError::SuccessfulCompletion,
     649              :                         // do not change this string without updating pageserver.
     650           54 :                         return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     651           54 :                         "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     652           54 :                         self.appname, self.start_pos,
     653           54 :                     )));
     654         3149 :                     }
     655          153 :                 }
     656            0 :             }
     657              : 
     658         3302 :             self.pgb
     659         3302 :                 .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     660         3302 :                     wal_end: self.end_pos.0,
     661         3302 :                     timestamp: get_current_timestamp(),
     662         3302 :                     request_reply: true,
     663         3302 :                 }))
     664            0 :                 .await?;
     665              :         }
     666       818247 :     }
     667              : }
     668              : 
      669              : /// The half of the split connection that drives receiving feedback replies.
     670              : struct ReplyReader<IO> {
     671              :     reader: PostgresBackendReader<IO>,
     672              :     ws_guard: Arc<WalSenderGuard>,
     673              :     tli: Arc<Timeline>,
     674              : }
     675              : 
     676              : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
     677          768 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     678              :         loop {
     679      3081881 :             let msg = self.reader.read_copy_message().await?;
     680       844604 :             self.handle_feedback(&msg).await?
     681              :         }
     682          349 :     }
     683              : 
     684       844604 :     async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
     685       844604 :         match msg.first().cloned() {
     686           21 :             Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
      687           21 :                 // Note: deserializing is on msg[1..] because we skip the tag byte.
     688           21 :                 let mut hs_feedback = HotStandbyFeedback::des(&msg[1..])
     689           21 :                     .context("failed to deserialize HotStandbyFeedback")?;
     690              :                 // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way:
     691              :                 // pq_sendint32(&reply_message, xmin);
     692              :                 // pq_sendint32(&reply_message, xmin_epoch);
     693              :                 // So it is two big endian 32-bit words in low endian order!
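                      :                 // E.g.: epoch = 1, xid = 5 arrives as
                      :                 // 0x0000_0005_0000_0001; the rotation below restores the
                      :                 // FullTransactionId 0x0000_0001_0000_0005.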
     694           21 :                 hs_feedback.xmin = (hs_feedback.xmin >> 32) | (hs_feedback.xmin << 32);
     695           21 :                 hs_feedback.catalog_xmin =
     696           21 :                     (hs_feedback.catalog_xmin >> 32) | (hs_feedback.catalog_xmin << 32);
     697           21 :                 self.ws_guard
     698           21 :                     .walsenders
     699           21 :                     .record_hs_feedback(self.ws_guard.id, &hs_feedback);
     700              :             }
     701        49159 :             Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
     702        49159 :                 let reply =
     703        49159 :                     StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
     704        49159 :                 self.ws_guard
     705        49159 :                     .walsenders
     706        49159 :                     .record_standby_reply(self.ws_guard.id, &reply);
     707              :             }
     708              :             Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
     709              :                 // pageserver sends this.
      710              :                 // Note: deserializing is on msg[9..] because we skip the tag byte and len bytes.
     711       795424 :                 let buf = Bytes::copy_from_slice(&msg[9..]);
     712       795424 :                 let ps_feedback = PageserverFeedback::parse(buf);
     713              : 
     714            0 :                 trace!("PageserverFeedback is {:?}", ps_feedback);
     715       795424 :                 self.ws_guard
     716       795424 :                     .walsenders
     717       795424 :                     .record_ps_feedback(self.ws_guard.id, &ps_feedback);
     718       795424 :                 self.tli
     719       795424 :                     .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
     720       131789 :                     .await;
      721              :                 // In principle, a new remote_consistent_lsn could allow
      722              :                 // deactivating the timeline, but we check that regularly
      723              :                 // through broker updates, no need to do it here.
     724              :             }
     725            0 :             _ => warn!("unexpected message {:?}", msg),
     726              :         }
     727       844603 :         Ok(())
     728       844603 :     }
     729              : }
     730              : 
     731              : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
     732              : 
     733              : /// Wait until we have available WAL > start_pos or timeout expires. Returns
     734              : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
     735              : /// - Ok(None) if timeout expired;
     736              : /// - Err in case of error -- only if 1) term changed while fetching in recovery
     737              : ///   mode 2) watch channel closed, which must never happen.
     738       720442 : async fn wait_for_lsn(
     739       720442 :     rx: &mut EndWatch,
     740       720442 :     client_term: Option<Term>,
     741       720442 :     start_pos: Lsn,
     742       720442 : ) -> anyhow::Result<Option<Lsn>> {
     743       720442 :     let res = timeout(POLL_STATE_TIMEOUT, async move {
     744              :         loop {
     745      2572553 :             let end_pos = rx.get();
     746      2572553 :             if end_pos > start_pos {
     747       716395 :                 return Ok(end_pos);
     748      1856158 :             }
     749      1856158 :             if let EndWatch::Flush(rx) = rx {
     750           32 :                 let curr_term = rx.borrow().term;
     751           32 :                 if let Some(client_term) = client_term {
     752           32 :                     if curr_term != client_term {
     753            0 :                         bail!("term changed: requested {}, now {}", client_term, curr_term);
     754           32 :                     }
     755            0 :                 }
     756      1856126 :             }
     757      2380903 :             rx.changed().await?;
     758              :         }
     759       720442 :     })
     760      2380903 :     .await;
     761              : 
     762       716395 :     match res {
     763              :         // success
     764       716395 :         Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
     765              :         // error inside closure
     766            0 :         Ok(Err(err)) => Err(err),
     767              :         // timeout
     768         3356 :         Err(_) => Ok(None),
     769              :     }
     770       719751 : }
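                      : // The timeout-around-await pattern used above, reduced to its core (a sketch):
                      : // tokio::time::timeout yields Err(Elapsed) if the inner future does not finish
                      : // in time, which wait_for_lsn maps to Ok(None) so the caller sends a keepalive
                      : // and retries:
                      : //
                      : //     match timeout(POLL_STATE_TIMEOUT, rx.changed()).await {
                      : //         Ok(changed) => changed?,  // watch updated (or closed -> error)
                      : //         Err(_elapsed) => { /* timed out: keepalive, then loop */ }
                      : //     }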
     771              : 
     772              : #[cfg(test)]
     773              : mod tests {
     774              :     use postgres_protocol::PG_EPOCH;
     775              :     use utils::id::{TenantId, TimelineId};
     776              : 
     777              :     use super::*;
     778              : 
     779           12 :     fn mock_ttid() -> TenantTimelineId {
     780           12 :         TenantTimelineId {
     781           12 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     782           12 :             timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
     783           12 :         }
     784           12 :     }
     785              : 
     786           12 :     fn mock_addr() -> SocketAddr {
     787           12 :         "127.0.0.1:8080".parse().unwrap()
     788           12 :     }
     789              : 
      790              :     // add the specified feedback to wss, setting other fields to dummy values
     791           12 :     fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
     792           12 :         let walsender_state = WalSenderState {
     793           12 :             ttid: mock_ttid(),
     794           12 :             addr: mock_addr(),
     795           12 :             conn_id: 1,
     796           12 :             appname: None,
     797           12 :             feedback,
     798           12 :         };
     799           12 :         wss.slots.push(Some(walsender_state))
     800           12 :     }
     801              : 
     802              :     // form standby feedback with given hot standby feedback ts/xmin and the
     803              :     // rest set to dummy values.
     804            8 :     fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
     805            8 :         ReplicationFeedback::Standby(StandbyFeedback {
     806            8 :             reply: StandbyReply::empty(),
     807            8 :             hs_feedback: HotStandbyFeedback {
     808            8 :                 ts,
     809            8 :                 xmin,
     810            8 :                 catalog_xmin: 0,
     811            8 :             },
     812            8 :         })
     813            8 :     }
     814              : 
     815              :     // test that hs aggregation works as expected
     816            2 :     #[test]
     817            2 :     fn test_hs_feedback_no_valid() {
     818            2 :         let mut wss = WalSendersShared::new();
     819            2 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     820            2 :         wss.update_reply_feedback();
     821            2 :         assert_eq!(
     822            2 :             wss.agg_standby_feedback.hs_feedback.xmin,
     823            2 :             INVALID_FULL_TRANSACTION_ID
     824            2 :         );
     825            2 :     }
     826              : 
     827            2 :     #[test]
     828            2 :     fn test_hs_feedback() {
     829            2 :         let mut wss = WalSendersShared::new();
     830            2 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     831            2 :         push_feedback(&mut wss, hs_feedback(1, 42));
     832            2 :         push_feedback(&mut wss, hs_feedback(1, 64));
     833            2 :         wss.update_reply_feedback();
     834            2 :         assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
     835            2 :     }
     836              : 
      837              :     // form pageserver feedback with given last_received_lsn / timeline size and the
     838              :     // rest set to dummy values.
     839            4 :     fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
     840            4 :         ReplicationFeedback::Pageserver(PageserverFeedback {
     841            4 :             current_timeline_size,
     842            4 :             last_received_lsn,
     843            4 :             disk_consistent_lsn: Lsn::INVALID,
     844            4 :             remote_consistent_lsn: Lsn::INVALID,
     845            4 :             replytime: *PG_EPOCH,
     846            4 :         })
     847            4 :     }
     848              : 
     849              :     // test that ps aggregation works as expected
     850            2 :     #[test]
     851            2 :     fn test_ps_feedback() {
     852            2 :         let mut wss = WalSendersShared::new();
     853            2 :         push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
     854            2 :         push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
     855            2 :         wss.update_ps_feedback();
     856            2 :         assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
     857            2 :         assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
     858            2 :     }
     859              : }
        

Generated by: LCOV version 2.1-beta