Line data Source code
1 : //! Implementation of append-only file data structure
2 : //! used to keep in-memory layers spilled on disk.
3 :
4 : use crate::config::PageServerConf;
5 : use crate::context::RequestContext;
6 : use crate::page_cache;
7 : use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
8 : use crate::virtual_file::{self, VirtualFile};
9 : use camino::Utf8PathBuf;
10 : use pageserver_api::shard::TenantShardId;
11 :
12 : use std::io;
13 : use std::sync::atomic::AtomicU64;
14 : use utils::id::TimelineId;
15 :
16 : pub struct EphemeralFile {
17 : _tenant_shard_id: TenantShardId,
18 : _timeline_id: TimelineId,
19 :
20 : rw: page_caching::RW,
21 : }
22 :
23 : mod page_caching;
24 : mod zero_padded_read_write;
25 :
26 : impl EphemeralFile {
27 1241 : pub async fn create(
28 1241 : conf: &PageServerConf,
29 1241 : tenant_shard_id: TenantShardId,
30 1241 : timeline_id: TimelineId,
31 1241 : ctx: &RequestContext,
32 1241 : ) -> Result<EphemeralFile, io::Error> {
33 1241 : static NEXT_FILENAME: AtomicU64 = AtomicU64::new(1);
34 1241 : let filename_disambiguator =
35 1241 : NEXT_FILENAME.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
36 1241 :
37 1241 : let filename = conf
38 1241 : .timeline_path(&tenant_shard_id, &timeline_id)
39 1241 : .join(Utf8PathBuf::from(format!(
40 1241 : "ephemeral-{filename_disambiguator}"
41 1241 : )));
42 :
43 1241 : let file = VirtualFile::open_with_options(
44 1241 : &filename,
45 1241 : virtual_file::OpenOptions::new()
46 1241 : .read(true)
47 1241 : .write(true)
48 1241 : .create(true),
49 1241 : ctx,
50 1241 : )
51 690 : .await?;
52 :
53 1241 : Ok(EphemeralFile {
54 1241 : _tenant_shard_id: tenant_shard_id,
55 1241 : _timeline_id: timeline_id,
56 1241 : rw: page_caching::RW::new(file),
57 1241 : })
58 1241 : }
59 :
60 5091649 : pub(crate) fn len(&self) -> u64 {
61 5091649 : self.rw.bytes_written()
62 5091649 : }
63 :
64 1239 : pub(crate) fn page_cache_file_id(&self) -> page_cache::FileId {
65 1239 : self.rw.page_cache_file_id()
66 1239 : }
67 :
68 4955798 : pub(crate) async fn read_blk(
69 4955798 : &self,
70 4955798 : blknum: u32,
71 4955798 : ctx: &RequestContext,
72 4955798 : ) -> Result<BlockLease, io::Error> {
73 4955798 : self.rw.read_blk(blknum, ctx).await
74 4955798 : }
75 :
76 5110616 : pub(crate) async fn write_blob(
77 5110616 : &mut self,
78 5110616 : srcbuf: &[u8],
79 5110616 : ctx: &RequestContext,
80 5110616 : ) -> Result<u64, io::Error> {
81 5110616 : let pos = self.rw.bytes_written();
82 5110616 :
83 5110616 : // Write the length field
84 5110616 : if srcbuf.len() < 0x80 {
85 : // short one-byte length header
86 4956798 : let len_buf = [srcbuf.len() as u8];
87 4956798 :
88 4956798 : self.rw.write_all_borrowed(&len_buf, ctx).await?;
89 : } else {
90 153818 : let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
91 153818 : len_buf[0] |= 0x80;
92 153818 : self.rw.write_all_borrowed(&len_buf, ctx).await?;
93 : }
94 :
95 : // Write the payload
96 5110616 : self.rw.write_all_borrowed(srcbuf, ctx).await?;
97 :
98 5110616 : Ok(pos)
99 5110616 : }
100 : }
101 :
/// Does the given filename look like an ephemeral file?
///
/// Ephemeral files are named `ephemeral-<N>`, where `<N>` is a `u64` counter value assigned
/// by `EphemeralFile::create`. The suffix is therefore parsed as `u64`; parsing it as `u32`
/// would miss files whose counter value exceeds `u32::MAX`.
pub fn is_ephemeral_file(filename: &str) -> bool {
    match filename.strip_prefix("ephemeral-") {
        Some(rest) => rest.parse::<u64>().is_ok(),
        None => false,
    }
}
110 :
111 : impl BlockReader for EphemeralFile {
112 607179 : fn block_cursor(&self) -> super::block_io::BlockCursor<'_> {
113 607179 : BlockCursor::new(super::block_io::BlockReaderRef::EphemeralFile(self))
114 607179 : }
115 : }
116 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::DownloadBehavior;
    use crate::task_mgr::TaskKind;
    use crate::tenant::block_io::BlockReaderRef;
    use rand::{thread_rng, RngCore};
    use std::fs;
    use std::str::FromStr;

    /// Everything a test needs to create an ephemeral file: config, ids, and a context.
    type Harness = (
        &'static PageServerConf,
        TenantShardId,
        TimelineId,
        RequestContext,
    );

    /// Prepares a fresh per-test repo directory containing an empty timeline directory.
    fn harness(test_name: &str) -> Result<Harness, io::Error> {
        let repo_dir = PageServerConf::test_repo_dir(test_name);
        // Clear leftovers from a previous run; a missing directory is not an error here.
        let _ = fs::remove_dir_all(&repo_dir);
        // Leaked on purpose to obtain a `'static` borrow; never freed, which is fine in tests.
        let conf: &'static PageServerConf =
            Box::leak(Box::new(PageServerConf::dummy_conf(repo_dir)));

        let tenant_shard_id = TenantShardId::from_str("11000000000000000000000000000000").unwrap();
        let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
        fs::create_dir_all(conf.timeline_path(&tenant_shard_id, &timeline_id))?;

        let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
        Ok((conf, tenant_shard_id, timeline_id, ctx))
    }

    #[tokio::test]
    async fn test_ephemeral_blobs() -> Result<(), io::Error> {
        let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
        let mut file = EphemeralFile::create(conf, tenant_id, timeline_id, &ctx).await?;

        // Interleave writes and reads: earlier blobs must stay readable after later appends.
        let foo_offset = file.write_blob(b"foo", &ctx).await?;
        let foo_read = file.block_cursor().read_blob(foo_offset, &ctx).await?;
        assert_eq!(b"foo", foo_read.as_slice());

        let bar_offset = file.write_blob(b"bar", &ctx).await?;
        let foo_reread = file.block_cursor().read_blob(foo_offset, &ctx).await?;
        assert_eq!(b"foo", foo_reread.as_slice());
        let bar_read = file.block_cursor().read_blob(bar_offset, &ctx).await?;
        assert_eq!(b"bar", bar_read.as_slice());

        // Many short blobs (one-byte length header) followed by longer ones (four-byte
        // header), all written first and verified afterwards.
        let short_payloads = (0..10000).map(|i| format!("blob{i}").into_bytes());
        let long_payloads = (0..100).map(|i| format!("blob{i}").repeat(100).into_bytes());
        let mut written = Vec::new();
        for payload in short_payloads.chain(long_payloads) {
            let offset = file.write_blob(&payload, &ctx).await?;
            written.push((offset, payload));
        }

        let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
        for (offset, payload) in written {
            let read_back = cursor.read_blob(offset, &ctx).await?;
            assert_eq!(read_back, payload);
        }

        // A random blob that spans multiple pages must round-trip intact.
        let mut large_data = vec![0; 20000];
        thread_rng().fill_bytes(&mut large_data);
        let large_offset = file.write_blob(&large_data, &ctx).await?;
        let large_read = file.block_cursor().read_blob(large_offset, &ctx).await?;
        assert_eq!(large_read, large_data);

        Ok(())
    }
}
|