use hyper::{Body, Request, Response, StatusCode, Uri};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::io::Write as _;
use std::str::FromStr;
use std::sync::Arc;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::sync::mpsc;
use tokio::task;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tracing::{info_span, Instrument};
use utils::failpoint_support::failpoints_handler;
use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWriter};
use utils::http::request::parse_query_param;

use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use utils::{
    auth::SwappableJwtAuth,
    http::{
        endpoint::{self, auth_middleware, check_permission_with},
        error::ApiError,
        json::{json_request, json_response},
        request::{ensure_no_body, parse_request_param},
        RequestExt, RouterBuilder,
    },
    id::{NodeId, TenantId, TenantTimelineId, TimelineId},
    lsn::Lsn,
};

use crate::debug_dump::TimelineDigestRequest;
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};

#[derive(Debug, Serialize)]
struct SafekeeperStatus {
    id: NodeId,
}

/// Healthcheck handler.
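/// Responds with this safekeeper's node id as JSON, e.g. `{"id": 1}` (the value
/// here is illustrative; the actual id comes from `conf.my_id` below).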
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;
    let conf = get_conf(&request);
    let status = SafekeeperStatus { id: conf.my_id };
    json_response(StatusCode::OK, status)
}

fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
    request
        .data::<Arc<SafeKeeperConf>>()
        .expect("unknown state type")
        .as_ref()
}

/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
    pub term: Term,
    pub lsn: Lsn,
}

impl From<TermSwitchApiEntry> for TermLsn {
    fn from(api_val: TermSwitchApiEntry) -> Self {
        TermLsn {
            term: api_val.term,
            lsn: api_val.lsn,
        }
    }
}

/// Augment AcceptorState with last_log_term for convenience
#[derive(Debug, Serialize, Deserialize)]
pub struct AcceptorStateStatus {
    pub term: Term,
    pub epoch: Term, // aka last_log_term
    pub term_history: Vec<TermSwitchApiEntry>,
}

/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineStatus {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub acceptor_state: AcceptorStateStatus,
    pub pg_info: ServerInfo,
    pub flush_lsn: Lsn,
    pub timeline_start_lsn: Lsn,
    pub local_start_lsn: Lsn,
    pub commit_lsn: Lsn,
    pub backup_lsn: Lsn,
    pub peer_horizon_lsn: Lsn,
    pub remote_consistent_lsn: Lsn,
    pub peers: Vec<PeerInfo>,
    pub walsenders: Vec<WalSenderState>,
    pub walreceivers: Vec<WalReceiverState>,
}

fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
    check_permission_with(request, |claims| {
        crate::auth::check_permission(claims, tenant_id)
    })
}

/// Report info about timeline.
async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
    let (inmem, state) = tli.get_state().await;
    let flush_lsn = tli.get_flush_lsn().await;

    let last_log_term = state.acceptor_state.get_last_log_term(flush_lsn);
    let term_history = state
        .acceptor_state
        .term_history
        .0
        .into_iter()
        .map(|ts| TermSwitchApiEntry {
            term: ts.term,
            lsn: ts.lsn,
        })
        .collect();
    let acc_state = AcceptorStateStatus {
        term: state.acceptor_state.term,
        epoch: last_log_term,
        term_history,
    };

    let conf = get_conf(&request);
    // Note: we report in-memory values, which can be lost.
    let status = TimelineStatus {
        tenant_id: ttid.tenant_id,
        timeline_id: ttid.timeline_id,
        acceptor_state: acc_state,
        pg_info: state.server,
        flush_lsn,
        timeline_start_lsn: state.timeline_start_lsn,
        local_start_lsn: state.local_start_lsn,
        commit_lsn: inmem.commit_lsn,
        backup_lsn: inmem.backup_lsn,
        peer_horizon_lsn: inmem.peer_horizon_lsn,
        remote_consistent_lsn: inmem.remote_consistent_lsn,
        peers: tli.get_peers(conf).await,
        walsenders: tli.get_walsenders().get_all(),
        walreceivers: tli.get_walreceivers().get_all(),
    };
    json_response(StatusCode::OK, status)
}

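/// Create a timeline from a JSON `TimelineCreateRequest` body. A sketch of the
/// fields the handler below reads: `tenant_id`, `timeline_id`, `pg_version`,
/// `commit_lsn`, plus optional `system_id`, `wal_seg_size` and `local_start_lsn`
/// (defaults are computed below when they are omitted).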
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let request_data: TimelineCreateRequest = json_request(&mut request).await?;

    let ttid = TenantTimelineId {
        tenant_id: request_data.tenant_id,
        timeline_id: request_data.timeline_id,
    };
    check_permission(&request, Some(ttid.tenant_id))?;

    let server_info = ServerInfo {
        pg_version: request_data.pg_version,
        system_id: request_data.system_id.unwrap_or(0),
        wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
    };
    let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
        request_data
            .commit_lsn
            .segment_lsn(server_info.wal_seg_size as usize)
    });
    GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
        .await
        .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, ())
}

/// Pull timeline from peer safekeeper instances.
async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;

    let data: pull_timeline::Request = json_request(&mut request).await?;
    let conf = get_conf(&request);

    let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone())
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, resp)
}

/// Stream tar archive with all timeline data.
async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let destination = parse_request_param(&request, "destination_id")?;
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
    // Note: for evicted timelines it would be better not to de-evict them just to
    // stream them out; start_snapshot would probably copy the partial s3 file to the
    // destination path and stream the control file, or return a WalResidentTimeline
    // if the timeline is not evicted.
    let tli = tli
        .wal_residence_guard()
        .await
        .map_err(ApiError::InternalServerError)?;

    // To stream the body we use wrap_stream, which wants a Stream of Result<Bytes>,
    // so create a channel and write to it from another task.
    let (tx, rx) = mpsc::channel(1);

    let conf = get_conf(&request);
    task::spawn(pull_timeline::stream_snapshot(
        tli,
        conf.my_id,
        destination,
        tx,
    ));

    let rx_stream = ReceiverStream::new(rx);
    let body = Body::wrap_stream(rx_stream);

    let response = Response::builder()
        .status(200)
        .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
        .body(body)
        .unwrap();

    Ok(response)
}

async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;

    let request_data: TimelineCopyRequest = json_request(&mut request).await?;
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "source_timeline_id")?,
    );

    let source = GlobalTimelines::get(ttid)?;

    copy_timeline::handle_request(copy_timeline::Request {
        source,
        until_lsn: request_data.until_lsn,
        destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
    })
    .instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
    .await
    .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, ())
}

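/// Compute a digest of the timeline between the `from_lsn` and `until_lsn`
/// query parameters (both are required); see `debug_dump::calculate_digest`.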
async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;

    let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
    let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;

    let request = TimelineDigestRequest {
        from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
            "from_lsn is required"
        )))?,
        until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
            "until_lsn is required"
        )))?,
    };

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
    let tli = tli
        .wal_residence_guard()
        .await
        .map_err(ApiError::InternalServerError)?;

    let response = debug_dump::calculate_digest(&tli, request)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, response)
}

/// Force persist control file.
async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;

    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );

    let tli = GlobalTimelines::get(ttid)?;
    tli.write_shared_state()
        .await
        .sk
        .state_mut()
        .flush()
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, ())
}

/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
    check_permission(&request, Some(ttid.tenant_id))?;
    ensure_no_body(&mut request).await?;
    // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
    // error handling here when we're able to.
    let resp = GlobalTimelines::delete(&ttid, only_local)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, resp)
}

/// Deactivates all timelines for the tenant and removes its data directory.
/// See `timeline_delete_handler`.
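/// Responds with a JSON map from timeline id to the per-timeline deletion result.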
async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id = parse_request_param(&request, "tenant_id")?;
    let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
    check_permission(&request, Some(tenant_id))?;
    ensure_no_body(&mut request).await?;
    // FIXME: `delete_force_all_for_tenant` can return an error for multiple different
    // reasons; using `InternalServerError` for all of them should be fixed when the
    // types support it.
    let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(
        StatusCode::OK,
        delete_info
            .iter()
            .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
            .collect::<HashMap<String, TimelineDeleteForceResult>>(),
    )
}

/// Used only in tests to hand-craft the required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;
    let sk_info: SkTimelineInfo = json_request(&mut request).await?;
    let proto_sk_info = SafekeeperTimelineInfo {
        safekeeper_id: 0,
        tenant_timeline_id: Some(ProtoTenantTimelineId {
            tenant_id: ttid.tenant_id.as_ref().to_owned(),
            timeline_id: ttid.timeline_id.as_ref().to_owned(),
        }),
        term: sk_info.term.unwrap_or(0),
        last_log_term: sk_info.last_log_term.unwrap_or(0),
        flush_lsn: sk_info.flush_lsn.0,
        commit_lsn: sk_info.commit_lsn.0,
        remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
        peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
        safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
        http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
        backup_lsn: sk_info.backup_lsn.0,
        local_start_lsn: sk_info.local_start_lsn.0,
        availability_zone: None,
        standby_horizon: sk_info.standby_horizon.0,
    };

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
    tli.record_safekeeper_info(proto_sk_info)
        .await
        .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, ())
}

fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
    v.parse()
        .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
}

/// Dump debug info about all available safekeeper state.
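/// Accepts optional boolean query parameters `dump_all`, `dump_control_file`,
/// `dump_memory`, `dump_disk_content` and `dump_term_history`, plus optional
/// `tenant_id` and `timeline_id` filters; unknown parameters are rejected.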
async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;
    ensure_no_body(&mut request).await?;

    let mut dump_all: Option<bool> = None;
    let mut dump_control_file: Option<bool> = None;
    let mut dump_memory: Option<bool> = None;
    let mut dump_disk_content: Option<bool> = None;
    let mut dump_term_history: Option<bool> = None;
    let mut tenant_id: Option<TenantId> = None;
    let mut timeline_id: Option<TimelineId> = None;

    let query = request.uri().query().unwrap_or("");
    let mut values = url::form_urlencoded::parse(query.as_bytes());

    for (k, v) in &mut values {
        match k.as_ref() {
            "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
            "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
            "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
            "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
            "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
            "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
            "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
            _ => Err(ApiError::BadRequest(anyhow::anyhow!(
                "Unknown query parameter: {}",
                k
            )))?,
        }
    }

    let dump_all = dump_all.unwrap_or(false);
    let dump_control_file = dump_control_file.unwrap_or(dump_all);
    let dump_memory = dump_memory.unwrap_or(dump_all);
    let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
    let dump_term_history = dump_term_history.unwrap_or(true);

    let args = debug_dump::Args {
        dump_all,
        dump_control_file,
        dump_memory,
        dump_disk_content,
        dump_term_history,
        tenant_id,
        timeline_id,
    };

    let resp = debug_dump::build(args)
        .await
        .map_err(ApiError::InternalServerError)?;

    let started_at = std::time::Instant::now();

    let (tx, rx) = mpsc::channel(1);

    let body = Body::wrap_stream(ReceiverStream::new(rx));

    let mut writer = ChannelWriter::new(128 * 1024, tx);

    let response = Response::builder()
        .status(200)
        .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
        .body(body)
        .unwrap();

    let span = info_span!("blocking");
    tokio::task::spawn_blocking(move || {
        let _span = span.entered();

        let res = serde_json::to_writer(&mut writer, &resp)
            .map_err(std::io::Error::from)
            .and_then(|_| writer.flush());

        match res {
            Ok(()) => {
                tracing::info!(
                    bytes = writer.flushed_bytes(),
                    elapsed_ms = started_at.elapsed().as_millis(),
                    "responded /v1/debug_dump"
                );
            }
            Err(e) => {
                tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
                // The semantics of this error are unclear: we want to error out the
                // stream to abort the response and somehow notify the client that we
                // failed.
                //
                // Most likely, though, the reason for the failure is that the receiver
                // is already gone.
                drop(
                    writer
                        .tx
                        .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
                );
            }
        }
    });

    Ok(response)
}

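/// Patch the timeline's control file: the JSON body is parsed as
/// `patch_control_file::Request` and passed to `patch_control_file::handle_request`.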
async fn patch_control_file_handler(
    mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;

    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;

    let patch_request: patch_control_file::Request = json_request(&mut request).await?;
    let response = patch_control_file::handle_request(tli, patch_request)
        .await
        .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, response)
}

/// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
    let mut router = endpoint::make_router();
    if conf.http_auth.is_some() {
        router = router.middleware(auth_middleware(|request| {
            #[allow(clippy::mutable_key_type)]
            static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
                ["/v1/status", "/metrics"]
                    .iter()
                    .map(|v| v.parse().unwrap())
                    .collect()
            });
            if ALLOWLIST_ROUTES.contains(request.uri()) {
                None
            } else {
                // Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
                request
                    .data::<Option<Arc<SwappableJwtAuth>>>()
                    .unwrap()
                    .as_deref()
            }
        }))
    }

    // NB: on any changes do not forget to update the OpenAPI spec
    // located nearby (/safekeeper/src/http/openapi_spec.yaml).
    let auth = conf.http_auth.clone();
    router
        .data(Arc::new(conf))
        .data(auth)
        .get("/metrics", |r| request_span(r, prometheus_metrics_handler))
        .get("/v1/status", |r| request_span(r, status_handler))
        .put("/v1/failpoints", |r| {
            request_span(r, move |r| async {
                check_permission(&r, None)?;
                let cancel = CancellationToken::new();
                failpoints_handler(r, cancel).await
            })
        })
        // Will be used in the future instead of implicit timeline creation
        .post("/v1/tenant/timeline", |r| {
            request_span(r, timeline_create_handler)
        })
        .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            request_span(r, timeline_status_handler)
        })
        .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            request_span(r, timeline_delete_handler)
        })
        .delete("/v1/tenant/:tenant_id", |r| {
            request_span(r, tenant_delete_handler)
        })
        .get(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
            |r| request_span(r, timeline_snapshot_handler),
        )
        .post("/v1/pull_timeline", |r| {
            request_span(r, timeline_pull_handler)
        })
        .post(
            "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
            |r| request_span(r, timeline_copy_handler),
        )
        .patch(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
            |r| request_span(r, patch_control_file_handler),
        )
        .post(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
            |r| request_span(r, timeline_checkpoint_handler),
        )
        // for tests
        .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
            request_span(r, record_safekeeper_info)
        })
        .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
        .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
            request_span(r, timeline_digest_handler)
        })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_term_switch_entry_api_serialize() {
        let state = AcceptorStateStatus {
            term: 1,
            epoch: 1,
            term_history: vec![TermSwitchApiEntry {
                term: 1,
                lsn: Lsn(0x16FFDDDD),
            }],
        };
        let json = serde_json::to_string(&state).unwrap();
        assert_eq!(
            json,
            "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
        );
    }
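
    // A minimal sketch of a conversion check for the `From<TermSwitchApiEntry> for
    // TermLsn` impl above; it assumes `TermLsn`'s fields are visible from this crate
    // (as that impl implies) and that `Lsn` implements `PartialEq` and `Debug`.
    #[test]
    fn test_term_switch_entry_into_term_lsn() {
        let entry = TermSwitchApiEntry {
            term: 3,
            lsn: Lsn(0x2000),
        };
        let tl = TermLsn::from(entry);
        assert_eq!(tl.term, 3);
        assert_eq!(tl.lsn, Lsn(0x2000));
    }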
}