LCOV - differential code coverage report
Current view: top level - pageserver/src - import_datadir.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 82.6 % 408 337 71 337
Current Date: 2023-10-19 02:04:12 Functions: 27.8 % 97 27 70 27
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
       3                 : //! a neon Timeline.
       4                 : //!
       5                 : use std::path::{Path, PathBuf};
       6                 : 
       7                 : use anyhow::{bail, ensure, Context, Result};
       8                 : use bytes::Bytes;
       9                 : use camino::Utf8Path;
      10                 : use futures::StreamExt;
      11                 : use tokio::io::{AsyncRead, AsyncReadExt};
      12                 : use tokio_tar::Archive;
      13                 : use tracing::*;
      14                 : use walkdir::WalkDir;
      15                 : 
      16                 : use crate::context::RequestContext;
      17                 : use crate::pgdatadir_mapping::*;
      18                 : use crate::tenant::Timeline;
      19                 : use crate::walingest::WalIngest;
      20                 : use crate::walrecord::DecodedWALRecord;
      21                 : use pageserver_api::reltag::{RelTag, SlruKind};
      22                 : use postgres_ffi::pg_constants;
      23                 : use postgres_ffi::relfile_utils::*;
      24                 : use postgres_ffi::waldecoder::WalStreamDecoder;
      25                 : use postgres_ffi::ControlFileData;
      26                 : use postgres_ffi::DBState_DB_SHUTDOWNED;
      27                 : use postgres_ffi::Oid;
      28                 : use postgres_ffi::XLogFileName;
      29                 : use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
      30                 : use utils::lsn::Lsn;
      31                 : 
      32                 : // Returns checkpoint LSN from controlfile
      33 CBC         575 : pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
      34             575 :     // Read control file to extract the LSN
      35             575 :     let controlfile_path = path.join("global").join("pg_control");
      36             575 :     let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
      37             575 :     let lsn = controlfile.checkPoint;
      38             575 : 
      39             575 :     Ok(Lsn(lsn))
      40             575 : }
      41                 : 
      42                 : ///
      43                 : /// Import all relation data pages from local disk into the repository.
      44                 : ///
      45                 : /// This is currently only used to import a cluster freshly created by initdb.
      46                 : /// The code that deals with the checkpoint would not work right if the
      47                 : /// cluster was not shut down cleanly.
      48             574 : pub async fn import_timeline_from_postgres_datadir(
      49             574 :     tline: &Timeline,
      50             574 :     pgdata_path: &Utf8Path,
      51             574 :     pgdata_lsn: Lsn,
      52             574 :     ctx: &RequestContext,
      53             574 : ) -> Result<()> {
      54             574 :     let mut pg_control: Option<ControlFileData> = None;
      55             574 : 
      56             574 :     // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
      57             574 :     // Then fishing out pg_control would be unnecessary
      58             574 :     let mut modification = tline.begin_modification(pgdata_lsn);
      59             574 :     modification.init_empty()?;
      60                 : 
      61                 :     // Import all but pg_wal
      62             574 :     let all_but_wal = WalkDir::new(pgdata_path)
      63             574 :         .into_iter()
      64          559076 :         .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
      65          559076 :     for entry in all_but_wal {
      66          558502 :         let entry = entry?;
      67          558502 :         let metadata = entry.metadata().expect("error getting dir entry metadata");
      68          558502 :         if metadata.is_file() {
      69          544726 :             let absolute_path = entry.path();
      70          544726 :             let relative_path = absolute_path.strip_prefix(pgdata_path)?;
      71                 : 
      72          544726 :             let mut file = tokio::fs::File::open(absolute_path).await?;
      73          544726 :             let len = metadata.len() as usize;
      74             574 :             if let Some(control_file) =
      75         2302630 :                 import_file(&mut modification, relative_path, &mut file, len, ctx).await?
      76             574 :             {
      77             574 :                 pg_control = Some(control_file);
      78          544152 :             }
      79          544726 :             modification.flush(ctx).await?;
      80           13776 :         }
      81                 :     }
      82                 : 
      83                 :     // We're done importing all the data files.
      84           49938 :     modification.commit(ctx).await?;
      85                 : 
      86                 :     // We expect the Postgres server to be shut down cleanly.
      87             574 :     let pg_control = pg_control.context("pg_control file not found")?;
      88             574 :     ensure!(
      89             574 :         pg_control.state == DBState_DB_SHUTDOWNED,
      90 UBC           0 :         "Postgres cluster was not shut down cleanly"
      91                 :     );
      92 CBC         574 :     ensure!(
      93             574 :         pg_control.checkPointCopy.redo == pgdata_lsn.0,
      94 UBC           0 :         "unexpected checkpoint REDO pointer"
      95                 :     );
      96                 : 
      97                 :     // Import WAL. This is needed even when starting from a shutdown checkpoint, because
      98                 :     // this reads the checkpoint record itself, advancing the tip of the timeline to
      99                 :     // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
     100 CBC         574 :     import_wal(
     101             574 :         &pgdata_path.join("pg_wal"),
     102             574 :         tline,
     103             574 :         Lsn(pg_control.checkPointCopy.redo),
     104             574 :         pgdata_lsn,
     105             574 :         ctx,
     106             574 :     )
     107 UBC           0 :     .await?;
     108                 : 
     109 CBC         574 :     Ok(())
     110             574 : }
     111                 : 
     112                 : // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
     113          538130 : async fn import_rel(
     114          538130 :     modification: &mut DatadirModification<'_>,
     115          538130 :     path: &Path,
     116          538130 :     spcoid: Oid,
     117          538130 :     dboid: Oid,
     118          538130 :     reader: &mut (impl AsyncRead + Unpin),
     119          538130 :     len: usize,
     120          538130 :     ctx: &RequestContext,
     121          538130 : ) -> anyhow::Result<()> {
     122                 :     // Does it look like a relation file?
     123 UBC           0 :     trace!("importing rel file {}", path.display());
     124                 : 
     125 CBC      538130 :     let filename = &path
     126          538130 :         .file_name()
     127          538130 :         .expect("missing rel filename")
     128          538130 :         .to_string_lossy();
     129          538130 :     let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
     130 UBC           0 :         warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
     131               0 :         e
     132 CBC      538130 :     })?;
     133                 : 
     134          538130 :     let mut buf: [u8; 8192] = [0u8; 8192];
     135          538130 : 
     136          538130 :     ensure!(len % BLCKSZ as usize == 0);
     137          538130 :     let nblocks = len / BLCKSZ as usize;
     138          538130 : 
     139          538130 :     let rel = RelTag {
     140          538130 :         spcnode: spcoid,
     141          538130 :         dbnode: dboid,
     142          538130 :         relnode,
     143          538130 :         forknum,
     144          538130 :     };
     145          538130 : 
     146          538130 :     let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     147                 : 
     148                 :     // Call put_rel_creation for every segment of the relation,
     149                 :     // because there is no guarantee about the order in which we are processing segments.
     150                 :     // ignore "relation already exists" error
     151                 :     //
     152                 :     // FIXME: Keep track of which relations we've already created?
     153                 :     // https://github.com/neondatabase/neon/issues/3309
     154          538130 :     if let Err(e) = modification
     155          538130 :         .put_rel_creation(rel, nblocks as u32, ctx)
     156 UBC           0 :         .await
     157                 :     {
     158               0 :         match e {
     159                 :             RelationError::AlreadyExists => {
     160               0 :                 debug!("Relation {} already exist. We must be extending it.", rel)
     161                 :             }
     162               0 :             _ => return Err(e.into()),
     163                 :         }
     164 CBC      538130 :     }
     165                 : 
     166                 :     loop {
     167         2335151 :         let r = reader.read_exact(&mut buf).await;
     168         2335151 :         match r {
     169                 :             Ok(_) => {
     170         1797021 :                 modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
     171                 :             }
     172                 : 
     173                 :             // TODO: UnexpectedEof is expected
     174          538130 :             Err(err) => match err.kind() {
     175                 :                 std::io::ErrorKind::UnexpectedEof => {
     176                 :                     // reached EOF. That's expected.
     177          538130 :                     let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     178          538130 :                     ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
     179          538130 :                     break;
     180                 :                 }
     181                 :                 _ => {
     182 UBC           0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     183                 :                 }
     184                 :             },
     185                 :         };
     186 CBC     1797021 :         blknum += 1;
     187                 :     }
     188                 : 
     189                 :     // Update relation size
     190                 :     //
     191                 :     // If we process rel segments out of order,
     192                 :     // put_rel_extend will skip the update.
     193          538130 :     modification.put_rel_extend(rel, blknum, ctx).await?;
     194                 : 
     195          538130 :     Ok(())
     196          538130 : }
     197                 : 
     198                 : /// Import an SLRU segment file
     199                 : ///
     200            1734 : async fn import_slru(
     201            1734 :     modification: &mut DatadirModification<'_>,
     202            1734 :     slru: SlruKind,
     203            1734 :     path: &Path,
     204            1734 :     reader: &mut (impl AsyncRead + Unpin),
     205            1734 :     len: usize,
     206            1734 :     ctx: &RequestContext,
     207            1734 : ) -> anyhow::Result<()> {
     208            1734 :     info!("importing slru file {path:?}");
     209                 : 
     210            1734 :     let mut buf: [u8; 8192] = [0u8; 8192];
     211            1734 :     let filename = &path
     212            1734 :         .file_name()
     213            1734 :         .with_context(|| format!("missing slru filename for path {path:?}"))?
     214            1734 :         .to_string_lossy();
     215            1734 :     let segno = u32::from_str_radix(filename, 16)?;
     216                 : 
     217            1734 :     ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
     218            1734 :     let nblocks = len / BLCKSZ as usize;
     219            1734 : 
     220            1734 :     ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
     221                 : 
     222            1734 :     modification
     223            1734 :         .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
     224 UBC           0 :         .await?;
     225                 : 
     226 CBC        1734 :     let mut rpageno = 0;
     227                 :     loop {
     228            3468 :         let r = reader.read_exact(&mut buf).await;
     229            3468 :         match r {
     230                 :             Ok(_) => {
     231            1734 :                 modification.put_slru_page_image(
     232            1734 :                     slru,
     233            1734 :                     segno,
     234            1734 :                     rpageno,
     235            1734 :                     Bytes::copy_from_slice(&buf),
     236            1734 :                 )?;
     237                 :             }
     238                 : 
     239                 :             // TODO: UnexpectedEof is expected
     240            1734 :             Err(err) => match err.kind() {
     241                 :                 std::io::ErrorKind::UnexpectedEof => {
     242                 :                     // reached EOF. That's expected.
     243            1734 :                     ensure!(rpageno == nblocks as u32, "unexpected EOF");
     244            1734 :                     break;
     245                 :                 }
     246                 :                 _ => {
     247 UBC           0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     248                 :                 }
     249                 :             },
     250                 :         };
     251 CBC        1734 :         rpageno += 1;
     252                 :     }
     253                 : 
     254            1734 :     Ok(())
     255            1734 : }
     256                 : 
     257                 : /// Scan PostgreSQL WAL files in given directory and load all records between
     258                 : /// 'startpoint' and 'endpoint' into the repository.
     259             574 : async fn import_wal(
     260             574 :     walpath: &Utf8Path,
     261             574 :     tline: &Timeline,
     262             574 :     startpoint: Lsn,
     263             574 :     endpoint: Lsn,
     264             574 :     ctx: &RequestContext,
     265             574 : ) -> anyhow::Result<()> {
     266             574 :     let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
     267             574 : 
     268             574 :     let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
     269             574 :     let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
     270             574 :     let mut last_lsn = startpoint;
     271                 : 
     272             574 :     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
     273                 : 
     274            1148 :     while last_lsn <= endpoint {
     275                 :         // FIXME: assume postgresql tli 1 for now
     276             574 :         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     277             574 :         let mut buf = Vec::new();
     278             574 : 
     279             574 :         // Read local file
     280             574 :         let mut path = walpath.join(&filename);
     281             574 : 
     282             574 :         // It could be as .partial
     283             574 :         if !PathBuf::from(&path).exists() {
     284 UBC           0 :             path = walpath.join(filename + ".partial");
     285 CBC         574 :         }
     286                 : 
     287                 :         // Slurp the WAL file
     288             574 :         let mut file = std::fs::File::open(&path)?;
     289                 : 
     290             574 :         if offset > 0 {
     291                 :             use std::io::Seek;
     292             574 :             file.seek(std::io::SeekFrom::Start(offset as u64))?;
     293 UBC           0 :         }
     294                 : 
     295                 :         use std::io::Read;
     296 CBC         574 :         let nread = file.read_to_end(&mut buf)?;
     297             574 :         if nread != WAL_SEGMENT_SIZE - offset {
     298                 :             // Maybe allow this for .partial files?
     299 UBC           0 :             error!("read only {} bytes from WAL file", nread);
     300 CBC         574 :         }
     301                 : 
     302             574 :         waldecoder.feed_bytes(&buf);
     303             574 : 
     304             574 :         let mut nrecords = 0;
     305             574 :         let mut modification = tline.begin_modification(endpoint);
     306             574 :         let mut decoded = DecodedWALRecord::default();
     307            1148 :         while last_lsn <= endpoint {
     308             574 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     309             574 :                 walingest
     310             574 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
     311 UBC           0 :                     .await?;
     312 CBC         574 :                 last_lsn = lsn;
     313             574 : 
     314             574 :                 nrecords += 1;
     315                 : 
     316 UBC           0 :                 trace!("imported record at {} (end {})", lsn, endpoint);
     317               0 :             }
     318                 :         }
     319                 : 
     320               0 :         debug!("imported {} records up to {}", nrecords, last_lsn);
     321                 : 
     322 CBC         574 :         segno += 1;
     323             574 :         offset = 0;
     324                 :     }
     325                 : 
     326             574 :     if last_lsn != startpoint {
     327             574 :         info!("reached end of WAL at {}", last_lsn);
     328                 :     } else {
     329 UBC           0 :         info!("no WAL to import at {}", last_lsn);
     330                 :     }
     331                 : 
     332 CBC         574 :     Ok(())
     333             574 : }
     334                 : 
     335               5 : pub async fn import_basebackup_from_tar(
     336               5 :     tline: &Timeline,
     337               5 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     338               5 :     base_lsn: Lsn,
     339               5 :     ctx: &RequestContext,
     340               5 : ) -> Result<()> {
     341               5 :     info!("importing base at {base_lsn}");
     342               5 :     let mut modification = tline.begin_modification(base_lsn);
     343               5 :     modification.init_empty()?;
     344                 : 
     345               5 :     let mut pg_control: Option<ControlFileData> = None;
     346                 : 
     347                 :     // Import base
     348               5 :     let mut entries = Archive::new(reader).entries()?;
     349            3917 :     while let Some(base_tar_entry) = entries.next().await {
     350            3912 :         let mut entry = base_tar_entry?;
     351            3912 :         let header = entry.header();
     352            3912 :         let len = header.entry_size()? as usize;
     353            3912 :         let file_path = header.path()?.into_owned();
     354            3912 : 
     355            3912 :         match header.entry_type() {
     356                 :             tokio_tar::EntryType::Regular => {
     357               3 :                 if let Some(res) =
     358            3812 :                     import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
     359               3 :                 {
     360               3 :                     // We found the pg_control file.
     361               3 :                     pg_control = Some(res);
     362            3809 :                 }
     363            3812 :                 modification.flush(ctx).await?;
     364                 :             }
     365                 :             tokio_tar::EntryType::Directory => {
     366 UBC           0 :                 debug!("directory {:?}", file_path);
     367                 :             }
     368                 :             _ => {
     369               0 :                 bail!(
     370               0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     371               0 :                     file_path.display(),
     372               0 :                     header.entry_type()
     373               0 :                 );
     374                 :             }
     375                 :         }
     376                 :     }
     377                 : 
     378                 :     // sanity check: ensure that pg_control is loaded
     379 CBC           5 :     let _pg_control = pg_control.context("pg_control file not found")?;
     380                 : 
     381             382 :     modification.commit(ctx).await?;
     382               3 :     Ok(())
     383               5 : }
     384                 : 
     385               2 : pub async fn import_wal_from_tar(
     386               2 :     tline: &Timeline,
     387               2 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     388               2 :     start_lsn: Lsn,
     389               2 :     end_lsn: Lsn,
     390               2 :     ctx: &RequestContext,
     391               2 : ) -> Result<()> {
     392               2 :     // Set up walingest mutable state
     393               2 :     let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
     394               2 :     let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
     395               2 :     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     396               2 :     let mut last_lsn = start_lsn;
     397               2 :     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
     398                 : 
     399                 :     // Ingest wal until end_lsn
     400               2 :     info!("importing wal until {}", end_lsn);
     401               2 :     let mut pg_wal_tar = Archive::new(reader);
     402               2 :     let mut pg_wal_entries = pg_wal_tar.entries()?;
     403               4 :     while last_lsn <= end_lsn {
     404               2 :         let bytes = {
     405               2 :             let mut entry = pg_wal_entries
     406               2 :                 .next()
     407               2 :                 .await
     408               2 :                 .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
     409               2 :             let header = entry.header();
     410               2 :             let file_path = header.path()?.into_owned();
     411               2 : 
     412               2 :             match header.entry_type() {
     413                 :                 tokio_tar::EntryType::Regular => {
     414                 :                     // FIXME: assume postgresql tli 1 for now
     415               2 :                     let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     416               2 :                     let file_name = file_path
     417               2 :                         .file_name()
     418               2 :                         .expect("missing wal filename")
     419               2 :                         .to_string_lossy();
     420               2 :                     ensure!(expected_filename == file_name);
     421                 : 
     422 UBC           0 :                     debug!("processing wal file {:?}", file_path);
     423 CBC         240 :                     read_all_bytes(&mut entry).await?
     424                 :                 }
     425                 :                 tokio_tar::EntryType::Directory => {
     426 UBC           0 :                     debug!("directory {:?}", file_path);
     427               0 :                     continue;
     428                 :                 }
     429                 :                 _ => {
     430               0 :                     bail!(
     431               0 :                         "entry {} in WAL tar archive is of unexpected type: {:?}",
     432               0 :                         file_path.display(),
     433               0 :                         header.entry_type()
     434               0 :                     );
     435                 :                 }
     436                 :             }
     437                 :         };
     438                 : 
     439 CBC           2 :         waldecoder.feed_bytes(&bytes[offset..]);
     440               2 : 
     441               2 :         let mut modification = tline.begin_modification(end_lsn);
     442               2 :         let mut decoded = DecodedWALRecord::default();
     443              10 :         while last_lsn <= end_lsn {
     444               8 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     445               8 :                 walingest
     446               8 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
     447 UBC           0 :                     .await?;
     448 CBC           8 :                 last_lsn = lsn;
     449                 : 
     450 UBC           0 :                 debug!("imported record at {} (end {})", lsn, end_lsn);
     451               0 :             }
     452                 :         }
     453                 : 
     454               0 :         debug!("imported records up to {}", last_lsn);
     455 CBC           2 :         segno += 1;
     456               2 :         offset = 0;
     457                 :     }
     458                 : 
     459               2 :     if last_lsn != start_lsn {
     460               2 :         info!("reached end of WAL at {}", last_lsn);
     461                 :     } else {
     462 UBC           0 :         info!("there was no WAL to import at {}", last_lsn);
     463                 :     }
     464                 : 
     465                 :     // Log any extra unused files
     466 CBC           2 :     while let Some(e) = pg_wal_entries.next().await {
     467 UBC           0 :         let entry = e?;
     468               0 :         let header = entry.header();
     469               0 :         let file_path = header.path()?.into_owned();
     470               0 :         info!("skipping {:?}", file_path);
     471                 :     }
     472                 : 
     473 CBC           2 :     Ok(())
     474               2 : }
     475                 : 
     476          548538 : async fn import_file(
     477          548538 :     modification: &mut DatadirModification<'_>,
     478          548538 :     file_path: &Path,
     479          548538 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     480          548538 :     len: usize,
     481          548538 :     ctx: &RequestContext,
     482          548538 : ) -> Result<Option<ControlFileData>> {
     483          548538 :     let file_name = match file_path.file_name() {
     484          548538 :         Some(name) => name.to_string_lossy(),
     485 UBC           0 :         None => return Ok(None),
     486                 :     };
     487                 : 
     488 CBC      548538 :     if file_name.starts_with('.') {
     489                 :         // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
     490                 :         // will contain "fork files", skip them.
     491 UBC           0 :         return Ok(None);
     492 CBC      548538 :     }
     493          548538 : 
     494          548538 :     if file_path.starts_with("global") {
     495           32945 :         let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
     496           32945 :         let dbnode = 0;
     497           32945 : 
     498           32945 :         match file_name.as_ref() {
     499           32945 :             "pg_control" => {
     500            5668 :                 let bytes = read_all_bytes(reader).await?;
     501                 : 
     502                 :                 // Extract the checkpoint record and import it separately.
     503             577 :                 let pg_control = ControlFileData::decode(&bytes[..])?;
     504             577 :                 let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
     505             577 :                 modification.put_checkpoint(checkpoint_bytes)?;
     506 UBC           0 :                 debug!("imported control file");
     507                 : 
     508                 :                 // Import it as ControlFile
     509 CBC         577 :                 modification.put_control_file(bytes)?;
     510             577 :                 return Ok(Some(pg_control));
     511                 :             }
     512           32368 :             "pg_filenode.map" => {
     513            3408 :                 let bytes = read_all_bytes(reader).await?;
     514             578 :                 modification
     515             578 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     516 UBC           0 :                     .await?;
     517               0 :                 debug!("imported relmap file")
     518                 :             }
     519 CBC       31790 :             "PG_VERSION" => {
     520 UBC           0 :                 debug!("ignored PG_VERSION file");
     521                 :             }
     522                 :             _ => {
     523 CBC       68482 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     524 UBC           0 :                 debug!("imported rel creation");
     525                 :             }
     526                 :         }
     527 CBC      515593 :     } else if file_path.starts_with("base") {
     528          509808 :         let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
     529          509808 :         let dbnode: u32 = file_path
     530          509808 :             .iter()
     531          509808 :             .nth(1)
     532          509808 :             .expect("invalid file path, expected dbnode")
     533          509808 :             .to_string_lossy()
     534          509808 :             .parse()?;
     535                 : 
     536          509808 :         match file_name.as_ref() {
     537          509808 :             "pg_filenode.map" => {
     538           10183 :                 let bytes = read_all_bytes(reader).await?;
     539            1734 :                 modification
     540            1734 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     541 UBC           0 :                     .await?;
     542               0 :                 debug!("imported relmap file")
     543                 :             }
     544 CBC      508074 :             "PG_VERSION" => {
     545 UBC           0 :                 debug!("ignored PG_VERSION file");
     546                 :             }
     547                 :             _ => {
     548 CBC     2211852 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     549 UBC           0 :                 debug!("imported rel creation");
     550                 :             }
     551                 :         }
     552 CBC        5785 :     } else if file_path.starts_with("pg_xact") {
     553             578 :         let slru = SlruKind::Clog;
     554             578 : 
     555            1137 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     556 UBC           0 :         debug!("imported clog slru");
     557 CBC        5207 :     } else if file_path.starts_with("pg_multixact/offsets") {
     558             578 :         let slru = SlruKind::MultiXactOffsets;
     559             578 : 
     560            1148 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     561 UBC           0 :         debug!("imported multixact offsets slru");
     562 CBC        4629 :     } else if file_path.starts_with("pg_multixact/members") {
     563             578 :         let slru = SlruKind::MultiXactMembers;
     564             578 : 
     565            1148 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     566 UBC           0 :         debug!("imported multixact members slru");
     567 CBC        4051 :     } else if file_path.starts_with("pg_twophase") {
     568 UBC           0 :         let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
     569                 : 
     570               0 :         let bytes = read_all_bytes(reader).await?;
     571               0 :         modification
     572               0 :             .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
     573               0 :             .await?;
     574               0 :         debug!("imported twophase file");
     575 CBC        4051 :     } else if file_path.starts_with("pg_wal") {
     576 UBC           0 :         debug!("found wal file in base section. ignore it");
     577 CBC        4050 :     } else if file_path.starts_with("zenith.signal") {
     578                 :         // Parse zenith signal file to set correct previous LSN
     579               1 :         let bytes = read_all_bytes(reader).await?;
     580                 :         // zenith.signal format is "PREV LSN: prev_lsn"
     581                 :         // TODO write serialization and deserialization in the same place.
     582               1 :         let zenith_signal = std::str::from_utf8(&bytes)?.trim();
     583               1 :         let prev_lsn = match zenith_signal {
     584               1 :             "PREV LSN: none" => Lsn(0),
     585               1 :             "PREV LSN: invalid" => Lsn(0),
     586               1 :             other => {
     587               1 :                 let split = other.split(':').collect::<Vec<_>>();
     588               1 :                 split[1]
     589               1 :                     .trim()
     590               1 :                     .parse::<Lsn>()
     591               1 :                     .context("can't parse zenith.signal")?
     592                 :             }
     593                 :         };
     594                 : 
     595                 :         // zenith.signal is not necessarily the last file, that we handle
     596                 :         // but it is ok to call `finish_write()`, because final `modification.commit()`
     597                 :         // will update lsn once more to the final one.
     598               1 :         let writer = modification.tline.writer().await;
     599               1 :         writer.finish_write(prev_lsn);
     600                 : 
     601 UBC           0 :         debug!("imported zenith signal {}", prev_lsn);
     602 CBC        4049 :     } else if file_path.starts_with("pg_tblspc") {
     603                 :         // TODO Backups exported from neon won't have pg_tblspc, but we will need
     604                 :         // this to import arbitrary postgres databases.
     605 UBC           0 :         bail!("Importing pg_tblspc is not implemented");
     606                 :     } else {
     607               0 :         debug!(
     608               0 :             "ignoring unrecognized file \"{}\" in tar archive",
     609               0 :             file_path.display()
     610               0 :         );
     611                 :     }
     612                 : 
     613 CBC      547961 :     Ok(None)
     614          548538 : }
     615                 : 
     616            2892 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
     617            2892 :     let mut buf: Vec<u8> = vec![];
     618           19499 :     reader.read_to_end(&mut buf).await?;
     619            2892 :     Ok(Bytes::from(buf))
     620            2892 : }
        

Generated by: LCOV version 2.1-beta