Line data Source code
1 : use std::pin::Pin;
2 : use std::task::{Context, Poll};
3 :
4 : use bytes::Bytes;
5 : use futures::stream::BoxStream;
6 : use futures::{Stream, StreamExt};
7 : use safekeeper_api::Term;
8 : use utils::lsn::Lsn;
9 :
10 : use crate::send_wal::EndWatch;
11 : use crate::timeline::WalResidentTimeline;
12 : use crate::wal_storage::WalReader;
13 :
14 : #[derive(PartialEq, Eq, Debug)]
15 : pub(crate) struct WalBytes {
16 : /// Raw PG WAL
17 : pub(crate) wal: Bytes,
18 : /// Start LSN of [`Self::wal`]
19 : #[allow(dead_code)]
20 : pub(crate) wal_start_lsn: Lsn,
21 : /// End LSN of [`Self::wal`]
22 : pub(crate) wal_end_lsn: Lsn,
23 : /// End LSN of WAL available on the safekeeper.
24 : ///
25 : /// For pagservers this will be commit LSN,
26 : /// while for the compute it will be the flush LSN.
27 : pub(crate) available_wal_end_lsn: Lsn,
28 : }
29 :
30 : struct PositionedWalReader {
31 : start: Lsn,
32 : end: Lsn,
33 : reader: Option<WalReader>,
34 : }
35 :
36 : /// A streaming WAL reader wrapper which can be reset while running
37 : pub(crate) struct StreamingWalReader {
38 : stream: BoxStream<'static, WalOrReset>,
39 : start_changed_tx: tokio::sync::watch::Sender<Lsn>,
40 : }
41 :
42 : pub(crate) enum WalOrReset {
43 : Wal(anyhow::Result<WalBytes>),
44 : Reset(Lsn),
45 : }
46 :
47 : impl WalOrReset {
48 76 : pub(crate) fn get_wal(self) -> Option<anyhow::Result<WalBytes>> {
49 76 : match self {
50 76 : WalOrReset::Wal(wal) => Some(wal),
51 0 : WalOrReset::Reset(_) => None,
52 : }
53 76 : }
54 : }
55 :
56 : impl StreamingWalReader {
57 5 : pub(crate) fn new(
58 5 : tli: WalResidentTimeline,
59 5 : term: Option<Term>,
60 5 : start: Lsn,
61 5 : end: Lsn,
62 5 : end_watch: EndWatch,
63 5 : buffer_size: usize,
64 5 : ) -> Self {
65 5 : let (start_changed_tx, start_changed_rx) = tokio::sync::watch::channel(start);
66 5 :
67 5 : let state = WalReaderStreamState {
68 5 : tli,
69 5 : wal_reader: PositionedWalReader {
70 5 : start,
71 5 : end,
72 5 : reader: None,
73 5 : },
74 5 : term,
75 5 : end_watch,
76 5 : buffer: vec![0; buffer_size],
77 5 : buffer_size,
78 5 : };
79 5 :
80 5 : // When a change notification is received while polling the internal
81 5 : // reader, stop polling the read future and service the change.
82 5 : let stream = futures::stream::unfold(
83 5 : (state, start_changed_rx),
84 84 : |(mut state, mut rx)| async move {
85 84 : let wal_or_reset = tokio::select! {
86 84 : read_res = state.read() => { WalOrReset::Wal(read_res) },
87 84 : changed_res = rx.changed() => {
88 4 : if changed_res.is_err() {
89 0 : return None;
90 4 : }
91 4 :
92 4 : let new_start_pos = rx.borrow_and_update();
93 4 : WalOrReset::Reset(*new_start_pos)
94 : }
95 : };
96 :
97 80 : if let WalOrReset::Reset(lsn) = wal_or_reset {
98 4 : state.wal_reader.start = lsn;
99 4 : state.wal_reader.reader = None;
100 76 : }
101 :
102 80 : Some((wal_or_reset, (state, rx)))
103 164 : },
104 5 : )
105 5 : .boxed();
106 5 :
107 5 : Self {
108 5 : stream,
109 5 : start_changed_tx,
110 5 : }
111 5 : }
112 :
113 : /// Reset the stream to a given position.
114 4 : pub(crate) async fn reset(&mut self, start: Lsn) {
115 4 : self.start_changed_tx.send(start).unwrap();
116 4 : while let Some(wal_or_reset) = self.stream.next().await {
117 4 : match wal_or_reset {
118 4 : WalOrReset::Reset(at) => {
119 4 : // Stream confirmed the reset.
120 4 : // There may only one ongoing reset at any given time,
121 4 : // hence the assertion.
122 4 : assert_eq!(at, start);
123 4 : break;
124 : }
125 0 : WalOrReset::Wal(_) => {
126 0 : // Ignore wal generated before reset was handled
127 0 : }
128 : }
129 : }
130 4 : }
131 : }
132 :
133 : impl Stream for StreamingWalReader {
134 : type Item = WalOrReset;
135 :
136 365 : fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137 365 : Pin::new(&mut self.stream).poll_next(cx)
138 365 : }
139 : }
140 :
141 : struct WalReaderStreamState {
142 : tli: WalResidentTimeline,
143 : wal_reader: PositionedWalReader,
144 : term: Option<Term>,
145 : end_watch: EndWatch,
146 : buffer: Vec<u8>,
147 : buffer_size: usize,
148 : }
149 :
150 : impl WalReaderStreamState {
151 84 : async fn read(&mut self) -> anyhow::Result<WalBytes> {
152 83 : // Create reader if needed
153 83 : if self.wal_reader.reader.is_none() {
154 9 : self.wal_reader.reader = Some(self.tli.get_walreader(self.wal_reader.start).await?);
155 74 : }
156 :
157 83 : let have_something_to_send = self.wal_reader.end > self.wal_reader.start;
158 83 : if !have_something_to_send {
159 4 : tracing::debug!(
160 0 : "Waiting for wal: start={}, end={}",
161 : self.wal_reader.end,
162 : self.wal_reader.start
163 : );
164 4 : self.wal_reader.end = self
165 4 : .end_watch
166 4 : .wait_for_lsn(self.wal_reader.start, self.term)
167 4 : .await?;
168 0 : tracing::debug!(
169 0 : "Done waiting for wal: start={}, end={}",
170 : self.wal_reader.end,
171 : self.wal_reader.start
172 : );
173 79 : }
174 :
175 79 : assert!(
176 79 : self.wal_reader.end > self.wal_reader.start,
177 0 : "nothing to send after waiting for WAL"
178 : );
179 :
180 : // Calculate chunk size
181 79 : let mut chunk_end_pos = self.wal_reader.start + self.buffer_size as u64;
182 79 : if chunk_end_pos >= self.wal_reader.end {
183 6 : chunk_end_pos = self.wal_reader.end;
184 73 : } else {
185 73 : chunk_end_pos = chunk_end_pos
186 73 : .checked_sub(chunk_end_pos.block_offset())
187 73 : .unwrap();
188 73 : }
189 :
190 79 : let send_size = (chunk_end_pos.0 - self.wal_reader.start.0) as usize;
191 79 : let buffer = &mut self.buffer[..send_size];
192 :
193 : // Read WAL
194 76 : let send_size = {
195 79 : let _term_guard = if let Some(t) = self.term {
196 0 : Some(self.tli.acquire_term(t).await?)
197 : } else {
198 79 : None
199 : };
200 79 : self.wal_reader
201 79 : .reader
202 79 : .as_mut()
203 79 : .unwrap()
204 79 : .read(buffer)
205 79 : .await?
206 : };
207 :
208 76 : let wal = Bytes::copy_from_slice(&buffer[..send_size]);
209 76 : let result = WalBytes {
210 76 : wal,
211 76 : wal_start_lsn: self.wal_reader.start,
212 76 : wal_end_lsn: self.wal_reader.start + send_size as u64,
213 76 : available_wal_end_lsn: self.wal_reader.end,
214 76 : };
215 76 :
216 76 : self.wal_reader.start += send_size as u64;
217 76 :
218 76 : Ok(result)
219 76 : }
220 : }
221 :
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use futures::StreamExt;
    use postgres_ffi::MAX_SEND_SIZE;
    use utils::id::{NodeId, TenantTimelineId};
    use utils::lsn::Lsn;

    use crate::test_utils::Env;
    use crate::wal_reader_stream::{StreamingWalReader, WalBytes};

    /// Pull WAL chunks from `reader` until one ends exactly at the end of the
    /// WAL that was available when it was read, and return all of them.
    async fn read_until_caught_up(reader: &mut StreamingWalReader) -> Vec<WalBytes> {
        let mut chunks = Vec::new();
        while let Some(item) = reader.next().await {
            let chunk = item.get_wal().unwrap().unwrap();
            let caught_up = chunk.available_wal_end_lsn == chunk.wal_end_lsn;
            chunks.push(chunk);

            if caught_up {
                break;
            }
        }
        chunks
    }

    #[tokio::test]
    async fn test_streaming_wal_reader_reset() {
        let _ = env_logger::builder().is_test(true).try_init();

        const SIZE: usize = 8 * 1024;
        const MSG_COUNT: usize = 200;

        let start_lsn = Lsn::from_str("0/149FD18").unwrap();
        let env = Env::new(true).unwrap();
        let tli = env
            .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
            .await
            .unwrap();

        let resident_tli = tli.wal_residence_guard().await.unwrap();
        let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
            .await
            .unwrap();
        let end_pos = end_watch.get();

        tracing::info!("Doing first round of reads ...");

        let mut reader = StreamingWalReader::new(
            resident_tli,
            None,
            start_lsn,
            end_pos,
            end_watch,
            MAX_SEND_SIZE,
        );
        let before_reset = read_until_caught_up(&mut reader).await;

        tracing::info!("Resetting the WAL stream ...");

        reader.reset(start_lsn).await;

        tracing::info!("Doing second round of reads ...");

        let after_reset = read_until_caught_up(&mut reader).await;

        // Re-reading from the same start must reproduce identical chunks.
        assert_eq!(before_reset, after_reset);
    }
}
|