TLA Line data Source code
1 : use std::time::{Duration, SystemTime};
2 :
3 : use bytes::{Buf, BufMut, Bytes, BytesMut};
4 : use pq_proto::{read_cstr, PG_EPOCH};
5 : use serde::{Deserialize, Serialize};
6 : use tracing::{trace, warn};
7 :
8 : use crate::lsn::Lsn;
9 :
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
/// Serialized in a custom, extensible key/value format (see
/// `PageserverFeedback::serialize` / `PageserverFeedback::parse`). In the
/// replication protocol it is marked with NEON_STATUS_UPDATE_TAG_BYTE to
/// distinguish it from postgres Standby status update / Hot standby feedback
/// messages.
///
/// The serde `Serialize` impl is used only for human-readable JSON dumps (e.g.
/// in the safekeeper's debug_dump); it is not the wire format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
    /// Last known size of the timeline. Used to enforce the timeline size limit.
    pub current_timeline_size: u64,
    /// LSN last received and ingested by the pageserver. Controls backpressure.
    pub last_received_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver to its local disk.
    /// Controls backpressure.
    pub disk_consistent_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver on s3. Safekeepers
    /// take it into account before removing WAL.
    pub remote_consistent_lsn: Lsn,
    /// Time of the reply. In JSON dumps it is written as an RFC 3339 string
    /// (via `serde_systemtime`); on the wire it is encoded as microseconds
    /// relative to `PG_EPOCH`.
    #[serde(with = "serde_systemtime")]
    pub replytime: SystemTime,
}
33 :
/// Number of key/value pairs written by `PageserverFeedback::serialize`; it is
/// sent as the leading "# of keys" byte.
///
/// NOTE: Do not forget to increment this number when adding new fields to
/// PageserverFeedback. Do not remove previously available fields, because that
/// might break backwards compatibility with existing readers.
pub const PAGESERVER_FEEDBACK_FIELDS_NUMBER: u8 = 5;
37 :
38 : impl PageserverFeedback {
39 2427181 : pub fn empty() -> PageserverFeedback {
40 2427181 : PageserverFeedback {
41 2427181 : current_timeline_size: 0,
42 2427181 : last_received_lsn: Lsn::INVALID,
43 2427181 : remote_consistent_lsn: Lsn::INVALID,
44 2427181 : disk_consistent_lsn: Lsn::INVALID,
45 2427181 : replytime: *PG_EPOCH,
46 2427181 : }
47 2427181 : }
48 :
49 : // Serialize PageserverFeedback using custom format
50 : // to support protocol extensibility.
51 : //
52 : // Following layout is used:
53 : // char - number of key-value pairs that follow.
54 : //
55 : // key-value pairs:
56 : // null-terminated string - key,
57 : // uint32 - value length in bytes
58 : // value itself
59 : //
60 : // TODO: change serialized fields names once all computes migrate to rename.
61 1860100 : pub fn serialize(&self, buf: &mut BytesMut) {
62 1860100 : buf.put_u8(PAGESERVER_FEEDBACK_FIELDS_NUMBER); // # of keys
63 1860100 : buf.put_slice(b"current_timeline_size\0");
64 1860100 : buf.put_i32(8);
65 1860100 : buf.put_u64(self.current_timeline_size);
66 1860100 :
67 1860100 : buf.put_slice(b"ps_writelsn\0");
68 1860100 : buf.put_i32(8);
69 1860100 : buf.put_u64(self.last_received_lsn.0);
70 1860100 : buf.put_slice(b"ps_flushlsn\0");
71 1860100 : buf.put_i32(8);
72 1860100 : buf.put_u64(self.disk_consistent_lsn.0);
73 1860100 : buf.put_slice(b"ps_applylsn\0");
74 1860100 : buf.put_i32(8);
75 1860100 : buf.put_u64(self.remote_consistent_lsn.0);
76 1860100 :
77 1860100 : let timestamp = self
78 1860100 : .replytime
79 1860100 : .duration_since(*PG_EPOCH)
80 1860100 : .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
81 1860100 : .as_micros() as i64;
82 1860100 :
83 1860100 : buf.put_slice(b"ps_replytime\0");
84 1860100 : buf.put_i32(8);
85 1860100 : buf.put_i64(timestamp);
86 1860100 : }
87 :
88 : // Deserialize PageserverFeedback message
89 : // TODO: change serialized fields names once all computes migrate to rename.
90 565813 : pub fn parse(mut buf: Bytes) -> PageserverFeedback {
91 565813 : let mut rf = PageserverFeedback::empty();
92 565813 : let nfields = buf.get_u8();
93 565813 : for _ in 0..nfields {
94 2829066 : let key = read_cstr(&mut buf).unwrap();
95 2829066 : match key.as_ref() {
96 2829066 : b"current_timeline_size" => {
97 565813 : let len = buf.get_i32();
98 565813 : assert_eq!(len, 8);
99 565813 : rf.current_timeline_size = buf.get_u64();
100 : }
101 2263253 : b"ps_writelsn" => {
102 565813 : let len = buf.get_i32();
103 565813 : assert_eq!(len, 8);
104 565813 : rf.last_received_lsn = Lsn(buf.get_u64());
105 : }
106 : b"ps_flushlsn" => {
107 565813 : let len = buf.get_i32();
108 565813 : assert_eq!(len, 8);
109 565813 : rf.disk_consistent_lsn = Lsn(buf.get_u64());
110 : }
111 : b"ps_applylsn" => {
112 565813 : let len = buf.get_i32();
113 565813 : assert_eq!(len, 8);
114 565813 : rf.remote_consistent_lsn = Lsn(buf.get_u64());
115 : }
116 565814 : b"ps_replytime" => {
117 565813 : let len = buf.get_i32();
118 565813 : assert_eq!(len, 8);
119 565813 : let raw_time = buf.get_i64();
120 565813 : if raw_time > 0 {
121 565813 : rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
122 565813 : } else {
123 UBC 0 : rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
124 0 : }
125 : }
126 : _ => {
127 CBC 1 : let len = buf.get_i32();
128 1 : warn!(
129 UBC 0 : "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
130 0 : String::from_utf8_lossy(key.as_ref())
131 0 : );
132 CBC 1 : buf.advance(len as usize);
133 : }
134 : }
135 : }
136 565813 : trace!("PageserverFeedback parsed is {:?}", rf);
137 565813 : rf
138 565813 : }
139 : }
140 :
141 : mod serde_systemtime {
142 : use std::time::SystemTime;
143 :
144 : use chrono::{DateTime, Utc};
145 : use serde::{Deserialize, Deserializer, Serializer};
146 :
147 95 : pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
148 95 : where
149 95 : S: Serializer,
150 95 : {
151 95 : let chrono_dt: DateTime<Utc> = (*ts).into();
152 95 : serializer.serialize_str(&chrono_dt.to_rfc3339())
153 95 : }
154 :
155 3 : pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
156 3 : where
157 3 : D: Deserializer<'de>,
158 3 : {
159 3 : let time: String = Deserialize::deserialize(deserializer)?;
160 3 : Ok(DateTime::parse_from_rfc3339(&time)
161 3 : .map_err(serde::de::Error::custom)?
162 3 : .into())
163 3 : }
164 : }
165 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a feedback message with a non-default timeline size and a reply
    /// time a whole number of seconds after `PG_EPOCH`.
    ///
    /// A whole-second reply time is used because serialization truncates
    /// `replytime` to microseconds; anything finer would not survive a round
    /// trip, and the parsed value would not compare equal.
    fn sample_feedback() -> PageserverFeedback {
        PageserverFeedback {
            current_timeline_size: 12345678,
            replytime: *PG_EPOCH + Duration::from_secs(100_000_000),
            ..PageserverFeedback::empty()
        }
    }

    #[test]
    fn test_replication_feedback_serialization() {
        let original = sample_feedback();
        let mut encoded = BytesMut::new();
        original.serialize(&mut encoded);

        let decoded = PageserverFeedback::parse(encoded.freeze());
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_replication_feedback_unknown_key() {
        let original = sample_feedback();
        let mut encoded = BytesMut::new();
        original.serialize(&mut encoded);

        // Announce one extra pair in the leading key count, then append a key
        // this version of the parser does not know about.
        encoded[0] = PAGESERVER_FEEDBACK_FIELDS_NUMBER + 1;
        encoded.put_slice(b"new_field_one\0");
        encoded.put_i32(8);
        encoded.put_u64(42);

        // The unknown pair must be skipped without disturbing the known fields.
        let decoded = PageserverFeedback::parse(encoded.freeze());
        assert_eq!(original, decoded);
    }
}
|