LCOV - code coverage report
Current view: top level - pageserver/src - import_datadir.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 64.1 % 448 287
Test Date: 2025-04-24 20:31:15 Functions: 41.2 % 34 14

            Line data    Source code
       1              : //!
       2              : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
       3              : //! a neon Timeline.
       4              : //!
       5              : use std::path::{Path, PathBuf};
       6              : 
       7              : use anyhow::{Context, Result, bail, ensure};
       8              : use bytes::Bytes;
       9              : use camino::Utf8Path;
      10              : use futures::StreamExt;
      11              : use pageserver_api::key::rel_block_to_key;
      12              : use pageserver_api::reltag::{RelTag, SlruKind};
      13              : use postgres_ffi::relfile_utils::*;
      14              : use postgres_ffi::waldecoder::WalStreamDecoder;
      15              : use postgres_ffi::{
      16              :     BLCKSZ, ControlFileData, DBState_DB_SHUTDOWNED, Oid, WAL_SEGMENT_SIZE, XLogFileName,
      17              :     pg_constants,
      18              : };
      19              : use tokio::io::{AsyncRead, AsyncReadExt};
      20              : use tokio_tar::Archive;
      21              : use tracing::*;
      22              : use utils::lsn::Lsn;
      23              : use wal_decoder::models::InterpretedWalRecord;
      24              : use walkdir::WalkDir;
      25              : 
      26              : use crate::context::RequestContext;
      27              : use crate::metrics::WAL_INGEST;
      28              : use crate::pgdatadir_mapping::*;
      29              : use crate::tenant::Timeline;
      30              : use crate::walingest::{WalIngest, WalIngestErrorKind};
      31              : 
      32              : // Returns checkpoint LSN from controlfile
      33           12 : pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
      34           12 :     // Read control file to extract the LSN
      35           12 :     let controlfile_path = path.join("global").join("pg_control");
      36           12 :     let controlfile_buf = std::fs::read(&controlfile_path)
      37           12 :         .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
      38           12 :     let controlfile = ControlFileData::decode(&controlfile_buf)?;
      39           12 :     let lsn = controlfile.checkPoint;
      40           12 : 
      41           12 :     Ok(Lsn(lsn))
      42           12 : }
      43              : 
      44              : ///
      45              : /// Import all relation data pages from local disk into the repository.
      46              : ///
      47              : /// This is currently only used to import a cluster freshly created by initdb.
      48              : /// The code that deals with the checkpoint would not work right if the
      49              : /// cluster was not shut down cleanly.
      50           12 : pub async fn import_timeline_from_postgres_datadir(
      51           12 :     tline: &Timeline,
      52           12 :     pgdata_path: &Utf8Path,
      53           12 :     pgdata_lsn: Lsn,
      54           12 :     ctx: &RequestContext,
      55           12 : ) -> Result<()> {
      56           12 :     let mut pg_control: Option<ControlFileData> = None;
      57           12 : 
      58           12 :     // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
      59           12 :     // Then fishing out pg_control would be unnecessary
      60           12 :     let mut modification = tline.begin_modification(pgdata_lsn);
      61           12 :     modification.init_empty()?;
      62              : 
      63              :     // Import all but pg_wal
      64           12 :     let all_but_wal = WalkDir::new(pgdata_path)
      65           12 :         .into_iter()
      66        11880 :         .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
      67        11880 :     for entry in all_but_wal {
      68        11868 :         let entry = entry?;
      69        11868 :         let metadata = entry.metadata().expect("error getting dir entry metadata");
      70        11868 :         if metadata.is_file() {
      71        11580 :             let absolute_path = entry.path();
      72        11580 :             let relative_path = absolute_path.strip_prefix(pgdata_path)?;
      73              : 
      74        11580 :             let mut file = tokio::fs::File::open(absolute_path).await?;
      75        11580 :             let len = metadata.len() as usize;
      76           12 :             if let Some(control_file) =
      77        11580 :                 import_file(&mut modification, relative_path, &mut file, len, ctx).await?
      78           12 :             {
      79           12 :                 pg_control = Some(control_file);
      80        11568 :             }
      81        11580 :             modification.flush(ctx).await?;
      82          288 :         }
      83              :     }
      84              : 
      85              :     // We're done importing all the data files.
      86           12 :     modification.commit(ctx).await?;
      87              : 
      88              :     // We expect the Postgres server to be shut down cleanly.
      89           12 :     let pg_control = pg_control.context("pg_control file not found")?;
      90           12 :     ensure!(
      91           12 :         pg_control.state == DBState_DB_SHUTDOWNED,
      92            0 :         "Postgres cluster was not shut down cleanly"
      93              :     );
      94           12 :     ensure!(
      95           12 :         pg_control.checkPointCopy.redo == pgdata_lsn.0,
      96            0 :         "unexpected checkpoint REDO pointer"
      97              :     );
      98              : 
      99              :     // Import WAL. This is needed even when starting from a shutdown checkpoint, because
     100              :     // this reads the checkpoint record itself, advancing the tip of the timeline to
     101              :     // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
     102           12 :     import_wal(
     103           12 :         &pgdata_path.join("pg_wal"),
     104           12 :         tline,
     105           12 :         Lsn(pg_control.checkPointCopy.redo),
     106           12 :         pgdata_lsn,
     107           12 :         ctx,
     108           12 :     )
     109           12 :     .await?;
     110              : 
     111           12 :     Ok(())
     112           12 : }
     113              : 
     114              : // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
     115        11352 : async fn import_rel(
     116        11352 :     modification: &mut DatadirModification<'_>,
     117        11352 :     path: &Path,
     118        11352 :     spcoid: Oid,
     119        11352 :     dboid: Oid,
     120        11352 :     reader: &mut (impl AsyncRead + Unpin),
     121        11352 :     len: usize,
     122        11352 :     ctx: &RequestContext,
     123        11352 : ) -> anyhow::Result<()> {
     124        11352 :     // Does it look like a relation file?
     125        11352 :     trace!("importing rel file {}", path.display());
     126              : 
     127        11352 :     let filename = &path
     128        11352 :         .file_name()
     129        11352 :         .expect("missing rel filename")
     130        11352 :         .to_string_lossy();
     131        11352 :     let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
     132            0 :         warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
     133            0 :         e
     134        11352 :     })?;
     135              : 
     136        11352 :     let mut buf: [u8; 8192] = [0u8; 8192];
     137        11352 : 
     138        11352 :     ensure!(len % BLCKSZ as usize == 0);
     139        11352 :     let nblocks = len / BLCKSZ as usize;
     140        11352 : 
     141        11352 :     let rel = RelTag {
     142        11352 :         spcnode: spcoid,
     143        11352 :         dbnode: dboid,
     144        11352 :         relnode,
     145        11352 :         forknum,
     146        11352 :     };
     147        11352 : 
     148        11352 :     let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     149              : 
     150              :     // Call put_rel_creation for every segment of the relation,
     151              :     // because there is no guarantee about the order in which we are processing segments.
     152              :     // ignore "relation already exists" error
     153              :     //
     154              :     // FIXME: Keep track of which relations we've already created?
     155              :     // https://github.com/neondatabase/neon/issues/3309
     156        11352 :     if let Err(e) = modification
     157        11352 :         .put_rel_creation(rel, nblocks as u32, ctx)
     158        11352 :         .await
     159              :     {
     160            0 :         match e.kind {
     161            0 :             WalIngestErrorKind::RelationAlreadyExists(rel) => {
     162            0 :                 debug!("Relation {rel} already exists. We must be extending it.")
     163              :             }
     164            0 :             _ => return Err(e.into()),
     165              :         }
     166        11352 :     }
     167              : 
     168              :     loop {
     169        43992 :         let r = reader.read_exact(&mut buf).await;
     170        43992 :         match r {
     171              :             Ok(_) => {
     172        32640 :                 let key = rel_block_to_key(rel, blknum);
     173        32640 :                 if modification.tline.get_shard_identity().is_key_local(&key) {
     174        32640 :                     modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
     175            0 :                 }
     176              :             }
     177              : 
     178              :             // TODO: UnexpectedEof is expected
     179        11352 :             Err(err) => match err.kind() {
     180              :                 std::io::ErrorKind::UnexpectedEof => {
     181              :                     // reached EOF. That's expected.
     182        11352 :                     let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     183        11352 :                     ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
     184        11352 :                     break;
     185              :                 }
     186              :                 _ => {
     187            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     188              :                 }
     189              :             },
     190              :         };
     191        32640 :         blknum += 1;
     192              :     }
     193              : 
     194              :     // Update relation size
     195              :     //
     196              :     // If we process rel segments out of order,
     197              :     // put_rel_extend will skip the update.
     198        11352 :     modification.put_rel_extend(rel, blknum, ctx).await?;
     199              : 
     200        11352 :     Ok(())
     201        11352 : }
     202              : 
     203              : /// Import an SLRU segment file
     204              : ///
     205           36 : async fn import_slru(
     206           36 :     modification: &mut DatadirModification<'_>,
     207           36 :     slru: SlruKind,
     208           36 :     path: &Path,
     209           36 :     reader: &mut (impl AsyncRead + Unpin),
     210           36 :     len: usize,
     211           36 :     ctx: &RequestContext,
     212           36 : ) -> anyhow::Result<()> {
     213           36 :     info!("importing slru file {path:?}");
     214              : 
     215           36 :     let mut buf: [u8; 8192] = [0u8; 8192];
     216           36 :     let filename = &path
     217           36 :         .file_name()
     218           36 :         .with_context(|| format!("missing slru filename for path {path:?}"))?
     219           36 :         .to_string_lossy();
     220           36 :     let segno = u32::from_str_radix(filename, 16)?;
     221              : 
     222           36 :     ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
     223           36 :     let nblocks = len / BLCKSZ as usize;
     224           36 : 
     225           36 :     ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
     226              : 
     227           36 :     modification
     228           36 :         .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
     229           36 :         .await?;
     230              : 
     231           36 :     let mut rpageno = 0;
     232              :     loop {
     233           72 :         let r = reader.read_exact(&mut buf).await;
     234           72 :         match r {
     235              :             Ok(_) => {
     236           36 :                 modification.put_slru_page_image(
     237           36 :                     slru,
     238           36 :                     segno,
     239           36 :                     rpageno,
     240           36 :                     Bytes::copy_from_slice(&buf),
     241           36 :                 )?;
     242              :             }
     243              : 
     244              :             // TODO: UnexpectedEof is expected
     245           36 :             Err(err) => match err.kind() {
     246              :                 std::io::ErrorKind::UnexpectedEof => {
     247              :                     // reached EOF. That's expected.
     248           36 :                     ensure!(rpageno == nblocks as u32, "unexpected EOF");
     249           36 :                     break;
     250              :                 }
     251              :                 _ => {
     252            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     253              :                 }
     254              :             },
     255              :         };
     256           36 :         rpageno += 1;
     257              :     }
     258              : 
     259           36 :     Ok(())
     260           36 : }
     261              : 
     262              : /// Scan PostgreSQL WAL files in given directory and load all records between
     263              : /// 'startpoint' and 'endpoint' into the repository.
     264           12 : async fn import_wal(
     265           12 :     walpath: &Utf8Path,
     266           12 :     tline: &Timeline,
     267           12 :     startpoint: Lsn,
     268           12 :     endpoint: Lsn,
     269           12 :     ctx: &RequestContext,
     270           12 : ) -> anyhow::Result<()> {
     271           12 :     let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
     272           12 : 
     273           12 :     let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
     274           12 :     let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
     275           12 :     let mut last_lsn = startpoint;
     276              : 
     277           12 :     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
     278              : 
     279           12 :     let shard = vec![*tline.get_shard_identity()];
     280              : 
     281           24 :     while last_lsn <= endpoint {
     282              :         // FIXME: assume postgresql tli 1 for now
     283           12 :         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     284           12 :         let mut buf = Vec::new();
     285           12 : 
     286           12 :         // Read local file
     287           12 :         let mut path = walpath.join(&filename);
     288           12 : 
     289           12 :         // It could be as .partial
     290           12 :         if !PathBuf::from(&path).exists() {
     291            0 :             path = walpath.join(filename + ".partial");
     292           12 :         }
     293              : 
     294              :         // Slurp the WAL file
     295           12 :         let mut file = std::fs::File::open(&path)?;
     296              : 
     297           12 :         if offset > 0 {
     298           12 :             use std::io::Seek;
     299           12 :             file.seek(std::io::SeekFrom::Start(offset as u64))?;
     300            0 :         }
     301              : 
     302              :         use std::io::Read;
     303           12 :         let nread = file.read_to_end(&mut buf)?;
     304           12 :         if nread != WAL_SEGMENT_SIZE - offset {
     305              :             // Maybe allow this for .partial files?
     306            0 :             error!("read only {} bytes from WAL file", nread);
     307           12 :         }
     308              : 
     309           12 :         waldecoder.feed_bytes(&buf);
     310           12 : 
     311           12 :         let mut nrecords = 0;
     312           12 :         let mut modification = tline.begin_modification(last_lsn);
     313           24 :         while last_lsn <= endpoint {
     314           12 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     315           12 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
     316           12 :                     recdata,
     317           12 :                     &shard,
     318           12 :                     lsn,
     319           12 :                     tline.pg_version,
     320           12 :                 )?
     321           12 :                 .remove(tline.get_shard_identity())
     322           12 :                 .unwrap();
     323           12 : 
     324           12 :                 walingest
     325           12 :                     .ingest_record(interpreted, &mut modification, ctx)
     326           12 :                     .await?;
     327           12 :                 WAL_INGEST.records_committed.inc();
     328           12 : 
     329           12 :                 modification.commit(ctx).await?;
     330           12 :                 last_lsn = lsn;
     331           12 : 
     332           12 :                 nrecords += 1;
     333           12 : 
     334           12 :                 trace!("imported record at {} (end {})", lsn, endpoint);
     335            0 :             }
     336              :         }
     337              : 
     338           12 :         debug!("imported {} records up to {}", nrecords, last_lsn);
     339              : 
     340           12 :         segno += 1;
     341           12 :         offset = 0;
     342              :     }
     343              : 
     344           12 :     if last_lsn != startpoint {
     345           12 :         info!("reached end of WAL at {}", last_lsn);
     346              :     } else {
     347            0 :         info!("no WAL to import at {}", last_lsn);
     348              :     }
     349              : 
     350           12 :     Ok(())
     351           12 : }
     352              : 
     353            0 : pub async fn import_basebackup_from_tar(
     354            0 :     tline: &Timeline,
     355            0 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     356            0 :     base_lsn: Lsn,
     357            0 :     ctx: &RequestContext,
     358            0 : ) -> Result<()> {
     359            0 :     info!("importing base at {base_lsn}");
     360            0 :     let mut modification = tline.begin_modification(base_lsn);
     361            0 :     modification.init_empty()?;
     362              : 
     363            0 :     let mut pg_control: Option<ControlFileData> = None;
     364              : 
     365              :     // Import base
     366            0 :     let mut entries = Archive::new(reader).entries()?;
     367            0 :     while let Some(base_tar_entry) = entries.next().await {
     368            0 :         let mut entry = base_tar_entry?;
     369            0 :         let header = entry.header();
     370            0 :         let len = header.entry_size()? as usize;
     371            0 :         let file_path = header.path()?.into_owned();
     372            0 : 
     373            0 :         match header.entry_type() {
     374              :             tokio_tar::EntryType::Regular => {
     375            0 :                 if let Some(res) =
     376            0 :                     import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
     377            0 :                 {
     378            0 :                     // We found the pg_control file.
     379            0 :                     pg_control = Some(res);
     380            0 :                 }
     381            0 :                 modification.flush(ctx).await?;
     382              :             }
     383              :             tokio_tar::EntryType::Directory => {
     384            0 :                 debug!("directory {:?}", file_path);
     385              :             }
     386              :             _ => {
     387            0 :                 bail!(
     388            0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     389            0 :                     file_path.display(),
     390            0 :                     header.entry_type()
     391            0 :                 );
     392              :             }
     393              :         }
     394              :     }
     395              : 
     396              :     // sanity check: ensure that pg_control is loaded
     397            0 :     let _pg_control = pg_control.context("pg_control file not found")?;
     398              : 
     399            0 :     modification.commit(ctx).await?;
     400            0 :     Ok(())
     401            0 : }
     402              : 
     403            0 : pub async fn import_wal_from_tar(
     404            0 :     tline: &Timeline,
     405            0 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     406            0 :     start_lsn: Lsn,
     407            0 :     end_lsn: Lsn,
     408            0 :     ctx: &RequestContext,
     409            0 : ) -> Result<()> {
     410            0 :     // Set up walingest mutable state
     411            0 :     let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
     412            0 :     let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
     413            0 :     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     414            0 :     let mut last_lsn = start_lsn;
     415            0 :     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
     416            0 :     let shard = vec![*tline.get_shard_identity()];
     417            0 : 
     418            0 :     // Ingest wal until end_lsn
     419            0 :     info!("importing wal until {}", end_lsn);
     420            0 :     let mut pg_wal_tar = Archive::new(reader);
     421            0 :     let mut pg_wal_entries = pg_wal_tar.entries()?;
     422            0 :     while last_lsn <= end_lsn {
     423            0 :         let bytes = {
     424            0 :             let mut entry = pg_wal_entries
     425            0 :                 .next()
     426            0 :                 .await
     427            0 :                 .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
     428            0 :             let header = entry.header();
     429            0 :             let file_path = header.path()?.into_owned();
     430            0 : 
     431            0 :             match header.entry_type() {
     432              :                 tokio_tar::EntryType::Regular => {
     433              :                     // FIXME: assume postgresql tli 1 for now
     434            0 :                     let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     435            0 :                     let file_name = file_path
     436            0 :                         .file_name()
     437            0 :                         .expect("missing wal filename")
     438            0 :                         .to_string_lossy();
     439            0 :                     ensure!(expected_filename == file_name);
     440              : 
     441            0 :                     debug!("processing wal file {:?}", file_path);
     442            0 :                     read_all_bytes(&mut entry).await?
     443              :                 }
     444              :                 tokio_tar::EntryType::Directory => {
     445            0 :                     debug!("directory {:?}", file_path);
     446            0 :                     continue;
     447              :                 }
     448              :                 _ => {
     449            0 :                     bail!(
     450            0 :                         "entry {} in WAL tar archive is of unexpected type: {:?}",
     451            0 :                         file_path.display(),
     452            0 :                         header.entry_type()
     453            0 :                     );
     454              :                 }
     455              :             }
     456              :         };
     457              : 
     458            0 :         waldecoder.feed_bytes(&bytes[offset..]);
     459            0 : 
     460            0 :         let mut modification = tline.begin_modification(last_lsn);
     461            0 :         while last_lsn <= end_lsn {
     462            0 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     463            0 :                 let interpreted = InterpretedWalRecord::from_bytes_filtered(
     464            0 :                     recdata,
     465            0 :                     &shard,
     466            0 :                     lsn,
     467            0 :                     tline.pg_version,
     468            0 :                 )?
     469            0 :                 .remove(tline.get_shard_identity())
     470            0 :                 .unwrap();
     471            0 : 
     472            0 :                 walingest
     473            0 :                     .ingest_record(interpreted, &mut modification, ctx)
     474            0 :                     .await?;
     475            0 :                 modification.commit(ctx).await?;
     476            0 :                 last_lsn = lsn;
     477            0 : 
     478            0 :                 debug!("imported record at {} (end {})", lsn, end_lsn);
     479            0 :             }
     480              :         }
     481              : 
     482            0 :         debug!("imported records up to {}", last_lsn);
     483            0 :         segno += 1;
     484            0 :         offset = 0;
     485              :     }
     486              : 
     487            0 :     if last_lsn != start_lsn {
     488            0 :         info!("reached end of WAL at {}", last_lsn);
     489              :     } else {
     490            0 :         info!("there was no WAL to import at {}", last_lsn);
     491              :     }
     492              : 
     493              :     // Log any extra unused files
     494            0 :     while let Some(e) = pg_wal_entries.next().await {
     495            0 :         let entry = e?;
     496            0 :         let header = entry.header();
     497            0 :         let file_path = header.path()?.into_owned();
     498            0 :         info!("skipping {:?}", file_path);
     499              :     }
     500              : 
     501            0 :     Ok(())
     502            0 : }
     503              : 
     504        11580 : async fn import_file(
     505        11580 :     modification: &mut DatadirModification<'_>,
     506        11580 :     file_path: &Path,
     507        11580 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     508        11580 :     len: usize,
     509        11580 :     ctx: &RequestContext,
     510        11580 : ) -> Result<Option<ControlFileData>> {
     511        11580 :     let file_name = match file_path.file_name() {
     512        11580 :         Some(name) => name.to_string_lossy(),
     513            0 :         None => return Ok(None),
     514              :     };
     515              : 
     516        11580 :     if file_name.starts_with('.') {
     517              :         // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
     518              :         // will contain "fork files", skip them.
     519            0 :         return Ok(None);
     520        11580 :     }
     521        11580 : 
     522        11580 :     if file_path.starts_with("global") {
     523          720 :         let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
     524          720 :         let dbnode = 0;
     525          720 : 
     526          720 :         match file_name.as_ref() {
     527          720 :             "pg_control" => {
     528           12 :                 let bytes = read_all_bytes(reader).await?;
     529              : 
     530              :                 // Extract the checkpoint record and import it separately.
     531           12 :                 let pg_control = ControlFileData::decode(&bytes[..])?;
     532           12 :                 let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
     533           12 :                 modification.put_checkpoint(checkpoint_bytes)?;
     534           12 :                 debug!("imported control file");
     535              : 
     536              :                 // Import it as ControlFile
     537           12 :                 modification.put_control_file(bytes)?;
     538           12 :                 return Ok(Some(pg_control));
     539              :             }
     540          708 :             "pg_filenode.map" => {
     541           12 :                 let bytes = read_all_bytes(reader).await?;
     542           12 :                 modification
     543           12 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     544           12 :                     .await?;
     545           12 :                 debug!("imported relmap file")
     546              :             }
     547          696 :             "PG_VERSION" => {
     548            0 :                 debug!("ignored PG_VERSION file");
     549              :             }
     550              :             _ => {
     551          696 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     552          696 :                 debug!("imported rel creation");
     553              :             }
     554              :         }
     555        10860 :     } else if file_path.starts_with("base") {
     556        10728 :         let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
     557        10728 :         let dbnode: u32 = file_path
     558        10728 :             .iter()
     559        10728 :             .nth(1)
     560        10728 :             .expect("invalid file path, expected dbnode")
     561        10728 :             .to_string_lossy()
     562        10728 :             .parse()?;
     563              : 
     564        10728 :         match file_name.as_ref() {
     565        10728 :             "pg_filenode.map" => {
     566           36 :                 let bytes = read_all_bytes(reader).await?;
     567           36 :                 modification
     568           36 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     569           36 :                     .await?;
     570           36 :                 debug!("imported relmap file")
     571              :             }
     572        10692 :             "PG_VERSION" => {
     573           36 :                 debug!("ignored PG_VERSION file");
     574              :             }
     575              :             _ => {
     576        10656 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     577        10656 :                 debug!("imported rel creation");
     578              :             }
     579              :         }
     580          132 :     } else if file_path.starts_with("pg_xact") {
     581           12 :         let slru = SlruKind::Clog;
     582           12 : 
     583           12 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     584           12 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     585           12 :             debug!("imported clog slru");
     586            0 :         }
     587          120 :     } else if file_path.starts_with("pg_multixact/offsets") {
     588           12 :         let slru = SlruKind::MultiXactOffsets;
     589           12 : 
     590           12 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     591           12 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     592           12 :             debug!("imported multixact offsets slru");
     593            0 :         }
     594          108 :     } else if file_path.starts_with("pg_multixact/members") {
     595           12 :         let slru = SlruKind::MultiXactMembers;
     596           12 : 
     597           12 :         if modification.tline.tenant_shard_id.is_shard_zero() {
     598           12 :             import_slru(modification, slru, file_path, reader, len, ctx).await?;
     599           12 :             debug!("imported multixact members slru");
     600            0 :         }
     601           96 :     } else if file_path.starts_with("pg_twophase") {
     602            0 :         let bytes = read_all_bytes(reader).await?;
     603              : 
     604              :         // In PostgreSQL v17, this is a 64-bit FullTransactionid. In previous versions,
     605              :         // it's a 32-bit TransactionId, which fits in u64 anyway.
     606            0 :         let xid = u64::from_str_radix(file_name.as_ref(), 16)?;
     607            0 :         modification
     608            0 :             .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
     609            0 :             .await?;
     610            0 :         debug!("imported twophase file");
     611           96 :     } else if file_path.starts_with("pg_wal") {
     612            0 :         debug!("found wal file in base section. ignore it");
     613           96 :     } else if file_path.starts_with("zenith.signal") {
     614              :         // Parse zenith signal file to set correct previous LSN
     615            0 :         let bytes = read_all_bytes(reader).await?;
     616              :         // zenith.signal format is "PREV LSN: prev_lsn"
     617              :         // TODO write serialization and deserialization in the same place.
     618            0 :         let zenith_signal = std::str::from_utf8(&bytes)?.trim();
     619            0 :         let prev_lsn = match zenith_signal {
     620            0 :             "PREV LSN: none" => Lsn(0),
     621            0 :             "PREV LSN: invalid" => Lsn(0),
     622            0 :             other => {
     623            0 :                 let split = other.split(':').collect::<Vec<_>>();
     624            0 :                 split[1]
     625            0 :                     .trim()
     626            0 :                     .parse::<Lsn>()
     627            0 :                     .context("can't parse zenith.signal")?
     628              :             }
     629              :         };
     630              : 
     631              :         // zenith.signal is not necessarily the last file, that we handle
     632              :         // but it is ok to call `finish_write()`, because final `modification.commit()`
     633              :         // will update lsn once more to the final one.
     634            0 :         let writer = modification.tline.writer().await;
     635            0 :         writer.finish_write(prev_lsn);
     636            0 : 
     637            0 :         debug!("imported zenith signal {}", prev_lsn);
     638           96 :     } else if file_path.starts_with("pg_tblspc") {
     639              :         // TODO Backups exported from neon won't have pg_tblspc, but we will need
     640              :         // this to import arbitrary postgres databases.
     641            0 :         bail!("Importing pg_tblspc is not implemented");
     642              :     } else {
     643           96 :         debug!(
     644            0 :             "ignoring unrecognized file \"{}\" in tar archive",
     645            0 :             file_path.display()
     646              :         );
     647              :     }
     648              : 
     649        11568 :     Ok(None)
     650        11580 : }
     651              : 
     652           60 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
     653           60 :     let mut buf: Vec<u8> = vec![];
     654           60 :     reader.read_to_end(&mut buf).await?;
     655           60 :     Ok(Bytes::from(buf))
     656           60 : }
        

Generated by: LCOV version 2.1-beta