//! Implementation of an append-only file data structure,
//! used to keep in-memory layers spilled to disk.
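//!
//! A minimal usage sketch (not a compiled doctest; setup of `conf`, `gate`,
//! `cancel`, and `ctx` is elided):
//!
//! ```ignore
//! let mut file =
//!     EphemeralFile::create(conf, tenant_shard_id, timeline_id, &gate, &cancel, &ctx).await?;
//! // `write_raw` returns the offset at which the value starts, for building indices.
//! let off = file.write_raw(b"value", &ctx).await?;
//! // Reads are served from the OS file and/or the buffered writer's in-memory buffers.
//! let buf = IoBufferMut::with_capacity(5);
//! let (slice, nread) = file.read_exact_at_eof_ok(off, buf.slice_full(), &ctx).await?;
//! assert_eq!(nread, 5);
//! ```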

use std::io;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use camino::Utf8PathBuf;
use num_traits::Num;
use pageserver_api::shard::TenantShardId;
use tokio_epoll_uring::{BoundedBuf, Slice};
use tokio_util::sync::CancellationToken;
use tracing::{error, info_span};
use utils::id::TimelineId;
use utils::sync::gate::GateGuard;

use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache;
use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};

use self::owned_buffers_io::write::OwnedAsyncWriter;

pub struct EphemeralFile {
    _tenant_shard_id: TenantShardId,
    _timeline_id: TimelineId,
    page_cache_file_id: page_cache::FileId,
    bytes_written: u64,
    file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
    buffered_writer: BufferedWriter,
}

type BufferedWriter = owned_buffers_io::write::BufferedWriter<
    IoBufferMut,
    TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
>;

/// A TempVirtualFile that is co-owned by the [`EphemeralFile`] and [`BufferedWriter`].
///
/// (Actually, [`BufferedWriter`] internally is just a client to a background flush task.
/// The co-ownership is between [`EphemeralFile`] and that flush task.)
///
/// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
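///
/// A sketch of the resulting ownership (mirroring what [`EphemeralFile::create`] does):
///
/// ```ignore
/// let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(virtual_file, gate.enter()?);
/// let for_flush_task = file.clone(); // cheap Arc clone of the same TempVirtualFile
/// // The TempVirtualFile (and its gate guard) is dropped only once both clones are gone.
/// ```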
#[derive(Debug, Clone)]
struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
    inner: Arc<TempVirtualFile>,
}

const TAIL_SZ: usize = 64 * 1024;

impl EphemeralFile {
    pub async fn create(
        conf: &PageServerConf,
        tenant_shard_id: TenantShardId,
        timeline_id: TimelineId,
        gate: &utils::sync::gate::Gate,
        cancel: &CancellationToken,
        ctx: &RequestContext,
    ) -> anyhow::Result<EphemeralFile> {
        // TempVirtualFile requires that we never reuse a filename while an old
        // TempVirtualFile instance created with that filename has not finished dropping.
        // So, we use a monotonic counter to disambiguate the filenames.
        static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
        let filename_disambiguator =
            NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

        let filename = conf
            .timeline_path(&tenant_shard_id, &timeline_id)
            .join(Utf8PathBuf::from(format!(
                "ephemeral-{filename_disambiguator}"
            )));

        let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
            VirtualFile::open_with_options_v2(
                &filename,
                virtual_file::OpenOptions::new()
                    .create_new(true)
                    .read(true)
                    .write(true),
                ctx,
            )
            .await?,
            gate.enter()?,
        );

        let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore

        Ok(EphemeralFile {
            _tenant_shard_id: tenant_shard_id,
            _timeline_id: timeline_id,
            page_cache_file_id,
            bytes_written: 0,
            file: file.clone(),
            buffered_writer: BufferedWriter::new(
                file,
                0,
                || IoBufferMut::with_capacity(TAIL_SZ),
                gate.enter()?,
                cancel.child_token(),
                ctx,
                info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
            ),
        })
    }
}

impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
    fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
        Self {
            inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
        }
    }
}

impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
    fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
        &self,
        buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
        offset: u64,
        ctx: &RequestContext,
    ) -> impl std::future::Future<
        Output = (
            owned_buffers_io::io_buf_ext::FullSlice<Buf>,
            std::io::Result<()>,
        ),
    > + Send {
        self.inner.write_all_at(buf, offset, ctx)
    }

    fn set_len(
        &self,
        len: u64,
        ctx: &RequestContext,
    ) -> impl std::future::Future<Output = std::io::Result<()>> + Send {
        self.inner.set_len(len, ctx)
    }
}

impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
    type Target = VirtualFile;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum EphemeralFileWriteError {
    #[error("{0}")]
    TooLong(String),
    #[error("cancelled")]
    Cancelled,
}

impl EphemeralFile {
    pub(crate) fn len(&self) -> u64 {
        self.bytes_written
    }

    pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
        self.page_cache_file_id
    }

    pub(crate) async fn load_to_io_buf(
        &self,
        ctx: &RequestContext,
    ) -> Result<IoBufferMut, io::Error> {
        let size = self.len().into_usize();
        let buf = IoBufferMut::with_capacity(size);
        let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
        assert_eq!(nread, size);
        let buf = slice.into_inner();
        assert_eq!(buf.len(), nread);
        assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
        Ok(buf)
    }

    /// Returns the offset at which the first byte of the input was written, for use
    /// in constructing indices over the written value.
    ///
    /// Panics if the write is short because there's no way we can recover from that.
    /// TODO: make upstack handle this as an error.
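    ///
    /// A sketch of the offset contract (not a compiled doctest): successive writes
    /// return cumulative offsets, equal to [`Self::len`] before each write.
    /// ```ignore
    /// let off_a = file.write_raw(b"foo", &ctx).await?; // 0
    /// let off_b = file.write_raw(b"bar", &ctx).await?; // 3
    /// assert_eq!(file.len(), 6);
    /// ```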
    pub(crate) async fn write_raw(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> Result<u64, EphemeralFileWriteError> {
        let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
        if let Some(control) = control {
            control.release().await;
        }
        Ok(pos)
    }

    async fn write_raw_controlled(
        &mut self,
        srcbuf: &[u8],
        ctx: &RequestContext,
    ) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
        let pos = self.bytes_written;

        let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
            EphemeralFileWriteError::TooLong(format!(
                "write would grow EphemeralFile beyond u64::MAX: len={pos} written={srcbuf_len}",
                srcbuf_len = srcbuf.len(),
            ))
        })?;

        // Write the payload
        let (nwritten, control) = self
            .buffered_writer
            .write_buffered_borrowed_controlled(srcbuf, ctx)
            .await
            .map_err(|e| match e {
                FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
            })?;
        assert_eq!(
            nwritten,
            srcbuf.len(),
            "buffered writer has no short writes"
        );

        self.bytes_written = new_bytes_written;

        Ok((pos, control))
    }
}

impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
    async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
        &self,
        start: u64,
        dst: tokio_epoll_uring::Slice<B>,
        ctx: &RequestContext,
    ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
        let submitted_offset = self.buffered_writer.bytes_submitted();

        let mutable = match self.buffered_writer.inspect_mutable() {
            Some(mutable) => &mutable[0..mutable.pending()],
            None => {
                // Timeline::cancel was called, and hence the buffered writer's flush task was cancelled.
                // Remain read-available while the timeline is shutting down.
                &[]
            }
        };

        let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();

        let dst_cap = dst.bytes_total().into_u64();
        let end = {
            // saturating_add is correct here: the max file size is u64::MAX, so if
            // start + dst.len() > u64::MAX, we know it will be a short read
            let mut end: u64 = start.saturating_add(dst_cap);
            if end > self.bytes_written {
                end = self.bytes_written;
            }
            end
        };

        // inclusive, exclusive
        #[derive(Debug)]
        struct Range<N>(N, N);
        impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
            fn len(&self) -> N {
                if self.0 > self.1 {
                    N::zero()
                } else {
                    self.1 - self.0
                }
            }
        }

        let (written_range, maybe_flushed_range) = {
            if maybe_flushed.is_some() {
                // [ written ][ maybe_flushed ][ mutable ]
                //                             ^
                //                             `submitted_offset`
                // <++++++ on disk +++++++????????????????>
                (
                    Range(
                        start,
                        std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
                    ),
                    Range(
                        std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
                        std::cmp::min(end, submitted_offset),
                    ),
                )
            } else {
                // [ written ][ mutable ]
                //            ^
                //            `submitted_offset`
                // <++++++ on disk +++++++++++++++++++++++>
                (
                    Range(start, std::cmp::min(end, submitted_offset)),
                    // zero len
                    Range(submitted_offset, u64::MIN),
                )
            }
        };

        let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
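
        // Worked example (illustrative numbers, with TAIL_SZ = 64 KiB):
        // given submitted_offset = 128 KiB and bytes_written = 140 KiB, a read with
        // start = 60 KiB and dst_cap = 80 KiB clamps end to 140 KiB and splits into
        //   written_range       = [ 60 KiB,  64 KiB) -> read from the file on disk
        //   maybe_flushed_range = [ 64 KiB, 128 KiB) -> copied from the maybe_flushed buffer
        //   mutable_range       = [128 KiB, 140 KiB) -> copied from the mutable buffer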

        let dst = if written_range.len() > 0 {
            let bounds = dst.bounds();
            let slice = self
                .file
                .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
                .await?;
            Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
        } else {
            dst
        };

        let dst = if maybe_flushed_range.len() > 0 {
            let offset_in_buffer = maybe_flushed_range
                .0
                .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
                .unwrap()
                .into_usize();
            // Checked previously the buffer is Some.
            let maybe_flushed = maybe_flushed.unwrap();
            let to_copy = &maybe_flushed
                [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
            let bounds = dst.bounds();
            let mut view = dst.slice({
                let start = written_range.len().into_usize();
                let end = start
                    .checked_add(maybe_flushed_range.len().into_usize())
                    .unwrap();
                start..end
            });
            view.as_mut_rust_slice_full_zeroed()
                .copy_from_slice(to_copy);
            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
        } else {
            dst
        };

        let dst = if mutable_range.len() > 0 {
            let offset_in_buffer = mutable_range
                .0
                .checked_sub(submitted_offset)
                .unwrap()
                .into_usize();
            let to_copy =
                &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
            let bounds = dst.bounds();
            let mut view = dst.slice({
                let start =
                    written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
                let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
                start..end
            });
            view.as_mut_rust_slice_full_zeroed()
                .copy_from_slice(to_copy);
            Slice::from_buf_bounds(Slice::into_inner(view), bounds)
        } else {
            dst
        };

        // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs

        Ok((dst, (end - start).into_usize()))
    }
}

/// Does the given filename look like an ephemeral file?
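///
/// A sketch (not a compiled doctest) using names as produced by [`EphemeralFile::create`]:
/// ```ignore
/// assert!(is_ephemeral_file("ephemeral-42"));
/// assert!(!is_ephemeral_file("ephemeral-"));
/// assert!(!is_ephemeral_file("metadata"));
/// ```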
pub fn is_ephemeral_file(filename: &str) -> bool {
    if let Some(rest) = filename.strip_prefix("ephemeral-") {
        // The disambiguator written by `EphemeralFile::create` is a u64, so parse it as one.
        rest.parse::<u64>().is_ok()
    } else {
        false
    }
}

#[cfg(test)]
mod tests {
    use std::fs;
    use std::str::FromStr;

    use rand::Rng;

    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;

    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        let _ = fs::remove_dir_all(&repo_dir);
        let conf = PageServerConf::dummy_conf(repo_dir);
        // Make a static copy of the config. This can never be freed, but that's
        // OK in a test.
        let conf: &'static PageServerConf = Box::leak(Box::new(conf));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx =
            RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    #[tokio::test]
    async fn ephemeral_file_holds_gate_open() {
        const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);

        let (conf, tenant_id, timeline_id, ctx) =
            harness("ephemeral_file_holds_gate_open").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mut closing = tokio::task::spawn(async move {
            gate.close().await;
        });

        // The gate is entered until the ephemeral file is dropped.
        // Do not start paused; tokio-epoll-uring has a sleep loop.
        tokio::time::pause();
        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect_err("closing cannot complete before dropping");

        // This is a requirement of the reset_tenant functionality: we have to be able to restart a
        // tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate.
        drop(file);

        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect("closing completes right away")
            .expect("closing does not panic");
    }

    #[tokio::test]
    async fn test_ephemeral_file_basics() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.mutable();
        let cap = mutable.capacity();
        let align = mutable.align();

        let write_nbytes = cap * 2 + cap / 2;

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(write_nbytes)
            .collect();

        let mut value_offsets = Vec::new();
        for range in (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
        {
            let off = file.write_raw(&content[range], &ctx).await.unwrap();
            value_offsets.push(off);
        }

        assert_eq!(file.len() as usize, write_nbytes);
        for (i, range) in (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
            .enumerate()
        {
            assert_eq!(value_offsets[i], range.start.into_u64());
            let buf = IoBufferMut::with_capacity(range.len());
            let (buf_slice, nread) = file
                .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
                .await
                .unwrap();
            let buf = buf_slice.into_inner();
            assert_eq!(nread, range.len());
            assert_eq!(&buf, &content[range]);
        }

        let file_contents = std::fs::read(file.file.path()).unwrap();
        assert!(file_contents == content[0..cap * 2]);

        let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
        assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);

        let mutable_buffer_contents = file.buffered_writer.mutable();
        assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
    }

    #[tokio::test]
    async fn test_flushes_do_happen() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();
        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        // The mutable buffer and the maybe_flushed buffer each hold `cap` bytes.
        let cap = file.buffered_writer.mutable().capacity();

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        file.write_raw(&content, &ctx).await.unwrap();

        // assert the state is as this test expects it to be
        let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
        assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
        let md = file.file.path().metadata().unwrap();
        assert_eq!(
            md.len(),
            2 * cap.into_u64(),
            "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
        );
        assert_eq!(
            &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
            &content[cap..cap * 2]
        );
        assert_eq!(
            &file.buffered_writer.mutable()[0..cap / 2],
            &content[cap * 2..cap * 2 + cap / 2]
        );
    }

    #[tokio::test]
    async fn test_read_split_across_file_and_buffer() {
        // This test exercises the logic on the read path that splits the logical read
        // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
        //
        // This test builds on the assertions in test_flushes_do_happen.

        let (conf, tenant_id, timeline_id, ctx) =
            harness("test_read_split_across_file_and_buffer").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.mutable();
        let cap = mutable.capacity();
        let align = mutable.align();
        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();

        let test_read = |start: usize, len: usize| {
            let file = &file;
            let ctx = &ctx;
            let content = &content;
            async move {
                let (buf, nread) = file
                    .read_exact_at_eof_ok(
                        start.into_u64(),
                        IoBufferMut::with_capacity(len).slice_full(),
                        ctx,
                    )
                    .await
                    .unwrap();
                assert_eq!(nread, len);
                assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
            }
        };

        let test_read_all_offset_combinations = || {
            async move {
                // completely within the file range
                test_read(align, align).await;
                // border onto edge of file
                test_read(cap - align, align).await;
                // read across file and buffer
                test_read(cap - align, 2 * align).await;
                // start at the beginning of the maybe flushed buffer
                test_read(cap, align).await;
                // completely within maybe flushed buffer
                test_read(cap + align, align).await;
                // border onto edge of maybe flushed buffer
                test_read(cap * 2 - align, align).await;
                // read across maybe flushed and mutable buffer
                test_read(cap * 2 - align, 2 * align).await;
                // read across three segments
                test_read(cap - align, cap + 2 * align).await;
                // completely within mutable buffer
                test_read(cap * 2 + align, align).await;
            }
        };

        assert!(align < cap, "test assumption");
        assert!(cap % align == 0);

        // test reads at different flush stages
        let not_started = control.unwrap().into_not_started();
        test_read_all_offset_combinations().await;
        let in_progress = not_started.ready_to_flush();
        test_read_all_offset_combinations().await;
        in_progress.wait_until_flush_is_done().await;
        test_read_all_offset_combinations().await;
    }
}