Line data Source code
1 : use std::time::{Duration, SystemTime};
2 :
3 : use bytes::{Buf, BufMut, Bytes, BytesMut};
4 : use pq_proto::{read_cstr, PG_EPOCH};
5 : use serde::{Deserialize, Serialize};
6 : use tracing::{trace, warn};
7 :
8 : use crate::lsn::Lsn;
9 :
/// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
///
/// Serialized in custom flexible key/value format (see
/// [`PageserverFeedback::serialize`] / [`PageserverFeedback::parse`]). In
/// replication protocol, it is marked with NEON_STATUS_UPDATE_TAG_BYTE to
/// differentiate from postgres Standby status update / Hot standby feedback
/// messages.
///
/// serde Serialize is used only for human readable dump to json (e.g. in
/// safekeepers debug_dump).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PageserverFeedback {
    /// Last known size of the timeline. Used to enforce timeline size limit.
    pub current_timeline_size: u64,
    /// LSN last received and ingested by the pageserver. Controls backpressure.
    pub last_received_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver to its local disk.
    /// Controls backpressure.
    pub disk_consistent_lsn: Lsn,
    /// LSN up to which data is persisted by the pageserver on s3; safekeepers
    /// consider WAL before this LSN as removable.
    pub remote_consistent_lsn: Lsn,
    /// Reply timestamp. On the wire it is encoded as microseconds relative to
    /// `PG_EPOCH`; for serde (json dumps) it is serialized in RFC3339 format.
    #[serde(with = "serde_systemtime")]
    pub replytime: SystemTime,
    /// Used to track feedbacks from different shards. Always zero for unsharded tenants.
    /// Only written to the wire when non-zero.
    pub shard_number: u32,
}
36 :
37 : impl PageserverFeedback {
38 6 : pub fn empty() -> PageserverFeedback {
39 6 : PageserverFeedback {
40 6 : current_timeline_size: 0,
41 6 : last_received_lsn: Lsn::INVALID,
42 6 : remote_consistent_lsn: Lsn::INVALID,
43 6 : disk_consistent_lsn: Lsn::INVALID,
44 6 : replytime: *PG_EPOCH,
45 6 : shard_number: 0,
46 6 : }
47 6 : }
48 :
49 : // Serialize PageserverFeedback using custom format
50 : // to support protocol extensibility.
51 : //
52 : // Following layout is used:
53 : // char - number of key-value pairs that follow.
54 : //
55 : // key-value pairs:
56 : // null-terminated string - key,
57 : // uint32 - value length in bytes
58 : // value itself
59 : //
60 : // TODO: change serialized fields names once all computes migrate to rename.
61 2 : pub fn serialize(&self, buf: &mut BytesMut) {
62 2 : let buf_ptr = buf.len();
63 2 : buf.put_u8(0); // # of keys, will be filled later
64 2 : let mut nkeys = 0;
65 2 :
66 2 : nkeys += 1;
67 2 : buf.put_slice(b"current_timeline_size\0");
68 2 : buf.put_i32(8);
69 2 : buf.put_u64(self.current_timeline_size);
70 2 :
71 2 : nkeys += 1;
72 2 : buf.put_slice(b"ps_writelsn\0");
73 2 : buf.put_i32(8);
74 2 : buf.put_u64(self.last_received_lsn.0);
75 2 :
76 2 : nkeys += 1;
77 2 : buf.put_slice(b"ps_flushlsn\0");
78 2 : buf.put_i32(8);
79 2 : buf.put_u64(self.disk_consistent_lsn.0);
80 2 :
81 2 : nkeys += 1;
82 2 : buf.put_slice(b"ps_applylsn\0");
83 2 : buf.put_i32(8);
84 2 : buf.put_u64(self.remote_consistent_lsn.0);
85 2 :
86 2 : let timestamp = self
87 2 : .replytime
88 2 : .duration_since(*PG_EPOCH)
89 2 : .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
90 2 : .as_micros() as i64;
91 2 :
92 2 : nkeys += 1;
93 2 : buf.put_slice(b"ps_replytime\0");
94 2 : buf.put_i32(8);
95 2 : buf.put_i64(timestamp);
96 2 :
97 2 : if self.shard_number > 0 {
98 0 : nkeys += 1;
99 0 : buf.put_slice(b"shard_number\0");
100 0 : buf.put_i32(4);
101 0 : buf.put_u32(self.shard_number);
102 2 : }
103 :
104 2 : buf[buf_ptr] = nkeys;
105 2 : }
106 :
107 : // Deserialize PageserverFeedback message
108 : // TODO: change serialized fields names once all computes migrate to rename.
109 2 : pub fn parse(mut buf: Bytes) -> PageserverFeedback {
110 2 : let mut rf = PageserverFeedback::empty();
111 2 : let nfields = buf.get_u8();
112 2 : for _ in 0..nfields {
113 11 : let key = read_cstr(&mut buf).unwrap();
114 11 : match key.as_ref() {
115 11 : b"current_timeline_size" => {
116 2 : let len = buf.get_i32();
117 2 : assert_eq!(len, 8);
118 2 : rf.current_timeline_size = buf.get_u64();
119 : }
120 9 : b"ps_writelsn" => {
121 2 : let len = buf.get_i32();
122 2 : assert_eq!(len, 8);
123 2 : rf.last_received_lsn = Lsn(buf.get_u64());
124 : }
125 : b"ps_flushlsn" => {
126 2 : let len = buf.get_i32();
127 2 : assert_eq!(len, 8);
128 2 : rf.disk_consistent_lsn = Lsn(buf.get_u64());
129 : }
130 : b"ps_applylsn" => {
131 2 : let len = buf.get_i32();
132 2 : assert_eq!(len, 8);
133 2 : rf.remote_consistent_lsn = Lsn(buf.get_u64());
134 : }
135 3 : b"ps_replytime" => {
136 2 : let len = buf.get_i32();
137 2 : assert_eq!(len, 8);
138 2 : let raw_time = buf.get_i64();
139 2 : if raw_time > 0 {
140 2 : rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
141 2 : } else {
142 0 : rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
143 0 : }
144 : }
145 : b"shard_number" => {
146 0 : let len = buf.get_i32();
147 0 : assert_eq!(len, 4);
148 0 : rf.shard_number = buf.get_u32();
149 : }
150 : _ => {
151 1 : let len = buf.get_i32();
152 1 : warn!(
153 0 : "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
154 0 : String::from_utf8_lossy(key.as_ref())
155 : );
156 1 : buf.advance(len as usize);
157 : }
158 : }
159 : }
160 2 : trace!("PageserverFeedback parsed is {:?}", rf);
161 2 : rf
162 2 : }
163 : }
164 :
165 : mod serde_systemtime {
166 : use std::time::SystemTime;
167 :
168 : use chrono::{DateTime, Utc};
169 : use serde::{Deserialize, Deserializer, Serializer};
170 :
171 0 : pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
172 0 : where
173 0 : S: Serializer,
174 0 : {
175 0 : let chrono_dt: DateTime<Utc> = (*ts).into();
176 0 : serializer.serialize_str(&chrono_dt.to_rfc3339())
177 0 : }
178 :
179 0 : pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
180 0 : where
181 0 : D: Deserializer<'de>,
182 0 : {
183 0 : let time: String = Deserialize::deserialize(deserializer)?;
184 0 : Ok(DateTime::parse_from_rfc3339(&time)
185 0 : .map_err(serde::de::Error::custom)?
186 0 : .into())
187 0 : }
188 : }
189 :
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_replication_feedback_serialization() {
        let mut rf = PageserverFeedback::empty();
        // Fill rf with some values
        rf.current_timeline_size = 12345678;
        // Set rounded time to be able to compare it with deserialized value,
        // because it is truncated to whole microseconds during serialization.
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    #[test]
    fn test_replication_feedback_all_fields_roundtrip() {
        // Distinct non-default values for every field, including a non-zero
        // shard number so that the optional `shard_number` key is exercised.
        let rf = PageserverFeedback {
            current_timeline_size: 12345678,
            last_received_lsn: Lsn(0x30),
            disk_consistent_lsn: Lsn(0x20),
            remote_consistent_lsn: Lsn(0x10),
            replytime: *PG_EPOCH + Duration::from_secs(100_000_000),
            shard_number: 3,
        };
        let mut data = BytesMut::new();
        rf.serialize(&mut data);
        assert_eq!(data[0], 6, "shard_number must be serialized when non-zero");

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    #[test]
    fn test_replication_feedback_unsharded_omits_shard_number() {
        let mut data = BytesMut::new();
        PageserverFeedback::empty().serialize(&mut data);

        assert_eq!(data[0], 5);
        let key = &b"shard_number"[..];
        assert!(!data.windows(key.len()).any(|w| w == key));
    }

    #[test]
    fn test_replication_feedback_negative_replytime() {
        // Hand-built message carrying a replytime 1.5s before PG_EPOCH.
        let mut data = BytesMut::new();
        data.put_u8(1);
        data.put_slice(b"ps_replytime\0");
        data.put_i32(8);
        data.put_i64(-1_500_000);

        let expected = PageserverFeedback {
            replytime: *PG_EPOCH - Duration::from_micros(1_500_000),
            ..PageserverFeedback::empty()
        };
        assert_eq!(expected, PageserverFeedback::parse(data.freeze()));
    }

    #[test]
    fn test_replication_feedback_unknown_key() {
        let mut rf = PageserverFeedback::empty();
        // Fill rf with some values
        rf.current_timeline_size = 12345678;
        // Set rounded time to be able to compare it with deserialized value,
        // because it is truncated to whole microseconds during serialization.
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        // Add an extra field to the buffer and adjust number of keys
        data[0] += 1;
        data.put_slice(b"new_field_one\0");
        data.put_i32(8);
        data.put_u64(42);

        // Parse serialized data and check that new field is not parsed
        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }
}
|