LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - import_pgdata.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 0.0 % 143 0
Test Date: 2024-11-25 17:48:16 Functions: 0.0 % 7 0

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use anyhow::{bail, Context};
       4              : use remote_storage::RemotePath;
       5              : use tokio_util::sync::CancellationToken;
       6              : use tracing::{info, info_span, Instrument};
       7              : use utils::lsn::Lsn;
       8              : 
       9              : use crate::{context::RequestContext, tenant::metadata::TimelineMetadata};
      10              : 
      11              : use super::Timeline;
      12              : 
      13              : mod flow;
      14              : mod importbucket_client;
      15              : mod importbucket_format;
      16              : pub(crate) mod index_part_format;
      17              : pub(crate) mod upcall_api;
      18              : 
      19            0 : pub async fn doit( // Runs the pgdata import for one timeline shard, then marks the import done in the index part.
      20            0 :     timeline: &Arc<Timeline>, // target timeline; its `layers`, `remote_client`, `conf` and `tenant_shard_id` are used below
      21            0 :     index_part: index_part_format::Root, // persisted import state: V1 holding either InProgress or Done
      22            0 :     ctx: &RequestContext, // forwarded to flow::run
      23            0 :     cancel: CancellationToken, // cloned into the storage and upcall clients; also aborts the pgdata wait loop
      24            0 : ) -> anyhow::Result<()> { // Ok(()) if already Done or once marked done; Err when any `?` step fails or on cancellation in the wait loop
      25            0 :     let index_part_format::Root::V1(v1) = index_part; // plain `let`: the pattern must be irrefutable, i.e. V1 is the only Root variant
      26              :     let index_part_format::InProgress {
      27            0 :         location,
      28            0 :         idempotency_key,
      29            0 :         started_at,
      30            0 :     } = match v1 {
      31            0 :         index_part_format::V1::Done(_) => return Ok(()), // import already recorded as done: nothing to do
      32            0 :         index_part_format::V1::InProgress(in_progress) => in_progress,
      33              :     };
      34              : 
      35            0 :     let storage = importbucket_client::new(timeline.conf, &location, cancel.clone()).await?; // client for the import bucket at `location`
      36              : 
      37            0 :     info!("get spec early so we know we'll be able to upcall when done");
      38            0 :     let Some(spec) = storage.get_spec().await? else { // a `None` spec is fatal
      39            0 :         bail!("spec not found")
      40              :     };
      41              : 
      42            0 :     let upcall_client =
      43            0 :         upcall_api::Client::new(timeline.conf, cancel.clone()).context("create upcall client")?;
      44              : 
      45              :     //
      46              :     // send an early progress update to clean up k8s job early and generate potentially useful logs
      47              :     //
      48            0 :     info!("send early progress update");
      49            0 :     upcall_client
      50            0 :         .send_progress_until_success(&spec)
      51            0 :         .instrument(info_span!("early_progress_update"))
      52            0 :         .await?;
      53              : 
      54            0 :     let status_prefix = RemotePath::from_string("status").unwrap(); // panics only if the literal "status" is rejected by from_string
      55            0 : 
      56            0 :     //
      57            0 :     // See if shard is done.
      58            0 :     // TODO: incorporate generations into status key for split brain safety. Figure out together with checkpointing.
      59            0 :     //
      60            0 :     let shard_status_key =
      61            0 :         status_prefix.join(format!("shard-{}", timeline.tenant_shard_id.shard_slug())); // key is "status/shard-<slug>" relative to the bucket
      62            0 :     let shard_status: Option<importbucket_format::ShardStatus> =
      63            0 :         storage.get_json(&shard_status_key).await?;
      64            0 :     info!(?shard_status, "peeking shard status");
      65            0 :     if shard_status.map(|st| st.done).unwrap_or(false) { // a `None` status counts as "not done"
      66            0 :         info!("shard status indicates that the shard is done, skipping import");
      67              :     } else {
      68              :         // TODO: checkpoint the progress into the IndexPart instead of restarting
      69              :         // from the beginning.
      70              : 
      71              :         //
      72              :         // Wipe the slate clean - the flow does not allow resuming.
      73              :         // We can implement resuming in the future by checkpointing the progress into the IndexPart.
      74              :         //
      75            0 :         info!("wipe the slate clean");
      76              :         {
      77              :             // TODO: do we need to hold GC lock for this?
      78            0 :             let mut guard = timeline.layers.write().await; // write guard lives until the end of this inner block
      79            0 :             assert!( // panics (does not return Err) if an in-memory layer is open
      80            0 :                 guard.layer_map()?.open_layer.is_none(),
      81            0 :                 "while importing, there should be no in-memory layer" // this just seems like a good place to assert it
      82              :             );
      83            0 :             let all_layers_keys = guard.all_persistent_layers();
      84            0 :             let all_layers: Vec<_> = all_layers_keys
      85            0 :                 .iter()
      86            0 :                 .map(|key| guard.get_from_key(key))
      87            0 :                 .collect();
      88            0 :             let open = guard.open_mut().context("open_mut")?;
      89              : 
      90            0 :             timeline.remote_client.schedule_gc_update(&all_layers)?; // NOTE(review): presumably schedules removal of these layers from the remote index - confirm
      91            0 :             open.finish_gc_timeline(&all_layers); // NOTE(review): presumably drops the same layers from the local layer map - confirm
      92            0 :         }
      93            0 : 
      94            0 :         //
      95            0 :         // Wait for pgdata to finish uploading
      96            0 :         //
      97            0 :         info!("wait for pgdata to reach status 'done'");
      98            0 :         let pgdata_status_key = status_prefix.join("pgdata");
      99              :         loop { // polls until the pgdata status reports done or `cancel` fires; there is no other exit
     100            0 :             let res = async {
     101            0 :                 let pgdata_status: Option<importbucket_format::PgdataStatus> = storage
     102            0 :                     .get_json(&pgdata_status_key)
     103            0 :                     .await
     104            0 :                     .context("get pgdata status")?; // `?` leaves only this async block, so fetch errors are retried as well
     105            0 :                 info!(?pgdata_status, "peeking pgdata status");
     106            0 :                 if pgdata_status.map(|st| st.done).unwrap_or(false) {
     107            0 :                     Ok(())
     108              :                 } else {
     109            0 :                     Err(anyhow::anyhow!("pgdata not done yet")) // also taken when the status is `None`
     110              :                 }
     111            0 :             }
     112            0 :             .await;
     113            0 :             match res {
     114            0 :                 Ok(_) => break,
     115            0 :                 Err(err) => {
     116            0 :                     info!(?err, "indefintely waiting for pgdata to finish"); // NOTE(review): "indefintely" is misspelled in this log message
     117            0 :                     if tokio::time::timeout(std::time::Duration::from_secs(10), cancel.cancelled()) // doubles as the 10s poll delay
     118            0 :                         .await
     119            0 :                         .is_ok() // Ok means `cancel` fired before the 10s timeout elapsed
     120              :                     {
     121            0 :                         bail!("cancelled while waiting for pgdata");
     122            0 :                     }
     123              :                 }
     124              :             }
     125              :         }
     126              : 
     127              :         //
     128              :         // Do the import
     129              :         //
     130            0 :         info!("do the import");
     131            0 :         let control_file = storage.get_control_file().await?;
     132            0 :         let base_lsn = control_file.base_lsn();
     133            0 : 
     134            0 :         info!("update TimelineMetadata based on LSNs from control file");
     135              :         {
     136            0 :             let pg_version = control_file.pg_version();
     137            0 :             let _ctx: &RequestContext = ctx; // NOTE(review): `_ctx` is not referenced inside the async block below
     138            0 :             async move { // this future is the tail expression of the enclosing block and is awaited right after it
     139            0 :                 // FIXME: The 'disk_consistent_lsn' should be the LSN at the *end* of the
     140            0 :                 // checkpoint record, and prev_record_lsn should point to its beginning.
     141            0 :                 // We should read the real end of the record from the WAL, but here we
     142            0 :                 // just fake it.
     143            0 :                 let disk_consistent_lsn = Lsn(base_lsn.0 + 8);
     144            0 :                 let prev_record_lsn = base_lsn;
     145            0 :                 let metadata = TimelineMetadata::new(
     146            0 :                     disk_consistent_lsn,
     147            0 :                     Some(prev_record_lsn),
     148            0 :                     None,     // no ancestor
     149            0 :                     Lsn(0),   // no ancestor lsn
     150            0 :                     base_lsn, // latest_gc_cutoff_lsn
     151            0 :                     base_lsn, // initdb_lsn
     152            0 :                     pg_version,
     153            0 :                 );
     154            0 : 
     155            0 :                 let _start_lsn = disk_consistent_lsn + 1; // NOTE(review): computed but never read
     156            0 : 
     157            0 :                 timeline
     158            0 :                     .remote_client
     159            0 :                     .schedule_index_upload_for_full_metadata_update(&metadata)?;
     160              : 
     161            0 :                 timeline.remote_client.wait_completion().await?; // awaited before flow::run starts
     162              : 
     163            0 :                 anyhow::Ok(()) // anyhow::Ok pins the block's error type for the `?`s above
     164            0 :             }
     165              :         }
     166            0 :         .await?;
     167              : 
     168            0 :         flow::run( // the import itself; receives clones of the timeline Arc and the storage client
     169            0 :             timeline.clone(),
     170            0 :             base_lsn,
     171            0 :             control_file,
     172            0 :             storage.clone(),
     173            0 :             ctx,
     174            0 :         )
     175            0 :         .await?;
     176              : 
     177              :         //
     178              :         // Communicate that shard is done.
     179              :         //
     180            0 :         storage
     181            0 :             .put_json(
     182            0 :                 &shard_status_key, // same key that was read at the top, so a re-run takes the "skipping import" branch
     183            0 :                 &importbucket_format::ShardStatus { done: true },
     184            0 :             )
     185            0 :             .await
     186            0 :             .context("put shard status")?;
     187              :     }
     188              : 
     189              :     //
     190              :     // Ensure at-least-once delivery of the upcall to cplane
     191              :     // before we mark the task as done and never come here again.
     192              :     //
     193            0 :     info!("send final progress update");
     194            0 :     upcall_client
     195            0 :         .send_progress_until_success(&spec)
     196            0 :         .instrument(info_span!("final_progress_update"))
     197            0 :         .await?;
     198              : 
     199              :     //
     200              :     // Mark as done in index_part.
     201              :     // This makes subsequent timeline loads enter the normal load code path
     202              :     // instead of spawning the import task and calling this here function.
     203              :     //
     204            0 :     info!("mark import as complete in index part");
     205            0 :     timeline
     206            0 :         .remote_client
     207            0 :         .schedule_index_upload_for_import_pgdata_state_update(Some(index_part_format::Root::V1(
     208            0 :             index_part_format::V1::Done(index_part_format::Done {
     209            0 :                 idempotency_key, // carried over unchanged from the InProgress state
     210            0 :                 started_at, // carried over unchanged from the InProgress state
     211            0 :                 finished_at: chrono::Utc::now().naive_utc(), // UTC wall clock at the moment the update is scheduled
     212            0 :             }),
     213            0 :         )))?;
     214              : 
     215            0 :     timeline.remote_client.wait_completion().await?; // do not return Ok until remote_client reports completion
     216              : 
     217            0 :     Ok(())
     218            0 : }
        

Generated by: LCOV version 2.1-beta