Line data Source code
1 : //! This module implements the streaming side of replication protocol, starting
2 : //! with the "START_REPLICATION" message, and registry of walsenders.
3 :
4 : use crate::handler::SafekeeperPostgresHandler;
5 : use crate::metrics::RECEIVED_PS_FEEDBACKS;
6 : use crate::receive_wal::WalReceivers;
7 : use crate::safekeeper::{Term, TermLsn};
8 : use crate::timeline::WalResidentTimeline;
9 : use crate::wal_service::ConnectionId;
10 : use crate::wal_storage::WalReader;
11 : use crate::GlobalTimelines;
12 : use anyhow::{bail, Context as AnyhowContext};
13 : use bytes::Bytes;
14 : use parking_lot::Mutex;
15 : use postgres_backend::PostgresBackend;
16 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
17 : use postgres_ffi::get_current_timestamp;
18 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
19 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
20 : use serde::{Deserialize, Serialize};
21 : use tokio::io::{AsyncRead, AsyncWrite};
22 : use utils::failpoint_support;
23 : use utils::id::TenantTimelineId;
24 : use utils::pageserver_feedback::PageserverFeedback;
25 :
26 : use std::cmp::{max, min};
27 : use std::net::SocketAddr;
28 : use std::str;
29 : use std::sync::Arc;
30 : use std::time::Duration;
31 : use tokio::sync::watch::Receiver;
32 : use tokio::time::timeout;
33 : use tracing::*;
34 : use utils::{bin_ser::BeSer, lsn::Lsn};
35 :
36 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
37 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
38 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
39 : // neon extension of replication protocol
40 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
41 :
42 : type FullTransactionId = u64;
43 :
44 : /// Hot standby feedback received from replica
45 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
46 : pub struct HotStandbyFeedback {
47 : pub ts: TimestampTz,
48 : pub xmin: FullTransactionId,
49 : pub catalog_xmin: FullTransactionId,
50 : }
51 :
52 : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
53 :
54 : impl HotStandbyFeedback {
55 4399 : pub fn empty() -> HotStandbyFeedback {
56 4399 : HotStandbyFeedback {
57 4399 : ts: 0,
58 4399 : xmin: 0,
59 4399 : catalog_xmin: 0,
60 4399 : }
61 4399 : }
62 : }
63 :
64 : /// Standby status update
65 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
66 : pub struct StandbyReply {
67 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
68 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
69 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
70 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
71 : pub reply_requested: bool,
72 : }
73 :
74 : impl StandbyReply {
75 8 : fn empty() -> Self {
76 8 : StandbyReply {
77 8 : write_lsn: Lsn::INVALID,
78 8 : flush_lsn: Lsn::INVALID,
79 8 : apply_lsn: Lsn::INVALID,
80 8 : reply_ts: 0,
81 8 : reply_requested: false,
82 8 : }
83 8 : }
84 : }
85 :
86 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
87 : pub struct StandbyFeedback {
88 : pub reply: StandbyReply,
89 : pub hs_feedback: HotStandbyFeedback,
90 : }
91 :
92 : impl StandbyFeedback {
93 2 : pub fn empty() -> Self {
94 2 : StandbyFeedback {
95 2 : reply: StandbyReply::empty(),
96 2 : hs_feedback: HotStandbyFeedback::empty(),
97 2 : }
98 2 : }
99 : }
100 :
101 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
102 : pub struct WalSenders {
103 : mutex: Mutex<WalSendersShared>,
104 : walreceivers: Arc<WalReceivers>,
105 : }
106 :
107 : impl WalSenders {
108 0 : pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
109 0 : Arc::new(WalSenders {
110 0 : mutex: Mutex::new(WalSendersShared::new()),
111 0 : walreceivers,
112 0 : })
113 0 : }
114 :
115 : /// Register new walsender. Returned guard provides access to the slot and
116 : /// automatically deregisters in Drop.
117 0 : fn register(
118 0 : self: &Arc<WalSenders>,
119 0 : ttid: TenantTimelineId,
120 0 : addr: SocketAddr,
121 0 : conn_id: ConnectionId,
122 0 : appname: Option<String>,
123 0 : ) -> WalSenderGuard {
124 0 : let slots = &mut self.mutex.lock().slots;
125 0 : let walsender_state = WalSenderState {
126 0 : ttid,
127 0 : addr,
128 0 : conn_id,
129 0 : appname,
130 0 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
131 0 : };
132 : // find empty slot or create new one
133 0 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
134 0 : slots[pos] = Some(walsender_state);
135 0 : pos
136 : } else {
137 0 : let pos = slots.len();
138 0 : slots.push(Some(walsender_state));
139 0 : pos
140 : };
141 0 : WalSenderGuard {
142 0 : id: pos,
143 0 : walsenders: self.clone(),
144 0 : }
145 0 : }
146 :
147 : /// Get state of all walsenders.
148 0 : pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
149 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
150 0 : }
151 :
152 : /// Get LSN of the most lagging pageserver receiver. Return None if there are no
153 : /// active walsenders.
154 0 : pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
155 0 : self.mutex
156 0 : .lock()
157 0 : .slots
158 0 : .iter()
159 0 : .flatten()
160 0 : .filter_map(|s| match s.feedback {
161 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
162 0 : ReplicationFeedback::Standby(_) => None,
163 0 : })
164 0 : .min()
165 0 : }
166 :
167 : /// Returns total counter of pageserver feedbacks received and last feedback.
168 0 : pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
169 0 : let shared = self.mutex.lock();
170 0 : (shared.ps_feedback_counter, shared.last_ps_feedback)
171 0 : }
172 :
173 : /// Get aggregated hot standby feedback (we send it to compute).
174 0 : pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
175 0 : self.mutex.lock().agg_standby_feedback
176 0 : }
177 :
178 : /// Record new pageserver feedback, update aggregated values.
179 0 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
180 0 : let mut shared = self.mutex.lock();
181 0 : shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
182 0 : shared.last_ps_feedback = *feedback;
183 0 : shared.ps_feedback_counter += 1;
184 0 : drop(shared);
185 0 :
186 0 : RECEIVED_PS_FEEDBACKS.inc();
187 0 :
188 0 : // send feedback to connected walproposers
189 0 : self.walreceivers.broadcast_pageserver_feedback(*feedback);
190 0 : }
191 :
192 : /// Record standby reply.
193 0 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
194 0 : let mut shared = self.mutex.lock();
195 0 : let slot = shared.get_slot_mut(id);
196 0 : debug!(
197 0 : "Record standby reply: ts={} apply_lsn={}",
198 : reply.reply_ts, reply.apply_lsn
199 : );
200 0 : match &mut slot.feedback {
201 0 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
202 : ReplicationFeedback::Pageserver(_) => {
203 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
204 0 : reply: *reply,
205 0 : hs_feedback: HotStandbyFeedback::empty(),
206 0 : })
207 : }
208 : }
209 0 : }
210 :
211 : /// Record hot standby feedback, update aggregated value.
212 0 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
213 0 : let mut shared = self.mutex.lock();
214 0 : let slot = shared.get_slot_mut(id);
215 0 : match &mut slot.feedback {
216 0 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
217 : ReplicationFeedback::Pageserver(_) => {
218 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
219 0 : reply: StandbyReply::empty(),
220 0 : hs_feedback: *feedback,
221 0 : })
222 : }
223 : }
224 0 : shared.update_reply_feedback();
225 0 : }
226 :
227 : /// Get remote_consistent_lsn reported by the pageserver. Returns None if
228 : /// client is not pageserver.
229 0 : fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
230 0 : let shared = self.mutex.lock();
231 0 : let slot = shared.get_slot(id);
232 0 : match slot.feedback {
233 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
234 0 : _ => None,
235 : }
236 0 : }
237 :
238 : /// Unregister walsender.
239 0 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
240 0 : let mut shared = self.mutex.lock();
241 0 : shared.slots[id] = None;
242 0 : shared.update_reply_feedback();
243 0 : }
244 : }
245 :
246 : struct WalSendersShared {
247 : // aggregated over all walsenders value
248 : agg_standby_feedback: StandbyFeedback,
249 : // last feedback ever received from any pageserver, empty if none
250 : last_ps_feedback: PageserverFeedback,
251 : // total counter of pageserver feedbacks received
252 : ps_feedback_counter: u64,
253 : slots: Vec<Option<WalSenderState>>,
254 : }
255 :
256 : impl WalSendersShared {
257 2 : fn new() -> Self {
258 2 : WalSendersShared {
259 2 : agg_standby_feedback: StandbyFeedback::empty(),
260 2 : last_ps_feedback: PageserverFeedback::empty(),
261 2 : ps_feedback_counter: 0,
262 2 : slots: Vec::new(),
263 2 : }
264 2 : }
265 :
266 : /// Get content of provided id slot, it must exist.
267 0 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
268 0 : self.slots[id].as_ref().expect("walsender doesn't exist")
269 0 : }
270 :
271 : /// Get mut content of provided id slot, it must exist.
272 0 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
273 0 : self.slots[id].as_mut().expect("walsender doesn't exist")
274 0 : }
275 :
276 : /// Update aggregated hot standy and normal reply feedbacks. We just take min of valid xmins
277 : /// and ts.
278 2 : fn update_reply_feedback(&mut self) {
279 2 : let mut agg = HotStandbyFeedback::empty();
280 2 : let mut reply_agg = StandbyReply::empty();
281 4 : for ws_state in self.slots.iter().flatten() {
282 4 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
283 4 : let hs_feedback = standby_feedback.hs_feedback;
284 4 : // doing Option math like op1.iter().chain(op2.iter()).min()
285 4 : // would be nicer, but we serialize/deserialize this struct
286 4 : // directly, so leave as is for now
287 4 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
288 2 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
289 1 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
290 1 : } else {
291 1 : agg.xmin = hs_feedback.xmin;
292 1 : }
293 2 : agg.ts = max(agg.ts, hs_feedback.ts);
294 2 : }
295 4 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
296 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
297 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
298 0 : } else {
299 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
300 0 : }
301 0 : agg.ts = max(agg.ts, hs_feedback.ts);
302 4 : }
303 4 : let reply = standby_feedback.reply;
304 4 : if reply.write_lsn != Lsn::INVALID {
305 0 : if reply_agg.write_lsn != Lsn::INVALID {
306 0 : reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
307 0 : } else {
308 0 : reply_agg.write_lsn = reply.write_lsn;
309 0 : }
310 4 : }
311 4 : if reply.flush_lsn != Lsn::INVALID {
312 0 : if reply_agg.flush_lsn != Lsn::INVALID {
313 0 : reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
314 0 : } else {
315 0 : reply_agg.flush_lsn = reply.flush_lsn;
316 0 : }
317 4 : }
318 4 : if reply.apply_lsn != Lsn::INVALID {
319 0 : if reply_agg.apply_lsn != Lsn::INVALID {
320 0 : reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
321 0 : } else {
322 0 : reply_agg.apply_lsn = reply.apply_lsn;
323 0 : }
324 4 : }
325 4 : if reply.reply_ts != 0 {
326 0 : if reply_agg.reply_ts != 0 {
327 0 : reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
328 0 : } else {
329 0 : reply_agg.reply_ts = reply.reply_ts;
330 0 : }
331 4 : }
332 0 : }
333 : }
334 2 : self.agg_standby_feedback = StandbyFeedback {
335 2 : reply: reply_agg,
336 2 : hs_feedback: agg,
337 2 : };
338 2 : }
339 : }
340 :
341 : // Serialized is used only for pretty printing in json.
342 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
343 : pub struct WalSenderState {
344 : ttid: TenantTimelineId,
345 : addr: SocketAddr,
346 : conn_id: ConnectionId,
347 : // postgres application_name
348 : appname: Option<String>,
349 : feedback: ReplicationFeedback,
350 : }
351 :
352 : // Receiver is either pageserver or regular standby, which have different
353 : // feedbacks.
354 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
355 : enum ReplicationFeedback {
356 : Pageserver(PageserverFeedback),
357 : Standby(StandbyFeedback),
358 : }
359 :
360 : // id of the occupied slot in WalSenders to access it (and save in the
361 : // WalSenderGuard). We could give Arc directly to the slot, but there is not
362 : // much sense in that as values aggregation which is performed on each feedback
363 : // receival iterates over all walsenders.
364 : pub type WalSenderId = usize;
365 :
366 : /// Scope guard to access slot in WalSenders registry and unregister from it in
367 : /// Drop.
368 : pub struct WalSenderGuard {
369 : id: WalSenderId,
370 : walsenders: Arc<WalSenders>,
371 : }
372 :
373 : impl Drop for WalSenderGuard {
374 0 : fn drop(&mut self) {
375 0 : self.walsenders.unregister(self.id);
376 0 : }
377 : }
378 :
379 : impl SafekeeperPostgresHandler {
380 : /// Wrapper around handle_start_replication_guts handling result. Error is
381 : /// handled here while we're still in walsender ttid span; with API
382 : /// extension, this can probably be moved into postgres_backend.
383 0 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
384 0 : &mut self,
385 0 : pgb: &mut PostgresBackend<IO>,
386 0 : start_pos: Lsn,
387 0 : term: Option<Term>,
388 0 : ) -> Result<(), QueryError> {
389 0 : let tli = GlobalTimelines::get(self.ttid).map_err(|e| QueryError::Other(e.into()))?;
390 0 : let residence_guard = tli.wal_residence_guard().await?;
391 :
392 0 : if let Err(end) = self
393 0 : .handle_start_replication_guts(pgb, start_pos, term, residence_guard)
394 0 : .await
395 : {
396 0 : let info = tli.get_safekeeper_info(&self.conf).await;
397 : // Log the result and probably send it to the client, closing the stream.
398 0 : pgb.handle_copy_stream_end(end)
399 0 : .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.flush_lsn)))
400 0 : .await;
401 0 : }
402 0 : Ok(())
403 0 : }
404 :
405 0 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
406 0 : &mut self,
407 0 : pgb: &mut PostgresBackend<IO>,
408 0 : start_pos: Lsn,
409 0 : term: Option<Term>,
410 0 : tli: WalResidentTimeline,
411 0 : ) -> Result<(), CopyStreamHandlerEnd> {
412 0 : let appname = self.appname.clone();
413 0 :
414 0 : // Use a guard object to remove our entry from the timeline when we are done.
415 0 : let ws_guard = Arc::new(tli.get_walsenders().register(
416 0 : self.ttid,
417 0 : *pgb.get_peer_addr(),
418 0 : self.conn_id,
419 0 : self.appname.clone(),
420 0 : ));
421 :
422 : // Walsender can operate in one of two modes which we select by
423 : // application_name: give only committed WAL (used by pageserver) or all
424 : // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
425 : // The second case is always driven by a consensus leader which term
426 : // must be supplied.
427 0 : let end_watch = if term.is_some() {
428 0 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
429 : } else {
430 0 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
431 : };
432 : // we don't check term here; it will be checked on first waiting/WAL reading anyway.
433 0 : let end_pos = end_watch.get();
434 0 :
435 0 : if end_pos < start_pos {
436 0 : warn!(
437 0 : "requested start_pos {} is ahead of available WAL end_pos {}",
438 : start_pos, end_pos
439 : );
440 0 : }
441 :
442 0 : info!(
443 0 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}",
444 : start_pos,
445 : end_pos,
446 0 : matches!(end_watch, EndWatch::Flush(_)),
447 : appname
448 : );
449 :
450 : // switch to copy
451 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
452 :
453 0 : let wal_reader = tli.get_walreader(start_pos).await?;
454 :
455 : // Split to concurrently receive and send data; replies are generally
456 : // not synchronized with sends, so this avoids deadlocks.
457 0 : let reader = pgb.split().context("START_REPLICATION split")?;
458 :
459 0 : let tli_cancel = tli.cancel.clone();
460 :
461 0 : let mut sender = WalSender {
462 0 : pgb,
463 0 : // should succeed since we're already holding another guard
464 0 : tli: tli.wal_residence_guard().await?,
465 0 : appname,
466 0 : start_pos,
467 0 : end_pos,
468 0 : term,
469 0 : end_watch,
470 0 : ws_guard: ws_guard.clone(),
471 0 : wal_reader,
472 0 : send_buf: vec![0u8; MAX_SEND_SIZE],
473 0 : };
474 0 : let mut reply_reader = ReplyReader {
475 0 : reader,
476 0 : ws_guard: ws_guard.clone(),
477 0 : tli,
478 0 : };
479 :
480 0 : let res = tokio::select! {
481 : // todo: add read|write .context to these errors
482 0 : r = sender.run() => r,
483 0 : r = reply_reader.run() => r,
484 0 : _ = tli_cancel.cancelled() => {
485 0 : return Err(CopyStreamHandlerEnd::Cancelled);
486 : }
487 : };
488 :
489 0 : let ws_state = ws_guard
490 0 : .walsenders
491 0 : .mutex
492 0 : .lock()
493 0 : .get_slot(ws_guard.id)
494 0 : .clone();
495 0 : info!(
496 0 : "finished streaming to {}, feedback={:?}",
497 : ws_state.addr, ws_state.feedback,
498 : );
499 :
500 : // Join pg backend back.
501 0 : pgb.unsplit(reply_reader.reader)?;
502 :
503 0 : res
504 0 : }
505 : }
506 :
507 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
508 : /// given term (recovery by walproposer or peer safekeeper).
509 : enum EndWatch {
510 : Commit(Receiver<Lsn>),
511 : Flush(Receiver<TermLsn>),
512 : }
513 :
514 : impl EndWatch {
515 : /// Get current end of WAL.
516 0 : fn get(&self) -> Lsn {
517 0 : match self {
518 0 : EndWatch::Commit(r) => *r.borrow(),
519 0 : EndWatch::Flush(r) => r.borrow().lsn,
520 : }
521 0 : }
522 :
523 : /// Wait for the update.
524 0 : async fn changed(&mut self) -> anyhow::Result<()> {
525 0 : match self {
526 0 : EndWatch::Commit(r) => r.changed().await?,
527 0 : EndWatch::Flush(r) => r.changed().await?,
528 : }
529 0 : Ok(())
530 0 : }
531 : }
532 :
533 : /// A half driving sending WAL.
534 : struct WalSender<'a, IO> {
535 : pgb: &'a mut PostgresBackend<IO>,
536 : tli: WalResidentTimeline,
537 : appname: Option<String>,
538 : // Position since which we are sending next chunk.
539 : start_pos: Lsn,
540 : // WAL up to this position is known to be locally available.
541 : // Usually this is the same as the latest commit_lsn, but in case of
542 : // walproposer recovery, this is flush_lsn.
543 : //
544 : // We send this LSN to the receiver as wal_end, so that it knows how much
545 : // WAL this safekeeper has. This LSN should be as fresh as possible.
546 : end_pos: Lsn,
547 : /// When streaming uncommitted part, the term the client acts as the leader
548 : /// in. Streaming is stopped if local term changes to a different (higher)
549 : /// value.
550 : term: Option<Term>,
551 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
552 : end_watch: EndWatch,
553 : ws_guard: Arc<WalSenderGuard>,
554 : wal_reader: WalReader,
555 : // buffer for readling WAL into to send it
556 : send_buf: Vec<u8>,
557 : }
558 :
559 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
560 :
561 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
562 : /// Send WAL until
563 : /// - an error occurs
564 : /// - receiver is caughtup and there is no computes (if streaming up to commit_lsn)
565 : /// - timeline's cancellation token fires
566 : ///
567 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
568 : /// convenience.
569 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
570 : loop {
571 : // Wait for the next portion if it is not there yet, or just
572 : // update our end of WAL available for sending value, we
573 : // communicate it to the receiver.
574 0 : self.wait_wal().await?;
575 0 : assert!(
576 0 : self.end_pos > self.start_pos,
577 0 : "nothing to send after waiting for WAL"
578 : );
579 :
580 : // try to send as much as available, capped by MAX_SEND_SIZE
581 0 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
582 0 : // if we went behind available WAL, back off
583 0 : if chunk_end_pos >= self.end_pos {
584 0 : chunk_end_pos = self.end_pos;
585 0 : } else {
586 0 : // If sending not up to end pos, round down to page boundary to
587 0 : // avoid breaking WAL record not at page boundary, as protocol
588 0 : // demands. See walsender.c (XLogSendPhysical).
589 0 : chunk_end_pos = chunk_end_pos
590 0 : .checked_sub(chunk_end_pos.block_offset())
591 0 : .unwrap();
592 0 : }
593 0 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
594 0 : let send_buf = &mut self.send_buf[..send_size];
595 : let send_size: usize;
596 : {
597 : // If uncommitted part is being pulled, check that the term is
598 : // still the expected one.
599 0 : let _term_guard = if let Some(t) = self.term {
600 0 : Some(self.tli.acquire_term(t).await?)
601 : } else {
602 0 : None
603 : };
604 : // Read WAL into buffer. send_size can be additionally capped to
605 : // segment boundary here.
606 0 : send_size = self.wal_reader.read(send_buf).await?
607 : };
608 0 : let send_buf = &send_buf[..send_size];
609 0 :
610 0 : // and send it, while respecting Timeline::cancel
611 0 : let msg = BeMessage::XLogData(XLogDataBody {
612 0 : wal_start: self.start_pos.0,
613 0 : wal_end: self.end_pos.0,
614 0 : timestamp: get_current_timestamp(),
615 0 : data: send_buf,
616 0 : });
617 0 : self.pgb.write_message(&msg).await?;
618 :
619 0 : if let Some(appname) = &self.appname {
620 0 : if appname == "replica" {
621 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
622 0 : }
623 0 : }
624 0 : trace!(
625 0 : "sent {} bytes of WAL {}-{}",
626 0 : send_size,
627 0 : self.start_pos,
628 0 : self.start_pos + send_size as u64
629 : );
630 0 : self.start_pos += send_size as u64;
631 : }
632 0 : }
633 :
634 : /// wait until we have WAL to stream, sending keepalives and checking for
635 : /// exit in the meanwhile
636 0 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
637 : loop {
638 0 : self.end_pos = self.end_watch.get();
639 0 : let have_something_to_send = (|| {
640 0 : fail::fail_point!(
641 0 : "sk-pause-send",
642 0 : self.appname.as_deref() != Some("pageserver"),
643 0 : |_| { false }
644 0 : );
645 0 : self.end_pos > self.start_pos
646 0 : })();
647 0 :
648 0 : if have_something_to_send {
649 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
650 0 : return Ok(());
651 0 : }
652 :
653 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
654 0 : if let Some(lsn) = self.wait_for_lsn().await? {
655 0 : self.end_pos = lsn;
656 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
657 0 : return Ok(());
658 0 : }
659 0 :
660 0 : // Timed out waiting for WAL, check for termination and send KA.
661 0 : // Check for termination only if we are streaming up to commit_lsn
662 0 : // (to pageserver).
663 0 : if let EndWatch::Commit(_) = self.end_watch {
664 0 : if let Some(remote_consistent_lsn) = self
665 0 : .ws_guard
666 0 : .walsenders
667 0 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
668 : {
669 0 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
670 : // Terminate if there is nothing more to send.
671 : // Note that "ending streaming" part of the string is used by
672 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
673 : // do not change this string without updating pageserver.
674 0 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
675 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
676 0 : self.appname, self.start_pos,
677 0 : )));
678 0 : }
679 0 : }
680 0 : }
681 :
682 0 : let msg = BeMessage::KeepAlive(WalSndKeepAlive {
683 0 : wal_end: self.end_pos.0,
684 0 : timestamp: get_current_timestamp(),
685 0 : request_reply: true,
686 0 : });
687 0 :
688 0 : self.pgb.write_message(&msg).await?;
689 : }
690 0 : }
691 :
692 : /// Wait until we have available WAL > start_pos or timeout expires. Returns
693 : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
694 : /// - Ok(None) if timeout expired;
695 : /// - Err in case of error -- only if 1) term changed while fetching in recovery
696 : /// mode 2) watch channel closed, which must never happen.
697 0 : async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
698 0 : let fp = (|| {
699 0 : fail::fail_point!(
700 0 : "sk-pause-send",
701 0 : self.appname.as_deref() != Some("pageserver"),
702 0 : |_| { true }
703 0 : );
704 0 : false
705 0 : })();
706 0 : if fp {
707 0 : tokio::time::sleep(POLL_STATE_TIMEOUT).await;
708 0 : return Ok(None);
709 0 : }
710 :
711 0 : let res = timeout(POLL_STATE_TIMEOUT, async move {
712 : loop {
713 0 : let end_pos = self.end_watch.get();
714 0 : if end_pos > self.start_pos {
715 0 : return Ok(end_pos);
716 0 : }
717 0 : if let EndWatch::Flush(rx) = &self.end_watch {
718 0 : let curr_term = rx.borrow().term;
719 0 : if let Some(client_term) = self.term {
720 0 : if curr_term != client_term {
721 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
722 0 : }
723 0 : }
724 0 : }
725 0 : self.end_watch.changed().await?;
726 : }
727 0 : })
728 0 : .await;
729 :
730 0 : match res {
731 : // success
732 0 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
733 : // error inside closure
734 0 : Ok(Err(err)) => Err(err),
735 : // timeout
736 0 : Err(_) => Ok(None),
737 : }
738 0 : }
739 : }
740 :
741 : /// A half driving receiving replies.
742 : struct ReplyReader<IO> {
743 : reader: PostgresBackendReader<IO>,
744 : ws_guard: Arc<WalSenderGuard>,
745 : tli: WalResidentTimeline,
746 : }
747 :
748 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
749 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
750 : loop {
751 0 : let msg = self.reader.read_copy_message().await?;
752 0 : self.handle_feedback(&msg).await?
753 : }
754 0 : }
755 :
756 0 : async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
757 0 : match msg.first().cloned() {
758 : Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
759 : // Note: deserializing is on m[1..] because we skip the tag byte.
760 0 : let mut hs_feedback = HotStandbyFeedback::des(&msg[1..])
761 0 : .context("failed to deserialize HotStandbyFeedback")?;
762 : // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way:
763 : // pq_sendint32(&reply_message, xmin);
764 : // pq_sendint32(&reply_message, xmin_epoch);
765 : // So it is two big endian 32-bit words in low endian order!
766 0 : hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
767 0 : hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
768 0 : self.ws_guard
769 0 : .walsenders
770 0 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
771 : }
772 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
773 0 : let reply =
774 0 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
775 0 : self.ws_guard
776 0 : .walsenders
777 0 : .record_standby_reply(self.ws_guard.id, &reply);
778 : }
779 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
780 : // pageserver sends this.
781 : // Note: deserializing is on m[9..] because we skip the tag byte and len bytes.
782 0 : let buf = Bytes::copy_from_slice(&msg[9..]);
783 0 : let ps_feedback = PageserverFeedback::parse(buf);
784 0 :
785 0 : trace!("PageserverFeedback is {:?}", ps_feedback);
786 0 : self.ws_guard
787 0 : .walsenders
788 0 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
789 0 : self.tli
790 0 : .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
791 0 : .await;
792 : // in principle new remote_consistent_lsn could allow to
793 : // deactivate the timeline, but we check that regularly through
794 : // broker updated, not need to do it here
795 : }
796 0 : _ => warn!("unexpected message {:?}", msg),
797 : }
798 0 : Ok(())
799 0 : }
800 : }
801 :
802 : #[cfg(test)]
803 : mod tests {
804 : use utils::id::{TenantId, TimelineId};
805 :
806 : use super::*;
807 :
808 4 : fn mock_ttid() -> TenantTimelineId {
809 4 : TenantTimelineId {
810 4 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
811 4 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
812 4 : }
813 4 : }
814 :
815 4 : fn mock_addr() -> SocketAddr {
816 4 : "127.0.0.1:8080".parse().unwrap()
817 4 : }
818 :
819 : // add to wss specified feedback setting other fields to dummy values
820 4 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
821 4 : let walsender_state = WalSenderState {
822 4 : ttid: mock_ttid(),
823 4 : addr: mock_addr(),
824 4 : conn_id: 1,
825 4 : appname: None,
826 4 : feedback,
827 4 : };
828 4 : wss.slots.push(Some(walsender_state))
829 4 : }
830 :
831 : // form standby feedback with given hot standby feedback ts/xmin and the
832 : // rest set to dummy values.
833 4 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
834 4 : ReplicationFeedback::Standby(StandbyFeedback {
835 4 : reply: StandbyReply::empty(),
836 4 : hs_feedback: HotStandbyFeedback {
837 4 : ts,
838 4 : xmin,
839 4 : catalog_xmin: 0,
840 4 : },
841 4 : })
842 4 : }
843 :
844 : // test that hs aggregation works as expected
845 : #[test]
846 1 : fn test_hs_feedback_no_valid() {
847 1 : let mut wss = WalSendersShared::new();
848 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
849 1 : wss.update_reply_feedback();
850 1 : assert_eq!(
851 1 : wss.agg_standby_feedback.hs_feedback.xmin,
852 1 : INVALID_FULL_TRANSACTION_ID
853 1 : );
854 1 : }
855 :
856 : #[test]
857 1 : fn test_hs_feedback() {
858 1 : let mut wss = WalSendersShared::new();
859 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
860 1 : push_feedback(&mut wss, hs_feedback(1, 42));
861 1 : push_feedback(&mut wss, hs_feedback(1, 64));
862 1 : wss.update_reply_feedback();
863 1 : assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
864 1 : }
865 : }
|