TLA Line data Source code
1 : //! This module implements the streaming side of the replication protocol,
2 : //! starting with the "START_REPLICATION" message, and the registry of walsenders.
3 :
4 : use crate::handler::SafekeeperPostgresHandler;
5 : use crate::safekeeper::{Term, TermLsn};
6 : use crate::timeline::Timeline;
7 : use crate::wal_service::ConnectionId;
8 : use crate::wal_storage::WalReader;
9 : use crate::GlobalTimelines;
10 : use anyhow::{bail, Context as AnyhowContext};
11 : use bytes::Bytes;
12 : use parking_lot::Mutex;
13 : use postgres_backend::PostgresBackend;
14 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
15 : use postgres_ffi::get_current_timestamp;
16 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
17 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
18 : use serde::{Deserialize, Serialize};
19 : use tokio::io::{AsyncRead, AsyncWrite};
20 : use utils::failpoint_support;
21 : use utils::id::TenantTimelineId;
22 : use utils::lsn::AtomicLsn;
23 : use utils::pageserver_feedback::PageserverFeedback;
24 :
25 : use std::cmp::{max, min};
26 : use std::net::SocketAddr;
27 : use std::str;
28 : use std::sync::Arc;
29 : use std::time::Duration;
30 : use tokio::sync::watch::Receiver;
31 : use tokio::time::timeout;
32 : use tracing::*;
33 : use utils::{bin_ser::BeSer, lsn::Lsn};
34 :
35 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
36 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
37 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
38 : // Neon extension of the replication protocol
39 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
40 :
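// Small illustrative sketch (a hypothetical helper, added here only to spell out
// the framing that ReplyReader::handle_feedback below relies on): each CopyData
// reply starts with one of the tag bytes above; 'h' and 'r' are followed directly
// by the BeSer-encoded struct, while the Neon 'z' message carries 8 extra bytes
// before the PageserverFeedback payload.
#[allow(dead_code)]
fn reply_tag_name(tag: u8) -> &'static str {
    match tag {
        HOT_STANDBY_FEEDBACK_TAG_BYTE => "HotStandbyFeedback",
        STANDBY_STATUS_UPDATE_TAG_BYTE => "StandbyReply",
        NEON_STATUS_UPDATE_TAG_BYTE => "PageserverFeedback",
        _ => "unknown",
    }
}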
41 : type FullTransactionId = u64;
42 :
43 : /// Hot standby feedback received from replica
44 CBC 3 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
45 : pub struct HotStandbyFeedback {
46 : pub ts: TimestampTz,
47 : pub xmin: FullTransactionId,
48 : pub catalog_xmin: FullTransactionId,
49 : }
50 :
51 : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
52 :
53 : impl HotStandbyFeedback {
54 1295244 : pub fn empty() -> HotStandbyFeedback {
55 1295244 : HotStandbyFeedback {
56 1295244 : ts: 0,
57 1295244 : xmin: 0,
58 1295244 : catalog_xmin: 0,
59 1295244 : }
60 1295244 : }
61 : }
62 :
63 : /// Standby status update
64 1295 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
65 : pub struct StandbyReply {
66 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
67 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
68 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
69 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
70 : pub reply_requested: bool,
71 : }
72 :
73 : impl StandbyReply {
74 4 : fn empty() -> Self {
75 4 : StandbyReply {
76 4 : write_lsn: Lsn::INVALID,
77 4 : flush_lsn: Lsn::INVALID,
78 4 : apply_lsn: Lsn::INVALID,
79 4 : reply_ts: 0,
80 4 : reply_requested: false,
81 4 : }
82 4 : }
83 : }
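// Hedged helper sketch (a hypothetical addition, not used elsewhere in this
// file): reply_ts is microseconds since the PostgreSQL epoch (2000-01-01),
// which is 946_684_800 s after the Unix epoch, so converting it to a
// SystemTime is a fixed offset.
#[allow(dead_code)]
fn standby_reply_time(reply: &StandbyReply) -> std::time::SystemTime {
    let pg_epoch = std::time::UNIX_EPOCH + std::time::Duration::from_secs(946_684_800);
    pg_epoch + std::time::Duration::from_micros(reply.reply_ts.max(0) as u64)
}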
84 :
85 LBC (2) : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
86 : pub struct StandbyFeedback {
87 : reply: StandbyReply,
88 : hs_feedback: HotStandbyFeedback,
89 : }
90 :
91 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
92 : pub struct WalSenders {
93 : /// Lsn maximized over all walsenders *and* peer data, so might be higher
94 : /// than what we receive from replicas.
95 : remote_consistent_lsn: AtomicLsn,
96 : mutex: Mutex<WalSendersShared>,
97 : }
98 :
99 : impl WalSenders {
100 CBC 582 : pub fn new(remote_consistent_lsn: Lsn) -> Arc<WalSenders> {
101 582 : Arc::new(WalSenders {
102 582 : remote_consistent_lsn: AtomicLsn::from(remote_consistent_lsn),
103 582 : mutex: Mutex::new(WalSendersShared::new()),
104 582 : })
105 582 : }
106 :
107 : /// Register new walsender. Returned guard provides access to the slot and
108 : /// automatically deregisters in Drop.
109 734 : fn register(
110 734 : self: &Arc<WalSenders>,
111 734 : ttid: TenantTimelineId,
112 734 : addr: SocketAddr,
113 734 : conn_id: ConnectionId,
114 734 : appname: Option<String>,
115 734 : ) -> WalSenderGuard {
116 734 : let slots = &mut self.mutex.lock().slots;
117 734 : let walsender_state = WalSenderState {
118 734 : ttid,
119 734 : addr,
120 734 : conn_id,
121 734 : appname,
122 734 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
123 734 : };
124 : // find empty slot or create new one
125 734 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
126 256 : slots[pos] = Some(walsender_state);
127 256 : pos
128 : } else {
129 478 : let pos = slots.len();
130 478 : slots.push(Some(walsender_state));
131 478 : pos
132 : };
133 734 : WalSenderGuard {
134 734 : id: pos,
135 734 : walsenders: self.clone(),
136 734 : }
137 734 : }
138 :
139 : /// Get state of all walsenders.
140 252 : pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
141 252 : self.mutex.lock().slots.iter().flatten().cloned().collect()
142 252 : }
143 :
144 : /// Get aggregated pageserver feedback.
145 51 : pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
146 51 : self.mutex.lock().agg_ps_feedback
147 51 : }
148 :
149 : /// Get aggregated pageserver and hot standby feedback (we send them to compute).
150 1294232 : pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, HotStandbyFeedback) {
151 1294232 : let shared = self.mutex.lock();
152 1294232 : (shared.agg_ps_feedback, shared.agg_hs_feedback)
153 1294232 : }
154 :
155 : /// Record new pageserver feedback, update aggregated values.
156 565811 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
157 565811 : let mut shared = self.mutex.lock();
158 565811 : shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
159 565811 : shared.update_ps_feedback();
160 565811 : self.update_remote_consistent_lsn(shared.agg_ps_feedback.remote_consistent_lsn);
161 565811 : }
162 :
163 : /// Record standby reply.
164 1295 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
165 1295 : let mut shared = self.mutex.lock();
166 1295 : let slot = shared.get_slot_mut(id);
167 1295 : match &mut slot.feedback {
168 1292 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
169 : ReplicationFeedback::Pageserver(_) => {
170 3 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
171 3 : reply: *reply,
172 3 : hs_feedback: HotStandbyFeedback::empty(),
173 3 : })
174 : }
175 : }
176 1295 : }
177 :
178 : /// Record hot standby feedback, update aggregated value.
179 2 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
180 2 : let mut shared = self.mutex.lock();
181 2 : let slot = shared.get_slot_mut(id);
182 2 : match &mut slot.feedback {
183 2 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
184 : ReplicationFeedback::Pageserver(_) => {
185 UBC 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
186 0 : reply: StandbyReply::empty(),
187 0 : hs_feedback: *feedback,
188 0 : })
189 : }
190 : }
191 CBC 2 : shared.update_hs_feedback();
192 2 : }
193 :
194 : /// Get remote_consistent_lsn reported by the pageserver. Returns None if
195 : /// the client is not a pageserver.
196 3461 : fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
197 3461 : let shared = self.mutex.lock();
198 3461 : let slot = shared.get_slot(id);
199 3461 : match slot.feedback {
200 3461 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
201 UBC 0 : _ => None,
202 : }
203 CBC 3461 : }
204 :
205 : /// Get remote_consistent_lsn maximized across all walsenders and peers.
206 26716 : pub fn get_remote_consistent_lsn(self: &Arc<WalSenders>) -> Lsn {
207 26716 : self.remote_consistent_lsn.load()
208 26716 : }
209 :
210 : /// Update the maximized remote_consistent_lsn and return the resulting (possibly unchanged) value.
211 577771 : pub fn update_remote_consistent_lsn(self: &Arc<WalSenders>, candidate: Lsn) -> Lsn {
212 577771 : self.remote_consistent_lsn
213 577771 : .fetch_max(candidate)
214 577771 : .max(candidate)
215 577771 : }
216 :
217 : /// Unregister walsender.
218 418 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
219 418 : let mut shared = self.mutex.lock();
220 418 : shared.slots[id] = None;
221 418 : shared.update_hs_feedback();
222 418 : }
223 : }
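// Illustrative sketch with hypothetical values (not part of the original file):
// update_remote_consistent_lsn only ever moves the maximized LSN forward, so a
// stale reply carrying a smaller LSN cannot make it go backwards.
#[allow(dead_code)]
fn remote_consistent_lsn_is_monotonic(walsenders: &Arc<WalSenders>) {
    let after_new = walsenders.update_remote_consistent_lsn(Lsn(0x2000));
    let after_stale = walsenders.update_remote_consistent_lsn(Lsn(0x1000)); // stale candidate, ignored
    assert!(after_stale >= after_new);
}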
224 :
225 : struct WalSendersShared {
226 : // value aggregated over all walsenders
227 : agg_hs_feedback: HotStandbyFeedback,
228 : // value aggregated over all walsenders
229 : agg_ps_feedback: PageserverFeedback,
230 : slots: Vec<Option<WalSenderState>>,
231 : }
232 :
233 : impl WalSendersShared {
234 585 : fn new() -> Self {
235 585 : WalSendersShared {
236 585 : agg_hs_feedback: HotStandbyFeedback::empty(),
237 585 : agg_ps_feedback: PageserverFeedback::empty(),
238 585 : slots: Vec::new(),
239 585 : }
240 585 : }
241 :
242 : /// Get the contents of the slot with the provided id; it must exist.
243 3461 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
244 3461 : self.slots[id].as_ref().expect("walsender doesn't exist")
245 3461 : }
246 :
247 : /// Get mutable contents of the slot with the provided id; it must exist.
248 567108 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
249 567108 : self.slots[id].as_mut().expect("walsender doesn't exist")
250 567108 : }
251 :
252 : /// Update aggregated hot standby feedback. We just take the min of valid xmins
253 : /// and ts.
254 422 : fn update_hs_feedback(&mut self) {
255 422 : let mut agg = HotStandbyFeedback::empty();
256 422 : for ws_state in self.slots.iter().flatten() {
257 123 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
258 6 : let hs_feedback = standby_feedback.hs_feedback;
259 6 : // doing Option math like op1.iter().chain(op2.iter()).min()
260 6 : // would be nicer, but we serialize/deserialize this struct
261 6 : // directly, so leave as is for now
262 6 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
263 2 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
264 1 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
265 1 : } else {
266 1 : agg.xmin = hs_feedback.xmin;
267 1 : }
268 2 : agg.ts = min(agg.ts, hs_feedback.ts);
269 4 : }
270 6 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
271 UBC 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
272 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
273 0 : } else {
274 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
275 0 : }
276 0 : agg.ts = min(agg.ts, hs_feedback.ts);
277 CBC 6 : }
278 117 : }
279 : }
280 422 : self.agg_hs_feedback = agg;
281 422 : }
282 :
283 : /// Update aggregated pageserver feedback. LSNs (last_received,
284 : /// disk_consistent, remote_consistent) and reply timestamp are just
285 : /// maximized; timeline_size is taken from the feedback with the highest
286 : /// last_received lsn. This is generally reasonable, but we might want to
287 : /// implement other policies once multiple pageservers start to be actively
288 : /// used.
289 565812 : fn update_ps_feedback(&mut self) {
290 565812 : let init = PageserverFeedback::empty();
291 565812 : let acc =
292 565812 : self.slots
293 565812 : .iter()
294 565812 : .flatten()
295 570374 : .fold(init, |mut acc, ws_state| match ws_state.feedback {
296 569918 : ReplicationFeedback::Pageserver(feedback) => {
297 569918 : if feedback.last_received_lsn > acc.last_received_lsn {
298 566715 : acc.current_timeline_size = feedback.current_timeline_size;
299 566715 : }
300 569918 : acc.last_received_lsn =
301 569918 : max(feedback.last_received_lsn, acc.last_received_lsn);
302 569918 : acc.disk_consistent_lsn =
303 569918 : max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
304 569918 : acc.remote_consistent_lsn =
305 569918 : max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
306 569918 : acc.replytime = max(feedback.replytime, acc.replytime);
307 569918 : acc
308 : }
309 456 : ReplicationFeedback::Standby(_) => acc,
310 570374 : });
311 565812 : self.agg_ps_feedback = acc;
312 565812 : }
313 : }
314 :
315 : // Serialization is used only for pretty-printing in JSON.
316 92 : #[derive(Debug, Clone, Serialize, Deserialize)]
317 : pub struct WalSenderState {
318 : ttid: TenantTimelineId,
319 : addr: SocketAddr,
320 : conn_id: ConnectionId,
321 : // postgres application_name
322 : appname: Option<String>,
323 : feedback: ReplicationFeedback,
324 : }
325 :
326 : // The receiver is either a pageserver or a regular standby; they send
327 : // different kinds of feedback.
328 92 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
329 : enum ReplicationFeedback {
330 : Pageserver(PageserverFeedback),
331 : Standby(StandbyFeedback),
332 : }
333 :
334 : // Id of the occupied slot in WalSenders, used to access it (and saved in the
335 : // WalSenderGuard). We could hand out an Arc to the slot directly, but there is
336 : // little point: the aggregation performed on every feedback received iterates
337 : // over all walsenders anyway.
338 : pub type WalSenderId = usize;
339 :
340 : /// Scope guard to access slot in WalSenders registry and unregister from it in
341 : /// Drop.
342 : pub struct WalSenderGuard {
343 : id: WalSenderId,
344 : walsenders: Arc<WalSenders>,
345 : }
346 :
347 : impl Drop for WalSenderGuard {
348 418 : fn drop(&mut self) {
349 418 : self.walsenders.unregister(self.id);
350 418 : }
351 : }
352 :
353 : impl SafekeeperPostgresHandler {
354 : /// Wrapper around handle_start_replication_guts that handles its result. The error is
355 : /// handled here while we're still in the walsender ttid span; with an API
356 : /// extension, this can probably be moved into postgres_backend.
357 734 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
358 734 : &mut self,
359 734 : pgb: &mut PostgresBackend<IO>,
360 734 : start_pos: Lsn,
361 734 : term: Option<Term>,
362 734 : ) -> Result<(), QueryError> {
363 734 : if let Err(end) = self
364 734 : .handle_start_replication_guts(pgb, start_pos, term)
365 2111087 : .await
366 : {
367 : // Log the result and probably send it to the client, closing the stream.
368 418 : pgb.handle_copy_stream_end(end).await;
369 UBC 0 : }
370 CBC 418 : Ok(())
371 418 : }
372 :
373 734 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
374 734 : &mut self,
375 734 : pgb: &mut PostgresBackend<IO>,
376 734 : start_pos: Lsn,
377 734 : term: Option<Term>,
378 734 : ) -> Result<(), CopyStreamHandlerEnd> {
379 734 : let appname = self.appname.clone();
380 734 : let tli =
381 734 : GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
382 :
383 : // Use a guard object to remove our entry from the timeline when we are done.
384 734 : let ws_guard = Arc::new(tli.get_walsenders().register(
385 734 : self.ttid,
386 734 : *pgb.get_peer_addr(),
387 734 : self.conn_id,
388 734 : self.appname.clone(),
389 734 : ));
390 :
391 : // The walsender can operate in one of two modes, selected by
392 : // application_name: stream only committed WAL (used by the pageserver) or all
393 : // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
394 : // The second case is always driven by a consensus leader, whose term
395 : // must be supplied.
396 734 : let end_watch = if term.is_some() {
397 16 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
398 : } else {
399 718 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
400 : };
401 : // we don't check term here; it will be checked on first waiting/WAL reading anyway.
402 734 : let end_pos = end_watch.get();
403 734 :
404 734 : if end_pos < start_pos {
405 18 : warn!(
406 18 : "requested start_pos {} is ahead of available WAL end_pos {}",
407 18 : start_pos, end_pos
408 18 : );
409 716 : }
410 :
411 734 : info!(
412 734 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
413 734 : start_pos,
414 734 : end_pos,
415 734 : matches!(end_watch, EndWatch::Flush(_)),
416 734 : appname
417 734 : );
418 :
419 : // switch to copy
420 734 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
421 :
422 734 : let (_, persisted_state) = tli.get_state().await;
423 734 : let wal_reader = WalReader::new(
424 734 : self.conf.workdir.clone(),
425 734 : self.conf.timeline_dir(&tli.ttid),
426 734 : &persisted_state,
427 734 : start_pos,
428 734 : self.conf.wal_backup_enabled,
429 734 : )?;
430 :
431 : // Split to concurrently receive and send data; replies are generally
432 : // not synchronized with sends, so this avoids deadlocks.
433 734 : let reader = pgb.split().context("START_REPLICATION split")?;
434 :
435 734 : let mut sender = WalSender {
436 734 : pgb,
437 734 : tli: tli.clone(),
438 734 : appname,
439 734 : start_pos,
440 734 : end_pos,
441 734 : term,
442 734 : end_watch,
443 734 : ws_guard: ws_guard.clone(),
444 734 : wal_reader,
445 734 : send_buf: [0; MAX_SEND_SIZE],
446 734 : };
447 734 : let mut reply_reader = ReplyReader { reader, ws_guard };
448 :
449 734 : let res = tokio::select! {
450 : // todo: add read|write .context to these errors
451 68 : r = sender.run() => r,
452 350 : r = reply_reader.run() => r,
453 : };
454 : // Join pg backend back.
455 418 : pgb.unsplit(reply_reader.reader)?;
456 :
457 418 : res
458 418 : }
459 : }
460 :
461 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
462 : /// given term (recovery by walproposer or peer safekeeper).
463 : enum EndWatch {
464 : Commit(Receiver<Lsn>),
465 : Flush(Receiver<TermLsn>),
466 : }
467 :
468 : impl EndWatch {
469 : /// Get current end of WAL.
470 2261841 : fn get(&self) -> Lsn {
471 2261841 : match self {
472 2259871 : EndWatch::Commit(r) => *r.borrow(),
473 1970 : EndWatch::Flush(r) => r.borrow().lsn,
474 : }
475 2261841 : }
476 :
477 : /// Wait for the update.
478 1202617 : async fn changed(&mut self) -> anyhow::Result<()> {
479 1202617 : match self {
480 1531042 : EndWatch::Commit(r) => r.changed().await?,
481 43 : EndWatch::Flush(r) => r.changed().await?,
482 : }
483 1198512 : Ok(())
484 1198512 : }
485 : }
486 :
487 : /// The half of the split connection that drives sending WAL.
488 : struct WalSender<'a, IO> {
489 : pgb: &'a mut PostgresBackend<IO>,
490 : tli: Arc<Timeline>,
491 : appname: Option<String>,
492 : // Position from which we are sending the next chunk.
493 : start_pos: Lsn,
494 : // WAL up to this position is known to be locally available.
495 : // Usually this is the same as the latest commit_lsn, but in case of
496 : // walproposer recovery, this is flush_lsn.
497 : //
498 : // We send this LSN to the receiver as wal_end, so that it knows how much
499 : // WAL this safekeeper has. This LSN should be as fresh as possible.
500 : end_pos: Lsn,
501 : /// When streaming the uncommitted part, the term in which the client acts as
502 : /// the leader. Streaming is stopped if the local term changes to a different
503 : /// (higher) value.
504 : term: Option<Term>,
505 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
506 : end_watch: EndWatch,
507 : ws_guard: Arc<WalSenderGuard>,
508 : wal_reader: WalReader,
509 : // buffer for reading WAL into before sending it
510 : send_buf: [u8; MAX_SEND_SIZE],
511 : }
512 :
513 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
514 : /// Send WAL until
515 : /// - an error occurs
516 : /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
517 : ///
518 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
519 : /// convenience.
520 734 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
521 : loop {
522 : // Wait for the next portion if it is not there yet, or just
523 : // update our end of available-to-send WAL, which we
524 : // communicate to the receiver.
525 1531100 : self.wait_wal().await?;
526 : assert!(
527 568379 : self.end_pos > self.start_pos,
528 UBC 0 : "nothing to send after waiting for WAL"
529 : );
530 :
531 : // try to send as much as available, capped by MAX_SEND_SIZE
532 CBC 568379 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
533 568379 : // if we went beyond the available WAL, back off
534 568379 : if chunk_end_pos >= self.end_pos {
535 502419 : chunk_end_pos = self.end_pos;
536 502419 : } else {
537 65960 : // If not sending up to end_pos, round down to a page boundary to
538 65960 : // avoid breaking a WAL record anywhere other than at a page boundary,
539 65960 : // as the protocol demands. See walsender.c (XLogSendPhysical).
540 65960 : chunk_end_pos = chunk_end_pos
541 65960 : .checked_sub(chunk_end_pos.block_offset())
542 65960 : .unwrap();
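// Worked example with assumed numbers (MAX_SEND_SIZE = 128 KiB, 8 KiB WAL
// blocks): start_pos = 0/1000100 gives chunk_end_pos = 0/1020100; if end_pos
// is further ahead, block_offset() is 0x100, so the chunk is rounded down to
// 0/1020000, exactly on a page boundary.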
543 65960 : }
544 568379 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
545 568379 : let send_buf = &mut self.send_buf[..send_size];
546 : let send_size: usize;
547 : {
548 : // If uncommitted part is being pulled, check that the term is
549 : // still the expected one.
550 568379 : let _term_guard = if let Some(t) = self.term {
551 1905 : Some(self.tli.acquire_term(t).await?)
552 : } else {
553 566474 : None
554 : };
555 : // Read WAL into buffer. send_size can be additionally capped to
556 : // segment boundary here.
557 568378 : send_size = self.wal_reader.read(send_buf).await?
558 : };
559 568377 : let send_buf = &send_buf[..send_size];
560 568377 :
561 568377 : // and send it
562 568377 : self.pgb
563 568377 : .write_message(&BeMessage::XLogData(XLogDataBody {
564 568377 : wal_start: self.start_pos.0,
565 568377 : wal_end: self.end_pos.0,
566 568377 : timestamp: get_current_timestamp(),
567 568377 : data: send_buf,
568 568377 : }))
569 17151 : .await?;
570 :
571 568336 : if let Some(appname) = &self.appname {
572 566572 : if appname == "replica" {
573 UBC 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
574 CBC 566008 : }
575 1764 : }
576 UBC 0 : trace!(
577 0 : "sent {} bytes of WAL {}-{}",
578 0 : send_size,
579 0 : self.start_pos,
580 0 : self.start_pos + send_size as u64
581 0 : );
582 CBC 568336 : self.start_pos += send_size as u64;
583 : }
584 68 : }
585 :
586 : /// Wait until we have WAL to stream, sending keepalives and checking for
587 : /// exit in the meantime.
588 569070 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
589 : loop {
590 572484 : self.end_pos = self.end_watch.get();
591 572484 : if self.end_pos > self.start_pos {
592 : // We have something to send.
593 UBC 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
594 CBC 82373 : return Ok(());
595 490111 : }
596 :
597 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
598 1531085 : if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
599 486006 : self.end_pos = lsn;
600 UBC 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
601 CBC 486006 : return Ok(());
602 3462 : }
603 3462 :
604 3462 : // Timed out waiting for WAL, check for termination and send KA.
605 3462 : // Check for termination only if we are streaming up to commit_lsn
606 3462 : // (to pageserver).
607 3462 : if let EndWatch::Commit(_) = self.end_watch {
608 3461 : if let Some(remote_consistent_lsn) = self
609 3461 : .ws_guard
610 3461 : .walsenders
611 3461 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
612 : {
613 3461 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
614 : // Terminate if there is nothing more to send.
615 : // Note that "ending streaming" part of the string is used by
616 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
617 : // do not change this string without updating pageserver.
618 48 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
619 48 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
620 48 : self.appname, self.start_pos,
621 48 : )));
622 3413 : }
623 UBC 0 : }
624 CBC 1 : }
625 :
626 3414 : self.pgb
627 3414 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
628 3414 : wal_end: self.end_pos.0,
629 3414 : timestamp: get_current_timestamp(),
630 3414 : request_reply: true,
631 3414 : }))
632 UBC 0 : .await?;
633 : }
634 CBC 568427 : }
635 : }
636 :
637 : /// A half driving receiving replies.
638 : /// The half of the split connection that drives receiving replies.
639 : reader: PostgresBackendReader<IO>,
640 : ws_guard: Arc<WalSenderGuard>,
641 : }
642 :
643 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
644 734 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
645 : loop {
646 2110936 : let msg = self.reader.read_copy_message().await?;
647 567108 : self.handle_feedback(&msg)?
648 : }
649 350 : }
650 :
651 567108 : fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
652 567108 : match msg.first().cloned() {
653 2 : // Note: deserializing is on msg[1..] because we skip the tag byte.
654 2 : // Note: deserializing is on m[1..] because we skip the tag byte.
655 2 : let hs_feedback = HotStandbyFeedback::des(&msg[1..])
656 2 : .context("failed to deserialize HotStandbyFeedback")?;
657 2 : self.ws_guard
658 2 : .walsenders
659 2 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
660 : }
661 1295 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
662 1295 : let reply =
663 1295 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
664 1295 : self.ws_guard
665 1295 : .walsenders
666 1295 : .record_standby_reply(self.ws_guard.id, &reply);
667 : }
668 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
669 : // pageserver sends this.
670 : // Note: deserializing is on msg[9..] because we skip the tag byte and len bytes.
671 565811 : let buf = Bytes::copy_from_slice(&msg[9..]);
672 565811 : let ps_feedback = PageserverFeedback::parse(buf);
673 565811 :
674 565811 : trace!("PageserverFeedback is {:?}", ps_feedback);
675 565811 : self.ws_guard
676 565811 : .walsenders
677 565811 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
678 : // In principle a new remote_consistent_lsn could allow deactivating
679 : // the timeline, but we check that regularly through
680 : // broker updates; no need to do it here.
681 : }
682 UBC 0 : _ => warn!("unexpected message {:?}", msg),
683 : }
684 CBC 567108 : Ok(())
685 567108 : }
686 : }
687 :
688 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
689 :
690 : /// Wait until we have available WAL > start_pos or timeout expires. Returns
691 : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
692 : /// - Ok(None) if timeout expired;
693 : /// - Err in case of error -- only if 1) term changed while fetching in recovery
694 : /// mode 2) watch channel closed, which must never happen.
695 490111 : async fn wait_for_lsn(
696 490111 : rx: &mut EndWatch,
697 490111 : client_term: Option<Term>,
698 490111 : start_pos: Lsn,
699 490111 : ) -> anyhow::Result<Option<Lsn>> {
700 490111 : let res = timeout(POLL_STATE_TIMEOUT, async move {
701 : loop {
702 1688623 : let end_pos = rx.get();
703 1688623 : if end_pos > start_pos {
704 486006 : return Ok(end_pos);
705 1202617 : }
706 1202617 : if let EndWatch::Flush(rx) = rx {
707 33 : let curr_term = rx.borrow().term;
708 33 : if let Some(client_term) = client_term {
709 33 : if curr_term != client_term {
710 UBC 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
711 CBC 33 : }
712 UBC 0 : }
713 CBC 1202584 : }
714 1531085 : rx.changed().await?;
715 : }
716 490111 : })
717 1531085 : .await;
718 :
719 486006 : match res {
720 : // success
721 486006 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
722 : // error inside closure
723 UBC 0 : Ok(Err(err)) => Err(err),
724 : // timeout
725 CBC 3462 : Err(_) => Ok(None),
726 : }
727 489468 : }
728 :
729 : #[cfg(test)]
730 : mod tests {
731 : use postgres_protocol::PG_EPOCH;
732 : use utils::id::{TenantId, TimelineId};
733 :
734 : use super::*;
735 :
736 6 : fn mock_ttid() -> TenantTimelineId {
737 6 : TenantTimelineId {
738 6 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
739 6 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
740 6 : }
741 6 : }
742 :
743 6 : fn mock_addr() -> SocketAddr {
744 6 : "127.0.0.1:8080".parse().unwrap()
745 6 : }
746 :
747 : // add the specified feedback to wss, setting other fields to dummy values
748 6 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
749 6 : let walsender_state = WalSenderState {
750 6 : ttid: mock_ttid(),
751 6 : addr: mock_addr(),
752 6 : conn_id: 1,
753 6 : appname: None,
754 6 : feedback,
755 6 : };
756 6 : wss.slots.push(Some(walsender_state))
757 6 : }
758 :
759 : // form standby feedback with given hot standby feedback ts/xmin and the
760 : // rest set to dummy values.
761 4 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
762 4 : ReplicationFeedback::Standby(StandbyFeedback {
763 4 : reply: StandbyReply::empty(),
764 4 : hs_feedback: HotStandbyFeedback {
765 4 : ts,
766 4 : xmin,
767 4 : catalog_xmin: 0,
768 4 : },
769 4 : })
770 4 : }
771 :
772 : // test that hs aggregation works as expected
773 1 : #[test]
774 1 : fn test_hs_feedback_no_valid() {
775 1 : let mut wss = WalSendersShared::new();
776 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
777 1 : wss.update_hs_feedback();
778 1 : assert_eq!(wss.agg_hs_feedback.xmin, INVALID_FULL_TRANSACTION_ID);
779 1 : }
780 :
781 1 : #[test]
782 1 : fn test_hs_feedback() {
783 1 : let mut wss = WalSendersShared::new();
784 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
785 1 : push_feedback(&mut wss, hs_feedback(1, 42));
786 1 : push_feedback(&mut wss, hs_feedback(1, 64));
787 1 : wss.update_hs_feedback();
788 1 : assert_eq!(wss.agg_hs_feedback.xmin, 42);
789 1 : }
790 :
791 : // form pageserver feedback with given last_received_lsn / timeline size and the
792 : // rest set to dummy values.
793 2 : fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
794 2 : ReplicationFeedback::Pageserver(PageserverFeedback {
795 2 : current_timeline_size,
796 2 : last_received_lsn,
797 2 : disk_consistent_lsn: Lsn::INVALID,
798 2 : remote_consistent_lsn: Lsn::INVALID,
799 2 : replytime: *PG_EPOCH,
800 2 : })
801 2 : }
802 :
803 : // test that ps aggregation works as expected
804 1 : #[test]
805 1 : fn test_ps_feedback() {
806 1 : let mut wss = WalSendersShared::new();
807 1 : push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
808 1 : push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
809 1 : wss.update_ps_feedback();
810 1 : assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
811 1 : assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
812 1 : }
813 : }
|