LCOV - code coverage report
Current view: top level - pageserver/src - import_datadir.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 64.2 % 447 287
Test Date: 2025-02-20 13:11:02 Functions: 41.2 % 34 14

            Line data    Source code
       1              : //!
       2              : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
       3              : //! a neon Timeline.
       4              : //!
       5              : use std::path::{Path, PathBuf};
       6              : 
       7              : use anyhow::{bail, ensure, Context, Result};
       8              : use bytes::Bytes;
       9              : use camino::Utf8Path;
      10              : use futures::StreamExt;
      11              : use pageserver_api::key::rel_block_to_key;
      12              : use tokio::io::{AsyncRead, AsyncReadExt};
      13              : use tokio_tar::Archive;
      14              : use tracing::*;
      15              : use wal_decoder::models::InterpretedWalRecord;
      16              : use walkdir::WalkDir;
      17              : 
      18              : use crate::context::RequestContext;
      19              : use crate::metrics::WAL_INGEST;
      20              : use crate::pgdatadir_mapping::*;
      21              : use crate::tenant::Timeline;
      22              : use crate::walingest::WalIngest;
      23              : use pageserver_api::reltag::{RelTag, SlruKind};
      24              : use postgres_ffi::pg_constants;
      25              : use postgres_ffi::relfile_utils::*;
      26              : use postgres_ffi::waldecoder::WalStreamDecoder;
      27              : use postgres_ffi::ControlFileData;
      28              : use postgres_ffi::DBState_DB_SHUTDOWNED;
      29              : use postgres_ffi::Oid;
      30              : use postgres_ffi::XLogFileName;
      31              : use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
      32              : use utils::lsn::Lsn;
      33              : 
      34              : // Returns checkpoint LSN from controlfile
      35            4 : pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
      36            4 :     // Read control file to extract the LSN
      37            4 :     let controlfile_path = path.join("global").join("pg_control");
      38            4 :     let controlfile_buf = std::fs::read(&controlfile_path)
      39            4 :         .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
      40            4 :     let controlfile = ControlFileData::decode(&controlfile_buf)?;
      41            4 :     let lsn = controlfile.checkPoint;
      42            4 : 
      43            4 :     Ok(Lsn(lsn))
      44            4 : }
      45              : 
      46              : ///
      47              : /// Import all relation data pages from local disk into the repository.
      48              : ///
      49              : /// This is currently only used to import a cluster freshly created by initdb.
      50              : /// The code that deals with the checkpoint would not work right if the
      51              : /// cluster was not shut down cleanly.
      52            4 : pub async fn import_timeline_from_postgres_datadir(
      53            4 :     tline: &Timeline,
      54            4 :     pgdata_path: &Utf8Path,
      55            4 :     pgdata_lsn: Lsn,
      56            4 :     ctx: &RequestContext,
      57            4 : ) -> Result<()> {
      58            4 :     let mut pg_control: Option<ControlFileData> = None;
      59            4 : 
      60            4 :     // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
      61            4 :     // Then fishing out pg_control would be unnecessary
      62            4 :     let mut modification = tline.begin_modification(pgdata_lsn);
      63            4 :     modification.init_empty()?;
      64              : 
      65              :     // Import all but pg_wal
      66            4 :     let all_but_wal = WalkDir::new(pgdata_path)
      67            4 :         .into_iter()
      68         3960 :         .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
      69         3960 :     for entry in all_but_wal {
      70         3956 :         let entry = entry?;
      71         3956 :         let metadata = entry.metadata().expect("error getting dir entry metadata");
      72         3956 :         if metadata.is_file() {
      73         3860 :             let absolute_path = entry.path();
      74         3860 :             let relative_path = absolute_path.strip_prefix(pgdata_path)?;
      75              : 
      76         3860 :             let mut file = tokio::fs::File::open(absolute_path).await?;
      77         3860 :             let len = metadata.len() as usize;
      78            4 :             if let Some(control_file) =
      79         3860 :                 import_file(&mut modification, relative_path, &mut file, len, ctx).await?
      80            4 :             {
      81            4 :                 pg_control = Some(control_file);
      82         3856 :             }
      83         3860 :             modification.flush(ctx).await?;
      84           96 :         }
      85              :     }
      86              : 
      87              :     // We're done importing all the data files.
      88            4 :     modification.commit(ctx).await?;
      89              : 
      90              :     // We expect the Postgres server to be shut down cleanly.
      91            4 :     let pg_control = pg_control.context("pg_control file not found")?;
      92            4 :     ensure!(
      93            4 :         pg_control.state == DBState_DB_SHUTDOWNED,
      94            0 :         "Postgres cluster was not shut down cleanly"
      95              :     );
      96            4 :     ensure!(
      97            4 :         pg_control.checkPointCopy.redo == pgdata_lsn.0,
      98            0 :         "unexpected checkpoint REDO pointer"
      99              :     );
     100              : 
     101              :     // Import WAL. This is needed even when starting from a shutdown checkpoint, because
     102              :     // this reads the checkpoint record itself, advancing the tip of the timeline to
     103              :     // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
     104            4 :     import_wal(
     105            4 :         &pgdata_path.join("pg_wal"),
     106            4 :         tline,
     107            4 :         Lsn(pg_control.checkPointCopy.redo),
     108            4 :         pgdata_lsn,
     109            4 :         ctx,
     110            4 :     )
     111            4 :     .await?;
     112              : 
     113            4 :     Ok(())
     114            4 : }
     115              : 
     116              : // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
     117         3784 : async fn import_rel(
     118         3784 :     modification: &mut DatadirModification<'_>,
     119         3784 :     path: &Path,
     120         3784 :     spcoid: Oid,
     121         3784 :     dboid: Oid,
     122         3784 :     reader: &mut (impl AsyncRead + Unpin),
     123         3784 :     len: usize,
     124         3784 :     ctx: &RequestContext,
     125         3784 : ) -> anyhow::Result<()> {
     126         3784 :     // Does it look like a relation file?
     127         3784 :     trace!("importing rel file {}", path.display());
     128              : 
     129         3784 :     let filename = &path
     130         3784 :         .file_name()
     131         3784 :         .expect("missing rel filename")
     132         3784 :         .to_string_lossy();
     133         3784 :     let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
     134            0 :         warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
     135            0 :         e
     136         3784 :     })?;
     137              : 
     138         3784 :     let mut buf: [u8; 8192] = [0u8; 8192];
     139         3784 : 
     140         3784 :     ensure!(len % BLCKSZ as usize == 0);
     141         3784 :     let nblocks = len / BLCKSZ as usize;
     142         3784 : 
     143         3784 :     let rel = RelTag {
     144         3784 :         spcnode: spcoid,
     145         3784 :         dbnode: dboid,
     146         3784 :         relnode,
     147         3784 :         forknum,
     148         3784 :     };
     149         3784 : 
     150         3784 :     let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     151              : 
     152              :     // Call put_rel_creation for every segment of the relation,
     153              :     // because there is no guarantee about the order in which we are processing segments.
     154              :     // ignore "relation already exists" error
     155              :     //
     156              :     // FIXME: Keep track of which relations we've already created?
     157              :     // https://github.com/neondatabase/neon/issues/3309
     158         3784 :     if let Err(e) = modification
     159         3784 :         .put_rel_creation(rel, nblocks as u32, ctx)
     160         3784 :         .await
     161              :     {
     162            0 :         match e {
     163              :             RelationError::AlreadyExists => {
     164            0 :                 debug!("Relation {} already exist. We must be extending it.", rel)
     165              :             }
     166            0 :             _ => return Err(e.into()),
     167              :         }
     168         3784 :     }
     169              : 
     170              :     loop {
     171        14664 :         let r = reader.read_exact(&mut buf).await;
     172        14664 :         match r {
     173              :             Ok(_) => {
     174        10880 :                 let key = rel_block_to_key(rel, blknum);
     175        10880 :                 if modification.tline.get_shard_identity().is_key_local(&key) {
     176        10880 :                     modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
     177            0 :                 }
     178              :             }
     179              : 
     180              :             // TODO: UnexpectedEof is expected
     181         3784 :             Err(err) => match err.kind() {
     182              :                 std::io::ErrorKind::UnexpectedEof => {
     183              :                     // reached EOF. That's expected.
     184         3784 :                     let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     185         3784 :                     ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
     186         3784 :                     break;
     187              :                 }
     188              :                 _ => {
     189            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     190              :                 }
     191              :             },
     192              :         };
     193        10880 :         blknum += 1;
     194              :     }
     195              : 
     196              :     // Update relation size
     197              :     //
     198              :     // If we process rel segments out of order,
     199              :     // put_rel_extend will skip the update.
     200         3784 :     modification.put_rel_extend(rel, blknum, ctx).await?;
     201              : 
     202         3784 :     Ok(())
     203         3784 : }
     204              : 
     205              : /// Import an SLRU segment file
     206              : ///
     207           12 : async fn import_slru(
     208           12 :     modification: &mut DatadirModification<'_>,
     209           12 :     slru: SlruKind,
     210           12 :     path: &Path,
     211           12 :     reader: &mut (impl AsyncRead + Unpin),
     212           12 :     len: usize,
     213           12 :     ctx: &RequestContext,
     214           12 : ) -> anyhow::Result<()> {
     215           12 :     info!("importing slru file {path:?}");
     216              : 
     217           12 :     let mut buf: [u8; 8192] = [0u8; 8192];
     218           12 :     let filename = &path
     219           12 :         .file_name()
     220           12 :         .with_context(|| format!("missing slru filename for path {path:?}"))?
     221           12 :         .to_string_lossy();
     222           12 :     let segno = u32::from_str_radix(filename, 16)?;
     223              : 
     224           12 :     ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
     225           12 :     let nblocks = len / BLCKSZ as usize;
     226           12 : 
     227           12 :     ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
     228              : 
     229           12 :     modification
     230           12 :         .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
     231           12 :         .await?;
     232              : 
     233           12 :     let mut rpageno = 0;
     234              :     loop {
     235           24 :         let r = reader.read_exact(&mut buf).await;
     236           24 :         match r {
     237              :             Ok(_) => {
     238           12 :                 modification.put_slru_page_image(
     239           12 :                     slru,
     240           12 :                     segno,
     241           12 :                     rpageno,
     242           12 :                     Bytes::copy_from_slice(&buf),
     243           12 :                 )?;
     244              :             }
     245              : 
     246              :             // TODO: UnexpectedEof is expected
     247           12 :             Err(err) => match err.kind() {
     248              :                 std::io::ErrorKind::UnexpectedEof => {
     249              :                     // reached EOF. That's expected.
     250           12 :                     ensure!(rpageno == nblocks as u32, "unexpected EOF");
     251           12 :                     break;
     252              :                 }
     253              :                 _ => {
     254            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     255              :                 }
     256              :             },
     257              :         };
     258           12 :         rpageno += 1;
     259              :     }
     260              : 
     261           12 :     Ok(())
     262           12 : }
     263              : 
     264              : /// Scan PostgreSQL WAL files in given directory and load all records between
     265              : /// 'startpoint' and 'endpoint' into the repository.
     266            4 : async fn import_wal(
     267            4 :     walpath: &Utf8Path,
     268            4 :     tline: &Timeline,
     269            4 :     startpoint: Lsn,
     270            4 :     endpoint: Lsn,
     271            4 :     ctx: &RequestContext,
     272            4 : ) -> anyhow::Result<()> {
     273            4 :     let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
     274            4 : 
     275            4 :     let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
     276            4 :     let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
     277            4 :     let mut last_lsn = startpoint;
     278              : 
     279            4 :     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
     280              : 
     281            4 :     let shard = vec![*tline.get_shard_identity()];
     282              : 
     283            8 :     while last_lsn <= endpoint {
     284              :         // FIXME: assume postgresql tli 1 for now
     285            4 :         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     286            4 :         let mut buf = Vec::new();
     287            4 : 
     288            4 :         // Read local file
     289            4 :         let mut path = walpath.join(&filename);
     290            4 : 
     291            4 :         // It could be as .partial
     292            4 :         if !PathBuf::from(&path).exists() {
     293            0 :             path = walpath.join(filename + ".partial");
     294            4 :         }
     295              : 
     296              :         // Slurp the WAL file
     297            4 :         let mut file = std::fs::File::open(&path)?;
     298              : 
     299            4 :         if offset > 0 {
     300            4 :             use std::io::Seek;
     301            4 :             file.seek(std::io::SeekFrom::Start(offset as u64))?;
     302            0 :         }
     303              : 
     304              :         use std::io::Read;
     305            4 :         let nread = file.read_to_end(&mut buf)?;
     306            4 :         if nread != WAL_SEGMENT_SIZE - offset {
     307              :             // Maybe allow this for .partial files?
     308            0 :             error!("read only {} bytes from WAL file", nread);
     309            4 :         }
     310              : 
     311            4 :         waldecoder.feed_bytes(&buf);
     312            4 : 
     313            4 :         let mut nrecords = 0;
     314            4 :         let mut modification = tline.begin_modification(last_lsn);
     315            8 :         while last_lsn <= endpoint {
     316            4 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     317            4 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
     318            4 :                     recdata,
     319            4 :                     &shard,
     320            4 :                     lsn,
     321            4 :                     tline.pg_version,
     322            4 :                 )?
     323            4 :                 .remove(tline.get_shard_identity())
     324            4 :                 .unwrap();
     325            4 : 
     326            4 :                 walingest
     327            4 :                     .ingest_record(interpreted, &mut modification, ctx)
     328            4 :                     .await?;
     329            4 :                 WAL_INGEST.records_committed.inc();
     330            4 : 
     331            4 :                 modification.commit(ctx).await?;
     332            4 :                 last_lsn = lsn;
     333            4 : 
     334            4 :                 nrecords += 1;
     335            4 : 
     336            4 :                 trace!("imported record at {} (end {})", lsn, endpoint);
     337            0 :             }
     338              :         }
     339              : 
     340            4 :         debug!("imported {} records up to {}", nrecords, last_lsn);
     341              : 
     342            4 :         segno += 1;
     343            4 :         offset = 0;
     344              :     }
     345              : 
     346            4 :     if last_lsn != startpoint {
     347            4 :         info!("reached end of WAL at {}", last_lsn);
     348              :     } else {
     349            0 :         info!("no WAL to import at {}", last_lsn);
     350              :     }
     351              : 
     352            4 :     Ok(())
     353            4 : }
     354              : 
     355            0 : pub async fn import_basebackup_from_tar(
     356            0 :     tline: &Timeline,
     357            0 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     358            0 :     base_lsn: Lsn,
     359            0 :     ctx: &RequestContext,
     360            0 : ) -> Result<()> {
     361            0 :     info!("importing base at {base_lsn}");
     362            0 :     let mut modification = tline.begin_modification(base_lsn);
     363            0 :     modification.init_empty()?;
     364              : 
     365            0 :     let mut pg_control: Option<ControlFileData> = None;
     366              : 
     367              :     // Import base
     368            0 :     let mut entries = Archive::new(reader).entries()?;
     369            0 :     while let Some(base_tar_entry) = entries.next().await {
     370            0 :         let mut entry = base_tar_entry?;
     371            0 :         let header = entry.header();
     372            0 :         let len = header.entry_size()? as usize;
     373            0 :         let file_path = header.path()?.into_owned();
     374            0 : 
     375            0 :         match header.entry_type() {
     376              :             tokio_tar::EntryType::Regular => {
     377            0 :                 if let Some(res) =
     378            0 :                     import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
     379            0 :                 {
     380            0 :                     // We found the pg_control file.
     381            0 :                     pg_control = Some(res);
     382            0 :                 }
     383            0 :                 modification.flush(ctx).await?;
     384              :             }
     385              :             tokio_tar::EntryType::Directory => {
     386            0 :                 debug!("directory {:?}", file_path);
     387              :             }
     388              :             _ => {
     389            0 :                 bail!(
     390            0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     391            0 :                     file_path.display(),
     392            0 :                     header.entry_type()
     393            0 :                 );
     394              :             }
     395              :         }
     396              :     }
     397              : 
     398              :     // sanity check: ensure that pg_control is loaded
     399            0 :     let _pg_control = pg_control.context("pg_control file not found")?;
     400              : 
     401            0 :     modification.commit(ctx).await?;
     402            0 :     Ok(())
     403            0 : }
     404              : 
     405            0 : pub async fn import_wal_from_tar(
     406            0 :     tline: &Timeline,
     407            0 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     408            0 :     start_lsn: Lsn,
     409            0 :     end_lsn: Lsn,
     410            0 :     ctx: &RequestContext,
     411            0 : ) -> Result<()> {
     412            0 :     // Set up walingest mutable state
     413            0 :     let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
     414            0 :     let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
     415            0 :     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     416            0 :     let mut last_lsn = start_lsn;
     417            0 :     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
     418            0 :     let shard = vec![*tline.get_shard_identity()];
     419            0 : 
     420            0 :     // Ingest wal until end_lsn
     421            0 :     info!("importing wal until {}", end_lsn);
     422            0 :     let mut pg_wal_tar = Archive::new(reader);
     423            0 :     let mut pg_wal_entries = pg_wal_tar.entries()?;
     424            0 :     while last_lsn <= end_lsn {
     425            0 :         let bytes = {
     426            0 :             let mut entry = pg_wal_entries
     427            0 :                 .next()
     428            0 :                 .await
     429            0 :                 .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
     430            0 :             let header = entry.header();
     431            0 :             let file_path = header.path()?.into_owned();
     432            0 : 
     433            0 :             match header.entry_type() {
     434              :                 tokio_tar::EntryType::Regular => {
     435              :                     // FIXME: assume postgresql tli 1 for now
     436            0 :                     let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     437            0 :                     let file_name = file_path
     438            0 :                         .file_name()
     439            0 :                         .expect("missing wal filename")
     440            0 :                         .to_string_lossy();
     441            0 :                     ensure!(expected_filename == file_name);
     442              : 
     443            0 :                     debug!("processing wal file {:?}", file_path);
     444            0 :                     read_all_bytes(&mut entry).await?
     445              :                 }
     446              :                 tokio_tar::EntryType::Directory => {
     447            0 :                     debug!("directory {:?}", file_path);
     448            0 :                     continue;
     449              :                 }
     450              :                 _ => {
     451            0 :                     bail!(
     452            0 :                         "entry {} in WAL tar archive is of unexpected type: {:?}",
     453            0 :                         file_path.display(),
     454            0 :                         header.entry_type()
     455            0 :                     );
     456              :                 }
     457              :             }
     458              :         };
     459              : 
     460            0 :         waldecoder.feed_bytes(&bytes[offset..]);
     461            0 : 
     462            0 :         let mut modification = tline.begin_modification(last_lsn);
     463            0 :         while last_lsn <= end_lsn {
     464            0 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     465            0 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
     466            0 :                     recdata,
     467            0 :                     &shard,
     468            0 :                     lsn,
     469            0 :                     tline.pg_version,
     470            0 :                 )?
     471            0 :                 .remove(tline.get_shard_identity())
     472            0 :                 .unwrap();
     473            0 : 
     474            0 :                 walingest
     475            0 :                     .ingest_record(interpreted, &mut modification, ctx)
     476            0 :                     .await?;
     477            0 :                 modification.commit(ctx).await?;
     478            0 :                 last_lsn = lsn;
     479            0 : 
     480            0 :                 debug!("imported record at {} (end {})", lsn, end_lsn);
     481            0 :             }
     482              :         }
     483              : 
     484            0 :         debug!("imported records up to {}", last_lsn);
     485            0 :         segno += 1;
     486            0 :         offset = 0;
     487              :     }
     488              : 
     489            0 :     if last_lsn != start_lsn {
     490            0 :         info!("reached end of WAL at {}", last_lsn);
     491              :     } else {
     492            0 :         info!("there was no WAL to import at {}", last_lsn);
     493              :     }
     494              : 
     495              :     // Log any extra unused files
     496            0 :     while let Some(e) = pg_wal_entries.next().await {
     497            0 :         let entry = e?;
     498            0 :         let header = entry.header();
     499            0 :         let file_path = header.path()?.into_owned();
     500            0 :         info!("skipping {:?}", file_path);
     501              :     }
     502              : 
     503            0 :     Ok(())
     504            0 : }
     505              : 
     506         3860 : async fn import_file(
     507         3860 :     modification: &mut DatadirModification<'_>,
     508         3860 :     file_path: &Path,
     509         3860 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     510         3860 :     len: usize,
     511         3860 :     ctx: &RequestContext,
     512         3860 : ) -> Result<Option<ControlFileData>> {
     513         3860 :     let file_name = match file_path.file_name() {
     514         3860 :         Some(name) => name.to_string_lossy(),
     515            0 :         None => return Ok(None),
     516              :     };
     517              : 
     518         3860 :     if file_name.starts_with('.') {
     519              :         // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
     520              :         // will contain "fork files", skip them.
     521            0 :         return Ok(None);
     522         3860 :     }
     523         3860 : 
     524         3860 :     if file_path.starts_with("global") {
     525          240 :         let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
     526          240 :         let dbnode = 0;
     527          240 : 
     528          240 :         match file_name.as_ref() {
     529          240 :             "pg_control" => {
     530            4 :                 let bytes = read_all_bytes(reader).await?;
     531              : 
     532              :                 // Extract the checkpoint record and import it separately.
     533            4 :                 let pg_control = ControlFileData::decode(&bytes[..])?;
     534            4 :                 let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
     535            4 :                 modification.put_checkpoint(checkpoint_bytes)?;
     536            4 :                 debug!("imported control file");
     537              : 
     538              :                 // Import it as ControlFile
     539            4 :                 modification.put_control_file(bytes)?;
     540            4 :                 return Ok(Some(pg_control));
     541              :             }
     542          236 :             "pg_filenode.map" => {
     543            4 :                 let bytes = read_all_bytes(reader).await?;
     544            4 :                 modification
     545            4 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     546            4 :                     .await?;
     547            4 :                 debug!("imported relmap file")
     548              :             }
     549          232 :             "PG_VERSION" => {
     550            0 :                 debug!("ignored PG_VERSION file");
     551              :             }
     552              :             _ => {
     553          232 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     554          232 :                 debug!("imported rel creation");
     555              :             }
     556              :         }
     557         3620 :     } else if file_path.starts_with("base") {
     558         3576 :         let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
     559         3576 :         let dbnode: u32 = file_path
     560         3576 :             .iter()
     561         3576 :             .nth(1)
     562         3576 :             .expect("invalid file path, expected dbnode")
     563         3576 :             .to_string_lossy()
     564         3576 :             .parse()?;
     565              : 
     566         3576 :         match file_name.as_ref() {
     567         3576 :             "pg_filenode.map" => {
     568           12 :                 let bytes = read_all_bytes(reader).await?;
     569           12 :                 modification
     570           12 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     571           12 :                     .await?;
     572           12 :                 debug!("imported relmap file")
     573              :             }
     574         3564 :             "PG_VERSION" => {
     575           12 :                 debug!("ignored PG_VERSION file");
     576              :             }
     577              :             _ => {
     578         3552 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     579         3552 :                 debug!("imported rel creation");
     580              :             }
     581              :         }
     582           44 :     } else if file_path.starts_with("pg_xact") {
     583            4 :         let slru = SlruKind::Clog;
     584            4 : 
     585            4 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     586            4 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     587            4 :             debug!("imported clog slru");
     588            0 :         }
     589           40 :     } else if file_path.starts_with("pg_multixact/offsets") {
     590            4 :         let slru = SlruKind::MultiXactOffsets;
     591            4 : 
     592            4 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     593            4 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     594            4 :             debug!("imported multixact offsets slru");
     595            0 :         }
     596           36 :     } else if file_path.starts_with("pg_multixact/members") {
     597            4 :         let slru = SlruKind::MultiXactMembers;
     598            4 : 
     599            4 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     600            4 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     601            4 :             debug!("imported multixact members slru");
     602            0 :         }
     603           32 :     } else if file_path.starts_with("pg_twophase") {
     604            0 :         let bytes = read_all_bytes(reader).await?;
     605              : 
     606              :         // In PostgreSQL v17, this is a 64-bit FullTransactionid. In previous versions,
     607              :         // it's a 32-bit TransactionId, which fits in u64 anyway.
     608            0 :         let xid = u64::from_str_radix(file_name.as_ref(), 16)?;
     609            0 :         modification
     610            0 :             .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
     611            0 :             .await?;
     612            0 :         debug!("imported twophase file");
     613           32 :     } else if file_path.starts_with("pg_wal") {
     614            0 :         debug!("found wal file in base section. ignore it");
     615           32 :     } else if file_path.starts_with("zenith.signal") {
     616              :         // Parse zenith signal file to set correct previous LSN
     617            0 :         let bytes = read_all_bytes(reader).await?;
     618              :         // zenith.signal format is "PREV LSN: prev_lsn"
     619              :         // TODO write serialization and deserialization in the same place.
     620            0 :         let zenith_signal = std::str::from_utf8(&bytes)?.trim();
     621            0 :         let prev_lsn = match zenith_signal {
     622            0 :             "PREV LSN: none" => Lsn(0),
     623            0 :             "PREV LSN: invalid" => Lsn(0),
     624            0 :             other => {
     625            0 :                 let split = other.split(':').collect::<Vec<_>>();
     626            0 :                 split[1]
     627            0 :                     .trim()
     628            0 :                     .parse::<Lsn>()
     629            0 :                     .context("can't parse zenith.signal")?
     630              :             }
     631              :         };
     632              : 
     633              :         // zenith.signal is not necessarily the last file, that we handle
     634              :         // but it is ok to call `finish_write()`, because final `modification.commit()`
     635              :         // will update lsn once more to the final one.
     636            0 :         let writer = modification.tline.writer().await;
     637            0 :         writer.finish_write(prev_lsn);
     638            0 : 
     639            0 :         debug!("imported zenith signal {}", prev_lsn);
     640           32 :     } else if file_path.starts_with("pg_tblspc") {
     641              :         // TODO Backups exported from neon won't have pg_tblspc, but we will need
     642              :         // this to import arbitrary postgres databases.
     643            0 :         bail!("Importing pg_tblspc is not implemented");
     644              :     } else {
     645           32 :         debug!(
     646            0 :             "ignoring unrecognized file \"{}\" in tar archive",
     647            0 :             file_path.display()
     648              :         );
     649              :     }
     650              : 
     651         3856 :     Ok(None)
     652         3860 : }
     653              : 
     654           20 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
     655           20 :     let mut buf: Vec<u8> = vec![];
     656           20 :     reader.read_to_end(&mut buf).await?;
     657           20 :     Ok(Bytes::from(buf))
     658           20 : }
        

Generated by: LCOV version 2.1-beta