Line data Source code
1 : use std::pin::Pin;
2 : use std::task::{Context, Poll};
3 :
4 : use crate::send_wal::EndWatch;
5 : use crate::timeline::WalResidentTimeline;
6 : use crate::wal_storage::WalReader;
7 : use bytes::Bytes;
8 : use futures::stream::BoxStream;
9 : use futures::{Stream, StreamExt};
10 : use safekeeper_api::Term;
11 : use utils::id::TenantTimelineId;
12 : use utils::lsn::Lsn;
13 :
14 : #[derive(PartialEq, Eq, Debug)]
15 : pub(crate) struct WalBytes {
16 : /// Raw PG WAL
17 : pub(crate) wal: Bytes,
18 : /// Start LSN of [`Self::wal`]
19 : #[allow(dead_code)]
20 : pub(crate) wal_start_lsn: Lsn,
21 : /// End LSN of [`Self::wal`]
22 : pub(crate) wal_end_lsn: Lsn,
23 : /// End LSN of WAL available on the safekeeper.
24 : ///
25 : /// For pagservers this will be commit LSN,
26 : /// while for the compute it will be the flush LSN.
27 : pub(crate) available_wal_end_lsn: Lsn,
28 : }
29 :
30 : struct PositionedWalReader {
31 : start: Lsn,
32 : end: Lsn,
33 : reader: Option<WalReader>,
34 : }
35 :
36 : /// A streaming WAL reader wrapper which can be reset while running
37 : pub(crate) struct StreamingWalReader {
38 : stream: BoxStream<'static, WalOrReset>,
39 : start_changed_tx: tokio::sync::watch::Sender<Lsn>,
40 : // HADRON: Added TenantTimelineId for instrumentation purposes.
41 : pub(crate) ttid: TenantTimelineId,
42 : }
43 :
44 : pub(crate) enum WalOrReset {
45 : Wal(anyhow::Result<WalBytes>),
46 : Reset(Lsn),
47 : }
48 :
49 : impl WalOrReset {
50 76 : pub(crate) fn get_wal(self) -> Option<anyhow::Result<WalBytes>> {
51 76 : match self {
52 76 : WalOrReset::Wal(wal) => Some(wal),
53 0 : WalOrReset::Reset(_) => None,
54 : }
55 76 : }
56 : }
57 :
58 : impl StreamingWalReader {
59 5 : pub(crate) fn new(
60 5 : tli: WalResidentTimeline,
61 5 : term: Option<Term>,
62 5 : start: Lsn,
63 5 : end: Lsn,
64 5 : end_watch: EndWatch,
65 5 : buffer_size: usize,
66 5 : ) -> Self {
67 5 : let (start_changed_tx, start_changed_rx) = tokio::sync::watch::channel(start);
68 5 : let ttid = tli.ttid;
69 :
70 5 : let state = WalReaderStreamState {
71 5 : tli,
72 5 : wal_reader: PositionedWalReader {
73 5 : start,
74 5 : end,
75 5 : reader: None,
76 5 : },
77 5 : term,
78 5 : end_watch,
79 5 : buffer: vec![0; buffer_size],
80 5 : buffer_size,
81 5 : };
82 :
83 : // When a change notification is received while polling the internal
84 : // reader, stop polling the read future and service the change.
85 5 : let stream = futures::stream::unfold(
86 5 : (state, start_changed_rx),
87 84 : |(mut state, mut rx)| async move {
88 84 : let wal_or_reset = tokio::select! {
89 84 : read_res = state.read() => { WalOrReset::Wal(read_res) },
90 84 : changed_res = rx.changed() => {
91 4 : if changed_res.is_err() {
92 0 : return None;
93 4 : }
94 :
95 4 : let new_start_pos = rx.borrow_and_update();
96 4 : WalOrReset::Reset(*new_start_pos)
97 : }
98 : };
99 :
100 80 : if let WalOrReset::Reset(lsn) = wal_or_reset {
101 4 : state.wal_reader.start = lsn;
102 4 : state.wal_reader.reader = None;
103 76 : }
104 :
105 80 : Some((wal_or_reset, (state, rx)))
106 164 : },
107 : )
108 5 : .boxed();
109 :
110 5 : Self {
111 5 : stream,
112 5 : start_changed_tx,
113 5 : ttid,
114 5 : }
115 5 : }
116 :
117 : /// Reset the stream to a given position.
118 4 : pub(crate) async fn reset(&mut self, start: Lsn) {
119 4 : self.start_changed_tx.send(start).unwrap();
120 4 : while let Some(wal_or_reset) = self.stream.next().await {
121 4 : match wal_or_reset {
122 4 : WalOrReset::Reset(at) => {
123 : // Stream confirmed the reset.
124 : // There may only one ongoing reset at any given time,
125 : // hence the assertion.
126 4 : assert_eq!(at, start);
127 4 : break;
128 : }
129 0 : WalOrReset::Wal(_) => {
130 0 : // Ignore wal generated before reset was handled
131 0 : }
132 : }
133 : }
134 4 : }
135 : }
136 :
137 : impl Stream for StreamingWalReader {
138 : type Item = WalOrReset;
139 :
140 9430 : fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
141 9430 : Pin::new(&mut self.stream).poll_next(cx)
142 9430 : }
143 : }
144 :
145 : struct WalReaderStreamState {
146 : tli: WalResidentTimeline,
147 : wal_reader: PositionedWalReader,
148 : term: Option<Term>,
149 : end_watch: EndWatch,
150 : buffer: Vec<u8>,
151 : buffer_size: usize,
152 : }
153 :
154 : impl WalReaderStreamState {
155 84 : async fn read(&mut self) -> anyhow::Result<WalBytes> {
156 : // Create reader if needed
157 84 : if self.wal_reader.reader.is_none() {
158 9 : self.wal_reader.reader = Some(self.tli.get_walreader(self.wal_reader.start).await?);
159 75 : }
160 :
161 84 : let have_something_to_send = self.wal_reader.end > self.wal_reader.start;
162 84 : if !have_something_to_send {
163 5 : tracing::debug!(
164 0 : "Waiting for wal: start={}, end={}",
165 : self.wal_reader.end,
166 : self.wal_reader.start
167 : );
168 5 : self.wal_reader.end = self
169 5 : .end_watch
170 5 : .wait_for_lsn(self.wal_reader.start, self.term)
171 5 : .await?;
172 0 : tracing::debug!(
173 0 : "Done waiting for wal: start={}, end={}",
174 : self.wal_reader.end,
175 : self.wal_reader.start
176 : );
177 79 : }
178 :
179 79 : assert!(
180 79 : self.wal_reader.end > self.wal_reader.start,
181 0 : "nothing to send after waiting for WAL"
182 : );
183 :
184 : // Calculate chunk size
185 79 : let mut chunk_end_pos = self.wal_reader.start + self.buffer_size as u64;
186 79 : if chunk_end_pos >= self.wal_reader.end {
187 6 : chunk_end_pos = self.wal_reader.end;
188 73 : } else {
189 73 : chunk_end_pos = chunk_end_pos
190 73 : .checked_sub(chunk_end_pos.block_offset())
191 73 : .unwrap();
192 73 : }
193 :
194 79 : let send_size = (chunk_end_pos.0 - self.wal_reader.start.0) as usize;
195 79 : let buffer = &mut self.buffer[..send_size];
196 :
197 : // Read WAL
198 76 : let send_size = {
199 79 : let _term_guard = if let Some(t) = self.term {
200 0 : Some(self.tli.acquire_term(t).await?)
201 : } else {
202 79 : None
203 : };
204 79 : self.wal_reader
205 79 : .reader
206 79 : .as_mut()
207 79 : .unwrap()
208 79 : .read(buffer)
209 79 : .await?
210 : };
211 :
212 76 : let wal = Bytes::copy_from_slice(&buffer[..send_size]);
213 76 : let result = WalBytes {
214 76 : wal,
215 76 : wal_start_lsn: self.wal_reader.start,
216 76 : wal_end_lsn: self.wal_reader.start + send_size as u64,
217 76 : available_wal_end_lsn: self.wal_reader.end,
218 76 : };
219 :
220 76 : self.wal_reader.start += send_size as u64;
221 :
222 76 : Ok(result)
223 76 : }
224 : }
225 :
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use futures::StreamExt;
    use postgres_ffi::MAX_SEND_SIZE;
    use utils::id::{NodeId, TenantTimelineId};
    use utils::lsn::Lsn;

    use crate::test_utils::Env;
    use crate::wal_reader_stream::{StreamingWalReader, WalBytes};

    /// Pulls WAL chunks until one ends exactly at the WAL end that was
    /// available when it was read (or the stream ends), returning all of them.
    async fn read_until_caught_up(reader: &mut StreamingWalReader) -> Vec<WalBytes> {
        let mut chunks = Vec::new();
        while let Some(item) = reader.next().await {
            let wal = item.get_wal().unwrap().unwrap();
            let caught_up = wal.available_wal_end_lsn == wal.wal_end_lsn;
            chunks.push(wal);

            if caught_up {
                break;
            }
        }
        chunks
    }

    #[tokio::test]
    async fn test_streaming_wal_reader_reset() {
        let _ = env_logger::builder().is_test(true).try_init();

        const SIZE: usize = 8 * 1024;
        const MSG_COUNT: usize = 200;

        let start_lsn = Lsn::from_str("0/149FD18").unwrap();
        let env = Env::new(true).unwrap();
        let tli = env
            .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
            .await
            .unwrap();

        let resident_tli = tli.wal_residence_guard().await.unwrap();
        let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, c"neon-file:", None)
            .await
            .unwrap();
        let end_pos = end_watch.get();

        tracing::info!("Doing first round of reads ...");

        let mut streaming_wal_reader = StreamingWalReader::new(
            resident_tli,
            None,
            start_lsn,
            end_pos,
            end_watch,
            MAX_SEND_SIZE,
        );

        let before_reset = read_until_caught_up(&mut streaming_wal_reader).await;

        tracing::info!("Resetting the WAL stream ...");

        streaming_wal_reader.reset(start_lsn).await;

        tracing::info!("Doing second round of reads ...");

        let after_reset = read_until_caught_up(&mut streaming_wal_reader).await;

        assert_eq!(before_reset, after_reset);
    }
}
|