Line data Source code
1 : //! The heart of how [`super::EphemeralFile`] does its reads and writes.
2 : //!
3 : //! # Writes
4 : //!
5 : //! [`super::EphemeralFile`] writes small, borrowed buffers using [`RW::write_all_borrowed`].
6 : //! The [`RW`] batches these into bigger, [`TAIL_SZ`]-sized writes, using [`owned_buffers_io::write::BufferedWriter`].
7 : //!
8 : //! # Reads
9 : //!
10 : //! [`super::EphemeralFile`] always reads full [`PAGE_SZ`]ed blocks using [`RW::read_blk`].
11 : //!
12 : //! The [`RW`] serves these reads either from the buffered writer's in-memory buffer
13 : //! or redirects the caller to read from the underlying [`OwnedAsyncWriter`]
14 : //! if the read is for the prefix that has already been flushed.
15 : //!
16 : //! # Current Usage
17 : //!
18 : //! The current user of this module is [`super::page_caching::RW`].
19 :
20 : mod zero_padded;
21 :
22 : use crate::{
23 : context::RequestContext,
24 : page_cache::PAGE_SZ,
25 : virtual_file::owned_buffers_io::{
26 : self,
27 : write::{Buffer, OwnedAsyncWriter},
28 : },
29 : };
30 :
/// Size parameter, in bytes, of the `zero_padded::Buffer` that backs the buffered writer.
///
/// `RW::read_blk` asserts that the flushed offset is always a multiple of this value.
31 : const TAIL_SZ: usize = 64 * 1024;
32 :
33 : /// See module-level comment.
34 : pub struct RW<W: OwnedAsyncWriter> {
35 : buffered_writer: owned_buffers_io::write::BufferedWriter<
36 : zero_padded::Buffer<TAIL_SZ>,
37 : owned_buffers_io::util::size_tracking_writer::Writer<W>,
38 : >,
39 : }
40 :
41 : pub enum ReadResult<'a, W> {
42 : NeedsReadFromWriter { writer: &'a W },
43 : ServedFromZeroPaddedMutableTail { buffer: &'a [u8; PAGE_SZ] },
44 : }
45 :
46 : impl<W> RW<W>
47 : where
48 : W: OwnedAsyncWriter,
49 : {
50 1241 : pub fn new(writer: W) -> Self {
51 1241 : let bytes_flushed_tracker =
52 1241 : owned_buffers_io::util::size_tracking_writer::Writer::new(writer);
53 1241 : let buffered_writer = owned_buffers_io::write::BufferedWriter::new(
54 1241 : bytes_flushed_tracker,
55 1241 : zero_padded::Buffer::default(),
56 1241 : );
57 1241 : Self { buffered_writer }
58 1241 : }
59 :
60 4303496 : pub(crate) fn as_writer(&self) -> &W {
61 4303496 : self.buffered_writer.as_inner().as_inner()
62 4303496 : }
63 :
64 10221232 : pub async fn write_all_borrowed(
65 10221232 : &mut self,
66 10221232 : buf: &[u8],
67 10221232 : ctx: &RequestContext,
68 10221232 : ) -> std::io::Result<usize> {
69 10221232 : self.buffered_writer.write_buffered_borrowed(buf, ctx).await
70 10221232 : }
71 :
72 10202265 : pub fn bytes_written(&self) -> u64 {
73 10202265 : let flushed_offset = self.buffered_writer.as_inner().bytes_written();
74 10202265 : let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
75 10202265 : flushed_offset + u64::try_from(buffer.pending()).unwrap()
76 10202265 : }
77 :
78 4955798 : pub(crate) async fn read_blk(&self, blknum: u32) -> Result<ReadResult<'_, W>, std::io::Error> {
79 4955798 : let flushed_offset = self.buffered_writer.as_inner().bytes_written();
80 4955798 : let buffer: &zero_padded::Buffer<TAIL_SZ> = self.buffered_writer.inspect_buffer();
81 4955798 : let buffered_offset = flushed_offset + u64::try_from(buffer.pending()).unwrap();
82 4955798 : let read_offset = (blknum as u64) * (PAGE_SZ as u64);
83 :
84 : // The trailing page ("block") might only be partially filled,
85 : // yet the blob_io code relies on us to return a full PAGE_SZed slice anyway.
86 : // Moreover, it has to be zero-padded, because when we still had
87 : // a write-back page cache, it provided pre-zeroed pages, and blob_io came to rely on it.
88 : // DeltaLayer probably has the same issue, not sure why it needs no special treatment.
89 : // => check here that the read doesn't go beyond this potentially trailing
90 : // => the zero-padding is done in the `else` branch below
91 4955798 : let blocks_written = if buffered_offset % (PAGE_SZ as u64) == 0 {
92 64 : buffered_offset / (PAGE_SZ as u64)
93 : } else {
94 4955734 : (buffered_offset / (PAGE_SZ as u64)) + 1
95 : };
96 4955798 : if (blknum as u64) >= blocks_written {
97 0 : return Err(std::io::Error::new(std::io::ErrorKind::Other, anyhow::anyhow!("read past end of ephemeral_file: read=0x{read_offset:x} buffered=0x{buffered_offset:x} flushed=0x{flushed_offset}")));
98 4955798 : }
99 4955798 :
100 4955798 : // assertions for the `if-else` below
101 4955798 : assert_eq!(
102 4955798 : flushed_offset % (TAIL_SZ as u64), 0,
103 0 : "we only use write_buffered_borrowed to write to the buffered writer, so it's guaranteed that flushes happen buffer.cap()-sized chunks"
104 : );
105 4955798 : assert_eq!(
106 4955798 : flushed_offset % (PAGE_SZ as u64),
107 : 0,
108 0 : "the logic below can't handle if the page is spread across the flushed part and the buffer"
109 : );
110 :
111 4955798 : if read_offset < flushed_offset {
112 4302384 : assert!(read_offset + (PAGE_SZ as u64) <= flushed_offset);
113 4302384 : Ok(ReadResult::NeedsReadFromWriter {
114 4302384 : writer: self.as_writer(),
115 4302384 : })
116 : } else {
117 653414 : let read_offset_in_buffer = read_offset
118 653414 : .checked_sub(flushed_offset)
119 653414 : .expect("would have taken `if` branch instead of this one");
120 653414 : let read_offset_in_buffer = usize::try_from(read_offset_in_buffer).unwrap();
121 653414 : let zero_padded_slice = buffer.as_zero_padded_slice();
122 653414 : let page = &zero_padded_slice[read_offset_in_buffer..(read_offset_in_buffer + PAGE_SZ)];
123 653414 : Ok(ReadResult::ServedFromZeroPaddedMutableTail {
124 653414 : buffer: page
125 653414 : .try_into()
126 653414 : .expect("the slice above got it as page-size slice"),
127 653414 : })
128 : }
129 4955798 : }
130 : }
|