use std::{
    collections::HashMap,
    str::FromStr,
    sync::{Arc, atomic::AtomicU64},
    time::Duration,
};

use clashmap::{ClashMap, Entry};
use safekeeper_api::models::PullTimelineRequest;
use safekeeper_client::mgmt_api;
use tokio::sync::{
    Semaphore,
    mpsc::{self, UnboundedReceiver, UnboundedSender},
};
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use utils::{
    id::{NodeId, TenantId, TimelineId},
    logging::SecretString,
};

use crate::{
    metrics::{METRICS_REGISTRY, SafekeeperReconcilerLabelGroup},
    persistence::SafekeeperTimelineOpKind,
    safekeeper::Safekeeper,
    safekeeper_client::SafekeeperClient,
};

use super::Service;

pub(crate) struct SafekeeperReconcilers {
    cancel: CancellationToken,
    reconcilers: HashMap<NodeId, ReconcilerHandle>,
}

impl SafekeeperReconcilers {
    pub fn new(cancel: CancellationToken) -> Self {
        SafekeeperReconcilers {
            cancel,
            reconcilers: HashMap::new(),
        }
    }
    /// Adds a safekeeper-specific reconciler.
    /// Can be called multiple times, but it needs to be called at least once
    /// for every new safekeeper added.
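    ///
    /// A minimal usage sketch (not compiled; it assumes the caller already has a
    /// `service: Arc<Service>` and a populated `ScheduleRequest` named `req`):
    ///
    /// ```ignore
    /// let mut reconcilers = SafekeeperReconcilers::new(CancellationToken::new());
    /// // The reconciler task for this safekeeper must be started before any
    /// // request for it is scheduled (`schedule_request` expects the handle to exist).
    /// reconcilers.start_reconciler(req.safekeeper.get_id(), &service);
    /// reconcilers.schedule_request(req);
    /// ```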
    pub(crate) fn start_reconciler(&mut self, node_id: NodeId, service: &Arc<Service>) {
        self.reconcilers.entry(node_id).or_insert_with(|| {
            SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
        });
    }
    /// Stop a safekeeper-specific reconciler, cancelling all of its ongoing tasks.
    pub(crate) fn stop_reconciler(&mut self, node_id: NodeId) {
        if let Some(handle) = self.reconcilers.remove(&node_id) {
            handle.cancel.cancel();
        }
    }
    pub(crate) fn schedule_request_vec(&self, reqs: Vec<ScheduleRequest>) {
        tracing::info!(
            "Scheduling {} pending safekeeper ops loaded from db",
            reqs.len()
        );
        for req in reqs {
            self.schedule_request(req);
        }
    }
    pub(crate) fn schedule_request(&self, req: ScheduleRequest) {
        let node_id = req.safekeeper.get_id();
        let reconciler_handle = self.reconcilers.get(&node_id).unwrap();
        reconciler_handle.schedule_reconcile(req);
    }
    /// Cancel ongoing reconciles for the given timeline.
    ///
    /// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
    /// instead of doing this for all timelines of the tenant.
    ///
    /// Callers must remove the reconciles from the db manually.
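    ///
    /// Sketch of the two modes (not compiled):
    ///
    /// ```ignore
    /// // Cancel the reconcile for one specific timeline on this safekeeper:
    /// reconcilers.cancel_reconciles_for_timeline(node_id, tenant_id, Some(timeline_id));
    /// // Cancel only the tenant-global reconcile; per-timeline reconciles keep running:
    /// reconcilers.cancel_reconciles_for_timeline(node_id, tenant_id, None);
    /// ```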
    pub(crate) fn cancel_reconciles_for_timeline(
        &mut self,
        node_id: NodeId,
        tenant_id: TenantId,
        timeline_id: Option<TimelineId>,
    ) {
        if let Some(handle) = self.reconcilers.get(&node_id) {
            handle.cancel_reconciliation(tenant_id, timeline_id);
        }
    }
}

/// Initial load of the pending operations from the db
pub(crate) async fn load_schedule_requests(
    service: &Arc<Service>,
    safekeepers: &HashMap<NodeId, Safekeeper>,
) -> anyhow::Result<Vec<ScheduleRequest>> {
    let pending_ops_timelines = service
        .persistence
        .list_pending_ops_with_timelines()
        .await?;
    let mut res = Vec::with_capacity(pending_ops_timelines.len());
    for (op_persist, timeline_persist) in pending_ops_timelines {
        let node_id = NodeId(op_persist.sk_id as u64);
        let Some(sk) = safekeepers.get(&node_id) else {
            // This shouldn't happen: the safekeeper should at least exist as decommissioned.
            tracing::warn!(
                tenant_id = op_persist.tenant_id,
                timeline_id = op_persist.timeline_id,
                "couldn't find safekeeper with pending op id {node_id} in list of stored safekeepers"
            );
            continue;
        };
        let sk = Box::new(sk.clone());
        let tenant_id = TenantId::from_str(&op_persist.tenant_id)?;
        let timeline_id = if !op_persist.timeline_id.is_empty() {
            Some(TimelineId::from_str(&op_persist.timeline_id)?)
        } else {
            None
        };
        let host_list = match op_persist.op_kind {
            SafekeeperTimelineOpKind::Delete => Vec::new(),
            SafekeeperTimelineOpKind::Exclude => Vec::new(),
            SafekeeperTimelineOpKind::Pull => {
                if timeline_id.is_none() {
                    // We only do this extra check (outside of timeline_persist check) to give better error msgs
                    anyhow::bail!(
                        "timeline_id is empty for `pull` schedule request for {tenant_id}"
                    );
                };
                let Some(timeline_persist) = timeline_persist else {
                    // This shouldn't happen, the timeline should still exist
                    tracing::warn!(
                        tenant_id = op_persist.tenant_id,
                        timeline_id = op_persist.timeline_id,
                        "couldn't find timeline for corresponding pull op"
                    );
                    continue;
                };
                timeline_persist
                    .sk_set
                    .iter()
                    .filter_map(|sk_id| {
                        let other_node_id = NodeId(*sk_id as u64);
                        if node_id == other_node_id {
                            // We obviously don't want to pull from ourselves
                            return None;
                        }
                        let Some(sk) = safekeepers.get(&other_node_id) else {
                            tracing::warn!(
                                "couldn't find safekeeper with pending op id {other_node_id}, not pulling from it"
                            );
                            return None;
                        };
                        Some((other_node_id, sk.base_url()))
                    })
                    .collect::<Vec<_>>()
            }
        };
        let req = ScheduleRequest {
            safekeeper: sk,
            host_list,
            tenant_id,
            timeline_id,
            generation: op_persist.generation as u32,
            kind: op_persist.op_kind,
        };
        res.push(req);
    }
    Ok(res)
}

pub(crate) struct ScheduleRequest {
    pub(crate) safekeeper: Box<Safekeeper>,
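    /// Safekeepers to pull the timeline from. Only consumed by `Pull` reconciles
    /// (see `reconcile_one`); left empty for `Delete` and `Exclude`.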
    pub(crate) host_list: Vec<(NodeId, String)>,
    pub(crate) tenant_id: TenantId,
    pub(crate) timeline_id: Option<TimelineId>,
    pub(crate) generation: u32,
    pub(crate) kind: SafekeeperTimelineOpKind,
}

/// A way to keep ongoing/queued reconcile requests apart
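///
/// Each scheduled reconcile gets a fresh `TokenId`. When a reconcile finishes, its
/// entry is only removed from the ongoing-token map if the stored id still matches
/// (see `reconcile_one`), so an older request that finishes late cannot drop the
/// cancellation token of a newer request scheduled for the same timeline.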
#[derive(Copy, Clone, PartialEq, Eq)]
struct TokenId(u64);

type OngoingTokens = ClashMap<(TenantId, Option<TimelineId>), (CancellationToken, TokenId)>;

/// Handle to a per-safekeeper reconciler.
struct ReconcilerHandle {
    tx: UnboundedSender<(ScheduleRequest, CancellationToken, TokenId)>,
    ongoing_tokens: Arc<OngoingTokens>,
    token_id_counter: AtomicU64,
    cancel: CancellationToken,
}

impl ReconcilerHandle {
    /// Obtain a new token slot, cancelling any existing reconciliation for
    /// that timeline. It is not useful to have more than one operation per
    /// `<tenant_id, timeline_id, safekeeper>`, so scheduling a new op cancels
    /// the current one if it exists.
    fn new_token_slot(
        &self,
        tenant_id: TenantId,
        timeline_id: Option<TimelineId>,
    ) -> (CancellationToken, TokenId) {
        let token_id = self
            .token_id_counter
            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let token_id = TokenId(token_id);
        let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
        if let Entry::Occupied(entry) = &entry {
            let (cancel, _) = entry.get();
            cancel.cancel();
        }
        entry.insert((self.cancel.child_token(), token_id)).clone()
    }
    /// Cancel an ongoing reconciliation
    fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
        if let Some((_, (cancel, _id))) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
            cancel.cancel();
        }
    }
    fn schedule_reconcile(&self, req: ScheduleRequest) {
        let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
        let hostname = req.safekeeper.skp.host.clone();
        let sk_az = req.safekeeper.skp.availability_zone_id.clone();
        let sk_node_id = req.safekeeper.get_id().to_string();

        // We don't have direct access to the queue depth here, so increase it blindly by 1.
        // We know that putting into the queue increases the queue depth. The receiver will
        // update the gauge with the correct value once it processes the next item. To avoid
        // races where we decrease before we increase, leaving the gauge stuck at 1 for a long
        // time, we increase it before putting the request into the queue.
        let queued_gauge = &METRICS_REGISTRY
            .metrics_group
            .storage_controller_safekeeper_reconciles_queued;
        let label_group = SafekeeperReconcilerLabelGroup {
            sk_az: &sk_az,
            sk_node_id: &sk_node_id,
            sk_hostname: &hostname,
        };
        queued_gauge.inc(label_group.clone());

        if let Err(err) = self.tx.send((req, cancel, token_id)) {
            queued_gauge.set(label_group, 0);
            tracing::info!("scheduling request onto {hostname} returned error: {err}");
        }
    }
}

pub(crate) struct SafekeeperReconciler {
    inner: SafekeeperReconcilerInner,
    concurrency_limiter: Arc<Semaphore>,
    rx: UnboundedReceiver<(ScheduleRequest, CancellationToken, TokenId)>,
    cancel: CancellationToken,
}

/// Thin wrapper over `Service` to not clutter its inherent functions
#[derive(Clone)]
struct SafekeeperReconcilerInner {
    ongoing_tokens: Arc<OngoingTokens>,
    service: Arc<Service>,
}

impl SafekeeperReconciler {
    fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
        // We hold the ServiceInner lock, so we don't want sending to the reconciler channel to block.
        let (tx, rx) = mpsc::unbounded_channel();
        let concurrency = service.config.safekeeper_reconciler_concurrency;
        let ongoing_tokens = Arc::new(ClashMap::new());
        let mut reconciler = SafekeeperReconciler {
            inner: SafekeeperReconcilerInner {
                service,
                ongoing_tokens: ongoing_tokens.clone(),
            },
            rx,
            concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
            cancel: cancel.clone(),
        };
        let handle = ReconcilerHandle {
            tx,
            ongoing_tokens,
            token_id_counter: AtomicU64::new(0),
            cancel,
        };
        tokio::spawn(async move { reconciler.run().await });
        handle
    }
    async fn run(&mut self) {
        loop {
            let req = tokio::select! {
                req = self.rx.recv() => req,
                _ = self.cancel.cancelled() => break,
            };
            let Some((req, req_cancel, req_token_id)) = req else {
                break;
            };

            let permit_res = tokio::select! {
                req = self.concurrency_limiter.clone().acquire_owned() => req,
                _ = self.cancel.cancelled() => break,
            };
            let Ok(_permit) = permit_res else { return };

            let inner = self.inner.clone();
            if req_cancel.is_cancelled() {
                continue;
            }

            let queued_gauge = &METRICS_REGISTRY
                .metrics_group
                .storage_controller_safekeeper_reconciles_queued;
            queued_gauge.set(
                SafekeeperReconcilerLabelGroup {
                    sk_az: &req.safekeeper.skp.availability_zone_id,
                    sk_node_id: &req.safekeeper.get_id().to_string(),
                    sk_hostname: &req.safekeeper.skp.host,
                },
                self.rx.len() as i64,
            );

            tokio::task::spawn(async move {
                let kind = req.kind;
                let tenant_id = req.tenant_id;
                let timeline_id = req.timeline_id;
                let node_id = req.safekeeper.skp.id;
                inner
                    .reconcile_one(req, req_cancel, req_token_id)
                    .instrument(tracing::info_span!(
                        "reconcile_one",
                        ?kind,
                        %tenant_id,
                        ?timeline_id,
                        %node_id,
                    ))
                    .await;
            });
        }
    }
}

impl SafekeeperReconcilerInner {
    async fn reconcile_one(
        &self,
        req: ScheduleRequest,
        req_cancel: CancellationToken,
        req_token_id: TokenId,
    ) {
        let req_host = req.safekeeper.skp.host.clone();
        let success;
        match req.kind {
            SafekeeperTimelineOpKind::Pull => {
                let Some(timeline_id) = req.timeline_id else {
                    tracing::warn!(
                        "ignoring invalid schedule request: timeline_id is empty for `pull`"
                    );
                    return;
                };
                let our_id = req.safekeeper.get_id();
                let http_hosts = req
                    .host_list
                    .iter()
                    .filter(|(node_id, _hostname)| *node_id != our_id)
                    .map(|(_, hostname)| hostname.clone())
                    .collect::<Vec<_>>();
                let pull_req = PullTimelineRequest {
                    http_hosts,
                    tenant_id: req.tenant_id,
                    timeline_id,
                    ignore_tombstone: Some(false),
                };
                success = self
                    .reconcile_inner(
                        &req,
                        async |client| client.pull_timeline(&pull_req).await,
                        |resp| {
                            if let Some(host) = resp.safekeeper_host {
                                tracing::info!("pulled timeline from {host} onto {req_host}");
                            } else {
                                tracing::info!(
                                    "timeline already present on safekeeper on {req_host}"
                                );
                            }
                        },
                        req_cancel,
                    )
                    .await;
            }
            SafekeeperTimelineOpKind::Exclude => {
                // TODO actually exclude instead of delete here
                let tenant_id = req.tenant_id;
                let Some(timeline_id) = req.timeline_id else {
                    tracing::warn!(
                        "ignoring invalid schedule request: timeline_id is empty for `exclude`"
                    );
                    return;
                };
                success = self
                    .reconcile_inner(
                        &req,
                        async |client| client.delete_timeline(tenant_id, timeline_id).await,
                        |_resp| {
                            tracing::info!("deleted timeline from {req_host}");
                        },
                        req_cancel,
                    )
                    .await;
            }
            SafekeeperTimelineOpKind::Delete => {
                let tenant_id = req.tenant_id;
                if let Some(timeline_id) = req.timeline_id {
                    success = self
                        .reconcile_inner(
                            &req,
                            async |client| client.delete_timeline(tenant_id, timeline_id).await,
                            |_resp| {
                                tracing::info!("deleted timeline from {req_host}");
                            },
                            req_cancel,
                        )
                        .await;
                    if success {
                        self.delete_timeline_from_db(tenant_id, timeline_id).await;
                    }
                } else {
                    success = self
                        .reconcile_inner(
                            &req,
                            async |client| client.delete_tenant(tenant_id).await,
                            |_resp| {
                                tracing::info!(%tenant_id, "deleted tenant from {req_host}");
                            },
                            req_cancel,
                        )
                        .await;
                    if success {
                        self.delete_tenant_timelines_from_db(tenant_id).await;
                    }
                }
            }
        }
        if success {
            self.ongoing_tokens.remove_if(
                &(req.tenant_id, req.timeline_id),
                |_ttid, (_cancel, token_id)| {
                    // Ensure that this request is indeed the request we just finished and not a new one
                    req_token_id == *token_id
                },
            );
        }
    }
    async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
        match self
            .service
            .persistence
            .list_pending_ops_for_timeline(tenant_id, timeline_id)
            .await
        {
            Ok(list) => {
                if !list.is_empty() {
                    // Duplicate the timeline_id here because it might be None in the reconcile context
                    tracing::info!(%timeline_id, "not deleting timeline from db as there are {} open reconciles", list.len());
                    return;
                }
            }
            Err(e) => {
                tracing::warn!(%timeline_id, "couldn't query pending ops: {e}");
                return;
            }
        }
        tracing::info!(%tenant_id, %timeline_id, "deleting timeline from db after all reconciles succeeded");
        // In theory we could crash right after deleting the op from the db and right before reaching this,
        // but then we'll boot up with a timeline that has deleted_at set, so hopefully we'll issue deletion ops for it again.
        if let Err(err) = self
            .service
            .persistence
            .delete_timeline(tenant_id, timeline_id)
            .await
        {
            tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
        }
    }
    async fn delete_tenant_timelines_from_db(&self, tenant_id: TenantId) {
        let timeline_list = match self
            .service
            .persistence
            .list_timelines_for_tenant(tenant_id)
            .await
        {
            Ok(timeline_list) => timeline_list,
            Err(e) => {
                tracing::warn!(%tenant_id, "couldn't query timelines: {e}");
                return;
            }
        };
        for timeline in timeline_list {
            let Ok(timeline_id) = TimelineId::from_str(&timeline.timeline_id) else {
                tracing::warn!("Invalid timeline ID in database {}", timeline.timeline_id);
                continue;
            };
            self.delete_timeline_from_db(tenant_id, timeline_id).await;
        }
    }
    /// Returns whether the reconciliation happened successfully (or we got cancelled)
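    ///
    /// Retries the operation with a short sleep between attempts until it either succeeds
    /// or the request is cancelled. On success the pending op is removed from the database
    /// and the completion metric is incremented.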
    async fn reconcile_inner<T, F, U>(
        &self,
        req: &ScheduleRequest,
        closure: impl Fn(SafekeeperClient) -> F,
        log_success: impl FnOnce(T) -> U,
        req_cancel: CancellationToken,
    ) -> bool
    where
        F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
    {
        let jwt = self
            .service
            .config
            .safekeeper_jwt_token
            .clone()
            .map(SecretString::from);
        loop {
            let res = req
                .safekeeper
                .with_client_retries(
                    |client| {
                        let closure = &closure;
                        async move { closure(client).await }
                    },
                    self.service.get_http_client(),
                    &jwt,
                    3,
                    10,
                    Duration::from_secs(10),
                    &req_cancel,
                )
                .await;
            match res {
                Ok(resp) => {
                    log_success(resp);
                    let res = self
                        .service
                        .persistence
                        .remove_pending_op(
                            req.tenant_id,
                            req.timeline_id,
                            req.safekeeper.get_id(),
                            req.generation,
                        )
                        .await;

                    let complete_counter = &METRICS_REGISTRY
                        .metrics_group
                        .storage_controller_safekeeper_reconciles_complete;
                    complete_counter.inc(SafekeeperReconcilerLabelGroup {
                        sk_az: &req.safekeeper.skp.availability_zone_id,
                        sk_node_id: &req.safekeeper.get_id().to_string(),
                        sk_hostname: &req.safekeeper.skp.host,
                    });

                    if let Err(err) = res {
                        tracing::info!(
                            "couldn't remove reconciliation request onto {} from persistence: {err:?}",
                            req.safekeeper.skp.host
                        );
                    }
                    return true;
                }
                Err(mgmt_api::Error::Cancelled) => {
                    // On cancellation, the code that issued it will take care of removing db entries (if needed)
                    return false;
                }
                Err(e) => {
                    tracing::info!(
                        "Reconcile attempt for safekeeper {} failed, retrying after sleep: {e:?}",
                        req.safekeeper.skp.host
                    );
                    const SLEEP_TIME: Duration = Duration::from_secs(1);
                    tokio::time::sleep(SLEEP_TIME).await;
                }
            }
        }
    }
}