Line data Source code
1 : //! Types used in safekeeper http API. Many of them are also reused internally.
2 :
3 : use pageserver_api::shard::ShardIdentity;
4 : use postgres_ffi::TimestampTz;
5 : use serde::{Deserialize, Serialize};
6 : use std::net::SocketAddr;
7 : use tokio::time::Instant;
8 :
9 : use utils::{
10 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
11 : lsn::Lsn,
12 : pageserver_feedback::PageserverFeedback,
13 : };
14 :
15 : use crate::{membership::Configuration, ServerInfo, Term};
16 :
17 : #[derive(Debug, Serialize)]
18 : pub struct SafekeeperStatus {
19 : pub id: NodeId,
20 : }
21 :
22 0 : #[derive(Serialize, Deserialize)]
23 : pub struct TimelineCreateRequest {
24 : pub tenant_id: TenantId,
25 : pub timeline_id: TimelineId,
26 : pub mconf: Configuration,
27 : pub pg_version: u32,
28 : pub system_id: Option<u64>,
29 : // By default WAL_SEGMENT_SIZE
30 : pub wal_seg_size: Option<u32>,
31 : pub start_lsn: Lsn,
32 : // Normal creation should omit this field (start_lsn initializes all LSNs).
33 : // However, we allow specifying custom value higher than start_lsn for
34 : // manual recovery case, see test_s3_wal_replay.
35 : pub commit_lsn: Option<Lsn>,
36 : }
37 :
38 : /// Same as TermLsn, but serializes LSN using display serializer
39 : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
40 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
41 : pub struct TermSwitchApiEntry {
42 : pub term: Term,
43 : pub lsn: Lsn,
44 : }
45 :
46 : /// Augment AcceptorState with last_log_term for convenience
47 0 : #[derive(Debug, Serialize, Deserialize)]
48 : pub struct AcceptorStateStatus {
49 : pub term: Term,
50 : pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility
51 : pub term_history: Vec<TermSwitchApiEntry>,
52 : }
53 :
54 : /// Things safekeeper should know about timeline state on peers.
55 : /// Used as both model and internally.
56 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
57 : pub struct PeerInfo {
58 : pub sk_id: NodeId,
59 : pub term: Term,
60 : /// Term of the last entry.
61 : pub last_log_term: Term,
62 : /// LSN of the last record.
63 : pub flush_lsn: Lsn,
64 : pub commit_lsn: Lsn,
65 : /// Since which LSN safekeeper has WAL.
66 : pub local_start_lsn: Lsn,
67 : /// When info was received. Serde annotations are not very useful but make
68 : /// the code compile -- we don't rely on this field externally.
69 : #[serde(skip)]
70 : #[serde(default = "Instant::now")]
71 : pub ts: Instant,
72 : pub pg_connstr: String,
73 : pub http_connstr: String,
74 : }
75 :
76 : pub type FullTransactionId = u64;
77 :
78 : /// Hot standby feedback received from replica
79 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
80 : pub struct HotStandbyFeedback {
81 : pub ts: TimestampTz,
82 : pub xmin: FullTransactionId,
83 : pub catalog_xmin: FullTransactionId,
84 : }
85 :
86 : pub const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
87 :
88 : impl HotStandbyFeedback {
89 2936 : pub fn empty() -> HotStandbyFeedback {
90 2936 : HotStandbyFeedback {
91 2936 : ts: 0,
92 2936 : xmin: 0,
93 2936 : catalog_xmin: 0,
94 2936 : }
95 2936 : }
96 : }
97 :
98 : /// Standby status update
99 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
100 : pub struct StandbyReply {
101 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
102 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
103 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
104 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
105 : pub reply_requested: bool,
106 : }
107 :
108 : impl StandbyReply {
109 11 : pub fn empty() -> Self {
110 11 : StandbyReply {
111 11 : write_lsn: Lsn::INVALID,
112 11 : flush_lsn: Lsn::INVALID,
113 11 : apply_lsn: Lsn::INVALID,
114 11 : reply_ts: 0,
115 11 : reply_requested: false,
116 11 : }
117 11 : }
118 : }
119 :
120 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
121 : pub struct StandbyFeedback {
122 : pub reply: StandbyReply,
123 : pub hs_feedback: HotStandbyFeedback,
124 : }
125 :
126 : impl StandbyFeedback {
127 5 : pub fn empty() -> Self {
128 5 : StandbyFeedback {
129 5 : reply: StandbyReply::empty(),
130 5 : hs_feedback: HotStandbyFeedback::empty(),
131 5 : }
132 5 : }
133 : }
134 :
135 : /// Receiver is either pageserver or regular standby, which have different
136 : /// feedbacks.
137 : /// Used as both model and internally.
138 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
139 : pub enum ReplicationFeedback {
140 : Pageserver(PageserverFeedback),
141 : Standby(StandbyFeedback),
142 : }
143 :
144 : /// Uniquely identifies a WAL service connection. Logged in spans for
145 : /// observability.
146 : pub type ConnectionId = u32;
147 :
148 : /// Serialize is used only for json'ing in API response. Also used internally.
149 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
150 : pub enum WalSenderState {
151 : Vanilla(VanillaWalSenderState),
152 : Interpreted(InterpretedWalSenderState),
153 : }
154 :
155 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
156 : pub struct VanillaWalSenderState {
157 : pub ttid: TenantTimelineId,
158 : pub addr: SocketAddr,
159 : pub conn_id: ConnectionId,
160 : // postgres application_name
161 : pub appname: Option<String>,
162 : pub feedback: ReplicationFeedback,
163 : }
164 :
165 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
166 : pub struct InterpretedWalSenderState {
167 : pub ttid: TenantTimelineId,
168 : pub shard: ShardIdentity,
169 : pub addr: SocketAddr,
170 : pub conn_id: ConnectionId,
171 : // postgres application_name
172 : pub appname: Option<String>,
173 : pub feedback: ReplicationFeedback,
174 : }
175 :
176 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
177 : pub struct WalReceiverState {
178 : /// None means it is recovery initiated by us (this safekeeper).
179 : pub conn_id: Option<ConnectionId>,
180 : pub status: WalReceiverStatus,
181 : }
182 :
183 : /// Walreceiver status. Currently only whether it passed voting stage and
184 : /// started receiving the stream, but it is easy to add more if needed.
185 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
186 : pub enum WalReceiverStatus {
187 : Voting,
188 : Streaming,
189 : }
190 :
191 : /// Info about timeline on safekeeper ready for reporting.
192 0 : #[derive(Debug, Serialize, Deserialize)]
193 : pub struct TimelineStatus {
194 : pub tenant_id: TenantId,
195 : pub timeline_id: TimelineId,
196 : pub mconf: Configuration,
197 : pub acceptor_state: AcceptorStateStatus,
198 : pub pg_info: ServerInfo,
199 : pub flush_lsn: Lsn,
200 : pub timeline_start_lsn: Lsn,
201 : pub local_start_lsn: Lsn,
202 : pub commit_lsn: Lsn,
203 : pub backup_lsn: Lsn,
204 : pub peer_horizon_lsn: Lsn,
205 : pub remote_consistent_lsn: Lsn,
206 : pub peers: Vec<PeerInfo>,
207 : pub walsenders: Vec<WalSenderState>,
208 : pub walreceivers: Vec<WalReceiverState>,
209 : }
210 :
211 : /// Request to switch membership configuration.
212 0 : #[derive(Serialize, Deserialize)]
213 : #[serde(transparent)]
214 : pub struct TimelineMembershipSwitchRequest {
215 : pub mconf: Configuration,
216 : }
217 :
218 : /// In response both previous and current configuration are sent.
219 0 : #[derive(Serialize, Deserialize)]
220 : pub struct TimelineMembershipSwitchResponse {
221 : pub previous_conf: Configuration,
222 : pub current_conf: Configuration,
223 : }
224 :
225 0 : fn lsn_invalid() -> Lsn {
226 0 : Lsn::INVALID
227 0 : }
228 :
229 : /// Data about safekeeper's timeline, mirrors broker.proto.
230 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
231 : pub struct SkTimelineInfo {
232 : /// Term.
233 : pub term: Option<u64>,
234 : /// Term of the last entry.
235 : pub last_log_term: Option<u64>,
236 : /// LSN of the last record.
237 : #[serde(default = "lsn_invalid")]
238 : pub flush_lsn: Lsn,
239 : /// Up to which LSN safekeeper regards its WAL as committed.
240 : #[serde(default = "lsn_invalid")]
241 : pub commit_lsn: Lsn,
242 : /// LSN up to which safekeeper has backed WAL.
243 : #[serde(default = "lsn_invalid")]
244 : pub backup_lsn: Lsn,
245 : /// LSN of last checkpoint uploaded by pageserver.
246 : #[serde(default = "lsn_invalid")]
247 : pub remote_consistent_lsn: Lsn,
248 : #[serde(default = "lsn_invalid")]
249 : pub peer_horizon_lsn: Lsn,
250 : #[serde(default = "lsn_invalid")]
251 : pub local_start_lsn: Lsn,
252 : /// A connection string to use for WAL receiving.
253 : #[serde(default)]
254 : pub safekeeper_connstr: Option<String>,
255 : #[serde(default)]
256 : pub http_connstr: Option<String>,
257 : // Minimum of all active RO replicas flush LSN
258 : #[serde(default = "lsn_invalid")]
259 : pub standby_horizon: Lsn,
260 : }
261 :
262 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
263 : pub struct TimelineCopyRequest {
264 : pub target_timeline_id: TimelineId,
265 : pub until_lsn: Lsn,
266 : }
267 :
268 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
269 : pub struct TimelineTermBumpRequest {
270 : /// bump to
271 : pub term: Option<u64>,
272 : }
273 :
274 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
275 : pub struct TimelineTermBumpResponse {
276 : // before the request
277 : pub previous_term: u64,
278 : pub current_term: u64,
279 : }
280 :
281 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
282 : pub struct SafekeeperUtilization {
283 : pub timeline_count: u64,
284 : }
285 :
286 : /// pull_timeline request body.
287 0 : #[derive(Debug, Deserialize, Serialize)]
288 : pub struct PullTimelineRequest {
289 : pub tenant_id: TenantId,
290 : pub timeline_id: TimelineId,
291 : pub http_hosts: Vec<String>,
292 : }
293 :
294 0 : #[derive(Debug, Serialize, Deserialize)]
295 : pub struct PullTimelineResponse {
296 : // Donor safekeeper host
297 : pub safekeeper_host: String,
298 : // TODO: add more fields?
299 : }
|