Line data Source code
1 : use std::sync::Arc;
2 :
3 : use ::metrics::IntGauge;
4 : use bytes::{Buf, BufMut, Bytes};
5 : use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE};
6 : use tracing::warn;
7 :
8 : // BEGIN Copyright (c) 2017 Servo Contributors
9 :
/// Computes the 128-bit FNV-1a hash of `bytes`; callable in `const` contexts.
///
/// Every input byte is XOR-ed into the running state, which is then multiplied
/// (with wrap-around) by the FNV prime. An empty input yields the offset basis.
#[inline]
#[must_use]
pub const fn fnv_hash(bytes: &[u8]) -> u128 {
    const OFFSET_BASIS: u128 = 0x6c62272e07bb014262b821756295c58d;
    const FNV_PRIME: u128 = 0x0000000001000000000000000000013B;

    let mut state = OFFSET_BASIS;
    let mut remaining = bytes;
    // `for` loops are not available in `const fn`, so peel one byte at a time
    // off the front of the slice instead.
    while let [byte, tail @ ..] = remaining {
        state = (state ^ *byte as u128).wrapping_mul(FNV_PRIME);
        remaining = tail;
    }
    state
}
26 :
27 : // END Copyright (c) 2017 Servo Contributors
28 :
29 : /// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, least significant 13B of FNV hash].
30 120 : fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key {
31 120 : let mut key: [u8; 16] = [0; METADATA_KEY_SIZE];
32 120 : let hash = fnv_hash(data).to_be_bytes();
33 120 : key[0] = AUX_KEY_PREFIX;
34 120 : key[1] = dir_level1;
35 120 : key[2] = dir_level2;
36 120 : key[3..16].copy_from_slice(&hash[3..16]);
37 120 : Key::from_metadata_key_fixed_size(&key)
38 120 : }
39 :
40 : const AUX_DIR_PG_LOGICAL: u8 = 0x01;
41 : const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
42 : const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
43 :
44 : /// Encode the aux file into a fixed-size key.
45 : ///
46 : /// The first byte is the AUX key prefix. We use the next 2 bytes of the key for the directory / aux file type.
47 : /// We have one-to-one mapping for each of the aux file that we support. We hash the remaining part of the path
48 : /// (usually a single file name, or several components) into 13-byte hash. The way we determine the 2-byte prefix
49 : /// is roughly based on the first two components of the path, one unique number for one component.
50 : ///
51 : /// * pg_logical/mappings -> 0x0101
52 : /// * pg_logical/snapshots -> 0x0102
53 : /// * pg_logical/replorigin_checkpoint -> 0x0103
54 : /// * pg_logical/others -> 0x01FF
55 : /// * pg_replslot/ -> 0x0201
56 : /// * others -> 0xFFFF
57 : ///
58 : /// If you add new AUX files to this function, please also add a test case to `test_encoding_portable`.
59 : /// The new file type must have never been written to the storage before. Otherwise, there could be data
60 : /// corruptions as the new file belongs to a new prefix but it might have been stored under the `others` prefix.
61 120 : pub fn encode_aux_file_key(path: &str) -> Key {
62 120 : if let Some(fname) = path.strip_prefix("pg_logical/mappings/") {
63 49 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x01, fname.as_bytes())
64 71 : } else if let Some(fname) = path.strip_prefix("pg_logical/snapshots/") {
65 7 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x02, fname.as_bytes())
66 64 : } else if path == "pg_logical/replorigin_checkpoint" {
67 7 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x03, b"")
68 57 : } else if let Some(fname) = path.strip_prefix("pg_logical/") {
69 7 : if cfg!(debug_assertions) {
70 7 : warn!(
71 0 : "unsupported pg_logical aux file type: {}, putting to 0x01FF, would affect path scanning",
72 : path
73 : );
74 0 : }
75 7 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
76 50 : } else if let Some(fname) = path.strip_prefix("pg_replslot/") {
77 7 : aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
78 : } else {
79 43 : if cfg!(debug_assertions) {
80 43 : warn!(
81 0 : "unsupported aux file type: {}, putting to 0xFFFF, would affect path scanning",
82 : path
83 : );
84 0 : }
85 43 : aux_hash_to_metadata_key(AUX_DIR_PG_UNKNOWN, 0xFF, path.as_bytes())
86 : }
87 120 : }
88 :
89 : const AUX_FILE_ENCODING_VERSION: u8 = 0x01;
90 :
91 30 : pub fn decode_file_value(val: &[u8]) -> anyhow::Result<Vec<(&str, &[u8])>> {
92 30 : let mut ptr = val;
93 30 : if ptr.is_empty() {
94 : // empty value = no files
95 6 : return Ok(Vec::new());
96 24 : }
97 24 : assert_eq!(
98 24 : ptr.get_u8(),
99 : AUX_FILE_ENCODING_VERSION,
100 0 : "unsupported aux file value"
101 : );
102 24 : let mut files = vec![];
103 54 : while ptr.has_remaining() {
104 30 : let key_len = ptr.get_u32() as usize;
105 30 : let key = &ptr[..key_len];
106 30 : ptr.advance(key_len);
107 30 : let val_len = ptr.get_u32() as usize;
108 30 : let content = &ptr[..val_len];
109 30 : ptr.advance(val_len);
110 :
111 30 : let path = std::str::from_utf8(key)?;
112 30 : files.push((path, content));
113 : }
114 24 : Ok(files)
115 30 : }
116 :
117 : /// Decode an aux file key-value pair into a list of files. The returned `Bytes` contains reference
118 : /// to the original value slice. Be cautious about memory consumption.
119 108 : pub fn decode_file_value_bytes(val: &Bytes) -> anyhow::Result<Vec<(String, Bytes)>> {
120 108 : let mut ptr = val.clone();
121 108 : if ptr.is_empty() {
122 : // empty value = no files
123 6 : return Ok(Vec::new());
124 102 : }
125 102 : assert_eq!(
126 102 : ptr.get_u8(),
127 : AUX_FILE_ENCODING_VERSION,
128 0 : "unsupported aux file value"
129 : );
130 102 : let mut files = vec![];
131 204 : while ptr.has_remaining() {
132 102 : let key_len = ptr.get_u32() as usize;
133 102 : let key = ptr.slice(..key_len);
134 102 : ptr.advance(key_len);
135 102 : let val_len = ptr.get_u32() as usize;
136 102 : let content = ptr.slice(..val_len);
137 102 : ptr.advance(val_len);
138 :
139 102 : let path = std::str::from_utf8(&key)?.to_string();
140 102 : files.push((path, content));
141 : }
142 102 : Ok(files)
143 108 : }
144 :
145 90 : pub fn encode_file_value(files: &[(&str, &[u8])]) -> anyhow::Result<Vec<u8>> {
146 90 : if files.is_empty() {
147 : // no files = empty value
148 12 : return Ok(Vec::new());
149 78 : }
150 78 : let mut encoded = vec![];
151 78 : encoded.put_u8(AUX_FILE_ENCODING_VERSION);
152 162 : for (path, content) in files {
153 84 : if path.len() > u32::MAX as usize {
154 0 : anyhow::bail!("{} exceeds path size limit", path);
155 84 : }
156 84 : encoded.put_u32(path.len() as u32);
157 84 : encoded.put_slice(path.as_bytes());
158 84 : if content.len() > u32::MAX as usize {
159 0 : anyhow::bail!("{} exceeds content size limit", path);
160 84 : }
161 84 : encoded.put_u32(content.len() as u32);
162 84 : encoded.put_slice(content);
163 : }
164 78 : Ok(encoded)
165 90 : }
166 :
167 : /// An estimation of the size of aux files.
168 : pub struct AuxFileSizeEstimator {
169 : aux_file_size_gauge: IntGauge,
170 : size: Arc<std::sync::Mutex<Option<isize>>>,
171 : }
172 :
173 : impl AuxFileSizeEstimator {
174 1242 : pub fn new(aux_file_size_gauge: IntGauge) -> Self {
175 1242 : Self {
176 1242 : aux_file_size_gauge,
177 1242 : size: Arc::new(std::sync::Mutex::new(None)),
178 1242 : }
179 1242 : }
180 :
181 : /// When generating base backup or doing initial logical size calculation
182 72 : pub fn on_initial(&self, new_size: usize) {
183 72 : let mut guard = self.size.lock().unwrap();
184 72 : *guard = Some(new_size as isize);
185 72 : self.report(new_size as isize);
186 72 : }
187 :
188 60 : pub fn on_add(&self, file_size: usize) {
189 60 : let mut guard = self.size.lock().unwrap();
190 60 : if let Some(size) = &mut *guard {
191 24 : *size += file_size as isize;
192 24 : self.report(*size);
193 36 : }
194 60 : }
195 :
196 6 : pub fn on_remove(&self, file_size: usize) {
197 6 : let mut guard = self.size.lock().unwrap();
198 6 : if let Some(size) = &mut *guard {
199 6 : *size -= file_size as isize;
200 6 : self.report(*size);
201 6 : }
202 6 : }
203 :
204 12 : pub fn on_update(&self, old_size: usize, new_size: usize) {
205 12 : let mut guard = self.size.lock().unwrap();
206 12 : if let Some(size) = &mut *guard {
207 12 : *size += new_size as isize - old_size as isize;
208 12 : self.report(*size);
209 12 : }
210 12 : }
211 :
212 114 : pub fn report(&self, size: isize) {
213 114 : self.aux_file_size_gauge.set(size as i64);
214 114 : }
215 : }
216 :
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_hash_portable() {
        // AUX file encoding requires the hash to be portable across all platforms. These
        // fixed vectors check that the algorithm produces the same hash in every environment.
        let vectors: [(u128, &str); 3] = [
            (265160408618497461376862998434862070044, "test1"),
            (295486155126299629456360817749600553988, "test/test2"),
            (144066263297769815596495629667062367629, ""),
        ];

        for (expected, input) in vectors.iter() {
            assert_eq!(*expected, fnv_hash(input.as_bytes()));
        }
    }

    #[test]
    fn test_encoding_portable() {
        // To correctly retrieve AUX files, the key generated for a given file must be the same
        // in every version of the page server.
        let vectors: [(&str, &str); 6] = [
            (
                "62000001017F8B83D94F7081693471ABF91C",
                "pg_logical/mappings/test1",
            ),
            (
                "62000001027F8E83D94F7081693471ABFCCD",
                "pg_logical/snapshots/test2",
            ),
            (
                "62000001032E07BB014262B821756295C58D",
                "pg_logical/replorigin_checkpoint",
            ),
            (
                "62000001FF4F38E1C74754E7D03C1A660178",
                "pg_logical/unsupported",
            ),
            ("62000002017F8D83D94F7081693471ABFB92", "pg_replslot/test3"),
            (
                "620000FFFF2B6ECC8AEF93F643DC44F15E03",
                "other_file_not_supported",
            ),
        ];

        for (expected, path) in vectors.iter() {
            assert_eq!(*expected, encode_aux_file_key(path).to_string());
        }
    }

    #[test]
    fn test_value_encoding() {
        // Round-trip both a populated list and the empty list through encode + decode.
        let cases: [Vec<(&str, &[u8])>; 2] = [
            vec![
                ("pg_logical/1.file", "1111".as_bytes()),
                ("pg_logical/2.file", "2222".as_bytes()),
            ],
            vec![],
        ];

        for files in cases.iter() {
            let encoded = encode_file_value(files).unwrap();
            let decoded = decode_file_value(&encoded).unwrap();
            assert_eq!(files, &decoded);
        }
    }
}
|