Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache;
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use camino::Utf8PathBuf;
10 : use pageserver_api::shard::TenantShardId;
11 :
12 : use std::io;
13 : use std::sync::atomic::AtomicU64;
14 : use utils::id::TimelineId;
15 :
16 : pub struct EphemeralFile {
17 : _tenant_shard_id: TenantShardId,
18 : _timeline_id: TimelineId,
19 :
20 : rw: page_caching::RW,
21 : }
22 :
23 : mod page_caching;
24 : pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
25 : mod zero_padded_read_write;
26 :
27 : impl EphemeralFile {
28 1246 : pub async fn create(
29 1246 : conf: &PageServerConf,
30 1246 : tenant_shard_id: TenantShardId,
31 1246 : timeline_id: TimelineId,
32 1246 : ctx: &RequestContext,
33 1246 : ) -> Result<EphemeralFile, io::Error> {
34 1246 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
35 1246 : let filename_disambiguator =
36 1246 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
37 1246 :
38 1246 : let filename = conf
39 1246 : .timeline_path(&tenant_shard_id, &timeline_id)
40 1246 : .join(Utf8PathBuf::from(format!(
41 1246 : "ephemeral-{filename_disambiguator}"
42 1246 : )));
43 :
44 1246 : let file = VirtualFile::open_with_options(
45 1246 : &filename,
46 1246 : virtual_file::OpenOptions::new()
47 1246 : .read(true)
48 1246 : .write(true)
49 1246 : .create(true),
50 1246 : ctx,
51 1246 : )
52 694 : .await?;
53 :
54 1246 : Ok(EphemeralFile {
55 1246 : _tenant_shard_id: tenant_shard_id,
56 1246 : _timeline_id: timeline_id,
57 1246 : rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()),
58 1246 : })
59 1246 : }
60 :
61 5091694 : pub(crate) fn len(&self) -> u64 {
62 5091694 : self.rw.bytes_written()
63 5091694 : }
64 :
65 1244 : pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
66 1244 : self.rw.page_cache_file_id()
67 1244 : }
68 :
69 : /// See [`self::page_caching::RW::load_to_vec`].
70 0 : pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
71 0 : self.rw.load_to_vec(ctx).await
72 0 : }
73 :
74 4955837 : pub(crate) async fn read_blk(
75 4955837 : &self,
76 4955837 : blknum: u32,
77 4955837 : ctx: &RequestContext,
78 4955837 : ) -> Result<BlockLease, io::Error> {
79 4955837 : self.rw.read_blk(blknum, ctx).await
80 4955837 : }
81 :
82 5110656 : pub(crate) async fn write_blob(
83 5110656 : &mut self,
84 5110656 : srcbuf: &[u8],
85 5110656 : ctx: &RequestContext,
86 5110656 : ) -> Result<u64, io::Error> {
87 5110656 : let pos = self.rw.bytes_written();
88 5110656 :
89 5110656 : // Write the length field
90 5110656 : if srcbuf.len() < 0x80 {
91 : // short one-byte length header
92 4956838 : let len_buf = [srcbuf.len() as u8];
93 4956838 :
94 4956838 : self.rw.write_all_borrowed(&len_buf, ctx).await?;
95 : } else {
96 153818 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
97 153818 : len_buf[0] |= 0x80;
98 153818 : self.rw.write_all_borrowed(&len_buf, ctx).await?;
99 : }
100 :
101 : // Write the payload
102 5110656 : self.rw.write_all_borrowed(srcbuf, ctx).await?;
103 :
104 5110656 : Ok(pos)
105 5110656 : }
106 : }
107 :
/// Does the given filename look like an ephemeral file?
///
/// A name matches when it is the literal prefix `ephemeral-` followed by
/// something that parses as a `u64`. The suffix is parsed as `u64` because
/// the disambiguator embedded in the name at creation time is a 64-bit counter.
pub fn is_ephemeral_file(filename: &str) -> bool {
    filename
        .strip_prefix("ephemeral-")
        .map_or(false, |suffix| suffix.parse::<u64>().is_ok())
}
116 :
117 : impl BlockReader for EphemeralFile {
118 607250 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
119 607250 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
120 607250 : }
121 : }
122 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::BlockReaderRef;
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Everything a test needs to create ephemeral files.
    type Harness = (
        &'static PageServerConf,
        TenantShardId,
        TimelineId,
        RequestContext,
    );

    /// Prepares a fresh repo directory for `test_name` with the timeline
    /// directory already created, and returns the pieces needed to call
    /// [`EphemeralFile::create`].
    fn harness(test_name: &str) -> Result<Harness, io::Error> {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Best effort: the directory may not exist from a previous run.
        let _ = fs::remove_dir_all(&repo_dir);

        // Make a static copy of the config. This can never be freed, but that's
        // OK in a test.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        let timeline_dir = conf.timeline_path(&tenant_shard_id, &timeline_id);
        fs::create_dir_all(timeline_dir)?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);

        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    /// Writes blobs of various sizes and checks that each one reads back
    /// unchanged from the position returned by `write_blob`.
    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;

        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;

        // A single blob reads back right after being written.
        let pos_foo = file.write_blob(b"foo", &ctx).await?;
        let foo = file.block_cursor().read_blob(pos_foo, &ctx).await?;
        assert_eq!(b"foo", foo.as_slice());

        // Appending a second blob leaves the first one readable.
        let pos_bar = file.write_blob(b"bar", &ctx).await?;
        let foo_again = file.block_cursor().read_blob(pos_foo, &ctx).await?;
        assert_eq!(b"foo", foo_again.as_slice());
        let bar = file.block_cursor().read_blob(pos_bar, &ctx).await?;
        assert_eq!(b"bar", bar.as_slice());

        // Many short blobs, followed by some larger blobs.
        let short_payloads = (0..10000).map(|i| format!("blob{i}").into_bytes());
        let long_payloads = (0..100).map(|i| format!("blob{i}").as_bytes().repeat(100));

        let mut written = Vec::new();
        for payload in short_payloads.chain(long_payloads) {
            let pos = file.write_blob(&payload, &ctx).await?;
            written.push((pos, payload));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (pos, expected) in written {
            let actual = cursor.read_blob(pos, &ctx).await?;
            assert_eq!(actual, expected);
        }

        // Test a large blob that spans multiple pages
        let mut large_data = vec![0; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let pos_large = file.write_blob(&large_data, &ctx).await?;
        let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
        assert_eq!(result, large_data);

        Ok(())
    }
}
|