LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - import_pgdata.rs (source / functions)
Test: 2ff680a820af2e5030dd8e14ace9c8cb73b50f66.info
Test Date: 2025-05-27 12:46:00
Coverage: Lines: 0.0 % (0 of 184 hit) | Functions: 0.0 % (0 of 14 hit)

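//! Import of a Postgres data directory (pgdata) from an import bucket into a
//! timeline, coordinated with the storage controller.
//!
//! [`doit`] is the entry point: it resumes or skips the import based on the
//! shard's import status recorded by the storage controller, runs the bulk of
//! the work in [`flow`], and reports the terminal status back via upcall.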
use std::sync::Arc;

use anyhow::{Context, bail};
use importbucket_client::{ControlFile, RemoteStorageWrapper};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::Lsn;

use super::Timeline;
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;

mod flow;
mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;
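
/// Handle to a timeline whose pgdata import task is still running.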
pub(crate) struct ImportingTimeline {
    pub import_task_handle: JoinHandle<()>,
    pub timeline: Arc<Timeline>,
}

impl ImportingTimeline {
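    /// Abort the import task, wait for it to exit, and then shut down the
    /// timeline's remote client.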
    pub(crate) async fn shutdown(self) {
        self.import_task_handle.abort();
        let _ = self.import_task_handle.await;

        self.timeline.remote_client.shutdown().await;
    }
}
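
/// Entry point of the import task: drive the pgdata import for one timeline
/// shard to completion.
///
/// The status recorded by the storage controller decides what happens: an
/// in-progress import is started or resumed, while a terminal status
/// (`Done` or `Error`) makes this function return without doing any work.
///
/// A hedged sketch of how the surrounding task might be spawned (the real
/// call site is [`Tenant::create_timeline_import_pgdata_task`]; names and
/// wiring here are illustrative only):
/// ```ignore
/// let import_task_handle = tokio::spawn({
///     let timeline = Arc::clone(&timeline);
///     async move {
///         // Errors returned by `doit` are logged by the task wrapper.
///         if let Err(err) = doit(&timeline, index_part, &ctx, cancel).await {
///             tracing::error!(?err, "pgdata import failed");
///         }
///     }
/// });
/// let importing = ImportingTimeline { import_task_handle, timeline };
/// ```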
pub async fn doit(
    timeline: &Arc<Timeline>,
    index_part: index_part_format::Root,
    ctx: &RequestContext,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    let index_part_format::Root::V1(v1) = index_part;
    let index_part_format::InProgress {
        location,
        idempotency_key: _,
        started_at: _,
    } = match v1 {
        index_part_format::V1::Done(_) => return Ok(()),
        index_part_format::V1::InProgress(in_progress) => in_progress,
    };

    let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel);

    let shard_status = storcon_client
        .get_timeline_import_status(
            timeline.tenant_shard_id,
            timeline.timeline_id,
            timeline.generation,
        )
        .await
        .map_err(|_err| anyhow::anyhow!("Shut down while getting timeline import status"))?;

    info!(?shard_status, "peeking shard status");
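    // The storage controller is the source of truth for import progress:
    // work is resumed only while the shard is `InProgress`; `Error` and
    // `Done` are terminal, so the import is skipped in those cases.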
    match shard_status {
        ShardImportStatus::InProgress(maybe_progress) => {
            let storage =
                importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;

            let control_file_res = if maybe_progress.is_none() {
                // Only prepare the import once, when there's no recorded progress yet.
                prepare_import(timeline, storage.clone(), &cancel).await
            } else {
                storage.get_control_file().await
            };

            let control_file = match control_file_res {
                Ok(cf) => cf,
                Err(err) => {
                    return Err(
                        terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
                    );
                }
            };

            let res = flow::run(
                timeline.clone(),
                control_file,
                storage.clone(),
                maybe_progress,
                ctx,
            )
            .await;
            if let Err(err) = res {
                return Err(
                    terminate_flow_with_error(timeline, err, &storcon_client, &cancel).await,
                );
            }

            timeline
                .remote_client
                .schedule_index_upload_for_file_changes()?;
            timeline.remote_client.wait_completion().await?;

            // Communicate that the shard is done.
            // Ensure at-least-once delivery of the upcall to the storage controller
            // before we mark the task as done and never come here again.
            //
            // Note that we do not mark the import complete in the index part now.
            // This happens in [`Tenant::finalize_importing_timeline`] in response
            // to the storage controller calling
            // `/v1/tenant/:tenant_id/timeline/:timeline_id/activate_post_import`.
            storcon_client
                .put_timeline_import_status(
                    timeline.tenant_shard_id,
                    timeline.timeline_id,
                    timeline.generation,
                    ShardImportStatus::Done,
                )
                .await
                .map_err(|_err| {
                    anyhow::anyhow!("Shut down while putting timeline import status")
                })?;
        }
        ShardImportStatus::Error(err) => {
            info!(
                "shard status indicates that the shard is done (error), skipping import: {}",
                err
            );
        }
        ShardImportStatus::Done => {
            info!("shard status indicates that the shard is done (success), skipping import");
        }
    }

    Ok(())
}
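
/// One-time preparation before the import flow runs for the first time, i.e.
/// when the storage controller has no recorded progress for this shard yet:
/// schedule deletion of any pre-existing layers, wait for the pgdata upload
/// in the import bucket to reach status 'done', and seed the timeline
/// metadata with LSNs derived from the control file.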
async fn prepare_import(
    timeline: &Arc<Timeline>,
    storage: RemoteStorageWrapper,
    cancel: &CancellationToken,
) -> anyhow::Result<ControlFile> {
    // Wipe the slate clean before starting the import as a precaution.
    // This method is only called when there's no recorded checkpoint for the import
    // in the storage controller.
    //
    // Note that this is split-brain safe (two imports for the same timeline shard running in
    // different generations) because we go through the usual deletion path, including the deletion queue.
    info!("wipe the slate clean");
    {
        // TODO: do we need to hold the GC lock for this?
        let mut guard = timeline.layers.write().await;
        assert!(
            guard.layer_map()?.open_layer.is_none(),
            "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
        );
        let all_layers_keys = guard.all_persistent_layers();
        let all_layers: Vec<_> = all_layers_keys
            .iter()
            .map(|key| guard.get_from_key(key))
            .collect();
        let open = guard.open_mut().context("open_mut")?;

        timeline.remote_client.schedule_gc_update(&all_layers)?;
        open.finish_gc_timeline(&all_layers);
    }

    //
    // Wait for pgdata to finish uploading
    //
    info!("wait for pgdata to reach status 'done'");
    let status_prefix = RemotePath::from_string("status").unwrap();
    let pgdata_status_key = status_prefix.join("pgdata");
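    // Poll the status object until it reports `done`. Any error, including a
    // missing or not-yet-done status, is retried after a 10-second sleep; the
    // sleep doubles as the cancellation point.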
    loop {
        let res = async {
            let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
                .get_json(&pgdata_status_key)
                .await
                .context("get pgdata status")?;
            info!(?pgdata_status, "peeking pgdata status");
            if pgdata_status.map(|st| st.done).unwrap_or(false) {
                Ok(())
            } else {
                Err(anyhow::anyhow!("pgdata not done yet"))
            }
        }
        .await;
        match res {
            Ok(_) => break,
            Err(err) => {
                info!(?err, "indefinitely waiting for pgdata to finish");
                if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
                    .await
                    .is_ok()
                {
                    bail!("cancelled while waiting for pgdata");
                }
            }
        }
    }

    let control_file = storage.get_control_file().await?;
    let base_lsn = control_file.base_lsn();

    info!("update TimelineMetadata based on LSNs from control file");
    {
        let pg_version = control_file.pg_version();
        async move {
            // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
            // checkpoint record, and prev_record_lsn should point to its beginning.
            // We should read the real end of the record from the WAL, but here we
            // just fake it.
            let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
            let prev_record_lsn = base_lsn;
            let metadata = TimelineMetadata::new(
                disk_consistent_lsn,
                Some(prev_record_lsn),
                None,     // no ancestor
                Lsn(0),   // no ancestor lsn
                base_lsn, // latest_gc_cutoff_lsn
                base_lsn, // initdb_lsn
                pg_version,
            );

            let _start_lsn = disk_consistent_lsn + 1;

            timeline
                .remote_client
                .schedule_index_upload_for_full_metadata_update(&metadata)?;

            timeline.remote_client.wait_completion().await?;

            anyhow::Ok(())
        }
    }
    .await?;

    Ok(control_file)
}
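
/// Convert a flow error into the error that the import task returns with.
/// Unless the failure was caused by cancellation (tenant shutdown), the error
/// is first reported to the storage controller so that the import is marked
/// as permanently failed.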
async fn terminate_flow_with_error(
    timeline: &Arc<Timeline>,
    error: anyhow::Error,
    storcon_client: &StorageControllerUpcallClient,
    cancel: &CancellationToken,
) -> anyhow::Error {
    // The import task is aborted on tenant shutdown, so, in principle, it should
    // never be cancelled. To be on the safe side, check the cancellation tokens
    // before marking the import as failed.
    if !(cancel.is_cancelled() || timeline.cancel.is_cancelled()) {
        let notify_res = storcon_client
            .put_timeline_import_status(
                timeline.tenant_shard_id,
                timeline.timeline_id,
                timeline.generation,
                ShardImportStatus::Error(format!("{error:#}")),
            )
            .await;

        if let Err(_notify_error) = notify_res {
            // [`StorageControllerUpcallClient::put_timeline_import_status`] retries
            // forever internally, so errors returned by it can only be due to cancellation.
            info!("failed to notify storcon about permanent import error");
        }

        // Will be logged by [`Tenant::create_timeline_import_pgdata_task`]
        error
    } else {
        anyhow::anyhow!("Import task cancelled")
    }
}
        

Generated by: LCOV version 2.1-beta