LCOV - code coverage report
Current view: top level - storage_controller/src/service - safekeeper_service.rs (source / functions) Coverage Total Hit
Test: c53b4545c8d86170903275d60fd25954560b5db8.info Lines: 0.0 % 1156 0
Test Date: 2025-07-29 09:21:06 Functions: 0.0 % 115 0

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::str::FromStr;
       3              : use std::sync::Arc;
       4              : use std::time::Duration;
       5              : 
       6              : use super::safekeeper_reconciler::ScheduleRequest;
       7              : use crate::compute_hook;
       8              : use crate::heartbeater::SafekeeperState;
       9              : use crate::id_lock_map::trace_shared_lock;
      10              : use crate::metrics;
      11              : use crate::persistence::{
      12              :     DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
      13              :     TimelineUpdate,
      14              : };
      15              : use crate::safekeeper::Safekeeper;
      16              : use crate::safekeeper_client::SafekeeperClient;
      17              : use crate::service::TenantOperations;
      18              : use crate::timeline_import::TimelineImportFinalizeError;
      19              : use anyhow::Context;
      20              : use http_utils::error::ApiError;
      21              : use pageserver_api::controller_api::{
      22              :     SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
      23              :     TimelineSafekeeperMigrateRequest,
      24              : };
      25              : use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
      26              : use safekeeper_api::PgVersionId;
      27              : use safekeeper_api::membership::{self, MemberSet, SafekeeperGeneration};
      28              : use safekeeper_api::models::{
      29              :     PullTimelineRequest, TimelineLocateResponse, TimelineMembershipSwitchRequest,
      30              :     TimelineMembershipSwitchResponse,
      31              : };
      32              : use safekeeper_api::{INITIAL_TERM, Term};
      33              : use safekeeper_client::mgmt_api;
      34              : use tokio::task::JoinSet;
      35              : use tokio_util::sync::CancellationToken;
      36              : use utils::id::{NodeId, TenantId, TimelineId};
      37              : use utils::logging::SecretString;
      38              : use utils::lsn::Lsn;
      39              : 
      40              : use super::Service;
      41              : 
      42              : impl Service {
      43            0 :     fn make_member_set(safekeepers: &[Safekeeper]) -> Result<MemberSet, anyhow::Error> {
      44            0 :         let members = safekeepers
      45            0 :             .iter()
      46            0 :             .map(|sk| sk.get_safekeeper_id())
      47            0 :             .collect::<Vec<_>>();
      48              : 
      49            0 :         MemberSet::new(members)
      50            0 :     }
      51              : 
      52            0 :     fn get_safekeepers(&self, ids: &[i64]) -> Result<Vec<Safekeeper>, ApiError> {
      53            0 :         let safekeepers = {
      54            0 :             let locked = self.inner.read().unwrap();
      55            0 :             locked.safekeepers.clone()
      56              :         };
      57              : 
      58            0 :         ids.iter()
      59            0 :             .map(|&id| {
      60            0 :                 let node_id = NodeId(id as u64);
      61            0 :                 safekeepers.get(&node_id).cloned().ok_or_else(|| {
      62            0 :                     ApiError::InternalServerError(anyhow::anyhow!(
      63            0 :                         "safekeeper {node_id} is not registered"
      64            0 :                     ))
      65            0 :                 })
      66            0 :             })
      67            0 :             .collect::<Result<Vec<_>, _>>()
      68            0 :     }
      69              : 
      70              :     /// Timeline creation on safekeepers
      71              :     ///
      72              :     /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
      73              :     /// where `left` contains the list of safekeepers that didn't have a successful response.
      74              :     /// Assumes tenant lock is held while calling this function.
      75            0 :     pub(super) async fn tenant_timeline_create_safekeepers_quorum(
      76            0 :         &self,
      77            0 :         tenant_id: TenantId,
      78            0 :         timeline_id: TimelineId,
      79            0 :         pg_version: PgVersionId,
      80            0 :         timeline_persistence: &TimelinePersistence,
      81            0 :     ) -> Result<Vec<NodeId>, ApiError> {
      82            0 :         let safekeepers = self.get_safekeepers(&timeline_persistence.sk_set)?;
      83              : 
      84            0 :         let mset = Self::make_member_set(&safekeepers).map_err(ApiError::InternalServerError)?;
      85            0 :         let mconf = safekeeper_api::membership::Configuration::new(mset);
      86              : 
      87            0 :         let req = safekeeper_api::models::TimelineCreateRequest {
      88            0 :             commit_lsn: None,
      89            0 :             mconf,
      90            0 :             pg_version,
      91            0 :             start_lsn: timeline_persistence.start_lsn.0,
      92            0 :             system_id: None,
      93            0 :             tenant_id,
      94            0 :             timeline_id,
      95            0 :             wal_seg_size: None,
      96            0 :         };
      97              : 
      98              :         const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      99              : 
     100            0 :         let results = self
     101            0 :             .tenant_timeline_safekeeper_op_quorum(
     102            0 :                 &safekeepers,
     103            0 :                 move |client| {
     104            0 :                     let req = req.clone();
     105            0 :                     async move { client.create_timeline(&req).await }
     106            0 :                 },
     107              :                 SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
     108              :             )
     109            0 :             .await?;
     110              : 
     111            0 :         Ok(results
     112            0 :             .into_iter()
     113            0 :             .enumerate()
     114            0 :             .filter_map(|(idx, res)| {
     115            0 :                 if res.is_ok() {
     116            0 :                     None // Success, don't return this safekeeper
     117              :                 } else {
     118            0 :                     Some(safekeepers[idx].get_id()) // Failure, return this safekeeper
     119              :                 }
     120            0 :             })
     121            0 :             .collect::<Vec<_>>())
     122            0 :     }
     123              : 
     124              :     /// Perform an operation on a list of safekeepers in parallel with retries.
     125              :     ///
     126              :     /// If desired_success_count is set, the remaining operations will be cancelled
     127              :     /// when the desired number of successful responses is reached.
     128              :     ///
     129              :     /// Return the results of the operation on each safekeeper in the input order.
     130            0 :     async fn tenant_timeline_safekeeper_op<T, O, F>(
     131            0 :         &self,
     132            0 :         safekeepers: &[Safekeeper],
     133            0 :         op: O,
     134            0 :         max_retries: u32,
     135            0 :         timeout: Duration,
     136            0 :         desired_success_count: Option<usize>,
     137            0 :     ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
     138            0 :     where
     139            0 :         O: FnMut(SafekeeperClient) -> F + Send + 'static,
     140            0 :         O: Clone,
     141            0 :         F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
     142            0 :         T: Sync + Send + 'static,
     143            0 :     {
     144            0 :         let warn_threshold = std::cmp::min(3, max_retries);
     145            0 :         let jwt = self
     146            0 :             .config
     147            0 :             .safekeeper_jwt_token
     148            0 :             .clone()
     149            0 :             .map(SecretString::from);
     150            0 :         let mut joinset = JoinSet::new();
     151              : 
     152            0 :         let cancel_new_retries = CancellationToken::new();
     153              : 
     154            0 :         for (idx, sk) in safekeepers.iter().enumerate() {
     155            0 :             let sk = sk.clone();
     156            0 :             let http_client = self.http_client.clone();
     157            0 :             let jwt = jwt.clone();
     158            0 :             let op = op.clone();
     159            0 :             let cancel_new_retries = cancel_new_retries.clone();
     160            0 :             joinset.spawn(async move {
     161            0 :                 let res = sk
     162            0 :                     .with_client_retries(
     163            0 :                         op,
     164            0 :                         &http_client,
     165            0 :                         &jwt,
     166            0 :                         warn_threshold,
     167            0 :                         max_retries,
     168            0 :                         // TODO(diko): This is a wrong timeout.
     169            0 :                         // It should be scaled to the retry count.
     170            0 :                         timeout,
     171            0 :                         &cancel_new_retries,
     172            0 :                     )
     173            0 :                     .await;
     174            0 :                 (idx, res)
     175            0 :             });
     176              :         }
     177              : 
     178              :         // Initialize results with timeout errors in case we never get a response.
     179            0 :         let mut results: Vec<mgmt_api::Result<T>> = safekeepers
     180            0 :             .iter()
     181            0 :             .map(|_| {
     182            0 :                 Err(mgmt_api::Error::Timeout(
     183            0 :                     "safekeeper operation timed out".to_string(),
     184            0 :                 ))
     185            0 :             })
     186            0 :             .collect();
     187              : 
     188              :         // After we have built the joinset, we now wait for the tasks to complete,
     189              :         // but with a specified timeout to make sure we return swiftly, either with
     190              :         // a failure or success.
     191            0 :         let reconcile_deadline = tokio::time::Instant::now() + timeout;
     192              : 
     193              :         // Wait until all tasks finish or timeout is hit, whichever occurs
     194              :         // first.
     195            0 :         let mut result_count = 0;
     196            0 :         let mut success_count = 0;
     197              :         loop {
     198            0 :             if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
     199              :             {
     200            0 :                 let Some(res) = res else { break };
     201            0 :                 match res {
     202            0 :                     Ok((idx, res)) => {
     203            0 :                         let sk = &safekeepers[idx];
     204            0 :                         tracing::info!(
     205            0 :                             "response from safekeeper id:{} at {}: {:?}",
     206            0 :                             sk.get_id(),
     207              :                             sk.skp.host,
     208              :                             // Only print errors, as there is no Debug trait for T.
     209            0 :                             res.as_ref().map(|_| ()),
     210              :                         );
     211            0 :                         if res.is_ok() {
     212            0 :                             success_count += 1;
     213            0 :                             if desired_success_count == Some(success_count) {
     214            0 :                                 // We reached the desired number of successful responses, cancel new retries for
     215            0 :                                 // the remaining safekeepers.
     216            0 :                                 // It does not cancel already started requests, we will still wait for them.
     217            0 :                                 cancel_new_retries.cancel();
     218            0 :                             }
     219            0 :                         }
     220            0 :                         results[idx] = res;
     221            0 :                         result_count += 1;
     222              :                     }
     223            0 :                     Err(join_err) => {
     224            0 :                         tracing::info!("join_err for task in joinset: {join_err}");
     225              :                     }
     226              :                 }
     227              :             } else {
     228            0 :                 tracing::info!("timeout for operation call after {result_count} responses",);
     229            0 :                 break;
     230              :             }
     231              :         }
     232              : 
     233            0 :         Ok(results)
     234            0 :     }
     235              : 
     236              :     /// Perform an operation on a list of safekeepers in parallel with retries,
     237              :     /// and validates that we reach a quorum of successful responses.
     238              :     ///
     239              :     /// Return the results of the operation on each safekeeper in the input order.
     240              :     /// It's guaranteed that at least a quorum of the responses are successful.
     241            0 :     async fn tenant_timeline_safekeeper_op_quorum<T, O, F>(
     242            0 :         &self,
     243            0 :         safekeepers: &[Safekeeper],
     244            0 :         op: O,
     245            0 :         timeout: Duration,
     246            0 :     ) -> Result<Vec<mgmt_api::Result<T>>, ApiError>
     247            0 :     where
     248            0 :         O: FnMut(SafekeeperClient) -> F,
     249            0 :         O: Clone + Send + 'static,
     250            0 :         F: std::future::Future<Output = mgmt_api::Result<T>> + Send + 'static,
     251            0 :         T: Sync + Send + 'static,
     252            0 :     {
     253            0 :         let target_sk_count = safekeepers.len();
     254              : 
     255            0 :         if target_sk_count == 0 {
     256            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
     257            0 :                 "timeline configured without any safekeepers"
     258            0 :             )));
     259            0 :         }
     260              : 
     261            0 :         if target_sk_count < self.config.timeline_safekeeper_count {
     262            0 :             tracing::warn!(
     263            0 :                 "running a quorum operation with {} safekeepers, which is less than configured {} safekeepers per timeline",
     264              :                 target_sk_count,
     265              :                 self.config.timeline_safekeeper_count
     266              :             );
     267            0 :         }
     268              : 
     269            0 :         let quorum_size = target_sk_count / 2 + 1;
     270            0 :         let max_retries = 3;
     271              : 
     272            0 :         let results = self
     273            0 :             .tenant_timeline_safekeeper_op(safekeepers, op, max_retries, timeout, Some(quorum_size))
     274            0 :             .await?;
     275              : 
     276              :         // Now check if quorum was reached in results.
     277            0 :         let success_count = results.iter().filter(|res| res.is_ok()).count();
     278            0 :         if success_count < quorum_size {
     279              :             // Failure
     280            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
     281            0 :                 "not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
     282            0 :             )));
     283            0 :         }
     284              : 
     285            0 :         Ok(results)
     286            0 :     }
     287              : 
     288              :     /// Create timeline in controller database and on safekeepers.
     289              :     /// `timeline_info` is result of timeline creation on pageserver.
     290              :     ///
     291              :     /// All actions must be idempotent as the call is retried until success. It
     292              :     /// tries to create timeline in the db and on at least majority of
     293              :     /// safekeepers + queue creation for safekeepers which missed it in the db
     294              :     /// for infinite retries; after that, call returns Ok.
     295              :     ///
     296              :     /// The idea is that once this is reached as long as we have alive majority
     297              :     /// of safekeepers it is expected to get eventually operational as storcon
     298              :     /// will be able to seed timeline on nodes which missed creation by making
     299              :     /// pull_timeline from peers. On the other hand we don't want to fail
     300              :     /// timeline creation if one safekeeper is down.
     301            0 :     pub(super) async fn tenant_timeline_create_safekeepers(
     302            0 :         self: &Arc<Self>,
     303            0 :         tenant_id: TenantId,
     304            0 :         timeline_info: &TimelineInfo,
     305            0 :         read_only: bool,
     306            0 :     ) -> Result<SafekeepersInfo, ApiError> {
     307            0 :         let timeline_id = timeline_info.timeline_id;
     308            0 :         let pg_version = PgVersionId::from(timeline_info.pg_version);
     309              :         // Initially start_lsn is determined by last_record_lsn in pageserver
     310              :         // response as it does initdb. However, later we persist it and in sk
     311              :         // creation calls replace with the value from the timeline row if it
     312              :         // previously existed as on retries in theory endpoint might have
     313              :         // already written some data and advanced last_record_lsn, while we want
     314              :         // safekeepers to have consistent start_lsn.
     315            0 :         let start_lsn = timeline_info.last_record_lsn;
     316              : 
     317              :         // Choose initial set of safekeepers respecting affinity
     318            0 :         let sks = if !read_only {
     319            0 :             self.safekeepers_for_new_timeline().await?
     320              :         } else {
     321            0 :             Vec::new()
     322              :         };
     323            0 :         let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
     324              :         // Add timeline to db
     325            0 :         let mut timeline_persist = TimelinePersistence {
     326            0 :             tenant_id: tenant_id.to_string(),
     327            0 :             timeline_id: timeline_id.to_string(),
     328            0 :             start_lsn: start_lsn.into(),
     329            0 :             generation: 1,
     330            0 :             sk_set: sks_persistence.clone(),
     331            0 :             new_sk_set: None,
     332            0 :             cplane_notified_generation: 0,
     333            0 :             deleted_at: None,
     334            0 :             sk_set_notified_generation: 0,
     335            0 :         };
     336            0 :         let inserted = self
     337            0 :             .persistence
     338            0 :             .insert_timeline(timeline_persist.clone())
     339            0 :             .await?;
     340            0 :         if !inserted {
     341            0 :             if let Some(existent_persist) = self
     342            0 :                 .persistence
     343            0 :                 .get_timeline(tenant_id, timeline_id)
     344            0 :                 .await?
     345            0 :             {
     346            0 :                 // Replace with what we have in the db, to get stuff like the generation right.
     347            0 :                 // We do still repeat the http calls to the safekeepers. After all, we could have
     348            0 :                 // crashed right after the wrote to the DB.
     349            0 :                 timeline_persist = existent_persist;
     350            0 :             } else {
     351            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
     352            0 :                     "insertion said timeline already in db, but looking it up, it was gone"
     353            0 :                 )));
     354              :             }
     355            0 :         }
     356            0 :         let ret = SafekeepersInfo {
     357            0 :             generation: timeline_persist.generation as u32,
     358            0 :             safekeepers: sks.clone(),
     359            0 :             tenant_id,
     360            0 :             timeline_id,
     361            0 :         };
     362            0 :         if read_only {
     363            0 :             return Ok(ret);
     364            0 :         }
     365              : 
     366              :         // Create the timeline on a quorum of safekeepers
     367            0 :         let remaining = self
     368            0 :             .tenant_timeline_create_safekeepers_quorum(
     369            0 :                 tenant_id,
     370            0 :                 timeline_id,
     371            0 :                 pg_version,
     372            0 :                 &timeline_persist,
     373            0 :             )
     374            0 :             .await?;
     375              : 
     376              :         // For the remaining safekeepers, take care of their reconciliation asynchronously
     377            0 :         for &remaining_id in remaining.iter() {
     378            0 :             let pending_op = TimelinePendingOpPersistence {
     379            0 :                 tenant_id: tenant_id.to_string(),
     380            0 :                 timeline_id: timeline_id.to_string(),
     381            0 :                 generation: timeline_persist.generation,
     382            0 :                 op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
     383            0 :                 sk_id: remaining_id.0 as i64,
     384            0 :             };
     385            0 :             tracing::info!("writing pending op for sk id {remaining_id}");
     386            0 :             self.persistence.insert_pending_op(pending_op).await?;
     387              :         }
     388            0 :         if !remaining.is_empty() {
     389            0 :             let locked = self.inner.read().unwrap();
     390            0 :             for remaining_id in remaining {
     391            0 :                 let Some(sk) = locked.safekeepers.get(&remaining_id) else {
     392            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
     393            0 :                         "Couldn't find safekeeper with id {remaining_id}"
     394            0 :                     )));
     395              :                 };
     396            0 :                 let Ok(host_list) = sks
     397            0 :                     .iter()
     398            0 :                     .map(|sk| {
     399              :                         Ok((
     400            0 :                             sk.id,
     401            0 :                             locked
     402            0 :                                 .safekeepers
     403            0 :                                 .get(&sk.id)
     404            0 :                                 .ok_or_else(|| {
     405            0 :                                     ApiError::InternalServerError(anyhow::anyhow!(
     406            0 :                                         "Couldn't find safekeeper with id {} to pull from",
     407            0 :                                         sk.id
     408            0 :                                     ))
     409            0 :                                 })?
     410            0 :                                 .base_url(),
     411              :                         ))
     412            0 :                     })
     413            0 :                     .collect::<Result<_, ApiError>>()
     414              :                 else {
     415            0 :                     continue;
     416              :                 };
     417            0 :                 let req = ScheduleRequest {
     418            0 :                     safekeeper: Box::new(sk.clone()),
     419            0 :                     host_list,
     420            0 :                     tenant_id,
     421            0 :                     timeline_id: Some(timeline_id),
     422            0 :                     generation: timeline_persist.generation as u32,
     423            0 :                     kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
     424            0 :                 };
     425            0 :                 locked.safekeeper_reconcilers.schedule_request(req);
     426              :             }
     427            0 :         }
     428              : 
     429            0 :         Ok(ret)
     430            0 :     }
     431              : 
     432            0 :     pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
     433            0 :         self: &Arc<Self>,
     434            0 :         tenant_id: TenantId,
     435            0 :         timeline_info: TimelineInfo,
     436            0 :     ) -> Result<(), TimelineImportFinalizeError> {
     437              :         const BACKOFF: Duration = Duration::from_secs(5);
     438              : 
     439              :         loop {
     440            0 :             if self.cancel.is_cancelled() {
     441            0 :                 return Err(TimelineImportFinalizeError::ShuttingDown);
     442            0 :             }
     443              : 
     444              :             // This function is only used in non-read-only scenarios
     445            0 :             let read_only = false;
     446            0 :             let res = self
     447            0 :                 .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
     448            0 :                 .await;
     449              : 
     450            0 :             match res {
     451              :                 Ok(_) => {
     452            0 :                     tracing::info!("Timeline created on safekeepers");
     453            0 :                     break;
     454              :                 }
     455            0 :                 Err(err) => {
     456            0 :                     tracing::error!("Failed to create timeline on safekeepers: {err}");
     457            0 :                     tokio::select! {
     458            0 :                         _ = self.cancel.cancelled() => {
     459            0 :                             return Err(TimelineImportFinalizeError::ShuttingDown);
     460              :                         },
     461            0 :                         _ = tokio::time::sleep(BACKOFF) => {}
     462              :                     };
     463              :                 }
     464              :             }
     465              :         }
     466              : 
     467            0 :         Ok(())
     468            0 :     }
     469              : 
     470              :     /// Directly insert the timeline into the database without reconciling it with safekeepers.
     471              :     ///
     472              :     /// Useful if the timeline already exists on the specified safekeepers,
     473              :     /// but we want to make it storage controller managed.
     474            0 :     pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
     475            0 :         let persistence = TimelinePersistence {
     476            0 :             tenant_id: req.tenant_id.to_string(),
     477            0 :             timeline_id: req.timeline_id.to_string(),
     478            0 :             start_lsn: req.start_lsn.into(),
     479              :             generation: 1,
     480            0 :             sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
     481            0 :             new_sk_set: None,
     482              :             cplane_notified_generation: 1,
     483            0 :             deleted_at: None,
     484              :             sk_set_notified_generation: 1,
     485              :         };
     486            0 :         let inserted = self
     487            0 :             .persistence
     488            0 :             .insert_timeline(persistence.clone())
     489            0 :             .await?;
     490            0 :         if inserted {
     491            0 :             tracing::info!("imported timeline into db");
     492            0 :             return Ok(());
     493            0 :         }
     494            0 :         tracing::info!("timeline already present in db, updating");
     495              : 
     496            0 :         let update = TimelineUpdate {
     497            0 :             tenant_id: persistence.tenant_id,
     498            0 :             timeline_id: persistence.timeline_id,
     499            0 :             start_lsn: persistence.start_lsn,
     500            0 :             sk_set: persistence.sk_set,
     501            0 :             new_sk_set: persistence.new_sk_set,
     502            0 :         };
     503            0 :         self.persistence.update_timeline_unsafe(update).await?;
     504            0 :         tracing::info!("timeline updated");
     505              : 
     506            0 :         Ok(())
     507            0 :     }
     508              : 
     509              :     /// Locate safekeepers for a timeline.
     510              :     /// Return the generation, sk_set and new_sk_set if present.
     511              :     /// If the timeline is not storcon-managed, return NotFound.
     512            0 :     pub(crate) async fn tenant_timeline_locate(
     513            0 :         &self,
     514            0 :         tenant_id: TenantId,
     515            0 :         timeline_id: TimelineId,
     516            0 :     ) -> Result<TimelineLocateResponse, ApiError> {
     517            0 :         let timeline = self
     518            0 :             .persistence
     519            0 :             .get_timeline(tenant_id, timeline_id)
     520            0 :             .await?;
     521              : 
     522            0 :         let Some(timeline) = timeline else {
     523            0 :             return Err(ApiError::NotFound(
     524            0 :                 anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
     525            0 :             ));
     526              :         };
     527              : 
     528              :         Ok(TimelineLocateResponse {
     529            0 :             generation: SafekeeperGeneration::new(timeline.generation as u32),
     530            0 :             sk_set: timeline
     531            0 :                 .sk_set
     532            0 :                 .iter()
     533            0 :                 .map(|id| NodeId(*id as u64))
     534            0 :                 .collect(),
     535            0 :             new_sk_set: timeline
     536            0 :                 .new_sk_set
     537            0 :                 .map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
     538              :         })
     539            0 :     }
     540              : 
     541              :     /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
     542            0 :     pub(super) async fn tenant_timeline_delete_safekeepers(
     543            0 :         self: &Arc<Self>,
     544            0 :         tenant_id: TenantId,
     545            0 :         timeline_id: TimelineId,
     546            0 :     ) -> Result<(), ApiError> {
     547            0 :         let tl = self
     548            0 :             .persistence
     549            0 :             .get_timeline(tenant_id, timeline_id)
     550            0 :             .await?;
     551            0 :         let Some(tl) = tl else {
     552            0 :             tracing::info!(
     553            0 :                 "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
     554              :             );
     555            0 :             return Ok(());
     556              :         };
     557            0 :         self.persistence
     558            0 :             .timeline_set_deleted_at(tenant_id, timeline_id)
     559            0 :             .await?;
     560            0 :         let all_sks = tl
     561            0 :             .new_sk_set
     562            0 :             .iter()
     563            0 :             .flatten()
     564            0 :             .chain(tl.sk_set.iter())
     565            0 :             .collect::<HashSet<_>>();
     566              : 
     567              :         // The timeline has no safekeepers: we need to delete it from the db manually,
     568              :         // as no safekeeper reconciler will get to it
     569            0 :         if all_sks.is_empty() {
     570            0 :             if let Err(err) = self
     571            0 :                 .persistence
     572            0 :                 .delete_timeline(tenant_id, timeline_id)
     573            0 :                 .await
     574              :             {
     575            0 :                 tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
     576            0 :             }
     577            0 :         }
     578              : 
     579              :         // Schedule reconciliations
     580            0 :         for &sk_id in all_sks.iter() {
     581            0 :             let pending_op = TimelinePendingOpPersistence {
     582            0 :                 tenant_id: tenant_id.to_string(),
     583            0 :                 timeline_id: timeline_id.to_string(),
     584            0 :                 generation: i32::MAX,
     585            0 :                 op_kind: SafekeeperTimelineOpKind::Delete,
     586            0 :                 sk_id: *sk_id,
     587            0 :             };
     588            0 :             tracing::info!("writing pending op for sk id {sk_id}");
     589            0 :             self.persistence.insert_pending_op(pending_op).await?;
     590              :         }
     591              :         {
     592            0 :             let locked = self.inner.read().unwrap();
     593            0 :             for sk_id in all_sks {
     594            0 :                 let sk_id = NodeId(*sk_id as u64);
     595            0 :                 let Some(sk) = locked.safekeepers.get(&sk_id) else {
     596            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
     597            0 :                         "Couldn't find safekeeper with id {sk_id}"
     598            0 :                     )));
     599              :                 };
     600              : 
     601            0 :                 let req = ScheduleRequest {
     602            0 :                     safekeeper: Box::new(sk.clone()),
     603            0 :                     // we don't use this for this kind, put a dummy value
     604            0 :                     host_list: Vec::new(),
     605            0 :                     tenant_id,
     606            0 :                     timeline_id: Some(timeline_id),
     607            0 :                     generation: tl.generation as u32,
     608            0 :                     kind: SafekeeperTimelineOpKind::Delete,
     609            0 :                 };
     610            0 :                 locked.safekeeper_reconcilers.schedule_request(req);
     611              :             }
     612              :         }
     613            0 :         Ok(())
     614            0 :     }
     615              : 
     616              :     /// Perform tenant deletion on safekeepers.
     617            0 :     pub(super) async fn tenant_delete_safekeepers(
     618            0 :         self: &Arc<Self>,
     619            0 :         tenant_id: TenantId,
     620            0 :     ) -> Result<(), ApiError> {
     621            0 :         let timeline_list = self
     622            0 :             .persistence
     623            0 :             .list_timelines_for_tenant(tenant_id)
     624            0 :             .await?;
     625              : 
     626            0 :         if timeline_list.is_empty() {
     627              :             // Early exit: the tenant is either empty or not migrated to the storcon yet
     628            0 :             tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
     629            0 :             return Ok(());
     630            0 :         }
     631              : 
     632            0 :         let timeline_list = timeline_list
     633            0 :             .into_iter()
     634            0 :             .map(|timeline| {
     635            0 :                 let timeline_id = TimelineId::from_str(&timeline.timeline_id)
     636            0 :                     .context("timeline id loaded from db")
     637            0 :                     .map_err(ApiError::InternalServerError)?;
     638            0 :                 Ok((timeline_id, timeline))
     639            0 :             })
     640            0 :             .collect::<Result<Vec<_>, ApiError>>()?;
     641              : 
     642              :         // Remove pending ops from db, and set `deleted_at`.
     643              :         // We cancel them in a later iteration once we hold the state lock.
     644            0 :         for (timeline_id, _timeline) in timeline_list.iter() {
     645            0 :             self.persistence
     646            0 :                 .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
     647            0 :                 .await?;
     648            0 :             self.persistence
     649            0 :                 .timeline_set_deleted_at(tenant_id, *timeline_id)
     650            0 :                 .await?;
     651              :         }
     652              : 
     653              :         // The list of safekeepers that have any of the timelines
     654            0 :         let mut sk_list = HashSet::new();
     655              : 
     656              :         // List all pending ops for all timelines, cancel them
     657            0 :         for (_timeline_id, timeline) in timeline_list.iter() {
     658            0 :             let sk_iter = timeline
     659            0 :                 .sk_set
     660            0 :                 .iter()
     661            0 :                 .chain(timeline.new_sk_set.iter().flatten())
     662            0 :                 .map(|id| NodeId(*id as u64));
     663            0 :             sk_list.extend(sk_iter);
     664              :         }
     665              : 
     666            0 :         for &sk_id in sk_list.iter() {
     667            0 :             let pending_op = TimelinePendingOpPersistence {
     668            0 :                 tenant_id: tenant_id.to_string(),
     669            0 :                 timeline_id: String::new(),
     670            0 :                 generation: i32::MAX,
     671            0 :                 op_kind: SafekeeperTimelineOpKind::Delete,
     672            0 :                 sk_id: sk_id.0 as i64,
     673            0 :             };
     674            0 :             tracing::info!("writing pending op for sk id {sk_id}");
     675            0 :             self.persistence.insert_pending_op(pending_op).await?;
     676              :         }
     677              : 
     678            0 :         let mut locked = self.inner.write().unwrap();
     679              : 
     680            0 :         for (timeline_id, _timeline) in timeline_list.iter() {
     681            0 :             for sk_id in sk_list.iter() {
     682            0 :                 locked
     683            0 :                     .safekeeper_reconcilers
     684            0 :                     .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
     685            0 :             }
     686              :         }
     687              : 
     688              :         // unwrap is safe: we return above for an empty timeline list
     689            0 :         let max_generation = timeline_list
     690            0 :             .iter()
     691            0 :             .map(|(_tl_id, tl)| tl.generation as u32)
     692            0 :             .max()
     693            0 :             .unwrap();
     694              : 
     695            0 :         for sk_id in sk_list {
     696            0 :             let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
     697            0 :                 tracing::warn!("Couldn't find safekeeper with id {sk_id}");
     698            0 :                 continue;
     699              :             };
     700              :             // Add pending op for tenant deletion
     701            0 :             let req = ScheduleRequest {
     702            0 :                 generation: max_generation,
     703            0 :                 host_list: Vec::new(),
     704            0 :                 kind: SafekeeperTimelineOpKind::Delete,
     705            0 :                 safekeeper: Box::new(safekeeper.clone()),
     706            0 :                 tenant_id,
     707            0 :                 timeline_id: None,
     708            0 :             };
     709            0 :             locked.safekeeper_reconcilers.schedule_request(req);
     710              :         }
     711            0 :         Ok(())
     712            0 :     }
     713              : 
     714              :     /// Choose safekeepers for the new timeline in different azs.
     715              :     /// 3 are choosen by default, but may be configured via config (for testing).
     716            0 :     pub(crate) async fn safekeepers_for_new_timeline(
     717            0 :         &self,
     718            0 :     ) -> Result<Vec<SafekeeperInfo>, ApiError> {
     719            0 :         let mut all_safekeepers = {
     720            0 :             let locked = self.inner.read().unwrap();
     721            0 :             locked
     722            0 :                 .safekeepers
     723            0 :                 .iter()
     724            0 :                 .filter_map(|sk| {
     725            0 :                     if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
     726              :                         // If we don't want to schedule stuff onto the safekeeper, respect that.
     727            0 :                         return None;
     728            0 :                     }
     729            0 :                     let utilization_opt = if let SafekeeperState::Available {
     730              :                         last_seen_at: _,
     731            0 :                         utilization,
     732            0 :                     } = sk.1.availability()
     733              :                     {
     734            0 :                         Some(utilization)
     735              :                     } else {
     736              :                         // non-available safekeepers still get a chance for new timelines,
     737              :                         // but put them last in the list.
     738            0 :                         None
     739              :                     };
     740            0 :                     let info = SafekeeperInfo {
     741            0 :                         hostname: sk.1.skp.host.clone(),
     742            0 :                         id: NodeId(sk.1.skp.id as u64),
     743            0 :                     };
     744            0 :                     Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
     745            0 :                 })
     746            0 :                 .collect::<Vec<_>>()
     747              :         };
     748            0 :         all_safekeepers.sort_by_key(|sk| {
     749              :             (
     750            0 :                 sk.0.as_ref()
     751            0 :                     .map(|ut| ut.timeline_count)
     752            0 :                     .unwrap_or(u64::MAX),
     753              :                 // Use the id to decide on equal scores for reliability
     754            0 :                 sk.1.id.0,
     755              :             )
     756            0 :         });
     757              :         // Number of safekeepers in different AZs we are looking for
     758            0 :         let wanted_count = self.config.timeline_safekeeper_count;
     759              : 
     760            0 :         let mut sks = Vec::new();
     761            0 :         let mut azs = HashSet::new();
     762            0 :         for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
     763            0 :             if !azs.insert(az_id) {
     764            0 :                 continue;
     765            0 :             }
     766            0 :             sks.push(sk_info.clone());
     767            0 :             if sks.len() == wanted_count {
     768            0 :                 break;
     769            0 :             }
     770              :         }
     771            0 :         if sks.len() == wanted_count {
     772            0 :             Ok(sks)
     773              :         } else {
     774            0 :             Err(ApiError::InternalServerError(anyhow::anyhow!(
     775            0 :                 "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
     776            0 :                 sks.len(),
     777            0 :                 all_safekeepers.len(),
     778            0 :             )))
     779              :         }
     780            0 :     }
     781              : 
     782            0 :     pub(crate) async fn safekeepers_list(
     783            0 :         &self,
     784            0 :     ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
     785            0 :         let locked = self.inner.read().unwrap();
     786            0 :         let mut list = locked
     787            0 :             .safekeepers
     788            0 :             .iter()
     789            0 :             .map(|sk| sk.1.describe_response())
     790            0 :             .collect::<Result<Vec<_>, _>>()?;
     791            0 :         list.sort_by_key(|v| v.id);
     792            0 :         Ok(list)
     793            0 :     }
     794              : 
     795            0 :     pub(crate) async fn get_safekeeper(
     796            0 :         &self,
     797            0 :         id: i64,
     798            0 :     ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
     799            0 :         let locked = self.inner.read().unwrap();
     800            0 :         let sk = locked
     801            0 :             .safekeepers
     802            0 :             .get(&NodeId(id as u64))
     803            0 :             .ok_or(diesel::result::Error::NotFound)?;
     804            0 :         sk.describe_response()
     805            0 :     }
     806              : 
     807            0 :     pub(crate) async fn upsert_safekeeper(
     808            0 :         self: &Arc<Service>,
     809            0 :         record: crate::persistence::SafekeeperUpsert,
     810            0 :     ) -> Result<(), ApiError> {
     811            0 :         let node_id = NodeId(record.id as u64);
     812            0 :         let use_https = self.config.use_https_safekeeper_api;
     813              : 
     814            0 :         if use_https && record.https_port.is_none() {
     815            0 :             return Err(ApiError::PreconditionFailed(
     816            0 :                 format!(
     817            0 :                     "cannot upsert safekeeper {node_id}: \
     818            0 :                     https is enabled, but https port is not specified"
     819            0 :                 )
     820            0 :                 .into(),
     821            0 :             ));
     822            0 :         }
     823              : 
     824            0 :         self.persistence.safekeeper_upsert(record.clone()).await?;
     825              :         {
     826            0 :             let mut locked = self.inner.write().unwrap();
     827            0 :             let mut safekeepers = (*locked.safekeepers).clone();
     828            0 :             match safekeepers.entry(node_id) {
     829            0 :                 std::collections::hash_map::Entry::Occupied(mut entry) => entry
     830            0 :                     .get_mut()
     831            0 :                     .update_from_record(record)
     832            0 :                     .expect("all preconditions should be checked before upsert to database"),
     833            0 :                 std::collections::hash_map::Entry::Vacant(entry) => {
     834            0 :                     entry.insert(
     835            0 :                         Safekeeper::from_persistence(
     836            0 :                             crate::persistence::SafekeeperPersistence::from_upsert(
     837            0 :                                 record,
     838            0 :                                 SkSchedulingPolicy::Activating,
     839            0 :                             ),
     840            0 :                             CancellationToken::new(),
     841            0 :                             use_https,
     842            0 :                         )
     843            0 :                         .expect("all preconditions should be checked before upsert to database"),
     844            0 :                     );
     845            0 :                 }
     846              :             }
     847            0 :             locked
     848            0 :                 .safekeeper_reconcilers
     849            0 :                 .start_reconciler(node_id, self);
     850            0 :             locked.safekeepers = Arc::new(safekeepers);
     851            0 :             metrics::METRICS_REGISTRY
     852            0 :                 .metrics_group
     853            0 :                 .storage_controller_safekeeper_nodes
     854            0 :                 .set(locked.safekeepers.len() as i64);
     855            0 :             metrics::METRICS_REGISTRY
     856            0 :                 .metrics_group
     857            0 :                 .storage_controller_https_safekeeper_nodes
     858            0 :                 .set(
     859            0 :                     locked
     860            0 :                         .safekeepers
     861            0 :                         .values()
     862            0 :                         .filter(|s| s.has_https_port())
     863            0 :                         .count() as i64,
     864              :                 );
     865              :         }
     866            0 :         Ok(())
     867            0 :     }
     868              : 
     869            0 :     pub(crate) async fn set_safekeeper_scheduling_policy(
     870            0 :         self: &Arc<Service>,
     871            0 :         id: i64,
     872            0 :         scheduling_policy: SkSchedulingPolicy,
     873            0 :     ) -> Result<(), DatabaseError> {
     874            0 :         self.persistence
     875            0 :             .set_safekeeper_scheduling_policy(id, scheduling_policy)
     876            0 :             .await?;
     877            0 :         let node_id = NodeId(id as u64);
     878              :         // After the change has been persisted successfully, update the in-memory state
     879            0 :         self.set_safekeeper_scheduling_policy_in_mem(node_id, scheduling_policy)
     880            0 :             .await
     881            0 :     }
     882              : 
     883            0 :     pub(crate) async fn set_safekeeper_scheduling_policy_in_mem(
     884            0 :         self: &Arc<Service>,
     885            0 :         node_id: NodeId,
     886            0 :         scheduling_policy: SkSchedulingPolicy,
     887            0 :     ) -> Result<(), DatabaseError> {
     888            0 :         let mut locked = self.inner.write().unwrap();
     889            0 :         let mut safekeepers = (*locked.safekeepers).clone();
     890            0 :         let sk = safekeepers
     891            0 :             .get_mut(&node_id)
     892            0 :             .ok_or(DatabaseError::Logical("Not found".to_string()))?;
     893            0 :         sk.set_scheduling_policy(scheduling_policy);
     894              : 
     895            0 :         match scheduling_policy {
     896            0 :             SkSchedulingPolicy::Active => {
     897            0 :                 locked
     898            0 :                     .safekeeper_reconcilers
     899            0 :                     .start_reconciler(node_id, self);
     900            0 :             }
     901              :             SkSchedulingPolicy::Decomissioned
     902              :             | SkSchedulingPolicy::Pause
     903            0 :             | SkSchedulingPolicy::Activating => {
     904            0 :                 locked.safekeeper_reconcilers.stop_reconciler(node_id);
     905            0 :             }
     906              :         }
     907              : 
     908            0 :         locked.safekeepers = Arc::new(safekeepers);
     909            0 :         Ok(())
     910            0 :     }
     911              : 
     912              :     /// Call `switch_timeline_membership` on all safekeepers with retries
     913              :     /// till the quorum of successful responses is reached.
     914              :     ///
     915              :     /// If min_position is not None, validates that majority of safekeepers
     916              :     /// reached at least min_position.
     917              :     ///
     918              :     /// If update_notified_generation is set, also updates sk_set_notified_generation
     919              :     /// in the timelines table.
     920              :     ///
     921              :     /// Return responses from safekeepers in the input order.
     922            0 :     async fn tenant_timeline_set_membership_quorum(
     923            0 :         self: &Arc<Self>,
     924            0 :         tenant_id: TenantId,
     925            0 :         timeline_id: TimelineId,
     926            0 :         safekeepers: &[Safekeeper],
     927            0 :         mconf: &membership::Configuration,
     928            0 :         min_position: Option<(Term, Lsn)>,
     929            0 :         update_notified_generation: bool,
     930            0 :     ) -> Result<Vec<mgmt_api::Result<TimelineMembershipSwitchResponse>>, ApiError> {
     931            0 :         let req = TimelineMembershipSwitchRequest {
     932            0 :             mconf: mconf.clone(),
     933            0 :         };
     934              : 
     935              :         const SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
     936              : 
     937            0 :         let results = self
     938            0 :             .tenant_timeline_safekeeper_op_quorum(
     939            0 :                 safekeepers,
     940            0 :                 move |client| {
     941            0 :                     let req = req.clone();
     942            0 :                     async move {
     943            0 :                         let mut res = client
     944            0 :                             .switch_timeline_membership(tenant_id, timeline_id, &req)
     945            0 :                             .await;
     946              : 
     947              :                         // If min_position is not reached, map the response to an error,
     948              :                         // so it isn't counted toward the quorum.
     949            0 :                         if let Some(min_position) = min_position {
     950            0 :                             if let Ok(ok_res) = &res {
     951            0 :                                 if (ok_res.last_log_term, ok_res.flush_lsn) < min_position {
     952            0 :                                     // Use Error::Timeout to make this error retriable.
     953            0 :                                     res = Err(mgmt_api::Error::Timeout(
     954            0 :                                         format!(
     955            0 :                                         "safekeeper {} returned position {:?} which is less than minimum required position {:?}",
     956            0 :                                         client.node_id_label(),
     957            0 :                                         (ok_res.last_log_term, ok_res.flush_lsn),
     958            0 :                                         min_position
     959            0 :                                         )
     960            0 :                                     ));
     961            0 :                                 }
     962            0 :                             }
     963            0 :                         }
     964              : 
     965            0 :                         res
     966            0 :                     }
     967            0 :                 },
     968              :                 SK_SET_MEM_TIMELINE_RECONCILE_TIMEOUT,
     969              :             )
     970            0 :             .await?;
     971              : 
     972            0 :         for res in results.iter().flatten() {
     973            0 :             if res.current_conf.generation > mconf.generation {
     974              :                 // Antoher switch_membership raced us.
     975            0 :                 return Err(ApiError::Conflict(format!(
     976            0 :                     "received configuration with generation {} from safekeeper, but expected {}",
     977            0 :                     res.current_conf.generation, mconf.generation
     978            0 :                 )));
     979            0 :             } else if res.current_conf.generation < mconf.generation {
     980              :                 // Note: should never happen.
     981              :                 // If we get a response, it should be at least the sent generation.
     982            0 :                 tracing::error!(
     983            0 :                     "received configuration with generation {} from safekeeper, but expected {}",
     984              :                     res.current_conf.generation,
     985              :                     mconf.generation
     986              :                 );
     987            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
     988            0 :                     "received configuration with generation {} from safekeeper, but expected {}",
     989            0 :                     res.current_conf.generation,
     990            0 :                     mconf.generation
     991            0 :                 )));
     992            0 :             }
     993              :         }
     994              : 
     995            0 :         if update_notified_generation {
     996            0 :             self.persistence
     997            0 :                 .update_sk_set_notified_generation(tenant_id, timeline_id, mconf.generation)
     998            0 :                 .await?;
     999            0 :         }
    1000              : 
    1001            0 :         Ok(results)
    1002            0 :     }
    1003              : 
    1004              :     /// Pull timeline to to_safekeepers from from_safekeepers with retries.
    1005              :     ///
    1006              :     /// Returns Ok(()) only if all the pull_timeline requests were successful.
    1007            0 :     async fn tenant_timeline_pull_from_peers(
    1008            0 :         self: &Arc<Self>,
    1009            0 :         tenant_id: TenantId,
    1010            0 :         timeline_id: TimelineId,
    1011            0 :         to_safekeepers: &[Safekeeper],
    1012            0 :         from_safekeepers: &[Safekeeper],
    1013            0 :         mconf: membership::Configuration,
    1014            0 :     ) -> Result<(), ApiError> {
    1015            0 :         let http_hosts = from_safekeepers
    1016            0 :             .iter()
    1017            0 :             .map(|sk| sk.base_url())
    1018            0 :             .collect::<Vec<_>>();
    1019              : 
    1020            0 :         tracing::info!(
    1021            0 :             "pulling timeline to {:?} from {:?}",
    1022            0 :             to_safekeepers
    1023            0 :                 .iter()
    1024            0 :                 .map(|sk| sk.get_id())
    1025            0 :                 .collect::<Vec<_>>(),
    1026            0 :             from_safekeepers
    1027            0 :                 .iter()
    1028            0 :                 .map(|sk| sk.get_id())
    1029            0 :                 .collect::<Vec<_>>()
    1030              :         );
    1031              : 
    1032            0 :         let req = PullTimelineRequest {
    1033            0 :             tenant_id,
    1034            0 :             timeline_id,
    1035            0 :             http_hosts,
    1036            0 :             mconf: Some(mconf),
    1037            0 :         };
    1038              : 
    1039              :         const SK_PULL_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
    1040            0 :         let max_retries = 3;
    1041              : 
    1042            0 :         let responses = self
    1043            0 :             .tenant_timeline_safekeeper_op(
    1044            0 :                 to_safekeepers,
    1045            0 :                 move |client| {
    1046            0 :                     let req = req.clone();
    1047            0 :                     async move { client.pull_timeline(&req).await }
    1048            0 :                 },
    1049            0 :                 max_retries,
    1050              :                 SK_PULL_TIMELINE_RECONCILE_TIMEOUT,
    1051            0 :                 None,
    1052              :             )
    1053            0 :             .await?;
    1054              : 
    1055            0 :         if let Some((idx, err)) = responses
    1056            0 :             .iter()
    1057            0 :             .enumerate()
    1058            0 :             .find_map(|(idx, res)| Some((idx, res.as_ref().err()?)))
    1059              :         {
    1060            0 :             let sk_id = to_safekeepers[idx].get_id();
    1061            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1062            0 :                 "pull_timeline to {sk_id} failed: {err}",
    1063            0 :             )));
    1064            0 :         }
    1065              : 
    1066            0 :         Ok(())
    1067            0 :     }
    1068              : 
    1069              :     /// Exclude a timeline from safekeepers in parallel with retries.
    1070              :     ///
    1071              :     /// Assumes that the exclude requests are already persistent in the database.
    1072              :     ///
    1073              :     /// The function does best effort: if an exclude request is unsuccessful,
    1074              :     /// it will be added to the in-memory reconciler, and the function will succeed anyway.
    1075              :     ///
    1076              :     /// Might fail if there is error accessing the database.
    1077            0 :     async fn tenant_timeline_safekeeper_exclude_reconcile(
    1078            0 :         self: &Arc<Self>,
    1079            0 :         tenant_id: TenantId,
    1080            0 :         timeline_id: TimelineId,
    1081            0 :         safekeepers: &[Safekeeper],
    1082            0 :         mconf: &membership::Configuration,
    1083            0 :     ) -> Result<(), ApiError> {
    1084            0 :         let req = TimelineMembershipSwitchRequest {
    1085            0 :             mconf: mconf.clone(),
    1086            0 :         };
    1087              : 
    1088              :         const SK_EXCLUDE_TIMELINE_TIMEOUT: Duration = Duration::from_secs(30);
    1089              :         // Do not retry failed requests to speed up the finishing phase.
    1090              :         // They will be retried in the reconciler.
    1091            0 :         let max_retries = 0;
    1092              : 
    1093            0 :         let results = self
    1094            0 :             .tenant_timeline_safekeeper_op(
    1095            0 :                 safekeepers,
    1096            0 :                 move |client| {
    1097            0 :                     let req = req.clone();
    1098            0 :                     async move { client.exclude_timeline(tenant_id, timeline_id, &req).await }
    1099            0 :                 },
    1100            0 :                 max_retries,
    1101              :                 SK_EXCLUDE_TIMELINE_TIMEOUT,
    1102            0 :                 None,
    1103              :             )
    1104            0 :             .await?;
    1105              : 
    1106            0 :         let mut reconcile_requests = Vec::new();
    1107              : 
    1108            0 :         fail::fail_point!("sk-migration-step-9-mid-exclude", |_| {
    1109            0 :             Err(ApiError::BadRequest(anyhow::anyhow!(
    1110            0 :                 "failpoint sk-migration-step-9-mid-exclude"
    1111            0 :             )))
    1112            0 :         });
    1113              : 
    1114            0 :         for (idx, res) in results.iter().enumerate() {
    1115            0 :             let sk_id = safekeepers[idx].skp.id;
    1116            0 :             let generation = mconf.generation.into_inner();
    1117              : 
    1118            0 :             if res.is_ok() {
    1119            0 :                 self.persistence
    1120            0 :                     .remove_pending_op(
    1121            0 :                         tenant_id,
    1122            0 :                         Some(timeline_id),
    1123            0 :                         NodeId(sk_id as u64),
    1124            0 :                         generation,
    1125            0 :                     )
    1126            0 :                     .await?;
    1127            0 :             } else {
    1128            0 :                 let req = ScheduleRequest {
    1129            0 :                     safekeeper: Box::new(safekeepers[idx].clone()),
    1130            0 :                     host_list: Vec::new(),
    1131            0 :                     tenant_id,
    1132            0 :                     timeline_id: Some(timeline_id),
    1133            0 :                     generation,
    1134            0 :                     kind: SafekeeperTimelineOpKind::Exclude,
    1135            0 :                 };
    1136            0 :                 reconcile_requests.push(req);
    1137            0 :             }
    1138              :         }
    1139              : 
    1140            0 :         if !reconcile_requests.is_empty() {
    1141            0 :             let locked = self.inner.read().unwrap();
    1142            0 :             for req in reconcile_requests {
    1143            0 :                 locked.safekeeper_reconcilers.schedule_request(req);
    1144            0 :             }
    1145            0 :         }
    1146              : 
    1147            0 :         Ok(())
    1148            0 :     }
    1149              : 
    1150              :     /// Migrate timeline safekeeper set to a new set.
    1151              :     ///
    1152              :     /// This function implements an algorithm from RFC-035.
    1153              :     /// <https://github.com/neondatabase/neon/blob/main/docs/rfcs/035-safekeeper-dynamic-membership-change.md>
    1154            0 :     pub(crate) async fn tenant_timeline_safekeeper_migrate(
    1155            0 :         self: &Arc<Self>,
    1156            0 :         tenant_id: TenantId,
    1157            0 :         timeline_id: TimelineId,
    1158            0 :         req: TimelineSafekeeperMigrateRequest,
    1159            0 :     ) -> Result<(), ApiError> {
    1160            0 :         let all_safekeepers = self.inner.read().unwrap().safekeepers.clone();
    1161              : 
    1162            0 :         let new_sk_set = req.new_sk_set;
    1163              : 
    1164            0 :         for sk_id in new_sk_set.iter() {
    1165            0 :             if !all_safekeepers.contains_key(sk_id) {
    1166            0 :                 return Err(ApiError::BadRequest(anyhow::anyhow!(
    1167            0 :                     "safekeeper {sk_id} does not exist"
    1168            0 :                 )));
    1169            0 :             }
    1170              :         }
    1171              : 
    1172            0 :         if new_sk_set.is_empty() {
    1173            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
    1174            0 :                 "new safekeeper set is empty"
    1175            0 :             )));
    1176            0 :         }
    1177              : 
    1178            0 :         if new_sk_set.len() < self.config.timeline_safekeeper_count {
    1179            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
    1180            0 :                 "new safekeeper set must have at least {} safekeepers",
    1181            0 :                 self.config.timeline_safekeeper_count
    1182            0 :             )));
    1183            0 :         }
    1184              : 
    1185            0 :         let new_sk_set_i64 = new_sk_set.iter().map(|id| id.0 as i64).collect::<Vec<_>>();
    1186            0 :         let new_safekeepers = self.get_safekeepers(&new_sk_set_i64)?;
    1187              :         // Construct new member set in advance to validate it.
    1188              :         // E.g. validates that there is no duplicate safekeepers.
    1189            0 :         let new_sk_member_set =
    1190            0 :             Self::make_member_set(&new_safekeepers).map_err(ApiError::BadRequest)?;
    1191              : 
    1192              :         // TODO(diko): per-tenant lock is too wide. Consider introducing per-timeline locks.
    1193            0 :         let _tenant_lock = trace_shared_lock(
    1194            0 :             &self.tenant_op_locks,
    1195            0 :             tenant_id,
    1196            0 :             TenantOperations::TimelineSafekeeperMigrate,
    1197            0 :         )
    1198            0 :         .await;
    1199              : 
    1200              :         // 1. Fetch current timeline configuration from the configuration storage.
    1201              : 
    1202            0 :         let timeline = self
    1203            0 :             .persistence
    1204            0 :             .get_timeline(tenant_id, timeline_id)
    1205            0 :             .await?;
    1206              : 
    1207            0 :         let Some(timeline) = timeline else {
    1208            0 :             return Err(ApiError::NotFound(
    1209            0 :                 anyhow::anyhow!(
    1210            0 :                     "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table"
    1211            0 :                 )
    1212            0 :                 .into(),
    1213            0 :             ));
    1214              :         };
    1215              : 
    1216            0 :         let cur_sk_set = timeline
    1217            0 :             .sk_set
    1218            0 :             .iter()
    1219            0 :             .map(|&id| NodeId(id as u64))
    1220            0 :             .collect::<Vec<_>>();
    1221              : 
    1222              :         // Validate that we are not migrating to a decomissioned safekeeper.
    1223            0 :         for sk in new_safekeepers.iter() {
    1224            0 :             if !cur_sk_set.contains(&sk.get_id())
    1225            0 :                 && sk.scheduling_policy() == SkSchedulingPolicy::Decomissioned
    1226              :             {
    1227            0 :                 return Err(ApiError::BadRequest(anyhow::anyhow!(
    1228            0 :                     "safekeeper {} is decomissioned",
    1229            0 :                     sk.get_id()
    1230            0 :                 )));
    1231            0 :             }
    1232              :         }
    1233              : 
    1234            0 :         tracing::info!(
    1235              :             ?cur_sk_set,
    1236              :             ?new_sk_set,
    1237            0 :             "Migrating timeline to new safekeeper set",
    1238              :         );
    1239              : 
    1240            0 :         let mut generation = SafekeeperGeneration::new(timeline.generation as u32);
    1241              : 
    1242            0 :         if let Some(ref presistent_new_sk_set) = timeline.new_sk_set {
    1243              :             // 2. If it is already joint one and new_set is different from desired_set refuse to change.
    1244            0 :             if presistent_new_sk_set
    1245            0 :                 .iter()
    1246            0 :                 .map(|&id| NodeId(id as u64))
    1247            0 :                 .ne(new_sk_set.iter().cloned())
    1248              :             {
    1249            0 :                 tracing::info!(
    1250              :                     ?presistent_new_sk_set,
    1251              :                     ?new_sk_set,
    1252            0 :                     "different new safekeeper set is already set in the database",
    1253              :                 );
    1254            0 :                 return Err(ApiError::Conflict(format!(
    1255            0 :                     "the timeline is already migrating to a different safekeeper set: {presistent_new_sk_set:?}"
    1256            0 :                 )));
    1257            0 :             }
    1258              :             // It it is the same new_sk_set, we can continue the migration (retry).
    1259              :         } else {
    1260            0 :             let prev_finished = timeline.cplane_notified_generation == timeline.generation
    1261            0 :                 && timeline.sk_set_notified_generation == timeline.generation;
    1262              : 
    1263            0 :             if !prev_finished {
    1264              :                 // The previous migration is committed, but the finish step failed.
    1265              :                 // Safekeepers/cplane might not know about the last membership configuration.
    1266              :                 // Retry the finish step to ensure smooth migration.
    1267            0 :                 self.finish_safekeeper_migration_retry(tenant_id, timeline_id, &timeline)
    1268            0 :                     .await?;
    1269            0 :             }
    1270              : 
    1271            0 :             if cur_sk_set == new_sk_set {
    1272            0 :                 tracing::info!("timeline is already at the desired safekeeper set");
    1273            0 :                 return Ok(());
    1274            0 :             }
    1275              : 
    1276              :             // 3. No active migration yet.
    1277              :             // Increment current generation and put desired_set to new_sk_set.
    1278            0 :             generation = generation.next();
    1279              : 
    1280            0 :             self.persistence
    1281            0 :                 .update_timeline_membership(
    1282            0 :                     tenant_id,
    1283            0 :                     timeline_id,
    1284            0 :                     generation,
    1285            0 :                     &cur_sk_set,
    1286            0 :                     Some(&new_sk_set),
    1287            0 :                     &[],
    1288            0 :                 )
    1289            0 :                 .await?;
    1290              : 
    1291            0 :             fail::fail_point!("sk-migration-after-step-3", |_| {
    1292            0 :                 Err(ApiError::BadRequest(anyhow::anyhow!(
    1293            0 :                     "failpoint sk-migration-after-step-3"
    1294            0 :                 )))
    1295            0 :             });
    1296              :         }
    1297              : 
    1298            0 :         let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
    1299            0 :         let cur_sk_member_set =
    1300            0 :             Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
    1301              : 
    1302            0 :         let joint_config = membership::Configuration {
    1303            0 :             generation,
    1304            0 :             members: cur_sk_member_set,
    1305            0 :             new_members: Some(new_sk_member_set.clone()),
    1306            0 :         };
    1307              : 
    1308              :         // 4. Call PUT configuration on safekeepers from the current set,
    1309              :         // delivering them joint_conf.
    1310              : 
    1311              :         // Notify cplane/compute about the membership change BEFORE changing the membership on safekeepers.
    1312              :         // This way the compute will know about new safekeepers from joint_config before we require to
    1313              :         // collect a quorum from them.
    1314            0 :         self.cplane_notify_safekeepers(tenant_id, timeline_id, &joint_config)
    1315            0 :             .await?;
    1316              : 
    1317            0 :         let results = self
    1318            0 :             .tenant_timeline_set_membership_quorum(
    1319            0 :                 tenant_id,
    1320            0 :                 timeline_id,
    1321            0 :                 &cur_safekeepers,
    1322            0 :                 &joint_config,
    1323            0 :                 None, // no min position
    1324            0 :                 true, // update notified generation
    1325            0 :             )
    1326            0 :             .await?;
    1327              : 
    1328            0 :         let mut sync_position = (INITIAL_TERM, Lsn::INVALID);
    1329            0 :         for res in results.into_iter().flatten() {
    1330            0 :             let sk_position = (res.last_log_term, res.flush_lsn);
    1331            0 :             if sync_position < sk_position {
    1332            0 :                 sync_position = sk_position;
    1333            0 :             }
    1334              :         }
    1335              : 
    1336            0 :         tracing::info!(
    1337              :             %generation,
    1338              :             ?sync_position,
    1339            0 :             "safekeepers set membership updated",
    1340              :         );
    1341              : 
    1342            0 :         fail::fail_point!("sk-migration-after-step-4", |_| {
    1343            0 :             Err(ApiError::BadRequest(anyhow::anyhow!(
    1344            0 :                 "failpoint sk-migration-after-step-4"
    1345            0 :             )))
    1346            0 :         });
    1347              : 
    1348              :         // 5. Initialize timeline on safekeeper(s) from new_sk_set where it doesn't exist yet
    1349              :         // by doing pull_timeline from the majority of the current set.
    1350              : 
    1351              :         // Filter out safekeepers which are already in the current set.
    1352            0 :         let from_ids: HashSet<NodeId> = cur_safekeepers.iter().map(|sk| sk.get_id()).collect();
    1353            0 :         let pull_to_safekeepers = new_safekeepers
    1354            0 :             .iter()
    1355            0 :             .filter(|sk| !from_ids.contains(&sk.get_id()))
    1356            0 :             .cloned()
    1357            0 :             .collect::<Vec<_>>();
    1358              : 
    1359            0 :         self.tenant_timeline_pull_from_peers(
    1360            0 :             tenant_id,
    1361            0 :             timeline_id,
    1362            0 :             &pull_to_safekeepers,
    1363            0 :             &cur_safekeepers,
    1364            0 :             joint_config.clone(),
    1365            0 :         )
    1366            0 :         .await?;
    1367              : 
    1368            0 :         fail::fail_point!("sk-migration-after-step-5", |_| {
    1369            0 :             Err(ApiError::BadRequest(anyhow::anyhow!(
    1370            0 :                 "failpoint sk-migration-after-step-5"
    1371            0 :             )))
    1372            0 :         });
    1373              : 
    1374              :         // 6. Call POST bump_term(sync_term) on safekeepers from the new set. Success on majority is enough.
    1375              : 
    1376              :         // TODO(diko): do we need to bump timeline term?
    1377              : 
    1378              :         // 7. Repeatedly call PUT configuration on safekeepers from the new set,
    1379              :         // delivering them joint_conf and collecting their positions.
    1380              : 
    1381            0 :         tracing::info!(?sync_position, "waiting for safekeepers to sync position");
    1382              : 
    1383            0 :         self.tenant_timeline_set_membership_quorum(
    1384            0 :             tenant_id,
    1385            0 :             timeline_id,
    1386            0 :             &new_safekeepers,
    1387            0 :             &joint_config,
    1388            0 :             Some(sync_position),
    1389            0 :             false, // we're just waiting for sync position, don't update notified generation
    1390            0 :         )
    1391            0 :         .await?;
    1392              : 
    1393            0 :         fail::fail_point!("sk-migration-after-step-7", |_| {
    1394            0 :             Err(ApiError::BadRequest(anyhow::anyhow!(
    1395            0 :                 "failpoint sk-migration-after-step-7"
    1396            0 :             )))
    1397            0 :         });
    1398              : 
    1399              :         // 8. Create new_conf: Configuration incrementing joint_conf generation and
    1400              :         // having new safekeeper set as sk_set and None new_sk_set.
    1401              : 
    1402            0 :         let generation = generation.next();
    1403              : 
    1404            0 :         let new_conf = membership::Configuration {
    1405            0 :             generation,
    1406            0 :             members: new_sk_member_set,
    1407            0 :             new_members: None,
    1408            0 :         };
    1409              : 
    1410            0 :         let new_ids: HashSet<NodeId> = new_safekeepers.iter().map(|sk| sk.get_id()).collect();
    1411            0 :         let exclude_safekeepers = cur_safekeepers
    1412            0 :             .into_iter()
    1413            0 :             .filter(|sk| !new_ids.contains(&sk.get_id()))
    1414            0 :             .collect::<Vec<_>>();
    1415            0 :         let exclude_requests = exclude_safekeepers
    1416            0 :             .iter()
    1417            0 :             .map(|sk| TimelinePendingOpPersistence {
    1418            0 :                 sk_id: sk.skp.id,
    1419            0 :                 tenant_id: tenant_id.to_string(),
    1420            0 :                 timeline_id: timeline_id.to_string(),
    1421            0 :                 generation: generation.into_inner() as i32,
    1422            0 :                 op_kind: SafekeeperTimelineOpKind::Exclude,
    1423            0 :             })
    1424            0 :             .collect::<Vec<_>>();
    1425              : 
    1426            0 :         self.persistence
    1427            0 :             .update_timeline_membership(
    1428            0 :                 tenant_id,
    1429            0 :                 timeline_id,
    1430            0 :                 generation,
    1431            0 :                 &new_sk_set,
    1432            0 :                 None,
    1433            0 :                 &exclude_requests,
    1434            0 :             )
    1435            0 :             .await?;
    1436              : 
    1437            0 :         fail::fail_point!("sk-migration-after-step-8", |_| {
    1438            0 :             Err(ApiError::BadRequest(anyhow::anyhow!(
    1439            0 :                 "failpoint sk-migration-after-step-8"
    1440            0 :             )))
    1441            0 :         });
    1442              : 
    1443              :         // At this point we have already updated the timeline in the database, so the final
    1444              :         // membership configuration is commited and the migration is not abortable anymore.
    1445              :         // But safekeepers and cplane/compute still need to be notified about the new configuration.
    1446              :         // The [`Self::finish_safekeeper_migration`] does exactly that: notifies everyone about
    1447              :         // the new configuration and reconciles excluded safekeepers.
    1448              :         // If it fails, the safkeeper migration call should be retried.
    1449              : 
    1450            0 :         self.finish_safekeeper_migration(
    1451            0 :             tenant_id,
    1452            0 :             timeline_id,
    1453            0 :             &new_safekeepers,
    1454            0 :             &new_conf,
    1455            0 :             &exclude_safekeepers,
    1456            0 :         )
    1457            0 :         .await?;
    1458              : 
    1459            0 :         Ok(())
    1460            0 :     }
    1461              : 
    1462              :     /// Notify cplane about safekeeper membership change.
    1463              :     /// The cplane will receive a joint set of safekeepers as a safekeeper list.
    1464            0 :     async fn cplane_notify_safekeepers(
    1465            0 :         &self,
    1466            0 :         tenant_id: TenantId,
    1467            0 :         timeline_id: TimelineId,
    1468            0 :         mconf: &membership::Configuration,
    1469            0 :     ) -> Result<(), ApiError> {
    1470            0 :         let mut safekeepers = Vec::new();
    1471            0 :         let mut ids: HashSet<_> = HashSet::new();
    1472              : 
    1473            0 :         for member in mconf
    1474            0 :             .members
    1475            0 :             .m
    1476            0 :             .iter()
    1477            0 :             .chain(mconf.new_members.iter().flat_map(|m| m.m.iter()))
    1478              :         {
    1479            0 :             if ids.insert(member.id) {
    1480            0 :                 safekeepers.push(compute_hook::SafekeeperInfo {
    1481            0 :                     id: member.id,
    1482            0 :                     hostname: Some(member.host.clone()),
    1483            0 :                 });
    1484            0 :             }
    1485              :         }
    1486              : 
    1487            0 :         self.compute_hook
    1488            0 :             .notify_safekeepers(
    1489            0 :                 compute_hook::SafekeepersUpdate {
    1490            0 :                     tenant_id,
    1491            0 :                     timeline_id,
    1492            0 :                     generation: mconf.generation,
    1493            0 :                     safekeepers,
    1494            0 :                 },
    1495            0 :                 &self.cancel,
    1496            0 :             )
    1497            0 :             .await
    1498            0 :             .map_err(|err| {
    1499            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
    1500            0 :                     "failed to notify cplane about safekeeper membership change: {err}"
    1501            0 :                 ))
    1502            0 :             })?;
    1503              : 
    1504            0 :         self.persistence
    1505            0 :             .update_cplane_notified_generation(tenant_id, timeline_id, mconf.generation)
    1506            0 :             .await?;
    1507              : 
    1508            0 :         Ok(())
    1509            0 :     }
    1510              : 
    1511              :     /// Finish safekeeper migration.
    1512              :     ///
    1513              :     /// It is the last step of the safekeeper migration.
    1514              :     ///
    1515              :     /// Notifies safekeepers and cplane about the final membership configuration,
    1516              :     /// reconciles excluded safekeepers and updates *_notified_generation in the database.
    1517            0 :     async fn finish_safekeeper_migration(
    1518            0 :         self: &Arc<Self>,
    1519            0 :         tenant_id: TenantId,
    1520            0 :         timeline_id: TimelineId,
    1521            0 :         new_safekeepers: &[Safekeeper],
    1522            0 :         new_conf: &membership::Configuration,
    1523            0 :         exclude_safekeepers: &[Safekeeper],
    1524            0 :     ) -> Result<(), ApiError> {
    1525              :         // 9. Call PUT configuration on safekeepers from the new set, delivering them new_conf.
    1526              :         // Also try to exclude safekeepers and notify cplane about the membership change.
    1527              : 
    1528            0 :         self.tenant_timeline_set_membership_quorum(
    1529            0 :             tenant_id,
    1530            0 :             timeline_id,
    1531            0 :             new_safekeepers,
    1532            0 :             new_conf,
    1533            0 :             None, // no min position
    1534            0 :             true, // update notified generation
    1535            0 :         )
    1536            0 :         .await?;
    1537              : 
    1538            0 :         fail::fail_point!("sk-migration-step-9-after-set-membership", |_| {
    1539            0 :             Err(ApiError::BadRequest(anyhow::anyhow!(
    1540            0 :                 "failpoint sk-migration-step-9-after-set-membership"
    1541            0 :             )))
    1542            0 :         });
    1543              : 
    1544            0 :         self.tenant_timeline_safekeeper_exclude_reconcile(
    1545            0 :             tenant_id,
    1546            0 :             timeline_id,
    1547            0 :             exclude_safekeepers,
    1548            0 :             new_conf,
    1549            0 :         )
    1550            0 :         .await?;
    1551              : 
    1552            0 :         fail::fail_point!("sk-migration-step-9-after-exclude", |_| {
    1553            0 :             Err(ApiError::BadRequest(anyhow::anyhow!(
    1554            0 :                 "failpoint sk-migration-step-9-after-exclude"
    1555            0 :             )))
    1556            0 :         });
    1557              : 
    1558              :         // Notify cplane/compute about the membership change AFTER changing the membership on safekeepers.
    1559              :         // This way the compute will stop talking to excluded safekeepers only after we stop requiring to
    1560              :         // collect a quorum from them.
    1561            0 :         self.cplane_notify_safekeepers(tenant_id, timeline_id, new_conf)
    1562            0 :             .await?;
    1563              : 
    1564            0 :         fail::fail_point!("sk-migration-after-step-9", |_| {
    1565            0 :             Err(ApiError::BadRequest(anyhow::anyhow!(
    1566            0 :                 "failpoint sk-migration-after-step-9"
    1567            0 :             )))
    1568            0 :         });
    1569              : 
    1570            0 :         Ok(())
    1571            0 :     }
    1572              : 
    1573              :     /// Same as [`Self::finish_safekeeper_migration`], but restores the migration state from the database.
    1574              :     /// It's used when the migration failed during the finish step and we need to retry it.
    1575            0 :     async fn finish_safekeeper_migration_retry(
    1576            0 :         self: &Arc<Self>,
    1577            0 :         tenant_id: TenantId,
    1578            0 :         timeline_id: TimelineId,
    1579            0 :         timeline: &TimelinePersistence,
    1580            0 :     ) -> Result<(), ApiError> {
    1581            0 :         if timeline.new_sk_set.is_some() {
    1582              :             // Logical error, should never happen.
    1583            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1584            0 :                 "can't finish timeline migration for {tenant_id}/{timeline_id}: new_sk_set is not None"
    1585            0 :             )));
    1586            0 :         }
    1587              : 
    1588            0 :         let cur_safekeepers = self.get_safekeepers(&timeline.sk_set)?;
    1589            0 :         let cur_sk_member_set =
    1590            0 :             Self::make_member_set(&cur_safekeepers).map_err(ApiError::InternalServerError)?;
    1591              : 
    1592            0 :         let mconf = membership::Configuration {
    1593            0 :             generation: SafekeeperGeneration::new(timeline.generation as u32),
    1594            0 :             members: cur_sk_member_set,
    1595            0 :             new_members: None,
    1596            0 :         };
    1597              : 
    1598              :         // We might have failed between commiting reconciliation requests and adding them to the in-memory reconciler.
    1599              :         // Reload them from the database.
    1600            0 :         let pending_ops = self
    1601            0 :             .persistence
    1602            0 :             .list_pending_ops_for_timeline(tenant_id, timeline_id)
    1603            0 :             .await?;
    1604              : 
    1605            0 :         let mut exclude_sk_ids = Vec::new();
    1606              : 
    1607            0 :         for op in pending_ops {
    1608            0 :             if op.op_kind == SafekeeperTimelineOpKind::Exclude
    1609            0 :                 && op.generation == timeline.generation
    1610            0 :             {
    1611            0 :                 exclude_sk_ids.push(op.sk_id);
    1612            0 :             }
    1613              :         }
    1614              : 
    1615            0 :         let exclude_safekeepers = self.get_safekeepers(&exclude_sk_ids)?;
    1616              : 
    1617            0 :         self.finish_safekeeper_migration(
    1618            0 :             tenant_id,
    1619            0 :             timeline_id,
    1620            0 :             &cur_safekeepers,
    1621            0 :             &mconf,
    1622            0 :             &exclude_safekeepers,
    1623            0 :         )
    1624            0 :         .await?;
    1625              : 
    1626            0 :         Ok(())
    1627            0 :     }
    1628              : }
        

Generated by: LCOV version 2.1-beta