use std::sync::Arc;

use utils::sync::duplex;

use super::{Buffer, CheapCloneForRead, OwnedAsyncWriter};
use crate::context::RequestContext;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAligned;
use crate::virtual_file::owned_buffers_io::io_buf_ext::FullSlice;

/// A handle to the flush task.
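///
/// A sketch of the intended lifecycle; caller-side names such as `file`,
/// `buf`, and `full_buf` are hypothetical, for illustration only:
///
/// ```ignore
/// let mut handle = FlushHandle::spawn_new(file, buf, gate_guard, ctx);
/// // Submit a filled buffer; get back a recycled, empty one to refill.
/// let (recycled, _control) = handle.flush(full_buf, offset).await?;
/// // When done, close the channel and join the background task.
/// let writer = handle.shutdown().await?;
/// ```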
pub struct FlushHandle<Buf, W> {
    inner: Option<FlushHandleInner<Buf, W>>,
    /// Immutable buffer for serving tail reads.
    /// `None` if no flush request has been submitted.
    pub(super) maybe_flushed: Option<FullSlice<Buf>>,
}

pub struct FlushHandleInner<Buf, W> {
    /// A bi-directional channel that sends (buffer, offset) for writes,
    /// and receives recycled buffers.
    channel: duplex::mpsc::Duplex<FlushRequest<Buf>, FullSlice<Buf>>,
    /// Join handle for the background flush task.
    join_handle: tokio::task::JoinHandle<std::io::Result<Arc<W>>>,
}

struct FlushRequest<Buf> {
    slice: FullSlice<Buf>,
    offset: u64,
    #[cfg(test)]
    ready_to_flush_rx: tokio::sync::oneshot::Receiver<()>,
    #[cfg(test)]
    done_flush_tx: tokio::sync::oneshot::Sender<()>,
}

/// Constructs a request and a control object for a new flush operation.
#[cfg(not(test))]
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
    let request = FlushRequest { slice, offset };
    let control = FlushControl::untracked();

    (request, control)
}

/// Constructs a request and a control object for a new flush operation.
#[cfg(test)]
fn new_flush_op<Buf>(slice: FullSlice<Buf>, offset: u64) -> (FlushRequest<Buf>, FlushControl) {
    let (ready_to_flush_tx, ready_to_flush_rx) = tokio::sync::oneshot::channel();
    let (done_flush_tx, done_flush_rx) = tokio::sync::oneshot::channel();
    let control = FlushControl::not_started(ready_to_flush_tx, done_flush_rx);

    let request = FlushRequest {
        slice,
        offset,
        ready_to_flush_rx,
        done_flush_tx,
    };
    (request, control)
}

/// A handle to a `FlushRequest` that allows unit tests precise control over flush behavior.
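///
/// A minimal sketch of how a test can step through a flush, assuming a
/// `handle` obtained from [`FlushHandle::spawn_new`]:
///
/// ```ignore
/// let (recycled, control) = handle.flush(buf, offset).await?;
/// let in_progress = control.into_not_started().ready_to_flush(); // unblock the background task
/// let _done = in_progress.wait_until_flush_is_done().await; // resolves once the write completed
/// ```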
#[cfg(test)]
pub(crate) struct FlushControl {
    not_started: FlushNotStarted,
}

#[cfg(not(test))]
pub(crate) struct FlushControl;

impl FlushControl {
    #[cfg(test)]
    fn not_started(
        ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
        done_flush_rx: tokio::sync::oneshot::Receiver<()>,
    ) -> Self {
        FlushControl {
            not_started: FlushNotStarted {
                ready_to_flush_tx,
                done_flush_rx,
            },
        }
    }

    #[cfg(not(test))]
    fn untracked() -> Self {
        FlushControl
    }

    /// In tests, turns the flush control into its not-started state.
    #[cfg(test)]
    pub(crate) fn into_not_started(self) -> FlushNotStarted {
        self.not_started
    }

    /// Releases control of the submitted buffer.
    ///
    /// In a `cfg(test)` environment, the buffer is guaranteed to have been flushed to disk by the time [`FlushControl::release`] finishes execution.
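    ///
    /// A minimal usage sketch, assuming a control obtained from [`FlushHandle::flush`]:
    ///
    /// ```ignore
    /// let (recycled, control) = handle.flush(buf, offset).await?;
    /// control.release().await; // under cfg(test), returns only after the buffer hit disk
    /// ```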
    pub async fn release(self) {
        #[cfg(test)]
        {
            self.not_started
                .ready_to_flush()
                .wait_until_flush_is_done()
                .await;
        }
    }
}

impl<Buf, W> FlushHandle<Buf, W>
where
    Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
    W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
{
    /// Spawns a new background flush task and obtains a handle.
    ///
    /// Note: spawning a background task means we do not need to explicitly maintain a queue of buffers.
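    ///
    /// A minimal call sketch; the argument names are assumptions for illustration:
    ///
    /// ```ignore
    /// let handle = FlushHandle::spawn_new(
    ///     Arc::new(writer), // W: OwnedAsyncWriter, shared with the background task
    ///     buffer,           // B: Buffer<IoBuf = Buf>, sent to the task and immediately recycled back
    ///     gate_guard,       // keeps the timeline gate open while flushes are pending
    ///     ctx,
    /// );
    /// ```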
    pub fn spawn_new<B>(
        file: Arc<W>,
        buf: B,
        gate_guard: utils::sync::gate::GateGuard,
        ctx: RequestContext,
    ) -> Self
    where
        B: Buffer<IoBuf = Buf> + Send + 'static,
    {
        // It is fine to buffer up to only 1 message: we allow only 1 message in-flight at a time.
        let (front, back) = duplex::mpsc::channel(1);

        let join_handle = tokio::spawn(async move {
            FlushBackgroundTask::new(back, file, gate_guard, ctx)
                .run(buf.flush())
                .await
        });

        FlushHandle {
            inner: Some(FlushHandleInner {
                channel: front,
                join_handle,
            }),
            maybe_flushed: None,
        }
    }

    /// Submits a buffer to be flushed in the background task.
    /// Returns a buffer that completed flushing for re-use: its length is reset to 0, its capacity unchanged.
    /// The submitted buffer is also saved in `Self::maybe_flushed`, so that tail reads
    /// can be served from it while the flush is in flight.
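    ///
    /// Sketch of the double-buffering pattern this enables (caller names assumed):
    ///
    /// ```ignore
    /// // `flush` returns once the background task hands back a free buffer,
    /// // which is the backpressure point if the disk cannot keep up.
    /// let (recycled, _control) = handle.flush(full_buf, offset).await?;
    /// // `recycled` is empty but keeps its capacity; refill it and flush again.
    /// ```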
    pub async fn flush<B>(&mut self, buf: B, offset: u64) -> std::io::Result<(B, FlushControl)>
    where
        B: Buffer<IoBuf = Buf> + Send + 'static,
    {
        let slice = buf.flush();

        // Saves the buffer for serving reads while the flush is in progress.
        // This also drops the reference to the previous buffer.
        self.maybe_flushed = Some(slice.cheap_clone());

        let (request, flush_control) = new_flush_op(slice, offset);

        // Submits the buffer to the background task.
        let submit = self.inner_mut().channel.send(request).await;
        if submit.is_err() {
            return self.handle_error().await;
        }

        // Wait for an available buffer from the background flush task.
        // This is the BACKPRESSURE mechanism: if the flush task can't keep up,
        // then the write path will eventually wait for it here.
        let Some(recycled) = self.inner_mut().channel.recv().await else {
            return self.handle_error().await;
        };

        // The only other place that could hold a reference to the recycled buffer
        // is `Self::maybe_flushed`, but we have already replaced it with the new buffer.
        let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
        Ok((recycled, flush_control))
    }

    async fn handle_error<T>(&mut self) -> std::io::Result<T> {
        Err(self
            .shutdown()
            .await
            .expect_err("flush task only disconnects duplex if it exits with an error"))
    }

    /// Cleans up the channel and joins the flush task.
    pub async fn shutdown(&mut self) -> std::io::Result<Arc<W>> {
        let handle = self
            .inner
            .take()
            .expect("must not use after we returned an error");
        drop(handle.channel.tx);
        handle.join_handle.await.unwrap()
    }

    /// Gets a mutable reference to the inner handle. Panics if [`Self::inner`] is `None`.
    /// This only happens if the handle is used after an error.
    fn inner_mut(&mut self) -> &mut FlushHandleInner<Buf, W> {
        self.inner
            .as_mut()
            .expect("must not use after we returned an error")
    }
}

/// A background task for flushing data to disk.
pub struct FlushBackgroundTask<Buf, W> {
    /// A bi-directional channel that receives (buffer, offset) for writes,
    /// and sends back recycled buffers.
    channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
    /// A writer for persisting data to disk.
    writer: Arc<W>,
    ctx: RequestContext,
    /// Prevents the timeline from shutting down until the background flush task finishes flushing all remaining buffers to disk.
    _gate_guard: utils::sync::gate::GateGuard,
}

impl<Buf, W> FlushBackgroundTask<Buf, W>
where
    Buf: IoBufAligned + Send + Sync,
    W: OwnedAsyncWriter + Sync + 'static,
{
    /// Creates a new background flush task.
    fn new(
        channel: duplex::mpsc::Duplex<FullSlice<Buf>, FlushRequest<Buf>>,
        file: Arc<W>,
        gate_guard: utils::sync::gate::GateGuard,
        ctx: RequestContext,
    ) -> Self {
        FlushBackgroundTask {
            channel,
            writer: file,
            _gate_guard: gate_guard,
            ctx,
        }
    }

    /// Runs the background flush task.
    /// The passed-in slice is immediately sent back to the flush handle through the duplex channel.
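    ///
    /// Message flow over the duplex channel, as implemented below:
    /// 1. Send the spare slice back to the handle so the caller always has a buffer to fill.
    /// 2. Loop: receive a `FlushRequest`, write its slice to disk at the requested offset,
    ///    then return the recycled slice to the handle.
    /// 3. Exit once the handle closes its side of the channel and all pending requests are drained.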
    async fn run(mut self, slice: FullSlice<Buf>) -> std::io::Result<Arc<W>> {
        // Sends the extra buffer back to the handle.
        self.channel.send(slice).await.map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::BrokenPipe, "flush handle closed early")
        })?;

        // Exit condition: the channel is closed and there is no remaining buffer to be flushed.
        while let Some(request) = self.channel.recv().await {
            #[cfg(test)]
            {
                // In tests, wait for control to signal that we are ready to flush.
                if request.ready_to_flush_rx.await.is_err() {
                    tracing::debug!("control dropped");
                }
            }

            // Write the slice to disk at `offset`.
            let slice = self
                .writer
                .write_all_at(request.slice, request.offset, &self.ctx)
                .await?;

            #[cfg(test)]
            {
                // In tests, tell control we are done flushing the buffer.
                if request.done_flush_tx.send(()).is_err() {
                    tracing::debug!("control dropped");
                }
            }

            // Sends the buffer back to the handle for reuse. The handle is in charge of clearing the buffer.
            if self.channel.send(slice).await.is_err() {
                // Although the channel is closed, we still need to finish flushing the remaining buffers.
                continue;
            }
        }

        Ok(self.writer)
    }
}

#[cfg(test)]
pub(crate) struct FlushNotStarted {
    ready_to_flush_tx: tokio::sync::oneshot::Sender<()>,
    done_flush_rx: tokio::sync::oneshot::Receiver<()>,
}

#[cfg(test)]
pub(crate) struct FlushInProgress {
    done_flush_rx: tokio::sync::oneshot::Receiver<()>,
}

#[cfg(test)]
pub(crate) struct FlushDone;

#[cfg(test)]
impl FlushNotStarted {
    /// Signals to the background task that the buffer is ready to be flushed to disk.
    pub fn ready_to_flush(self) -> FlushInProgress {
        self.ready_to_flush_tx
            .send(())
            .map(|_| FlushInProgress {
                done_flush_rx: self.done_flush_rx,
            })
            .unwrap()
    }
}

#[cfg(test)]
impl FlushInProgress {
    /// Waits until background flush is done.
    pub async fn wait_until_flush_is_done(self) -> FlushDone {
        self.done_flush_rx.await.unwrap();
        FlushDone
    }
}