Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use std::io;
5 : use std::sync::Arc;
6 : use std::sync::atomic::AtomicU64;
7 :
8 : use camino::Utf8PathBuf;
9 : use num_traits::Num;
10 : use pageserver_api::shard::TenantShardId;
11 : use tokio_epoll_uring::{BoundedBuf, Slice};
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{error, info_span};
14 : use utils::id::TimelineId;
15 : use utils::sync::gate::GateGuard;
16 :
17 : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
18 : use crate::config::PageServerConf;
19 : use crate::context::RequestContext;
20 : use crate::page_cache;
21 : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
22 : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
23 : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
24 : use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
25 : use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
26 :
27 : use self::owned_buffers_io::write::OwnedAsyncWriter;
28 :
29 : pub struct EphemeralFile {
30 : _tenant_shard_id: TenantShardId,
31 : _timeline_id: TimelineId,
32 : page_cache_file_id: page_cache::FileId,
33 : bytes_written: u64,
34 : file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
35 : buffered_writer: BufferedWriter,
36 : }
37 :
38 : type BufferedWriter = owned_buffers_io::write::BufferedWriter<
39 : IoBufferMut,
40 : TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
41 : >;
42 :
43 : /// A TempVirtualFile that is co-owned by the [`EphemeralFile`]` and [`BufferedWriter`].
44 : ///
45 : /// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
46 : /// The co-ownership is between [`EphemeralFile`] and that flush task.)
47 : ///
48 : /// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
49 : #[derive(Debug, Clone)]
50 : struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
51 : inner: Arc<TempVirtualFile>,
52 : }
53 :
/// Capacity in bytes (64 KiB) of each buffer handed to the [`BufferedWriter`].
const TAIL_SZ: usize = 64 * 1024;
55 :
56 : impl EphemeralFile {
57 665 : pub async fn create(
58 665 : conf: &PageServerConf,
59 665 : tenant_shard_id: TenantShardId,
60 665 : timeline_id: TimelineId,
61 665 : gate: &utils::sync::gate::Gate,
62 665 : cancel: &CancellationToken,
63 665 : ctx: &RequestContext,
64 665 : ) -> anyhow::Result<EphemeralFile> {
65 : // TempVirtualFile requires us to never reuse a filename while an old
66 : // instance of TempVirtualFile created with that filename is not done dropping yet.
67 : // So, we use a monotonic counter to disambiguate the filenames.
68 : static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
69 665 : let filename_disambiguator =
70 665 : NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
71 665 :
72 665 : let filename = conf
73 665 : .timeline_path(&tenant_shard_id, &timeline_id)
74 665 : .join(Utf8PathBuf::from(format!(
75 665 : "ephemeral-{filename_disambiguator}"
76 665 : )));
77 :
78 665 : let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
79 665 : VirtualFile::open_with_options_v2(
80 665 : &filename,
81 665 : virtual_file::OpenOptions::new()
82 665 : .create_new(true)
83 665 : .read(true)
84 665 : .write(true),
85 665 : ctx,
86 665 : )
87 665 : .await?,
88 665 : gate.enter()?,
89 : );
90 :
91 665 : let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
92 665 :
93 665 : Ok(EphemeralFile {
94 665 : _tenant_shard_id: tenant_shard_id,
95 665 : _timeline_id: timeline_id,
96 665 : page_cache_file_id,
97 665 : bytes_written: 0,
98 665 : file: file.clone(),
99 665 : buffered_writer: BufferedWriter::new(
100 665 : file,
101 : 0,
102 1330 : || IoBufferMut::with_capacity(TAIL_SZ),
103 665 : gate.enter()?,
104 665 : cancel.child_token(),
105 665 : ctx,
106 665 : info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
107 : ),
108 : })
109 665 : }
110 : }
111 :
112 : impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
113 665 : fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
114 665 : Self {
115 665 : inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
116 665 : }
117 665 : }
118 : }
119 :
120 : impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
121 3320 : fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
122 3320 : &self,
123 3320 : buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
124 3320 : offset: u64,
125 3320 : ctx: &RequestContext,
126 3320 : ) -> impl std::future::Future<
127 3320 : Output = (
128 3320 : owned_buffers_io::io_buf_ext::FullSlice<Buf>,
129 3320 : std::io::Result<()>,
130 3320 : ),
131 3320 : > + Send {
132 3320 : self.inner.write_all_at(buf, offset, ctx)
133 3320 : }
134 :
135 0 : fn set_len(
136 0 : &self,
137 0 : len: u64,
138 0 : ctx: &RequestContext,
139 0 : ) -> impl Future<Output = std::io::Result<()>> + Send {
140 0 : self.inner.set_len(len, ctx)
141 0 : }
142 : }
143 :
144 : impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
145 : type Target = VirtualFile;
146 :
147 18658 : fn deref(&self) -> &Self::Target {
148 18658 : &self.inner
149 18658 : }
150 : }
151 :
152 : #[derive(Debug, thiserror::Error)]
153 : pub(crate) enum EphemeralFileWriteError {
154 : #[error("{0}")]
155 : TooLong(String),
156 : #[error("cancelled")]
157 : Cancelled,
158 : }
159 :
160 : impl EphemeralFile {
161 4806584 : pub(crate) fn len(&self) -> u64 {
162 4806584 : self.bytes_written
163 4806584 : }
164 :
165 661 : pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
166 661 : self.page_cache_file_id
167 661 : }
168 :
169 485 : pub(crate) async fn load_to_io_buf(
170 485 : &self,
171 485 : ctx: &RequestContext,
172 485 : ) -> Result<IoBufferMut, io::Error> {
173 485 : let size = self.len().into_usize();
174 485 : let buf = IoBufferMut::with_capacity(size);
175 485 : let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
176 485 : assert_eq!(nread, size);
177 485 : let buf = slice.into_inner();
178 485 : assert_eq!(buf.len(), nread);
179 485 : assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
180 485 : Ok(buf)
181 485 : }
182 :
183 : /// Returns the offset at which the first byte of the input was written, for use
184 : /// in constructing indices over the written value.
185 : ///
186 : /// Panics if the write is short because there's no way we can recover from that.
187 : /// TODO: make upstack handle this as an error.
188 2402449 : pub(crate) async fn write_raw(
189 2402449 : &mut self,
190 2402449 : srcbuf: &[u8],
191 2402449 : ctx: &RequestContext,
192 2402449 : ) -> Result<u64, EphemeralFileWriteError> {
193 2402449 : let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
194 2402449 : if let Some(control) = control {
195 2769 : control.release().await;
196 2399680 : }
197 2402449 : Ok(pos)
198 2402449 : }
199 :
200 2402450 : async fn write_raw_controlled(
201 2402450 : &mut self,
202 2402450 : srcbuf: &[u8],
203 2402450 : ctx: &RequestContext,
204 2402450 : ) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
205 2402450 : let pos = self.bytes_written;
206 :
207 2402450 : let new_bytes_written = pos.checked_add(srcbuf.len().into_u64()).ok_or_else(|| {
208 0 : EphemeralFileWriteError::TooLong(format!(
209 0 : "write would grow EphemeralFile beyond u64::MAX: len={pos} writen={srcbuf_len}",
210 0 : srcbuf_len = srcbuf.len(),
211 0 : ))
212 2402450 : })?;
213 :
214 : // Write the payload
215 2402450 : let (nwritten, control) = self
216 2402450 : .buffered_writer
217 2402450 : .write_buffered_borrowed_controlled(srcbuf, ctx)
218 2402450 : .await
219 2402450 : .map_err(|e| match e {
220 0 : FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
221 2402450 : })?;
222 2402450 : assert_eq!(
223 2402450 : nwritten,
224 2402450 : srcbuf.len(),
225 0 : "buffered writer has no short writes"
226 : );
227 :
228 2402450 : self.bytes_written = new_bytes_written;
229 2402450 :
230 2402450 : Ok((pos, control))
231 2402450 : }
232 : }
233 :
234 : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
235 262364 : async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
236 262364 : &self,
237 262364 : start: u64,
238 262364 : dst: tokio_epoll_uring::Slice<B>,
239 262364 : ctx: &RequestContext,
240 262364 : ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
241 262364 : let submitted_offset = self.buffered_writer.bytes_submitted();
242 :
243 262364 : let mutable = match self.buffered_writer.inspect_mutable() {
244 262364 : Some(mutable) => &mutable[0..mutable.pending()],
245 : None => {
246 : // Timeline::cancel and hence buffered writer flush was cancelled.
247 : // Remain read-available while timeline is shutting down.
248 0 : &[]
249 : }
250 : };
251 :
252 262364 : let maybe_flushed = self.buffered_writer.inspect_maybe_flushed();
253 262364 :
254 262364 : let dst_cap = dst.bytes_total().into_u64();
255 262364 : let end = {
256 : // saturating_add is correct here because the max file size is u64::MAX, so,
257 : // if start + dst.len() > u64::MAX, then we know it will be a short read
258 262364 : let mut end: u64 = start.saturating_add(dst_cap);
259 262364 : if end > self.bytes_written {
260 136136 : end = self.bytes_written;
261 136136 : }
262 262364 : end
263 : };
264 :
265 : // inclusive, exclusive
266 : #[derive(Debug)]
267 : struct Range<N>(N, N);
268 : impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
269 1702823 : fn len(&self) -> N {
270 1702823 : if self.0 > self.1 {
271 910336 : N::zero()
272 : } else {
273 792487 : self.1 - self.0
274 : }
275 1702823 : }
276 : }
277 :
278 262364 : let (written_range, maybe_flushed_range) = {
279 262364 : if maybe_flushed.is_some() {
280 : // [ written ][ maybe_flushed ][ mutable ]
281 : // ^
282 : // `submitted_offset`
283 : // <++++++ on disk +++++++????????????????>
284 257108 : (
285 257108 : Range(
286 257108 : start,
287 257108 : std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
288 257108 : ),
289 257108 : Range(
290 257108 : std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
291 257108 : std::cmp::min(end, submitted_offset),
292 257108 : ),
293 257108 : )
294 : } else {
295 : // [ written ][ mutable ]
296 : // ^
297 : // `submitted_offset`
298 : // <++++++ on disk +++++++++++++++++++++++>
299 5256 : (
300 5256 : Range(start, std::cmp::min(end, submitted_offset)),
301 5256 : // zero len
302 5256 : Range(submitted_offset, u64::MIN),
303 5256 : )
304 : }
305 : };
306 :
307 262364 : let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
308 :
309 262364 : let dst = if written_range.len() > 0 {
310 18656 : let bounds = dst.bounds();
311 18656 : let slice = self
312 18656 : .file
313 18656 : .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
314 18656 : .await?;
315 18656 : Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
316 : } else {
317 243708 : dst
318 : };
319 :
320 262364 : let dst = if maybe_flushed_range.len() > 0 {
321 81665 : let offset_in_buffer = maybe_flushed_range
322 81665 : .0
323 81665 : .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
324 81665 : .unwrap()
325 81665 : .into_usize();
326 81665 : // Checked previously the buffer is Some.
327 81665 : let maybe_flushed = maybe_flushed.unwrap();
328 81665 : let to_copy = &maybe_flushed
329 81665 : [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
330 81665 : let bounds = dst.bounds();
331 81665 : let mut view = dst.slice({
332 81665 : let start = written_range.len().into_usize();
333 81665 : let end = start
334 81665 : .checked_add(maybe_flushed_range.len().into_usize())
335 81665 : .unwrap();
336 81665 : start..end
337 81665 : });
338 81665 : view.as_mut_rust_slice_full_zeroed()
339 81665 : .copy_from_slice(to_copy);
340 81665 : Slice::from_buf_bounds(Slice::into_inner(view), bounds)
341 : } else {
342 180699 : dst
343 : };
344 :
345 262364 : let dst = if mutable_range.len() > 0 {
346 163020 : let offset_in_buffer = mutable_range
347 163020 : .0
348 163020 : .checked_sub(submitted_offset)
349 163020 : .unwrap()
350 163020 : .into_usize();
351 163020 : let to_copy =
352 163020 : &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
353 163020 : let bounds = dst.bounds();
354 163020 : let mut view = dst.slice({
355 163020 : let start =
356 163020 : written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
357 163020 : let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
358 163020 : start..end
359 163020 : });
360 163020 : view.as_mut_rust_slice_full_zeroed()
361 163020 : .copy_from_slice(to_copy);
362 163020 : Slice::from_buf_bounds(Slice::into_inner(view), bounds)
363 : } else {
364 99344 : dst
365 : };
366 :
367 : // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
368 :
369 262364 : Ok((dst, (end - start).into_usize()))
370 262364 : }
371 : }
372 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-<N>`, where `N` is a decimal number. The number
/// is drawn from a 64-bit counter when the file is created, so the suffix is accepted
/// as long as it parses as a `u64`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    filename
        .strip_prefix("ephemeral-")
        .is_some_and(|rest| rest.parse::<u64>().is_ok())
}
381 :
#[cfg(test)]
mod tests {
    use std::fs;
    use std::str::FromStr;

    use rand::Rng;

    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;

    /// Sets up a clean repo directory for `test_name` and returns a leaked dummy config,
    /// fixed tenant/timeline ids and a unit-test request context.
    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Start from a clean slate; a failure to remove the directory is ignored.
        let _ = fs::remove_dir_all(&repo_dir);
        // Leak the config to get a `'static` reference. It can never be freed,
        // but that is acceptable in a test.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx =
            RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    /// Closing the gate must not complete while an [`EphemeralFile`] is alive, and must
    /// complete once it has been dropped.
    #[tokio::test]
    async fn ephemeral_file_holds_gate_open() {
        const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);

        let (conf, tenant_id, timeline_id, ctx) =
            harness("ephemeral_file_holds_gate_open").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mut closing = tokio::task::spawn(async move {
            gate.close().await;
        });

        // The gate stays entered until the ephemeral file is dropped.
        // Time is paused only at this point rather than from the start of the test,
        // because tokio-epoll-uring has a sleep loop.
        tokio::time::pause();
        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect_err("closing cannot complete before dropping");

        // this is a requirement of the reset_tenant functionality: we have to be able to restart a
        // tenant fast, and for that, we need all tenant_dir operations be guarded by entering a gate
        drop(file);

        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect("closing completes right away")
            .expect("closing does not panic");
    }

    /// Writes 2.5 buffer capacities in `align`-sized pieces, reads every piece back and
    /// checks where the bytes ended up (file, maybe-flushed buffer, mutable buffer).
    #[tokio::test]
    async fn test_ephemeral_file_basics() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.mutable();
        let cap = mutable.capacity();
        let align = mutable.align();

        let write_nbytes = cap * 2 + cap / 2;

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(write_nbytes)
            .collect();

        // The pieces in which the content is written and later read back.
        let pieces: Vec<std::ops::Range<usize>> = (0..write_nbytes)
            .step_by(align)
            .map(|start| start..(start + align).min(write_nbytes))
            .collect();

        let mut value_offsets = Vec::new();
        for piece in &pieces {
            let off = file
                .write_raw(&content[piece.clone()], &ctx)
                .await
                .unwrap();
            value_offsets.push(off);
        }

        assert_eq!(file.len() as usize, write_nbytes);
        for (i, piece) in pieces.iter().enumerate() {
            assert_eq!(value_offsets[i], piece.start.into_u64());
            let buf = IoBufferMut::with_capacity(piece.len());
            let (buf_slice, nread) = file
                .read_exact_at_eof_ok(piece.start.into_u64(), buf.slice_full(), &ctx)
                .await
                .unwrap();
            let buf = buf_slice.into_inner();
            assert_eq!(nread, piece.len());
            assert_eq!(&buf, &content[piece.clone()]);
        }

        // Two full buffers are expected on disk ...
        let file_contents = std::fs::read(file.file.path()).unwrap();
        assert!(file_contents == content[0..cap * 2]);

        // ... the second of which is also still held as the maybe-flushed buffer ...
        let maybe_flushed_buffer_contents = file.buffered_writer.inspect_maybe_flushed().unwrap();
        assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);

        // ... and the remaining half buffer sits in the mutable buffer.
        let mutable_buffer_contents = file.buffered_writer.mutable();
        assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
    }

    /// A single write of 2.5 buffer capacities must result in two buffers on disk.
    #[tokio::test]
    async fn test_flushes_do_happen() {
        let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();
        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        // mutable buffer and maybe_flushed buffer each has `cap` bytes.
        let cap = file.buffered_writer.mutable().capacity();
        let total = cap * 2 + cap / 2;

        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(total)
            .collect();

        file.write_raw(&content, &ctx).await.unwrap();

        // assert the state is as this test expects it to be
        let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
        assert_eq!(&load_io_buf_res[..], &content[0..total]);
        let md = file.file.path().metadata().unwrap();
        assert_eq!(
            md.len(),
            2 * cap.into_u64(),
            "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
        );
        assert_eq!(
            &file.buffered_writer.inspect_maybe_flushed().unwrap()[0..cap],
            &content[cap..cap * 2]
        );
        assert_eq!(
            &file.buffered_writer.mutable()[0..cap / 2],
            &content[cap * 2..total]
        );
    }

    #[tokio::test]
    async fn test_read_split_across_file_and_buffer() {
        // This test exercises the logic on the read path that splits the logical read
        // into a read from the flushed part (= the file) and a copy from the buffered
        // writer's buffers.
        //
        // This test builds on the assertions in test_flushes_do_happen.

        let (conf, tenant_id, timeline_id, ctx) =
            harness("test_read_split_across_file_and_buffer").unwrap();

        let gate = utils::sync::gate::Gate::default();
        let cancel = CancellationToken::new();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
            .await
            .unwrap();

        let mutable = file.buffered_writer.mutable();
        let cap = mutable.capacity();
        let align = mutable.align();
        let content: Vec<u8> = rand::thread_rng()
            .sample_iter(rand::distributions::Standard)
            .take(cap * 2 + cap / 2)
            .collect();

        let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();

        // Reads `len` bytes at `start` and compares them against the written content.
        let test_read = |start: usize, len: usize| {
            let file = &file;
            let ctx = &ctx;
            let content = &content;
            async move {
                let dst = IoBufferMut::with_capacity(len).slice_full();
                let (buf, nread) = file
                    .read_exact_at_eof_ok(start.into_u64(), dst, ctx)
                    .await
                    .unwrap();
                assert_eq!(nread, len);
                assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
            }
        };

        let test_read_all_offset_combinations = || {
            async move {
                // completely within the file range
                test_read(align, align).await;
                // border onto edge of file
                test_read(cap - align, align).await;
                // read across file and buffer
                test_read(cap - align, 2 * align).await;
                // start at the beginning of the maybe flushed buffer
                test_read(cap, align).await;
                // completely within maybe flushed buffer
                test_read(cap + align, align).await;
                // border onto edge of maybe flushed buffer.
                test_read(cap * 2 - align, align).await;
                // read across maybe flushed and mutable buffer
                test_read(cap * 2 - align, 2 * align).await;
                // read across three segments
                test_read(cap - align, cap + 2 * align).await;
                // completely within mutable buffer
                test_read(cap * 2 + align, align).await;
            }
        };

        // Assumptions the offsets above rely on.
        assert!(align < cap, "test assumption");
        assert!(cap % align == 0);

        // test reads at different flush stages.
        let not_started = control.unwrap().into_not_started();
        test_read_all_offset_combinations().await;
        let in_progress = not_started.ready_to_flush();
        test_read_all_offset_combinations().await;
        in_progress.wait_until_flush_is_done().await;
        test_read_all_offset_combinations().await;
    }
}
|