TLA Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache::{self, PAGE_SZ};
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::VirtualFile;
9 : use camino::Utf8PathBuf;
10 : use pageserver_api::shard::TenantShardId;
11 : use std::cmp::min;
12 : use std::fs::OpenOptions;
13 : use std::io::{self, ErrorKind};
14 : use std::ops::DerefMut;
15 : use std::sync::atomic::AtomicU64;
16 : use tracing::*;
17 : use utils::id::TimelineId;
18 :
19 : pub struct EphemeralFile {
20 : page_cache_file_id: page_cache::FileId,
21 :
22 : _tenant_shard_id: TenantShardId,
23 : _timeline_id: TimelineId,
24 : file: VirtualFile,
25 : len: u64,
26 : /// An ephemeral file is append-only.
27 : /// We keep the last page, which can still be modified, in [`Self::mutable_tail`].
28 : /// The other pages, which can no longer be modified, are accessed through the page cache.
29 : mutable_tail: [u8; PAGE_SZ],
30 : }
31 :
32 : impl EphemeralFile {
33 CBC 4824 : pub async fn create(
34 4824 : conf: &PageServerConf,
35 4824 : tenant_shard_id: TenantShardId,
36 4824 : timeline_id: TimelineId,
37 4824 : ) -> Result<EphemeralFile, io::Error> {
38 4824 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
39 4824 : let filename_disambiguator =
40 4824 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
41 4824 :
42 4824 : let filename = conf
43 4824 : .timeline_path(&tenant_shard_id, &timeline_id)
44 4824 : .join(Utf8PathBuf::from(format!(
45 4824 : "ephemeral-{filename_disambiguator}"
46 4824 : )));
47 :
48 4824 : let file = VirtualFile::open_with_options(
49 4824 : &filename,
50 4824 : OpenOptions::new().read(true).write(true).create(true),
51 4824 : )
52 UBC 0 : .await?;
53 :
54 CBC 4824 : Ok(EphemeralFile {
55 4824 : page_cache_file_id: page_cache::next_file_id(),
56 4824 : _tenant_shard_id: tenant_shard_id,
57 4824 : _timeline_id: timeline_id,
58 4824 : file,
59 4824 : len: 0,
60 4824 : mutable_tail: [0u8; PAGE_SZ],
61 4824 : })
62 4824 : }
63 :
64 565114 : pub(crate) fn len(&self) -> u64 {
65 565114 : self.len
66 565114 : }
67 :
68 62755269 : pub(crate) async fn read_blk(
69 62755269 : &self,
70 62755269 : blknum: u32,
71 62755269 : ctx: &RequestContext,
72 62755269 : ) -> Result<BlockLease, io::Error> {
73 62755195 : let flushed_blknums = 0..self.len / PAGE_SZ as u64;
74 62755195 : if flushed_blknums.contains(&(blknum as u64)) {
75 62185362 : let cache = page_cache::get();
76 62185362 : match cache
77 62185362 : .read_immutable_buf(self.page_cache_file_id, blknum, ctx)
78 801599 : .await
79 62185361 : .map_err(|e| {
80 UBC 0 : std::io::Error::new(
81 0 : std::io::ErrorKind::Other,
82 0 : // order path before error because error is anyhow::Error => might have many contexts
83 0 : format!(
84 0 : "ephemeral file: read immutable page #{}: {}: {:#}",
85 0 : blknum, self.file.path, e,
86 0 : ),
87 0 : )
88 CBC 62185361 : })? {
89 60937274 : page_cache::ReadBufResult::Found(guard) => {
90 60937274 : return Ok(BlockLease::PageReadGuard(guard))
91 : }
92 1248087 : page_cache::ReadBufResult::NotFound(mut write_guard) => {
93 1248087 : let buf: &mut [u8] = write_guard.deref_mut();
94 1248087 : debug_assert_eq!(buf.len(), PAGE_SZ);
95 1248087 : self.file
96 1248087 : .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
97 UBC 0 : .await?;
98 CBC 1248087 : let read_guard = write_guard.mark_valid();
99 1248087 : return Ok(BlockLease::PageReadGuard(read_guard));
100 : }
101 : };
102 : } else {
103 569833 : debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
104 569833 : Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
105 : }
106 62755194 : }
107 :
108 53139611 : pub(crate) async fn write_blob(
109 53139611 : &mut self,
110 53139611 : srcbuf: &[u8],
111 53139611 : ctx: &RequestContext,
112 53139611 : ) -> Result<u64, io::Error> {
113 53139611 : struct Writer<'a> {
114 53139611 : ephemeral_file: &'a mut EphemeralFile,
115 53139611 : /// The block to which the next [`push_bytes`] will write.
116 53139611 : blknum: u32,
117 53139611 : /// The offset inside the block identified by [`blknum`] to which [`push_bytes`] will write.
118 53139611 : off: usize,
119 53139611 : }
120 53139611 : impl<'a> Writer<'a> {
121 53139611 : fn new(ephemeral_file: &'a mut EphemeralFile) -> io::Result<Writer<'a>> {
122 53139611 : Ok(Writer {
123 53139611 : blknum: (ephemeral_file.len / PAGE_SZ as u64) as u32,
124 53139611 : off: (ephemeral_file.len % PAGE_SZ as u64) as usize,
125 53139611 : ephemeral_file,
126 53139611 : })
127 53139611 : }
128 53139611 : #[inline(always)]
129 106279222 : async fn push_bytes(
130 106279222 : &mut self,
131 106279222 : src: &[u8],
132 106279222 : ctx: &RequestContext,
133 106279222 : ) -> Result<(), io::Error> {
134 106279222 : let mut src_remaining = src;
135 215769135 : while !src_remaining.is_empty() {
136 109489913 : let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
137 109489913 : let n = min(dst_remaining.len(), src_remaining.len());
138 109489913 : dst_remaining[..n].copy_from_slice(&src_remaining[..n]);
139 109489913 : self.off += n;
140 109489913 : src_remaining = &src_remaining[n..];
141 109489913 : if self.off == PAGE_SZ {
142 53139611 : match self
143 3223139 : .ephemeral_file
144 3223139 : .file
145 3223139 : .write_all_at(
146 3223139 : &self.ephemeral_file.mutable_tail,
147 3223139 : self.blknum as u64 * PAGE_SZ as u64,
148 3223139 : )
149 53139611 : .await
150 53139611 : {
151 53139611 : Ok(_) => {
152 53139611 : // Pre-warm the page cache with what we just wrote.
153 53139611 : // This isn't necessary for coherency/correctness, but it's how we've always done it.
154 53139611 : let cache = page_cache::get();
155 3223139 : match cache
156 3223139 : .read_immutable_buf(
157 3223139 : self.ephemeral_file.page_cache_file_id,
158 3223139 : self.blknum,
159 3223139 : ctx,
160 3223139 : )
161 53139611 : .await
162 53139611 : {
163 53139611 : Ok(page_cache::ReadBufResult::Found(_guard)) => {
164 UBC 0 : // This function takes &mut self, so, it shouldn't be possible to reach this point.
165 0 : unreachable!("we just wrote blknum {} and this function takes &mut self, so, no concurrent read_blk is possible", self.blknum);
166 CBC 53139611 : }
167 53139611 : Ok(page_cache::ReadBufResult::NotFound(mut write_guard)) => {
168 3223139 : let buf: &mut [u8] = write_guard.deref_mut();
169 53139611 : debug_assert_eq!(buf.len(), PAGE_SZ);
170 53139611 : buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
171 3223139 : let _ = write_guard.mark_valid();
172 53139611 : // pre-warm successful
173 53139611 : }
174 53139611 : Err(e) => {
175 53139611 : error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}");
176 53139611 : // fail gracefully, it's not the end of the world if we can't pre-warm the cache here
177 53139611 : }
178 53139611 : }
179 53139611 : // Zero the buffer for re-use.
180 53139611 : // Zeroing is critical for correcntess because the write_blob code below
181 53139611 : // and similarly read_blk expect zeroed pages.
182 53139611 : self.ephemeral_file.mutable_tail.fill(0);
183 3223139 : // This block is done, move to next one.
184 3223139 : self.blknum += 1;
185 3223139 : self.off = 0;
186 53139611 : }
187 53139611 : Err(e) => {
188 UBC 0 : return Err(std::io::Error::new(
189 0 : ErrorKind::Other,
190 0 : // order error before path because path is long and error is short
191 0 : format!(
192 0 : "ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
193 0 : self.blknum,
194 0 : e,
195 0 : self.ephemeral_file.file.path,
196 0 : ),
197 0 : ));
198 CBC 53139611 : }
199 53139611 : }
200 106266774 : }
201 53139611 : }
202 106279222 : Ok(())
203 106279222 : }
204 53139611 : }
205 53139611 :
206 53139611 : let pos = self.len;
207 53139611 : let mut writer = Writer::new(self)?;
208 :
209 : // Write the length field
210 53139611 : if srcbuf.len() < 0x80 {
211 : // short one-byte length header
212 41116657 : let len_buf = [srcbuf.len() as u8];
213 41116657 : writer.push_bytes(&len_buf, ctx).await?;
214 : } else {
215 12022954 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
216 12022954 : len_buf[0] |= 0x80;
217 12022954 : writer.push_bytes(&len_buf, ctx).await?;
218 : }
219 :
220 : // Write the payload
221 53139611 : writer.push_bytes(srcbuf, ctx).await?;
222 :
223 53139611 : if srcbuf.len() < 0x80 {
224 41116657 : self.len += 1;
225 41116657 : } else {
226 12022954 : self.len += 4;
227 12022954 : }
228 53139611 : self.len += srcbuf.len() as u64;
229 53139611 :
230 53139611 : Ok(pos)
231 53139611 : }
232 : }
233 :
/// Does the given filename look like an ephemeral file?
///
/// Returns `true` when `filename` is `ephemeral-` followed by a number that fits
/// into a `u64`. `u64` is used because `EphemeralFile::create` takes the numeric
/// suffix from a 64-bit counter; parsing a narrower type would reject names that
/// `create` can produce.
pub fn is_ephemeral_file(filename: &str) -> bool {
    filename
        .strip_prefix("ephemeral-")
        .map_or(false, |rest| rest.parse::<u64>().is_ok())
}
242 :
243 : impl Drop for EphemeralFile {
244 4539 : fn drop(&mut self) {
245 4539 : // There might still be pages in the [`crate::page_cache`] for this file.
246 4539 : // We leave them there, [`crate::page_cache::PageCache::find_victim`] will evict them when needed.
247 4539 :
248 4539 : // unlink the file
249 4539 : let res = std::fs::remove_file(&self.file.path);
250 4539 : if let Err(e) = res {
251 66 : if e.kind() != std::io::ErrorKind::NotFound {
252 : // just never log the not found errors, we cannot do anything for them; on detach
253 : // the tenant directory is already gone.
254 : //
255 : // not found files might also be related to https://github.com/neondatabase/neon/issues/2442
256 UBC 0 : error!(
257 0 : "could not remove ephemeral file '{}': {}",
258 0 : self.file.path, e
259 0 : );
260 CBC 66 : }
261 4473 : }
262 4539 : }
263 : }
264 :
265 : impl BlockReader for EphemeralFile {
266 4672869 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
267 4672869 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
268 4672869 : }
269 : }
270 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Everything a test needs to create an ephemeral file.
    type Harness = (
        &'static PageServerConf,
        TenantShardId,
        TimelineId,
        RequestContext,
    );

    /// Sets up a fresh repo directory for `test_name`, including the timeline
    /// directory in which ephemeral files get created.
    fn harness(test_name: &str) -> Result<Harness, io::Error> {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Leftovers from an earlier run may or may not exist; either way is fine.
        let _ = fs::remove_dir_all(&repo_dir);

        // Make a static copy of the config. This can never be free'd, but that's
        // OK in a test.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    /// Writes blobs of various sizes and checks that each reads back unchanged.
    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;

        // A blob is readable right after it was written ...
        let pos_foo = file.write_blob(b"foo", &ctx).await?;
        let first_read = file.block_cursor().read_blob(pos_foo, &ctx).await?;
        assert_eq!(b"foo", first_read.as_slice());

        // ... and stays readable after another blob was appended.
        let pos_bar = file.write_blob(b"bar", &ctx).await?;
        let foo_again = file.block_cursor().read_blob(pos_foo, &ctx).await?;
        assert_eq!(b"foo", foo_again.as_slice());
        let bar = file.block_cursor().read_blob(pos_bar, &ctx).await?;
        assert_eq!(b"bar", bar.as_slice());

        // Many small blobs, followed by some larger ones.
        let small_blobs = (0..10000).map(|i| format!("blob{}", i).into_bytes());
        let large_blobs = (0..100).map(|i| format!("blob{}", i).as_bytes().repeat(100));

        let mut blobs = Vec::new();
        for data in small_blobs.chain(large_blobs) {
            let pos = file.write_blob(&data, &ctx).await?;
            blobs.push((pos, data));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (pos, expected) in blobs {
            let actual = cursor.read_blob(pos, &ctx).await?;
            assert_eq!(actual, expected);
        }

        // Test a large blob that spans multiple pages
        let mut large_data = vec![0; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let pos_large = file.write_blob(&large_data, &ctx).await?;
        let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
        assert_eq!(result, large_data);

        Ok(())
    }
}
|