LCOV - code coverage report
Current view: top level - pageserver/src - import_datadir.rs (source / functions) Coverage Total Hit
Test: b4ae4c4857f9ef3e144e982a35ee23bc84c71983.info Lines: 63.8 % 423 270
Test Date: 2024-10-22 22:13:45 Functions: 41.2 % 34 14

            Line data    Source code
       1              : //!
       2              : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
       3              : //! a neon Timeline.
       4              : //!
       5              : use std::path::{Path, PathBuf};
       6              : 
       7              : use anyhow::{bail, ensure, Context, Result};
       8              : use bytes::Bytes;
       9              : use camino::Utf8Path;
      10              : use futures::StreamExt;
      11              : use pageserver_api::key::rel_block_to_key;
      12              : use tokio::io::{AsyncRead, AsyncReadExt};
      13              : use tokio_tar::Archive;
      14              : use tracing::*;
      15              : use walkdir::WalkDir;
      16              : 
      17              : use crate::context::RequestContext;
      18              : use crate::metrics::WAL_INGEST;
      19              : use crate::pgdatadir_mapping::*;
      20              : use crate::tenant::Timeline;
      21              : use crate::walingest::WalIngest;
      22              : use crate::walrecord::decode_wal_record;
      23              : use crate::walrecord::DecodedWALRecord;
      24              : use pageserver_api::reltag::{RelTag, SlruKind};
      25              : use postgres_ffi::pg_constants;
      26              : use postgres_ffi::relfile_utils::*;
      27              : use postgres_ffi::waldecoder::WalStreamDecoder;
      28              : use postgres_ffi::ControlFileData;
      29              : use postgres_ffi::DBState_DB_SHUTDOWNED;
      30              : use postgres_ffi::Oid;
      31              : use postgres_ffi::XLogFileName;
      32              : use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
      33              : use utils::lsn::Lsn;
      34              : 
      35              : // Returns checkpoint LSN from controlfile
      36            2 : pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
      37            2 :     // Read control file to extract the LSN
      38            2 :     let controlfile_path = path.join("global").join("pg_control");
      39            2 :     let controlfile_buf = std::fs::read(&controlfile_path)
      40            2 :         .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
      41            2 :     let controlfile = ControlFileData::decode(&controlfile_buf)?;
      42            2 :     let lsn = controlfile.checkPoint;
      43            2 : 
      44            2 :     Ok(Lsn(lsn))
      45            2 : }
      46              : 
      47              : ///
      48              : /// Import all relation data pages from local disk into the repository.
      49              : ///
      50              : /// This is currently only used to import a cluster freshly created by initdb.
      51              : /// The code that deals with the checkpoint would not work right if the
      52              : /// cluster was not shut down cleanly.
      53            2 : pub async fn import_timeline_from_postgres_datadir(
      54            2 :     tline: &Timeline,
      55            2 :     pgdata_path: &Utf8Path,
      56            2 :     pgdata_lsn: Lsn,
      57            2 :     ctx: &RequestContext,
      58            2 : ) -> Result<()> {
      59            2 :     let mut pg_control: Option<ControlFileData> = None;
      60            2 : 
      61            2 :     // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
      62            2 :     // Then fishing out pg_control would be unnecessary
      63            2 :     let mut modification = tline.begin_modification(pgdata_lsn);
      64            2 :     modification.init_empty()?;
      65              : 
      66              :     // Import all but pg_wal
      67            2 :     let all_but_wal = WalkDir::new(pgdata_path)
      68            2 :         .into_iter()
      69         1980 :         .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
      70         1980 :     for entry in all_but_wal {
      71         1978 :         let entry = entry?;
      72         1978 :         let metadata = entry.metadata().expect("error getting dir entry metadata");
      73         1978 :         if metadata.is_file() {
      74         1930 :             let absolute_path = entry.path();
      75         1930 :             let relative_path = absolute_path.strip_prefix(pgdata_path)?;
      76              : 
      77         1930 :             let mut file = tokio::fs::File::open(absolute_path).await?;
      78         1930 :             let len = metadata.len() as usize;
      79            2 :             if let Some(control_file) =
      80         6971 :                 import_file(&mut modification, relative_path, &mut file, len, ctx).await?
      81            2 :             {
      82            2 :                 pg_control = Some(control_file);
      83         1928 :             }
      84         1930 :             modification.flush(ctx).await?;
      85           48 :         }
      86              :     }
      87              : 
      88              :     // We're done importing all the data files.
      89          348 :     modification.commit(ctx).await?;
      90              : 
      91              :     // We expect the Postgres server to be shut down cleanly.
      92            2 :     let pg_control = pg_control.context("pg_control file not found")?;
      93            2 :     ensure!(
      94            2 :         pg_control.state == DBState_DB_SHUTDOWNED,
      95            0 :         "Postgres cluster was not shut down cleanly"
      96              :     );
      97            2 :     ensure!(
      98            2 :         pg_control.checkPointCopy.redo == pgdata_lsn.0,
      99            0 :         "unexpected checkpoint REDO pointer"
     100              :     );
     101              : 
     102              :     // Import WAL. This is needed even when starting from a shutdown checkpoint, because
     103              :     // this reads the checkpoint record itself, advancing the tip of the timeline to
     104              :     // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
     105            2 :     import_wal(
     106            2 :         &pgdata_path.join("pg_wal"),
     107            2 :         tline,
     108            2 :         Lsn(pg_control.checkPointCopy.redo),
     109            2 :         pgdata_lsn,
     110            2 :         ctx,
     111            2 :     )
     112            0 :     .await?;
     113              : 
     114            2 :     Ok(())
     115            2 : }
     116              : 
     117              : // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
     118         1892 : async fn import_rel(
     119         1892 :     modification: &mut DatadirModification<'_>,
     120         1892 :     path: &Path,
     121         1892 :     spcoid: Oid,
     122         1892 :     dboid: Oid,
     123         1892 :     reader: &mut (impl AsyncRead + Unpin),
     124         1892 :     len: usize,
     125         1892 :     ctx: &RequestContext,
     126         1892 : ) -> anyhow::Result<()> {
     127         1892 :     // Does it look like a relation file?
     128         1892 :     trace!("importing rel file {}", path.display());
     129              : 
     130         1892 :     let filename = &path
     131         1892 :         .file_name()
     132         1892 :         .expect("missing rel filename")
     133         1892 :         .to_string_lossy();
     134         1892 :     let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
     135            0 :         warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
     136            0 :         e
     137         1892 :     })?;
     138              : 
     139         1892 :     let mut buf: [u8; 8192] = [0u8; 8192];
     140         1892 : 
     141         1892 :     ensure!(len % BLCKSZ as usize == 0);
     142         1892 :     let nblocks = len / BLCKSZ as usize;
     143         1892 : 
     144         1892 :     let rel = RelTag {
     145         1892 :         spcnode: spcoid,
     146         1892 :         dbnode: dboid,
     147         1892 :         relnode,
     148         1892 :         forknum,
     149         1892 :     };
     150         1892 : 
     151         1892 :     let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     152              : 
     153              :     // Call put_rel_creation for every segment of the relation,
     154              :     // because there is no guarantee about the order in which we are processing segments.
     155              :     // ignore "relation already exists" error
     156              :     //
     157              :     // FIXME: Keep track of which relations we've already created?
     158              :     // https://github.com/neondatabase/neon/issues/3309
     159         1892 :     if let Err(e) = modification
     160         1892 :         .put_rel_creation(rel, nblocks as u32, ctx)
     161            0 :         .await
     162              :     {
     163            0 :         match e {
     164              :             RelationError::AlreadyExists => {
     165            0 :                 debug!("Relation {} already exist. We must be extending it.", rel)
     166              :             }
     167            0 :             _ => return Err(e.into()),
     168              :         }
     169         1892 :     }
     170              : 
     171              :     loop {
     172         7332 :         let r = reader.read_exact(&mut buf).await;
     173         7332 :         match r {
     174              :             Ok(_) => {
     175         5440 :                 let key = rel_block_to_key(rel, blknum);
     176         5440 :                 if modification.tline.get_shard_identity().is_key_local(&key) {
     177         5440 :                     modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
     178            0 :                 }
     179              :             }
     180              : 
     181              :             // TODO: UnexpectedEof is expected
     182         1892 :             Err(err) => match err.kind() {
     183              :                 std::io::ErrorKind::UnexpectedEof => {
     184              :                     // reached EOF. That's expected.
     185         1892 :                     let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     186         1892 :                     ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
     187         1892 :                     break;
     188              :                 }
     189              :                 _ => {
     190            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     191              :                 }
     192              :             },
     193              :         };
     194         5440 :         blknum += 1;
     195              :     }
     196              : 
     197              :     // Update relation size
     198              :     //
     199              :     // If we process rel segments out of order,
     200              :     // put_rel_extend will skip the update.
     201         1892 :     modification.put_rel_extend(rel, blknum, ctx).await?;
     202              : 
     203         1892 :     Ok(())
     204         1892 : }
     205              : 
     206              : /// Import an SLRU segment file
     207              : ///
     208            6 : async fn import_slru(
     209            6 :     modification: &mut DatadirModification<'_>,
     210            6 :     slru: SlruKind,
     211            6 :     path: &Path,
     212            6 :     reader: &mut (impl AsyncRead + Unpin),
     213            6 :     len: usize,
     214            6 :     ctx: &RequestContext,
     215            6 : ) -> anyhow::Result<()> {
     216            6 :     info!("importing slru file {path:?}");
     217              : 
     218            6 :     let mut buf: [u8; 8192] = [0u8; 8192];
     219            6 :     let filename = &path
     220            6 :         .file_name()
     221            6 :         .with_context(|| format!("missing slru filename for path {path:?}"))?
     222            6 :         .to_string_lossy();
     223            6 :     let segno = u32::from_str_radix(filename, 16)?;
     224              : 
     225            6 :     ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
     226            6 :     let nblocks = len / BLCKSZ as usize;
     227            6 : 
     228            6 :     ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
     229              : 
     230            6 :     modification
     231            6 :         .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
     232            0 :         .await?;
     233              : 
     234            6 :     let mut rpageno = 0;
     235              :     loop {
     236           12 :         let r = reader.read_exact(&mut buf).await;
     237           12 :         match r {
     238              :             Ok(_) => {
     239            6 :                 modification.put_slru_page_image(
     240            6 :                     slru,
     241            6 :                     segno,
     242            6 :                     rpageno,
     243            6 :                     Bytes::copy_from_slice(&buf),
     244            6 :                 )?;
     245              :             }
     246              : 
     247              :             // TODO: UnexpectedEof is expected
     248            6 :             Err(err) => match err.kind() {
     249              :                 std::io::ErrorKind::UnexpectedEof => {
     250              :                     // reached EOF. That's expected.
     251            6 :                     ensure!(rpageno == nblocks as u32, "unexpected EOF");
     252            6 :                     break;
     253              :                 }
     254              :                 _ => {
     255            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     256              :                 }
     257              :             },
     258              :         };
     259            6 :         rpageno += 1;
     260              :     }
     261              : 
     262            6 :     Ok(())
     263            6 : }
     264              : 
     265              : /// Scan PostgreSQL WAL files in given directory and load all records between
     266              : /// 'startpoint' and 'endpoint' into the repository.
     267            2 : async fn import_wal(
     268            2 :     walpath: &Utf8Path,
     269            2 :     tline: &Timeline,
     270            2 :     startpoint: Lsn,
     271            2 :     endpoint: Lsn,
     272            2 :     ctx: &RequestContext,
     273            2 : ) -> anyhow::Result<()> {
     274            2 :     let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
     275            2 : 
     276            2 :     let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
     277            2 :     let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
     278            2 :     let mut last_lsn = startpoint;
     279              : 
     280            2 :     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
     281              : 
     282            4 :     while last_lsn <= endpoint {
     283              :         // FIXME: assume postgresql tli 1 for now
     284            2 :         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     285            2 :         let mut buf = Vec::new();
     286            2 : 
     287            2 :         // Read local file
     288            2 :         let mut path = walpath.join(&filename);
     289            2 : 
     290            2 :         // It could be as .partial
     291            2 :         if !PathBuf::from(&path).exists() {
     292            0 :             path = walpath.join(filename + ".partial");
     293            2 :         }
     294              : 
     295              :         // Slurp the WAL file
     296            2 :         let mut file = std::fs::File::open(&path)?;
     297              : 
     298            2 :         if offset > 0 {
     299            2 :             use std::io::Seek;
     300            2 :             file.seek(std::io::SeekFrom::Start(offset as u64))?;
     301            0 :         }
     302              : 
     303              :         use std::io::Read;
     304            2 :         let nread = file.read_to_end(&mut buf)?;
     305            2 :         if nread != WAL_SEGMENT_SIZE - offset {
     306              :             // Maybe allow this for .partial files?
     307            0 :             error!("read only {} bytes from WAL file", nread);
     308            2 :         }
     309              : 
     310            2 :         waldecoder.feed_bytes(&buf);
     311            2 : 
     312            2 :         let mut nrecords = 0;
     313            2 :         let mut modification = tline.begin_modification(last_lsn);
     314            4 :         while last_lsn <= endpoint {
     315            2 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     316            2 :                 let mut decoded = DecodedWALRecord::default();
     317            2 :                 decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
     318              : 
     319            2 :                 walingest
     320            2 :                     .ingest_record(decoded, lsn, &mut modification, ctx)
     321            0 :                     .await?;
     322            2 :                 WAL_INGEST.records_committed.inc();
     323            2 : 
     324            2 :                 modification.commit(ctx).await?;
     325            2 :                 last_lsn = lsn;
     326            2 : 
     327            2 :                 nrecords += 1;
     328            2 : 
     329            2 :                 trace!("imported record at {} (end {})", lsn, endpoint);
     330            0 :             }
     331              :         }
     332              : 
     333            2 :         debug!("imported {} records up to {}", nrecords, last_lsn);
     334              : 
     335            2 :         segno += 1;
     336            2 :         offset = 0;
     337              :     }
     338              : 
     339            2 :     if last_lsn != startpoint {
     340            2 :         info!("reached end of WAL at {}", last_lsn);
     341              :     } else {
     342            0 :         info!("no WAL to import at {}", last_lsn);
     343              :     }
     344              : 
     345            2 :     Ok(())
     346            2 : }
     347              : 
     348            0 : pub async fn import_basebackup_from_tar(
     349            0 :     tline: &Timeline,
     350            0 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     351            0 :     base_lsn: Lsn,
     352            0 :     ctx: &RequestContext,
     353            0 : ) -> Result<()> {
     354            0 :     info!("importing base at {base_lsn}");
     355            0 :     let mut modification = tline.begin_modification(base_lsn);
     356            0 :     modification.init_empty()?;
     357              : 
     358            0 :     let mut pg_control: Option<ControlFileData> = None;
     359              : 
     360              :     // Import base
     361            0 :     let mut entries = Archive::new(reader).entries()?;
     362            0 :     while let Some(base_tar_entry) = entries.next().await {
     363            0 :         let mut entry = base_tar_entry?;
     364            0 :         let header = entry.header();
     365            0 :         let len = header.entry_size()? as usize;
     366            0 :         let file_path = header.path()?.into_owned();
     367            0 : 
     368            0 :         match header.entry_type() {
     369              :             tokio_tar::EntryType::Regular => {
     370            0 :                 if let Some(res) =
     371            0 :                     import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
     372            0 :                 {
     373            0 :                     // We found the pg_control file.
     374            0 :                     pg_control = Some(res);
     375            0 :                 }
     376            0 :                 modification.flush(ctx).await?;
     377              :             }
     378              :             tokio_tar::EntryType::Directory => {
     379            0 :                 debug!("directory {:?}", file_path);
     380              :             }
     381              :             _ => {
     382            0 :                 bail!(
     383            0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     384            0 :                     file_path.display(),
     385            0 :                     header.entry_type()
     386            0 :                 );
     387              :             }
     388              :         }
     389              :     }
     390              : 
     391              :     // sanity check: ensure that pg_control is loaded
     392            0 :     let _pg_control = pg_control.context("pg_control file not found")?;
     393              : 
     394            0 :     modification.commit(ctx).await?;
     395            0 :     Ok(())
     396            0 : }
     397              : 
     398            0 : pub async fn import_wal_from_tar(
     399            0 :     tline: &Timeline,
     400            0 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     401            0 :     start_lsn: Lsn,
     402            0 :     end_lsn: Lsn,
     403            0 :     ctx: &RequestContext,
     404            0 : ) -> Result<()> {
     405            0 :     // Set up walingest mutable state
     406            0 :     let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
     407            0 :     let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
     408            0 :     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     409            0 :     let mut last_lsn = start_lsn;
     410            0 :     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
     411              : 
     412              :     // Ingest wal until end_lsn
     413            0 :     info!("importing wal until {}", end_lsn);
     414            0 :     let mut pg_wal_tar = Archive::new(reader);
     415            0 :     let mut pg_wal_entries = pg_wal_tar.entries()?;
     416            0 :     while last_lsn <= end_lsn {
     417            0 :         let bytes = {
     418            0 :             let mut entry = pg_wal_entries
     419            0 :                 .next()
     420            0 :                 .await
     421            0 :                 .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
     422            0 :             let header = entry.header();
     423            0 :             let file_path = header.path()?.into_owned();
     424            0 : 
     425            0 :             match header.entry_type() {
     426              :                 tokio_tar::EntryType::Regular => {
     427              :                     // FIXME: assume postgresql tli 1 for now
     428            0 :                     let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     429            0 :                     let file_name = file_path
     430            0 :                         .file_name()
     431            0 :                         .expect("missing wal filename")
     432            0 :                         .to_string_lossy();
     433            0 :                     ensure!(expected_filename == file_name);
     434              : 
     435            0 :                     debug!("processing wal file {:?}", file_path);
     436            0 :                     read_all_bytes(&mut entry).await?
     437              :                 }
     438              :                 tokio_tar::EntryType::Directory => {
     439            0 :                     debug!("directory {:?}", file_path);
     440            0 :                     continue;
     441              :                 }
     442              :                 _ => {
     443            0 :                     bail!(
     444            0 :                         "entry {} in WAL tar archive is of unexpected type: {:?}",
     445            0 :                         file_path.display(),
     446            0 :                         header.entry_type()
     447            0 :                     );
     448              :                 }
     449              :             }
     450              :         };
     451              : 
     452            0 :         waldecoder.feed_bytes(&bytes[offset..]);
     453            0 : 
     454            0 :         let mut modification = tline.begin_modification(last_lsn);
     455            0 :         while last_lsn <= end_lsn {
     456            0 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     457            0 :                 let mut decoded = DecodedWALRecord::default();
     458            0 :                 decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
     459            0 :                 walingest
     460            0 :                     .ingest_record(decoded, lsn, &mut modification, ctx)
     461            0 :                     .await?;
     462            0 :                 modification.commit(ctx).await?;
     463            0 :                 last_lsn = lsn;
     464            0 : 
     465            0 :                 debug!("imported record at {} (end {})", lsn, end_lsn);
     466            0 :             }
     467              :         }
     468              : 
     469            0 :         debug!("imported records up to {}", last_lsn);
     470            0 :         segno += 1;
     471            0 :         offset = 0;
     472              :     }
     473              : 
     474            0 :     if last_lsn != start_lsn {
     475            0 :         info!("reached end of WAL at {}", last_lsn);
     476              :     } else {
     477            0 :         info!("there was no WAL to import at {}", last_lsn);
     478              :     }
     479              : 
     480              :     // Log any extra unused files
     481            0 :     while let Some(e) = pg_wal_entries.next().await {
     482            0 :         let entry = e?;
     483            0 :         let header = entry.header();
     484            0 :         let file_path = header.path()?.into_owned();
     485            0 :         info!("skipping {:?}", file_path);
     486              :     }
     487              : 
     488            0 :     Ok(())
     489            0 : }
     490              : 
     491         1930 : async fn import_file(
     492         1930 :     modification: &mut DatadirModification<'_>,
     493         1930 :     file_path: &Path,
     494         1930 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     495         1930 :     len: usize,
     496         1930 :     ctx: &RequestContext,
     497         1930 : ) -> Result<Option<ControlFileData>> {
     498         1930 :     let file_name = match file_path.file_name() {
     499         1930 :         Some(name) => name.to_string_lossy(),
     500            0 :         None => return Ok(None),
     501              :     };
     502              : 
     503         1930 :     if file_name.starts_with('.') {
     504              :         // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
     505              :         // will contain "fork files", skip them.
     506            0 :         return Ok(None);
     507         1930 :     }
     508         1930 : 
     509         1930 :     if file_path.starts_with("global") {
     510          120 :         let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
     511          120 :         let dbnode = 0;
     512          120 : 
     513          120 :         match file_name.as_ref() {
     514          120 :             "pg_control" => {
     515           20 :                 let bytes = read_all_bytes(reader).await?;
     516              : 
     517              :                 // Extract the checkpoint record and import it separately.
     518            2 :                 let pg_control = ControlFileData::decode(&bytes[..])?;
     519            2 :                 let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
     520            2 :                 modification.put_checkpoint(checkpoint_bytes)?;
     521            2 :                 debug!("imported control file");
     522              : 
     523              :                 // Import it as ControlFile
     524            2 :                 modification.put_control_file(bytes)?;
     525            2 :                 return Ok(Some(pg_control));
     526              :             }
     527          118 :             "pg_filenode.map" => {
     528           12 :                 let bytes = read_all_bytes(reader).await?;
     529            2 :                 modification
     530            2 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     531            0 :                     .await?;
     532            2 :                 debug!("imported relmap file")
     533              :             }
     534          116 :             "PG_VERSION" => {
     535            0 :                 debug!("ignored PG_VERSION file");
     536              :             }
     537              :             _ => {
     538          240 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     539          116 :                 debug!("imported rel creation");
     540              :             }
     541              :         }
     542         1810 :     } else if file_path.starts_with("base") {
     543         1788 :         let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
     544         1788 :         let dbnode: u32 = file_path
     545         1788 :             .iter()
     546         1788 :             .nth(1)
     547         1788 :             .expect("invalid file path, expected dbnode")
     548         1788 :             .to_string_lossy()
     549         1788 :             .parse()?;
     550              : 
     551         1788 :         match file_name.as_ref() {
     552         1788 :             "pg_filenode.map" => {
     553           33 :                 let bytes = read_all_bytes(reader).await?;
     554            6 :                 modification
     555            6 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     556            0 :                     .await?;
     557            6 :                 debug!("imported relmap file")
     558              :             }
     559         1782 :             "PG_VERSION" => {
     560            6 :                 debug!("ignored PG_VERSION file");
     561              :             }
     562              :             _ => {
     563         6654 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     564         1776 :                 debug!("imported rel creation");
     565              :             }
     566              :         }
     567           22 :     } else if file_path.starts_with("pg_xact") {
     568            2 :         let slru = SlruKind::Clog;
     569            2 : 
     570            4 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     571            2 :         debug!("imported clog slru");
     572           20 :     } else if file_path.starts_with("pg_multixact/offsets") {
     573            2 :         let slru = SlruKind::MultiXactOffsets;
     574            2 : 
     575            4 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     576            2 :         debug!("imported multixact offsets slru");
     577           18 :     } else if file_path.starts_with("pg_multixact/members") {
     578            2 :         let slru = SlruKind::MultiXactMembers;
     579            2 : 
     580            4 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     581            2 :         debug!("imported multixact members slru");
     582           16 :     } else if file_path.starts_with("pg_twophase") {
     583            0 :         let bytes = read_all_bytes(reader).await?;
     584              : 
     585              :         // In PostgreSQL v17, this is a 64-bit FullTransactionid. In previous versions,
     586              :         // it's a 32-bit TransactionId, which fits in u64 anyway.
     587            0 :         let xid = u64::from_str_radix(file_name.as_ref(), 16)?;
     588            0 :         modification
     589            0 :             .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
     590            0 :             .await?;
     591            0 :         debug!("imported twophase file");
     592           16 :     } else if file_path.starts_with("pg_wal") {
     593            0 :         debug!("found wal file in base section. ignore it");
     594           16 :     } else if file_path.starts_with("zenith.signal") {
     595              :         // Parse zenith signal file to set correct previous LSN
     596            0 :         let bytes = read_all_bytes(reader).await?;
     597              :         // zenith.signal format is "PREV LSN: prev_lsn"
     598              :         // TODO write serialization and deserialization in the same place.
     599            0 :         let zenith_signal = std::str::from_utf8(&bytes)?.trim();
     600            0 :         let prev_lsn = match zenith_signal {
     601            0 :             "PREV LSN: none" => Lsn(0),
     602            0 :             "PREV LSN: invalid" => Lsn(0),
     603            0 :             other => {
     604            0 :                 let split = other.split(':').collect::<Vec<_>>();
     605            0 :                 split[1]
     606            0 :                     .trim()
     607            0 :                     .parse::<Lsn>()
     608            0 :                     .context("can't parse zenith.signal")?
     609              :             }
     610              :         };
     611              : 
     612              :         // zenith.signal is not necessarily the last file, that we handle
     613              :         // but it is ok to call `finish_write()`, because final `modification.commit()`
     614              :         // will update lsn once more to the final one.
     615            0 :         let writer = modification.tline.writer().await;
     616            0 :         writer.finish_write(prev_lsn);
     617            0 : 
     618            0 :         debug!("imported zenith signal {}", prev_lsn);
     619           16 :     } else if file_path.starts_with("pg_tblspc") {
     620              :         // TODO Backups exported from neon won't have pg_tblspc, but we will need
     621              :         // this to import arbitrary postgres databases.
     622            0 :         bail!("Importing pg_tblspc is not implemented");
     623              :     } else {
     624           16 :         debug!(
     625            0 :             "ignoring unrecognized file \"{}\" in tar archive",
     626            0 :             file_path.display()
     627              :         );
     628              :     }
     629              : 
     630         1928 :     Ok(None)
     631         1930 : }
     632              : 
     633           10 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
                            // Purpose: read everything that is left in `reader` and return it as a
                            // single in-memory `Bytes` value.
                            //
                            // `reader` is borrowed mutably and consumed up to end-of-stream.
                            // A read failure is propagated to the caller through `?`.
                            // No size limit is applied here: the whole remaining stream is buffered.
                            // Callers visible in this file use it for `pg_filenode.map`,
                            // `pg_twophase` and `zenith.signal` entries.
     634           10 :     let mut buf: Vec<u8> = vec![];
     635           65 :     reader.read_to_end(&mut buf).await?; // fill `buf` until the reader reports EOF
     636           10 :     Ok(Bytes::from(buf)) // hand the collected bytes back as `Bytes`
     637           10 : }
        

Generated by: LCOV version 2.1-beta