Line data Source code
1 : use http_utils::failpoints::failpoints_handler;
2 : use hyper::{Body, Request, Response, StatusCode};
3 : use safekeeper_api::models;
4 : use safekeeper_api::models::AcceptorStateStatus;
5 : use safekeeper_api::models::PullTimelineRequest;
6 : use safekeeper_api::models::SafekeeperStatus;
7 : use safekeeper_api::models::TermSwitchApiEntry;
8 : use safekeeper_api::models::TimelineStatus;
9 : use safekeeper_api::ServerInfo;
10 : use std::collections::HashMap;
11 : use std::fmt;
12 : use std::io::Write as _;
13 : use std::str::FromStr;
14 : use std::sync::Arc;
15 : use storage_broker::proto::SafekeeperTimelineInfo;
16 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
17 : use tokio::sync::mpsc;
18 : use tokio::task;
19 : use tokio_stream::wrappers::ReceiverStream;
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{info_span, Instrument};
22 :
23 : use http_utils::endpoint::{
24 : profile_cpu_handler, profile_heap_handler, prometheus_metrics_handler, request_span,
25 : };
26 : use http_utils::{
27 : endpoint::{self, auth_middleware, check_permission_with, ChannelWriter},
28 : error::ApiError,
29 : json::{json_request, json_response},
30 : request::{ensure_no_body, parse_query_param, parse_request_param},
31 : RequestExt, RouterBuilder,
32 : };
33 :
34 : use postgres_ffi::WAL_SEGMENT_SIZE;
35 : use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
36 : use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
37 : use utils::{
38 : auth::SwappableJwtAuth,
39 : id::{TenantId, TenantTimelineId, TimelineId},
40 : lsn::Lsn,
41 : };
42 :
43 : use crate::debug_dump::TimelineDigestRequest;
44 : use crate::safekeeper::TermLsn;
45 : use crate::timelines_global_map::TimelineDeleteForceResult;
46 : use crate::GlobalTimelines;
47 : use crate::SafeKeeperConf;
48 : use crate::{copy_timeline, debug_dump, patch_control_file, pull_timeline};
49 :
50 : /// Healthcheck handler.
51 0 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
52 0 : check_permission(&request, None)?;
53 0 : let conf = get_conf(&request);
54 0 : let status = SafekeeperStatus { id: conf.my_id };
55 0 : json_response(StatusCode::OK, status)
56 0 : }
57 :
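/// Accessor for the `SafeKeeperConf` attached to the router's shared data; panics if it was not attached.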
58 0 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
59 0 : request
60 0 : .data::<Arc<SafeKeeperConf>>()
61 0 : .expect("unknown state type")
62 0 : .as_ref()
63 0 : }
64 :
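/// Accessor for the shared `GlobalTimelines` map attached to the router's data.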
65 0 : fn get_global_timelines(request: &Request<Body>) -> Arc<GlobalTimelines> {
66 0 : request
67 0 : .data::<Arc<GlobalTimelines>>()
68 0 : .expect("unknown state type")
69 0 : .clone()
70 0 : }
71 :
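/// Check the request's JWT permissions, scoped to `tenant_id` when one is provided.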
72 0 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
73 0 : check_permission_with(request, |claims| {
74 0 : crate::auth::check_permission(claims, tenant_id)
75 0 : })
76 0 : }
77 :
78 : /// Deactivates all timelines for the tenant and removes its data directory.
79 : /// See `timeline_delete_handler`.
80 0 : async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
81 0 : let tenant_id = parse_request_param(&request, "tenant_id")?;
82 0 : let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
83 0 : check_permission(&request, Some(tenant_id))?;
84 0 : ensure_no_body(&mut request).await?;
85 0 : let global_timelines = get_global_timelines(&request);
86 : // FIXME: `delete_force_all_for_tenant` can return an error for multiple different reasons;
87 : // mapping them all to `InternalServerError` should be fixed once the error types support it.
88 0 : let delete_info = global_timelines
89 0 : .delete_force_all_for_tenant(&tenant_id, only_local)
90 0 : .await
91 0 : .map_err(ApiError::InternalServerError)?;
92 0 : json_response(
93 0 : StatusCode::OK,
94 0 : delete_info
95 0 : .iter()
96 0 : .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
97 0 : .collect::<HashMap<String, TimelineDeleteForceResult>>(),
98 0 : )
99 0 : }
100 :
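/// Create a timeline described by the `TimelineCreateRequest` body.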
101 0 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
102 0 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
103 :
104 0 : let ttid = TenantTimelineId {
105 0 : tenant_id: request_data.tenant_id,
106 0 : timeline_id: request_data.timeline_id,
107 0 : };
108 0 : check_permission(&request, Some(ttid.tenant_id))?;
109 :
110 0 : let server_info = ServerInfo {
111 0 : pg_version: request_data.pg_version,
112 0 : system_id: request_data.system_id.unwrap_or(0),
113 0 : wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
114 0 : };
115 0 : let global_timelines = get_global_timelines(&request);
116 0 : global_timelines
117 0 : .create(
118 0 : ttid,
119 0 : request_data.mconf,
120 0 : server_info,
121 0 : request_data.start_lsn,
122 0 : request_data.commit_lsn.unwrap_or(request_data.start_lsn),
123 0 : )
124 0 : .await
125 0 : .map_err(ApiError::InternalServerError)?;
126 :
127 0 : json_response(StatusCode::OK, ())
128 0 : }
129 :
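/// Report this safekeeper's timeline counts as a utilization figure.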
130 0 : async fn utilization_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
131 0 : check_permission(&request, None)?;
132 0 : let global_timelines = get_global_timelines(&request);
133 0 : let utilization = global_timelines.get_timeline_counts();
134 0 : json_response(StatusCode::OK, utilization)
135 0 : }
136 :
137 : /// List all (not deleted) timelines.
138 : /// Note: it is possible to do the same with debug_dump.
139 0 : async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
140 0 : check_permission(&request, None)?;
141 0 : let global_timelines = get_global_timelines(&request);
142 0 : let res: Vec<TenantTimelineId> = global_timelines
143 0 : .get_all()
144 0 : .iter()
145 0 : .map(|tli| tli.ttid)
146 0 : .collect();
147 0 : json_response(StatusCode::OK, res)
148 0 : }
149 :
150 : impl From<TermSwitchApiEntry> for TermLsn {
151 0 : fn from(api_val: TermSwitchApiEntry) -> Self {
152 0 : TermLsn {
153 0 : term: api_val.term,
154 0 : lsn: api_val.lsn,
155 0 : }
156 0 : }
157 : }
158 :
159 : /// Report info about timeline.
160 0 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
161 0 : let ttid = TenantTimelineId::new(
162 0 : parse_request_param(&request, "tenant_id")?,
163 0 : parse_request_param(&request, "timeline_id")?,
164 : );
165 0 : check_permission(&request, Some(ttid.tenant_id))?;
166 :
167 0 : let global_timelines = get_global_timelines(&request);
168 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
169 0 : let (inmem, state) = tli.get_state().await;
170 0 : let flush_lsn = tli.get_flush_lsn().await;
171 :
172 0 : let last_log_term = state.acceptor_state.get_last_log_term(flush_lsn);
173 0 : let term_history = state
174 0 : .acceptor_state
175 0 : .term_history
176 0 : .0
177 0 : .into_iter()
178 0 : .map(|ts| TermSwitchApiEntry {
179 0 : term: ts.term,
180 0 : lsn: ts.lsn,
181 0 : })
182 0 : .collect();
183 0 : let acc_state = AcceptorStateStatus {
184 0 : term: state.acceptor_state.term,
185 0 : epoch: last_log_term,
186 0 : term_history,
187 0 : };
188 0 :
189 0 : let conf = get_conf(&request);
190 : // Note: we report in-memory values, which can be lost.
191 0 : let status = TimelineStatus {
192 0 : tenant_id: ttid.tenant_id,
193 0 : timeline_id: ttid.timeline_id,
194 0 : mconf: state.mconf,
195 0 : acceptor_state: acc_state,
196 0 : pg_info: state.server,
197 0 : flush_lsn,
198 0 : timeline_start_lsn: state.timeline_start_lsn,
199 0 : local_start_lsn: state.local_start_lsn,
200 0 : commit_lsn: inmem.commit_lsn,
201 0 : backup_lsn: inmem.backup_lsn,
202 0 : peer_horizon_lsn: inmem.peer_horizon_lsn,
203 0 : remote_consistent_lsn: inmem.remote_consistent_lsn,
204 0 : peers: tli.get_peers(conf).await,
205 0 : walsenders: tli.get_walsenders().get_all_public(),
206 0 : walreceivers: tli.get_walreceivers().get_all(),
207 0 : };
208 0 : json_response(StatusCode::OK, status)
209 0 : }
210 :
211 : /// Deactivates the timeline and removes its data directory.
212 0 : async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
213 0 : let ttid = TenantTimelineId::new(
214 0 : parse_request_param(&request, "tenant_id")?,
215 0 : parse_request_param(&request, "timeline_id")?,
216 : );
217 0 : let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
218 0 : check_permission(&request, Some(ttid.tenant_id))?;
219 0 : ensure_no_body(&mut request).await?;
220 0 : let global_timelines = get_global_timelines(&request);
221 : // FIXME: `delete_force` can fail from both internal errors and bad requests. Add better
222 : // error handling here when we're able to.
223 0 : let resp = global_timelines
224 0 : .delete(&ttid, only_local)
225 0 : .await
226 0 : .map_err(ApiError::InternalServerError)?;
227 0 : json_response(StatusCode::OK, resp)
228 0 : }
229 :
230 : /// Pull timeline from peer safekeeper instances.
231 0 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
232 0 : check_permission(&request, None)?;
233 :
234 0 : let data: PullTimelineRequest = json_request(&mut request).await?;
235 0 : let conf = get_conf(&request);
236 0 : let global_timelines = get_global_timelines(&request);
237 :
238 0 : let resp = pull_timeline::handle_request(data, conf.sk_auth_token.clone(), global_timelines)
239 0 : .await
240 0 : .map_err(ApiError::InternalServerError)?;
241 0 : json_response(StatusCode::OK, resp)
242 0 : }
243 :
244 : /// Stream tar archive with all timeline data.
245 0 : async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
246 0 : let destination = parse_request_param(&request, "destination_id")?;
247 0 : let ttid = TenantTimelineId::new(
248 0 : parse_request_param(&request, "tenant_id")?,
249 0 : parse_request_param(&request, "timeline_id")?,
250 : );
251 0 : check_permission(&request, Some(ttid.tenant_id))?;
252 :
253 0 : let global_timelines = get_global_timelines(&request);
254 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
255 :
256 : // To stream the body, use wrap_stream, which wants a Stream of Result<Bytes>,
257 : // so create a channel and write to it from another task.
258 0 : let (tx, rx) = mpsc::channel(1);
259 0 :
260 0 : let conf = get_conf(&request);
261 0 : task::spawn(pull_timeline::stream_snapshot(
262 0 : tli,
263 0 : conf.my_id,
264 0 : destination,
265 0 : tx,
266 0 : ));
267 0 :
268 0 : let rx_stream = ReceiverStream::new(rx);
269 0 : let body = Body::wrap_stream(rx_stream);
270 0 :
271 0 : let response = Response::builder()
272 0 : .status(200)
273 0 : .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
274 0 : .body(body)
275 0 : .unwrap();
276 0 :
277 0 : Ok(response)
278 0 : }
279 :
280 : /// Consider switching timeline membership configuration to the provided one.
281 0 : async fn timeline_membership_handler(
282 0 : mut request: Request<Body>,
283 0 : ) -> Result<Response<Body>, ApiError> {
284 0 : let ttid = TenantTimelineId::new(
285 0 : parse_request_param(&request, "tenant_id")?,
286 0 : parse_request_param(&request, "timeline_id")?,
287 : );
288 0 : check_permission(&request, Some(ttid.tenant_id))?;
289 :
290 0 : let global_timelines = get_global_timelines(&request);
291 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
292 :
293 0 : let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
294 0 : let response = tli
295 0 : .membership_switch(data.mconf)
296 0 : .await
297 0 : .map_err(ApiError::InternalServerError)?;
298 :
299 0 : json_response(StatusCode::OK, response)
300 0 : }
301 :
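/// Copy the source timeline into a new timeline of the same tenant, up to `until_lsn`.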
302 0 : async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
303 0 : check_permission(&request, None)?;
304 :
305 0 : let request_data: TimelineCopyRequest = json_request(&mut request).await?;
306 0 : let source_ttid = TenantTimelineId::new(
307 0 : parse_request_param(&request, "tenant_id")?,
308 0 : parse_request_param(&request, "source_timeline_id")?,
309 : );
310 :
311 0 : let global_timelines = get_global_timelines(&request);
312 0 :
313 0 : copy_timeline::handle_request(copy_timeline::Request{
314 0 : source_ttid,
315 0 : until_lsn: request_data.until_lsn,
316 0 : destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
317 0 : }, global_timelines)
318 0 : .instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
319 0 : .await
320 0 : .map_err(ApiError::InternalServerError)?;
321 :
322 0 : json_response(StatusCode::OK, ())
323 0 : }
324 :
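/// Apply a manual patch to the timeline's control file.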
325 0 : async fn patch_control_file_handler(
326 0 : mut request: Request<Body>,
327 0 : ) -> Result<Response<Body>, ApiError> {
328 0 : check_permission(&request, None)?;
329 :
330 0 : let ttid = TenantTimelineId::new(
331 0 : parse_request_param(&request, "tenant_id")?,
332 0 : parse_request_param(&request, "timeline_id")?,
333 : );
334 :
335 0 : let global_timelines = get_global_timelines(&request);
336 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
337 :
338 0 : let patch_request: patch_control_file::Request = json_request(&mut request).await?;
339 0 : let response = patch_control_file::handle_request(tli, patch_request)
340 0 : .await
341 0 : .map_err(ApiError::InternalServerError)?;
342 :
343 0 : json_response(StatusCode::OK, response)
344 0 : }
345 :
346 : /// Force persist control file.
347 0 : async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
348 0 : check_permission(&request, None)?;
349 :
350 0 : let ttid = TenantTimelineId::new(
351 0 : parse_request_param(&request, "tenant_id")?,
352 0 : parse_request_param(&request, "timeline_id")?,
353 : );
354 :
355 0 : let global_timelines = get_global_timelines(&request);
356 0 : let tli = global_timelines.get(ttid)?;
357 0 : tli.write_shared_state()
358 0 : .await
359 : .sk
360 0 : .state_mut()
361 0 : .flush()
362 0 : .await
363 0 : .map_err(ApiError::InternalServerError)?;
364 0 : json_response(StatusCode::OK, ())
365 0 : }
366 :
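/// Calculate a WAL digest for the timeline over the `from_lsn..until_lsn` range;
/// both query parameters are required.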
367 0 : async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
368 0 : let ttid = TenantTimelineId::new(
369 0 : parse_request_param(&request, "tenant_id")?,
370 0 : parse_request_param(&request, "timeline_id")?,
371 : );
372 0 : check_permission(&request, Some(ttid.tenant_id))?;
373 :
374 0 : let global_timelines = get_global_timelines(&request);
375 0 : let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
376 0 : let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
377 :
378 0 : let request = TimelineDigestRequest {
379 0 : from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
380 0 : "from_lsn is required"
381 0 : )))?,
382 0 : until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
383 0 : "until_lsn is required"
384 0 : )))?,
385 : };
386 :
387 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
388 0 : let tli = tli
389 0 : .wal_residence_guard()
390 0 : .await
391 0 : .map_err(ApiError::InternalServerError)?;
392 :
393 0 : let response = debug_dump::calculate_digest(&tli, request)
394 0 : .await
395 0 : .map_err(ApiError::InternalServerError)?;
396 0 : json_response(StatusCode::OK, response)
397 0 : }
398 :
399 : /// Unevict the timeline and remove uploaded partial segment(s) from remote storage.
400 : /// A successful response returns the list of segments that existed before the deletion.
401 : /// Intended for one-off use; not normally needed.
402 0 : async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Response<Body>, ApiError> {
403 0 : let ttid = TenantTimelineId::new(
404 0 : parse_request_param(&request, "tenant_id")?,
405 0 : parse_request_param(&request, "timeline_id")?,
406 : );
407 0 : check_permission(&request, Some(ttid.tenant_id))?;
408 :
409 0 : let global_timelines = get_global_timelines(&request);
410 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
411 :
412 0 : let response = tli
413 0 : .backup_partial_reset()
414 0 : .await
415 0 : .map_err(ApiError::InternalServerError)?;
416 0 : json_response(StatusCode::OK, response)
417 0 : }
418 :
419 : /// Make the term at least as high as the one in the request. If the request's
420 : /// term is None, increment the current one.
421 0 : async fn timeline_term_bump_handler(
422 0 : mut request: Request<Body>,
423 0 : ) -> Result<Response<Body>, ApiError> {
424 0 : let ttid = TenantTimelineId::new(
425 0 : parse_request_param(&request, "tenant_id")?,
426 0 : parse_request_param(&request, "timeline_id")?,
427 : );
428 0 : check_permission(&request, Some(ttid.tenant_id))?;
429 :
430 0 : let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
431 :
432 0 : let global_timelines = get_global_timelines(&request);
433 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
434 0 : let response = tli
435 0 : .term_bump(request_data.term)
436 0 : .await
437 0 : .map_err(ApiError::InternalServerError)?;
438 :
439 0 : json_response(StatusCode::OK, response)
440 0 : }
441 :
442 : /// Used only in tests to hand-craft the required data.
443 0 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
444 0 : let ttid = TenantTimelineId::new(
445 0 : parse_request_param(&request, "tenant_id")?,
446 0 : parse_request_param(&request, "timeline_id")?,
447 : );
448 0 : check_permission(&request, Some(ttid.tenant_id))?;
449 0 : let sk_info: SkTimelineInfo = json_request(&mut request).await?;
450 0 : let proto_sk_info = SafekeeperTimelineInfo {
451 0 : safekeeper_id: 0,
452 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
453 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
454 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
455 0 : }),
456 0 : term: sk_info.term.unwrap_or(0),
457 0 : last_log_term: sk_info.last_log_term.unwrap_or(0),
458 0 : flush_lsn: sk_info.flush_lsn.0,
459 0 : commit_lsn: sk_info.commit_lsn.0,
460 0 : remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
461 0 : peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
462 0 : safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
463 0 : http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
464 0 : backup_lsn: sk_info.backup_lsn.0,
465 0 : local_start_lsn: sk_info.local_start_lsn.0,
466 0 : availability_zone: None,
467 0 : standby_horizon: sk_info.standby_horizon.0,
468 0 : };
469 0 :
470 0 : let global_timelines = get_global_timelines(&request);
471 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
472 0 : tli.record_safekeeper_info(proto_sk_info)
473 0 : .await
474 0 : .map_err(ApiError::InternalServerError)?;
475 :
476 0 : json_response(StatusCode::OK, ())
477 0 : }
478 :
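/// Parse a query-string value, turning parse errors into `ApiError::BadRequest`.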
479 0 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
480 0 : v.parse()
481 0 : .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
482 0 : }
483 :
484 : /// Dump debug info about all available safekeeper state.
485 0 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
486 0 : check_permission(&request, None)?;
487 0 : ensure_no_body(&mut request).await?;
488 :
489 0 : let mut dump_all: Option<bool> = None;
490 0 : let mut dump_control_file: Option<bool> = None;
491 0 : let mut dump_memory: Option<bool> = None;
492 0 : let mut dump_disk_content: Option<bool> = None;
493 0 : let mut dump_term_history: Option<bool> = None;
494 0 : let mut dump_wal_last_modified: Option<bool> = None;
495 0 : let mut tenant_id: Option<TenantId> = None;
496 0 : let mut timeline_id: Option<TimelineId> = None;
497 0 :
498 0 : let query = request.uri().query().unwrap_or("");
499 0 : let mut values = url::form_urlencoded::parse(query.as_bytes());
500 :
501 0 : for (k, v) in &mut values {
502 0 : match k.as_ref() {
503 0 : "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
504 0 : "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
505 0 : "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
506 0 : "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
507 0 : "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
508 0 : "dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?),
509 0 : "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
510 0 : "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
511 0 : _ => Err(ApiError::BadRequest(anyhow::anyhow!(
512 0 : "Unknown query parameter: {}",
513 0 : k
514 0 : )))?,
515 : }
516 : }
517 :
518 0 : let dump_all = dump_all.unwrap_or(false);
519 0 : let dump_control_file = dump_control_file.unwrap_or(dump_all);
520 0 : let dump_memory = dump_memory.unwrap_or(dump_all);
521 0 : let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
522 0 : let dump_term_history = dump_term_history.unwrap_or(true);
523 0 : let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
524 0 :
525 0 : let global_timelines = get_global_timelines(&request);
526 0 :
527 0 : let args = debug_dump::Args {
528 0 : dump_all,
529 0 : dump_control_file,
530 0 : dump_memory,
531 0 : dump_disk_content,
532 0 : dump_term_history,
533 0 : dump_wal_last_modified,
534 0 : tenant_id,
535 0 : timeline_id,
536 0 : };
537 :
538 0 : let resp = debug_dump::build(args, global_timelines)
539 0 : .await
540 0 : .map_err(ApiError::InternalServerError)?;
541 :
542 0 : let started_at = std::time::Instant::now();
543 0 :
544 0 : let (tx, rx) = mpsc::channel(1);
545 0 :
546 0 : let body = Body::wrap_stream(ReceiverStream::new(rx));
547 0 :
548 0 : let mut writer = ChannelWriter::new(128 * 1024, tx);
549 0 :
550 0 : let response = Response::builder()
551 0 : .status(200)
552 0 : .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
553 0 : .body(body)
554 0 : .unwrap();
555 :
556 0 : let span = info_span!("blocking");
557 0 : tokio::task::spawn_blocking(move || {
558 0 : let _span = span.entered();
559 0 :
560 0 : let res = serde_json::to_writer(&mut writer, &resp)
561 0 : .map_err(std::io::Error::from)
562 0 : .and_then(|_| writer.flush());
563 0 :
564 0 : match res {
565 : Ok(()) => {
566 0 : tracing::info!(
567 0 : bytes = writer.flushed_bytes(),
568 0 : elapsed_ms = started_at.elapsed().as_millis(),
569 0 : "responded /v1/debug_dump"
570 : );
571 : }
572 0 : Err(e) => {
573 0 : tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
574 : // semantics of this error are quite... unclear. we want to error the stream out to
575 : // abort the response to somehow notify the client that we failed.
576 : //
577 : // though, most likely the reason for failure is that the receiver is already gone.
578 0 : drop(
579 0 : writer
580 0 : .tx
581 0 : .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
582 0 : );
583 : }
584 : }
585 0 : });
586 0 :
587 0 : Ok(response)
588 0 : }
589 :
590 : /// Safekeeper HTTP router.
591 0 : pub fn make_router(
592 0 : conf: Arc<SafeKeeperConf>,
593 0 : global_timelines: Arc<GlobalTimelines>,
594 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
595 0 : let mut router = endpoint::make_router();
596 0 : if conf.http_auth.is_some() {
597 0 : router = router.middleware(auth_middleware(|request| {
598 : const ALLOWLIST_ROUTES: &[&str] =
599 : &["/v1/status", "/metrics", "/profile/cpu", "/profile/heap"];
600 0 : if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
601 0 : None
602 : } else {
603 : // Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
604 0 : request
605 0 : .data::<Option<Arc<SwappableJwtAuth>>>()
606 0 : .unwrap()
607 0 : .as_deref()
608 : }
609 0 : }))
610 0 : }
611 :
612 : // NB: on any changes do not forget to update the OpenAPI spec
613 : // located nearby (/safekeeper/src/http/openapi_spec.yaml).
614 0 : let auth = conf.http_auth.clone();
615 0 : router
616 0 : .data(conf)
617 0 : .data(global_timelines)
618 0 : .data(auth)
619 0 : .get("/metrics", |r| request_span(r, prometheus_metrics_handler))
620 0 : .get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
621 0 : .get("/profile/heap", |r| request_span(r, profile_heap_handler))
622 0 : .get("/v1/status", |r| request_span(r, status_handler))
623 0 : .put("/v1/failpoints", |r| {
624 0 : request_span(r, move |r| async {
625 0 : check_permission(&r, None)?;
626 0 : let cancel = CancellationToken::new();
627 0 : failpoints_handler(r, cancel).await
628 0 : })
629 0 : })
630 0 : .get("/v1/utilization", |r| request_span(r, utilization_handler))
631 0 : .delete("/v1/tenant/:tenant_id", |r| {
632 0 : request_span(r, tenant_delete_handler)
633 0 : })
634 0 : // Will be used in the future instead of implicit timeline creation
635 0 : .post("/v1/tenant/timeline", |r| {
636 0 : request_span(r, timeline_create_handler)
637 0 : })
638 0 : .get("/v1/tenant/timeline", |r| {
639 0 : request_span(r, timeline_list_handler)
640 0 : })
641 0 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
642 0 : request_span(r, timeline_status_handler)
643 0 : })
644 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
645 0 : request_span(r, timeline_delete_handler)
646 0 : })
647 0 : .post("/v1/pull_timeline", |r| {
648 0 : request_span(r, timeline_pull_handler)
649 0 : })
650 0 : .get(
651 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
652 0 : |r| request_span(r, timeline_snapshot_handler),
653 0 : )
654 0 : .post(
655 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/membership",
656 0 : |r| request_span(r, timeline_membership_handler),
657 0 : )
658 0 : .post(
659 0 : "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
660 0 : |r| request_span(r, timeline_copy_handler),
661 0 : )
662 0 : .patch(
663 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
664 0 : |r| request_span(r, patch_control_file_handler),
665 0 : )
666 0 : .post(
667 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
668 0 : |r| request_span(r, timeline_checkpoint_handler),
669 0 : )
670 0 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
671 0 : request_span(r, timeline_digest_handler)
672 0 : })
673 0 : .post(
674 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
675 0 : |r| request_span(r, timeline_backup_partial_reset),
676 0 : )
677 0 : .post(
678 0 : "/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
679 0 : |r| request_span(r, timeline_term_bump_handler),
680 0 : )
681 0 : .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
682 0 : request_span(r, record_safekeeper_info)
683 0 : })
684 0 : .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
685 0 : }
686 :
687 : #[cfg(test)]
688 : mod tests {
689 : use super::*;
690 :
691 : #[test]
692 1 : fn test_term_switch_entry_api_serialize() {
693 1 : let state = AcceptorStateStatus {
694 1 : term: 1,
695 1 : epoch: 1,
696 1 : term_history: vec![TermSwitchApiEntry {
697 1 : term: 1,
698 1 : lsn: Lsn(0x16FFDDDD),
699 1 : }],
700 1 : };
701 1 : let json = serde_json::to_string(&state).unwrap();
702 1 : assert_eq!(
703 1 : json,
704 1 : "{\"term\":1,\"epoch\":1,\"term_history\":[{\"term\":1,\"lsn\":\"0/16FFDDDD\"}]}"
705 1 : );
706 1 : }
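
// Companion sanity check, based on the `From<TermSwitchApiEntry> for TermLsn`
// impl above: the conversion should preserve both fields.
#[test]
fn test_term_switch_entry_into_term_lsn() {
    let entry = TermSwitchApiEntry {
        term: 42,
        lsn: Lsn(0x2A),
    };
    let tl: TermLsn = entry.into();
    assert_eq!(tl.term, 42);
    assert_eq!(tl.lsn, Lsn(0x2A));
}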
707 : }