//! Implementation of an append-only file data structure,
//! used to keep in-memory layers spilled on disk.
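//!
//! A minimal usage sketch (hypothetical call site, for illustration only; the
//! real callers live in the in-memory layer code):
//!
//! ```ignore
//! let mut file =
//!     EphemeralFile::create(conf, tenant_shard_id, timeline_id, &gate, &cancel, &ctx).await?;
//! // `write_raw` returns the offset of the first byte written.
//! let off = file.write_raw(b"serialized value", &ctx).await?;
//! // Reads go through the `vectored_dio_read::File` impl and may be served from
//! // disk, the maybe-flushed buffer, or the mutable buffer.
//! ```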

use std::io;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::{error, info_span};
use utils::id::TimelineId;

use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
use crate::virtual_file::{self, IoBufferMut, VirtualFile, owned_buffers_io};

pub struct EphemeralFile {
    _tenant_shard_id: TenantShardId,
    _timeline_id: TimelineId,
    page_cache_file_id: page_cache::FileId,
    bytes_written: u64,
    buffered_writer: owned_buffers_io::write::BufferedWriter<IoBufferMut, VirtualFile>,
    /// The gate guard is held for as long as we need to do operations on the path
    /// (the file is deleted on drop).
    _gate_guard: utils::sync::gate::GateGuard,
}

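/// Capacity, in bytes, of each of the buffered writer's buffers (the mutable
/// buffer and the maybe-flushed buffer). Reads consult these in-memory buffers
/// before falling back to the file on disk; see `read_exact_at_eof_ok` below.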
const TAIL_SZ: usize = 64 * 1024;

impl EphemeralFile {
    pub async fn create(
        conf: &PageServerConf,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        gate: &utils::sync::gate::Gate,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> anyhow::Result<EphemeralFile> {
        static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
        let filename_disambiguator =
            NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let filename = conf
            .timeline_path(&tenant_shard_id, &timeline_id)
            .join(Utf8PathBuf::from(format!(
                "ephemeral-{filename_disambiguator}"
            )));

        let file = Arc::new(
            VirtualFile::open_with_options_v2(
                &filename,
                virtual_file::OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true),
                ctx,
            )
            .await?,
        );

        let page_cache_file_id = page_cache::next_file_id(); // XXX: get rid of this; we're not page-caching anymore

        Ok(EphemeralFile {
            _tenant_shard_id: tenant_shard_id,
            _timeline_id: timeline_id,
            page_cache_file_id,
            bytes_written: 0,
            buffered_writer: owned_buffers_io::write::BufferedWriter::new(
                file,
                || IoBufferMut::with_capacity(TAIL_SZ),
                gate.enter()?,
                cancel.child_token(),
                ctx,
                info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
            ),
            _gate_guard: gate.enter()?,
        })
    }
}

impl Drop for EphemeralFile {
    fn drop(&mut self) {
        // Unlink the file; we are clear to do this because we have entered the gate.
        let path = self.buffered_writer.as_inner().path();
        let res = std::fs::remove_file(path);
        if let Err(e) = res {
            if e.kind() != std::io::ErrorKind::NotFound {
                // Never log NotFound errors: we cannot do anything about them; on detach,
                // the tenant directory is already gone.
                //
                // NotFound errors might also be related to https://github.com/neondatabase/neon/issues/2442
                error!("could not remove ephemeral file '{path}': {e}");
            }
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
    #[error("{0}")]
    TooLong(String),
    #[error("cancelled")]
    Cancelled,
}

impl EphemeralFile {
    pub(crate) fn len(&self) -> u64 {
        self.bytes_written
    }

    pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
        self.page_cache_file_id
    }

    pub(crate) async fn load_to_io_buf(
        &self,
        ctx: &RequestContext,
    ) -> Result<IoBufferMut, io::Error> {
        let size = self.len().into_usize();
        let buf = IoBufferMut::with_capacity(size);
        let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
        assert_eq!(nread, size);
        let buf = slice.into_inner();
        assert_eq!(buf.len(), nread);
        assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
        Ok(buf)
    }

    /// Returns the offset at which the first byte of the input was written, for use
    /// in constructing indices over the written value.
    ///
    /// Panics if the write is short, because there is no way we can recover from that.
    /// TODO: make upstack handle this as an error.
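    ///
    /// A hedged sketch of the intended call pattern (hypothetical caller; `index`
    /// is an assumed data structure, not part of this file):
    ///
    /// ```ignore
    /// let off = file.write_raw(&value_bytes, &ctx).await?;
    /// index.insert(key, off); // `off` locates the value inside the ephemeral file
    /// ```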
    pub(crate) async fn write_raw(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> Result<u64, EphemeralFileWriteError> {
        let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
        if let Some(control) = control {
            control.release().await;
        }
        Ok(pos)
    }

    async fn write_raw_controlled(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
        let pos = self.bytes_written;

        let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
            EphemeralFileWriteError::TooLong(format!(
                "write would grow EphemeralFile beyond u64::MAX: len={pos} write_len={srcbuf_len}",
                srcbuf_len = srcbuf.len(),
            ))
        })?;

        // Write the payload
        let (nwritten, control) = self
            .buffered_writer
            .write_buffered_borrowed_controlled(srcbuf, ctx)
            .await
            .map_err(|e| match e {
                FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
            })?;
        assert_eq!(
            nwritten,
            srcbuf.len(),
            "buffered writer has no short writes"
        );

        self.bytes_written = new_bytes_written;

        Ok((pos, control))
    }
}

impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
    async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
        &self,
        start: u64,
        dst: tokio_epoll_uring::Slice<B>,
        ctx: &RequestContext,
    ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
        let submitted_offset = self.buffered_writer.bytes_submitted();

        let mutable = match self.buffered_writer.inspect_mutable() {
            Some(mutable) => &mutable[0..mutable.pending()],
            None => {
                // Timeline::cancel, and hence the buffered writer flush, was cancelled.
                // Remain read-available while the timeline is shutting down.
                &[]
            }
        };

        let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();

        let dst_cap = dst.bytes_total().into_u64();
        let end = {
            // saturating_add is correct here because the max file size is u64::MAX,
            // so if start + dst_cap > u64::MAX, we know the result will be a short read
            let mut end: u64 = start.saturating_add(dst_cap);
            if end > self.bytes_written {
                end = self.bytes_written;
            }
            end
        };

        // [inclusive, exclusive)
        #[derive(Debug)]
        struct Range<N>(N, N);
        impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
            fn len(&self) -> N {
                if self.0 > self.1 {
                    N::zero()
                } else {
                    self.1 - self.0
                }
            }
        }
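
        // Worked example with assumed numbers (illustration only): suppose
        // TAIL_SZ = 64 KiB, submitted_offset = 192 KiB, bytes_written >= 200 KiB,
        // and the maybe-flushed buffer is present. A read with start = 120 KiB
        // and end = 200 KiB then splits into:
        //   written_range:       [120 KiB, 128 KiB) -> read from the VirtualFile
        //   maybe_flushed_range: [128 KiB, 192 KiB) -> copied from the maybe-flushed buffer
        //   mutable_range:       [192 KiB, 200 KiB) -> copied from the mutable buffer
        // The three slices of `dst` are filled in exactly that order below.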

        let (written_range, maybe_flushed_range) = {
            if maybe_flushed.is_some() {
                // [ written ][ maybe_flushed ][ mutable ]
                //                             ^
                //                             `submitted_offset`
                // <++++++ on disk +++++++????????????????>
                (
                    Range(
                        start,
                        std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
                    ),
                    Range(
                        std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
                        std::cmp::min(end, submitted_offset),
                    ),
                )
            } else {
                // [ written ][ mutable ]
                //            ^
                //            `submitted_offset`
                // <++++++ on disk +++++++++++++++++++++++>
                (
                    Range(start, std::cmp::min(end, submitted_offset)),
                    // zero len
                    Range(submitted_offset, u64::MIN),
                )
            }
        };

        let mutable_range = Range(std::cmp::max(start, submitted_offset), end);

        let dst = if written_range.len() > 0 {
            let file: &VirtualFile = self.buffered_writer.as_inner();
            let bounds = dst.bounds();
            let slice = file
                .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
                .await?;
            Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
        } else {
            dst
        };

        let dst = if maybe_flushed_range.len() > 0 {
            let offset_in_buffer = maybe_flushed_range
                .0
                .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
                .unwrap()
                .into_usize();
            // We checked above that the buffer is Some.
            let maybe_flushed = maybe_flushed.unwrap();
            let to_copy = &maybe_flushed
                [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
            let bounds = dst.bounds();
            let mut view = dst.slice({
                let start = written_range.len().into_usize();
                let end = start
                    .checked_add(maybe_flushed_range.len().into_usize())
                    .unwrap();
                start..end
            });
            view.as_mut_rust_slice_full_zeroed()
                .copy_from_slice(to_copy);
            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
        } else {
            dst
        };

        let dst = if mutable_range.len() > 0 {
            let offset_in_buffer = mutable_range
                .0
                .checked_sub(submitted_offset)
                .unwrap()
                .into_usize();
            let to_copy =
                &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
            let bounds = dst.bounds();
            let mut view = dst.slice({
                let start =
                    written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
                let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
                start..end
            });
            view.as_mut_rust_slice_full_zeroed()
                .copy_from_slice(to_copy);
            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
        } else {
            dst
        };

        // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs

        Ok((dst, (end - start).into_usize()))
    }
}

/// Does the given filename look like an ephemeral file?
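///
/// For illustration: `ephemeral-42` matches, while `ephemeral-` (no number)
/// and `image-42` do not.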
pub fn is_ephemeral_file(filename: &str) -> bool {
    if let Some(rest) = filename.strip_prefix("ephemeral-") {
        // NB: the disambiguator is generated from an AtomicU64 (see NEXT_FILENAME),
        // so parse it as u64, not u32.
        rest.parse::<u64>().is_ok()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use std::fs;
    use std::str::FromStr;

    use rand::Rng;

    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;

    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        let _ = fs::remove_dir_all(&repo_dir);
        let conf = PageServerConf::dummy_conf(repo_dir);
        // Make a static copy of the config. This can never be freed, but that's
        // OK in a test.
        let conf: &'static PageServerConf = Box::leak(Box::new(conf));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx =
            RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    #[tokio::test]
    async fn ephemeral_file_holds_gate_open() {
        const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);

        let (conf, tenant_id, timeline_id, ctx) =
            harness("ephemeral_file_holds_gate_open").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mut closing = tokio::task::spawn(async move {
            gate.close().await;
        });

        // The gate stays entered until the ephemeral file is dropped.
        // Do not start paused: tokio-epoll-uring has a sleep loop.
        tokio::time::pause();
        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect_err("closing cannot complete before dropping");

        // This is a requirement of the reset_tenant functionality: we have to be able to restart a
        // tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate.
        drop(file);

        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect("closing completes right away")
            .expect("closing does not panic");
    }

    #[tokio::test]
    async fn test_ephemeral_file_basics() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.mutable();
        let cap = mutable.capacity();
        let align = mutable.align();

        let write_nbytes = cap * 2 + cap / 2;

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(write_nbytes)
            .collect();

        let mut value_offsets = Vec::new();
        for range in (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
        {
            let off = file.write_raw(&content[range], &ctx).await.unwrap();
            value_offsets.push(off);
        }

        assert_eq!(file.len() as usize, write_nbytes);
        for (i, range) in (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
            .enumerate()
        {
            assert_eq!(value_offsets[i], range.start.into_u64());
            let buf = IoBufferMut::with_capacity(range.len());
            let (buf_slice, nread) = file
                .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
                .await
                .unwrap();
            let buf = buf_slice.into_inner();
            assert_eq!(nread, range.len());
            assert_eq!(&buf, &content[range]);
        }

        let file_contents = std::fs::read(file.buffered_writer.as_inner().path()).unwrap();
        assert!(file_contents == content[0..cap * 2]);

        let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
        assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);

        let mutable_buffer_contents = file.buffered_writer.mutable();
        assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
    }

    #[tokio::test]
    async fn test_flushes_do_happen() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();
        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        // The mutable buffer and the maybe_flushed buffer each hold `cap` bytes.
        let cap = file.buffered_writer.mutable().capacity();

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        file.write_raw(&content, &ctx).await.unwrap();

        // Assert the state is as this test expects it to be.
        let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
        assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
        let md = file.buffered_writer.as_inner().path().metadata().unwrap();
        assert_eq!(
            md.len(),
            2 * cap.into_u64(),
            "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
        );
        assert_eq!(
            &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
            &content[cap..cap * 2]
        );
        assert_eq!(
            &file.buffered_writer.mutable()[0..cap / 2],
            &content[cap * 2..cap * 2 + cap / 2]
        );
    }

    #[tokio::test]
    async fn test_read_split_across_file_and_buffer() {
        // This test exercises the logic on the read path that splits the logical read
        // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
        //
        // This test builds on the assertions in test_flushes_do_happen.

        let (conf, tenant_id, timeline_id, ctx) =
            harness("test_read_split_across_file_and_buffer").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.mutable();
        let cap = mutable.capacity();
        let align = mutable.align();
        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();

        let test_read = |start: usize, len: usize| {
            let file = &file;
            let ctx = &ctx;
            let content = &content;
            async move {
                let (buf, nread) = file
                    .read_exact_at_eof_ok(
                        start.into_u64(),
                        IoBufferMut::with_capacity(len).slice_full(),
                        ctx,
                    )
                    .await
                    .unwrap();
                assert_eq!(nread, len);
                assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
            }
        };

        let test_read_all_offset_combinations = || {
            async move {
                // completely within the file range
                test_read(align, align).await;
                // border onto the edge of the file
                test_read(cap - align, align).await;
                // read across file and buffer
                test_read(cap - align, 2 * align).await;
                // start at the beginning of the maybe-flushed buffer
                test_read(cap, align).await;
                // completely within the maybe-flushed buffer
                test_read(cap + align, align).await;
                // border onto the edge of the maybe-flushed buffer
                test_read(cap * 2 - align, align).await;
                // read across maybe-flushed and mutable buffer
                test_read(cap * 2 - align, 2 * align).await;
                // read across all three segments
                test_read(cap - align, cap + 2 * align).await;
                // completely within the mutable buffer
                test_read(cap * 2 + align, align).await;
            }
        };

        assert!(align < cap, "test assumption");
        assert!(cap % align == 0);

        // Test reads at different flush stages.
        let not_started = control.unwrap().into_not_started();
        test_read_all_offset_combinations().await;
        let in_progress = not_started.ready_to_flush();
        test_read_all_offset_combinations().await;
        in_progress.wait_until_flush_is_done().await;
        test_read_all_offset_combinations().await;
    }
}