Line data Source code
1 : //! Types used in safekeeper http API. Many of them are also reused internally.
2 :
3 : use std::net::SocketAddr;
4 :
5 : use pageserver_api::shard::ShardIdentity;
6 : use postgres_ffi::TimestampTz;
7 : use serde::{Deserialize, Serialize};
8 : use tokio::time::Instant;
9 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
10 : use utils::lsn::Lsn;
11 : use utils::pageserver_feedback::PageserverFeedback;
12 :
13 : use crate::membership::Configuration;
14 : use crate::{ServerInfo, Term};
15 :
16 : #[derive(Debug, Serialize)]
17 : pub struct SafekeeperStatus {
18 : pub id: NodeId,
19 : }
20 :
21 0 : #[derive(Serialize, Deserialize, Clone)]
22 : pub struct TimelineCreateRequest {
23 : pub tenant_id: TenantId,
24 : pub timeline_id: TimelineId,
25 : pub mconf: Configuration,
26 : pub pg_version: u32,
27 : pub system_id: Option<u64>,
28 : // By default WAL_SEGMENT_SIZE
29 : pub wal_seg_size: Option<u32>,
30 : pub start_lsn: Lsn,
31 : // Normal creation should omit this field (start_lsn initializes all LSNs).
32 : // However, we allow specifying custom value higher than start_lsn for
33 : // manual recovery case, see test_s3_wal_replay.
34 : pub commit_lsn: Option<Lsn>,
35 : }
36 :
37 : /// Same as TermLsn, but serializes LSN using display serializer
38 : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
39 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
40 : pub struct TermSwitchApiEntry {
41 : pub term: Term,
42 : pub lsn: Lsn,
43 : }
44 :
45 : /// Augment AcceptorState with last_log_term for convenience
46 0 : #[derive(Debug, Serialize, Deserialize)]
47 : pub struct AcceptorStateStatus {
48 : pub term: Term,
49 : pub epoch: Term, // aka last_log_term, old `epoch` name is left for compatibility
50 : pub term_history: Vec<TermSwitchApiEntry>,
51 : }
52 :
53 : /// Things safekeeper should know about timeline state on peers.
54 : /// Used as both model and internally.
55 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
56 : pub struct PeerInfo {
57 : pub sk_id: NodeId,
58 : pub term: Term,
59 : /// Term of the last entry.
60 : pub last_log_term: Term,
61 : /// LSN of the last record.
62 : pub flush_lsn: Lsn,
63 : pub commit_lsn: Lsn,
64 : /// Since which LSN safekeeper has WAL.
65 : pub local_start_lsn: Lsn,
66 : /// When info was received. Serde annotations are not very useful but make
67 : /// the code compile -- we don't rely on this field externally.
68 : #[serde(skip)]
69 : #[serde(default = "Instant::now")]
70 : pub ts: Instant,
71 : pub pg_connstr: String,
72 : pub http_connstr: String,
73 : }
74 :
75 : pub type FullTransactionId = u64;
76 :
77 : /// Hot standby feedback received from replica
78 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
79 : pub struct HotStandbyFeedback {
80 : pub ts: TimestampTz,
81 : pub xmin: FullTransactionId,
82 : pub catalog_xmin: FullTransactionId,
83 : }
84 :
85 : pub const INVALID_FULL_TRANSACTION_ID: FullTransactionId = 0;
86 :
87 : impl HotStandbyFeedback {
88 3400 : pub fn empty() -> HotStandbyFeedback {
89 3400 : HotStandbyFeedback {
90 3400 : ts: 0,
91 3400 : xmin: 0,
92 3400 : catalog_xmin: 0,
93 3400 : }
94 3400 : }
95 : }
96 :
97 : /// Standby status update
98 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
99 : pub struct StandbyReply {
100 : pub write_lsn: Lsn, // The location of the last WAL byte + 1 received and written to disk in the standby.
101 : pub flush_lsn: Lsn, // The location of the last WAL byte + 1 flushed to disk in the standby.
102 : pub apply_lsn: Lsn, // The location of the last WAL byte + 1 applied in the standby.
103 : pub reply_ts: TimestampTz, // The client's system clock at the time of transmission, as microseconds since midnight on 2000-01-01.
104 : pub reply_requested: bool,
105 : }
106 :
107 : impl StandbyReply {
108 13 : pub fn empty() -> Self {
109 13 : StandbyReply {
110 13 : write_lsn: Lsn::INVALID,
111 13 : flush_lsn: Lsn::INVALID,
112 13 : apply_lsn: Lsn::INVALID,
113 13 : reply_ts: 0,
114 13 : reply_requested: false,
115 13 : }
116 13 : }
117 : }
118 :
119 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
120 : pub struct StandbyFeedback {
121 : pub reply: StandbyReply,
122 : pub hs_feedback: HotStandbyFeedback,
123 : }
124 :
125 : impl StandbyFeedback {
126 7 : pub fn empty() -> Self {
127 7 : StandbyFeedback {
128 7 : reply: StandbyReply::empty(),
129 7 : hs_feedback: HotStandbyFeedback::empty(),
130 7 : }
131 7 : }
132 : }
133 :
134 : /// Receiver is either pageserver or regular standby, which have different
135 : /// feedbacks.
136 : /// Used as both model and internally.
137 0 : #[derive(Debug, Clone, Copy, Serialize, Deserialize)]
138 : pub enum ReplicationFeedback {
139 : Pageserver(PageserverFeedback),
140 : Standby(StandbyFeedback),
141 : }
142 :
143 : /// Uniquely identifies a WAL service connection. Logged in spans for
144 : /// observability.
145 : pub type ConnectionId = u32;
146 :
147 : /// Serialize is used only for json'ing in API response. Also used internally.
148 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
149 : pub enum WalSenderState {
150 : Vanilla(VanillaWalSenderState),
151 : Interpreted(InterpretedWalSenderState),
152 : }
153 :
154 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
155 : pub struct VanillaWalSenderState {
156 : pub ttid: TenantTimelineId,
157 : pub addr: SocketAddr,
158 : pub conn_id: ConnectionId,
159 : // postgres application_name
160 : pub appname: Option<String>,
161 : pub feedback: ReplicationFeedback,
162 : }
163 :
164 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
165 : pub struct InterpretedWalSenderState {
166 : pub ttid: TenantTimelineId,
167 : pub shard: ShardIdentity,
168 : pub addr: SocketAddr,
169 : pub conn_id: ConnectionId,
170 : // postgres application_name
171 : pub appname: Option<String>,
172 : pub feedback: ReplicationFeedback,
173 : }
174 :
175 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
176 : pub struct WalReceiverState {
177 : /// None means it is recovery initiated by us (this safekeeper).
178 : pub conn_id: Option<ConnectionId>,
179 : pub status: WalReceiverStatus,
180 : }
181 :
182 : /// Walreceiver status. Currently only whether it passed voting stage and
183 : /// started receiving the stream, but it is easy to add more if needed.
184 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
185 : pub enum WalReceiverStatus {
186 : Voting,
187 : Streaming,
188 : }
189 :
190 : /// Info about timeline on safekeeper ready for reporting.
191 0 : #[derive(Debug, Serialize, Deserialize)]
192 : pub struct TimelineStatus {
193 : pub tenant_id: TenantId,
194 : pub timeline_id: TimelineId,
195 : pub mconf: Configuration,
196 : pub acceptor_state: AcceptorStateStatus,
197 : pub pg_info: ServerInfo,
198 : pub flush_lsn: Lsn,
199 : pub timeline_start_lsn: Lsn,
200 : pub local_start_lsn: Lsn,
201 : pub commit_lsn: Lsn,
202 : pub backup_lsn: Lsn,
203 : pub peer_horizon_lsn: Lsn,
204 : pub remote_consistent_lsn: Lsn,
205 : pub peers: Vec<PeerInfo>,
206 : pub walsenders: Vec<WalSenderState>,
207 : pub walreceivers: Vec<WalReceiverState>,
208 : }
209 :
210 : /// Request to switch membership configuration.
211 0 : #[derive(Serialize, Deserialize)]
212 : #[serde(transparent)]
213 : pub struct TimelineMembershipSwitchRequest {
214 : pub mconf: Configuration,
215 : }
216 :
217 : /// In response both previous and current configuration are sent.
218 0 : #[derive(Serialize, Deserialize)]
219 : pub struct TimelineMembershipSwitchResponse {
220 : pub previous_conf: Configuration,
221 : pub current_conf: Configuration,
222 : }
223 :
224 0 : fn lsn_invalid() -> Lsn {
225 0 : Lsn::INVALID
226 0 : }
227 :
228 : /// Data about safekeeper's timeline, mirrors broker.proto.
229 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
230 : pub struct SkTimelineInfo {
231 : /// Term.
232 : pub term: Option<u64>,
233 : /// Term of the last entry.
234 : pub last_log_term: Option<u64>,
235 : /// LSN of the last record.
236 : #[serde(default = "lsn_invalid")]
237 : pub flush_lsn: Lsn,
238 : /// Up to which LSN safekeeper regards its WAL as committed.
239 : #[serde(default = "lsn_invalid")]
240 : pub commit_lsn: Lsn,
241 : /// LSN up to which safekeeper has backed WAL.
242 : #[serde(default = "lsn_invalid")]
243 : pub backup_lsn: Lsn,
244 : /// LSN of last checkpoint uploaded by pageserver.
245 : #[serde(default = "lsn_invalid")]
246 : pub remote_consistent_lsn: Lsn,
247 : #[serde(default = "lsn_invalid")]
248 : pub peer_horizon_lsn: Lsn,
249 : #[serde(default = "lsn_invalid")]
250 : pub local_start_lsn: Lsn,
251 : /// A connection string to use for WAL receiving.
252 : #[serde(default)]
253 : pub safekeeper_connstr: Option<String>,
254 : #[serde(default)]
255 : pub http_connstr: Option<String>,
256 : // Minimum of all active RO replicas flush LSN
257 : #[serde(default = "lsn_invalid")]
258 : pub standby_horizon: Lsn,
259 : }
260 :
261 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
262 : pub struct TimelineCopyRequest {
263 : pub target_timeline_id: TimelineId,
264 : pub until_lsn: Lsn,
265 : }
266 :
267 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
268 : pub struct TimelineTermBumpRequest {
269 : /// bump to
270 : pub term: Option<u64>,
271 : }
272 :
273 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
274 : pub struct TimelineTermBumpResponse {
275 : // before the request
276 : pub previous_term: u64,
277 : pub current_term: u64,
278 : }
279 :
280 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
281 : pub struct SafekeeperUtilization {
282 : pub timeline_count: u64,
283 : }
284 :
285 : /// pull_timeline request body.
286 0 : #[derive(Debug, Clone, Deserialize, Serialize)]
287 : pub struct PullTimelineRequest {
288 : pub tenant_id: TenantId,
289 : pub timeline_id: TimelineId,
290 : pub http_hosts: Vec<String>,
291 : }
292 :
293 0 : #[derive(Debug, Serialize, Deserialize)]
294 : pub struct PullTimelineResponse {
295 : // Donor safekeeper host
296 : pub safekeeper_host: String,
297 : // TODO: add more fields?
298 : }
|