Line data Source code
1 : use std::ops::ControlFlow;
2 : use std::sync::Arc;
3 :
4 : use tokio_util::sync::CancellationToken;
5 : use tracing::{Instrument, info, info_span, warn};
6 : use utils::sync::duplex;
7 :
8 : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
9 : use crate::context::RequestContext;
10 : use crate::virtual_file::MaybeFatalIo;
11 : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
12 : use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
13 :
14 : /// A handle to the flush task.
15 : pub struct FlushHandle<Buf, W> {
16 : inner: Option<FlushHandleInner<Buf, W>>,
17 : }
18 :
19 : pub struct FlushHandleInner<Buf, W> {
20 : /// A bi-directional channel that sends (buffer, offset) for writes,
21 : /// and receives recycled buffers.
22 : channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
23 : /// Join handle for the background flush task.
24 : join_handle: tokio::task::JoinHandle<Result<Arc<W>, FlushTaskError>>,
25 : }
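
// A sketch of the traffic on the duplex channel (queue depth 1). The names mirror the
// types in this file; the diagram itself is illustrative and not part of the implementation:
//
//   FlushHandle --- FlushRequest { slice, offset } --->  FlushBackgroundTask
//   FlushHandle <-------- recycled FullSlice<Buf> ------  FlushBackgroundTask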
26 :
27 : struct FlushRequest<Buf> {
28 : slice: FullSlice<Buf>,
29 : offset: u64,
30 : #[cfg(test)]
31 : ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
32 : #[cfg(test)]
33 : done_flush_tx: tokio::sync::oneshot::Sender<()>,
34 : }
35 :
36 : /// Constructs a request and a control object for a new flush operation.
37 : #[cfg(not(test))]
38 0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
39 0 : let request = FlushRequest { slice, offset };
40 0 : let control = FlushControl::untracked();
41 0 :
42 0 : (request, control)
43 0 : }
44 :
45 : /// Constructs a request and a control object for a new flush operation.
46 : #[cfg(test)]
47 13246 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
48 13246 : let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
49 13246 : let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
50 13246 : let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
51 13246 :
52 13246 : let request = FlushRequest {
53 13246 : slice,
54 13246 : offset,
55 13246 : ready_to_flush_rx,
56 13246 : done_flush_tx,
57 13246 : };
58 13246 : (request, control)
59 13246 : }
60 :
61 : /// A handle to a `FlushRequest` that gives unit tests precise control over flush behavior.
62 : #[cfg(test)]
63 : pub(crate) struct FlushControl {
64 : not_started: FlushNotStarted,
65 : }
66 :
67 : #[cfg(not(test))]
68 : pub(crate) struct FlushControl;
69 :
70 : impl FlushControl {
71 : #[cfg(test)]
72 13246 : fn not_started(
73 13246 : ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
74 13246 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
75 13246 : ) -> Self {
76 13246 : FlushControl {
77 13246 : not_started: FlushNotStarted {
78 13246 : ready_to_flush_tx,
79 13246 : done_flush_rx,
80 13246 : },
81 13246 : }
82 13246 : }
83 :
84 : #[cfg(not(test))]
85 0 : fn untracked() -> Self {
86 0 : FlushControl
87 0 : }
88 :
89 : /// In tests, turns the flush control into its not-started state.
90 : #[cfg(test)]
91 4 : pub(crate) fn into_not_started(self) -> FlushNotStarted {
92 4 : self.not_started
93 4 : }
94 :
95 : /// Releases control over the submitted buffer.
96 : ///
97 : /// In a `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
98 13224 : pub async fn release(self) {
99 13224 : #[cfg(test)]
100 13224 : {
101 13224 : self.not_started
102 13224 : .ready_to_flush()
103 13224 : .wait_until_flush_is_done()
104 13224 : .await;
105 0 : }
106 13224 : }
107 : }
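
// A hypothetical test-only sketch of sequencing a flush precisely through `FlushControl`.
// It assumes a `handle: FlushHandle<_, _>`, a filled `slice`, and its `offset` are in scope;
// it is illustrative, not lifted from the real unit tests:
//
//     let (_recycled, control) = handle.flush(slice, offset).await?;
//     let not_started = control.into_not_started();
//     // The background task is parked until readiness is signalled.
//     let in_progress = not_started.ready_to_flush();
//     // Resolves once the background task has written the slice to the writer.
//     let _done: FlushDone = in_progress.wait_until_flush_is_done().await;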
108 :
109 : impl<Buf, W> FlushHandle<Buf, W>
110 : where
111 : Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
112 : W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
113 : {
114 : /// Spawns a new background flush task and obtains a handle.
115 : ///
116 : /// Handle and background task are connected through a duplex channel.
117 : /// Dirty buffers are sent to the background task for flushing.
118 : /// Clean buffers are sent back to the handle for reuse.
119 : ///
120 : /// The queue depth is 1, and the passed-in `buf` seeds the queue.
121 : /// I.e., the passed-in buf is immediately available to the handle as a recycled buffer.
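///
/// A minimal lifecycle sketch (hypothetical caller; assumes `file`, `buf`, `gate_guard`,
/// `cancel`, `ctx`, and a dirty `slice` at `offset` already exist):
///
/// ```ignore
/// let mut handle = FlushHandle::spawn_new(
///     file, buf, gate_guard, cancel, ctx, info_span!("flush task"),
/// );
/// // Awaiting the recycled buffer is the backpressure point.
/// let (recycled, control) = handle.flush(slice, offset).await?;
/// control.release().await;
/// // Drain the channel and join the background task.
/// let writer = handle.shutdown().await?;
/// ```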
122 2650 : pub fn spawn_new<B>(
123 2650 : file: Arc<W>,
124 2650 : buf: B,
125 2650 : gate_guard: utils::sync::gate::GateGuard,
126 2650 : cancel: CancellationToken,
127 2650 : ctx: RequestContext,
128 2650 : span: tracing::Span,
129 2650 : ) -> Self
130 2650 : where
131 2650 : B: Buffer<IoBuf = Buf> + Send + 'static,
132 2650 : {
133 2650 : let (front, back) = duplex::mpsc::channel(1);
134 2650 : back.try_send(buf.flush())
135 2650 : .expect("we just created it with capacity 1");
136 2650 :
137 2650 : let join_handle = tokio::spawn(
138 2650 : FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
139 2650 : .run()
140 2650 : .instrument(span),
141 2650 : );
142 2650 :
143 2650 : FlushHandle {
144 2650 : inner: Some(FlushHandleInner {
145 2650 : channel: front,
146 2650 : join_handle,
147 2650 : }),
148 2650 : }
149 2650 : }
150 :
151 : /// Submits a buffer to be flushed by the background task.
152 : /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
153 : /// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
154 : /// clear `maybe_flushed`.
155 13246 : pub async fn flush(
156 13246 : &mut self,
157 13246 : slice: FullSlice<Buf>,
158 13246 : offset: u64,
159 13246 : ) -> Result<(FullSlice<Buf>, FlushControl), FlushTaskError> {
160 13246 : let (request, flush_control) = new_flush_op(slice, offset);
161 :
162 : // Submits the buffer to the background task.
163 13246 : let submit = self.inner_mut().channel.send(request).await;
164 13246 : if submit.is_err() {
165 0 : return self.handle_error().await;
166 13246 : }
167 :
168 : // Wait for an available buffer from the background flush task.
169 : // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
170 : // then the write path will eventually wait for it here.
171 13246 : let Some(recycled) = self.inner_mut().channel.recv().await else {
172 0 : return self.handle_error().await;
173 : };
174 :
175 13246 : Ok((recycled, flush_control))
176 13246 : }
177 :
178 0 : async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
179 0 : Err(self
180 0 : .shutdown()
181 0 : .await
182 0 : .expect_err("flush task only disconnects duplex if it exits with an error"))
183 0 : }
184 :
185 : /// Cleans up the channel and joins the flush task.
186 18 : pub async fn shutdown(&mut self) -> Result<Arc<W>, FlushTaskError> {
187 18 : let handle = self
188 18 : .inner
189 18 : .take()
190 18 : .expect("must not use after we returned an error");
191 18 : drop(handle.channel.tx);
192 18 : handle.join_handle.await.unwrap()
193 18 : }
194 :
195 : /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
196 : /// This only happens if the handle is used after an error.
197 26492 : fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
198 26492 : self.inner
199 26492 : .as_mut()
200 26492 : .expect("must not use after we returned an error")
201 26492 : }
202 : }
203 :
204 : /// A background task for flushing data to disk.
205 : pub struct FlushBackgroundTask<Buf, W> {
206 : /// A bi-directional channel that receives (buffer, offset) for writes,
207 : /// and sends back recycled buffers.
208 : channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
209 : /// A writer for persisting data to disk.
210 : writer: Arc<W>,
211 : ctx: RequestContext,
212 : cancel: CancellationToken,
213 : /// Prevents the timeline from shutting down until the background flush task finishes flushing all remaining buffers to disk.
214 : _gate_guard: utils::sync::gate::GateGuard,
215 : }
216 :
217 : #[derive(Debug, thiserror::Error)]
218 : pub enum FlushTaskError {
219 : #[error("flush task cancelled")]
220 : Cancelled,
221 : }
222 :
223 : impl<Buf, W> FlushBackgroundTask<Buf, W>
224 : where
225 : Buf: IoBufAligned + Send + Sync,
226 : W: OwnedAsyncWriter + Sync + 'static,
227 : {
228 : /// Creates a new background flush task.
229 2650 : fn new(
230 2650 : channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
231 2650 : file: Arc<W>,
232 2650 : gate_guard: utils::sync::gate::GateGuard,
233 2650 : cancel: CancellationToken,
234 2650 : ctx: RequestContext,
235 2650 : ) -> Self {
236 2650 : FlushBackgroundTask {
237 2650 : channel,
238 2650 : writer: file,
239 2650 : _gate_guard: gate_guard,
240 2650 : cancel,
241 2650 : ctx,
242 2650 : }
243 2650 : }
244 :
245 : /// Runs the background flush task.
246 2650 : async fn run(mut self) -> Result<Arc<W>, FlushTaskError> {
247 : // Exit condition: channel is closed and there is no remaining buffer to be flushed
248 15874 : while let Some(request) = self.channel.recv().await {
249 : #[cfg(test)]
250 : {
251 : // In tests, wait for control to signal that we are ready to flush.
252 13246 : if request.ready_to_flush_rx.await.is_err() {
253 18 : tracing::debug!("control dropped");
254 13228 : }
255 : }
256 :
257 : // Write slice to disk at `offset`.
258 : //
259 : // Error handling happens according to the current policy of crashing
260 : // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
261 : // (The upper layers of the Pageserver write path are not equipped to retry write errors
262 : // because they often deallocate the buffers that were already written).
263 : //
264 : // TODO: use utils::backoff::retry once async closures are actually usable
265 : //
266 13246 : let mut slice_storage = Some(request.slice);
267 13246 : for attempt in 1.. {
268 13246 : if self.cancel.is_cancelled() {
269 0 : return Err(FlushTaskError::Cancelled);
270 13246 : }
271 13246 : let result = async {
272 13246 : if attempt > 1 {
273 0 : info!("retrying flush");
274 13246 : }
275 13246 : let slice = slice_storage.take().expect(
276 13246 : "likely previous invocation of this future didn't get polled to completion",
277 13246 : );
278 : // Don't cancel this write by doing tokio::select with self.cancel.cancelled().
279 : // The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
280 : // If we retry indefinitely, we'll deplete those resources.
281 : // Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
282 : // wait for cancelled ops to complete and discard their error.
283 13246 : let (slice, res) = self.writer.write_all_at(slice, request.offset, &self.ctx).await;
284 13246 : slice_storage = Some(slice);
285 13246 : let res = res.maybe_fatal_err("owned_buffers_io flush");
286 13246 : let Err(err) = res else {
287 13246 : return ControlFlow::Break(());
288 : };
289 0 : warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
290 0 : utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
291 0 : ControlFlow::Continue(())
292 13246 : }
293 13246 : .instrument(info_span!("flush_attempt", %attempt))
294 13246 : .await;
295 13246 : match result {
296 13246 : ControlFlow::Break(()) => break,
297 0 : ControlFlow::Continue(()) => continue,
298 : }
299 : }
300 13246 : let slice = slice_storage.expect("loop must have run at least once");
301 13246 :
302 13246 : #[cfg(test)]
303 13246 : {
304 13246 : // In tests, tell control that we are done flushing the buffer.
305 13246 : if request.done_flush_tx.send(()).is_err() {
306 18 : tracing::debug!("control dropped");
307 13228 : }
308 0 : }
309 0 :
310 0 : // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
311 13246 : if self.channel.send(slice).await.is_err() {
312 : // Although the channel is closed, we still need to finish flushing the remaining buffers.
313 0 : continue;
314 13246 : }
315 : }
316 :
317 2377 : Ok(self.writer)
318 2377 : }
319 : }
320 :
321 : #[cfg(test)]
322 : pub(crate) struct FlushNotStarted {
323 : ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
324 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
325 : }
326 :
327 : #[cfg(test)]
328 : pub(crate) struct FlushInProgress {
329 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
330 : }
331 :
332 : #[cfg(test)]
333 : pub(crate) struct FlushDone;
334 :
335 : #[cfg(test)]
336 : impl FlushNotStarted {
337 : /// Signals to the background task that the buffer is ready to be flushed to disk.
338 13228 : pub fn ready_to_flush(self) -> FlushInProgress {
339 13228 : self.ready_to_flush_tx
340 13228 : .send(())
341 13228 : .map(|_| FlushInProgress {
342 13228 : done_flush_rx: self.done_flush_rx,
343 13228 : })
344 13228 : .unwrap()
345 13228 : }
346 : }
347 :
348 : #[cfg(test)]
349 : impl FlushInProgress {
350 : /// Waits until background flush is done.
351 13228 : pub async fn wait_until_flush_is_done(self) -> FlushDone {
352 13228 : self.done_flush_rx.await.unwrap();
353 13228 : FlushDone
354 13228 : }
355 : }
|