Line data Source code
1 : //! This module implements the streaming side of the replication protocol, starting
2 : //! with the "START_REPLICATION" message, and the registry of walsenders.
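//!
//! Overall flow (a sketch of this file): handle_start_replication registers the
//! connection in the WalSenders registry, switches the socket into COPY mode and
//! splits it; a WalSender half streams XLogData/keepalive messages while a
//! ReplyReader half records incoming feedback, until cancellation or until the
//! receiver catches up.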
3 :
4 : use crate::handler::SafekeeperPostgresHandler;
5 : use crate::metrics::RECEIVED_PS_FEEDBACKS;
6 : use crate::receive_wal::WalReceivers;
7 : use crate::safekeeper::{Term, TermLsn};
8 : use crate::send_interpreted_wal::InterpretedWalSender;
9 : use crate::timeline::WalResidentTimeline;
10 : use crate::wal_reader_stream::WalReaderStreamBuilder;
11 : use crate::wal_service::ConnectionId;
12 : use crate::wal_storage::WalReader;
13 : use anyhow::{bail, Context as AnyhowContext};
14 : use bytes::Bytes;
15 : use futures::future::Either;
16 : use parking_lot::Mutex;
17 : use postgres_backend::PostgresBackend;
18 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackendReader, QueryError};
19 : use postgres_ffi::get_current_timestamp;
20 : use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
21 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
22 : use serde::{Deserialize, Serialize};
23 : use tokio::io::{AsyncRead, AsyncWrite};
24 : use utils::failpoint_support;
25 : use utils::id::TenantTimelineId;
26 : use utils::pageserver_feedback::PageserverFeedback;
27 : use utils::postgres_client::PostgresClientProtocol;
28 :
29 : use std::cmp::{max, min};
30 : use std::net::SocketAddr;
31 : use std::str;
32 : use std::sync::Arc;
33 : use std::time::Duration;
34 : use tokio::sync::watch::Receiver;
35 : use tokio::time::timeout;
36 : use tracing::*;
37 : use utils::{bin_ser::BeSer, lsn::Lsn};
38 :
39 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
40 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
41 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
42 : // neon extension of the replication protocol
43 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
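
// Rough shape of the copy-data feedback messages handled by ReplyReader below
// (a sketch for orientation, not a normative spec):
//   'h' + HotStandbyFeedback (ts, xmin, catalog_xmin), BeSer-encoded
//   'r' + StandbyReply (write/flush/apply LSNs, reply_ts, reply_requested)
//   'z' + 8 length bytes + PageserverFeedback (Neon-specific, sent by pageserver)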
44 :
45 : type FullTransactionId = u64;
46 :
47 : /// Hot standby feedback received from replica
48 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
49 : pub struct HotStandbyFeedback {
50 : pub ts: TimestampTz,
51 : pub xmin: FullTransactionId,
52 : pub catalog_xmin: FullTransactionId,
53 : }
54 :
55 : const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
56 :
57 : impl HotStandbyFeedback {
58 3038 : pub fn empty() -> HotStandbyFeedback {
59 3038 : HotStandbyFeedback {
60 3038 : ts: 0,
61 3038 : xmin: 0,
62 3038 : catalog_xmin: 0,
63 3038 : }
64 3038 : }
65 : }
66 :
67 : /// Standby status update
68 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
69 : pub struct StandbyReply {
70 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
71 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
72 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
73 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
74 : pub reply_requested: bool,
75 : }
76 :
77 : impl StandbyReply {
78 8 : fn empty() -> Self {
79 8 : StandbyReply {
80 8 : write_lsn: Lsn::INVALID,
81 8 : flush_lsn: Lsn::INVALID,
82 8 : apply_lsn: Lsn::INVALID,
83 8 : reply_ts: 0,
84 8 : reply_requested: false,
85 8 : }
86 8 : }
87 : }
88 :
89 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
90 : pub struct StandbyFeedback {
91 : pub reply: StandbyReply,
92 : pub hs_feedback: HotStandbyFeedback,
93 : }
94 :
95 : impl StandbyFeedback {
96 2 : pub fn empty() -> Self {
97 2 : StandbyFeedback {
98 2 : reply: StandbyReply::empty(),
99 2 : hs_feedback: HotStandbyFeedback::empty(),
100 2 : }
101 2 : }
102 : }
103 :
104 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
105 : pub struct WalSenders {
106 : mutex: Mutex<WalSendersShared>,
107 : walreceivers: Arc<WalReceivers>,
108 : }
109 :
110 : impl WalSenders {
111 0 : pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
112 0 : Arc::new(WalSenders {
113 0 : mutex: Mutex::new(WalSendersShared::new()),
114 0 : walreceivers,
115 0 : })
116 0 : }
117 :
118 : /// Register new walsender. Returned guard provides access to the slot and
119 : /// automatically deregisters in Drop.
120 0 : fn register(
121 0 : self: &Arc<WalSenders>,
122 0 : ttid: TenantTimelineId,
123 0 : addr: SocketAddr,
124 0 : conn_id: ConnectionId,
125 0 : appname: Option<String>,
126 0 : ) -> WalSenderGuard {
127 0 : let slots = &mut self.mutex.lock().slots;
128 0 : let walsender_state = WalSenderState {
129 0 : ttid,
130 0 : addr,
131 0 : conn_id,
132 0 : appname,
133 0 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
134 0 : };
135 : // find empty slot or create new one
136 0 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
137 0 : slots[pos] = Some(walsender_state);
138 0 : pos
139 : } else {
140 0 : let pos = slots.len();
141 0 : slots.push(Some(walsender_state));
142 0 : pos
143 : };
144 0 : WalSenderGuard {
145 0 : id: pos,
146 0 : walsenders: self.clone(),
147 0 : }
148 0 : }
149 :
150 : /// Get state of all walsenders.
151 0 : pub fn get_all(self: &Arc<WalSenders>) -> Vec<WalSenderState> {
152 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
153 0 : }
154 :
155 : /// Get the LSN of the most lagging pageserver receiver. Returns None if there are no
156 : /// active walsenders.
157 0 : pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
158 0 : self.mutex
159 0 : .lock()
160 0 : .slots
161 0 : .iter()
162 0 : .flatten()
163 0 : .filter_map(|s| match s.feedback {
164 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
165 0 : ReplicationFeedback::Standby(_) => None,
166 0 : })
167 0 : .min()
168 0 : }
169 :
170 : /// Returns the total count of pageserver feedbacks received and the last feedback.
171 0 : pub fn get_ps_feedback_stats(self: &Arc<WalSenders>) -> (u64, PageserverFeedback) {
172 0 : let shared = self.mutex.lock();
173 0 : (shared.ps_feedback_counter, shared.last_ps_feedback)
174 0 : }
175 :
176 : /// Get aggregated hot standby feedback (we send it to compute).
177 0 : pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
178 0 : self.mutex.lock().agg_standby_feedback
179 0 : }
180 :
181 : /// Record new pageserver feedback, update aggregated values.
182 0 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
183 0 : let mut shared = self.mutex.lock();
184 0 : shared.get_slot_mut(id).feedback = ReplicationFeedback::Pageserver(*feedback);
185 0 : shared.last_ps_feedback = *feedback;
186 0 : shared.ps_feedback_counter += 1;
187 0 : drop(shared);
188 0 :
189 0 : RECEIVED_PS_FEEDBACKS.inc();
190 0 :
191 0 : // send feedback to connected walproposers
192 0 : self.walreceivers.broadcast_pageserver_feedback(*feedback);
193 0 : }
194 :
195 : /// Record standby reply.
196 0 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
197 0 : let mut shared = self.mutex.lock();
198 0 : let slot = shared.get_slot_mut(id);
199 0 : debug!(
200 0 : "Record standby reply: ts={} apply_lsn={}",
201 : reply.reply_ts, reply.apply_lsn
202 : );
203 0 : match &mut slot.feedback {
204 0 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
205 : ReplicationFeedback::Pageserver(_) => {
206 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
207 0 : reply: *reply,
208 0 : hs_feedback: HotStandbyFeedback::empty(),
209 0 : })
210 : }
211 : }
212 0 : }
213 :
214 : /// Record hot standby feedback, update aggregated value.
215 0 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
216 0 : let mut shared = self.mutex.lock();
217 0 : let slot = shared.get_slot_mut(id);
218 0 : match &mut slot.feedback {
219 0 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
220 : ReplicationFeedback::Pageserver(_) => {
221 0 : slot.feedback = ReplicationFeedback::Standby(StandbyFeedback {
222 0 : reply: StandbyReply::empty(),
223 0 : hs_feedback: *feedback,
224 0 : })
225 : }
226 : }
227 0 : shared.update_reply_feedback();
228 0 : }
229 :
230 : /// Get remote_consistent_lsn reported by the pageserver. Returns None if
231 : /// client is not pageserver.
232 0 : pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
233 0 : let shared = self.mutex.lock();
234 0 : let slot = shared.get_slot(id);
235 0 : match slot.feedback {
236 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
237 0 : _ => None,
238 : }
239 0 : }
240 :
241 : /// Unregister walsender.
242 0 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
243 0 : let mut shared = self.mutex.lock();
244 0 : shared.slots[id] = None;
245 0 : shared.update_reply_feedback();
246 0 : }
247 : }
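
// Typical lifecycle (sketch, following the methods above): a walsender registers
// itself in register() when a connection starts, the reply reader records
// incoming feedback via record_ps_feedback / record_standby_reply /
// record_hs_feedback, and the WalSenderGuard unregisters the slot in Drop when
// the connection ends.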
248 :
249 : struct WalSendersShared {
250 : // value aggregated over all walsenders
251 : agg_standby_feedback: StandbyFeedback,
252 : // last feedback ever received from any pageserver, empty if none
253 : last_ps_feedback: PageserverFeedback,
254 : // total counter of pageserver feedbacks received
255 : ps_feedback_counter: u64,
256 : slots: Vec<Option<WalSenderState>>,
257 : }
258 :
259 : impl WalSendersShared {
260 2 : fn new() -> Self {
261 2 : WalSendersShared {
262 2 : agg_standby_feedback: StandbyFeedback::empty(),
263 2 : last_ps_feedback: PageserverFeedback::empty(),
264 2 : ps_feedback_counter: 0,
265 2 : slots: Vec::new(),
266 2 : }
267 2 : }
268 :
269 : /// Get the content of the slot with the provided id; it must exist.
270 0 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
271 0 : self.slots[id].as_ref().expect("walsender doesn't exist")
272 0 : }
273 :
274 : /// Get mutable content of the slot with the provided id; it must exist.
275 0 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
276 0 : self.slots[id].as_mut().expect("walsender doesn't exist")
277 0 : }
278 :
279 : /// Update aggregated hot standby and normal reply feedbacks. We take the min of
280 : /// valid xmins and reply LSNs, and the max of hot standby feedback ts.
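/// For example (mirroring the tests at the bottom of this file), two standbys
/// reporting xmin 42 and 64 aggregate to xmin 42, while slots reporting
/// INVALID_FULL_TRANSACTION_ID are ignored.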
281 2 : fn update_reply_feedback(&mut self) {
282 2 : let mut agg = HotStandbyFeedback::empty();
283 2 : let mut reply_agg = StandbyReply::empty();
284 4 : for ws_state in self.slots.iter().flatten() {
285 4 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.feedback {
286 4 : let hs_feedback = standby_feedback.hs_feedback;
287 4 : // doing Option math like op1.iter().chain(op2.iter()).min()
288 4 : // would be nicer, but we serialize/deserialize this struct
289 4 : // directly, so leave as is for now
290 4 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
291 2 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
292 1 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
293 1 : } else {
294 1 : agg.xmin = hs_feedback.xmin;
295 1 : }
296 2 : agg.ts = max(agg.ts, hs_feedback.ts);
297 2 : }
298 4 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
299 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
300 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
301 0 : } else {
302 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
303 0 : }
304 0 : agg.ts = max(agg.ts, hs_feedback.ts);
305 4 : }
306 4 : let reply = standby_feedback.reply;
307 4 : if reply.write_lsn != Lsn::INVALID {
308 0 : if reply_agg.write_lsn != Lsn::INVALID {
309 0 : reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
310 0 : } else {
311 0 : reply_agg.write_lsn = reply.write_lsn;
312 0 : }
313 4 : }
314 4 : if reply.flush_lsn != Lsn::INVALID {
315 0 : if reply_agg.flush_lsn != Lsn::INVALID {
316 0 : reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
317 0 : } else {
318 0 : reply_agg.flush_lsn = reply.flush_lsn;
319 0 : }
320 4 : }
321 4 : if reply.apply_lsn != Lsn::INVALID {
322 0 : if reply_agg.apply_lsn != Lsn::INVALID {
323 0 : reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
324 0 : } else {
325 0 : reply_agg.apply_lsn = reply.apply_lsn;
326 0 : }
327 4 : }
328 4 : if reply.reply_ts != 0 {
329 0 : if reply_agg.reply_ts != 0 {
330 0 : reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
331 0 : } else {
332 0 : reply_agg.reply_ts = reply.reply_ts;
333 0 : }
334 4 : }
335 0 : }
336 : }
337 2 : self.agg_standby_feedback = StandbyFeedback {
338 2 : reply: reply_agg,
339 2 : hs_feedback: agg,
340 2 : };
341 2 : }
342 : }
343 :
344 : // Serialization is used only for pretty printing in json.
345 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
346 : pub struct WalSenderState {
347 : ttid: TenantTimelineId,
348 : addr: SocketAddr,
349 : conn_id: ConnectionId,
350 : // postgres application_name
351 : appname: Option<String>,
352 : feedback: ReplicationFeedback,
353 : }
354 :
355 : // The receiver is either a pageserver or a regular standby, which send different
356 : // kinds of feedback.
357 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
358 : enum ReplicationFeedback {
359 : Pageserver(PageserverFeedback),
360 : Standby(StandbyFeedback),
361 : }
362 :
363 : // Id of the occupied slot in WalSenders, used to access it (and saved in the
364 : // WalSenderGuard). We could give an Arc directly to the slot, but there is not
365 : // much sense in that, as the value aggregation performed on each feedback
366 : // received iterates over all walsenders anyway.
367 : pub type WalSenderId = usize;
368 :
369 : /// Scope guard to access slot in WalSenders registry and unregister from it in
370 : /// Drop.
371 : pub struct WalSenderGuard {
372 : id: WalSenderId,
373 : walsenders: Arc<WalSenders>,
374 : }
375 :
376 : impl WalSenderGuard {
377 0 : pub fn id(&self) -> WalSenderId {
378 0 : self.id
379 0 : }
380 :
381 0 : pub fn walsenders(&self) -> &Arc<WalSenders> {
382 0 : &self.walsenders
383 0 : }
384 : }
385 :
386 : impl Drop for WalSenderGuard {
387 0 : fn drop(&mut self) {
388 0 : self.walsenders.unregister(self.id);
389 0 : }
390 : }
391 :
392 : impl SafekeeperPostgresHandler {
393 : /// Wrapper around handle_start_replication_guts that handles its result. The error is
394 : /// handled here while we're still in the walsender ttid span; with an API
395 : /// extension, this can probably be moved into postgres_backend.
396 0 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin>(
397 0 : &mut self,
398 0 : pgb: &mut PostgresBackend<IO>,
399 0 : start_pos: Lsn,
400 0 : term: Option<Term>,
401 0 : ) -> Result<(), QueryError> {
402 0 : let tli = self
403 0 : .global_timelines
404 0 : .get(self.ttid)
405 0 : .map_err(|e| QueryError::Other(e.into()))?;
406 0 : let residence_guard = tli.wal_residence_guard().await?;
407 :
408 0 : if let Err(end) = self
409 0 : .handle_start_replication_guts(pgb, start_pos, term, residence_guard)
410 0 : .await
411 : {
412 0 : let info = tli.get_safekeeper_info(&self.conf).await;
413 : // Log the result and probably send it to the client, closing the stream.
414 0 : pgb.handle_copy_stream_end(end)
415 0 : .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
416 0 : .await;
417 0 : }
418 0 : Ok(())
419 0 : }
420 :
421 0 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin>(
422 0 : &mut self,
423 0 : pgb: &mut PostgresBackend<IO>,
424 0 : start_pos: Lsn,
425 0 : term: Option<Term>,
426 0 : tli: WalResidentTimeline,
427 0 : ) -> Result<(), CopyStreamHandlerEnd> {
428 0 : let appname = self.appname.clone();
429 0 :
430 0 : // Use a guard object to remove our entry from the timeline when we are done.
431 0 : let ws_guard = Arc::new(tli.get_walsenders().register(
432 0 : self.ttid,
433 0 : *pgb.get_peer_addr(),
434 0 : self.conn_id,
435 0 : self.appname.clone(),
436 0 : ));
437 :
438 : // Walsender can operate in one of two modes, selected by whether the client
439 : // supplied a term: give only committed WAL (used by pageserver) or all
440 : // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
441 : // The second case is always driven by a consensus leader, whose term
442 : // must be supplied.
443 0 : let end_watch = if term.is_some() {
444 0 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
445 : } else {
446 0 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
447 : };
448 : // we don't check the term here; it will be checked on the first wait/WAL read anyway.
449 0 : let end_pos = end_watch.get();
450 0 :
451 0 : if end_pos < start_pos {
452 0 : warn!(
453 0 : "requested start_pos {} is ahead of available WAL end_pos {}",
454 : start_pos, end_pos
455 : );
456 0 : }
457 :
458 0 : info!(
459 0 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}",
460 : start_pos,
461 : end_pos,
462 0 : matches!(end_watch, EndWatch::Flush(_)),
463 : appname,
464 0 : self.protocol(),
465 : );
466 :
467 : // switch to copy
468 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
469 :
470 0 : let wal_reader = tli.get_walreader(start_pos).await?;
471 :
472 : // Split to concurrently receive and send data; replies are generally
473 : // not synchronized with sends, so this avoids deadlocks.
474 0 : let reader = pgb.split().context("START_REPLICATION split")?;
475 :
476 0 : let send_fut = match self.protocol() {
477 : PostgresClientProtocol::Vanilla => {
478 0 : let sender = WalSender {
479 0 : pgb,
480 0 : // should succeed since we're already holding another guard
481 0 : tli: tli.wal_residence_guard().await?,
482 0 : appname,
483 0 : start_pos,
484 0 : end_pos,
485 0 : term,
486 0 : end_watch,
487 0 : ws_guard: ws_guard.clone(),
488 0 : wal_reader,
489 0 : send_buf: vec![0u8; MAX_SEND_SIZE],
490 0 : };
491 0 :
492 0 : Either::Left(sender.run())
493 : }
494 : PostgresClientProtocol::Interpreted {
495 0 : format,
496 0 : compression,
497 : } => {
498 0 : let pg_version = tli.tli.get_state().await.1.server.pg_version / 10000;
499 0 : let end_watch_view = end_watch.view();
500 0 : let wal_stream_builder = WalReaderStreamBuilder {
501 0 : tli: tli.wal_residence_guard().await?,
502 0 : start_pos,
503 0 : end_pos,
504 0 : term,
505 0 : end_watch,
506 0 : wal_sender_guard: ws_guard.clone(),
507 0 : };
508 0 :
509 0 : let sender = InterpretedWalSender {
510 0 : format,
511 0 : compression,
512 0 : pgb,
513 0 : wal_stream_builder,
514 0 : end_watch_view,
515 0 : shard: self.shard.unwrap(),
516 0 : pg_version,
517 0 : appname,
518 0 : };
519 0 :
520 0 : Either::Right(sender.run())
521 : }
522 : };
523 :
524 0 : let tli_cancel = tli.cancel.clone();
525 0 :
526 0 : let mut reply_reader = ReplyReader {
527 0 : reader,
528 0 : ws_guard: ws_guard.clone(),
529 0 : tli,
530 0 : };
531 :
532 0 : let res = tokio::select! {
533 : // todo: add read|write .context to these errors
534 0 : r = send_fut => r,
535 0 : r = reply_reader.run() => r,
536 0 : _ = tli_cancel.cancelled() => {
537 0 : return Err(CopyStreamHandlerEnd::Cancelled);
538 : }
539 : };
540 :
541 0 : let ws_state = ws_guard
542 0 : .walsenders
543 0 : .mutex
544 0 : .lock()
545 0 : .get_slot(ws_guard.id)
546 0 : .clone();
547 0 : info!(
548 0 : "finished streaming to {}, feedback={:?}",
549 : ws_state.addr, ws_state.feedback,
550 : );
551 :
552 : // Join pg backend back.
553 0 : pgb.unsplit(reply_reader.reader)?;
554 :
555 0 : res
556 0 : }
557 : }
558 :
559 : /// TODO(vlad): maybe lift this instead
560 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
561 : /// given term (recovery by walproposer or peer safekeeper).
562 : #[derive(Clone)]
563 : pub(crate) enum EndWatch {
564 : Commit(Receiver<Lsn>),
565 : Flush(Receiver<TermLsn>),
566 : }
567 :
568 : impl EndWatch {
569 0 : pub(crate) fn view(&self) -> EndWatchView {
570 0 : EndWatchView(self.clone())
571 0 : }
572 :
573 : /// Get current end of WAL.
574 0 : pub(crate) fn get(&self) -> Lsn {
575 0 : match self {
576 0 : EndWatch::Commit(r) => *r.borrow(),
577 0 : EndWatch::Flush(r) => r.borrow().lsn,
578 : }
579 0 : }
580 :
581 : /// Wait for the update.
582 0 : pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
583 0 : match self {
584 0 : EndWatch::Commit(r) => r.changed().await?,
585 0 : EndWatch::Flush(r) => r.changed().await?,
586 : }
587 0 : Ok(())
588 0 : }
589 :
590 0 : pub(crate) async fn wait_for_lsn(
591 0 : &mut self,
592 0 : lsn: Lsn,
593 0 : client_term: Option<Term>,
594 0 : ) -> anyhow::Result<Lsn> {
595 : loop {
596 0 : let end_pos = self.get();
597 0 : if end_pos > lsn {
598 0 : return Ok(end_pos);
599 0 : }
600 0 : if let EndWatch::Flush(rx) = &self {
601 0 : let curr_term = rx.borrow().term;
602 0 : if let Some(client_term) = client_term {
603 0 : if curr_term != client_term {
604 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
605 0 : }
606 0 : }
607 0 : }
608 0 : self.changed().await?;
609 : }
610 0 : }
611 : }
612 :
613 : pub(crate) struct EndWatchView(EndWatch);
614 :
615 : impl EndWatchView {
616 0 : pub(crate) fn get(&self) -> Lsn {
617 0 : self.0.get()
618 0 : }
619 : }
620 : /// The half of the split connection that drives sending WAL.
621 : struct WalSender<'a, IO> {
622 : pgb: &'a mut PostgresBackend<IO>,
623 : tli: WalResidentTimeline,
624 : appname: Option<String>,
625 : // Position from which we are sending the next chunk.
626 : start_pos: Lsn,
627 : // WAL up to this position is known to be locally available.
628 : // Usually this is the same as the latest commit_lsn, but in case of
629 : // walproposer recovery, this is flush_lsn.
630 : //
631 : // We send this LSN to the receiver as wal_end, so that it knows how much
632 : // WAL this safekeeper has. This LSN should be as fresh as possible.
633 : end_pos: Lsn,
634 : /// When streaming uncommitted part, the term the client acts as the leader
635 : /// in. Streaming is stopped if local term changes to a different (higher)
636 : /// value.
637 : term: Option<Term>,
638 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
639 : end_watch: EndWatch,
640 : ws_guard: Arc<WalSenderGuard>,
641 : wal_reader: WalReader,
642 : // buffer to read WAL into before sending it
643 : send_buf: Vec<u8>,
644 : }
645 :
646 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
647 :
648 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
649 : /// Send WAL until
650 : /// - an error occurs
651 : /// - the receiver is caught up and there are no computes (if streaming up to commit_lsn)
652 : /// - timeline's cancellation token fires
653 : ///
654 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
655 : /// convenience.
656 0 : async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
657 : loop {
658 : // Wait for the next portion if it is not there yet, or just
659 : // update our view of the end of WAL available for sending, which we
660 : // communicate to the receiver.
661 0 : self.wait_wal().await?;
662 0 : assert!(
663 0 : self.end_pos > self.start_pos,
664 0 : "nothing to send after waiting for WAL"
665 : );
666 :
667 : // try to send as much as available, capped by MAX_SEND_SIZE
668 0 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
669 0 : // if we went beyond the available WAL, back off
670 0 : if chunk_end_pos >= self.end_pos {
671 0 : chunk_end_pos = self.end_pos;
672 0 : } else {
673 0 : // If sending not up to end pos, round down to page boundary to
674 0 : // avoid breaking WAL record not at page boundary, as protocol
675 0 : // demands. See walsender.c (XLogSendPhysical).
676 0 : chunk_end_pos = chunk_end_pos
677 0 : .checked_sub(chunk_end_pos.block_offset())
678 0 : .unwrap();
679 0 : }
680 0 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
681 0 : let send_buf = &mut self.send_buf[..send_size];
682 : let send_size: usize;
683 : {
684 : // If uncommitted part is being pulled, check that the term is
685 : // still the expected one.
686 0 : let _term_guard = if let Some(t) = self.term {
687 0 : Some(self.tli.acquire_term(t).await?)
688 : } else {
689 0 : None
690 : };
691 : // Read WAL into buffer. send_size can be additionally capped to
692 : // segment boundary here.
693 0 : send_size = self.wal_reader.read(send_buf).await?
694 : };
695 0 : let send_buf = &send_buf[..send_size];
696 0 :
697 0 : // and send it, while respecting Timeline::cancel
698 0 : let msg = BeMessage::XLogData(XLogDataBody {
699 0 : wal_start: self.start_pos.0,
700 0 : wal_end: self.end_pos.0,
701 0 : timestamp: get_current_timestamp(),
702 0 : data: send_buf,
703 0 : });
704 0 : self.pgb.write_message(&msg).await?;
705 :
706 0 : if let Some(appname) = &self.appname {
707 0 : if appname == "replica" {
708 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
709 0 : }
710 0 : }
711 0 : trace!(
712 0 : "sent {} bytes of WAL {}-{}",
713 0 : send_size,
714 0 : self.start_pos,
715 0 : self.start_pos + send_size as u64
716 : );
717 0 : self.start_pos += send_size as u64;
718 : }
719 0 : }
720 :
721 : /// Wait until we have WAL to stream, sending keepalives and checking for
722 : /// exit in the meantime.
723 0 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
724 : loop {
725 0 : self.end_pos = self.end_watch.get();
726 0 : let have_something_to_send = (|| {
727 0 : fail::fail_point!(
728 0 : "sk-pause-send",
729 0 : self.appname.as_deref() != Some("pageserver"),
730 0 : |_| { false }
731 0 : );
732 0 : self.end_pos > self.start_pos
733 0 : })();
734 0 :
735 0 : if have_something_to_send {
736 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
737 0 : return Ok(());
738 0 : }
739 :
740 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
741 0 : if let Some(lsn) = self.wait_for_lsn().await? {
742 0 : self.end_pos = lsn;
743 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
744 0 : return Ok(());
745 0 : }
746 0 :
747 0 : // Timed out waiting for WAL, check for termination and send KA.
748 0 : // Check for termination only if we are streaming up to commit_lsn
749 0 : // (to pageserver).
750 0 : if let EndWatch::Commit(_) = self.end_watch {
751 0 : if let Some(remote_consistent_lsn) = self
752 0 : .ws_guard
753 0 : .walsenders
754 0 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
755 : {
756 0 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
757 : // Terminate if there is nothing more to send.
758 : // Note that "ending streaming" part of the string is used by
759 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
760 : // do not change this string without updating pageserver.
761 0 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
762 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
763 0 : self.appname, self.start_pos,
764 0 : )));
765 0 : }
766 0 : }
767 0 : }
768 :
769 0 : let msg = BeMessage::KeepAlive(WalSndKeepAlive {
770 0 : wal_end: self.end_pos.0,
771 0 : timestamp: get_current_timestamp(),
772 0 : request_reply: true,
773 0 : });
774 0 :
775 0 : self.pgb.write_message(&msg).await?;
776 : }
777 0 : }
778 :
779 : /// Wait until we have available WAL > start_pos or the timeout expires. Returns
780 : /// - Ok(Some(end_pos)) if the needed LSN is successfully observed;
781 : /// - Ok(None) if the timeout expired;
782 : /// - Err in case of error -- only if 1) the term changed while fetching in recovery
783 : /// mode, or 2) the watch channel closed, which must never happen.
784 0 : async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
785 0 : let fp = (|| {
786 0 : fail::fail_point!(
787 0 : "sk-pause-send",
788 0 : self.appname.as_deref() != Some("pageserver"),
789 0 : |_| { true }
790 0 : );
791 0 : false
792 0 : })();
793 0 : if fp {
794 0 : tokio::time::sleep(POLL_STATE_TIMEOUT).await;
795 0 : return Ok(None);
796 0 : }
797 :
798 0 : let res = timeout(POLL_STATE_TIMEOUT, async move {
799 : loop {
800 0 : let end_pos = self.end_watch.get();
801 0 : if end_pos > self.start_pos {
802 0 : return Ok(end_pos);
803 0 : }
804 0 : if let EndWatch::Flush(rx) = &self.end_watch {
805 0 : let curr_term = rx.borrow().term;
806 0 : if let Some(client_term) = self.term {
807 0 : if curr_term != client_term {
808 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
809 0 : }
810 0 : }
811 0 : }
812 0 : self.end_watch.changed().await?;
813 : }
814 0 : })
815 0 : .await;
816 :
817 0 : match res {
818 : // success
819 0 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
820 : // error inside closure
821 0 : Ok(Err(err)) => Err(err),
822 : // timeout
823 0 : Err(_) => Ok(None),
824 : }
825 0 : }
826 : }
827 :
828 : /// A half driving receiving replies.
829 : struct ReplyReader<IO> {
830 : reader: PostgresBackendReader<IO>,
831 : ws_guard: Arc<WalSenderGuard>,
832 : tli: WalResidentTimeline,
833 : }
834 :
835 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
836 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
837 : loop {
838 0 : let msg = self.reader.read_copy_message().await?;
839 0 : self.handle_feedback(&msg).await?
840 : }
841 0 : }
842 :
843 0 : async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
844 0 : match msg.first().cloned() {
845 : Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
846 : // Note: deserializing is on msg[1..] because we skip the tag byte.
847 0 : let mut hs_feedback = HotStandbyFeedback::des(&msg[1..])
848 0 : .context("failed to deserialize HotStandbyFeedback")?;
849 : // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way:
850 : // pq_sendint32(&reply_message, xmin);
851 : // pq_sendint32(&reply_message, xmin_epoch);
852 0 : // So it is two big-endian 32-bit words, low word first!
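// rotate_left(32) on the u64 swaps the two 32-bit halves, putting the epoch
// back into the high word of the FullTransactionId.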
853 0 : hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
854 0 : hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
855 0 : self.ws_guard
856 0 : .walsenders
857 0 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
858 : }
859 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
860 0 : let reply =
861 0 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
862 0 : self.ws_guard
863 0 : .walsenders
864 0 : .record_standby_reply(self.ws_guard.id, &reply);
865 : }
866 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
867 : // pageserver sends this.
868 : // Note: deserializing is on msg[9..] because we skip the tag byte and the len bytes.
869 0 : let buf = Bytes::copy_from_slice(&msg[9..]);
870 0 : let ps_feedback = PageserverFeedback::parse(buf);
871 0 :
872 0 : trace!("PageserverFeedback is {:?}", ps_feedback);
873 0 : self.ws_guard
874 0 : .walsenders
875 0 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
876 0 : self.tli
877 0 : .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
878 0 : .await;
879 : // in principle the new remote_consistent_lsn could allow us to
880 : // deactivate the timeline, but we check that regularly through
881 : // broker updates, no need to do it here
882 : }
883 0 : _ => warn!("unexpected message {:?}", msg),
884 : }
885 0 : Ok(())
886 0 : }
887 : }
888 :
889 : #[cfg(test)]
890 : mod tests {
891 : use utils::id::{TenantId, TimelineId};
892 :
893 : use super::*;
894 :
895 4 : fn mock_ttid() -> TenantTimelineId {
896 4 : TenantTimelineId {
897 4 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
898 4 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
899 4 : }
900 4 : }
901 :
902 4 : fn mock_addr() -> SocketAddr {
903 4 : "127.0.0.1:8080".parse().unwrap()
904 4 : }
905 :
906 : // add the specified feedback to wss, setting other fields to dummy values
907 4 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
908 4 : let walsender_state = WalSenderState {
909 4 : ttid: mock_ttid(),
910 4 : addr: mock_addr(),
911 4 : conn_id: 1,
912 4 : appname: None,
913 4 : feedback,
914 4 : };
915 4 : wss.slots.push(Some(walsender_state))
916 4 : }
917 :
918 : // form standby feedback with the given hot standby feedback ts/xmin and the
919 : // rest set to dummy values.
920 4 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
921 4 : ReplicationFeedback::Standby(StandbyFeedback {
922 4 : reply: StandbyReply::empty(),
923 4 : hs_feedback: HotStandbyFeedback {
924 4 : ts,
925 4 : xmin,
926 4 : catalog_xmin: 0,
927 4 : },
928 4 : })
929 4 : }
930 :
931 : // test that hot standby feedback aggregation works as expected
932 : #[test]
933 1 : fn test_hs_feedback_no_valid() {
934 1 : let mut wss = WalSendersShared::new();
935 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
936 1 : wss.update_reply_feedback();
937 1 : assert_eq!(
938 1 : wss.agg_standby_feedback.hs_feedback.xmin,
939 1 : INVALID_FULL_TRANSACTION_ID
940 1 : );
941 1 : }
942 :
943 : #[test]
944 1 : fn test_hs_feedback() {
945 1 : let mut wss = WalSendersShared::new();
946 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
947 1 : push_feedback(&mut wss, hs_feedback(1, 42));
948 1 : push_feedback(&mut wss, hs_feedback(1, 64));
949 1 : wss.update_reply_feedback();
950 1 : assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
951 1 : }
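
    // A sketch of an additional check (not in the original suite): standby reply
    // aggregation takes the minimum of valid write_lsns, analogous to xmin above.
    fn reply_feedback(write_lsn: Lsn) -> ReplicationFeedback {
        // helper: standby feedback with only write_lsn set, the rest dummy
        let mut reply = StandbyReply::empty();
        reply.write_lsn = write_lsn;
        ReplicationFeedback::Standby(StandbyFeedback {
            reply,
            hs_feedback: HotStandbyFeedback::empty(),
        })
    }

    #[test]
    fn test_reply_feedback_min_write_lsn() {
        let mut wss = WalSendersShared::new();
        push_feedback(&mut wss, reply_feedback(Lsn(0x100)));
        push_feedback(&mut wss, reply_feedback(Lsn(0x80)));
        wss.update_reply_feedback();
        assert_eq!(wss.agg_standby_feedback.reply.write_lsn, Lsn(0x80));
    }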
952 : }