Line data Source code
1 : use bytes::{Buf, BufMut, Bytes};
2 : use pageserver_api::key::{Key, AUX_KEY_PREFIX, METADATA_KEY_SIZE};
3 : use tracing::warn;
4 :
5 : /// Create a metadata key from a hash, encoded as [AUX_KEY_PREFIX, 2B directory prefix, first 13B of 128b xxhash].
6 12 : fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key {
7 12 : let mut key = [0; METADATA_KEY_SIZE];
8 12 : let hash = twox_hash::xxh3::hash128(data).to_be_bytes();
9 12 : key[0] = AUX_KEY_PREFIX;
10 12 : key[1] = dir_level1;
11 12 : key[2] = dir_level2;
12 12 : key[3..16].copy_from_slice(&hash[0..13]);
13 12 : Key::from_metadata_key_fixed_size(&key)
14 12 : }
15 :
16 : const AUX_DIR_PG_LOGICAL: u8 = 0x01;
17 : const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
18 : const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
19 :
20 : /// Encode the aux file into a fixed-size key.
21 : ///
22 : /// The first byte is the AUX key prefix. We use the next 2 bytes of the key for the directory / aux file type.
23 : /// We have one-to-one mapping for each of the aux file that we support. We hash the remaining part of the path
24 : /// (usually a single file name, or several components) into 13-byte hash. The way we determine the 2-byte prefix
25 : /// is roughly based on the first two components of the path, one unique number for one component.
26 : ///
27 : /// * pg_logical/mappings -> 0x0101
28 : /// * pg_logical/snapshots -> 0x0102
29 : /// * pg_logical/replorigin_checkpoint -> 0x0103
30 : /// * pg_logical/others -> 0x01FF
31 : /// * pg_replslot/ -> 0x0201
32 : /// * others -> 0xFFFF
33 : ///
34 : /// If you add new AUX files to this function, please also add a test case to `test_encoding_portable`.
35 : /// The new file type must have never been written to the storage before. Otherwise, there could be data
36 : /// corruptions as the new file belongs to a new prefix but it might have been stored under the `others` prefix.
37 12 : pub fn encode_aux_file_key(path: &str) -> Key {
38 12 : if let Some(fname) = path.strip_prefix("pg_logical/mappings/") {
39 2 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x01, fname.as_bytes())
40 10 : } else if let Some(fname) = path.strip_prefix("pg_logical/snapshots/") {
41 2 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x02, fname.as_bytes())
42 8 : } else if path == "pg_logical/replorigin_checkpoint" {
43 2 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0x03, b"")
44 6 : } else if let Some(fname) = path.strip_prefix("pg_logical/") {
45 2 : if cfg!(debug_assertions) {
46 2 : warn!(
47 0 : "unsupported pg_logical aux file type: {}, putting to 0x01FF, would affect path scanning",
48 : path
49 : );
50 0 : }
51 2 : aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
52 4 : } else if let Some(fname) = path.strip_prefix("pg_replslot/") {
53 2 : aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
54 : } else {
55 2 : if cfg!(debug_assertions) {
56 2 : warn!(
57 0 : "unsupported aux file type: {}, putting to 0xFFFF, would affect path scanning",
58 : path
59 : );
60 0 : }
61 2 : aux_hash_to_metadata_key(AUX_DIR_PG_UNKNOWN, 0xFF, path.as_bytes())
62 : }
63 12 : }
64 :
65 : const AUX_FILE_ENCODING_VERSION: u8 = 0x01;
66 :
67 4 : pub fn decode_file_value(val: &[u8]) -> anyhow::Result<Vec<(&str, &[u8])>> {
68 4 : let mut ptr = val;
69 4 : if ptr.is_empty() {
70 : // empty value = no files
71 2 : return Ok(Vec::new());
72 2 : }
73 2 : assert_eq!(
74 2 : ptr.get_u8(),
75 : AUX_FILE_ENCODING_VERSION,
76 0 : "unsupported aux file value"
77 : );
78 2 : let mut files = vec![];
79 6 : while ptr.has_remaining() {
80 4 : let key_len = ptr.get_u32() as usize;
81 4 : let key = &ptr[..key_len];
82 4 : ptr.advance(key_len);
83 4 : let val_len = ptr.get_u32() as usize;
84 4 : let content = &ptr[..val_len];
85 4 : ptr.advance(val_len);
86 :
87 4 : let path = std::str::from_utf8(key)?;
88 4 : files.push((path, content));
89 : }
90 2 : Ok(files)
91 4 : }
92 :
93 : /// Decode an aux file key-value pair into a list of files. The returned `Bytes` contains reference
94 : /// to the original value slice. Be cautious about memory consumption.
95 0 : pub fn decode_file_value_bytes(val: &Bytes) -> anyhow::Result<Vec<(String, Bytes)>> {
96 0 : let mut ptr = val.clone();
97 0 : if ptr.is_empty() {
98 : // empty value = no files
99 0 : return Ok(Vec::new());
100 0 : }
101 0 : assert_eq!(
102 0 : ptr.get_u8(),
103 : AUX_FILE_ENCODING_VERSION,
104 0 : "unsupported aux file value"
105 : );
106 0 : let mut files = vec![];
107 0 : while ptr.has_remaining() {
108 0 : let key_len = ptr.get_u32() as usize;
109 0 : let key = ptr.slice(..key_len);
110 0 : ptr.advance(key_len);
111 0 : let val_len = ptr.get_u32() as usize;
112 0 : let content = ptr.slice(..val_len);
113 0 : ptr.advance(val_len);
114 :
115 0 : let path = std::str::from_utf8(&key)?.to_string();
116 0 : files.push((path, content));
117 : }
118 0 : Ok(files)
119 0 : }
120 :
121 4 : pub fn encode_file_value(files: &[(&str, &[u8])]) -> anyhow::Result<Vec<u8>> {
122 4 : if files.is_empty() {
123 : // no files = empty value
124 2 : return Ok(Vec::new());
125 2 : }
126 2 : let mut encoded = vec![];
127 2 : encoded.put_u8(AUX_FILE_ENCODING_VERSION);
128 6 : for (path, content) in files {
129 4 : if path.len() > u32::MAX as usize {
130 0 : anyhow::bail!("{} exceeds path size limit", path);
131 4 : }
132 4 : encoded.put_u32(path.len() as u32);
133 4 : encoded.put_slice(path.as_bytes());
134 4 : if content.len() > u32::MAX as usize {
135 0 : anyhow::bail!("{} exceeds content size limit", path);
136 4 : }
137 4 : encoded.put_u32(content.len() as u32);
138 4 : encoded.put_slice(content);
139 : }
140 2 : Ok(encoded)
141 4 : }
142 :
#[cfg(test)]
mod tests {
    use super::*;

    /// The hash values must be identical on every platform.
    #[test]
    fn test_hash_portable() {
        // AUX file encoding requires the hash to be portable across all platforms. This test case
        // checks that the algorithm produces the same hash in different environments.
        let expectations: [(u128, &str); 3] = [
            (305317690835051308206966631765527126151, "test1"),
            (85104974691013376326742244813280798847, "test/test2"),
            (0, ""),
        ];
        for &(expected, input) in expectations.iter() {
            assert_eq!(expected, twox_hash::xxh3::hash128(input.as_bytes()));
        }
    }

    /// The generated keys must stay stable across versions.
    #[test]
    fn test_encoding_portable() {
        // To correctly retrieve AUX files, the generated keys for the same file must be the same
        // for all versions of the page server.
        let expectations: [(&str, &str); 6] = [
            (
                "6200000101E5B20C5F8DD5AA3289D6D9EAFA",
                "pg_logical/mappings/test1",
            ),
            (
                "620000010239AAC544893139B26F501B97E6",
                "pg_logical/snapshots/test2",
            ),
            (
                "620000010300000000000000000000000000",
                "pg_logical/replorigin_checkpoint",
            ),
            (
                "62000001FF8635AF2134B7266EC5B4189FD6",
                "pg_logical/unsupported",
            ),
            ("6200000201772D0E5D71DE14DA86142A1619", "pg_replslot/test3"),
            (
                "620000FFFF1866EBEB53B807B26A2416F317",
                "other_file_not_supported",
            ),
        ];
        for &(expected_key, path) in expectations.iter() {
            assert_eq!(expected_key, encode_aux_file_key(path).to_string());
        }
    }

    /// Encoding followed by decoding must give back the original file list.
    #[test]
    fn test_value_encoding() {
        // Non-empty list: round-trips through the versioned encoding.
        let files = vec![
            ("pg_logical/1.file", "1111".as_bytes()),
            ("pg_logical/2.file", "2222".as_bytes()),
        ];
        let encoded = encode_file_value(&files).unwrap();
        let decoded = decode_file_value(&encoded).unwrap();
        assert_eq!(files, decoded);

        // Empty list: encodes to an empty value, which decodes to an empty list.
        let no_files: Vec<(&str, &[u8])> = Vec::new();
        let encoded = encode_file_value(&no_files).unwrap();
        let decoded = decode_file_value(&encoded).unwrap();
        assert_eq!(no_files, decoded);
    }
}
|