Line data Source code
1 : //! Implementation of an append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use std::io;
5 : use std::sync::Arc;
6 : use std::sync::atomic::{AtomicU64, Ordering};
7 :
8 : use camino::Utf8PathBuf;
9 : use num_traits::Num;
10 : use pageserver_api::shard::TenantShardId;
11 : use tokio_epoll_uring::{BoundedBuf, Slice};
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{error, info_span};
14 : use utils::id::TimelineId;
15 : use utils::sync::gate::GateGuard;
16 :
17 : use crate::assert_u64_eq_usize::{U64IsUsize, UsizeIsU64};
18 : use crate::config::PageServerConf;
19 : use crate::context::RequestContext;
20 : use crate::page_cache;
21 : use crate::tenant::storage_layer::inmemory_layer::GlobalResourceUnits;
22 : use crate::tenant::storage_layer::inmemory_layer::vectored_dio_read::File;
23 : use crate::virtual_file::owned_buffers_io::io_buf_aligned::IoBufAlignedMut;
24 : use crate::virtual_file::owned_buffers_io::slice::SliceMutExt;
25 : use crate::virtual_file::owned_buffers_io::write::{Buffer, FlushTaskError};
26 : use crate::virtual_file::{self, IoBufferMut, TempVirtualFile, VirtualFile, owned_buffers_io};
27 :
28 : use self::owned_buffers_io::write::OwnedAsyncWriter;
29 :
30 : pub struct EphemeralFile {
31 : _tenant_shard_id: TenantShardId,
32 : _timeline_id: TimelineId,
33 : page_cache_file_id: page_cache::FileId,
34 : file: TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
35 :
36 : buffered_writer: tokio::sync::RwLock<BufferedWriter>,
37 :
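/// Total number of bytes written to this file via [`Self::write_raw`]. Kept in an atomic
/// so that [`Self::len`] can be answered without taking the `buffered_writer` lock.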
38 : bytes_written: AtomicU64,
39 :
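/// Accounting handle that publishes this file's size to the global in-memory layer
/// resource tracking (see [`GlobalResourceUnits`]).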
40 : resource_units: std::sync::Mutex<GlobalResourceUnits>,
41 : }
42 :
43 : type BufferedWriter = owned_buffers_io::write::BufferedWriter<
44 : IoBufferMut,
45 : TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter,
46 : >;
47 :
48 : /// A TempVirtualFile that is co-owned by the [`EphemeralFile`] and [`BufferedWriter`].
49 : ///
50 : /// (Actually [`BufferedWriter`] internally is just a client to a background flush task.
51 : /// The co-ownership is between [`EphemeralFile`] and that flush task.)
52 : ///
53 : /// Co-ownership allows us to serve reads for data that has already been flushed by the [`BufferedWriter`].
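///
/// Cloning is cheap: it only bumps the inner [`Arc`]'s reference count, which is how the
/// two co-owners end up sharing the same underlying [`TempVirtualFile`].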
54 : #[derive(Debug, Clone)]
55 : struct TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
56 : inner: Arc<TempVirtualFile>,
57 : }
58 :
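/// Capacity, in bytes, of each of the buffered writer's two in-memory buffers (the mutable
/// buffer and the maybe-flushed buffer). On the read path, data at offsets at or above
/// `bytes_submitted() - TAIL_SZ` may still be served from memory rather than from the file.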
59 : const TAIL_SZ: usize = 64 * 1024;
60 :
61 : impl EphemeralFile {
62 669 : pub async fn create(
63 669 : conf: &PageServerConf,
64 669 : tenant_shard_id: TenantShardId,
65 669 : timeline_id: TimelineId,
66 669 : gate: &utils::sync::gate::Gate,
67 669 : cancel: &CancellationToken,
68 669 : ctx: &RequestContext,
69 669 : ) -> anyhow::Result<EphemeralFile> {
70 : // TempVirtualFile requires us to never reuse a filename while an old
71 : // instance of TempVirtualFile created with that filename is not done dropping yet.
72 : // So, we use a monotonic counter to disambiguate the filenames.
73 : static NEXT_TEMP_DISAMBIGUATOR: AtomicU64 = AtomicU64::new(1);
74 669 : let filename_disambiguator =
75 669 : NEXT_TEMP_DISAMBIGUATOR.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
76 :
77 669 : let filename = conf
78 669 : .timeline_path(&tenant_shard_id, &timeline_id)
79 669 : .join(Utf8PathBuf::from(format!(
80 669 : "ephemeral-{filename_disambiguator}"
81 669 : )));
82 :
83 669 : let file = TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter::new(
84 669 : VirtualFile::open_with_options_v2(
85 669 : &filename,
86 669 : virtual_file::OpenOptions::new()
87 669 : .create_new(true)
88 669 : .read(true)
89 669 : .write(true),
90 669 : ctx,
91 669 : )
92 669 : .await?,
93 669 : gate.enter()?,
94 : );
95 :
96 669 : let page_cache_file_id = page_cache::next_file_id(); // XXX get rid, we're not page-caching anymore
97 :
98 : Ok(EphemeralFile {
99 669 : _tenant_shard_id: tenant_shard_id,
100 669 : _timeline_id: timeline_id,
101 669 : page_cache_file_id,
102 669 : file: file.clone(),
103 669 : buffered_writer: tokio::sync::RwLock::new(BufferedWriter::new(
104 669 : file,
105 : 0,
106 1338 : || IoBufferMut::with_capacity(TAIL_SZ),
107 669 : gate.enter()?,
108 669 : cancel.child_token(),
109 669 : ctx,
110 669 : info_span!(parent: None, "ephemeral_file_buffered_writer", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), timeline_id=%timeline_id, path = %filename),
111 : )),
112 669 : bytes_written: AtomicU64::new(0),
113 669 : resource_units: std::sync::Mutex::new(GlobalResourceUnits::new()),
114 : })
115 669 : }
116 : }
117 :
118 : impl TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
119 669 : fn new(file: VirtualFile, gate_guard: GateGuard) -> Self {
120 669 : Self {
121 669 : inner: Arc::new(TempVirtualFile::new(file, gate_guard)),
122 669 : }
123 669 : }
124 : }
125 :
126 : impl OwnedAsyncWriter for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
127 3320 : fn write_all_at<Buf: owned_buffers_io::io_buf_aligned::IoBufAligned + Send>(
128 3320 : &self,
129 3320 : buf: owned_buffers_io::io_buf_ext::FullSlice<Buf>,
130 3320 : offset: u64,
131 3320 : ctx: &RequestContext,
132 3320 : ) -> impl std::future::Future<
133 3320 : Output = (
134 3320 : owned_buffers_io::io_buf_ext::FullSlice<Buf>,
135 3320 : std::io::Result<()>,
136 3320 : ),
137 3320 : > + Send {
138 3320 : self.inner.write_all_at(buf, offset, ctx)
139 3320 : }
140 :
141 0 : fn set_len(
142 0 : &self,
143 0 : len: u64,
144 0 : ctx: &RequestContext,
145 0 : ) -> impl Future<Output = std::io::Result<()>> + Send {
146 0 : self.inner.set_len(len, ctx)
147 0 : }
148 : }
149 :
150 : impl std::ops::Deref for TempVirtualFileCoOwnedByEphemeralFileAndBufferedWriter {
151 : type Target = VirtualFile;
152 :
153 18647 : fn deref(&self) -> &Self::Target {
154 18647 : &self.inner
155 18647 : }
156 : }
157 :
158 : #[derive(Debug, thiserror::Error)]
159 : pub(crate) enum EphemeralFileWriteError {
160 : #[error("cancelled")]
161 : Cancelled,
162 : }
163 :
164 : impl EphemeralFile {
165 4806605 : pub(crate) fn len(&self) -> u64 {
166 : // TODO(vlad): The value returned here is not always correct if
167 : // we have more than one concurrent writer. Writes are always
168 : // sequenced, but we could grab the buffered writer lock if we wanted
169 : // to.
170 4806605 : self.bytes_written.load(Ordering::Acquire)
171 4806605 : }
172 :
173 665 : pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
174 665 : self.page_cache_file_id
175 665 : }
176 :
177 487 : pub(crate) async fn load_to_io_buf(
178 487 : &self,
179 487 : ctx: &RequestContext,
180 487 : ) -> Result<IoBufferMut, io::Error> {
181 487 : let size = self.len().into_usize();
182 487 : let buf = IoBufferMut::with_capacity(size);
183 487 : let (slice, nread) = self.read_exact_at_eof_ok(0, buf.slice_full(), ctx).await?;
184 487 : assert_eq!(nread, size);
185 487 : let buf = slice.into_inner();
186 487 : assert_eq!(buf.len(), nread);
187 487 : assert_eq!(buf.capacity(), size, "we shouldn't be reallocating");
188 487 : Ok(buf)
189 487 : }
190 :
191 : /// Returns the offset at which the first byte of the input was written, for use
192 : /// in constructing indices over the written value.
193 : ///
194 : /// Panics if the write is short because there's no way we can recover from that.
195 : /// TODO: make upstack handle this as an error.
196 2402453 : pub(crate) async fn write_raw(
197 2402453 : &self,
198 2402453 : srcbuf: &[u8],
199 2402453 : ctx: &RequestContext,
200 2402453 : ) -> Result<u64, EphemeralFileWriteError> {
201 2402453 : let (pos, control) = self.write_raw_controlled(srcbuf, ctx).await?;
202 2402453 : if let Some(control) = control {
203 2769 : control.release().await;
204 2399684 : }
205 2402453 : Ok(pos)
206 2402453 : }
207 :
208 2402454 : async fn write_raw_controlled(
209 2402454 : &self,
210 2402454 : srcbuf: &[u8],
211 2402454 : ctx: &RequestContext,
212 2402454 : ) -> Result<(u64, Option<owned_buffers_io::write::FlushControl>), EphemeralFileWriteError> {
213 2402454 : let mut writer = self.buffered_writer.write().await;
214 :
215 2402454 : let (nwritten, control) = writer
216 2402454 : .write_buffered_borrowed_controlled(srcbuf, ctx)
217 2402454 : .await
218 2402454 : .map_err(|e| match e {
219 0 : FlushTaskError::Cancelled => EphemeralFileWriteError::Cancelled,
220 0 : })?;
221 2402454 : assert_eq!(
222 : nwritten,
223 2402454 : srcbuf.len(),
224 0 : "buffered writer has no short writes"
225 : );
226 :
227 :         // There's no realistic risk of overflow here. We won't have exabyte-sized files on disk.
228 2402454 : let pos = self
229 2402454 : .bytes_written
230 2402454 : .fetch_add(srcbuf.len().into_u64(), Ordering::AcqRel);
231 :
232 2402454 : let mut resource_units = self.resource_units.lock().unwrap();
233 2402454 : resource_units.maybe_publish_size(self.bytes_written.load(Ordering::Relaxed));
234 :
235 2402454 : Ok((pos, control))
236 2402454 : }
237 :
238 0 : pub(crate) fn tick(&self) -> Option<u64> {
239 0 : let mut resource_units = self.resource_units.lock().unwrap();
240 0 : let len = self.bytes_written.load(Ordering::Relaxed);
241 0 : resource_units.publish_size(len)
242 0 : }
243 : }
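// An illustrative sketch, not taken from the pageserver codebase: the doc comment on
// `write_raw` above says callers use the returned offset to construct an index over the
// written values. Modelled with plain std types and hypothetical names (`ToyIndexedLog`,
// `append`, `get`), that pattern looks roughly like this:
use std::collections::HashMap;

struct ToyIndexedLog {
    data: Vec<u8>,            // stands in for the ephemeral file contents
    index: HashMap<u64, u64>, // key -> offset of the first byte of the serialized value
}

impl ToyIndexedLog {
    /// Same contract as `write_raw`: returns the offset of the first byte written.
    fn append(&mut self, key: u64, value: &[u8]) -> u64 {
        let off = self.data.len() as u64;
        self.data.extend_from_slice(value);
        self.index.insert(key, off);
        off
    }

    /// A later read resolves the key to an offset and reads `len` bytes starting there.
    fn get(&self, key: u64, len: usize) -> Option<&[u8]> {
        let off = *self.index.get(&key)? as usize;
        self.data.get(off..off + len)
    }
}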
244 :
245 : impl super::storage_layer::inmemory_layer::vectored_dio_read::File for EphemeralFile {
246 262460 : async fn read_exact_at_eof_ok<B: IoBufAlignedMut + Send>(
247 262460 : &self,
248 262460 : start: u64,
249 262460 : mut dst: tokio_epoll_uring::Slice<B>,
250 262460 : ctx: &RequestContext,
251 262460 : ) -> std::io::Result<(tokio_epoll_uring::Slice<B>, usize)> {
252 :         // We will fill the slice in back-to-front order. Hence, we need
253 : // the slice to be fully initialized.
254 : // TODO(vlad): Is there a nicer way of doing this?
255 262460 : dst.as_mut_rust_slice_full_zeroed();
256 :
257 262460 : let writer = self.buffered_writer.read().await;
258 :
259 : // Read bytes written while under lock. This is a hack to deal with concurrent
260 : // writes updating the number of bytes written. `bytes_written` is not DIO alligned
261 : // but we may end the read there.
262 : //
263 : // TODO(vlad): Feels like there's a nicer path where we align the end if it
264 : // shoots over the end of the file.
265 262460 : let bytes_written = self.bytes_written.load(Ordering::Acquire);
266 :
267 262460 : let dst_cap = dst.bytes_total().into_u64();
268 262460 : let end = {
269 : // saturating_add is correct here because the max file size is u64::MAX, so,
270 : // if start + dst.len() > u64::MAX, then we know it will be a short read
271 262460 : let mut end: u64 = start.saturating_add(dst_cap);
272 262460 : if end > bytes_written {
273 136159 : end = bytes_written;
274 136159 : }
275 262460 : end
276 : };
277 :
278 262460 : let submitted_offset = writer.bytes_submitted();
279 262460 : let maybe_flushed = writer.inspect_maybe_flushed();
280 :
281 262460 : let mutable = match writer.inspect_mutable() {
282 262460 : Some(mutable) => &mutable[0..mutable.pending()],
283 : None => {
284 : // Timeline::cancel and hence buffered writer flush was cancelled.
285 : // Remain read-available while timeline is shutting down.
286 0 : &[]
287 : }
288 : };
289 :
290 : // inclusive, exclusive
291 : #[derive(Debug)]
292 : struct Range<N>(N, N);
293 : impl<N: Num + Clone + Copy + PartialOrd + Ord> Range<N> {
294 1703392 : fn len(&self) -> N {
295 1703392 : if self.0 > self.1 {
296 910616 : N::zero()
297 : } else {
298 792776 : self.1 - self.0
299 : }
300 1703392 : }
301 : }
302 :
303 262460 : let (written_range, maybe_flushed_range) = {
304 262460 : if maybe_flushed.is_some() {
305 : // [ written ][ maybe_flushed ][ mutable ]
306 : // ^
307 : // `submitted_offset`
308 : // <++++++ on disk +++++++????????????????>
309 257206 : (
310 257206 : Range(
311 257206 : start,
312 257206 : std::cmp::min(end, submitted_offset.saturating_sub(TAIL_SZ as u64)),
313 257206 : ),
314 257206 : Range(
315 257206 : std::cmp::max(start, submitted_offset.saturating_sub(TAIL_SZ as u64)),
316 257206 : std::cmp::min(end, submitted_offset),
317 257206 : ),
318 257206 : )
319 : } else {
320 : // [ written ][ mutable ]
321 : // ^
322 : // `submitted_offset`
323 : // <++++++ on disk +++++++++++++++++++++++>
324 5254 : (
325 5254 : Range(start, std::cmp::min(end, submitted_offset)),
326 5254 : // zero len
327 5254 : Range(submitted_offset, u64::MIN),
328 5254 : )
329 : }
330 : };
331 :
332 262460 : let mutable_range = Range(std::cmp::max(start, submitted_offset), end);
333 :
334 : // There are three sources from which we might have to read data:
335 : // 1. The file itself
336 : // 2. The buffer which contains changes currently being flushed
337 :         // 3. The buffer which contains changes yet to be flushed
338 : //
339 : // For better concurrency, we do them in reverse order: perform the in-memory
340 : // reads while holding the writer lock, drop the writer lock and read from the
341 : // file if required.
342 :
343 262460 : let dst = if mutable_range.len() > 0 {
344 162997 : let offset_in_buffer = mutable_range
345 162997 : .0
346 162997 : .checked_sub(submitted_offset)
347 162997 : .unwrap()
348 162997 : .into_usize();
349 162997 : let to_copy =
350 162997 : &mutable[offset_in_buffer..(offset_in_buffer + mutable_range.len().into_usize())];
351 162997 : let bounds = dst.bounds();
352 162997 : let mut view = dst.slice({
353 162997 : let start =
354 162997 : written_range.len().into_usize() + maybe_flushed_range.len().into_usize();
355 162997 : let end = start.checked_add(mutable_range.len().into_usize()).unwrap();
356 162997 : start..end
357 : });
358 162997 : view.as_mut_rust_slice_full_zeroed()
359 162997 : .copy_from_slice(to_copy);
360 162997 : Slice::from_buf_bounds(Slice::into_inner(view), bounds)
361 : } else {
362 99463 : dst
363 : };
364 :
365 262460 : let dst = if maybe_flushed_range.len() > 0 {
366 81793 : let offset_in_buffer = maybe_flushed_range
367 81793 : .0
368 81793 : .checked_sub(submitted_offset.saturating_sub(TAIL_SZ as u64))
369 81793 : .unwrap()
370 81793 : .into_usize();
371 : // Checked previously the buffer is Some.
372 81793 : let maybe_flushed = maybe_flushed.unwrap();
373 81793 : let to_copy = &maybe_flushed
374 81793 : [offset_in_buffer..(offset_in_buffer + maybe_flushed_range.len().into_usize())];
375 81793 : let bounds = dst.bounds();
376 81793 : let mut view = dst.slice({
377 81793 : let start = written_range.len().into_usize();
378 81793 : let end = start
379 81793 : .checked_add(maybe_flushed_range.len().into_usize())
380 81793 : .unwrap();
381 81793 : start..end
382 : });
383 81793 : view.as_mut_rust_slice_full_zeroed()
384 81793 : .copy_from_slice(to_copy);
385 81793 : Slice::from_buf_bounds(Slice::into_inner(view), bounds)
386 : } else {
387 180667 : dst
388 : };
389 :
390 262460 : drop(writer);
391 :
392 262460 : let dst = if written_range.len() > 0 {
393 18645 : let bounds = dst.bounds();
394 18645 : let slice = self
395 18645 : .file
396 18645 : .read_exact_at(dst.slice(0..written_range.len().into_usize()), start, ctx)
397 18645 : .await?;
398 18645 : Slice::from_buf_bounds(Slice::into_inner(slice), bounds)
399 : } else {
400 243815 : dst
401 : };
402 :
403 : // TODO: in debug mode, randomize the remaining bytes in `dst` to catch bugs
404 :
405 262460 : Ok((dst, (end - start).into_usize()))
406 262460 : }
407 : }
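// An illustrative sketch, not taken from the pageserver codebase: the range splitting that
// `read_exact_at_eof_ok` performs above, extracted into a standalone function for the common
// case where the maybe-flushed buffer exists. The names are hypothetical; `tail_sz` plays the
// role of `TAIL_SZ` and `submitted_offset` the role of `writer.bytes_submitted()`. A pair
// whose first element is >= its second is an empty range, matching `Range::len` above.
fn split_read_range(
    start: u64,
    end: u64,
    submitted_offset: u64,
    tail_sz: u64,
) -> ((u64, u64), (u64, u64), (u64, u64)) {
    let maybe_flushed_start = submitted_offset.saturating_sub(tail_sz);
    //  [ written ][ maybe_flushed ][ mutable ]
    let written = (start, end.min(maybe_flushed_start)); // served from the file on disk
    let maybe_flushed = (start.max(maybe_flushed_start), end.min(submitted_offset)); // copied from the in-flight flush buffer
    let mutable = (start.max(submitted_offset), end); // copied from the writer's mutable buffer
    (written, maybe_flushed, mutable)
}
// With tail_sz = 64 KiB and submitted_offset = 128 KiB, a read of [60 KiB, 140 KiB) splits into
//   written       = [ 60 KiB,  64 KiB) -> file
//   maybe_flushed = [ 64 KiB, 128 KiB) -> in-flight flush buffer
//   mutable       = [128 KiB, 140 KiB) -> mutable buffer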
408 :
409 : /// Does the given filename look like an ephemeral file?
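/// For example, `ephemeral-42` matches, while `ephemeral-`, `ephemeral-42.tmp`, or
/// `image-42` do not (the suffix must parse as a `u32`).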
410 0 : pub fn is_ephemeral_file(filename: &str) -> bool {
411 0 : if let Some(rest) = filename.strip_prefix("ephemeral-") {
412 0 : rest.parse::<u32>().is_ok()
413 : } else {
414 0 : false
415 : }
416 0 : }
417 :
418 : #[cfg(test)]
419 : mod tests {
420 : use std::fs;
421 : use std::str::FromStr;
422 :
423 : use rand::Rng;
424 :
425 : use super::*;
426 : use crate::context::DownloadBehavior;
427 : use crate::task_mgr::TaskKind;
428 :
429 4 : fn harness(
430 4 : test_name: &str,
431 4 : ) -> Result<
432 4 : (
433 4 : &'static PageServerConf,
434 4 : TenantShardId,
435 4 : TimelineId,
436 4 : RequestContext,
437 4 : ),
438 4 : io::Error,
439 4 : > {
440 4 : let repo_dir = PageServerConf::test_repo_dir(test_name);
441 4 : let _ = fs::remove_dir_all(&repo_dir);
442 4 : let conf = PageServerConf::dummy_conf(repo_dir);
443 :         // Make a static copy of the config. This can never be freed, but that's
444 : // OK in a test.
445 4 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
446 :
447 4 : let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
448 4 : let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
449 4 : fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
450 :
451 4 : let ctx =
452 4 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error).with_scope_unit_test();
453 :
454 4 : Ok((conf, tenant_shard_id, timeline_id, ctx))
455 4 : }
456 :
457 : #[tokio::test]
458 1 : async fn ephemeral_file_holds_gate_open() {
459 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);
460 :
461 1 : let (conf, tenant_id, timeline_id, ctx) =
462 1 : harness("ephemeral_file_holds_gate_open").unwrap();
463 :
464 1 : let gate = utils::sync::gate::Gate::default();
465 1 : let cancel = CancellationToken::new();
466 :
467 1 : let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
468 1 : .await
469 1 : .unwrap();
470 :
471 1 : let mut closing = tokio::task::spawn(async move {
472 1 : gate.close().await;
473 1 : });
474 :
475 : // gate is entered until the ephemeral file is dropped
476 :         // do not start paused; tokio-epoll-uring has a sleep loop
477 1 : tokio::time::pause();
478 1 : tokio::time::timeout(FOREVER, &mut closing)
479 1 : .await
480 1 : .expect_err("closing cannot complete before dropping");
481 :
482 : // this is a requirement of the reset_tenant functionality: we have to be able to restart a
483 :         // tenant fast, and for that, we need all tenant_dir operations to be guarded by entering a gate
484 1 : drop(file);
485 :
486 1 : tokio::time::timeout(FOREVER, &mut closing)
487 1 : .await
488 1 : .expect("closing completes right away")
489 1 : .expect("closing does not panic");
490 1 : }
491 :
492 : #[tokio::test]
493 1 : async fn test_ephemeral_file_basics() {
494 1 : let (conf, tenant_id, timeline_id, ctx) = harness("test_ephemeral_file_basics").unwrap();
495 :
496 1 : let gate = utils::sync::gate::Gate::default();
497 1 : let cancel = CancellationToken::new();
498 :
499 1 : let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
500 1 : .await
501 1 : .unwrap();
502 :
503 1 : let writer = file.buffered_writer.read().await;
504 1 : let mutable = writer.mutable();
505 1 : let cap = mutable.capacity();
506 1 : let align = mutable.align();
507 1 : drop(writer);
508 :
509 1 : let write_nbytes = cap * 2 + cap / 2;
510 :
511 1 : let content: Vec<u8> = rand::thread_rng()
512 1 : .sample_iter(rand::distributions::Standard)
513 1 : .take(write_nbytes)
514 1 : .collect();
515 :
516 1 : let mut value_offsets = Vec::new();
517 320 : for range in (0..write_nbytes)
518 1 : .step_by(align)
519 320 : .map(|start| start..(start + align).min(write_nbytes))
520 : {
521 320 : let off = file.write_raw(&content[range], &ctx).await.unwrap();
522 320 : value_offsets.push(off);
523 : }
524 :
525 1 : assert_eq!(file.len() as usize, write_nbytes);
526 320 : for (i, range) in (0..write_nbytes)
527 1 : .step_by(align)
528 320 : .map(|start| start..(start + align).min(write_nbytes))
529 1 : .enumerate()
530 : {
531 320 : assert_eq!(value_offsets[i], range.start.into_u64());
532 320 : let buf = IoBufferMut::with_capacity(range.len());
533 320 : let (buf_slice, nread) = file
534 320 : .read_exact_at_eof_ok(range.start.into_u64(), buf.slice_full(), &ctx)
535 320 : .await
536 320 : .unwrap();
537 320 : let buf = buf_slice.into_inner();
538 320 : assert_eq!(nread, range.len());
539 320 : assert_eq!(&buf, &content[range]);
540 : }
541 :
542 1 : let file_contents = std::fs::read(file.file.path()).unwrap();
543 1 : assert!(file_contents == content[0..cap * 2]);
544 :
545 1 : let writer = file.buffered_writer.read().await;
546 1 : let maybe_flushed_buffer_contents = writer.inspect_maybe_flushed().unwrap();
547 1 : assert_eq!(&maybe_flushed_buffer_contents[..], &content[cap..cap * 2]);
548 :
549 1 : let mutable_buffer_contents = writer.mutable();
550 1 : assert_eq!(mutable_buffer_contents, &content[cap * 2..write_nbytes]);
551 1 : }
552 :
553 : #[tokio::test]
554 1 : async fn test_flushes_do_happen() {
555 1 : let (conf, tenant_id, timeline_id, ctx) = harness("test_flushes_do_happen").unwrap();
556 :
557 1 : let gate = utils::sync::gate::Gate::default();
558 1 : let cancel = CancellationToken::new();
559 1 : let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
560 1 : .await
561 1 : .unwrap();
562 :
563 :         // mutable buffer and maybe_flushed buffer each have `cap` bytes.
564 1 : let writer = file.buffered_writer.read().await;
565 1 : let cap = writer.mutable().capacity();
566 1 : drop(writer);
567 :
568 1 : let content: Vec<u8> = rand::thread_rng()
569 1 : .sample_iter(rand::distributions::Standard)
570 1 : .take(cap * 2 + cap / 2)
571 1 : .collect();
572 :
573 1 : file.write_raw(&content, &ctx).await.unwrap();
574 :
575 : // assert the state is as this test expects it to be
576 1 : let load_io_buf_res = file.load_to_io_buf(&ctx).await.unwrap();
577 1 : assert_eq!(&load_io_buf_res[..], &content[0..cap * 2 + cap / 2]);
578 1 : let md = file.file.path().metadata().unwrap();
579 1 : assert_eq!(
580 1 : md.len(),
581 1 : 2 * cap.into_u64(),
582 0 : "buffered writer requires one write to be flushed if we write 2.5x buffer capacity"
583 : );
584 1 : let writer = file.buffered_writer.read().await;
585 1 : assert_eq!(
586 1 : &writer.inspect_maybe_flushed().unwrap()[0..cap],
587 1 : &content[cap..cap * 2]
588 : );
589 1 : assert_eq!(
590 1 : &writer.mutable()[0..cap / 2],
591 1 : &content[cap * 2..cap * 2 + cap / 2]
592 1 : );
593 1 : }
594 :
595 : #[tokio::test]
596 1 : async fn test_read_split_across_file_and_buffer() {
597 : // This test exercises the logic on the read path that splits the logical read
598 : // into a read from the flushed part (= the file) and a copy from the buffered writer's buffer.
599 : //
600 :     // This test builds on the assertions in test_flushes_do_happen
601 :
602 1 : let (conf, tenant_id, timeline_id, ctx) =
603 1 : harness("test_read_split_across_file_and_buffer").unwrap();
604 :
605 1 : let gate = utils::sync::gate::Gate::default();
606 1 : let cancel = CancellationToken::new();
607 :
608 1 : let file = EphemeralFile::create(conf, tenant_id, timeline_id, &gate, &cancel, &ctx)
609 1 : .await
610 1 : .unwrap();
611 :
612 1 : let writer = file.buffered_writer.read().await;
613 1 : let mutable = writer.mutable();
614 1 : let cap = mutable.capacity();
615 1 : let align = mutable.align();
616 1 : drop(writer);
617 1 : let content: Vec<u8> = rand::thread_rng()
618 1 : .sample_iter(rand::distributions::Standard)
619 1 : .take(cap * 2 + cap / 2)
620 1 : .collect();
621 :
622 1 : let (_, control) = file.write_raw_controlled(&content, &ctx).await.unwrap();
623 :
624 27 : let test_read = |start: usize, len: usize| {
625 27 : let file = &file;
626 27 : let ctx = &ctx;
627 27 : let content = &content;
628 27 : async move {
629 27 : let (buf, nread) = file
630 27 : .read_exact_at_eof_ok(
631 27 : start.into_u64(),
632 27 : IoBufferMut::with_capacity(len).slice_full(),
633 27 : ctx,
634 27 : )
635 27 : .await
636 27 : .unwrap();
637 27 : assert_eq!(nread, len);
638 27 : assert_eq!(&buf.into_inner(), &content[start..(start + len)]);
639 27 : }
640 27 : };
641 :
642 3 : let test_read_all_offset_combinations = || {
643 3 : async move {
644 3 : test_read(align, align).await;
645 : // border onto edge of file
646 3 : test_read(cap - align, align).await;
647 : // read across file and buffer
648 3 : test_read(cap - align, 2 * align).await;
649 :                 // start at the beginning of the maybe flushed buffer
650 3 : test_read(cap, align).await;
651 : // completely within maybe flushed buffer
652 3 : test_read(cap + align, align).await;
653 : // border onto edge of maybe flushed buffer.
654 3 : test_read(cap * 2 - align, align).await;
655 : // read across maybe flushed and mutable buffer
656 3 : test_read(cap * 2 - align, 2 * align).await;
657 : // read across three segments
658 3 : test_read(cap - align, cap + 2 * align).await;
659 : // completely within mutable buffer
660 3 : test_read(cap * 2 + align, align).await;
661 3 : }
662 3 : };
663 :
664 : // completely within the file range
665 1 : assert!(align < cap, "test assumption");
666 1 : assert!(cap % align == 0);
667 :
668 : // test reads at different flush stages.
669 1 : let not_started = control.unwrap().into_not_started();
670 1 : test_read_all_offset_combinations().await;
671 1 : let in_progress = not_started.ready_to_flush();
672 1 : test_read_all_offset_combinations().await;
673 1 : in_progress.wait_until_flush_is_done().await;
674 1 : test_read_all_offset_combinations().await;
675 1 : }
676 : }