Line data Source code
1 : use std::time::Duration;
2 :
3 : use anyhow::Context;
4 : use bytes::Bytes;
5 : use futures::StreamExt;
6 : use pageserver_api::shard::ShardIdentity;
7 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
8 : use postgres_ffi::MAX_SEND_SIZE;
9 : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
10 : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
11 : use tokio::io::{AsyncRead, AsyncWrite};
12 : use tokio::time::MissedTickBehavior;
13 : use utils::lsn::Lsn;
14 : use utils::postgres_client::Compression;
15 : use utils::postgres_client::InterpretedFormat;
16 : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
17 : use wal_decoder::wire_format::ToWireFormat;
18 :
19 : use crate::send_wal::EndWatchView;
20 : use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
21 :
22 : /// Shard-aware interpreted record sender.
23 : /// This is used for sending WAL to the pageserver. Said WAL
24 : /// is pre-interpreted and filtered for the shard.
25 : pub(crate) struct InterpretedWalSender<'a, IO> {
26 : pub(crate) format: InterpretedFormat,
27 : pub(crate) compression: Option<Compression>,
28 : pub(crate) pgb: &'a mut PostgresBackend<IO>,
29 : pub(crate) wal_stream_builder: WalReaderStreamBuilder,
30 : pub(crate) end_watch_view: EndWatchView,
31 : pub(crate) shard: ShardIdentity,
32 : pub(crate) pg_version: u32,
33 : pub(crate) appname: Option<String>,
34 : }
35 :
36 : struct Batch {
37 : wal_end_lsn: Lsn,
38 : commit_lsn: Lsn,
39 : records: InterpretedWalRecords,
40 : }
41 :
42 : struct SerializedBatch {
43 : wal_end_lsn: Lsn,
44 : commit_lsn: Lsn,
45 : buf: Bytes,
46 : }
47 :
48 : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
49 : /// Send interpreted WAL to a receiver.
50 : /// Stops when an error occurs or the receiver is caught up and there's no active compute.
51 : ///
52 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
53 : /// convenience.
54 0 : pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
55 0 : let mut wal_position = self.wal_stream_builder.start_pos();
56 0 : let mut wal_decoder =
57 0 : WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
58 :
59 0 : let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
60 0 : let mut stream = std::pin::pin!(stream);
61 0 :
62 0 : let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
63 0 : keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
64 0 : keepalive_ticker.reset();
65 0 :
66 0 : let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(2);
67 0 : let batches_stream =
68 0 : tokio_stream::wrappers::ReceiverStream::new(rx).then(|batch| async move {
69 0 : let buf: Result<Bytes, CopyStreamHandlerEnd> = batch
70 0 : .records
71 0 : .to_wire(self.format, self.compression)
72 0 : .await
73 0 : .with_context(|| "Failed to serialize interpreted WAL")
74 0 : .map_err(CopyStreamHandlerEnd::from);
75 0 :
76 0 : Result::<_, CopyStreamHandlerEnd>::Ok(SerializedBatch {
77 0 : wal_end_lsn: batch.wal_end_lsn,
78 0 : commit_lsn: batch.commit_lsn,
79 0 : buf: buf?,
80 : })
81 0 : });
82 0 : let mut batches_stream = std::pin::pin!(batches_stream);
83 :
84 : loop {
85 0 : tokio::select! {
86 : // Get some WAL from the stream and then: decode, interpret and push it down the
87 : // pipeline.
88 0 : wal = stream.next(), if tx.capacity() > 0 => {
89 0 : let WalBytes { wal, wal_start_lsn, wal_end_lsn, commit_lsn } = match wal {
90 0 : Some(some) => some?,
91 0 : None => { break; }
92 : };
93 :
94 0 : wal_position = wal_start_lsn;
95 0 : wal_decoder.feed_bytes(&wal);
96 0 :
97 0 : let mut records = Vec::new();
98 0 : let mut max_next_record_lsn = None;
99 0 : while let Some((next_record_lsn, recdata)) = wal_decoder
100 0 : .poll_decode()
101 0 : .with_context(|| "Failed to decode WAL")?
102 : {
103 0 : assert!(next_record_lsn.is_aligned());
104 0 : max_next_record_lsn = Some(next_record_lsn);
105 :
106 : // Deserialize and interpret WAL record
107 0 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
108 0 : recdata,
109 0 : &self.shard,
110 0 : next_record_lsn,
111 0 : self.pg_version,
112 0 : )
113 0 : .with_context(|| "Failed to interpret WAL")?;
114 :
115 0 : if !interpreted.is_empty() {
116 0 : records.push(interpreted);
117 0 : }
118 : }
119 :
120 0 : let batch = InterpretedWalRecords {
121 0 : records,
122 0 : next_record_lsn: max_next_record_lsn
123 0 : };
124 0 :
125 0 : tx.send(Batch {wal_end_lsn, commit_lsn, records: batch}).await.unwrap();
126 : },
127 : // For a previously interpreted batch, serialize it and push it down the wire.
128 0 : encoded_batch = batches_stream.next() => {
129 0 : let SerializedBatch {wal_end_lsn, commit_lsn, buf } = match encoded_batch {
130 0 : Some(ser_batch) => ser_batch?,
131 0 : None => { break; }
132 : };
133 :
134 : // Reset the keep alive ticker since we are sending something
135 : // over the wire now.
136 0 : keepalive_ticker.reset();
137 0 :
138 0 : self.pgb
139 0 : .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
140 0 : streaming_lsn: wal_end_lsn.0,
141 0 : commit_lsn: commit_lsn.0,
142 0 : data: &buf,
143 0 : })).await?;
144 : }
145 : // Send a periodic keep alive when the connection has been idle for a while.
146 0 : _ = keepalive_ticker.tick() => {
147 0 : self.pgb
148 0 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
149 0 : wal_end: self.end_watch_view.get().0,
150 0 : timestamp: get_current_timestamp(),
151 0 : request_reply: true,
152 0 : }))
153 0 : .await?;
154 : }
155 : }
156 : }
157 :
158 : // The loop above ends when the receiver is caught up and there's no more WAL to send.
159 0 : Err(CopyStreamHandlerEnd::ServerInitiated(format!(
160 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
161 0 : self.appname, wal_position,
162 0 : )))
163 0 : }
164 : }
|