Line data Source code
1 : //! This module implements the streaming side of the replication protocol, starting
2 : //! with the "START_REPLICATION" message, and the registry of walsenders.
3 :
4 : use crate::handler::SafekeeperPostgresHandler;
5 : use crate::safekeeper::{Term, TermLsn};
6 : use crate::timeline::Timeline;
7 : use crate::wal_service::ConnectionId;
8 : use crate::wal_storage::WalReader;
9 : use crate::GlobalTimelines;
10 : use anyhow::{bail, Context as AnyhowContext};
11 : use bytes::Bytes;
12 : use parking_lot::Mutex;
13 : use postgres_backend::PostgresBackend;
14 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
15 : use postgres_ffi::get_current_timestamp;
16 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
17 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
18 : use serde::{Deserialize, Serialize};
19 : use tokio::io::{AsyncRead, AsyncWrite};
20 : use utils::failpoint_support;
21 : use utils::id::TenantTimelineId;
22 : use utils::pageserver_feedback::PageserverFeedback;
23 :
24 : use std::cmp::{max, min};
25 : use std::net::SocketAddr;
26 : use std::str;
27 : use std::sync::Arc;
28 : use std::time::Duration;
29 : use tokio::sync::watch::Receiver;
30 : use tokio::time::timeout;
31 : use tracing::*;
32 : use utils::{bin_ser::BeSer, lsn::Lsn};
33 :
34 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
35 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
36 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
37 : // neon extension of the replication protocol
38 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
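// Each reply from the receiver arrives as a CopyData message whose first byte
// is one of the tags above (see ReplyReader::handle_feedback below): 'h' and
// 'r' are followed directly by the BeSer-encoded HotStandbyFeedback /
// StandbyReply structs, while 'z' is followed by 8 further bytes (a length,
// per the note in handle_feedback) and then the PageserverFeedback payload.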
39 :
40 : type FullTransactionId = u64;
41 :
42 : /// Hot standby feedback received from a replica
43 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
44 : pub struct HotStandbyFeedback {
45 : pub ts: TimestampTz,
46 : pub xmin: FullTransactionId,
47 : pub catalog_xmin: FullTransactionId,
48 : }
49 :
50 : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
51 :
52 : impl HotStandbyFeedback {
53 17842 : pub fn empty() -> HotStandbyFeedback {
54 17842 : HotStandbyFeedback {
55 17842 : ts: 0,
56 17842 : xmin: 0,
57 17842 : catalog_xmin: 0,
58 17842 : }
59 17842 : }
60 : }
61 :
62 : /// Standby status update
63 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
64 : pub struct StandbyReply {
65 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
66 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
67 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
68 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
69 : pub reply_requested: bool,
70 : }
71 :
72 : impl StandbyReply {
73 8 : fn empty() -> Self {
74 8 : StandbyReply {
75 8 : write_lsn: Lsn::INVALID,
76 8 : flush_lsn: Lsn::INVALID,
77 8 : apply_lsn: Lsn::INVALID,
78 8 : reply_ts: 0,
79 8 : reply_requested: false,
80 8 : }
81 8 : }
82 : }
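// Encoding sketch (illustration only, not part of this module's API): a
// standby client would send its status update as the 'r' tag byte followed by
// the BeSer-encoded struct, mirroring the deserialization in
// ReplyReader::handle_feedback. Assuming BeSer also provides `ser()` and
// `reply` is a StandbyReply:
//
//     let mut buf = vec![STANDBY_STATUS_UPDATE_TAG_BYTE];
//     buf.extend_from_slice(&reply.ser()?);
//     // `buf` is then sent as the payload of a CopyData message.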
83 :
84 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
85 : pub struct StandbyFeedback {
86 : reply: StandbyReply,
87 : hs_feedback: HotStandbyFeedback,
88 : }
89 :
90 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
91 : pub struct WalSenders {
92 : mutex: Mutex<WalSendersShared>,
93 : }
94 :
95 : impl WalSenders {
96 0 : pub fn new() -> Arc<WalSenders> {
97 0 : Arc::new(WalSenders {
98 0 : mutex: Mutex::new(WalSendersShared::new()),
99 0 : })
100 0 : }
101 :
102 : /// Register a new walsender. The returned guard provides access to the slot and
103 : /// automatically deregisters it on Drop.
104 0 : fn register(
105 0 : self: &Arc<WalSenders>,
106 0 : ttid: TenantTimelineId,
107 0 : addr: SocketAddr,
108 0 : conn_id: ConnectionId,
109 0 : appname: Option<String>,
110 0 : ) -> WalSenderGuard {
111 0 : let slots = &mut self.mutex.lock().slots;
112 0 : let walsender_state = WalSenderState {
113 0 : ttid,
114 0 : addr,
115 0 : conn_id,
116 0 : appname,
117 0 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
118 0 : };
119 : // find empty slot or create new one
120 0 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
121 0 : slots[pos] = Some(walsender_state);
122 0 : pos
123 : } else {
124 0 : let pos = slots.len();
125 0 : slots.push(Some(walsender_state));
126 0 : pos
127 : };
128 0 : WalSenderGuard {
129 0 : id: pos,
130 0 : walsenders: self.clone(),
131 0 : }
132 0 : }
133 :
134 : /// Get state of all walsenders.
135 0 : pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
136 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
137 0 : }
138 :
139 : /// Get the LSN of the most lagging pageserver receiver. Returns None if there are no
140 : /// active walsenders.
141 0 : pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
142 0 : self.mutex
143 0 : .lock()
144 0 : .slots
145 0 : .iter()
146 0 : .flatten()
147 0 : .filter_map(|s| match s.feedback {
148 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
149 0 : ReplicationFeedback::Standby(_) => None,
150 0 : })
151 0 : .min()
152 0 : }
153 :
154 : /// Get aggregated pageserver feedback.
155 0 : pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
156 0 : self.mutex.lock().agg_ps_feedback
157 0 : }
158 :
159 : /// Get aggregated pageserver and hot standby feedback (we send them to compute).
160 0 : pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
161 0 : let shared = self.mutex.lock();
162 0 : (shared.agg_ps_feedback, shared.agg_hs_feedback)
163 0 : }
164 :
165 : /// Record new pageserver feedback, update aggregated values.
166 0 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
167 0 : let mut shared = self.mutex.lock();
168 0 : shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
169 0 : shared.update_ps_feedback();
170 0 : }
171 :
172 : /// Record standby reply.
173 0 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
174 0 : let mut shared = self.mutex.lock();
175 0 : let slot = shared.get_slot_mut(id);
176 0 : match &mut slot.feedback {
177 0 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
178 : ReplicationFeedback::Pageserver(_) => {
179 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
180 0 : reply: *reply,
181 0 : hs_feedback: HotStandbyFeedback::empty(),
182 0 : })
183 : }
184 : }
185 0 : }
186 :
187 : /// Record hot standby feedback, update aggregated value.
188 0 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
189 0 : let mut shared = self.mutex.lock();
190 0 : let slot = shared.get_slot_mut(id);
191 0 : match &mut slot.feedback {
192 0 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
193 : ReplicationFeedback::Pageserver(_) => {
194 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
195 0 : reply: StandbyReply::empty(),
196 0 : hs_feedback: *feedback,
197 0 : })
198 : }
199 : }
200 0 : shared.update_hs_feedback();
201 0 : }
202 :
203 : /// Get remote_consistent_lsn reported by the pageserver. Returns None if
204 : /// the client is not a pageserver.
205 0 : fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
206 0 : let shared = self.mutex.lock();
207 0 : let slot = shared.get_slot(id);
208 0 : match slot.feedback {
209 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
210 0 : _ => None,
211 : }
212 0 : }
213 :
214 : /// Unregister walsender.
215 0 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
216 0 : let mut shared = self.mutex.lock();
217 0 : shared.slots[id] = None;
218 0 : shared.update_hs_feedback();
219 0 : }
220 : }
221 :
222 : struct WalSendersShared {
223 : // hot standby feedback value aggregated over all walsenders
224 : agg_hs_feedback: HotStandbyFeedback,
225 : // pageserver feedback value aggregated over all walsenders
226 : agg_ps_feedback: PageserverFeedback,
227 : slots: Vec<Option<WalSenderState>>,
228 : }
229 :
230 : impl WalSendersShared {
231 6 : fn new() -> Self {
232 6 : WalSendersShared {
233 6 : agg_hs_feedback: HotStandbyFeedback::empty(),
234 6 : agg_ps_feedback: PageserverFeedback::empty(),
235 6 : slots: Vec::new(),
236 6 : }
237 6 : }
238 :
239 : /// Get the content of the slot with the provided id; it must exist.
240 0 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
241 0 : self.slots[id].as_ref().expect("walsender doesn't exist")
242 0 : }
243 :
244 : /// Get mutable content of the slot with the provided id; it must exist.
245 0 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
246 0 : self.slots[id].as_mut().expect("walsender doesn't exist")
247 0 : }
248 :
249 : /// Update aggregated hot standby feedback. We just take the min of valid xmins
250 : /// and ts.
251 4 : fn update_hs_feedback(&mut self) {
252 4 : let mut agg = HotStandbyFeedback::empty();
253 8 : for ws_state in self.slots.iter().flatten() {
254 8 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
255 8 : let hs_feedback = standby_feedback.hs_feedback;
256 8 : // doing Option math like op1.iter().chain(op2.iter()).min()
257 8 : // would be nicer, but we serialize/deserialize this struct
258 8 : // directly, so leave as is for now
259 8 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
260 4 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
261 2 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
262 2 : } else {
263 2 : agg.xmin = hs_feedback.xmin;
264 2 : }
265 4 : agg.ts = min(agg.ts, hs_feedback.ts);
266 4 : }
267 8 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
268 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
269 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
270 0 : } else {
271 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
272 0 : }
273 0 : agg.ts = min(agg.ts, hs_feedback.ts);
274 8 : }
275 0 : }
276 : }
277 4 : self.agg_hs_feedback = agg;
278 4 : }
279 :
280 : /// Update aggregated pageserver feedback. LSNs (last_received,
281 : /// disk_consistent, remote_consistent) and reply timestamp are just
282 : /// maximized; timeline_size is taken from the feedback with the highest
283 : /// last_received lsn. This is generally reasonable, but we might want to
284 : /// implement other policies once multiple pageservers start to be actively
285 : /// used.
286 2 : fn update_ps_feedback(&mut self) {
287 2 : let init = PageserverFeedback::empty();
288 2 : let acc =
289 2 : self.slots
290 2 : .iter()
291 2 : .flatten()
292 4 : .fold(init, |mut acc, ws_state| match ws_state.feedback {
293 4 : ReplicationFeedback::Pageserver(feedback) => {
294 4 : if feedback.last_received_lsn > acc.last_received_lsn {
295 4 : acc.current_timeline_size = feedback.current_timeline_size;
296 4 : }
297 4 : acc.last_received_lsn =
298 4 : max(feedback.last_received_lsn, acc.last_received_lsn);
299 4 : acc.disk_consistent_lsn =
300 4 : max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
301 4 : acc.remote_consistent_lsn =
302 4 : max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
303 4 : acc.replytime = max(feedback.replytime, acc.replytime);
304 4 : acc
305 : }
306 0 : ReplicationFeedback::Standby(_) => acc,
307 4 : });
308 2 : self.agg_ps_feedback = acc;
309 2 : }
310 : }
311 :
312 : // Serialize is derived only for pretty-printing as json.
313 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
314 : pub struct WalSenderState {
315 : ttid: TenantTimelineId,
316 : addr: SocketAddr,
317 : conn_id: ConnectionId,
318 : // postgres application_name
319 : appname: Option<String>,
320 : feedback: ReplicationFeedback,
321 : }
322 :
323 : // The receiver is either a pageserver or a regular standby; they send
324 : // different kinds of feedback.
325 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
326 : enum ReplicationFeedback {
327 : Pageserver(PageserverFeedback),
328 : Standby(StandbyFeedback),
329 : }
330 :
331 : // Id of the occupied slot in WalSenders, used to access it (and saved in the
332 : // WalSenderGuard). We could hand out an Arc pointing directly to the slot, but there
333 : // is not much sense in that: the value aggregation performed on each feedback
334 : // receipt iterates over all walsenders anyway.
335 : pub type WalSenderId = usize;
336 :
337 : /// Scope guard to access a slot in the WalSenders registry and unregister from it
338 : /// on Drop.
339 : pub struct WalSenderGuard {
340 : id: WalSenderId,
341 : walsenders: Arc<WalSenders>,
342 : }
343 :
344 : impl Drop for WalSenderGuard {
345 0 : fn drop(&mut self) {
346 0 : self.walsenders.unregister(self.id);
347 0 : }
348 : }
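// Lifecycle sketch: handle_start_replication_guts registers a walsender and
// holds the guard for the duration of streaming; dropping it frees the slot
// and recomputes the aggregated hot standby feedback. Roughly:
//
//     let guard = tli.get_walsenders().register(ttid, addr, conn_id, appname);
//     // ... stream WAL, record feedback against guard.id ...
//     drop(guard); // slot cleared, update_hs_feedback() runs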
349 :
350 : impl SafekeeperPostgresHandler {
351 : /// Wrapper around handle_start_replication_guts handling its result. The error is
352 : /// handled here while we're still in the walsender ttid span; with an API
353 : /// extension, this can probably be moved into postgres_backend.
354 0 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
355 0 : &mut self,
356 0 : pgb: &mut PostgresBackend<IO>,
357 0 : start_pos: Lsn,
358 0 : term: Option<Term>,
359 0 : ) -> Result<(), QueryError> {
360 0 : if let Err(end) = self
361 0 : .handle_start_replication_guts(pgb, start_pos, term)
362 0 : .await
363 : {
364 : // Log the result and probably send it to the client, closing the stream.
365 0 : pgb.handle_copy_stream_end(end).await;
366 0 : }
367 0 : Ok(())
368 0 : }
369 :
370 0 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
371 0 : &mut self,
372 0 : pgb: &mut PostgresBackend<IO>,
373 0 : start_pos: Lsn,
374 0 : term: Option<Term>,
375 0 : ) -> Result<(), CopyStreamHandlerEnd> {
376 0 : let appname = self.appname.clone();
377 0 : let tli =
378 0 : GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
379 :
380 : // Use a guard object to remove our entry from the timeline when we are done.
381 0 : let ws_guard = Arc::new(tli.get_walsenders().register(
382 0 : self.ttid,
383 0 : *pgb.get_peer_addr(),
384 0 : self.conn_id,
385 0 : self.appname.clone(),
386 0 : ));
387 :
388 : // Walsender can operate in one of two modes, selected by whether a term is
389 : // supplied: send only committed WAL (used by the pageserver) or all
390 : // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
391 : // The second case is always driven by a consensus leader, whose term
392 : // must be supplied.
393 0 : let end_watch = if term.is_some() {
394 0 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
395 : } else {
396 0 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
397 : };
398 : // we don't check term here; it will be checked on first waiting/WAL reading anyway.
399 0 : let end_pos = end_watch.get();
400 0 :
401 0 : if end_pos < start_pos {
402 0 : warn!(
403 0 : "requested start_pos {} is ahead of available WAL end_pos {}",
404 0 : start_pos, end_pos
405 0 : );
406 0 : }
407 :
408 0 : info!(
409 0 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
410 0 : start_pos,
411 0 : end_pos,
412 0 : matches!(end_watch, EndWatch::Flush(_)),
413 0 : appname
414 0 : );
415 :
416 : // switch to copy
417 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
418 :
419 0 : let (_, persisted_state) = tli.get_state().await;
420 0 : let wal_reader = WalReader::new(
421 0 : self.conf.workdir.clone(),
422 0 : self.conf.timeline_dir(&tli.ttid),
423 0 : &persisted_state,
424 0 : start_pos,
425 0 : self.conf.is_wal_backup_enabled(),
426 0 : )?;
427 :
428 : // Split to concurrently receive and send data; replies are generally
429 : // not synchronized with sends, so this avoids deadlocks.
430 0 : let reader = pgb.split().context("START_REPLICATION split")?;
431 :
432 0 : let mut sender = WalSender {
433 0 : pgb,
434 0 : tli: tli.clone(),
435 0 : appname,
436 0 : start_pos,
437 0 : end_pos,
438 0 : term,
439 0 : end_watch,
440 0 : ws_guard: ws_guard.clone(),
441 0 : wal_reader,
442 0 : send_buf: [0; MAX_SEND_SIZE],
443 0 : };
444 0 : let mut reply_reader = ReplyReader {
445 0 : reader,
446 0 : ws_guard,
447 0 : tli,
448 0 : };
449 :
450 0 : let res = tokio::select! {
451 : // todo: add read|write .context to these errors
452 0 : r = sender.run() => r,
453 0 : r = reply_reader.run() => r,
454 : };
455 : // Join the pg backend halves back together.
456 0 : pgb.unsplit(reply_reader.reader)?;
457 :
458 0 : res
459 0 : }
460 : }
461 :
462 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
463 : /// given term (recovery by walproposer or peer safekeeper).
464 : enum EndWatch {
465 : Commit(Receiver<Lsn>),
466 : Flush(Receiver<TermLsn>),
467 : }
468 :
469 : impl EndWatch {
470 : /// Get current end of WAL.
471 0 : fn get(&self) -> Lsn {
472 0 : match self {
473 0 : EndWatch::Commit(r) => *r.borrow(),
474 0 : EndWatch::Flush(r) => r.borrow().lsn,
475 : }
476 0 : }
477 :
478 : /// Wait for the update.
479 0 : async fn changed(&mut self) -> anyhow::Result<()> {
480 0 : match self {
481 0 : EndWatch::Commit(r) => r.changed().await?,
482 0 : EndWatch::Flush(r) => r.changed().await?,
483 : }
484 0 : Ok(())
485 0 : }
486 : }
487 :
488 : /// The half that drives sending WAL.
489 : struct WalSender<'a, IO> {
490 : pgb: &'a mut PostgresBackend<IO>,
491 : tli: Arc<Timeline>,
492 : appname: Option<String>,
493 : // Position from which we are sending the next chunk.
494 : start_pos: Lsn,
495 : // WAL up to this position is known to be locally available.
496 : // Usually this is the same as the latest commit_lsn, but in case of
497 : // walproposer recovery, this is flush_lsn.
498 : //
499 : // We send this LSN to the receiver as wal_end, so that it knows how much
500 : // WAL this safekeeper has. This LSN should be as fresh as possible.
501 : end_pos: Lsn,
502 : /// When streaming uncommitted part, the term the client acts as the leader
503 : /// in. Streaming is stopped if local term changes to a different (higher)
504 : /// value.
505 : term: Option<Term>,
506 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
507 : end_watch: EndWatch,
508 : ws_guard: Arc<WalSenderGuard>,
509 : wal_reader: WalReader,
510 : // buffer into which WAL is read before sending
511 : send_buf: [u8; MAX_SEND_SIZE],
512 : }
513 :
514 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
515 : /// Send WAL until
516 : /// - an error occurs
517 : /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
518 : ///
519 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
520 : /// convenience.
521 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
522 : loop {
523 : // Wait for the next portion if it is not there yet, or just
524 : // update our "end of WAL available for sending" value, which we
525 : // communicate to the receiver.
526 0 : self.wait_wal().await?;
527 0 : assert!(
528 0 : self.end_pos > self.start_pos,
529 0 : "nothing to send after waiting for WAL"
530 : );
531 :
532 : // try to send as much as available, capped by MAX_SEND_SIZE
533 0 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
534 0 : // if we went beyond the available WAL, back off
535 0 : if chunk_end_pos >= self.end_pos {
536 0 : chunk_end_pos = self.end_pos;
537 0 : } else {
538 0 : // If not sending up to the end pos, round down to a page boundary to
539 0 : // avoid splitting a WAL record anywhere other than a page boundary, as
540 0 : // the protocol demands. See walsender.c (XLogSendPhysical).
541 0 : chunk_end_pos = chunk_end_pos
542 0 : .checked_sub(chunk_end_pos.block_offset())
543 0 : .unwrap();
544 0 : }
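// Rounding example (assuming block_offset() returns the LSN's offset within
// an 8 KiB WAL page): chunk_end_pos = 0x1003A10 has block_offset 0x1A10, so
// it is rounded down to 0x1002000, the start of its page.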
545 0 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
546 0 : let send_buf = &mut self.send_buf[..send_size];
547 : let send_size: usize;
548 : {
549 : // If uncommitted part is being pulled, check that the term is
550 : // still the expected one.
551 0 : let _term_guard = if let Some(t) = self.term {
552 0 : Some(self.tli.acquire_term(t).await?)
553 : } else {
554 0 : None
555 : };
556 : // Read WAL into buffer. send_size can be additionally capped to
557 : // segment boundary here.
558 0 : send_size = self.wal_reader.read(send_buf).await?
559 : };
560 0 : let send_buf = &send_buf[..send_size];
561 0 :
562 0 : // and send it
563 0 : self.pgb
564 0 : .write_message(&BeMessage::XLogData(XLogDataBody {
565 0 : wal_start: self.start_pos.0,
566 0 : wal_end: self.end_pos.0,
567 0 : timestamp: get_current_timestamp(),
568 0 : data: send_buf,
569 0 : }))
570 0 : .await?;
571 :
572 0 : if let Some(appname) = &self.appname {
573 0 : if appname == "replica" {
574 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
575 0 : }
576 0 : }
577 0 : trace!(
578 0 : "sent {} bytes of WAL {}-{}",
579 0 : send_size,
580 0 : self.start_pos,
581 0 : self.start_pos + send_size as u64
582 0 : );
583 0 : self.start_pos += send_size as u64;
584 : }
585 0 : }
586 :
587 : /// Wait until we have WAL to stream, sending keepalives and checking for
588 : /// exit in the meantime.
589 0 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
590 : loop {
591 0 : self.end_pos = self.end_watch.get();
592 0 : if self.end_pos > self.start_pos {
593 : // We have something to send.
594 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
595 0 : return Ok(());
596 0 : }
597 :
598 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
599 0 : if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
600 0 : self.end_pos = lsn;
601 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
602 0 : return Ok(());
603 0 : }
604 0 :
605 0 : // Timed out waiting for WAL, check for termination and send KA.
606 0 : // Check for termination only if we are streaming up to commit_lsn
607 0 : // (to pageserver).
608 0 : if let EndWatch::Commit(_) = self.end_watch {
609 0 : if let Some(remote_consistent_lsn) = self
610 0 : .ws_guard
611 0 : .walsenders
612 0 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
613 : {
614 0 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
615 : // Terminate if there is nothing more to send.
616 : // Note that the "ending streaming" part of the string is used by
617 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
618 : // do not change this string without updating pageserver.
619 0 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
620 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
621 0 : self.appname, self.start_pos,
622 0 : )));
623 0 : }
624 0 : }
625 0 : }
626 :
627 0 : self.pgb
628 0 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
629 0 : wal_end: self.end_pos.0,
630 0 : timestamp: get_current_timestamp(),
631 0 : request_reply: true,
632 0 : }))
633 0 : .await?;
634 : }
635 0 : }
636 : }
637 :
638 : /// The half that drives receiving replies.
639 : struct ReplyReader<IO> {
640 : reader: PostgresBackendReader<IO>,
641 : ws_guard: Arc<WalSenderGuard>,
642 : tli: Arc<Timeline>,
643 : }
644 :
645 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
646 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
647 : loop {
648 0 : let msg = self.reader.read_copy_message().await?;
649 0 : self.handle_feedback(&msg).await?
650 : }
651 0 : }
652 :
653 0 : async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
654 0 : match msg.first().cloned() {
655 0 : Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
656 0 : // Note: deserializing is on msg[1..] because we skip the tag byte.
657 0 : let hs_feedback = HotStandbyFeedback::des(&msg[1..])
658 0 : .context("failed to deserialize HotStandbyFeedback")?;
659 0 : self.ws_guard
660 0 : .walsenders
661 0 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
662 : }
663 0 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
664 0 : let reply =
665 0 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
666 0 : self.ws_guard
667 0 : .walsenders
668 0 : .record_standby_reply(self.ws_guard.id, &reply);
669 : }
670 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
671 : // pageserver sends this.
672 : // Note: deserializing is on msg[9..] because we skip the tag byte and 8 length bytes.
673 0 : let buf = Bytes::copy_from_slice(&msg[9..]);
674 0 : let ps_feedback = PageserverFeedback::parse(buf);
675 :
676 0 : trace!("PageserverFeedback is {:?}", ps_feedback);
677 0 : self.ws_guard
678 0 : .walsenders
679 0 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
680 0 : self.tli
681 0 : .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
682 0 : .await;
683 : // In principle a new remote_consistent_lsn could allow us to
684 : // deactivate the timeline, but we check that regularly through
685 : // broker updates, no need to do it here.
686 : }
687 0 : _ => warn!("unexpected message {:?}", msg),
688 : }
689 0 : Ok(())
690 0 : }
691 : }
692 :
693 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
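// With this 1 s poll timeout, an idle WalSender wakes up roughly once per
// second: every wait_for_lsn() call that times out leads to a keepalive with
// request_reply set (see WalSender::wait_wal), so quiet connections still
// exchange periodic liveness messages.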
694 :
695 : /// Wait until we have available WAL > start_pos or the timeout expires. Returns
696 : /// - Ok(Some(end_pos)) if the needed lsn is successfully observed;
697 : /// - Ok(None) if the timeout expired;
698 : /// - Err in case of error -- only if 1) the term changed while fetching in recovery
699 : /// mode, or 2) the watch channel closed, which must never happen.
700 0 : async fn wait_for_lsn(
701 0 : rx: &mut EndWatch,
702 0 : client_term: Option<Term>,
703 0 : start_pos: Lsn,
704 0 : ) -> anyhow::Result<Option<Lsn>> {
705 0 : let res = timeout(POLL_STATE_TIMEOUT, async move {
706 : loop {
707 0 : let end_pos = rx.get();
708 0 : if end_pos > start_pos {
709 0 : return Ok(end_pos);
710 0 : }
711 0 : if let EndWatch::Flush(rx) = rx {
712 0 : let curr_term = rx.borrow().term;
713 0 : if let Some(client_term) = client_term {
714 0 : if curr_term != client_term {
715 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
716 0 : }
717 0 : }
718 0 : }
719 0 : rx.changed().await?;
720 : }
721 0 : })
722 0 : .await;
723 :
724 0 : match res {
725 : // success
726 0 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
727 : // error inside closure
728 0 : Ok(Err(err)) => Err(err),
729 : // timeout
730 0 : Err(_) => Ok(None),
731 : }
732 0 : }
733 :
734 : #[cfg(test)]
735 : mod tests {
736 : use postgres_protocol::PG_EPOCH;
737 : use utils::id::{TenantId, TimelineId};
738 :
739 : use super::*;
740 :
741 12 : fn mock_ttid() -> TenantTimelineId {
742 12 : TenantTimelineId {
743 12 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
744 12 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
745 12 : }
746 12 : }
747 :
748 12 : fn mock_addr() -> SocketAddr {
749 12 : "127.0.0.1:8080".parse().unwrap()
750 12 : }
751 :
752 : // add the specified feedback to wss, setting other fields to dummy values
753 12 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
754 12 : let walsender_state = WalSenderState {
755 12 : ttid: mock_ttid(),
756 12 : addr: mock_addr(),
757 12 : conn_id: 1,
758 12 : appname: None,
759 12 : feedback,
760 12 : };
761 12 : wss.slots.push(Some(walsender_state))
762 12 : }
763 :
764 : // form standby feedback with given hot standby feedback ts/xmin and the
765 : // rest set to dummy values.
766 8 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
767 8 : ReplicationFeedback::Standby(StandbyFeedback {
768 8 : reply: StandbyReply::empty(),
769 8 : hs_feedback: HotStandbyFeedback {
770 8 : ts,
771 8 : xmin,
772 8 : catalog_xmin: 0,
773 8 : },
774 8 : })
775 8 : }
776 :
777 : // test that hs aggregation works as expected
778 2 : #[test]
779 2 : fn test_hs_feedback_no_valid() {
780 2 : let mut wss = WalSendersShared::new();
781 2 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
782 2 : wss.update_hs_feedback();
783 2 : assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
784 2 : }
785 :
786 2 : #[test]
787 2 : fn test_hs_feedback() {
788 2 : let mut wss = WalSendersShared::new();
789 2 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
790 2 : push_feedback(&mut wss, hs_feedback(1, 42));
791 2 : push_feedback(&mut wss, hs_feedback(1, 64));
792 2 : wss.update_hs_feedback();
793 2 : assert_eq!(wss.agg_hs_feedback.xmin, 42);
794 2 : }
795 :
796 : // form pageserver feedback with given last_received_lsn / tli size and the
797 : // rest set to dummy values.
798 4 : fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
799 4 : ReplicationFeedback::Pageserver(PageserverFeedback {
800 4 : current_timeline_size,
801 4 : last_received_lsn,
802 4 : disk_consistent_lsn: Lsn::INVALID,
803 4 : remote_consistent_lsn: Lsn::INVALID,
804 4 : replytime: *PG_EPOCH,
805 4 : })
806 4 : }
807 :
808 : // test that ps aggregation works as expected
809 2 : #[test]
810 2 : fn test_ps_feedback() {
811 2 : let mut wss = WalSendersShared::new();
812 2 : push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
813 2 : push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
814 2 : wss.update_ps_feedback();
815 2 : assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
816 2 : assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
817 2 : }
818 : }