Line data Source code
1 : use std::time::{Duration, SystemTime};
2 :
3 : use bytes::{Buf, BufMut, Bytes, BytesMut};
4 : use pq_proto::{PG_EPOCH, read_cstr};
5 : use serde::{Deserialize, Serialize};
6 : use tracing::{trace, warn};
7 :
8 : use crate::lsn::Lsn;
9 :
10 : /// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
11 : ///
12 : /// Serialized in custom flexible key/value format. In replication protocol, it
13 : /// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
14 : /// Standby status update / Hot standby feedback messages.
15 : ///
16 : /// serde Serialize is used only for human readable dump to json (e.g. in
17 : /// safekeepers debug_dump).
18 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19 : pub struct PageserverFeedback {
20 : /// Last known size of the timeline. Used to enforce timeline size limit.
21 : pub current_timeline_size: u64,
22 : /// LSN last received and ingested by the pageserver. Controls backpressure.
23 : pub last_received_lsn: Lsn,
24 : /// LSN up to which data is persisted by the pageserver to its local disc.
25 : /// Controls backpressure.
26 : pub disk_consistent_lsn: Lsn,
27 : /// LSN up to which data is persisted by the pageserver on s3; safekeepers
28 : /// consider WAL before it can be removed.
29 : pub remote_consistent_lsn: Lsn,
30 : // Serialize with RFC3339 format.
31 : #[serde(with = "serde_systemtime")]
32 : pub replytime: SystemTime,
33 : /// Used to track feedbacks from different shards. Always zero for unsharded tenants.
34 : pub shard_number: u32,
35 : /// If true, the pageserver has detected corruption and the safekeeper and postgres
36 : /// should stop sending WAL.
37 : pub corruption_detected: bool,
38 : }
39 :
40 : impl PageserverFeedback {
41 13 : pub fn empty() -> PageserverFeedback {
42 13 : PageserverFeedback {
43 13 : current_timeline_size: 0,
44 13 : last_received_lsn: Lsn::INVALID,
45 13 : remote_consistent_lsn: Lsn::INVALID,
46 13 : disk_consistent_lsn: Lsn::INVALID,
47 13 : replytime: *PG_EPOCH,
48 13 : shard_number: 0,
49 13 : corruption_detected: false,
50 13 : }
51 13 : }
52 :
53 : // Serialize PageserverFeedback using custom format
54 : // to support protocol extensibility.
55 : //
56 : // Following layout is used:
57 : // char - number of key-value pairs that follow.
58 : //
59 : // key-value pairs:
60 : // null-terminated string - key,
61 : // uint32 - value length in bytes
62 : // value itself
63 : //
64 : // TODO: change serialized fields names once all computes migrate to rename.
65 3 : pub fn serialize(&self, buf: &mut BytesMut) {
66 3 : let buf_ptr = buf.len();
67 3 : buf.put_u8(0); // # of keys, will be filled later
68 3 : let mut nkeys = 0;
69 :
70 3 : nkeys += 1;
71 3 : buf.put_slice(b"current_timeline_size\0");
72 3 : buf.put_i32(8);
73 3 : buf.put_u64(self.current_timeline_size);
74 :
75 3 : nkeys += 1;
76 3 : buf.put_slice(b"ps_writelsn\0");
77 3 : buf.put_i32(8);
78 3 : buf.put_u64(self.last_received_lsn.0);
79 :
80 3 : nkeys += 1;
81 3 : buf.put_slice(b"ps_flushlsn\0");
82 3 : buf.put_i32(8);
83 3 : buf.put_u64(self.disk_consistent_lsn.0);
84 :
85 3 : nkeys += 1;
86 3 : buf.put_slice(b"ps_applylsn\0");
87 3 : buf.put_i32(8);
88 3 : buf.put_u64(self.remote_consistent_lsn.0);
89 :
90 3 : let timestamp = self
91 3 : .replytime
92 3 : .duration_since(*PG_EPOCH)
93 3 : .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
94 3 : .as_micros() as i64;
95 :
96 3 : nkeys += 1;
97 3 : buf.put_slice(b"ps_replytime\0");
98 3 : buf.put_i32(8);
99 3 : buf.put_i64(timestamp);
100 :
101 3 : if self.shard_number > 0 {
102 1 : nkeys += 1;
103 1 : buf.put_slice(b"shard_number\0");
104 1 : buf.put_i32(4);
105 1 : buf.put_u32(self.shard_number);
106 2 : }
107 :
108 3 : if self.corruption_detected {
109 1 : nkeys += 1;
110 1 : buf.put_slice(b"corruption_detected\0");
111 1 : buf.put_i32(1);
112 1 : buf.put_u8(1);
113 2 : }
114 :
115 3 : buf[buf_ptr] = nkeys;
116 3 : }
117 :
118 : // Deserialize PageserverFeedback message
119 : // TODO: change serialized fields names once all computes migrate to rename.
120 3 : pub fn parse(mut buf: Bytes) -> PageserverFeedback {
121 3 : let mut rf = PageserverFeedback::empty();
122 3 : let nfields = buf.get_u8();
123 3 : for _ in 0..nfields {
124 18 : let key = read_cstr(&mut buf).unwrap();
125 18 : match key.as_ref() {
126 18 : b"current_timeline_size" => {
127 3 : let len = buf.get_i32();
128 3 : assert_eq!(len, 8);
129 3 : rf.current_timeline_size = buf.get_u64();
130 : }
131 15 : b"ps_writelsn" => {
132 3 : let len = buf.get_i32();
133 3 : assert_eq!(len, 8);
134 3 : rf.last_received_lsn = Lsn(buf.get_u64());
135 : }
136 : b"ps_flushlsn" => {
137 3 : let len = buf.get_i32();
138 3 : assert_eq!(len, 8);
139 3 : rf.disk_consistent_lsn = Lsn(buf.get_u64());
140 : }
141 : b"ps_applylsn" => {
142 3 : let len = buf.get_i32();
143 3 : assert_eq!(len, 8);
144 3 : rf.remote_consistent_lsn = Lsn(buf.get_u64());
145 : }
146 6 : b"ps_replytime" => {
147 3 : let len = buf.get_i32();
148 3 : assert_eq!(len, 8);
149 3 : let raw_time = buf.get_i64();
150 3 : if raw_time > 0 {
151 3 : rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
152 3 : } else {
153 0 : rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
154 0 : }
155 : }
156 : b"shard_number" => {
157 1 : let len = buf.get_i32();
158 1 : assert_eq!(len, 4);
159 1 : rf.shard_number = buf.get_u32();
160 : }
161 2 : b"corruption_detected" => {
162 1 : let len = buf.get_i32();
163 1 : assert_eq!(len, 1);
164 1 : rf.corruption_detected = buf.get_u8() != 0;
165 : }
166 : _ => {
167 1 : let len = buf.get_i32();
168 1 : warn!(
169 0 : "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
170 0 : String::from_utf8_lossy(key.as_ref())
171 : );
172 1 : buf.advance(len as usize);
173 : }
174 : }
175 : }
176 3 : trace!("PageserverFeedback parsed is {:?}", rf);
177 3 : rf
178 3 : }
179 : }
180 :
181 : mod serde_systemtime {
182 : use std::time::SystemTime;
183 :
184 : use chrono::{DateTime, Utc};
185 : use serde::{Deserialize, Deserializer, Serializer};
186 :
187 0 : pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
188 0 : where
189 0 : S: Serializer,
190 : {
191 0 : let chrono_dt: DateTime<Utc> = (*ts).into();
192 0 : serializer.serialize_str(&chrono_dt.to_rfc3339())
193 0 : }
194 :
195 0 : pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
196 0 : where
197 0 : D: Deserializer<'de>,
198 : {
199 0 : let time: String = Deserialize::deserialize(deserializer)?;
200 0 : Ok(DateTime::parse_from_rfc3339(&time)
201 0 : .map_err(serde::de::Error::custom)?
202 0 : .into())
203 0 : }
204 : }
205 :
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_replication_feedback_serialization() {
        let mut rf = PageserverFeedback::empty();
        // Fill rf with some values
        rf.current_timeline_size = 12345678;
        // Use a whole number of seconds so the value survives the round trip:
        // serialization truncates replytime to microseconds.
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    // Test that databricks-specific fields added to the PageserverFeedback message are serialized
    // and deserialized correctly, in addition to the existing fields from upstream.
    #[test]
    fn test_replication_feedback_databricks_fields() {
        let mut rf = PageserverFeedback::empty();
        rf.current_timeline_size = 12345678;
        rf.last_received_lsn = Lsn(23456789);
        rf.disk_consistent_lsn = Lsn(34567890);
        rf.remote_consistent_lsn = Lsn(45678901);
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        rf.shard_number = 1;
        rf.corruption_detected = true;

        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    #[test]
    fn test_replication_feedback_unknown_key() {
        let mut rf = PageserverFeedback::empty();
        // Fill rf with some values
        rf.current_timeline_size = 12345678;
        // Use a whole number of seconds so the value survives the round trip:
        // serialization truncates replytime to microseconds.
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        // Add an extra field to the buffer and adjust number of keys
        data[0] += 1;
        data.put_slice(b"new_field_one\0");
        data.put_i32(8);
        data.put_u64(42);

        // Parse serialized data and check that new field is not parsed
        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    #[test]
    fn test_replication_feedback_negative_replytime() {
        // A replytime before PG_EPOCH arrives as negative microseconds. Build
        // the message by hand so the test exercises that branch of `parse`.
        let mut data = BytesMut::new();
        data.put_u8(1);
        data.put_slice(b"ps_replytime\0");
        data.put_i32(8);
        data.put_i64(-1_000_000);

        let rf_parsed = PageserverFeedback::parse(data.freeze());

        let mut expected = PageserverFeedback::empty();
        expected.replytime = *PG_EPOCH - Duration::from_secs(1);
        assert_eq!(rf_parsed, expected);
    }
}
|