Line data Source code
1 : //! This module implements the streaming side of the replication protocol, starting
2 : //! with the "START_REPLICATION" message, and the registry of walsenders.
3 :
4 : use crate::handler::SafekeeperPostgresHandler;
5 : use crate::metrics::RECEIVED_PS_FEEDBACKS;
6 : use crate::receive_wal::WalReceivers;
7 : use crate::safekeeper::{Term, TermLsn};
8 : use crate::timeline::Timeline;
9 : use crate::wal_service::ConnectionId;
10 : use crate::wal_storage::WalReader;
11 : use crate::GlobalTimelines;
12 : use anyhow::{bail, Context as AnyhowContext};
13 : use bytes::Bytes;
14 : use parking_lot::Mutex;
15 : use postgres_backend::PostgresBackend;
16 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
17 : use postgres_ffi::get_current_timestamp;
18 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
19 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
20 : use serde::{Deserialize, Serialize};
21 : use tokio::io::{AsyncRead, AsyncWrite};
22 : use utils::failpoint_support;
23 : use utils::id::TenantTimelineId;
24 : use utils::pageserver_feedback::PageserverFeedback;
25 :
26 : use std::cmp::min;
27 : use std::net::SocketAddr;
28 : use std::str;
29 : use std::sync::Arc;
30 : use std::time::Duration;
31 : use tokio::sync::watch::Receiver;
32 : use tokio::time::timeout;
33 : use tracing::*;
34 : use utils::{bin_ser::BeSer, lsn::Lsn};
35 :
36 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
37 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
38 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
39 : // Neon extension of the replication protocol
40 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
41 :
42 : type FullTransactionId = u64;
43 :
44 : /// Hot standby feedback received from replica
45 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
46 : pub struct HotStandbyFeedback {
47 : pub ts: TimestampTz,
48 : pub xmin: FullTransactionId,
49 : pub catalog_xmin: FullTransactionId,
50 : }
51 :
52 : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
53 :
54 : impl HotStandbyFeedback {
55 19818 : pub fn empty() -> HotStandbyFeedback {
56 19818 : HotStandbyFeedback {
57 19818 : ts: 0,
58 19818 : xmin: 0,
59 19818 : catalog_xmin: 0,
60 19818 : }
61 19818 : }
62 : }
63 :
64 : /// Standby status update
65 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
66 : pub struct StandbyReply {
67 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
68 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
69 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
70 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
71 : pub reply_requested: bool,
72 : }
73 :
74 : impl StandbyReply {
75 8 : fn empty() -> Self {
76 8 : StandbyReply {
77 8 : write_lsn: Lsn::INVALID,
78 8 : flush_lsn: Lsn::INVALID,
79 8 : apply_lsn: Lsn::INVALID,
80 8 : reply_ts: 0,
81 8 : reply_requested: false,
82 8 : }
83 8 : }
84 : }
85 :
86 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
87 : pub struct StandbyFeedback {
88 : reply: StandbyReply,
89 : hs_feedback: HotStandbyFeedback,
90 : }
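
// Illustrative sketch (not part of the original module): how a standby status
// update ('r') message maps onto StandbyReply. The real handler below uses the
// BeSer helper (`StandbyReply::des(&msg[1..])`); this decodes the same assumed
// big-endian, fields-in-declaration-order layout by hand. Offsets are read off
// the struct above and should be treated as an assumption, not a protocol reference.
#[allow(dead_code)]
fn decode_standby_reply_sketch(msg: &[u8]) -> Option<StandbyReply> {
    // tag byte + four 8-byte fields + 1-byte reply_requested flag
    if msg.first() != Some(&STANDBY_STATUS_UPDATE_TAG_BYTE) || msg.len() < 34 {
        return None;
    }
    let be_u64 = |off: usize| u64::from_be_bytes(msg[off..off + 8].try_into().unwrap());
    Some(StandbyReply {
        write_lsn: Lsn(be_u64(1)),
        flush_lsn: Lsn(be_u64(9)),
        apply_lsn: Lsn(be_u64(17)),
        reply_ts: be_u64(25) as TimestampTz,
        reply_requested: msg[33] != 0,
    })
}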
91 :
92 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
93 : pub struct WalSenders {
94 : mutex: Mutex<WalSendersShared>,
95 : walreceivers: Arc<WalReceivers>,
96 : }
97 :
98 : impl WalSenders {
99 0 : pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
100 0 : Arc::new(WalSenders {
101 0 : mutex: Mutex::new(WalSendersShared::new()),
102 0 : walreceivers,
103 0 : })
104 0 : }
105 :
106 : /// Register a new walsender. The returned guard provides access to the slot and
107 : /// automatically deregisters itself on Drop.
108 0 : fn register(
109 0 : self: &Arc<WalSenders>,
110 0 : ttid: TenantTimelineId,
111 0 : addr: SocketAddr,
112 0 : conn_id: ConnectionId,
113 0 : appname: Option<String>,
114 0 : ) -> WalSenderGuard {
115 0 : let slots = &mut self.mutex.lock().slots;
116 0 : let walsender_state = WalSenderState {
117 0 : ttid,
118 0 : addr,
119 0 : conn_id,
120 0 : appname,
121 0 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
122 0 : };
123 : // find empty slot or create new one
124 0 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
125 0 : slots[pos] = Some(walsender_state);
126 0 : pos
127 : } else {
128 0 : let pos = slots.len();
129 0 : slots.push(Some(walsender_state));
130 0 : pos
131 : };
132 0 : WalSenderGuard {
133 0 : id: pos,
134 0 : walsenders: self.clone(),
135 0 : }
136 0 : }
137 :
138 : /// Get state of all walsenders.
139 0 : pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
140 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
141 0 : }
142 :
143 : /// Get the LSN of the most lagging pageserver receiver. Returns None if there are no
144 : /// active walsenders.
145 0 : pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
146 0 : self.mutex
147 0 : .lock()
148 0 : .slots
149 0 : .iter()
150 0 : .flatten()
151 0 : .filter_map(|s| match s.feedback {
152 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
153 0 : ReplicationFeedback::Standby(_) => None,
154 0 : })
155 0 : .min()
156 0 : }
157 :
158 : /// Returns the total count of pageserver feedbacks received and the last feedback.
159 0 : pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
160 0 : let shared = self.mutex.lock();
161 0 : (shared.ps_feedback_counter, shared.last_ps_feedback)
162 0 : }
163 :
164 : /// Get aggregated hot standby feedback (we send it to compute).
165 0 : pub fn get_hotstandby(self: &Arc<WalSenders>) -> HotStandbyFeedback {
166 0 : self.mutex.lock().agg_hs_feedback
167 0 : }
168 :
169 : /// Record new pageserver feedback, update aggregated values.
170 0 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
171 0 : let mut shared = self.mutex.lock();
172 0 : shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
173 0 : shared.last_ps_feedback = *feedback;
174 0 : shared.ps_feedback_counter += 1;
175 0 : drop(shared);
176 0 :
177 0 : RECEIVED_PS_FEEDBACKS.inc();
178 0 :
179 0 : // send feedback to connected walproposers
180 0 : self.walreceivers.broadcast_pageserver_feedback(*feedback);
181 0 : }
182 :
183 : /// Record standby reply.
184 0 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
185 0 : let mut shared = self.mutex.lock();
186 0 : let slot = shared.get_slot_mut(id);
187 0 : match &mut slot.feedback {
188 0 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
189 : ReplicationFeedback::Pageserver(_) => {
190 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
191 0 : reply: *reply,
192 0 : hs_feedback: HotStandbyFeedback::empty(),
193 0 : })
194 : }
195 : }
196 0 : }
197 :
198 : /// Record hot standby feedback, update aggregated value.
199 0 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
200 0 : let mut shared = self.mutex.lock();
201 0 : let slot = shared.get_slot_mut(id);
202 0 : match &mut slot.feedback {
203 0 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
204 : ReplicationFeedback::Pageserver(_) => {
205 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
206 0 : reply: StandbyReply::empty(),
207 0 : hs_feedback: *feedback,
208 0 : })
209 : }
210 : }
211 0 : shared.update_hs_feedback();
212 0 : }
213 :
214 : /// Get the remote_consistent_lsn reported by the pageserver. Returns None if
215 : /// the client is not a pageserver.
216 0 : fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
217 0 : let shared = self.mutex.lock();
218 0 : let slot = shared.get_slot(id);
219 0 : match slot.feedback {
220 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
221 0 : _ => None,
222 : }
223 0 : }
224 :
225 : /// Unregister walsender.
226 0 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
227 0 : let mut shared = self.mutex.lock();
228 0 : shared.slots[id] = None;
229 0 : shared.update_hs_feedback();
230 0 : }
231 : }
232 :
233 : struct WalSendersShared {
234 : // value aggregated over all walsenders
235 : agg_hs_feedback: HotStandbyFeedback,
236 : // last feedback ever received from any pageserver, empty if none
237 : last_ps_feedback: PageserverFeedback,
238 : // total counter of pageserver feedbacks received
239 : ps_feedback_counter: u64,
240 : slots: Vec<Option<WalSenderState>>,
241 : }
242 :
243 : impl WalSendersShared {
244 4 : fn new() -> Self {
245 4 : WalSendersShared {
246 4 : agg_hs_feedback: HotStandbyFeedback::empty(),
247 4 : last_ps_feedback: PageserverFeedback::empty(),
248 4 : ps_feedback_counter: 0,
249 4 : slots: Vec::new(),
250 4 : }
251 4 : }
252 :
253 : /// Get the content of the slot with the provided id; it must exist.
254 0 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
255 0 : self.slots[id].as_ref().expect("walsender doesn't exist")
256 0 : }
257 :
258 : /// Get mutable content of the slot with the provided id; it must exist.
259 0 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
260 0 : self.slots[id].as_mut().expect("walsender doesn't exist")
261 0 : }
262 :
263 : /// Update the aggregated hot standby feedback. We just take the min of valid xmins
264 : /// and ts.
265 4 : fn update_hs_feedback(&mut self) {
266 4 : let mut agg = HotStandbyFeedback::empty();
267 8 : for ws_state in self.slots.iter().flatten() {
268 8 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
269 8 : let hs_feedback = standby_feedback.hs_feedback;
270 8 : // doing Option math like op1.iter().chain(op2.iter()).min()
271 8 : // would be nicer, but we serialize/deserialize this struct
272 8 : // directly, so leave as is for now
273 8 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
274 4 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
275 2 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
276 2 : } else {
277 2 : agg.xmin = hs_feedback.xmin;
278 2 : }
279 4 : agg.ts = min(agg.ts, hs_feedback.ts);
280 4 : }
281 8 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
282 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
283 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
284 0 : } else {
285 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
286 0 : }
287 0 : agg.ts = min(agg.ts, hs_feedback.ts);
288 8 : }
289 0 : }
290 : }
291 4 : self.agg_hs_feedback = agg;
292 4 : }
293 : }
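
// Minimal sketch of the "Option math" formulation mentioned in the comment
// inside update_hs_feedback: treat INVALID_FULL_TRANSACTION_ID as "absent" and
// take the min over the remaining values. Not used by the code above; shown
// only to make the aggregation rule explicit.
#[allow(dead_code)]
fn min_valid_xmin(xmins: impl Iterator<Item = FullTransactionId>) -> FullTransactionId {
    xmins
        .filter(|&xmin| xmin != INVALID_FULL_TRANSACTION_ID)
        .min()
        .unwrap_or(INVALID_FULL_TRANSACTION_ID)
}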
294 :
295 : // Serialize is derived only for pretty printing in json.
296 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
297 : pub struct WalSenderState {
298 : ttid: TenantTimelineId,
299 : addr: SocketAddr,
300 : conn_id: ConnectionId,
301 : // postgres application_name
302 : appname: Option<String>,
303 : feedback: ReplicationFeedback,
304 : }
305 :
306 : // The receiver is either a pageserver or a regular standby; they send different
307 : // kinds of feedback.
308 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
309 : enum ReplicationFeedback {
310 : Pageserver(PageserverFeedback),
311 : Standby(StandbyFeedback),
312 : }
313 :
314 : // Id of the occupied slot in WalSenders, used to access it (and saved in the
315 : // WalSenderGuard). We could hand out an Arc to the slot directly, but there is not
316 : // much sense in that, as the aggregation performed on each feedback receipt
317 : // iterates over all walsenders anyway.
318 : pub type WalSenderId = usize;
319 :
320 : /// Scope guard to access a slot in the WalSenders registry and unregister from it on
321 : /// Drop.
322 : pub struct WalSenderGuard {
323 : id: WalSenderId,
324 : walsenders: Arc<WalSenders>,
325 : }
326 :
327 : impl Drop for WalSenderGuard {
328 0 : fn drop(&mut self) {
329 0 : self.walsenders.unregister(self.id);
330 0 : }
331 : }
332 :
333 : impl SafekeeperPostgresHandler {
334 : /// Wrapper around handle_start_replication_guts that handles its result. The error is
335 : /// handled here while we're still in the walsender ttid span; with an API
336 : /// extension, this could probably be moved into postgres_backend.
337 0 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
338 0 : &mut self,
339 0 : pgb: &mut PostgresBackend<IO>,
340 0 : start_pos: Lsn,
341 0 : term: Option<Term>,
342 0 : ) -> Result<(), QueryError> {
343 0 : if let Err(end) = self
344 0 : .handle_start_replication_guts(pgb, start_pos, term)
345 0 : .await
346 : {
347 : // Log the result and probably send it to the client, closing the stream.
348 0 : pgb.handle_copy_stream_end(end).await;
349 0 : }
350 0 : Ok(())
351 0 : }
352 :
353 0 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
354 0 : &mut self,
355 0 : pgb: &mut PostgresBackend<IO>,
356 0 : start_pos: Lsn,
357 0 : term: Option<Term>,
358 0 : ) -> Result<(), CopyStreamHandlerEnd> {
359 0 : let appname = self.appname.clone();
360 0 : let tli =
361 0 : GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
362 :
363 : // Use a guard object to remove our entry from the timeline when we are done.
364 0 : let ws_guard = Arc::new(tli.get_walsenders().register(
365 0 : self.ttid,
366 0 : *pgb.get_peer_addr(),
367 0 : self.conn_id,
368 0 : self.appname.clone(),
369 0 : ));
370 :
371 : // The walsender can operate in one of two modes, selected by
372 : // application_name: serve only committed WAL (used by the pageserver) or all
373 : // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
374 : // The second mode is always driven by a consensus leader, whose term
375 : // must be supplied.
376 0 : let end_watch = if term.is_some() {
377 0 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
378 : } else {
379 0 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
380 : };
381 : // We don't check the term here; it will be checked on the first wait/WAL read anyway.
382 0 : let end_pos = end_watch.get();
383 0 :
384 0 : if end_pos < start_pos {
385 0 : warn!(
386 0 : "requested start_pos {} is ahead of available WAL end_pos {}",
387 0 : start_pos, end_pos
388 0 : );
389 0 : }
390 :
391 0 : info!(
392 0 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
393 0 : start_pos,
394 0 : end_pos,
395 0 : matches!(end_watch, EndWatch::Flush(_)),
396 0 : appname
397 0 : );
398 :
399 : // switch to copy
400 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
401 :
402 0 : let (_, persisted_state) = tli.get_state().await;
403 0 : let wal_reader = WalReader::new(
404 0 : self.conf.workdir.clone(),
405 0 : self.conf.timeline_dir(&tli.ttid),
406 0 : &persisted_state,
407 0 : start_pos,
408 0 : self.conf.is_wal_backup_enabled(),
409 0 : )?;
410 :
411 : // Split to concurrently receive and send data; replies are generally
412 : // not synchronized with sends, so this avoids deadlocks.
413 0 : let reader = pgb.split().context("START_REPLICATION split")?;
414 :
415 0 : let mut sender = WalSender {
416 0 : pgb,
417 0 : tli: tli.clone(),
418 0 : appname,
419 0 : start_pos,
420 0 : end_pos,
421 0 : term,
422 0 : end_watch,
423 0 : ws_guard: ws_guard.clone(),
424 0 : wal_reader,
425 0 : send_buf: [0; MAX_SEND_SIZE],
426 0 : };
427 0 : let mut reply_reader = ReplyReader {
428 0 : reader,
429 0 : ws_guard: ws_guard.clone(),
430 0 : tli,
431 0 : };
432 :
433 0 : let res = tokio::select! {
434 : // todo: add read|write .context to these errors
435 0 : r = sender.run() => r,
436 0 : r = reply_reader.run() => r,
437 : };
438 :
439 0 : let ws_state = ws_guard
440 0 : .walsenders
441 0 : .mutex
442 0 : .lock()
443 0 : .get_slot(ws_guard.id)
444 0 : .clone();
445 0 : info!(
446 0 : "finished streaming to {}, feedback={:?}",
447 0 : ws_state.addr, ws_state.feedback,
448 0 : );
449 :
450 : // Join pg backend back.
451 0 : pgb.unsplit(reply_reader.reader)?;
452 :
453 0 : res
454 0 : }
455 : }
456 :
457 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
458 : /// given term (recovery by walproposer or peer safekeeper).
459 : enum EndWatch {
460 : Commit(Receiver<Lsn>),
461 : Flush(Receiver<TermLsn>),
462 : }
463 :
464 : impl EndWatch {
465 : /// Get current end of WAL.
466 0 : fn get(&self) -> Lsn {
467 0 : match self {
468 0 : EndWatch::Commit(r) => *r.borrow(),
469 0 : EndWatch::Flush(r) => r.borrow().lsn,
470 : }
471 0 : }
472 :
473 : /// Wait for the update.
474 0 : async fn changed(&mut self) -> anyhow::Result<()> {
475 0 : match self {
476 0 : EndWatch::Commit(r) => r.changed().await?,
477 0 : EndWatch::Flush(r) => r.changed().await?,
478 : }
479 0 : Ok(())
480 0 : }
481 : }
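
// Minimal sketch of the watch-channel pattern that EndWatch wraps: read the
// current value with `borrow()` and await `changed()` for updates. The function
// name and shape are illustrative only; WalSender below uses EndWatch plus a
// timeout instead.
#[allow(dead_code)]
async fn wait_until_past(rx: &mut Receiver<Lsn>, target: Lsn) -> anyhow::Result<Lsn> {
    loop {
        let current = *rx.borrow();
        if current > target {
            return Ok(current);
        }
        // Errors only if the sender side of the channel is dropped.
        rx.changed().await?;
    }
}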
482 :
483 : /// The half of the split connection that drives sending WAL.
484 : struct WalSender<'a, IO> {
485 : pgb: &'a mut PostgresBackend<IO>,
486 : tli: Arc<Timeline>,
487 : appname: Option<String>,
488 : // Position from which we are sending the next chunk.
489 : start_pos: Lsn,
490 : // WAL up to this position is known to be locally available.
491 : // Usually this is the same as the latest commit_lsn, but in case of
492 : // walproposer recovery, this is flush_lsn.
493 : //
494 : // We send this LSN to the receiver as wal_end, so that it knows how much
495 : // WAL this safekeeper has. This LSN should be as fresh as possible.
496 : end_pos: Lsn,
497 : /// When streaming uncommitted part, the term the client acts as the leader
498 : /// in. Streaming is stopped if local term changes to a different (higher)
499 : /// value.
500 : term: Option<Term>,
501 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
502 : end_watch: EndWatch,
503 : ws_guard: Arc<WalSenderGuard>,
504 : wal_reader: WalReader,
505 : // buffer to read WAL into before sending it
506 : send_buf: [u8; MAX_SEND_SIZE],
507 : }
508 :
509 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
510 : /// Send WAL until
511 : /// - an error occurs
512 : /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
513 : ///
514 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
515 : /// convenience.
516 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
517 : loop {
518 : // Wait for the next portion if it is not there yet, or just
519 : // update our 'end of WAL available for sending' value, which we
520 : // communicate to the receiver.
521 0 : self.wait_wal().await?;
522 0 : assert!(
523 0 : self.end_pos > self.start_pos,
524 0 : "nothing to send after waiting for WAL"
525 : );
526 :
527 : // try to send as much as available, capped by MAX_SEND_SIZE
528 0 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
529 0 : // if we went past the available WAL, back off
530 0 : if chunk_end_pos >= self.end_pos {
531 0 : chunk_end_pos = self.end_pos;
532 0 : } else {
533 0 : // If not sending up to the end pos, round down to a page boundary to
534 0 : // avoid breaking a WAL record anywhere but at a page boundary, as the protocol
535 0 : // demands. See walsender.c (XLogSendPhysical).
536 0 : chunk_end_pos = chunk_end_pos
537 0 : .checked_sub(chunk_end_pos.block_offset())
538 0 : .unwrap();
539 0 : }
540 0 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
541 0 : let send_buf = &mut self.send_buf[..send_size];
542 : let send_size: usize;
543 : {
544 : // If uncommitted part is being pulled, check that the term is
545 : // still the expected one.
546 0 : let _term_guard = if let Some(t) = self.term {
547 0 : Some(self.tli.acquire_term(t).await?)
548 : } else {
549 0 : None
550 : };
551 : // Read WAL into buffer. send_size can be additionally capped to
552 : // segment boundary here.
553 0 : send_size = self.wal_reader.read(send_buf).await?
554 : };
555 0 : let send_buf = &send_buf[..send_size];
556 0 :
557 0 : // and send it
558 0 : self.pgb
559 0 : .write_message(&BeMessage::XLogData(XLogDataBody {
560 0 : wal_start: self.start_pos.0,
561 0 : wal_end: self.end_pos.0,
562 0 : timestamp: get_current_timestamp(),
563 0 : data: send_buf,
564 0 : }))
565 0 : .await?;
566 :
567 0 : if let Some(appname) = &self.appname {
568 0 : if appname == "replica" {
569 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
570 0 : }
571 0 : }
572 0 : trace!(
573 0 : "sent {} bytes of WAL {}-{}",
574 0 : send_size,
575 0 : self.start_pos,
576 0 : self.start_pos + send_size as u64
577 0 : );
578 0 : self.start_pos += send_size as u64;
579 : }
580 0 : }
581 :
582 : /// Wait until we have WAL to stream, sending keepalives and checking for
583 : /// exit in the meantime.
584 0 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
585 : loop {
586 0 : self.end_pos = self.end_watch.get();
587 0 : if self.end_pos > self.start_pos {
588 : // We have something to send.
589 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
590 0 : return Ok(());
591 0 : }
592 :
593 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
594 0 : if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
595 0 : self.end_pos = lsn;
596 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
597 0 : return Ok(());
598 0 : }
599 0 :
600 0 : // Timed out waiting for WAL, check for termination and send KA.
601 0 : // Check for termination only if we are streaming up to commit_lsn
602 0 : // (to pageserver).
603 0 : if let EndWatch::Commit(_) = self.end_watch {
604 0 : if let Some(remote_consistent_lsn) = self
605 0 : .ws_guard
606 0 : .walsenders
607 0 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
608 : {
609 0 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
610 : // Terminate if there is nothing more to send.
611 : // Note that the "ending streaming" part of the string is used by
612 : // the pageserver to identify WalReceiverError::SuccessfulCompletion;
613 : // do not change this string without updating the pageserver.
614 0 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
615 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
616 0 : self.appname, self.start_pos,
617 0 : )));
618 0 : }
619 0 : }
620 0 : }
621 :
622 0 : self.pgb
623 0 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
624 0 : wal_end: self.end_pos.0,
625 0 : timestamp: get_current_timestamp(),
626 0 : request_reply: true,
627 0 : }))
628 0 : .await?;
629 : }
630 0 : }
631 : }
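
// Worked example of the chunking rule in WalSender::run, extracted as a pure
// function for illustration (the name is hypothetical, not part of the module):
// a chunk is capped at MAX_SEND_SIZE and, when it stops short of end_pos, is
// rounded down to a WAL page boundary so that a record is never cut at a
// non-page boundary (cf. XLogSendPhysical in walsender.c).
#[allow(dead_code)]
fn chunk_end_sketch(start_pos: Lsn, end_pos: Lsn) -> Lsn {
    let capped = start_pos + MAX_SEND_SIZE as u64;
    if capped >= end_pos {
        end_pos
    } else {
        Lsn(capped.0 - capped.block_offset())
    }
}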
632 :
633 : /// The half of the split connection that drives receiving replies.
634 : struct ReplyReader<IO> {
635 : reader: PostgresBackendReader<IO>,
636 : ws_guard: Arc<WalSenderGuard>,
637 : tli: Arc<Timeline>,
638 : }
639 :
640 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
641 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
642 : loop {
643 0 : let msg = self.reader.read_copy_message().await?;
644 0 : self.handle_feedback(&msg).await?
645 : }
646 0 : }
647 :
648 0 : async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
649 0 : match msg.first().cloned() {
650 0 : Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
651 0 : // Note: deserializing is on msg[1..] because we skip the tag byte.
652 0 : let hs_feedback = HotStandbyFeedback::des(&msg[1..])
653 0 : .context("failed to deserialize HotStandbyFeedback")?;
654 0 : self.ws_guard
655 0 : .walsenders
656 0 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
657 : }
658 0 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
659 0 : let reply =
660 0 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
661 0 : self.ws_guard
662 0 : .walsenders
663 0 : .record_standby_reply(self.ws_guard.id, &reply);
664 : }
665 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
666 : // pageserver sends this.
667 : // Note: parsing is on msg[9..] because we skip the tag byte and the 8 length bytes.
668 0 : let buf = Bytes::copy_from_slice(&msg[9..]);
669 0 : let ps_feedback = PageserverFeedback::parse(buf);
670 0 :
671 0 : trace!("PageserverFeedback is {:?}", ps_feedback);
672 0 : self.ws_guard
673 0 : .walsenders
674 0 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
675 0 : self.tli
676 0 : .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
677 0 : .await;
678 : // In principle, a new remote_consistent_lsn could allow us to
679 : // deactivate the timeline, but we check that regularly through
680 : // broker updates, so no need to do it here.
681 : }
682 0 : _ => warn!("unexpected message {:?}", msg),
683 : }
684 0 : Ok(())
685 0 : }
686 : }
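
// Illustrative layout of the Neon status update ('z') message handled above:
// byte 0 is NEON_STATUS_UPDATE_TAG_BYTE, bytes 1..9 carry a length field, and
// the serialized PageserverFeedback starts at byte 9 (this mirrors the
// `msg[9..]` slice in handle_feedback; the exact meaning of the length field is
// an assumption). The helper name is hypothetical.
#[allow(dead_code)]
fn neon_feedback_body(msg: &[u8]) -> Option<&[u8]> {
    if msg.first() == Some(&NEON_STATUS_UPDATE_TAG_BYTE) && msg.len() >= 9 {
        Some(&msg[9..])
    } else {
        None
    }
}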
687 :
688 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
689 :
690 : /// Wait until we have available WAL > start_pos or timeout expires. Returns
691 : /// - Ok(Some(end_pos)) if the needed lsn is successfully observed;
692 : /// - Ok(None) if the timeout expired;
693 : /// - Err in case of error -- only if 1) the term changed while fetching in recovery
694 : /// mode, or 2) the watch channel closed, which must never happen.
695 0 : async fn wait_for_lsn(
696 0 : rx: &mut EndWatch,
697 0 : client_term: Option<Term>,
698 0 : start_pos: Lsn,
699 0 : ) -> anyhow::Result<Option<Lsn>> {
700 0 : let res = timeout(POLL_STATE_TIMEOUT, async move {
701 : loop {
702 0 : let end_pos = rx.get();
703 0 : if end_pos > start_pos {
704 0 : return Ok(end_pos);
705 0 : }
706 0 : if let EndWatch::Flush(rx) = rx {
707 0 : let curr_term = rx.borrow().term;
708 0 : if let Some(client_term) = client_term {
709 0 : if curr_term != client_term {
710 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
711 0 : }
712 0 : }
713 0 : }
714 0 : rx.changed().await?;
715 : }
716 0 : })
717 0 : .await;
718 :
719 0 : match res {
720 : // success
721 0 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
722 : // error inside closure
723 0 : Ok(Err(err)) => Err(err),
724 : // timeout
725 0 : Err(_) => Ok(None),
726 : }
727 0 : }
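
// Hypothetical usage sketch of wait_for_lsn: poll until WAL past `start`
// appears, doing per-tick work (a keepalive, in WalSender::wait_wal above)
// whenever the poll times out. Shown only to make the Ok(None)-means-timeout
// contract concrete.
#[allow(dead_code)]
async fn wait_for_lsn_usage_sketch(watch: &mut EndWatch, start: Lsn) -> anyhow::Result<Lsn> {
    loop {
        if let Some(end_pos) = wait_for_lsn(watch, None, start).await? {
            return Ok(end_pos);
        }
        // Timeout expired: a real caller sends a keepalive and re-checks
        // termination conditions here.
    }
}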
728 :
729 : #[cfg(test)]
730 : mod tests {
731 : use utils::id::{TenantId, TimelineId};
732 :
733 : use super::*;
734 :
735 8 : fn mock_ttid() -> TenantTimelineId {
736 8 : TenantTimelineId {
737 8 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
738 8 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
739 8 : }
740 8 : }
741 :
742 8 : fn mock_addr() -> SocketAddr {
743 8 : "127.0.0.1:8080".parse().unwrap()
744 8 : }
745 :
746 : // add the specified feedback to wss, setting other fields to dummy values
747 8 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
748 8 : let walsender_state = WalSenderState {
749 8 : ttid: mock_ttid(),
750 8 : addr: mock_addr(),
751 8 : conn_id: 1,
752 8 : appname: None,
753 8 : feedback,
754 8 : };
755 8 : wss.slots.push(Some(walsender_state))
756 8 : }
757 :
758 : // form standby feedback with given hot standby feedback ts/xmin and the
759 : // rest set to dummy values.
760 8 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
761 8 : ReplicationFeedback::Standby(StandbyFeedback {
762 8 : reply: StandbyReply::empty(),
763 8 : hs_feedback: HotStandbyFeedback {
764 8 : ts,
765 8 : xmin,
766 8 : catalog_xmin: 0,
767 8 : },
768 8 : })
769 8 : }
770 :
771 : // test that hs aggregation works as expected
772 : #[test]
773 2 : fn test_hs_feedback_no_valid() {
774 2 : let mut wss = WalSendersShared::new();
775 2 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
776 2 : wss.update_hs_feedback();
777 2 : assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
778 2 : }
779 :
780 : #[test]
781 2 : fn test_hs_feedback() {
782 2 : let mut wss = WalSendersShared::new();
783 2 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
784 2 : push_feedback(&mut wss, hs_feedback(1, 42));
785 2 : push_feedback(&mut wss, hs_feedback(1, 64));
786 2 : wss.update_hs_feedback();
787 2 : assert_eq!(wss.agg_hs_feedback.xmin, 42);
788 2 : }
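
    // Illustrative sketch of the same aggregation applied to catalog_xmin:
    // only valid values participate in the min. Added as an example; not part
    // of the original test set.
    #[test]
    fn test_hs_feedback_catalog_xmin_sketch() {
        let mut wss = WalSendersShared::new();
        for catalog_xmin in [42, 7] {
            push_feedback(
                &mut wss,
                ReplicationFeedback::Standby(StandbyFeedback {
                    reply: StandbyReply::empty(),
                    hs_feedback: HotStandbyFeedback {
                        ts: 1,
                        xmin: INVALID_FULL_TRANSACTION_ID,
                        catalog_xmin,
                    },
                }),
            );
        }
        wss.update_hs_feedback();
        assert_eq!(wss.agg_hs_feedback.catalog_xmin, 7);
    }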
789 : }