Line data Source code
1 : use crate::walrecord::NeonWalRecord;
2 : use anyhow::Result;
3 : use bytes::Bytes;
4 : use serde::{Deserialize, Serialize};
5 : use std::ops::AddAssign;
6 : use std::time::Duration;
7 :
8 : pub use pageserver_api::key::{Key, KEY_SIZE};
9 :
10 : /// A 'value' stored for a one Key.
11 16484658 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
12 : pub enum Value {
13 : /// An Image value contains a full copy of the value
14 : Image(Bytes),
15 : /// A WalRecord value contains a WAL record that needs to be
16 : /// replayed get the full value. Replaying the WAL record
17 : /// might need a previous version of the value (if will_init()
18 : /// returns false), or it may be replayed stand-alone (true).
19 : WalRecord(NeonWalRecord),
20 : }
21 :
22 : impl Value {
23 1812 : pub fn is_image(&self) -> bool {
24 1812 : matches!(self, Value::Image(_))
25 1812 : }
26 :
27 21534126 : pub fn will_init(&self) -> bool {
28 21534126 : match self {
29 21095466 : Value::Image(_) => true,
30 438660 : Value::WalRecord(rec) => rec.will_init(),
31 : }
32 21534126 : }
33 : }
34 :
/// Reasons why [`ValueBytes::will_init`] refuses to inspect a byte slice.
#[derive(Debug, PartialEq)]
pub(crate) enum InvalidInput {
    /// The slice is shorter than the 12 bytes required before any inspection is done.
    TooShortValue,
    /// The slice carries the Postgres WAL record discriminators but is shorter than 17 bytes.
    TooShortPostgresRecord,
}

/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, let's
/// use this type for querying whether a slice looks some particular way.
pub(crate) struct ValueBytes;

impl ValueBytes {
    /// Answers "will this value initialize?" directly from the serialized bytes in `raw`.
    ///
    /// The first four bytes are read as the value discriminator and, for WAL records, the next
    /// four as the record discriminator.
    ///
    /// # Errors
    ///
    /// * [`InvalidInput::TooShortValue`] when `raw` has fewer than 12 bytes.
    /// * [`InvalidInput::TooShortPostgresRecord`] when both discriminators select a Postgres
    ///   WAL record but `raw` has fewer than 17 bytes.
    pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
        /// Minimum length accepted for any value.
        const MIN_VALUE_LEN: usize = 12;
        /// Minimum length accepted for a Postgres WAL record.
        const MIN_POSTGRES_RECORD_LEN: usize = 17;

        if raw.len() < MIN_VALUE_LEN {
            return Err(InvalidInput::TooShortValue);
        }

        match raw {
            // Value::Image always initializes
            [0, 0, 0, 0, ..] => Ok(true),
            // Value::WalRecord(NeonWalRecord::Postgres { .. }): the flag byte follows the
            // two discriminators, but is only trusted once the slice is long enough.
            [0, 0, 0, 1, 0, 0, 0, 0, will_init_flag, ..] => {
                if raw.len() < MIN_POSTGRES_RECORD_LEN {
                    Err(InvalidInput::TooShortPostgresRecord)
                } else {
                    Ok(*will_init_flag == 1)
                }
            }
            // Either not a Value::WalRecord(..) at all, or a record kind other than
            // NeonWalRecord::Postgres; only the latter can have will_init.
            _ => Ok(false),
        }
    }
}
77 :
#[cfg(test)]
mod test {
    use super::*;

    use utils::bin_ser::BeSer;

    /// Serializes `orig` and compares the bytes with `expected`, then deserializes those
    /// bytes again and checks that the result equals `orig`.
    #[track_caller]
    fn roundtrip(orig: Value, expected: &[u8]) {
        let actual = Value::ser(&orig).unwrap();
        assert_eq!(utils::Hex(&actual), utils::Hex(expected));

        let deser = Value::des(&actual).unwrap();
        assert_eq!(orig, deser);
    }

    #[test]
    fn image_roundtrip() {
        let value = Value::Image(Bytes::from_static(b"foobar"));

        #[rustfmt::skip]
        let expected = [
            // top level discriminator of 4 bytes
            0x00, 0x00, 0x00, 0x00,
            // 8 byte length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
            // foobar
            0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(true));
    }

    #[test]
    fn walrecord_postgres_roundtrip() {
        let value = Value::WalRecord(NeonWalRecord::Postgres {
            will_init: true,
            rec: Bytes::from_static(b"foobar"),
        });

        #[rustfmt::skip]
        let expected = [
            // flattened discriminator of total 8 bytes
            0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
            // will_init
            0x01,
            // 8 byte length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
            // foobar
            0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(true));
    }

    #[test]
    fn bytes_inspection_too_short_image() {
        let value = Value::Image(Bytes::from_static(b""));

        #[rustfmt::skip]
        let expected = [
            // top level discriminator of 4 bytes
            0x00, 0x00, 0x00, 0x00,
            // 8 byte length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(true));
        assert_eq!(expected.len(), 12);

        // Every strict prefix of the 12 byte image must be rejected.
        for len in 0..12 {
            assert_eq!(
                ValueBytes::will_init(&expected[..len]),
                Err(InvalidInput::TooShortValue)
            );
        }
    }

    #[test]
    fn bytes_inspection_too_short_postgres_record() {
        let value = Value::WalRecord(NeonWalRecord::Postgres {
            will_init: false,
            rec: Bytes::from_static(b""),
        });

        #[rustfmt::skip]
        let expected = [
            // flattened discriminator of total 8 bytes
            0x00, 0x00, 0x00, 0x01,
            0x00, 0x00, 0x00, 0x00,
            // will_init
            0x00,
            // 8 byte length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(false));
        assert_eq!(expected.len(), 17);

        // Prefixes below 12 bytes fail the generic length check; prefixes of 12..17 bytes
        // fail the Postgres record length check.
        for len in 0..17 {
            let want = if len < 12 {
                InvalidInput::TooShortValue
            } else {
                InvalidInput::TooShortPostgresRecord
            };
            assert_eq!(ValueBytes::will_init(&expected[..len]), Err(want));
        }
    }

    #[test]
    fn clear_visibility_map_flags_example() {
        let value = Value::WalRecord(NeonWalRecord::ClearVisibilityMapFlags {
            new_heap_blkno: Some(0x11),
            old_heap_blkno: None,
            flags: 0x03,
        });

        #[rustfmt::skip]
        let expected = [
            // discriminators
            0x00, 0x00, 0x00, 0x01,
            0x00, 0x00, 0x00, 0x01,
            // Some == 1 followed by 4 bytes
            0x01, 0x00, 0x00, 0x00, 0x11,
            // None == 0
            0x00,
            // flags
            0x03
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(false));
    }
}
232 :
233 : ///
234 : /// Result of performing GC
235 : ///
236 0 : #[derive(Default, Serialize, Debug)]
237 : pub struct GcResult {
238 : pub layers_total: u64,
239 : pub layers_needed_by_cutoff: u64,
240 : pub layers_needed_by_pitr: u64,
241 : pub layers_needed_by_branches: u64,
242 : pub layers_needed_by_leases: u64,
243 : pub layers_not_updated: u64,
244 : pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
245 :
246 : #[serde(serialize_with = "serialize_duration_as_millis")]
247 : pub elapsed: Duration,
248 :
249 : /// The layers which were garbage collected.
250 : ///
251 : /// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be
252 : /// dropped in tests.
253 : #[cfg(feature = "testing")]
254 : #[serde(skip)]
255 : pub(crate) doomed_layers: Vec<crate::tenant::storage_layer::Layer>,
256 : }
257 :
258 : // helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds
259 0 : fn serialize_duration_as_millis<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
260 0 : where
261 0 : S: serde::Serializer,
262 0 : {
263 0 : d.as_millis().serialize(serializer)
264 0 : }
265 :
266 : impl AddAssign for GcResult {
267 2262 : fn add_assign(&mut self, other: Self) {
268 2262 : self.layers_total += other.layers_total;
269 2262 : self.layers_needed_by_pitr += other.layers_needed_by_pitr;
270 2262 : self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
271 2262 : self.layers_needed_by_branches += other.layers_needed_by_branches;
272 2262 : self.layers_needed_by_leases += other.layers_needed_by_leases;
273 2262 : self.layers_not_updated += other.layers_not_updated;
274 2262 : self.layers_removed += other.layers_removed;
275 2262 :
276 2262 : self.elapsed += other.elapsed;
277 2262 :
278 2262 : #[cfg(feature = "testing")]
279 2262 : {
280 2262 : let mut other = other;
281 2262 : self.doomed_layers.append(&mut other.doomed_layers);
282 2262 : }
283 2262 : }
284 : }
|