Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache;
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use camino::Utf8PathBuf;
10 : use pageserver_api::shard::TenantShardId;
11 :
12 : use std::io;
13 : use std::sync::atomic::AtomicU64;
14 : use utils::id::TimelineId;
15 :
/// An append-only file in a timeline directory, used to keep in-memory layer data
/// spilled on disk.
pub struct EphemeralFile {
    // The tenant shard / timeline the file belongs to. These are only stored; none of the
    // code in this file reads them (hence the `_` prefix).
    _tenant_shard_id: TenantShardId,
    _timeline_id: TimelineId,

    // Read/write access to the underlying file (see `page_caching::RW`). It also owns the
    // gate guard handed to `EphemeralFile::create`.
    rw: page_caching::RW,
}
22 :
23 : mod page_caching;
24 : mod zero_padded_read_write;
25 :
26 : impl EphemeralFile {
27 1268 : pub async fn create(
28 1268 : conf: &PageServerConf,
29 1268 : tenant_shard_id: TenantShardId,
30 1268 : timeline_id: TimelineId,
31 1268 : gate_guard: utils::sync::gate::GateGuard,
32 1268 : ctx: &RequestContext,
33 1268 : ) -> Result<EphemeralFile, io::Error> {
34 1268 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
35 1268 : let filename_disambiguator =
36 1268 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
37 1268 :
38 1268 : let filename = conf
39 1268 : .timeline_path(&tenant_shard_id, &timeline_id)
40 1268 : .join(Utf8PathBuf::from(format!(
41 1268 : "ephemeral-{filename_disambiguator}"
42 1268 : )));
43 :
44 1268 : let file = VirtualFile::open_with_options(
45 1268 : &filename,
46 1268 : virtual_file::OpenOptions::new()
47 1268 : .read(true)
48 1268 : .write(true)
49 1268 : .create(true),
50 1268 : ctx,
51 1268 : )
52 716 : .await?;
53 :
54 1268 : Ok(EphemeralFile {
55 1268 : _tenant_shard_id: tenant_shard_id,
56 1268 : _timeline_id: timeline_id,
57 1268 : rw: page_caching::RW::new(file, gate_guard),
58 1268 : })
59 1268 : }
60 :
61 5092842 : pub(crate) fn len(&self) -> u64 {
62 5092842 : self.rw.bytes_written()
63 5092842 : }
64 :
65 1264 : pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
66 1264 : self.rw.page_cache_file_id()
67 1264 : }
68 :
69 : /// See [`self::page_caching::RW::load_to_vec`].
70 968 : pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
71 968 : self.rw.load_to_vec(ctx).await
72 968 : }
73 :
74 524745 : pub(crate) async fn read_blk(
75 524745 : &self,
76 524745 : blknum: u32,
77 524745 : ctx: &RequestContext,
78 524745 : ) -> Result<BlockLease, io::Error> {
79 524745 : self.rw.read_blk(blknum, ctx).await
80 524745 : }
81 :
82 5110816 : pub(crate) async fn write_blob(
83 5110816 : &mut self,
84 5110816 : srcbuf: &[u8],
85 5110816 : ctx: &RequestContext,
86 5110816 : ) -> Result<u64, io::Error> {
87 5110816 : let pos = self.rw.bytes_written();
88 5110816 :
89 5110816 : // Write the length field
90 5110816 : if srcbuf.len() < 0x80 {
91 : // short one-byte length header
92 4956998 : let len_buf = [srcbuf.len() as u8];
93 4956998 :
94 4956998 : self.rw.write_all_borrowed(&len_buf, ctx).await?;
95 : } else {
96 153818 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
97 153818 : len_buf[0] |= 0x80;
98 153818 : self.rw.write_all_borrowed(&len_buf, ctx).await?;
99 : }
100 :
101 : // Write the payload
102 5110816 : self.rw.write_all_borrowed(srcbuf, ctx).await?;
103 :
104 5110816 : Ok(pos)
105 5110816 : }
106 : }
107 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-<N>`, where `N` comes from a `u64` counter. The
/// suffix is therefore parsed as `u64`: parsing it as `u32` would fail to recognise files
/// created after the counter passed `u32::MAX`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    match filename.strip_prefix("ephemeral-") {
        Some(counter) => counter.parse::<u64>().is_ok(),
        None => false,
    }
}
116 :
117 : impl BlockReader for EphemeralFile {
118 606151 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
119 606151 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
120 606151 : }
121 : }
122 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::BlockReaderRef;
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Prepares a fresh test repository named after `test_name`.
    ///
    /// Returns:
    /// - a leaked (`'static`) config;
    /// - fixed tenant shard and timeline ids, whose timeline directory has been created;
    /// - a unit-test request context.
    fn harness(
        test_name: &str,
    ) -> Result<
        (
            &'static PageServerConf,
            TenantShardId,
            TimelineId,
            RequestContext,
        ),
        io::Error,
    > {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Remove leftovers from a previous run; the result is ignored (e.g. when the
        // directory does not exist yet).
        let _ = fs::remove_dir_all(&repo_dir);
        let conf = PageServerConf::dummy_conf(repo_dir);
        // Make a static copy of the config. This can never be free'd, but that's
        // OK in a test.
        let conf: &'static PageServerConf = Box::leak(Box::new(conf));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        // `EphemeralFile::create` places its file in the timeline directory, so it must exist.
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    /// Round-trips blobs of several sizes through `write_blob` / `read_blob`.
    ///
    /// Also checks that earlier blobs stay readable after later writes.
    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;

        let gate = utils::sync::gate::Gate::default();

        let entered = gate.enter().unwrap();

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, entered, &ctx).await?;

        let pos_foo = file.write_blob(b"foo", &ctx).await?;
        assert_eq!(
            b"foo",
            file.block_cursor()
                .read_blob(pos_foo, &ctx)
                .await?
                .as_slice()
        );
        // Writing a second blob must not disturb the first one.
        let pos_bar = file.write_blob(b"bar", &ctx).await?;
        assert_eq!(
            b"foo",
            file.block_cursor()
                .read_blob(pos_foo, &ctx)
                .await?
                .as_slice()
        );
        assert_eq!(
            b"bar",
            file.block_cursor()
                .read_blob(pos_bar, &ctx)
                .await?
                .as_slice()
        );

        // Many small blobs (all shorter than 0x80 bytes, so they use the one-byte header).
        let mut blobs = Vec::new();
        for i in 0..10000 {
            let data = Vec::from(format!("blob{}", i).as_bytes());
            let pos = file.write_blob(&data, &ctx).await?;
            blobs.push((pos, data));
        }
        // Also test larger blobs (500-600 bytes each), which use the four-byte length header.
        for i in 0..100 {
            let data = format!("blob{}", i).as_bytes().repeat(100);
            let pos = file.write_blob(&data, &ctx).await?;
            blobs.push((pos, data));
        }

        // Read everything back through a single cursor.
        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (pos, expected) in blobs {
            let actual = cursor.read_blob(pos, &ctx).await?;
            assert_eq!(actual, expected);
        }

        // Test a large blob that spans multiple pages
        let mut large_data = vec![0; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let pos_large = file.write_blob(&large_data, &ctx).await?;
        let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
        assert_eq!(result, large_data);

        Ok(())
    }

    /// Checks that an `EphemeralFile` keeps its gate entered until it is dropped.
    #[tokio::test]
    async fn ephemeral_file_holds_gate_open() {
        const FOREVER: std::time::Duration = std::time::Duration::from_secs(5);

        let (conf, tenant_id, timeline_id, ctx) =
            harness("ephemeral_file_holds_gate_open").unwrap();

        let gate = utils::sync::gate::Gate::default();

        let file = EphemeralFile::create(conf, tenant_id, timeline_id, gate.enter().unwrap(), &ctx)
            .await
            .unwrap();

        // Start closing the gate in the background. This cannot finish while `file` holds
        // the guard.
        let mut closing = tokio::task::spawn(async move {
            gate.close().await;
        });

        // The gate stays entered until the ephemeral file is dropped.
        // Time is paused only here, not from the start of the test, because
        // tokio-epoll-uring has a sleep loop. With time paused, tokio auto-advances the clock
        // while the runtime is idle, so `FOREVER` does not cost real wall-clock time.
        tokio::time::pause();
        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect_err("closing cannot complete before dropping");

        // This is a requirement of the reset_tenant functionality. We have to be able to
        // restart a tenant fast, and for that, all tenant_dir operations must be guarded by
        // entering a gate.
        drop(file);

        tokio::time::timeout(FOREVER, &mut closing)
            .await
            .expect("closing completes right away")
            .expect("closing does not panic");
    }
}
|