Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt;
3 : use std::io::Write as _;
4 : use std::str::FromStr;
5 : use std::sync::Arc;
6 :
7 : use http_utils::endpoint::{
8 : self, ChannelWriter, auth_middleware, check_permission_with, profile_cpu_handler,
9 : profile_heap_handler, prometheus_metrics_handler, request_span,
10 : };
11 : use http_utils::error::ApiError;
12 : use http_utils::failpoints::failpoints_handler;
13 : use http_utils::json::{json_request, json_response};
14 : use http_utils::request::{ensure_no_body, parse_query_param, parse_request_param};
15 : use http_utils::{RequestExt, RouterBuilder};
16 : use hyper::{Body, Request, Response, StatusCode};
17 : use pem::Pem;
18 : use postgres_ffi::WAL_SEGMENT_SIZE;
19 : use safekeeper_api::models::{
20 : AcceptorStateStatus, PullTimelineRequest, SafekeeperStatus, SkTimelineInfo, TenantDeleteResult,
21 : TermSwitchApiEntry, TimelineCopyRequest, TimelineCreateRequest, TimelineDeleteResult,
22 : TimelineStatus, TimelineTermBumpRequest,
23 : };
24 : use safekeeper_api::{ServerInfo, membership, models};
25 : use storage_broker::proto::{SafekeeperTimelineInfo, TenantTimelineId as ProtoTenantTimelineId};
26 : use tokio::sync::mpsc;
27 : use tokio::task;
28 : use tokio_stream::wrappers::ReceiverStream;
29 : use tokio_util::sync::CancellationToken;
30 : use tracing::{Instrument, info_span};
31 : use utils::auth::SwappableJwtAuth;
32 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
33 : use utils::lsn::Lsn;
34 :
35 : use crate::debug_dump::TimelineDigestRequest;
36 : use crate::hadron::{get_filesystem_capacity, get_filesystem_usage};
37 : use crate::safekeeper::TermLsn;
38 : use crate::timelines_global_map::DeleteOrExclude;
39 : use crate::{
40 : GlobalTimelines, SafeKeeperConf, copy_timeline, debug_dump, patch_control_file, pull_timeline,
41 : };
42 : use serde_json::json;
43 :
44 : /// Healthcheck handler.
45 0 : async fn status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
46 0 : check_permission(&request, None)?;
47 0 : let conf = get_conf(&request);
48 0 : let status = SafekeeperStatus { id: conf.my_id };
49 0 : json_response(StatusCode::OK, status)
50 0 : }
51 :
52 0 : fn get_conf(request: &Request<Body>) -> &SafeKeeperConf {
53 0 : request
54 0 : .data::<Arc<SafeKeeperConf>>()
55 0 : .expect("unknown state type")
56 0 : .as_ref()
57 0 : }
58 :
59 0 : fn get_global_timelines(request: &Request<Body>) -> Arc<GlobalTimelines> {
60 0 : request
61 0 : .data::<Arc<GlobalTimelines>>()
62 0 : .expect("unknown state type")
63 0 : .clone()
64 0 : }
65 :
66 0 : fn check_permission(request: &Request<Body>, tenant_id: Option<TenantId>) -> Result<(), ApiError> {
67 0 : check_permission_with(request, |claims| {
68 0 : crate::auth::check_permission(claims, tenant_id)
69 0 : })
70 0 : }
71 :
72 : /// Deactivates all timelines for the tenant and removes its data directory.
73 : /// See `timeline_delete_handler`.
74 0 : async fn tenant_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
75 0 : let tenant_id = parse_request_param(&request, "tenant_id")?;
76 0 : let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
77 0 : check_permission(&request, Some(tenant_id))?;
78 0 : ensure_no_body(&mut request).await?;
79 0 : let global_timelines = get_global_timelines(&request);
80 0 : let action = if only_local {
81 0 : DeleteOrExclude::DeleteLocal
82 : } else {
83 0 : DeleteOrExclude::Delete
84 : };
85 0 : let delete_info = global_timelines
86 0 : .delete_all_for_tenant(&tenant_id, action)
87 0 : .await
88 0 : .map_err(ApiError::InternalServerError)?;
89 0 : let response_body: TenantDeleteResult = delete_info
90 0 : .iter()
91 0 : .map(|(ttid, resp)| (format!("{}", ttid.timeline_id), *resp))
92 0 : .collect::<HashMap<String, TimelineDeleteResult>>();
93 0 : json_response(StatusCode::OK, response_body)
94 0 : }
95 :
96 0 : async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
97 0 : let request_data: TimelineCreateRequest = json_request(&mut request).await?;
98 :
99 0 : let ttid = TenantTimelineId {
100 0 : tenant_id: request_data.tenant_id,
101 0 : timeline_id: request_data.timeline_id,
102 0 : };
103 0 : check_permission(&request, Some(ttid.tenant_id))?;
104 :
105 0 : let server_info = ServerInfo {
106 0 : pg_version: request_data.pg_version,
107 0 : system_id: request_data.system_id.unwrap_or(0),
108 0 : wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
109 0 : };
110 0 : let global_timelines = get_global_timelines(&request);
111 0 : global_timelines
112 0 : .create(
113 0 : ttid,
114 0 : request_data.mconf,
115 0 : server_info,
116 0 : request_data.start_lsn,
117 0 : request_data.commit_lsn.unwrap_or(request_data.start_lsn),
118 0 : )
119 0 : .await
120 0 : .map_err(ApiError::InternalServerError)?;
121 :
122 0 : json_response(StatusCode::OK, ())
123 0 : }
124 :
125 0 : async fn utilization_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
126 0 : check_permission(&request, None)?;
127 0 : let global_timelines = get_global_timelines(&request);
128 0 : let utilization = global_timelines.get_timeline_counts();
129 0 : json_response(StatusCode::OK, utilization)
130 0 : }
131 :
132 : /// Returns filesystem capacity and current utilization for the safekeeper data directory.
133 0 : async fn filesystem_usage_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
134 0 : check_permission(&request, None)?;
135 0 : let conf = get_conf(&request);
136 0 : let path = conf.workdir.as_std_path();
137 0 : let capacity = get_filesystem_capacity(path).map_err(ApiError::InternalServerError)?;
138 0 : let usage = get_filesystem_usage(path);
139 0 : let resp = json!({
140 0 : "data_dir": path,
141 0 : "capacity_bytes": capacity,
142 0 : "usage_bytes": usage,
143 : });
144 0 : json_response(StatusCode::OK, resp)
145 0 : }
146 :
147 : /// List all (not deleted) timelines.
148 : /// Note: it is possible to do the same with debug_dump.
149 0 : async fn timeline_list_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
150 0 : check_permission(&request, None)?;
151 0 : let global_timelines = get_global_timelines(&request);
152 0 : let res: Vec<TenantTimelineId> = global_timelines
153 0 : .get_all()
154 0 : .iter()
155 0 : .map(|tli| tli.ttid)
156 0 : .collect();
157 0 : json_response(StatusCode::OK, res)
158 0 : }
159 :
160 : impl From<TermSwitchApiEntry> for TermLsn {
161 0 : fn from(api_val: TermSwitchApiEntry) -> Self {
162 0 : TermLsn {
163 0 : term: api_val.term,
164 0 : lsn: api_val.lsn,
165 0 : }
166 0 : }
167 : }
168 :
169 : /// Report info about timeline.
170 0 : async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
171 0 : let ttid = TenantTimelineId::new(
172 0 : parse_request_param(&request, "tenant_id")?,
173 0 : parse_request_param(&request, "timeline_id")?,
174 : );
175 0 : check_permission(&request, Some(ttid.tenant_id))?;
176 :
177 0 : let global_timelines = get_global_timelines(&request);
178 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
179 0 : let (inmem, state) = tli.get_state().await;
180 0 : let flush_lsn = tli.get_flush_lsn().await;
181 :
182 0 : let last_log_term = state.acceptor_state.get_last_log_term(flush_lsn);
183 0 : let term_history = state
184 0 : .acceptor_state
185 0 : .term_history
186 0 : .0
187 0 : .into_iter()
188 0 : .map(|ts| TermSwitchApiEntry {
189 0 : term: ts.term,
190 0 : lsn: ts.lsn,
191 0 : })
192 0 : .collect();
193 0 : let acc_state = AcceptorStateStatus {
194 0 : term: state.acceptor_state.term,
195 0 : epoch: last_log_term,
196 0 : term_history,
197 0 : };
198 :
199 0 : let conf = get_conf(&request);
200 : // Note: we report in memory values which can be lost.
201 0 : let status = TimelineStatus {
202 0 : tenant_id: ttid.tenant_id,
203 0 : timeline_id: ttid.timeline_id,
204 0 : mconf: state.mconf,
205 0 : acceptor_state: acc_state,
206 0 : pg_info: state.server,
207 0 : flush_lsn,
208 0 : timeline_start_lsn: state.timeline_start_lsn,
209 0 : local_start_lsn: state.local_start_lsn,
210 0 : commit_lsn: inmem.commit_lsn,
211 0 : backup_lsn: inmem.backup_lsn,
212 0 : peer_horizon_lsn: inmem.peer_horizon_lsn,
213 0 : remote_consistent_lsn: inmem.remote_consistent_lsn,
214 0 : peers: tli.get_peers(conf).await,
215 0 : walsenders: tli.get_walsenders().get_all_public(),
216 0 : walreceivers: tli.get_walreceivers().get_all(),
217 : };
218 0 : json_response(StatusCode::OK, status)
219 0 : }
220 :
221 : /// Deactivates the timeline and removes its data directory.
222 0 : async fn timeline_delete_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
223 0 : let ttid = TenantTimelineId::new(
224 0 : parse_request_param(&request, "tenant_id")?,
225 0 : parse_request_param(&request, "timeline_id")?,
226 : );
227 0 : let only_local = parse_query_param(&request, "only_local")?.unwrap_or(false);
228 0 : check_permission(&request, Some(ttid.tenant_id))?;
229 0 : ensure_no_body(&mut request).await?;
230 0 : let global_timelines = get_global_timelines(&request);
231 0 : let action = if only_local {
232 0 : DeleteOrExclude::DeleteLocal
233 : } else {
234 0 : DeleteOrExclude::Delete
235 : };
236 0 : let resp = global_timelines
237 0 : .delete_or_exclude(&ttid, action)
238 0 : .await
239 0 : .map_err(ApiError::from)?;
240 0 : json_response(StatusCode::OK, resp)
241 0 : }
242 :
243 : /// Pull timeline from peer safekeeper instances.
244 0 : async fn timeline_pull_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
245 0 : check_permission(&request, None)?;
246 :
247 0 : let data: PullTimelineRequest = json_request(&mut request).await?;
248 0 : let conf = get_conf(&request);
249 0 : let global_timelines = get_global_timelines(&request);
250 :
251 0 : let ca_certs = conf
252 0 : .ssl_ca_certs
253 0 : .iter()
254 0 : .map(Pem::contents)
255 0 : .map(reqwest::Certificate::from_der)
256 0 : .collect::<Result<Vec<_>, _>>()
257 0 : .map_err(|e| {
258 0 : ApiError::InternalServerError(anyhow::anyhow!("failed to parse CA certs: {e}"))
259 0 : })?;
260 :
261 0 : let resp = pull_timeline::handle_request(
262 0 : data,
263 0 : conf.sk_auth_token.clone(),
264 0 : ca_certs,
265 0 : global_timelines,
266 0 : false,
267 0 : )
268 0 : .await?;
269 0 : json_response(StatusCode::OK, resp)
270 0 : }
271 :
272 : /// Stream tar archive with all timeline data.
273 0 : async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
274 0 : let destination = parse_request_param(&request, "destination_id")?;
275 0 : let ttid = TenantTimelineId::new(
276 0 : parse_request_param(&request, "tenant_id")?,
277 0 : parse_request_param(&request, "timeline_id")?,
278 : );
279 0 : check_permission(&request, Some(ttid.tenant_id))?;
280 :
281 0 : let global_timelines = get_global_timelines(&request);
282 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
283 0 : let storage = global_timelines.get_wal_backup().get_storage();
284 :
285 : // To stream the body use wrap_stream which wants Stream of Result<Bytes>,
286 : // so create the chan and write to it in another task.
287 0 : let (tx, rx) = mpsc::channel(1);
288 :
289 0 : let conf = get_conf(&request);
290 0 : task::spawn(pull_timeline::stream_snapshot(
291 0 : tli,
292 0 : conf.my_id,
293 0 : destination,
294 0 : tx,
295 0 : storage,
296 : ));
297 :
298 0 : let rx_stream = ReceiverStream::new(rx);
299 0 : let body = Body::wrap_stream(rx_stream);
300 :
301 0 : let response = Response::builder()
302 0 : .status(200)
303 0 : .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
304 0 : .body(body)
305 0 : .unwrap();
306 :
307 0 : Ok(response)
308 0 : }
309 :
310 : /// Error type for delete_or_exclude: either generation conflict or something
311 : /// internal.
312 : #[derive(thiserror::Error, Debug)]
313 : pub enum DeleteOrExcludeError {
314 : #[error("refused to switch into excluding mconf {requested}, current: {current}")]
315 : Conflict {
316 : requested: membership::Configuration,
317 : current: membership::Configuration,
318 : },
319 : #[error(transparent)]
320 : Other(#[from] anyhow::Error),
321 : }
322 :
323 : /// Convert DeleteOrExcludeError to ApiError.
324 : impl From<DeleteOrExcludeError> for ApiError {
325 0 : fn from(de: DeleteOrExcludeError) -> ApiError {
326 0 : match de {
327 : DeleteOrExcludeError::Conflict {
328 : requested: _,
329 : current: _,
330 0 : } => ApiError::Conflict(de.to_string()),
331 0 : DeleteOrExcludeError::Other(e) => ApiError::InternalServerError(e),
332 : }
333 0 : }
334 : }
335 :
336 : /// Remove timeline locally after this node has been excluded from the
337 : /// membership configuration. The body is the same as in the membership endpoint
338 : /// -- conf where node is excluded -- and in principle single ep could be used
339 : /// for both actions, but since this is a data deletion op let's keep them
340 : /// separate.
341 0 : async fn timeline_exclude_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
342 0 : let ttid = TenantTimelineId::new(
343 0 : parse_request_param(&request, "tenant_id")?,
344 0 : parse_request_param(&request, "timeline_id")?,
345 : );
346 0 : check_permission(&request, Some(ttid.tenant_id))?;
347 :
348 0 : let global_timelines = get_global_timelines(&request);
349 0 : let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
350 0 : let my_id = get_conf(&request).my_id;
351 : // If request doesn't exclude us, membership switch endpoint should be used
352 : // instead.
353 0 : if data.mconf.contains(my_id) {
354 0 : return Err(ApiError::Forbidden(format!(
355 0 : "refused to switch into {}, node {} is member of it",
356 0 : data.mconf, my_id
357 0 : )));
358 0 : }
359 0 : let action = DeleteOrExclude::Exclude(data.mconf);
360 :
361 0 : let resp = global_timelines
362 0 : .delete_or_exclude(&ttid, action)
363 0 : .await
364 0 : .map_err(ApiError::from)?;
365 0 : json_response(StatusCode::OK, resp)
366 0 : }
367 :
368 : /// Consider switching timeline membership configuration to the provided one.
369 0 : async fn timeline_membership_handler(
370 0 : mut request: Request<Body>,
371 0 : ) -> Result<Response<Body>, ApiError> {
372 0 : let ttid = TenantTimelineId::new(
373 0 : parse_request_param(&request, "tenant_id")?,
374 0 : parse_request_param(&request, "timeline_id")?,
375 : );
376 0 : check_permission(&request, Some(ttid.tenant_id))?;
377 :
378 0 : let global_timelines = get_global_timelines(&request);
379 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
380 :
381 0 : let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
382 0 : let my_id = get_conf(&request).my_id;
383 : // If request excludes us, exclude endpoint should be used instead.
384 0 : if !data.mconf.contains(my_id) {
385 0 : return Err(ApiError::Forbidden(format!(
386 0 : "refused to switch into {}, node {} is not a member of it",
387 0 : data.mconf, my_id
388 0 : )));
389 0 : }
390 0 : let req_gen = data.mconf.generation;
391 0 : let response = tli
392 0 : .membership_switch(data.mconf)
393 0 : .await
394 0 : .map_err(ApiError::InternalServerError)?;
395 :
396 : // Return 409 if request was ignored.
397 0 : if req_gen == response.current_conf.generation {
398 0 : json_response(StatusCode::OK, response)
399 : } else {
400 0 : Err(ApiError::Conflict(format!(
401 0 : "request to switch into {} ignored, current generation {}",
402 0 : req_gen, response.current_conf.generation
403 0 : )))
404 : }
405 0 : }
406 :
407 0 : async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
408 0 : check_permission(&request, None)?;
409 :
410 0 : let request_data: TimelineCopyRequest = json_request(&mut request).await?;
411 0 : let source_ttid = TenantTimelineId::new(
412 0 : parse_request_param(&request, "tenant_id")?,
413 0 : parse_request_param(&request, "source_timeline_id")?,
414 : );
415 :
416 0 : let global_timelines = get_global_timelines(&request);
417 0 : let wal_backup = global_timelines.get_wal_backup();
418 0 : let storage = wal_backup
419 0 : .get_storage()
420 0 : .ok_or(ApiError::BadRequest(anyhow::anyhow!(
421 0 : "Remote Storage is not configured"
422 0 : )))?;
423 :
424 0 : copy_timeline::handle_request(copy_timeline::Request{
425 0 : source_ttid,
426 0 : until_lsn: request_data.until_lsn,
427 0 : destination_ttid: TenantTimelineId::new(source_ttid.tenant_id, request_data.target_timeline_id),
428 0 : }, global_timelines, storage)
429 0 : .instrument(info_span!("copy_timeline", from=%source_ttid, to=%request_data.target_timeline_id, until_lsn=%request_data.until_lsn))
430 0 : .await
431 0 : .map_err(ApiError::InternalServerError)?;
432 :
433 0 : json_response(StatusCode::OK, ())
434 0 : }
435 :
436 0 : async fn patch_control_file_handler(
437 0 : mut request: Request<Body>,
438 0 : ) -> Result<Response<Body>, ApiError> {
439 0 : check_permission(&request, None)?;
440 :
441 0 : let ttid = TenantTimelineId::new(
442 0 : parse_request_param(&request, "tenant_id")?,
443 0 : parse_request_param(&request, "timeline_id")?,
444 : );
445 :
446 0 : let global_timelines = get_global_timelines(&request);
447 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
448 :
449 0 : let patch_request: patch_control_file::Request = json_request(&mut request).await?;
450 0 : let response = patch_control_file::handle_request(tli, patch_request)
451 0 : .await
452 0 : .map_err(ApiError::InternalServerError)?;
453 :
454 0 : json_response(StatusCode::OK, response)
455 0 : }
456 :
457 : /// Force persist control file.
458 0 : async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
459 0 : check_permission(&request, None)?;
460 :
461 0 : let ttid = TenantTimelineId::new(
462 0 : parse_request_param(&request, "tenant_id")?,
463 0 : parse_request_param(&request, "timeline_id")?,
464 : );
465 :
466 0 : let global_timelines = get_global_timelines(&request);
467 0 : let tli = global_timelines.get(ttid)?;
468 0 : tli.write_shared_state()
469 0 : .await
470 : .sk
471 0 : .state_mut()
472 0 : .flush()
473 0 : .await
474 0 : .map_err(ApiError::InternalServerError)?;
475 0 : json_response(StatusCode::OK, ())
476 0 : }
477 :
478 0 : async fn timeline_digest_handler(request: Request<Body>) -> Result<Response<Body>, ApiError> {
479 0 : let ttid = TenantTimelineId::new(
480 0 : parse_request_param(&request, "tenant_id")?,
481 0 : parse_request_param(&request, "timeline_id")?,
482 : );
483 0 : check_permission(&request, Some(ttid.tenant_id))?;
484 :
485 0 : let global_timelines = get_global_timelines(&request);
486 0 : let from_lsn: Option<Lsn> = parse_query_param(&request, "from_lsn")?;
487 0 : let until_lsn: Option<Lsn> = parse_query_param(&request, "until_lsn")?;
488 :
489 0 : let request = TimelineDigestRequest {
490 0 : from_lsn: from_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
491 0 : "from_lsn is required"
492 0 : )))?,
493 0 : until_lsn: until_lsn.ok_or(ApiError::BadRequest(anyhow::anyhow!(
494 0 : "until_lsn is required"
495 0 : )))?,
496 : };
497 :
498 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
499 0 : let tli = tli
500 0 : .wal_residence_guard()
501 0 : .await
502 0 : .map_err(ApiError::InternalServerError)?;
503 :
504 0 : let response = debug_dump::calculate_digest(&tli, request)
505 0 : .await
506 0 : .map_err(ApiError::InternalServerError)?;
507 0 : json_response(StatusCode::OK, response)
508 0 : }
509 :
510 : /// Unevict timeline and remove uploaded partial segment(s) from the remote storage.
511 : /// Successfull response returns list of segments existed before the deletion.
512 : /// Aimed for one-off usage not normally needed.
513 0 : async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Response<Body>, ApiError> {
514 0 : let ttid = TenantTimelineId::new(
515 0 : parse_request_param(&request, "tenant_id")?,
516 0 : parse_request_param(&request, "timeline_id")?,
517 : );
518 0 : check_permission(&request, Some(ttid.tenant_id))?;
519 :
520 0 : let global_timelines = get_global_timelines(&request);
521 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
522 :
523 0 : let response = tli
524 0 : .backup_partial_reset()
525 0 : .await
526 0 : .map_err(ApiError::InternalServerError)?;
527 0 : json_response(StatusCode::OK, response)
528 0 : }
529 :
530 : /// Make term at least as high as one in request. If one in request is None,
531 : /// increment current one.
532 0 : async fn timeline_term_bump_handler(
533 0 : mut request: Request<Body>,
534 0 : ) -> Result<Response<Body>, ApiError> {
535 0 : let ttid = TenantTimelineId::new(
536 0 : parse_request_param(&request, "tenant_id")?,
537 0 : parse_request_param(&request, "timeline_id")?,
538 : );
539 0 : check_permission(&request, Some(ttid.tenant_id))?;
540 :
541 0 : let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
542 :
543 0 : let global_timelines = get_global_timelines(&request);
544 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
545 0 : let response = tli
546 0 : .term_bump(request_data.term)
547 0 : .await
548 0 : .map_err(ApiError::InternalServerError)?;
549 :
550 0 : json_response(StatusCode::OK, response)
551 0 : }
552 :
553 : /// Used only in tests to hand craft required data.
554 0 : async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
555 0 : let ttid = TenantTimelineId::new(
556 0 : parse_request_param(&request, "tenant_id")?,
557 0 : parse_request_param(&request, "timeline_id")?,
558 : );
559 0 : check_permission(&request, Some(ttid.tenant_id))?;
560 0 : let sk_info: SkTimelineInfo = json_request(&mut request).await?;
561 0 : let proto_sk_info = SafekeeperTimelineInfo {
562 : safekeeper_id: 0,
563 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
564 0 : tenant_id: ttid.tenant_id.as_ref().to_owned(),
565 0 : timeline_id: ttid.timeline_id.as_ref().to_owned(),
566 0 : }),
567 0 : term: sk_info.term.unwrap_or(0),
568 0 : last_log_term: sk_info.last_log_term.unwrap_or(0),
569 0 : flush_lsn: sk_info.flush_lsn.0,
570 0 : commit_lsn: sk_info.commit_lsn.0,
571 0 : remote_consistent_lsn: sk_info.remote_consistent_lsn.0,
572 0 : peer_horizon_lsn: sk_info.peer_horizon_lsn.0,
573 0 : safekeeper_connstr: sk_info.safekeeper_connstr.unwrap_or_else(|| "".to_owned()),
574 0 : http_connstr: sk_info.http_connstr.unwrap_or_else(|| "".to_owned()),
575 0 : https_connstr: sk_info.https_connstr,
576 0 : backup_lsn: sk_info.backup_lsn.0,
577 0 : local_start_lsn: sk_info.local_start_lsn.0,
578 0 : availability_zone: None,
579 0 : standby_horizon: sk_info.standby_horizon.0,
580 : };
581 :
582 0 : let global_timelines = get_global_timelines(&request);
583 0 : let tli = global_timelines.get(ttid).map_err(ApiError::from)?;
584 0 : tli.record_safekeeper_info(proto_sk_info)
585 0 : .await
586 0 : .map_err(ApiError::InternalServerError)?;
587 :
588 0 : json_response(StatusCode::OK, ())
589 0 : }
590 :
591 0 : fn parse_kv_str<E: fmt::Display, T: FromStr<Err = E>>(k: &str, v: &str) -> Result<T, ApiError> {
592 0 : v.parse()
593 0 : .map_err(|e| ApiError::BadRequest(anyhow::anyhow!("cannot parse {k}: {e}")))
594 0 : }
595 :
596 : /// Dump debug info about all available safekeeper state.
597 0 : async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
598 0 : check_permission(&request, None)?;
599 0 : ensure_no_body(&mut request).await?;
600 :
601 0 : let mut dump_all: Option<bool> = None;
602 0 : let mut dump_control_file: Option<bool> = None;
603 0 : let mut dump_memory: Option<bool> = None;
604 0 : let mut dump_disk_content: Option<bool> = None;
605 0 : let mut dump_term_history: Option<bool> = None;
606 0 : let mut dump_wal_last_modified: Option<bool> = None;
607 0 : let mut tenant_id: Option<TenantId> = None;
608 0 : let mut timeline_id: Option<TimelineId> = None;
609 :
610 0 : let query = request.uri().query().unwrap_or("");
611 0 : let mut values = url::form_urlencoded::parse(query.as_bytes());
612 :
613 0 : for (k, v) in &mut values {
614 0 : match k.as_ref() {
615 0 : "dump_all" => dump_all = Some(parse_kv_str(&k, &v)?),
616 0 : "dump_control_file" => dump_control_file = Some(parse_kv_str(&k, &v)?),
617 0 : "dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
618 0 : "dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
619 0 : "dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
620 0 : "dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?),
621 0 : "tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
622 0 : "timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
623 0 : _ => Err(ApiError::BadRequest(anyhow::anyhow!(
624 0 : "Unknown query parameter: {}",
625 0 : k
626 0 : )))?,
627 : }
628 : }
629 :
630 0 : let dump_all = dump_all.unwrap_or(false);
631 0 : let dump_control_file = dump_control_file.unwrap_or(dump_all);
632 0 : let dump_memory = dump_memory.unwrap_or(dump_all);
633 0 : let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
634 0 : let dump_term_history = dump_term_history.unwrap_or(true);
635 0 : let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);
636 :
637 0 : let global_timelines = get_global_timelines(&request);
638 :
639 0 : let args = debug_dump::Args {
640 0 : dump_all,
641 0 : dump_control_file,
642 0 : dump_memory,
643 0 : dump_disk_content,
644 0 : dump_term_history,
645 0 : dump_wal_last_modified,
646 0 : tenant_id,
647 0 : timeline_id,
648 0 : };
649 :
650 0 : let resp = debug_dump::build(args, global_timelines)
651 0 : .await
652 0 : .map_err(ApiError::InternalServerError)?;
653 :
654 0 : let started_at = std::time::Instant::now();
655 :
656 0 : let (tx, rx) = mpsc::channel(1);
657 :
658 0 : let body = Body::wrap_stream(ReceiverStream::new(rx));
659 :
660 0 : let mut writer = ChannelWriter::new(128 * 1024, tx);
661 :
662 0 : let response = Response::builder()
663 0 : .status(200)
664 0 : .header(hyper::header::CONTENT_TYPE, "application/octet-stream")
665 0 : .body(body)
666 0 : .unwrap();
667 :
668 0 : let span = info_span!("blocking");
669 0 : tokio::task::spawn_blocking(move || {
670 0 : let _span = span.entered();
671 :
672 0 : let res = serde_json::to_writer(&mut writer, &resp)
673 0 : .map_err(std::io::Error::from)
674 0 : .and_then(|_| writer.flush());
675 :
676 0 : match res {
677 : Ok(()) => {
678 0 : tracing::info!(
679 0 : bytes = writer.flushed_bytes(),
680 0 : elapsed_ms = started_at.elapsed().as_millis(),
681 0 : "responded /v1/debug_dump"
682 : );
683 : }
684 0 : Err(e) => {
685 0 : tracing::warn!("failed to write out /v1/debug_dump response: {e:#}");
686 : // semantics of this error are quite... unclear. we want to error the stream out to
687 : // abort the response to somehow notify the client that we failed.
688 : //
689 : // though, most likely the reason for failure is that the receiver is already gone.
690 0 : drop(
691 0 : writer
692 0 : .tx
693 0 : .blocking_send(Err(std::io::ErrorKind::BrokenPipe.into())),
694 : );
695 : }
696 : }
697 0 : });
698 :
699 0 : Ok(response)
700 0 : }
701 :
702 : /// Safekeeper http router.
703 0 : pub fn make_router(
704 0 : conf: Arc<SafeKeeperConf>,
705 0 : global_timelines: Arc<GlobalTimelines>,
706 0 : ) -> RouterBuilder<hyper::Body, ApiError> {
707 0 : let mut router = endpoint::make_router();
708 0 : if conf.http_auth.is_some() {
709 0 : router = router.middleware(auth_middleware(|request| {
710 : const ALLOWLIST_ROUTES: &[&str] =
711 : &["/v1/status", "/metrics", "/profile/cpu", "/profile/heap"];
712 0 : if ALLOWLIST_ROUTES.contains(&request.uri().path()) {
713 0 : None
714 : } else {
715 : // Option<Arc<SwappableJwtAuth>> is always provided as data below, hence unwrap().
716 0 : request
717 0 : .data::<Option<Arc<SwappableJwtAuth>>>()
718 0 : .unwrap()
719 0 : .as_deref()
720 : }
721 0 : }))
722 0 : }
723 :
724 0 : let force_metric_collection_on_scrape = conf.force_metric_collection_on_scrape;
725 :
726 0 : let prometheus_metrics_handler_wrapper =
727 0 : move |req| prometheus_metrics_handler(req, force_metric_collection_on_scrape);
728 :
729 : // NB: on any changes do not forget to update the OpenAPI spec
730 : // located nearby (/safekeeper/src/http/openapi_spec.yaml).
731 0 : let auth = conf.http_auth.clone();
732 0 : router
733 0 : .data(conf)
734 0 : .data(global_timelines)
735 0 : .data(auth)
736 0 : .get("/metrics", move |r| {
737 0 : request_span(r, prometheus_metrics_handler_wrapper)
738 0 : })
739 0 : .get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
740 0 : .get("/profile/heap", |r| request_span(r, profile_heap_handler))
741 0 : .get("/v1/status", |r| request_span(r, status_handler))
742 0 : .put("/v1/failpoints", |r| {
743 0 : request_span(r, move |r| async {
744 0 : check_permission(&r, None)?;
745 0 : let cancel = CancellationToken::new();
746 0 : failpoints_handler(r, cancel).await
747 0 : })
748 0 : })
749 0 : .get("/v1/utilization", |r| request_span(r, utilization_handler))
750 : /* BEGIN_HADRON */
751 0 : .get("/v1/debug/filesystem_usage", |r| {
752 0 : request_span(r, filesystem_usage_handler)
753 0 : })
754 : /* END_HADRON */
755 0 : .delete("/v1/tenant/:tenant_id", |r| {
756 0 : request_span(r, tenant_delete_handler)
757 0 : })
758 : // Will be used in the future instead of implicit timeline creation
759 0 : .post("/v1/tenant/timeline", |r| {
760 0 : request_span(r, timeline_create_handler)
761 0 : })
762 0 : .get("/v1/tenant/timeline", |r| {
763 0 : request_span(r, timeline_list_handler)
764 0 : })
765 0 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
766 0 : request_span(r, timeline_status_handler)
767 0 : })
768 0 : .delete("/v1/tenant/:tenant_id/timeline/:timeline_id", |r| {
769 0 : request_span(r, timeline_delete_handler)
770 0 : })
771 0 : .post("/v1/pull_timeline", |r| {
772 0 : request_span(r, timeline_pull_handler)
773 0 : })
774 0 : .put("/v1/tenant/:tenant_id/timeline/:timeline_id/exclude", |r| {
775 0 : request_span(r, timeline_exclude_handler)
776 0 : })
777 0 : .get(
778 : "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
779 0 : |r| request_span(r, timeline_snapshot_handler),
780 : )
781 0 : .put(
782 : "/v1/tenant/:tenant_id/timeline/:timeline_id/membership",
783 0 : |r| request_span(r, timeline_membership_handler),
784 : )
785 0 : .post(
786 : "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
787 0 : |r| request_span(r, timeline_copy_handler),
788 : )
789 0 : .patch(
790 : "/v1/tenant/:tenant_id/timeline/:timeline_id/control_file",
791 0 : |r| request_span(r, patch_control_file_handler),
792 : )
793 0 : .post(
794 : "/v1/tenant/:tenant_id/timeline/:timeline_id/checkpoint",
795 0 : |r| request_span(r, timeline_checkpoint_handler),
796 : )
797 0 : .get("/v1/tenant/:tenant_id/timeline/:timeline_id/digest", |r| {
798 0 : request_span(r, timeline_digest_handler)
799 0 : })
800 0 : .post(
801 : "/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
802 0 : |r| request_span(r, timeline_backup_partial_reset),
803 : )
804 0 : .post(
805 : "/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
806 0 : |r| request_span(r, timeline_term_bump_handler),
807 : )
808 0 : .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
809 0 : request_span(r, record_safekeeper_info)
810 0 : })
811 0 : .get("/v1/debug_dump", |r| request_span(r, dump_debug_handler))
812 0 : }
813 :
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the JSON wire format of `AcceptorStateStatus`, including the
    /// `hi/lo` hex rendering of the LSN inside a term history entry.
    #[test]
    fn test_term_switch_entry_api_serialize() {
        let entry = TermSwitchApiEntry {
            term: 1,
            lsn: Lsn(0x16FFDDDD),
        };
        let state = AcceptorStateStatus {
            term: 1,
            epoch: 1,
            term_history: vec![entry],
        };

        let expected = r#"{"term":1,"epoch":1,"term_history":[{"term":1,"lsn":"0/16FFDDDD"}]}"#;
        let json = serde_json::to_string(&state).unwrap();
        assert_eq!(json, expected);
    }
}
|