Line data Source code
1 : use std::time::Duration;
2 :
3 : use anyhow::Context;
4 : use futures::StreamExt;
5 : use pageserver_api::shard::ShardIdentity;
6 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
7 : use postgres_ffi::MAX_SEND_SIZE;
8 : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
9 : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
10 : use tokio::io::{AsyncRead, AsyncWrite};
11 : use tokio::time::MissedTickBehavior;
12 : use utils::bin_ser::BeSer;
13 : use utils::lsn::Lsn;
14 : use wal_decoder::models::InterpretedWalRecord;
15 :
16 : use crate::send_wal::EndWatchView;
17 : use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
18 :
19 : /// Shard-aware interpreted record sender.
20 : /// This is used for sending WAL to the pageserver. Said WAL
21 : /// is pre-interpreted and filtered for the shard.
22 : pub(crate) struct InterpretedWalSender<'a, IO> {
23 : pub(crate) pgb: &'a mut PostgresBackend<IO>,
24 : pub(crate) wal_stream_builder: WalReaderStreamBuilder,
25 : pub(crate) end_watch_view: EndWatchView,
26 : pub(crate) shard: ShardIdentity,
27 : pub(crate) pg_version: u32,
28 : pub(crate) appname: Option<String>,
29 : }
30 :
31 : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
32 : /// Send interpreted WAL to a receiver.
33 : /// Stops when an error occurs or the receiver is caught up and there's no active compute.
34 : ///
35 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
36 : /// convenience.
37 0 : pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
38 0 : let mut wal_position = self.wal_stream_builder.start_pos();
39 0 : let mut wal_decoder =
40 0 : WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
41 :
42 0 : let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
43 0 : let mut stream = std::pin::pin!(stream);
44 0 :
45 0 : let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
46 0 : keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
47 0 : keepalive_ticker.reset();
48 :
49 : loop {
50 0 : tokio::select! {
51 : // Get some WAL from the stream and then: decode, interpret and send it
52 0 : wal = stream.next() => {
53 0 : let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
54 0 : Some(some) => some?,
55 0 : None => { break; }
56 : };
57 :
58 0 : wal_position = wal_end_lsn;
59 0 : wal_decoder.feed_bytes(&wal);
60 0 :
61 0 : let mut records = Vec::new();
62 0 : let mut max_next_record_lsn = None;
63 0 : while let Some((next_record_lsn, recdata)) = wal_decoder
64 0 : .poll_decode()
65 0 : .with_context(|| "Failed to decode WAL")?
66 : {
67 0 : assert!(next_record_lsn.is_aligned());
68 0 : max_next_record_lsn = Some(next_record_lsn);
69 :
70 : // Deserialize and interpret WAL record
71 0 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
72 0 : recdata,
73 0 : &self.shard,
74 0 : next_record_lsn,
75 0 : self.pg_version,
76 0 : )
77 0 : .with_context(|| "Failed to interpret WAL")?;
78 :
79 0 : if !interpreted.is_empty() {
80 0 : records.push(interpreted);
81 0 : }
82 : }
83 :
84 0 : let mut buf = Vec::new();
85 0 : records
86 0 : .ser_into(&mut buf)
87 0 : .with_context(|| "Failed to serialize interpreted WAL")?;
88 :
89 : // Reset the keep alive ticker since we are sending something
90 : // over the wire now.
91 0 : keepalive_ticker.reset();
92 0 :
93 0 : self.pgb
94 0 : .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
95 0 : streaming_lsn: wal_end_lsn.0,
96 0 : commit_lsn: available_wal_end_lsn.0,
97 0 : next_record_lsn: max_next_record_lsn.unwrap_or(Lsn::INVALID).0,
98 0 : data: buf.as_slice(),
99 0 : })).await?;
100 : }
101 :
102 : // Send a periodic keep alive when the connection has been idle for a while.
103 0 : _ = keepalive_ticker.tick() => {
104 0 : self.pgb
105 0 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
106 0 : wal_end: self.end_watch_view.get().0,
107 0 : timestamp: get_current_timestamp(),
108 0 : request_reply: true,
109 0 : }))
110 0 : .await?;
111 : }
112 : }
113 : }
114 :
115 : // The loop above ends when the receiver is caught up and there's no more WAL to send.
116 0 : Err(CopyStreamHandlerEnd::ServerInitiated(format!(
117 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
118 0 : self.appname, wal_position,
119 0 : )))
120 0 : }
121 : }
|