LCOV - code coverage report
Current view: top level - storage_controller/src - timeline_import.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 0.0 % 144 0
Test Date: 2025-04-24 20:31:15 Functions: 0.0 % 22 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : use std::{collections::HashMap, str::FromStr};
       3              : 
       4              : use http_utils::error::ApiError;
       5              : use reqwest::Method;
       6              : use serde::{Deserialize, Serialize};
       7              : 
       8              : use pageserver_api::models::ShardImportStatus;
       9              : use tokio_util::sync::CancellationToken;
      10              : use utils::{
      11              :     id::{TenantId, TimelineId},
      12              :     shard::ShardIndex,
      13              : };
      14              : 
      15              : use crate::{persistence::TimelineImportPersistence, service::Config};
      16              : 
      17            0 : #[derive(Serialize, Deserialize, Clone, Debug)]
      18              : pub(crate) struct ShardImportStatuses(pub(crate) HashMap<ShardIndex, ShardImportStatus>);
      19              : 
      20              : impl ShardImportStatuses {
      21            0 :     pub(crate) fn new(shards: Vec<ShardIndex>) -> Self {
      22            0 :         ShardImportStatuses(
      23            0 :             shards
      24            0 :                 .into_iter()
      25            0 :                 .map(|ts_id| (ts_id, ShardImportStatus::InProgress))
      26            0 :                 .collect(),
      27            0 :         )
      28            0 :     }
      29              : }
      30              : 
      31              : #[derive(Debug)]
      32              : pub(crate) struct TimelineImport {
      33              :     pub(crate) tenant_id: TenantId,
      34              :     pub(crate) timeline_id: TimelineId,
      35              :     pub(crate) shard_statuses: ShardImportStatuses,
      36              : }
      37              : 
      38              : pub(crate) enum TimelineImportUpdateFollowUp {
      39              :     Persist,
      40              :     None,
      41              : }
      42              : 
      43              : pub(crate) enum TimelineImportUpdateError {
      44              :     ImportNotFound {
      45              :         tenant_id: TenantId,
      46              :         timeline_id: TimelineId,
      47              :     },
      48              :     MismatchedShards,
      49              :     UnexpectedUpdate,
      50              : }
      51              : 
      52              : impl From<TimelineImportUpdateError> for ApiError {
      53            0 :     fn from(err: TimelineImportUpdateError) -> ApiError {
      54            0 :         match err {
      55              :             TimelineImportUpdateError::ImportNotFound {
      56            0 :                 tenant_id,
      57            0 :                 timeline_id,
      58            0 :             } => ApiError::NotFound(
      59            0 :                 anyhow::anyhow!("Import for {tenant_id}/{timeline_id} not found").into(),
      60            0 :             ),
      61              :             TimelineImportUpdateError::MismatchedShards => {
      62            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
      63            0 :                     "Import shards do not match update request, likely a shard split happened during import, this is a bug"
      64            0 :                 ))
      65              :             }
      66              :             TimelineImportUpdateError::UnexpectedUpdate => {
      67            0 :                 ApiError::InternalServerError(anyhow::anyhow!("Update request is unexpected"))
      68              :             }
      69              :         }
      70            0 :     }
      71              : }
      72              : 
      73              : impl TimelineImport {
      74            0 :     pub(crate) fn from_persistent(persistent: TimelineImportPersistence) -> anyhow::Result<Self> {
      75            0 :         let tenant_id = TenantId::from_str(persistent.tenant_id.as_str())?;
      76            0 :         let timeline_id = TimelineId::from_str(persistent.timeline_id.as_str())?;
      77            0 :         let shard_statuses = serde_json::from_value(persistent.shard_statuses)?;
      78              : 
      79            0 :         Ok(TimelineImport {
      80            0 :             tenant_id,
      81            0 :             timeline_id,
      82            0 :             shard_statuses,
      83            0 :         })
      84            0 :     }
      85              : 
      86            0 :     pub(crate) fn to_persistent(&self) -> TimelineImportPersistence {
      87            0 :         TimelineImportPersistence {
      88            0 :             tenant_id: self.tenant_id.to_string(),
      89            0 :             timeline_id: self.timeline_id.to_string(),
      90            0 :             shard_statuses: serde_json::to_value(self.shard_statuses.clone()).unwrap(),
      91            0 :         }
      92            0 :     }
      93              : 
      94            0 :     pub(crate) fn update(
      95            0 :         &mut self,
      96            0 :         shard: ShardIndex,
      97            0 :         status: ShardImportStatus,
      98            0 :     ) -> Result<TimelineImportUpdateFollowUp, TimelineImportUpdateError> {
      99              :         use std::collections::hash_map::Entry::*;
     100              : 
     101            0 :         match self.shard_statuses.0.entry(shard) {
     102            0 :             Occupied(mut occ) => {
     103            0 :                 let crnt = occ.get_mut();
     104            0 :                 if *crnt == status {
     105            0 :                     Ok(TimelineImportUpdateFollowUp::None)
     106            0 :                 } else if crnt.is_terminal() && *crnt != status {
     107            0 :                     Err(TimelineImportUpdateError::UnexpectedUpdate)
     108              :                 } else {
     109            0 :                     *crnt = status;
     110            0 :                     Ok(TimelineImportUpdateFollowUp::Persist)
     111              :                 }
     112              :             }
     113            0 :             Vacant(_) => Err(TimelineImportUpdateError::MismatchedShards),
     114              :         }
     115            0 :     }
     116              : 
     117            0 :     pub(crate) fn is_complete(&self) -> bool {
     118            0 :         self.shard_statuses
     119            0 :             .0
     120            0 :             .values()
     121            0 :             .all(|status| status.is_terminal())
     122            0 :     }
     123              : 
     124            0 :     pub(crate) fn completion_error(&self) -> Option<String> {
     125            0 :         assert!(self.is_complete());
     126              : 
     127            0 :         let shard_errors: HashMap<_, _> = self
     128            0 :             .shard_statuses
     129            0 :             .0
     130            0 :             .iter()
     131            0 :             .filter_map(|(shard, status)| {
     132            0 :                 if let ShardImportStatus::Error(err) = status {
     133            0 :                     Some((*shard, err.clone()))
     134              :                 } else {
     135            0 :                     None
     136              :                 }
     137            0 :             })
     138            0 :             .collect();
     139            0 : 
     140            0 :         if shard_errors.is_empty() {
     141            0 :             None
     142              :         } else {
     143            0 :             Some(serde_json::to_string(&shard_errors).unwrap())
     144              :         }
     145            0 :     }
     146              : }
     147              : 
     148              : pub(crate) struct UpcallClient {
     149              :     authorization_header: Option<String>,
     150              :     client: reqwest::Client,
     151              :     cancel: CancellationToken,
     152              :     base_url: String,
     153              : }
     154              : 
     155              : const IMPORT_COMPLETE_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
     156              : 
     157            0 : #[derive(Serialize, Deserialize, Debug)]
     158              : struct ImportCompleteRequest {
     159              :     tenant_id: TenantId,
     160              :     timeline_id: TimelineId,
     161              :     error: Option<String>,
     162              : }
     163              : 
     164              : impl UpcallClient {
     165            0 :     pub(crate) fn new(config: &Config, cancel: CancellationToken) -> Self {
     166            0 :         let authorization_header = config
     167            0 :             .control_plane_jwt_token
     168            0 :             .clone()
     169            0 :             .map(|jwt| format!("Bearer {}", jwt));
     170            0 : 
     171            0 :         let client = reqwest::ClientBuilder::new()
     172            0 :             .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT)
     173            0 :             .build()
     174            0 :             .expect("Failed to construct HTTP client");
     175            0 : 
     176            0 :         let base_url = config
     177            0 :             .control_plane_url
     178            0 :             .clone()
     179            0 :             .expect("must be configured");
     180            0 : 
     181            0 :         Self {
     182            0 :             authorization_header,
     183            0 :             client,
     184            0 :             cancel,
     185            0 :             base_url,
     186            0 :         }
     187            0 :     }
     188              : 
     189              :     /// Notify control plane of a completed import
     190              :     ///
     191              :     /// This method guarantees at least once delivery semantics assuming
     192              :     /// eventual cplane availability. The cplane API is idempotent.
     193            0 :     pub(crate) async fn notify_import_complete(
     194            0 :         &self,
     195            0 :         import: &TimelineImport,
     196            0 :     ) -> anyhow::Result<()> {
     197            0 :         let endpoint = if self.base_url.ends_with('/') {
     198            0 :             format!("{}import_complete", self.base_url)
     199              :         } else {
     200            0 :             format!("{}/import_complete", self.base_url)
     201              :         };
     202              : 
     203            0 :         tracing::info!("Endpoint is {endpoint}");
     204              : 
     205            0 :         let request = self
     206            0 :             .client
     207            0 :             .request(Method::PUT, endpoint)
     208            0 :             .json(&ImportCompleteRequest {
     209            0 :                 tenant_id: import.tenant_id,
     210            0 :                 timeline_id: import.timeline_id,
     211            0 :                 error: import.completion_error(),
     212            0 :             })
     213            0 :             .timeout(IMPORT_COMPLETE_REQUEST_TIMEOUT);
     214              : 
     215            0 :         let request = if let Some(auth) = &self.authorization_header {
     216            0 :             request.header(reqwest::header::AUTHORIZATION, auth)
     217              :         } else {
     218            0 :             request
     219              :         };
     220              : 
     221              :         const RETRY_DELAY: Duration = Duration::from_secs(1);
     222            0 :         let mut attempt = 1;
     223              : 
     224              :         loop {
     225            0 :             if self.cancel.is_cancelled() {
     226            0 :                 return Err(anyhow::anyhow!(
     227            0 :                     "Shutting down while notifying cplane of import completion"
     228            0 :                 ));
     229            0 :             }
     230            0 : 
     231            0 :             match request.try_clone().unwrap().send().await {
     232            0 :                 Ok(response) if response.status().is_success() => {
     233            0 :                     return Ok(());
     234              :                 }
     235            0 :                 Ok(response) => {
     236            0 :                     tracing::warn!(
     237            0 :                         "Import complete notification failed with status {}, attempt {}",
     238            0 :                         response.status(),
     239              :                         attempt
     240              :                     );
     241              :                 }
     242            0 :                 Err(e) => {
     243            0 :                     tracing::warn!(
     244            0 :                         "Import complete notification failed with error: {}, attempt {}",
     245              :                         e,
     246              :                         attempt
     247              :                     );
     248              :                 }
     249              :             }
     250              : 
     251            0 :             tokio::select! {
     252            0 :                 _ = tokio::time::sleep(RETRY_DELAY) => {}
     253            0 :                 _ = self.cancel.cancelled() => {
     254            0 :                     return Err(anyhow::anyhow!("Shutting down while notifying cplane of import completion"));
     255              :                 }
     256              :             }
     257            0 :             attempt += 1;
     258              :         }
     259            0 :     }
     260              : }
        

Generated by: LCOV version 2.1-beta