Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache;
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use camino::Utf8PathBuf;
10 : use pageserver_api::shard::TenantShardId;
11 :
12 : use std::io;
13 : use std::sync::atomic::AtomicU64;
14 : use utils::id::TimelineId;
15 :
16 : pub struct EphemeralFile {
17 : _tenant_shard_id: TenantShardId,
18 : _timeline_id: TimelineId,
19 :
20 : rw: page_caching::RW,
21 : }
22 :
23 : mod page_caching;
24 : mod zero_padded_read_write;
25 :
26 : impl EphemeralFile {
27 1052 : pub async fn create(
28 1052 : conf: &PageServerConf,
29 1052 : tenant_shard_id: TenantShardId,
30 1052 : timeline_id: TimelineId,
31 1052 : ) -> Result<EphemeralFile, io::Error> {
32 1052 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
33 1052 : let filename_disambiguator =
34 1052 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
35 1052 :
36 1052 : let filename = conf
37 1052 : .timeline_path(&tenant_shard_id, &timeline_id)
38 1052 : .join(Utf8PathBuf::from(format!(
39 1052 : "ephemeral-{filename_disambiguator}"
40 1052 : )));
41 :
42 1052 : let file = VirtualFile::open_with_options(
43 1052 : &filename,
44 1052 : virtual_file::OpenOptions::new()
45 1052 : .read(true)
46 1052 : .write(true)
47 1052 : .create(true),
48 1052 : )
49 579 : .await?;
50 :
51 1052 : Ok(EphemeralFile {
52 1052 : _tenant_shard_id: tenant_shard_id,
53 1052 : _timeline_id: timeline_id,
54 1052 : rw: page_caching::RW::new(file),
55 1052 : })
56 1052 : }
57 :
58 9846066 : pub(crate) fn len(&self) -> u64 {
59 9846066 : self.rw.bytes_written()
60 9846066 : }
61 :
62 1050 : pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
63 1050 : self.rw.page_cache_file_id()
64 1050 : }
65 :
66 4930954 : pub(crate) async fn read_blk(
67 4930954 : &self,
68 4930954 : blknum: u32,
69 4930954 : ctx: &RequestContext,
70 4930954 : ) -> Result<BlockLease, io::Error> {
71 4930954 : self.rw.read_blk(blknum, ctx).await
72 4930954 : }
73 :
74 5086244 : pub(crate) async fn write_blob(
75 5086244 : &mut self,
76 5086244 : srcbuf: &[u8],
77 5086244 : ctx: &RequestContext,
78 5086244 : ) -> Result<u64, io::Error> {
79 5086244 : let pos = self.rw.bytes_written();
80 5086244 :
81 5086244 : // Write the length field
82 5086244 : if srcbuf.len() < 0x80 {
83 : // short one-byte length header
84 4932426 : let len_buf = [srcbuf.len() as u8];
85 4932426 :
86 4932426 : self.rw.write_all_borrowed(&len_buf, ctx).await?;
87 : } else {
88 153818 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
89 153818 : len_buf[0] |= 0x80;
90 153818 : self.rw.write_all_borrowed(&len_buf, ctx).await?;
91 : }
92 :
93 : // Write the payload
94 5086244 : self.rw.write_all_borrowed(srcbuf, ctx).await?;
95 :
96 5086244 : Ok(pos)
97 5086244 : }
98 : }
99 :
100 : /// Does the given filename look like an ephemeral file?
101 0 : pub fn is_ephemeral_file(filename: &str) -> bool {
102 0 : if let Some(rest) = filename.strip_prefix("ephemeral-") {
103 0 : rest.parse::<u32>().is_ok()
104 : } else {
105 0 : false
106 : }
107 0 : }
108 :
109 : impl BlockReader for EphemeralFile {
110 606419 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
111 606419 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
112 606419 : }
113 : }
114 :
115 : #[cfg(test)]
116 : mod tests {
117 : use super::*;
118 : use crate::context::DownloadBehavior;
119 : use crate::task_mgr::TaskKind;
120 : use crate::tenant::block_io::BlockReaderRef;
121 : use rand::{thread_rng, RngCore};
122 : use std::fs;
123 : use std::str::FromStr;
124 :
125 2 : fn harness(
126 2 : test_name: &str,
127 2 : ) -> Result<
128 2 : (
129 2 : &'static PageServerConf,
130 2 : TenantShardId,
131 2 : TimelineId,
132 2 : RequestContext,
133 2 : ),
134 2 : io::Error,
135 2 : > {
136 2 : let repo_dir = PageServerConf::test_repo_dir(test_name);
137 2 : let _ = fs::remove_dir_all(&repo_dir);
138 2 : let conf = PageServerConf::dummy_conf(repo_dir);
139 2 : // Make a static copy of the config. This can never be free'd, but that's
140 2 : // OK in a test.
141 2 : let conf: &'static PageServerConf = Box::leak(Box::new(conf));
142 2 :
143 2 : let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
144 2 : let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
145 2 : fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;
146 :
147 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
148 2 :
149 2 : Ok((conf, tenant_shard_id, timeline_id, ctx))
150 2 : }
151 :
152 : #[tokio::test]
153 2 : async fn test_ephemeral_blobs() -> Result<(), io::Error> {
154 2 : let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
155 2 :
156 2 : let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
157 2 :
158 2 : let pos_foo = file.write_blob(b"foo", &ctx).await?;
159 2 : assert_eq!(
160 2 : b"foo",
161 2 : file.block_cursor()
162 2 : .read_blob(pos_foo, &ctx)
163 2 : .await?
164 2 : .as_slice()
165 2 : );
166 2 : let pos_bar = file.write_blob(b"bar", &ctx).await?;
167 2 : assert_eq!(
168 2 : b"foo",
169 2 : file.block_cursor()
170 2 : .read_blob(pos_foo, &ctx)
171 2 : .await?
172 2 : .as_slice()
173 2 : );
174 2 : assert_eq!(
175 2 : b"bar",
176 2 : file.block_cursor()
177 2 : .read_blob(pos_bar, &ctx)
178 2 : .await?
179 2 : .as_slice()
180 2 : );
181 2 :
182 2 : let mut blobs = Vec::new();
183 20002 : for i in 0..10000 {
184 20000 : let data = Vec::from(format!("blob{}", i).as_bytes());
185 20000 : let pos = file.write_blob(&data, &ctx).await?;
186 20000 : blobs.push((pos, data));
187 2 : }
188 2 : // also test with a large blobs
189 202 : for i in 0..100 {
190 200 : let data = format!("blob{}", i).as_bytes().repeat(100);
191 200 : let pos = file.write_blob(&data, &ctx).await?;
192 200 : blobs.push((pos, data));
193 2 : }
194 2 :
195 2 : let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
196 20202 : for (pos, expected) in blobs {
197 20200 : let actual = cursor.read_blob(pos, &ctx).await?;
198 20200 : assert_eq!(actual, expected);
199 2 : }
200 2 :
201 2 : // Test a large blob that spans multiple pages
202 2 : let mut large_data = vec![0; 20000];
203 2 : thread_rng().fill_bytes(&mut large_data);
204 2 : let pos_large = file.write_blob(&large_data, &ctx).await?;
205 2 : let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
206 2 : assert_eq!(result, large_data);
207 2 :
208 2 : Ok(())
209 2 : }
210 : }
|