Line data Source code
1 : use std::sync::Arc;
2 :
3 : use ::metrics::IntGauge;
4 : use bytes::{Buf, BufMut, Bytes};
5 : use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE};
6 : use tracing::warn;
7 :
8 : // BEGIN Copyright (c) 2017 Servo Contributors
9 :
/// Const version of FNV hash.
///
/// Folds every byte of `bytes` into a 128-bit state by xor-ing the byte in and
/// then multiplying by the FNV prime (wrapping on overflow). An empty slice
/// yields the initial state unchanged.
#[inline]
#[must_use]
pub const fn fnv_hash(bytes: &[u8]) -> u128 {
    const INITIAL_STATE: u128 = 0x6c62272e07bb014262b821756295c58d;
    const PRIME: u128 = 0x0000000001000000000000000000013B;

    // Iterator adaptors are not available in `const fn`, so peel the slice one
    // byte at a time with a slice pattern instead.
    let mut state = INITIAL_STATE;
    let mut remaining = bytes;
    while let [byte, tail @ ..] = remaining {
        state = (state ^ *byte as u128).wrapping_mul(PRIME);
        remaining = tail;
    }
    state
}
26 :
27 : // END Copyright (c) 2017 Servo Contributors
28 :
29 : /// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, least significant 13B of FNV hash].
30 62 : fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key {
31 62 : let mut key: [u8; 16] = [0; METADATA_KEY_SIZE];
32 62 : let hash = fnv_hash(data).to_be_bytes();
33 62 : key[0] = AUX_KEY_PREFIX;
34 62 : key[1] = dir_level1;
35 62 : key[2] = dir_level2;
36 62 : key[3..16].copy_from_slice(&hash[3..16]);
37 62 : Key::from_metadata_key_fixed_size(&key)
38 62 : }
39 :
/// First directory byte used by `encode_aux_file_key` for paths under `pg_logical/`.
const AUX_DIR_PG_LOGICAL: u8 = 0x01;
/// First directory byte used by `encode_aux_file_key` for paths under `pg_replslot/`.
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
/// First directory byte used by `encode_aux_file_key` for paths under `pg_stat/`.
const AUX_DIR_PG_STAT: u8 = 0x03;
/// First directory byte used by `encode_aux_file_key` for every path that matches no known prefix.
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
44 :
45 : /// Encode the aux file into a fixed-size key.
46 : ///
47 : /// The first byte is the AUX key prefix. We use the next 2 bytes of the key for the directory / aux file type.
48 : /// We have one-to-one mapping for each of the aux file that we support. We hash the remaining part of the path
49 : /// (usually a single file name, or several components) into 13-byte hash. The way we determine the 2-byte prefix
50 : /// is roughly based on the first two components of the path, one unique number for one component.
51 : ///
52 : /// * pg_logical/mappings -> 0x0101
53 : /// * pg_logical/snapshots -> 0x0102
54 : /// * pg_logical/replorigin_checkpoint -> 0x0103
55 : /// * pg_logical/others -> 0x01FF
56 : /// * pg_replslot/ -> 0x0201
57 : /// * pg_stat/pgstat.stat -> 0x0301
58 : /// * others -> 0xFFFF
59 : ///
60 : /// If you add new AUX files to this function, please also add a test case to `test_encoding_portable`.
61 : /// The new file type must have never been written to the storage before. Otherwise, there could be data
62 : /// corruptions as the new file belongs to a new prefix but it might have been stored under the `others` prefix.
63 62 : pub fn encode_aux_file_key(path: &str) -> Key {
64 62 : if let Some(fname) = path.strip_prefix("pg_logical/mappings/") {
65 13 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x01, fname.as_bytes())
66 49 : } else if let Some(fname) = path.strip_prefix("pg_logical/snapshots/") {
67 5 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x02, fname.as_bytes())
68 44 : } else if path == "pg_logical/replorigin_checkpoint" {
69 5 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x03, b"")
70 39 : } else if let Some(fname) = path.strip_prefix("pg_logical/") {
71 5 : if cfg!(debug_assertions) {
72 5 : warn!(
73 0 : "unsupported pg_logical aux file type: {}, putting to 0x01FF, would affect path scanning",
74 : path
75 : );
76 0 : }
77 5 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
78 34 : } else if let Some(fname) = path.strip_prefix("pg_replslot/") {
79 5 : aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
80 29 : } else if let Some(fname) = path.strip_prefix("pg_stat/") {
81 0 : aux_hash_to_metadata_key(AUX_DIR_PG_STAT, 0x01, fname.as_bytes())
82 : } else {
83 29 : if cfg!(debug_assertions) {
84 29 : warn!(
85 0 : "unsupported aux file type: {}, putting to 0xFFFF, would affect path scanning",
86 : path
87 : );
88 0 : }
89 29 : aux_hash_to_metadata_key(AUX_DIR_PG_UNKNOWN, 0xFF, path.as_bytes())
90 : }
91 62 : }
92 :
/// Version tag written as the first byte of every non-empty value produced by
/// `encode_file_value` and checked by the decode functions.
const AUX_FILE_ENCODING_VERSION: u8 = 0x01;
94 :
95 16 : pub fn decode_file_value(val: &[u8]) -> anyhow::Result<Vec<(&str, &[u8])>> {
96 16 : let mut ptr = val;
97 16 : if ptr.is_empty() {
98 : // empty value = no files
99 4 : return Ok(Vec::new());
100 12 : }
101 12 : assert_eq!(
102 12 : ptr.get_u8(),
103 : AUX_FILE_ENCODING_VERSION,
104 0 : "unsupported aux file value"
105 : );
106 12 : let mut files = vec![];
107 28 : while ptr.has_remaining() {
108 16 : let key_len = ptr.get_u32() as usize;
109 16 : let key = &ptr[..key_len];
110 16 : ptr.advance(key_len);
111 16 : let val_len = ptr.get_u32() as usize;
112 16 : let content = &ptr[..val_len];
113 16 : ptr.advance(val_len);
114 :
115 16 : let path = std::str::from_utf8(key)?;
116 16 : files.push((path, content));
117 : }
118 12 : Ok(files)
119 16 : }
120 :
121 : /// Decode an aux file key-value pair into a list of files. The returned `Bytes` contains reference
122 : /// to the original value slice. Be cautious about memory consumption.
123 36 : pub fn decode_file_value_bytes(val: &Bytes) -> anyhow::Result<Vec<(String, Bytes)>> {
124 36 : let mut ptr = val.clone();
125 36 : if ptr.is_empty() {
126 : // empty value = no files
127 4 : return Ok(Vec::new());
128 32 : }
129 32 : assert_eq!(
130 32 : ptr.get_u8(),
131 : AUX_FILE_ENCODING_VERSION,
132 0 : "unsupported aux file value"
133 : );
134 32 : let mut files = vec![];
135 64 : while ptr.has_remaining() {
136 32 : let key_len = ptr.get_u32() as usize;
137 32 : let key = ptr.slice(..key_len);
138 32 : ptr.advance(key_len);
139 32 : let val_len = ptr.get_u32() as usize;
140 32 : let content = ptr.slice(..val_len);
141 32 : ptr.advance(val_len);
142 :
143 32 : let path = std::str::from_utf8(&key)?.to_string();
144 32 : files.push((path, content));
145 : }
146 32 : Ok(files)
147 36 : }
148 :
149 40 : pub fn encode_file_value(files: &[(&str, &[u8])]) -> anyhow::Result<Vec<u8>> {
150 40 : if files.is_empty() {
151 : // no files = empty value
152 8 : return Ok(Vec::new());
153 32 : }
154 32 : let mut encoded = vec![];
155 32 : encoded.put_u8(AUX_FILE_ENCODING_VERSION);
156 68 : for (path, content) in files {
157 36 : if path.len() > u32::MAX as usize {
158 0 : anyhow::bail!("{} exceeds path size limit", path);
159 36 : }
160 36 : encoded.put_u32(path.len() as u32);
161 36 : encoded.put_slice(path.as_bytes());
162 36 : if content.len() > u32::MAX as usize {
163 0 : anyhow::bail!("{} exceeds content size limit", path);
164 36 : }
165 36 : encoded.put_u32(content.len() as u32);
166 36 : encoded.put_slice(content);
167 : }
168 32 : Ok(encoded)
169 40 : }
170 :
/// An estimation of the size of aux files.
///
/// The estimate starts out unset; `on_initial` seeds it, and `on_add`, `on_remove`
/// and `on_update` only adjust (and report) it once it has been seeded.
pub struct AuxFileSizeEstimator {
    /// Gauge that `report` writes the current estimate to.
    aux_file_size_gauge: IntGauge,
    /// Current estimate; `None` until `on_initial` has been called.
    size: Arc<std::sync::Mutex<Option<isize>>>,
}
176 :
177 : impl AuxFileSizeEstimator {
178 896 : pub fn new(aux_file_size_gauge: IntGauge) -> Self {
179 896 : Self {
180 896 : aux_file_size_gauge,
181 896 : size: Arc::new(std::sync::Mutex::new(None)),
182 896 : }
183 896 : }
184 :
185 : /// When generating base backup or doing initial logical size calculation
186 24 : pub fn on_initial(&self, new_size: usize) {
187 24 : let mut guard = self.size.lock().unwrap();
188 24 : *guard = Some(new_size as isize);
189 24 : self.report(new_size as isize);
190 24 : }
191 :
192 24 : pub fn on_add(&self, file_size: usize) {
193 24 : let mut guard = self.size.lock().unwrap();
194 24 : if let Some(size) = &mut *guard {
195 4 : *size += file_size as isize;
196 4 : self.report(*size);
197 20 : }
198 24 : }
199 :
200 4 : pub fn on_remove(&self, file_size: usize) {
201 4 : let mut guard = self.size.lock().unwrap();
202 4 : if let Some(size) = &mut *guard {
203 4 : *size -= file_size as isize;
204 4 : self.report(*size);
205 4 : }
206 4 : }
207 :
208 4 : pub fn on_update(&self, old_size: usize, new_size: usize) {
209 4 : let mut guard = self.size.lock().unwrap();
210 4 : if let Some(size) = &mut *guard {
211 4 : *size += new_size as isize - old_size as isize;
212 4 : self.report(*size);
213 4 : }
214 4 : }
215 :
216 36 : pub fn report(&self, size: isize) {
217 36 : self.aux_file_size_gauge.set(size as i64);
218 36 : }
219 : }
220 :
#[cfg(test)]
mod tests {
    use super::*;

    /// AUX file encoding requires the hash to be portable across all platforms. This test
    /// pins the hash of a few fixed inputs so that any environment-dependent drift shows up.
    #[test]
    fn test_hash_portable() {
        let expectations: [(&str, u128); 3] = [
            ("test1", 265160408618497461376862998434862070044),
            ("test/test2", 295486155126299629456360817749600553988),
            ("", 144066263297769815596495629667062367629),
        ];
        for (input, expected) in expectations.iter() {
            assert_eq!(*expected, super::fnv_hash(input.as_bytes()));
        }
    }

    /// To correctly retrieve AUX files, the generated keys for the same file must be the
    /// same for all versions of the page server.
    #[test]
    fn test_encoding_portable() {
        let expectations: [(&str, &str); 6] = [
            (
                "pg_logical/mappings/test1",
                "62000001017F8B83D94F7081693471ABF91C",
            ),
            (
                "pg_logical/snapshots/test2",
                "62000001027F8E83D94F7081693471ABFCCD",
            ),
            (
                "pg_logical/replorigin_checkpoint",
                "62000001032E07BB014262B821756295C58D",
            ),
            (
                "pg_logical/unsupported",
                "62000001FF4F38E1C74754E7D03C1A660178",
            ),
            ("pg_replslot/test3", "62000002017F8D83D94F7081693471ABFB92"),
            (
                "other_file_not_supported",
                "620000FFFF2B6ECC8AEF93F643DC44F15E03",
            ),
        ];
        for (path, expected_key) in expectations.iter() {
            assert_eq!(*expected_key, encode_aux_file_key(path).to_string());
        }
    }

    /// Encoding followed by decoding must give back the original file list,
    /// both for a populated list and for the empty one.
    #[test]
    fn test_value_encoding() {
        let populated: Vec<(&str, &[u8])> = vec![
            ("pg_logical/1.file", "1111".as_bytes()),
            ("pg_logical/2.file", "2222".as_bytes()),
        ];
        let empty: Vec<(&str, &[u8])> = Vec::new();
        for files in [populated, empty].iter() {
            let encoded = encode_file_value(files).unwrap();
            assert_eq!(*files, decode_file_value(&encoded).unwrap());
        }
    }
}
|