LCOV - code coverage report
Current view: top level - storage_controller/src - timeline_import.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 0.0 % 151 0
Test Date: 2025-05-26 10:37:33 Functions: 0.0 % 26 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : use std::{collections::HashMap, str::FromStr};
       3              : 
       4              : use http_utils::error::ApiError;
       5              : use reqwest::Method;
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
       9              : use tokio_util::sync::CancellationToken;
      10              : use utils::{
      11              :     id::{TenantId, TimelineId},
      12              :     shard::ShardIndex,
      13              : };
      14              : 
      15              : use crate::{persistence::TimelineImportPersistence, service::Config};
      16              : 
      17            0 : #[derive(Deserialize, Serialize, PartialEq, Eq)]
      18              : pub(crate) enum TimelineImportState {
      19              :     Importing,
      20              :     Idle,
      21              : }
      22              : 
      23            0 : #[derive(Serialize, Deserialize, Clone, Debug)]
      24              : pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
      25              : 
      26              : impl ShardImportStatuses {
      27            0 :     pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
      28            0 :         ShardImportStatuses(
      29            0 :             shards
      30            0 :                 .into_iter()
      31            0 :                 .map(|ts_id| {
      32            0 :                     (
      33            0 :                         ts_id,
      34            0 :                         ShardImportStatus::InProgress(None::<ShardImportProgress>),
      35            0 :                     )
      36            0 :                 })
      37            0 :                 .collect(),
      38            0 :         )
      39            0 :     }
      40              : }
      41              : 
      42              : #[derive(Debug)]
      43              : pub(crate) struct TimelineImport {
      44              :     pub(crate) tenant_id: TenantId,
      45              :     pub(crate) timeline_id: TimelineId,
      46              :     pub(crate) shard_statuses: ShardImportStatuses,
      47              : }
      48              : 
      49              : pub(crate) enum TimelineImportUpdateFollowUp {
      50              :     Persist,
      51              :     None,
      52              : }
      53              : 
      54              : #[derive(thiserror::Error, Debug)]
      55              : pub(crate) enum TimelineImportFinalizeError {
      56              :     #[error("Shut down interrupted import finalize")]
      57              :     ShuttingDown,
      58              :     #[error("Mismatched shard detected during import finalize: {0}")]
      59              :     MismatchedShards(ShardIndex),
      60              : }
      61              : 
      62              : pub(crate) enum TimelineImportUpdateError {
      63              :     ImportNotFound {
      64              :         tenant_id: TenantId,
      65              :         timeline_id: TimelineId,
      66              :     },
      67              :     MismatchedShards,
      68              :     UnexpectedUpdate,
      69              : }
      70              : 
      71              : impl From<TimelineImportUpdateError> for ApiError {
      72            0 :     fn from(err: TimelineImportUpdateError) -> ApiError {
      73            0 :         match err {
      74              :             TimelineImportUpdateError::ImportNotFound {
      75            0 :                 tenant_id,
      76            0 :                 timeline_id,
      77            0 :             } => ApiError::NotFound(
      78            0 :                 anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
      79            0 :             ),
      80              :             TimelineImportUpdateError::MismatchedShards => {
      81            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
      82            0 :                     "Import shards do not match update request, likely a shard split happened during import, this is a bug"
      83            0 :                 ))
      84              :             }
      85              :             TimelineImportUpdateError::UnexpectedUpdate => {
      86            0 :                 ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
      87              :             }
      88              :         }
      89            0 :     }
      90              : }
      91              : 
      92              : impl TimelineImport {
      93            0 :     pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
      94            0 :         let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
      95            0 :         let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
      96            0 :         let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
      97              : 
      98            0 :         Ok(TimelineImport {
      99            0 :             tenant_id,
     100            0 :             timeline_id,
     101            0 :             shard_statuses,
     102            0 :         })
     103            0 :     }
     104              : 
     105            0 :     pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
     106            0 :         TimelineImportPersistence {
     107            0 :             tenant_id: self.tenant_id.to_string(),
     108            0 :             timeline_id: self.timeline_id.to_string(),
     109            0 :             shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
     110            0 :         }
     111            0 :     }
     112              : 
     113            0 :     pub(crate) fn update(
     114            0 :         &mut self,
     115            0 :         shard: ShardIndex,
     116            0 :         status: ShardImportStatus,
     117            0 :     ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
     118              :         use std::collections::hash_map::Entry::*;
     119              : 
     120            0 :         match self.shard_statuses.0.entry(shard) {
     121            0 :             Occupied(mut occ) => {
     122            0 :                 let crnt = occ.get_mut();
     123            0 :                 if *crnt == status {
     124            0 :                     Ok(TimelineImportUpdateFollowUp::None)
     125            0 :                 } else if crnt.is_terminal() && *crnt != status {
     126            0 :                     Err(TimelineImportUpdateError::UnexpectedUpdate)
     127              :                 } else {
     128            0 :                     *crnt = status;
     129            0 :                     Ok(TimelineImportUpdateFollowUp::Persist)
     130              :                 }
     131              :             }
     132            0 :             Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
     133              :         }
     134            0 :     }
     135              : 
     136            0 :     pub(crate) fn is_complete(&self) -> bool {
     137            0 :         self.shard_statuses
     138            0 :             .0
     139            0 :             .values()
     140            0 :             .all(|status| status.is_terminal())
     141            0 :     }
     142              : 
     143            0 :     pub(crate) fn completion_error(&self) -> Option<String> {
     144            0 :         assert!(self.is_complete());
     145              : 
     146            0 :         let shard_errors: HashMap<_, _> = self
     147            0 :             .shard_statuses
     148            0 :             .0
     149            0 :             .iter()
     150            0 :             .filter_map(|(shard, status)| {
     151            0 :                 if let ShardImportStatus::Error(err) = status {
     152            0 :                     Some((*shard, err.clone()))
     153              :                 } else {
     154            0 :                     None
     155              :                 }
     156            0 :             })
     157            0 :             .collect();
     158            0 : 
     159            0 :         if shard_errors.is_empty() {
     160            0 :             None
     161              :         } else {
     162            0 :             Some(serde_json::to_string(&shard_errors).unwrap())
     163              :         }
     164            0 :     }
     165              : }
     166              : 
     167              : pub(crate) type ImportResult = Result<(), String>;
     168              : 
     169              : pub(crate) struct UpcallClient {
     170              :     authorization_header: Option<String>,
     171              :     client: reqwest::Client,
     172              :     cancel: CancellationToken,
     173              :     base_url: String,
     174              : }
     175              : 
     176              : const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
     177              : 
     178            0 : #[derive(Serialize, Deserialize, Debug)]
     179              : struct ImportCompleteRequest {
     180              :     tenant_id: TenantId,
     181              :     timeline_id: TimelineId,
     182              :     error: Option<String>,
     183              : }
     184              : 
     185              : impl UpcallClient {
     186            0 :     pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
     187            0 :         let authorization_header = config
     188            0 :             .control_plane_jwt_token
     189            0 :             .clone()
     190            0 :             .map(|jwt| format!("Bearer {}", jwt));
     191            0 : 
     192            0 :         let client = reqwest::ClientBuilder::new()
     193            0 :             .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
     194            0 :             .build()
     195            0 :             .expect("Failed to construct HTTP client");
     196            0 : 
     197            0 :         let base_url = config
     198            0 :             .control_plane_url
     199            0 :             .clone()
     200            0 :             .expect("must be configured");
     201            0 : 
     202            0 :         Self {
     203            0 :             authorization_header,
     204            0 :             client,
     205            0 :             cancel,
     206            0 :             base_url,
     207            0 :         }
     208            0 :     }
     209              : 
     210              :     /// Notify control plane of a completed import
     211              :     ///
     212              :     /// This method guarantees at least once delivery semantics assuming
     213              :     /// eventual cplane availability. The cplane API is idempotent.
     214            0 :     pub(crate) async fn notify_import_complete(
     215            0 :         &self,
     216            0 :         tenant_id: TenantId,
     217            0 :         timeline_id: TimelineId,
     218            0 :         import_result: ImportResult,
     219            0 :     ) -> anyhow::Result<()> {
     220            0 :         let endpoint = if self.base_url.ends_with('/') {
     221            0 :             format!("{}import_complete", self.base_url)
     222              :         } else {
     223            0 :             format!("{}/import_complete", self.base_url)
     224              :         };
     225              : 
     226            0 :         let request = self
     227            0 :             .client
     228            0 :             .request(Method::PUT, endpoint)
     229            0 :             .json(&ImportCompleteRequest {
     230            0 :                 tenant_id,
     231            0 :                 timeline_id,
     232            0 :                 error: import_result.err(),
     233            0 :             })
     234            0 :             .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
     235              : 
     236            0 :         let request = if let Some(auth) = &self.authorization_header {
     237            0 :             request.header(reqwest::header::AUTHORIZATION, auth)
     238              :         } else {
     239            0 :             request
     240              :         };
     241              : 
     242              :         const RETRY_DELAY: Duration = Duration::from_secs(1);
     243            0 :         let mut attempt = 1;
     244              : 
     245              :         loop {
     246            0 :             if self.cancel.is_cancelled() {
     247            0 :                 return Err(anyhow::anyhow!(
     248            0 :                     "Shutting down while notifying cplane of import completion"
     249            0 :                 ));
     250            0 :             }
     251            0 : 
     252            0 :             match request.try_clone().unwrap().send().await {
     253            0 :                 Ok(response) if response.status().is_success() => {
     254            0 :                     return Ok(());
     255              :                 }
     256            0 :                 Ok(response) => {
     257            0 :                     tracing::warn!(
     258            0 :                         "Import complete notification failed with status {}, attempt {}",
     259            0 :                         response.status(),
     260              :                         attempt
     261              :                     );
     262              :                 }
     263            0 :                 Err(e) => {
     264            0 :                     tracing::warn!(
     265            0 :                         "Import complete notification failed with error: {}, attempt {}",
     266              :                         e,
     267              :                         attempt
     268              :                     );
     269              :                 }
     270              :             }
     271              : 
     272            0 :             tokio::select! {
     273            0 :                 _ = tokio::time::sleep(RETRY_DELAY) => {}
     274            0 :                 _ = self.cancel.cancelled() => {
     275            0 :                     return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
     276              :                 }
     277              :             }
     278            0 :             attempt += 1;
     279              :         }
     280            0 :     }
     281              : }
        

Generated by: LCOV version 2.1-beta