Line data Source code
1 : use std::time::{Duration, SystemTime};
2 :
3 : use bytes::{Buf, BufMut, Bytes, BytesMut};
4 : use pq_proto::{read_cstr, PG_EPOCH};
5 : use serde::{Deserialize, Serialize};
6 : use tracing::{trace, warn};
7 :
8 : use crate::lsn::Lsn;
9 :
10 : /// Feedback pageserver sends to safekeeper and safekeeper resends to compute.
11 : /// Serialized in custom flexible key/value format. In replication protocol, it
12 : /// is marked with NEON_STATUS_UPDATE_TAG_BYTE to differentiate from postgres
13 : /// Standby status update / Hot standby feedback messages.
14 : ///
15 : /// serde Serialize is used only for human readable dump to json (e.g. in
16 : /// safekeepers debug_dump).
17 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18 : pub struct PageserverFeedback {
19 : /// Last known size of the timeline. Used to enforce timeline size limit.
20 : pub current_timeline_size: u64,
21 : /// LSN last received and ingested by the pageserver. Controls backpressure.
22 : pub last_received_lsn: Lsn,
23 : /// LSN up to which data is persisted by the pageserver to its local disc.
24 : /// Controls backpressure.
25 : pub disk_consistent_lsn: Lsn,
26 : /// LSN up to which data is persisted by the pageserver on s3; safekeepers
27 : /// consider WAL before it can be removed.
28 : pub remote_consistent_lsn: Lsn,
29 : // Serialize with RFC3339 format.
30 : #[serde(with = "serde_systemtime")]
31 : pub replytime: SystemTime,
32 : /// Used to track feedbacks from different shards. Always zero for unsharded tenants.
33 : pub shard_number: u32,
34 : }
35 :
36 : impl PageserverFeedback {
37 12 : pub fn empty() -> PageserverFeedback {
38 12 : PageserverFeedback {
39 12 : current_timeline_size: 0,
40 12 : last_received_lsn: Lsn::INVALID,
41 12 : remote_consistent_lsn: Lsn::INVALID,
42 12 : disk_consistent_lsn: Lsn::INVALID,
43 12 : replytime: *PG_EPOCH,
44 12 : shard_number: 0,
45 12 : }
46 12 : }
47 :
48 : // Serialize PageserverFeedback using custom format
49 : // to support protocol extensibility.
50 : //
51 : // Following layout is used:
52 : // char - number of key-value pairs that follow.
53 : //
54 : // key-value pairs:
55 : // null-terminated string - key,
56 : // uint32 - value length in bytes
57 : // value itself
58 : //
59 : // TODO: change serialized fields names once all computes migrate to rename.
60 4 : pub fn serialize(&self, buf: &mut BytesMut) {
61 4 : let buf_ptr = buf.len();
62 4 : buf.put_u8(0); // # of keys, will be filled later
63 4 : let mut nkeys = 0;
64 4 :
65 4 : nkeys += 1;
66 4 : buf.put_slice(b"current_timeline_size\0");
67 4 : buf.put_i32(8);
68 4 : buf.put_u64(self.current_timeline_size);
69 4 :
70 4 : nkeys += 1;
71 4 : buf.put_slice(b"ps_writelsn\0");
72 4 : buf.put_i32(8);
73 4 : buf.put_u64(self.last_received_lsn.0);
74 4 :
75 4 : nkeys += 1;
76 4 : buf.put_slice(b"ps_flushlsn\0");
77 4 : buf.put_i32(8);
78 4 : buf.put_u64(self.disk_consistent_lsn.0);
79 4 :
80 4 : nkeys += 1;
81 4 : buf.put_slice(b"ps_applylsn\0");
82 4 : buf.put_i32(8);
83 4 : buf.put_u64(self.remote_consistent_lsn.0);
84 4 :
85 4 : let timestamp = self
86 4 : .replytime
87 4 : .duration_since(*PG_EPOCH)
88 4 : .expect("failed to serialize pg_replytime earlier than PG_EPOCH")
89 4 : .as_micros() as i64;
90 4 :
91 4 : nkeys += 1;
92 4 : buf.put_slice(b"ps_replytime\0");
93 4 : buf.put_i32(8);
94 4 : buf.put_i64(timestamp);
95 4 :
96 4 : if self.shard_number > 0 {
97 0 : nkeys += 1;
98 0 : buf.put_slice(b"shard_number\0");
99 0 : buf.put_i32(4);
100 0 : buf.put_u32(self.shard_number);
101 4 : }
102 :
103 4 : buf[buf_ptr] = nkeys;
104 4 : }
105 :
106 : // Deserialize PageserverFeedback message
107 : // TODO: change serialized fields names once all computes migrate to rename.
108 4 : pub fn parse(mut buf: Bytes) -> PageserverFeedback {
109 4 : let mut rf = PageserverFeedback::empty();
110 4 : let nfields = buf.get_u8();
111 4 : for _ in 0..nfields {
112 22 : let key = read_cstr(&mut buf).unwrap();
113 22 : match key.as_ref() {
114 22 : b"current_timeline_size" => {
115 4 : let len = buf.get_i32();
116 4 : assert_eq!(len, 8);
117 4 : rf.current_timeline_size = buf.get_u64();
118 : }
119 18 : b"ps_writelsn" => {
120 4 : let len = buf.get_i32();
121 4 : assert_eq!(len, 8);
122 4 : rf.last_received_lsn = Lsn(buf.get_u64());
123 : }
124 : b"ps_flushlsn" => {
125 4 : let len = buf.get_i32();
126 4 : assert_eq!(len, 8);
127 4 : rf.disk_consistent_lsn = Lsn(buf.get_u64());
128 : }
129 : b"ps_applylsn" => {
130 4 : let len = buf.get_i32();
131 4 : assert_eq!(len, 8);
132 4 : rf.remote_consistent_lsn = Lsn(buf.get_u64());
133 : }
134 6 : b"ps_replytime" => {
135 4 : let len = buf.get_i32();
136 4 : assert_eq!(len, 8);
137 4 : let raw_time = buf.get_i64();
138 4 : if raw_time > 0 {
139 4 : rf.replytime = *PG_EPOCH + Duration::from_micros(raw_time as u64);
140 4 : } else {
141 0 : rf.replytime = *PG_EPOCH - Duration::from_micros(-raw_time as u64);
142 0 : }
143 : }
144 : b"shard_number" => {
145 0 : let len = buf.get_i32();
146 0 : assert_eq!(len, 4);
147 0 : rf.shard_number = buf.get_u32();
148 : }
149 : _ => {
150 2 : let len = buf.get_i32();
151 2 : warn!(
152 0 : "PageserverFeedback parse. unknown key {} of len {len}. Skip it.",
153 0 : String::from_utf8_lossy(key.as_ref())
154 : );
155 2 : buf.advance(len as usize);
156 : }
157 : }
158 : }
159 4 : trace!("PageserverFeedback parsed is {:?}", rf);
160 4 : rf
161 4 : }
162 : }
163 :
164 : mod serde_systemtime {
165 : use std::time::SystemTime;
166 :
167 : use chrono::{DateTime, Utc};
168 : use serde::{Deserialize, Deserializer, Serializer};
169 :
170 0 : pub fn serialize<S>(ts: &SystemTime, serializer: S) -> Result<S::Ok, S::Error>
171 0 : where
172 0 : S: Serializer,
173 0 : {
174 0 : let chrono_dt: DateTime<Utc> = (*ts).into();
175 0 : serializer.serialize_str(&chrono_dt.to_rfc3339())
176 0 : }
177 :
178 0 : pub fn deserialize<'de, D>(deserializer: D) -> Result<SystemTime, D::Error>
179 0 : where
180 0 : D: Deserializer<'de>,
181 0 : {
182 0 : let time: String = Deserialize::deserialize(deserializer)?;
183 0 : Ok(DateTime::parse_from_rfc3339(&time)
184 0 : .map_err(serde::de::Error::custom)?
185 0 : .into())
186 0 : }
187 : }
188 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a feedback with a non-default timeline size and reply time.
    ///
    /// The reply time is a whole number of seconds past `PG_EPOCH` so that it
    /// compares equal after a round trip: serialization only keeps
    /// microsecond precision, truncating anything finer.
    fn sample_feedback() -> PageserverFeedback {
        let mut rf = PageserverFeedback::empty();
        rf.current_timeline_size = 12345678;
        rf.replytime = *PG_EPOCH + Duration::from_secs(100_000_000);
        rf
    }

    #[test]
    fn test_replication_feedback_serialization() {
        let rf = sample_feedback();
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    #[test]
    fn test_replication_feedback_unknown_key() {
        let rf = sample_feedback();
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        // Add an extra field to the buffer and adjust number of keys
        data[0] += 1;
        data.put_slice(b"new_field_one\0");
        data.put_i32(8);
        data.put_u64(42);

        // Parse serialized data and check that new field is not parsed
        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    #[test]
    fn test_replication_feedback_shard_number() {
        let mut rf = sample_feedback();
        rf.shard_number = 3;
        let mut data = BytesMut::new();
        rf.serialize(&mut data);

        // A non-zero shard number adds a sixth key to the five mandatory ones.
        assert_eq!(data[0], 6);

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(rf, rf_parsed);
    }

    #[test]
    fn test_replication_feedback_parse_pre_epoch_replytime() {
        // Hand-built message with a single key whose timestamp lies one second
        // before PG_EPOCH, i.e. a negative microsecond offset.
        let mut data = BytesMut::new();
        data.put_u8(1);
        data.put_slice(b"ps_replytime\0");
        data.put_i32(8);
        data.put_i64(-1_000_000);

        let mut expected = PageserverFeedback::empty();
        expected.replytime = *PG_EPOCH - Duration::from_secs(1);

        let rf_parsed = PageserverFeedback::parse(data.freeze());
        assert_eq!(expected, rf_parsed);
    }
}
|