//!
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a neon Timeline.
//!
use std::path::{Path, PathBuf};

use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_tar::Archive;
use tracing::*;
use walkdir::WalkDir;

use crate::context::RequestContext;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::ControlFileData;
use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;

/// Returns the checkpoint LSN from the control file.
pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
    // Read control file to extract the LSN
    let controlfile_path = path.join("global").join("pg_control");
    let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
    let lsn = controlfile.checkPoint;

    Ok(Lsn(lsn))
}

///
/// Import all relation data pages from local disk into the repository.
///
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
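///
/// `pgdata_lsn` must be the cluster's shutdown-checkpoint LSN, such as the
/// value returned by `get_lsn_from_controlfile`; this is verified against
/// pg_control further down.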
pub async fn import_timeline_from_postgres_datadir(
    tline: &Timeline,
    pgdata_path: &Path,
    pgdata_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    let mut pg_control: Option<ControlFileData> = None;

    // TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
    // Then fishing out pg_control would be unnecessary
    let mut modification = tline.begin_modification(pgdata_lsn);
    modification.init_empty()?;

    // Import all but pg_wal
    let all_but_wal = WalkDir::new(pgdata_path)
        .into_iter()
        .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
    for entry in all_but_wal {
        let entry = entry?;
        let metadata = entry.metadata().expect("error getting dir entry metadata");
        if metadata.is_file() {
            let absolute_path = entry.path();
            let relative_path = absolute_path.strip_prefix(pgdata_path)?;

            let mut file = tokio::fs::File::open(absolute_path).await?;
            let len = metadata.len() as usize;
            if let Some(control_file) =
                import_file(&mut modification, relative_path, &mut file, len, ctx).await?
            {
                pg_control = Some(control_file);
            }
            modification.flush().await?;
        }
    }

    // We're done importing all the data files.
    modification.commit().await?;

    // We expect the Postgres server to be shut down cleanly.
    let pg_control = pg_control.context("pg_control file not found")?;
    ensure!(
        pg_control.state == DBState_DB_SHUTDOWNED,
        "Postgres cluster was not shut down cleanly"
    );
    ensure!(
        pg_control.checkPointCopy.redo == pgdata_lsn.0,
        "unexpected checkpoint REDO pointer"
    );

    // Import WAL. This is needed even when starting from a shutdown checkpoint, because
    // this reads the checkpoint record itself, advancing the tip of the timeline to
    // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
    import_wal(
        &pgdata_path.join("pg_wal"),
        tline,
        Lsn(pg_control.checkPointCopy.redo),
        pgdata_lsn,
        ctx,
    )
    .await?;

    Ok(())
}

// Subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
async fn import_rel(
    modification: &mut DatadirModification<'_>,
    path: &Path,
    spcoid: Oid,
    dboid: Oid,
    reader: &mut (impl AsyncRead + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // Does it look like a relation file?
    trace!("importing rel file {}", path.display());

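    // Relation file names follow PostgreSQL's on-disk convention: e.g. "16384"
    // is the relation's main fork, "16384_fsm" its free-space-map fork, and
    // "16384.1" the second 1 GiB segment of the main fork.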
    let filename = &path
        .file_name()
        .expect("missing rel filename")
        .to_string_lossy();
    let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
        warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
        e
    })?;

    let mut buf: [u8; 8192] = [0u8; 8192];

    ensure!(len % BLCKSZ as usize == 0);
    let nblocks = len / BLCKSZ as usize;

    let rel = RelTag {
        spcnode: spcoid,
        dbnode: dboid,
        relnode,
        forknum,
    };

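    // Relation segments are 1 GiB each, so with the default 8 KiB BLCKSZ a
    // segment holds 1 GiB / 8 KiB = 131072 blocks; segment `segno` therefore
    // starts at block segno * 131072 (e.g. "16384.2" starts at block 262144).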
    let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);

    // Call put_rel_creation for every segment of the relation,
    // because there is no guarantee about the order in which we are processing segments.
    // Ignore the "relation already exists" error.
    //
    // FIXME: Keep track of which relations we've already created?
    // https://github.com/neondatabase/neon/issues/3309
    if let Err(e) = modification
        .put_rel_creation(rel, nblocks as u32, ctx)
        .await
    {
        match e {
            RelationError::AlreadyExists => {
                debug!("Relation {} already exists. We must be extending it.", rel)
            }
            _ => return Err(e.into()),
        }
    }

    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
            }

            // read_exact returns UnexpectedEof at end of file; that is the
            // expected way to finish reading the segment.
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // Reached EOF. Check that we read the expected number of blocks.
                    let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
                    ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        blknum += 1;
    }

    // Update relation size
    //
    // If we process rel segments out of order,
    // put_rel_extend will skip the update.
    modification.put_rel_extend(rel, blknum, ctx).await?;

    Ok(())
}

/// Import an SLRU segment file
///
async fn import_slru(
    modification: &mut DatadirModification<'_>,
    slru: SlruKind,
    path: &Path,
    reader: &mut (impl AsyncRead + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    info!("importing slru file {path:?}");

    let mut buf: [u8; 8192] = [0u8; 8192];
    let filename = &path
        .file_name()
        .with_context(|| format!("missing slru filename for path {path:?}"))?
        .to_string_lossy();
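    // SLRU segment files are named by their segment number in hex, e.g. "0000".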
    let segno = u32::from_str_radix(filename, 16)?;

    ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
    let nblocks = len / BLCKSZ as usize;

    ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);

    modification
        .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
        .await?;

    let mut rpageno = 0;
    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                modification.put_slru_page_image(
                    slru,
                    segno,
                    rpageno,
                    Bytes::copy_from_slice(&buf),
                )?;
            }

            // read_exact returns UnexpectedEof at end of file; that is the
            // expected way to finish reading the segment.
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // Reached EOF. Check that we read the expected number of pages.
                    ensure!(rpageno == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        rpageno += 1;
    }

    Ok(())
}

/// Scan PostgreSQL WAL files in the given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
async fn import_wal(
    walpath: &Path,
    tline: &Timeline,
    startpoint: Lsn,
    endpoint: Lsn,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);

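    // With the standard 16 MiB segment size, an LSN such as 0x0169_9D70 maps
    // to segment 1 at byte offset 0x69_9D70, so the first segment may be read
    // starting mid-file from that offset.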
    let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = startpoint;

    let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;

    while last_lsn <= endpoint {
        // FIXME: assume postgresql tli 1 for now
        let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
        let mut buf = Vec::new();

        // Read local file
        let mut path = walpath.join(&filename);

        // The segment might only exist as a .partial file
        if !PathBuf::from(&path).exists() {
            path = walpath.join(filename + ".partial");
        }

        // Slurp the WAL file
        let mut file = std::fs::File::open(&path)?;

        if offset > 0 {
            use std::io::Seek;
            file.seek(std::io::SeekFrom::Start(offset as u64))?;
        }

        use std::io::Read;
        let nread = file.read_to_end(&mut buf)?;
        if nread != WAL_SEGMENT_SIZE - offset {
            // Maybe allow this for .partial files?
            error!("read only {} bytes from WAL file", nread);
        }

        waldecoder.feed_bytes(&buf);

        let mut nrecords = 0;
        let mut modification = tline.begin_modification(endpoint);
        let mut decoded = DecodedWALRecord::default();
        while last_lsn <= endpoint {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                walingest
                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
                    .await?;
                last_lsn = lsn;

                nrecords += 1;

                trace!("imported record at {} (end {})", lsn, endpoint);
            }
        }

        debug!("imported {} records up to {}", nrecords, last_lsn);

        segno += 1;
        offset = 0;
    }

    if last_lsn != startpoint {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("no WAL to import at {}", last_lsn);
    }

    Ok(())
}

pub async fn import_basebackup_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    base_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    info!("importing base at {base_lsn}");
    let mut modification = tline.begin_modification(base_lsn);
    modification.init_empty()?;

    let mut pg_control: Option<ControlFileData> = None;

    // Import base
    let mut entries = Archive::new(reader).entries()?;
    while let Some(base_tar_entry) = entries.next().await {
        let mut entry = base_tar_entry?;
        let header = entry.header();
        let len = header.entry_size()? as usize;
        let file_path = header.path()?.into_owned();

        match header.entry_type() {
            tokio_tar::EntryType::Regular => {
                if let Some(res) =
                    import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
                {
                    // We found the pg_control file.
                    pg_control = Some(res);
                }
                modification.flush().await?;
            }
            tokio_tar::EntryType::Directory => {
                debug!("directory {:?}", file_path);
            }
            _ => {
                bail!(
                    "entry {} in backup tar archive is of unexpected type: {:?}",
                    file_path.display(),
                    header.entry_type()
                );
            }
        }
    }

    // Sanity check: ensure that pg_control was loaded.
    let _pg_control = pg_control.context("pg_control file not found")?;

    modification.commit().await?;
    Ok(())
}

pub async fn import_wal_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    start_lsn: Lsn,
    end_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    // Set up walingest mutable state
    let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
    let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = start_lsn;
    let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;

    // Ingest WAL until end_lsn
    info!("importing wal until {}", end_lsn);
    let mut pg_wal_tar = Archive::new(reader);
    let mut pg_wal_entries = pg_wal_tar.entries()?;
    while last_lsn <= end_lsn {
        let bytes = {
            let mut entry = pg_wal_entries
                .next()
                .await
                .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
            let header = entry.header();
            let file_path = header.path()?.into_owned();

            match header.entry_type() {
                tokio_tar::EntryType::Regular => {
                    // FIXME: assume postgresql tli 1 for now
                    let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
                    let file_name = file_path
                        .file_name()
                        .expect("missing wal filename")
                        .to_string_lossy();
                    ensure!(expected_filename == file_name);

                    debug!("processing wal file {:?}", file_path);
                    read_all_bytes(&mut entry).await?
                }
                tokio_tar::EntryType::Directory => {
                    debug!("directory {:?}", file_path);
                    continue;
                }
                _ => {
                    bail!(
                        "entry {} in WAL tar archive is of unexpected type: {:?}",
                        file_path.display(),
                        header.entry_type()
                    );
                }
            }
        };

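        // The first segment may begin before start_lsn, so skip the bytes up
        // to the starting offset; subsequent segments are fed in full because
        // offset is reset to 0 at the end of each loop iteration.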
        waldecoder.feed_bytes(&bytes[offset..]);

        let mut modification = tline.begin_modification(end_lsn);
        let mut decoded = DecodedWALRecord::default();
        while last_lsn <= end_lsn {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                walingest
                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
                    .await?;
                last_lsn = lsn;

                debug!("imported record at {} (end {})", lsn, end_lsn);
            }
        }

        debug!("imported records up to {}", last_lsn);
        segno += 1;
        offset = 0;
    }

    if last_lsn != start_lsn {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("there was no WAL to import at {}", last_lsn);
    }

    // Log any extra unused files
    while let Some(e) = pg_wal_entries.next().await {
        let entry = e?;
        let header = entry.header();
        let file_path = header.path()?.into_owned();
        info!("skipping {:?}", file_path);
    }

    Ok(())
}

async fn import_file(
    modification: &mut DatadirModification<'_>,
    file_path: &Path,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> Result<Option<ControlFileData>> {
    let file_name = match file_path.file_name() {
        Some(name) => name.to_string_lossy(),
        None => return Ok(None),
    };

    if file_name.starts_with('.') {
        // Tar archives created on macOS without the COPYFILE_DISABLE=1 env var
        // contain "fork files"; skip them.
        return Ok(None);
    }

    if file_path.starts_with("global") {
        let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
        let dbnode = 0;

        match file_name.as_ref() {
            "pg_control" => {
                let bytes = read_all_bytes(reader).await?;

                // Extract the checkpoint record and import it separately.
                let pg_control = ControlFileData::decode(&bytes[..])?;
                let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
                modification.put_checkpoint(checkpoint_bytes)?;
                debug!("imported control file");

                // Import it as ControlFile
                modification.put_control_file(bytes)?;
                return Ok(Some(pg_control));
            }
            "pg_filenode.map" => {
                let bytes = read_all_bytes(reader).await?;
                modification
                    .put_relmap_file(spcnode, dbnode, bytes, ctx)
                    .await?;
                debug!("imported relmap file")
            }
            "PG_VERSION" => {
                debug!("ignored PG_VERSION file");
            }
            _ => {
                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
                debug!("imported rel creation");
            }
        }
    } else if file_path.starts_with("base") {
        let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
        let dbnode: u32 = file_path
            .iter()
            .nth(1)
            .expect("invalid file path, expected dbnode")
            .to_string_lossy()
            .parse()?;

        match file_name.as_ref() {
            "pg_filenode.map" => {
                let bytes = read_all_bytes(reader).await?;
                modification
                    .put_relmap_file(spcnode, dbnode, bytes, ctx)
                    .await?;
                debug!("imported relmap file")
            }
            "PG_VERSION" => {
                debug!("ignored PG_VERSION file");
            }
            _ => {
                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
                debug!("imported rel creation");
            }
        }
    } else if file_path.starts_with("pg_xact") {
        let slru = SlruKind::Clog;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported clog slru");
    } else if file_path.starts_with("pg_multixact/offsets") {
        let slru = SlruKind::MultiXactOffsets;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported multixact offsets slru");
    } else if file_path.starts_with("pg_multixact/members") {
        let slru = SlruKind::MultiXactMembers;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported multixact members slru");
    } else if file_path.starts_with("pg_twophase") {
        let xid = u32::from_str_radix(file_name.as_ref(), 16)?;

        let bytes = read_all_bytes(reader).await?;
        modification
            .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
            .await?;
        debug!("imported twophase file");
    } else if file_path.starts_with("pg_wal") {
        debug!("found wal file in base section. ignore it");
    } else if file_path.starts_with("zenith.signal") {
        // Parse zenith signal file to set correct previous LSN
        let bytes = read_all_bytes(reader).await?;
        // zenith.signal format is "PREV LSN: prev_lsn"
        // TODO write serialization and deserialization in the same place.
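        // For example, a zenith.signal containing "PREV LSN: 0/16960E8"
        // yields prev_lsn = Lsn(0x16960E8) below.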
        let zenith_signal = std::str::from_utf8(&bytes)?.trim();
        let prev_lsn = match zenith_signal {
            "PREV LSN: none" => Lsn(0),
            "PREV LSN: invalid" => Lsn(0),
            other => {
                let split = other.split(':').collect::<Vec<_>>();
                split[1]
                    .trim()
                    .parse::<Lsn>()
                    .context("can't parse zenith.signal")?
            }
        };

        // zenith.signal is not necessarily the last file we handle, but it is
        // ok to call `finish_write()` here, because the final
        // `modification.commit()` will update the LSN once more to the final one.
        let writer = modification.tline.writer().await;
        writer.finish_write(prev_lsn);

        debug!("imported zenith signal {}", prev_lsn);
    } else if file_path.starts_with("pg_tblspc") {
        // TODO Backups exported from neon won't have pg_tblspc, but we will need
        // this to import arbitrary postgres databases.
        bail!("Importing pg_tblspc is not implemented");
    } else {
        debug!(
            "ignoring unrecognized file \"{}\" in tar archive",
            file_path.display()
        );
    }

    Ok(None)
}

async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
    let mut buf: Vec<u8> = vec![];
    reader.read_to_end(&mut buf).await?;
    Ok(Bytes::from(buf))
}
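
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch (not part of the original module) of the WAL segment
    // arithmetic that import_wal and import_wal_from_tar rely on: with the
    // standard 16 MiB segment size an LSN splits into a segment number and a
    // byte offset, and segment files are named after the timeline and segment
    // number. The expected file name assumes the PostgreSQL convention of
    // three 8-digit uppercase hex fields (tli, log number, segment).
    #[test]
    fn wal_segment_arithmetic() {
        let lsn = Lsn(0x0169_9D70);
        assert_eq!(lsn.segment_number(WAL_SEGMENT_SIZE), 1);
        assert_eq!(lsn.segment_offset(WAL_SEGMENT_SIZE), 0x69_9D70);
        assert_eq!(
            XLogFileName(1, 1, WAL_SEGMENT_SIZE),
            "000000010000000000000001"
        );
    }
}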