Line data Source code
1 : use std::time::Duration;
2 :
3 : use anyhow::Context;
4 : use futures::StreamExt;
5 : use pageserver_api::shard::ShardIdentity;
6 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
7 : use postgres_ffi::MAX_SEND_SIZE;
8 : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
9 : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
10 : use tokio::io::{AsyncRead, AsyncWrite};
11 : use tokio::time::MissedTickBehavior;
12 : use utils::lsn::Lsn;
13 : use utils::postgres_client::Compression;
14 : use utils::postgres_client::InterpretedFormat;
15 : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
16 : use wal_decoder::wire_format::ToWireFormat;
17 :
18 : use crate::send_wal::EndWatchView;
19 : use crate::wal_reader_stream::{WalBytes, WalReaderStreamBuilder};
20 :
21 : /// Shard-aware interpreted record sender.
22 : /// This is used for sending WAL to the pageserver. Said WAL
23 : /// is pre-interpreted and filtered for the shard.
24 : pub(crate) struct InterpretedWalSender<'a, IO> {
25 : pub(crate) format: InterpretedFormat,
26 : pub(crate) compression: Option<Compression>,
27 : pub(crate) pgb: &'a mut PostgresBackend<IO>,
28 : pub(crate) wal_stream_builder: WalReaderStreamBuilder,
29 : pub(crate) end_watch_view: EndWatchView,
30 : pub(crate) shard: ShardIdentity,
31 : pub(crate) pg_version: u32,
32 : pub(crate) appname: Option<String>,
33 : }
34 :
35 : struct Batch {
36 : wal_end_lsn: Lsn,
37 : available_wal_end_lsn: Lsn,
38 : records: InterpretedWalRecords,
39 : }
40 :
41 : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
42 : /// Send interpreted WAL to a receiver.
43 : /// Stops when an error occurs or the receiver is caught up and there's no active compute.
44 : ///
45 : /// Err(CopyStreamHandlerEnd) is always returned; Result is used only for ?
46 : /// convenience.
47 0 : pub(crate) async fn run(self) -> Result<(), CopyStreamHandlerEnd> {
48 0 : let mut wal_position = self.wal_stream_builder.start_pos();
49 0 : let mut wal_decoder =
50 0 : WalStreamDecoder::new(self.wal_stream_builder.start_pos(), self.pg_version);
51 :
52 0 : let stream = self.wal_stream_builder.build(MAX_SEND_SIZE).await?;
53 0 : let mut stream = std::pin::pin!(stream);
54 0 :
55 0 : let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
56 0 : keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
57 0 : keepalive_ticker.reset();
58 0 :
59 0 : let (tx, mut rx) = tokio::sync::mpsc::channel::<Batch>(2);
60 :
61 : loop {
62 0 : tokio::select! {
63 : // Get some WAL from the stream and then: decode, interpret and push it down the
64 : // pipeline.
65 0 : wal = stream.next(), if tx.capacity() > 0 => {
66 0 : let WalBytes { wal, wal_start_lsn: _, wal_end_lsn, available_wal_end_lsn } = match wal {
67 0 : Some(some) => some?,
68 0 : None => { break; }
69 : };
70 :
71 0 : wal_position = wal_end_lsn;
72 0 : wal_decoder.feed_bytes(&wal);
73 0 :
74 0 : let mut records = Vec::new();
75 0 : let mut max_next_record_lsn = None;
76 0 : while let Some((next_record_lsn, recdata)) = wal_decoder
77 0 : .poll_decode()
78 0 : .with_context(|| "Failed to decode WAL")?
79 : {
80 0 : assert!(next_record_lsn.is_aligned());
81 0 : max_next_record_lsn = Some(next_record_lsn);
82 :
83 : // Deserialize and interpret WAL record
84 0 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
85 0 : recdata,
86 0 : &self.shard,
87 0 : next_record_lsn,
88 0 : self.pg_version,
89 0 : )
90 0 : .with_context(|| "Failed to interpret WAL")?;
91 :
92 0 : if !interpreted.is_empty() {
93 0 : records.push(interpreted);
94 0 : }
95 : }
96 :
97 0 : let max_next_record_lsn = match max_next_record_lsn {
98 0 : Some(lsn) => lsn,
99 0 : None => { continue; }
100 : };
101 :
102 0 : let batch = InterpretedWalRecords {
103 0 : records,
104 0 : next_record_lsn: Some(max_next_record_lsn),
105 0 : };
106 0 :
107 0 : tx.send(Batch {wal_end_lsn, available_wal_end_lsn, records: batch}).await.unwrap();
108 : },
109 : // For a previously interpreted batch, serialize it and push it down the wire.
110 0 : batch = rx.recv() => {
111 0 : let batch = match batch {
112 0 : Some(b) => b,
113 0 : None => { break; }
114 : };
115 :
116 0 : let buf = batch
117 0 : .records
118 0 : .to_wire(self.format, self.compression)
119 0 : .await
120 0 : .with_context(|| "Failed to serialize interpreted WAL")
121 0 : .map_err(CopyStreamHandlerEnd::from)?;
122 :
123 : // Reset the keep alive ticker since we are sending something
124 : // over the wire now.
125 0 : keepalive_ticker.reset();
126 0 :
127 0 : self.pgb
128 0 : .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
129 0 : streaming_lsn: batch.wal_end_lsn.0,
130 0 : commit_lsn: batch.available_wal_end_lsn.0,
131 0 : data: &buf,
132 0 : })).await?;
133 : }
134 : // Send a periodic keep alive when the connection has been idle for a while.
135 0 : _ = keepalive_ticker.tick() => {
136 0 : self.pgb
137 0 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
138 0 : wal_end: self.end_watch_view.get().0,
139 0 : timestamp: get_current_timestamp(),
140 0 : request_reply: true,
141 0 : }))
142 0 : .await?;
143 : }
144 : }
145 : }
146 :
147 : // The loop above ends when the receiver is caught up and there's no more WAL to send.
148 0 : Err(CopyStreamHandlerEnd::ServerInitiated(format!(
149 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
150 0 : self.appname, wal_position,
151 0 : )))
152 0 : }
153 : }
|