LCOV - code coverage report
Current view: top level - safekeeper/src - send_wal.rs (source / functions)
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01

              Coverage    Total    Hit
  Lines:        93.3 %      449    419
  Functions:    46.8 %      154     72

            Line data    Source code
       1              : //! This module implements the streaming side of replication protocol, starting
       2              : //! with the "START_REPLICATION" message, and registry of walsenders.
       3              : 
       4              : use crate::handler::SafekeeperPostgresHandler;
       5              : use crate::safekeeper::{Term, TermLsn};
       6              : use crate::timeline::Timeline;
       7              : use crate::wal_service::ConnectionId;
       8              : use crate::wal_storage::WalReader;
       9              : use crate::GlobalTimelines;
      10              : use anyhow::{bail, Context as AnyhowContext};
      11              : use bytes::Bytes;
      12              : use parking_lot::Mutex;
      13              : use postgres_backend::PostgresBackend;
      14              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
      15              : use postgres_ffi::get_current_timestamp;
      16              : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
      17              : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
      18              : use serde::{Deserialize, Serialize};
      19              : use serde_with::{serde_as, DisplayFromStr};
      20              : use tokio::io::{AsyncRead, AsyncWrite};
      21              : use utils::id::TenantTimelineId;
      22              : use utils::lsn::AtomicLsn;
      23              : use utils::pageserver_feedback::PageserverFeedback;
      24              : 
      25              : use std::cmp::{max, min};
      26              : use std::net::SocketAddr;
      27              : use std::str;
      28              : use std::sync::Arc;
      29              : use std::time::Duration;
      30              : use tokio::sync::watch::Receiver;
      31              : use tokio::time::timeout;
      32              : use tracing::*;
      33              : use utils::{bin_ser::BeSer, lsn::Lsn};
      34              : 
      35              : // See: https://www.postgresql.org/docs/13/protocol-replication.html
      36              : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
      37              : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
      38              : // neon extension of replication protocol
      39              : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
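                      : // As handled in ReplyReader::handle_feedback below, each reply arrives as a
                      : // CopyData message whose first byte is one of the tags above:
                      : //   'h' -- followed by a BeSer-encoded HotStandbyFeedback,
                      : //   'r' -- followed by a BeSer-encoded StandbyReply,
                      : //   'z' -- followed by 8 length bytes and then a PageserverFeedback payload.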
      40              : 
      41              : type FullTransactionId = u64;
      42              : 
      43              : /// Hot standby feedback received from replica
      44            3 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      45              : pub struct HotStandbyFeedback {
      46              :     pub ts: TimestampTz,
      47              :     pub xmin: FullTransactionId,
      48              :     pub catalog_xmin: FullTransactionId,
      49              : }
      50              : 
      51              : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
      52              : 
      53              : impl HotStandbyFeedback {
      54      2201429 :     pub fn empty() -> HotStandbyFeedback {
      55      2201429 :         HotStandbyFeedback {
      56      2201429 :             ts: 0,
      57      2201429 :             xmin: 0,
      58      2201429 :             catalog_xmin: 0,
      59      2201429 :         }
      60      2201429 :     }
      61              : }
      62              : 
      63              : /// Standby status update
      64           12 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      65              : pub struct StandbyReply {
      66              :     pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
      67              :     pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
      68              :     pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
      69              :     pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
      70              :     pub reply_requested: bool,
      71              : }
      72              : 
      73              : impl StandbyReply {
      74            4 :     fn empty() -> Self {
      75            4 :         StandbyReply {
      76            4 :             write_lsn: Lsn::INVALID,
      77            4 :             flush_lsn: Lsn::INVALID,
      78            4 :             apply_lsn: Lsn::INVALID,
      79            4 :             reply_ts: 0,
      80            4 :             reply_requested: false,
      81            4 :         }
      82            4 :     }
      83              : }
      84              : 
      85            0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
      86              : pub struct StandbyFeedback {
      87              :     reply: StandbyReply,
      88              :     hs_feedback: HotStandbyFeedback,
      89              : }
      90              : 
      91              : /// WalSenders registry. Timeline holds it (wrapped in Arc).
      92              : pub struct WalSenders {
      93              :     /// Lsn maximized over all walsenders *and* peer data, so might be higher
      94              :     /// than what we receive from replicas.
      95              :     remote_consistent_lsn: AtomicLsn,
      96              :     mutex: Mutex<WalSendersShared>,
      97              : }
      98              : 
      99              : impl WalSenders {
     100          604 :     pub fn new(remote_consistent_lsn: Lsn) -> Arc<WalSenders> {
     101          604 :         Arc::new(WalSenders {
     102          604 :             remote_consistent_lsn: AtomicLsn::from(remote_consistent_lsn),
     103          604 :             mutex: Mutex::new(WalSendersShared::new()),
     104          604 :         })
     105          604 :     }
     106              : 
      107              :     /// Register a new walsender. The returned guard provides access to the slot
      108              :     /// and automatically deregisters it on Drop.
     109          830 :     fn register(
     110          830 :         self: &Arc<WalSenders>,
     111          830 :         ttid: TenantTimelineId,
     112          830 :         addr: SocketAddr,
     113          830 :         conn_id: ConnectionId,
     114          830 :         appname: Option<String>,
     115          830 :     ) -> WalSenderGuard {
     116          830 :         let slots = &mut self.mutex.lock().slots;
     117          830 :         let walsender_state = WalSenderState {
     118          830 :             ttid,
     119          830 :             addr,
     120          830 :             conn_id,
     121          830 :             appname,
     122          830 :             feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
     123          830 :         };
     124              :         // find empty slot or create new one
     125          830 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
     126          279 :             slots[pos] = Some(walsender_state);
     127          279 :             pos
     128              :         } else {
     129          551 :             let pos = slots.len();
     130          551 :             slots.push(Some(walsender_state));
     131          551 :             pos
     132              :         };
     133          830 :         WalSenderGuard {
     134          830 :             id: pos,
     135          830 :             walsenders: self.clone(),
     136          830 :         }
     137          830 :     }
     138              : 
     139              :     /// Get state of all walsenders.
     140          204 :     pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
     141          204 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     142          204 :     }
     143              : 
     144              :     /// Get aggregated pageserver feedback.
     145           51 :     pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
     146           51 :         self.mutex.lock().agg_ps_feedback
     147           51 :     }
     148              : 
     149              :     /// Get aggregated pageserver and hot standby feedback (we send them to compute).
     150      2200283 :     pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
     151      2200283 :         let shared = self.mutex.lock();
     152      2200283 :         (shared.agg_ps_feedback, shared.agg_hs_feedback)
     153      2200283 :     }
     154              : 
     155              :     /// Record new pageserver feedback, update aggregated values.
     156       736446 :     fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
     157       736446 :         let mut shared = self.mutex.lock();
     158       736446 :         shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
     159       736446 :         shared.update_ps_feedback();
     160       736446 :         self.update_remote_consistent_lsn(shared.agg_ps_feedback.remote_consistent_lsn);
     161       736446 :     }
     162              : 
     163              :     /// Record standby reply.
     164           12 :     fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
     165           12 :         let mut shared = self.mutex.lock();
     166           12 :         let slot = shared.get_slot_mut(id);
     167           12 :         match &mut slot.feedback {
     168           11 :             ReplicationFeedback::Standby(sf) => sf.reply = *reply,
     169              :             ReplicationFeedback::Pageserver(_) => {
     170            1 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     171            1 :                     reply: *reply,
     172            1 :                     hs_feedback: HotStandbyFeedback::empty(),
     173            1 :                 })
     174              :             }
     175              :         }
     176           12 :     }
     177              : 
     178              :     /// Record hot standby feedback, update aggregated value.
     179            1 :     fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
     180            1 :         let mut shared = self.mutex.lock();
     181            1 :         let slot = shared.get_slot_mut(id);
     182            1 :         match &mut slot.feedback {
     183            1 :             ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
     184              :             ReplicationFeedback::Pageserver(_) => {
     185            0 :                 slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
     186            0 :                     reply: StandbyReply::empty(),
     187            0 :                     hs_feedback: *feedback,
     188            0 :                 })
     189              :             }
     190              :         }
     191            1 :         shared.update_hs_feedback();
     192            1 :     }
     193              : 
     194              :     /// Get remote_consistent_lsn reported by the pageserver. Returns None if
      195              :     /// the client is not a pageserver.
     196         1941 :     fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
     197         1941 :         let shared = self.mutex.lock();
     198         1941 :         let slot = shared.get_slot(id);
     199         1941 :         match slot.feedback {
     200         1941 :             ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
     201            0 :             _ => None,
     202              :         }
     203         1941 :     }
     204              : 
     205              :     /// Get remote_consistent_lsn maximized across all walsenders and peers.
     206        21682 :     pub fn get_remote_consistent_lsn(self: &Arc<WalSenders>) -> Lsn {
     207        21682 :         self.remote_consistent_lsn.load()
     208        21682 :     }
     209              : 
      210              :     /// Update the maximized remote_consistent_lsn and return the (potentially updated) value.
     211       745523 :     pub fn update_remote_consistent_lsn(self: &Arc<WalSenders>, candidate: Lsn) -> Lsn {
     212       745523 :         self.remote_consistent_lsn
     213       745523 :             .fetch_max(candidate)
     214       745523 :             .max(candidate)
     215       745523 :     }
     216              : 
     217              :     /// Unregister walsender.
     218          533 :     fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
     219          533 :         let mut shared = self.mutex.lock();
     220          533 :         shared.slots[id] = None;
     221          533 :         shared.update_hs_feedback();
     222          533 :     }
     223              : }
     224              : 
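                      : // A minimal usage sketch of the registry above (illustrative only; `ttid`,
                      : // `addr` and `conn_id` stand in for values a real connection handler holds):
                      : //
                      : //     let walsenders = WalSenders::new(Lsn(0));
                      : //     let guard = walsenders.register(ttid, addr, conn_id, Some("pageserver".into()));
                      : //     let states = walsenders.get_all();   // includes this walsender's state
                      : //     drop(guard);                          // frees the slot, re-aggregates hs feedback
                      : 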
     225              : struct WalSendersShared {
     226              :     // aggregated over all walsenders value
     227              :     agg_hs_feedback: HotStandbyFeedback,
     228              :     // aggregated over all walsenders value
     229              :     agg_ps_feedback: PageserverFeedback,
     230              :     slots: Vec<Option<WalSenderState>>,
     231              : }
     232              : 
     233              : impl WalSendersShared {
     234          607 :     fn new() -> Self {
     235          607 :         WalSendersShared {
     236          607 :             agg_hs_feedback: HotStandbyFeedback::empty(),
     237          607 :             agg_ps_feedback: PageserverFeedback::empty(),
     238          607 :             slots: Vec::new(),
     239          607 :         }
     240          607 :     }
     241              : 
      242              :     /// Get the content of the slot with the provided id; it must exist.
     243         1941 :     fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
     244         1941 :         self.slots[id].as_ref().expect("walsender doesn't exist")
     245         1941 :     }
     246              : 
      247              :     /// Get mutable access to the slot with the provided id; it must exist.
     248       736459 :     fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
     249       736459 :         self.slots[id].as_mut().expect("walsender doesn't exist")
     250       736459 :     }
     251              : 
      252              :     /// Update aggregated hot standby feedback. We just take min of valid xmins
     253              :     /// and ts.
     254          536 :     fn update_hs_feedback(&mut self) {
     255          536 :         let mut agg = HotStandbyFeedback::empty();
     256          536 :         for ws_state in self.slots.iter().flatten() {
     257           61 :             if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
     258            5 :                 let hs_feedback = standby_feedback.hs_feedback;
     259            5 :                 // doing Option math like op1.iter().chain(op2.iter()).min()
     260            5 :                 // would be nicer, but we serialize/deserialize this struct
     261            5 :                 // directly, so leave as is for now
     262            5 :                 if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
     263            2 :                     if agg.xmin != INVALID_FULL_TRANSACTION_ID {
     264            1 :                         agg.xmin = min(agg.xmin, hs_feedback.xmin);
     265            1 :                     } else {
     266            1 :                         agg.xmin = hs_feedback.xmin;
     267            1 :                     }
     268            2 :                     agg.ts = min(agg.ts, hs_feedback.ts);
     269            3 :                 }
     270            5 :                 if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     271            0 :                     if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
     272            0 :                         agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
     273            0 :                     } else {
     274            0 :                         agg.catalog_xmin = hs_feedback.catalog_xmin;
     275            0 :                     }
     276            0 :                     agg.ts = min(agg.ts, hs_feedback.ts);
     277            5 :                 }
     278           56 :             }
     279              :         }
     280          536 :         self.agg_hs_feedback = agg;
     281          536 :     }
     282              : 
     283              :     /// Update aggregated pageserver feedback. LSNs (last_received,
     284              :     /// disk_consistent, remote_consistent) and reply timestamp are just
      285              :     /// maximized; timeline_size is taken from the feedback with the highest
     286              :     /// last_received lsn. This is generally reasonable, but we might want to
     287              :     /// implement other policies once multiple pageservers start to be actively
     288              :     /// used.
     289       736447 :     fn update_ps_feedback(&mut self) {
     290       736447 :         let init = PageserverFeedback::empty();
     291       736447 :         let acc =
     292       736447 :             self.slots
     293       736447 :                 .iter()
     294       736447 :                 .flatten()
     295       741121 :                 .fold(init, |mut acc, ws_state| match ws_state.feedback {
     296       741117 :                     ReplicationFeedback::Pageserver(feedback) => {
     297       741117 :                         if feedback.last_received_lsn > acc.last_received_lsn {
     298       737649 :                             acc.current_timeline_size = feedback.current_timeline_size;
     299       737649 :                         }
     300       741117 :                         acc.last_received_lsn =
     301       741117 :                             max(feedback.last_received_lsn, acc.last_received_lsn);
     302       741117 :                         acc.disk_consistent_lsn =
     303       741117 :                             max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
     304       741117 :                         acc.remote_consistent_lsn =
     305       741117 :                             max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
     306       741117 :                         acc.replytime = max(feedback.replytime, acc.replytime);
     307       741117 :                         acc
     308              :                     }
     309            4 :                     ReplicationFeedback::Standby(_) => acc,
     310       741121 :                 });
     311       736447 :         self.agg_ps_feedback = acc;
     312       736447 :     }
     313              : }
     314              : 
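                      : // A sketch of the "Option math" alternative mentioned in update_hs_feedback
                      : // above; kept out of the real code because HotStandbyFeedback is serialized
                      : // directly and uses 0 as its invalid marker.
                      : #[allow(dead_code)]
                      : fn min_valid_xmin(agg: FullTransactionId, new: FullTransactionId) -> FullTransactionId {
                      :     let agg = (agg != INVALID_FULL_TRANSACTION_ID).then_some(agg);
                      :     let new = (new != INVALID_FULL_TRANSACTION_ID).then_some(new);
                      :     agg.into_iter().chain(new).min().unwrap_or(INVALID_FULL_TRANSACTION_ID)
                      : }
                      : 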
      315              : // Serialization is used only for pretty-printing in JSON.
     316              : #[serde_as]
     317           86 : #[derive(Debug, Clone, Serialize, Deserialize)]
     318              : pub struct WalSenderState {
     319              :     #[serde_as(as = "DisplayFromStr")]
     320              :     ttid: TenantTimelineId,
     321              :     addr: SocketAddr,
     322              :     conn_id: ConnectionId,
     323              :     // postgres application_name
     324              :     appname: Option<String>,
     325              :     feedback: ReplicationFeedback,
     326              : }
     327              : 
      328              : // The receiver is either a pageserver or a regular standby; they send
      329              : // different kinds of feedback.
     330           86 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
     331              : enum ReplicationFeedback {
     332              :     Pageserver(PageserverFeedback),
     333              :     Standby(StandbyFeedback),
     334              : }
     335              : 
     336              : // id of the occupied slot in WalSenders to access it (and save in the
      337              : // WalSenderGuard). We could hand out an Arc to the slot directly, but there is
      338              : // little point: the aggregation performed on each feedback receipt iterates
      339              : // over all walsenders anyway.
     340              : pub type WalSenderId = usize;
     341              : 
     342              : /// Scope guard to access slot in WalSenders registry and unregister from it in
     343              : /// Drop.
     344              : pub struct WalSenderGuard {
     345              :     id: WalSenderId,
     346              :     walsenders: Arc<WalSenders>,
     347              : }
     348              : 
     349              : impl Drop for WalSenderGuard {
     350          533 :     fn drop(&mut self) {
     351          533 :         self.walsenders.unregister(self.id);
     352          533 :     }
     353              : }
     354              : 
     355              : impl SafekeeperPostgresHandler {
     356              :     /// Wrapper around handle_start_replication_guts handling result. Error is
     357              :     /// handled here while we're still in walsender ttid span; with API
     358              :     /// extension, this can probably be moved into postgres_backend.
     359          830 :     pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
     360          830 :         &mut self,
     361          830 :         pgb: &mut PostgresBackend<IO>,
     362          830 :         start_pos: Lsn,
     363          830 :         term: Option<Term>,
     364          830 :     ) -> Result<(), QueryError> {
     365          830 :         if let Err(end) = self
     366          830 :             .handle_start_replication_guts(pgb, start_pos, term)
     367      2769161 :             .await
     368              :         {
     369              :             // Log the result and probably send it to the client, closing the stream.
     370          533 :             pgb.handle_copy_stream_end(end).await;
     371            0 :         }
     372          533 :         Ok(())
     373          533 :     }
     374              : 
     375          830 :     pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     376          830 :         &mut self,
     377          830 :         pgb: &mut PostgresBackend<IO>,
     378          830 :         start_pos: Lsn,
     379          830 :         term: Option<Term>,
     380          830 :     ) -> Result<(), CopyStreamHandlerEnd> {
     381          830 :         let appname = self.appname.clone();
     382          830 :         let tli =
     383          830 :             GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
     384              : 
     385              :         // Use a guard object to remove our entry from the timeline when we are done.
     386          830 :         let ws_guard = Arc::new(tli.get_walsenders().register(
     387          830 :             self.ttid,
     388          830 :             *pgb.get_peer_addr(),
     389          830 :             self.conn_id,
     390          830 :             self.appname.clone(),
     391          830 :         ));
     392              : 
     393              :         // Walsender can operate in one of two modes which we select by
     394              :         // application_name: give only committed WAL (used by pageserver) or all
     395              :         // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
      396              :         // The second case is always driven by a consensus leader, whose term
      397              :         // must generally also be supplied. However, we are sloppy about this in
      398              :         // walproposer recovery, which will be removed soon; the TODO is to make
      399              :         // the term non-Option'al then.
     400              :         //
      401              :         // Fetching WAL without a term in recovery creates a small risk of this
      402              :         // WAL being concurrently garbage-collected if another compute comes up,
      403              :         // collects a majority and starts fixing the log on this safekeeper itself.
      404              :         // That's ok, as the (old) proposer will never be able to commit such WAL.
     405          830 :         let end_watch = if self.is_walproposer_recovery() {
     406           64 :             EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
     407              :         } else {
     408          766 :             EndWatch::Commit(tli.get_commit_lsn_watch_rx())
     409              :         };
     410              :         // we don't check term here; it will be checked on first waiting/WAL reading anyway.
     411          830 :         let end_pos = end_watch.get();
     412          830 : 
     413          830 :         if end_pos < start_pos {
     414           16 :             warn!(
     415           16 :                 "requested start_pos {} is ahead of available WAL end_pos {}",
     416           16 :                 start_pos, end_pos
     417           16 :             );
     418          814 :         }
     419              : 
     420          830 :         info!(
     421          830 :             "starting streaming from {:?}, available WAL ends at {}, recovery={}",
     422          830 :             start_pos,
     423          830 :             end_pos,
     424          830 :             matches!(end_watch, EndWatch::Flush(_))
     425          830 :         );
     426              : 
     427              :         // switch to copy
     428          830 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     429              : 
     430          830 :         let (_, persisted_state) = tli.get_state().await;
     431          830 :         let wal_reader = WalReader::new(
     432          830 :             self.conf.workdir.clone(),
     433          830 :             self.conf.timeline_dir(&tli.ttid),
     434          830 :             &persisted_state,
     435          830 :             start_pos,
     436          830 :             self.conf.wal_backup_enabled,
     437          830 :         )?;
     438              : 
     439              :         // Split to concurrently receive and send data; replies are generally
     440              :         // not synchronized with sends, so this avoids deadlocks.
     441          830 :         let reader = pgb.split().context("START_REPLICATION split")?;
     442              : 
     443          830 :         let mut sender = WalSender {
     444          830 :             pgb,
     445          830 :             tli: tli.clone(),
     446          830 :             appname,
     447          830 :             start_pos,
     448          830 :             end_pos,
     449          830 :             term,
     450          830 :             end_watch,
     451          830 :             ws_guard: ws_guard.clone(),
     452          830 :             wal_reader,
     453          830 :             send_buf: [0; MAX_SEND_SIZE],
     454          830 :         };
     455          830 :         let mut reply_reader = ReplyReader { reader, ws_guard };
     456              : 
     457          830 :         let res = tokio::select! {
     458              :             // todo: add read|write .context to these errors
     459          140 :             r = sender.run() => r,
     460          393 :             r = reply_reader.run() => r,
     461              :         };
     462              :         // Join pg backend back.
     463          533 :         pgb.unsplit(reply_reader.reader)?;
     464              : 
     465          533 :         res
     466          533 :     }
     467              : }
     468              : 
     469              : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
     470              : /// given term (recovery by walproposer or peer safekeeper).
     471              : enum EndWatch {
     472              :     Commit(Receiver<Lsn>),
     473              :     Flush(Receiver<TermLsn>),
     474              : }
     475              : 
     476              : impl EndWatch {
     477              :     /// Get current end of WAL.
     478      2813205 :     fn get(&self) -> Lsn {
     479      2813205 :         match self {
     480      2809241 :             EndWatch::Commit(r) => *r.borrow(),
     481         3964 :             EndWatch::Flush(r) => r.borrow().lsn,
     482              :         }
     483      2813205 :     }
     484              : 
     485              :     /// Wait for the update.
     486      1427721 :     async fn changed(&mut self) -> anyhow::Result<()> {
     487      1427721 :         match self {
     488      2007177 :             EndWatch::Commit(r) => r.changed().await?,
     489          128 :             EndWatch::Flush(r) => r.changed().await?,
     490              :         }
     491      1425106 :         Ok(())
     492      1425106 :     }
     493              : }
     494              : 
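                      : // Both EndWatch variants wrap tokio watch receivers handed out by the
                      : // timeline (get_commit_lsn_watch_rx / get_term_flush_lsn_watch_rx). The
                      : // underlying pattern, sketched with plain tokio types:
                      : //
                      : //     let (tx, mut rx) = tokio::sync::watch::channel(Lsn(0));
                      : //     tx.send(Lsn(0x10)).ok();     // writer side advances the end of WAL
                      : //     let end = *rx.borrow();      // walsender reads the current value ...
                      : //     rx.changed().await?;         // ... or waits for the next advancement
                      : 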
      495              : /// The half of the split connection that drives sending WAL.
     496              : struct WalSender<'a, IO> {
     497              :     pgb: &'a mut PostgresBackend<IO>,
     498              :     tli: Arc<Timeline>,
     499              :     appname: Option<String>,
      500              :     // Position from which we send the next chunk.
     501              :     start_pos: Lsn,
     502              :     // WAL up to this position is known to be locally available.
     503              :     // Usually this is the same as the latest commit_lsn, but in case of
     504              :     // walproposer recovery, this is flush_lsn.
     505              :     //
     506              :     // We send this LSN to the receiver as wal_end, so that it knows how much
     507              :     // WAL this safekeeper has. This LSN should be as fresh as possible.
     508              :     end_pos: Lsn,
     509              :     /// When streaming uncommitted part, the term the client acts as the leader
     510              :     /// in. Streaming is stopped if local term changes to a different (higher)
     511              :     /// value.
     512              :     term: Option<Term>,
     513              :     /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
     514              :     end_watch: EndWatch,
     515              :     ws_guard: Arc<WalSenderGuard>,
     516              :     wal_reader: WalReader,
      517              :     // buffer into which WAL is read before sending
     518              :     send_buf: [u8; MAX_SEND_SIZE],
     519              : }
     520              : 
     521              : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
     522              :     /// Send WAL until
     523              :     /// - an error occurs
      524              :     /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
     525              :     ///
     526              :     /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
     527              :     /// convenience.
     528          830 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     529              :         loop {
     530              :             // Wait for the next portion if it is not there yet, or just
      531              :             // update our end of available WAL for sending, which we
      532              :             // communicate to the receiver.
     533      2007222 :             self.wait_wal().await?;
     534       740439 :             assert!(
     535       740439 :                 self.end_pos > self.start_pos,
     536            0 :                 "nothing to send after waiting for WAL"
     537              :             );
     538              : 
     539              :             // try to send as much as available, capped by MAX_SEND_SIZE
     540       740439 :             let mut send_size = self
     541       740439 :                 .end_pos
     542       740439 :                 .checked_sub(self.start_pos)
     543       740439 :                 .context("reading wal without waiting for it first")?
     544              :                 .0 as usize;
     545       740439 :             send_size = min(send_size, self.send_buf.len());
     546       740439 :             let send_buf = &mut self.send_buf[..send_size];
     547              :             let send_size: usize;
     548              :             {
     549              :                 // If uncommitted part is being pulled, check that the term is
     550              :                 // still the expected one.
     551       740439 :                 let _term_guard = if let Some(t) = self.term {
     552            1 :                     Some(self.tli.acquire_term(t).await?)
     553              :                 } else {
     554       740438 :                     None
     555              :                 };
     556              :                 // read wal into buffer
     557       740588 :                 send_size = self.wal_reader.read(send_buf).await?
     558              :             };
     559       740437 :             let send_buf = &send_buf[..send_size];
     560       740437 : 
     561       740437 :             // and send it
     562       740437 :             self.pgb
     563       740437 :                 .write_message(&BeMessage::XLogData(XLogDataBody {
     564       740437 :                     wal_start: self.start_pos.0,
     565       740437 :                     wal_end: self.end_pos.0,
     566       740437 :                     timestamp: get_current_timestamp(),
     567       740437 :                     data: send_buf,
     568       740437 :                 }))
     569        21121 :                 .await?;
     570              : 
     571            0 :             trace!(
     572            0 :                 "sent {} bytes of WAL {}-{}",
     573            0 :                 send_size,
     574            0 :                 self.start_pos,
     575            0 :                 self.start_pos + send_size as u64
     576            0 :             );
     577       740417 :             self.start_pos += send_size as u64;
     578              :         }
     579          140 :     }
     580              : 
     581              :     /// wait until we have WAL to stream, sending keepalives and checking for
      582              :     /// exit in the meantime
     583       741246 :     async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     584              :         loop {
     585       743054 :             self.end_pos = self.end_watch.get();
     586       743054 :             if self.end_pos > self.start_pos {
     587              :                 // We have something to send.
     588            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     589        98839 :                 return Ok(());
     590       644215 :             }
     591              : 
     592              :             // Wait for WAL to appear, now self.end_pos == self.start_pos.
     593      2007216 :             if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
     594       641600 :                 self.end_pos = lsn;
     595            0 :                 trace!("got end_pos {:?}, streaming", self.end_pos);
     596       641600 :                 return Ok(());
     597         1941 :             }
     598         1941 : 
     599         1941 :             // Timed out waiting for WAL, check for termination and send KA.
     600         1941 :             // Check for termination only if we are streaming up to commit_lsn
     601         1941 :             // (to pageserver).
     602         1941 :             if let EndWatch::Commit(_) = self.end_watch {
     603         1941 :                 if let Some(remote_consistent_lsn) = self
     604         1941 :                     .ws_guard
     605         1941 :                     .walsenders
     606         1941 :                     .get_ws_remote_consistent_lsn(self.ws_guard.id)
     607              :                 {
     608         1941 :                     if self.tli.should_walsender_stop(remote_consistent_lsn).await {
     609              :                         // Terminate if there is nothing more to send.
     610              :                         // Note that "ending streaming" part of the string is used by
     611              :                         // pageserver to identify WalReceiverError::SuccessfulCompletion,
     612              :                         // do not change this string without updating pageserver.
     613          133 :                         return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
     614          133 :                         "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
     615          133 :                         self.appname, self.start_pos,
     616          133 :                     )));
     617         1808 :                     }
     618            0 :                 }
     619            0 :             }
     620              : 
     621         1808 :             self.pgb
     622         1808 :                 .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
     623         1808 :                     wal_end: self.end_pos.0,
     624         1808 :                     timestamp: get_current_timestamp(),
     625         1808 :                     request_reply: true,
     626         1808 :                 }))
     627            0 :                 .await?;
     628              :         }
     629       740572 :     }
     630              : }
     631              : 
      632              : /// The half of the split connection that drives receiving replies.
     633              : struct ReplyReader<IO> {
     634              :     reader: PostgresBackendReader<IO>,
     635              :     ws_guard: Arc<WalSenderGuard>,
     636              : }
     637              : 
     638              : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
     639          830 :     async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
     640              :         loop {
     641      2769038 :             let msg = self.reader.read_copy_message().await?;
     642       736458 :             self.handle_feedback(&msg)?
     643              :         }
     644          393 :     }
     645              : 
     646       736458 :     fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
     647       736458 :         match msg.first().cloned() {
     648            1 :             Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
     649            1 :                 // Note: deserializing is on m[1..] because we skip the tag byte.
     650            1 :                 let hs_feedback = HotStandbyFeedback::des(&msg[1..])
     651            1 :                     .context("failed to deserialize HotStandbyFeedback")?;
     652            1 :                 self.ws_guard
     653            1 :                     .walsenders
     654            1 :                     .record_hs_feedback(self.ws_guard.id, &hs_feedback);
     655              :             }
     656           12 :             Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
     657           12 :                 let reply =
     658           12 :                     StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
     659           12 :                 self.ws_guard
     660           12 :                     .walsenders
     661           12 :                     .record_standby_reply(self.ws_guard.id, &reply);
     662              :             }
     663              :             Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
     664              :                 // pageserver sends this.
      665              :                 // Note: deserializing is on msg[9..] because we skip the tag byte and len bytes.
     666       736445 :                 let buf = Bytes::copy_from_slice(&msg[9..]);
     667       736445 :                 let ps_feedback = PageserverFeedback::parse(buf);
     668       736445 : 
     669       736445 :                 trace!("PageserverFeedback is {:?}", ps_feedback);
     670       736445 :                 self.ws_guard
     671       736445 :                     .walsenders
     672       736445 :                     .record_ps_feedback(self.ws_guard.id, &ps_feedback);
      673              :                 // in principle the new remote_consistent_lsn could allow us to
      674              :                 // deactivate the timeline, but we check that regularly through
      675              :                 // broker updates, no need to do it here
     676              :             }
     677            0 :             _ => warn!("unexpected message {:?}", msg),
     678              :         }
     679       736458 :         Ok(())
     680       736458 :     }
     681              : }
     682              : 
     683              : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
     684              : 
     685              : /// Wait until we have available WAL > start_pos or timeout expires. Returns
     686              : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
     687              : /// - Ok(None) if timeout expired;
     688              : /// - Err in case of error -- only if 1) term changed while fetching in recovery
     689              : ///   mode 2) watch channel closed, which must never happen.
     690       644215 : async fn wait_for_lsn(
     691       644215 :     rx: &mut EndWatch,
     692       644215 :     client_term: Option<Term>,
     693       644215 :     start_pos: Lsn,
     694       644215 : ) -> anyhow::Result<Option<Lsn>> {
     695       644215 :     let res = timeout(POLL_STATE_TIMEOUT, async move {
     696              :         loop {
     697      2069321 :             let end_pos = rx.get();
     698      2069321 :             if end_pos > start_pos {
     699       641600 :                 return Ok(end_pos);
     700      1427721 :             }
     701      1427721 :             if let EndWatch::Flush(rx) = rx {
     702          128 :                 let curr_term = rx.borrow().term;
     703          128 :                 if let Some(client_term) = client_term {
     704            0 :                     if curr_term != client_term {
     705            0 :                         bail!("term changed: requested {}, now {}", client_term, curr_term);
     706            0 :                     }
     707          128 :                 }
     708      1427593 :             }
     709      2007216 :             rx.changed().await?;
     710              :         }
     711       644215 :     })
     712      2007216 :     .await;
     713              : 
     714       641600 :     match res {
     715              :         // success
     716       641600 :         Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
     717              :         // error inside closure
     718            0 :         Ok(Err(err)) => Err(err),
     719              :         // timeout
     720         1941 :         Err(_) => Ok(None),
     721              :     }
     722       643541 : }
     723              : 
     724              : #[cfg(test)]
     725              : mod tests {
     726              :     use postgres_protocol::PG_EPOCH;
     727              :     use utils::id::{TenantId, TimelineId};
     728              : 
     729              :     use super::*;
     730              : 
     731            6 :     fn mock_ttid() -> TenantTimelineId {
     732            6 :         TenantTimelineId {
     733            6 :             tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
     734            6 :             timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
     735            6 :         }
     736            6 :     }
     737              : 
     738            6 :     fn mock_addr() -> SocketAddr {
     739            6 :         "127.0.0.1:8080".parse().unwrap()
     740            6 :     }
     741              : 
      742              :     // add the specified feedback to wss, setting other fields to dummy values
     743            6 :     fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
     744            6 :         let walsender_state = WalSenderState {
     745            6 :             ttid: mock_ttid(),
     746            6 :             addr: mock_addr(),
     747            6 :             conn_id: 1,
     748            6 :             appname: None,
     749            6 :             feedback,
     750            6 :         };
     751            6 :         wss.slots.push(Some(walsender_state))
     752            6 :     }
     753              : 
     754              :     // form standby feedback with given hot standby feedback ts/xmin and the
     755              :     // rest set to dummy values.
     756            4 :     fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
     757            4 :         ReplicationFeedback::Standby(StandbyFeedback {
     758            4 :             reply: StandbyReply::empty(),
     759            4 :             hs_feedback: HotStandbyFeedback {
     760            4 :                 ts,
     761            4 :                 xmin,
     762            4 :                 catalog_xmin: 0,
     763            4 :             },
     764            4 :         })
     765            4 :     }
     766              : 
     767              :     // test that hs aggregation works as expected
     768            1 :     #[test]
     769            1 :     fn test_hs_feedback_no_valid() {
     770            1 :         let mut wss = WalSendersShared::new();
     771            1 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     772            1 :         wss.update_hs_feedback();
     773            1 :         assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
     774            1 :     }
     775              : 
     776            1 :     #[test]
     777            1 :     fn test_hs_feedback() {
     778            1 :         let mut wss = WalSendersShared::new();
     779            1 :         push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
     780            1 :         push_feedback(&mut wss, hs_feedback(1, 42));
     781            1 :         push_feedback(&mut wss, hs_feedback(1, 64));
     782            1 :         wss.update_hs_feedback();
     783            1 :         assert_eq!(wss.agg_hs_feedback.xmin, 42);
     784            1 :     }
     785              : 
      786              :     // form pageserver feedback with given last_received_lsn / timeline size and
      787              :     // the rest set to dummy values.
     788            2 :     fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
     789            2 :         ReplicationFeedback::Pageserver(PageserverFeedback {
     790            2 :             current_timeline_size,
     791            2 :             last_received_lsn,
     792            2 :             disk_consistent_lsn: Lsn::INVALID,
     793            2 :             remote_consistent_lsn: Lsn::INVALID,
     794            2 :             replytime: *PG_EPOCH,
     795            2 :         })
     796            2 :     }
     797              : 
     798              :     // test that ps aggregation works as expected
     799            1 :     #[test]
     800            1 :     fn test_ps_feedback() {
     801            1 :         let mut wss = WalSendersShared::new();
     802            1 :         push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
     803            1 :         push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
     804            1 :         wss.update_ps_feedback();
     805            1 :         assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
     806            1 :         assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
     807            1 :     }
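                      : 
                      :     // A possible additional test (sketch): the catalog_xmin branch of
                      :     // update_hs_feedback is uncovered in the data above and could be
                      :     // exercised along these lines.
                      :     #[test]
                      :     fn test_hs_feedback_catalog_xmin() {
                      :         let mut wss = WalSendersShared::new();
                      :         push_feedback(
                      :             &mut wss,
                      :             ReplicationFeedback::Standby(StandbyFeedback {
                      :                 reply: StandbyReply::empty(),
                      :                 hs_feedback: HotStandbyFeedback {
                      :                     ts: 1,
                      :                     xmin: INVALID_FULL_TRANSACTION_ID,
                      :                     catalog_xmin: 42,
                      :                 },
                      :             }),
                      :         );
                      :         wss.update_hs_feedback();
                      :         assert_eq!(wss.agg_hs_feedback.catalog_xmin, 42);
                      :     }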
     808              : }
        

Generated by: LCOV version 2.1-beta