LCOV - code coverage report
Current view: top level - storage_controller/src/service - safekeeper_reconciler.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 377 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 51 0

            Line data    Source code
       1              : use std::{
       2              :     collections::HashMap,
       3              :     str::FromStr,
       4              :     sync::{Arc, atomic::AtomicU64},
       5              :     time::Duration,
       6              : };
       7              : 
       8              : use clashmap::{ClashMap, Entry};
       9              : use safekeeper_api::models::PullTimelineRequest;
      10              : use safekeeper_client::mgmt_api;
      11              : use tokio::sync::{
      12              :     Semaphore,
      13              :     mpsc::{self, UnboundedReceiver, UnboundedSender},
      14              : };
      15              : use tokio_util::sync::CancellationToken;
      16              : use tracing::Instrument;
      17              : use utils::{
      18              :     id::{NodeId, TenantId, TimelineId},
      19              :     logging::SecretString,
      20              : };
      21              : 
      22              : use crate::{
      23              :     metrics::{METRICS_REGISTRY, SafekeeperReconcilerLabelGroup},
      24              :     persistence::SafekeeperTimelineOpKind,
      25              :     safekeeper::Safekeeper,
      26              :     safekeeper_client::SafekeeperClient,
      27              : };
      28              : 
      29              : use super::Service;
      30              : 
      31              : pub(crate) struct SafekeeperReconcilers {
      32              :     cancel: CancellationToken,
      33              :     reconcilers: HashMap<NodeId, ReconcilerHandle>,
      34              : }
      35              : 
      36              : impl SafekeeperReconcilers {
      37            0 :     pub fn new(cancel: CancellationToken) -> Self {
      38            0 :         SafekeeperReconcilers {
      39            0 :             cancel,
      40            0 :             reconcilers: HashMap::new(),
      41            0 :         }
      42            0 :     }
      43              :     /// Adds a safekeeper-specific reconciler.
      44              :     /// Can be called multiple times, but it needs to be called at least once
      45              :     /// for every new safekeeper added.
      46            0 :     pub(crate) fn start_reconciler(&mut self, node_id: NodeId, service: &Arc<Service>) {
      47            0 :         self.reconcilers.entry(node_id).or_insert_with(|| {
      48            0 :             SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
      49            0 :         });
      50            0 :     }
      51              :     /// Stop a safekeeper-specific reconciler.
      52              :     /// Stops the reconciler, cancelling all ongoing tasks.
      53            0 :     pub(crate) fn stop_reconciler(&mut self, node_id: NodeId) {
      54            0 :         if let Some(handle) = self.reconcilers.remove(&node_id) {
      55            0 :             handle.cancel.cancel();
      56            0 :         }
      57            0 :     }
      58            0 :     pub(crate) fn schedule_request_vec(&self, reqs: Vec<ScheduleRequest>) {
      59            0 :         tracing::info!(
      60            0 :             "Scheduling {} pending safekeeper ops loaded from db",
      61            0 :             reqs.len()
      62              :         );
      63            0 :         for req in reqs {
      64            0 :             self.schedule_request(req);
      65            0 :         }
      66            0 :     }
      67            0 :     pub(crate) fn schedule_request(&self, req: ScheduleRequest) {
      68            0 :         let node_id = req.safekeeper.get_id();
      69            0 :         let reconciler_handle = self.reconcilers.get(&node_id).unwrap();
      70            0 :         reconciler_handle.schedule_reconcile(req);
      71            0 :     }
      72              :     /// Cancel ongoing reconciles for the given timeline
      73              :     ///
      74              :     /// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
      75              :     /// instead of doing this for all timelines of the tenant.
      76              :     ///
      77              :     /// Callers must remove the reconciles from the db manually
      78            0 :     pub(crate) fn cancel_reconciles_for_timeline(
      79            0 :         &mut self,
      80            0 :         node_id: NodeId,
      81            0 :         tenant_id: TenantId,
      82            0 :         timeline_id: Option<TimelineId>,
      83            0 :     ) {
      84            0 :         if let Some(handle) = self.reconcilers.get(&node_id) {
      85            0 :             handle.cancel_reconciliation(tenant_id, timeline_id);
      86            0 :         }
      87            0 :     }
      88              : }
      89              : 
      90              : /// Initial load of the pending operations from the db
      91            0 : pub(crate) async fn load_schedule_requests(
      92            0 :     service: &Arc<Service>,
      93            0 :     safekeepers: &HashMap<NodeId, Safekeeper>,
      94            0 : ) -> anyhow::Result<Vec<ScheduleRequest>> {
      95            0 :     let pending_ops_timelines = service
      96            0 :         .persistence
      97            0 :         .list_pending_ops_with_timelines()
      98            0 :         .await?;
      99            0 :     let mut res = Vec::with_capacity(pending_ops_timelines.len());
     100            0 :     for (op_persist, timeline_persist) in pending_ops_timelines {
     101            0 :         let node_id = NodeId(op_persist.sk_id as u64);
     102            0 :         let Some(sk) = safekeepers.get(&node_id) else {
     103              :             // This shouldn't happen, at least the safekeeper should exist as decomissioned.
     104            0 :             tracing::warn!(
     105              :                 tenant_id = op_persist.tenant_id,
     106              :                 timeline_id = op_persist.timeline_id,
     107            0 :                 "couldn't find safekeeper with pending op id {node_id} in list of stored safekeepers"
     108              :             );
     109            0 :             continue;
     110              :         };
     111            0 :         let sk = Box::new(sk.clone());
     112            0 :         let tenant_id = TenantId::from_str(&op_persist.tenant_id)?;
     113            0 :         let timeline_id = if !op_persist.timeline_id.is_empty() {
     114            0 :             Some(TimelineId::from_str(&op_persist.timeline_id)?)
     115              :         } else {
     116            0 :             None
     117              :         };
     118            0 :         let host_list = match op_persist.op_kind {
     119            0 :             SafekeeperTimelineOpKind::Delete => Vec::new(),
     120            0 :             SafekeeperTimelineOpKind::Exclude => Vec::new(),
     121              :             SafekeeperTimelineOpKind::Pull => {
     122            0 :                 if timeline_id.is_none() {
     123              :                     // We only do this extra check (outside of timeline_persist check) to give better error msgs
     124            0 :                     anyhow::bail!(
     125            0 :                         "timeline_id is empty for `pull` schedule request for {tenant_id}"
     126              :                     );
     127            0 :                 };
     128            0 :                 let Some(timeline_persist) = timeline_persist else {
     129              :                     // This shouldn't happen, the timeline should still exist
     130            0 :                     tracing::warn!(
     131              :                         tenant_id = op_persist.tenant_id,
     132              :                         timeline_id = op_persist.timeline_id,
     133            0 :                         "couldn't find timeline for corresponding pull op"
     134              :                     );
     135            0 :                     continue;
     136              :                 };
     137            0 :                 timeline_persist
     138            0 :                     .sk_set
     139            0 :                     .iter()
     140            0 :                     .filter_map(|sk_id| {
     141            0 :                         let other_node_id = NodeId(*sk_id as u64);
     142            0 :                         if node_id == other_node_id {
     143              :                             // We obviously don't want to pull from ourselves
     144            0 :                             return None;
     145            0 :                         }
     146            0 :                         let Some(sk) = safekeepers.get(&other_node_id) else {
     147            0 :                             tracing::warn!(
     148            0 :                                 "couldn't find safekeeper with pending op id {other_node_id}, not pulling from it"
     149              :                             );
     150            0 :                             return None;
     151              :                         };
     152            0 :                         Some((other_node_id, sk.base_url()))
     153            0 :                     })
     154            0 :                     .collect::<Vec<_>>()
     155              :             }
     156              :         };
     157            0 :         let req = ScheduleRequest {
     158            0 :             safekeeper: sk,
     159            0 :             host_list,
     160            0 :             tenant_id,
     161            0 :             timeline_id,
     162            0 :             generation: op_persist.generation as u32,
     163            0 :             kind: op_persist.op_kind,
     164            0 :         };
     165            0 :         res.push(req);
     166              :     }
     167            0 :     Ok(res)
     168            0 : }
     169              : 
     170              : pub(crate) struct ScheduleRequest {
     171              :     pub(crate) safekeeper: Box<Safekeeper>,
     172              :     pub(crate) host_list: Vec<(NodeId, String)>,
     173              :     pub(crate) tenant_id: TenantId,
     174              :     pub(crate) timeline_id: Option<TimelineId>,
     175              :     pub(crate) generation: u32,
     176              :     pub(crate) kind: SafekeeperTimelineOpKind,
     177              : }
     178              : 
     179              : /// A way to keep ongoing/queued reconcile requests apart
     180              : #[derive(Copy, Clone, PartialEq, Eq)]
     181              : struct TokenId(u64);
     182              : 
     183              : type OngoingTokens = ClashMap<(TenantId, Option<TimelineId>), (CancellationToken, TokenId)>;
     184              : 
     185              : /// Handle to per safekeeper reconciler.
     186              : struct ReconcilerHandle {
     187              :     tx: UnboundedSender<(ScheduleRequest, CancellationToken, TokenId)>,
     188              :     ongoing_tokens: Arc<OngoingTokens>,
     189              :     token_id_counter: AtomicU64,
     190              :     cancel: CancellationToken,
     191              : }
     192              : 
     193              : impl ReconcilerHandle {
     194              :     /// Obtain a new token slot, cancelling any existing reconciliations for
     195              :     /// that timeline. It is not useful to have >1 operation per <tenant_id,
     196              :     /// timeline_id, safekeeper>, hence scheduling op cancels current one if it
     197              :     /// exists.
     198            0 :     fn new_token_slot(
     199            0 :         &self,
     200            0 :         tenant_id: TenantId,
     201            0 :         timeline_id: Option<TimelineId>,
     202            0 :     ) -> (CancellationToken, TokenId) {
     203            0 :         let token_id = self
     204            0 :             .token_id_counter
     205            0 :             .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     206            0 :         let token_id = TokenId(token_id);
     207            0 :         let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
     208            0 :         if let Entry::Occupied(entry) = &entry {
     209            0 :             let (cancel, _) = entry.get();
     210            0 :             cancel.cancel();
     211            0 :         }
     212            0 :         entry.insert((self.cancel.child_token(), token_id)).clone()
     213            0 :     }
     214              :     /// Cancel an ongoing reconciliation
     215            0 :     fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
     216            0 :         if let Some((_, (cancel, _id))) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
     217            0 :             cancel.cancel();
     218            0 :         }
     219            0 :     }
     220            0 :     fn schedule_reconcile(&self, req: ScheduleRequest) {
     221            0 :         let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
     222            0 :         let hostname = req.safekeeper.skp.host.clone();
     223            0 :         let sk_az = req.safekeeper.skp.availability_zone_id.clone();
     224            0 :         let sk_node_id = req.safekeeper.get_id().to_string();
     225              : 
     226              :         // We don't have direct access to the queue depth here, so increase it blindly by 1.
     227              :         // We know that putting into the queue increases the queue depth. The receiver will
     228              :         // update with the correct value once it processes the next item. To avoid races where we
     229              :         // reduce before we increase, leaving the gauge with a 1 value for a long time, we
     230              :         // increase it before putting into the queue.
     231            0 :         let queued_gauge = &METRICS_REGISTRY
     232            0 :             .metrics_group
     233            0 :             .storage_controller_safekeeper_reconciles_queued;
     234            0 :         let label_group = SafekeeperReconcilerLabelGroup {
     235            0 :             sk_az: &sk_az,
     236            0 :             sk_node_id: &sk_node_id,
     237            0 :             sk_hostname: &hostname,
     238            0 :         };
     239            0 :         queued_gauge.inc(label_group.clone());
     240              : 
     241            0 :         if let Err(err) = self.tx.send((req, cancel, token_id)) {
     242            0 :             queued_gauge.set(label_group, 0);
     243            0 :             tracing::info!("scheduling request onto {hostname} returned error: {err}");
     244            0 :         }
     245            0 :     }
     246              : }
     247              : 
     248              : pub(crate) struct SafekeeperReconciler {
     249              :     inner: SafekeeperReconcilerInner,
     250              :     concurrency_limiter: Arc<Semaphore>,
     251              :     rx: UnboundedReceiver<(ScheduleRequest, CancellationToken, TokenId)>,
     252              :     cancel: CancellationToken,
     253              : }
     254              : 
     255              : /// Thin wrapper over `Service` to not clutter its inherent functions
     256              : #[derive(Clone)]
     257              : struct SafekeeperReconcilerInner {
     258              :     ongoing_tokens: Arc<OngoingTokens>,
     259              :     service: Arc<Service>,
     260              : }
     261              : 
     262              : impl SafekeeperReconciler {
     263            0 :     fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
     264              :         // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
     265            0 :         let (tx, rx) = mpsc::unbounded_channel();
     266            0 :         let concurrency = service.config.safekeeper_reconciler_concurrency;
     267            0 :         let ongoing_tokens = Arc::new(ClashMap::new());
     268            0 :         let mut reconciler = SafekeeperReconciler {
     269            0 :             inner: SafekeeperReconcilerInner {
     270            0 :                 service,
     271            0 :                 ongoing_tokens: ongoing_tokens.clone(),
     272            0 :             },
     273            0 :             rx,
     274            0 :             concurrency_limiter: Arc::new(Semaphore::new(concurrency)),
     275            0 :             cancel: cancel.clone(),
     276            0 :         };
     277            0 :         let handle = ReconcilerHandle {
     278            0 :             tx,
     279            0 :             ongoing_tokens,
     280            0 :             token_id_counter: AtomicU64::new(0),
     281            0 :             cancel,
     282            0 :         };
     283            0 :         tokio::spawn(async move { reconciler.run().await });
     284            0 :         handle
     285            0 :     }
     286            0 :     async fn run(&mut self) {
     287              :         loop {
     288            0 :             let req = tokio::select! {
     289            0 :                 req = self.rx.recv() => req,
     290            0 :                 _ = self.cancel.cancelled() => break,
     291              :             };
     292            0 :             let Some((req, req_cancel, req_token_id)) = req else {
     293            0 :                 break;
     294              :             };
     295              : 
     296            0 :             let permit_res = tokio::select! {
     297            0 :                 req = self.concurrency_limiter.clone().acquire_owned() => req,
     298            0 :                 _ = self.cancel.cancelled() => break,
     299              :             };
     300            0 :             let Ok(_permit) = permit_res else { return };
     301              : 
     302            0 :             let inner = self.inner.clone();
     303            0 :             if req_cancel.is_cancelled() {
     304            0 :                 continue;
     305            0 :             }
     306              : 
     307            0 :             let queued_gauge = &METRICS_REGISTRY
     308            0 :                 .metrics_group
     309            0 :                 .storage_controller_safekeeper_reconciles_queued;
     310            0 :             queued_gauge.set(
     311            0 :                 SafekeeperReconcilerLabelGroup {
     312            0 :                     sk_az: &req.safekeeper.skp.availability_zone_id,
     313            0 :                     sk_node_id: &req.safekeeper.get_id().to_string(),
     314            0 :                     sk_hostname: &req.safekeeper.skp.host,
     315            0 :                 },
     316            0 :                 self.rx.len() as i64,
     317              :             );
     318              : 
     319            0 :             tokio::task::spawn(async move {
     320            0 :                 let kind = req.kind;
     321            0 :                 let tenant_id = req.tenant_id;
     322            0 :                 let timeline_id = req.timeline_id;
     323            0 :                 let node_id = req.safekeeper.skp.id;
     324            0 :                 inner
     325            0 :                     .reconcile_one(req, req_cancel, req_token_id)
     326            0 :                     .instrument(tracing::info_span!(
     327              :                         "reconcile_one",
     328              :                         ?kind,
     329              :                         %tenant_id,
     330              :                         ?timeline_id,
     331              :                         %node_id,
     332              :                     ))
     333            0 :                     .await;
     334            0 :             });
     335              :         }
     336            0 :     }
     337              : }
     338              : 
     339              : impl SafekeeperReconcilerInner {
     340            0 :     async fn reconcile_one(
     341            0 :         &self,
     342            0 :         req: ScheduleRequest,
     343            0 :         req_cancel: CancellationToken,
     344            0 :         req_token_id: TokenId,
     345            0 :     ) {
     346            0 :         let req_host = req.safekeeper.skp.host.clone();
     347              :         let success;
     348            0 :         match req.kind {
     349              :             SafekeeperTimelineOpKind::Pull => {
     350            0 :                 let Some(timeline_id) = req.timeline_id else {
     351            0 :                     tracing::warn!(
     352            0 :                         "ignoring invalid schedule request: timeline_id is empty for `pull`"
     353              :                     );
     354            0 :                     return;
     355              :                 };
     356            0 :                 let our_id = req.safekeeper.get_id();
     357            0 :                 let http_hosts = req
     358            0 :                     .host_list
     359            0 :                     .iter()
     360            0 :                     .filter(|(node_id, _hostname)| *node_id != our_id)
     361            0 :                     .map(|(_, hostname)| hostname.clone())
     362            0 :                     .collect::<Vec<_>>();
     363            0 :                 let pull_req = PullTimelineRequest {
     364            0 :                     http_hosts,
     365            0 :                     tenant_id: req.tenant_id,
     366            0 :                     timeline_id,
     367            0 :                     ignore_tombstone: Some(false),
     368            0 :                 };
     369            0 :                 success = self
     370            0 :                     .reconcile_inner(
     371            0 :                         &req,
     372            0 :                         async |client| client.pull_timeline(&pull_req).await,
     373            0 :                         |resp| {
     374            0 :                             if let Some(host) = resp.safekeeper_host {
     375            0 :                                 tracing::info!("pulled timeline from {host} onto {req_host}");
     376              :                             } else {
     377            0 :                                 tracing::info!(
     378            0 :                                     "timeline already present on safekeeper on {req_host}"
     379              :                                 );
     380              :                             }
     381            0 :                         },
     382            0 :                         req_cancel,
     383              :                     )
     384            0 :                     .await;
     385              :             }
     386              :             SafekeeperTimelineOpKind::Exclude => {
     387              :                 // TODO actually exclude instead of delete here
     388            0 :                 let tenant_id = req.tenant_id;
     389            0 :                 let Some(timeline_id) = req.timeline_id else {
     390            0 :                     tracing::warn!(
     391            0 :                         "ignoring invalid schedule request: timeline_id is empty for `exclude`"
     392              :                     );
     393            0 :                     return;
     394              :                 };
     395            0 :                 success = self
     396            0 :                     .reconcile_inner(
     397            0 :                         &req,
     398            0 :                         async |client| client.delete_timeline(tenant_id, timeline_id).await,
     399            0 :                         |_resp| {
     400            0 :                             tracing::info!("deleted timeline from {req_host}");
     401            0 :                         },
     402            0 :                         req_cancel,
     403              :                     )
     404            0 :                     .await;
     405              :             }
     406              :             SafekeeperTimelineOpKind::Delete => {
     407            0 :                 let tenant_id = req.tenant_id;
     408            0 :                 if let Some(timeline_id) = req.timeline_id {
     409            0 :                     success = self
     410            0 :                         .reconcile_inner(
     411            0 :                             &req,
     412            0 :                             async |client| client.delete_timeline(tenant_id, timeline_id).await,
     413            0 :                             |_resp| {
     414            0 :                                 tracing::info!("deleted timeline from {req_host}");
     415            0 :                             },
     416            0 :                             req_cancel,
     417              :                         )
     418            0 :                         .await;
     419            0 :                     if success {
     420            0 :                         self.delete_timeline_from_db(tenant_id, timeline_id).await;
     421            0 :                     }
     422              :                 } else {
     423            0 :                     success = self
     424            0 :                         .reconcile_inner(
     425            0 :                             &req,
     426            0 :                             async |client| client.delete_tenant(tenant_id).await,
     427            0 :                             |_resp| {
     428            0 :                                 tracing::info!(%tenant_id, "deleted tenant from {req_host}");
     429            0 :                             },
     430            0 :                             req_cancel,
     431              :                         )
     432            0 :                         .await;
     433            0 :                     if success {
     434            0 :                         self.delete_tenant_timelines_from_db(tenant_id).await;
     435            0 :                     }
     436              :                 }
     437              :             }
     438              :         }
     439            0 :         if success {
     440            0 :             self.ongoing_tokens.remove_if(
     441            0 :                 &(req.tenant_id, req.timeline_id),
     442            0 :                 |_ttid, (_cancel, token_id)| {
     443              :                     // Ensure that this request is indeed the request we just finished and not a new one
     444            0 :                     req_token_id == *token_id
     445            0 :                 },
     446              :             );
     447            0 :         }
     448            0 :     }
     449            0 :     async fn delete_timeline_from_db(&self, tenant_id: TenantId, timeline_id: TimelineId) {
     450            0 :         match self
     451            0 :             .service
     452            0 :             .persistence
     453            0 :             .list_pending_ops_for_timeline(tenant_id, timeline_id)
     454            0 :             .await
     455              :         {
     456            0 :             Ok(list) => {
     457            0 :                 if !list.is_empty() {
     458              :                     // duplicate the timeline_id here because it might be None in the reconcile context
     459            0 :                     tracing::info!(%timeline_id, "not deleting timeline from db as there is {} open reconciles", list.len());
     460            0 :                     return;
     461            0 :                 }
     462              :             }
     463            0 :             Err(e) => {
     464            0 :                 tracing::warn!(%timeline_id, "couldn't query pending ops: {e}");
     465            0 :                 return;
     466              :             }
     467              :         }
     468            0 :         tracing::info!(%tenant_id, %timeline_id, "deleting timeline from db after all reconciles succeeded");
     469              :         // In theory we could crash right after deleting the op from the db and right before reaching this,
     470              :         // but then we'll boot up with a timeline that has deleted_at set, so hopefully we'll issue deletion ops for it again.
     471            0 :         if let Err(err) = self
     472            0 :             .service
     473            0 :             .persistence
     474            0 :             .delete_timeline(tenant_id, timeline_id)
     475            0 :             .await
     476              :         {
     477            0 :             tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
     478            0 :         }
     479            0 :     }
     480            0 :     async fn delete_tenant_timelines_from_db(&self, tenant_id: TenantId) {
     481            0 :         let timeline_list = match self
     482            0 :             .service
     483            0 :             .persistence
     484            0 :             .list_timelines_for_tenant(tenant_id)
     485            0 :             .await
     486              :         {
     487            0 :             Ok(timeline_list) => timeline_list,
     488            0 :             Err(e) => {
     489            0 :                 tracing::warn!(%tenant_id, "couldn't query timelines: {e}");
     490            0 :                 return;
     491              :             }
     492              :         };
     493            0 :         for timeline in timeline_list {
     494            0 :             let Ok(timeline_id) = TimelineId::from_str(&timeline.timeline_id) else {
     495            0 :                 tracing::warn!("Invalid timeline ID in database {}", timeline.timeline_id);
     496            0 :                 continue;
     497              :             };
     498            0 :             self.delete_timeline_from_db(tenant_id, timeline_id).await;
     499              :         }
     500            0 :     }
     501              :     /// Returns whether the reconciliation happened successfully (or we got cancelled)
     502            0 :     async fn reconcile_inner<T, F, U>(
     503            0 :         &self,
     504            0 :         req: &ScheduleRequest,
     505            0 :         closure: impl Fn(SafekeeperClient) -> F,
     506            0 :         log_success: impl FnOnce(T) -> U,
     507            0 :         req_cancel: CancellationToken,
     508            0 :     ) -> bool
     509            0 :     where
     510            0 :         F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
     511            0 :     {
     512            0 :         let jwt = self
     513            0 :             .service
     514            0 :             .config
     515            0 :             .safekeeper_jwt_token
     516            0 :             .clone()
     517            0 :             .map(SecretString::from);
     518              :         loop {
     519            0 :             let res = req
     520            0 :                 .safekeeper
     521            0 :                 .with_client_retries(
     522            0 :                     |client| {
     523            0 :                         let closure = &closure;
     524            0 :                         async move { closure(client).await }
     525            0 :                     },
     526            0 :                     self.service.get_http_client(),
     527            0 :                     &jwt,
     528              :                     3,
     529              :                     10,
     530            0 :                     Duration::from_secs(10),
     531            0 :                     &req_cancel,
     532              :                 )
     533            0 :                 .await;
     534            0 :             match res {
     535            0 :                 Ok(resp) => {
     536            0 :                     log_success(resp);
     537            0 :                     let res = self
     538            0 :                         .service
     539            0 :                         .persistence
     540            0 :                         .remove_pending_op(
     541            0 :                             req.tenant_id,
     542            0 :                             req.timeline_id,
     543            0 :                             req.safekeeper.get_id(),
     544            0 :                             req.generation,
     545            0 :                         )
     546            0 :                         .await;
     547              : 
     548            0 :                     let complete_counter = &METRICS_REGISTRY
     549            0 :                         .metrics_group
     550            0 :                         .storage_controller_safekeeper_reconciles_complete;
     551            0 :                     complete_counter.inc(SafekeeperReconcilerLabelGroup {
     552            0 :                         sk_az: &req.safekeeper.skp.availability_zone_id,
     553            0 :                         sk_node_id: &req.safekeeper.get_id().to_string(),
     554            0 :                         sk_hostname: &req.safekeeper.skp.host,
     555            0 :                     });
     556              : 
     557            0 :                     if let Err(err) = res {
     558            0 :                         tracing::info!(
     559            0 :                             "couldn't remove reconciliation request onto {} from persistence: {err:?}",
     560              :                             req.safekeeper.skp.host
     561              :                         );
     562            0 :                     }
     563            0 :                     return true;
     564              :                 }
     565              :                 Err(mgmt_api::Error::Cancelled) => {
     566              :                     // On cancellation, the code that issued it will take care of removing db entries (if needed)
     567            0 :                     return false;
     568              :                 }
     569            0 :                 Err(e) => {
     570            0 :                     tracing::info!(
     571            0 :                         "Reconcile attempt for safekeeper {} failed, retrying after sleep: {e:?}",
     572              :                         req.safekeeper.skp.host
     573              :                     );
     574              :                     const SLEEP_TIME: Duration = Duration::from_secs(1);
     575            0 :                     tokio::time::sleep(SLEEP_TIME).await;
     576              :                 }
     577              :             }
     578              :         }
     579            0 :     }
     580              : }
        

Generated by: LCOV version 2.1-beta