TLA Line data Source code
1 : use std::time::{Duration, SystemTime};
2 :
3 : use bytes::{Buf, BufMut, Bytes, BytesMut};
4 : use pq_proto::{read_cstr, PG_EPOCH};
5 : use serde::{Deserialize, Serialize};
6 : use serde_with::{serde_as, DisplayFromStr};
7 : use tracing::{trace, warn};
8 :
9 : use crate::lsn::Lsn;
10 :
/// Feedback the pageserver sends to the safekeeper, which the safekeeper
/// resends to compute.
///
/// Serialized in a custom, extensible key/value format (see
/// `PageserverFeedback::serialize` / `PageserverFeedback::parse`). In the
/// replication protocol it is marked with NEON_STATUS_UPDATE_TAG_BYTE to
/// differentiate it from postgres Standby status update / Hot standby feedback
/// messages.
///
/// serde Serialize is used only for a human readable dump to json (e.g. in
/// safekeepers debug_dump).
#[serde_as]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
    /// Last known size of the timeline. Used to enforce timeline size limit.
    /// Wire key: `current_timeline_size`.
    pub current_timeline_size: u64,
    /// LSN last received and ingested by the pageserver. Controls backpressure.
    /// Wire key: `ps_writelsn`. In the serde dump it goes through `Display`/`FromStr`.
    #[serde_as(as = "DisplayFromStr")]
    pub last_received_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver to its local disk.
    /// Controls backpressure.
    /// Wire key: `ps_flushlsn`. In the serde dump it goes through `Display`/`FromStr`.
    #[serde_as(as = "DisplayFromStr")]
    pub disk_consistent_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver on s3; safekeepers
    /// take it into account when deciding which WAL before this LSN can be removed.
    /// Wire key: `ps_applylsn`. In the serde dump it goes through `Display`/`FromStr`.
    #[serde_as(as = "DisplayFromStr")]
    pub remote_consistent_lsn: Lsn,
    /// Reply timestamp. Wire key: `ps_replytime`, sent as signed microseconds
    /// since `PG_EPOCH`. In the serde dump it is an RFC3339 string.
    #[serde(with = "serde_systemtime")]
    pub replytime: SystemTime,
}
38 :
// NOTE: Do not forget to increment this number when adding new fields to PageserverFeedback.
// Do not remove previously available fields because this might be backwards incompatible.
/// Number of key/value pairs written by `PageserverFeedback::serialize`; it is
/// emitted as the leading count byte of every serialized message.
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
42 :
43 : impl PageserverFeedback {
44 4020002 : pub fn empty() -> PageserverFeedback {
45 4020002 : PageserverFeedback {
46 4020002 : current_timeline_size: 0,
47 4020002 : last_received_lsn: Lsn::INVALID,
48 4020002 : remote_consistent_lsn: Lsn::INVALID,
49 4020002 : disk_consistent_lsn: Lsn::INVALID,
50 4020002 : replytime: *PG_EPOCH,
51 4020002 : }
52 4020002 : }
53 :
54 : // Serialize PageserverFeedback using custom format
55 : // to support protocol extensibility.
56 : //
57 : // Following layout is used:
58 : // char - number of key-value pairs that follow.
59 : //
60 : // key-value pairs:
61 : // null-terminated string - key,
62 : // uint32 - value length in bytes
63 : // value itself
64 : //
65 : // TODO: change serialized fields names once all computes migrate to rename.
66 3236663 : pub fn serialize(&self, buf: &mut BytesMut) {
67 3236663 : buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
68 3236663 : buf.put_slice(b"current_timeline_size\0");
69 3236663 : buf.put_i32(8);
70 3236663 : buf.put_u64(self.current_timeline_size);
71 3236663 :
72 3236663 : buf.put_slice(b"ps_writelsn\0");
73 3236663 : buf.put_i32(8);
74 3236663 : buf.put_u64(self.last_received_lsn.0);
75 3236663 : buf.put_slice(b"ps_flushlsn\0");
76 3236663 : buf.put_i32(8);
77 3236663 : buf.put_u64(self.disk_consistent_lsn.0);
78 3236663 : buf.put_slice(b"ps_applylsn\0");
79 3236663 : buf.put_i32(8);
80 3236663 : buf.put_u64(self.remote_consistent_lsn.0);
81 3236663 :
82 3236663 : let timestamp = self
83 3236663 : .replytime
84 3236663 : .duration_since(*PG_EPOCH)
85 3236663 : .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
86 3236663 : .as_micros() as i64;
87 3236663 :
88 3236663 : buf.put_slice(b"ps_replytime\0");
89 3236663 : buf.put_i32(8);
90 3236663 : buf.put_i64(timestamp);
91 3236663 : }
92 :
93 : // Deserialize PageserverFeedback message
94 : // TODO: change serialized fields names once all computes migrate to rename.
95 779443 : pub fn parse(mut buf: Bytes) -> PageserverFeedback {
96 779443 : let mut rf = PageserverFeedback::empty();
97 779443 : let nfields = buf.get_u8();
98 779443 : for _ in 0..nfields {
99 3897216 : let key = read_cstr(&mut buf).unwrap();
100 3897216 : match key.as_ref() {
101 3897216 : b"current_timeline_size" => {
102 779443 : let len = buf.get_i32();
103 779443 : assert_eq!(len, 8);
104 779443 : rf.current_timeline_size = buf.get_u64();
105 : }
106 3117773 : b"ps_writelsn" => {
107 779443 : let len = buf.get_i32();
108 779443 : assert_eq!(len, 8);
109 779443 : rf.last_received_lsn = Lsn(buf.get_u64());
110 : }
111 : b"ps_flushlsn" => {
112 779443 : let len = buf.get_i32();
113 779443 : assert_eq!(len, 8);
114 779443 : rf.disk_consistent_lsn = Lsn(buf.get_u64());
115 : }
116 : b"ps_applylsn" => {
117 779443 : let len = buf.get_i32();
118 779443 : assert_eq!(len, 8);
119 779443 : rf.remote_consistent_lsn = Lsn(buf.get_u64());
120 : }
121 779444 : b"ps_replytime" => {
122 779443 : let len = buf.get_i32();
123 779443 : assert_eq!(len, 8);
124 779443 : let raw_time = buf.get_i64();
125 779443 : if raw_time > 0 {
126 779443 : rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
127 779443 : } else {
128 UBC 0 : rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
129 0 : }
130 : }
131 : _ => {
132 CBC 1 : let len = buf.get_i32();
133 1 : warn!(
134 UBC 0 : "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
135 0 : String::from_utf8_lossy(key.as_ref())
136 0 : );
137 CBC 1 : buf.advance(len as usize);
138 : }
139 : }
140 : }
141 779443 : trace!("PageserverFeedback parsed is {:?}", rf);
142 779443 : rf
143 779443 : }
144 : }
145 :
146 : mod serde_systemtime {
147 : use std::time::SystemTime;
148 :
149 : use chrono::{DateTime, Utc};
150 : use serde::{Deserialize, Deserializer, Serializer};
151 :
152 97 : pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
153 97 : where
154 97 : S: Serializer,
155 97 : {
156 97 : let chrono_dt: DateTime<Utc> = (*ts).into();
157 97 : serializer.serialize_str(&chrono_dt.to_rfc3339())
158 97 : }
159 :
160 1 : pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
161 1 : where
162 1 : D: Deserializer<'de>,
163 1 : {
164 1 : let time: String = Deserialize::deserialize(deserializer)?;
165 1 : Ok(DateTime::parse_from_rfc3339(&time)
166 1 : .map_err(serde::de::Error::custom)?
167 1 : .into())
168 1 : }
169 : }
170 :
#[cfg(test)]
mod tests {
    use super::*;

    /// A serialize/parse round trip must reproduce every field.
    #[test]
    fn test_replication_feedback_serialization() {
        let mut rf = PageserverFeedback::empty();
        // Fill rf with some values
        rf.current_timeline_size = 12345678;
        rf.last_received_lsn = Lsn(0x3000);
        rf.disk_consistent_lsn = Lsn(0x2000);
        rf.remote_consistent_lsn = Lsn(0x1000);
        // Use a whole number of seconds so the value survives serialization,
        // which truncates the time to microseconds.
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    /// Keys unknown to `parse` must be skipped without affecting known fields.
    #[test]
    fn test_replication_feedback_unknown_key() {
        let mut rf = PageserverFeedback::empty();
        // Fill rf with some values
        rf.current_timeline_size = 12345678;
        // Use a whole number of seconds so the value survives serialization,
        // which truncates the time to microseconds.
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        // Add an extra field to the buffer and adjust number of keys
        if let Some(first) = data.first_mut() {
            *first = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
        }

        data.put_slice(b"new_field_one\0");
        data.put_i32(8);
        data.put_u64(42);

        // Parse serialized data and check that new field is not parsed
        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    /// A negative `ps_replytime` decodes to a time before `PG_EPOCH`.
    #[test]
    fn test_replication_feedback_replytime_before_pg_epoch() {
        // `serialize` refuses times before PG_EPOCH, so build the message by
        // hand to exercise the negative-timestamp branch of `parse`.
        let mut data = BytesMut::new();
        data.put_u8(1);
        data.put_slice(b"ps_replytime\0");
        data.put_i32(8);
        data.put_i64(-1_000_000);

        let rf_parsed = PageserverFeedback::parse(data.freeze());

        let mut expected = PageserverFeedback::empty();
        expected.replytime = *PG_EPOCH - Duration::from_secs(1);
        assert_eq!(rf_parsed, expected);
    }
}
|