Line data Source code
1 : use std::ops::ControlFlow;
2 :
3 : use tokio_util::sync::CancellationToken;
4 : use tracing::{Instrument, info_span, warn};
5 : use utils::sync::duplex;
6 :
7 : use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
8 : use crate::context::RequestContext;
9 : use crate::virtual_file::MaybeFatalIo;
10 : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
11 : use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;
12 :
13 : /// A handle to the flush task.
14 : pub struct FlushHandle<Buf, W> {
15 : inner: Option<FlushHandleInner<Buf, W>>,
16 : }
17 :
18 : pub struct FlushHandleInner<Buf, W> {
19 : /// A bi-directional channel that sends (buffer, offset) for writes,
20 : /// and receives recycled buffers.
21 : channel: duplex::mpsc::Duplex<Request<Buf>, FullSlice<Buf>>,
22 : /// Join handle for the background flush task.
23 : join_handle: tokio::task::JoinHandle<Result<W, FlushTaskError>>,
24 : }
25 :
26 : struct FlushRequest<Buf> {
27 : slice: FullSlice<Buf>,
28 : offset: u64,
29 : #[cfg(test)]
30 : ready_to_flush_rx: Option<tokio::sync::oneshot::Receiver<()>>,
31 : #[cfg(test)]
32 : done_flush_tx: Option<tokio::sync::oneshot::Sender<()>>,
33 : }
34 :
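/// Poison-pill request that tells the background flush task to exit.
///
/// If `set_len` is `Some(n)`, the task calls the writer's `set_len(n, ..)` once before exiting;
/// if it is `None`, the task exits without touching the file length
/// (see the shutdown handling in [`FlushBackgroundTask::run`]).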
35 : pub struct ShutdownRequest {
36 : pub set_len: Option<u64>,
37 : }
38 :
39 : enum Request<Buf> {
40 : Flush(FlushRequest<Buf>),
41 : Shutdown(ShutdownRequest),
42 : }
43 :
44 : impl<Buf> Request<Buf> {
45 12769 : fn op_str(&self) -> &'static str {
46 12769 : match self {
47 11817 : Request::Flush(_) => "flush",
48 952 : Request::Shutdown(_) => "shutdown",
49 : }
50 12769 : }
51 : }
52 :
53 : /// Constructs a request and a control object for a new flush operation.
54 : #[cfg(not(test))]
55 0 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
56 0 : let request = FlushRequest { slice, offset };
57 0 : let control = FlushControl::untracked();
58 :
59 0 : (request, control)
60 0 : }
61 :
62 : /// Constructs a request and a control object for a new flush operation.
63 : #[cfg(test)]
64 11817 : fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
65 11817 : let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
66 11817 : let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
67 11817 : let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);
68 :
69 11817 : let request = FlushRequest {
70 11817 : slice,
71 11817 : offset,
72 11817 : ready_to_flush_rx: Some(ready_to_flush_rx),
73 11817 : done_flush_tx: Some(done_flush_tx),
74 11817 : };
75 11817 : (request, control)
76 11817 : }
77 :
78 : /// A handle to a `FlushRequest` that gives unit tests precise control over flush behavior.
79 : #[cfg(test)]
80 : pub(crate) struct FlushControl {
81 : not_started: FlushNotStarted,
82 : }
83 :
84 : #[cfg(not(test))]
85 : pub(crate) struct FlushControl;
86 :
87 : impl FlushControl {
88 : #[cfg(test)]
89 11817 : fn not_started(
90 11817 : ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
91 11817 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
92 11817 : ) -> Self {
93 11817 : FlushControl {
94 11817 : not_started: FlushNotStarted {
95 11817 : ready_to_flush_tx,
96 11817 : done_flush_rx,
97 11817 : },
98 11817 : }
99 11817 : }
100 :
101 : #[cfg(not(test))]
102 0 : fn untracked() -> Self {
103 0 : FlushControl
104 0 : }
105 :
106 : /// In tests, converts this flush control into its not-yet-started state.
107 : #[cfg(test)]
108 1 : pub(crate) fn into_not_started(self) -> FlushNotStarted {
109 1 : self.not_started
110 1 : }
111 :
112 : /// Releases control over the submitted buffer.
113 : ///
114 : /// In a `cfg(test)` environment, the buffer is guaranteed to have been flushed to disk by the time [`FlushControl::release`] finishes execution.
115 10866 : pub async fn release(self) {
116 : #[cfg(test)]
117 : {
118 10866 : self.not_started
119 10866 : .ready_to_flush()
120 10866 : .wait_until_flush_is_done()
121 10866 : .await;
122 : }
123 10866 : }
124 : }
125 :
126 : impl<Buf, W> FlushHandle<Buf, W>
127 : where
128 : Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
129 : W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
130 : {
131 : /// Spawns a new background flush task and obtains a handle.
132 : ///
133 : /// Handle and background task are connected through a duplex channel.
134 : /// Dirty buffers are sent to the background task for flushing.
135 : /// Clean buffers are sent back to the handle for reuse.
136 : ///
137 : /// The queue depth is 1, and the passed-in `buf` seeds the queue,
138 : /// i.e., it is immediately available to the handle as a recycled buffer.
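///
/// # Example
///
/// A minimal sketch of the intended call pattern; `file`, `buf`, `gate_guard`, `cancel`,
/// `ctx`, `dirty_slice`, and `offset` are hypothetical values supplied by the caller:
///
/// ```ignore
/// let mut handle = FlushHandle::spawn_new(
///     file, buf, gate_guard, cancel, ctx, tracing::info_span!("flush task"),
/// );
/// // The seeded buffer is immediately available, so the first flush() call gets a
/// // recycled buffer back without waiting for any write to complete.
/// let (recycled, _control) = handle.flush(dirty_slice, offset).await?;
/// let writer = handle.shutdown(ShutdownRequest { set_len: None }).await?;
/// ```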
139 1809 : pub fn spawn_new<B>(
140 1809 : file: W,
141 1809 : buf: B,
142 1809 : gate_guard: utils::sync::gate::GateGuard,
143 1809 : cancel: CancellationToken,
144 1809 : ctx: RequestContext,
145 1809 : span: tracing::Span,
146 1809 : ) -> Self
147 1809 : where
148 1809 : B: Buffer<IoBuf = Buf> + Send + 'static,
149 : {
150 1809 : let (front, back) = duplex::mpsc::channel(1);
151 1809 : back.try_send(buf.flush())
152 1809 : .expect("we just created it with capacity 1");
153 :
154 1809 : let join_handle = tokio::spawn(
155 1809 : FlushBackgroundTask::new(back, file, gate_guard, cancel, ctx)
156 1809 : .run()
157 1809 : .instrument(span),
158 : );
159 :
160 1809 : FlushHandle {
161 1809 : inner: Some(FlushHandleInner {
162 1809 : channel: front,
163 1809 : join_handle,
164 1809 : }),
165 1809 : }
166 1809 : }
167 :
168 : /// Submits a buffer to be flushed in the background task.
169 : /// Returns a buffer that completed flushing for re-use, length reset to 0, capacity unchanged.
170 : /// If `save_buf_for_read` is true, then we save the buffer in `Self::maybe_flushed`, otherwise
171 : /// clear `maybe_flushed`.
172 11817 : pub async fn flush(
173 11817 : &mut self,
174 11817 : slice: FullSlice<Buf>,
175 11817 : offset: u64,
176 11817 : ) -> Result<(FullSlice<Buf>, FlushControl), FlushTaskError> {
177 11817 : let (request, flush_control) = new_flush_op(slice, offset);
178 :
179 : // Submits the buffer to the background task.
180 11817 : self.send(Request::Flush(request)).await?;
181 :
182 : // Wait for an available buffer from the background flush task.
183 : // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
184 : // then the write path will eventually wait for it here.
185 11817 : let Some(recycled) = self.inner_mut().channel.recv().await else {
186 0 : return self.handle_error().await;
187 : };
188 :
189 11817 : Ok((recycled, flush_control))
190 11817 : }
191 :
192 : /// Sends a poison pill to the flush task and waits for it to exit.
193 952 : pub async fn shutdown(&mut self, req: ShutdownRequest) -> Result<W, FlushTaskError> {
194 952 : self.send(Request::Shutdown(req)).await?;
195 952 : self.wait().await
196 952 : }
197 :
198 12769 : async fn send(&mut self, request: Request<Buf>) -> Result<(), FlushTaskError> {
199 12769 : let submit = self.inner_mut().channel.send(request).await;
200 12769 : if submit.is_err() {
201 0 : return self.handle_error().await;
202 12769 : }
203 12769 : Ok(())
204 12769 : }
205 :
206 0 : async fn handle_error<T>(&mut self) -> Result<T, FlushTaskError> {
207 0 : Err(self
208 0 : .wait()
209 0 : .await
210 0 : .expect_err("flush task only disconnects duplex if it exits with an error"))
211 0 : }
212 :
213 952 : async fn wait(&mut self) -> Result<W, FlushTaskError> {
214 952 : let handle = self
215 952 : .inner
216 952 : .take()
217 952 : .expect("must not use after we returned an error");
218 952 : drop(handle.channel.tx);
219 952 : handle.join_handle.await.unwrap()
220 952 : }
221 :
222 : /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
223 : /// This only happens if the handle is used after an error.
224 24586 : fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
225 24586 : self.inner
226 24586 : .as_mut()
227 24586 : .expect("must not use after we returned an error")
228 24586 : }
229 : }
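// A rough sketch of how a caller might drive shutdown and handle the result; `flush_handle`
// and `final_len` are hypothetical names, not part of this module:
//
//     match flush_handle.shutdown(ShutdownRequest { set_len: Some(final_len) }).await {
//         // The writer is handed back, e.g. so the file can be reopened for reads.
//         Ok(writer) => { /* ... */ }
//         // The only error today is cancellation (timeline shutdown).
//         Err(e) if e.is_cancel() => { /* propagate as cancellation */ }
//         Err(e) => return Err(e.into_anyhow()),
//     }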
230 :
231 : /// A background task for flushing data to disk.
232 : pub struct FlushBackgroundTask<Buf, W> {
233 : /// A bi-directional channel that receives (buffer, offset) for writes,
234 : /// and sends back recycled buffers.
235 : channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
236 : /// A writer for persisting data to disk.
237 : writer: W,
238 : ctx: RequestContext,
239 : cancel: CancellationToken,
240 : /// Prevents the timeline from shutting down until the background flush task finishes flushing all remaining buffers to disk.
241 : _gate_guard: utils::sync::gate::GateGuard,
242 : }
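// Rough sketch of the data flow between the handle and this task:
//
//   FlushHandle                                         FlushBackgroundTask
//     flush():    -- Request::Flush { slice, offset } -->  writer.write_all_at(slice, offset)
//                 <------ FullSlice<Buf> (recycled) ------  buffer handed back for reuse
//     shutdown(): -- Request::Shutdown { set_len } ------>  optional writer.set_len, then exit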
243 :
244 : #[derive(Debug, thiserror::Error)]
245 : pub enum FlushTaskError {
246 : #[error("flush task cancelled")]
247 : Cancelled,
248 : }
249 :
250 : impl FlushTaskError {
251 0 : pub fn is_cancel(&self) -> bool {
252 0 : match self {
253 0 : FlushTaskError::Cancelled => true,
254 : }
255 0 : }
256 0 : pub fn into_anyhow(self) -> anyhow::Error {
257 0 : match self {
258 0 : FlushTaskError::Cancelled => anyhow::anyhow!(self),
259 : }
260 0 : }
261 : }
262 :
263 : impl<Buf, W> FlushBackgroundTask<Buf, W>
264 : where
265 : Buf: IoBufAligned + Send + Sync,
266 : W: OwnedAsyncWriter + Sync + 'static,
267 : {
268 : /// Creates a new background flush task.
269 1809 : fn new(
270 1809 : channel: duplex::mpsc::Duplex<FullSlice<Buf>, Request<Buf>>,
271 1809 : file: W,
272 1809 : gate_guard: utils::sync::gate::GateGuard,
273 1809 : cancel: CancellationToken,
274 1809 : ctx: RequestContext,
275 1809 : ) -> Self {
276 1809 : FlushBackgroundTask {
277 1809 : channel,
278 1809 : writer: file,
279 1809 : _gate_guard: gate_guard,
280 1809 : cancel,
281 1809 : ctx,
282 1809 : }
283 1809 : }
284 :
285 : /// Runs the background flush task.
286 1809 : async fn run(mut self) -> Result<W, FlushTaskError> {
287 : // Exit condition: channel is closed and there is no remaining buffer to be flushed
288 14574 : while let Some(request) = self.channel.recv().await {
289 12769 : let op_kind = request.op_str();
290 :
291 : // Perform the requested operation.
292 : //
293 : // Error handling happens according to the current policy of crashing
294 : // on fatal IO errors and retrying in place otherwise (deeming all other errors retryable).
295 : // (The upper layers of the Pageserver write path are not equipped to retry write errors
296 : // because they often deallocate the buffers that were already written).
297 : //
298 : // TODO: use utils::backoff::retry once async closures are actually usable
299 : //
300 12769 : let mut request_storage = Some(request);
301 12769 : for attempt in 1.. {
302 12769 : if self.cancel.is_cancelled() {
303 0 : return Err(FlushTaskError::Cancelled);
304 12769 : }
305 12769 : let result = async {
306 12769 : let request: Request<Buf> = request_storage.take().expect(
307 12769 : "likely previous invocation of this future didn't get polled to completion",
308 : );
309 952 : match &request {
310 : Request::Shutdown(ShutdownRequest { set_len: None }) => {
311 944 : request_storage = Some(request);
312 944 : return ControlFlow::Break(());
313 : },
314 11825 : Request::Flush(_) | Request::Shutdown(ShutdownRequest { set_len: Some(_) }) => {
315 11825 : },
316 : }
317 11825 : if attempt > 1 {
318 0 : warn!(op=%request.op_str(), "retrying");
319 11825 : }
320 : // Borrows so we can `async move` the request into the async block below without moving these borrows into it.
321 11825 : let writer = &self.writer;
322 11825 : let request_storage = &mut request_storage;
323 11825 : let ctx = &self.ctx;
324 11825 : let io_fut = match request {
325 11817 : Request::Flush(FlushRequest { slice, offset, #[cfg(test)] ready_to_flush_rx, #[cfg(test)] done_flush_tx }) => futures::future::Either::Left(async move {
326 : #[cfg(test)]
327 11817 : if let Some(ready_to_flush_rx) = ready_to_flush_rx {
328 : {
329 : // In test, wait for control to signal that we are ready to flush.
330 11817 : if ready_to_flush_rx.await.is_err() {
331 950 : tracing::debug!("control dropped");
332 10867 : }
333 : }
334 0 : }
335 11817 : let (slice, res) = writer.write_all_at(slice, offset, ctx).await;
336 11817 : *request_storage = Some(Request::Flush(FlushRequest {
337 11817 : slice,
338 11817 : offset,
339 11817 : #[cfg(test)]
340 11817 : ready_to_flush_rx: None, // the contract is that we notify before first attempt
341 11817 : #[cfg(test)]
342 11817 : done_flush_tx
343 11817 : }));
344 11817 : res
345 11817 : }),
346 8 : Request::Shutdown(ShutdownRequest { set_len }) => futures::future::Either::Right(async move {
347 8 : let set_len = set_len.expect("we filter out the None case above");
348 8 : let res = writer.set_len(set_len, ctx).await;
349 8 : *request_storage = Some(Request::Shutdown(ShutdownRequest {
350 8 : set_len: Some(set_len),
351 8 : }));
352 8 : res
353 8 : }),
354 : };
355 : // Don't cancel the io_fut by doing tokio::select with self.cancel.cancelled().
356 : // The underlying tokio-epoll-uring slot / kernel operation is still ongoing and occupies resources.
357 : // If we retry indefinitely, we'll deplete those resources.
358 : // Future: teach tokio-epoll-uring io_uring operation cancellation, but still,
359 : // wait for cancelled ops to complete and discard their error.
360 11825 : let res = io_fut.await;
361 11825 : let res = res.maybe_fatal_err("owned_buffers_io flush");
362 11825 : let Err(err) = res else {
363 11825 : if attempt > 1 {
364 0 : warn!(op=%op_kind, "retry succeeded");
365 11825 : }
366 11825 : return ControlFlow::Break(());
367 : };
368 0 : warn!(%err, "error flushing buffered writer buffer to disk, retrying after backoff");
369 0 : utils::backoff::exponential_backoff(attempt, 1.0, 10.0, &self.cancel).await;
370 0 : ControlFlow::Continue(())
371 12769 : }
372 12769 : .instrument(info_span!("attempt", %attempt, %op_kind))
373 12769 : .await;
374 12769 : match result {
375 12769 : ControlFlow::Break(()) => break,
376 0 : ControlFlow::Continue(()) => continue,
377 : }
378 : }
379 12769 : let request = request_storage.expect("loop must have run at least once");
380 :
381 12769 : let slice = match request {
382 : Request::Flush(FlushRequest {
383 11817 : slice,
384 : #[cfg(test)]
385 11817 : mut done_flush_tx,
386 : ..
387 : }) => {
388 : #[cfg(test)]
389 : {
390 : // In test, tell control we are done flushing buffer.
391 11817 : if done_flush_tx.take().expect("always Some").send(()).is_err() {
392 950 : tracing::debug!("control dropped");
393 10867 : }
394 : }
395 11817 : slice
396 : }
397 : Request::Shutdown(_) => {
398 : // next iteration will observe recv() returning None
399 952 : continue;
400 : }
401 : };
402 :
403 : // Sends the buffer back to the handle for reuse. The handle is in charge of clearing the buffer.
404 11817 : let send_res = self.channel.send(slice).await;
405 11817 : if send_res.is_err() {
406 : // The channel is closed, but we still need to finish flushing the remaining buffers.
407 0 : continue;
408 11817 : }
409 : }
410 :
411 1727 : Ok(self.writer)
412 1727 : }
413 : }
414 :
415 : #[cfg(test)]
416 : pub(crate) struct FlushNotStarted {
417 : ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
418 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
419 : }
420 :
421 : #[cfg(test)]
422 : pub(crate) struct FlushInProgress {
423 : done_flush_rx: tokio::sync::oneshot::Receiver<()>,
424 : }
425 :
426 : #[cfg(test)]
427 : pub(crate) struct FlushDone;
428 :
429 : #[cfg(test)]
430 : impl FlushNotStarted {
431 : /// Signals to the background task that the buffer is ready to be flushed to disk.
432 10867 : pub fn ready_to_flush(self) -> FlushInProgress {
433 10867 : self.ready_to_flush_tx
434 10867 : .send(())
435 10867 : .map(|_| FlushInProgress {
436 10867 : done_flush_rx: self.done_flush_rx,
437 10867 : })
438 10867 : .unwrap()
439 10867 : }
440 : }
441 :
442 : #[cfg(test)]
443 : impl FlushInProgress {
444 : /// Waits until background flush is done.
445 10867 : pub async fn wait_until_flush_is_done(self) -> FlushDone {
446 10867 : self.done_flush_rx.await.unwrap();
447 10867 : FlushDone
448 10867 : }
449 : }