Line data Source code
1 : use std::ops::ControlFlow;
2 : use std::sync::Arc;
3 :
4 : use once_cell::sync::Lazy;
5 : use tokio_util::sync::CancellationToken;
6 : use tracing::{Instrument, info, info_span, warn};
7 : use utils::sync::duplex;
8 :
9 : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
10 : use crate::context::RequestContext;
11 : use crate::virtual_file::MaybeFatalIo;
12 : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
13 : use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
14 :
15 : /// A handle to the flush task.
16 : pub struct FlushHandle<Buf, W> {
17 : inner: Option<FlushHandleInner<Buf, W>>,
18 : /// Immutable buffer for serving tail reads.
19 : /// `None` if no flush request has been submitted.
20 : pub(super) maybe_flushed: Option<FullSlice<Buf>>,
21 : }
22 :
23 : pub struct FlushHandleInner<Buf, W> {
24 : /// A bi-directional channel that sends (buffer, offset) for writes,
25 : /// and receives recycled buffers back.
26 : channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
27 : /// Join handle for the background flush task.
28 : join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
29 : }
30 :
31 : struct FlushRequest<Buf> {
32 : slice: FullSlice<Buf>,
33 : offset: u64,
34 : #[cfg(test)]
35 : ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
36 : #[cfg(test)]
37 : done_flush_tx: tokio::sync::oneshot::Sender<()>,
38 : }
39 :
40 : /// Constructs a request and a control object for a new flush operation.
41 : #[cfg(not(test))]
42 0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
43 0 : let request = FlushRequest { slice, offset };
44 0 : let control = FlushControl::untracked();
45 0 :
46 0 : (request, control)
47 0 : }
48 :
49 : /// Constructs a request and a control object for a new flush operation.
50 : #[cfg(test)]
51 13246 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
52 13246 : let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
53 13246 : let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
54 13246 : let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
55 13246 :
56 13246 : let request = FlushRequest {
57 13246 : slice,
58 13246 : offset,
59 13246 : ready_to_flush_rx,
60 13246 : done_flush_tx,
61 13246 : };
62 13246 : (request, control)
63 13246 : }
64 :
65 : /// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
66 : #[cfg(test)]
67 : pub(crate) struct FlushControl {
68 : not_started: FlushNotStarted,
69 : }
70 :
71 : #[cfg(not(test))]
72 : pub(crate) struct FlushControl;
73 :
74 : impl FlushControl {
75 : #[cfg(test)]
76 13246 : fn not_started(
77 13246 : ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
78 13246 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
79 13246 : ) -> Self {
80 13246 : FlushControl {
81 13246 : not_started: FlushNotStarted {
82 13246 : ready_to_flush_tx,
83 13246 : done_flush_rx,
84 13246 : },
85 13246 : }
86 13246 : }
87 :
88 : #[cfg(not(test))]
89 0 : fn untracked() -> Self {
90 0 : FlushControl
91 0 : }
92 :
93 : /// In tests, turns the flush control into its not-started state.
94 : #[cfg(test)]
95 4 : pub(crate) fn into_not_started(self) -> FlushNotStarted {
96 4 : self.not_started
97 4 : }
98 :
99 : /// Releases control of the submitted buffer.
100 : ///
101 : /// In a `cfg(test)` environment, the buffer is guaranteed to have been flushed to disk by the time [`FlushControl::release`] finishes execution.
102 13224 : pub async fn release(self) {
103 13224 : #[cfg(test)]
104 13224 : {
105 13224 : self.not_started
106 13224 : .ready_to_flush()
107 13224 : .wait_until_flush_is_done()
108 13224 : .await;
109 0 : }
110 13224 : }
111 : }
112 :
113 : impl<Buf, W> FlushHandle<Buf, W>
114 : where
115 : Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
116 : W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
117 : {
118 : /// Spawns a new background flush task and obtains a handle.
119 : ///
120 : /// Note: buffers are handed to the background task through a channel, so we do not need to explicitly maintain a queue of buffers.
121 2650 : pub fn spawn_new<B>(
122 2650 : file: Arc<W>,
123 2650 : buf: B,
124 2650 : gate_guard: utils::sync::gate::GateGuard,
125 2650 : ctx: RequestContext,
126 2650 : span: tracing::Span,
127 2650 : ) -> Self
128 2650 : where
129 2650 : B: Buffer<IoBuf = Buf> + Send + 'static,
130 2650 : {
131 2650 : // It is fine to buffer up to only 1 message: we only have 1 message in flight at a time.
132 2650 : let (front, back) = duplex::mpsc::channel(1);
133 2650 :
134 2650 : let join_handle = tokio::spawn(
135 2650 : async move {
136 2628 : FlushBackgroundTask::new(back, file, gate_guard, ctx)
137 2628 : .run(buf.flush())
138 2628 : .await
139 2650 : }
140 2650 : .instrument(span),
141 2650 : );
142 2650 :
143 2650 : FlushHandle {
144 2650 : inner: Some(FlushHandleInner {
145 2650 : channel: front,
146 2650 : join_handle,
147 2650 : }),
148 2650 : maybe_flushed: None,
149 2650 : }
150 2650 : }
151 :
152 : /// Submits a buffer to be flushed in the background task.
153 : /// Returns a buffer that completed flushing for re-use: length reset to 0, capacity unchanged.
154 : /// The submitted buffer is also saved in `Self::maybe_flushed` so that tail reads can be
155 : /// served while the flush is in flight.
156 13246 : pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
157 13246 : where
158 13246 : B: Buffer<IoBuf = Buf> + Send + 'static,
159 13246 : {
160 13246 : let slice = buf.flush();
161 13246 :
162 13246 : // Saves the buffer for reads while flushing. This also drops the reference to the previously flushed buffer.
163 13246 : self.maybe_flushed = Some(slice.cheap_clone());
164 13246 :
165 13246 : let (request, flush_control) = new_flush_op(slice, offset);
166 :
167 : // Submits the buffer to the background task.
168 13246 : let submit = self.inner_mut().channel.send(request).await;
169 13246 : if submit.is_err() {
170 0 : return self.handle_error().await;
171 13246 : }
172 :
173 : // Wait for an available buffer from the background flush task.
174 : // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
175 : // then the write path will eventually wait for it here.
176 13246 : let Some(recycled) = self.inner_mut().channel.recv().await else {
177 0 : return self.handle_error().await;
178 : };
179 :
180 : // The only other place that could hold a reference to the recycled buffer
181 : // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
182 13246 : let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
183 13246 : Ok((recycled, flush_control))
184 13246 : }
185 :
186 0 : async fn handle_error<T>(&mut self) -> std::io::Result<T> {
187 0 : Err(self
188 0 : .shutdown()
189 0 : .await
190 0 : .expect_err("flush task only disconnects duplex if it exits with an error"))
191 0 : }
192 :
193 : /// Cleans up the channel and joins the flush task.
194 18 : pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
195 18 : let handle = self
196 18 : .inner
197 18 : .take()
198 18 : .expect("must not use after we returned an error");
199 18 : drop(handle.channel.tx);
200 18 : handle.join_handle.await.unwrap()
201 18 : }
202 :
203 : /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
204 : /// This only happens if the handle is used after an error.
205 26492 : fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
206 26492 : self.inner
207 26492 : .as_mut()
208 26492 : .expect("must not use after we returned an error")
209 26492 : }
210 : }
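// Illustrative only (not part of this file): a caller's steady-state write loop against
// the API above would look roughly like the sketch below; `flush_handle`, `filled_buf`,
// and `offset` are assumed to exist in the caller.
//
//     let (recycled, control) = flush_handle.flush(filled_buf, offset).await?;
//     control.release().await; // under cfg(test), returns only after the data is on disk
//     // `recycled` has length 0 and its original capacity; fill and submit it next.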
211 :
212 : /// A background task for flushing data to disk.
213 : pub struct FlushBackgroundTask<Buf, W> {
214 : /// A bi-directional channel that receives (buffer, offset) for writes,
215 : /// and sends back recycled buffers.
216 : channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
217 : /// A writer for persisting data to disk.
218 : writer: Arc<W>,
219 : ctx: RequestContext,
220 : /// Prevents the timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
221 : _gate_guard: utils::sync::gate::GateGuard,
222 : }
223 :
224 : impl<Buf, W> FlushBackgroundTask<Buf, W>
225 : where
226 : Buf: IoBufAligned + Send + Sync,
227 : W: OwnedAsyncWriter + Sync + 'static,
228 : {
229 : /// Creates a new background flush task.
230 2628 : fn new(
231 2628 : channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
232 2628 : file: Arc<W>,
233 2628 : gate_guard: utils::sync::gate::GateGuard,
234 2628 : ctx: RequestContext,
235 2628 : ) -> Self {
236 2628 : FlushBackgroundTask {
237 2628 : channel,
238 2628 : writer: file,
239 2628 : _gate_guard: gate_guard,
240 2628 : ctx,
241 2628 : }
242 2628 : }
243 :
244 : /// Runs the background flush task.
245 : /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
246 2628 : async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
247 2628 : // Sends the extra buffer back to the handle.
248 2628 : // TODO: can this ever await and/or fail? I think not.
249 2628 : self.channel.send(slice).await.map_err(|_| {
250 0 : std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
251 2628 : })?;
252 :
253 : // Exit condition: channel is closed and there is no remaining buffer to be flushed
254 15874 : while let Some(request) = self.channel.recv().await {
255 : #[cfg(test)]
256 : {
257 : // In test, wait for control to signal that we are ready to flush.
258 13246 : if request.ready_to_flush_rx.await.is_err() {
259 18 : tracing::debug!("control dropped");
260 13228 : }
261 : }
262 :
263 : // Write slice to disk at `offset`.
264 : //
265 : // Error handling happens according to the current policy of crashing
266 : // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
267 : // (The upper layers of the Pageserver write path are not equipped to retry write errors
268 : // because they often deallocate the buffers that were already written).
269 : //
270 : // TODO: cancellation sensitivity.
271 : // Without it, if we hit a bug where retrying is never successful,
272 : // then we can't shut down the timeline/tenant/pageserver cleanly because
273 : // layers of the Pageserver write path are holding the gate open for EphemeralFile.
274 : //
275 : // TODO: use utils::backoff::retry once async closures are actually usable
276 : //
277 13246 : let mut slice_storage = Some(request.slice);
278 13246 : for attempt in 1.. {
279 13246 : let result = async {
280 13246 : if attempt > 1 {
281 0 : info!("retrying flush");
282 13246 : }
283 13246 : let slice = slice_storage.take().expect(
284 13246 : "likely previous invocation of this future didn't get polled to completion",
285 13246 : );
286 13246 : let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
287 13246 : slice_storage = Some(slice);
288 13246 : let res = res.maybe_fatal_err("owned_buffers_io flush");
289 13246 : let Err(err) = res else {
290 13246 : return ControlFlow::Break(());
291 : };
292 0 : warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
293 : static NO_CANCELLATION: Lazy<CancellationToken> = Lazy::new(CancellationToken::new);
294 0 : utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &NO_CANCELLATION).await;
295 0 : ControlFlow::Continue(())
296 13246 : }
297 13246 : .instrument(info_span!("flush_attempt", %attempt))
298 13246 : .await;
299 13246 : match result {
300 13246 : ControlFlow::Break(()) => break,
301 0 : ControlFlow::Continue(()) => continue,
302 : }
303 : }
304 13246 : let slice = slice_storage.expect("loop must have run at least once");
305 13246 :
306 13246 : #[cfg(test)]
307 13246 : {
308 13246 : // In test, tell control that we are done flushing the buffer.
309 13246 : if request.done_flush_tx.send(()).is_err() {
310 18 : tracing::debug!("control dropped");
311 13228 : }
312 0 : }
313 0 :
314 0 : // Sends the buffer back to the handle for reuse. The handle is in charge of clearing the buffer.
315 13246 : if self.channel.send(slice).await.is_err() {
316 : // Although the channel is closed, we still need to finish flushing the remaining buffers.
317 0 : continue;
318 13246 : }
319 : }
320 :
321 2327 : Ok(self.writer)
322 2327 : }
323 : }
324 :
325 : #[cfg(test)]
326 : pub(crate) struct FlushNotStarted {
327 : ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
328 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
329 : }
330 :
331 : #[cfg(test)]
332 : pub(crate) struct FlushInProgress {
333 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
334 : }
335 :
336 : #[cfg(test)]
337 : pub(crate) struct FlushDone;
338 :
339 : #[cfg(test)]
340 : impl FlushNotStarted {
341 : /// Signals the background task that the buffer is ready to flush to disk.
342 13228 : pub fn ready_to_flush(self) -> FlushInProgress {
343 13228 : self.ready_to_flush_tx
344 13228 : .send(())
345 13228 : .map(|_| FlushInProgress {
346 13228 : done_flush_rx: self.done_flush_rx,
347 13228 : })
348 13228 : .unwrap()
349 13228 : }
350 : }
351 :
352 : #[cfg(test)]
353 : impl FlushInProgress {
354 : /// Waits until background flush is done.
355 13228 : pub async fn wait_until_flush_is_done(self) -> FlushDone {
356 13228 : self.done_flush_rx.await.unwrap();
357 13228 : FlushDone
358 13228 : }
359 : }
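For context, a unit test can drive the state machine above step by step instead of calling `FlushControl::release`. A minimal sketch (the `flush_handle`, `buf`, and `offset` names are assumptions, not code from this file):

    let (recycled, control) = flush_handle.flush(buf, offset).await?;
    let not_started = control.into_not_started();
    let in_progress = not_started.ready_to_flush();           // lets the background task start writing
    let _done = in_progress.wait_until_flush_is_done().await; // resolves once the background write has completed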