LCOV - code coverage report
Current view: top level - storage_controller/src - timeline_import.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 144 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 19 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : use std::{collections::HashMap, str::FromStr};
       3              : 
       4              : use http_utils::error::ApiError;
       5              : use reqwest::Method;
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
       9              : use tokio_util::sync::CancellationToken;
      10              : use utils::sync::gate::Gate;
      11              : use utils::{
      12              :     id::{TenantId, TimelineId},
      13              :     shard::ShardIndex,
      14              : };
      15              : 
      16              : use crate::{persistence::TimelineImportPersistence, service::Config};
      17              : 
      18            0 : #[derive(Deserialize, Serialize, PartialEq, Eq)]
      19              : pub(crate) enum TimelineImportState {
      20              :     Importing,
      21              :     Idle,
      22              : }
      23              : 
      24            0 : #[derive(Serialize, Deserialize, Clone, Debug)]
      25              : pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
      26              : 
      27              : impl ShardImportStatuses {
      28            0 :     pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
      29              :         ShardImportStatuses(
      30            0 :             shards
      31            0 :                 .into_iter()
      32            0 :                 .map(|ts_id| {
      33            0 :                     (
      34            0 :                         ts_id,
      35            0 :                         ShardImportStatus::InProgress(None::<ShardImportProgress>),
      36            0 :                     )
      37            0 :                 })
      38            0 :                 .collect(),
      39              :         )
      40            0 :     }
      41              : }
      42              : 
      43              : #[derive(Debug)]
      44              : pub(crate) struct TimelineImport {
      45              :     pub(crate) tenant_id: TenantId,
      46              :     pub(crate) timeline_id: TimelineId,
      47              :     pub(crate) shard_statuses: ShardImportStatuses,
      48              : }
      49              : 
      50              : pub(crate) enum TimelineImportUpdateFollowUp {
      51              :     Persist,
      52              :     None,
      53              : }
      54              : 
      55              : #[derive(thiserror::Error, Debug)]
      56              : pub(crate) enum TimelineImportFinalizeError {
      57              :     #[error("Shut down interrupted import finalize")]
      58              :     ShuttingDown,
      59              :     #[error("Import finalization was cancelled")]
      60              :     Cancelled,
      61              :     #[error("Mismatched shard detected during import finalize: {0}")]
      62              :     MismatchedShards(ShardIndex),
      63              : }
      64              : 
      65              : pub(crate) enum TimelineImportUpdateError {
      66              :     ImportNotFound {
      67              :         tenant_id: TenantId,
      68              :         timeline_id: TimelineId,
      69              :     },
      70              :     MismatchedShards,
      71              :     UnexpectedUpdate,
      72              : }
      73              : 
      74              : impl From<TimelineImportUpdateError> for ApiError {
      75            0 :     fn from(err: TimelineImportUpdateError) -> ApiError {
      76            0 :         match err {
      77              :             TimelineImportUpdateError::ImportNotFound {
      78            0 :                 tenant_id,
      79            0 :                 timeline_id,
      80            0 :             } => ApiError::NotFound(
      81            0 :                 anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
      82            0 :             ),
      83              :             TimelineImportUpdateError::MismatchedShards => {
      84            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
      85            0 :                     "Import shards do not match update request, likely a shard split happened during import, this is a bug"
      86            0 :                 ))
      87              :             }
      88              :             TimelineImportUpdateError::UnexpectedUpdate => {
      89            0 :                 ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
      90              :             }
      91              :         }
      92            0 :     }
      93              : }
      94              : 
      95              : impl TimelineImport {
      96            0 :     pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
      97            0 :         let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
      98            0 :         let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
      99            0 :         let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
     100              : 
     101            0 :         Ok(TimelineImport {
     102            0 :             tenant_id,
     103            0 :             timeline_id,
     104            0 :             shard_statuses,
     105            0 :         })
     106            0 :     }
     107              : 
     108            0 :     pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
     109            0 :         TimelineImportPersistence {
     110            0 :             tenant_id: self.tenant_id.to_string(),
     111            0 :             timeline_id: self.timeline_id.to_string(),
     112            0 :             shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
     113            0 :         }
     114            0 :     }
     115              : 
     116            0 :     pub(crate) fn update(
     117            0 :         &mut self,
     118            0 :         shard: ShardIndex,
     119            0 :         status: ShardImportStatus,
     120            0 :     ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
     121              :         use std::collections::hash_map::Entry::*;
     122              : 
     123            0 :         match self.shard_statuses.0.entry(shard) {
     124            0 :             Occupied(mut occ) => {
     125            0 :                 let crnt = occ.get_mut();
     126            0 :                 if *crnt == status {
     127            0 :                     Ok(TimelineImportUpdateFollowUp::None)
     128            0 :                 } else if crnt.is_terminal() && *crnt != status {
     129            0 :                     Err(TimelineImportUpdateError::UnexpectedUpdate)
     130              :                 } else {
     131            0 :                     *crnt = status;
     132            0 :                     Ok(TimelineImportUpdateFollowUp::Persist)
     133              :                 }
     134              :             }
     135            0 :             Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
     136              :         }
     137            0 :     }
     138              : 
     139            0 :     pub(crate) fn is_complete(&self) -> bool {
     140            0 :         self.shard_statuses
     141            0 :             .0
     142            0 :             .values()
     143            0 :             .all(|status| status.is_terminal())
     144            0 :     }
     145              : 
     146            0 :     pub(crate) fn completion_error(&self) -> Option<String> {
     147            0 :         assert!(self.is_complete());
     148              : 
     149            0 :         let shard_errors: HashMap<_, _> = self
     150            0 :             .shard_statuses
     151            0 :             .0
     152            0 :             .iter()
     153            0 :             .filter_map(|(shard, status)| {
     154            0 :                 if let ShardImportStatus::Error(err) = status {
     155            0 :                     Some((*shard, err.clone()))
     156              :                 } else {
     157            0 :                     None
     158              :                 }
     159            0 :             })
     160            0 :             .collect();
     161              : 
     162            0 :         if shard_errors.is_empty() {
     163            0 :             None
     164              :         } else {
     165            0 :             Some(serde_json::to_string(&shard_errors).unwrap())
     166              :         }
     167            0 :     }
     168              : }
     169              : 
     170              : pub(crate) struct FinalizingImport {
     171              :     pub(crate) gate: Gate,
     172              :     pub(crate) cancel: CancellationToken,
     173              : }
     174              : 
     175              : pub(crate) type ImportResult = Result<(), String>;
     176              : 
     177              : pub(crate) struct UpcallClient {
     178              :     authorization_header: Option<String>,
     179              :     client: reqwest::Client,
     180              :     cancel: CancellationToken,
     181              :     base_url: String,
     182              : }
     183              : 
     184              : const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
     185              : 
     186            0 : #[derive(Serialize, Deserialize, Debug)]
     187              : struct ImportCompleteRequest {
     188              :     tenant_id: TenantId,
     189              :     timeline_id: TimelineId,
     190              :     error: Option<String>,
     191              : }
     192              : 
     193              : impl UpcallClient {
     194            0 :     pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
     195            0 :         let authorization_header = config
     196            0 :             .control_plane_jwt_token
     197            0 :             .clone()
     198            0 :             .map(|jwt| format!("Bearer {jwt}"));
     199              : 
     200            0 :         let client = reqwest::ClientBuilder::new()
     201            0 :             .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
     202            0 :             .build()
     203            0 :             .expect("Failed to construct HTTP client");
     204              : 
     205            0 :         let base_url = config
     206            0 :             .control_plane_url
     207            0 :             .clone()
     208            0 :             .expect("must be configured");
     209              : 
     210            0 :         Self {
     211            0 :             authorization_header,
     212            0 :             client,
     213            0 :             cancel,
     214            0 :             base_url,
     215            0 :         }
     216            0 :     }
     217              : 
     218              :     /// Notify control plane of a completed import
     219              :     ///
     220              :     /// This method guarantees at least once delivery semantics assuming
     221              :     /// eventual cplane availability. The cplane API is idempotent.
     222            0 :     pub(crate) async fn notify_import_complete(
     223            0 :         &self,
     224            0 :         tenant_id: TenantId,
     225            0 :         timeline_id: TimelineId,
     226            0 :         import_result: ImportResult,
     227            0 :     ) -> anyhow::Result<()> {
     228            0 :         let endpoint = if self.base_url.ends_with('/') {
     229            0 :             format!("{}import_complete", self.base_url)
     230              :         } else {
     231            0 :             format!("{}/import_complete", self.base_url)
     232              :         };
     233              : 
     234            0 :         let request = self
     235            0 :             .client
     236            0 :             .request(Method::PUT, endpoint)
     237            0 :             .json(&ImportCompleteRequest {
     238            0 :                 tenant_id,
     239            0 :                 timeline_id,
     240            0 :                 error: import_result.err(),
     241            0 :             })
     242            0 :             .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
     243              : 
     244            0 :         let request = if let Some(auth) = &self.authorization_header {
     245            0 :             request.header(reqwest::header::AUTHORIZATION, auth)
     246              :         } else {
     247            0 :             request
     248              :         };
     249              : 
     250              :         const RETRY_DELAY: Duration = Duration::from_secs(1);
     251            0 :         let mut attempt = 1;
     252              : 
     253              :         loop {
     254            0 :             if self.cancel.is_cancelled() {
     255            0 :                 return Err(anyhow::anyhow!(
     256            0 :                     "Shutting down while notifying cplane of import completion"
     257            0 :                 ));
     258            0 :             }
     259              : 
     260            0 :             match request.try_clone().unwrap().send().await {
     261            0 :                 Ok(response) if response.status().is_success() => {
     262            0 :                     return Ok(());
     263              :                 }
     264            0 :                 Ok(response) => {
     265            0 :                     tracing::warn!(
     266            0 :                         "Import complete notification failed with status {}, attempt {}",
     267            0 :                         response.status(),
     268              :                         attempt
     269              :                     );
     270              :                 }
     271            0 :                 Err(e) => {
     272            0 :                     tracing::warn!(
     273            0 :                         "Import complete notification failed with error: {}, attempt {}",
     274              :                         e,
     275              :                         attempt
     276              :                     );
     277              :                 }
     278              :             }
     279              : 
     280            0 :             tokio::select! {
     281            0 :                 _ = tokio::time::sleep(RETRY_DELAY) => {}
     282            0 :                 _ = self.cancel.cancelled() => {
     283            0 :                     return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
     284              :                 }
     285              :             }
     286            0 :             attempt += 1;
     287              :         }
     288            0 :     }
     289              : }
        

Generated by: LCOV version 2.1-beta