Line data Source code
1 : use crate::walrecord::NeonWalRecord;
2 : use anyhow::Result;
3 : use bytes::Bytes;
4 : use serde::{Deserialize, Serialize};
5 : use std::ops::AddAssign;
6 : use std::time::Duration;
7 :
8 : pub use pageserver_api::key::{Key, KEY_SIZE};
9 :
10 : /// A 'value' stored for a one Key.
11 14196530 : #[derive(Debug, Clone, Serialize, Deserialize)]
12 : #[cfg_attr(test, derive(PartialEq))]
13 : pub enum Value {
14 : /// An Image value contains a full copy of the value
15 : Image(Bytes),
16 : /// A WalRecord value contains a WAL record that needs to be
17 : /// replayed get the full value. Replaying the WAL record
18 : /// might need a previous version of the value (if will_init()
19 : /// returns false), or it may be replayed stand-alone (true).
20 : WalRecord(NeonWalRecord),
21 : }
22 :
23 : impl Value {
24 0 : pub fn is_image(&self) -> bool {
25 0 : matches!(self, Value::Image(_))
26 0 : }
27 :
28 6449612 : pub fn will_init(&self) -> bool {
29 6449612 : match self {
30 6449592 : Value::Image(_) => true,
31 20 : Value::WalRecord(rec) => rec.will_init(),
32 : }
33 6449612 : }
34 : }
35 :
/// Reasons why [`ValueBytes::will_init`] refuses to inspect a byte slice.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum InvalidInput {
    /// The slice is shorter than the 12 bytes required of any serialized value.
    TooShortValue,
    /// The slice starts like a Postgres WAL record but is shorter than the 17 bytes such a
    /// record requires.
    TooShortPostgresRecord,
}

/// We could have a ValueRef where everything is `serde(borrow)`. Before implementing that, let's
/// use this type for querying if a slice looks some particular way.
pub(crate) struct ValueBytes;

impl ValueBytes {
    /// Minimum accepted input length: a 4-byte value discriminator followed by 8 more bytes
    /// (for an image, its 8-byte length).
    const MIN_VALUE_LEN: usize = 12;

    /// Minimum length of a serialized Postgres WAL record: 4-byte value discriminator, 4-byte
    /// record discriminator, 1-byte `will_init` flag and 8-byte payload length.
    const MIN_POSTGRES_RECORD_LEN: usize = 17;

    /// Answers `Value::will_init` by looking at the serialized bytes only, without
    /// deserializing them.
    ///
    /// Returns `Ok(true)` for an image (value discriminator `0`) and for a Postgres WAL record
    /// (value discriminator `1`, record discriminator `0`) whose `will_init` byte is `1`.
    /// Returns `Ok(false)` for every other discriminator combination.
    ///
    /// # Errors
    ///
    /// * [`InvalidInput::TooShortValue`] when `raw` has fewer than 12 bytes.
    /// * [`InvalidInput::TooShortPostgresRecord`] when `raw` starts like a Postgres WAL record
    ///   but has fewer than 17 bytes.
    // NOTE(review): the length check runs before any discriminator is looked at, so a
    // non-Postgres record that serializes to fewer than 12 bytes is reported as
    // `TooShortValue`. Going by the layout exercised in this file's tests, a
    // `ClearVisibilityMapFlags` record with both block numbers `None` would be 11 bytes --
    // confirm such records are never written.
    pub(crate) fn will_init(raw: &[u8]) -> Result<bool, InvalidInput> {
        if raw.len() < Self::MIN_VALUE_LEN {
            return Err(InvalidInput::TooShortValue);
        }

        match raw {
            // Value::Image always initializes
            [0, 0, 0, 0, ..] => Ok(true),
            // Value::WalRecord(NeonWalRecord::Postgres { .. }): only this record kind can
            // have will_init, stored in the byte right after the two discriminators
            [0, 0, 0, 1, 0, 0, 0, 0, will_init, ..] => {
                if raw.len() < Self::MIN_POSTGRES_RECORD_LEN {
                    Err(InvalidInput::TooShortPostgresRecord)
                } else {
                    Ok(*will_init == 1)
                }
            }
            // not a Value::WalRecord(..), or a WAL record kind that never initializes
            _ => Ok(false),
        }
    }
}
78 :
#[cfg(test)]
mod test {
    use super::*;

    use utils::bin_ser::BeSer;

    /// Serializes `orig`, compares the bytes with `expected`, then deserializes the bytes again
    /// and checks that the original value comes back.
    #[track_caller]
    fn roundtrip(orig: Value, expected: &[u8]) {
        let actual = Value::ser(&orig).unwrap();

        assert_eq!(utils::Hex(&actual), utils::Hex(expected));

        let deser = Value::des(&actual).unwrap();

        assert_eq!(orig, deser);
    }

    /// Asserts that `ValueBytes::will_init` rejects every prefix of `bytes` whose length lies in
    /// `lens` with the error `expected`.
    #[track_caller]
    fn assert_prefixes_rejected(
        bytes: &[u8],
        lens: std::ops::Range<usize>,
        expected: InvalidInput,
    ) {
        for len in lens {
            assert_eq!(
                ValueBytes::will_init(&bytes[..len]).unwrap_err(),
                expected
            );
        }
    }

    #[test]
    fn image_roundtrip() {
        let value = Value::Image(Bytes::from_static(b"foobar"));

        #[rustfmt::skip]
        let expected = [
            // top level discriminator of 4 bytes
            0x00, 0x00, 0x00, 0x00,
            // 8 byte length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
            // foobar
            0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(true));
    }

    #[test]
    fn walrecord_postgres_roundtrip() {
        let value = Value::WalRecord(NeonWalRecord::Postgres {
            will_init: true,
            rec: Bytes::from_static(b"foobar"),
        });

        #[rustfmt::skip]
        let expected = [
            // flattened discriminator of total 8 bytes
            0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
            // will_init
            0x01,
            // 8 byte length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
            // foobar
            0x66, 0x6f, 0x6f, 0x62, 0x61, 0x72
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(true));
    }

    #[test]
    fn bytes_inspection_too_short_image() {
        let value = Value::Image(Bytes::from_static(b""));

        #[rustfmt::skip]
        let expected = [
            // top level discriminator of 4 bytes
            0x00, 0x00, 0x00, 0x00,
            // 8 byte length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(true));
        assert_eq!(expected.len(), 12);
        // every strict prefix of the shortest possible image is too short
        assert_prefixes_rejected(&expected, 0..12, InvalidInput::TooShortValue);
    }

    #[test]
    fn bytes_inspection_too_short_postgres_record() {
        let value = Value::WalRecord(NeonWalRecord::Postgres {
            will_init: false,
            rec: Bytes::from_static(b""),
        });

        #[rustfmt::skip]
        let expected = [
            // flattened discriminator of total 8 bytes
            0x00, 0x00, 0x00, 0x01,
            0x00, 0x00, 0x00, 0x00,
            // will_init
            0x00,
            // 8 byte length
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(false));
        assert_eq!(expected.len(), 17);
        // long enough to be a value, but not a complete postgres record
        assert_prefixes_rejected(&expected, 12..17, InvalidInput::TooShortPostgresRecord);
        // too short to be any value at all
        assert_prefixes_rejected(&expected, 0..12, InvalidInput::TooShortValue);
    }

    #[test]
    fn clear_visibility_map_flags_example() {
        let value = Value::WalRecord(NeonWalRecord::ClearVisibilityMapFlags {
            new_heap_blkno: Some(0x11),
            old_heap_blkno: None,
            flags: 0x03,
        });

        #[rustfmt::skip]
        let expected = [
            // discriminators
            0x00, 0x00, 0x00, 0x01,
            0x00, 0x00, 0x00, 0x01,
            // Some == 1 followed by 4 bytes
            0x01, 0x00, 0x00, 0x00, 0x11,
            // None == 0
            0x00,
            // flags
            0x03
        ];

        roundtrip(value, &expected);

        assert_eq!(ValueBytes::will_init(&expected), Ok(false));
    }
}
233 :
234 : ///
235 : /// Result of performing GC
236 : ///
237 0 : #[derive(Default, Serialize, Debug)]
238 : pub struct GcResult {
239 : pub layers_total: u64,
240 : pub layers_needed_by_cutoff: u64,
241 : pub layers_needed_by_pitr: u64,
242 : pub layers_needed_by_branches: u64,
243 : pub layers_needed_by_leases: u64,
244 : pub layers_not_updated: u64,
245 : pub layers_removed: u64, // # of layer files removed because they have been made obsolete by newer ondisk files.
246 :
247 : #[serde(serialize_with = "serialize_duration_as_millis")]
248 : pub elapsed: Duration,
249 :
250 : /// The layers which were garbage collected.
251 : ///
252 : /// Used in `/v1/tenant/:tenant_id/timeline/:timeline_id/do_gc` to wait for the layers to be
253 : /// dropped in tests.
254 : #[cfg(feature = "testing")]
255 : #[serde(skip)]
256 : pub(crate) doomed_layers: Vec<crate::tenant::storage_layer::Layer>,
257 : }
258 :
259 : // helper function for `GcResult`, serializing a `Duration` as an integer number of milliseconds
260 0 : fn serialize_duration_as_millis<S>(d: &Duration, serializer: S) -> Result<S::Ok, S::Error>
261 0 : where
262 0 : S: serde::Serializer,
263 0 : {
264 0 : d.as_millis().serialize(serializer)
265 0 : }
266 :
267 : impl AddAssign for GcResult {
268 754 : fn add_assign(&mut self, other: Self) {
269 754 : self.layers_total += other.layers_total;
270 754 : self.layers_needed_by_pitr += other.layers_needed_by_pitr;
271 754 : self.layers_needed_by_cutoff += other.layers_needed_by_cutoff;
272 754 : self.layers_needed_by_branches += other.layers_needed_by_branches;
273 754 : self.layers_needed_by_leases += other.layers_needed_by_leases;
274 754 : self.layers_not_updated += other.layers_not_updated;
275 754 : self.layers_removed += other.layers_removed;
276 754 :
277 754 : self.elapsed += other.elapsed;
278 754 :
279 754 : #[cfg(feature = "testing")]
280 754 : {
281 754 : let mut other = other;
282 754 : self.doomed_layers.append(&mut other.doomed_layers);
283 754 : }
284 754 : }
285 : }
|