LCOV - code coverage report
Current view: top level - pageserver/src - import_datadir.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 64.2 % 447 287
Test Date: 2025-03-12 00:01:28 Functions: 41.2 % 34 14

            Line data    Source code
       1              : //!
       2              : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
       3              : //! a neon Timeline.
       4              : //!
       5              : use std::path::{Path, PathBuf};
       6              : 
       7              : use anyhow::{Context, Result, bail, ensure};
       8              : use bytes::Bytes;
       9              : use camino::Utf8Path;
      10              : use futures::StreamExt;
      11              : use pageserver_api::key::rel_block_to_key;
      12              : use pageserver_api::reltag::{RelTag, SlruKind};
      13              : use postgres_ffi::relfile_utils::*;
      14              : use postgres_ffi::waldecoder::WalStreamDecoder;
      15              : use postgres_ffi::{
      16              :     BLCKSZ, ControlFileData, DBState_DB_SHUTDOWNED, Oid, WAL_SEGMENT_SIZE, XLogFileName,
      17              :     pg_constants,
      18              : };
      19              : use tokio::io::{AsyncRead, AsyncReadExt};
      20              : use tokio_tar::Archive;
      21              : use tracing::*;
      22              : use utils::lsn::Lsn;
      23              : use wal_decoder::models::InterpretedWalRecord;
      24              : use walkdir::WalkDir;
      25              : 
      26              : use crate::context::RequestContext;
      27              : use crate::metrics::WAL_INGEST;
      28              : use crate::pgdatadir_mapping::*;
      29              : use crate::tenant::Timeline;
      30              : use crate::walingest::WalIngest;
      31              : 
      32              : // Returns checkpoint LSN from controlfile
      33            4 : pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
      34            4 :     // Read control file to extract the LSN
      35            4 :     let controlfile_path = path.join("global").join("pg_control");
      36            4 :     let controlfile_buf = std::fs::read(&controlfile_path)
      37            4 :         .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
      38            4 :     let controlfile = ControlFileData::decode(&controlfile_buf)?;
      39            4 :     let lsn = controlfile.checkPoint;
      40            4 : 
      41            4 :     Ok(Lsn(lsn))
      42            4 : }
      43              : 
      44              : ///
      45              : /// Import all relation data pages from local disk into the repository.
      46              : ///
      47              : /// This is currently only used to import a cluster freshly created by initdb.
      48              : /// The code that deals with the checkpoint would not work right if the
      49              : /// cluster was not shut down cleanly.
      50            4 : pub async fn import_timeline_from_postgres_datadir(
      51            4 :     tline: &Timeline,
      52            4 :     pgdata_path: &Utf8Path,
      53            4 :     pgdata_lsn: Lsn,
      54            4 :     ctx: &RequestContext,
      55            4 : ) -> Result<()> {
      56            4 :     let mut pg_control: Option<ControlFileData> = None;
      57            4 : 
      58            4 :     // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
      59            4 :     // Then fishing out pg_control would be unnecessary
      60            4 :     let mut modification = tline.begin_modification(pgdata_lsn);
      61            4 :     modification.init_empty()?;
      62              : 
      63              :     // Import all but pg_wal
      64            4 :     let all_but_wal = WalkDir::new(pgdata_path)
      65            4 :         .into_iter()
      66         3960 :         .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
      67         3960 :     for entry in all_but_wal {
      68         3956 :         let entry = entry?;
      69         3956 :         let metadata = entry.metadata().expect("error getting dir entry metadata");
      70         3956 :         if metadata.is_file() {
      71         3860 :             let absolute_path = entry.path();
      72         3860 :             let relative_path = absolute_path.strip_prefix(pgdata_path)?;
      73              : 
      74         3860 :             let mut file = tokio::fs::File::open(absolute_path).await?;
      75         3860 :             let len = metadata.len() as usize;
      76            4 :             if let Some(control_file) =
      77         3860 :                 import_file(&mut modification, relative_path, &mut file, len, ctx).await?
      78            4 :             {
      79            4 :                 pg_control = Some(control_file);
      80         3856 :             }
      81         3860 :             modification.flush(ctx).await?;
      82           96 :         }
      83              :     }
      84              : 
      85              :     // We're done importing all the data files.
      86            4 :     modification.commit(ctx).await?;
      87              : 
      88              :     // We expect the Postgres server to be shut down cleanly.
      89            4 :     let pg_control = pg_control.context("pg_control file not found")?;
      90            4 :     ensure!(
      91            4 :         pg_control.state == DBState_DB_SHUTDOWNED,
      92            0 :         "Postgres cluster was not shut down cleanly"
      93              :     );
      94            4 :     ensure!(
      95            4 :         pg_control.checkPointCopy.redo == pgdata_lsn.0,
      96            0 :         "unexpected checkpoint REDO pointer"
      97              :     );
      98              : 
      99              :     // Import WAL. This is needed even when starting from a shutdown checkpoint, because
     100              :     // this reads the checkpoint record itself, advancing the tip of the timeline to
     101              :     // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
     102            4 :     import_wal(
     103            4 :         &pgdata_path.join("pg_wal"),
     104            4 :         tline,
     105            4 :         Lsn(pg_control.checkPointCopy.redo),
     106            4 :         pgdata_lsn,
     107            4 :         ctx,
     108            4 :     )
     109            4 :     .await?;
     110              : 
     111            4 :     Ok(())
     112            4 : }
     113              : 
     114              : // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
     115         3784 : async fn import_rel(
     116         3784 :     modification: &mut DatadirModification<'_>,
     117         3784 :     path: &Path,
     118         3784 :     spcoid: Oid,
     119         3784 :     dboid: Oid,
     120         3784 :     reader: &mut (impl AsyncRead + Unpin),
     121         3784 :     len: usize,
     122         3784 :     ctx: &RequestContext,
     123         3784 : ) -> anyhow::Result<()> {
     124         3784 :     // Does it look like a relation file?
     125         3784 :     trace!("importing rel file {}", path.display());
     126              : 
     127         3784 :     let filename = &path
     128         3784 :         .file_name()
     129         3784 :         .expect("missing rel filename")
     130         3784 :         .to_string_lossy();
     131         3784 :     let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
     132            0 :         warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
     133            0 :         e
     134         3784 :     })?;
     135              : 
     136         3784 :     let mut buf: [u8; 8192] = [0u8; 8192];
     137         3784 : 
     138         3784 :     ensure!(len % BLCKSZ as usize == 0);
     139         3784 :     let nblocks = len / BLCKSZ as usize;
     140         3784 : 
     141         3784 :     let rel = RelTag {
     142         3784 :         spcnode: spcoid,
     143         3784 :         dbnode: dboid,
     144         3784 :         relnode,
     145         3784 :         forknum,
     146         3784 :     };
     147         3784 : 
     148         3784 :     let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     149              : 
     150              :     // Call put_rel_creation for every segment of the relation,
     151              :     // because there is no guarantee about the order in which we are processing segments.
     152              :     // ignore "relation already exists" error
     153              :     //
     154              :     // FIXME: Keep track of which relations we've already created?
     155              :     // https://github.com/neondatabase/neon/issues/3309
     156         3784 :     if let Err(e) = modification
     157         3784 :         .put_rel_creation(rel, nblocks as u32, ctx)
     158         3784 :         .await
     159              :     {
     160            0 :         match e {
     161              :             RelationError::AlreadyExists => {
     162            0 :                 debug!("Relation {} already exist. We must be extending it.", rel)
     163              :             }
     164            0 :             _ => return Err(e.into()),
     165              :         }
     166         3784 :     }
     167              : 
     168              :     loop {
     169        14664 :         let r = reader.read_exact(&mut buf).await;
     170        14664 :         match r {
     171              :             Ok(_) => {
     172        10880 :                 let key = rel_block_to_key(rel, blknum);
     173        10880 :                 if modification.tline.get_shard_identity().is_key_local(&key) {
     174        10880 :                     modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
     175            0 :                 }
     176              :             }
     177              : 
     178              :             // TODO: UnexpectedEof is expected
     179         3784 :             Err(err) => match err.kind() {
     180              :                 std::io::ErrorKind::UnexpectedEof => {
     181              :                     // reached EOF. That's expected.
     182         3784 :                     let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     183         3784 :                     ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
     184         3784 :                     break;
     185              :                 }
     186              :                 _ => {
     187            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     188              :                 }
     189              :             },
     190              :         };
     191        10880 :         blknum += 1;
     192              :     }
     193              : 
     194              :     // Update relation size
     195              :     //
     196              :     // If we process rel segments out of order,
     197              :     // put_rel_extend will skip the update.
     198         3784 :     modification.put_rel_extend(rel, blknum, ctx).await?;
     199              : 
     200         3784 :     Ok(())
     201         3784 : }
     202              : 
     203              : /// Import an SLRU segment file
     204              : ///
     205           12 : async fn import_slru(
     206           12 :     modification: &mut DatadirModification<'_>,
     207           12 :     slru: SlruKind,
     208           12 :     path: &Path,
     209           12 :     reader: &mut (impl AsyncRead + Unpin),
     210           12 :     len: usize,
     211           12 :     ctx: &RequestContext,
     212           12 : ) -> anyhow::Result<()> {
     213           12 :     info!("importing slru file {path:?}");
     214              : 
     215           12 :     let mut buf: [u8; 8192] = [0u8; 8192];
     216           12 :     let filename = &path
     217           12 :         .file_name()
     218           12 :         .with_context(|| format!("missing slru filename for path {path:?}"))?
     219           12 :         .to_string_lossy();
     220           12 :     let segno = u32::from_str_radix(filename, 16)?;
     221              : 
     222           12 :     ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
     223           12 :     let nblocks = len / BLCKSZ as usize;
     224           12 : 
     225           12 :     ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
     226              : 
     227           12 :     modification
     228           12 :         .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
     229           12 :         .await?;
     230              : 
     231           12 :     let mut rpageno = 0;
     232              :     loop {
     233           24 :         let r = reader.read_exact(&mut buf).await;
     234           24 :         match r {
     235              :             Ok(_) => {
     236           12 :                 modification.put_slru_page_image(
     237           12 :                     slru,
     238           12 :                     segno,
     239           12 :                     rpageno,
     240           12 :                     Bytes::copy_from_slice(&buf),
     241           12 :                 )?;
     242              :             }
     243              : 
     244              :             // TODO: UnexpectedEof is expected
     245           12 :             Err(err) => match err.kind() {
     246              :                 std::io::ErrorKind::UnexpectedEof => {
     247              :                     // reached EOF. That's expected.
     248           12 :                     ensure!(rpageno == nblocks as u32, "unexpected EOF");
     249           12 :                     break;
     250              :                 }
     251              :                 _ => {
     252            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     253              :                 }
     254              :             },
     255              :         };
     256           12 :         rpageno += 1;
     257              :     }
     258              : 
     259           12 :     Ok(())
     260           12 : }
     261              : 
     262              : /// Scan PostgreSQL WAL files in given directory and load all records between
     263              : /// 'startpoint' and 'endpoint' into the repository.
     264            4 : async fn import_wal(
     265            4 :     walpath: &Utf8Path,
     266            4 :     tline: &Timeline,
     267            4 :     startpoint: Lsn,
     268            4 :     endpoint: Lsn,
     269            4 :     ctx: &RequestContext,
     270            4 : ) -> anyhow::Result<()> {
     271            4 :     let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
     272            4 : 
     273            4 :     let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
     274            4 :     let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
     275            4 :     let mut last_lsn = startpoint;
     276              : 
     277            4 :     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
     278              : 
     279            4 :     let shard = vec![*tline.get_shard_identity()];
     280              : 
     281            8 :     while last_lsn <= endpoint {
     282              :         // FIXME: assume postgresql tli 1 for now
     283            4 :         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     284            4 :         let mut buf = Vec::new();
     285            4 : 
     286            4 :         // Read local file
     287            4 :         let mut path = walpath.join(&filename);
     288            4 : 
     289            4 :         // It could be as .partial
     290            4 :         if !PathBuf::from(&path).exists() {
     291            0 :             path = walpath.join(filename + ".partial");
     292            4 :         }
     293              : 
     294              :         // Slurp the WAL file
     295            4 :         let mut file = std::fs::File::open(&path)?;
     296              : 
     297            4 :         if offset > 0 {
     298            4 :             use std::io::Seek;
     299            4 :             file.seek(std::io::SeekFrom::Start(offset as u64))?;
     300            0 :         }
     301              : 
     302              :         use std::io::Read;
     303            4 :         let nread = file.read_to_end(&mut buf)?;
     304            4 :         if nread != WAL_SEGMENT_SIZE - offset {
     305              :             // Maybe allow this for .partial files?
     306            0 :             error!("read only {} bytes from WAL file", nread);
     307            4 :         }
     308              : 
     309            4 :         waldecoder.feed_bytes(&buf);
     310            4 : 
     311            4 :         let mut nrecords = 0;
     312            4 :         let mut modification = tline.begin_modification(last_lsn);
     313            8 :         while last_lsn <= endpoint {
     314            4 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     315            4 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
     316            4 :                     recdata,
     317            4 :                     &shard,
     318            4 :                     lsn,
     319            4 :                     tline.pg_version,
     320            4 :                 )?
     321            4 :                 .remove(tline.get_shard_identity())
     322            4 :                 .unwrap();
     323            4 : 
     324            4 :                 walingest
     325            4 :                     .ingest_record(interpreted, &mut modification, ctx)
     326            4 :                     .await?;
     327            4 :                 WAL_INGEST.records_committed.inc();
     328            4 : 
     329            4 :                 modification.commit(ctx).await?;
     330            4 :                 last_lsn = lsn;
     331            4 : 
     332            4 :                 nrecords += 1;
     333            4 : 
     334            4 :                 trace!("imported record at {} (end {})", lsn, endpoint);
     335            0 :             }
     336              :         }
     337              : 
     338            4 :         debug!("imported {} records up to {}", nrecords, last_lsn);
     339              : 
     340            4 :         segno += 1;
     341            4 :         offset = 0;
     342              :     }
     343              : 
     344            4 :     if last_lsn != startpoint {
     345            4 :         info!("reached end of WAL at {}", last_lsn);
     346              :     } else {
     347            0 :         info!("no WAL to import at {}", last_lsn);
     348              :     }
     349              : 
     350            4 :     Ok(())
     351            4 : }
     352              : 
     353            0 : pub async fn import_basebackup_from_tar(
     354            0 :     tline: &Timeline,
     355            0 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     356            0 :     base_lsn: Lsn,
     357            0 :     ctx: &RequestContext,
     358            0 : ) -> Result<()> {
     359            0 :     info!("importing base at {base_lsn}");
     360            0 :     let mut modification = tline.begin_modification(base_lsn);
     361            0 :     modification.init_empty()?;
     362              : 
     363            0 :     let mut pg_control: Option<ControlFileData> = None;
     364              : 
     365              :     // Import base
     366            0 :     let mut entries = Archive::new(reader).entries()?;
     367            0 :     while let Some(base_tar_entry) = entries.next().await {
     368            0 :         let mut entry = base_tar_entry?;
     369            0 :         let header = entry.header();
     370            0 :         let len = header.entry_size()? as usize;
     371            0 :         let file_path = header.path()?.into_owned();
     372            0 : 
     373            0 :         match header.entry_type() {
     374              :             tokio_tar::EntryType::Regular => {
     375            0 :                 if let Some(res) =
     376            0 :                     import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
     377            0 :                 {
     378            0 :                     // We found the pg_control file.
     379            0 :                     pg_control = Some(res);
     380            0 :                 }
     381            0 :                 modification.flush(ctx).await?;
     382              :             }
     383              :             tokio_tar::EntryType::Directory => {
     384            0 :                 debug!("directory {:?}", file_path);
     385              :             }
     386              :             _ => {
     387            0 :                 bail!(
     388            0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     389            0 :                     file_path.display(),
     390            0 :                     header.entry_type()
     391            0 :                 );
     392              :             }
     393              :         }
     394              :     }
     395              : 
     396              :     // sanity check: ensure that pg_control is loaded
     397            0 :     let _pg_control = pg_control.context("pg_control file not found")?;
     398              : 
     399            0 :     modification.commit(ctx).await?;
     400            0 :     Ok(())
     401            0 : }
     402              : 
     403            0 : pub async fn import_wal_from_tar(
     404            0 :     tline: &Timeline,
     405            0 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     406            0 :     start_lsn: Lsn,
     407            0 :     end_lsn: Lsn,
     408            0 :     ctx: &RequestContext,
     409            0 : ) -> Result<()> {
     410            0 :     // Set up walingest mutable state
     411            0 :     let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
     412            0 :     let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
     413            0 :     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     414            0 :     let mut last_lsn = start_lsn;
     415            0 :     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
     416            0 :     let shard = vec![*tline.get_shard_identity()];
     417            0 : 
     418            0 :     // Ingest wal until end_lsn
     419            0 :     info!("importing wal until {}", end_lsn);
     420            0 :     let mut pg_wal_tar = Archive::new(reader);
     421            0 :     let mut pg_wal_entries = pg_wal_tar.entries()?;
     422            0 :     while last_lsn <= end_lsn {
     423            0 :         let bytes = {
     424            0 :             let mut entry = pg_wal_entries
     425            0 :                 .next()
     426            0 :                 .await
     427            0 :                 .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
     428            0 :             let header = entry.header();
     429            0 :             let file_path = header.path()?.into_owned();
     430            0 : 
     431            0 :             match header.entry_type() {
     432              :                 tokio_tar::EntryType::Regular => {
     433              :                     // FIXME: assume postgresql tli 1 for now
     434            0 :                     let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     435            0 :                     let file_name = file_path
     436            0 :                         .file_name()
     437            0 :                         .expect("missing wal filename")
     438            0 :                         .to_string_lossy();
     439            0 :                     ensure!(expected_filename == file_name);
     440              : 
     441            0 :                     debug!("processing wal file {:?}", file_path);
     442            0 :                     read_all_bytes(&mut entry).await?
     443              :                 }
     444              :                 tokio_tar::EntryType::Directory => {
     445            0 :                     debug!("directory {:?}", file_path);
     446            0 :                     continue;
     447              :                 }
     448              :                 _ => {
     449            0 :                     bail!(
     450            0 :                         "entry {} in WAL tar archive is of unexpected type: {:?}",
     451            0 :                         file_path.display(),
     452            0 :                         header.entry_type()
     453            0 :                     );
     454              :                 }
     455              :             }
     456              :         };
     457              : 
     458            0 :         waldecoder.feed_bytes(&bytes[offset..]);
     459            0 : 
     460            0 :         let mut modification = tline.begin_modification(last_lsn);
     461            0 :         while last_lsn <= end_lsn {
     462            0 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     463            0 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
     464            0 :                     recdata,
     465            0 :                     &shard,
     466            0 :                     lsn,
     467            0 :                     tline.pg_version,
     468            0 :                 )?
     469            0 :                 .remove(tline.get_shard_identity())
     470            0 :                 .unwrap();
     471            0 : 
     472            0 :                 walingest
     473            0 :                     .ingest_record(interpreted, &mut modification, ctx)
     474            0 :                     .await?;
     475            0 :                 modification.commit(ctx).await?;
     476            0 :                 last_lsn = lsn;
     477            0 : 
     478            0 :                 debug!("imported record at {} (end {})", lsn, end_lsn);
     479            0 :             }
     480              :         }
     481              : 
     482            0 :         debug!("imported records up to {}", last_lsn);
     483            0 :         segno += 1;
     484            0 :         offset = 0;
     485              :     }
     486              : 
     487            0 :     if last_lsn != start_lsn {
     488            0 :         info!("reached end of WAL at {}", last_lsn);
     489              :     } else {
     490            0 :         info!("there was no WAL to import at {}", last_lsn);
     491              :     }
     492              : 
     493              :     // Log any extra unused files
     494            0 :     while let Some(e) = pg_wal_entries.next().await {
     495            0 :         let entry = e?;
     496            0 :         let header = entry.header();
     497            0 :         let file_path = header.path()?.into_owned();
     498            0 :         info!("skipping {:?}", file_path);
     499              :     }
     500              : 
     501            0 :     Ok(())
     502            0 : }
     503              : 
     504         3860 : async fn import_file(
     505         3860 :     modification: &mut DatadirModification<'_>,
     506         3860 :     file_path: &Path,
     507         3860 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     508         3860 :     len: usize,
     509         3860 :     ctx: &RequestContext,
     510         3860 : ) -> Result<Option<ControlFileData>> {
     511         3860 :     let file_name = match file_path.file_name() {
     512         3860 :         Some(name) => name.to_string_lossy(),
     513            0 :         None => return Ok(None),
     514              :     };
     515              : 
     516         3860 :     if file_name.starts_with('.') {
     517              :         // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
     518              :         // will contain "fork files", skip them.
     519            0 :         return Ok(None);
     520         3860 :     }
     521         3860 : 
     522         3860 :     if file_path.starts_with("global") {
     523          240 :         let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
     524          240 :         let dbnode = 0;
     525          240 : 
     526          240 :         match file_name.as_ref() {
     527          240 :             "pg_control" => {
     528            4 :                 let bytes = read_all_bytes(reader).await?;
     529              : 
     530              :                 // Extract the checkpoint record and import it separately.
     531            4 :                 let pg_control = ControlFileData::decode(&bytes[..])?;
     532            4 :                 let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
     533            4 :                 modification.put_checkpoint(checkpoint_bytes)?;
     534            4 :                 debug!("imported control file");
     535              : 
     536              :                 // Import it as ControlFile
     537            4 :                 modification.put_control_file(bytes)?;
     538            4 :                 return Ok(Some(pg_control));
     539              :             }
     540          236 :             "pg_filenode.map" => {
     541            4 :                 let bytes = read_all_bytes(reader).await?;
     542            4 :                 modification
     543            4 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     544            4 :                     .await?;
     545            4 :                 debug!("imported relmap file")
     546              :             }
     547          232 :             "PG_VERSION" => {
     548            0 :                 debug!("ignored PG_VERSION file");
     549              :             }
     550              :             _ => {
     551          232 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     552          232 :                 debug!("imported rel creation");
     553              :             }
     554              :         }
     555         3620 :     } else if file_path.starts_with("base") {
     556         3576 :         let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
     557         3576 :         let dbnode: u32 = file_path
     558         3576 :             .iter()
     559         3576 :             .nth(1)
     560         3576 :             .expect("invalid file path, expected dbnode")
     561         3576 :             .to_string_lossy()
     562         3576 :             .parse()?;
     563              : 
     564         3576 :         match file_name.as_ref() {
     565         3576 :             "pg_filenode.map" => {
     566           12 :                 let bytes = read_all_bytes(reader).await?;
     567           12 :                 modification
     568           12 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     569           12 :                     .await?;
     570           12 :                 debug!("imported relmap file")
     571              :             }
     572         3564 :             "PG_VERSION" => {
     573           12 :                 debug!("ignored PG_VERSION file");
     574              :             }
     575              :             _ => {
     576         3552 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     577         3552 :                 debug!("imported rel creation");
     578              :             }
     579              :         }
     580           44 :     } else if file_path.starts_with("pg_xact") {
     581            4 :         let slru = SlruKind::Clog;
     582            4 : 
     583            4 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     584            4 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     585            4 :             debug!("imported clog slru");
     586            0 :         }
     587           40 :     } else if file_path.starts_with("pg_multixact/offsets") {
     588            4 :         let slru = SlruKind::MultiXactOffsets;
     589            4 : 
     590            4 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     591            4 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     592            4 :             debug!("imported multixact offsets slru");
     593            0 :         }
     594           36 :     } else if file_path.starts_with("pg_multixact/members") {
     595            4 :         let slru = SlruKind::MultiXactMembers;
     596            4 : 
     597            4 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     598            4 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     599            4 :             debug!("imported multixact members slru");
     600            0 :         }
     601           32 :     } else if file_path.starts_with("pg_twophase") {
     602            0 :         let bytes = read_all_bytes(reader).await?;
     603              : 
     604              :         // In PostgreSQL v17, this is a 64-bit FullTransactionid. In previous versions,
     605              :         // it's a 32-bit TransactionId, which fits in u64 anyway.
     606            0 :         let xid = u64::from_str_radix(file_name.as_ref(), 16)?;
     607            0 :         modification
     608            0 :             .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
     609            0 :             .await?;
     610            0 :         debug!("imported twophase file");
     611           32 :     } else if file_path.starts_with("pg_wal") {
     612            0 :         debug!("found wal file in base section. ignore it");
     613           32 :     } else if file_path.starts_with("zenith.signal") {
     614              :         // Parse zenith signal file to set correct previous LSN
     615            0 :         let bytes = read_all_bytes(reader).await?;
     616              :         // zenith.signal format is "PREV LSN: prev_lsn"
     617              :         // TODO write serialization and deserialization in the same place.
     618            0 :         let zenith_signal = std::str::from_utf8(&bytes)?.trim();
     619            0 :         let prev_lsn = match zenith_signal {
     620            0 :             "PREV LSN: none" => Lsn(0),
     621            0 :             "PREV LSN: invalid" => Lsn(0),
     622            0 :             other => {
     623            0 :                 let split = other.split(':').collect::<Vec<_>>();
     624            0 :                 split[1]
     625            0 :                     .trim()
     626            0 :                     .parse::<Lsn>()
     627            0 :                     .context("can't parse zenith.signal")?
     628              :             }
     629              :         };
     630              : 
     631              :         // zenith.signal is not necessarily the last file, that we handle
     632              :         // but it is ok to call `finish_write()`, because final `modification.commit()`
     633              :         // will update lsn once more to the final one.
     634            0 :         let writer = modification.tline.writer().await;
     635            0 :         writer.finish_write(prev_lsn);
     636            0 : 
     637            0 :         debug!("imported zenith signal {}", prev_lsn);
     638           32 :     } else if file_path.starts_with("pg_tblspc") {
     639              :         // TODO Backups exported from neon won't have pg_tblspc, but we will need
     640              :         // this to import arbitrary postgres databases.
     641            0 :         bail!("Importing pg_tblspc is not implemented");
     642              :     } else {
     643           32 :         debug!(
     644            0 :             "ignoring unrecognized file \"{}\" in tar archive",
     645            0 :             file_path.display()
     646              :         );
     647              :     }
     648              : 
     649         3856 :     Ok(None)
     650         3860 : }
     651              : 
     652           20 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> { // Drains `reader` via `read_to_end` and returns everything it yielded as a single `Bytes`.
     653           20 :     let mut buf: Vec<u8> = vec![]; // starts empty; filled by the `read_to_end` call below
     654           20 :     reader.read_to_end(&mut buf).await?; // a read error is propagated to the caller by `?`
     655           20 :     Ok(Bytes::from(buf)) // the whole payload is held in memory at once
     656           20 : }
        

Generated by: LCOV version 2.1-beta