Line data Source code
1 : use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
2 :
3 : use clashmap::{ClashMap, Entry};
4 : use safekeeper_api::models::PullTimelineRequest;
5 : use safekeeper_client::mgmt_api;
6 : use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
7 : use tokio_util::sync::CancellationToken;
8 : use tracing::Instrument;
9 : use utils::{
10 : id::{NodeId, TenantId, TimelineId},
11 : logging::SecretString,
12 : };
13 :
14 : use crate::{
15 : persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper,
16 : safekeeper_client::SafekeeperClient,
17 : };
18 :
19 : use super::Service;
20 :
21 : pub(crate) struct SafekeeperReconcilers {
22 : cancel: CancellationToken,
23 : reconcilers: HashMap<NodeId, ReconcilerHandle>,
24 : }
25 :
26 : impl SafekeeperReconcilers {
27 0 : pub fn new(cancel: CancellationToken) -> Self {
28 0 : SafekeeperReconcilers {
29 0 : cancel,
30 0 : reconcilers: HashMap::new(),
31 0 : }
32 0 : }
33 0 : pub(crate) fn schedule_request_vec(
34 0 : &mut self,
35 0 : service: &Arc<Service>,
36 0 : reqs: Vec<ScheduleRequest>,
37 0 : ) {
38 0 : for req in reqs {
39 0 : self.schedule_request(service, req);
40 0 : }
41 0 : }
42 0 : pub(crate) fn schedule_request(&mut self, service: &Arc<Service>, req: ScheduleRequest) {
43 0 : let node_id = req.safekeeper.get_id();
44 0 : let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| {
45 0 : SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
46 0 : });
47 0 : reconciler_handle.schedule_reconcile(req);
48 0 : }
49 0 : pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) {
50 0 : if let Some(handle) = self.reconcilers.remove(&node_id) {
51 0 : handle.cancel.cancel();
52 0 : }
53 0 : }
54 : }
55 :
56 : /// Initial load of the pending operations from the db
57 0 : pub(crate) async fn load_schedule_requests(
58 0 : service: &Arc<Service>,
59 0 : safekeepers: &HashMap<NodeId, Safekeeper>,
60 0 : ) -> anyhow::Result<Vec<ScheduleRequest>> {
61 0 : let pending_ops = service.persistence.list_pending_ops(None).await?;
62 0 : let mut res = Vec::with_capacity(pending_ops.len());
63 0 : for op_persist in pending_ops {
64 0 : let node_id = NodeId(op_persist.sk_id as u64);
65 0 : let Some(sk) = safekeepers.get(&node_id) else {
66 : // This shouldn't happen, at least the safekeeper should exist as decomissioned.
67 0 : tracing::warn!(
68 : tenant_id = op_persist.tenant_id,
69 : timeline_id = op_persist.timeline_id,
70 0 : "couldn't find safekeeper with pending op id {node_id} in list of stored safekeepers"
71 : );
72 0 : continue;
73 : };
74 0 : let sk = Box::new(sk.clone());
75 0 : let tenant_id = TenantId::from_str(&op_persist.tenant_id)?;
76 0 : let timeline_id = TimelineId::from_str(&op_persist.timeline_id)?;
77 0 : let host_list = match op_persist.op_kind {
78 0 : SafekeeperTimelineOpKind::Delete => Vec::new(),
79 0 : SafekeeperTimelineOpKind::Exclude => Vec::new(),
80 : SafekeeperTimelineOpKind::Pull => {
81 : // TODO this code is super hacky, it doesn't take migrations into account
82 0 : let timeline_persist = service
83 0 : .persistence
84 0 : .get_timeline(tenant_id, timeline_id)
85 0 : .await?;
86 0 : let Some(timeline_persist) = timeline_persist else {
87 : // This shouldn't happen, the timeline should still exist
88 0 : tracing::warn!(
89 : tenant_id = op_persist.tenant_id,
90 : timeline_id = op_persist.timeline_id,
91 0 : "couldn't find timeline for corresponding pull op"
92 : );
93 0 : continue;
94 : };
95 0 : timeline_persist
96 0 : .sk_set
97 0 : .iter()
98 0 : .filter_map(|sk_id| {
99 0 : let other_node_id = NodeId(*sk_id as u64);
100 0 : if node_id == other_node_id {
101 : // We obviously don't want to pull from ourselves
102 0 : return None;
103 0 : }
104 0 : let Some(sk) = safekeepers.get(&other_node_id) else {
105 0 : tracing::warn!(
106 0 : "couldnt find safekeeper with pending op id {other_node_id}, not pulling from it"
107 : );
108 0 : return None;
109 : };
110 0 : Some((other_node_id, sk.base_url()))
111 0 : })
112 0 : .collect::<Vec<_>>()
113 : }
114 : };
115 0 : let req = ScheduleRequest {
116 0 : safekeeper: sk,
117 0 : host_list,
118 0 : tenant_id,
119 0 : timeline_id,
120 0 : generation: op_persist.generation as u32,
121 0 : kind: op_persist.op_kind,
122 0 : };
123 0 : res.push(req);
124 : }
125 0 : Ok(res)
126 0 : }
127 :
128 : pub(crate) struct ScheduleRequest {
129 : pub(crate) safekeeper: Box<Safekeeper>,
130 : pub(crate) host_list: Vec<(NodeId, String)>,
131 : pub(crate) tenant_id: TenantId,
132 : pub(crate) timeline_id: TimelineId,
133 : pub(crate) generation: u32,
134 : pub(crate) kind: SafekeeperTimelineOpKind,
135 : }
136 :
137 : struct ReconcilerHandle {
138 : tx: UnboundedSender<(ScheduleRequest, Arc<CancellationToken>)>,
139 : ongoing_tokens: Arc<ClashMap<(TenantId, TimelineId), Arc<CancellationToken>>>,
140 : cancel: CancellationToken,
141 : }
142 :
143 : impl ReconcilerHandle {
144 : /// Obtain a new token slot, cancelling any existing reconciliations for that timeline
145 0 : fn new_token_slot(
146 0 : &self,
147 0 : tenant_id: TenantId,
148 0 : timeline_id: TimelineId,
149 0 : ) -> Arc<CancellationToken> {
150 0 : let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
151 0 : if let Entry::Occupied(entry) = &entry {
152 0 : let cancel: &CancellationToken = entry.get();
153 0 : cancel.cancel();
154 0 : }
155 0 : entry.insert(Arc::new(self.cancel.child_token())).clone()
156 0 : }
157 0 : fn schedule_reconcile(&self, req: ScheduleRequest) {
158 0 : let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
159 0 : let hostname = req.safekeeper.skp.host.clone();
160 0 : if let Err(err) = self.tx.send((req, cancel)) {
161 0 : tracing::info!("scheduling request onto {hostname} returned error: {err}");
162 0 : }
163 0 : }
164 : }
165 :
166 : pub(crate) struct SafekeeperReconciler {
167 : service: Arc<Service>,
168 : rx: UnboundedReceiver<(ScheduleRequest, Arc<CancellationToken>)>,
169 : cancel: CancellationToken,
170 : }
171 :
172 : impl SafekeeperReconciler {
173 0 : fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
174 0 : // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
175 0 : let (tx, rx) = mpsc::unbounded_channel();
176 0 : let mut reconciler = SafekeeperReconciler {
177 0 : service,
178 0 : rx,
179 0 : cancel: cancel.clone(),
180 0 : };
181 0 : let handle = ReconcilerHandle {
182 0 : tx,
183 0 : ongoing_tokens: Arc::new(ClashMap::new()),
184 0 : cancel,
185 0 : };
186 0 : tokio::spawn(async move { reconciler.run().await });
187 0 : handle
188 0 : }
189 0 : async fn run(&mut self) {
190 : loop {
191 : // TODO add parallelism with semaphore here
192 0 : let req = tokio::select! {
193 0 : req = self.rx.recv() => req,
194 0 : _ = self.cancel.cancelled() => break,
195 : };
196 0 : let Some((req, req_cancel)) = req else { break };
197 0 : if req_cancel.is_cancelled() {
198 0 : continue;
199 0 : }
200 0 :
201 0 : let kind = req.kind;
202 0 : let tenant_id = req.tenant_id;
203 0 : let timeline_id = req.timeline_id;
204 0 : self.reconcile_one(req, req_cancel)
205 0 : .instrument(tracing::info_span!(
206 0 : "reconcile_one",
207 0 : ?kind,
208 0 : %tenant_id,
209 0 : %timeline_id
210 0 : ))
211 0 : .await;
212 : }
213 0 : }
214 0 : async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: Arc<CancellationToken>) {
215 0 : let req_host = req.safekeeper.skp.host.clone();
216 0 : match req.kind {
217 : SafekeeperTimelineOpKind::Pull => {
218 0 : let our_id = req.safekeeper.get_id();
219 0 : let http_hosts = req
220 0 : .host_list
221 0 : .iter()
222 0 : .filter(|(node_id, _hostname)| *node_id != our_id)
223 0 : .map(|(_, hostname)| hostname.clone())
224 0 : .collect::<Vec<_>>();
225 0 : let pull_req = PullTimelineRequest {
226 0 : http_hosts,
227 0 : tenant_id: req.tenant_id,
228 0 : timeline_id: req.timeline_id,
229 0 : };
230 0 : self.reconcile_inner(
231 0 : req,
232 0 : async |client| client.pull_timeline(&pull_req).await,
233 0 : |resp| {
234 0 : tracing::info!(
235 0 : "pulled timeline from {} onto {req_host}",
236 : resp.safekeeper_host,
237 : );
238 0 : },
239 0 : req_cancel,
240 0 : )
241 0 : .await;
242 : }
243 : SafekeeperTimelineOpKind::Exclude => {
244 : // TODO actually exclude instead of delete here
245 0 : let tenant_id = req.tenant_id;
246 0 : let timeline_id = req.timeline_id;
247 0 : self.reconcile_inner(
248 0 : req,
249 0 : async |client| client.delete_timeline(tenant_id, timeline_id).await,
250 0 : |_resp| {
251 0 : tracing::info!("deleted timeline from {req_host}");
252 0 : },
253 0 : req_cancel,
254 0 : )
255 0 : .await;
256 : }
257 : SafekeeperTimelineOpKind::Delete => {
258 0 : let tenant_id = req.tenant_id;
259 0 : let timeline_id = req.timeline_id;
260 0 : self.reconcile_inner(
261 0 : req,
262 0 : async |client| client.delete_timeline(tenant_id, timeline_id).await,
263 0 : |_resp| {
264 0 : tracing::info!("deleted timeline from {req_host}");
265 0 : },
266 0 : req_cancel,
267 0 : )
268 0 : .await;
269 : }
270 : }
271 0 : }
272 0 : async fn reconcile_inner<T, F, U>(
273 0 : &self,
274 0 : req: ScheduleRequest,
275 0 : closure: impl Fn(SafekeeperClient) -> F,
276 0 : log_success: impl FnOnce(T) -> U,
277 0 : req_cancel: Arc<CancellationToken>,
278 0 : ) where
279 0 : F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
280 0 : {
281 0 : let jwt = self
282 0 : .service
283 0 : .config
284 0 : .safekeeper_jwt_token
285 0 : .clone()
286 0 : .map(SecretString::from);
287 0 : let ssl_ca_cert = self.service.config.ssl_ca_cert.clone();
288 : loop {
289 0 : let res = req
290 0 : .safekeeper
291 0 : .with_client_retries(
292 0 : |client| {
293 0 : let closure = &closure;
294 0 : async move { closure(client).await }
295 0 : },
296 0 : &jwt,
297 0 : &ssl_ca_cert,
298 0 : 3,
299 0 : 10,
300 0 : Duration::from_secs(10),
301 0 : &req_cancel,
302 0 : )
303 0 : .await;
304 0 : match res {
305 0 : Ok(resp) => {
306 0 : log_success(resp);
307 0 : let res = self
308 0 : .service
309 0 : .persistence
310 0 : .remove_pending_op(
311 0 : req.tenant_id,
312 0 : req.timeline_id,
313 0 : req.safekeeper.get_id(),
314 0 : req.generation,
315 0 : )
316 0 : .await;
317 0 : if let Err(err) = res {
318 0 : tracing::info!(
319 0 : "couldn't remove reconciliation request onto {} from persistence: {err:?}",
320 : req.safekeeper.skp.host
321 : );
322 0 : }
323 0 : return;
324 : }
325 : Err(mgmt_api::Error::Cancelled) => {
326 : // On cancellation, the code that issued it will take care of removing db entries (if needed)
327 0 : return;
328 : }
329 0 : Err(e) => {
330 0 : tracing::info!(
331 0 : "Reconcile attempt for safekeeper {} failed, retrying after sleep: {e:?}",
332 : req.safekeeper.skp.host
333 : );
334 : const SLEEP_TIME: Duration = Duration::from_secs(1);
335 0 : tokio::time::sleep(SLEEP_TIME).await;
336 : }
337 : }
338 : }
339 0 : }
340 : }
|