LCOV - code coverage report
Current view: top level - storage_controller/src/service - safekeeper_service.rs (source / functions) Coverage Total Hit
Test: ae4948feae6a1d420c855050eb8c189119446a71.info Lines: 0.0 % 405 0
Test Date: 2025-03-18 18:33:46 Functions: 0.0 % 31 0

            Line data    Source code
       1              : use std::collections::{HashMap, HashSet};
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use super::safekeeper_reconciler::ScheduleRequest;
       6              : use crate::heartbeater::SafekeeperState;
       7              : use crate::persistence::{
       8              :     DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
       9              : };
      10              : use crate::safekeeper::Safekeeper;
      11              : use http_utils::error::ApiError;
      12              : use pageserver_api::controller_api::{SafekeeperDescribeResponse, SkSchedulingPolicy};
      13              : use pageserver_api::models::{self, SafekeeperInfo, SafekeepersInfo, TimelineInfo};
      14              : use safekeeper_api::membership::{MemberSet, SafekeeperId};
      15              : use tokio::task::JoinSet;
      16              : use tokio_util::sync::CancellationToken;
      17              : use utils::id::{NodeId, TenantId, TimelineId};
      18              : use utils::logging::SecretString;
      19              : 
      20              : use super::Service;
      21              : 
      22              : impl Service {
      23              :     /// Timeline creation on safekeepers
      24              :     ///
      25              :     /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
      26              :     /// where `left` contains the list of safekeepers that didn't have a successful response.
      27              :     /// Assumes tenant lock is held while calling this function.
      28            0 :     pub(super) async fn tenant_timeline_create_safekeepers_quorum(
      29            0 :         &self,
      30            0 :         tenant_id: TenantId,
      31            0 :         timeline_id: TimelineId,
      32            0 :         pg_version: u32,
      33            0 :         timeline_persistence: &TimelinePersistence,
      34            0 :     ) -> Result<Vec<NodeId>, ApiError> {
      35            0 :         // If quorum is reached, return if we are outside of a specified timeout
      36            0 :         let jwt = self
      37            0 :             .config
      38            0 :             .safekeeper_jwt_token
      39            0 :             .clone()
      40            0 :             .map(SecretString::from);
      41            0 :         let mut joinset = JoinSet::new();
      42            0 : 
      43            0 :         let safekeepers = {
      44            0 :             let locked = self.inner.read().unwrap();
      45            0 :             locked.safekeepers.clone()
      46            0 :         };
      47            0 : 
      48            0 :         let mut members = Vec::new();
      49            0 :         for sk_id in timeline_persistence.sk_set.iter() {
      50            0 :             let sk_id = NodeId(*sk_id as u64);
      51            0 :             let Some(safekeeper) = safekeepers.get(&sk_id) else {
      52            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
      53            0 :                     "couldn't find entry for safekeeper with id {sk_id}"
      54            0 :                 )))?;
      55              :             };
      56            0 :             members.push(SafekeeperId {
      57            0 :                 id: sk_id,
      58            0 :                 host: safekeeper.skp.host.clone(),
      59            0 :                 pg_port: safekeeper.skp.port as u16,
      60            0 :             });
      61              :         }
      62            0 :         let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
      63            0 :         let mconf = safekeeper_api::membership::Configuration::new(mset);
      64            0 : 
      65            0 :         let req = safekeeper_api::models::TimelineCreateRequest {
      66            0 :             commit_lsn: None,
      67            0 :             mconf,
      68            0 :             pg_version,
      69            0 :             start_lsn: timeline_persistence.start_lsn.0,
      70            0 :             system_id: None,
      71            0 :             tenant_id,
      72            0 :             timeline_id,
      73            0 :             wal_seg_size: None,
      74            0 :         };
      75              :         const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      76            0 :         for sk in timeline_persistence.sk_set.iter() {
      77            0 :             let sk_id = NodeId(*sk as u64);
      78            0 :             let safekeepers = safekeepers.clone();
      79            0 :             let jwt = jwt.clone();
      80            0 :             let ssl_ca_cert = self.config.ssl_ca_cert.clone();
      81            0 :             let req = req.clone();
      82            0 :             joinset.spawn(async move {
      83            0 :                 // Unwrap is fine as we already would have returned error above
      84            0 :                 let sk_p = safekeepers.get(&sk_id).unwrap();
      85            0 :                 let res = sk_p
      86            0 :                     .with_client_retries(
      87            0 :                         |client| {
      88            0 :                             let req = req.clone();
      89            0 :                             async move { client.create_timeline(&req).await }
      90            0 :                         },
      91            0 :                         &jwt,
      92            0 :                         &ssl_ca_cert,
      93            0 :                         3,
      94            0 :                         3,
      95            0 :                         SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
      96            0 :                         &CancellationToken::new(),
      97            0 :                     )
      98            0 :                     .await;
      99            0 :                 (sk_id, sk_p.skp.host.clone(), res)
     100            0 :             });
     101            0 :         }
     102              :         // After we have built the joinset, we now wait for the tasks to complete,
     103              :         // but with a specified timeout to make sure we return swiftly, either with
     104              :         // a failure or success.
     105            0 :         let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
     106            0 : 
     107            0 :         // Wait until all tasks finish or timeout is hit, whichever occurs
     108            0 :         // first.
     109            0 :         let mut reconcile_results = Vec::new();
     110              :         loop {
     111            0 :             if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
     112              :             {
     113            0 :                 let Some(res) = res else { break };
     114            0 :                 match res {
     115            0 :                     Ok(res) => {
     116            0 :                         tracing::info!(
     117            0 :                             "response from safekeeper id:{} at {}: {:?}",
     118              :                             res.0,
     119              :                             res.1,
     120              :                             res.2
     121              :                         );
     122            0 :                         reconcile_results.push(res);
     123              :                     }
     124            0 :                     Err(join_err) => {
     125            0 :                         tracing::info!("join_err for task in joinset: {join_err}");
     126              :                     }
     127              :                 }
     128              :             } else {
     129            0 :                 tracing::info!(
     130            0 :                     "timeout for creation call after {} responses",
     131            0 :                     reconcile_results.len()
     132              :                 );
     133            0 :                 break;
     134              :             }
     135              :         }
     136              : 
     137              :         // Now check now if quorum was reached in reconcile_results.
     138            0 :         let total_result_count = reconcile_results.len();
     139            0 :         let remaining = reconcile_results
     140            0 :             .into_iter()
     141            0 :             .filter_map(|res| res.2.is_err().then_some(res.0))
     142            0 :             .collect::<Vec<_>>();
     143            0 :         tracing::info!(
     144            0 :             "Got {} non-successful responses from initial creation request of total {total_result_count} responses",
     145            0 :             remaining.len()
     146              :         );
     147            0 :         if remaining.len() >= 2 {
     148              :             // Failure
     149            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
     150            0 :                 "not enough successful reconciliations to reach quorum, please retry: {} errored",
     151            0 :                 remaining.len()
     152            0 :             )));
     153            0 :         }
     154            0 : 
     155            0 :         Ok(remaining)
     156            0 :     }
     157              : 
     158              :     /// Create timeline in controller database and on safekeepers.
     159              :     /// `timeline_info` is result of timeline creation on pageserver.
     160              :     ///
     161              :     /// All actions must be idempotent as the call is retried until success. It
     162              :     /// tries to create timeline in the db and on at least majority of
     163              :     /// safekeepers + queue creation for safekeepers which missed it in the db
     164              :     /// for infinite retries; after that, call returns Ok.
     165              :     ///
     166              :     /// The idea is that once this is reached as long as we have alive majority
     167              :     /// of safekeepers it is expected to get eventually operational as storcon
     168              :     /// will be able to seed timeline on nodes which missed creation by making
     169              :     /// pull_timeline from peers. On the other hand we don't want to fail
     170              :     /// timeline creation if one safekeeper is down.
     171            0 :     pub(super) async fn tenant_timeline_create_safekeepers(
     172            0 :         self: &Arc<Self>,
     173            0 :         tenant_id: TenantId,
     174            0 :         timeline_info: &TimelineInfo,
     175            0 :         create_mode: models::TimelineCreateRequestMode,
     176            0 :     ) -> Result<SafekeepersInfo, ApiError> {
     177            0 :         let timeline_id = timeline_info.timeline_id;
     178            0 :         let pg_version = timeline_info.pg_version * 10000;
     179              :         // Initially start_lsn is determined by last_record_lsn in pageserver
     180              :         // response as it does initdb. However, later we persist it and in sk
     181              :         // creation calls replace with the value from the timeline row if it
     182              :         // previously existed as on retries in theory endpoint might have
     183              :         // already written some data and advanced last_record_lsn, while we want
     184              :         // safekeepers to have consistent start_lsn.
     185            0 :         let start_lsn = match create_mode {
     186            0 :             models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
     187            0 :             models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
     188              :             models::TimelineCreateRequestMode::ImportPgdata { .. } => {
     189            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
     190            0 :                     "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
     191            0 :                 )))?;
     192              :             }
     193              :         };
     194              :         // Choose initial set of safekeepers respecting affinity
     195            0 :         let sks = self.safekeepers_for_new_timeline().await?;
     196            0 :         let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
     197            0 :         // Add timeline to db
     198            0 :         let mut timeline_persist = TimelinePersistence {
     199            0 :             tenant_id: tenant_id.to_string(),
     200            0 :             timeline_id: timeline_id.to_string(),
     201            0 :             start_lsn: start_lsn.into(),
     202            0 :             generation: 0,
     203            0 :             sk_set: sks_persistence.clone(),
     204            0 :             new_sk_set: None,
     205            0 :             cplane_notified_generation: 0,
     206            0 :             deleted_at: None,
     207            0 :         };
     208            0 :         let inserted = self
     209            0 :             .persistence
     210            0 :             .insert_timeline(timeline_persist.clone())
     211            0 :             .await?;
     212            0 :         if !inserted {
     213            0 :             if let Some(existent_persist) = self
     214            0 :                 .persistence
     215            0 :                 .get_timeline(tenant_id, timeline_id)
     216            0 :                 .await?
     217            0 :             {
     218            0 :                 // Replace with what we have in the db, to get stuff like the generation right.
     219            0 :                 // We do still repeat the http calls to the safekeepers. After all, we could have
     220            0 :                 // crashed right after the wrote to the DB.
     221            0 :                 timeline_persist = existent_persist;
     222            0 :             } else {
     223            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
     224            0 :                     "insertion said timeline already in db, but looking it up, it was gone"
     225            0 :                 )));
     226              :             }
     227            0 :         }
     228              :         // Create the timeline on a quorum of safekeepers
     229            0 :         let remaining = self
     230            0 :             .tenant_timeline_create_safekeepers_quorum(
     231            0 :                 tenant_id,
     232            0 :                 timeline_id,
     233            0 :                 pg_version,
     234            0 :                 &timeline_persist,
     235            0 :             )
     236            0 :             .await?;
     237              : 
     238              :         // For the remaining safekeepers, take care of their reconciliation asynchronously
     239            0 :         for &remaining_id in remaining.iter() {
     240            0 :             let pending_op = TimelinePendingOpPersistence {
     241            0 :                 tenant_id: tenant_id.to_string(),
     242            0 :                 timeline_id: timeline_id.to_string(),
     243            0 :                 generation: timeline_persist.generation,
     244            0 :                 op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
     245            0 :                 sk_id: remaining_id.0 as i64,
     246            0 :             };
     247            0 :             tracing::info!("writing pending op for sk id {remaining_id}");
     248            0 :             self.persistence.insert_pending_op(pending_op).await?;
     249              :         }
     250            0 :         if !remaining.is_empty() {
     251            0 :             let mut locked = self.inner.write().unwrap();
     252            0 :             for remaining_id in remaining {
     253            0 :                 let Some(sk) = locked.safekeepers.get(&remaining_id) else {
     254            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
     255            0 :                         "Couldn't find safekeeper with id {remaining_id}"
     256            0 :                     )));
     257              :                 };
     258            0 :                 let Ok(host_list) = sks
     259            0 :                     .iter()
     260            0 :                     .map(|sk| {
     261            0 :                         Ok((
     262            0 :                             sk.id,
     263            0 :                             locked
     264            0 :                                 .safekeepers
     265            0 :                                 .get(&sk.id)
     266            0 :                                 .ok_or_else(|| {
     267            0 :                                     ApiError::InternalServerError(anyhow::anyhow!(
     268            0 :                                         "Couldn't find safekeeper with id {remaining_id} to pull from"
     269            0 :                                     ))
     270            0 :                                 })?
     271            0 :                                 .base_url(),
     272              :                         ))
     273            0 :                     })
     274            0 :                     .collect::<Result<_, ApiError>>()
     275              :                 else {
     276            0 :                     continue;
     277              :                 };
     278            0 :                 let req = ScheduleRequest {
     279            0 :                     safekeeper: Box::new(sk.clone()),
     280            0 :                     host_list,
     281            0 :                     tenant_id,
     282            0 :                     timeline_id,
     283            0 :                     generation: timeline_persist.generation as u32,
     284            0 :                     kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
     285            0 :                 };
     286            0 :                 locked.safekeeper_reconcilers.schedule_request(self, req);
     287              :             }
     288            0 :         }
     289              : 
     290            0 :         Ok(SafekeepersInfo {
     291            0 :             generation: timeline_persist.generation as u32,
     292            0 :             safekeepers: sks,
     293            0 :             tenant_id,
     294            0 :             timeline_id,
     295            0 :         })
     296            0 :     }
     297              :     /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
     298            0 :     pub(super) async fn tenant_timeline_delete_safekeepers(
     299            0 :         self: &Arc<Self>,
     300            0 :         tenant_id: TenantId,
     301            0 :         timeline_id: TimelineId,
     302            0 :     ) -> Result<(), ApiError> {
     303            0 :         let tl = self
     304            0 :             .persistence
     305            0 :             .get_timeline(tenant_id, timeline_id)
     306            0 :             .await?;
     307            0 :         let Some(tl) = tl else {
     308            0 :             tracing::info!(
     309            0 :                 "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
     310              :             );
     311            0 :             return Ok(());
     312              :         };
     313            0 :         let all_sks = tl
     314            0 :             .new_sk_set
     315            0 :             .iter()
     316            0 :             .flat_map(|sks| {
     317            0 :                 sks.iter()
     318            0 :                     .map(|sk| (*sk, SafekeeperTimelineOpKind::Exclude))
     319            0 :             })
     320            0 :             .chain(
     321            0 :                 tl.sk_set
     322            0 :                     .iter()
     323            0 :                     .map(|v| (*v, SafekeeperTimelineOpKind::Delete)),
     324            0 :             )
     325            0 :             .collect::<HashMap<_, _>>();
     326            0 : 
     327            0 :         // Schedule reconciliations
     328            0 :         {
     329            0 :             let mut locked = self.inner.write().unwrap();
     330            0 :             for (sk_id, kind) in all_sks {
     331            0 :                 let sk_id = NodeId(sk_id as u64);
     332            0 :                 let Some(sk) = locked.safekeepers.get(&sk_id) else {
     333            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
     334            0 :                         "Couldn't find safekeeper with id {sk_id}"
     335            0 :                     )));
     336              :                 };
     337              : 
     338            0 :                 let req = ScheduleRequest {
     339            0 :                     safekeeper: Box::new(sk.clone()),
     340            0 :                     // we don't use this for this kind, put a dummy value
     341            0 :                     host_list: Vec::new(),
     342            0 :                     tenant_id,
     343            0 :                     timeline_id,
     344            0 :                     generation: tl.generation as u32,
     345            0 :                     kind,
     346            0 :                 };
     347            0 :                 locked.safekeeper_reconcilers.schedule_request(self, req);
     348              :             }
     349              :         }
     350            0 :         Ok(())
     351            0 :     }
     352              : 
     353              :     /// Choose safekeepers for the new timeline: 3 in different azs.
     354            0 :     pub(crate) async fn safekeepers_for_new_timeline(
     355            0 :         &self,
     356            0 :     ) -> Result<Vec<SafekeeperInfo>, ApiError> {
     357            0 :         // Number of safekeepers in different AZs we are looking for
     358            0 :         let wanted_count = 3;
     359            0 :         let mut all_safekeepers = {
     360            0 :             let locked = self.inner.read().unwrap();
     361            0 :             locked
     362            0 :                 .safekeepers
     363            0 :                 .iter()
     364            0 :                 .filter_map(|sk| {
     365            0 :                     if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
     366              :                         // If we don't want to schedule stuff onto the safekeeper, respect that.
     367            0 :                         return None;
     368            0 :                     }
     369            0 :                     let utilization_opt = if let SafekeeperState::Available {
     370              :                         last_seen_at: _,
     371            0 :                         utilization,
     372            0 :                     } = sk.1.availability()
     373              :                     {
     374            0 :                         Some(utilization)
     375              :                     } else {
     376              :                         // non-available safekeepers still get a chance for new timelines,
     377              :                         // but put them last in the list.
     378            0 :                         None
     379              :                     };
     380            0 :                     let info = SafekeeperInfo {
     381            0 :                         hostname: sk.1.skp.host.clone(),
     382            0 :                         id: NodeId(sk.1.skp.id as u64),
     383            0 :                     };
     384            0 :                     Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
     385            0 :                 })
     386            0 :                 .collect::<Vec<_>>()
     387            0 :         };
     388            0 :         all_safekeepers.sort_by_key(|sk| {
     389            0 :             (
     390            0 :                 sk.0.as_ref()
     391            0 :                     .map(|ut| ut.timeline_count)
     392            0 :                     .unwrap_or(u64::MAX),
     393            0 :                 // Use the id to decide on equal scores for reliability
     394            0 :                 sk.1.id.0,
     395            0 :             )
     396            0 :         });
     397            0 :         let mut sks = Vec::new();
     398            0 :         let mut azs = HashSet::new();
     399            0 :         for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
     400            0 :             if !azs.insert(az_id) {
     401            0 :                 continue;
     402            0 :             }
     403            0 :             sks.push(sk_info.clone());
     404            0 :             if sks.len() == wanted_count {
     405            0 :                 break;
     406            0 :             }
     407              :         }
     408            0 :         if sks.len() == wanted_count {
     409            0 :             Ok(sks)
     410              :         } else {
     411            0 :             Err(ApiError::InternalServerError(anyhow::anyhow!(
     412            0 :                 "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
     413            0 :                 sks.len(),
     414            0 :                 all_safekeepers.len(),
     415            0 :             )))
     416              :         }
     417            0 :     }
     418              : 
     419            0 :     pub(crate) async fn safekeepers_list(
     420            0 :         &self,
     421            0 :     ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
     422            0 :         let locked = self.inner.read().unwrap();
     423            0 :         let mut list = locked
     424            0 :             .safekeepers
     425            0 :             .iter()
     426            0 :             .map(|sk| sk.1.describe_response())
     427            0 :             .collect::<Result<Vec<_>, _>>()?;
     428            0 :         list.sort_by_key(|v| v.id);
     429            0 :         Ok(list)
     430            0 :     }
     431              : 
     432            0 :     pub(crate) async fn get_safekeeper(
     433            0 :         &self,
     434            0 :         id: i64,
     435            0 :     ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
     436            0 :         let locked = self.inner.read().unwrap();
     437            0 :         let sk = locked
     438            0 :             .safekeepers
     439            0 :             .get(&NodeId(id as u64))
     440            0 :             .ok_or(diesel::result::Error::NotFound)?;
     441            0 :         sk.describe_response()
     442            0 :     }
     443              : 
     444            0 :     pub(crate) async fn upsert_safekeeper(
     445            0 :         &self,
     446            0 :         record: crate::persistence::SafekeeperUpsert,
     447            0 :     ) -> Result<(), ApiError> {
     448            0 :         let node_id = NodeId(record.id as u64);
     449            0 :         let use_https = self.config.use_https_safekeeper_api;
     450            0 : 
     451            0 :         if use_https && record.https_port.is_none() {
     452            0 :             return Err(ApiError::PreconditionFailed(
     453            0 :                 format!(
     454            0 :                     "cannot upsert safekeeper {node_id}: \
     455            0 :                     https is enabled, but https port is not specified"
     456            0 :                 )
     457            0 :                 .into(),
     458            0 :             ));
     459            0 :         }
     460            0 : 
     461            0 :         self.persistence.safekeeper_upsert(record.clone()).await?;
     462              :         {
     463            0 :             let mut locked = self.inner.write().unwrap();
     464            0 :             let mut safekeepers = (*locked.safekeepers).clone();
     465            0 :             match safekeepers.entry(node_id) {
     466            0 :                 std::collections::hash_map::Entry::Occupied(mut entry) => entry
     467            0 :                     .get_mut()
     468            0 :                     .update_from_record(record)
     469            0 :                     .expect("all preconditions should be checked before upsert to database"),
     470            0 :                 std::collections::hash_map::Entry::Vacant(entry) => {
     471            0 :                     entry.insert(
     472            0 :                         Safekeeper::from_persistence(
     473            0 :                             crate::persistence::SafekeeperPersistence::from_upsert(
     474            0 :                                 record,
     475            0 :                                 SkSchedulingPolicy::Pause,
     476            0 :                             ),
     477            0 :                             CancellationToken::new(),
     478            0 :                             use_https,
     479            0 :                         )
     480            0 :                         .expect("all preconditions should be checked before upsert to database"),
     481            0 :                     );
     482            0 :                 }
     483              :             }
     484            0 :             locked.safekeepers = Arc::new(safekeepers);
     485            0 :         }
     486            0 :         Ok(())
     487            0 :     }
     488              : 
     489            0 :     pub(crate) async fn set_safekeeper_scheduling_policy(
     490            0 :         &self,
     491            0 :         id: i64,
     492            0 :         scheduling_policy: SkSchedulingPolicy,
     493            0 :     ) -> Result<(), DatabaseError> {
     494            0 :         self.persistence
     495            0 :             .set_safekeeper_scheduling_policy(id, scheduling_policy)
     496            0 :             .await?;
     497            0 :         let node_id = NodeId(id as u64);
     498            0 :         // After the change has been persisted successfully, update the in-memory state
     499            0 :         {
     500            0 :             let mut locked = self.inner.write().unwrap();
     501            0 :             let mut safekeepers = (*locked.safekeepers).clone();
     502            0 :             let sk = safekeepers
     503            0 :                 .get_mut(&node_id)
     504            0 :                 .ok_or(DatabaseError::Logical("Not found".to_string()))?;
     505            0 :             sk.set_scheduling_policy(scheduling_policy);
     506            0 : 
     507            0 :             match scheduling_policy {
     508            0 :                 SkSchedulingPolicy::Active => (),
     509            0 :                 SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
     510            0 :                     locked.safekeeper_reconcilers.cancel_safekeeper(node_id);
     511            0 :                 }
     512              :             }
     513              : 
     514            0 :             locked.safekeepers = Arc::new(safekeepers);
     515            0 :         }
     516            0 :         Ok(())
     517            0 :     }
     518              : }
        

Generated by: LCOV version 2.1-beta