use hyper::{Body, Request, Response, StatusCode, Uri};

use once_cell::sync::Lazy;
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use storage_broker::proto::SafekeeperTimelineInfo;
use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::failpoints_handler;
use utils::http::request::parse_query_param;

use std::io::Write as _;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tracing::{info_span, Instrument};
use utils::http::endpoint::{request_span, ChannelWriter};

use crate::debug_dump::TimelineDigestRequest;
use crate::receive_wal::WalReceiverState;
use crate::safekeeper::Term;
use crate::safekeeper::{ServerInfo, TermLsn};
use crate::send_wal::WalSenderState;
use crate::timeline::PeerInfo;
use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};

use crate::timelines_global_map::TimelineDeleteForceResult;
use crate::GlobalTimelines;
use crate::SafeKeeperConf;
use utils::{
    auth::SwappableJwtAuth,
    http::{
        endpoint::{self, auth_middleware, check_permission_with},
        error::ApiError,
        json::{json_request, json_response},
        request::{ensure_no_body, parse_request_param},
        RequestExt, RouterBuilder,
    },
    id::{NodeId, TenantId, TenantTimelineId, TimelineId},
    lsn::Lsn,
};

use super::models::TimelineCreateRequest;

#[derive(Debug, Serialize)]
struct SafekeeperStatus {
    id: NodeId,
}

/// Healthcheck handler.
async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;
    let conf = get_conf(&request);
    let status = SafekeeperStatus { id: conf.my_id };
    json_response(StatusCode::OK, status)
}

fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
    request
        .data::<Arc<SafeKeeperConf>>()
        .expect("unknown state type")
        .as_ref()
}

/// Same as TermLsn, but serializes LSN using display serializer
/// in Postgres format, i.e. 0/FFFFFFFF. Used only for the API response.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct TermSwitchApiEntry {
    pub term: Term,
    pub lsn: Lsn,
}

impl From<TermSwitchApiEntry> for TermLsn {
    fn from(api_val: TermSwitchApiEntry) -> Self {
        TermLsn {
            term: api_val.term,
            lsn: api_val.lsn,
        }
    }
}

/// Augment AcceptorState with epoch for convenience
#[derive(Debug, Serialize, Deserialize)]
pub struct AcceptorStateStatus {
    pub term: Term,
    pub epoch: Term,
    pub term_history: Vec<TermSwitchApiEntry>,
}

/// Info about timeline on safekeeper ready for reporting.
#[derive(Debug, Serialize, Deserialize)]
pub struct TimelineStatus {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
    pub acceptor_state: AcceptorStateStatus,
    pub pg_info: ServerInfo,
    pub flush_lsn: Lsn,
    pub timeline_start_lsn: Lsn,
    pub local_start_lsn: Lsn,
    pub commit_lsn: Lsn,
    pub backup_lsn: Lsn,
    pub peer_horizon_lsn: Lsn,
    pub remote_consistent_lsn: Lsn,
    pub peers: Vec<PeerInfo>,
    pub walsenders: Vec<WalSenderState>,
    pub walreceivers: Vec<WalReceiverState>,
}

fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
    check_permission_with(request, |claims| {
        crate::auth::check_permission(claims, tenant_id)
    })
}

/// Report info about timeline.
async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
    let (inmem, state) = tli.get_state().await;
    let flush_lsn = tli.get_flush_lsn().await;

    let epoch = state.acceptor_state.get_epoch(flush_lsn);
    let term_history = state
        .acceptor_state
        .term_history
        .0
        .into_iter()
        .map(|ts| TermSwitchApiEntry {
            term: ts.term,
            lsn: ts.lsn,
        })
        .collect();
    let acc_state = AcceptorStateStatus {
        term: state.acceptor_state.term,
        epoch,
        term_history,
    };

    let conf = get_conf(&request);
    // Note: we report in memory values which can be lost.
    let status = TimelineStatus {
        tenant_id: ttid.tenant_id,
        timeline_id: ttid.timeline_id,
        acceptor_state: acc_state,
        pg_info: state.server,
        flush_lsn,
        timeline_start_lsn: state.timeline_start_lsn,
        local_start_lsn: state.local_start_lsn,
        commit_lsn: inmem.commit_lsn,
        backup_lsn: inmem.backup_lsn,
        peer_horizon_lsn: inmem.peer_horizon_lsn,
        remote_consistent_lsn: inmem.remote_consistent_lsn,
        peers: tli.get_peers(conf).await,
        walsenders: tli.get_walsenders().get_all(),
        walreceivers: tli.get_walreceivers().get_all(),
    };
    json_response(StatusCode::OK, status)
}

/// Create a new timeline on this safekeeper with the given parameters.
async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let request_data: TimelineCreateRequest = json_request(&mut request).await?;

    let ttid = TenantTimelineId {
        tenant_id: request_data.tenant_id,
        timeline_id: request_data.timeline_id,
    };
    check_permission(&request, Some(ttid.tenant_id))?;

    let server_info = ServerInfo {
        pg_version: request_data.pg_version,
        system_id: request_data.system_id.unwrap_or(0),
        wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
    };
    let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
        request_data
            .commit_lsn
            .segment_lsn(server_info.wal_seg_size as usize)
    });
    GlobalTimelines::create(ttid, server_info, request_data.commit_lsn, local_start_lsn)
        .await
        .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, ())
}

/// Pull timeline from peer safekeeper instances.
async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;

    let data: pull_timeline::Request = json_request(&mut request).await?;

    let resp = pull_timeline::handle_request(data)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, resp)
}

/// Copy the source timeline into a new timeline on this safekeeper, up to `until_lsn`.
async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;

    let request_data: TimelineCopyRequest = json_request(&mut request).await?;
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "source_timeline_id")?,
    );

    let source = GlobalTimelines::get(ttid)?;

    copy_timeline::handle_request(copy_timeline::Request {
        source,
        until_lsn: request_data.until_lsn,
        destination_ttid: TenantTimelineId::new(ttid.tenant_id, request_data.target_timeline_id),
    })
    .instrument(info_span!("copy_timeline", from=%ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
    .await
    .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, ())
}

/// Calculate a digest of the timeline contents between `from_lsn` and `until_lsn`;
/// both query parameters are required.
async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;

    let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
    let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;

    let request = TimelineDigestRequest {
        from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
            "from_lsn is required"
        )))?,
        until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
            "until_lsn is required"
        )))?,
    };

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;

    let response = debug_dump::calculate_digest(&tli, request)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, response)
}

/// Download a file from the timeline directory.
// TODO: figure out a better way to copy files between safekeepers
async fn timeline_files_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;

    let filename: String = parse_request_param(&request, "filename")?;

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;

    let filepath = tli.timeline_dir.join(filename);
    let mut file = File::open(&filepath)
        .await
        .map_err(|e| ApiError::InternalServerError(e.into()))?;

    let mut content = Vec::new();
    // TODO: don't store files in memory
    file.read_to_end(&mut content)
        .await
        .map_err(|e| ApiError::InternalServerError(e.into()))?;

    Response::builder()
        .status(StatusCode::OK)
        .header("Content-Type", "application/octet-stream")
        .body(Body::from(content))
        .map_err(|e| ApiError::InternalServerError(e.into()))
}

/// Deactivates the timeline and removes its data directory.
async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
    check_permission(&request, Some(ttid.tenant_id))?;
    ensure_no_body(&mut request).await?;
    // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
    // error handling here when we're able to.
    let resp = GlobalTimelines::delete(&ttid, only_local)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(StatusCode::OK, resp)
}

/// Deactivates all timelines for the tenant and removes its data directory.
/// See `timeline_delete_handler`.
async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let tenant_id = parse_request_param(&request, "tenant_id")?;
    let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
    check_permission(&request, Some(tenant_id))?;
    ensure_no_body(&mut request).await?;
    // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
    // using an `InternalServerError` should be fixed when the types support it.
    let delete_info = GlobalTimelines::delete_force_all_for_tenant(&tenant_id, only_local)
        .await
        .map_err(ApiError::InternalServerError)?;
    json_response(
        StatusCode::OK,
        delete_info
            .iter()
            .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
            .collect::<HashMap<String, TimelineDeleteForceResult>>(),
    )
}

/// Used only in tests to hand-craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );
    check_permission(&request, Some(ttid.tenant_id))?;
    let sk_info: SkTimelineInfo = json_request(&mut request).await?;
    let proto_sk_info = SafekeeperTimelineInfo {
        safekeeper_id: 0,
        tenant_timeline_id: Some(ProtoTenantTimelineId {
            tenant_id: ttid.tenant_id.as_ref().to_owned(),
            timeline_id: ttid.timeline_id.as_ref().to_owned(),
        }),
        term: sk_info.term.unwrap_or(0),
        last_log_term: sk_info.last_log_term.unwrap_or(0),
        flush_lsn: sk_info.flush_lsn.0,
        commit_lsn: sk_info.commit_lsn.0,
        remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
        peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
        safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
        http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
        backup_lsn: sk_info.backup_lsn.0,
        local_start_lsn: sk_info.local_start_lsn.0,
        availability_zone: None,
        standby_horizon: sk_info.standby_horizon.0,
    };

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
    tli.record_safekeeper_info(proto_sk_info)
        .await
        .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, ())
}

/// Parse a query parameter value, mapping parse failures to `ApiError::BadRequest`.
fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
    v.parse()
        .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
}

/// Dump debug info about all available safekeeper state.
async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;
    ensure_no_body(&mut request).await?;

    let mut dump_all: Option<bool> = None;
    let mut dump_control_file: Option<bool> = None;
    let mut dump_memory: Option<bool> = None;
    let mut dump_disk_content: Option<bool> = None;
    let mut dump_term_history: Option<bool> = None;
    let mut tenant_id: Option<TenantId> = None;
    let mut timeline_id: Option<TimelineId> = None;

    let query = request.uri().query().unwrap_or("");
    let mut values = url::form_urlencoded::parse(query.as_bytes());

    for (k, v) in &mut values {
        match k.as_ref() {
            "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
            "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
            "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
            "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
            "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
            "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
            "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
            _ => Err(ApiError::BadRequest(anyhow::anyhow!(
                "Unknown query parameter: {}",
                k
            )))?,
        }
    }

    let dump_all = dump_all.unwrap_or(false);
    let dump_control_file = dump_control_file.unwrap_or(dump_all);
    let dump_memory = dump_memory.unwrap_or(dump_all);
    let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
    let dump_term_history = dump_term_history.unwrap_or(true);

    let args = debug_dump::Args {
        dump_all,
        dump_control_file,
        dump_memory,
        dump_disk_content,
        dump_term_history,
        tenant_id,
        timeline_id,
    };

    let resp = debug_dump::build(args)
        .await
        .map_err(ApiError::InternalServerError)?;

    let started_at = std::time::Instant::now();

    let (tx, rx) = mpsc::channel(1);

    let body = Body::wrap_stream(ReceiverStream::new(rx));

    let mut writer = ChannelWriter::new(128 * 1024, tx);

    let response = Response::builder()
        .status(200)
        .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
        .body(body)
        .unwrap();

    let span = info_span!("blocking");
    tokio::task::spawn_blocking(move || {
        let _span = span.entered();

        let res = serde_json::to_writer(&mut writer, &resp)
            .map_err(std::io::Error::from)
            .and_then(|_| writer.flush());

        match res {
            Ok(()) => {
                tracing::info!(
                    bytes = writer.flushed_bytes(),
                    elapsed_ms = started_at.elapsed().as_millis(),
                    "responded /v1/debug_dump"
                );
            }
            Err(e) => {
                tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
                // Semantics of this error are quite unclear: we want to error the stream out to
                // abort the response and somehow notify the client that we failed.
                //
                // Though, most likely the reason for failure is that the receiver is already gone.
                drop(
                    writer
                        .tx
                        .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
                );
            }
        }
    });

    Ok(response)
}

/// Apply a patch to the control file of the given timeline.
async fn patch_control_file_handler(
    mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
    check_permission(&request, None)?;

    let ttid = TenantTimelineId::new(
        parse_request_param(&request, "tenant_id")?,
        parse_request_param(&request, "timeline_id")?,
    );

    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;

    let patch_request: patch_control_file::Request = json_request(&mut request).await?;
    let response = patch_control_file::handle_request(tli, patch_request)
        .await
        .map_err(ApiError::InternalServerError)?;

    json_response(StatusCode::OK, response)
}

/// Safekeeper http router.
pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError> {
    let mut router = endpoint::make_router();
    if conf.http_auth.is_some() {
        router = router.middleware(auth_middleware(|request| {
            #[allow(clippy::mutable_key_type)]
            static ALLOWLIST_ROUTES: Lazy<HashSet<Uri>> = Lazy::new(|| {
                ["/v1/status", "/metrics"]
                    .iter()
                    .map(|v| v.parse().unwrap())
                    .collect()
            });
            if ALLOWLIST_ROUTES.contains(request.uri()) {
                None
            } else {
                // Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
                request
                    .data::<Option<Arc<SwappableJwtAuth>>>()
                    .unwrap()
                    .as_deref()
            }
        }))
    }

    // NB: on any changes do not forget to update the OpenAPI spec
    // located nearby (/safekeeper/src/http/openapi_spec.yaml).
    let auth = conf.http_auth.clone();
    router
        .data(Arc::new(conf))
        .data(auth)
        .get("/v1/status", |r| request_span(r, status_handler))
        .put("/v1/failpoints", |r| {
            request_span(r, move |r| async {
                let cancel = CancellationToken::new();
                failpoints_handler(r, cancel).await
            })
        })
        // Will be used in the future instead of implicit timeline creation
        .post("/v1/tenant/timeline", |r| {
            request_span(r, timeline_create_handler)
        })
        .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            request_span(r, timeline_status_handler)
        })
        .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
            request_span(r, timeline_delete_handler)
        })
        .delete("/v1/tenant/:tenant_id", |r| {
            request_span(r, tenant_delete_handler)
        })
        .post("/v1/pull_timeline", |r| {
            request_span(r, timeline_pull_handler)
        })
        .get(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/file/:filename",
            |r| request_span(r, timeline_files_handler),
        )
        .post(
            "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
            |r| request_span(r, timeline_copy_handler),
        )
        .patch(
            "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
            |r| request_span(r, patch_control_file_handler),
        )
        // for tests
        .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
            request_span(r, record_safekeeper_info)
        })
        .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
        .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
            request_span(r, timeline_digest_handler)
        })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_term_switch_entry_api_serialize() {
        let state = AcceptorStateStatus {
            term: 1,
            epoch: 1,
            term_history: vec![TermSwitchApiEntry {
                term: 1,
                lsn: Lsn(0x16FFDDDD),
            }],
        };
        let json = serde_json::to_string(&state).unwrap();
        assert_eq!(
            json,
            "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
        );
    }
}
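
// A minimal illustrative sketch of the helpers above, kept in a separate test
// module: it relies only on items defined in this file (`TermSwitchApiEntry`,
// `parse_kv_str`) and assumes `TermLsn`'s fields are visible within the crate,
// as the `From` impl above suggests.
#[cfg(test)]
mod illustrative_tests {
    use super::*;

    #[test]
    fn test_term_switch_entry_into_term_lsn() {
        let entry = TermSwitchApiEntry {
            term: 3,
            lsn: Lsn(0x2000000),
        };
        // The conversion should carry over both the term and the LSN unchanged.
        let tl: TermLsn = entry.into();
        assert_eq!(tl.term, 3);
        assert_eq!(tl.lsn, Lsn(0x2000000));
    }

    #[test]
    fn test_parse_kv_str() {
        // Well-formed values parse into the requested type.
        assert!(matches!(
            parse_kv_str::<_, bool>("dump_all", "true"),
            Ok(true)
        ));
        // Malformed values are reported as a BadRequest error.
        assert!(matches!(
            parse_kv_str::<_, bool>("dump_all", "not-a-bool"),
            Err(ApiError::BadRequest(_))
        ));
    }
}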