Line data Source code
1 : //! This module implements the streaming side of the replication protocol, starting
2 : //! with the "START_REPLICATION" message, and the registry of walsenders.
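 : //!
 : //! In broad strokes, as implemented below: after START_REPLICATION the backend
 : //! switches to copy mode (CopyBothResponse) and the connection is split in two.
 : //! One half (WalSender or InterpretedWalSender) pushes XLogData and keepalive
 : //! messages, while the other (ReplyReader) consumes standby status updates ('r'),
 : //! hot standby feedback ('h') and Neon pageserver feedback ('z') and records them
 : //! in the WalSenders registry.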
3 :
4 : use crate::handler::SafekeeperPostgresHandler;
5 : use crate::metrics::RECEIVED_PS_FEEDBACKS;
6 : use crate::receive_wal::WalReceivers;
7 : use crate::safekeeper::TermLsn;
8 : use crate::send_interpreted_wal::InterpretedWalSender;
9 : use crate::timeline::WalResidentTimeline;
10 : use crate::wal_reader_stream::WalReaderStreamBuilder;
11 : use crate::wal_storage::WalReader;
12 : use anyhow::{bail, Context as AnyhowContext};
13 : use bytes::Bytes;
14 : use futures::future::Either;
15 : use parking_lot::Mutex;
16 : use postgres_backend::PostgresBackend;
17 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
18 : use postgres_ffi::get_current_timestamp;
19 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
20 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
21 : use safekeeper_api::models::{
22 : ConnectionId, HotStandbyFeedback, ReplicationFeedback, StandbyFeedback, StandbyReply,
23 : WalSenderState, INVALID_FULL_TRANSACTION_ID,
24 : };
25 : use safekeeper_api::Term;
26 : use tokio::io::{AsyncRead, AsyncWrite};
27 : use utils::failpoint_support;
28 : use utils::id::TenantTimelineId;
29 : use utils::pageserver_feedback::PageserverFeedback;
30 : use utils::postgres_client::PostgresClientProtocol;
31 :
32 : use std::cmp::{max, min};
33 : use std::net::SocketAddr;
34 : use std::sync::Arc;
35 : use std::time::Duration;
36 : use tokio::sync::watch::Receiver;
37 : use tokio::time::timeout;
38 : use tracing::*;
39 : use utils::{bin_ser::BeSer, lsn::Lsn};
40 :
41 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
42 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
43 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
44 : // Neon extension of the replication protocol
45 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
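 : // Each reply arrives as a CopyData message whose first byte is one of the tags above;
 : // see ReplyReader::handle_feedback. 'h' and 'r' carry a BE-serialized HotStandbyFeedback
 : // or StandbyReply right after the tag byte, while the Neon 'z' message carries 8 more
 : // header bytes (a length, per the note there) before the PageserverFeedback payload,
 : // hence the msg[9..] slice.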
46 :
47 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
48 : pub struct WalSenders {
49 : mutex: Mutex<WalSendersShared>,
50 : walreceivers: Arc<WalReceivers>,
51 : }
52 :
53 : impl WalSenders {
54 0 : pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
55 0 : Arc::new(WalSenders {
56 0 : mutex: Mutex::new(WalSendersShared::new()),
57 0 : walreceivers,
58 0 : })
59 0 : }
60 :
61 : /// Register new walsender. Returned guard provides access to the slot and
62 : /// automatically deregisters in Drop.
63 0 : fn register(
64 0 : self: &Arc<WalSenders>,
65 0 : ttid: TenantTimelineId,
66 0 : addr: SocketAddr,
67 0 : conn_id: ConnectionId,
68 0 : appname: Option<String>,
69 0 : ) -> WalSenderGuard {
70 0 : let slots = &mut self.mutex.lock().slots;
71 0 : let walsender_state = WalSenderState {
72 0 : ttid,
73 0 : addr,
74 0 : conn_id,
75 0 : appname,
76 0 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
77 0 : };
78 : // find empty slot or create new one
79 0 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
80 0 : slots[pos] = Some(walsender_state);
81 0 : pos
82 : } else {
83 0 : let pos = slots.len();
84 0 : slots.push(Some(walsender_state));
85 0 : pos
86 : };
87 0 : WalSenderGuard {
88 0 : id: pos,
89 0 : walsenders: self.clone(),
90 0 : }
91 0 : }
92 :
93 : /// Get state of all walsenders.
94 0 : pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
95 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
96 0 : }
97 :
98 : /// Get the LSN of the most lagging pageserver receiver. Returns None if there are no
99 : /// active pageserver walsenders.
100 0 : pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
101 0 : self.mutex
102 0 : .lock()
103 0 : .slots
104 0 : .iter()
105 0 : .flatten()
106 0 : .filter_map(|s| match s.feedback {
107 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
108 0 : ReplicationFeedback::Standby(_) => None,
109 0 : })
110 0 : .min()
111 0 : }
112 :
113 : /// Returns the total count of pageserver feedbacks received and the last feedback.
114 0 : pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
115 0 : let shared = self.mutex.lock();
116 0 : (shared.ps_feedback_counter, shared.last_ps_feedback)
117 0 : }
118 :
119 : /// Get aggregated hot standby feedback (we send it to compute).
120 0 : pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
121 0 : self.mutex.lock().agg_standby_feedback
122 0 : }
123 :
124 : /// Record new pageserver feedback, update aggregated values.
125 0 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
126 0 : let mut shared = self.mutex.lock();
127 0 : shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
128 0 : shared.last_ps_feedback = *feedback;
129 0 : shared.ps_feedback_counter += 1;
130 0 : drop(shared);
131 0 :
132 0 : RECEIVED_PS_FEEDBACKS.inc();
133 0 :
134 0 : // send feedback to connected walproposers
135 0 : self.walreceivers.broadcast_pageserver_feedback(*feedback);
136 0 : }
137 :
138 : /// Record standby reply.
139 0 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
140 0 : let mut shared = self.mutex.lock();
141 0 : let slot = shared.get_slot_mut(id);
142 0 : debug!(
143 0 : "Record standby reply: ts={} apply_lsn={}",
144 : reply.reply_ts, reply.apply_lsn
145 : );
146 0 : match &mut slot.feedback {
147 0 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
148 : ReplicationFeedback::Pageserver(_) => {
149 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
150 0 : reply: *reply,
151 0 : hs_feedback: HotStandbyFeedback::empty(),
152 0 : })
153 : }
154 : }
155 0 : }
156 :
157 : /// Record hot standby feedback, update aggregated value.
158 0 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
159 0 : let mut shared = self.mutex.lock();
160 0 : let slot = shared.get_slot_mut(id);
161 0 : match &mut slot.feedback {
162 0 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
163 : ReplicationFeedback::Pageserver(_) => {
164 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
165 0 : reply: StandbyReply::empty(),
166 0 : hs_feedback: *feedback,
167 0 : })
168 : }
169 : }
170 0 : shared.update_reply_feedback();
171 0 : }
172 :
173 : /// Get remote_consistent_lsn reported by the pageserver. Returns None if
174 : /// the client is not a pageserver.
175 0 : pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
176 0 : let shared = self.mutex.lock();
177 0 : let slot = shared.get_slot(id);
178 0 : match slot.feedback {
179 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
180 0 : _ => None,
181 : }
182 0 : }
183 :
184 : /// Unregister walsender.
185 0 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
186 0 : let mut shared = self.mutex.lock();
187 0 : shared.slots[id] = None;
188 0 : shared.update_reply_feedback();
189 0 : }
190 : }
191 :
192 : struct WalSendersShared {
193 : // feedback value aggregated over all walsenders
194 : agg_standby_feedback: StandbyFeedback,
195 : // last feedback ever received from any pageserver, empty if none
196 : last_ps_feedback: PageserverFeedback,
197 : // total counter of pageserver feedbacks received
198 : ps_feedback_counter: u64,
199 : slots: Vec<Option<WalSenderState>>,
200 : }
201 :
202 : impl WalSendersShared {
203 2 : fn new() -> Self {
204 2 : WalSendersShared {
205 2 : agg_standby_feedback: StandbyFeedback::empty(),
206 2 : last_ps_feedback: PageserverFeedback::empty(),
207 2 : ps_feedback_counter: 0,
208 2 : slots: Vec::new(),
209 2 : }
210 2 : }
211 :
212 : /// Get the content of the slot with the provided id; it must exist.
213 0 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
214 0 : self.slots[id].as_ref().expect("walsender doesn't exist")
215 0 : }
216 :
217 : /// Get mutable content of the slot with the provided id; it must exist.
218 0 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
219 0 : self.slots[id].as_mut().expect("walsender doesn't exist")
220 0 : }
221 :
222 : /// Update the aggregated hot standby and normal reply feedbacks: valid xmins and LSNs
223 : /// are aggregated with min, hot standby ts with max, and reply_ts with min.
224 2 : fn update_reply_feedback(&mut self) {
225 2 : let mut agg = HotStandbyFeedback::empty();
226 2 : let mut reply_agg = StandbyReply::empty();
227 4 : for ws_state in self.slots.iter().flatten() {
228 4 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
229 4 : let hs_feedback = standby_feedback.hs_feedback;
230 4 : // doing Option math like op1.iter().chain(op2.iter()).min()
231 4 : // would be nicer, but we serialize/deserialize this struct
232 4 : // directly, so leave as is for now
233 4 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
234 2 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
235 1 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
236 1 : } else {
237 1 : agg.xmin = hs_feedback.xmin;
238 1 : }
239 2 : agg.ts = max(agg.ts, hs_feedback.ts);
240 2 : }
241 4 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
242 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
243 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
244 0 : } else {
245 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
246 0 : }
247 0 : agg.ts = max(agg.ts, hs_feedback.ts);
248 4 : }
249 4 : let reply = standby_feedback.reply;
250 4 : if reply.write_lsn != Lsn::INVALID {
251 0 : if reply_agg.write_lsn != Lsn::INVALID {
252 0 : reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
253 0 : } else {
254 0 : reply_agg.write_lsn = reply.write_lsn;
255 0 : }
256 4 : }
257 4 : if reply.flush_lsn != Lsn::INVALID {
258 0 : if reply_agg.flush_lsn != Lsn::INVALID {
259 0 : reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
260 0 : } else {
261 0 : reply_agg.flush_lsn = reply.flush_lsn;
262 0 : }
263 4 : }
264 4 : if reply.apply_lsn != Lsn::INVALID {
265 0 : if reply_agg.apply_lsn != Lsn::INVALID {
266 0 : reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
267 0 : } else {
268 0 : reply_agg.apply_lsn = reply.apply_lsn;
269 0 : }
270 4 : }
271 4 : if reply.reply_ts != 0 {
272 0 : if reply_agg.reply_ts != 0 {
273 0 : reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
274 0 : } else {
275 0 : reply_agg.reply_ts = reply.reply_ts;
276 0 : }
277 4 : }
278 0 : }
279 : }
280 2 : self.agg_standby_feedback = StandbyFeedback {
281 2 : reply: reply_agg,
282 2 : hs_feedback: agg,
283 2 : };
284 2 : }
285 : }
286 :
287 : // Id of the occupied slot in WalSenders, used to access it (and saved in the
288 : // WalSenderGuard). We could hand out an Arc to the slot directly, but there is not
289 : // much point: the aggregation performed on each feedback receipt iterates over
290 : // all walsenders anyway.
291 : pub type WalSenderId = usize;
292 :
293 : /// Scope guard to access slot in WalSenders registry and unregister from it in
294 : /// Drop.
295 : pub struct WalSenderGuard {
296 : id: WalSenderId,
297 : walsenders: Arc<WalSenders>,
298 : }
299 :
300 : impl WalSenderGuard {
301 0 : pub fn id(&self) -> WalSenderId {
302 0 : self.id
303 0 : }
304 :
305 0 : pub fn walsenders(&self) -> &Arc<WalSenders> {
306 0 : &self.walsenders
307 0 : }
308 : }
309 :
310 : impl Drop for WalSenderGuard {
311 0 : fn drop(&mut self) {
312 0 : self.walsenders.unregister(self.id);
313 0 : }
314 : }
315 :
316 : impl SafekeeperPostgresHandler {
317 : /// Wrapper around handle_start_replication_guts that handles its result. The error is
318 : /// handled here while we're still in the walsender ttid span; with an API
319 : /// extension, this can probably be moved into postgres_backend.
320 0 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
321 0 : &mut self,
322 0 : pgb: &mut PostgresBackend<IO>,
323 0 : start_pos: Lsn,
324 0 : term: Option<Term>,
325 0 : ) -> Result<(), QueryError> {
326 0 : let tli = self
327 0 : .global_timelines
328 0 : .get(self.ttid)
329 0 : .map_err(|e| QueryError::Other(e.into()))?;
330 0 : let residence_guard = tli.wal_residence_guard().await?;
331 :
332 0 : if let Err(end) = self
333 0 : .handle_start_replication_guts(pgb, start_pos, term, residence_guard)
334 0 : .await
335 : {
336 0 : let info = tli.get_safekeeper_info(&self.conf).await;
337 : // Log the result and probably send it to the client, closing the stream.
338 0 : pgb.handle_copy_stream_end(end)
339 0 : .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.flush_lsn)))
340 0 : .await;
341 0 : }
342 0 : Ok(())
343 0 : }
344 :
345 0 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
346 0 : &mut self,
347 0 : pgb: &mut PostgresBackend<IO>,
348 0 : start_pos: Lsn,
349 0 : term: Option<Term>,
350 0 : tli: WalResidentTimeline,
351 0 : ) -> Result<(), CopyStreamHandlerEnd> {
352 0 : let appname = self.appname.clone();
353 0 :
354 0 : // Use a guard object to remove our entry from the timeline when we are done.
355 0 : let ws_guard = Arc::new(tli.get_walsenders().register(
356 0 : self.ttid,
357 0 : *pgb.get_peer_addr(),
358 0 : self.conn_id,
359 0 : self.appname.clone(),
360 0 : ));
361 :
362 : // Walsender can operate in one of two modes which we select by
363 : // application_name: give only committed WAL (used by pageserver) or all
364 : // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
365 : // The second case is always driven by a consensus leader, whose term
366 : // must be supplied.
367 0 : let end_watch = if term.is_some() {
368 0 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
369 : } else {
370 0 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
371 : };
372 : // we don't check term here; it will be checked on first waiting/WAL reading anyway.
373 0 : let end_pos = end_watch.get();
374 0 :
375 0 : if end_pos < start_pos {
376 0 : warn!(
377 0 : "requested start_pos {} is ahead of available WAL end_pos {}",
378 : start_pos, end_pos
379 : );
380 0 : }
381 :
382 0 : info!(
383 0 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}",
384 : start_pos,
385 : end_pos,
386 0 : matches!(end_watch, EndWatch::Flush(_)),
387 : appname,
388 0 : self.protocol(),
389 : );
390 :
391 : // switch to copy
392 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
393 :
394 0 : let wal_reader = tli.get_walreader(start_pos).await?;
395 :
396 : // Split to concurrently receive and send data; replies are generally
397 : // not synchronized with sends, so this avoids deadlocks.
398 0 : let reader = pgb.split().context("START_REPLICATION split")?;
399 :
400 0 : let send_fut = match self.protocol() {
401 : PostgresClientProtocol::Vanilla => {
402 0 : let sender = WalSender {
403 0 : pgb,
404 0 : // should succeed since we're already holding another guard
405 0 : tli: tli.wal_residence_guard().await?,
406 0 : appname,
407 0 : start_pos,
408 0 : end_pos,
409 0 : term,
410 0 : end_watch,
411 0 : ws_guard: ws_guard.clone(),
412 0 : wal_reader,
413 0 : send_buf: vec![0u8; MAX_SEND_SIZE],
414 0 : };
415 0 :
416 0 : Either::Left(sender.run())
417 : }
418 : PostgresClientProtocol::Interpreted {
419 0 : format,
420 0 : compression,
421 : } => {
422 0 : let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
423 0 : let end_watch_view = end_watch.view();
424 0 : let wal_stream_builder = WalReaderStreamBuilder {
425 0 : tli: tli.wal_residence_guard().await?,
426 0 : start_pos,
427 0 : end_pos,
428 0 : term,
429 0 : end_watch,
430 0 : wal_sender_guard: ws_guard.clone(),
431 0 : };
432 0 :
433 0 : let sender = InterpretedWalSender {
434 0 : format,
435 0 : compression,
436 0 : pgb,
437 0 : wal_stream_builder,
438 0 : end_watch_view,
439 0 : shard: self.shard.unwrap(),
440 0 : pg_version,
441 0 : appname,
442 0 : };
443 0 :
444 0 : Either::Right(sender.run())
445 : }
446 : };
447 :
448 0 : let tli_cancel = tli.cancel.clone();
449 0 :
450 0 : let mut reply_reader = ReplyReader {
451 0 : reader,
452 0 : ws_guard: ws_guard.clone(),
453 0 : tli,
454 0 : };
455 :
456 0 : let res = tokio::select! {
457 : // todo: add read|write .context to these errors
458 0 : r = send_fut => r,
459 0 : r = reply_reader.run() => r,
460 0 : _ = tli_cancel.cancelled() => {
461 0 : return Err(CopyStreamHandlerEnd::Cancelled);
462 : }
463 : };
464 :
465 0 : let ws_state = ws_guard
466 0 : .walsenders
467 0 : .mutex
468 0 : .lock()
469 0 : .get_slot(ws_guard.id)
470 0 : .clone();
471 0 : info!(
472 0 : "finished streaming to {}, feedback={:?}",
473 : ws_state.addr, ws_state.feedback,
474 : );
475 :
476 : // Join pg backend back.
477 0 : pgb.unsplit(reply_reader.reader)?;
478 :
479 0 : res
480 0 : }
481 : }
482 :
483 : /// TODO(vlad): maybe lift this instead
484 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
485 : /// given term (recovery by walproposer or peer safekeeper).
486 : #[derive(Clone)]
487 : pub(crate) enum EndWatch {
488 : Commit(Receiver<Lsn>),
489 : Flush(Receiver<TermLsn>),
490 : }
491 :
492 : impl EndWatch {
493 0 : pub(crate) fn view(&self) -> EndWatchView {
494 0 : EndWatchView(self.clone())
495 0 : }
496 :
497 : /// Get current end of WAL.
498 0 : pub(crate) fn get(&self) -> Lsn {
499 0 : match self {
500 0 : EndWatch::Commit(r) => *r.borrow(),
501 0 : EndWatch::Flush(r) => r.borrow().lsn,
502 : }
503 0 : }
504 :
505 : /// Wait for the update.
506 0 : pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
507 0 : match self {
508 0 : EndWatch::Commit(r) => r.changed().await?,
509 0 : EndWatch::Flush(r) => r.changed().await?,
510 : }
511 0 : Ok(())
512 0 : }
513 :
514 0 : pub(crate) async fn wait_for_lsn(
515 0 : &mut self,
516 0 : lsn: Lsn,
517 0 : client_term: Option<Term>,
518 0 : ) -> anyhow::Result<Lsn> {
519 : loop {
520 0 : let end_pos = self.get();
521 0 : if end_pos > lsn {
522 0 : return Ok(end_pos);
523 0 : }
524 0 : if let EndWatch::Flush(rx) = &self {
525 0 : let curr_term = rx.borrow().term;
526 0 : if let Some(client_term) = client_term {
527 0 : if curr_term != client_term {
528 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
529 0 : }
530 0 : }
531 0 : }
532 0 : self.changed().await?;
533 : }
534 0 : }
535 : }
536 :
537 : pub(crate) struct EndWatchView(EndWatch);
538 :
539 : impl EndWatchView {
540 0 : pub(crate) fn get(&self) -> Lsn {
541 0 : self.0.get()
542 0 : }
543 : }
544 : /// The half of the split connection that drives sending WAL.
545 : struct WalSender<'a, IO> {
546 : pgb: &'a mut PostgresBackend<IO>,
547 : tli: WalResidentTimeline,
548 : appname: Option<String>,
549 : // Position from which we are sending the next chunk.
550 : start_pos: Lsn,
551 : // WAL up to this position is known to be locally available.
552 : // Usually this is the same as the latest commit_lsn, but in case of
553 : // walproposer recovery, this is flush_lsn.
554 : //
555 : // We send this LSN to the receiver as wal_end, so that it knows how much
556 : // WAL this safekeeper has. This LSN should be as fresh as possible.
557 : end_pos: Lsn,
558 : /// When streaming uncommitted part, the term the client acts as the leader
559 : /// in. Streaming is stopped if local term changes to a different (higher)
560 : /// value.
561 : term: Option<Term>,
562 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
563 : end_watch: EndWatch,
564 : ws_guard: Arc<WalSenderGuard>,
565 : wal_reader: WalReader,
566 : // buffer into which WAL is read before sending
567 : send_buf: Vec<u8>,
568 : }
569 :
570 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
571 :
572 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
573 : /// Send WAL until
574 : /// - an error occurs
575 : /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
576 : /// - timeline's cancellation token fires
577 : ///
578 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
579 : /// convenience.
580 0 : async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
581 : loop {
582 : // Wait for the next portion if it is not there yet, or just
583 : // update the end of WAL available for sending, which we
584 : // communicate to the receiver.
585 0 : self.wait_wal().await?;
586 0 : assert!(
587 0 : self.end_pos > self.start_pos,
588 0 : "nothing to send after waiting for WAL"
589 : );
590 :
591 : // try to send as much as available, capped by MAX_SEND_SIZE
592 0 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
593 0 : // if we went beyond available WAL, back off
594 0 : if chunk_end_pos >= self.end_pos {
595 0 : chunk_end_pos = self.end_pos;
596 0 : } else {
597 0 : // If sending not up to end pos, round down to page boundary to
598 0 : // avoid breaking WAL record not at page boundary, as protocol
599 0 : // demands. See walsender.c (XLogSendPhysical).
600 0 : chunk_end_pos = chunk_end_pos
601 0 : .checked_sub(chunk_end_pos.block_offset())
602 0 : .unwrap();
603 0 : }
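 : // For illustration, assuming an 8 KiB WAL block size and MAX_SEND_SIZE = 128 KiB:
 : // with start_pos = 0/1000100 the chunk would first end at 0/1020100; its
 : // block_offset() is 0x100, so it is trimmed to 0/1020000, keeping the send on a
 : // page boundary.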
604 0 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
605 0 : let send_buf = &mut self.send_buf[..send_size];
606 : let send_size: usize;
607 : {
608 : // If uncommitted part is being pulled, check that the term is
609 : // still the expected one.
610 0 : let _term_guard = if let Some(t) = self.term {
611 0 : Some(self.tli.acquire_term(t).await?)
612 : } else {
613 0 : None
614 : };
615 : // Read WAL into buffer. send_size can be additionally capped to
616 : // segment boundary here.
617 0 : send_size = self.wal_reader.read(send_buf).await?
618 : };
619 0 : let send_buf = &send_buf[..send_size];
620 0 :
621 0 : // and send it, while respecting Timeline::cancel
622 0 : let msg = BeMessage::XLogData(XLogDataBody {
623 0 : wal_start: self.start_pos.0,
624 0 : wal_end: self.end_pos.0,
625 0 : timestamp: get_current_timestamp(),
626 0 : data: send_buf,
627 0 : });
628 0 : self.pgb.write_message(&msg).await?;
629 :
630 0 : if let Some(appname) = &self.appname {
631 0 : if appname == "replica" {
632 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
633 0 : }
634 0 : }
635 0 : trace!(
636 0 : "sent {} bytes of WAL {}-{}",
637 0 : send_size,
638 0 : self.start_pos,
639 0 : self.start_pos + send_size as u64
640 : );
641 0 : self.start_pos += send_size as u64;
642 : }
643 0 : }
644 :
645 : /// Wait until we have WAL to stream, sending keepalives and checking for
646 : /// exit in the meantime.
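 : /// While idle, wait_for_lsn below returns None every POLL_STATE_TIMEOUT (about once
 : /// per second), at which point a keepalive with request_reply=true is sent to the client.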
647 0 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
648 : loop {
649 0 : self.end_pos = self.end_watch.get();
650 0 : let have_something_to_send = (|| {
651 0 : fail::fail_point!(
652 0 : "sk-pause-send",
653 0 : self.appname.as_deref() != Some("pageserver"),
654 0 : |_| { false }
655 0 : );
656 0 : self.end_pos > self.start_pos
657 0 : })();
658 0 :
659 0 : if have_something_to_send {
660 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
661 0 : return Ok(());
662 0 : }
663 :
664 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
665 0 : if let Some(lsn) = self.wait_for_lsn().await? {
666 0 : self.end_pos = lsn;
667 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
668 0 : return Ok(());
669 0 : }
670 0 :
671 0 : // Timed out waiting for WAL, check for termination and send KA.
672 0 : // Check for termination only if we are streaming up to commit_lsn
673 0 : // (to pageserver).
674 0 : if let EndWatch::Commit(_) = self.end_watch {
675 0 : if let Some(remote_consistent_lsn) = self
676 0 : .ws_guard
677 0 : .walsenders
678 0 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
679 : {
680 0 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
681 : // Terminate if there is nothing more to send.
682 : // Note that "ending streaming" part of the string is used by
683 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
684 : // do not change this string without updating pageserver.
685 0 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
686 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
687 0 : self.appname, self.start_pos,
688 0 : )));
689 0 : }
690 0 : }
691 0 : }
692 :
693 0 : let msg = BeMessage::KeepAlive(WalSndKeepAlive {
694 0 : wal_end: self.end_pos.0,
695 0 : timestamp: get_current_timestamp(),
696 0 : request_reply: true,
697 0 : });
698 0 :
699 0 : self.pgb.write_message(&msg).await?;
700 : }
701 0 : }
702 :
703 : /// Wait until we have available WAL > start_pos or timeout expires. Returns
704 : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
705 : /// - Ok(None) if timeout expired;
706 : /// - Err in case of error -- only if 1) term changed while fetching in recovery
707 : /// mode 2) watch channel closed, which must never happen.
708 0 : async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
709 0 : let fp = (|| {
710 0 : fail::fail_point!(
711 0 : "sk-pause-send",
712 0 : self.appname.as_deref() != Some("pageserver"),
713 0 : |_| { true }
714 0 : );
715 0 : false
716 0 : })();
717 0 : if fp {
718 0 : tokio::time::sleep(POLL_STATE_TIMEOUT).await;
719 0 : return Ok(None);
720 0 : }
721 :
722 0 : let res = timeout(POLL_STATE_TIMEOUT, async move {
723 : loop {
724 0 : let end_pos = self.end_watch.get();
725 0 : if end_pos > self.start_pos {
726 0 : return Ok(end_pos);
727 0 : }
728 0 : if let EndWatch::Flush(rx) = &self.end_watch {
729 0 : let curr_term = rx.borrow().term;
730 0 : if let Some(client_term) = self.term {
731 0 : if curr_term != client_term {
732 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
733 0 : }
734 0 : }
735 0 : }
736 0 : self.end_watch.changed().await?;
737 : }
738 0 : })
739 0 : .await;
740 :
741 0 : match res {
742 : // success
743 0 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
744 : // error inside closure
745 0 : Ok(Err(err)) => Err(err),
746 : // timeout
747 0 : Err(_) => Ok(None),
748 : }
749 0 : }
750 : }
751 :
752 : /// The half of the split connection that drives receiving replies.
753 : struct ReplyReader<IO> {
754 : reader: PostgresBackendReader<IO>,
755 : ws_guard: Arc<WalSenderGuard>,
756 : tli: WalResidentTimeline,
757 : }
758 :
759 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
760 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
761 : loop {
762 0 : let msg = self.reader.read_copy_message().await?;
763 0 : self.handle_feedback(&msg).await?
764 : }
765 0 : }
766 :
767 0 : async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
768 0 : match msg.first().cloned() {
769 : Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
770 : // Note: deserializing is on msg[1..] because we skip the tag byte.
771 0 : let mut hs_feedback = HotStandbyFeedback::des(&msg[1..])
772 0 : .context("failed to deserialize HotStandbyFeedback")?;
773 : // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way:
774 : // pq_sendint32(&reply_message, xmin);
775 : // pq_sendint32(&reply_message, xmin_epoch);
776 : // So it is two big-endian 32-bit words in reversed (little-endian) word order!
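 : // Interpreted as one big-endian u64 this yields (xmin << 32) | epoch; rotate_left(32)
 : // swaps the halves back into the (epoch << 32) | xid FullTransactionId layout,
 : // e.g. 0x0000002A_00000001 becomes 0x00000001_0000002A.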
777 0 : hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
778 0 : hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
779 0 : self.ws_guard
780 0 : .walsenders
781 0 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
782 : }
783 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
784 0 : let reply =
785 0 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
786 0 : self.ws_guard
787 0 : .walsenders
788 0 : .record_standby_reply(self.ws_guard.id, &reply);
789 : }
790 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
791 : // pageserver sends this.
792 : // Note: deserializing is on msg[9..] because we skip the tag byte and 8 length bytes.
793 0 : let buf = Bytes::copy_from_slice(&msg[9..]);
794 0 : let ps_feedback = PageserverFeedback::parse(buf);
795 0 :
796 0 : trace!("PageserverFeedback is {:?}", ps_feedback);
797 0 : self.ws_guard
798 0 : .walsenders
799 0 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
800 0 : self.tli
801 0 : .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
802 0 : .await;
803 : // In principle a new remote_consistent_lsn could allow us to
804 : // deactivate the timeline, but we check that regularly through
805 : // broker updates, no need to do it here.
806 : }
807 0 : _ => warn!("unexpected message {:?}", msg),
808 : }
809 0 : Ok(())
810 0 : }
811 : }
812 :
813 : #[cfg(test)]
814 : mod tests {
815 : use safekeeper_api::models::FullTransactionId;
816 : use utils::id::{TenantId, TimelineId};
817 :
818 : use super::*;
819 :
820 4 : fn mock_ttid() -> TenantTimelineId {
821 4 : TenantTimelineId {
822 4 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
823 4 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
824 4 : }
825 4 : }
826 :
827 4 : fn mock_addr() -> SocketAddr {
828 4 : "127.0.0.1:8080".parse().unwrap()
829 4 : }
830 :
831 : // Add the specified feedback to wss, setting other fields to dummy values.
832 4 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
833 4 : let walsender_state = WalSenderState {
834 4 : ttid: mock_ttid(),
835 4 : addr: mock_addr(),
836 4 : conn_id: 1,
837 4 : appname: None,
838 4 : feedback,
839 4 : };
840 4 : wss.slots.push(Some(walsender_state))
841 4 : }
842 :
843 : // Form standby feedback with the given hot standby feedback ts/xmin and the
844 : // rest set to dummy values.
845 4 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
846 4 : ReplicationFeedback::Standby(StandbyFeedback {
847 4 : reply: StandbyReply::empty(),
848 4 : hs_feedback: HotStandbyFeedback {
849 4 : ts,
850 4 : xmin,
851 4 : catalog_xmin: 0,
852 4 : },
853 4 : })
854 4 : }
855 :
856 : // test that hs aggregation works as expected
857 : #[test]
858 1 : fn test_hs_feedback_no_valid() {
859 1 : let mut wss = WalSendersShared::new();
860 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
861 1 : wss.update_reply_feedback();
862 1 : assert_eq!(
863 1 : wss.agg_standby_feedback.hs_feedback.xmin,
864 1 : INVALID_FULL_TRANSACTION_ID
865 1 : );
866 1 : }
867 :
868 : #[test]
869 1 : fn test_hs_feedback() {
870 1 : let mut wss = WalSendersShared::new();
871 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
872 1 : push_feedback(&mut wss, hs_feedback(1, 42));
873 1 : push_feedback(&mut wss, hs_feedback(1, 64));
874 1 : wss.update_reply_feedback();
875 1 : assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
876 1 : }
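 :
 : // A minimal sketch of the analogous reply aggregation check (min over valid
 : // write_lsn values); it assumes StandbyReply's fields are publicly mutable, as
 : // update_reply_feedback already relies on when reading them.
 : #[test]
 : fn test_standby_reply_aggregation() {
 : let mut wss = WalSendersShared::new();
 : for lsn in [Lsn(0x200), Lsn(0x100)] {
 : let mut reply = StandbyReply::empty();
 : reply.write_lsn = lsn;
 : push_feedback(
 : &mut wss,
 : ReplicationFeedback::Standby(StandbyFeedback {
 : reply,
 : hs_feedback: HotStandbyFeedback::empty(),
 : }),
 : );
 : }
 : wss.update_reply_feedback();
 : assert_eq!(wss.agg_standby_feedback.reply.write_lsn, Lsn(0x100));
 : }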
877 : }