Line data Source code
1 : use std::{str::FromStr, time::Duration};
2 :
3 : use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
4 : use reqwest::StatusCode;
5 : use safekeeper_client::mgmt_api;
6 : use tokio_util::sync::CancellationToken;
7 : use utils::{backoff, id::NodeId, logging::SecretString};
8 :
9 : use crate::{
10 : heartbeater::SafekeeperState,
11 : persistence::{DatabaseError, SafekeeperPersistence},
12 : safekeeper_client::SafekeeperClient,
13 : };
14 :
15 : #[derive(Clone)]
16 : pub struct Safekeeper {
17 : pub(crate) skp: SafekeeperPersistence,
18 : cancel: CancellationToken,
19 : listen_http_addr: String,
20 : listen_http_port: u16,
21 : scheduling_policy: SkSchedulingPolicy,
22 : id: NodeId,
23 : availability: SafekeeperState,
24 : }
25 :
26 : impl Safekeeper {
27 0 : pub(crate) fn from_persistence(skp: SafekeeperPersistence, cancel: CancellationToken) -> Self {
28 0 : let scheduling_policy = SkSchedulingPolicy::from_str(&skp.scheduling_policy).unwrap();
29 0 : Self {
30 0 : cancel,
31 0 : listen_http_addr: skp.host.clone(),
32 0 : listen_http_port: skp.http_port as u16,
33 0 : id: NodeId(skp.id as u64),
34 0 : skp,
35 0 : availability: SafekeeperState::Offline,
36 0 : scheduling_policy,
37 0 : }
38 0 : }
39 0 : pub(crate) fn base_url(&self) -> String {
40 0 : format!("http://{}:{}", self.listen_http_addr, self.listen_http_port)
41 0 : }
42 :
43 0 : pub(crate) fn get_id(&self) -> NodeId {
44 0 : self.id
45 0 : }
46 0 : pub(crate) fn describe_response(&self) -> Result<SafekeeperDescribeResponse, DatabaseError> {
47 0 : self.skp.as_describe_response()
48 0 : }
49 0 : pub(crate) fn set_availability(&mut self, availability: SafekeeperState) {
50 0 : self.availability = availability;
51 0 : }
52 0 : pub(crate) fn scheduling_policy(&self) -> SkSchedulingPolicy {
53 0 : self.scheduling_policy
54 0 : }
55 0 : pub(crate) fn set_scheduling_policy(&mut self, scheduling_policy: SkSchedulingPolicy) {
56 0 : self.scheduling_policy = scheduling_policy;
57 0 : self.skp.scheduling_policy = String::from(scheduling_policy);
58 0 : }
59 : /// Perform an operation (which is given a [`SafekeeperClient`]) with retries
60 0 : pub(crate) async fn with_client_retries<T, O, F>(
61 0 : &self,
62 0 : mut op: O,
63 0 : jwt: &Option<SecretString>,
64 0 : warn_threshold: u32,
65 0 : max_retries: u32,
66 0 : timeout: Duration,
67 0 : cancel: &CancellationToken,
68 0 : ) -> mgmt_api::Result<T>
69 0 : where
70 0 : O: FnMut(SafekeeperClient) -> F,
71 0 : F: std::future::Future<Output = mgmt_api::Result<T>>,
72 0 : {
73 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
74 : use mgmt_api::Error::*;
75 0 : match e {
76 0 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
77 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
78 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
79 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
80 0 : ApiError(_, _) => true,
81 0 : Cancelled => true,
82 : }
83 0 : }
84 :
85 0 : backoff::retry(
86 0 : || {
87 0 : let http_client = reqwest::ClientBuilder::new()
88 0 : .timeout(timeout)
89 0 : .build()
90 0 : .expect("Failed to construct HTTP client");
91 0 :
92 0 : let client = SafekeeperClient::from_client(
93 0 : self.get_id(),
94 0 : http_client,
95 0 : self.base_url(),
96 0 : jwt.clone(),
97 0 : );
98 0 :
99 0 : let node_cancel_fut = self.cancel.cancelled();
100 0 :
101 0 : let op_fut = op(client);
102 :
103 0 : async {
104 0 : tokio::select! {
105 0 : r = op_fut=> {r},
106 0 : _ = node_cancel_fut => {
107 0 : Err(mgmt_api::Error::Cancelled)
108 : }}
109 0 : }
110 0 : },
111 0 : is_fatal,
112 0 : warn_threshold,
113 0 : max_retries,
114 0 : &format!(
115 0 : "Call to safekeeper {} ({}:{}) management API",
116 0 : self.id, self.listen_http_addr, self.listen_http_port
117 0 : ),
118 0 : cancel,
119 0 : )
120 0 : .await
121 0 : .unwrap_or(Err(mgmt_api::Error::Cancelled))
122 0 : }
123 :
124 0 : pub(crate) fn update_from_record(&mut self, record: crate::persistence::SafekeeperUpsert) {
125 0 : let crate::persistence::SafekeeperUpsert {
126 0 : active: _,
127 0 : availability_zone_id: _,
128 0 : host,
129 0 : http_port,
130 0 : id,
131 0 : port: _,
132 0 : region_id: _,
133 0 : version: _,
134 0 : } = record.clone();
135 0 : if id != self.id.0 as i64 {
136 : // The way the function is called ensures this. If we regress on that, it's a bug.
137 0 : panic!(
138 0 : "id can't be changed via update_from_record function: {id} != {}",
139 0 : self.id.0
140 0 : );
141 0 : }
142 0 : self.skp =
143 0 : crate::persistence::SafekeeperPersistence::from_upsert(record, self.scheduling_policy);
144 0 : self.listen_http_port = http_port as u16;
145 0 : self.listen_http_addr = host;
146 0 : }
147 : }
|