LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - import_pgdata.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 164 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 14 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use anyhow::{Context, bail};
       4              : use importbucket_client::{ControlFile, RemoteStorageWrapper};
       5              : use pageserver_api::models::ShardImportStatus;
       6              : use remote_storage::RemotePath;
       7              : use tokio::task::JoinHandle;
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::info;
      10              : use utils::lsn::Lsn;
      11              : use utils::pausable_failpoint;
      12              : use utils::sync::gate::Gate;
      13              : 
      14              : use super::{Timeline, TimelineDeleteProgress};
      15              : use crate::context::RequestContext;
      16              : use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
      17              : use crate::tenant::metadata::TimelineMetadata;
      18              : use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
      19              : 
      20              : mod flow;
      21              : mod importbucket_client;
      22              : mod importbucket_format;
      23              : pub(crate) mod index_part_format;
      24              : 
/// A timeline that is being populated by a pgdata import, bundled with the handles
/// needed to stop that import (see [`ImportingTimeline::shutdown`]).
pub struct ImportingTimeline {
    /// Handle of the spawned import task; `shutdown` aborts it.
    pub import_task_handle: JoinHandle<()>,
    /// Gate associated with the import task; `shutdown` closes it after aborting the task.
    // NOTE(review): presumably the import task holds a guard on this gate so that
    // closing it waits for the task to wind down — confirm at the spawn site.
    pub import_task_gate: Gate,
    /// The timeline the import writes into.
    pub timeline: Arc<Timeline>,
    /// Deletion-progress state for this timeline.
    // NOTE(review): how this is used is not visible in this file — confirm against
    // `TimelineDeleteProgress` and its callers.
    pub delete_progress: TimelineDeleteProgress,
}
      31              : 
      32              : impl std::fmt::Debug for ImportingTimeline {
      33            0 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
      34            0 :         write!(f, "ImportingTimeline<{}>", self.timeline.timeline_id)
      35            0 :     }
      36              : }
      37              : 
      38              : impl ImportingTimeline {
      39            0 :     pub async fn shutdown(&self) {
      40            0 :         self.import_task_handle.abort();
      41            0 :         self.import_task_gate.close().await;
      42              : 
      43            0 :         self.timeline.remote_client.shutdown().await;
      44            0 :     }
      45              : }
      46              : 
      47            0 : pub async fn doit(
      48            0 :     timeline: &Arc<Timeline>,
      49            0 :     index_part: index_part_format::Root,
      50            0 :     ctx: &RequestContext,
      51            0 :     cancel: CancellationToken,
      52            0 : ) -> anyhow::Result<()> {
      53            0 :     let index_part_format::Root::V1(v1) = index_part;
      54              :     let index_part_format::InProgress {
      55            0 :         location,
      56              :         idempotency_key: _,
      57              :         started_at: _,
      58            0 :     } = match v1 {
      59            0 :         index_part_format::V1::Done(_) => return Ok(()),
      60            0 :         index_part_format::V1::InProgress(in_progress) => in_progress,
      61              :     };
      62              : 
      63            0 :     let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
      64              : 
      65            0 :     let shard_status = storcon_client
      66            0 :         .get_timeline_import_status(
      67            0 :             timeline.tenant_shard_id,
      68            0 :             timeline.timeline_id,
      69            0 :             timeline.generation,
      70            0 :         )
      71            0 :         .await
      72            0 :         .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;
      73              : 
      74            0 :     info!(?shard_status, "peeking shard status");
      75            0 :     match shard_status {
      76            0 :         ShardImportStatus::InProgress(maybe_progress) => {
      77            0 :             let storage =
      78            0 :                 importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
      79              : 
      80            0 :             let control_file_res = if maybe_progress.is_none() {
      81              :                 // Only prepare the import once when there's no progress.
      82            0 :                 prepare_import(timeline, storage.clone(), &cancel).await
      83              :             } else {
      84            0 :                 storage.get_control_file().await
      85              :             };
      86              : 
      87            0 :             let control_file = match control_file_res {
      88            0 :                 Ok(cf) => cf,
      89            0 :                 Err(err) => {
      90              :                     return Err(
      91            0 :                         terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
      92              :                     );
      93              :                 }
      94              :             };
      95              : 
      96            0 :             let res = flow::run(
      97            0 :                 timeline.clone(),
      98            0 :                 control_file,
      99            0 :                 storage.clone(),
     100            0 :                 maybe_progress,
     101            0 :                 ctx,
     102            0 :             )
     103            0 :             .await;
     104            0 :             if let Err(err) = res {
     105              :                 return Err(
     106            0 :                     terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
     107              :                 );
     108            0 :             }
     109              : 
     110            0 :             tracing::info!("Import plan executed. Flushing remote changes and notifying storcon");
     111              : 
     112            0 :             timeline
     113            0 :                 .remote_client
     114            0 :                 .schedule_index_upload_for_file_changes()?;
     115            0 :             timeline.remote_client.wait_completion().await?;
     116              : 
     117            0 :             pausable_failpoint!("import-timeline-pre-success-notify-pausable");
     118              : 
     119              :             // Communicate that shard is done.
     120              :             // Ensure at-least-once delivery of the upcall to storage controller
     121              :             // before we mark the task as done and never come here again.
     122              :             //
     123              :             // Note that we do not mark the import complete in the index part now.
     124              :             // This happens in [`Tenant::finalize_importing_timeline`] in response
     125              :             // to the storage controller calling
     126              :             // `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`.
     127            0 :             storcon_client
     128            0 :                 .put_timeline_import_status(
     129            0 :                     timeline.tenant_shard_id,
     130            0 :                     timeline.timeline_id,
     131            0 :                     timeline.generation,
     132            0 :                     ShardImportStatus::Done,
     133            0 :                 )
     134            0 :                 .await
     135            0 :                 .map_err(|_err| {
     136            0 :                     anyhow::anyhow!("Shut down while putting timeline import status")
     137            0 :                 })?;
     138              :         }
     139            0 :         ShardImportStatus::Error(err) => {
     140            0 :             info!(
     141            0 :                 "shard status indicates that the shard is done (error), skipping import {}",
     142              :                 err
     143              :             );
     144              :         }
     145              :         ShardImportStatus::Done => {
     146            0 :             info!("shard status indicates that the shard is done (success), skipping import");
     147              :         }
     148              :     }
     149              : 
     150            0 :     Ok(())
     151            0 : }
     152              : 
     153            0 : async fn prepare_import(
     154            0 :     timeline: &Arc<Timeline>,
     155            0 :     storage: RemoteStorageWrapper,
     156            0 :     cancel: &CancellationToken,
     157            0 : ) -> anyhow::Result<ControlFile> {
     158              :     // Wipe the slate clean before starting the import as a precaution.
     159              :     // This method is only called when there's no recorded checkpoint for the import
     160              :     // in the storage controller.
     161              :     //
     162              :     // Note that this is split-brain safe (two imports for same timeline shards running in
     163              :     // different generations) because we go through the usual deletion path, including deletion queue.
     164            0 :     info!("wipe the slate clean");
     165              :     {
     166              :         // TODO: do we need to hold GC lock for this?
     167            0 :         let mut guard = timeline
     168            0 :             .layers
     169            0 :             .write(LayerManagerLockHolder::ImportPgData)
     170            0 :             .await;
     171            0 :         assert!(
     172            0 :             guard.layer_map()?.open_layer.is_none(),
     173            0 :             "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
     174              :         );
     175            0 :         let all_layers_keys = guard.all_persistent_layers();
     176            0 :         let all_layers: Vec<_> = all_layers_keys
     177            0 :             .iter()
     178            0 :             .map(|key| guard.get_from_key(key))
     179            0 :             .collect();
     180            0 :         let open = guard.open_mut().context("open_mut")?;
     181              : 
     182            0 :         timeline.remote_client.schedule_gc_update(&all_layers)?;
     183            0 :         open.finish_gc_timeline(&all_layers);
     184              :     }
     185              : 
     186              :     //
     187              :     // Wait for pgdata to finish uploading
     188              :     //
     189            0 :     info!("wait for pgdata to reach status 'done'");
     190            0 :     let status_prefix = RemotePath::from_string("status").unwrap();
     191            0 :     let pgdata_status_key = status_prefix.join("pgdata");
     192              :     loop {
     193            0 :         let res = async {
     194            0 :             let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
     195            0 :                 .get_json(&pgdata_status_key)
     196            0 :                 .await
     197            0 :                 .context("get pgdata status")?;
     198            0 :             info!(?pgdata_status, "peeking pgdata status");
     199            0 :             if pgdata_status.map(|st| st.done).unwrap_or(false) {
     200            0 :                 Ok(())
     201              :             } else {
     202            0 :                 Err(anyhow::anyhow!("pgdata not done yet"))
     203              :             }
     204            0 :         }
     205            0 :         .await;
     206            0 :         match res {
     207            0 :             Ok(_) => break,
     208            0 :             Err(_err) => {
     209            0 :                 info!("indefinitely waiting for pgdata to finish");
     210            0 :                 if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
     211            0 :                     .await
     212            0 :                     .is_ok()
     213              :                 {
     214            0 :                     bail!("cancelled while waiting for pgdata");
     215            0 :                 }
     216              :             }
     217              :         }
     218              :     }
     219              : 
     220            0 :     let control_file = storage.get_control_file().await?;
     221            0 :     let base_lsn = control_file.base_lsn();
     222              : 
     223            0 :     info!("update TimelineMetadata based on LSNs from control file");
     224              :     {
     225            0 :         let pg_version = control_file.pg_version();
     226            0 :         async move {
     227              :             // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
     228              :             // checkpoint record, and prev_record_lsn should point to its beginning.
     229              :             // We should read the real end of the record from the WAL, but here we
     230              :             // just fake it.
     231            0 :             let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
     232            0 :             let prev_record_lsn = base_lsn;
     233            0 :             let metadata = TimelineMetadata::new(
     234            0 :                 disk_consistent_lsn,
     235            0 :                 Some(prev_record_lsn),
     236            0 :                 None,     // no ancestor
     237            0 :                 Lsn(0),   // no ancestor lsn
     238            0 :                 base_lsn, // latest_gc_cutoff_lsn
     239            0 :                 base_lsn, // initdb_lsn
     240            0 :                 pg_version,
     241              :             );
     242              : 
     243            0 :             let _start_lsn = disk_consistent_lsn + 1;
     244              : 
     245            0 :             timeline
     246            0 :                 .remote_client
     247            0 :                 .schedule_index_upload_for_full_metadata_update(&metadata)?;
     248              : 
     249            0 :             timeline.remote_client.wait_completion().await?;
     250              : 
     251            0 :             anyhow::Ok(())
     252            0 :         }
     253              :     }
     254            0 :     .await?;
     255              : 
     256            0 :     Ok(control_file)
     257            0 : }
     258              : 
     259            0 : async fn terminate_flow_with_error(
     260            0 :     timeline: &Arc<Timeline>,
     261            0 :     error: anyhow::Error,
     262            0 :     storcon_client: &StorageControllerUpcallClient,
     263            0 :     cancel: &CancellationToken,
     264            0 : ) -> anyhow::Error {
     265              :     // The import task is a aborted on tenant shutdown, so in principle, it should
     266              :     // never be cancelled. To be on the safe side, check the cancellation tokens
     267              :     // before marking the import as failed.
     268            0 :     if !(cancel.is_cancelled() || timeline.cancel.is_cancelled()) {
     269            0 :         let notify_res = storcon_client
     270            0 :             .put_timeline_import_status(
     271            0 :                 timeline.tenant_shard_id,
     272            0 :                 timeline.timeline_id,
     273            0 :                 timeline.generation,
     274            0 :                 ShardImportStatus::Error(format!("{error:#}")),
     275            0 :             )
     276            0 :             .await;
     277              : 
     278            0 :         if let Err(_notify_error) = notify_res {
     279              :             // The [`StorageControllerUpcallClient::put_timeline_import_status`] retries
     280              :             // forever internally, so errors returned by it can only be due to cancellation.
     281            0 :             info!("failed to notify storcon about permanent import error");
     282            0 :         }
     283              : 
     284              :         // Will be logged by [`Tenant::create_timeline_import_pgdata_task`]
     285            0 :         error
     286              :     } else {
     287            0 :         anyhow::anyhow!("Import task cancelled")
     288              :     }
     289            0 : }
        

Generated by: LCOV version 2.1-beta