LCOV - code coverage report
Current view: top level - pageserver/src - import_datadir.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 82.6 % 408 337
Test Date: 2023-09-06 10:18:01 Functions: 27.8 % 97 27

            Line data    Source code
       1              : //!
       2              : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
       3              : //! a neon Timeline.
       4              : //!
       5              : use std::path::{Path, PathBuf};
       6              : 
       7              : use anyhow::{bail, ensure, Context, Result};
       8              : use bytes::Bytes;
       9              : use futures::StreamExt;
      10              : use tokio::io::{AsyncRead, AsyncReadExt};
      11              : use tokio_tar::Archive;
      12              : use tracing::*;
      13              : use walkdir::WalkDir;
      14              : 
      15              : use crate::context::RequestContext;
      16              : use crate::pgdatadir_mapping::*;
      17              : use crate::tenant::Timeline;
      18              : use crate::walingest::WalIngest;
      19              : use crate::walrecord::DecodedWALRecord;
      20              : use pageserver_api::reltag::{RelTag, SlruKind};
      21              : use postgres_ffi::pg_constants;
      22              : use postgres_ffi::relfile_utils::*;
      23              : use postgres_ffi::waldecoder::WalStreamDecoder;
      24              : use postgres_ffi::ControlFileData;
      25              : use postgres_ffi::DBState_DB_SHUTDOWNED;
      26              : use postgres_ffi::Oid;
      27              : use postgres_ffi::XLogFileName;
      28              : use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
      29              : use utils::lsn::Lsn;
      30              : 
      31              : // Returns checkpoint LSN from controlfile
      32          644 : pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
      33          644 :     // Read control file to extract the LSN
      34          644 :     let controlfile_path = path.join("global").join("pg_control");
      35          644 :     let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
      36          644 :     let lsn = controlfile.checkPoint;
      37          644 : 
      38          644 :     Ok(Lsn(lsn))
      39          644 : }
      40              : 
      41              : ///
      42              : /// Import all relation data pages from local disk into the repository.
      43              : ///
      44              : /// This is currently only used to import a cluster freshly created by initdb.
      45              : /// The code that deals with the checkpoint would not work right if the
      46              : /// cluster was not shut down cleanly.
      47          643 : pub async fn import_timeline_from_postgres_datadir(
      48          643 :     tline: &Timeline,
      49          643 :     pgdata_path: &Path,
      50          643 :     pgdata_lsn: Lsn,
      51          643 :     ctx: &RequestContext,
      52          643 : ) -> Result<()> {
      53          643 :     let mut pg_control: Option<ControlFileData> = None;
      54          643 : 
      55          643 :     // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
      56          643 :     // Then fishing out pg_control would be unnecessary
      57          643 :     let mut modification = tline.begin_modification(pgdata_lsn);
      58          643 :     modification.init_empty()?;
      59              : 
      60              :     // Import all but pg_wal
      61          643 :     let all_but_wal = WalkDir::new(pgdata_path)
      62          643 :         .into_iter()
      63       626282 :         .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
      64       626282 :     for entry in all_but_wal {
      65       625639 :         let entry = entry?;
      66       625639 :         let metadata = entry.metadata().expect("error getting dir entry metadata");
      67       625639 :         if metadata.is_file() {
      68       610207 :             let absolute_path = entry.path();
      69       610207 :             let relative_path = absolute_path.strip_prefix(pgdata_path)?;
      70              : 
      71       610207 :             let mut file = tokio::fs::File::open(absolute_path).await?;
      72       610207 :             let len = metadata.len() as usize;
      73          643 :             if let Some(control_file) =
      74      2570147 :                 import_file(&mut modification, relative_path, &mut file, len, ctx).await?
      75          643 :             {
      76          643 :                 pg_control = Some(control_file);
      77       609564 :             }
      78       610207 :             modification.flush().await?;
      79        15432 :         }
      80              :     }
      81              : 
      82              :     // We're done importing all the data files.
      83        40509 :     modification.commit().await?;
      84              : 
      85              :     // We expect the Postgres server to be shut down cleanly.
      86          643 :     let pg_control = pg_control.context("pg_control file not found")?;
      87          643 :     ensure!(
      88          643 :         pg_control.state == DBState_DB_SHUTDOWNED,
      89            0 :         "Postgres cluster was not shut down cleanly"
      90              :     );
      91          643 :     ensure!(
      92          643 :         pg_control.checkPointCopy.redo == pgdata_lsn.0,
      93            0 :         "unexpected checkpoint REDO pointer"
      94              :     );
      95              : 
      96              :     // Import WAL. This is needed even when starting from a shutdown checkpoint, because
      97              :     // this reads the checkpoint record itself, advancing the tip of the timeline to
      98              :     // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
      99          643 :     import_wal(
     100          643 :         &pgdata_path.join("pg_wal"),
     101          643 :         tline,
     102          643 :         Lsn(pg_control.checkPointCopy.redo),
     103          643 :         pgdata_lsn,
     104          643 :         ctx,
     105          643 :     )
     106            0 :     .await?;
     107              : 
     108          643 :     Ok(())
     109          643 : }
     110              : 
     111              : // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
     112       602369 : async fn import_rel(
     113       602369 :     modification: &mut DatadirModification<'_>,
     114       602369 :     path: &Path,
     115       602369 :     spcoid: Oid,
     116       602369 :     dboid: Oid,
     117       602369 :     reader: &mut (impl AsyncRead + Unpin),
     118       602369 :     len: usize,
     119       602369 :     ctx: &RequestContext,
     120       602369 : ) -> anyhow::Result<()> {
     121              :     // Does it look like a relation file?
     122            0 :     trace!("importing rel file {}", path.display());
     123              : 
     124       602369 :     let filename = &path
     125       602369 :         .file_name()
     126       602369 :         .expect("missing rel filename")
     127       602369 :         .to_string_lossy();
     128       602369 :     let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
     129            0 :         warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
     130            0 :         e
     131       602369 :     })?;
     132              : 
     133       602369 :     let mut buf: [u8; 8192] = [0u8; 8192];
     134       602369 : 
     135       602369 :     ensure!(len % BLCKSZ as usize == 0);
     136       602369 :     let nblocks = len / BLCKSZ as usize;
     137       602369 : 
     138       602369 :     let rel = RelTag {
     139       602369 :         spcnode: spcoid,
     140       602369 :         dbnode: dboid,
     141       602369 :         relnode,
     142       602369 :         forknum,
     143       602369 :     };
     144       602369 : 
     145       602369 :     let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     146              : 
     147              :     // Call put_rel_creation for every segment of the relation,
     148              :     // because there is no guarantee about the order in which we are processing segments.
     149              :     // ignore "relation already exists" error
     150              :     //
     151              :     // FIXME: Keep track of which relations we've already created?
     152              :     // https://github.com/neondatabase/neon/issues/3309
     153       602369 :     if let Err(e) = modification
     154       602369 :         .put_rel_creation(rel, nblocks as u32, ctx)
     155            0 :         .await
     156              :     {
     157            0 :         match e {
     158              :             RelationError::AlreadyExists => {
     159            0 :                 debug!("Relation {} already exist. We must be extending it.", rel)
     160              :             }
     161            0 :             _ => return Err(e.into()),
     162              :         }
     163       602369 :     }
     164              : 
     165              :     loop {
     166      2613014 :         let r = reader.read_exact(&mut buf).await;
     167      2613014 :         match r {
     168              :             Ok(_) => {
     169      2010645 :                 modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
     170              :             }
     171              : 
     172              :             // TODO: UnexpectedEof is expected
     173       602369 :             Err(err) => match err.kind() {
     174              :                 std::io::ErrorKind::UnexpectedEof => {
     175              :                     // reached EOF. That's expected.
     176       602369 :                     let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
     177       602369 :                     ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
     178       602369 :                     break;
     179              :                 }
     180              :                 _ => {
     181            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     182              :                 }
     183              :             },
     184              :         };
     185      2010645 :         blknum += 1;
     186              :     }
     187              : 
     188              :     // Update relation size
     189              :     //
     190              :     // If we process rel segments out of order,
     191              :     // put_rel_extend will skip the update.
     192       602369 :     modification.put_rel_extend(rel, blknum, ctx).await?;
     193              : 
     194       602369 :     Ok(())
     195       602369 : }
     196              : 
     197              : /// Import an SLRU segment file
     198              : ///
     199         1941 : async fn import_slru(
     200         1941 :     modification: &mut DatadirModification<'_>,
     201         1941 :     slru: SlruKind,
     202         1941 :     path: &Path,
     203         1941 :     reader: &mut (impl AsyncRead + Unpin),
     204         1941 :     len: usize,
     205         1941 :     ctx: &RequestContext,
     206         1941 : ) -> anyhow::Result<()> {
     207         1941 :     info!("importing slru file {path:?}");
     208              : 
     209         1941 :     let mut buf: [u8; 8192] = [0u8; 8192];
     210         1941 :     let filename = &path
     211         1941 :         .file_name()
     212         1941 :         .with_context(|| format!("missing slru filename for path {path:?}"))?
     213         1941 :         .to_string_lossy();
     214         1941 :     let segno = u32::from_str_radix(filename, 16)?;
     215              : 
     216         1941 :     ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
     217         1941 :     let nblocks = len / BLCKSZ as usize;
     218         1941 : 
     219         1941 :     ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
     220              : 
     221         1941 :     modification
     222         1941 :         .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
     223            0 :         .await?;
     224              : 
     225         1941 :     let mut rpageno = 0;
     226              :     loop {
     227         3882 :         let r = reader.read_exact(&mut buf).await;
     228         3882 :         match r {
     229              :             Ok(_) => {
     230         1941 :                 modification.put_slru_page_image(
     231         1941 :                     slru,
     232         1941 :                     segno,
     233         1941 :                     rpageno,
     234         1941 :                     Bytes::copy_from_slice(&buf),
     235         1941 :                 )?;
     236              :             }
     237              : 
     238              :             // TODO: UnexpectedEof is expected
     239         1941 :             Err(err) => match err.kind() {
     240              :                 std::io::ErrorKind::UnexpectedEof => {
     241              :                     // reached EOF. That's expected.
     242         1941 :                     ensure!(rpageno == nblocks as u32, "unexpected EOF");
     243         1941 :                     break;
     244              :                 }
     245              :                 _ => {
     246            0 :                     bail!("error reading file {}: {:#}", path.display(), err);
     247              :                 }
     248              :             },
     249              :         };
     250         1941 :         rpageno += 1;
     251              :     }
     252              : 
     253         1941 :     Ok(())
     254         1941 : }
     255              : 
     256              : /// Scan PostgreSQL WAL files in given directory and load all records between
     257              : /// 'startpoint' and 'endpoint' into the repository.
     258          643 : async fn import_wal(
     259          643 :     walpath: &Path,
     260          643 :     tline: &Timeline,
     261          643 :     startpoint: Lsn,
     262          643 :     endpoint: Lsn,
     263          643 :     ctx: &RequestContext,
     264          643 : ) -> anyhow::Result<()> {
     265          643 :     let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
     266          643 : 
     267          643 :     let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
     268          643 :     let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
     269          643 :     let mut last_lsn = startpoint;
     270              : 
     271          643 :     let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
     272              : 
     273         1286 :     while last_lsn <= endpoint {
     274              :         // FIXME: assume postgresql tli 1 for now
     275          643 :         let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     276          643 :         let mut buf = Vec::new();
     277          643 : 
     278          643 :         // Read local file
     279          643 :         let mut path = walpath.join(&filename);
     280          643 : 
     281          643 :         // It could be as .partial
     282          643 :         if !PathBuf::from(&path).exists() {
     283            0 :             path = walpath.join(filename + ".partial");
     284          643 :         }
     285              : 
     286              :         // Slurp the WAL file
     287          643 :         let mut file = std::fs::File::open(&path)?;
     288              : 
     289          643 :         if offset > 0 {
     290              :             use std::io::Seek;
     291          643 :             file.seek(std::io::SeekFrom::Start(offset as u64))?;
     292            0 :         }
     293              : 
     294              :         use std::io::Read;
     295          643 :         let nread = file.read_to_end(&mut buf)?;
     296          643 :         if nread != WAL_SEGMENT_SIZE - offset {
     297              :             // Maybe allow this for .partial files?
     298            0 :             error!("read only {} bytes from WAL file", nread);
     299          643 :         }
     300              : 
     301          643 :         waldecoder.feed_bytes(&buf);
     302          643 : 
     303          643 :         let mut nrecords = 0;
     304          643 :         let mut modification = tline.begin_modification(endpoint);
     305          643 :         let mut decoded = DecodedWALRecord::default();
     306         1286 :         while last_lsn <= endpoint {
     307          643 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     308          643 :                 walingest
     309          643 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
     310            0 :                     .await?;
     311          643 :                 last_lsn = lsn;
     312          643 : 
     313          643 :                 nrecords += 1;
     314              : 
     315            0 :                 trace!("imported record at {} (end {})", lsn, endpoint);
     316            0 :             }
     317              :         }
     318              : 
     319            0 :         debug!("imported {} records up to {}", nrecords, last_lsn);
     320              : 
     321          643 :         segno += 1;
     322          643 :         offset = 0;
     323              :     }
     324              : 
     325          643 :     if last_lsn != startpoint {
     326          643 :         info!("reached end of WAL at {}", last_lsn);
     327              :     } else {
     328            0 :         info!("no WAL to import at {}", last_lsn);
     329              :     }
     330              : 
     331          643 :     Ok(())
     332          643 : }
     333              : 
     334            5 : pub async fn import_basebackup_from_tar(
     335            5 :     tline: &Timeline,
     336            5 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     337            5 :     base_lsn: Lsn,
     338            5 :     ctx: &RequestContext,
     339            5 : ) -> Result<()> {
     340            5 :     info!("importing base at {base_lsn}");
     341            5 :     let mut modification = tline.begin_modification(base_lsn);
     342            5 :     modification.init_empty()?;
     343              : 
     344            5 :     let mut pg_control: Option<ControlFileData> = None;
     345              : 
     346              :     // Import base
     347            5 :     let mut entries = Archive::new(reader).entries()?;
     348         3917 :     while let Some(base_tar_entry) = entries.next().await {
     349         3912 :         let mut entry = base_tar_entry?;
     350         3912 :         let header = entry.header();
     351         3912 :         let len = header.entry_size()? as usize;
     352         3912 :         let file_path = header.path()?.into_owned();
     353         3912 : 
     354         3912 :         match header.entry_type() {
     355              :             tokio_tar::EntryType::Regular => {
     356            3 :                 if let Some(res) =
     357         3812 :                     import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
     358            3 :                 {
     359            3 :                     // We found the pg_control file.
     360            3 :                     pg_control = Some(res);
     361         3809 :                 }
     362         3812 :                 modification.flush().await?;
     363              :             }
     364              :             tokio_tar::EntryType::Directory => {
     365            0 :                 debug!("directory {:?}", file_path);
     366              :             }
     367              :             _ => {
     368            0 :                 bail!(
     369            0 :                     "entry {} in backup tar archive is of unexpected type: {:?}",
     370            0 :                     file_path.display(),
     371            0 :                     header.entry_type()
     372            0 :                 );
     373              :             }
     374              :         }
     375              :     }
     376              : 
     377              :     // sanity check: ensure that pg_control is loaded
     378            5 :     let _pg_control = pg_control.context("pg_control file not found")?;
     379              : 
     380          268 :     modification.commit().await?;
     381            3 :     Ok(())
     382            5 : }
     383              : 
     384            2 : pub async fn import_wal_from_tar(
     385            2 :     tline: &Timeline,
     386            2 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     387            2 :     start_lsn: Lsn,
     388            2 :     end_lsn: Lsn,
     389            2 :     ctx: &RequestContext,
     390            2 : ) -> Result<()> {
     391            2 :     // Set up walingest mutable state
     392            2 :     let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
     393            2 :     let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
     394            2 :     let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
     395            2 :     let mut last_lsn = start_lsn;
     396            2 :     let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
     397              : 
     398              :     // Ingest wal until end_lsn
     399            2 :     info!("importing wal until {}", end_lsn);
     400            2 :     let mut pg_wal_tar = Archive::new(reader);
     401            2 :     let mut pg_wal_entries = pg_wal_tar.entries()?;
     402            4 :     while last_lsn <= end_lsn {
     403            2 :         let bytes = {
     404            2 :             let mut entry = pg_wal_entries
     405            2 :                 .next()
     406            2 :                 .await
     407            2 :                 .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
     408            2 :             let header = entry.header();
     409            2 :             let file_path = header.path()?.into_owned();
     410            2 : 
     411            2 :             match header.entry_type() {
     412              :                 tokio_tar::EntryType::Regular => {
     413              :                     // FIXME: assume postgresql tli 1 for now
     414            2 :                     let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
     415            2 :                     let file_name = file_path
     416            2 :                         .file_name()
     417            2 :                         .expect("missing wal filename")
     418            2 :                         .to_string_lossy();
     419            2 :                     ensure!(expected_filename == file_name);
     420              : 
     421            0 :                     debug!("processing wal file {:?}", file_path);
     422          120 :                     read_all_bytes(&mut entry).await?
     423              :                 }
     424              :                 tokio_tar::EntryType::Directory => {
     425            0 :                     debug!("directory {:?}", file_path);
     426            0 :                     continue;
     427              :                 }
     428              :                 _ => {
     429            0 :                     bail!(
     430            0 :                         "entry {} in WAL tar archive is of unexpected type: {:?}",
     431            0 :                         file_path.display(),
     432            0 :                         header.entry_type()
     433            0 :                     );
     434              :                 }
     435              :             }
     436              :         };
     437              : 
     438            2 :         waldecoder.feed_bytes(&bytes[offset..]);
     439            2 : 
     440            2 :         let mut modification = tline.begin_modification(end_lsn);
     441            2 :         let mut decoded = DecodedWALRecord::default();
     442           10 :         while last_lsn <= end_lsn {
     443            8 :             if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
     444            8 :                 walingest
     445            8 :                     .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
     446            0 :                     .await?;
     447            8 :                 last_lsn = lsn;
     448              : 
     449            0 :                 debug!("imported record at {} (end {})", lsn, end_lsn);
     450            0 :             }
     451              :         }
     452              : 
     453            0 :         debug!("imported records up to {}", last_lsn);
     454            2 :         segno += 1;
     455            2 :         offset = 0;
     456              :     }
     457              : 
     458            2 :     if last_lsn != start_lsn {
     459            2 :         info!("reached end of WAL at {}", last_lsn);
     460              :     } else {
     461            0 :         info!("there was no WAL to import at {}", last_lsn);
     462              :     }
     463              : 
     464              :     // Log any extra unused files
     465            2 :     while let Some(e) = pg_wal_entries.next().await {
     466            0 :         let entry = e?;
     467            0 :         let header = entry.header();
     468            0 :         let file_path = header.path()?.into_owned();
     469            0 :         info!("skipping {:?}", file_path);
     470              :     }
     471              : 
     472            2 :     Ok(())
     473            2 : }
     474              : 
     475       614019 : async fn import_file(
     476       614019 :     modification: &mut DatadirModification<'_>,
     477       614019 :     file_path: &Path,
     478       614019 :     reader: &mut (impl AsyncRead + Send + Sync + Unpin),
     479       614019 :     len: usize,
     480       614019 :     ctx: &RequestContext,
     481       614019 : ) -> Result<Option<ControlFileData>> {
     482       614019 :     let file_name = match file_path.file_name() {
     483       614019 :         Some(name) => name.to_string_lossy(),
     484            0 :         None => return Ok(None),
     485              :     };
     486              : 
     487       614019 :     if file_name.starts_with('.') {
     488              :         // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
     489              :         // will contain "fork files", skip them.
     490            0 :         return Ok(None);
     491       614019 :     }
     492       614019 : 
     493       614019 :     if file_path.starts_with("global") {
     494        36878 :         let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
     495        36878 :         let dbnode = 0;
     496        36878 : 
     497        36878 :         match file_name.as_ref() {
     498        36878 :             "pg_control" => {
     499         6425 :                 let bytes = read_all_bytes(reader).await?;
     500              : 
     501              :                 // Extract the checkpoint record and import it separately.
     502          646 :                 let pg_control = ControlFileData::decode(&bytes[..])?;
     503          646 :                 let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
     504          646 :                 modification.put_checkpoint(checkpoint_bytes)?;
     505            0 :                 debug!("imported control file");
     506              : 
     507              :                 // Import it as ControlFile
     508          646 :                 modification.put_control_file(bytes)?;
     509          646 :                 return Ok(Some(pg_control));
     510              :             }
     511        36232 :             "pg_filenode.map" => {
     512         3858 :                 let bytes = read_all_bytes(reader).await?;
     513          647 :                 modification
     514          647 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     515            0 :                     .await?;
     516            0 :                 debug!("imported relmap file")
     517              :             }
     518        35585 :             "PG_VERSION" => {
     519            0 :                 debug!("ignored PG_VERSION file");
     520              :             }
     521              :             _ => {
     522        77732 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     523            0 :                 debug!("imported rel creation");
     524              :             }
     525              :         }
     526       577141 :     } else if file_path.starts_with("base") {
     527       570666 :         let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
     528       570666 :         let dbnode: u32 = file_path
     529       570666 :             .iter()
     530       570666 :             .nth(1)
     531       570666 :             .expect("invalid file path, expected dbnode")
     532       570666 :             .to_string_lossy()
     533       570666 :             .parse()?;
     534              : 
     535       570666 :         match file_name.as_ref() {
     536       570666 :             "pg_filenode.map" => {
     537        11377 :                 let bytes = read_all_bytes(reader).await?;
     538         1941 :                 modification
     539         1941 :                     .put_relmap_file(spcnode, dbnode, bytes, ctx)
     540            0 :                     .await?;
     541            0 :                 debug!("imported relmap file")
     542              :             }
     543       568725 :             "PG_VERSION" => {
     544            0 :                 debug!("ignored PG_VERSION file");
     545              :             }
     546              :             _ => {
     547      2467180 :                 import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
     548            0 :                 debug!("imported rel creation");
     549              :             }
     550              :         }
     551         6475 :     } else if file_path.starts_with("pg_xact") {
     552          647 :         let slru = SlruKind::Clog;
     553          647 : 
     554         1286 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     555            0 :         debug!("imported clog slru");
     556         5828 :     } else if file_path.starts_with("pg_multixact/offsets") {
     557          647 :         let slru = SlruKind::MultiXactOffsets;
     558          647 : 
     559         1269 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     560            0 :         debug!("imported multixact offsets slru");
     561         5181 :     } else if file_path.starts_with("pg_multixact/members") {
     562          647 :         let slru = SlruKind::MultiXactMembers;
     563          647 : 
     564         1263 :         import_slru(modification, slru, file_path, reader, len, ctx).await?;
     565            0 :         debug!("imported multixact members slru");
     566         4534 :     } else if file_path.starts_with("pg_twophase") {
     567            0 :         let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
     568              : 
     569            0 :         let bytes = read_all_bytes(reader).await?;
     570            0 :         modification
     571            0 :             .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
     572            0 :             .await?;
     573            0 :         debug!("imported twophase file");
     574         4534 :     } else if file_path.starts_with("pg_wal") {
     575            0 :         debug!("found wal file in base section. ignore it");
     576         4533 :     } else if file_path.starts_with("zenith.signal") {
     577              :         // Parse zenith signal file to set correct previous LSN
     578            1 :         let bytes = read_all_bytes(reader).await?;
     579              :         // zenith.signal format is "PREV LSN: prev_lsn"
     580              :         // TODO write serialization and deserialization in the same place.
     581            1 :         let zenith_signal = std::str::from_utf8(&bytes)?.trim();
     582            1 :         let prev_lsn = match zenith_signal {
     583            1 :             "PREV LSN: none" => Lsn(0),
     584            1 :             "PREV LSN: invalid" => Lsn(0),
     585            1 :             other => {
     586            1 :                 let split = other.split(':').collect::<Vec<_>>();
     587            1 :                 split[1]
     588            1 :                     .trim()
     589            1 :                     .parse::<Lsn>()
     590            1 :                     .context("can't parse zenith.signal")?
     591              :             }
     592              :         };
     593              : 
     594              :         // zenith.signal is not necessarily the last file, that we handle
     595              :         // but it is ok to call `finish_write()`, because final `modification.commit()`
     596              :         // will update lsn once more to the final one.
     597            1 :         let writer = modification.tline.writer().await;
     598            1 :         writer.finish_write(prev_lsn);
     599              : 
     600            0 :         debug!("imported zenith signal {}", prev_lsn);
     601         4532 :     } else if file_path.starts_with("pg_tblspc") {
     602              :         // TODO Backups exported from neon won't have pg_tblspc, but we will need
     603              :         // this to import arbitrary postgres databases.
     604            0 :         bail!("Importing pg_tblspc is not implemented");
     605              :     } else {
     606            0 :         debug!(
     607            0 :             "ignoring unrecognized file \"{}\" in tar archive",
     608            0 :             file_path.display()
     609            0 :         );
     610              :     }
     611              : 
     612       613373 :     Ok(None)
     613       614019 : }
     614              : 
     615         3237 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
     616         3237 :     let mut buf: Vec<u8> = vec![];
     617        21780 :     reader.read_to_end(&mut buf).await?;
     618         3237 :     Ok(Bytes::from(buf))
     619         3237 : }
        

Generated by: LCOV version 2.1-beta