Line data Source code
1 : //! Types used in safekeeper http API. Many of them are also reused internally.
2 :
3 : use std::net::SocketAddr;
4 :
5 : use pageserver_api::shard::ShardIdentity;
6 : use postgres_ffi::TimestampTz;
7 : use serde::{Deserialize, Serialize};
8 : use tokio::time::Instant;
9 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
10 : use utils::lsn::Lsn;
11 : use utils::pageserver_feedback::PageserverFeedback;
12 :
13 : use crate::membership::Configuration;
14 : use crate::{ServerInfo, Term};
15 :
16 : #[derive(Debug, Serialize)]
17 : pub struct SafekeeperStatus {
18 : pub id: NodeId,
19 : }
20 :
21 0 : #[derive(Serialize, Deserialize, Clone)]
22 : pub struct TimelineCreateRequest {
23 : pub tenant_id: TenantId,
24 : pub timeline_id: TimelineId,
25 : pub mconf: Configuration,
26 : /// In the PG_VERSION_NUM macro format, like 140017.
27 : pub pg_version: u32,
28 : pub system_id: Option<u64>,
29 : // By default WAL_SEGMENT_SIZE
30 : pub wal_seg_size: Option<u32>,
31 : pub start_lsn: Lsn,
32 : // Normal creation should omit this field (start_lsn initializes all LSNs).
33 : // However, we allow specifying custom value higher than start_lsn for
34 : // manual recovery case, see test_s3_wal_replay.
35 : pub commit_lsn: Option<Lsn>,
36 : }
37 :
38 : /// Same as TermLsn, but serializes LSN using display serializer
39 : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
40 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
41 : pub struct TermSwitchApiEntry {
42 : pub term: Term,
43 : pub lsn: Lsn,
44 : }
45 :
46 : /// Augment AcceptorState with last_log_term for convenience
47 0 : #[derive(Debug, Serialize, Deserialize)]
48 : pub struct AcceptorStateStatus {
49 : pub term: Term,
50 : pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility
51 : pub term_history: Vec<TermSwitchApiEntry>,
52 : }
53 :
54 : /// Things safekeeper should know about timeline state on peers.
55 : /// Used as both model and internally.
56 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
57 : pub struct PeerInfo {
58 : pub sk_id: NodeId,
59 : pub term: Term,
60 : /// Term of the last entry.
61 : pub last_log_term: Term,
62 : /// LSN of the last record.
63 : pub flush_lsn: Lsn,
64 : pub commit_lsn: Lsn,
65 : /// Since which LSN safekeeper has WAL.
66 : pub local_start_lsn: Lsn,
67 : /// When info was received. Serde annotations are not very useful but make
68 : /// the code compile -- we don't rely on this field externally.
69 : #[serde(skip)]
70 : #[serde(default = "Instant::now")]
71 : pub ts: Instant,
72 : pub pg_connstr: String,
73 : pub http_connstr: String,
74 : }
75 :
/// Full transaction id, represented as a plain `u64`.
pub type FullTransactionId = u64;
77 :
78 : /// Hot standby feedback received from replica
79 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
80 : pub struct HotStandbyFeedback {
81 : pub ts: TimestampTz,
82 : pub xmin: FullTransactionId,
83 : pub catalog_xmin: FullTransactionId,
84 : }
85 :
86 : pub const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
87 :
88 : impl HotStandbyFeedback {
89 3310 : pub fn empty() -> HotStandbyFeedback {
90 3310 : HotStandbyFeedback {
91 3310 : ts: 0,
92 3310 : xmin: 0,
93 3310 : catalog_xmin: 0,
94 3310 : }
95 3310 : }
96 : }
97 :
98 : /// Standby status update
99 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
100 : pub struct StandbyReply {
101 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
102 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
103 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
104 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
105 : pub reply_requested: bool,
106 : }
107 :
108 : impl StandbyReply {
109 13 : pub fn empty() -> Self {
110 13 : StandbyReply {
111 13 : write_lsn: Lsn::INVALID,
112 13 : flush_lsn: Lsn::INVALID,
113 13 : apply_lsn: Lsn::INVALID,
114 13 : reply_ts: 0,
115 13 : reply_requested: false,
116 13 : }
117 13 : }
118 : }
119 :
120 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
121 : pub struct StandbyFeedback {
122 : pub reply: StandbyReply,
123 : pub hs_feedback: HotStandbyFeedback,
124 : }
125 :
126 : impl StandbyFeedback {
127 7 : pub fn empty() -> Self {
128 7 : StandbyFeedback {
129 7 : reply: StandbyReply::empty(),
130 7 : hs_feedback: HotStandbyFeedback::empty(),
131 7 : }
132 7 : }
133 : }
134 :
135 : /// Receiver is either pageserver or regular standby, which have different
136 : /// feedbacks.
137 : /// Used as both model and internally.
138 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
139 : pub enum ReplicationFeedback {
140 : Pageserver(PageserverFeedback),
141 : Standby(StandbyFeedback),
142 : }
143 :
/// Unique identifier of a WAL service connection; logged in spans for
/// observability.
pub type ConnectionId = u32;
147 :
148 : /// Serialize is used only for json'ing in API response. Also used internally.
149 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
150 : pub enum WalSenderState {
151 : Vanilla(VanillaWalSenderState),
152 : Interpreted(InterpretedWalSenderState),
153 : }
154 :
155 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
156 : pub struct VanillaWalSenderState {
157 : pub ttid: TenantTimelineId,
158 : pub addr: SocketAddr,
159 : pub conn_id: ConnectionId,
160 : // postgres application_name
161 : pub appname: Option<String>,
162 : pub feedback: ReplicationFeedback,
163 : }
164 :
165 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
166 : pub struct InterpretedWalSenderState {
167 : pub ttid: TenantTimelineId,
168 : pub shard: ShardIdentity,
169 : pub addr: SocketAddr,
170 : pub conn_id: ConnectionId,
171 : // postgres application_name
172 : pub appname: Option<String>,
173 : pub feedback: ReplicationFeedback,
174 : }
175 :
176 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
177 : pub struct WalReceiverState {
178 : /// None means it is recovery initiated by us (this safekeeper).
179 : pub conn_id: Option<ConnectionId>,
180 : pub status: WalReceiverStatus,
181 : }
182 :
183 : /// Walreceiver status. Currently only whether it passed voting stage and
184 : /// started receiving the stream, but it is easy to add more if needed.
185 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
186 : pub enum WalReceiverStatus {
187 : Voting,
188 : Streaming,
189 : }
190 :
191 : /// Info about timeline on safekeeper ready for reporting.
192 0 : #[derive(Debug, Serialize, Deserialize)]
193 : pub struct TimelineStatus {
194 : pub tenant_id: TenantId,
195 : pub timeline_id: TimelineId,
196 : pub mconf: Configuration,
197 : pub acceptor_state: AcceptorStateStatus,
198 : pub pg_info: ServerInfo,
199 : pub flush_lsn: Lsn,
200 : pub timeline_start_lsn: Lsn,
201 : pub local_start_lsn: Lsn,
202 : pub commit_lsn: Lsn,
203 : pub backup_lsn: Lsn,
204 : pub peer_horizon_lsn: Lsn,
205 : pub remote_consistent_lsn: Lsn,
206 : pub peers: Vec<PeerInfo>,
207 : pub walsenders: Vec<WalSenderState>,
208 : pub walreceivers: Vec<WalReceiverState>,
209 : }
210 :
211 : /// Request to switch membership configuration.
212 0 : #[derive(Serialize, Deserialize)]
213 : #[serde(transparent)]
214 : pub struct TimelineMembershipSwitchRequest {
215 : pub mconf: Configuration,
216 : }
217 :
218 : /// In response both previous and current configuration are sent.
219 0 : #[derive(Serialize, Deserialize)]
220 : pub struct TimelineMembershipSwitchResponse {
221 : pub previous_conf: Configuration,
222 : pub current_conf: Configuration,
223 : }
224 :
225 0 : #[derive(Clone, Copy, Serialize, Deserialize)]
226 : pub struct TimelineDeleteResult {
227 : pub dir_existed: bool,
228 : }
229 :
230 0 : fn lsn_invalid() -> Lsn {
231 0 : Lsn::INVALID
232 0 : }
233 :
234 : /// Data about safekeeper's timeline, mirrors broker.proto.
235 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
236 : pub struct SkTimelineInfo {
237 : /// Term.
238 : pub term: Option<u64>,
239 : /// Term of the last entry.
240 : pub last_log_term: Option<u64>,
241 : /// LSN of the last record.
242 : #[serde(default = "lsn_invalid")]
243 : pub flush_lsn: Lsn,
244 : /// Up to which LSN safekeeper regards its WAL as committed.
245 : #[serde(default = "lsn_invalid")]
246 : pub commit_lsn: Lsn,
247 : /// LSN up to which safekeeper has backed WAL.
248 : #[serde(default = "lsn_invalid")]
249 : pub backup_lsn: Lsn,
250 : /// LSN of last checkpoint uploaded by pageserver.
251 : #[serde(default = "lsn_invalid")]
252 : pub remote_consistent_lsn: Lsn,
253 : #[serde(default = "lsn_invalid")]
254 : pub peer_horizon_lsn: Lsn,
255 : #[serde(default = "lsn_invalid")]
256 : pub local_start_lsn: Lsn,
257 : /// A connection string to use for WAL receiving.
258 : #[serde(default)]
259 : pub safekeeper_connstr: Option<String>,
260 : #[serde(default)]
261 : pub http_connstr: Option<String>,
262 : // Minimum of all active RO replicas flush LSN
263 : #[serde(default = "lsn_invalid")]
264 : pub standby_horizon: Lsn,
265 : }
266 :
267 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
268 : pub struct TimelineCopyRequest {
269 : pub target_timeline_id: TimelineId,
270 : pub until_lsn: Lsn,
271 : }
272 :
273 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
274 : pub struct TimelineTermBumpRequest {
275 : /// bump to
276 : pub term: Option<u64>,
277 : }
278 :
279 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
280 : pub struct TimelineTermBumpResponse {
281 : // before the request
282 : pub previous_term: u64,
283 : pub current_term: u64,
284 : }
285 :
286 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
287 : pub struct SafekeeperUtilization {
288 : pub timeline_count: u64,
289 : }
290 :
291 : /// pull_timeline request body.
292 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
293 : pub struct PullTimelineRequest {
294 : pub tenant_id: TenantId,
295 : pub timeline_id: TimelineId,
296 : pub http_hosts: Vec<String>,
297 : }
298 :
299 0 : #[derive(Debug, Serialize, Deserialize)]
300 : pub struct PullTimelineResponse {
301 : // Donor safekeeper host
302 : pub safekeeper_host: String,
303 : // TODO: add more fields?
304 : }
|