TLA Line data Source code
1 : use hyper::{Body, Request, Response, StatusCode, Uri};
2 :
3 : use once_cell::sync::Lazy;
4 : use postgres_ffi::WAL_SEGMENT_SIZE;
5 : use safekeeper_api::models::SkTimelineInfo;
6 : use serde::{Deserialize, Serialize};
7 : use serde_with::{serde_as, DisplayFromStr};
8 : use std::collections::{HashMap, HashSet};
9 : use std::fmt;
10 : use std::str::FromStr;
11 : use std::sync::Arc;
12 : use storage_broker::proto::SafekeeperTimelineInfo;
13 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
14 : use tokio::fs::File;
15 : use tokio::io::AsyncReadExt;
16 : use utils::http::endpoint::request_span;
17 :
18 : use crate::receive_wal::WalReceiverState;
19 : use crate::safekeeper::ServerInfo;
20 : use crate::safekeeper::Term;
21 : use crate::send_wal::WalSenderState;
22 : use crate::timeline::PeerInfo;
23 : use crate::{debug_dump, pull_timeline};
24 :
25 : use crate::timelines_global_map::TimelineDeleteForceResult;
26 : use crate::GlobalTimelines;
27 : use crate::SafeKeeperConf;
28 : use utils::{
29 : auth::JwtAuth,
30 : http::{
31 : endpoint::{self, auth_middleware, check_permission_with},
32 : error::ApiError,
33 : json::{json_request, json_response},
34 : request::{ensure_no_body, parse_request_param},
35 : RequestExt, RouterBuilder,
36 : },
37 : id::{NodeId, TenantId, TenantTimelineId, TimelineId},
38 : lsn::Lsn,
39 : };
40 :
41 : use super::models::TimelineCreateRequest;
42 :
43 CBC 997 : #[derive(Debug, Serialize)]
44 : struct SafekeeperStatus {
45 : id: NodeId,
46 : }
47 :
48 : /// Healthcheck handler.
49 997 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
50 997 : check_permission(&request, None)?;
51 997 : let conf = get_conf(&request);
52 997 : let status = SafekeeperStatus { id: conf.my_id };
53 997 : json_response(StatusCode::OK, status)
54 997 : }
55 :
56 1209 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
57 1209 : request
58 1209 : .data::<Arc<SafeKeeperConf>>()
59 1209 : .expect("unknown state type")
60 1209 : .as_ref()
61 1209 : }
62 :
63 : /// Same as TermSwitchEntry, but serializes LSN using display serializer
64 : /// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
65 : #[serde_as]
66 293 : #[derive(Debug, Serialize, Deserialize)]
67 : pub struct TermSwitchApiEntry {
68 : pub term: Term,
69 : #[serde_as(as = "DisplayFromStr")]
70 : pub lsn: Lsn,
71 : }
72 :
73 : /// Augment AcceptorState with epoch for convenience
74 213 : #[derive(Debug, Serialize, Deserialize)]
75 : pub struct AcceptorStateStatus {
76 : pub term: Term,
77 : pub epoch: Term,
78 : pub term_history: Vec<TermSwitchApiEntry>,
79 : }
80 :
81 : /// Info about timeline on safekeeper ready for reporting.
82 : #[serde_as]
83 212 : #[derive(Debug, Serialize, Deserialize)]
84 : pub struct TimelineStatus {
85 : #[serde_as(as = "DisplayFromStr")]
86 : pub tenant_id: TenantId,
87 : #[serde_as(as = "DisplayFromStr")]
88 : pub timeline_id: TimelineId,
89 : pub acceptor_state: AcceptorStateStatus,
90 : pub pg_info: ServerInfo,
91 : #[serde_as(as = "DisplayFromStr")]
92 : pub flush_lsn: Lsn,
93 : #[serde_as(as = "DisplayFromStr")]
94 : pub timeline_start_lsn: Lsn,
95 : #[serde_as(as = "DisplayFromStr")]
96 : pub local_start_lsn: Lsn,
97 : #[serde_as(as = "DisplayFromStr")]
98 : pub commit_lsn: Lsn,
99 : #[serde_as(as = "DisplayFromStr")]
100 : pub backup_lsn: Lsn,
101 : #[serde_as(as = "DisplayFromStr")]
102 : pub peer_horizon_lsn: Lsn,
103 : #[serde_as(as = "DisplayFromStr")]
104 : pub remote_consistent_lsn: Lsn,
105 : pub peers: Vec<PeerInfo>,
106 : pub walsenders: Vec<WalSenderState>,
107 : pub walreceivers: Vec<WalReceiverState>,
108 : }
109 :
110 1257 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
111 1257 : check_permission_with(request, |claims| {
112 26 : crate::auth::check_permission(claims, tenant_id)
113 1257 : })
114 1257 : }
115 :
116 : /// Report info about timeline.
117 213 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
118 213 : let ttid = TenantTimelineId::new(
119 213 : parse_request_param(&request, "tenant_id")?,
120 213 : parse_request_param(&request, "timeline_id")?,
121 : );
122 213 : check_permission(&request, Some(ttid.tenant_id))?;
123 :
124 212 : let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
125 212 : let (inmem, state) = tli.get_state().await;
126 212 : let flush_lsn = tli.get_flush_lsn().await;
127 :
128 212 : let epoch = state.acceptor_state.get_epoch(flush_lsn);
129 212 : let term_history = state
130 212 : .acceptor_state
131 212 : .term_history
132 212 : .0
133 212 : .into_iter()
134 292 : .map(|ts| TermSwitchApiEntry {
135 292 : term: ts.term,
136 292 : lsn: ts.lsn,
137 292 : })
138 212 : .collect();
139 212 : let acc_state = AcceptorStateStatus {
140 212 : term: state.acceptor_state.term,
141 212 : epoch,
142 212 : term_history,
143 212 : };
144 212 :
145 212 : let conf = get_conf(&request);
146 : // Note: we report in memory values which can be lost.
147 212 : let status = TimelineStatus {
148 212 : tenant_id: ttid.tenant_id,
149 212 : timeline_id: ttid.timeline_id,
150 212 : acceptor_state: acc_state,
151 212 : pg_info: state.server,
152 212 : flush_lsn,
153 212 : timeline_start_lsn: state.timeline_start_lsn,
154 212 : local_start_lsn: state.local_start_lsn,
155 212 : commit_lsn: inmem.commit_lsn,
156 212 : backup_lsn: inmem.backup_lsn,
157 212 : peer_horizon_lsn: inmem.peer_horizon_lsn,
158 212 : remote_consistent_lsn: tli.get_walsenders().get_remote_consistent_lsn(),
159 212 : peers: tli.get_peers(conf).await,
160 212 : walsenders: tli.get_walsenders().get_all(),
161 212 : walreceivers: tli.get_walreceivers().get_all(),
162 212 : };
163 212 : json_response(StatusCode::OK, status)
164 213 : }
165 :
166 9 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
167 9 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
168 :
169 9 : let ttid = TenantTimelineId {
170 9 : tenant_id: request_data.tenant_id,
171 9 : timeline_id: request_data.timeline_id,
172 9 : };
173 9 : check_permission(&request, Some(ttid.tenant_id))?;
174 :
175 9 : let server_info = ServerInfo {
176 9 : pg_version: request_data.pg_version,
177 9 : system_id: request_data.system_id.unwrap_or(0),
178 9 : wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
179 9 : };
180 9 : let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
181 9 : request_data
182 9 : .commit_lsn
183 9 : .segment_lsn(server_info.wal_seg_size as usize)
184 9 : });
185 9 : GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
186 45 : .await
187 9 : .map_err(ApiError::InternalServerError)?;
188 :
189 9 : json_response(StatusCode::OK, ())
190 9 : }
191 :
192 : /// Pull timeline from peer safekeeper instances.
193 1 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
194 1 : check_permission(&request, None)?;
195 :
196 1 : let data: pull_timeline::Request = json_request(&mut request).await?;
197 :
198 1 : let resp = pull_timeline::handle_request(data)
199 113 : .await
200 1 : .map_err(ApiError::InternalServerError)?;
201 1 : json_response(StatusCode::OK, resp)
202 1 : }
203 :
204 : /// Download a file from the timeline directory.
205 : // TODO: figure out a better way to copy files between safekeepers
206 3 : async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
207 3 : let ttid = TenantTimelineId::new(
208 3 : parse_request_param(&request, "tenant_id")?,
209 3 : parse_request_param(&request, "timeline_id")?,
210 : );
211 3 : check_permission(&request, Some(ttid.tenant_id))?;
212 :
213 3 : let filename: String = parse_request_param(&request, "filename")?;
214 :
215 3 : let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
216 :
217 3 : let filepath = tli.timeline_dir.join(filename);
218 3 : let mut file = File::open(&filepath)
219 3 : .await
220 3 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
221 :
222 3 : let mut content = Vec::new();
223 3 : // TODO: don't store files in memory
224 3 : file.read_to_end(&mut content)
225 55 : .await
226 3 : .map_err(|e| ApiError::InternalServerError(e.into()))?;
227 :
228 3 : Response::builder()
229 3 : .status(StatusCode::OK)
230 3 : .header("Content-Type", "application/octet-stream")
231 3 : .body(Body::from(content))
232 3 : .map_err(|e| ApiError::InternalServerError(e.into()))
233 3 : }
234 :
235 : /// Deactivates the timeline and removes its data directory.
236 18 : async fn timeline_delete_force_handler(
237 18 : mut request: Request<Body>,
238 18 : ) -> Result<Response<Body>, ApiError> {
239 18 : let ttid = TenantTimelineId::new(
240 18 : parse_request_param(&request, "tenant_id")?,
241 18 : parse_request_param(&request, "timeline_id")?,
242 : );
243 18 : check_permission(&request, Some(ttid.tenant_id))?;
244 17 : ensure_no_body(&mut request).await?;
245 : // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
246 : // error handling here when we're able to.
247 17 : let resp = GlobalTimelines::delete_force(&ttid)
248 15 : .await
249 17 : .map_err(ApiError::InternalServerError)?;
250 17 : json_response(StatusCode::OK, resp)
251 18 : }
252 :
253 : /// Deactivates all timelines for the tenant and removes its data directory.
254 : /// See `timeline_delete_force_handler`.
255 5 : async fn tenant_delete_force_handler(
256 5 : mut request: Request<Body>,
257 5 : ) -> Result<Response<Body>, ApiError> {
258 5 : let tenant_id = parse_request_param(&request, "tenant_id")?;
259 5 : check_permission(&request, Some(tenant_id))?;
260 4 : ensure_no_body(&mut request).await?;
261 : // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
262 : // Using an `InternalServerError` should be fixed when the types support it
263 4 : let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id)
264 16 : .await
265 4 : .map_err(ApiError::InternalServerError)?;
266 4 : json_response(
267 4 : StatusCode::OK,
268 4 : delete_info
269 4 : .iter()
270 16 : .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
271 4 : .collect::<HashMap<String, TimelineDeleteForceResult>>(),
272 4 : )
273 5 : }
274 :
275 : /// Used only in tests to hand craft required data.
276 6 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
277 6 : let ttid = TenantTimelineId::new(
278 6 : parse_request_param(&request, "tenant_id")?,
279 6 : parse_request_param(&request, "timeline_id")?,
280 : );
281 6 : check_permission(&request, Some(ttid.tenant_id))?;
282 5 : let sk_info: SkTimelineInfo = json_request(&mut request).await?;
283 5 : let proto_sk_info = SafekeeperTimelineInfo {
284 5 : safekeeper_id: 0,
285 5 : tenant_timeline_id: Some(ProtoTenantTimelineId {
286 5 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
287 5 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
288 5 : }),
289 5 : term: sk_info.term.unwrap_or(0),
290 5 : last_log_term: sk_info.last_log_term.unwrap_or(0),
291 5 : flush_lsn: sk_info.flush_lsn.0,
292 5 : commit_lsn: sk_info.commit_lsn.0,
293 5 : remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
294 5 : peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
295 5 : safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
296 5 : http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
297 5 : backup_lsn: sk_info.backup_lsn.0,
298 5 : local_start_lsn: sk_info.local_start_lsn.0,
299 5 : availability_zone: None,
300 5 : };
301 :
302 5 : let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
303 5 : tli.record_safekeeper_info(proto_sk_info)
304 15 : .await
305 5 : .map_err(ApiError::InternalServerError)?;
306 :
307 5 : json_response(StatusCode::OK, ())
308 6 : }
309 :
310 7 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
311 7 : v.parse()
312 7 : .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
313 7 : }
314 :
315 : /// Dump debug info about all available safekeeper state.
316 5 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
317 5 : check_permission(&request, None)?;
318 5 : ensure_no_body(&mut request).await?;
319 :
320 5 : let mut dump_all: Option<bool> = None;
321 5 : let mut dump_control_file: Option<bool> = None;
322 5 : let mut dump_memory: Option<bool> = None;
323 5 : let mut dump_disk_content: Option<bool> = None;
324 5 : let mut dump_term_history: Option<bool> = None;
325 5 : let mut tenant_id: Option<TenantId> = None;
326 5 : let mut timeline_id: Option<TimelineId> = None;
327 5 :
328 5 : let query = request.uri().query().unwrap_or("");
329 5 : let mut values = url::form_urlencoded::parse(query.as_bytes());
330 :
331 12 : for (k, v) in &mut values {
332 7 : match k.as_ref() {
333 7 : "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
334 2 : "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
335 2 : "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
336 2 : "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
337 2 : "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
338 2 : "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
339 1 : "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
340 UBC 0 : _ => Err(ApiError::BadRequest(anyhow::anyhow!(
341 0 : "Unknown query parameter: {}",
342 0 : k
343 0 : )))?,
344 : }
345 : }
346 :
347 CBC 5 : let dump_all = dump_all.unwrap_or(false);
348 5 : let dump_control_file = dump_control_file.unwrap_or(dump_all);
349 5 : let dump_memory = dump_memory.unwrap_or(dump_all);
350 5 : let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
351 5 : let dump_term_history = dump_term_history.unwrap_or(true);
352 5 :
353 5 : let args = debug_dump::Args {
354 5 : dump_all,
355 5 : dump_control_file,
356 5 : dump_memory,
357 5 : dump_disk_content,
358 5 : dump_term_history,
359 5 : tenant_id,
360 5 : timeline_id,
361 5 : };
362 :
363 5 : let resp = debug_dump::build(args)
364 UBC 0 : .await
365 CBC 5 : .map_err(ApiError::InternalServerError)?;
366 :
367 : // TODO: use streaming response
368 5 : json_response(StatusCode::OK, resp)
369 5 : }
370 :
371 : /// Safekeeper http router.
372 500 : pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
373 500 : let mut router = endpoint::make_router();
374 500 : if conf.http_auth.is_some() {
375 71 : router = router.middleware(auth_middleware(|request| {
376 71 : #[allow(clippy::mutable_key_type)]
377 71 : static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
378 18 : ["/v1/status", "/metrics"]
379 18 : .iter()
380 36 : .map(|v| v.parse().unwrap())
381 18 : .collect()
382 18 : });
383 71 : if ALLOWLIST_ROUTES.contains(request.uri()) {
384 40 : None
385 : } else {
386 : // Option<Arc<JwtAuth>> is always provided as data below, hence unwrap().
387 31 : request.data::<Option<Arc<JwtAuth>>>().unwrap().as_deref()
388 : }
389 71 : }))
390 482 : }
391 :
392 : // NB: on any changes do not forget to update the OpenAPI spec
393 : // located nearby (/safekeeper/src/http/openapi_spec.yaml).
394 500 : let auth = conf.http_auth.clone();
395 500 : router
396 500 : .data(Arc::new(conf))
397 500 : .data(auth)
398 997 : .get("/v1/status", |r| request_span(r, status_handler))
399 500 : // Will be used in the future instead of implicit timeline creation
400 500 : .post("/v1/tenant/timeline", |r| {
401 9 : request_span(r, timeline_create_handler)
402 500 : })
403 500 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
404 213 : request_span(r, timeline_status_handler)
405 500 : })
406 500 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
407 18 : request_span(r, timeline_delete_force_handler)
408 500 : })
409 500 : .delete("/v1/tenant/:tenant_id", |r| {
410 5 : request_span(r, tenant_delete_force_handler)
411 500 : })
412 500 : .post("/v1/pull_timeline", |r| {
413 1 : request_span(r, timeline_pull_handler)
414 500 : })
415 500 : .get(
416 500 : "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
417 500 : |r| request_span(r, timeline_files_handler),
418 500 : )
419 500 : // for tests
420 500 : .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
421 6 : request_span(r, record_safekeeper_info)
422 500 : })
423 500 : .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
424 500 : }
425 :
#[cfg(test)]
mod tests {
    use super::*;

    /// The LSN inside a term history entry must serialize in Postgres
    /// `hi/lo` hex form, not as a plain number.
    #[test]
    fn test_term_switch_entry_api_serialize() {
        let entry = TermSwitchApiEntry {
            term: 1,
            lsn: Lsn(0x16FFDDDD),
        };
        let state = AcceptorStateStatus {
            term: 1,
            epoch: 1,
            term_history: vec![entry],
        };

        let expected = r#"{"term":1,"epoch":1,"term_history":[{"term":1,"lsn":"0/16FFDDDD"}]}"#;
        assert_eq!(serde_json::to_string(&state).unwrap(), expected);
    }
}
|