Line data Source code
1 : //!
2 : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
3 : //! a neon Timeline.
4 : //!
5 : use std::path::{Path, PathBuf};
6 :
7 : use anyhow::{Context, Result, bail, ensure};
8 : use bytes::Bytes;
9 : use camino::Utf8Path;
10 : use futures::StreamExt;
11 : use pageserver_api::key::rel_block_to_key;
12 : use pageserver_api::reltag::{RelTag, SlruKind};
13 : use postgres_ffi::relfile_utils::*;
14 : use postgres_ffi::waldecoder::WalStreamDecoder;
15 : use postgres_ffi::{
16 : BLCKSZ, ControlFileData, DBState_DB_SHUTDOWNED, Oid, WAL_SEGMENT_SIZE, XLogFileName,
17 : pg_constants,
18 : };
19 : use tokio::io::{AsyncRead, AsyncReadExt};
20 : use tokio_tar::Archive;
21 : use tracing::*;
22 : use utils::lsn::Lsn;
23 : use wal_decoder::models::InterpretedWalRecord;
24 : use walkdir::WalkDir;
25 :
26 : use crate::context::RequestContext;
27 : use crate::metrics::WAL_INGEST;
28 : use crate::pgdatadir_mapping::*;
29 : use crate::tenant::Timeline;
30 : use crate::walingest::WalIngest;
31 :
32 : // Returns the checkpoint LSN from the control file
33 4 : pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
34 4 : // Read control file to extract the LSN
35 4 : let controlfile_path = path.join("global").join("pg_control");
36 4 : let controlfile_buf = std::fs::read(&controlfile_path)
37 4 : .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
38 4 : let controlfile = ControlFileData::decode(&controlfile_buf)?;
39 4 : let lsn = controlfile.checkPoint;
40 4 :
41 4 : Ok(Lsn(lsn))
42 4 : }
43 :
44 : ///
45 : /// Import all relation data pages from local disk into the repository.
46 : ///
47 : /// This is currently only used to import a cluster freshly created by initdb.
48 : /// The code that deals with the checkpoint would not work right if the
49 : /// cluster was not shut down cleanly.
50 4 : pub async fn import_timeline_from_postgres_datadir(
51 4 : tline: &Timeline,
52 4 : pgdata_path: &Utf8Path,
53 4 : pgdata_lsn: Lsn,
54 4 : ctx: &RequestContext,
55 4 : ) -> Result<()> {
56 4 : let mut pg_control: Option<ControlFileData> = None;
57 4 :
58 4 : // TODO this shoud be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
59 4 : // Then fishing out pg_control would be unnecessary
60 4 : let mut modification = tline.begin_modification(pgdata_lsn);
61 4 : modification.init_empty()?;
62 :
63 : // Import all but pg_wal
64 4 : let all_but_wal = WalkDir::new(pgdata_path)
65 4 : .into_iter()
66 3960 : .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
67 3960 : for entry in all_but_wal {
68 3956 : let entry = entry?;
69 3956 : let metadata = entry.metadata().expect("error getting dir entry metadata");
70 3956 : if metadata.is_file() {
71 3860 : let absolute_path = entry.path();
72 3860 : let relative_path = absolute_path.strip_prefix(pgdata_path)?;
73 :
74 3860 : let mut file = tokio::fs::File::open(absolute_path).await?;
75 3860 : let len = metadata.len() as usize;
76 4 : if let Some(control_file) =
77 3860 : import_file(&mut modification, relative_path, &mut file, len, ctx).await?
78 4 : {
79 4 : pg_control = Some(control_file);
80 3856 : }
81 3860 : modification.flush(ctx).await?;
82 96 : }
83 : }
84 :
85 : // We're done importing all the data files.
86 4 : modification.commit(ctx).await?;
87 :
88 : // We expect the Postgres cluster to have been shut down cleanly.
89 4 : let pg_control = pg_control.context("pg_control file not found")?;
90 4 : ensure!(
91 4 : pg_control.state == DBState_DB_SHUTDOWNED,
92 0 : "Postgres cluster was not shut down cleanly"
93 : );
94 4 : ensure!(
95 4 : pg_control.checkPointCopy.redo == pgdata_lsn.0,
96 0 : "unexpected checkpoint REDO pointer"
97 : );
98 :
99 : // Import WAL. This is needed even when starting from a shutdown checkpoint, because
100 : // this reads the checkpoint record itself, advancing the tip of the timeline to
101 : // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
102 4 : import_wal(
103 4 : &pgdata_path.join("pg_wal"),
104 4 : tline,
105 4 : Lsn(pg_control.checkPointCopy.redo),
106 4 : pgdata_lsn,
107 4 : ctx,
108 4 : )
109 4 : .await?;
110 :
111 4 : Ok(())
112 4 : }
113 :
114 : // Subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
115 3784 : async fn import_rel(
116 3784 : modification: &mut DatadirModification<'_>,
117 3784 : path: &Path,
118 3784 : spcoid: Oid,
119 3784 : dboid: Oid,
120 3784 : reader: &mut (impl AsyncRead + Unpin),
121 3784 : len: usize,
122 3784 : ctx: &RequestContext,
123 3784 : ) -> anyhow::Result<()> {
124 3784 : // Does it look like a relation file?
125 3784 : trace!("importing rel file {}", path.display());
126 :
127 3784 : let filename = &path
128 3784 : .file_name()
129 3784 : .expect("missing rel filename")
130 3784 : .to_string_lossy();
131 3784 : let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
132 0 : warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
133 0 : e
134 3784 : })?;
135 :
136 3784 : let mut buf: [u8; 8192] = [0u8; 8192];
137 3784 :
138 3784 : ensure!(len % BLCKSZ as usize == 0);
139 3784 : let nblocks = len / BLCKSZ as usize;
140 3784 :
141 3784 : let rel = RelTag {
142 3784 : spcnode: spcoid,
143 3784 : dbnode: dboid,
144 3784 : relnode,
145 3784 : forknum,
146 3784 : };
147 3784 :
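// Relations are split into 1 GiB segment files, so segment `segno` covers blocks
// starting at segno * (1 GiB / BLCKSZ) within the relation.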
148 3784 : let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
149 :
150 : // Call put_rel_creation for every segment of the relation,
151 : // because there is no guarantee about the order in which we are processing segments.
152 : // Ignore the "relation already exists" error.
153 : //
154 : // FIXME: Keep track of which relations we've already created?
155 : // https://github.com/neondatabase/neon/issues/3309
156 3784 : if let Err(e) = modification
157 3784 : .put_rel_creation(rel, nblocks as u32, ctx)
158 3784 : .await
159 : {
160 0 : match e {
161 : RelationError::AlreadyExists => {
162 0 : debug!("Relation {} already exists. We must be extending it.", rel)
163 : }
164 0 : _ => return Err(e.into()),
165 : }
166 3784 : }
167 :
168 : loop {
169 14664 : let r = reader.read_exact(&mut buf).await;
170 14664 : match r {
171 : Ok(_) => {
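// Only store pages that belong to this shard; blocks mapped to other shards are skipped.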
172 10880 : let key = rel_block_to_key(rel, blknum);
173 10880 : if modification.tline.get_shard_identity().is_key_local(&key) {
174 10880 : modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
175 0 : }
176 : }
177 :
178 : // TODO: UnexpectedEof is expected
179 3784 : Err(err) => match err.kind() {
180 : std::io::ErrorKind::UnexpectedEof => {
181 : // reached EOF. That's expected.
182 3784 : let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
183 3784 : ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
184 3784 : break;
185 : }
186 : _ => {
187 0 : bail!("error reading file {}: {:#}", path.display(), err);
188 : }
189 : },
190 : };
191 10880 : blknum += 1;
192 : }
193 :
194 : // Update relation size
195 : //
196 : // If we process rel segments out of order,
197 : // put_rel_extend will skip the update.
198 3784 : modification.put_rel_extend(rel, blknum, ctx).await?;
199 :
200 3784 : Ok(())
201 3784 : }
202 :
203 : /// Import an SLRU segment file
204 : ///
205 12 : async fn import_slru(
206 12 : modification: &mut DatadirModification<'_>,
207 12 : slru: SlruKind,
208 12 : path: &Path,
209 12 : reader: &mut (impl AsyncRead + Unpin),
210 12 : len: usize,
211 12 : ctx: &RequestContext,
212 12 : ) -> anyhow::Result<()> {
213 12 : info!("importing slru file {path:?}");
214 :
215 12 : let mut buf: [u8; 8192] = [0u8; 8192];
216 12 : let filename = &path
217 12 : .file_name()
218 12 : .with_context(|| format!("missing slru filename for path {path:?}"))?
219 12 : .to_string_lossy();
220 12 : let segno = u32::from_str_radix(filename, 16)?;
221 :
222 12 : ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
223 12 : let nblocks = len / BLCKSZ as usize;
224 12 :
225 12 : ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
226 :
227 12 : modification
228 12 : .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
229 12 : .await?;
230 :
231 12 : let mut rpageno = 0;
232 : loop {
233 24 : let r = reader.read_exact(&mut buf).await;
234 24 : match r {
235 : Ok(_) => {
236 12 : modification.put_slru_page_image(
237 12 : slru,
238 12 : segno,
239 12 : rpageno,
240 12 : Bytes::copy_from_slice(&buf),
241 12 : )?;
242 : }
243 :
244 : // TODO: UnexpectedEof is expected
245 12 : Err(err) => match err.kind() {
246 : std::io::ErrorKind::UnexpectedEof => {
247 : // reached EOF. That's expected.
248 12 : ensure!(rpageno == nblocks as u32, "unexpected EOF");
249 12 : break;
250 : }
251 : _ => {
252 0 : bail!("error reading file {}: {:#}", path.display(), err);
253 : }
254 : },
255 : };
256 12 : rpageno += 1;
257 : }
258 :
259 12 : Ok(())
260 12 : }
261 :
262 : /// Scan PostgreSQL WAL files in given directory and load all records between
263 : /// 'startpoint' and 'endpoint' into the repository.
264 4 : async fn import_wal(
265 4 : walpath: &Utf8Path,
266 4 : tline: &Timeline,
267 4 : startpoint: Lsn,
268 4 : endpoint: Lsn,
269 4 : ctx: &RequestContext,
270 4 : ) -> anyhow::Result<()> {
271 4 : let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
272 4 :
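// The start point may fall in the middle of a WAL segment: track the segment number
// and the byte offset at which reading begins within the first segment.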
273 4 : let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
274 4 : let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
275 4 : let mut last_lsn = startpoint;
276 :
277 4 : let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
278 :
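// A single-element shard list: WAL records are interpreted only for this timeline's own shard.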
279 4 : let shard = vec![*tline.get_shard_identity()];
280 :
281 8 : while last_lsn <= endpoint {
282 : // FIXME: assume postgresql tli 1 for now
283 4 : let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
284 4 : let mut buf = Vec::new();
285 4 :
286 4 : // Read local file
287 4 : let mut path = walpath.join(&filename);
288 4 :
289 4 : // It could be as .partial
290 4 : if !PathBuf::from(&path).exists() {
291 0 : path = walpath.join(filename + ".partial");
292 4 : }
293 :
294 : // Slurp the WAL file
295 4 : let mut file = std::fs::File::open(&path)?;
296 :
297 4 : if offset > 0 {
298 4 : use std::io::Seek;
299 4 : file.seek(std::io::SeekFrom::Start(offset as u64))?;
300 0 : }
301 :
302 : use std::io::Read;
303 4 : let nread = file.read_to_end(&mut buf)?;
304 4 : if nread != WAL_SEGMENT_SIZE - offset {
305 : // Maybe allow this for .partial files?
306 0 : error!("read only {} bytes from WAL file", nread);
307 4 : }
308 :
309 4 : waldecoder.feed_bytes(&buf);
310 4 :
311 4 : let mut nrecords = 0;
312 4 : let mut modification = tline.begin_modification(last_lsn);
313 8 : while last_lsn <= endpoint {
314 4 : if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
315 4 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
316 4 : recdata,
317 4 : &shard,
318 4 : lsn,
319 4 : tline.pg_version,
320 4 : )?
321 4 : .remove(tline.get_shard_identity())
322 4 : .unwrap();
323 4 :
324 4 : walingest
325 4 : .ingest_record(interpreted, &mut modification, ctx)
326 4 : .await?;
327 4 : WAL_INGEST.records_committed.inc();
328 4 :
329 4 : modification.commit(ctx).await?;
330 4 : last_lsn = lsn;
331 4 :
332 4 : nrecords += 1;
333 4 :
334 4 : trace!("imported record at {} (end {})", lsn, endpoint);
335 0 : }
336 : }
337 :
338 4 : debug!("imported {} records up to {}", nrecords, last_lsn);
339 :
340 4 : segno += 1;
341 4 : offset = 0;
342 : }
343 :
344 4 : if last_lsn != startpoint {
345 4 : info!("reached end of WAL at {}", last_lsn);
346 : } else {
347 0 : info!("no WAL to import at {}", last_lsn);
348 : }
349 :
350 4 : Ok(())
351 4 : }
352 :
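/// Import a tar archive of a PostgreSQL data directory (a base backup) into the
/// timeline at `base_lsn`. Fails if the archive does not contain a `pg_control` file.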
353 0 : pub async fn import_basebackup_from_tar(
354 0 : tline: &Timeline,
355 0 : reader: &mut (impl AsyncRead + Send + Sync + Unpin),
356 0 : base_lsn: Lsn,
357 0 : ctx: &RequestContext,
358 0 : ) -> Result<()> {
359 0 : info!("importing base at {base_lsn}");
360 0 : let mut modification = tline.begin_modification(base_lsn);
361 0 : modification.init_empty()?;
362 :
363 0 : let mut pg_control: Option<ControlFileData> = None;
364 :
365 : // Import base
366 0 : let mut entries = Archive::new(reader).entries()?;
367 0 : while let Some(base_tar_entry) = entries.next().await {
368 0 : let mut entry = base_tar_entry?;
369 0 : let header = entry.header();
370 0 : let len = header.entry_size()? as usize;
371 0 : let file_path = header.path()?.into_owned();
372 0 :
373 0 : match header.entry_type() {
374 : tokio_tar::EntryType::Regular => {
375 0 : if let Some(res) =
376 0 : import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
377 0 : {
378 0 : // We found the pg_control file.
379 0 : pg_control = Some(res);
380 0 : }
381 0 : modification.flush(ctx).await?;
382 : }
383 : tokio_tar::EntryType::Directory => {
384 0 : debug!("directory {:?}", file_path);
385 : }
386 : _ => {
387 0 : bail!(
388 0 : "entry {} in backup tar archive is of unexpected type: {:?}",
389 0 : file_path.display(),
390 0 : header.entry_type()
391 0 : );
392 : }
393 : }
394 : }
395 :
396 : // sanity check: ensure that pg_control is loaded
397 0 : let _pg_control = pg_control.context("pg_control file not found")?;
398 :
399 0 : modification.commit(ctx).await?;
400 0 : Ok(())
401 0 : }
402 :
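/// Ingest WAL from a tar archive of WAL segment files, applying records from
/// `start_lsn` until `end_lsn` is reached.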
403 0 : pub async fn import_wal_from_tar(
404 0 : tline: &Timeline,
405 0 : reader: &mut (impl AsyncRead + Send + Sync + Unpin),
406 0 : start_lsn: Lsn,
407 0 : end_lsn: Lsn,
408 0 : ctx: &RequestContext,
409 0 : ) -> Result<()> {
410 0 : // Set up walingest mutable state
411 0 : let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
412 0 : let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
413 0 : let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
414 0 : let mut last_lsn = start_lsn;
415 0 : let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
416 0 : let shard = vec![*tline.get_shard_identity()];
417 0 :
418 0 : // Ingest wal until end_lsn
419 0 : info!("importing wal until {}", end_lsn);
420 0 : let mut pg_wal_tar = Archive::new(reader);
421 0 : let mut pg_wal_entries = pg_wal_tar.entries()?;
422 0 : while last_lsn <= end_lsn {
423 0 : let bytes = {
424 0 : let mut entry = pg_wal_entries
425 0 : .next()
426 0 : .await
427 0 : .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
428 0 : let header = entry.header();
429 0 : let file_path = header.path()?.into_owned();
430 0 :
431 0 : match header.entry_type() {
432 : tokio_tar::EntryType::Regular => {
433 : // FIXME: assume postgresql tli 1 for now
434 0 : let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
435 0 : let file_name = file_path
436 0 : .file_name()
437 0 : .expect("missing wal filename")
438 0 : .to_string_lossy();
439 0 : ensure!(expected_filename == file_name);
440 :
441 0 : debug!("processing wal file {:?}", file_path);
442 0 : read_all_bytes(&mut entry).await?
443 : }
444 : tokio_tar::EntryType::Directory => {
445 0 : debug!("directory {:?}", file_path);
446 0 : continue;
447 : }
448 : _ => {
449 0 : bail!(
450 0 : "entry {} in WAL tar archive is of unexpected type: {:?}",
451 0 : file_path.display(),
452 0 : header.entry_type()
453 0 : );
454 : }
455 : }
456 : };
457 :
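// Skip the portion of the first segment that precedes start_lsn; subsequent segments
// are fed whole (offset is reset to 0 at the end of the loop).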
458 0 : waldecoder.feed_bytes(&bytes[offset..]);
459 0 :
460 0 : let mut modification = tline.begin_modification(last_lsn);
461 0 : while last_lsn <= end_lsn {
462 0 : if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
463 0 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
464 0 : recdata,
465 0 : &shard,
466 0 : lsn,
467 0 : tline.pg_version,
468 0 : )?
469 0 : .remove(tline.get_shard_identity())
470 0 : .unwrap();
471 0 :
472 0 : walingest
473 0 : .ingest_record(interpreted, &mut modification, ctx)
474 0 : .await?;
475 0 : modification.commit(ctx).await?;
476 0 : last_lsn = lsn;
477 0 :
478 0 : debug!("imported record at {} (end {})", lsn, end_lsn);
479 0 : }
480 : }
481 :
482 0 : debug!("imported records up to {}", last_lsn);
483 0 : segno += 1;
484 0 : offset = 0;
485 : }
486 :
487 0 : if last_lsn != start_lsn {
488 0 : info!("reached end of WAL at {}", last_lsn);
489 : } else {
490 0 : info!("there was no WAL to import at {}", last_lsn);
491 : }
492 :
493 : // Log any extra unused files
494 0 : while let Some(e) = pg_wal_entries.next().await {
495 0 : let entry = e?;
496 0 : let header = entry.header();
497 0 : let file_path = header.path()?.into_owned();
498 0 : info!("skipping {:?}", file_path);
499 : }
500 :
501 0 : Ok(())
502 0 : }
503 :
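/// Import one file from the data directory or base backup archive into the timeline.
/// Returns the decoded `ControlFileData` if the file was `pg_control`, otherwise `None`.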
504 3860 : async fn import_file(
505 3860 : modification: &mut DatadirModification<'_>,
506 3860 : file_path: &Path,
507 3860 : reader: &mut (impl AsyncRead + Send + Sync + Unpin),
508 3860 : len: usize,
509 3860 : ctx: &RequestContext,
510 3860 : ) -> Result<Option<ControlFileData>> {
511 3860 : let file_name = match file_path.file_name() {
512 3860 : Some(name) => name.to_string_lossy(),
513 0 : None => return Ok(None),
514 : };
515 :
516 3860 : if file_name.starts_with('.') {
517 : // Tar archives created on macOS without the COPYFILE_DISABLE=1 env var
518 : // contain "fork files"; skip them.
519 0 : return Ok(None);
520 3860 : }
521 3860 :
522 3860 : if file_path.starts_with("global") {
523 240 : let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
524 240 : let dbnode = 0;
525 240 :
526 240 : match file_name.as_ref() {
527 240 : "pg_control" => {
528 4 : let bytes = read_all_bytes(reader).await?;
529 :
530 : // Extract the checkpoint record and import it separately.
531 4 : let pg_control = ControlFileData::decode(&bytes[..])?;
532 4 : let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
533 4 : modification.put_checkpoint(checkpoint_bytes)?;
534 4 : debug!("imported control file");
535 :
536 : // Import it as ControlFile
537 4 : modification.put_control_file(bytes)?;
538 4 : return Ok(Some(pg_control));
539 : }
540 236 : "pg_filenode.map" => {
541 4 : let bytes = read_all_bytes(reader).await?;
542 4 : modification
543 4 : .put_relmap_file(spcnode, dbnode, bytes, ctx)
544 4 : .await?;
545 4 : debug!("imported relmap file")
546 : }
547 232 : "PG_VERSION" => {
548 0 : debug!("ignored PG_VERSION file");
549 : }
550 : _ => {
551 232 : import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
552 232 : debug!("imported rel creation");
553 : }
554 : }
555 3620 : } else if file_path.starts_with("base") {
556 3576 : let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
557 3576 : let dbnode: u32 = file_path
558 3576 : .iter()
559 3576 : .nth(1)
560 3576 : .expect("invalid file path, expected dbnode")
561 3576 : .to_string_lossy()
562 3576 : .parse()?;
563 :
564 3576 : match file_name.as_ref() {
565 3576 : "pg_filenode.map" => {
566 12 : let bytes = read_all_bytes(reader).await?;
567 12 : modification
568 12 : .put_relmap_file(spcnode, dbnode, bytes, ctx)
569 12 : .await?;
570 12 : debug!("imported relmap file")
571 : }
572 3564 : "PG_VERSION" => {
573 12 : debug!("ignored PG_VERSION file");
574 : }
575 : _ => {
576 3552 : import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
577 3552 : debug!("imported rel creation");
578 : }
579 : }
580 44 : } else if file_path.starts_with("pg_xact") {
581 4 : let slru = SlruKind::Clog;
582 4 :
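// SLRU segments are stored only on shard zero; other shards skip them.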
583 4 : if modification.tline.tenant_shard_id.is_shard_zero() {
584 4 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
585 4 : debug!("imported clog slru");
586 0 : }
587 40 : } else if file_path.starts_with("pg_multixact/offsets") {
588 4 : let slru = SlruKind::MultiXactOffsets;
589 4 :
590 4 : if modification.tline.tenant_shard_id.is_shard_zero() {
591 4 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
592 4 : debug!("imported multixact offsets slru");
593 0 : }
594 36 : } else if file_path.starts_with("pg_multixact/members") {
595 4 : let slru = SlruKind::MultiXactMembers;
596 4 :
597 4 : if modification.tline.tenant_shard_id.is_shard_zero() {
598 4 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
599 4 : debug!("imported multixact members slru");
600 0 : }
601 32 : } else if file_path.starts_with("pg_twophase") {
602 0 : let bytes = read_all_bytes(reader).await?;
603 :
604 : // In PostgreSQL v17, this is a 64-bit FullTransactionId. In previous versions,
605 : // it's a 32-bit TransactionId, which fits in u64 anyway.
606 0 : let xid = u64::from_str_radix(file_name.as_ref(), 16)?;
607 0 : modification
608 0 : .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
609 0 : .await?;
610 0 : debug!("imported twophase file");
611 32 : } else if file_path.starts_with("pg_wal") {
612 0 : debug!("found wal file in base section. ignore it");
613 32 : } else if file_path.starts_with("zenith.signal") {
614 : // Parse zenith signal file to set correct previous LSN
615 0 : let bytes = read_all_bytes(reader).await?;
616 : // zenith.signal format is "PREV LSN: prev_lsn"
617 : // TODO write serialization and deserialization in the same place.
618 0 : let zenith_signal = std::str::from_utf8(&bytes)?.trim();
619 0 : let prev_lsn = match zenith_signal {
620 0 : "PREV LSN: none" => Lsn(0),
621 0 : "PREV LSN: invalid" => Lsn(0),
622 0 : other => {
623 0 : let split = other.split(':').collect::<Vec<_>>();
624 0 : split[1]
625 0 : .trim()
626 0 : .parse::<Lsn>()
627 0 : .context("can't parse zenith.signal")?
628 : }
629 : };
630 :
631 : // zenith.signal is not necessarily the last file that we handle,
632 : // but it is ok to call `finish_write()` here, because the final `modification.commit()`
633 : // will update the LSN once more to its final value.
634 0 : let writer = modification.tline.writer().await;
635 0 : writer.finish_write(prev_lsn);
636 0 :
637 0 : debug!("imported zenith signal {}", prev_lsn);
638 32 : } else if file_path.starts_with("pg_tblspc") {
639 : // TODO Backups exported from neon won't have pg_tblspc, but we will need
640 : // this to import arbitrary postgres databases.
641 0 : bail!("Importing pg_tblspc is not implemented");
642 : } else {
643 32 : debug!(
644 0 : "ignoring unrecognized file \"{}\" in tar archive",
645 0 : file_path.display()
646 : );
647 : }
648 :
649 3856 : Ok(None)
650 3860 : }
651 :
652 20 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
653 20 : let mut buf: Vec<u8> = vec![];
654 20 : reader.read_to_end(&mut buf).await?;
655 20 : Ok(Bytes::from(buf))
656 20 : }
|