LCOV - differential code coverage report
Current view: top level - pageserver/src - import_datadir.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 84.3 % 466 393 73 393
Current Date: 2024-01-09 02:06:09 Functions: 34.6 % 104 36 68 36
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
       3                 : //! a neon Timeline.
       4                 : //!
       5                 : use std::io::SeekFrom;
       6                 : use std::path::{Path, PathBuf};
       7                 : 
       8                 : use anyhow::{bail, ensure, Context, Result};
       9                 : use async_compression::tokio::bufread::ZstdDecoder;
      10                 : use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};
      11                 : use bytes::Bytes;
      12                 : use camino::Utf8Path;
      13                 : use futures::StreamExt;
      14                 : use nix::NixPath;
      15                 : use tokio::fs::{File, OpenOptions};
      16                 : use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
      17                 : use tokio_tar::Archive;
      18                 : use tokio_tar::Builder;
      19                 : use tokio_tar::HeaderMode;
      20                 : use tracing::*;
      21                 : use walkdir::WalkDir;
      22                 : 
      23                 : use crate::context::RequestContext;
      24                 : use crate::metrics::WAL_INGEST;
      25                 : use crate::pgdatadir_mapping::*;
      26                 : use crate::tenant::remote_timeline_client::INITDB_PATH;
      27                 : use crate::tenant::Timeline;
      28                 : use crate::walingest::WalIngest;
      29                 : use crate::walrecord::DecodedWALRecord;
      30                 : use pageserver_api::reltag::{RelTag, SlruKind};
      31                 : use postgres_ffi::pg_constants;
      32                 : use postgres_ffi::relfile_utils::*;
      33                 : use postgres_ffi::waldecoder::WalStreamDecoder;
      34                 : use postgres_ffi::ControlFileData;
      35                 : use postgres_ffi::DBState_DB_SHUTDOWNED;
      36                 : use postgres_ffi::Oid;
      37                 : use postgres_ffi::XLogFileName;
      38                 : use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
      39                 : use utils::lsn::Lsn;
      40                 : 
      41                 : // Returns checkpoint LSN from controlfile
      42 CBC         525 : pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
      43             525 :     // Read control file to extract the LSN
      44             525 :     let controlfile_path = path.join("global").join("pg_control");
      45             525 :     let controlfile_buf = std::fs::read(&controlfile_path)
      46             525 :         .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
      47             525 :     let controlfile = ControlFileData::decode(&controlfile_buf)?;
      48             525 :     let lsn = controlfile.checkPoint;
      49             525 : 
      50             525 :     Ok(Lsn(lsn))
      51             525 : }
      52                 : 
      53                 : ///
      54                 : /// Import all relation data pages from local disk into the repository.
      55                 : ///
      56                 : /// This is currently only used to import a cluster freshly created by initdb.
      57                 : /// The code that deals with the checkpoint would not work right if the
      58                 : /// cluster was not shut down cleanly.
      59             524 : pub async fn import_timeline_from_postgres_datadir(
      60             524 :     tline: &Timeline,
      61             524 :     pgdata_path: &Utf8Path,
      62             524 :     pgdata_lsn: Lsn,
      63             524 :     ctx: &RequestContext,
      64             524 : ) -> Result<()> {
      65             524 :     let mut pg_control: Option<ControlFileData> = None;
      66             524 : 
      67             524 :     // TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
      68             524 :     // Then fishing out pg_control would be unnecessary
      69             524 :     let mut modification = tline.begin_modification(pgdata_lsn);
      70             524 :     modification.init_empty()?;
      71                 : 
      72                 :     // Import all but pg_wal
      73             524 :     let all_but_wal = WalkDir::new(pgdata_path)
      74             524 :         .into_iter()
      75          510392 :         .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
      76          510392 :     for entry in all_but_wal {
      77          509868 :         let entry = entry?;
      78          509868 :         let metadata = entry.metadata().expect("error getting dir entry metadata");
      79          509868 :         if metadata.is_file() {
      80          497292 :             let absolute_path = entry.path();
      81          497292 :             let relative_path = absolute_path.strip_prefix(pgdata_path)?;
      82                 : 
      83          497292 :             let mut file = tokio::fs::File::open(absolute_path).await?;
      84          497292 :             let len = metadata.len() as usize;
      85             524 :             if let Some(control_file) =
      86         2107906 :                 import_file(&mut modification, relative_path, &mut file, len, ctx).await?
      87             524 :             {
      88             524 :                 pg_control = Some(control_file);
      89          496768 :             }
      90          497292 :             modification.flush(ctx).await?;
      91           12576 :         }
      92                 :     }
      93                 : 
      94                 :     // We're done importing all the data files.
      95           12573 :     modification.commit(ctx).await?;
      96                 : 
      97                 :     // We expect the Postgres server to be shut down cleanly.
      98             524 :     let pg_control = pg_control.context("pg_control file not found")?;
      99                 :     ensure!(
     100             524 :         pg_control.state == DBState_DB_SHUTDOWNED,
     101 UBC           0 :         "Postgres cluster was not shut down cleanly"
     102                 :     );
     103                 :     ensure!(
     104 CBC         524 :         pg_control.checkPointCopy.redo == pgdata_lsn.0,
     105 UBC           0 :         "unexpected checkpoint REDO pointer"
     106                 :     );
     107                 : 
     108                 :     // Import WAL. This is needed even when starting from a shutdown checkpoint, because
     109                 :     // this reads the checkpoint record itself, advancing the tip of the timeline to
     110                 :     // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
     111 CBC         524 :     import_wal(
     112             524 :         &pgdata_path.join("pg_wal"),
     113             524 :         tline,
     114             524 :         Lsn(pg_control.checkPointCopy.redo),
     115             524 :         pgdata_lsn,
     116             524 :         ctx,
     117             524 :     )
     118 UBC           0 :     .await?;
     119                 : 
     120 CBC         524 :     Ok(())
     121             524 : }
     122                 : 
     123                 : // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
     124          497201 : async fn import_rel(
     125          497201 :     modification: &mut DatadirModification<'_>,
     126          497201 :     path: &Path,
     127          497201 :     spcoid: Oid,
     128          497201 :     dboid: Oid,
     129          497201 :     reader: &mut (impl AsyncRead + Unpin),
     130          497201 :     len: usize,
     131          497201 :     ctx: &RequestContext,
     132          497201 : ) -> anyhow::Result<()> {
     133                 :     // Does it look like a relation file?
     134 UBC           0 :     trace!("importing rel file {}", path.display());
     135                 : 
     136 CBC      497201 :     let filename = &path
     137          497201 :         .file_name()
     138          497201 :         .expect("missing rel filename")
     139          497201 :         .to_string_lossy();
     140          497201 :     let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
     141 UBC           0 :         warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
     142               0 :         e
     143 CBC      497201 :     })?;
     144                 : 
     145          497201 :     let mut buf: [u8; 8192] = [0u8; 8192];
     146          497201 : 
     147          497201 :     ensure!(len % BLCKSZ as usize == 0);
     148          497201 :     let nblocks = len / BLCKSZ as usize;
     149          497201 : 
     150          497201 :     let rel = RelTag {
     151          497201 :         spcnode: spcoid,
     152          497201 :         dbnode: dboid,
     153          497201 :         relnode,
     154          497201 :         forknum,
     155          497201 :     };
     156          497201 : 
     157          497201 :     let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     158                 : 
     159                 :     // Call put_rel_creation for every segment of the relation,
     160                 :     // because there is no guarantee about the order in which we are processing segments.
     161                 :     // ignore "relation already exists" error
     162                 :     //
     163                 :     // FIXME: Keep track of which relations we've already created?
     164                 :     // https://github.com/neondatabase/neon/issues/3309
     165          497201 :     if let Err(e) = modification
     166          497201 :         .put_rel_creation(rel, nblocks as u32, ctx)
     167 UBC           0 :         .await
     168                 :     {
     169               0 :         match e {
     170                 :             RelationError::AlreadyExists => {
     171               0 :                 debug!("Relation {} already exist. We must be extending it.", rel)
     172                 :             }
     173               0 :             _ => return Err(e.into()),
     174                 :         }
     175 CBC      497201 :     }
     176                 : 
     177                 :     loop {
     178         2159304 :         let r = reader.read_exact(&mut buf).await;
     179         2159304 :         match r {
     180                 :             Ok(_) => {
     181         1662103 :                 modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
     182                 :             }
     183                 : 
     184                 :             // TODO: UnexpectedEof is expected
     185          497201 :             Err(err) => match err.kind() {
     186                 :                 std::io::ErrorKind::UnexpectedEof => {
     187                 :                     // reached EOF. That's expected.
     188          497201 :                     let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     189          497201 :                     ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
     190          497201 :                     break;
     191                 :                 }
     192                 :                 _ => {
     193 UBC           0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     194                 :                 }
     195                 :             },
     196                 :         };
     197 CBC     1662103 :         blknum += 1;
     198                 :     }
     199                 : 
     200                 :     // Update relation size
     201                 :     //
     202                 :     // If we process rel segments out of order,
     203                 :     // put_rel_extend will skip the update.
     204          497201 :     modification.put_rel_extend(rel, blknum, ctx).await?;
     205                 : 
     206          497201 :     Ok(())
     207          497201 : }
     208                 : 
     209                 : /// Import an SLRU segment file
     210                 : ///
     211            1602 : async fn import_slru(
     212            1602 :     modification: &mut DatadirModification<'_>,
     213            1602 :     slru: SlruKind,
     214            1602 :     path: &Path,
     215            1602 :     reader: &mut (impl AsyncRead + Unpin),
     216            1602 :     len: usize,
     217            1602 :     ctx: &RequestContext,
     218            1602 : ) -> anyhow::Result<()> {
     219            1602 :     info!("importing slru file {path:?}");
     220                 : 
     221            1602 :     let mut buf: [u8; 8192] = [0u8; 8192];
     222            1602 :     let filename = &path
     223            1602 :         .file_name()
     224            1602 :         .with_context(|| format!("missing slru filename for path {path:?}"))?
     225            1602 :         .to_string_lossy();
     226            1602 :     let segno = u32::from_str_radix(filename, 16)?;
     227                 : 
     228            1602 :     ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
     229            1602 :     let nblocks = len / BLCKSZ as usize;
     230            1602 : 
     231            1602 :     ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
     232                 : 
     233            1602 :     modification
     234            1602 :         .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
     235 UBC           0 :         .await?;
     236                 : 
     237 CBC        1602 :     let mut rpageno = 0;
     238                 :     loop {
     239            3204 :         let r = reader.read_exact(&mut buf).await;
     240            3204 :         match r {
     241                 :             Ok(_) => {
     242            1602 :                 modification.put_slru_page_image(
     243            1602 :                     slru,
     244            1602 :                     segno,
     245            1602 :                     rpageno,
     246            1602 :                     Bytes::copy_from_slice(&buf),
     247            1602 :                 )?;
     248                 :             }
     249                 : 
     250                 :             // TODO: UnexpectedEof is expected
     251            1602 :             Err(err) => match err.kind() {
     252                 :                 std::io::ErrorKind::UnexpectedEof => {
     253                 :                     // reached EOF. That's expected.
     254            1602 :                     ensure!(rpageno == nblocks as u32, "unexpected EOF");
     255            1602 :                     break;
     256                 :                 }
     257                 :                 _ => {
     258 UBC           0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     259                 :                 }
     260                 :             },
     261                 :         };
     262 CBC        1602 :         rpageno += 1;
     263                 :     }
     264                 : 
     265            1602 :     Ok(())
     266            1602 : }
     267                 : 
     268                 : /// Scan PostgreSQL WAL files in given directory and load all records between
     269                 : /// 'startpoint' and 'endpoint' into the repository.
     270             524 : async fn import_wal(
     271             524 :     walpath: &Utf8Path,
     272             524 :     tline: &Timeline,
     273             524 :     startpoint: Lsn,
     274             524 :     endpoint: Lsn,
     275             524 :     ctx: &RequestContext,
     276             524 : ) -> anyhow::Result<()> {
     277             524 :     let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
     278             524 : 
     279             524 :     let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
     280             524 :     let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
     281             524 :     let mut last_lsn = startpoint;
     282                 : 
     283             524 :     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
     284                 : 
     285            1048 :     while last_lsn <= endpoint {
     286                 :         // FIXME: assume postgresql tli 1 for now
     287             524 :         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     288             524 :         let mut buf = Vec::new();
     289             524 : 
     290             524 :         // Read local file
     291             524 :         let mut path = walpath.join(&filename);
     292             524 : 
     293             524 :         // It may exist only as a .partial file
     294             524 :         if !PathBuf::from(&path).exists() {
     295 UBC           0 :             path = walpath.join(filename + ".partial");
     296 CBC         524 :         }
     297                 : 
     298                 :         // Slurp the WAL file
     299             524 :         let mut file = std::fs::File::open(&path)?;
     300                 : 
     301             524 :         if offset > 0 {
     302                 :             use std::io::Seek;
     303             524 :             file.seek(std::io::SeekFrom::Start(offset as u64))?;
     304 UBC           0 :         }
     305                 : 
     306                 :         use std::io::Read;
     307 CBC         524 :         let nread = file.read_to_end(&mut buf)?;
     308             524 :         if nread != WAL_SEGMENT_SIZE - offset {
     309                 :             // Maybe allow this for .partial files?
     310 UBC           0 :             error!("read only {} bytes from WAL file", nread);
     311 CBC         524 :         }
     312                 : 
     313             524 :         waldecoder.feed_bytes(&buf);
     314             524 : 
     315             524 :         let mut nrecords = 0;
     316             524 :         let mut modification = tline.begin_modification(last_lsn);
     317             524 :         let mut decoded = DecodedWALRecord::default();
     318            1048 :         while last_lsn <= endpoint {
     319             524 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     320             524 :                 walingest
     321             524 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
     322 UBC           0 :                     .await?;
     323 CBC         524 :                 WAL_INGEST.records_committed.inc();
     324             524 : 
     325             524 :                 modification.commit(ctx).await?;
     326             524 :                 last_lsn = lsn;
     327             524 : 
     328             524 :                 nrecords += 1;
     329                 : 
     330 UBC           0 :                 trace!("imported record at {} (end {})", lsn, endpoint);
     331               0 :             }
     332                 :         }
     333                 : 
     334               0 :         debug!("imported {} records up to {}", nrecords, last_lsn);
     335                 : 
     336 CBC         524 :         segno += 1;
     337             524 :         offset = 0;
     338                 :     }
     339                 : 
     340             524 :     if last_lsn != startpoint {
     341             524 :         info!("reached end of WAL at {}", last_lsn);
     342                 :     } else {
     343 UBC           0 :         info!("no WAL to import at {}", last_lsn);
     344                 :     }
     345                 : 
     346 CBC         524 :     Ok(())
     347             524 : }
     348                 : 
     349              11 : pub async fn import_basebackup_from_tar(
     350              11 :     tline: &Timeline,
     351              11 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     352              11 :     base_lsn: Lsn,
     353              11 :     ctx: &RequestContext,
     354              11 : ) -> Result<()> {
     355              11 :     info!("importing base at {base_lsn}");
     356              11 :     let mut modification = tline.begin_modification(base_lsn);
     357              11 :     modification.init_empty()?;
     358                 : 
     359              11 :     let mut pg_control: Option<ControlFileData> = None;
     360                 : 
     361                 :     // Import base
     362              11 :     let mut entries = Archive::new(reader).entries()?;
     363            9781 :     while let Some(base_tar_entry) = entries.next().await {
     364            9770 :         let mut entry = base_tar_entry?;
     365            9770 :         let header = entry.header();
     366            9770 :         let len = header.entry_size()? as usize;
     367            9770 :         let file_path = header.path()?.into_owned();
     368            9770 : 
     369            9770 :         match header.entry_type() {
     370                 :             tokio_tar::EntryType::Regular => {
     371               9 :                 if let Some(res) =
     372            9520 :                     import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
     373               9 :                 {
     374               9 :                     // We found the pg_control file.
     375               9 :                     pg_control = Some(res);
     376            9511 :                 }
     377            9520 :                 modification.flush(ctx).await?;
     378                 :             }
     379                 :             tokio_tar::EntryType::Directory => {
     380 UBC           0 :                 debug!("directory {:?}", file_path);
     381                 :             }
     382                 :             _ => {
     383               0 :                 bail!(
     384               0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     385               0 :                     file_path.display(),
     386               0 :                     header.entry_type()
     387               0 :                 );
     388                 :             }
     389                 :         }
     390                 :     }
     391                 : 
     392                 :     // sanity check: ensure that pg_control is loaded
     393 CBC          11 :     let _pg_control = pg_control.context("pg_control file not found")?;
     394                 : 
     395             259 :     modification.commit(ctx).await?;
     396               9 :     Ok(())
     397              11 : }
     398                 : 
     399               2 : pub async fn import_wal_from_tar(
     400               2 :     tline: &Timeline,
     401               2 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     402               2 :     start_lsn: Lsn,
     403               2 :     end_lsn: Lsn,
     404               2 :     ctx: &RequestContext,
     405               2 : ) -> Result<()> {
     406               2 :     // Set up walingest mutable state
     407               2 :     let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
     408               2 :     let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
     409               2 :     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     410               2 :     let mut last_lsn = start_lsn;
     411               2 :     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
     412                 : 
     413                 :     // Ingest wal until end_lsn
     414               2 :     info!("importing wal until {}", end_lsn);
     415               2 :     let mut pg_wal_tar = Archive::new(reader);
     416               2 :     let mut pg_wal_entries = pg_wal_tar.entries()?;
     417               4 :     while last_lsn <= end_lsn {
     418               2 :         let bytes = {
     419               2 :             let mut entry = pg_wal_entries
     420               2 :                 .next()
     421               2 :                 .await
     422               2 :                 .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
     423               2 :             let header = entry.header();
     424               2 :             let file_path = header.path()?.into_owned();
     425               2 : 
     426               2 :             match header.entry_type() {
     427                 :                 tokio_tar::EntryType::Regular => {
     428                 :                     // FIXME: assume postgresql tli 1 for now
     429               2 :                     let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     430               2 :                     let file_name = file_path
     431               2 :                         .file_name()
     432               2 :                         .expect("missing wal filename")
     433               2 :                         .to_string_lossy();
     434               2 :                     ensure!(expected_filename == file_name);
     435                 : 
     436 UBC           0 :                     debug!("processing wal file {:?}", file_path);
     437 CBC        3773 :                     read_all_bytes(&mut entry).await?
     438                 :                 }
     439                 :                 tokio_tar::EntryType::Directory => {
     440 UBC           0 :                     debug!("directory {:?}", file_path);
     441               0 :                     continue;
     442                 :                 }
     443                 :                 _ => {
     444               0 :                     bail!(
     445               0 :                         "entry {} in WAL tar archive is of unexpected type: {:?}",
     446               0 :                         file_path.display(),
     447               0 :                         header.entry_type()
     448               0 :                     );
     449                 :                 }
     450                 :             }
     451                 :         };
     452                 : 
     453 CBC           2 :         waldecoder.feed_bytes(&bytes[offset..]);
     454               2 : 
     455               2 :         let mut modification = tline.begin_modification(last_lsn);
     456               2 :         let mut decoded = DecodedWALRecord::default();
     457              10 :         while last_lsn <= end_lsn {
     458               8 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     459               8 :                 walingest
     460               8 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
     461 UBC           0 :                     .await?;
     462 CBC           8 :                 modification.commit(ctx).await?;
     463               8 :                 last_lsn = lsn;
     464                 : 
     465 UBC           0 :                 debug!("imported record at {} (end {})", lsn, end_lsn);
     466               0 :             }
     467                 :         }
     468                 : 
     469               0 :         debug!("imported records up to {}", last_lsn);
     470 CBC           2 :         segno += 1;
     471               2 :         offset = 0;
     472                 :     }
     473                 : 
     474               2 :     if last_lsn != start_lsn {
     475               2 :         info!("reached end of WAL at {}", last_lsn);
     476                 :     } else {
     477 UBC           0 :         info!("there was no WAL to import at {}", last_lsn);
     478                 :     }
     479                 : 
     480                 :     // Log any extra unused files
     481 CBC           2 :     while let Some(e) = pg_wal_entries.next().await {
     482 UBC           0 :         let entry = e?;
     483               0 :         let header = entry.header();
     484               0 :         let file_path = header.path()?.into_owned();
     485               0 :         info!("skipping {:?}", file_path);
     486                 :     }
     487                 : 
     488 CBC           2 :     Ok(())
     489               2 : }
     490                 : 
     491          506812 : async fn import_file(
     492          506812 :     modification: &mut DatadirModification<'_>,
     493          506812 :     file_path: &Path,
     494          506812 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     495          506812 :     len: usize,
     496          506812 :     ctx: &RequestContext,
     497          506812 : ) -> Result<Option<ControlFileData>> {
     498          506812 :     let file_name = match file_path.file_name() {
     499          506812 :         Some(name) => name.to_string_lossy(),
     500 UBC           0 :         None => return Ok(None),
     501                 :     };
     502                 : 
     503 CBC      506812 :     if file_name.starts_with('.') {
     504                 :         // tar archives on macOS, created without COPYFILE_DISABLE=1 env var
     505                 :         // will contain "fork files", skip them.
     506 UBC           0 :         return Ok(None);
     507 CBC      506812 :     }
     508          506812 : 
     509          506812 :     if file_path.starts_with("global") {
     510           30440 :         let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
     511           30440 :         let dbnode = 0;
     512           30440 : 
     513           30440 :         match file_name.as_ref() {
     514           30440 :             "pg_control" => {
     515            5203 :                 let bytes = read_all_bytes(reader).await?;
     516                 : 
     517                 :                 // Extract the checkpoint record and import it separately.
     518             533 :                 let pg_control = ControlFileData::decode(&bytes[..])?;
     519             533 :                 let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
     520             533 :                 modification.put_checkpoint(checkpoint_bytes)?;
     521 UBC           0 :                 debug!("imported control file");
     522                 : 
     523                 :                 // Import it as ControlFile
     524 CBC         533 :                 modification.put_control_file(bytes)?;
     525             533 :                 return Ok(Some(pg_control));
     526                 :             }
     527           29907 :             "pg_filenode.map" => {
     528            3117 :                 let bytes = read_all_bytes(reader).await?;
     529             534 :                 modification
     530             534 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     531 UBC           0 :                     .await?;
     532               0 :                 debug!("imported relmap file")
     533                 :             }
     534 CBC       29373 :             "PG_VERSION" => {
     535 UBC           0 :                 debug!("ignored PG_VERSION file");
     536                 :             }
     537                 :             _ => {
     538 CBC       62846 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     539 UBC           0 :                 debug!("imported rel creation");
     540                 :             }
     541                 :         }
     542 CBC      476372 :     } else if file_path.starts_with("base") {
     543          471032 :         let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
     544          471032 :         let dbnode: u32 = file_path
     545          471032 :             .iter()
     546          471032 :             .nth(1)
     547          471032 :             .expect("invalid file path, expected dbnode")
     548          471032 :             .to_string_lossy()
     549          471032 :             .parse()?;
     550                 : 
     551          471032 :         match file_name.as_ref() {
     552          471032 :             "pg_filenode.map" => {
     553            9316 :                 let bytes = read_all_bytes(reader).await?;
     554            1602 :                 modification
     555            1602 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     556 UBC           0 :                     .await?;
     557               0 :                 debug!("imported relmap file")
     558                 :             }
     559 CBC      469430 :             "PG_VERSION" => {
     560 UBC           0 :                 debug!("ignored PG_VERSION file");
     561                 :             }
     562                 :             _ => {
     563 CBC     2032950 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     564 UBC           0 :                 debug!("imported rel creation");
     565                 :             }
     566                 :         }
     567 CBC        5340 :     } else if file_path.starts_with("pg_xact") {
     568             534 :         let slru = SlruKind::Clog;
     569             534 : 
     570            1041 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     571 UBC           0 :         debug!("imported clog slru");
     572 CBC        4806 :     } else if file_path.starts_with("pg_multixact/offsets") {
     573             534 :         let slru = SlruKind::MultiXactOffsets;
     574             534 : 
     575            1041 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     576 UBC           0 :         debug!("imported multixact offsets slru");
     577 CBC        4272 :     } else if file_path.starts_with("pg_multixact/members") {
     578             534 :         let slru = SlruKind::MultiXactMembers;
     579             534 : 
     580            1042 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     581 UBC           0 :         debug!("imported multixact members slru");
     582 CBC        3738 :     } else if file_path.starts_with("pg_twophase") {
     583 UBC           0 :         let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
     584                 : 
     585               0 :         let bytes = read_all_bytes(reader).await?;
     586               0 :         modification
     587               0 :             .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
     588               0 :             .await?;
     589               0 :         debug!("imported twophase file");
     590 CBC        3738 :     } else if file_path.starts_with("pg_wal") {
     591 UBC           0 :         debug!("found wal file in base section. ignore it");
     592 CBC        3731 :     } else if file_path.starts_with("zenith.signal") {
     593                 :         // Parse zenith signal file to set correct previous LSN
     594               7 :         let bytes = read_all_bytes(reader).await?;
     595                 :         // zenith.signal format is "PREV LSN: prev_lsn"
     596                 :         // TODO write serialization and deserialization in the same place.
     597               7 :         let zenith_signal = std::str::from_utf8(&bytes)?.trim();
     598               7 :         let prev_lsn = match zenith_signal {
     599               7 :             "PREV LSN: none" => Lsn(0),
     600               7 :             "PREV LSN: invalid" => Lsn(0),
     601               7 :             other => {
     602               7 :                 let split = other.split(':').collect::<Vec<_>>();
     603               7 :                 split[1]
     604               7 :                     .trim()
     605               7 :                     .parse::<Lsn>()
     606               7 :                     .context("can't parse zenith.signal")?
     607                 :             }
     608                 :         };
     609                 : 
     610                 :         // zenith.signal is not necessarily the last file, that we handle
     611                 :         // but it is ok to call `finish_write()`, because final `modification.commit()`
     612                 :         // will update lsn once more to the final one.
     613               7 :         let writer = modification.tline.writer().await;
     614               7 :         writer.finish_write(prev_lsn);
     615                 : 
     616 UBC           0 :         debug!("imported zenith signal {}", prev_lsn);
     617 CBC        3724 :     } else if file_path.starts_with("pg_tblspc") {
     618                 :         // TODO Backups exported from neon won't have pg_tblspc, but we will need
     619                 :         // this to import arbitrary postgres databases.
     620 UBC           0 :         bail!("Importing pg_tblspc is not implemented");
     621                 :     } else {
     622               0 :         debug!(
     623               0 :             "ignoring unrecognized file \"{}\" in tar archive",
     624               0 :             file_path.display()
     625               0 :         );
     626                 :     }
     627                 : 
     628 CBC      506279 :     Ok(None)
     629          506812 : }
     630                 : 
     631            2678 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
     632            2678 :     let mut buf: Vec<u8> = vec![];
     633           21409 :     reader.read_to_end(&mut buf).await?;
     634            2678 :     Ok(Bytes::from(buf))
     635            2678 : }
     636                 : 
     637             522 : pub async fn create_tar_zst(pgdata_path: &Utf8Path, tmp_path: &Utf8Path) -> Result<(File, u64)> {
     638             522 :     let file = OpenOptions::new()
     639             522 :         .create(true)
     640             522 :         .truncate(true)
     641             522 :         .read(true)
     642             522 :         .write(true)
     643             522 :         .open(&tmp_path)
     644             938 :         .await
     645             522 :         .with_context(|| format!("tempfile creation {tmp_path}"))?;
     646                 : 
     647             522 :     let mut paths = Vec::new();
     648          509472 :     for entry in WalkDir::new(pgdata_path) {
     649          509472 :         let entry = entry?;
     650          509472 :         let metadata = entry.metadata().expect("error getting dir entry metadata");
     651          509472 :         // Also allow directories so that we also get empty directories
     652          509472 :         if !(metadata.is_file() || metadata.is_dir()) {
     653 UBC           0 :             continue;
     654 CBC      509472 :         }
     655          509472 :         let path = entry.into_path();
     656          509472 :         paths.push(path);
     657                 :     }
     658                 :     // Do a sort to get a more consistent listing
     659             522 :     paths.sort_unstable();
     660             522 :     let zstd = ZstdEncoder::with_quality_and_params(
     661             522 :         file,
     662             522 :         Level::Default,
     663             522 :         &[CParameter::enable_long_distance_matching(true)],
     664             522 :     );
     665             522 :     let mut builder = Builder::new(zstd);
     666             522 :     // Use reproducible header mode
     667             522 :     builder.mode(HeaderMode::Deterministic);
     668          509994 :     for path in paths {
     669          509472 :         let rel_path = path.strip_prefix(pgdata_path)?;
     670          509472 :         if rel_path.is_empty() {
     671                 :             // The top directory should not be compressed,
     672                 :             // the tar crate doesn't like that
     673             522 :             continue;
     674          508950 :         }
     675         4267999 :         builder.append_path_with_name(&path, rel_path).await?;
     676                 :     }
     677             522 :     let mut zstd = builder.into_inner().await?;
     678             522 :     zstd.shutdown().await?;
     679             522 :     let mut compressed = zstd.into_inner();
     680             522 :     let compressed_len = compressed.metadata().await?.len();
     681             522 :     const INITDB_TAR_ZST_WARN_LIMIT: u64 = 2 * 1024 * 1024;
     682             522 :     if compressed_len > INITDB_TAR_ZST_WARN_LIMIT {
     683 UBC           0 :         warn!("compressed {INITDB_PATH} size of {compressed_len} is above limit {INITDB_TAR_ZST_WARN_LIMIT}.");
     684 CBC         522 :     }
     685             522 :     compressed.seek(SeekFrom::Start(0)).await?;
     686             522 :     Ok((compressed, compressed_len))
     687             522 : }
     688                 : 
     689               3 : pub async fn extract_tar_zst(
     690               3 :     pgdata_path: &Utf8Path,
     691               3 :     tar_zst: impl AsyncBufRead + Unpin,
     692               3 : ) -> Result<()> {
     693               3 :     let tar = Box::pin(ZstdDecoder::new(tar_zst));
     694               3 :     let mut archive = Archive::new(tar);
     695           16100 :     archive.unpack(pgdata_path).await?;
     696               3 :     Ok(())
     697               3 : }
        

Generated by: LCOV version 2.1-beta