LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - import_pgdata.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 0.0 % 186 0
Test Date: 2025-05-26 10:37:33 Functions: 0.0 % 13 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use anyhow::{Context, bail};
       4              : use importbucket_client::{ControlFile, RemoteStorageWrapper};
       5              : use pageserver_api::models::ShardImportStatus;
       6              : use remote_storage::RemotePath;
       7              : use tokio::task::JoinHandle;
       8              : use tokio_util::sync::CancellationToken;
       9              : use tracing::info;
      10              : use utils::lsn::Lsn;
      11              : 
      12              : use super::Timeline;
      13              : use crate::context::RequestContext;
      14              : use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
      15              : use crate::tenant::metadata::TimelineMetadata;
      16              : 
      17              : mod flow;
      18              : mod importbucket_client;
      19              : mod importbucket_format;
      20              : pub(crate) mod index_part_format;
      21              : /// Bundles the `JoinHandle` of an import task with an `Arc` to a `Timeline` (presumably the timeline being imported — confirm where this struct is constructed).
      22              : pub(crate) struct ImportingTimeline {
      23              :     pub import_task_handle: JoinHandle<()>,
      24              :     pub timeline: Arc<Timeline>,
      25              : }
      26              : // `shutdown` consumes the value and calls `abort()` on the import task's join handle; it does not await the handle afterwards.
      27              : impl ImportingTimeline {
      28            0 :     pub(crate) fn shutdown(self) {
      29            0 :         self.import_task_handle.abort();
      30            0 :     }
      31              : }
      32              : /// Runs the pgdata import for one timeline shard. Returns `Ok(())` right away if `index_part` is `Done`, or if the storage controller already reports `Done`/`Error`; on `InProgress` it prepares the import (only when no progress is recorded), runs `flow::run` and then reports `ShardImportStatus::Done`. Failures of preparation or of the flow go through `terminate_flow_with_error` before being returned.
      33            0 : pub async fn doit(
      34            0 :     timeline: &Arc<Timeline>,
      35            0 :     index_part: index_part_format::Root,
      36            0 :     ctx: &RequestContext,
      37            0 :     cancel: CancellationToken,
      38            0 : ) -> anyhow::Result<()> {
      39            0 :     let index_part_format::Root::V1(v1) = index_part;
      40              :     let index_part_format::InProgress {
      41            0 :         location,
      42              :         idempotency_key: _,
      43              :         started_at: _,
      44            0 :     } = match v1 {
      45            0 :         index_part_format::V1::Done(_) => return Ok(()),
      46            0 :         index_part_format::V1::InProgress(in_progress) => in_progress,
      47            0 :     };
      48            0 :     // Of the `InProgress` fields only `location` is used below; the idempotency key and start time are ignored here.
      49            0 :     let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);
      50              :     // NOTE(review): the upcall's own error is dropped by `map_err` and replaced with a shutdown message; presumably it can only fail on cancellation — confirm.
      51            0 :     let shard_status = storcon_client
      52            0 :         .get_timeline_import_status(
      53            0 :             timeline.tenant_shard_id,
      54            0 :             timeline.timeline_id,
      55            0 :             timeline.generation,
      56            0 :         )
      57            0 :         .await
      58            0 :         .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;
      59              : 
      60            0 :     info!(?shard_status, "peeking shard status");
      61            0 :     match shard_status {
      62            0 :         ShardImportStatus::InProgress(maybe_progress) => {
      63            0 :             let storage =
      64            0 :                 importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;
      65              : 
      66            0 :             let control_file_res = if maybe_progress.is_none() {
      67              :                 // Only prepare the import once when there's no progress.
      68            0 :                 prepare_import(timeline, storage.clone(), &cancel).await
      69              :             } else {
      70            0 :                 storage.get_control_file().await
      71              :             };
      72              :             // From here on, a failure is passed to `terminate_flow_with_error` and the error it returns is propagated.
      73            0 :             let control_file = match control_file_res {
      74            0 :                 Ok(cf) => cf,
      75            0 :                 Err(err) => {
      76            0 :                     return Err(
      77            0 :                         terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
      78              :                     );
      79              :                 }
      80              :             };
      81              : 
      82            0 :             let res = flow::run(
      83            0 :                 timeline.clone(),
      84            0 :                 control_file,
      85            0 :                 storage.clone(),
      86            0 :                 maybe_progress,
      87            0 :                 ctx,
      88            0 :             )
      89            0 :             .await;
      90            0 :             if let Err(err) = res {
      91              :                 return Err(
      92            0 :                     terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
      93              :                 );
      94            0 :             }
      95            0 : 
      96            0 :             // Communicate that shard is done.
      97            0 :             // Ensure at-least-once delivery of the upcall to storage controller
      98            0 :             // before we mark the task as done and never come here again.
      99            0 :             //
     100            0 :             // Note that we do not mark the import complete in the index part now.
     101            0 :             // This happens in [`Tenant::finalize_importing_timeline`] in response
     102            0 :             // to the storage controller calling
     103            0 :             // `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`.
     104            0 :             storcon_client
     105            0 :                 .put_timeline_import_status(
     106            0 :                     timeline.tenant_shard_id,
     107            0 :                     timeline.timeline_id,
     108            0 :                     timeline.generation,
     109            0 :                     ShardImportStatus::Done,
     110            0 :                 )
     111            0 :                 .await
     112            0 :                 .map_err(|_err| {
     113            0 :                     anyhow::anyhow!("Shut down while putting timeline import status")
     114            0 :                 })?;
     115              :         }
     116            0 :         ShardImportStatus::Error(err) => {
     117            0 :             info!(
     118            0 :                 "shard status indicates that the shard is done (error), skipping import {}",
     119              :                 err
     120              :             );
     121              :         }
     122              :         ShardImportStatus::Done => {
     123            0 :             info!("shard status indicates that the shard is done (success), skipping import");
     124              :         }
     125              :     }
     126              :     // Every arm that reaches this point ends in success, including a controller-side `Error` status, which is only logged.
     127            0 :     Ok(())
     128            0 : }
     129              : /// One-time preparation of an import: passes all persistent layers of `timeline` to `schedule_gc_update` and `finish_gc_timeline`, waits until the `status`/`pgdata` object in `storage` reports `done`, then schedules an index upload with `TimelineMetadata` built from the control file's base LSN and awaits `remote_client.wait_completion()`. Returns the control file; fails with "cancelled while waiting for pgdata" if `cancel` fires during the wait.
     130            0 : async fn prepare_import(
     131            0 :     timeline: &Arc<Timeline>,
     132            0 :     storage: RemoteStorageWrapper,
     133            0 :     cancel: &CancellationToken,
     134            0 : ) -> anyhow::Result<ControlFile> {
     135            0 :     // Wipe the slate clean before starting the import as a precaution.
     136            0 :     // This method is only called when there's no recorded checkpoint for the import
     137            0 :     // in the storage controller.
     138            0 :     //
     139            0 :     // Note that this is split-brain safe (two imports for same timeline shards running in
     140            0 :     // different generations) because we go through the usual deletion path, including deletion queue.
     141            0 :     info!("wipe the slate clean");
     142              :     {
     143              :         // TODO: do we need to hold GC lock for this?
     144            0 :         let mut guard = timeline.layers.write().await;
     145            0 :         assert!(
     146            0 :             guard.layer_map()?.open_layer.is_none(),
     147            0 :             "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
     148              :         );
     149            0 :         let all_layers_keys = guard.all_persistent_layers();
     150            0 :         let all_layers: Vec<_> = all_layers_keys
     151            0 :             .iter()
     152            0 :             .map(|key| guard.get_from_key(key))
     153            0 :             .collect();
     154            0 :         let open = guard.open_mut().context("open_mut")?;
     155              : 
     156            0 :         timeline.remote_client.schedule_gc_update(&all_layers)?;
     157            0 :         open.finish_gc_timeline(&all_layers);
     158            0 :     }
     159            0 : 
     160            0 :     //
     161            0 :     // Wait for pgdata to finish uploading
     162            0 :     //
     163            0 :     info!("wait for pgdata to reach status 'done'");
     164            0 :     let status_prefix = RemotePath::from_string("status").unwrap();
     165            0 :     let pgdata_status_key = status_prefix.join("pgdata");
     166              :     loop {
     167            0 :         let res = async {
     168            0 :             let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
     169            0 :                 .get_json(&pgdata_status_key)
     170            0 :                 .await
     171            0 :                 .context("get pgdata status")?;
     172            0 :             info!(?pgdata_status, "peeking pgdata status");
     173            0 :             if pgdata_status.map(|st| st.done).unwrap_or(false) {
     174            0 :                 Ok(())
     175              :             } else {
     176            0 :                 Err(anyhow::anyhow!("pgdata not done yet"))
     177              :             }
     178            0 :         }
     179            0 :         .await;
     180            0 :         match res {
     181            0 :             Ok(_) => break,
     182            0 :             Err(err) => {
     183            0 :                 info!(?err, "indefinitely waiting for pgdata to finish");
     184            0 :                 if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
     185            0 :                     .await
     186            0 :                     .is_ok()
     187              :                 {
     188            0 :                     bail!("cancelled while waiting for pgdata");
     189            0 :                 }
     190              :             }
     191              :         }
     192              :     }
     193              :     // The loop above ends only via `break` (status done) or `bail!` (`cancel` fired within the 10 s timeout); `get_json` errors are logged and retried just like "not done yet", and there is no overall deadline.
     194            0 :     let control_file = storage.get_control_file().await?;
     195            0 :     let base_lsn = control_file.base_lsn();
     196            0 : 
     197            0 :     info!("update TimelineMetadata based on LSNs from control file");
     198            0 :     {
     199            0 :         let pg_version = control_file.pg_version();
     200            0 :         async move {
     201            0 :             // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
     202            0 :             // checkpoint record, and prev_record_lsn should point to its beginning.
     203            0 :             // We should read the real end of the record from the WAL, but here we
     204            0 :             // just fake it.
     205            0 :             let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
     206            0 :             let prev_record_lsn = base_lsn;
     207            0 :             let metadata = TimelineMetadata::new(
     208            0 :                 disk_consistent_lsn,
     209            0 :                 Some(prev_record_lsn),
     210            0 :                 None,     // no ancestor
     211            0 :                 Lsn(0),   // no ancestor lsn
     212            0 :                 base_lsn, // latest_gc_cutoff_lsn
     213            0 :                 base_lsn, // initdb_lsn
     214            0 :                 pg_version,
     215            0 :             );
     216            0 :             // NOTE(review): `_start_lsn` below is computed but not used anywhere in this function.
     217            0 :             let _start_lsn = disk_consistent_lsn + 1;
     218            0 : 
     219            0 :             timeline
     220            0 :                 .remote_client
     221            0 :                 .schedule_index_upload_for_full_metadata_update(&metadata)?;
     222              : 
     223            0 :             timeline.remote_client.wait_completion().await?;
     224              : 
     225            0 :             anyhow::Ok(())
     226            0 :         }
     227            0 :     }
     228            0 :     .await?;
     229              : 
     230            0 :     Ok(control_file)
     231            0 : }
     232              : /// Reports `error` to the storage controller as `ShardImportStatus::Error` and hands the same error back to the caller. If `cancel` or `timeline.cancel` is already cancelled, nothing is reported and a generic "Import task cancelled" error is returned in place of `error`.
     233            0 : async fn terminate_flow_with_error(
     234            0 :     timeline: &Arc<Timeline>,
     235            0 :     error: anyhow::Error,
     236            0 :     storcon_client: &StorageControllerUpcallClient,
     237            0 :     cancel: &CancellationToken,
     238            0 : ) -> anyhow::Error {
     239            0 :     // The import task is aborted on tenant shutdown, so in principle, it should
     240            0 :     // never be cancelled. To be on the safe side, check the cancellation tokens
     241            0 :     // before marking the import as failed.
     242            0 :     if !(cancel.is_cancelled() || timeline.cancel.is_cancelled()) {
     243            0 :         let notify_res = storcon_client
     244            0 :             .put_timeline_import_status(
     245            0 :                 timeline.tenant_shard_id,
     246            0 :                 timeline.timeline_id,
     247            0 :                 timeline.generation,
     248            0 :                 ShardImportStatus::Error(format!("{error:#}")),
     249            0 :             )
     250            0 :             .await;
     251              :         // A failed notification is only logged below; the original `error` is still what gets returned.
     252            0 :         if let Err(_notify_error) = notify_res {
     253              :             // The [`StorageControllerUpcallClient::put_timeline_import_status`] retries
     254              :             // forever internally, so errors returned by it can only be due to cancellation.
     255            0 :             info!("failed to notify storcon about permanent import error");
     256            0 :         }
     257              : 
     258              :         // Will be logged by [`Tenant::create_timeline_import_pgdata_task`]
     259            0 :         error
     260              :     } else {
     261            0 :         anyhow::anyhow!("Import task cancelled")
     262              :     }
     263            0 : }
        

Generated by: LCOV version 2.1-beta