LCOV - code coverage report
Current view: top level - storage_controller/src - timeline_import.rs (source / functions) Coverage Total Hit
Test: 2620485e474b48c32427149a5d91ef8fc2cd649e.info Lines: 0.0 % 145 0
Test Date: 2025-05-01 22:50:11 Functions: 0.0 % 26 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : use std::{collections::HashMap, str::FromStr};
       3              : 
       4              : use http_utils::error::ApiError;
       5              : use reqwest::Method;
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use pageserver_api::models::ShardImportStatus;
       9              : use tokio_util::sync::CancellationToken;
      10              : use utils::{
      11              :     id::{TenantId, TimelineId},
      12              :     shard::ShardIndex,
      13              : };
      14              : 
      15              : use crate::{persistence::TimelineImportPersistence, service::Config};
      16              : 
      17            0 : #[derive(Deserialize, Serialize, PartialEq, Eq)]
      18              : pub(crate) enum TimelineImportState {
      19              :     Importing,
      20              :     Idle,
      21              : }
      22              : 
      23            0 : #[derive(Serialize, Deserialize, Clone, Debug)]
      24              : pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
      25              : 
      26              : impl ShardImportStatuses {
      27            0 :     pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
      28            0 :         ShardImportStatuses(
      29            0 :             shards
      30            0 :                 .into_iter()
      31            0 :                 .map(|ts_id| (ts_id, ShardImportStatus::InProgress))
      32            0 :                 .collect(),
      33            0 :         )
      34            0 :     }
      35              : }
      36              : 
      37              : #[derive(Debug)]
      38              : pub(crate) struct TimelineImport {
      39              :     pub(crate) tenant_id: TenantId,
      40              :     pub(crate) timeline_id: TimelineId,
      41              :     pub(crate) shard_statuses: ShardImportStatuses,
      42              : }
      43              : 
      44              : pub(crate) enum TimelineImportUpdateFollowUp {
      45              :     Persist,
      46              :     None,
      47              : }
      48              : 
      49              : pub(crate) enum TimelineImportUpdateError {
      50              :     ImportNotFound {
      51              :         tenant_id: TenantId,
      52              :         timeline_id: TimelineId,
      53              :     },
      54              :     MismatchedShards,
      55              :     UnexpectedUpdate,
      56              : }
      57              : 
      58              : impl From<TimelineImportUpdateError> for ApiError {
      59            0 :     fn from(err: TimelineImportUpdateError) -> ApiError {
      60            0 :         match err {
      61              :             TimelineImportUpdateError::ImportNotFound {
      62            0 :                 tenant_id,
      63            0 :                 timeline_id,
      64            0 :             } => ApiError::NotFound(
      65            0 :                 anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
      66            0 :             ),
      67              :             TimelineImportUpdateError::MismatchedShards => {
      68            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
      69            0 :                     "Import shards do not match update request, likely a shard split happened during import, this is a bug"
      70            0 :                 ))
      71              :             }
      72              :             TimelineImportUpdateError::UnexpectedUpdate => {
      73            0 :                 ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
      74              :             }
      75              :         }
      76            0 :     }
      77              : }
      78              : 
      79              : impl TimelineImport {
      80            0 :     pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
      81            0 :         let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
      82            0 :         let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
      83            0 :         let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
      84              : 
      85            0 :         Ok(TimelineImport {
      86            0 :             tenant_id,
      87            0 :             timeline_id,
      88            0 :             shard_statuses,
      89            0 :         })
      90            0 :     }
      91              : 
      92            0 :     pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
      93            0 :         TimelineImportPersistence {
      94            0 :             tenant_id: self.tenant_id.to_string(),
      95            0 :             timeline_id: self.timeline_id.to_string(),
      96            0 :             shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
      97            0 :         }
      98            0 :     }
      99              : 
     100            0 :     pub(crate) fn update(
     101            0 :         &mut self,
     102            0 :         shard: ShardIndex,
     103            0 :         status: ShardImportStatus,
     104            0 :     ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
     105              :         use std::collections::hash_map::Entry::*;
     106              : 
     107            0 :         match self.shard_statuses.0.entry(shard) {
     108            0 :             Occupied(mut occ) => {
     109            0 :                 let crnt = occ.get_mut();
     110            0 :                 if *crnt == status {
     111            0 :                     Ok(TimelineImportUpdateFollowUp::None)
     112            0 :                 } else if crnt.is_terminal() && *crnt != status {
     113            0 :                     Err(TimelineImportUpdateError::UnexpectedUpdate)
     114              :                 } else {
     115            0 :                     *crnt = status;
     116            0 :                     Ok(TimelineImportUpdateFollowUp::Persist)
     117              :                 }
     118              :             }
     119            0 :             Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
     120              :         }
     121            0 :     }
     122              : 
     123            0 :     pub(crate) fn is_complete(&self) -> bool {
     124            0 :         self.shard_statuses
     125            0 :             .0
     126            0 :             .values()
     127            0 :             .all(|status| status.is_terminal())
     128            0 :     }
     129              : 
     130            0 :     pub(crate) fn completion_error(&self) -> Option<String> {
     131            0 :         assert!(self.is_complete());
     132              : 
     133            0 :         let shard_errors: HashMap<_, _> = self
     134            0 :             .shard_statuses
     135            0 :             .0
     136            0 :             .iter()
     137            0 :             .filter_map(|(shard, status)| {
     138            0 :                 if let ShardImportStatus::Error(err) = status {
     139            0 :                     Some((*shard, err.clone()))
     140              :                 } else {
     141            0 :                     None
     142              :                 }
     143            0 :             })
     144            0 :             .collect();
     145            0 : 
     146            0 :         if shard_errors.is_empty() {
     147            0 :             None
     148              :         } else {
     149            0 :             Some(serde_json::to_string(&shard_errors).unwrap())
     150              :         }
     151            0 :     }
     152              : }
     153              : 
     154              : pub(crate) struct UpcallClient {
     155              :     authorization_header: Option<String>,
     156              :     client: reqwest::Client,
     157              :     cancel: CancellationToken,
     158              :     base_url: String,
     159              : }
     160              : 
     161              : const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
     162              : 
     163            0 : #[derive(Serialize, Deserialize, Debug)]
     164              : struct ImportCompleteRequest {
     165              :     tenant_id: TenantId,
     166              :     timeline_id: TimelineId,
     167              :     error: Option<String>,
     168              : }
     169              : 
     170              : impl UpcallClient {
     171            0 :     pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
     172            0 :         let authorization_header = config
     173            0 :             .control_plane_jwt_token
     174            0 :             .clone()
     175            0 :             .map(|jwt| format!("Bearer {}", jwt));
     176            0 : 
     177            0 :         let client = reqwest::ClientBuilder::new()
     178            0 :             .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
     179            0 :             .build()
     180            0 :             .expect("Failed to construct HTTP client");
     181            0 : 
     182            0 :         let base_url = config
     183            0 :             .control_plane_url
     184            0 :             .clone()
     185            0 :             .expect("must be configured");
     186            0 : 
     187            0 :         Self {
     188            0 :             authorization_header,
     189            0 :             client,
     190            0 :             cancel,
     191            0 :             base_url,
     192            0 :         }
     193            0 :     }
     194              : 
     195              :     /// Notify control plane of a completed import
     196              :     ///
     197              :     /// This method guarantees at least once delivery semantics assuming
     198              :     /// eventual cplane availability. The cplane API is idempotent.
     199            0 :     pub(crate) async fn notify_import_complete(
     200            0 :         &self,
     201            0 :         import: &TimelineImport,
     202            0 :     ) -> anyhow::Result<()> {
     203            0 :         let endpoint = if self.base_url.ends_with('/') {
     204            0 :             format!("{}import_complete", self.base_url)
     205              :         } else {
     206            0 :             format!("{}/import_complete", self.base_url)
     207              :         };
     208              : 
     209            0 :         tracing::info!("Endpoint is {endpoint}");
     210              : 
     211            0 :         let request = self
     212            0 :             .client
     213            0 :             .request(Method::PUT, endpoint)
     214            0 :             .json(&ImportCompleteRequest {
     215            0 :                 tenant_id: import.tenant_id,
     216            0 :                 timeline_id: import.timeline_id,
     217            0 :                 error: import.completion_error(),
     218            0 :             })
     219            0 :             .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
     220              : 
     221            0 :         let request = if let Some(auth) = &self.authorization_header {
     222            0 :             request.header(reqwest::header::AUTHORIZATION, auth)
     223              :         } else {
     224            0 :             request
     225              :         };
     226              : 
     227              :         const RETRY_DELAY: Duration = Duration::from_secs(1);
     228            0 :         let mut attempt = 1;
     229              : 
     230              :         loop {
     231            0 :             if self.cancel.is_cancelled() {
     232            0 :                 return Err(anyhow::anyhow!(
     233            0 :                     "Shutting down while notifying cplane of import completion"
     234            0 :                 ));
     235            0 :             }
     236            0 : 
     237            0 :             match request.try_clone().unwrap().send().await {
     238            0 :                 Ok(response) if response.status().is_success() => {
     239            0 :                     return Ok(());
     240              :                 }
     241            0 :                 Ok(response) => {
     242            0 :                     tracing::warn!(
     243            0 :                         "Import complete notification failed with status {}, attempt {}",
     244            0 :                         response.status(),
     245              :                         attempt
     246              :                     );
     247              :                 }
     248            0 :                 Err(e) => {
     249            0 :                     tracing::warn!(
     250            0 :                         "Import complete notification failed with error: {}, attempt {}",
     251              :                         e,
     252              :                         attempt
     253              :                     );
     254              :                 }
     255              :             }
     256              : 
     257            0 :             tokio::select! {
     258            0 :                 _ = tokio::time::sleep(RETRY_DELAY) => {}
     259            0 :                 _ = self.cancel.cancelled() => {
     260            0 :                     return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
     261              :                 }
     262              :             }
     263            0 :             attempt += 1;
     264              :         }
     265            0 :     }
     266              : }
        

Generated by: LCOV version 2.1-beta