use std::sync::Arc;

use utils::sync::duplex;

use crate::{
    context::RequestContext,
    virtual_file::owned_buffers_io::{io_buf_aligned::IoBufAligned, io_buf_ext::FullSlice},
};

use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};

/// A handle to the flush task.
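///
/// The handle and the background task communicate over a bounded duplex
/// channel: the handle sends `FlushRequest`s down and receives recycled
/// buffers back. An end-to-end sketch of typical use (illustrative only;
/// `writer`, `spare_buf`, `buf`, `gate`, `ctx`, and `offset` are assumed
/// to exist at the call site):
///
/// ```ignore
/// let mut handle = FlushHandle::spawn_new(Arc::new(writer), spare_buf, gate.enter()?, ctx);
/// // ... fill `buf` with data ...
/// let (recycled, _control) = handle.flush(buf, offset).await?;
/// let writer = handle.shutdown().await?;
/// ```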
pub struct FlushHandle<Buf, W> {
    inner: Option<FlushHandleInner<Buf, W>>,
    /// Immutable buffer for serving tail reads.
    /// `None` if no flush request has been submitted.
    pub(super) maybe_flushed: Option<FullSlice<Buf>>,
}

pub struct FlushHandleInner<Buf, W> {
    /// A bi-directional channel that sends (buffer, offset) for writes,
    /// and receives recycled buffers back.
    channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
    /// Join handle for the background flush task.
    join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
}

struct FlushRequest<Buf> {
    slice: FullSlice<Buf>,
    offset: u64,
    #[cfg(test)]
    ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
    #[cfg(test)]
    done_flush_tx: tokio::sync::oneshot::Sender<()>,
}

/// Constructs a request and a control object for a new flush operation.
#[cfg(not(test))]
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
    let request = FlushRequest { slice, offset };
    let control = FlushControl::untracked();

    (request, control)
}

/// Constructs a request and a control object for a new flush operation.
#[cfg(test)]
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
    let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
    let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
    let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);

    let request = FlushRequest {
        slice,
        offset,
        ready_to_flush_rx,
        done_flush_tx,
    };
    (request, control)
}

/// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
#[cfg(test)]
pub(crate) struct FlushControl {
    not_started: FlushNotStarted,
}

#[cfg(not(test))]
pub(crate) struct FlushControl;

impl FlushControl {
    #[cfg(test)]
    fn not_started(
        ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
        done_flush_rx: tokio::sync::oneshot::Receiver<()>,
    ) -> Self {
        FlushControl {
            not_started: FlushNotStarted {
                ready_to_flush_tx,
                done_flush_rx,
            },
        }
    }

    #[cfg(not(test))]
    fn untracked() -> Self {
        FlushControl
    }

    /// In tests, turns the flush control into its not-started state.
    #[cfg(test)]
    pub(crate) fn into_not_started(self) -> FlushNotStarted {
        self.not_started
    }

    /// Releases control over the submitted buffer.
    ///
    /// In a `cfg(test)` environment, the buffer is guaranteed to be flushed to disk after [`FlushControl::release`] finishes execution.
    pub async fn release(self) {
        #[cfg(test)]
        {
            self.not_started
                .ready_to_flush()
                .wait_until_flush_is_done()
                .await;
        }
    }
}

impl<Buf, W> FlushHandle<Buf, W>
where
    Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
    W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
    /// Spawns a new background flush task and obtains a handle.
    ///
    /// Note: spawning a background task means we do not need to explicitly maintain a queue of buffers.
    pub fn spawn_new<B>(
        file: Arc<W>,
        buf: B,
        gate_guard: utils::sync::gate::GateGuard,
        ctx: RequestContext,
    ) -> Self
    where
        B: Buffer<IoBuf = Buf> + Send + 'static,
    {
        // It is fine to buffer only up to 1 message: we only have 1 message in flight at a time.
        let (front, back) = duplex::mpsc::channel(1);

        let join_handle = tokio::spawn(async move {
            FlushBackgroundTask::new(back, file, gate_guard, ctx)
                .run(buf.flush())
                .await
        });

        FlushHandle {
            inner: Some(FlushHandleInner {
                channel: front,
                join_handle,
            }),
            maybe_flushed: None,
        }
    }

    /// Submits a buffer to be flushed in the background task.
    /// Returns a buffer that completed flushing, for re-use: its length is reset to 0 and its capacity is unchanged.
    /// The submitted buffer is also saved in `Self::maybe_flushed` so that tail reads
    /// can be served while the flush is in progress.
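    ///
    /// Note: the duplex channel has capacity 1, so waiting for the recycled
    /// buffer doubles as backpressure: if the flush task cannot keep up, the
    /// write path waits here. A sketch of the round trip (illustrative;
    /// `handle`, `buf`, and `offset` are assumed to exist in the caller):
    ///
    /// ```ignore
    /// let (recycled, control) = handle.flush(buf, offset).await?;
    /// // `recycled` has length 0 again and can be refilled by the caller.
    /// control.release().await;
    /// ```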
    pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
    where
        B: Buffer<IoBuf = Buf> + Send + 'static,
    {
        let slice = buf.flush();

        // Saves a buffer for reads while flushing. This also removes the reference to the old buffer.
        self.maybe_flushed = Some(slice.cheap_clone());

        let (request, flush_control) = new_flush_op(slice, offset);

        // Submits the buffer to the background task.
        let submit = self.inner_mut().channel.send(request).await;
        if submit.is_err() {
            return self.handle_error().await;
        }

        // Wait for an available buffer from the background flush task.
        // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
        // then the write path will eventually wait for it here.
        let Some(recycled) = self.inner_mut().channel.recv().await else {
            return self.handle_error().await;
        };

        // The only other place that could hold a reference to the recycled buffer
        // is `Self::maybe_flushed`, but we have already replaced it with the new buffer.
        let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
        Ok((recycled, flush_control))
    }

    async fn handle_error<T>(&mut self) -> std::io::Result<T> {
        Err(self
            .shutdown()
            .await
            .expect_err("flush task only disconnects duplex if it exits with an error"))
    }

    /// Cleans up the channel and joins the flush task.
    pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
        let handle = self
            .inner
            .take()
            .expect("must not use after we returned an error");
        drop(handle.channel.tx);
        handle.join_handle.await.unwrap()
    }

    /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
    /// This only happens if the handle is used after an error.
    fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
        self.inner
            .as_mut()
            .expect("must not use after we returned an error")
    }
}

/// A background task for flushing data to disk.
pub struct FlushBackgroundTask<Buf, W> {
    /// A bi-directional channel that receives (buffer, offset) for writes,
    /// and sends back recycled buffers.
    channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
    /// A writer for persisting data to disk.
    writer: Arc<W>,
    ctx: RequestContext,
    /// Prevents the timeline from shutting down until the flush background task finishes flushing all remaining buffers to disk.
    _gate_guard: utils::sync::gate::GateGuard,
}

impl<Buf, W> FlushBackgroundTask<Buf, W>
where
    Buf: IoBufAligned + Send + Sync,
    W: OwnedAsyncWriter + Sync + 'static,
{
    /// Creates a new background flush task.
    fn new(
        channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
        file: Arc<W>,
        gate_guard: utils::sync::gate::GateGuard,
        ctx: RequestContext,
    ) -> Self {
        FlushBackgroundTask {
            channel,
            writer: file,
            _gate_guard: gate_guard,
            ctx,
        }
    }

    /// Runs the background flush task.
    /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
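    ///
    /// Protocol sketch, as implemented below:
    /// 1. Send the spare `slice` back so the handle starts out with a buffer to recycle.
    /// 2. Loop: receive a `FlushRequest`, `write_all_at` the slice at the requested
    ///    offset, then send the buffer back for reuse.
    /// 3. Once the handle drops its sender, finish flushing any remaining requests,
    ///    then exit, returning the writer.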
    async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
        // Sends the extra buffer back to the handle.
        self.channel.send(slice).await.map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
        })?;

        // Exit condition: the channel is closed and there is no remaining buffer to be flushed.
        while let Some(request) = self.channel.recv().await {
            #[cfg(test)]
            {
                // In tests, wait for control to signal that we are ready to flush.
                if request.ready_to_flush_rx.await.is_err() {
                    tracing::debug!("control dropped");
                }
            }

            // Write the slice to disk at `offset`.
            let slice = self
                .writer
                .write_all_at(request.slice, request.offset, &self.ctx)
                .await?;

            #[cfg(test)]
            {
                // In tests, tell control we are done flushing the buffer.
                if request.done_flush_tx.send(()).is_err() {
                    tracing::debug!("control dropped");
                }
            }

            // Sends the buffer back to the handle for reuse. The handle is in charge of cleaning the buffer.
            if self.channel.send(slice).await.is_err() {
                // Even though the channel is closed, we still need to finish flushing the remaining buffers.
                continue;
            }
        }

        Ok(self.writer)
    }
}

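/// The test-only flush control states form a linear state machine:
/// `FlushNotStarted` --`ready_to_flush`--> `FlushInProgress`
/// --`wait_until_flush_is_done`--> `FlushDone`.
///
/// A test typically drives it like this (illustrative sketch; `handle`,
/// `buf`, and `offset` are assumed to exist in the test):
///
/// ```ignore
/// let (buf, control) = handle.flush(buf, offset).await?;
/// let in_progress = control.into_not_started().ready_to_flush();
/// // ... e.g. assert that reads are served from `maybe_flushed` here ...
/// in_progress.wait_until_flush_is_done().await;
/// ```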
#[cfg(test)]
pub(crate) struct FlushNotStarted {
    ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
    done_flush_rx: tokio::sync::oneshot::Receiver<()>,
}

#[cfg(test)]
pub(crate) struct FlushInProgress {
    done_flush_rx: tokio::sync::oneshot::Receiver<()>,
}

#[cfg(test)]
pub(crate) struct FlushDone;

#[cfg(test)]
impl FlushNotStarted {
    /// Signals the background task that the buffer is ready to be flushed to disk.
    pub fn ready_to_flush(self) -> FlushInProgress {
        self.ready_to_flush_tx
            .send(())
            .map(|_| FlushInProgress {
                done_flush_rx: self.done_flush_rx,
            })
            .unwrap()
    }
}

#[cfg(test)]
impl FlushInProgress {
    /// Waits until the background flush is done.
    pub async fn wait_until_flush_is_done(self) -> FlushDone {
        self.done_flush_rx.await.unwrap();
        FlushDone
    }
}