Line data Source code
1 : //! This module implements the streaming side of replication protocol, starting
2 : //! with the "START_REPLICATION" message, and registry of walsenders.
3 :
4 : use crate::handler::SafekeeperPostgresHandler;
5 : use crate::safekeeper::{Term, TermLsn};
6 : use crate::timeline::Timeline;
7 : use crate::wal_service::ConnectionId;
8 : use crate::wal_storage::WalReader;
9 : use crate::GlobalTimelines;
10 : use anyhow::{bail, Context as AnyhowContext};
11 : use bytes::Bytes;
12 : use parking_lot::Mutex;
13 : use postgres_backend::PostgresBackend;
14 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
15 : use postgres_ffi::get_current_timestamp;
16 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
17 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
18 : use serde::{Deserialize, Serialize};
19 : use tokio::io::{AsyncRead, AsyncWrite};
20 : use utils::failpoint_support;
21 : use utils::id::TenantTimelineId;
22 : use utils::pageserver_feedback::PageserverFeedback;
23 :
24 : use std::cmp::{max, min};
25 : use std::net::SocketAddr;
26 : use std::str;
27 : use std::sync::Arc;
28 : use std::time::Duration;
29 : use tokio::sync::watch::Receiver;
30 : use tokio::time::timeout;
31 : use tracing::*;
32 : use utils::{bin_ser::BeSer, lsn::Lsn};
33 :
34 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
35 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
36 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
37 : // neon extension of replication protocol
38 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
39 :
40 : type FullTransactionId = u64;
41 :
42 : /// Hot standby feedback received from replica
43 6 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
44 : pub struct HotStandbyFeedback {
45 : pub ts: TimestampTz,
46 : pub xmin: FullTransactionId,
47 : pub catalog_xmin: FullTransactionId,
48 : }
49 :
50 : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
51 :
52 : impl HotStandbyFeedback {
53 1539733 : pub fn empty() -> HotStandbyFeedback {
54 1539733 : HotStandbyFeedback {
55 1539733 : ts: 0,
56 1539733 : xmin: 0,
57 1539733 : catalog_xmin: 0,
58 1539733 : }
59 1539733 : }
60 : }
61 :
62 : /// Standby status update
63 1189 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
64 : pub struct StandbyReply {
65 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
66 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
67 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
68 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
69 : pub reply_requested: bool,
70 : }
71 :
72 : impl StandbyReply {
73 8 : fn empty() -> Self {
74 8 : StandbyReply {
75 8 : write_lsn: Lsn::INVALID,
76 8 : flush_lsn: Lsn::INVALID,
77 8 : apply_lsn: Lsn::INVALID,
78 8 : reply_ts: 0,
79 8 : reply_requested: false,
80 8 : }
81 8 : }
82 : }
83 :
84 3 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
85 : pub struct StandbyFeedback {
86 : reply: StandbyReply,
87 : hs_feedback: HotStandbyFeedback,
88 : }
89 :
90 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
91 : pub struct WalSenders {
92 : mutex: Mutex<WalSendersShared>,
93 : }
94 :
95 : impl WalSenders {
96 614 : pub fn new() -> Arc<WalSenders> {
97 614 : Arc::new(WalSenders {
98 614 : mutex: Mutex::new(WalSendersShared::new()),
99 614 : })
100 614 : }
101 :
102 : /// Register new walsender. Returned guard provides access to the slot and
103 : /// automatically deregisters in Drop.
104 787 : fn register(
105 787 : self: &Arc<WalSenders>,
106 787 : ttid: TenantTimelineId,
107 787 : addr: SocketAddr,
108 787 : conn_id: ConnectionId,
109 787 : appname: Option<String>,
110 787 : ) -> WalSenderGuard {
111 787 : let slots = &mut self.mutex.lock().slots;
112 787 : let walsender_state = WalSenderState {
113 787 : ttid,
114 787 : addr,
115 787 : conn_id,
116 787 : appname,
117 787 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
118 787 : };
119 : // find empty slot or create new one
120 787 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
121 255 : slots[pos] = Some(walsender_state);
122 255 : pos
123 : } else {
124 532 : let pos = slots.len();
125 532 : slots.push(Some(walsender_state));
126 532 : pos
127 : };
128 787 : WalSenderGuard {
129 787 : id: pos,
130 787 : walsenders: self.clone(),
131 787 : }
132 787 : }
133 :
134 : /// Get state of all walsenders.
135 255 : pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
136 255 : self.mutex.lock().slots.iter().flatten().cloned().collect()
137 255 : }
138 :
139 : /// Get aggregated pageserver feedback.
140 51 : pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
141 51 : self.mutex.lock().agg_ps_feedback
142 51 : }
143 :
144 : /// Get aggregated pageserver and hot standby feedback (we send them to compute).
145 1521032 : pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
146 1521032 : let shared = self.mutex.lock();
147 1521032 : (shared.agg_ps_feedback, shared.agg_hs_feedback)
148 1521032 : }
149 :
150 : /// Record new pageserver feedback, update aggregated values.
151 752063 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
152 752063 : let mut shared = self.mutex.lock();
153 752063 : shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
154 752063 : shared.update_ps_feedback();
155 752063 : }
156 :
157 : /// Record standby reply.
158 1189 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
159 1189 : let mut shared = self.mutex.lock();
160 1189 : let slot = shared.get_slot_mut(id);
161 1189 : match &mut slot.feedback {
162 1186 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
163 : ReplicationFeedback::Pageserver(_) => {
164 3 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
165 3 : reply: *reply,
166 3 : hs_feedback: HotStandbyFeedback::empty(),
167 3 : })
168 : }
169 : }
170 1189 : }
171 :
172 : /// Record hot standby feedback, update aggregated value.
173 2 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
174 2 : let mut shared = self.mutex.lock();
175 2 : let slot = shared.get_slot_mut(id);
176 2 : match &mut slot.feedback {
177 2 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
178 : ReplicationFeedback::Pageserver(_) => {
179 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
180 0 : reply: StandbyReply::empty(),
181 0 : hs_feedback: *feedback,
182 0 : })
183 : }
184 : }
185 2 : shared.update_hs_feedback();
186 2 : }
187 :
188 : /// Get remote_consistent_lsn reported by the pageserver. Returns None if
189 : /// client is not pageserver.
190 3547 : fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
191 3547 : let shared = self.mutex.lock();
192 3547 : let slot = shared.get_slot(id);
193 3547 : match slot.feedback {
194 3547 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
195 0 : _ => None,
196 : }
197 3547 : }
198 :
199 : /// Unregister walsender.
200 423 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
201 423 : let mut shared = self.mutex.lock();
202 423 : shared.slots[id] = None;
203 423 : shared.update_hs_feedback();
204 423 : }
205 : }
206 :
207 : struct WalSendersShared {
208 : // aggregated over all walsenders value
209 : agg_hs_feedback: HotStandbyFeedback,
210 : // aggregated over all walsenders value
211 : agg_ps_feedback: PageserverFeedback,
212 : slots: Vec<Option<WalSenderState>>,
213 : }
214 :
215 : impl WalSendersShared {
216 620 : fn new() -> Self {
217 620 : WalSendersShared {
218 620 : agg_hs_feedback: HotStandbyFeedback::empty(),
219 620 : agg_ps_feedback: PageserverFeedback::empty(),
220 620 : slots: Vec::new(),
221 620 : }
222 620 : }
223 :
224 : /// Get content of provided id slot, it must exist.
225 3547 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
226 3547 : self.slots[id].as_ref().expect("walsender doesn't exist")
227 3547 : }
228 :
229 : /// Get mut content of provided id slot, it must exist.
230 753254 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
231 753254 : self.slots[id].as_mut().expect("walsender doesn't exist")
232 753254 : }
233 :
234 : /// Update aggregated hot standy feedback. We just take min of valid xmins
235 : /// and ts.
236 429 : fn update_hs_feedback(&mut self) {
237 429 : let mut agg = HotStandbyFeedback::empty();
238 429 : for ws_state in self.slots.iter().flatten() {
239 207 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
240 10 : let hs_feedback = standby_feedback.hs_feedback;
241 10 : // doing Option math like op1.iter().chain(op2.iter()).min()
242 10 : // would be nicer, but we serialize/deserialize this struct
243 10 : // directly, so leave as is for now
244 10 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
245 4 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
246 2 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
247 2 : } else {
248 2 : agg.xmin = hs_feedback.xmin;
249 2 : }
250 4 : agg.ts = min(agg.ts, hs_feedback.ts);
251 6 : }
252 10 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
253 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
254 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
255 0 : } else {
256 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
257 0 : }
258 0 : agg.ts = min(agg.ts, hs_feedback.ts);
259 10 : }
260 197 : }
261 : }
262 429 : self.agg_hs_feedback = agg;
263 429 : }
264 :
265 : /// Update aggregated pageserver feedback. LSNs (last_received,
266 : /// disk_consistent, remote_consistent) and reply timestamp are just
267 : /// maximized; timeline_size if taken from feedback with highest
268 : /// last_received lsn. This is generally reasonable, but we might want to
269 : /// implement other policies once multiple pageservers start to be actively
270 : /// used.
271 752065 : fn update_ps_feedback(&mut self) {
272 752065 : let init = PageserverFeedback::empty();
273 752065 : let acc =
274 752065 : self.slots
275 752065 : .iter()
276 752065 : .flatten()
277 1120121 : .fold(init, |mut acc, ws_state| match ws_state.feedback {
278 1119690 : ReplicationFeedback::Pageserver(feedback) => {
279 1119690 : if feedback.last_received_lsn > acc.last_received_lsn {
280 831423 : acc.current_timeline_size = feedback.current_timeline_size;
281 831423 : }
282 1119690 : acc.last_received_lsn =
283 1119690 : max(feedback.last_received_lsn, acc.last_received_lsn);
284 1119690 : acc.disk_consistent_lsn =
285 1119690 : max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
286 1119690 : acc.remote_consistent_lsn =
287 1119690 : max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
288 1119690 : acc.replytime = max(feedback.replytime, acc.replytime);
289 1119690 : acc
290 : }
291 431 : ReplicationFeedback::Standby(_) => acc,
292 1120121 : });
293 752065 : self.agg_ps_feedback = acc;
294 752065 : }
295 : }
296 :
297 : // Serialized is used only for pretty printing in json.
298 78 : #[derive(Debug, Clone, Serialize, Deserialize)]
299 : pub struct WalSenderState {
300 : ttid: TenantTimelineId,
301 : addr: SocketAddr,
302 : conn_id: ConnectionId,
303 : // postgres application_name
304 : appname: Option<String>,
305 : feedback: ReplicationFeedback,
306 : }
307 :
308 : // Receiver is either pageserver or regular standby, which have different
309 : // feedbacks.
310 78 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
311 : enum ReplicationFeedback {
312 : Pageserver(PageserverFeedback),
313 : Standby(StandbyFeedback),
314 : }
315 :
316 : // id of the occupied slot in WalSenders to access it (and save in the
317 : // WalSenderGuard). We could give Arc directly to the slot, but there is not
318 : // much sense in that as values aggregation which is performed on each feedback
319 : // receival iterates over all walsenders.
320 : pub type WalSenderId = usize;
321 :
322 : /// Scope guard to access slot in WalSenders registry and unregister from it in
323 : /// Drop.
324 : pub struct WalSenderGuard {
325 : id: WalSenderId,
326 : walsenders: Arc<WalSenders>,
327 : }
328 :
329 : impl Drop for WalSenderGuard {
330 423 : fn drop(&mut self) {
331 423 : self.walsenders.unregister(self.id);
332 423 : }
333 : }
334 :
335 : impl SafekeeperPostgresHandler {
336 : /// Wrapper around handle_start_replication_guts handling result. Error is
337 : /// handled here while we're still in walsender ttid span; with API
338 : /// extension, this can probably be moved into postgres_backend.
339 787 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
340 787 : &mut self,
341 787 : pgb: &mut PostgresBackend<IO>,
342 787 : start_pos: Lsn,
343 787 : term: Option<Term>,
344 787 : ) -> Result<(), QueryError> {
345 787 : if let Err(end) = self
346 787 : .handle_start_replication_guts(pgb, start_pos, term)
347 2933199 : .await
348 : {
349 : // Log the result and probably send it to the client, closing the stream.
350 423 : pgb.handle_copy_stream_end(end).await;
351 0 : }
352 423 : Ok(())
353 423 : }
354 :
355 787 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
356 787 : &mut self,
357 787 : pgb: &mut PostgresBackend<IO>,
358 787 : start_pos: Lsn,
359 787 : term: Option<Term>,
360 787 : ) -> Result<(), CopyStreamHandlerEnd> {
361 787 : let appname = self.appname.clone();
362 787 : let tli =
363 787 : GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
364 :
365 : // Use a guard object to remove our entry from the timeline when we are done.
366 787 : let ws_guard = Arc::new(tli.get_walsenders().register(
367 787 : self.ttid,
368 787 : *pgb.get_peer_addr(),
369 787 : self.conn_id,
370 787 : self.appname.clone(),
371 787 : ));
372 :
373 : // Walsender can operate in one of two modes which we select by
374 : // application_name: give only committed WAL (used by pageserver) or all
375 : // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
376 : // The second case is always driven by a consensus leader which term
377 : // must be supplied.
378 787 : let end_watch = if term.is_some() {
379 21 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
380 : } else {
381 766 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
382 : };
383 : // we don't check term here; it will be checked on first waiting/WAL reading anyway.
384 787 : let end_pos = end_watch.get();
385 787 :
386 787 : if end_pos < start_pos {
387 12 : warn!(
388 12 : "requested start_pos {} is ahead of available WAL end_pos {}",
389 12 : start_pos, end_pos
390 12 : );
391 775 : }
392 :
393 787 : info!(
394 787 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
395 787 : start_pos,
396 787 : end_pos,
397 787 : matches!(end_watch, EndWatch::Flush(_)),
398 787 : appname
399 787 : );
400 :
401 : // switch to copy
402 787 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
403 :
404 787 : let (_, persisted_state) = tli.get_state().await;
405 787 : let wal_reader = WalReader::new(
406 787 : self.conf.workdir.clone(),
407 787 : self.conf.timeline_dir(&tli.ttid),
408 787 : &persisted_state,
409 787 : start_pos,
410 787 : self.conf.is_wal_backup_enabled(),
411 787 : )?;
412 :
413 : // Split to concurrently receive and send data; replies are generally
414 : // not synchronized with sends, so this avoids deadlocks.
415 787 : let reader = pgb.split().context("START_REPLICATION split")?;
416 :
417 787 : let mut sender = WalSender {
418 787 : pgb,
419 787 : tli: tli.clone(),
420 787 : appname,
421 787 : start_pos,
422 787 : end_pos,
423 787 : term,
424 787 : end_watch,
425 787 : ws_guard: ws_guard.clone(),
426 787 : wal_reader,
427 787 : send_buf: [0; MAX_SEND_SIZE],
428 787 : };
429 787 : let mut reply_reader = ReplyReader {
430 787 : reader,
431 787 : ws_guard,
432 787 : tli,
433 787 : };
434 :
435 787 : let res = tokio::select! {
436 : // todo: add read|write .context to these errors
437 57 : r = sender.run() => r,
438 366 : r = reply_reader.run() => r,
439 : };
440 : // Join pg backend back.
441 423 : pgb.unsplit(reply_reader.reader)?;
442 :
443 423 : res
444 423 : }
445 : }
446 :
447 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
448 : /// given term (recovery by walproposer or peer safekeeper).
449 : enum EndWatch {
450 : Commit(Receiver<Lsn>),
451 : Flush(Receiver<TermLsn>),
452 : }
453 :
454 : impl EndWatch {
455 : /// Get current end of WAL.
456 3108094 : fn get(&self) -> Lsn {
457 3108094 : match self {
458 3105845 : EndWatch::Commit(r) => *r.borrow(),
459 2249 : EndWatch::Flush(r) => r.borrow().lsn,
460 : }
461 3108094 : }
462 :
463 : /// Wait for the update.
464 1693603 : async fn changed(&mut self) -> anyhow::Result<()> {
465 1693603 : match self {
466 2166253 : EndWatch::Commit(r) => r.changed().await?,
467 40 : EndWatch::Flush(r) => r.changed().await?,
468 : }
469 1689348 : Ok(())
470 1689348 : }
471 : }
472 :
473 : /// A half driving sending WAL.
474 : struct WalSender<'a, IO> {
475 : pgb: &'a mut PostgresBackend<IO>,
476 : tli: Arc<Timeline>,
477 : appname: Option<String>,
478 : // Position since which we are sending next chunk.
479 : start_pos: Lsn,
480 : // WAL up to this position is known to be locally available.
481 : // Usually this is the same as the latest commit_lsn, but in case of
482 : // walproposer recovery, this is flush_lsn.
483 : //
484 : // We send this LSN to the receiver as wal_end, so that it knows how much
485 : // WAL this safekeeper has. This LSN should be as fresh as possible.
486 : end_pos: Lsn,
487 : /// When streaming uncommitted part, the term the client acts as the leader
488 : /// in. Streaming is stopped if local term changes to a different (higher)
489 : /// value.
490 : term: Option<Term>,
491 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
492 : end_watch: EndWatch,
493 : ws_guard: Arc<WalSenderGuard>,
494 : wal_reader: WalReader,
495 : // buffer for readling WAL into to send it
496 : send_buf: [u8; MAX_SEND_SIZE],
497 : }
498 :
499 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
500 : /// Send WAL until
501 : /// - an error occurs
502 : /// - receiver is caughtup and there is no computes (if streaming up to commit_lsn)
503 : ///
504 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
505 : /// convenience.
506 787 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
507 : loop {
508 : // Wait for the next portion if it is not there yet, or just
509 : // update our end of WAL available for sending value, we
510 : // communicate it to the receiver.
511 2166469 : self.wait_wal().await?;
512 754621 : assert!(
513 754621 : self.end_pos > self.start_pos,
514 0 : "nothing to send after waiting for WAL"
515 : );
516 :
517 : // try to send as much as available, capped by MAX_SEND_SIZE
518 754621 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
519 754621 : // if we went behind available WAL, back off
520 754621 : if chunk_end_pos >= self.end_pos {
521 674303 : chunk_end_pos = self.end_pos;
522 674303 : } else {
523 80318 : // If sending not up to end pos, round down to page boundary to
524 80318 : // avoid breaking WAL record not at page boundary, as protocol
525 80318 : // demands. See walsender.c (XLogSendPhysical).
526 80318 : chunk_end_pos = chunk_end_pos
527 80318 : .checked_sub(chunk_end_pos.block_offset())
528 80318 : .unwrap();
529 80318 : }
530 754621 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
531 754621 : let send_buf = &mut self.send_buf[..send_size];
532 : let send_size: usize;
533 : {
534 : // If uncommitted part is being pulled, check that the term is
535 : // still the expected one.
536 754621 : let _term_guard = if let Some(t) = self.term {
537 2174 : Some(self.tli.acquire_term(t).await?)
538 : } else {
539 752447 : None
540 : };
541 : // Read WAL into buffer. send_size can be additionally capped to
542 : // segment boundary here.
543 754620 : send_size = self.wal_reader.read(send_buf).await?
544 : };
545 754616 : let send_buf = &send_buf[..send_size];
546 754616 :
547 754616 : // and send it
548 754616 : self.pgb
549 754616 : .write_message(&BeMessage::XLogData(XLogDataBody {
550 754616 : wal_start: self.start_pos.0,
551 754616 : wal_end: self.end_pos.0,
552 754616 : timestamp: get_current_timestamp(),
553 754616 : data: send_buf,
554 754616 : }))
555 22115 : .await?;
556 :
557 754585 : if let Some(appname) = &self.appname {
558 752555 : if appname == "replica" {
559 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
560 752016 : }
561 2030 : }
562 0 : trace!(
563 0 : "sent {} bytes of WAL {}-{}",
564 0 : send_size,
565 0 : self.start_pos,
566 0 : self.start_pos + send_size as u64
567 0 : );
568 754585 : self.start_pos += send_size as u64;
569 : }
570 57 : }
571 :
572 : /// wait until we have WAL to stream, sending keepalives and checking for
573 : /// exit in the meanwhile
574 755372 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
575 : loop {
576 758876 : self.end_pos = self.end_watch.get();
577 758876 : if self.end_pos > self.start_pos {
578 : // We have something to send.
579 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
580 99793 : return Ok(());
581 659083 : }
582 :
583 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
584 2166293 : if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
585 654828 : self.end_pos = lsn;
586 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
587 654828 : return Ok(());
588 3548 : }
589 3548 :
590 3548 : // Timed out waiting for WAL, check for termination and send KA.
591 3548 : // Check for termination only if we are streaming up to commit_lsn
592 3548 : // (to pageserver).
593 3548 : if let EndWatch::Commit(_) = self.end_watch {
594 3547 : if let Some(remote_consistent_lsn) = self
595 3547 : .ws_guard
596 3547 : .walsenders
597 3547 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
598 : {
599 3547 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
600 : // Terminate if there is nothing more to send.
601 : // Note that "ending streaming" part of the string is used by
602 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
603 : // do not change this string without updating pageserver.
604 44 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
605 44 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
606 44 : self.appname, self.start_pos,
607 44 : )));
608 3503 : }
609 0 : }
610 1 : }
611 :
612 3504 : self.pgb
613 3504 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
614 3504 : wal_end: self.end_pos.0,
615 3504 : timestamp: get_current_timestamp(),
616 3504 : request_reply: true,
617 3504 : }))
618 0 : .await?;
619 : }
620 754665 : }
621 : }
622 :
623 : /// A half driving receiving replies.
624 : struct ReplyReader<IO> {
625 : reader: PostgresBackendReader<IO>,
626 : ws_guard: Arc<WalSenderGuard>,
627 : tli: Arc<Timeline>,
628 : }
629 :
630 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
631 787 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
632 : loop {
633 2806075 : let msg = self.reader.read_copy_message().await?;
634 753254 : self.handle_feedback(&msg).await?
635 : }
636 366 : }
637 :
638 753254 : async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
639 753254 : match msg.first().cloned() {
640 2 : Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
641 2 : // Note: deserializing is on m[1..] because we skip the tag byte.
642 2 : let hs_feedback = HotStandbyFeedback::des(&msg[1..])
643 2 : .context("failed to deserialize HotStandbyFeedback")?;
644 2 : self.ws_guard
645 2 : .walsenders
646 2 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
647 : }
648 1189 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
649 1189 : let reply =
650 1189 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
651 1189 : self.ws_guard
652 1189 : .walsenders
653 1189 : .record_standby_reply(self.ws_guard.id, &reply);
654 : }
655 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
656 : // pageserver sends this.
657 : // Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
658 752063 : let buf = Bytes::copy_from_slice(&msg[9..]);
659 752063 : let ps_feedback = PageserverFeedback::parse(buf);
660 :
661 0 : trace!("PageserverFeedback is {:?}", ps_feedback);
662 752063 : self.ws_guard
663 752063 : .walsenders
664 752063 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
665 752063 : self.tli
666 752063 : .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
667 127017 : .await;
668 : // in principle new remote_consistent_lsn could allow to
669 : // deactivate the timeline, but we check that regularly through
670 : // broker updated, not need to do it here
671 : }
672 0 : _ => warn!("unexpected message {:?}", msg),
673 : }
674 753254 : Ok(())
675 753254 : }
676 : }
677 :
678 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
679 :
680 : /// Wait until we have available WAL > start_pos or timeout expires. Returns
681 : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
682 : /// - Ok(None) if timeout expired;
683 : /// - Err in case of error -- only if 1) term changed while fetching in recovery
684 : /// mode 2) watch channel closed, which must never happen.
685 659083 : async fn wait_for_lsn(
686 659083 : rx: &mut EndWatch,
687 659083 : client_term: Option<Term>,
688 659083 : start_pos: Lsn,
689 659083 : ) -> anyhow::Result<Option<Lsn>> {
690 659083 : let res = timeout(POLL_STATE_TIMEOUT, async move {
691 : loop {
692 2348431 : let end_pos = rx.get();
693 2348431 : if end_pos > start_pos {
694 654828 : return Ok(end_pos);
695 1693603 : }
696 1693603 : if let EndWatch::Flush(rx) = rx {
697 36 : let curr_term = rx.borrow().term;
698 36 : if let Some(client_term) = client_term {
699 36 : if curr_term != client_term {
700 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
701 36 : }
702 0 : }
703 1693567 : }
704 2166293 : rx.changed().await?;
705 : }
706 659083 : })
707 2166293 : .await;
708 :
709 654828 : match res {
710 : // success
711 654828 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
712 : // error inside closure
713 0 : Ok(Err(err)) => Err(err),
714 : // timeout
715 3548 : Err(_) => Ok(None),
716 : }
717 658376 : }
718 :
719 : #[cfg(test)]
720 : mod tests {
721 : use postgres_protocol::PG_EPOCH;
722 : use utils::id::{TenantId, TimelineId};
723 :
724 : use super::*;
725 :
726 12 : fn mock_ttid() -> TenantTimelineId {
727 12 : TenantTimelineId {
728 12 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
729 12 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
730 12 : }
731 12 : }
732 :
733 12 : fn mock_addr() -> SocketAddr {
734 12 : "127.0.0.1:8080".parse().unwrap()
735 12 : }
736 :
737 : // add to wss specified feedback setting other fields to dummy values
738 12 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
739 12 : let walsender_state = WalSenderState {
740 12 : ttid: mock_ttid(),
741 12 : addr: mock_addr(),
742 12 : conn_id: 1,
743 12 : appname: None,
744 12 : feedback,
745 12 : };
746 12 : wss.slots.push(Some(walsender_state))
747 12 : }
748 :
749 : // form standby feedback with given hot standby feedback ts/xmin and the
750 : // rest set to dummy values.
751 8 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
752 8 : ReplicationFeedback::Standby(StandbyFeedback {
753 8 : reply: StandbyReply::empty(),
754 8 : hs_feedback: HotStandbyFeedback {
755 8 : ts,
756 8 : xmin,
757 8 : catalog_xmin: 0,
758 8 : },
759 8 : })
760 8 : }
761 :
762 : // test that hs aggregation works as expected
763 2 : #[test]
764 2 : fn test_hs_feedback_no_valid() {
765 2 : let mut wss = WalSendersShared::new();
766 2 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
767 2 : wss.update_hs_feedback();
768 2 : assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
769 2 : }
770 :
771 2 : #[test]
772 2 : fn test_hs_feedback() {
773 2 : let mut wss = WalSendersShared::new();
774 2 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
775 2 : push_feedback(&mut wss, hs_feedback(1, 42));
776 2 : push_feedback(&mut wss, hs_feedback(1, 64));
777 2 : wss.update_hs_feedback();
778 2 : assert_eq!(wss.agg_hs_feedback.xmin, 42);
779 2 : }
780 :
781 : // form pageserver feedback with given last_record_lsn / tli size and the
782 : // rest set to dummy values.
783 4 : fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
784 4 : ReplicationFeedback::Pageserver(PageserverFeedback {
785 4 : current_timeline_size,
786 4 : last_received_lsn,
787 4 : disk_consistent_lsn: Lsn::INVALID,
788 4 : remote_consistent_lsn: Lsn::INVALID,
789 4 : replytime: *PG_EPOCH,
790 4 : })
791 4 : }
792 :
793 : // test that ps aggregation works as expected
794 2 : #[test]
795 2 : fn test_ps_feedback() {
796 2 : let mut wss = WalSendersShared::new();
797 2 : push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
798 2 : push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
799 2 : wss.update_ps_feedback();
800 2 : assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
801 2 : assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
802 2 : }
803 : }
|