Line data Source code
1 : //! This module implements the streaming side of the replication protocol, starting
2 : //! with the "START_REPLICATION" message, and the registry of walsenders.
3 :
4 : use std::cmp::{max, min};
5 : use std::net::SocketAddr;
6 : use std::sync::Arc;
7 : use std::time::Duration;
8 :
9 : use anyhow::{Context as AnyhowContext, bail};
10 : use bytes::Bytes;
11 : use futures::FutureExt;
12 : use itertools::Itertools;
13 : use parking_lot::Mutex;
14 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend, PostgresBackendReader, QueryError};
15 : use postgres_ffi::{MAX_SEND_SIZE, PgMajorVersion, get_current_timestamp};
16 : use postgres_ffi_types::TimestampTz;
17 : use pq_proto::{BeMessage, WalSndKeepAlive, XLogDataBody};
18 : use safekeeper_api::Term;
19 : use safekeeper_api::models::{
20 : HotStandbyFeedback, INVALID_FULL_TRANSACTION_ID, ReplicationFeedback, StandbyFeedback,
21 : StandbyReply,
22 : };
23 : use tokio::io::{AsyncRead, AsyncWrite};
24 : use tokio::sync::watch::Receiver;
25 : use tokio::time::timeout;
26 : use tracing::*;
27 : use utils::bin_ser::BeSer;
28 : use utils::failpoint_support;
29 : use utils::lsn::Lsn;
30 : use utils::pageserver_feedback::PageserverFeedback;
31 : use utils::postgres_client::PostgresClientProtocol;
32 :
33 : use crate::handler::SafekeeperPostgresHandler;
34 : use crate::metrics::{RECEIVED_PS_FEEDBACKS, WAL_READERS};
35 : use crate::receive_wal::WalReceivers;
36 : use crate::safekeeper::TermLsn;
37 : use crate::send_interpreted_wal::{
38 : Batch, InterpretedWalReader, InterpretedWalReaderHandle, InterpretedWalSender,
39 : };
40 : use crate::timeline::WalResidentTimeline;
41 : use crate::wal_reader_stream::StreamingWalReader;
42 : use crate::wal_storage::WalReader;
43 :
44 : // See: https://www.postgresql.org/docs/13/protocol-replication.html
45 : const HOT_STANDBY_FEEDBACK_TAG_BYTE: u8 = b'h';
46 : const STANDBY_STATUS_UPDATE_TAG_BYTE: u8 = b'r';
47 : // Neon extension of the replication protocol
48 : const NEON_STATUS_UPDATE_TAG_BYTE: u8 = b'z';
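// Each CopyData reply from the receiver starts with one of the tag bytes above;
// see `ReplyReader::handle_feedback` below. For the Neon status update, the handler
// skips the tag byte plus 8 length bytes (i.e. parses msg[9..]) before reading the
// serialized `PageserverFeedback`.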
49 :
50 : /// WalSenders registry. Timeline holds it (wrapped in Arc).
51 : pub struct WalSenders {
52 : mutex: Mutex<WalSendersShared>,
53 : walreceivers: Arc<WalReceivers>,
54 : }
55 :
56 : pub struct WalSendersTimelineMetricValues {
57 : pub ps_feedback_counter: u64,
58 : pub ps_corruption_detected: bool,
59 : pub last_ps_feedback: PageserverFeedback,
60 : pub interpreted_wal_reader_tasks: usize,
61 : }
62 :
63 : impl WalSenders {
64 5 : pub fn new(walreceivers: Arc<WalReceivers>) -> Arc<WalSenders> {
65 5 : Arc::new(WalSenders {
66 5 : mutex: Mutex::new(WalSendersShared::new()),
67 5 : walreceivers,
68 5 : })
69 5 : }
70 :
71 : /// Register new walsender. Returned guard provides access to the slot and
72 : /// automatically deregisters in Drop.
73 0 : fn register(self: &Arc<WalSenders>, walsender_state: WalSenderState) -> WalSenderGuard {
74 0 : let slots = &mut self.mutex.lock().slots;
75 : // find empty slot or create new one
76 0 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
77 0 : slots[pos] = Some(walsender_state);
78 0 : pos
79 : } else {
80 0 : let pos = slots.len();
81 0 : slots.push(Some(walsender_state));
82 0 : pos
83 : };
84 0 : WalSenderGuard {
85 0 : id: pos,
86 0 : walsenders: self.clone(),
87 0 : }
88 0 : }
89 :
90 0 : fn create_or_update_interpreted_reader<
91 0 : FUp: FnOnce(&Arc<InterpretedWalReaderHandle>) -> anyhow::Result<()>,
92 0 : FNew: FnOnce() -> InterpretedWalReaderHandle,
93 0 : >(
94 0 : self: &Arc<WalSenders>,
95 0 : id: WalSenderId,
96 0 : start_pos: Lsn,
97 0 : max_delta_for_fanout: Option<u64>,
98 0 : update: FUp,
99 0 : create: FNew,
100 0 : ) -> anyhow::Result<()> {
101 0 : let state = &mut self.mutex.lock();
102 :
103 0 : let mut selected_interpreted_reader = None;
104 0 : for slot in state.slots.iter().flatten() {
105 0 : if let WalSenderState::Interpreted(slot_state) = slot {
106 0 : if let Some(ref interpreted_reader) = slot_state.interpreted_wal_reader {
107 0 : let select = match (interpreted_reader.current_position(), max_delta_for_fanout)
108 : {
109 0 : (Some(pos), Some(max_delta)) => {
110 0 : let delta = pos.0.abs_diff(start_pos.0);
111 0 : delta <= max_delta
112 : }
113 : // Reader is not active
114 0 : (None, _) => false,
115 : // Gating fanout by max delta is disabled.
116 : // Attach to any active reader.
117 0 : (_, None) => true,
118 : };
119 :
120 0 : if select {
121 0 : selected_interpreted_reader = Some(interpreted_reader.clone());
122 0 : break;
123 0 : }
124 0 : }
125 0 : }
126 : }
127 :
128 0 : let slot = state.get_slot_mut(id);
129 0 : let slot_state = match slot {
130 0 : WalSenderState::Interpreted(s) => s,
131 0 : WalSenderState::Vanilla(_) => unreachable!(),
132 : };
133 :
134 0 : let selected_or_new = match selected_interpreted_reader {
135 0 : Some(selected) => {
136 0 : update(&selected)?;
137 0 : selected
138 : }
139 0 : None => Arc::new(create()),
140 : };
141 :
142 0 : slot_state.interpreted_wal_reader = Some(selected_or_new);
143 :
144 0 : Ok(())
145 0 : }
146 :
147 : /// Get state of all walsenders.
148 0 : pub fn get_all_public(self: &Arc<WalSenders>) -> Vec<safekeeper_api::models::WalSenderState> {
149 0 : self.mutex
150 0 : .lock()
151 0 : .slots
152 0 : .iter()
153 0 : .flatten()
154 0 : .map(|state| match state {
155 0 : WalSenderState::Vanilla(s) => {
156 0 : safekeeper_api::models::WalSenderState::Vanilla(s.clone())
157 : }
158 0 : WalSenderState::Interpreted(s) => {
159 0 : safekeeper_api::models::WalSenderState::Interpreted(s.public_state.clone())
160 : }
161 0 : })
162 0 : .collect()
163 0 : }
164 :
165 : /// Get LSN of the most lagging pageserver receiver. Returns None if there are no
166 : /// active pageserver walsenders.
167 0 : pub fn laggard_lsn(self: &Arc<WalSenders>) -> Option<Lsn> {
168 0 : self.mutex
169 0 : .lock()
170 0 : .slots
171 0 : .iter()
172 0 : .flatten()
173 0 : .filter_map(|s| match s.get_feedback() {
174 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.last_received_lsn),
175 0 : ReplicationFeedback::Standby(_) => None,
176 0 : })
177 0 : .min()
178 0 : }
179 :
180 : /// Returns per-timeline metric values: pageserver feedback counter, last feedback, etc.
181 0 : pub fn info_for_metrics(self: &Arc<WalSenders>) -> WalSendersTimelineMetricValues {
182 0 : let shared = self.mutex.lock();
183 :
184 0 : let interpreted_wal_reader_tasks = shared
185 0 : .slots
186 0 : .iter()
187 0 : .filter_map(|ss| match ss {
188 0 : Some(WalSenderState::Interpreted(int)) => int.interpreted_wal_reader.as_ref(),
189 0 : Some(WalSenderState::Vanilla(_)) => None,
190 0 : None => None,
191 0 : })
192 0 : .unique_by(|reader| Arc::as_ptr(reader))
193 0 : .count();
194 :
195 0 : WalSendersTimelineMetricValues {
196 0 : ps_feedback_counter: shared.ps_feedback_counter,
197 0 : ps_corruption_detected: shared.ps_corruption_detected,
198 0 : last_ps_feedback: shared.last_ps_feedback,
199 0 : interpreted_wal_reader_tasks,
200 0 : }
201 0 : }
202 :
203 : /// Get aggregated hot standby feedback (we send it to compute).
204 620 : pub fn get_hotstandby(self: &Arc<WalSenders>) -> StandbyFeedback {
205 620 : self.mutex.lock().agg_standby_feedback
206 620 : }
207 :
208 : /// Record new pageserver feedback, update aggregated values.
209 0 : fn record_ps_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &PageserverFeedback) {
210 0 : let mut shared = self.mutex.lock();
211 0 : *shared.get_slot_mut(id).get_mut_feedback() = ReplicationFeedback::Pageserver(*feedback);
212 0 : shared.last_ps_feedback = *feedback;
213 0 : shared.ps_feedback_counter += 1;
214 0 : if feedback.corruption_detected {
215 0 : shared.ps_corruption_detected = true;
216 0 : }
217 0 : drop(shared);
218 :
219 0 : RECEIVED_PS_FEEDBACKS.inc();
220 :
221 : // send feedback to connected walproposers
222 0 : self.walreceivers.broadcast_pageserver_feedback(*feedback);
223 0 : }
224 :
225 : /// Record standby reply.
226 0 : fn record_standby_reply(self: &Arc<WalSenders>, id: WalSenderId, reply: &StandbyReply) {
227 0 : let mut shared = self.mutex.lock();
228 0 : let slot = shared.get_slot_mut(id);
229 0 : debug!(
230 0 : "Record standby reply: ts={} apply_lsn={}",
231 : reply.reply_ts, reply.apply_lsn
232 : );
233 0 : match &mut slot.get_mut_feedback() {
234 0 : ReplicationFeedback::Standby(sf) => sf.reply = *reply,
235 : ReplicationFeedback::Pageserver(_) => {
236 0 : *slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback {
237 0 : reply: *reply,
238 0 : hs_feedback: HotStandbyFeedback::empty(),
239 0 : })
240 : }
241 : }
242 0 : }
243 :
244 : /// Record hot standby feedback, update aggregated value.
245 0 : fn record_hs_feedback(self: &Arc<WalSenders>, id: WalSenderId, feedback: &HotStandbyFeedback) {
246 0 : let mut shared = self.mutex.lock();
247 0 : let slot = shared.get_slot_mut(id);
248 0 : match &mut slot.get_mut_feedback() {
249 0 : ReplicationFeedback::Standby(sf) => sf.hs_feedback = *feedback,
250 : ReplicationFeedback::Pageserver(_) => {
251 0 : *slot.get_mut_feedback() = ReplicationFeedback::Standby(StandbyFeedback {
252 0 : reply: StandbyReply::empty(),
253 0 : hs_feedback: *feedback,
254 0 : })
255 : }
256 : }
257 0 : shared.update_reply_feedback();
258 0 : }
259 :
260 : /// Get remote_consistent_lsn reported by the pageserver. Returns None if
261 : /// client is not pageserver.
262 0 : pub fn get_ws_remote_consistent_lsn(self: &Arc<WalSenders>, id: WalSenderId) -> Option<Lsn> {
263 0 : let shared = self.mutex.lock();
264 0 : let slot = shared.get_slot(id);
265 0 : match slot.get_feedback() {
266 0 : ReplicationFeedback::Pageserver(feedback) => Some(feedback.remote_consistent_lsn),
267 0 : _ => None,
268 : }
269 0 : }
270 :
271 : /// Unregister walsender.
272 0 : fn unregister(self: &Arc<WalSenders>, id: WalSenderId) {
273 0 : let mut shared = self.mutex.lock();
274 0 : shared.slots[id] = None;
275 0 : shared.update_reply_feedback();
276 0 : }
277 : }
278 :
279 : struct WalSendersShared {
280 : // value aggregated over all walsenders
281 : agg_standby_feedback: StandbyFeedback,
282 : // last feedback ever received from any pageserver, empty if none
283 : last_ps_feedback: PageserverFeedback,
284 : // total counter of pageserver feedbacks received
285 : ps_feedback_counter: u64,
286 : // Hadron: true iff we received a pageserver feedback that indicated
287 : // data corruption in the timeline
288 : ps_corruption_detected: bool,
289 : slots: Vec<Option<WalSenderState>>,
290 : }
291 :
292 : /// Safekeeper internal definitions of wal sender state
293 : ///
294 : /// As opposed to [`safekeeper_api::models::WalSenderState`], these structs may
295 : /// include state that we do not wish to expose to the public API.
296 : #[derive(Debug, Clone)]
297 : pub(crate) enum WalSenderState {
298 : Vanilla(VanillaWalSenderInternalState),
299 : Interpreted(InterpretedWalSenderInternalState),
300 : }
301 :
302 : type VanillaWalSenderInternalState = safekeeper_api::models::VanillaWalSenderState;
303 :
304 : #[derive(Debug, Clone)]
305 : pub(crate) struct InterpretedWalSenderInternalState {
306 : public_state: safekeeper_api::models::InterpretedWalSenderState,
307 : interpreted_wal_reader: Option<Arc<InterpretedWalReaderHandle>>,
308 : }
309 :
310 : impl WalSenderState {
311 0 : fn get_addr(&self) -> &SocketAddr {
312 0 : match self {
313 0 : WalSenderState::Vanilla(state) => &state.addr,
314 0 : WalSenderState::Interpreted(state) => &state.public_state.addr,
315 : }
316 0 : }
317 :
318 4 : fn get_feedback(&self) -> &ReplicationFeedback {
319 4 : match self {
320 4 : WalSenderState::Vanilla(state) => &state.feedback,
321 0 : WalSenderState::Interpreted(state) => &state.public_state.feedback,
322 : }
323 4 : }
324 :
325 0 : fn get_mut_feedback(&mut self) -> &mut ReplicationFeedback {
326 0 : match self {
327 0 : WalSenderState::Vanilla(state) => &mut state.feedback,
328 0 : WalSenderState::Interpreted(state) => &mut state.public_state.feedback,
329 : }
330 0 : }
331 : }
332 :
333 : impl WalSendersShared {
334 7 : fn new() -> Self {
335 7 : WalSendersShared {
336 7 : agg_standby_feedback: StandbyFeedback::empty(),
337 7 : last_ps_feedback: PageserverFeedback::empty(),
338 7 : ps_feedback_counter: 0,
339 7 : ps_corruption_detected: false,
340 7 : slots: Vec::new(),
341 7 : }
342 7 : }
343 :
344 : /// Get content of the slot with the provided id; it must exist.
345 0 : fn get_slot(&self, id: WalSenderId) -> &WalSenderState {
346 0 : self.slots[id].as_ref().expect("walsender doesn't exist")
347 0 : }
348 :
349 : /// Get mutable content of the slot with the provided id; it must exist.
350 0 : fn get_slot_mut(&mut self, id: WalSenderId) -> &mut WalSenderState {
351 0 : self.slots[id].as_mut().expect("walsender doesn't exist")
352 0 : }
353 :
354 : /// Update aggregated hot standby and normal reply feedbacks. We take the min of valid
355 : /// xmins and reply values, and the max of hot standby feedback timestamps.
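/// For example, with standby xmins {42, 64} the aggregate xmin becomes 42; see
/// `test_hs_feedback` at the bottom of this file.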
356 2 : fn update_reply_feedback(&mut self) {
357 2 : let mut agg = HotStandbyFeedback::empty();
358 2 : let mut reply_agg = StandbyReply::empty();
359 4 : for ws_state in self.slots.iter().flatten() {
360 4 : if let ReplicationFeedback::Standby(standby_feedback) = ws_state.get_feedback() {
361 4 : let hs_feedback = standby_feedback.hs_feedback;
362 : // doing Option math like op1.iter().chain(op2.iter()).min()
363 : // would be nicer, but we serialize/deserialize this struct
364 : // directly, so leave as is for now
365 4 : if hs_feedback.xmin != INVALID_FULL_TRANSACTION_ID {
366 2 : if agg.xmin != INVALID_FULL_TRANSACTION_ID {
367 1 : agg.xmin = min(agg.xmin, hs_feedback.xmin);
368 1 : } else {
369 1 : agg.xmin = hs_feedback.xmin;
370 1 : }
371 2 : agg.ts = max(agg.ts, hs_feedback.ts);
372 2 : }
373 4 : if hs_feedback.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
374 0 : if agg.catalog_xmin != INVALID_FULL_TRANSACTION_ID {
375 0 : agg.catalog_xmin = min(agg.catalog_xmin, hs_feedback.catalog_xmin);
376 0 : } else {
377 0 : agg.catalog_xmin = hs_feedback.catalog_xmin;
378 0 : }
379 0 : agg.ts = max(agg.ts, hs_feedback.ts);
380 4 : }
381 4 : let reply = standby_feedback.reply;
382 4 : if reply.write_lsn != Lsn::INVALID {
383 0 : if reply_agg.write_lsn != Lsn::INVALID {
384 0 : reply_agg.write_lsn = Lsn::min(reply_agg.write_lsn, reply.write_lsn);
385 0 : } else {
386 0 : reply_agg.write_lsn = reply.write_lsn;
387 0 : }
388 4 : }
389 4 : if reply.flush_lsn != Lsn::INVALID {
390 0 : if reply_agg.flush_lsn != Lsn::INVALID {
391 0 : reply_agg.flush_lsn = Lsn::min(reply_agg.flush_lsn, reply.flush_lsn);
392 0 : } else {
393 0 : reply_agg.flush_lsn = reply.flush_lsn;
394 0 : }
395 4 : }
396 4 : if reply.apply_lsn != Lsn::INVALID {
397 0 : if reply_agg.apply_lsn != Lsn::INVALID {
398 0 : reply_agg.apply_lsn = Lsn::min(reply_agg.apply_lsn, reply.apply_lsn);
399 0 : } else {
400 0 : reply_agg.apply_lsn = reply.apply_lsn;
401 0 : }
402 4 : }
403 4 : if reply.reply_ts != 0 {
404 0 : if reply_agg.reply_ts != 0 {
405 0 : reply_agg.reply_ts = TimestampTz::min(reply_agg.reply_ts, reply.reply_ts);
406 0 : } else {
407 0 : reply_agg.reply_ts = reply.reply_ts;
408 0 : }
409 4 : }
410 0 : }
411 : }
412 2 : self.agg_standby_feedback = StandbyFeedback {
413 2 : reply: reply_agg,
414 2 : hs_feedback: agg,
415 2 : };
416 2 : }
417 : }
418 :
419 : // Id of the occupied slot in WalSenders used to access it (and saved in the
420 : // WalSenderGuard). We could hand out an Arc to the slot directly, but there is not
421 : // much sense in that, as the value aggregation performed on each received feedback
422 : // iterates over all walsenders anyway.
423 : pub type WalSenderId = usize;
424 :
425 : /// Scope guard to access slot in WalSenders registry and unregister from it in
426 : /// Drop.
427 : pub struct WalSenderGuard {
428 : id: WalSenderId,
429 : walsenders: Arc<WalSenders>,
430 : }
431 :
432 : impl WalSenderGuard {
433 0 : pub fn id(&self) -> WalSenderId {
434 0 : self.id
435 0 : }
436 :
437 0 : pub fn walsenders(&self) -> &Arc<WalSenders> {
438 0 : &self.walsenders
439 0 : }
440 : }
441 :
442 : impl Drop for WalSenderGuard {
443 0 : fn drop(&mut self) {
444 0 : self.walsenders.unregister(self.id);
445 0 : }
446 : }
447 :
448 : impl SafekeeperPostgresHandler {
449 : /// Wrapper around handle_start_replication_guts that handles its result. Errors are
450 : /// handled here while we're still in the walsender ttid span; with an API
451 : /// extension, this can probably be moved into postgres_backend.
452 0 : pub async fn handle_start_replication<IO: AsyncRead + AsyncWrite + Unpin + Send>(
453 0 : &mut self,
454 0 : pgb: &mut PostgresBackend<IO>,
455 0 : start_pos: Lsn,
456 0 : term: Option<Term>,
457 0 : ) -> Result<(), QueryError> {
458 0 : let tli = self
459 0 : .global_timelines
460 0 : .get(self.ttid)
461 0 : .map_err(|e| QueryError::Other(e.into()))?;
462 0 : let residence_guard = tli.wal_residence_guard().await?;
463 :
464 0 : if let Err(end) = self
465 0 : .handle_start_replication_guts(pgb, start_pos, term, residence_guard)
466 0 : .await
467 : {
468 0 : let info = tli.get_safekeeper_info(&self.conf).await;
469 : // Log the result and probably send it to the client, closing the stream.
470 0 : pgb.handle_copy_stream_end(end)
471 0 : .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
472 0 : .await;
473 0 : }
474 0 : Ok(())
475 0 : }
476 :
477 0 : pub async fn handle_start_replication_guts<IO: AsyncRead + AsyncWrite + Unpin + Send>(
478 0 : &mut self,
479 0 : pgb: &mut PostgresBackend<IO>,
480 0 : start_pos: Lsn,
481 0 : term: Option<Term>,
482 0 : tli: WalResidentTimeline,
483 0 : ) -> Result<(), CopyStreamHandlerEnd> {
484 0 : let appname = self.appname.clone();
485 :
486 : // Use a guard object to remove our entry from the timeline when we are done.
487 0 : let ws_guard = match self.protocol() {
488 0 : PostgresClientProtocol::Vanilla => Arc::new(tli.get_walsenders().register(
489 0 : WalSenderState::Vanilla(VanillaWalSenderInternalState {
490 0 : ttid: self.ttid,
491 0 : addr: *pgb.get_peer_addr(),
492 0 : conn_id: self.conn_id,
493 0 : appname: self.appname.clone(),
494 0 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
495 0 : }),
496 : )),
497 0 : PostgresClientProtocol::Interpreted { .. } => Arc::new(tli.get_walsenders().register(
498 0 : WalSenderState::Interpreted(InterpretedWalSenderInternalState {
499 0 : public_state: safekeeper_api::models::InterpretedWalSenderState {
500 0 : ttid: self.ttid,
501 0 : shard: self.shard.unwrap(),
502 0 : addr: *pgb.get_peer_addr(),
503 0 : conn_id: self.conn_id,
504 0 : appname: self.appname.clone(),
505 0 : feedback: ReplicationFeedback::Pageserver(PageserverFeedback::empty()),
506 0 : },
507 0 : interpreted_wal_reader: None,
508 0 : }),
509 : )),
510 : };
511 :
512 : // Walsender can operate in one of two modes, selected by whether a term is
513 : // supplied: stream only committed WAL (used by pageserver) or all
514 : // existing WAL (up to flush_lsn, used by walproposer or peer recovery).
515 : // The second case is always driven by a consensus leader, whose term
516 : // must be supplied.
517 0 : let end_watch = if term.is_some() {
518 0 : EndWatch::Flush(tli.get_term_flush_lsn_watch_rx())
519 : } else {
520 0 : EndWatch::Commit(tli.get_commit_lsn_watch_rx())
521 : };
522 : // we don't check term here; it will be checked on first waiting/WAL reading anyway.
523 0 : let end_pos = end_watch.get();
524 :
525 0 : if end_pos < start_pos {
526 0 : info!(
527 0 : "requested start_pos {} is ahead of available WAL end_pos {}",
528 : start_pos, end_pos
529 : );
530 0 : }
531 :
532 0 : info!(
533 0 : "starting streaming from {:?}, available WAL ends at {}, recovery={}, appname={:?}, protocol={:?}",
534 : start_pos,
535 : end_pos,
536 0 : matches!(end_watch, EndWatch::Flush(_)),
537 : appname,
538 0 : self.protocol(),
539 : );
540 :
541 : // switch to copy
542 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
543 :
544 0 : let wal_reader = tli.get_walreader(start_pos).await?;
545 :
546 : // Split to concurrently receive and send data; replies are generally
547 : // not synchronized with sends, so this avoids deadlocks.
548 0 : let reader = pgb.split().context("START_REPLICATION split")?;
549 :
550 0 : let send_fut = match self.protocol() {
551 : PostgresClientProtocol::Vanilla => {
552 0 : let sender = WalSender {
553 0 : pgb,
554 : // should succeed since we're already holding another guard
555 0 : tli: tli.wal_residence_guard().await?,
556 0 : appname: appname.clone(),
557 0 : start_pos,
558 0 : end_pos,
559 0 : term,
560 0 : end_watch,
561 0 : ws_guard: ws_guard.clone(),
562 0 : wal_reader,
563 0 : send_buf: vec![0u8; MAX_SEND_SIZE],
564 : };
565 :
566 0 : FutureExt::boxed(sender.run())
567 : }
568 : PostgresClientProtocol::Interpreted {
569 0 : format,
570 0 : compression,
571 : } => {
572 0 : let pg_version =
573 0 : PgMajorVersion::try_from(tli.tli.get_state().await.1.server.pg_version)
574 0 : .unwrap();
575 0 : let end_watch_view = end_watch.view();
576 0 : let wal_residence_guard = tli.wal_residence_guard().await?;
577 0 : let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(2);
578 0 : let shard = self.shard.unwrap();
579 :
580 0 : if self.conf.wal_reader_fanout && !shard.is_unsharded() {
581 0 : let ws_id = ws_guard.id();
582 0 : ws_guard.walsenders().create_or_update_interpreted_reader(
583 0 : ws_id,
584 0 : start_pos,
585 0 : self.conf.max_delta_for_fanout,
586 : {
587 0 : let tx = tx.clone();
588 0 : |reader| {
589 0 : tracing::info!(
590 0 : "Fanning out interpreted wal reader at {}",
591 : start_pos
592 : );
593 0 : reader
594 0 : .fanout(shard, tx, start_pos)
595 0 : .with_context(|| "Failed to fan out reader")
596 0 : }
597 : },
598 0 : || {
599 0 : tracing::info!("Spawning interpreted wal reader at {}", start_pos);
600 :
601 0 : let wal_stream = StreamingWalReader::new(
602 0 : wal_residence_guard,
603 0 : term,
604 0 : start_pos,
605 0 : end_pos,
606 0 : end_watch,
607 : MAX_SEND_SIZE,
608 : );
609 :
610 0 : InterpretedWalReader::spawn(
611 0 : wal_stream, start_pos, tx, shard, pg_version, &appname,
612 : )
613 0 : },
614 0 : )?;
615 :
616 0 : let sender = InterpretedWalSender {
617 0 : format,
618 0 : compression,
619 0 : appname,
620 0 : tli: tli.wal_residence_guard().await?,
621 0 : start_lsn: start_pos,
622 0 : pgb,
623 0 : end_watch_view,
624 0 : wal_sender_guard: ws_guard.clone(),
625 0 : rx,
626 : };
627 :
628 0 : FutureExt::boxed(sender.run())
629 : } else {
630 0 : let wal_reader = StreamingWalReader::new(
631 0 : wal_residence_guard,
632 0 : term,
633 0 : start_pos,
634 0 : end_pos,
635 0 : end_watch,
636 : MAX_SEND_SIZE,
637 : );
638 :
639 0 : let reader = InterpretedWalReader::new(
640 0 : wal_reader, start_pos, tx, shard, pg_version, None,
641 : );
642 :
643 0 : let sender = InterpretedWalSender {
644 0 : format,
645 0 : compression,
646 0 : appname: appname.clone(),
647 0 : tli: tli.wal_residence_guard().await?,
648 0 : start_lsn: start_pos,
649 0 : pgb,
650 0 : end_watch_view,
651 0 : wal_sender_guard: ws_guard.clone(),
652 0 : rx,
653 : };
654 :
655 0 : FutureExt::boxed(async move {
656 : // Sender returns an Err on all code paths.
657 : // If the sender finishes first, we will drop the reader future.
658 : // If the reader finishes first, the sender will finish too since
659 : // its batch channel sender has been dropped.
660 0 : let res = tokio::try_join!(sender.run(), reader.run(start_pos, &appname));
661 0 : match res.map(|_| ()) {
662 0 : Ok(_) => unreachable!("sender finishes with Err by convention"),
663 0 : err_res => err_res,
664 : }
665 0 : })
666 : }
667 : }
668 : };
669 :
670 0 : let tli_cancel = tli.cancel.clone();
671 :
672 0 : let mut reply_reader = ReplyReader {
673 0 : reader,
674 0 : ws_guard: ws_guard.clone(),
675 0 : tli,
676 0 : };
677 :
678 0 : let res = tokio::select! {
679 : // todo: add read|write .context to these errors
680 0 : r = send_fut => r,
681 0 : r = reply_reader.run() => r,
682 0 : _ = tli_cancel.cancelled() => {
683 0 : return Err(CopyStreamHandlerEnd::Cancelled);
684 : }
685 : };
686 :
687 0 : let ws_state = ws_guard
688 0 : .walsenders
689 0 : .mutex
690 0 : .lock()
691 0 : .get_slot(ws_guard.id)
692 0 : .clone();
693 0 : info!(
694 0 : "finished streaming to {}, feedback={:?}",
695 0 : ws_state.get_addr(),
696 0 : ws_state.get_feedback(),
697 : );
698 :
699 : // Join pg backend back.
700 0 : pgb.unsplit(reply_reader.reader)?;
701 :
702 0 : res
703 0 : }
704 : }
705 :
706 : /// TODO(vlad): maybe lift this instead
707 : /// Walsender streams either up to commit_lsn (normally) or flush_lsn in the
708 : /// given term (recovery by walproposer or peer safekeeper).
709 : #[derive(Clone)]
710 : pub(crate) enum EndWatch {
711 : Commit(Receiver<Lsn>),
712 : Flush(Receiver<TermLsn>),
713 : }
714 :
715 : impl EndWatch {
716 0 : pub(crate) fn view(&self) -> EndWatchView {
717 0 : EndWatchView(self.clone())
718 0 : }
719 :
720 : /// Get current end of WAL.
721 12 : pub(crate) fn get(&self) -> Lsn {
722 12 : match self {
723 12 : EndWatch::Commit(r) => *r.borrow(),
724 0 : EndWatch::Flush(r) => r.borrow().lsn,
725 : }
726 12 : }
727 :
728 : /// Wait for the update.
729 7 : pub(crate) async fn changed(&mut self) -> anyhow::Result<()> {
730 7 : match self {
731 7 : EndWatch::Commit(r) => r.changed().await?,
732 0 : EndWatch::Flush(r) => r.changed().await?,
733 : }
734 3 : Ok(())
735 3 : }
736 :
737 4 : pub(crate) async fn wait_for_lsn(
738 4 : &mut self,
739 4 : lsn: Lsn,
740 4 : client_term: Option<Term>,
741 4 : ) -> anyhow::Result<Lsn> {
742 : loop {
743 7 : let end_pos = self.get();
744 7 : if end_pos > lsn {
745 0 : return Ok(end_pos);
746 7 : }
747 7 : if let EndWatch::Flush(rx) = &self {
748 0 : let curr_term = rx.borrow().term;
749 0 : if let Some(client_term) = client_term {
750 0 : if curr_term != client_term {
751 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
752 0 : }
753 0 : }
754 7 : }
755 7 : self.changed().await?;
756 : }
757 0 : }
758 : }
759 :
760 : pub(crate) struct EndWatchView(EndWatch);
761 :
762 : impl EndWatchView {
763 0 : pub(crate) fn get(&self) -> Lsn {
764 0 : self.0.get()
765 0 : }
766 : }
767 : /// The half of the split connection that drives sending WAL.
768 : struct WalSender<'a, IO> {
769 : pgb: &'a mut PostgresBackend<IO>,
770 : tli: WalResidentTimeline,
771 : appname: Option<String>,
772 : // Position from which we are sending the next chunk.
773 : start_pos: Lsn,
774 : // WAL up to this position is known to be locally available.
775 : // Usually this is the same as the latest commit_lsn, but in case of
776 : // walproposer recovery, this is flush_lsn.
777 : //
778 : // We send this LSN to the receiver as wal_end, so that it knows how much
779 : // WAL this safekeeper has. This LSN should be as fresh as possible.
780 : end_pos: Lsn,
781 : /// When streaming uncommitted part, the term the client acts as the leader
782 : /// in. Streaming is stopped if local term changes to a different (higher)
783 : /// value.
784 : term: Option<Term>,
785 : /// Watch channel receiver to learn end of available WAL (and wait for its advancement).
786 : end_watch: EndWatch,
787 : ws_guard: Arc<WalSenderGuard>,
788 : wal_reader: WalReader,
789 : // buffer for readling WAL into to send it
790 : send_buf: Vec<u8>,
791 : }
792 :
793 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
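// While idle, wait_for_lsn() times out after this interval so that wait_wal() can
// send a keepalive and check whether the walsender should stop.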
794 :
795 : impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
796 : /// Send WAL until
797 : /// - an error occurs
798 : /// - receiver is caught up and there are no computes (if streaming up to commit_lsn)
799 : /// - timeline's cancellation token fires
800 : ///
801 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
802 : /// convenience.
803 0 : async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
804 0 : let metric = WAL_READERS
805 0 : .get_metric_with_label_values(&[
806 0 : "future",
807 0 : self.appname.as_deref().unwrap_or("safekeeper"),
808 0 : ])
809 0 : .unwrap();
810 :
811 0 : metric.inc();
812 0 : scopeguard::defer! {
813 : metric.dec();
814 : }
815 :
816 : loop {
817 : // Wait for the next portion if it is not there yet, or just
818 : // update our end of available WAL for sending, which we
819 : // communicate to the receiver.
820 0 : self.wait_wal().await?;
821 0 : assert!(
822 0 : self.end_pos > self.start_pos,
823 0 : "nothing to send after waiting for WAL"
824 : );
825 :
826 : // try to send as much as available, capped by MAX_SEND_SIZE
827 0 : let mut chunk_end_pos = self.start_pos + MAX_SEND_SIZE as u64;
828 : // if we went beyond available WAL, cap at end_pos
829 0 : if chunk_end_pos >= self.end_pos {
830 0 : chunk_end_pos = self.end_pos;
831 0 : } else {
832 0 : // If not sending up to end_pos, round down to a page boundary to
833 0 : // avoid splitting a WAL record anywhere other than at a page boundary,
834 0 : // as the protocol demands. See walsender.c (XLogSendPhysical).
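// E.g., assuming the standard 8 KiB WAL block size, a chunk end of 0x12345 has
// block_offset() == 0x345 and is rounded down to 0x12000 here.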
835 0 : chunk_end_pos = chunk_end_pos
836 0 : .checked_sub(chunk_end_pos.block_offset())
837 0 : .unwrap();
838 0 : }
839 0 : let send_size = (chunk_end_pos.0 - self.start_pos.0) as usize;
840 0 : let send_buf = &mut self.send_buf[..send_size];
841 : let send_size: usize;
842 : {
843 : // If uncommitted part is being pulled, check that the term is
844 : // still the expected one.
845 0 : let _term_guard = if let Some(t) = self.term {
846 0 : Some(self.tli.acquire_term(t).await?)
847 : } else {
848 0 : None
849 : };
850 : // Read WAL into buffer. send_size can be additionally capped to
851 : // segment boundary here.
852 0 : send_size = self.wal_reader.read(send_buf).await?
853 : };
854 0 : let send_buf = &send_buf[..send_size];
855 :
856 : // and send it, while respecting Timeline::cancel
857 0 : let msg = BeMessage::XLogData(XLogDataBody {
858 0 : wal_start: self.start_pos.0,
859 0 : wal_end: self.end_pos.0,
860 0 : timestamp: get_current_timestamp(),
861 0 : data: send_buf,
862 0 : });
863 0 : self.pgb.write_message(&msg).await?;
864 :
865 0 : if let Some(appname) = &self.appname {
866 0 : if appname == "replica" {
867 0 : failpoint_support::sleep_millis_async!("sk-send-wal-replica-sleep");
868 0 : }
869 0 : }
870 0 : trace!(
871 0 : "sent {} bytes of WAL {}-{}",
872 : send_size,
873 : self.start_pos,
874 0 : self.start_pos + send_size as u64
875 : );
876 0 : self.start_pos += send_size as u64;
877 : }
878 0 : }
879 :
880 : /// Wait until we have WAL to stream, sending keepalives and checking for
881 : /// exit in the meantime.
882 0 : async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
883 : loop {
884 0 : self.end_pos = self.end_watch.get();
885 0 : let have_something_to_send = (|| {
886 0 : fail::fail_point!(
887 0 : "sk-pause-send",
888 0 : self.appname.as_deref() != Some("pageserver"),
889 0 : |_| { false }
890 : );
891 0 : self.end_pos > self.start_pos
892 : })();
893 :
894 0 : if have_something_to_send {
895 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
896 0 : return Ok(());
897 0 : }
898 :
899 : // Wait for WAL to appear, now self.end_pos == self.start_pos.
900 0 : if let Some(lsn) = self.wait_for_lsn().await? {
901 0 : self.end_pos = lsn;
902 0 : trace!("got end_pos {:?}, streaming", self.end_pos);
903 0 : return Ok(());
904 0 : }
905 :
906 : // Timed out waiting for WAL, check for termination and send KA.
907 : // Check for termination only if we are streaming up to commit_lsn
908 : // (to pageserver).
909 0 : if let EndWatch::Commit(_) = self.end_watch {
910 0 : if let Some(remote_consistent_lsn) = self
911 0 : .ws_guard
912 0 : .walsenders
913 0 : .get_ws_remote_consistent_lsn(self.ws_guard.id)
914 : {
915 0 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
916 : // Terminate if there is nothing more to send.
917 : // Note that "ending streaming" part of the string is used by
918 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
919 : // do not change this string without updating pageserver.
920 0 : return Err(CopyStreamHandlerEnd::ServerInitiated(format!(
921 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
922 0 : self.appname, self.start_pos,
923 0 : )));
924 0 : }
925 0 : }
926 0 : }
927 :
928 0 : let msg = BeMessage::KeepAlive(WalSndKeepAlive {
929 0 : wal_end: self.end_pos.0,
930 0 : timestamp: get_current_timestamp(),
931 0 : request_reply: true,
932 0 : });
933 :
934 0 : self.pgb.write_message(&msg).await?;
935 : }
936 0 : }
937 :
938 : /// Wait until we have available WAL > start_pos or timeout expires. Returns
939 : /// - Ok(Some(end_pos)) if needed lsn is successfully observed;
940 : /// - Ok(None) if timeout expired;
941 : /// - Err in case of error -- only if 1) term changed while fetching in recovery
942 : /// mode 2) watch channel closed, which must never happen.
943 0 : async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
944 0 : let fp = (|| {
945 0 : fail::fail_point!(
946 0 : "sk-pause-send",
947 0 : self.appname.as_deref() != Some("pageserver"),
948 0 : |_| { true }
949 : );
950 0 : false
951 : })();
952 0 : if fp {
953 0 : tokio::time::sleep(POLL_STATE_TIMEOUT).await;
954 0 : return Ok(None);
955 0 : }
956 :
957 0 : let res = timeout(POLL_STATE_TIMEOUT, async move {
958 : loop {
959 0 : let end_pos = self.end_watch.get();
960 0 : if end_pos > self.start_pos {
961 0 : return Ok(end_pos);
962 0 : }
963 0 : if let EndWatch::Flush(rx) = &self.end_watch {
964 0 : let curr_term = rx.borrow().term;
965 0 : if let Some(client_term) = self.term {
966 0 : if curr_term != client_term {
967 0 : bail!("term changed: requested {}, now {}", client_term, curr_term);
968 0 : }
969 0 : }
970 0 : }
971 0 : self.end_watch.changed().await?;
972 : }
973 0 : })
974 0 : .await;
975 :
976 0 : match res {
977 : // success
978 0 : Ok(Ok(commit_lsn)) => Ok(Some(commit_lsn)),
979 : // error inside closure
980 0 : Ok(Err(err)) => Err(err),
981 : // timeout
982 0 : Err(_) => Ok(None),
983 : }
984 0 : }
985 : }
986 :
987 : /// The half of the split connection that drives receiving replies.
988 : struct ReplyReader<IO> {
989 : reader: PostgresBackendReader<IO>,
990 : ws_guard: Arc<WalSenderGuard>,
991 : tli: WalResidentTimeline,
992 : }
993 :
994 : impl<IO: AsyncRead + AsyncWrite + Unpin> ReplyReader<IO> {
995 0 : async fn run(&mut self) -> Result<(), CopyStreamHandlerEnd> {
996 : loop {
997 0 : let msg = self.reader.read_copy_message().await?;
998 0 : self.handle_feedback(&msg).await?
999 : }
1000 0 : }
1001 :
1002 0 : async fn handle_feedback(&mut self, msg: &Bytes) -> anyhow::Result<()> {
1003 0 : match msg.first().cloned() {
1004 : Some(HOT_STANDBY_FEEDBACK_TAG_BYTE) => {
1005 : // Note: deserializing is on msg[1..] because we skip the tag byte.
1006 0 : let mut hs_feedback = HotStandbyFeedback::des(&msg[1..])
1007 0 : .context("failed to deserialize HotStandbyFeedback")?;
1008 : // TODO: xmin/catalog_xmin are serialized by walreceiver.c in this way:
1009 : // pq_sendint32(&reply_message, xmin);
1010 : // pq_sendint32(&reply_message, xmin_epoch);
1011 : // So it is two big-endian 32-bit words sent in reverse (low word first) order!
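// rotate_left(32) on a u64 swaps its two 32-bit halves, e.g.
// 0x0000_000A_0000_000B becomes 0x0000_000B_0000_000A, which puts the epoch back
// into the high word of the full transaction id.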
1012 0 : hs_feedback.xmin = hs_feedback.xmin.rotate_left(32);
1013 0 : hs_feedback.catalog_xmin = hs_feedback.catalog_xmin.rotate_left(32);
1014 0 : self.ws_guard
1015 0 : .walsenders
1016 0 : .record_hs_feedback(self.ws_guard.id, &hs_feedback);
1017 : }
1018 : Some(STANDBY_STATUS_UPDATE_TAG_BYTE) => {
1019 0 : let reply =
1020 0 : StandbyReply::des(&msg[1..]).context("failed to deserialize StandbyReply")?;
1021 0 : self.ws_guard
1022 0 : .walsenders
1023 0 : .record_standby_reply(self.ws_guard.id, &reply);
1024 : }
1025 : Some(NEON_STATUS_UPDATE_TAG_BYTE) => {
1026 : // pageserver sends this.
1027 : // Note: deserializing is on msg[9..] because we skip the tag byte and 8 length bytes.
1028 0 : let buf = Bytes::copy_from_slice(&msg[9..]);
1029 0 : let ps_feedback = PageserverFeedback::parse(buf);
1030 :
1031 0 : trace!("PageserverFeedback is {:?}", ps_feedback);
1032 0 : self.ws_guard
1033 0 : .walsenders
1034 0 : .record_ps_feedback(self.ws_guard.id, &ps_feedback);
1035 0 : self.tli
1036 0 : .update_remote_consistent_lsn(ps_feedback.remote_consistent_lsn)
1037 0 : .await;
1038 : // In principle a new remote_consistent_lsn could allow us to
1039 : // deactivate the timeline, but we check that regularly through
1040 : // broker updates, so there is no need to do it here.
1041 : }
1042 0 : _ => warn!("unexpected message {:?}", msg),
1043 : }
1044 0 : Ok(())
1045 0 : }
1046 : }
1047 :
1048 : #[cfg(test)]
1049 : mod tests {
1050 : use safekeeper_api::models::FullTransactionId;
1051 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
1052 :
1053 : use super::*;
1054 :
1055 4 : fn mock_ttid() -> TenantTimelineId {
1056 4 : TenantTimelineId {
1057 4 : tenant_id: TenantId::from_slice(&[0x00; 16]).unwrap(),
1058 4 : timeline_id: TimelineId::from_slice(&[0x00; 16]).unwrap(),
1059 4 : }
1060 4 : }
1061 :
1062 4 : fn mock_addr() -> SocketAddr {
1063 4 : "127.0.0.1:8080".parse().unwrap()
1064 4 : }
1065 :
1066 : // add to wss specified feedback setting other fields to dummy values
1067 4 : fn push_feedback(wss: &mut WalSendersShared, feedback: ReplicationFeedback) {
1068 4 : let walsender_state = WalSenderState::Vanilla(VanillaWalSenderInternalState {
1069 4 : ttid: mock_ttid(),
1070 4 : addr: mock_addr(),
1071 4 : conn_id: 1,
1072 4 : appname: None,
1073 4 : feedback,
1074 4 : });
1075 4 : wss.slots.push(Some(walsender_state))
1076 4 : }
1077 :
1078 : // form standby feedback with given hot standby feedback ts/xmin and the
1079 : // rest set to dummy values.
1080 4 : fn hs_feedback(ts: TimestampTz, xmin: FullTransactionId) -> ReplicationFeedback {
1081 4 : ReplicationFeedback::Standby(StandbyFeedback {
1082 4 : reply: StandbyReply::empty(),
1083 4 : hs_feedback: HotStandbyFeedback {
1084 4 : ts,
1085 4 : xmin,
1086 4 : catalog_xmin: 0,
1087 4 : },
1088 4 : })
1089 4 : }
1090 :
1091 : // test that hs aggregation works as expected
1092 : #[test]
1093 1 : fn test_hs_feedback_no_valid() {
1094 1 : let mut wss = WalSendersShared::new();
1095 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
1096 1 : wss.update_reply_feedback();
1097 1 : assert_eq!(
1098 : wss.agg_standby_feedback.hs_feedback.xmin,
1099 : INVALID_FULL_TRANSACTION_ID
1100 : );
1101 1 : }
1102 :
1103 : #[test]
1104 1 : fn test_hs_feedback() {
1105 1 : let mut wss = WalSendersShared::new();
1106 1 : push_feedback(&mut wss, hs_feedback(1, INVALID_FULL_TRANSACTION_ID));
1107 1 : push_feedback(&mut wss, hs_feedback(1, 42));
1108 1 : push_feedback(&mut wss, hs_feedback(1, 64));
1109 1 : wss.update_reply_feedback();
1110 1 : assert_eq!(wss.agg_standby_feedback.hs_feedback.xmin, 42);
1111 1 : }
1112 : }
|