Line data Source code
1 : //! This module implements the streaming side of the replication protocol, starting
2 : //! with the "START_REPLICATION" message, and the walsenders registry.
3 :
4 : use crate::handler::SafekeeperPostgresHandler;
5 : use crate::safekeeper::{Term, TermLsn};
6 : use crate::timeline::Timeline;
7 : use crate::wal_service::ConnectionId;
8 : use crate::wal_storage::WalReader;
9 : use crate::GlobalTimelines;
10 : use anyhow::{bail, Context as AnyhowContext};
11 : use bytes::Bytes;
12 : use parking_lot::Mutex;
13 : use postgres_backend::PostgresBackend;
14 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
15 : use postgres_ffi::get_current_timestamp;
16 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
17 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
18 : use serde::{Deserialize, Serialize};
19 : use tokio::io::{AsyncRead, AsyncWrite};
20 : use utils::failpoint_support;
21 : use utils::id::TenantTimelineId;
22 : use utils::pageserver_feedback::PageserverFeedback;
23 :
24 : use std::cmp::{max, min};
25 : use std::net::SocketAddr;
26 : use std::str;
27 : use std::sync::Arc;
28 : use std::time::Duration;
29 : use tokio::sync::watch::Receiver;
30 : use tokio::time::timeout;
31 : use tracing::*;
32 : use utils::{bin_ser::BeSer, lsn::Lsn};
33 :
34 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
35 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
36 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
37 : // neon extension of replication protocol
38 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
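A rough sketch of how these tags appear on the wire (illustrative only; it assumes the BeSer trait used below also provides a ser() counterpart to des()): a standby status update is the tag byte followed directly by the big-endian-serialized StandbyReply, while the 'z' message carries an extra 8-byte length field before the PageserverFeedback payload, which is why handle_feedback() below skips 9 bytes for it.

fn frame_standby_reply(reply: &StandbyReply) -> anyhow::Result<Vec<u8>> {
    // ser() is assumed to be BeSer's serializing counterpart of the des() used below.
    let mut msg = vec![STANDBY_STATUS_UPDATE_TAG_BYTE]; // b'r'
    msg.extend_from_slice(&reply.ser()?);
    Ok(msg)
}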
39 :
40 : type FullTransactionId = u64;
41 :
42 : /// Hot standby feedback received from replica
43 21 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
44 : pub struct HotStandbyFeedback {
45 : pub ts: TimestampTz,
46 : pub xmin: FullTransactionId,
47 : pub catalog_xmin: FullTransactionId,
48 : }
49 :
50 : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
51 :
52 : impl HotStandbyFeedback {
53 1644387 : pub fn empty() -> HotStandbyFeedback {
54 1644387 : HotStandbyFeedback {
55 1644387 : ts: 0,
56 1644387 : xmin: 0,
57 1644387 : catalog_xmin: 0,
58 1644387 : }
59 1644387 : }
60 : }
61 :
62 : /// Standby status update
63 49159 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
64 : pub struct StandbyReply {
65 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
66 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
67 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
68 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
69 : pub reply_requested: bool,
70 : }
71 :
72 : impl StandbyReply {
73 1068 : fn empty() -> Self {
74 1068 : StandbyReply {
75 1068 : write_lsn: Lsn::INVALID,
76 1068 : flush_lsn: Lsn::INVALID,
77 1068 : apply_lsn: Lsn::INVALID,
78 1068 : reply_ts: 0,
79 1068 : reply_requested: false,
80 1068 : }
81 1068 : }
82 : }
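Since reply_ts counts microseconds from the PostgreSQL epoch (2000-01-01), here is a small hedged sketch of converting it to a SystemTime, assuming postgres_protocol::PG_EPOCH (already used by the tests below) is that epoch as a SystemTime:

use std::time::{Duration, SystemTime};
use postgres_protocol::PG_EPOCH;

fn pg_timestamp_to_system_time(ts: TimestampTz) -> SystemTime {
    // TimestampTz here is microseconds since 2000-01-01 00:00:00 UTC
    // (assuming a non-negative timestamp).
    *PG_EPOCH + Duration::from_micros(ts as u64)
}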
83 :
84 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
85 : pub struct StandbyFeedback {
86 : pub reply: StandbyReply,
87 : pub hs_feedback: HotStandbyFeedback,
88 : }
89 :
90 : impl StandbyFeedback {
91 622 : pub fn empty() -> Self {
92 622 : StandbyFeedback {
93 622 : reply: StandbyReply::empty(),
94 622 : hs_feedback: HotStandbyFeedback::empty(),
95 622 : }
96 622 : }
97 : }
98 :
99 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
100 : pub struct WalSenders {
101 : mutex: Mutex<WalSendersShared>,
102 : }
103 :
104 : impl WalSenders {
105 616 : pub fn new() -> Arc<WalSenders> {
106 616 : Arc::new(WalSenders {
107 616 : mutex: Mutex::new(WalSendersShared::new()),
108 616 : })
109 616 : }
110 :
111 : /// Register new walsender. Returned guard provides access to the slot and
112 : /// automatically deregisters in Drop.
113 768 : fn register(
114 768 : self: &Arc<WalSenders>,
115 768 : ttid: TenantTimelineId,
116 768 : addr: SocketAddr,
117 768 : conn_id: ConnectionId,
118 768 : appname: Option<String>,
119 768 : ) -> WalSenderGuard {
120 768 : let slots = &mut self.mutex.lock().slots;
121 768 : let walsender_state = WalSenderState {
122 768 : ttid,
123 768 : addr,
124 768 : conn_id,
125 768 : appname,
126 768 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
127 768 : };
128 : // find empty slot or create new one
129 768 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
130 248 : slots[pos] = Some(walsender_state);
131 248 : pos
132 : } else {
133 520 : let pos = slots.len();
134 520 : slots.push(Some(walsender_state));
135 520 : pos
136 : };
137 768 : WalSenderGuard {
138 768 : id: pos,
139 768 : walsenders: self.clone(),
140 768 : }
141 768 : }
142 :
143 : /// Get state of all walsenders.
144 257 : pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
145 257 : self.mutex.lock().slots.iter().flatten().cloned().collect()
146 257 : }
147 :
148 : /// Get aggregated pageserver feedback.
149 51 : pub fn get_ps_feedback(self: &Arc<WalSenders>) -> PageserverFeedback {
150 51 : self.mutex.lock().agg_ps_feedback
151 51 : }
152 :
153 : /// Get aggregated pageserver and hot standby feedback (we send them to compute).
154 1643317 : pub fn get_feedbacks(self: &Arc<WalSenders>) -> (PageserverFeedback, StandbyFeedback) {
155 1643317 : let shared = self.mutex.lock();
156 1643317 : (shared.agg_ps_feedback, shared.agg_standby_feedback)
157 1643317 : }
158 :
159 : /// Record new pageserver feedback, update aggregated values.
160 795424 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
161 795424 : let mut shared = self.mutex.lock();
162 795424 : shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
163 795424 : shared.update_ps_feedback();
164 795424 : }
165 :
166 : /// Record standby reply.
167 49159 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
168 49159 : let mut shared = self.mutex.lock();
169 49159 : let slot = shared.get_slot_mut(id);
170 49159 : info!(
171 49159 : "Record standby reply: ts={} apply_lsn={}",
172 49159 : reply.reply_ts, reply.apply_lsn
173 49159 : );
174 49159 : match &mut slot.feedback {
175 49153 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
176 : ReplicationFeedback::Pageserver(_) => {
177 6 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
178 6 : reply: *reply,
179 6 : hs_feedback: HotStandbyFeedback::empty(),
180 6 : })
181 : }
182 : }
183 49159 : }
184 :
185 : /// Record hot standby feedback, update aggregated value.
186 21 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
187 21 : let mut shared = self.mutex.lock();
188 21 : let slot = shared.get_slot_mut(id);
189 21 : match &mut slot.feedback {
190 21 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
191 : ReplicationFeedback::Pageserver(_) => {
192 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
193 0 : reply: StandbyReply::empty(),
194 0 : hs_feedback: *feedback,
195 0 : })
196 : }
197 : }
198 21 : shared.update_reply_feedback();
199 21 : }
200 :
201 : /// Get remote_consistent_lsn reported by the pageserver. Returns None if
202 : /// client is not pageserver.
203 3356 : fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
204 3356 : let shared = self.mutex.lock();
205 3356 : let slot = shared.get_slot(id);
206 3356 : match slot.feedback {
207 3203 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
208 153 : _ => None,
209 : }
210 3356 : }
211 :
212 : /// Unregister walsender.
213 413 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
214 413 : let mut shared = self.mutex.lock();
215 413 : shared.slots[id] = None;
216 413 : shared.update_reply_feedback();
217 413 : }
218 : }
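A minimal sketch of the registration pattern above (hypothetical caller, dummy values): the guard returned by register() keeps the slot occupied, and dropping it unregisters the walsender and recomputes the aggregated standby feedback.

fn walsender_slot_lifecycle(
    walsenders: &Arc<WalSenders>,
    ttid: TenantTimelineId,
    addr: SocketAddr,
    conn_id: ConnectionId,
) {
    // The slot stays occupied for the lifetime of `guard`.
    let guard = walsenders.register(ttid, addr, conn_id, Some("pageserver".to_string()));
    // ... stream WAL; feedback is recorded against this slot ...
    // Dropping the guard clears the slot and refreshes the standby aggregate.
    drop(guard);
}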
219 :
220 : struct WalSendersShared {
221 : // aggregated over all walsenders value
222 : agg_standby_feedback: StandbyFeedback,
223 : // aggregated over all walsenders value
224 : agg_ps_feedback: PageserverFeedback,
225 : slots: Vec<Option<WalSenderState>>,
226 : }
227 :
228 : impl WalSendersShared {
229 622 : fn new() -> Self {
230 622 : WalSendersShared {
231 622 : agg_standby_feedback: StandbyFeedback::empty(),
232 622 : agg_ps_feedback: PageserverFeedback::empty(),
233 622 : slots: Vec::new(),
234 622 : }
235 622 : }
236 :
237 : /// Get the content of the slot with the provided id; it must exist.
238 3356 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
239 3356 : self.slots[id].as_ref().expect("walsender doesn't exist")
240 3356 : }
241 :
242 : /// Get mutable content of the slot with the provided id; it must exist.
243 844604 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
244 844604 : self.slots[id].as_mut().expect("walsender doesn't exist")
245 844604 : }
246 :
247 : /// Update the aggregated standby feedback: take the min of valid xmins,
248 : /// catalog_xmins and standby reply values, and the max of hot standby ts.
249 438 : fn update_reply_feedback(&mut self) {
250 438 : let mut agg = HotStandbyFeedback::empty();
251 438 : let mut reply_agg = StandbyReply::empty();
252 438 : for ws_state in self.slots.iter().flatten() {
253 168 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
254 29 : let hs_feedback = standby_feedback.hs_feedback;
255 29 : // doing Option math like op1.iter().chain(op2.iter()).min()
256 29 : // would be nicer, but we serialize/deserialize this struct
257 29 : // directly, so leave as is for now
258 29 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
259 25 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
260 2 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
261 23 : } else {
262 23 : agg.xmin = hs_feedback.xmin;
263 23 : }
264 25 : agg.ts = max(agg.ts, hs_feedback.ts);
265 4 : }
266 29 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
267 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
268 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
269 0 : } else {
270 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
271 0 : }
272 0 : agg.ts = max(agg.ts, hs_feedback.ts);
273 29 : }
274 29 : let reply = standby_feedback.reply;
275 29 : if reply.write_lsn != Lsn::INVALID {
276 21 : if reply_agg.write_lsn != Lsn::INVALID {
277 0 : reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
278 21 : } else {
279 21 : reply_agg.write_lsn = reply.write_lsn;
280 21 : }
281 8 : }
282 29 : if reply.flush_lsn != Lsn::INVALID {
283 21 : if reply_agg.flush_lsn != Lsn::INVALID {
284 0 : reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
285 21 : } else {
286 21 : reply_agg.flush_lsn = reply.flush_lsn;
287 21 : }
288 8 : }
289 29 : if reply.apply_lsn != Lsn::INVALID {
290 21 : if reply_agg.apply_lsn != Lsn::INVALID {
291 0 : reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
292 21 : } else {
293 21 : reply_agg.apply_lsn = reply.apply_lsn;
294 21 : }
295 8 : }
296 29 : if reply.reply_ts != 0 {
297 21 : if reply_agg.reply_ts != 0 {
298 0 : reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
299 21 : } else {
300 21 : reply_agg.reply_ts = reply.reply_ts;
301 21 : }
302 8 : }
303 139 : }
304 : }
305 438 : self.agg_standby_feedback = StandbyFeedback {
306 438 : reply: reply_agg,
307 438 : hs_feedback: agg,
308 438 : };
309 438 : }
310 :
311 : /// Update aggregated pageserver feedback. LSNs (last_received,
312 : /// disk_consistent, remote_consistent) and reply timestamp are just
313 : /// maximized; timeline_size is taken from the feedback with the highest
314 : /// last_received lsn. This is generally reasonable, but we might want to
315 : /// implement other policies once multiple pageservers start to be actively
316 : /// used.
317 795426 : fn update_ps_feedback(&mut self) {
318 795426 : let init = PageserverFeedback::empty();
319 795426 : let acc =
320 795426 : self.slots
321 795426 : .iter()
322 795426 : .flatten()
323 1173112 : .fold(init, |mut acc, ws_state| match ws_state.feedback {
324 1152277 : ReplicationFeedback::Pageserver(feedback) => {
325 1152277 : if feedback.last_received_lsn > acc.last_received_lsn {
326 858316 : acc.current_timeline_size = feedback.current_timeline_size;
327 858316 : }
328 1152277 : acc.last_received_lsn =
329 1152277 : max(feedback.last_received_lsn, acc.last_received_lsn);
330 1152277 : acc.disk_consistent_lsn =
331 1152277 : max(feedback.disk_consistent_lsn, acc.disk_consistent_lsn);
332 1152277 : acc.remote_consistent_lsn =
333 1152277 : max(feedback.remote_consistent_lsn, acc.remote_consistent_lsn);
334 1152277 : acc.replytime = max(feedback.replytime, acc.replytime);
335 1152277 : acc
336 : }
337 20835 : ReplicationFeedback::Standby(_) => acc,
338 1173112 : });
339 795426 : self.agg_ps_feedback = acc;
340 795426 : }
341 : }
342 :
343 : // Serialization is used only for pretty-printing in json.
344 137 : #[derive(Debug, Clone, Serialize, Deserialize)]
345 : pub struct WalSenderState {
346 : ttid: TenantTimelineId,
347 : addr: SocketAddr,
348 : conn_id: ConnectionId,
349 : // postgres application_name
350 : appname: Option<String>,
351 : feedback: ReplicationFeedback,
352 : }
353 :
354 : // The receiver is either a pageserver or a regular standby; they send
355 : // different kinds of feedback.
356 137 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
357 : enum ReplicationFeedback {
358 : Pageserver(PageserverFeedback),
359 : Standby(StandbyFeedback),
360 : }
361 :
362 : // Id of the occupied slot in WalSenders, used to access it (and saved in the
363 : // WalSenderGuard). We could hand out an Arc to the slot directly, but there is
364 : // little point: the aggregation of values, performed on every feedback receipt,
365 : // iterates over all walsenders anyway.
366 : pub type WalSenderId = usize;
367 :
368 : /// Scope guard to access slot in WalSenders registry and unregister from it in
369 : /// Drop.
370 : pub struct WalSenderGuard {
371 : id: WalSenderId,
372 : walsenders: Arc<WalSenders>,
373 : }
374 :
375 : impl Drop for WalSenderGuard {
376 413 : fn drop(&mut self) {
377 413 : self.walsenders.unregister(self.id);
378 413 : }
379 : }
380 :
381 : impl SafekeeperPostgresHandler {
382 : /// Wrapper around handle_start_replication_guts handling result. Error is
383 : /// handled here while we're still in walsender ttid span; with API
384 : /// extension, this can probably be moved into postgres_backend.
385 768 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
386 768 : &mut self,
387 768 : pgb: &mut PostgresBackend<IO>,
388 768 : start_pos: Lsn,
389 768 : term: Option<Term>,
390 768 : ) -> Result<(), QueryError> {
391 768 : if let Err(end) = self
392 768 : .handle_start_replication_guts(pgb, start_pos, term)
393 3213768 : .await
394 : {
395 : // Log the result and probably send it to the client, closing the stream.
396 413 : pgb.handle_copy_stream_end(end).await;
397 0 : }
398 413 : Ok(())
399 413 : }
400 :
401 768 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
402 768 : &mut self,
403 768 : pgb: &mut PostgresBackend<IO>,
404 768 : start_pos: Lsn,
405 768 : term: Option<Term>,
406 768 : ) -> Result<(), CopyStreamHandlerEnd> {
407 768 : let appname = self.appname.clone();
408 768 : let tli =
409 768 : GlobalTimelines::get(self.ttid).map_err(|e| CopyStreamHandlerEnd::Other(e.into()))?;
410 :
411 : // Use a guard object to remove our entry from the timeline when we are done.
412 768 : let ws_guard = Arc::new(tli.get_walsenders().register(
413 768 : self.ttid,
414 768 : *pgb.get_peer_addr(),
415 768 : self.conn_id,
416 768 : self.appname.clone(),
417 768 : ));
418 :
419 : // A walsender operates in one of two modes, selected by whether the client
420 : // supplies a term: send only committed WAL (used by the pageserver) or all
421 : // existing WAL up to flush_lsn (used by walproposer or peer recovery).
422 : // The second case is always driven by a consensus leader, which must
423 : // supply its term.
424 768 : let end_watch = if term.is_some() {
425 18 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
426 : } else {
427 750 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
428 : };
429 : // we don't check term here; it will be checked on first waiting/WAL reading anyway.
430 768 : let end_pos = end_watch.get();
431 768 :
432 768 : if end_pos < start_pos {
433 14 : warn!(
434 14 : "requested start_pos {} is ahead of available WAL end_pos {}",
435 14 : start_pos, end_pos
436 14 : );
437 754 : }
438 :
439 768 : info!(
440 768 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
441 768 : start_pos,
442 768 : end_pos,
443 768 : matches!(end_watch, EndWatch::Flush(_)),
444 768 : appname
445 768 : );
446 :
447 : // switch to copy
448 768 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
449 :
450 768 : let (_, persisted_state) = tli.get_state().await;
451 768 : let wal_reader = WalReader::new(
452 768 : self.conf.workdir.clone(),
453 768 : self.conf.timeline_dir(&tli.ttid),
454 768 : &persisted_state,
455 768 : start_pos,
456 768 : self.conf.is_wal_backup_enabled(),
457 768 : )?;
458 :
459 : // Split to concurrently receive and send data; replies are generally
460 : // not synchronized with sends, so this avoids deadlocks.
461 768 : let reader = pgb.split().context("START_REPLICATION split")?;
462 :
463 768 : let mut sender = WalSender {
464 768 : pgb,
465 768 : tli: tli.clone(),
466 768 : appname,
467 768 : start_pos,
468 768 : end_pos,
469 768 : term,
470 768 : end_watch,
471 768 : ws_guard: ws_guard.clone(),
472 768 : wal_reader,
473 768 : send_buf: [0; MAX_SEND_SIZE],
474 768 : };
475 768 : let mut reply_reader = ReplyReader {
476 768 : reader,
477 768 : ws_guard,
478 768 : tli,
479 768 : };
480 :
481 768 : let res = tokio::select! {
482 : // todo: add read|write .context to these errors
483 64 : r = sender.run() => r,
484 349 : r = reply_reader.run() => r,
485 : };
486 : // Join pg backend back.
487 413 : pgb.unsplit(reply_reader.reader)?;
488 :
489 413 : res
490 413 : }
491 : }
492 :
493 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
494 : /// given term (recovery by walproposer or peer safekeeper).
495 : enum EndWatch {
496 : Commit(Receiver<Lsn>),
497 : Flush(Receiver<TermLsn>),
498 : }
499 :
500 : impl EndWatch {
501 : /// Get current end of WAL.
502 3395561 : fn get(&self) -> Lsn {
503 3395561 : match self {
504 3393410 : EndWatch::Commit(r) => *r.borrow(),
505 2151 : EndWatch::Flush(r) => r.borrow().lsn,
506 : }
507 3395561 : }
508 :
509 : /// Wait for the update.
510 1856158 : async fn changed(&mut self) -> anyhow::Result<()> {
511 1856158 : match self {
512 2380874 : EndWatch::Commit(r) => r.changed().await?,
513 32 : EndWatch::Flush(r) => r.changed().await?,
514 : }
515 1852111 : Ok(())
516 1852111 : }
517 : }
518 :
519 : /// The half of the split connection that drives sending WAL.
520 : struct WalSender<'a, IO> {
521 : pgb: &'a mut PostgresBackend<IO>,
522 : tli: Arc<Timeline>,
523 : appname: Option<String>,
524 : // Position since which we are sending next chunk.
525 : start_pos: Lsn,
526 : // WAL up to this position is known to be locally available.
527 : // Usually this is the same as the latest commit_lsn, but in case of
528 : // walproposer recovery, this is flush_lsn.
529 : //
530 : // We send this LSN to the receiver as wal_end, so that it knows how much
531 : // WAL this safekeeper has. This LSN should be as fresh as possible.
532 : end_pos: Lsn,
533 : /// When streaming the uncommitted part, the term in which the client acts as
534 : /// the leader. Streaming is stopped if the local term changes to a different
535 : /// (higher) value.
536 : term: Option<Term>,
537 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
538 : end_watch: EndWatch,
539 : ws_guard: Arc<WalSenderGuard>,
540 : wal_reader: WalReader,
541 : // buffer that WAL is read into before sending
542 : send_buf: [u8; MAX_SEND_SIZE],
543 : }
544 :
545 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
546 : /// Send WAL until
547 : /// - an error occurs
548 : /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
549 : ///
550 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
551 : /// convenience.
552 768 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
553 : loop {
554 : // Wait for the next portion if it is not there yet, or just
555 : // update the end of WAL available for sending, which we
556 : // communicate to the receiver.
557 2380947 : self.wait_wal().await?;
558 : assert!(
559 818193 : self.end_pos > self.start_pos,
560 0 : "nothing to send after waiting for WAL"
561 : );
562 :
563 : // try to send as much as available, capped by MAX_SEND_SIZE
564 818193 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
565 818193 : // if we went past the available WAL, back off
566 818193 : if chunk_end_pos >= self.end_pos {
567 736354 : chunk_end_pos = self.end_pos;
568 736354 : } else {
569 81839 : // If not sending up to end_pos, round down to a page boundary to
570 81839 : // avoid breaking a WAL record anywhere but at a page boundary, as the
571 81839 : // protocol demands. See walsender.c (XLogSendPhysical).
572 81839 : chunk_end_pos = chunk_end_pos
573 81839 : .checked_sub(chunk_end_pos.block_offset())
574 81839 : .unwrap();
575 81839 : }
576 818193 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
577 818193 : let send_buf = &mut self.send_buf[..send_size];
578 : let send_size: usize;
579 : {
580 : // If uncommitted part is being pulled, check that the term is
581 : // still the expected one.
582 818193 : let _term_guard = if let Some(t) = self.term {
583 2085 : Some(self.tli.acquire_term(t).await?)
584 : } else {
585 816108 : None
586 : };
587 : // Read WAL into buffer. send_size can be additionally capped to
588 : // segment boundary here.
589 818192 : send_size = self.wal_reader.read(send_buf).await?
590 : };
591 818191 : let send_buf = &send_buf[..send_size];
592 818191 :
593 818191 : // and send it
594 818191 : self.pgb
595 818191 : .write_message(&BeMessage::XLogData(XLogDataBody {
596 818191 : wal_start: self.start_pos.0,
597 818191 : wal_end: self.end_pos.0,
598 818191 : timestamp: get_current_timestamp(),
599 818191 : data: send_buf,
600 818191 : }))
601 23382 : .await?;
602 :
603 818170 : if let Some(appname) = &self.appname {
604 816226 : if appname == "replica" {
605 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
606 795175 : }
607 1944 : }
608 0 : trace!(
609 0 : "sent {} bytes of WAL {}-{}",
610 0 : send_size,
611 0 : self.start_pos,
612 0 : self.start_pos + send_size as u64
613 0 : );
614 818170 : self.start_pos += send_size as u64;
615 : }
616 64 : }
617 :
618 : /// Wait until we have WAL to stream, sending keepalives and checking for
619 : /// exit in the meantime.
620 818938 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
621 : loop {
622 822240 : self.end_pos = self.end_watch.get();
623 822240 : if self.end_pos > self.start_pos {
624 : // We have something to send.
625 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
626 101798 : return Ok(());
627 720442 : }
628 :
629 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
630 2380903 : if let Some(lsn) = wait_for_lsn(&mut self.end_watch, self.term, self.start_pos).await? {
631 716395 : self.end_pos = lsn;
632 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
633 716395 : return Ok(());
634 3356 : }
635 3356 :
636 3356 : // Timed out waiting for WAL, check for termination and send KA.
637 3356 : // Check for termination only if we are streaming up to commit_lsn
638 3356 : // (to pageserver).
639 3356 : if let EndWatch::Commit(_) = self.end_watch {
640 3356 : if let Some(remote_consistent_lsn) = self
641 3356 : .ws_guard
642 3356 : .walsenders
643 3356 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
644 : {
645 3203 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
646 : // Terminate if there is nothing more to send.
647 : // Note that "ending streaming" part of the string is used by
648 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
649 : // do not change this string without updating pageserver.
650 54 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
651 54 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
652 54 : self.appname, self.start_pos,
653 54 : )));
654 3149 : }
655 153 : }
656 0 : }
657 :
658 3302 : self.pgb
659 3302 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
660 3302 : wal_end: self.end_pos.0,
661 3302 : timestamp: get_current_timestamp(),
662 3302 : request_reply: true,
663 3302 : }))
664 0 : .await?;
665 : }
666 818247 : }
667 : }
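A worked example of the chunking arithmetic in run() above, assuming the standard 8 KiB WAL block size (block_offset() == lsn % 8192) and, hypothetically, MAX_SEND_SIZE = 128 KiB:

// Hypothetical numbers, not taken from a real run:
let (start_pos, max_send_size, block_size) = (0x2100u64, 0x20000u64, 0x2000u64);
let mut chunk_end_pos = start_pos + max_send_size;  // 0x22100
chunk_end_pos -= chunk_end_pos % block_size;        // round down to 0x22000
assert_eq!(chunk_end_pos - start_pos, 0x1FF00);     // bytes sent, ending on a page boundary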
668 :
669 : /// The half of the split connection that receives replies.
670 : struct ReplyReader<IO> {
671 : reader: PostgresBackendReader<IO>,
672 : ws_guard: Arc<WalSenderGuard>,
673 : tli: Arc<Timeline>,
674 : }
675 :
676 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
677 768 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
678 : loop {
679 3081881 : let msg = self.reader.read_copy_message().await?;
680 844604 : self.handle_feedback(&msg).await?
681 : }
682 349 : }
683 :
684 844604 : async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
685 844604 : match msg.first().cloned() {
686 21 : Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
687 21 : // Note: deserializing is on msg[1..] because we skip the tag byte.
688 21 : let mut hs_feedback = HotStandbyFeedback::des(&msg[1..])
689 21 : .context("failed to deserialize HotStandbyFeedback")?;
690 : // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way:
691 : // pq_sendint32(&reply_message, xmin);
692 : // pq_sendint32(&reply_message, xmin_epoch);
693 : // So it is two big-endian 32-bit words with the low word first (little-endian word order)!
694 21 : hs_feedback.xmin = (hs_feedback.xmin >> 32) | (hs_feedback.xmin << 32);
695 21 : hs_feedback.catalog_xmin =
696 21 : (hs_feedback.catalog_xmin >> 32) | (hs_feedback.catalog_xmin << 32);
697 21 : self.ws_guard
698 21 : .walsenders
699 21 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
700 : }
701 49159 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
702 49159 : let reply =
703 49159 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
704 49159 : self.ws_guard
705 49159 : .walsenders
706 49159 : .record_standby_reply(self.ws_guard.id, &reply);
707 : }
708 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
709 : // pageserver sends this.
710 : // Note: deserializing is on msg[9..] because we skip the tag byte and len bytes.
711 795424 : let buf = Bytes::copy_from_slice(&msg[9..]);
712 795424 : let ps_feedback = PageserverFeedback::parse(buf);
713 :
714 0 : trace!("PageserverFeedback is {:?}", ps_feedback);
715 795424 : self.ws_guard
716 795424 : .walsenders
717 795424 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
718 795424 : self.tli
719 795424 : .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
720 131789 : .await;
721 : // In principle a new remote_consistent_lsn could allow deactivating the
722 : // timeline, but we check that regularly through broker updates, so there is
723 : // no need to do it here.
724 : }
725 0 : _ => warn!("unexpected message {:?}", msg),
726 : }
727 844603 : Ok(())
728 844603 : }
729 : }
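A worked example of the xmin word swap in handle_feedback() above (hypothetical values): walreceiver.c sends xmin and then xmin_epoch, each as a big-endian u32, so deserializing the 8 bytes as a single big-endian u64 leaves the halves swapped; the rotation restores the usual (epoch << 32) | xmin layout.

// Suppose the standby reported xmin = 5 with epoch = 1.
let wire: u64 = 0x0000_0005_0000_0001;    // value produced by HotStandbyFeedback::des
let fixed = (wire >> 32) | (wire << 32);  // the same rotation as above
assert_eq!(fixed, 0x0000_0001_0000_0005); // (epoch << 32) | xmin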
730 :
731 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
732 :
733 : /// Wait until we have available WAL > start_pos or timeout expires. Returns
734 : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
735 : /// - Ok(None) if timeout expired;
736 : /// - Err in case of error -- only if 1) term changed while fetching in recovery
737 : /// mode 2) watch channel closed, which must never happen.
738 720442 : async fn wait_for_lsn(
739 720442 : rx: &mut EndWatch,
740 720442 : client_term: Option<Term>,
741 720442 : start_pos: Lsn,
742 720442 : ) -> anyhow::Result<Option<Lsn>> {
743 720442 : let res = timeout(POLL_STATE_TIMEOUT, async move {
744 : loop {
745 2572553 : let end_pos = rx.get();
746 2572553 : if end_pos > start_pos {
747 716395 : return Ok(end_pos);
748 1856158 : }
749 1856158 : if let EndWatch::Flush(rx) = rx {
750 32 : let curr_term = rx.borrow().term;
751 32 : if let Some(client_term) = client_term {
752 32 : if curr_term != client_term {
753 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
754 32 : }
755 0 : }
756 1856126 : }
757 2380903 : rx.changed().await?;
758 : }
759 720442 : })
760 2380903 : .await;
761 :
762 716395 : match res {
763 : // success
764 716395 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
765 : // error inside closure
766 0 : Ok(Err(err)) => Err(err),
767 : // timeout
768 3356 : Err(_) => Ok(None),
769 : }
770 719751 : }
771 :
772 : #[cfg(test)]
773 : mod tests {
774 : use postgres_protocol::PG_EPOCH;
775 : use utils::id::{TenantId, TimelineId};
776 :
777 : use super::*;
778 :
779 12 : fn mock_ttid() -> TenantTimelineId {
780 12 : TenantTimelineId {
781 12 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
782 12 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
783 12 : }
784 12 : }
785 :
786 12 : fn mock_addr() -> SocketAddr {
787 12 : "127.0.0.1:8080".parse().unwrap()
788 12 : }
789 :
790 : // Add the specified feedback to wss, setting the other fields to dummy values.
791 12 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
792 12 : let walsender_state = WalSenderState {
793 12 : ttid: mock_ttid(),
794 12 : addr: mock_addr(),
795 12 : conn_id: 1,
796 12 : appname: None,
797 12 : feedback,
798 12 : };
799 12 : wss.slots.push(Some(walsender_state))
800 12 : }
801 :
802 : // form standby feedback with given hot standby feedback ts/xmin and the
803 : // rest set to dummy values.
804 8 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
805 8 : ReplicationFeedback::Standby(StandbyFeedback {
806 8 : reply: StandbyReply::empty(),
807 8 : hs_feedback: HotStandbyFeedback {
808 8 : ts,
809 8 : xmin,
810 8 : catalog_xmin: 0,
811 8 : },
812 8 : })
813 8 : }
814 :
815 : // test that hs aggregation works as expected
816 2 : #[test]
817 2 : fn test_hs_feedback_no_valid() {
818 2 : let mut wss = WalSendersShared::new();
819 2 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
820 2 : wss.update_reply_feedback();
821 2 : assert_eq!(
822 2 : wss.agg_standby_feedback.hs_feedback.xmin,
823 2 : INVALID_FULL_TRANSACTION_ID
824 2 : );
825 2 : }
826 :
827 2 : #[test]
828 2 : fn test_hs_feedback() {
829 2 : let mut wss = WalSendersShared::new();
830 2 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
831 2 : push_feedback(&mut wss, hs_feedback(1, 42));
832 2 : push_feedback(&mut wss, hs_feedback(1, 64));
833 2 : wss.update_reply_feedback();
834 2 : assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
835 2 : }
836 :
837 : // form pageserver feedback with given last_record_lsn / tli size and the
838 : // rest set to dummy values.
839 4 : fn ps_feedback(current_timeline_size: u64, last_received_lsn: Lsn) -> ReplicationFeedback {
840 4 : ReplicationFeedback::Pageserver(PageserverFeedback {
841 4 : current_timeline_size,
842 4 : last_received_lsn,
843 4 : disk_consistent_lsn: Lsn::INVALID,
844 4 : remote_consistent_lsn: Lsn::INVALID,
845 4 : replytime: *PG_EPOCH,
846 4 : })
847 4 : }
848 :
849 : // test that ps aggregation works as expected
850 2 : #[test]
851 2 : fn test_ps_feedback() {
852 2 : let mut wss = WalSendersShared::new();
853 2 : push_feedback(&mut wss, ps_feedback(8, Lsn(42)));
854 2 : push_feedback(&mut wss, ps_feedback(4, Lsn(84)));
855 2 : wss.update_ps_feedback();
856 2 : assert_eq!(wss.agg_ps_feedback.current_timeline_size, 4);
857 2 : assert_eq!(wss.agg_ps_feedback.last_received_lsn, Lsn(84));
858 2 : }
859 : }