LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline/import_pgdata - flow.rs (source / functions)
Test:      20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info
Test Date: 2024-11-25 17:48:16
Coverage:  Lines: 0.0 % (0 of 406 hit)    Functions: 0.0 % (0 of 53 hit)

            Line data    Source code
       1              : //! Import a PGDATA directory into an empty root timeline.
       2              : //!
       3              : //! This module is adapted from hackathon code by Heikki and Stas.
       4              : //! Other code in the parent module was written by Christian as part of a customer PoC.
       5              : //!
       6              : //! The hackathon code produced image layer files as a free-standing program.
       7              : //!
       8              : //! It has been modified to
       9              : //! - run inside a running Pageserver, within the proper lifecycles of Timeline -> Tenant(Shard)
      10              : //! - => sharding-awareness: produce image layers with only the data relevant for this shard
      11              : //! - => read the PGDATA from S3 instead of the local filesystem
      12              : //!
      13              : //! TODOs before productionization:
      14              : //! - ChunkProcessingJob size / ImportTask::total_size does not account for sharding.
      15              : //!   => produced image layers are likely too small.
      16              : //! - ChunkProcessingJob should cut up an ImportTask to hit the target image layer size exactly.
      17              : //! - asserts / unwraps need to be replaced with errors
      18              : //! - don't trust that remote objects will be small (= prevent OOMs in those cases)
      19              : //!     - limit all in-memory buffers in size, or download to disk and read from there
      20              : //! - limit task concurrency
      21              : //! - generally play nice with other tenants in the system
      22              : //!   - importbucket is a different bucket than the main pageserver storage, so it should be fine wrt S3 rate limits
      23              : //!   - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc
      24              : //! - integrate with layer eviction system
      25              : //! - audit for Tenant::cancel and Timeline::cancel responsiveness
      26              : //! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
      27              : //!
      28              : //! An incomplete set of TODOs from the Hackathon:
      29              : //! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
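                      : //!
                      : //! A minimal sketch of how a caller drives this module (illustrative only: the
                      : //! variable names and the surrounding orchestration are assumptions, not the
                      : //! actual call site):
                      : //!
                      : //! ```ignore
                      : //! // Assumes an empty root timeline, a parsed control file, and an open
                      : //! // import-bucket handle are already at hand.
                      : //! let pgdata_lsn = Lsn(control_file.control_file_data().checkPoint).align();
                      : //! flow::run(timeline, pgdata_lsn, control_file, storage, &ctx).await?;
                      : //! ```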
      30              : 
      31              : use std::sync::Arc;
      32              : 
      33              : use anyhow::{bail, ensure};
      34              : use bytes::Bytes;
      35              : 
      36              : use itertools::Itertools;
      37              : use pageserver_api::{
      38              :     key::{rel_block_to_key, rel_dir_to_key, rel_size_to_key, relmap_file_key, DBDIR_KEY},
      39              :     reltag::RelTag,
      40              :     shard::ShardIdentity,
      41              : };
      42              : use postgres_ffi::{pg_constants, relfile_utils::parse_relfilename, BLCKSZ};
      43              : use tokio::task::JoinSet;
      44              : use tracing::{debug, info_span, instrument, Instrument};
      45              : 
      46              : use crate::{
      47              :     assert_u64_eq_usize::UsizeIsU64,
      48              :     pgdatadir_mapping::{SlruSegmentDirectory, TwoPhaseDirectory},
      49              : };
      50              : use crate::{
      51              :     context::{DownloadBehavior, RequestContext},
      52              :     pgdatadir_mapping::{DbDirectory, RelDirectory},
      53              :     task_mgr::TaskKind,
      54              :     tenant::storage_layer::{ImageLayerWriter, Layer},
      55              : };
      56              : 
      57              : use pageserver_api::key::Key;
      58              : use pageserver_api::key::{
      59              :     slru_block_to_key, slru_dir_to_key, slru_segment_size_to_key, CHECKPOINT_KEY, CONTROLFILE_KEY,
      60              :     TWOPHASEDIR_KEY,
      61              : };
      62              : use pageserver_api::keyspace::singleton_range;
      63              : use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range};
      64              : use pageserver_api::reltag::SlruKind;
      65              : use utils::bin_ser::BeSer;
      66              : use utils::lsn::Lsn;
      67              : 
      68              : use std::collections::HashSet;
      69              : use std::ops::Range;
      70              : 
      71              : use super::{
      72              :     importbucket_client::{ControlFile, RemoteStorageWrapper},
      73              :     Timeline,
      74              : };
      75              : 
      76              : use remote_storage::RemotePath;
      77              : 
      78            0 : pub async fn run(
      79            0 :     timeline: Arc<Timeline>,
      80            0 :     pgdata_lsn: Lsn,
      81            0 :     control_file: ControlFile,
      82            0 :     storage: RemoteStorageWrapper,
      83            0 :     ctx: &RequestContext,
      84            0 : ) -> anyhow::Result<()> {
      85            0 :     Flow {
      86            0 :         timeline,
      87            0 :         pgdata_lsn,
      88            0 :         control_file,
      89            0 :         tasks: Vec::new(),
      90            0 :         storage,
      91            0 :     }
      92            0 :     .run(ctx)
      93            0 :     .await
      94            0 : }
      95              : 
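                      : /// Shared state for one import: the target timeline, the LSN at which the
                      : /// PGDATA is consistent (derived from the control file's checkpoint), the parsed
                      : /// control file, the queue of pending import tasks, and the import-bucket handle.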
      96              : struct Flow {
      97              :     timeline: Arc<Timeline>,
      98              :     pgdata_lsn: Lsn,
      99              :     control_file: ControlFile,
     100              :     tasks: Vec<AnyImportTask>,
     101              :     storage: RemoteStorageWrapper,
     102              : }
     103              : 
     104              : impl Flow {
     105              :     /// Perform the ingestion into [`Self::timeline`].
     106              :     /// Assumes the timeline is empty (= no layers).
     107            0 :     pub async fn run(mut self, ctx: &RequestContext) -> anyhow::Result<()> {
     108            0 :         let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
     109            0 : 
     110            0 :         self.pgdata_lsn = pgdata_lsn;
     111              : 
     112            0 :         let datadir = PgDataDir::new(&self.storage).await?;
     113              : 
     114              :         // Import dbdir (00:00:00 keyspace)
      115              :         // This is only constructed here; like all other tasks, it is written to an image layer later by the parallel jobs
     116            0 :         let dbdir_buf = Bytes::from(DbDirectory::ser(&DbDirectory {
     117            0 :             dbdirs: datadir
     118            0 :                 .dbs
     119            0 :                 .iter()
     120            0 :                 .map(|db| ((db.spcnode, db.dboid), true))
     121            0 :                 .collect(),
     122            0 :         })?);
     123            0 :         self.tasks
     124            0 :             .push(ImportSingleKeyTask::new(DBDIR_KEY, dbdir_buf).into());
     125              : 
     126              :         // Import databases (00:spcnode:dbnode keyspace for each db)
     127            0 :         for db in datadir.dbs {
     128            0 :             self.import_db(&db).await?;
     129              :         }
     130              : 
     131              :         // Import SLRUs
     132              : 
     133              :         // pg_xact (01:00 keyspace)
     134            0 :         self.import_slru(SlruKind::Clog, &self.storage.pgdata().join("pg_xact"))
     135            0 :             .await?;
     136              :         // pg_multixact/members (01:01 keyspace)
     137            0 :         self.import_slru(
     138            0 :             SlruKind::MultiXactMembers,
     139            0 :             &self.storage.pgdata().join("pg_multixact/members"),
     140            0 :         )
     141            0 :         .await?;
     142              :         // pg_multixact/offsets (01:02 keyspace)
     143            0 :         self.import_slru(
     144            0 :             SlruKind::MultiXactOffsets,
     145            0 :             &self.storage.pgdata().join("pg_multixact/offsets"),
     146            0 :         )
     147            0 :         .await?;
     148              : 
     149              :         // Import pg_twophase.
      150              :         // TODO: currently written as empty; real pg_twophase contents are not imported yet
     151            0 :         let twophasedir_buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory {
     152            0 :             xids: HashSet::new(),
     153            0 :         })?;
     154            0 :         self.tasks
     155            0 :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     156            0 :                 TWOPHASEDIR_KEY,
     157            0 :                 Bytes::from(twophasedir_buf),
     158            0 :             )));
     159            0 : 
     160            0 :         // Controlfile, checkpoint
     161            0 :         self.tasks
     162            0 :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     163            0 :                 CONTROLFILE_KEY,
     164            0 :                 self.control_file.control_file_buf().clone(),
     165            0 :             )));
     166              : 
     167            0 :         let checkpoint_buf = self
     168            0 :             .control_file
     169            0 :             .control_file_data()
     170            0 :             .checkPointCopy
     171            0 :             .encode()?;
     172            0 :         self.tasks
     173            0 :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     174            0 :                 CHECKPOINT_KEY,
     175            0 :                 checkpoint_buf,
     176            0 :             )));
     177            0 : 
      178            0 :         // Assign parts of the key space to parallel jobs: chunk tasks into ~1 GiB groups; each chunk becomes one ChunkProcessingJob writing one image layer
     179            0 :         let mut last_end_key = Key::MIN;
     180            0 :         let mut current_chunk = Vec::new();
     181            0 :         let mut current_chunk_size: usize = 0;
     182            0 :         let mut parallel_jobs = Vec::new();
     183            0 :         for task in std::mem::take(&mut self.tasks).into_iter() {
     184            0 :             if current_chunk_size + task.total_size() > 1024 * 1024 * 1024 {
     185            0 :                 let key_range = last_end_key..task.key_range().start;
     186            0 :                 parallel_jobs.push(ChunkProcessingJob::new(
     187            0 :                     key_range.clone(),
     188            0 :                     std::mem::take(&mut current_chunk),
     189            0 :                     &self,
     190            0 :                 ));
     191            0 :                 last_end_key = key_range.end;
     192            0 :                 current_chunk_size = 0;
     193            0 :             }
     194            0 :             current_chunk_size += task.total_size();
     195            0 :             current_chunk.push(task);
     196              :         }
     197            0 :         parallel_jobs.push(ChunkProcessingJob::new(
     198            0 :             last_end_key..Key::MAX,
     199            0 :             current_chunk,
     200            0 :             &self,
     201            0 :         ));
     202            0 : 
      203            0 :         // Start all jobs simultaneously
     204            0 :         let mut work = JoinSet::new();
     205              :         // TODO: semaphore?
     206            0 :         for job in parallel_jobs {
     207            0 :             let ctx: RequestContext =
     208            0 :                 ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Error);
     209            0 :             work.spawn(async move { job.run(&ctx).await }.instrument(info_span!("parallel_job")));
     210              :         }
     211            0 :         let mut results = Vec::new();
     212            0 :         while let Some(result) = work.join_next().await {
     213            0 :             match result {
     214            0 :                 Ok(res) => {
     215            0 :                     results.push(res);
     216            0 :                 }
     217            0 :                 Err(_joinset_err) => {
     218            0 :                     results.push(Err(anyhow::anyhow!(
     219            0 :                         "parallel job panicked or cancelled, check pageserver logs"
     220            0 :                     )));
     221            0 :                 }
     222              :             }
     223              :         }
     224              : 
     225            0 :         if results.iter().all(|r| r.is_ok()) {
     226            0 :             Ok(())
     227              :         } else {
     228            0 :             let mut msg = String::new();
     229            0 :             for result in results {
     230            0 :                 if let Err(err) = result {
     231            0 :                     msg.push_str(&format!("{err:?}\n\n"));
     232            0 :                 }
     233              :             }
     234            0 :             bail!("Some parallel jobs failed:\n\n{msg}");
     235              :         }
     236            0 :     }
     237              : 
     238            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(dboid=%db.dboid, tablespace=%db.spcnode, path=%db.path))]
     239              :     async fn import_db(&mut self, db: &PgDataDirDb) -> anyhow::Result<()> {
     240              :         debug!("start");
     241              :         scopeguard::defer! {
     242              :             debug!("return");
     243              :         }
     244              : 
     245              :         // Import relmap (00:spcnode:dbnode:00:*:00)
     246              :         let relmap_key = relmap_file_key(db.spcnode, db.dboid);
     247              :         debug!("Constructing relmap entry, key {relmap_key}");
     248              :         let relmap_path = db.path.join("pg_filenode.map");
     249              :         let relmap_buf = self.storage.get(&relmap_path).await?;
     250              :         self.tasks
     251              :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     252              :                 relmap_key, relmap_buf,
     253              :             )));
     254              : 
     255              :         // Import reldir (00:spcnode:dbnode:00:*:01)
     256              :         let reldir_key = rel_dir_to_key(db.spcnode, db.dboid);
     257              :         debug!("Constructing reldirs entry, key {reldir_key}");
     258              :         let reldir_buf = RelDirectory::ser(&RelDirectory {
     259              :             rels: db
     260              :                 .files
     261              :                 .iter()
     262            0 :                 .map(|f| (f.rel_tag.relnode, f.rel_tag.forknum))
     263              :                 .collect(),
     264              :         })?;
     265              :         self.tasks
     266              :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     267              :                 reldir_key,
     268              :                 Bytes::from(reldir_buf),
     269              :             )));
     270              : 
     271              :         // Import data (00:spcnode:dbnode:reloid:fork:blk) and set sizes for each last
     272              :         // segment in a given relation (00:spcnode:dbnode:reloid:fork:ff)
     273              :         for file in &db.files {
     274              :             debug!(%file.path, %file.filesize, "importing file");
     275              :             let len = file.filesize;
     276              :             ensure!(len % 8192 == 0);
     277              :             let start_blk: u32 = file.segno * (1024 * 1024 * 1024 / 8192);
     278              :             let start_key = rel_block_to_key(file.rel_tag, start_blk);
     279              :             let end_key = rel_block_to_key(file.rel_tag, start_blk + (len / 8192) as u32);
     280              :             self.tasks
     281              :                 .push(AnyImportTask::RelBlocks(ImportRelBlocksTask::new(
     282              :                     *self.timeline.get_shard_identity(),
     283              :                     start_key..end_key,
     284              :                     &file.path,
     285              :                     self.storage.clone(),
     286              :                 )));
     287              : 
     288              :             // Set relsize for the last segment (00:spcnode:dbnode:reloid:fork:ff)
     289              :             if let Some(nblocks) = file.nblocks {
     290              :                 let size_key = rel_size_to_key(file.rel_tag);
     291              :                 //debug!("Setting relation size (path={path}, rel_tag={rel_tag}, segno={segno}) to {nblocks}, key {size_key}");
     292              :                 let buf = nblocks.to_le_bytes();
     293              :                 self.tasks
     294              :                     .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     295              :                         size_key,
     296              :                         Bytes::from(buf.to_vec()),
     297              :                     )));
     298              :             }
     299              :         }
     300              : 
     301              :         Ok(())
     302              :     }
     303              : 
     304            0 :     async fn import_slru(&mut self, kind: SlruKind, path: &RemotePath) -> anyhow::Result<()> {
     305            0 :         let segments = self.storage.listfilesindir(path).await?;
     306            0 :         let segments: Vec<(String, u32, usize)> = segments
     307            0 :             .into_iter()
     308            0 :             .filter_map(|(path, size)| {
     309            0 :                 let filename = path.object_name()?;
     310            0 :                 let segno = u32::from_str_radix(filename, 16).ok()?;
     311            0 :                 Some((filename.to_string(), segno, size))
     312            0 :             })
     313            0 :             .collect();
     314            0 : 
     315            0 :         // Write SlruDir
     316            0 :         let slrudir_key = slru_dir_to_key(kind);
     317            0 :         let segnos: HashSet<u32> = segments
     318            0 :             .iter()
     319            0 :             .map(|(_path, segno, _size)| *segno)
     320            0 :             .collect();
     321            0 :         let slrudir = SlruSegmentDirectory { segments: segnos };
     322            0 :         let slrudir_buf = SlruSegmentDirectory::ser(&slrudir)?;
     323            0 :         self.tasks
     324            0 :             .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     325            0 :                 slrudir_key,
     326            0 :                 Bytes::from(slrudir_buf),
     327            0 :             )));
     328              : 
     329            0 :         for (segpath, segno, size) in segments {
     330              :             // SlruSegBlocks for each segment
     331            0 :             let p = path.join(&segpath);
     332            0 :             let file_size = size;
     333            0 :             ensure!(file_size % 8192 == 0);
     334            0 :             let nblocks = u32::try_from(file_size / 8192)?;
     335            0 :             let start_key = slru_block_to_key(kind, segno, 0);
     336            0 :             let end_key = slru_block_to_key(kind, segno, nblocks);
     337            0 :             debug!(%p, segno=%segno, %size, %start_key, %end_key, "scheduling SLRU segment");
     338            0 :             self.tasks
     339            0 :                 .push(AnyImportTask::SlruBlocks(ImportSlruBlocksTask::new(
     340            0 :                     *self.timeline.get_shard_identity(),
     341            0 :                     start_key..end_key,
     342            0 :                     &p,
     343            0 :                     self.storage.clone(),
     344            0 :                 )));
     345            0 : 
     346            0 :             // Followed by SlruSegSize
     347            0 :             let segsize_key = slru_segment_size_to_key(kind, segno);
     348            0 :             let segsize_buf = nblocks.to_le_bytes();
     349            0 :             self.tasks
     350            0 :                 .push(AnyImportTask::SingleKey(ImportSingleKeyTask::new(
     351            0 :                     segsize_key,
     352            0 :                     Bytes::copy_from_slice(&segsize_buf),
     353            0 :                 )));
     354              :         }
     355            0 :         Ok(())
     356            0 :     }
     357              : }
     358              : 
     359              : //
     360              : // dbdir iteration tools
     361              : //
     362              : 
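                      : /// The set of databases found in the PGDATA directory, in (spcnode, dboid) order.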
     363              : struct PgDataDir {
     364              :     pub dbs: Vec<PgDataDirDb>, // spcnode, dboid, path
     365              : }
     366              : 
     367              : struct PgDataDirDb {
     368              :     pub spcnode: u32,
     369              :     pub dboid: u32,
     370              :     pub path: RemotePath,
     371              :     pub files: Vec<PgDataDirDbFile>,
     372              : }
     373              : 
     374              : struct PgDataDirDbFile {
     375              :     pub path: RemotePath,
     376              :     pub rel_tag: RelTag,
     377              :     pub segno: u32,
     378              :     pub filesize: usize,
      379              :     // Cumulative size (in blocks) of the given fork, set only on the last segment of that fork
     380              :     pub nblocks: Option<usize>,
     381              : }
     382              : 
     383              : impl PgDataDir {
     384            0 :     async fn new(storage: &RemoteStorageWrapper) -> anyhow::Result<Self> {
     385            0 :         let datadir_path = storage.pgdata();
      386            0 :         // Import ordinary databases first: DEFAULTTABLESPACE_OID is smaller than GLOBALTABLESPACE_OID.
      387            0 :         // Traverse databases in increasing oid order.
     388            0 : 
     389            0 :         let basedir = &datadir_path.join("base");
     390            0 :         let db_oids: Vec<_> = storage
     391            0 :             .listdir(basedir)
     392            0 :             .await?
     393            0 :             .into_iter()
     394            0 :             .filter_map(|path| path.object_name().and_then(|name| name.parse::<u32>().ok()))
     395            0 :             .sorted()
     396            0 :             .collect();
     397            0 :         debug!(?db_oids, "found databases");
     398            0 :         let mut databases = Vec::new();
     399            0 :         for dboid in db_oids {
     400            0 :             databases.push(
     401            0 :                 PgDataDirDb::new(
     402            0 :                     storage,
     403            0 :                     &basedir.join(dboid.to_string()),
     404            0 :                     pg_constants::DEFAULTTABLESPACE_OID,
     405            0 :                     dboid,
     406            0 :                     &datadir_path,
     407            0 :                 )
     408            0 :                 .await?,
     409              :             );
     410              :         }
     411              : 
     412              :         // special case for global catalogs
     413            0 :         databases.push(
     414            0 :             PgDataDirDb::new(
     415            0 :                 storage,
     416            0 :                 &datadir_path.join("global"),
     417            0 :                 postgres_ffi::pg_constants::GLOBALTABLESPACE_OID,
     418            0 :                 0,
     419            0 :                 &datadir_path,
     420            0 :             )
     421            0 :             .await?,
     422              :         );
     423              : 
     424            0 :         databases.sort_by_key(|db| (db.spcnode, db.dboid));
     425            0 : 
     426            0 :         Ok(Self { dbs: databases })
     427            0 :     }
     428              : }
     429              : 
     430              : impl PgDataDirDb {
     431            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%dboid, %db_path))]
     432              :     async fn new(
     433              :         storage: &RemoteStorageWrapper,
     434              :         db_path: &RemotePath,
     435              :         spcnode: u32,
     436              :         dboid: u32,
     437              :         datadir_path: &RemotePath,
     438              :     ) -> anyhow::Result<Self> {
     439              :         let mut files: Vec<PgDataDirDbFile> = storage
     440              :             .listfilesindir(db_path)
     441              :             .await?
     442              :             .into_iter()
     443            0 :             .filter_map(|(path, size)| {
     444            0 :                 debug!(%path, %size, "found file in dbdir");
     445            0 :                 path.object_name().and_then(|name| {
     446            0 :                     // returns (relnode, forknum, segno)
     447            0 :                     parse_relfilename(name).ok().map(|x| (size, x))
     448            0 :                 })
     449            0 :             })
     450            0 :             .sorted_by_key(|(_, relfilename)| *relfilename)
     451            0 :             .map(|(filesize, (relnode, forknum, segno))| {
     452            0 :                 let rel_tag = RelTag {
     453            0 :                     spcnode,
     454            0 :                     dbnode: dboid,
     455            0 :                     relnode,
     456            0 :                     forknum,
     457            0 :                 };
     458            0 : 
     459            0 :                 let path = datadir_path.join(rel_tag.to_segfile_name(segno));
     460            0 :                 assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
     461            0 :                 let nblocks = filesize / BLCKSZ as usize;
     462            0 : 
     463            0 :                 PgDataDirDbFile {
     464            0 :                     path,
     465            0 :                     filesize,
     466            0 :                     rel_tag,
     467            0 :                     segno,
      468            0 :                     nblocks: Some(nblocks), // initially the per-segment (non-cumulative) size
     469            0 :                 }
     470            0 :             })
     471              :             .collect();
     472              : 
      473              :         // Set cumulative sizes. Do all of that math here, so that later we can easily
      474              :         // parallelize over segments and know for which segments we need to write the
      475              :         // relsize entry.
     476              :         let mut cumulative_nblocks: usize = 0;
     477              :         let mut prev_rel_tag: Option<RelTag> = None;
     478              :         for i in 0..files.len() {
     479              :             if prev_rel_tag == Some(files[i].rel_tag) {
     480              :                 cumulative_nblocks += files[i].nblocks.unwrap();
     481              :             } else {
     482              :                 cumulative_nblocks = files[i].nblocks.unwrap();
     483              :             }
     484              : 
     485              :             files[i].nblocks = if i == files.len() - 1 || files[i + 1].rel_tag != files[i].rel_tag {
     486              :                 Some(cumulative_nblocks)
     487              :             } else {
     488              :                 None
     489              :             };
     490              : 
     491              :             prev_rel_tag = Some(files[i].rel_tag);
     492              :         }
     493              : 
     494              :         Ok(PgDataDirDb {
     495              :             files,
     496              :             path: db_path.clone(),
     497              :             spcnode,
     498              :             dboid,
     499              :         })
     500              :     }
     501              : }
     502              : 
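                      : /// A unit of import work: knows which key range it covers (used for chunking
                      : /// into jobs and for size estimation) and how to write its images into an
                      : /// [`ImageLayerWriter`].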
     503              : trait ImportTask {
     504              :     fn key_range(&self) -> Range<Key>;
     505              : 
     506            0 :     fn total_size(&self) -> usize {
     507            0 :         // TODO: revisit this
     508            0 :         if is_contiguous_range(&self.key_range()) {
     509            0 :             contiguous_range_len(&self.key_range()) as usize * 8192
     510              :         } else {
     511            0 :             u32::MAX as usize
     512              :         }
     513            0 :     }
     514              : 
     515              :     async fn doit(
     516              :         self,
     517              :         layer_writer: &mut ImageLayerWriter,
     518              :         ctx: &RequestContext,
     519              :     ) -> anyhow::Result<usize>;
     520              : }
     521              : 
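                      : /// Writes one pre-serialized value (e.g. a directory or size entry) at a single key.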
     522              : struct ImportSingleKeyTask {
     523              :     key: Key,
     524              :     buf: Bytes,
     525              : }
     526              : 
     527              : impl ImportSingleKeyTask {
     528            0 :     fn new(key: Key, buf: Bytes) -> Self {
     529            0 :         ImportSingleKeyTask { key, buf }
     530            0 :     }
     531              : }
     532              : 
     533              : impl ImportTask for ImportSingleKeyTask {
     534            0 :     fn key_range(&self) -> Range<Key> {
     535            0 :         singleton_range(self.key)
     536            0 :     }
     537              : 
     538            0 :     async fn doit(
     539            0 :         self,
     540            0 :         layer_writer: &mut ImageLayerWriter,
     541            0 :         ctx: &RequestContext,
     542            0 :     ) -> anyhow::Result<usize> {
     543            0 :         layer_writer.put_image(self.key, self.buf, ctx).await?;
     544            0 :         Ok(1)
     545            0 :     }
     546              : }
     547              : 
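                      : /// Imports the blocks of one relation segment file. Blocks that do not belong
                      : /// to this shard are skipped, and adjacent remaining blocks are coalesced into
                      : /// larger ranged reads from remote storage.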
     548              : struct ImportRelBlocksTask {
     549              :     shard_identity: ShardIdentity,
     550              :     key_range: Range<Key>,
     551              :     path: RemotePath,
     552              :     storage: RemoteStorageWrapper,
     553              : }
     554              : 
     555              : impl ImportRelBlocksTask {
     556            0 :     fn new(
     557            0 :         shard_identity: ShardIdentity,
     558            0 :         key_range: Range<Key>,
     559            0 :         path: &RemotePath,
     560            0 :         storage: RemoteStorageWrapper,
     561            0 :     ) -> Self {
     562            0 :         ImportRelBlocksTask {
     563            0 :             shard_identity,
     564            0 :             key_range,
     565            0 :             path: path.clone(),
     566            0 :             storage,
     567            0 :         }
     568            0 :     }
     569              : }
     570              : 
     571              : impl ImportTask for ImportRelBlocksTask {
     572            0 :     fn key_range(&self) -> Range<Key> {
     573            0 :         self.key_range.clone()
     574            0 :     }
     575              : 
     576            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%self.path))]
     577              :     async fn doit(
     578              :         self,
     579              :         layer_writer: &mut ImageLayerWriter,
     580              :         ctx: &RequestContext,
     581              :     ) -> anyhow::Result<usize> {
     582              :         debug!("Importing relation file");
     583              : 
     584              :         let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
     585              :         let (rel_tag_end, end_blk) = self.key_range.end.to_rel_block()?;
     586              :         assert_eq!(rel_tag, rel_tag_end);
     587              : 
     588              :         let ranges = (start_blk..end_blk)
     589              :             .enumerate()
     590            0 :             .filter_map(|(i, blknum)| {
     591            0 :                 let key = rel_block_to_key(rel_tag, blknum);
     592            0 :                 if self.shard_identity.is_key_disposable(&key) {
     593            0 :                     return None;
     594            0 :                 }
     595            0 :                 let file_offset = i.checked_mul(8192).unwrap();
     596            0 :                 Some((
     597            0 :                     vec![key],
     598            0 :                     file_offset,
     599            0 :                     file_offset.checked_add(8192).unwrap(),
     600            0 :                 ))
     601            0 :             })
     602            0 :             .coalesce(|(mut acc, acc_start, acc_end), (mut key, start, end)| {
     603            0 :                 assert_eq!(key.len(), 1);
     604            0 :                 assert!(!acc.is_empty());
     605            0 :                 assert!(acc_end > acc_start);
     606            0 :                 if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
     607            0 :                     acc.push(key.pop().unwrap());
     608            0 :                     Ok((acc, acc_start, end))
     609              :                 } else {
     610            0 :                     Err(((acc, acc_start, acc_end), (key, start, end)))
     611              :                 }
     612            0 :             });
     613              : 
     614              :         let mut nimages = 0;
     615              :         for (keys, range_start, range_end) in ranges {
     616              :             let range_buf = self
     617              :                 .storage
     618              :                 .get_range(&self.path, range_start.into_u64(), range_end.into_u64())
     619              :                 .await?;
     620              :             let mut buf = Bytes::from(range_buf);
     621              :             // TODO: batched writes
     622              :             for key in keys {
     623              :                 let image = buf.split_to(8192);
     624              :                 layer_writer.put_image(key, image, ctx).await?;
     625              :                 nimages += 1;
     626              :             }
     627              :         }
     628              : 
     629              :         Ok(nimages)
     630              :     }
     631              : }
     632              : 
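                      : /// Imports one SLRU segment file block by block. SLRU keys are not sharded,
                      : /// so every block is written on every shard.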
     633              : struct ImportSlruBlocksTask {
     634              :     shard_identity: ShardIdentity,
     635              :     key_range: Range<Key>,
     636              :     path: RemotePath,
     637              :     storage: RemoteStorageWrapper,
     638              : }
     639              : 
     640              : impl ImportSlruBlocksTask {
     641            0 :     fn new(
     642            0 :         shard_identity: ShardIdentity,
     643            0 :         key_range: Range<Key>,
     644            0 :         path: &RemotePath,
     645            0 :         storage: RemoteStorageWrapper,
     646            0 :     ) -> Self {
     647            0 :         ImportSlruBlocksTask {
     648            0 :             shard_identity,
     649            0 :             key_range,
     650            0 :             path: path.clone(),
     651            0 :             storage,
     652            0 :         }
     653            0 :     }
     654              : }
     655              : 
     656              : impl ImportTask for ImportSlruBlocksTask {
     657            0 :     fn key_range(&self) -> Range<Key> {
     658            0 :         self.key_range.clone()
     659            0 :     }
     660              : 
     661            0 :     async fn doit(
     662            0 :         self,
     663            0 :         layer_writer: &mut ImageLayerWriter,
     664            0 :         ctx: &RequestContext,
     665            0 :     ) -> anyhow::Result<usize> {
     666            0 :         debug!("Importing SLRU segment file {}", self.path);
     667            0 :         let buf = self.storage.get(&self.path).await?;
     668              : 
     669            0 :         let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
     670            0 :         let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
     671            0 :         let mut blknum = start_blk;
     672            0 :         let mut nimages = 0;
     673            0 :         let mut file_offset = 0;
     674            0 :         while blknum < end_blk {
     675            0 :             let key = slru_block_to_key(kind, segno, blknum);
     676            0 :             assert!(
     677            0 :                 !self.shard_identity.is_key_disposable(&key),
     678            0 :                 "SLRU keys need to go into every shard"
     679              :             );
     680            0 :             let buf = &buf[file_offset..(file_offset + 8192)];
     681            0 :             file_offset += 8192;
     682            0 :             layer_writer
     683            0 :                 .put_image(key, Bytes::copy_from_slice(buf), ctx)
     684            0 :                 .await?;
     685            0 :             blknum += 1;
     686            0 :             nimages += 1;
     687              :         }
     688            0 :         Ok(nimages)
     689            0 :     }
     690              : }
     691              : 
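                      : /// Sum type over the concrete task kinds so that heterogeneous tasks can be
                      : /// queued together in [`Flow::tasks`] and dispatched uniformly.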
     692              : enum AnyImportTask {
     693              :     SingleKey(ImportSingleKeyTask),
     694              :     RelBlocks(ImportRelBlocksTask),
     695              :     SlruBlocks(ImportSlruBlocksTask),
     696              : }
     697              : 
     698              : impl ImportTask for AnyImportTask {
     699            0 :     fn key_range(&self) -> Range<Key> {
     700            0 :         match self {
     701            0 :             Self::SingleKey(t) => t.key_range(),
     702            0 :             Self::RelBlocks(t) => t.key_range(),
     703            0 :             Self::SlruBlocks(t) => t.key_range(),
     704              :         }
     705            0 :     }
     706              :     /// returns the number of images put into the `layer_writer`
     707            0 :     async fn doit(
     708            0 :         self,
     709            0 :         layer_writer: &mut ImageLayerWriter,
     710            0 :         ctx: &RequestContext,
     711            0 :     ) -> anyhow::Result<usize> {
     712            0 :         match self {
     713            0 :             Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
     714            0 :             Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
     715            0 :             Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
     716              :         }
     717            0 :     }
     718              : }
     719              : 
     720              : impl From<ImportSingleKeyTask> for AnyImportTask {
     721            0 :     fn from(t: ImportSingleKeyTask) -> Self {
     722            0 :         Self::SingleKey(t)
     723            0 :     }
     724              : }
     725              : 
     726              : impl From<ImportRelBlocksTask> for AnyImportTask {
     727            0 :     fn from(t: ImportRelBlocksTask) -> Self {
     728            0 :         Self::RelBlocks(t)
     729            0 :     }
     730              : }
     731              : 
     732              : impl From<ImportSlruBlocksTask> for AnyImportTask {
     733            0 :     fn from(t: ImportSlruBlocksTask) -> Self {
     734            0 :         Self::SlruBlocks(t)
     735            0 :     }
     736              : }
     737              : 
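                      : /// One parallel unit of work: writes a single image layer covering `range` at
                      : /// `pgdata_lsn`, filled by running the contained tasks, then schedules the
                      : /// layer for upload.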
     738              : struct ChunkProcessingJob {
     739              :     timeline: Arc<Timeline>,
     740              :     range: Range<Key>,
     741              :     tasks: Vec<AnyImportTask>,
     742              : 
     743              :     pgdata_lsn: Lsn,
     744              : }
     745              : 
     746              : impl ChunkProcessingJob {
     747            0 :     fn new(range: Range<Key>, tasks: Vec<AnyImportTask>, env: &Flow) -> Self {
     748            0 :         assert!(env.pgdata_lsn.is_valid());
     749            0 :         Self {
     750            0 :             timeline: env.timeline.clone(),
     751            0 :             range,
     752            0 :             tasks,
     753            0 :             pgdata_lsn: env.pgdata_lsn,
     754            0 :         }
     755            0 :     }
     756              : 
     757            0 :     async fn run(self, ctx: &RequestContext) -> anyhow::Result<()> {
     758            0 :         let mut writer = ImageLayerWriter::new(
     759            0 :             self.timeline.conf,
     760            0 :             self.timeline.timeline_id,
     761            0 :             self.timeline.tenant_shard_id,
     762            0 :             &self.range,
     763            0 :             self.pgdata_lsn,
     764            0 :             ctx,
     765            0 :         )
     766            0 :         .await?;
     767              : 
     768            0 :         let mut nimages = 0;
     769            0 :         for task in self.tasks {
     770            0 :             nimages += task.doit(&mut writer, ctx).await?;
     771              :         }
     772              : 
     773            0 :         let resident_layer = if nimages > 0 {
     774            0 :             let (desc, path) = writer.finish(ctx).await?;
     775            0 :             Layer::finish_creating(self.timeline.conf, &self.timeline, desc, &path)?
     776              :         } else {
     777              :             // dropping the writer cleans up
     778            0 :             return Ok(());
     779              :         };
     780              : 
     781              :         // this is sharing the same code as create_image_layers
     782            0 :         let mut guard = self.timeline.layers.write().await;
     783            0 :         guard
     784            0 :             .open_mut()?
     785            0 :             .track_new_image_layers(&[resident_layer.clone()], &self.timeline.metrics);
     786            0 :         crate::tenant::timeline::drop_wlock(guard);
     787            0 : 
     788            0 :         // Schedule the layer for upload but don't add barriers such as
     789            0 :         // wait for completion or index upload, so we don't inhibit upload parallelism.
     790            0 :         // TODO: limit upload parallelism somehow (e.g. by limiting concurrency of jobs?)
     791            0 :         // TODO: or regulate parallelism by upload queue depth? Prob should happen at a higher level.
     792            0 :         self.timeline
     793            0 :             .remote_client
     794            0 :             .schedule_layer_file_upload(resident_layer)?;
     795              : 
     796            0 :         Ok(())
     797            0 :     }
     798              : }
        

Generated by: LCOV version 2.1-beta