Line data Source code
1 : use std::{str::FromStr, time::Duration};
2 :
3 : use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
4 : use reqwest::StatusCode;
5 : use safekeeper_client::mgmt_api;
6 : use tokio_util::sync::CancellationToken;
7 : use utils::{backoff, id::NodeId, logging::SecretString};
8 :
9 : use crate::{
10 : heartbeater::SafekeeperState,
11 : persistence::{DatabaseError, SafekeeperPersistence},
12 : safekeeper_client::SafekeeperClient,
13 : };
14 :
15 : #[derive(Clone)]
16 : pub struct Safekeeper {
17 : pub(crate) skp: SafekeeperPersistence,
18 : cancel: CancellationToken,
19 : listen_http_addr: String,
20 : listen_http_port: u16,
21 : id: NodeId,
22 : availability: SafekeeperState,
23 : }
24 :
25 : impl Safekeeper {
26 0 : pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
27 0 : Self {
28 0 : cancel,
29 0 : listen_http_addr: skp.host.clone(),
30 0 : listen_http_port: skp.http_port as u16,
31 0 : id: NodeId(skp.id as u64),
32 0 : skp,
33 0 : availability: SafekeeperState::Offline,
34 0 : }
35 0 : }
36 0 : pub(crate) fn base_url(&self) -> String {
37 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
38 0 : }
39 :
40 0 : pub(crate) fn get_id(&self) -> NodeId {
41 0 : self.id
42 0 : }
43 0 : pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
44 0 : self.skp.as_describe_response()
45 0 : }
46 0 : pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
47 0 : self.availability = availability;
48 0 : }
49 : /// Perform an operation (which is given a [`SafekeeperClient`]) with retries
50 0 : pub(crate) async fn with_client_retries<T, O, F>(
51 0 : &self,
52 0 : mut op: O,
53 0 : jwt: &Option<SecretString>,
54 0 : warn_threshold: u32,
55 0 : max_retries: u32,
56 0 : timeout: Duration,
57 0 : cancel: &CancellationToken,
58 0 : ) -> mgmt_api::Result<T>
59 0 : where
60 0 : O: FnMut(SafekeeperClient) -> F,
61 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
62 0 : {
63 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
64 : use mgmt_api::Error::*;
65 0 : match e {
66 0 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
67 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
68 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
69 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
70 0 : ApiError(_, _) => true,
71 0 : Cancelled => true,
72 : }
73 0 : }
74 :
75 0 : backoff::retry(
76 0 : || {
77 0 : let http_client = reqwest::ClientBuilder::new()
78 0 : .timeout(timeout)
79 0 : .build()
80 0 : .expect("Failed to construct HTTP client");
81 0 :
82 0 : let client = SafekeeperClient::from_client(
83 0 : self.get_id(),
84 0 : http_client,
85 0 : self.base_url(),
86 0 : jwt.clone(),
87 0 : );
88 0 :
89 0 : let node_cancel_fut = self.cancel.cancelled();
90 0 :
91 0 : let op_fut = op(client);
92 :
93 0 : async {
94 0 : tokio::select! {
95 0 : r = op_fut=> {r},
96 0 : _ = node_cancel_fut => {
97 0 : Err(mgmt_api::Error::Cancelled)
98 : }}
99 0 : }
100 0 : },
101 0 : is_fatal,
102 0 : warn_threshold,
103 0 : max_retries,
104 0 : &format!(
105 0 : "Call to node {} ({}:{}) management API",
106 0 : self.id, self.listen_http_addr, self.listen_http_port
107 0 : ),
108 0 : cancel,
109 0 : )
110 0 : .await
111 0 : .unwrap_or(Err(mgmt_api::Error::Cancelled))
112 0 : }
113 :
114 0 : pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) {
115 0 : let crate::persistence::SafekeeperUpsert {
116 0 : active: _,
117 0 : availability_zone_id: _,
118 0 : host,
119 0 : http_port,
120 0 : id,
121 0 : port: _,
122 0 : region_id: _,
123 0 : version: _,
124 0 : } = record.clone();
125 0 : if id != self.id.0 as i64 {
126 : // The way the function is called ensures this. If we regress on that, it's a bug.
127 0 : panic!(
128 0 : "id can't be changed via update_from_record function: {id} != {}",
129 0 : self.id.0
130 0 : );
131 0 : }
132 0 : self.skp = crate::persistence::SafekeeperPersistence::from_upsert(
133 0 : record,
134 0 : SkSchedulingPolicy::from_str(&self.skp.scheduling_policy).unwrap(),
135 0 : );
136 0 : self.listen_http_port = http_port as u16;
137 0 : self.listen_http_addr = host;
138 0 : }
139 : }
|