LCOV - code coverage report
Current view: top level - storage_controller/src/service - safekeeper_reconciler.rs (source / functions) Coverage Total Hit
Test: 046155f5c3321e806c1c5acca9ccd26414587b38.info Lines: 0.0 % 274 0
Test Date: 2025-03-27 12:42:09 Functions: 0.0 % 44 0

            Line data    Source code
       1              : use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration};
       2              : 
       3              : use clashmap::{ClashMap, Entry};
       4              : use safekeeper_api::models::PullTimelineRequest;
       5              : use safekeeper_client::mgmt_api;
       6              : use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
       7              : use tokio_util::sync::CancellationToken;
       8              : use tracing::Instrument;
       9              : use utils::{
      10              :     id::{NodeId, TenantId, TimelineId},
      11              :     logging::SecretString,
      12              : };
      13              : 
      14              : use crate::{
      15              :     persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper,
      16              :     safekeeper_client::SafekeeperClient,
      17              : };
      18              : 
      19              : use super::Service;
      20              : 
      21              : pub(crate) struct SafekeeperReconcilers {
      22              :     cancel: CancellationToken,
      23              :     reconcilers: HashMap<NodeId, ReconcilerHandle>,
      24              : }
      25              : 
      26              : impl SafekeeperReconcilers {
      27            0 :     pub fn new(cancel: CancellationToken) -> Self {
      28            0 :         SafekeeperReconcilers {
      29            0 :             cancel,
      30            0 :             reconcilers: HashMap::new(),
      31            0 :         }
      32            0 :     }
      33            0 :     pub(crate) fn schedule_request_vec(
      34            0 :         &mut self,
      35            0 :         service: &Arc<Service>,
      36            0 :         reqs: Vec<ScheduleRequest>,
      37            0 :     ) {
      38            0 :         for req in reqs {
      39            0 :             self.schedule_request(service, req);
      40            0 :         }
      41            0 :     }
      42            0 :     pub(crate) fn schedule_request(&mut self, service: &Arc<Service>, req: ScheduleRequest) {
      43            0 :         let node_id = req.safekeeper.get_id();
      44            0 :         let reconciler_handle = self.reconcilers.entry(node_id).or_insert_with(|| {
      45            0 :             SafekeeperReconciler::spawn(self.cancel.child_token(), service.clone())
      46            0 :         });
      47            0 :         reconciler_handle.schedule_reconcile(req);
      48            0 :     }
      49            0 :     pub(crate) fn cancel_safekeeper(&mut self, node_id: NodeId) {
      50            0 :         if let Some(handle) = self.reconcilers.remove(&node_id) {
      51            0 :             handle.cancel.cancel();
      52            0 :         }
      53            0 :     }
      54              :     /// Cancel ongoing reconciles for the given timeline
      55              :     ///
      56              :     /// Specifying `None` here only removes reconciles for the tenant-global reconciliation,
      57              :     /// instead of doing this for all timelines of the tenant.
      58              :     ///
      59              :     /// Callers must remove the reconciles from the db manually
      60            0 :     pub(crate) fn cancel_reconciles_for_timeline(
      61            0 :         &mut self,
      62            0 :         node_id: NodeId,
      63            0 :         tenant_id: TenantId,
      64            0 :         timeline_id: Option<TimelineId>,
      65            0 :     ) {
      66            0 :         if let Some(handle) = self.reconcilers.get(&node_id) {
      67            0 :             handle.cancel_reconciliation(tenant_id, timeline_id);
      68            0 :         }
      69            0 :     }
      70              : }
      71              : 
      72              : /// Initial load of the pending operations from the db
      73            0 : pub(crate) async fn load_schedule_requests(
      74            0 :     service: &Arc<Service>,
      75            0 :     safekeepers: &HashMap<NodeId, Safekeeper>,
      76            0 : ) -> anyhow::Result<Vec<ScheduleRequest>> {
      77            0 :     let pending_ops = service.persistence.list_pending_ops(None).await?;
      78            0 :     let mut res = Vec::with_capacity(pending_ops.len());
      79            0 :     for op_persist in pending_ops {
      80            0 :         let node_id = NodeId(op_persist.sk_id as u64);
      81            0 :         let Some(sk) = safekeepers.get(&node_id) else {
      82              :             // This shouldn't happen, at least the safekeeper should exist as decomissioned.
      83            0 :             tracing::warn!(
      84              :                 tenant_id = op_persist.tenant_id,
      85              :                 timeline_id = op_persist.timeline_id,
      86            0 :                 "couldn't find safekeeper with pending op id {node_id} in list of stored safekeepers"
      87              :             );
      88            0 :             continue;
      89              :         };
      90            0 :         let sk = Box::new(sk.clone());
      91            0 :         let tenant_id = TenantId::from_str(&op_persist.tenant_id)?;
      92            0 :         let timeline_id = if !op_persist.timeline_id.is_empty() {
      93            0 :             Some(TimelineId::from_str(&op_persist.timeline_id)?)
      94              :         } else {
      95            0 :             None
      96              :         };
      97            0 :         let host_list = match op_persist.op_kind {
      98            0 :             SafekeeperTimelineOpKind::Delete => Vec::new(),
      99            0 :             SafekeeperTimelineOpKind::Exclude => Vec::new(),
     100              :             SafekeeperTimelineOpKind::Pull => {
     101              :                 // TODO this code is super hacky, it doesn't take migrations into account
     102            0 :                 let Some(timeline_id) = timeline_id else {
     103            0 :                     anyhow::bail!(
     104            0 :                         "timeline_id is empty for `pull` schedule request for {tenant_id}"
     105            0 :                     );
     106              :                 };
     107            0 :                 let timeline_persist = service
     108            0 :                     .persistence
     109            0 :                     .get_timeline(tenant_id, timeline_id)
     110            0 :                     .await?;
     111            0 :                 let Some(timeline_persist) = timeline_persist else {
     112              :                     // This shouldn't happen, the timeline should still exist
     113            0 :                     tracing::warn!(
     114              :                         tenant_id = op_persist.tenant_id,
     115              :                         timeline_id = op_persist.timeline_id,
     116            0 :                         "couldn't find timeline for corresponding pull op"
     117              :                     );
     118            0 :                     continue;
     119              :                 };
     120            0 :                 timeline_persist
     121            0 :                     .sk_set
     122            0 :                     .iter()
     123            0 :                     .filter_map(|sk_id| {
     124            0 :                         let other_node_id = NodeId(*sk_id as u64);
     125            0 :                         if node_id == other_node_id {
     126              :                             // We obviously don't want to pull from ourselves
     127            0 :                             return None;
     128            0 :                         }
     129            0 :                         let Some(sk) = safekeepers.get(&other_node_id) else {
     130            0 :                             tracing::warn!(
     131            0 :                                 "couldnt find safekeeper with pending op id {other_node_id}, not pulling from it"
     132              :                             );
     133            0 :                             return None;
     134              :                         };
     135            0 :                         Some((other_node_id, sk.base_url()))
     136            0 :                     })
     137            0 :                     .collect::<Vec<_>>()
     138              :             }
     139              :         };
     140            0 :         let req = ScheduleRequest {
     141            0 :             safekeeper: sk,
     142            0 :             host_list,
     143            0 :             tenant_id,
     144            0 :             timeline_id,
     145            0 :             generation: op_persist.generation as u32,
     146            0 :             kind: op_persist.op_kind,
     147            0 :         };
     148            0 :         res.push(req);
     149              :     }
     150            0 :     Ok(res)
     151            0 : }
     152              : 
     153              : pub(crate) struct ScheduleRequest {
     154              :     pub(crate) safekeeper: Box<Safekeeper>,
     155              :     pub(crate) host_list: Vec<(NodeId, String)>,
     156              :     pub(crate) tenant_id: TenantId,
     157              :     pub(crate) timeline_id: Option<TimelineId>,
     158              :     pub(crate) generation: u32,
     159              :     pub(crate) kind: SafekeeperTimelineOpKind,
     160              : }
     161              : 
     162              : struct ReconcilerHandle {
     163              :     tx: UnboundedSender<(ScheduleRequest, Arc<CancellationToken>)>,
     164              :     #[allow(clippy::type_complexity)]
     165              :     ongoing_tokens: Arc<ClashMap<(TenantId, Option<TimelineId>), Arc<CancellationToken>>>,
     166              :     cancel: CancellationToken,
     167              : }
     168              : 
     169              : impl ReconcilerHandle {
     170              :     /// Obtain a new token slot, cancelling any existing reconciliations for that timeline
     171            0 :     fn new_token_slot(
     172            0 :         &self,
     173            0 :         tenant_id: TenantId,
     174            0 :         timeline_id: Option<TimelineId>,
     175            0 :     ) -> Arc<CancellationToken> {
     176            0 :         let entry = self.ongoing_tokens.entry((tenant_id, timeline_id));
     177            0 :         if let Entry::Occupied(entry) = &entry {
     178            0 :             let cancel: &CancellationToken = entry.get();
     179            0 :             cancel.cancel();
     180            0 :         }
     181            0 :         entry.insert(Arc::new(self.cancel.child_token())).clone()
     182            0 :     }
     183              :     /// Cancel an ongoing reconciliation
     184            0 :     fn cancel_reconciliation(&self, tenant_id: TenantId, timeline_id: Option<TimelineId>) {
     185            0 :         if let Some((_, cancel)) = self.ongoing_tokens.remove(&(tenant_id, timeline_id)) {
     186            0 :             cancel.cancel();
     187            0 :         }
     188            0 :     }
     189            0 :     fn schedule_reconcile(&self, req: ScheduleRequest) {
     190            0 :         let cancel = self.new_token_slot(req.tenant_id, req.timeline_id);
     191            0 :         let hostname = req.safekeeper.skp.host.clone();
     192            0 :         if let Err(err) = self.tx.send((req, cancel)) {
     193            0 :             tracing::info!("scheduling request onto {hostname} returned error: {err}");
     194            0 :         }
     195            0 :     }
     196              : }
     197              : 
     198              : pub(crate) struct SafekeeperReconciler {
     199              :     service: Arc<Service>,
     200              :     rx: UnboundedReceiver<(ScheduleRequest, Arc<CancellationToken>)>,
     201              :     cancel: CancellationToken,
     202              : }
     203              : 
     204              : impl SafekeeperReconciler {
     205            0 :     fn spawn(cancel: CancellationToken, service: Arc<Service>) -> ReconcilerHandle {
     206            0 :         // We hold the ServiceInner lock so we don't want to make sending to the reconciler channel to be blocking.
     207            0 :         let (tx, rx) = mpsc::unbounded_channel();
     208            0 :         let mut reconciler = SafekeeperReconciler {
     209            0 :             service,
     210            0 :             rx,
     211            0 :             cancel: cancel.clone(),
     212            0 :         };
     213            0 :         let handle = ReconcilerHandle {
     214            0 :             tx,
     215            0 :             ongoing_tokens: Arc::new(ClashMap::new()),
     216            0 :             cancel,
     217            0 :         };
     218            0 :         tokio::spawn(async move { reconciler.run().await });
     219            0 :         handle
     220            0 :     }
     221            0 :     async fn run(&mut self) {
     222              :         loop {
     223              :             // TODO add parallelism with semaphore here
     224            0 :             let req = tokio::select! {
     225            0 :                 req = self.rx.recv() => req,
     226            0 :                 _ = self.cancel.cancelled() => break,
     227              :             };
     228            0 :             let Some((req, req_cancel)) = req else { break };
     229            0 :             if req_cancel.is_cancelled() {
     230            0 :                 continue;
     231            0 :             }
     232            0 : 
     233            0 :             let kind = req.kind;
     234            0 :             let tenant_id = req.tenant_id;
     235            0 :             let timeline_id = req.timeline_id;
     236            0 :             self.reconcile_one(req, req_cancel)
     237            0 :                 .instrument(tracing::info_span!(
     238            0 :                     "reconcile_one",
     239            0 :                     ?kind,
     240            0 :                     %tenant_id,
     241            0 :                     ?timeline_id
     242            0 :                 ))
     243            0 :                 .await;
     244              :         }
     245            0 :     }
     246            0 :     async fn reconcile_one(&self, req: ScheduleRequest, req_cancel: Arc<CancellationToken>) {
     247            0 :         let req_host = req.safekeeper.skp.host.clone();
     248            0 :         match req.kind {
     249              :             SafekeeperTimelineOpKind::Pull => {
     250            0 :                 let Some(timeline_id) = req.timeline_id else {
     251            0 :                     tracing::warn!(
     252            0 :                         "ignoring invalid schedule request: timeline_id is empty for `pull`"
     253              :                     );
     254            0 :                     return;
     255              :                 };
     256            0 :                 let our_id = req.safekeeper.get_id();
     257            0 :                 let http_hosts = req
     258            0 :                     .host_list
     259            0 :                     .iter()
     260            0 :                     .filter(|(node_id, _hostname)| *node_id != our_id)
     261            0 :                     .map(|(_, hostname)| hostname.clone())
     262            0 :                     .collect::<Vec<_>>();
     263            0 :                 let pull_req = PullTimelineRequest {
     264            0 :                     http_hosts,
     265            0 :                     tenant_id: req.tenant_id,
     266            0 :                     timeline_id,
     267            0 :                 };
     268            0 :                 self.reconcile_inner(
     269            0 :                     req,
     270            0 :                     async |client| client.pull_timeline(&pull_req).await,
     271            0 :                     |resp| {
     272            0 :                         tracing::info!(
     273            0 :                             "pulled timeline from {} onto {req_host}",
     274              :                             resp.safekeeper_host,
     275              :                         );
     276            0 :                     },
     277            0 :                     req_cancel,
     278            0 :                 )
     279            0 :                 .await;
     280              :             }
     281              :             SafekeeperTimelineOpKind::Exclude => {
     282              :                 // TODO actually exclude instead of delete here
     283            0 :                 let tenant_id = req.tenant_id;
     284            0 :                 let Some(timeline_id) = req.timeline_id else {
     285            0 :                     tracing::warn!(
     286            0 :                         "ignoring invalid schedule request: timeline_id is empty for `exclude`"
     287              :                     );
     288            0 :                     return;
     289              :                 };
     290            0 :                 self.reconcile_inner(
     291            0 :                     req,
     292            0 :                     async |client| client.delete_timeline(tenant_id, timeline_id).await,
     293            0 :                     |_resp| {
     294            0 :                         tracing::info!("deleted timeline from {req_host}");
     295            0 :                     },
     296            0 :                     req_cancel,
     297            0 :                 )
     298            0 :                 .await;
     299              :             }
     300              :             SafekeeperTimelineOpKind::Delete => {
     301            0 :                 let tenant_id = req.tenant_id;
     302            0 :                 if let Some(timeline_id) = req.timeline_id {
     303            0 :                     self.reconcile_inner(
     304            0 :                         req,
     305            0 :                         async |client| client.delete_timeline(tenant_id, timeline_id).await,
     306            0 :                         |_resp| {
     307            0 :                             tracing::info!("deleted timeline from {req_host}");
     308            0 :                         },
     309            0 :                         req_cancel,
     310            0 :                     )
     311            0 :                     .await;
     312              :                 } else {
     313            0 :                     self.reconcile_inner(
     314            0 :                         req,
     315            0 :                         async |client| client.delete_tenant(tenant_id).await,
     316            0 :                         |_resp| {
     317            0 :                             tracing::info!("deleted tenant from {req_host}");
     318            0 :                         },
     319            0 :                         req_cancel,
     320            0 :                     )
     321            0 :                     .await;
     322              :                 }
     323              :             }
     324              :         }
     325            0 :     }
     326            0 :     async fn reconcile_inner<T, F, U>(
     327            0 :         &self,
     328            0 :         req: ScheduleRequest,
     329            0 :         closure: impl Fn(SafekeeperClient) -> F,
     330            0 :         log_success: impl FnOnce(T) -> U,
     331            0 :         req_cancel: Arc<CancellationToken>,
     332            0 :     ) where
     333            0 :         F: Future<Output = Result<T, safekeeper_client::mgmt_api::Error>>,
     334            0 :     {
     335            0 :         let jwt = self
     336            0 :             .service
     337            0 :             .config
     338            0 :             .safekeeper_jwt_token
     339            0 :             .clone()
     340            0 :             .map(SecretString::from);
     341              :         loop {
     342            0 :             let res = req
     343            0 :                 .safekeeper
     344            0 :                 .with_client_retries(
     345            0 :                     |client| {
     346            0 :                         let closure = &closure;
     347            0 :                         async move { closure(client).await }
     348            0 :                     },
     349            0 :                     self.service.get_http_client(),
     350            0 :                     &jwt,
     351            0 :                     3,
     352            0 :                     10,
     353            0 :                     Duration::from_secs(10),
     354            0 :                     &req_cancel,
     355            0 :                 )
     356            0 :                 .await;
     357            0 :             match res {
     358            0 :                 Ok(resp) => {
     359            0 :                     log_success(resp);
     360            0 :                     let res = self
     361            0 :                         .service
     362            0 :                         .persistence
     363            0 :                         .remove_pending_op(
     364            0 :                             req.tenant_id,
     365            0 :                             req.timeline_id,
     366            0 :                             req.safekeeper.get_id(),
     367            0 :                             req.generation,
     368            0 :                         )
     369            0 :                         .await;
     370            0 :                     if let Err(err) = res {
     371            0 :                         tracing::info!(
     372            0 :                             "couldn't remove reconciliation request onto {} from persistence: {err:?}",
     373              :                             req.safekeeper.skp.host
     374              :                         );
     375            0 :                     }
     376            0 :                     return;
     377              :                 }
     378              :                 Err(mgmt_api::Error::Cancelled) => {
     379              :                     // On cancellation, the code that issued it will take care of removing db entries (if needed)
     380            0 :                     return;
     381              :                 }
     382            0 :                 Err(e) => {
     383            0 :                     tracing::info!(
     384            0 :                         "Reconcile attempt for safekeeper {} failed, retrying after sleep: {e:?}",
     385              :                         req.safekeeper.skp.host
     386              :                     );
     387              :                     const SLEEP_TIME: Duration = Duration::from_secs(1);
     388            0 :                     tokio::time::sleep(SLEEP_TIME).await;
     389              :                 }
     390              :             }
     391              :         }
     392            0 :     }
     393              : }
        

Generated by: LCOV version 2.1-beta