LCOV - code coverage report

Current view:  top level - pageserver/src/tenant/timeline - import_pgdata.rs (source / functions)
Test:          aca806cab4756d7eb6a304846130f4a73a5d5393.info
Test Date:     2025-04-24 20:31:15

               Coverage   Total   Hit
Lines:            0.0 %     141     0
Functions:        0.0 %       8     0

Source code:
use std::sync::Arc;

use anyhow::{Context, bail};
use pageserver_api::models::ShardImportStatus;
use remote_storage::RemotePath;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::Lsn;

use super::Timeline;
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;

mod flow;
mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;

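/// Drive the import of a `pgdata` dump from the import bucket into this timeline's shard.
///
/// The function is idempotent:
/// - if `index_part` already records the import as `Done`, it returns immediately;
/// - if the per-shard status marker in the import bucket says the shard is done,
///   the import itself is skipped and only the finalization steps run;
/// - otherwise the whole flow is (re)run from the beginning, since progress is not
///   checkpointed yet (see TODOs below).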
pub async fn doit(
    timeline: &Arc<Timeline>,
    index_part: index_part_format::Root,
    ctx: &RequestContext,
    cancel: CancellationToken,
) -> anyhow::Result<()> {
    let index_part_format::Root::V1(v1) = index_part;
    let index_part_format::InProgress {
        location,
        idempotency_key,
        started_at,
    } = match v1 {
        index_part_format::V1::Done(_) => return Ok(()),
        index_part_format::V1::InProgress(in_progress) => in_progress,
    };

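    // Open a client for the import bucket referenced by the in-progress state, and
    // derive the `status/` prefix under which the pgdata upload marker and the
    // per-shard progress markers are stored.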
    let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?;

    let status_prefix = RemotePath::from_string("status").unwrap();

    //
    // See if shard is done.
    // TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing.
    //
    let shard_status_key =
        status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug()));
    let shard_status: Option<importbucket_format::ShardStatus> =
        storage.get_json(&shard_status_key).await?;
    info!(?shard_status, "peeking shard status");
    if shard_status.map(|st| st.done).unwrap_or(false) {
        info!("shard status indicates that the shard is done, skipping import");
    } else {
        // TODO: checkpoint the progress into the IndexPart instead of restarting
        // from the beginning.

        //
        // Wipe the slate clean - the flow does not allow resuming.
        // We can implement resuming in the future by checkpointing the progress into the IndexPart.
        //
        info!("wipe the slate clean");
        {
            // TODO: do we need to hold GC lock for this?
            let mut guard = timeline.layers.write().await;
            assert!(
                guard.layer_map()?.open_layer.is_none(),
                "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
            );
            let all_layers_keys = guard.all_persistent_layers();
            let all_layers: Vec<_> = all_layers_keys
                .iter()
                .map(|key| guard.get_from_key(key))
                .collect();
            let open = guard.open_mut().context("open_mut")?;

            timeline.remote_client.schedule_gc_update(&all_layers)?;
            open.finish_gc_timeline(&all_layers);
        }

        //
        // Wait for pgdata to finish uploading
        //
        info!("wait for pgdata to reach status 'done'");
        let pgdata_status_key = status_prefix.join("pgdata");
        loop {
            let res = async {
                let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
                    .get_json(&pgdata_status_key)
                    .await
                    .context("get pgdata status")?;
                info!(?pgdata_status, "peeking pgdata status");
                if pgdata_status.map(|st| st.done).unwrap_or(false) {
                    Ok(())
                } else {
                    Err(anyhow::anyhow!("pgdata not done yet"))
                }
            }
            .await;
            match res {
                Ok(_) => break,
                Err(err) => {
                    info!(?err, "indefinitely waiting for pgdata to finish");
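                    // Sleep up to 10 seconds between polls. `timeout()` returns Ok
                    // only if `cancel.cancelled()` completed first, i.e. we were
                    // asked to shut down while waiting.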
                    if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled())
                        .await
                        .is_ok()
                    {
                        bail!("cancelled while waiting for pgdata");
                    }
                }
            }
        }

        //
        // Do the import
        //
        info!("do the import");
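        // The control file shipped with the pgdata dump determines the base LSN at
        // which the imported data is consistent; it also seeds initdb_lsn and the
        // GC cutoff in the timeline metadata below.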
        let control_file = storage.get_control_file().await?;
        let base_lsn = control_file.base_lsn();

        info!("update TimelineMetadata based on LSNs from control file");
        {
            let pg_version = control_file.pg_version();
            let _ctx: &RequestContext = ctx;
            async move {
                // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
                // checkpoint record, and prev_record_lsn should point to its beginning.
                // We should read the real end of the record from the WAL, but here we
                // just fake it.
                let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
                let prev_record_lsn = base_lsn;
                let metadata = TimelineMetadata::new(
                    disk_consistent_lsn,
                    Some(prev_record_lsn),
                    None,     // no ancestor
                    Lsn(0),   // no ancestor lsn
                    base_lsn, // latest_gc_cutoff_lsn
                    base_lsn, // initdb_lsn
                    pg_version,
                );

                let _start_lsn = disk_consistent_lsn + 1;

                timeline
                    .remote_client
                    .schedule_index_upload_for_full_metadata_update(&metadata)?;

                timeline.remote_client.wait_completion().await?;

                anyhow::Ok(())
            }
        }
        .await?;

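        // Run the import flow proper (see the `flow` module): it reads the pgdata
        // contents from the import bucket and writes them into this timeline at
        // `base_lsn`.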
        flow::run(
            timeline.clone(),
            base_lsn,
            control_file,
            storage.clone(),
            ctx,
        )
        .await?;

        //
        // Communicate that shard is done.
        // Ensure at-least-once delivery of the upcall to storage controller
        // before we mark the task as done and never come here again.
        //
        let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &cancel)?
            .expect("storcon configured");
        storcon_client
            .put_timeline_import_status(
                timeline.tenant_shard_id,
                timeline.timeline_id,
                // TODO(vlad): What about import errors?
                ShardImportStatus::Done,
            )
            .await
            .map_err(|_err| anyhow::anyhow!("Shut down while putting timeline import status"))?;

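        // Persist the per-shard "done" marker in the import bucket so that a
        // restarted task takes the skip path above instead of re-importing.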
        storage
            .put_json(
                &shard_status_key,
                &importbucket_format::ShardStatus { done: true },
            )
            .await
            .context("put shard status")?;
    }

    //
    // Mark as done in index_part.
    // This makes subsequent timeline loads enter the normal load code path
    // instead of spawning the import task and calling this here function.
    //
    info!("mark import as complete in index part");
    timeline
        .remote_client
        .schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1(
            index_part_format::V1::Done(index_part_format::Done {
                idempotency_key,
                started_at,
                finished_at: chrono::Utc::now().naive_utc(),
            }),
        )))?;

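    // Wait until the scheduled index upload has completed, so the Done state is
    // durable in remote storage before we return and never re-enter this path.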
    timeline.remote_client.wait_completion().await?;

    Ok(())
}
        

Generated by: LCOV version 2.1-beta