LCOV - code coverage report
Current view: top level - storage_controller/src/service - safekeeper_service.rs (source / functions) Coverage Total Hit
Test: 553e39c2773e5840c720c90d86e56f89a4330d43.info Lines: 0.0 % 602 0
Test Date: 2025-06-13 20:01:21 Functions: 0.0 % 49 0

            Line data    Source code
       1              : use std::collections::HashSet;
       2              : use std::str::FromStr;
       3              : use std::sync::Arc;
       4              : use std::time::Duration;
       5              : 
       6              : use super::safekeeper_reconciler::ScheduleRequest;
       7              : use crate::heartbeater::SafekeeperState;
       8              : use crate::metrics;
       9              : use crate::persistence::{
      10              :     DatabaseError, SafekeeperTimelineOpKind, TimelinePendingOpPersistence, TimelinePersistence,
      11              : };
      12              : use crate::safekeeper::Safekeeper;
      13              : use crate::timeline_import::TimelineImportFinalizeError;
      14              : use anyhow::Context;
      15              : use http_utils::error::ApiError;
      16              : use pageserver_api::controller_api::{
      17              :     SafekeeperDescribeResponse, SkSchedulingPolicy, TimelineImportRequest,
      18              : };
      19              : use pageserver_api::models::{SafekeeperInfo, SafekeepersInfo, TimelineInfo};
      20              : use safekeeper_api::membership::{MemberSet, SafekeeperGeneration, SafekeeperId};
      21              : use tokio::task::JoinSet;
      22              : use tokio_util::sync::CancellationToken;
      23              : use utils::id::{NodeId, TenantId, TimelineId};
      24              : use utils::logging::SecretString;
      25              : use utils::lsn::Lsn;
      26              : 
      27              : use super::Service;
      28              : 
      29            0 : #[derive(serde::Serialize, serde::Deserialize, Clone)]
      30              : pub struct TimelineLocateResponse {
      31              :     pub generation: SafekeeperGeneration,
      32              :     pub sk_set: Vec<NodeId>,
      33              :     pub new_sk_set: Option<Vec<NodeId>>,
      34              : }
      35              : 
      36              : impl Service {
      37              :     /// Timeline creation on safekeepers
      38              :     ///
      39              :     /// Returns `Ok(left)` if the timeline has been created on a quorum of safekeepers,
      40              :     /// where `left` contains the list of safekeepers that didn't have a successful response.
      41              :     /// Assumes tenant lock is held while calling this function.
      42            0 :     pub(super) async fn tenant_timeline_create_safekeepers_quorum(
      43            0 :         &self,
      44            0 :         tenant_id: TenantId,
      45            0 :         timeline_id: TimelineId,
      46            0 :         pg_version: u32,
      47            0 :         timeline_persistence: &TimelinePersistence,
      48            0 :     ) -> Result<Vec<NodeId>, ApiError> {
      49            0 :         // If quorum is reached, return if we are outside of a specified timeout
      50            0 :         let jwt = self
      51            0 :             .config
      52            0 :             .safekeeper_jwt_token
      53            0 :             .clone()
      54            0 :             .map(SecretString::from);
      55            0 :         let mut joinset = JoinSet::new();
      56            0 : 
      57            0 :         // Prepare membership::Configuration from chosen safekeepers.
      58            0 :         let safekeepers = {
      59            0 :             let locked = self.inner.read().unwrap();
      60            0 :             locked.safekeepers.clone()
      61            0 :         };
      62            0 : 
      63            0 :         let mut members = Vec::new();
      64            0 :         for sk_id in timeline_persistence.sk_set.iter() {
      65            0 :             let sk_id = NodeId(*sk_id as u64);
      66            0 :             let Some(safekeeper) = safekeepers.get(&sk_id) else {
      67            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
      68            0 :                     "couldn't find entry for safekeeper with id {sk_id}"
      69            0 :                 )))?;
      70              :             };
      71            0 :             members.push(SafekeeperId {
      72            0 :                 id: sk_id,
      73            0 :                 host: safekeeper.skp.host.clone(),
      74            0 :                 pg_port: safekeeper.skp.port as u16,
      75            0 :             });
      76              :         }
      77            0 :         let mset = MemberSet::new(members).map_err(ApiError::InternalServerError)?;
      78            0 :         let mconf = safekeeper_api::membership::Configuration::new(mset);
      79            0 : 
      80            0 :         let req = safekeeper_api::models::TimelineCreateRequest {
      81            0 :             commit_lsn: None,
      82            0 :             mconf,
      83            0 :             pg_version,
      84            0 :             start_lsn: timeline_persistence.start_lsn.0,
      85            0 :             system_id: None,
      86            0 :             tenant_id,
      87            0 :             timeline_id,
      88            0 :             wal_seg_size: None,
      89            0 :         };
      90              :         const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      91            0 :         for sk in timeline_persistence.sk_set.iter() {
      92            0 :             let sk_id = NodeId(*sk as u64);
      93            0 :             let safekeepers = safekeepers.clone();
      94            0 :             let http_client = self.http_client.clone();
      95            0 :             let jwt = jwt.clone();
      96            0 :             let req = req.clone();
      97            0 :             joinset.spawn(async move {
      98            0 :                 // Unwrap is fine as we already would have returned error above
      99            0 :                 let sk_p = safekeepers.get(&sk_id).unwrap();
     100            0 :                 let res = sk_p
     101            0 :                     .with_client_retries(
     102            0 :                         |client| {
     103            0 :                             let req = req.clone();
     104            0 :                             async move { client.create_timeline(&req).await }
     105            0 :                         },
     106            0 :                         &http_client,
     107            0 :                         &jwt,
     108            0 :                         3,
     109            0 :                         3,
     110            0 :                         SK_CREATE_TIMELINE_RECONCILE_TIMEOUT,
     111            0 :                         &CancellationToken::new(),
     112            0 :                     )
     113            0 :                     .await;
     114            0 :                 (sk_id, sk_p.skp.host.clone(), res)
     115            0 :             });
     116            0 :         }
     117              :         // After we have built the joinset, we now wait for the tasks to complete,
     118              :         // but with a specified timeout to make sure we return swiftly, either with
     119              :         // a failure or success.
     120            0 :         let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;
     121            0 : 
     122            0 :         // Wait until all tasks finish or timeout is hit, whichever occurs
     123            0 :         // first.
     124            0 :         let mut reconcile_results = Vec::new();
     125              :         loop {
     126            0 :             if let Ok(res) = tokio::time::timeout_at(reconcile_deadline, joinset.join_next()).await
     127              :             {
     128            0 :                 let Some(res) = res else { break };
     129            0 :                 match res {
     130            0 :                     Ok(res) => {
     131            0 :                         tracing::info!(
     132            0 :                             "response from safekeeper id:{} at {}: {:?}",
     133              :                             res.0,
     134              :                             res.1,
     135              :                             res.2
     136              :                         );
     137            0 :                         reconcile_results.push(res);
     138              :                     }
     139            0 :                     Err(join_err) => {
     140            0 :                         tracing::info!("join_err for task in joinset: {join_err}");
     141              :                     }
     142              :                 }
     143              :             } else {
     144            0 :                 tracing::info!(
     145            0 :                     "timeout for creation call after {} responses",
     146            0 :                     reconcile_results.len()
     147              :                 );
     148            0 :                 break;
     149              :             }
     150              :         }
     151              : 
     152              :         // Now check if quorum was reached in reconcile_results.
     153            0 :         let total_result_count = reconcile_results.len();
     154            0 :         let remaining = reconcile_results
     155            0 :             .into_iter()
     156            0 :             .filter_map(|res| res.2.is_err().then_some(res.0))
     157            0 :             .collect::<Vec<_>>();
     158            0 :         tracing::info!(
     159            0 :             "Got {} non-successful responses from initial creation request of total {total_result_count} responses",
     160            0 :             remaining.len()
     161              :         );
     162            0 :         let target_sk_count = timeline_persistence.sk_set.len();
     163            0 :         let quorum_size = match target_sk_count {
     164              :             0 => {
     165            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
     166            0 :                     "timeline configured without any safekeepers",
     167            0 :                 )));
     168              :             }
     169              :             1 | 2 => {
     170              :                 #[cfg(feature = "testing")]
     171              :                 {
     172              :                     // In test settings, it is allowed to have one or two safekeepers
     173            0 :                     target_sk_count
     174              :                 }
     175              :                 #[cfg(not(feature = "testing"))]
     176              :                 {
     177              :                     // The region is misconfigured: we need at least three safekeepers to be configured
     178              :                     // in order to schedule work to them
     179              :                     tracing::warn!(
     180              :                         "couldn't find at least 3 safekeepers for timeline, found: {:?}",
     181              :                         timeline_persistence.sk_set
     182              :                     );
     183              :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
     184              :                         "couldn't find at least 3 safekeepers to put timeline to"
     185              :                     )));
     186              :                 }
     187              :             }
     188            0 :             _ => target_sk_count / 2 + 1,
     189              :         };
     190            0 :         let success_count = target_sk_count - remaining.len();
     191            0 :         if success_count < quorum_size {
     192              :             // Failure
     193            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
     194            0 :                 "not enough successful reconciliations to reach quorum size: {success_count} of {quorum_size} of total {target_sk_count}"
     195            0 :             )));
     196            0 :         }
     197            0 : 
     198            0 :         Ok(remaining)
     199            0 :     }
     200              : 
     201              :     /// Create timeline in controller database and on safekeepers.
     202              :     /// `timeline_info` is result of timeline creation on pageserver.
     203              :     ///
     204              :     /// All actions must be idempotent as the call is retried until success. It
     205              :     /// tries to create the timeline in the db and on at least a majority of
     206              :     /// safekeepers, and queues creation in the db (retried indefinitely) for
     207              :     /// safekeepers which missed it; after that, the call returns Ok.
     208              :     ///
     209              :     /// The idea is that once this point is reached, as long as a majority of
     210              :     /// safekeepers is alive, the timeline is expected to eventually become
     211              :     /// operational: storcon can seed the timeline on nodes which missed creation
     212              :     /// by issuing pull_timeline from peers. On the other hand, we don't want to
     213              :     /// fail timeline creation if one safekeeper is down.
     214            0 :     pub(super) async fn tenant_timeline_create_safekeepers(
     215            0 :         self: &Arc<Self>,
     216            0 :         tenant_id: TenantId,
     217            0 :         timeline_info: &TimelineInfo,
     218            0 :         read_only: bool,
     219            0 :     ) -> Result<SafekeepersInfo, ApiError> {
     220            0 :         let timeline_id = timeline_info.timeline_id;
     221            0 :         let pg_version = timeline_info.pg_version * 10000;
     222            0 :         // Initially start_lsn is determined by last_record_lsn in pageserver
     223            0 :         // response as it does initdb. However, later we persist it and in sk
     224            0 :         // creation calls replace with the value from the timeline row if it
     225            0 :         // previously existed as on retries in theory endpoint might have
     226            0 :         // already written some data and advanced last_record_lsn, while we want
     227            0 :         // safekeepers to have consistent start_lsn.
     228            0 :         let start_lsn = timeline_info.last_record_lsn;
     229              : 
     230              :         // Choose initial set of safekeepers respecting affinity
     231            0 :         let sks = if !read_only {
     232            0 :             self.safekeepers_for_new_timeline().await?
     233              :         } else {
     234            0 :             Vec::new()
     235              :         };
     236            0 :         let sks_persistence = sks.iter().map(|sk| sk.id.0 as i64).collect::<Vec<_>>();
     237            0 :         // Add timeline to db
     238            0 :         let mut timeline_persist = TimelinePersistence {
     239            0 :             tenant_id: tenant_id.to_string(),
     240            0 :             timeline_id: timeline_id.to_string(),
     241            0 :             start_lsn: start_lsn.into(),
     242            0 :             generation: 1,
     243            0 :             sk_set: sks_persistence.clone(),
     244            0 :             new_sk_set: None,
     245            0 :             cplane_notified_generation: 0,
     246            0 :             deleted_at: None,
     247            0 :         };
     248            0 :         let inserted = self
     249            0 :             .persistence
     250            0 :             .insert_timeline(timeline_persist.clone())
     251            0 :             .await?;
     252            0 :         if !inserted {
     253            0 :             if let Some(existent_persist) = self
     254            0 :                 .persistence
     255            0 :                 .get_timeline(tenant_id, timeline_id)
     256            0 :                 .await?
     257            0 :             {
     258            0 :                 // Replace with what we have in the db, to get stuff like the generation right.
     259            0 :                 // We do still repeat the http calls to the safekeepers. After all, we could have
     260            0 :                 // crashed right after the write to the DB.
     261            0 :                 timeline_persist = existent_persist;
     262            0 :             } else {
     263            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
     264            0 :                     "insertion said timeline already in db, but looking it up, it was gone"
     265            0 :                 )));
     266              :             }
     267            0 :         }
     268            0 :         let ret = SafekeepersInfo {
     269            0 :             generation: timeline_persist.generation as u32,
     270            0 :             safekeepers: sks.clone(),
     271            0 :             tenant_id,
     272            0 :             timeline_id,
     273            0 :         };
     274            0 :         if read_only {
     275            0 :             return Ok(ret);
     276            0 :         }
     277              : 
     278              :         // Create the timeline on a quorum of safekeepers
     279            0 :         let remaining = self
     280            0 :             .tenant_timeline_create_safekeepers_quorum(
     281            0 :                 tenant_id,
     282            0 :                 timeline_id,
     283            0 :                 pg_version,
     284            0 :                 &timeline_persist,
     285            0 :             )
     286            0 :             .await?;
     287              : 
     288              :         // For the remaining safekeepers, take care of their reconciliation asynchronously
     289            0 :         for &remaining_id in remaining.iter() {
     290            0 :             let pending_op = TimelinePendingOpPersistence {
     291            0 :                 tenant_id: tenant_id.to_string(),
     292            0 :                 timeline_id: timeline_id.to_string(),
     293            0 :                 generation: timeline_persist.generation,
     294            0 :                 op_kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
     295            0 :                 sk_id: remaining_id.0 as i64,
     296            0 :             };
     297            0 :             tracing::info!("writing pending op for sk id {remaining_id}");
     298            0 :             self.persistence.insert_pending_op(pending_op).await?;
     299              :         }
     300            0 :         if !remaining.is_empty() {
     301            0 :             let locked = self.inner.read().unwrap();
     302            0 :             for remaining_id in remaining {
     303            0 :                 let Some(sk) = locked.safekeepers.get(&remaining_id) else {
     304            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
     305            0 :                         "Couldn't find safekeeper with id {remaining_id}"
     306            0 :                     )));
     307              :                 };
     308            0 :                 let Ok(host_list) = sks
     309            0 :                     .iter()
     310            0 :                     .map(|sk| {
     311            0 :                         Ok((
     312            0 :                             sk.id,
     313            0 :                             locked
     314            0 :                                 .safekeepers
     315            0 :                                 .get(&sk.id)
     316            0 :                                 .ok_or_else(|| {
     317            0 :                                     ApiError::InternalServerError(anyhow::anyhow!(
     318            0 :                                         "Couldn't find safekeeper with id {} to pull from",
     319            0 :                                         sk.id
     320            0 :                                     ))
     321            0 :                                 })?
     322            0 :                                 .base_url(),
     323              :                         ))
     324            0 :                     })
     325            0 :                     .collect::<Result<_, ApiError>>()
     326              :                 else {
     327            0 :                     continue;
     328              :                 };
     329            0 :                 let req = ScheduleRequest {
     330            0 :                     safekeeper: Box::new(sk.clone()),
     331            0 :                     host_list,
     332            0 :                     tenant_id,
     333            0 :                     timeline_id: Some(timeline_id),
     334            0 :                     generation: timeline_persist.generation as u32,
     335            0 :                     kind: crate::persistence::SafekeeperTimelineOpKind::Pull,
     336            0 :                 };
     337            0 :                 locked.safekeeper_reconcilers.schedule_request(req);
     338              :             }
     339            0 :         }
     340              : 
     341            0 :         Ok(ret)
     342            0 :     }
     343              : 
     344            0 :     pub(crate) async fn tenant_timeline_create_safekeepers_until_success(
     345            0 :         self: &Arc<Self>,
     346            0 :         tenant_id: TenantId,
     347            0 :         timeline_info: TimelineInfo,
     348            0 :     ) -> Result<(), TimelineImportFinalizeError> {
     349              :         const BACKOFF: Duration = Duration::from_secs(5);
     350              : 
     351              :         loop {
     352            0 :             if self.cancel.is_cancelled() {
     353            0 :                 return Err(TimelineImportFinalizeError::ShuttingDown);
     354            0 :             }
     355            0 : 
     356            0 :             // This function is only used in non-read-only scenarios
     357            0 :             let read_only = false;
     358            0 :             let res = self
     359            0 :                 .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, read_only)
     360            0 :                 .await;
     361              : 
     362            0 :             match res {
     363              :                 Ok(_) => {
     364            0 :                     tracing::info!("Timeline created on safekeepers");
     365            0 :                     break;
     366              :                 }
     367            0 :                 Err(err) => {
     368            0 :                     tracing::error!("Failed to create timeline on safekeepers: {err}");
     369            0 :                     tokio::select! {
     370            0 :                         _ = self.cancel.cancelled() => {
     371            0 :                             return Err(TimelineImportFinalizeError::ShuttingDown);
     372              :                         },
     373            0 :                         _ = tokio::time::sleep(BACKOFF) => {}
     374              :                     };
     375              :                 }
     376              :             }
     377              :         }
     378              : 
     379            0 :         Ok(())
     380            0 :     }
     381              : 
     382              :     /// Directly insert the timeline into the database without reconciling it with safekeepers.
     383              :     ///
     384              :     /// Useful if the timeline already exists on the specified safekeepers,
     385              :     /// but we want to make it storage controller managed.
     386            0 :     pub(crate) async fn timeline_import(&self, req: TimelineImportRequest) -> Result<(), ApiError> {
     387            0 :         let persistence = TimelinePersistence {
     388            0 :             tenant_id: req.tenant_id.to_string(),
     389            0 :             timeline_id: req.timeline_id.to_string(),
     390            0 :             start_lsn: Lsn::INVALID.into(),
     391            0 :             generation: 1,
     392            0 :             sk_set: req.sk_set.iter().map(|sk_id| sk_id.0 as i64).collect(),
     393            0 :             new_sk_set: None,
     394            0 :             cplane_notified_generation: 1,
     395            0 :             deleted_at: None,
     396            0 :         };
     397            0 :         let inserted = self.persistence.insert_timeline(persistence).await?;
     398            0 :         if inserted {
     399            0 :             tracing::info!("imported timeline into db");
     400              :         } else {
     401            0 :             tracing::info!("didn't import timeline into db, as it is already present in db");
     402              :         }
     403            0 :         Ok(())
     404            0 :     }
     405              : 
     406              :     /// Locate safekeepers for a timeline.
     407              :     /// Return the generation, sk_set and new_sk_set if present.
     408              :     /// If the timeline is not storcon-managed, return NotFound.
     409            0 :     pub(crate) async fn tenant_timeline_locate(
     410            0 :         &self,
     411            0 :         tenant_id: TenantId,
     412            0 :         timeline_id: TimelineId,
     413            0 :     ) -> Result<TimelineLocateResponse, ApiError> {
     414            0 :         let timeline = self
     415            0 :             .persistence
     416            0 :             .get_timeline(tenant_id, timeline_id)
     417            0 :             .await?;
     418              : 
     419            0 :         let Some(timeline) = timeline else {
     420            0 :             return Err(ApiError::NotFound(
     421            0 :                 anyhow::anyhow!("Timeline {}/{} not found", tenant_id, timeline_id).into(),
     422            0 :             ));
     423              :         };
     424              : 
     425            0 :         Ok(TimelineLocateResponse {
     426            0 :             generation: SafekeeperGeneration::new(timeline.generation as u32),
     427            0 :             sk_set: timeline
     428            0 :                 .sk_set
     429            0 :                 .iter()
     430            0 :                 .map(|id| NodeId(*id as u64))
     431            0 :                 .collect(),
     432            0 :             new_sk_set: timeline
     433            0 :                 .new_sk_set
     434            0 :                 .map(|sk_set| sk_set.iter().map(|id| NodeId(*id as u64)).collect()),
     435            0 :         })
     436            0 :     }
     437              : 
     438              :     /// Perform timeline deletion on safekeepers. Will return success: we persist the deletion into the reconciler.
     439            0 :     pub(super) async fn tenant_timeline_delete_safekeepers(
     440            0 :         self: &Arc<Self>,
     441            0 :         tenant_id: TenantId,
     442            0 :         timeline_id: TimelineId,
     443            0 :     ) -> Result<(), ApiError> {
     444            0 :         let tl = self
     445            0 :             .persistence
     446            0 :             .get_timeline(tenant_id, timeline_id)
     447            0 :             .await?;
     448            0 :         let Some(tl) = tl else {
     449            0 :             tracing::info!(
     450            0 :                 "timeline {tenant_id}/{timeline_id} doesn't exist in timelines table, no deletions on safekeepers needed"
     451              :             );
     452            0 :             return Ok(());
     453              :         };
     454            0 :         self.persistence
     455            0 :             .timeline_set_deleted_at(tenant_id, timeline_id)
     456            0 :             .await?;
     457            0 :         let all_sks = tl
     458            0 :             .new_sk_set
     459            0 :             .iter()
     460            0 :             .flatten()
     461            0 :             .chain(tl.sk_set.iter())
     462            0 :             .collect::<HashSet<_>>();
     463            0 : 
     464            0 :         // The timeline has no safekeepers: we need to delete it from the db manually,
     465            0 :         // as no safekeeper reconciler will get to it
     466            0 :         if all_sks.is_empty() {
     467            0 :             if let Err(err) = self
     468            0 :                 .persistence
     469            0 :                 .delete_timeline(tenant_id, timeline_id)
     470            0 :                 .await
     471              :             {
     472            0 :                 tracing::warn!(%tenant_id, %timeline_id, "couldn't delete timeline from db: {err}");
     473            0 :             }
     474            0 :         }
     475              : 
     476              :         // Schedule reconciliations
     477            0 :         for &sk_id in all_sks.iter() {
     478            0 :             let pending_op = TimelinePendingOpPersistence {
     479            0 :                 tenant_id: tenant_id.to_string(),
     480            0 :                 timeline_id: timeline_id.to_string(),
     481            0 :                 generation: i32::MAX,
     482            0 :                 op_kind: SafekeeperTimelineOpKind::Delete,
     483            0 :                 sk_id: *sk_id,
     484            0 :             };
     485            0 :             tracing::info!("writing pending op for sk id {sk_id}");
     486            0 :             self.persistence.insert_pending_op(pending_op).await?;
     487              :         }
     488              :         {
     489            0 :             let locked = self.inner.read().unwrap();
     490            0 :             for sk_id in all_sks {
     491            0 :                 let sk_id = NodeId(*sk_id as u64);
     492            0 :                 let Some(sk) = locked.safekeepers.get(&sk_id) else {
     493            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
     494            0 :                         "Couldn't find safekeeper with id {sk_id}"
     495            0 :                     )));
     496              :                 };
     497              : 
     498            0 :                 let req = ScheduleRequest {
     499            0 :                     safekeeper: Box::new(sk.clone()),
     500            0 :                     // we don't use this for this kind, put a dummy value
     501            0 :                     host_list: Vec::new(),
     502            0 :                     tenant_id,
     503            0 :                     timeline_id: Some(timeline_id),
     504            0 :                     generation: tl.generation as u32,
     505            0 :                     kind: SafekeeperTimelineOpKind::Delete,
     506            0 :                 };
     507            0 :                 locked.safekeeper_reconcilers.schedule_request(req);
     508              :             }
     509              :         }
     510            0 :         Ok(())
     511            0 :     }
     512              : 
     513              :     /// Perform tenant deletion on safekeepers.
     514            0 :     pub(super) async fn tenant_delete_safekeepers(
     515            0 :         self: &Arc<Self>,
     516            0 :         tenant_id: TenantId,
     517            0 :     ) -> Result<(), ApiError> {
     518            0 :         let timeline_list = self
     519            0 :             .persistence
     520            0 :             .list_timelines_for_tenant(tenant_id)
     521            0 :             .await?;
     522              : 
     523            0 :         if timeline_list.is_empty() {
     524              :             // Early exit: the tenant is either empty or not migrated to the storcon yet
     525            0 :             tracing::info!("Skipping tenant delete as the timeline doesn't exist in db");
     526            0 :             return Ok(());
     527            0 :         }
     528              : 
     529            0 :         let timeline_list = timeline_list
     530            0 :             .into_iter()
     531            0 :             .map(|timeline| {
     532            0 :                 let timeline_id = TimelineId::from_str(&timeline.timeline_id)
     533            0 :                     .context("timeline id loaded from db")
     534            0 :                     .map_err(ApiError::InternalServerError)?;
     535            0 :                 Ok((timeline_id, timeline))
     536            0 :             })
     537            0 :             .collect::<Result<Vec<_>, ApiError>>()?;
     538              : 
     539              :         // Remove pending ops from db, and set `deleted_at`.
     540              :         // We cancel them in a later iteration once we hold the state lock.
     541            0 :         for (timeline_id, _timeline) in timeline_list.iter() {
     542            0 :             self.persistence
     543            0 :                 .remove_pending_ops_for_timeline(tenant_id, Some(*timeline_id))
     544            0 :                 .await?;
     545            0 :             self.persistence
     546            0 :                 .timeline_set_deleted_at(tenant_id, *timeline_id)
     547            0 :                 .await?;
     548              :         }
     549              : 
     550              :         // The list of safekeepers that have any of the timelines
     551            0 :         let mut sk_list = HashSet::new();
     552              : 
     553              :         // List all pending ops for all timelines, cancel them
     554            0 :         for (_timeline_id, timeline) in timeline_list.iter() {
     555            0 :             let sk_iter = timeline
     556            0 :                 .sk_set
     557            0 :                 .iter()
     558            0 :                 .chain(timeline.new_sk_set.iter().flatten())
     559            0 :                 .map(|id| NodeId(*id as u64));
     560            0 :             sk_list.extend(sk_iter);
     561            0 :         }
     562              : 
     563            0 :         for &sk_id in sk_list.iter() {
     564            0 :             let pending_op = TimelinePendingOpPersistence {
     565            0 :                 tenant_id: tenant_id.to_string(),
     566            0 :                 timeline_id: String::new(),
     567            0 :                 generation: i32::MAX,
     568            0 :                 op_kind: SafekeeperTimelineOpKind::Delete,
     569            0 :                 sk_id: sk_id.0 as i64,
     570            0 :             };
     571            0 :             tracing::info!("writing pending op for sk id {sk_id}");
     572            0 :             self.persistence.insert_pending_op(pending_op).await?;
     573              :         }
     574              : 
     575            0 :         let mut locked = self.inner.write().unwrap();
     576              : 
     577            0 :         for (timeline_id, _timeline) in timeline_list.iter() {
     578            0 :             for sk_id in sk_list.iter() {
     579            0 :                 locked
     580            0 :                     .safekeeper_reconcilers
     581            0 :                     .cancel_reconciles_for_timeline(*sk_id, tenant_id, Some(*timeline_id));
     582            0 :             }
     583              :         }
     584              : 
     585              :         // unwrap is safe: we return above for an empty timeline list
     586            0 :         let max_generation = timeline_list
     587            0 :             .iter()
     588            0 :             .map(|(_tl_id, tl)| tl.generation as u32)
     589            0 :             .max()
     590            0 :             .unwrap();
     591              : 
     592            0 :         for sk_id in sk_list {
     593            0 :             let Some(safekeeper) = locked.safekeepers.get(&sk_id) else {
     594            0 :                 tracing::warn!("Couldn't find safekeeper with id {sk_id}");
     595            0 :                 continue;
     596              :             };
     597              :             // Add pending op for tenant deletion
     598            0 :             let req = ScheduleRequest {
     599            0 :                 generation: max_generation,
     600            0 :                 host_list: Vec::new(),
     601            0 :                 kind: SafekeeperTimelineOpKind::Delete,
     602            0 :                 safekeeper: Box::new(safekeeper.clone()),
     603            0 :                 tenant_id,
     604            0 :                 timeline_id: None,
     605            0 :             };
     606            0 :             locked.safekeeper_reconcilers.schedule_request(req);
     607              :         }
     608            0 :         Ok(())
     609            0 :     }
     610              : 
     611              :     /// Choose safekeepers for the new timeline: 3 in different azs.
     612            0 :     pub(crate) async fn safekeepers_for_new_timeline(
     613            0 :         &self,
     614            0 :     ) -> Result<Vec<SafekeeperInfo>, ApiError> {
     615            0 :         let mut all_safekeepers = {
     616            0 :             let locked = self.inner.read().unwrap();
     617            0 :             locked
     618            0 :                 .safekeepers
     619            0 :                 .iter()
     620            0 :                 .filter_map(|sk| {
     621            0 :                     if sk.1.scheduling_policy() != SkSchedulingPolicy::Active {
     622              :                         // If we don't want to schedule stuff onto the safekeeper, respect that.
     623            0 :                         return None;
     624            0 :                     }
     625            0 :                     let utilization_opt = if let SafekeeperState::Available {
     626              :                         last_seen_at: _,
     627            0 :                         utilization,
     628            0 :                     } = sk.1.availability()
     629              :                     {
     630            0 :                         Some(utilization)
     631              :                     } else {
     632              :                         // non-available safekeepers still get a chance for new timelines,
     633              :                         // but put them last in the list.
     634            0 :                         None
     635              :                     };
     636            0 :                     let info = SafekeeperInfo {
     637            0 :                         hostname: sk.1.skp.host.clone(),
     638            0 :                         id: NodeId(sk.1.skp.id as u64),
     639            0 :                     };
     640            0 :                     Some((utilization_opt, info, sk.1.skp.availability_zone_id.clone()))
     641            0 :                 })
     642            0 :                 .collect::<Vec<_>>()
     643            0 :         };
     644            0 :         all_safekeepers.sort_by_key(|sk| {
     645            0 :             (
     646            0 :                 sk.0.as_ref()
     647            0 :                     .map(|ut| ut.timeline_count)
     648            0 :                     .unwrap_or(u64::MAX),
     649            0 :                 // Use the id to decide on equal scores for reliability
     650            0 :                 sk.1.id.0,
     651            0 :             )
     652            0 :         });
     653              :         // Number of safekeepers in different AZs we are looking for
     654            0 :         let wanted_count = match all_safekeepers.len() {
     655              :             0 => {
     656            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
     657            0 :                     "couldn't find any active safekeeper for new timeline",
     658            0 :                 )));
     659              :             }
     660              :             // Have laxer requirements on testig mode as we don't want to
     661              :             // spin up three safekeepers for every single test
     662              :             #[cfg(feature = "testing")]
     663            0 :             1 | 2 => all_safekeepers.len(),
     664            0 :             _ => 3,
     665              :         };
     666            0 :         let mut sks = Vec::new();
     667            0 :         let mut azs = HashSet::new();
     668            0 :         for (_sk_util, sk_info, az_id) in all_safekeepers.iter() {
     669            0 :             if !azs.insert(az_id) {
     670            0 :                 continue;
     671            0 :             }
     672            0 :             sks.push(sk_info.clone());
     673            0 :             if sks.len() == wanted_count {
     674            0 :                 break;
     675            0 :             }
     676              :         }
     677            0 :         if sks.len() == wanted_count {
     678            0 :             Ok(sks)
     679              :         } else {
     680            0 :             Err(ApiError::InternalServerError(anyhow::anyhow!(
     681            0 :                 "couldn't find {wanted_count} safekeepers in different AZs for new timeline (found: {}, total active: {})",
     682            0 :                 sks.len(),
     683            0 :                 all_safekeepers.len(),
     684            0 :             )))
     685              :         }
     686            0 :     }
     687              : 
     688            0 :     pub(crate) async fn safekeepers_list(
     689            0 :         &self,
     690            0 :     ) -> Result<Vec<SafekeeperDescribeResponse>, DatabaseError> {
     691            0 :         let locked = self.inner.read().unwrap();
     692            0 :         let mut list = locked
     693            0 :             .safekeepers
     694            0 :             .iter()
     695            0 :             .map(|sk| sk.1.describe_response())
     696            0 :             .collect::<Result<Vec<_>, _>>()?;
     697            0 :         list.sort_by_key(|v| v.id);
     698            0 :         Ok(list)
     699            0 :     }
     700              : 
     701            0 :     pub(crate) async fn get_safekeeper(
     702            0 :         &self,
     703            0 :         id: i64,
     704            0 :     ) -> Result<SafekeeperDescribeResponse, DatabaseError> {
     705            0 :         let locked = self.inner.read().unwrap();
     706            0 :         let sk = locked
     707            0 :             .safekeepers
     708            0 :             .get(&NodeId(id as u64))
     709            0 :             .ok_or(diesel::result::Error::NotFound)?;
     710            0 :         sk.describe_response()
     711            0 :     }
     712              : 
     713            0 :     pub(crate) async fn upsert_safekeeper(
     714            0 :         self: &Arc<Service>,
     715            0 :         record: crate::persistence::SafekeeperUpsert,
     716            0 :     ) -> Result<(), ApiError> {
     717            0 :         let node_id = NodeId(record.id as u64);
     718            0 :         let use_https = self.config.use_https_safekeeper_api;
     719            0 : 
     720            0 :         if use_https && record.https_port.is_none() {
     721            0 :             return Err(ApiError::PreconditionFailed(
     722            0 :                 format!(
     723            0 :                     "cannot upsert safekeeper {node_id}: \
     724            0 :                     https is enabled, but https port is not specified"
     725            0 :                 )
     726            0 :                 .into(),
     727            0 :             ));
     728            0 :         }
     729            0 : 
     730            0 :         self.persistence.safekeeper_upsert(record.clone()).await?;
     731              :         {
     732            0 :             let mut locked = self.inner.write().unwrap();
     733            0 :             let mut safekeepers = (*locked.safekeepers).clone();
     734            0 :             match safekeepers.entry(node_id) {
     735            0 :                 std::collections::hash_map::Entry::Occupied(mut entry) => entry
     736            0 :                     .get_mut()
     737            0 :                     .update_from_record(record)
     738            0 :                     .expect("all preconditions should be checked before upsert to database"),
     739            0 :                 std::collections::hash_map::Entry::Vacant(entry) => {
     740            0 :                     entry.insert(
     741            0 :                         Safekeeper::from_persistence(
     742            0 :                             crate::persistence::SafekeeperPersistence::from_upsert(
     743            0 :                                 record,
     744            0 :                                 SkSchedulingPolicy::Pause,
     745            0 :                             ),
     746            0 :                             CancellationToken::new(),
     747            0 :                             use_https,
     748            0 :                         )
     749            0 :                         .expect("all preconditions should be checked before upsert to database"),
     750            0 :                     );
     751            0 :                 }
     752              :             }
     753            0 :             locked
     754            0 :                 .safekeeper_reconcilers
     755            0 :                 .start_reconciler(node_id, self);
     756            0 :             locked.safekeepers = Arc::new(safekeepers);
     757            0 :             metrics::METRICS_REGISTRY
     758            0 :                 .metrics_group
     759            0 :                 .storage_controller_safekeeper_nodes
     760            0 :                 .set(locked.safekeepers.len() as i64);
     761            0 :             metrics::METRICS_REGISTRY
     762            0 :                 .metrics_group
     763            0 :                 .storage_controller_https_safekeeper_nodes
     764            0 :                 .set(
     765            0 :                     locked
     766            0 :                         .safekeepers
     767            0 :                         .values()
     768            0 :                         .filter(|s| s.has_https_port())
     769            0 :                         .count() as i64,
     770            0 :                 );
     771            0 :         }
     772            0 :         Ok(())
     773            0 :     }
     774              : 
     775            0 :     pub(crate) async fn set_safekeeper_scheduling_policy(
     776            0 :         self: &Arc<Service>,
     777            0 :         id: i64,
     778            0 :         scheduling_policy: SkSchedulingPolicy,
     779            0 :     ) -> Result<(), DatabaseError> {
     780            0 :         self.persistence
     781            0 :             .set_safekeeper_scheduling_policy(id, scheduling_policy)
     782            0 :             .await?;
     783            0 :         let node_id = NodeId(id as u64);
     784            0 :         // After the change has been persisted successfully, update the in-memory state
     785            0 :         {
     786            0 :             let mut locked = self.inner.write().unwrap();
     787            0 :             let mut safekeepers = (*locked.safekeepers).clone();
     788            0 :             let sk = safekeepers
     789            0 :                 .get_mut(&node_id)
     790            0 :                 .ok_or(DatabaseError::Logical("Not found".to_string()))?;
     791            0 :             sk.set_scheduling_policy(scheduling_policy);
     792            0 : 
     793            0 :             match scheduling_policy {
     794            0 :                 SkSchedulingPolicy::Active => {
     795            0 :                     locked
     796            0 :                         .safekeeper_reconcilers
     797            0 :                         .start_reconciler(node_id, self);
     798            0 :                 }
     799            0 :                 SkSchedulingPolicy::Decomissioned | SkSchedulingPolicy::Pause => {
     800            0 :                     locked.safekeeper_reconcilers.stop_reconciler(node_id);
     801            0 :                 }
     802              :             }
     803              : 
     804            0 :             locked.safekeepers = Arc::new(safekeepers);
     805            0 :         }
     806            0 :         Ok(())
     807            0 :     }
     808              : }
        

Generated by: LCOV version 2.1-beta