LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline/import_pgdata - flow.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 537 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 58 0

            Line data    Source code
       1              : //! Import a PGDATA directory into an empty root timeline.
       2              : //!
       3              : //! This module is adapted hackathon code by Heikki and Stas.
       4              : //! Other code in the parent module was written by Christian as part of a customer PoC.
       5              : //!
       6              : //! The hackathon code was producing image layer files as a free-standing program.
       7              : //!
       8              : //! It has been modified to
       9              : //! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
      10              : //! - => sharding-awareness: produce image layers with only the data relevant for this shard
      11              : //! - => S3 as the source for the PGDATA instead of local filesystem
      12              : //!
      13              : //! TODOs before productionization:
      14              : //! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
      15              : //!
      16              : //! An incomplete set of TODOs from the Hackathon:
      17              : //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
      18              : 
      19              : use std::collections::HashSet;
      20              : use std::hash::{Hash, Hasher};
      21              : use std::num::NonZeroUsize;
      22              : use std::ops::Range;
      23              : use std::sync::Arc;
      24              : 
      25              : use anyhow::ensure;
      26              : use bytes::Bytes;
      27              : use futures::stream::FuturesOrdered;
      28              : use itertools::Itertools;
      29              : use pageserver_api::config::TimelineImportConfig;
      30              : use pageserver_api::key::{
      31              :     CHECKPOINT_KEY, CONTROLFILE_KEY, DBDIR_KEY, Key, TWOPHASEDIR_KEY, rel_block_to_key,
      32              :     rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
      33              :     slru_segment_size_to_key,
      34              : };
      35              : use pageserver_api::keyspace::{ShardedRange, singleton_range};
      36              : use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
      37              : use pageserver_api::reltag::{RelTag, SlruKind};
      38              : use pageserver_api::shard::ShardIdentity;
      39              : use postgres_ffi::BLCKSZ;
      40              : use postgres_ffi::relfile_utils::parse_relfilename;
      41              : use remote_storage::RemotePath;
      42              : use tokio::sync::Semaphore;
      43              : use tokio_stream::StreamExt;
      44              : use tracing::{debug, instrument};
      45              : use utils::bin_ser::BeSer;
      46              : use utils::lsn::Lsn;
      47              : use utils::pausable_failpoint;
      48              : 
      49              : use super::Timeline;
      50              : use super::importbucket_client::{ControlFile, RemoteStorageWrapper};
      51              : use crate::assert_u64_eq_usize::UsizeIsU64;
      52              : use crate::context::{DownloadBehavior, RequestContext};
      53              : use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
      54              : use crate::pgdatadir_mapping::{
      55              :     DbDirectory, RelDirectory, SlruSegmentDirectory, TwoPhaseDirectory,
      56              : };
      57              : use crate::task_mgr::TaskKind;
      58              : use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
      59              : use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
      60              : 
      /// Entry point for importing a PGDATA directory into `timeline`.
      ///
      /// Dispatches on the version of the persisted import progress. `None` means that no
      /// progress was recorded (a new import). Both the `V1` and the `None` case are
      /// currently handled by `run_v1`, whose result is returned unchanged.
      61            0 : pub async fn run(
      62            0 :     timeline: Arc<Timeline>,
      63            0 :     control_file: ControlFile,
      64            0 :     storage: RemoteStorageWrapper,
      65            0 :     import_progress: Option<ShardImportProgress>,
      66            0 :     ctx: &RequestContext,
      67            0 : ) -> anyhow::Result<()> {
      68              :     // Match how we run the import based on the progress version.
      69              :     // If there's no import progress, it means that this is a new import
      70              :     // and we can use whichever version we want.
      71            0 :     match import_progress {
      72            0 :         Some(ShardImportProgress::V1(progress)) => {
      73            0 :             run_v1(timeline, control_file, storage, Some(progress), ctx).await
      74              :         }
      75            0 :         None => run_v1(timeline, control_file, storage, None, ctx).await,
      76              :     }
      77            0 : }
      78              : 
      /// Version 1 of the import flow: plan, validate against resumed progress, execute.
      ///
      /// Steps, in order:
      /// 1. Build the effective `TimelineImportConfig`. When resuming, the job soft size
      ///    limit is taken from `import_progress`; all other settings come from the
      ///    timeline's current configuration.
      /// 2. Compute the plan with `Planner::plan` and hash it with a seeded `XxHash64`.
      /// 3. When resuming, bail out if the number of jobs or the plan hash differs from
      ///    the values recorded in `import_progress`.
      /// 4. Execute the plan, passing `progress.completed` so that already completed
      ///    jobs are skipped by `Plan::execute`.
      ///
      /// # Errors
      ///
      /// Returns an error if planning fails, if the recomputed plan does not match the
      /// recorded progress, or if executing the plan fails.
      ///
      /// # Panics
      ///
      /// Panics if the resumed progress carries a `job_soft_size_limit` of zero, because
      /// of the `NonZeroUsize::new(..).unwrap()` below.
      79            0 : async fn run_v1(
      80            0 :     timeline: Arc<Timeline>,
      81            0 :     control_file: ControlFile,
      82            0 :     storage: RemoteStorageWrapper,
      83            0 :     import_progress: Option<ShardImportProgressV1>,
      84            0 :     ctx: &RequestContext,
      85            0 : ) -> anyhow::Result<()> {
      86            0 :     let planner = Planner {
      87            0 :         control_file,
      88            0 :         storage: storage.clone(),
      89            0 :         shard: timeline.shard_identity,
      90            0 :         tasks: Vec::default(),
      91            0 :     };
      92              : 
      93              :     // Use the job size limit encoded in the progress if we are resuming an import.
      94              :     // This ensures that imports have stable plans even if the pageserver config changes.
      95            0 :     let import_config = {
      96            0 :         match &import_progress {
      97            0 :             Some(progress) => {
      98            0 :                 let base = &timeline.conf.timeline_import_config;
      99            0 :                 TimelineImportConfig {
                                            // NOTE(review): a persisted limit of 0 panics here instead of
                                            // returning an error - confirm that 0 can never be stored.
     100            0 :                     import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit)
     101            0 :                         .unwrap(),
     102            0 :                     import_job_concurrency: base.import_job_concurrency,
     103            0 :                     import_job_checkpoint_threshold: base.import_job_checkpoint_threshold,
     104            0 :                     import_job_max_byte_range_size: base.import_job_max_byte_range_size,
     105            0 :                 }
     106              :             }
     107            0 :             None => timeline.conf.timeline_import_config.clone(),
     108              :         }
     109              :     };
     110              : 
     111            0 :     let plan = planner.plan(&import_config).await?;
     112              : 
     113              :     // Hash the plan and compare with the hash of the plan we got back from the storage controller.
     114              :     // If the two match, it means that the planning stage had the same output.
     115              :     //
     116              :     // This is not intended to be a cryptographically secure hash.
     117              :     const SEED: u64 = 42;
     118            0 :     let mut hasher = twox_hash::XxHash64::with_seed(SEED);
     119            0 :     plan.hash(&mut hasher);
     120            0 :     let plan_hash = hasher.finish();
     121              : 
     122            0 :     if let Some(progress) = &import_progress {
     123              :         // Handle collisions on jobs of unequal length
     124            0 :         if progress.jobs != plan.jobs.len() {
     125            0 :             anyhow::bail!("Import plan job length does not match storcon metadata")
     126            0 :         }
     127              : 
     128            0 :         if plan_hash != progress.import_plan_hash {
     129            0 :             anyhow::bail!("Import plan does not match storcon metadata");
     130            0 :         }
     131            0 :     }
     132              : 
     133            0 :     pausable_failpoint!("import-timeline-pre-execute-pausable");
     134              : 
                            // Read the job count through a borrow first: the `map` on the next line
                            // consumes `import_progress`.
     135            0 :     let jobs_count = import_progress.as_ref().map(|p| p.jobs);
     136            0 :     let start_from_job_idx = import_progress.map(|progress| progress.completed);
     137              : 
     138            0 :     tracing::info!(
     139              :         start_from_job_idx=?start_from_job_idx,
     140              :         jobs=?jobs_count,
     141            0 :         "Executing import plan"
     142              :     );
     143              : 
     144            0 :     plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx)
     145            0 :         .await
     146            0 : }
     147              : 
     /// Collects import tasks for one shard and turns them into a [`Plan`].
     ///
     /// Consumed by `Planner::plan`, which fills `tasks`, sorts them by key range and
     /// groups them into jobs.
     148              : struct Planner {
                            // Provides the checkpoint LSN, the raw control file buffer and the checkpoint copy.
     149              :     control_file: ControlFile,
                            // Remote storage the PGDATA files are listed in and read from.
     150              :     storage: RemoteStorageWrapper,
                            // Identity of the shard the plan is built for; SLRUs are only planned on shard zero.
     151              :     shard: ShardIdentity,
                            // Tasks accumulated while planning; drained when the jobs are built.
     152              :     tasks: Vec<AnyImportTask>,
     153              : }
     154              : 
     /// Output of `Planner::plan`: the ordered list of jobs to run for one shard.
     ///
     /// Derives `Hash` so that `run_v1` can compare a recomputed plan with the plan hash
     /// recorded in the import progress.
     155              : #[derive(Hash)]
     156              : struct Plan {
                            // Jobs in execution order; `Plan::execute` numbers them starting at 1.
     157              :     jobs: Vec<ChunkProcessingJob>,
     158              :     // Included here such that it ends up in the hash for the plan
     159              :     shard: ShardIdentity,
     160              : }
     161              : 
     162              : impl Planner {
     163              :     /// Creates an import plan
     164              :     ///
     165              :     /// This function is and must remain pure: given the same input, it will generate the same import plan.
                            ///
                            /// NOTE(review): "pure" has to be read as "deterministic with respect to the
                            /// control file, the contents of remote storage and `import_config`": the
                            /// function does read from `self.storage` while planning.
                            ///
                            /// The tasks are collected in this order, then sorted by key range:
                            /// dbdir, one set of tasks per database (`import_db`), the three SLRUs
                            /// (shard zero only), an empty pg_twophase directory, the control file and
                            /// the checkpoint. The sorted tasks are then grouped into jobs whose
                            /// accumulated size stays below `import_job_soft_size_limit` where possible.
                            ///
                            /// # Errors
                            ///
                            /// Fails if the aligned checkpoint LSN is not valid, if listing or reading
                            /// from storage fails, or if serializing a directory or the checkpoint fails.
     166            0 :     async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
     167            0 :         let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
     168            0 :         anyhow::ensure!(pgdata_lsn.is_valid());
     169              : 
     170            0 :         let datadir = PgDataDir::new(&self.storage).await?;
     171              : 
     172              :         // Import dbdir (00:00:00 keyspace)
     173              :         // This is just constructed here, but will be written to the image layer in the first call to import_db()
     174            0 :         let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
     175            0 :             dbdirs: datadir
     176            0 :                 .dbs
     177            0 :                 .iter()
     178            0 :                 .map(|db| ((db.spcnode, db.dboid), true))
     179            0 :                 .collect(),
     180            0 :         })?);
     181            0 :         self.tasks
     182            0 :             .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
     183              : 
     184              :         // Import databases (00:spcnode:dbnode keyspace for each db)
     185            0 :         for db in datadir.dbs {
     186            0 :             self.import_db(&db).await?;
     187              :         }
     188              : 
     189              :         // Import SLRUs
     190            0 :         if self.shard.is_shard_zero() {
     191              :             // pg_xact (01:00 keyspace)
     192            0 :             self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
     193            0 :                 .await?;
     194              :             // pg_multixact/members (01:01 keyspace)
     195            0 :             self.import_slru(
     196            0 :                 SlruKind::MultiXactMembers,
     197            0 :                 &self.storage.pgdata().join("pg_multixact/members"),
     198            0 :             )
     199            0 :             .await?;
     200              :             // pg_multixact/offsets (01:02 keyspace)
     201            0 :             self.import_slru(
     202            0 :                 SlruKind::MultiXactOffsets,
     203            0 :                 &self.storage.pgdata().join("pg_multixact/offsets"),
     204            0 :             )
     205            0 :             .await?;
     206            0 :         }
     207              : 
     208              :         // Import pg_twophase.
     209              :         // TODO: pg_twophase is currently always imported as empty
     210            0 :         let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     211            0 :             xids: HashSet::new(),
     212            0 :         })?;
     213            0 :         self.tasks
     214            0 :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     215            0 :                 TWOPHASEDIR_KEY,
     216            0 :                 Bytes::from(twophasedir_buf),
     217            0 :             )));
     218              : 
     219              :         // Controlfile, checkpoint
     220            0 :         self.tasks
     221            0 :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     222            0 :                 CONTROLFILE_KEY,
     223            0 :                 self.control_file.control_file_buf().clone(),
     224            0 :             )));
     225              : 
     226            0 :         let checkpoint_buf = self
     227            0 :             .control_file
     228            0 :             .control_file_data()
     229            0 :             .checkPointCopy
     230            0 :             .encode()?;
     231            0 :         self.tasks
     232            0 :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     233            0 :                 CHECKPOINT_KEY,
     234            0 :                 checkpoint_buf,
     235            0 :             )));
     236              : 
     237              :         // Sort the tasks by the key ranges they handle.
     238              :         // The plan being generated here needs to be stable across invocations
     239              :         // of this method.
     240            0 :         self.tasks.sort_by_key(|task| match task {
     241            0 :             AnyImportTask::SingleKey(key) => (key.key, key.key.next()),
     242            0 :             AnyImportTask::RelBlocks(rel_blocks) => {
     243            0 :                 (rel_blocks.key_range.start, rel_blocks.key_range.end)
     244              :             }
     245            0 :             AnyImportTask::SlruBlocks(slru_blocks) => {
     246            0 :                 (slru_blocks.key_range.start, slru_blocks.key_range.end)
     247              :             }
     248            0 :         });
     249              : 
     250              :         // Assigns parts of key space to later parallel jobs
     251              :         // Note: The image layers produced here may have gaps, meaning,
     252              :         //       there is not an image for each key in the layer's key range.
     253              :         //       The read path stops traversal at the first image layer, regardless
     254              :         //       of whether a base image has been found for a key or not.
     255              :         //       (Concept of sparse image layers doesn't exist.)
     256              :         //       This behavior is exactly right for the base image layers we're producing here.
     257              :         //       But, since no other place in the code currently produces image layers with gaps,
     258              :         //       it seems noteworthy.
     259            0 :         let mut last_end_key = Key::MIN;
     260            0 :         let mut current_chunk = Vec::new();
     261            0 :         let mut current_chunk_size: usize = 0;
     262            0 :         let mut jobs = Vec::new();
     263            0 :         for task in std::mem::take(&mut self.tasks).into_iter() {
     264            0 :             let task_size = task.total_size(&self.shard);
     265            0 :             let projected_chunk_size = current_chunk_size.saturating_add(task_size);
                                    // Close the current chunk *before* adding `task` once the soft limit would be
                                    // exceeded. The closed job's key range ends where `task` starts, so consecutive
                                    // jobs cover adjacent key ranges.
                                    // NOTE(review): if the very first task alone exceeds the limit, this pushes a
                                    // job with an empty task list - confirm that this is harmless downstream.
     266            0 :             if projected_chunk_size > import_config.import_job_soft_size_limit.into() {
     267            0 :                 let key_range = last_end_key..task.key_range().start;
     268            0 :                 jobs.push(ChunkProcessingJob::new(
     269            0 :                     key_range.clone(),
     270            0 :                     std::mem::take(&mut current_chunk),
     271            0 :                     pgdata_lsn,
     272            0 :                 ));
     273            0 :                 last_end_key = key_range.end;
     274            0 :                 current_chunk_size = 0;
     275            0 :             }
     276            0 :             current_chunk_size = current_chunk_size.saturating_add(task_size);
     277            0 :             current_chunk.push(task);
     278              :         }
                                // The remaining tasks form the last job, which extends to the end of the key space.
     279            0 :         jobs.push(ChunkProcessingJob::new(
     280            0 :             last_end_key..Key::MAX,
     281            0 :             current_chunk,
     282            0 :             pgdata_lsn,
     283              :         ));
     284              : 
     285            0 :         Ok(Plan {
     286            0 :             jobs,
     287            0 :             shard: self.shard,
     288            0 :         })
     289            0 :     }
     290              : 
                            /// Schedules the import tasks for a single database directory.
                            ///
                            /// Pushes onto `self.tasks`, in this order: the relmap file (its contents are
                            /// read from storage right here), the reldir entry built from `db.files`, and
                            /// then, per file, a rel-blocks task followed by a rel-size key when
                            /// `file.nblocks` is set.
                            ///
                            /// # Errors
                            ///
                            /// Fails if reading `pg_filenode.map` fails, if serializing the reldir fails,
                            /// or if a file size is not a multiple of 8192.
     291            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
     292              :     async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
     293              :         debug!("start");
     294              :         scopeguard::defer! {
     295              :             debug!("return");
     296              :         }
     297              : 
     298              :         // Import relmap (00:spcnode:dbnode:00:*:00)
     299              :         let relmap_key = relmap_file_key(db.spcnode, db.dboid);
     300              :         debug!("Constructing relmap entry, key {relmap_key}");
     301              :         let relmap_path = db.path.join("pg_filenode.map");
     302              :         let relmap_buf = self.storage.get(&relmap_path).await?;
     303              :         self.tasks
     304              :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     305              :                 relmap_key, relmap_buf,
     306              :             )));
     307              : 
     308              :         // Import reldir (00:spcnode:dbnode:00:*:01)
     309              :         let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
     310              :         debug!("Constructing reldirs entry, key {reldir_key}");
     311              :         let reldir_buf = RelDirectory::ser(&RelDirectory {
     312              :             rels: db
     313              :                 .files
     314              :                 .iter()
     315            0 :                 .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
     316              :                 .collect(),
     317              :         })?;
     318              :         self.tasks
     319              :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     320              :                 reldir_key,
     321              :                 Bytes::from(reldir_buf),
     322              :             )));
     323              : 
     324              :         // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
     325              :         // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
     326              :         for file in &db.files {
     327              :             debug!(%file.path, %file.filesize, "importing file");
     328              :             let len = file.filesize;
     329              :             ensure!(len % 8192 == 0);
                                    // 1024 * 1024 * 1024 / 8192 = 131072 blocks per 1 GiB segment file.
                                    // NOTE(review): 8192 is hard-coded although `BLCKSZ` is imported at the top of
                                    // the file. Also, `(len / 8192) as u32` is an unchecked narrowing cast (if
                                    // `filesize` is wider than u32) and `segno * 131072` is unchecked arithmetic,
                                    // whereas `import_slru` uses `u32::try_from` - consider aligning the two.
     330              :             let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
     331              :             let start_key = rel_block_to_key(file.rel_tag, start_blk);
     332              :             let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
     333              :             self.tasks
     334              :                 .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
     335              :                     self.shard,
     336              :                     start_key..end_key,
     337              :                     &file.path,
     338              :                     self.storage.clone(),
     339              :                 )));
     340              : 
     341              :             // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
     342              :             if let Some(nblocks) = file.nblocks {
     343              :                 let size_key = rel_size_to_key(file.rel_tag);
     344              :                 //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
                                        // The relation size is stored as the little-endian bytes of `nblocks`.
     345              :                 let buf = nblocks.to_le_bytes();
     346              :                 self.tasks
     347              :                     .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     348              :                         size_key,
     349              :                         Bytes::from(buf.to_vec()),
     350              :                     )));
     351              :             }
     352              :         }
     353              : 
     354              :         Ok(())
     355              :     }
     356              : 
                            /// Schedules the import tasks for one SLRU directory of the given `kind` at `path`.
                            ///
                            /// Lists the files in `path`; entries whose object name does not parse as a
                            /// hexadecimal `u32` segment number are silently skipped. Pushes the SLRU
                            /// segment directory key and then, per segment, an SLRU-blocks task followed
                            /// by a segment-size key holding the block count as little-endian bytes.
                            ///
                            /// # Errors
                            ///
                            /// Fails if listing the directory fails, if serializing the segment directory
                            /// fails, if a segment size is not a multiple of 8192, or if a segment's block
                            /// count does not fit into a `u32`.
                            ///
                            /// # Panics
                            ///
                            /// Panics if called on a shard other than shard zero.
     357            0 :     async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
     358            0 :         assert!(self.shard.is_shard_zero());
     359              : 
     360            0 :         let segments = self.storage.listfilesindir(path).await?;
     361            0 :         let segments: Vec<(String, u32, usize)> = segments
     362            0 :             .into_iter()
     363            0 :             .filter_map(|(path, size)| {
     364            0 :                 let filename = path.object_name()?;
     365            0 :                 let segno = u32::from_str_radix(filename, 16).ok()?;
     366            0 :                 Some((filename.to_string(), segno, size))
     367            0 :             })
     368            0 :             .collect();
     369              : 
     370              :         // Write SlruDir
     371            0 :         let slrudir_key = slru_dir_to_key(kind);
     372            0 :         let segnos: HashSet<u32> = segments
     373            0 :             .iter()
     374            0 :             .map(|(_path, segno, _size)| *segno)
     375            0 :             .collect();
     376            0 :         let slrudir = SlruSegmentDirectory { segments: segnos };
     377            0 :         let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
     378            0 :         self.tasks
     379            0 :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     380            0 :                 slrudir_key,
     381            0 :                 Bytes::from(slrudir_buf),
     382            0 :             )));
     383              : 
     384            0 :         for (segpath, segno, size) in segments {
     385              :             // SlruSegBlocks for each segment
     386            0 :             let p = path.join(&segpath);
     387            0 :             let file_size = size;
     388            0 :             ensure!(file_size % 8192 == 0);
     389            0 :             let nblocks = u32::try_from(file_size / 8192)?;
     390            0 :             let start_key = slru_block_to_key(kind, segno, 0);
     391            0 :             let end_key = slru_block_to_key(kind, segno, nblocks);
     392            0 :             debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
     393            0 :             self.tasks
     394            0 :                 .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
     395            0 :                     start_key..end_key,
     396            0 :                     &p,
     397            0 :                     self.storage.clone(),
     398            0 :                 )));
     399              : 
     400              :             // Followed by SlruSegSize
     401            0 :             let segsize_key = slru_segment_size_to_key(kind, segno);
     402            0 :             let segsize_buf = nblocks.to_le_bytes();
     403            0 :             self.tasks
     404            0 :                 .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     405            0 :                     segsize_key,
     406            0 :                     Bytes::copy_from_slice(&segsize_buf),
     407            0 :                 )));
     408              :         }
     409            0 :         Ok(())
     410            0 :     }
     411              : }
     412              : 
     413              : impl Plan {
                            /// Runs the jobs of this plan with bounded concurrency and checkpoints progress.
                            ///
                            /// Jobs are numbered starting at 1. `start_after_job_idx` is the number of the
                            /// last job that is already done; jobs up to and including it are skipped.
                            /// Each remaining job is spawned as a tokio task once a semaphore permit is
                            /// available (`import_config.import_job_concurrency` permits in total), and
                            /// completions are consumed in submission order through a `FuturesOrdered`.
                            ///
                            /// Whenever the number of the last completed job is a multiple of
                            /// `import_config.import_job_checkpoint_threshold`, an index upload is
                            /// scheduled and awaited, and the progress (including `import_plan_hash` and
                            /// the job soft size limit) is reported to the storage controller.
                            ///
                            /// # Errors
                            ///
                            /// Returns the first job error, an error if a spawned task fails to join, or
                            /// an error from the index upload or from the progress upcall.
                            ///
                            /// # Panics
                            ///
                            /// Panics if a completed job is not the direct successor of the previously
                            /// completed one.
                            ///
                            /// NOTE(review): on an early return the `FuturesOrdered` of join handles is
                            /// dropped; the already spawned tasks are presumably detached rather than
                            /// aborted - confirm they stop via the timeline's cancellation.
     414            0 :     async fn execute(
     415            0 :         self,
     416            0 :         timeline: Arc<Timeline>,
     417            0 :         start_after_job_idx: Option<usize>,
     418            0 :         import_plan_hash: u64,
     419            0 :         import_config: &TimelineImportConfig,
     420            0 :         ctx: &RequestContext,
     421            0 :     ) -> anyhow::Result<()> {
     422            0 :         let storcon_client = StorageControllerUpcallClient::new(timeline.conf, &timeline.cancel);
     423              : 
     424            0 :         let mut work = FuturesOrdered::new();
     425            0 :         let semaphore = Arc::new(Semaphore::new(import_config.import_job_concurrency.into()));
     426              : 
     427            0 :         let jobs_in_plan = self.jobs.len();
     428              : 
                                // Pair every job with its 1-based number and drop the ones that are already done.
     429            0 :         let mut jobs = self
     430            0 :             .jobs
     431            0 :             .into_iter()
     432            0 :             .enumerate()
     433            0 :             .map(|(idx, job)| (idx + 1, job))
     434            0 :             .filter(|(idx, _job)| {
     435              :                 // Filter out any jobs that have been done already
     436            0 :                 if let Some(start_after) = start_after_job_idx {
     437            0 :                     *idx > start_after
     438              :                 } else {
     439            0 :                     true
     440              :                 }
     441            0 :             })
     442            0 :             .peekable();
     443              : 
     444            0 :         let mut last_completed_job_idx = start_after_job_idx.unwrap_or(0);
     445            0 :         let checkpoint_every: usize = import_config.import_job_checkpoint_threshold.into();
     446            0 :         let max_byte_range_size: usize = import_config.import_job_max_byte_range_size.into();
     447              : 
     448              :         // Run import jobs concurrently up to the limit specified by the pageserver configuration.
     449              :         // Note that we process completed futures in the order of insertion. This will be the
     450              :         // building block for resuming imports across pageserver restarts or tenant migrations.
     451            0 :         while last_completed_job_idx < jobs_in_plan {
     452            0 :             tokio::select! {
                                        // Only wait for a permit while there are jobs left to start.
     453            0 :                 permit = semaphore.clone().acquire_owned(), if jobs.peek().is_some() => {
     454            0 :                     let permit = permit.expect("never closed");
     455            0 :                     let (job_idx, job) = jobs.next().expect("we peeked");
     456              : 
     457            0 :                     let job_timeline = timeline.clone();
     458            0 :                     let ctx = ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
     459              : 
     460            0 :                     work.push_back(tokio::task::spawn(async move {
                                                // Hold the permit until the job has finished.
     461            0 :                         let _permit = permit;
     462            0 :                         let res = job.run(job_timeline, max_byte_range_size, &ctx).await;
     463            0 :                         (job_idx, res)
     464            0 :                     }));
     465              :                 },
     466            0 :                 maybe_complete_job_idx = work.next() => {
     467            0 :                     pausable_failpoint!("import-task-complete-pausable");
     468              : 
     469            0 :                     match maybe_complete_job_idx {
     470            0 :                         Some(Ok((job_idx, res))) => {
     471            0 :                             assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
     472              : 
     473            0 :                             res?;
     474            0 :                             last_completed_job_idx = job_idx;
     475              : 
                                                    // NOTE(review): this would panic on a threshold of 0 - presumably
                                                    // the config type rules that out; confirm.
     476            0 :                             if last_completed_job_idx % checkpoint_every == 0 {
     477            0 :                                 tracing::info!(last_completed_job_idx, jobs=%jobs_in_plan, "Checkpointing import status");
     478              : 
     479            0 :                                 let progress = ShardImportProgressV1 {
     480            0 :                                     jobs: jobs_in_plan,
     481            0 :                                     completed: last_completed_job_idx,
     482            0 :                                     import_plan_hash,
     483            0 :                                     job_soft_size_limit: import_config.import_job_soft_size_limit.into(),
     484            0 :                                 };
     485              : 
                                                        // Wait for the index upload to complete before reporting the
                                                        // progress to the storage controller.
     486            0 :                                 timeline.remote_client.schedule_index_upload_for_file_changes()?;
     487            0 :                                 timeline.remote_client.wait_completion().await?;
     488              : 
     489            0 :                                 storcon_client.put_timeline_import_status(
     490            0 :                                     timeline.tenant_shard_id,
     491            0 :                                     timeline.timeline_id,
     492            0 :                                     timeline.generation,
     493            0 :                                     ShardImportStatus::InProgress(Some(ShardImportProgress::V1(progress)))
     494            0 :                                 )
     495            0 :                                 .await
                                                        // NOTE(review): the underlying error is discarded and always
                                                        // reported as a shutdown - confirm no other failure is possible.
     496            0 :                                 .map_err(|_err| {
     497            0 :                                     anyhow::anyhow!("Shut down while putting timeline import status")
     498            0 :                                 })?;
     499            0 :                             }
     500              :                         },
     501              :                         Some(Err(_)) => {
     502            0 :                             anyhow::bail!(
     503            0 :                                 "import job panicked or cancelled"
     504              :                             );
     505              :                         }
                                                // No job is in flight right now; go around the loop again.
     506            0 :                         None => {}
     507              :                     }
     508              :                 }
     509              :             }
     510              :         }
     511              : 
     512            0 :         Ok(())
     513            0 :     }
     514              : }
     515              : 
     516              : //
     517              : // dbdir iteration tools
     518              : //
     519              : 
     520              : struct PgDataDir {
     521              :     pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
     522              : }
     523              : 
     524              : struct PgDataDirDb {
     525              :     pub spcnode: u32,
     526              :     pub dboid: u32,
     527              :     pub path: RemotePath,
     528              :     pub files: Vec<PgDataDirDbFile>,
     529              : }
     530              : 
     531              : struct PgDataDirDbFile {
     532              :     pub path: RemotePath,
     533              :     pub rel_tag: RelTag,
     534              :     pub segno: u32,
     535              :     pub filesize: usize,
     536              :     // Cumulative size of the given fork, set only for the last segment of that fork
     537              :     pub nblocks: Option<usize>,
     538              : }
     539              : 
     540              : impl PgDataDir {
     541            0 :     async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
     542            0 :         let datadir_path = storage.pgdata();
     543              :         // Import ordinary databases first: DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID.
     544              :         // Traverse databases in increasing OID order.
     545              : 
     546            0 :         let basedir = &datadir_path.join("base");
     547            0 :         let db_oids: Vec<_> = storage
     548            0 :             .listdir(basedir)
     549            0 :             .await?
     550            0 :             .into_iter()
     551            0 :             .filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
     552            0 :             .sorted()
     553            0 :             .collect();
     554            0 :         debug!(?db_oids, "found databases");
     555            0 :         let mut databases = Vec::new();
     556            0 :         for dboid in db_oids {
     557            0 :             databases.push(
     558            0 :                 PgDataDirDb::new(
     559            0 :                     storage,
     560            0 :                     &basedir.join(dboid.to_string()),
     561            0 :                     postgres_ffi_types::constants::DEFAULTTABLESPACE_OID,
     562            0 :                     dboid,
     563            0 :                     &datadir_path,
     564            0 :                 )
     565            0 :                 .await?,
     566              :             );
     567              :         }
     568              : 
     569              :         // special case for global catalogs
     570            0 :         databases.push(
     571            0 :             PgDataDirDb::new(
     572            0 :                 storage,
     573            0 :                 &datadir_path.join("global"),
     574            0 :                 postgres_ffi_types::constants::GLOBALTABLESPACE_OID,
     575            0 :                 0,
     576            0 :                 &datadir_path,
     577            0 :             )
     578            0 :             .await?,
     579              :         );
     580              : 
     581            0 :         databases.sort_by_key(|db| (db.spcnode, db.dboid));
     582              : 
     583            0 :         Ok(Self { dbs: databases })
     584            0 :     }
     585              : }
     586              : 
     587              : impl PgDataDirDb {
     588            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
     589              :     async fn new(
     590              :         storage: &RemoteStorageWrapper,
     591              :         db_path: &RemotePath,
     592              :         spcnode: u32,
     593              :         dboid: u32,
     594              :         datadir_path: &RemotePath,
     595              :     ) -> anyhow::Result<Self> {
     596              :         let mut files: Vec<PgDataDirDbFile> = storage
     597              :             .listfilesindir(db_path)
     598              :             .await?
     599              :             .into_iter()
     600            0 :             .filter_map(|(path, size)| {
     601            0 :                 debug!(%path, %size, "found file in dbdir");
     602            0 :                 path.object_name().and_then(|name| {
     603              :                     // returns (relnode, forknum, segno)
     604            0 :                     parse_relfilename(name).ok().map(|x| (size, x))
     605            0 :                 })
     606            0 :             })
     607              :             .sorted_by_key(|(_, relfilename)| *relfilename)
     608            0 :             .map(|(filesize, (relnode, forknum, segno))| {
     609            0 :                 let rel_tag = RelTag {
     610            0 :                     spcnode,
     611            0 :                     dbnode: dboid,
     612            0 :                     relnode,
     613            0 :                     forknum,
     614            0 :                 };
     615              : 
     616            0 :                 let path = datadir_path.join(rel_tag.to_segfile_name(segno));
     617            0 :                 anyhow::ensure!(filesize % BLCKSZ as usize == 0);
     618            0 :                 let nblocks = filesize / BLCKSZ as usize;
     619              : 
     620            0 :                 Ok(PgDataDirDbFile {
     621            0 :                     path,
     622            0 :                     filesize,
     623            0 :                     rel_tag,
     624            0 :                     segno,
     625            0 :                     nblocks: Some(nblocks), // first non-cumulative sizes
     626            0 :                 })
     627            0 :             })
     628              :             .collect::<anyhow::Result<_, _>>()?;
     629              : 
     630              :         // Set cumulative sizes. Do all of that math here, so that later we can more easily
     631              :         // parallelize over segments and know for which segments we need to write a relsize
     632              :         // entry.
     633              :         let mut cumulative_nblocks: usize = 0;
     634              :         let mut prev_rel_tag: Option<RelTag> = None;
     635              :         for i in 0..files.len() {
     636              :             if prev_rel_tag == Some(files[i].rel_tag) {
     637              :                 cumulative_nblocks += files[i].nblocks.unwrap();
     638              :             } else {
     639              :                 cumulative_nblocks = files[i].nblocks.unwrap();
     640              :             }
     641              : 
     642              :             files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
     643              :                 Some(cumulative_nblocks)
     644              :             } else {
     645              :                 None
     646              :             };
     647              : 
     648              :             prev_rel_tag = Some(files[i].rel_tag);
     649              :         }
     650              : 
     651              :         Ok(PgDataDirDb {
     652              :             files,
     653              :             path: db_path.clone(),
     654              :             spcnode,
     655              :             dboid,
     656              :         })
     657              :     }
     658              : }
     659              : 
     660              : trait ImportTask {
     661              :     fn key_range(&self) -> Range<Key>;
     662              : 
     663            0 :     fn total_size(&self, shard_identity: &ShardIdentity) -> usize {
     664            0 :         let range = ShardedRange::new(self.key_range(), shard_identity);
     665            0 :         let page_count = range.page_count();
     666            0 :         if page_count == u32::MAX {
     667            0 :             tracing::warn!(
     668            0 :                 "Import task has non contiguous key range: {}..{}",
     669            0 :                 self.key_range().start,
     670            0 :                 self.key_range().end
     671              :             );
     672              : 
     673              :             // Tasks should operate on contiguous ranges. It is unexpected for
     674              :             // ranges to violate this assumption. Calling code handles this by mapping
     675              :             // any task on a non contiguous range to its own image layer.
     676            0 :             usize::MAX
     677              :         } else {
     678            0 :             page_count as usize * 8192
     679              :         }
     680            0 :     }
     681              : 
     682              :     async fn doit(
     683              :         self,
     684              :         layer_writer: &mut ImageLayerWriter,
     685              :         max_byte_range_size: usize,
     686              :         ctx: &RequestContext,
     687              :     ) -> anyhow::Result<usize>;
     688              : }
     689              : 
     690              : struct ImportSingleKeyTask {
     691              :     key: Key,
     692              :     buf: Bytes,
     693              : }
     694              : 
     695              : impl Hash for ImportSingleKeyTask {
     696            0 :     fn hash<H: Hasher>(&self, state: &mut H) {
     697            0 :         let ImportSingleKeyTask { key, buf } = self;
     698              : 
     699            0 :         key.hash(state);
     700              :         // The key value might not have a stable binary representation.
     701              :         // For instance, the db directory uses an unstable hash-map.
     702              :         // To work around this we are a bit lax here and only hash the
     703              :         // size of the buffer which must be consistent.
     704            0 :         buf.len().hash(state);
     705            0 :     }
     706              : }
     707              : 
     708              : impl ImportSingleKeyTask {
     709            0 :     fn new(key: Key, buf: Bytes) -> Self {
     710            0 :         ImportSingleKeyTask { key, buf }
     711            0 :     }
     712              : }
     713              : 
     714              : impl ImportTask for ImportSingleKeyTask {
     715            0 :     fn key_range(&self) -> Range<Key> {
     716            0 :         singleton_range(self.key)
     717            0 :     }
     718              : 
     719            0 :     async fn doit(
     720            0 :         self,
     721            0 :         layer_writer: &mut ImageLayerWriter,
     722            0 :         _max_byte_range_size: usize,
     723            0 :         ctx: &RequestContext,
     724            0 :     ) -> anyhow::Result<usize> {
     725            0 :         layer_writer.put_image(self.key, self.buf, ctx).await?;
     726            0 :         Ok(1)
     727            0 :     }
     728              : }
     729              : 
     730              : struct ImportRelBlocksTask {
     731              :     shard_identity: ShardIdentity,
     732              :     key_range: Range<Key>,
     733              :     path: RemotePath,
     734              :     storage: RemoteStorageWrapper,
     735              : }
     736              : 
     737              : impl Hash for ImportRelBlocksTask {
     738            0 :     fn hash<H: Hasher>(&self, state: &mut H) {
     739              :         let ImportRelBlocksTask {
     740              :             shard_identity: _,
     741            0 :             key_range,
     742            0 :             path,
     743              :             storage: _,
     744            0 :         } = self;
     745              : 
     746            0 :         key_range.hash(state);
     747            0 :         path.hash(state);
     748            0 :     }
     749              : }
     750              : 
     751              : impl ImportRelBlocksTask {
     752            0 :     fn new(
     753            0 :         shard_identity: ShardIdentity,
     754            0 :         key_range: Range<Key>,
     755            0 :         path: &RemotePath,
     756            0 :         storage: RemoteStorageWrapper,
     757            0 :     ) -> Self {
     758            0 :         ImportRelBlocksTask {
     759            0 :             shard_identity,
     760            0 :             key_range,
     761            0 :             path: path.clone(),
     762            0 :             storage,
     763            0 :         }
     764            0 :     }
     765              : }
     766              : 
     767              : impl ImportTask for ImportRelBlocksTask {
     768            0 :     fn key_range(&self) -> Range<Key> {
     769            0 :         self.key_range.clone()
     770            0 :     }
     771              : 
     772            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
     773              :     async fn doit(
     774              :         self,
     775              :         layer_writer: &mut ImageLayerWriter,
     776              :         max_byte_range_size: usize,
     777              :         ctx: &RequestContext,
     778              :     ) -> anyhow::Result<usize> {
     779              :         debug!("Importing relation file");
     780              : 
     781              :         let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
     782              :         let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
     783              :         assert_eq!(rel_tag, rel_tag_end);
     784              : 
     785              :         let ranges = (start_blk..end_blk)
     786              :             .enumerate()
     787            0 :             .filter_map(|(i, blknum)| {
     788            0 :                 let key = rel_block_to_key(rel_tag, blknum);
     789            0 :                 if self.shard_identity.is_key_disposable(&key) {
     790            0 :                     return None;
     791            0 :                 }
     792            0 :                 let file_offset = i.checked_mul(8192).unwrap();
     793            0 :                 Some((
     794            0 :                     vec![key],
     795            0 :                     file_offset,
     796            0 :                     file_offset.checked_add(8192).unwrap(),
     797            0 :                 ))
     798            0 :             })
     799            0 :             .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
     800            0 :                 assert_eq!(key.len(), 1);
     801            0 :                 assert!(!acc.is_empty());
     802            0 :                 assert!(acc_end > acc_start);
     803            0 :                 if acc_end == start && end - acc_start <= max_byte_range_size {
     804            0 :                     acc.push(key.pop().unwrap());
     805            0 :                     Ok((acc, acc_start, end))
     806              :                 } else {
     807            0 :                     Err(((acc, acc_start, acc_end), (key, start, end)))
     808              :                 }
     809            0 :             });
     810              : 
     811              :         let mut nimages = 0;
     812              :         for (keys, range_start, range_end) in ranges {
     813              :             let range_buf = self
     814              :                 .storage
     815              :                 .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
     816              :                 .await?;
     817              :             let mut buf = Bytes::from(range_buf);
     818              :             for key in keys {
     819              :                 // The writer buffers writes internally
     820              :                 let image = buf.split_to(8192);
     821              :                 layer_writer.put_image(key, image, ctx).await?;
     822              :                 nimages += 1;
     823              :             }
     824              :         }
     825              : 
     826              :         Ok(nimages)
     827              :     }
     828              : }
     829              : 
     830              : struct ImportSlruBlocksTask {
     831              :     key_range: Range<Key>,
     832              :     path: RemotePath,
     833              :     storage: RemoteStorageWrapper,
     834              : }
     835              : 
     836              : impl Hash for ImportSlruBlocksTask {
     837            0 :     fn hash<H: Hasher>(&self, state: &mut H) {
     838              :         let ImportSlruBlocksTask {
     839            0 :             key_range,
     840            0 :             path,
     841              :             storage: _,
     842            0 :         } = self;
     843              : 
     844            0 :         key_range.hash(state);
     845            0 :         path.hash(state);
     846            0 :     }
     847              : }
     848              : 
     849              : impl ImportSlruBlocksTask {
     850            0 :     fn new(key_range: Range<Key>, path: &RemotePath, storage: RemoteStorageWrapper) -> Self {
     851            0 :         ImportSlruBlocksTask {
     852            0 :             key_range,
     853            0 :             path: path.clone(),
     854            0 :             storage,
     855            0 :         }
     856            0 :     }
     857              : }
     858              : 
     859              : impl ImportTask for ImportSlruBlocksTask {
     860            0 :     fn key_range(&self) -> Range<Key> {
     861            0 :         self.key_range.clone()
     862            0 :     }
     863              : 
     864            0 :     async fn doit(
     865            0 :         self,
     866            0 :         layer_writer: &mut ImageLayerWriter,
     867            0 :         _max_byte_range_size: usize,
     868            0 :         ctx: &RequestContext,
     869            0 :     ) -> anyhow::Result<usize> {
     870            0 :         debug!("Importing SLRU segment file {}", self.path);
     871            0 :         let buf = self.storage.get(&self.path).await?;
     872              : 
     873              :         // TODO(vlad): Does timestamp to LSN work for imported timelines?
     874              :         // Probably not since we don't append the `xact_time` to it as in
     875              :         // [`WalIngest::ingest_xact_record`].
     876            0 :         let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
     877            0 :         let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
     878            0 :         let mut blknum = start_blk;
     879            0 :         let mut nimages = 0;
     880            0 :         let mut file_offset = 0;
     881            0 :         while blknum < end_blk {
     882            0 :             let key = slru_block_to_key(kind, segno, blknum);
     883            0 :             let buf = &buf[file_offset..(file_offset + 8192)];
     884            0 :             file_offset += 8192;
     885            0 :             layer_writer
     886            0 :                 .put_image(key, Bytes::copy_from_slice(buf), ctx)
     887            0 :                 .await?;
     888            0 :             nimages += 1;
     889            0 :             blknum += 1;
     890              :         }
     891            0 :         Ok(nimages)
     892            0 :     }
     893              : }
     894              : 
     895              : #[derive(Hash)]
     896              : enum AnyImportTask {
     897              :     SingleKey(ImportSingleKeyTask),
     898              :     RelBlocks(ImportRelBlocksTask),
     899              :     SlruBlocks(ImportSlruBlocksTask),
     900              : }
     901              : 
     902              : impl ImportTask for AnyImportTask {
     903            0 :     fn key_range(&self) -> Range<Key> {
     904            0 :         match self {
     905            0 :             Self::SingleKey(t) => t.key_range(),
     906            0 :             Self::RelBlocks(t) => t.key_range(),
     907            0 :             Self::SlruBlocks(t) => t.key_range(),
     908              :         }
     909            0 :     }
     910              :     /// returns the number of images put into the `layer_writer`
     911            0 :     async fn doit(
     912            0 :         self,
     913            0 :         layer_writer: &mut ImageLayerWriter,
     914            0 :         max_byte_range_size: usize,
     915            0 :         ctx: &RequestContext,
     916            0 :     ) -> anyhow::Result<usize> {
     917            0 :         match self {
     918            0 :             Self::SingleKey(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
     919            0 :             Self::RelBlocks(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
     920            0 :             Self::SlruBlocks(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
     921              :         }
     922            0 :     }
     923              : }
     924              : 
     925              : impl From<ImportSingleKeyTask> for AnyImportTask {
     926            0 :     fn from(t: ImportSingleKeyTask) -> Self {
     927            0 :         Self::SingleKey(t)
     928            0 :     }
     929              : }
     930              : 
     931              : impl From<ImportRelBlocksTask> for AnyImportTask {
     932            0 :     fn from(t: ImportRelBlocksTask) -> Self {
     933            0 :         Self::RelBlocks(t)
     934            0 :     }
     935              : }
     936              : 
     937              : impl From<ImportSlruBlocksTask> for AnyImportTask {
     938            0 :     fn from(t: ImportSlruBlocksTask) -> Self {
     939            0 :         Self::SlruBlocks(t)
     940            0 :     }
     941              : }
     942              : 
     943              : #[derive(Hash)]
     944              : struct ChunkProcessingJob {
     945              :     range: Range<Key>,
     946              :     tasks: Vec<AnyImportTask>,
     947              : 
     948              :     pgdata_lsn: Lsn,
     949              : }
     950              : 
     951              : impl ChunkProcessingJob {
     952            0 :     fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, pgdata_lsn: Lsn) -> Self {
     953            0 :         assert!(pgdata_lsn.is_valid());
     954            0 :         Self {
     955            0 :             range,
     956            0 :             tasks,
     957            0 :             pgdata_lsn,
     958            0 :         }
     959            0 :     }
     960              : 
     961            0 :     async fn run(
     962            0 :         self,
     963            0 :         timeline: Arc<Timeline>,
     964            0 :         max_byte_range_size: usize,
     965            0 :         ctx: &RequestContext,
     966            0 :     ) -> anyhow::Result<()> {
     967            0 :         let mut writer = ImageLayerWriter::new(
     968            0 :             timeline.conf,
     969            0 :             timeline.timeline_id,
     970            0 :             timeline.tenant_shard_id,
     971            0 :             &self.range,
     972            0 :             self.pgdata_lsn,
     973            0 :             &timeline.gate,
     974            0 :             timeline.cancel.clone(),
     975            0 :             ctx,
     976            0 :         )
     977            0 :         .await?;
     978              : 
     979            0 :         let mut nimages = 0;
     980            0 :         for task in self.tasks {
     981            0 :             nimages += task.doit(&mut writer, max_byte_range_size, ctx).await?;
     982              :         }
     983              : 
     984            0 :         let resident_layer = if nimages > 0 {
     985            0 :             let (desc, path) = writer.finish(ctx).await?;
     986              : 
     987              :             {
     988            0 :                 let guard = timeline
     989            0 :                     .layers
     990            0 :                     .read(LayerManagerLockHolder::ImportPgData)
     991            0 :                     .await;
     992            0 :                 let existing_layer = guard.try_get_from_key(&desc.key());
     993            0 :                 if let Some(layer) = existing_layer {
     994            0 :                     if layer.metadata().generation == timeline.generation {
     995            0 :                         return Err(anyhow::anyhow!(
     996            0 :                             "Import attempted to rewrite layer file in the same generation: {}",
     997            0 :                             layer.local_path()
     998            0 :                         ));
     999            0 :                     }
    1000            0 :                 }
    1001              :             }
    1002              : 
    1003            0 :             Layer::finish_creating(timeline.conf, &timeline, desc, &path)?
    1004              :         } else {
    1005              :             // dropping the writer cleans up
    1006            0 :             return Ok(());
    1007              :         };
    1008              : 
    1009              :         // The same import job might run multiple times since not every job is checkpointed.
    1010              :         // Hence, we must support the cases where the layer already exists. We cannot be
    1011              :         // certain that the existing layer is identical to the new one, so in that case
    1012              :         // we replace the old layer with the one we just generated.
    1013              : 
    1014            0 :         let mut guard = timeline
    1015            0 :             .layers
    1016            0 :             .write(LayerManagerLockHolder::ImportPgData)
    1017            0 :             .await;
    1018              : 
    1019            0 :         let existing_layer = guard
    1020            0 :             .try_get_from_key(&resident_layer.layer_desc().key())
    1021            0 :             .cloned();
    1022            0 :         match existing_layer {
    1023            0 :             Some(existing) => {
    1024              :                 // Unlink the remote layer from the index without scheduling its deletion.
    1025              :                 // When `existing_layer` drops [`LayerInner::drop`] will schedule its deletion from
    1026              :                 // remote storage, but that assumes that the layer was unlinked from the index first.
    1027            0 :                 timeline
    1028            0 :                     .remote_client
    1029            0 :                     .schedule_unlinking_of_layers_from_index_part(std::iter::once(
    1030            0 :                         existing.layer_desc().layer_name(),
    1031            0 :                     ))?;
    1032              : 
    1033            0 :                 guard.open_mut()?.rewrite_layers(
    1034            0 :                     &[(existing.clone(), resident_layer.clone())],
    1035            0 :                     &[],
    1036            0 :                     &timeline.metrics,
    1037              :                 );
    1038              :             }
    1039              :             None => {
    1040            0 :                 guard
    1041            0 :                     .open_mut()?
    1042            0 :                     .track_new_image_layers(&[resident_layer.clone()], &timeline.metrics);
    1043              :             }
    1044              :         }
    1045              : 
    1046            0 :         crate::tenant::timeline::drop_layer_manager_wlock(guard);
    1047              : 
    1048            0 :         timeline
    1049            0 :             .remote_client
    1050            0 :             .schedule_layer_file_upload(resident_layer)?;
    1051              : 
    1052            0 :         Ok(())
    1053            0 :     }
    1054              : }
        

Generated by: LCOV version 2.1-beta