//!
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a neon Timeline.
//!
use std::path::{Path, PathBuf};

use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use camino::Utf8Path;
use futures::StreamExt;
use pageserver_api::key::rel_block_to_key;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_tar::Archive;
use tracing::*;
use wal_decoder::models::InterpretedWalRecord;
use walkdir::WalkDir;

use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::ControlFileData;
use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;

// Returns the checkpoint LSN from the control file.
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
    // Read the control file to extract the LSN
    let controlfile_path = path.join("global").join("pg_control");
    let controlfile_buf = std::fs::read(&controlfile_path)
        .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
    let controlfile = ControlFileData::decode(&controlfile_buf)?;
    let lsn = controlfile.checkPoint;

    Ok(Lsn(lsn))
}
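
// A minimal usage sketch (hypothetical data directory path; assumes the
// cluster was created by `initdb` and shut down cleanly):
//
//     let pgdata = camino::Utf8Path::new("/tmp/pgdata");
//     let checkpoint_lsn = get_lsn_from_controlfile(pgdata)?;
//     info!("importing datadir at checkpoint LSN {checkpoint_lsn}");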

///
/// Import all relation data pages from local disk into the repository.
///
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub async fn import_timeline_from_postgres_datadir(
    tline: &Timeline,
    pgdata_path: &Utf8Path,
    pgdata_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    let mut pg_control: Option<ControlFileData> = None;

    // TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
    // Then fishing out pg_control would be unnecessary
    let mut modification = tline.begin_modification(pgdata_lsn);
    modification.init_empty()?;

    // Import all but pg_wal
    let all_but_wal = WalkDir::new(pgdata_path)
        .into_iter()
        .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
    for entry in all_but_wal {
        let entry = entry?;
        let metadata = entry.metadata().expect("error getting dir entry metadata");
        if metadata.is_file() {
            let absolute_path = entry.path();
            let relative_path = absolute_path.strip_prefix(pgdata_path)?;

            let mut file = tokio::fs::File::open(absolute_path).await?;
            let len = metadata.len() as usize;
            if let Some(control_file) =
                import_file(&mut modification, relative_path, &mut file, len, ctx).await?
            {
                pg_control = Some(control_file);
            }
            modification.flush(ctx).await?;
        }
    }

    // We're done importing all the data files.
    modification.commit(ctx).await?;

    // We expect the Postgres server to be shut down cleanly.
    let pg_control = pg_control.context("pg_control file not found")?;
    ensure!(
        pg_control.state == DBState_DB_SHUTDOWNED,
        "Postgres cluster was not shut down cleanly"
    );
    ensure!(
        pg_control.checkPointCopy.redo == pgdata_lsn.0,
        "unexpected checkpoint REDO pointer"
    );

    // Import WAL. This is needed even when starting from a shutdown checkpoint, because
    // this reads the checkpoint record itself, advancing the tip of the timeline to
    // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
    import_wal(
        &pgdata_path.join("pg_wal"),
        tline,
        Lsn(pg_control.checkPointCopy.redo),
        pgdata_lsn,
        ctx,
    )
    .await?;

    Ok(())
}

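// A hedged sketch of the expected call order (the surrounding setup is
// hypothetical); `pgdata_lsn` must match the control file's checkpoint LSN,
// as the REDO-pointer check above enforces:
//
//     let pgdata_lsn = get_lsn_from_controlfile(&pgdata_path)?;
//     import_timeline_from_postgres_datadir(&tline, &pgdata_path, pgdata_lsn, &ctx).await?;
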
// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
async fn import_rel(
    modification: &mut DatadirModification<'_>,
    path: &Path,
    spcoid: Oid,
    dboid: Oid,
    reader: &mut (impl AsyncRead + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // Does it look like a relation file?
    trace!("importing rel file {}", path.display());

    let filename = &path
        .file_name()
        .expect("missing rel filename")
        .to_string_lossy();
    let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
        warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
        e
    })?;

    let mut buf: [u8; 8192] = [0u8; 8192];

    ensure!(len % BLCKSZ as usize == 0);
    let nblocks = len / BLCKSZ as usize;

    let rel = RelTag {
        spcnode: spcoid,
        dbnode: dboid,
        relnode,
        forknum,
    };

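    // Worked example of the segment math below (assuming the default BLCKSZ
    // of 8192): each 1 GiB segment holds 1073741824 / 8192 = 131072 blocks,
    // so a segment file like "16384.2" starts at absolute block 262144.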
    let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);

    // Call put_rel_creation for every segment of the relation,
    // because there is no guarantee about the order in which we are processing segments.
    // Ignore the "relation already exists" error.
    //
    // FIXME: Keep track of which relations we've already created?
    // https://github.com/neondatabase/neon/issues/3309
    if let Err(e) = modification
        .put_rel_creation(rel, nblocks as u32, ctx)
        .await
    {
        match e {
            RelationError::AlreadyExists => {
                debug!("Relation {} already exists. We must be extending it.", rel)
            }
            _ => return Err(e.into()),
        }
    }

    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                let key = rel_block_to_key(rel, blknum);
                if modification.tline.get_shard_identity().is_key_local(&key) {
                    modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
                }
            }

            // TODO: UnexpectedEof is expected
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // reached EOF. That's expected.
                    let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
                    ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        blknum += 1;
    }

    // Update relation size
    //
    // If we process rel segments out of order,
    // put_rel_extend will skip the update.
    modification.put_rel_extend(rel, blknum, ctx).await?;

    Ok(())
}

/// Import an SLRU segment file
///
async fn import_slru(
    modification: &mut DatadirModification<'_>,
    slru: SlruKind,
    path: &Path,
    reader: &mut (impl AsyncRead + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    info!("importing slru file {path:?}");

    let mut buf: [u8; 8192] = [0u8; 8192];
    let filename = &path
        .file_name()
        .with_context(|| format!("missing slru filename for path {path:?}"))?
        .to_string_lossy();
    let segno = u32::from_str_radix(filename, 16)?;
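    // SLRU segment files are named by the segment number in hex,
    // e.g. "0000" parses to segno 0 and "000A" to segno 10.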

    ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
    let nblocks = len / BLCKSZ as usize;

    ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);

    modification
        .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
        .await?;

    let mut rpageno = 0;
    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                modification.put_slru_page_image(
                    slru,
                    segno,
                    rpageno,
                    Bytes::copy_from_slice(&buf),
                )?;
            }

            // TODO: UnexpectedEof is expected
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // reached EOF. That's expected.
                    ensure!(rpageno == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        rpageno += 1;
    }

    Ok(())
}

/// Scan PostgreSQL WAL files in the given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
async fn import_wal(
    walpath: &Utf8Path,
    tline: &Timeline,
    startpoint: Lsn,
    endpoint: Lsn,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);

    let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
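    // Worked example (default 16 MiB segments): startpoint Lsn(0x0200_0010)
    // lands in segno 2 at offset 0x10, since 0x0200_0010 / 0x0100_0000 = 2.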
    let mut last_lsn = startpoint;

    let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;

    let shard = vec![*tline.get_shard_identity()];

    while last_lsn <= endpoint {
        // FIXME: assume postgresql tli 1 for now
        let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
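        // e.g. tli 1, segno 2 with 16 MiB segments yields the standard WAL
        // file name "000000010000000000000002".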
        let mut buf = Vec::new();

        // Read local file
        let mut path = walpath.join(&filename);

        // The segment might exist only as a .partial file
        if !PathBuf::from(&path).exists() {
            path = walpath.join(filename + ".partial");
        }

        // Slurp the WAL file
        let mut file = std::fs::File::open(&path)?;

        if offset > 0 {
            use std::io::Seek;
            file.seek(std::io::SeekFrom::Start(offset as u64))?;
        }

        use std::io::Read;
        let nread = file.read_to_end(&mut buf)?;
        if nread != WAL_SEGMENT_SIZE - offset {
            // Maybe allow this for .partial files?
            error!("read only {} bytes from WAL file", nread);
        }

        waldecoder.feed_bytes(&buf);

        let mut nrecords = 0;
        let mut modification = tline.begin_modification(last_lsn);
        while last_lsn <= endpoint {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                let interpreted = InterpretedWalRecord::from_bytes_filtered(
                    recdata,
                    &shard,
                    lsn,
                    tline.pg_version,
                )?
                .remove(tline.get_shard_identity())
                .unwrap();

                walingest
                    .ingest_record(interpreted, &mut modification, ctx)
                    .await?;
                WAL_INGEST.records_committed.inc();

                modification.commit(ctx).await?;
                last_lsn = lsn;

                nrecords += 1;

                trace!("imported record at {} (end {})", lsn, endpoint);
            }
        }

        debug!("imported {} records up to {}", nrecords, last_lsn);

        segno += 1;
        offset = 0;
    }

    if last_lsn != startpoint {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("no WAL to import at {}", last_lsn);
    }

    Ok(())
}

/// Import a basebackup tar archive (the data files, without WAL) into the repository.
pub async fn import_basebackup_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    base_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    info!("importing base at {base_lsn}");
    let mut modification = tline.begin_modification(base_lsn);
    modification.init_empty()?;

    let mut pg_control: Option<ControlFileData> = None;

    // Import base
    let mut entries = Archive::new(reader).entries()?;
    while let Some(base_tar_entry) = entries.next().await {
        let mut entry = base_tar_entry?;
        let header = entry.header();
        let len = header.entry_size()? as usize;
        let file_path = header.path()?.into_owned();

        match header.entry_type() {
            tokio_tar::EntryType::Regular => {
                if let Some(res) =
                    import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
                {
                    // We found the pg_control file.
                    pg_control = Some(res);
                }
                modification.flush(ctx).await?;
            }
            tokio_tar::EntryType::Directory => {
                debug!("directory {:?}", file_path);
            }
            _ => {
                bail!(
                    "entry {} in backup tar archive is of unexpected type: {:?}",
                    file_path.display(),
                    header.entry_type()
                );
            }
        }
    }

    // Sanity check: ensure that pg_control was loaded
    let _pg_control = pg_control.context("pg_control file not found")?;

    modification.commit(ctx).await?;
    Ok(())
}

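// A hedged usage sketch (hypothetical tar path; any AsyncRead works, e.g. a
// network stream):
//
//     let mut tarball = tokio::fs::File::open("base.tar").await?;
//     import_basebackup_from_tar(&tline, &mut tarball, base_lsn, &ctx).await?;
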
pub async fn import_wal_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    start_lsn: Lsn,
    end_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    // Set up walingest mutable state
    let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
    let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = start_lsn;
    let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
    let shard = vec![*tline.get_shard_identity()];

    // Ingest wal until end_lsn
    info!("importing wal until {}", end_lsn);
    let mut pg_wal_tar = Archive::new(reader);
    let mut pg_wal_entries = pg_wal_tar.entries()?;
    while last_lsn <= end_lsn {
        let bytes = {
            let mut entry = pg_wal_entries
                .next()
                .await
                .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
            let header = entry.header();
            let file_path = header.path()?.into_owned();

            match header.entry_type() {
                tokio_tar::EntryType::Regular => {
                    // FIXME: assume postgresql tli 1 for now
                    let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
                    let file_name = file_path
                        .file_name()
                        .expect("missing wal filename")
                        .to_string_lossy();
                    ensure!(expected_filename == file_name);

                    debug!("processing wal file {:?}", file_path);
                    read_all_bytes(&mut entry).await?
                }
                tokio_tar::EntryType::Directory => {
                    debug!("directory {:?}", file_path);
                    continue;
                }
                _ => {
                    bail!(
                        "entry {} in WAL tar archive is of unexpected type: {:?}",
                        file_path.display(),
                        header.entry_type()
                    );
                }
            }
        };

        waldecoder.feed_bytes(&bytes[offset..]);

        let mut modification = tline.begin_modification(last_lsn);
        while last_lsn <= end_lsn {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                let interpreted = InterpretedWalRecord::from_bytes_filtered(
                    recdata,
                    &shard,
                    lsn,
                    tline.pg_version,
                )?
                .remove(tline.get_shard_identity())
                .unwrap();

                walingest
                    .ingest_record(interpreted, &mut modification, ctx)
                    .await?;
                modification.commit(ctx).await?;
                last_lsn = lsn;

                debug!("imported record at {} (end {})", lsn, end_lsn);
            }
        }

        debug!("imported records up to {}", last_lsn);
        segno += 1;
        offset = 0;
    }

    if last_lsn != start_lsn {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("there was no WAL to import at {}", last_lsn);
    }

    // Log any extra unused files
    while let Some(e) = pg_wal_entries.next().await {
        let entry = e?;
        let header = entry.header();
        let file_path = header.path()?.into_owned();
        info!("skipping {:?}", file_path);
    }

    Ok(())
}

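// As with the base backup, a hedged usage sketch (the stream name is
// hypothetical):
//
//     import_wal_from_tar(&tline, &mut wal_tarball, start_lsn, end_lsn, &ctx).await?;
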
async fn import_file(
    modification: &mut DatadirModification<'_>,
    file_path: &Path,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> Result<Option<ControlFileData>> {
    let file_name = match file_path.file_name() {
        Some(name) => name.to_string_lossy(),
        None => return Ok(None),
    };

    if file_name.starts_with('.') {
        // Tar archives created on macOS without the COPYFILE_DISABLE=1 env var
        // contain "fork files"; skip them.
        return Ok(None);
    }

    if file_path.starts_with("global") {
        let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
        let dbnode = 0;

        match file_name.as_ref() {
            "pg_control" => {
                let bytes = read_all_bytes(reader).await?;

                // Extract the checkpoint record and import it separately.
                let pg_control = ControlFileData::decode(&bytes[..])?;
                let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
                modification.put_checkpoint(checkpoint_bytes)?;
                debug!("imported control file");

                // Import it as ControlFile
                modification.put_control_file(bytes)?;
                return Ok(Some(pg_control));
            }
            "pg_filenode.map" => {
                let bytes = read_all_bytes(reader).await?;
                modification
                    .put_relmap_file(spcnode, dbnode, bytes, ctx)
                    .await?;
                debug!("imported relmap file")
            }
            "PG_VERSION" => {
                debug!("ignored PG_VERSION file");
            }
            _ => {
                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
                debug!("imported rel creation");
            }
        }
    } else if file_path.starts_with("base") {
        let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
        let dbnode: u32 = file_path
            .iter()
            .nth(1)
            .expect("invalid file path, expected dbnode")
            .to_string_lossy()
            .parse()?;

        match file_name.as_ref() {
            "pg_filenode.map" => {
                let bytes = read_all_bytes(reader).await?;
                modification
                    .put_relmap_file(spcnode, dbnode, bytes, ctx)
                    .await?;
                debug!("imported relmap file")
            }
            "PG_VERSION" => {
                debug!("ignored PG_VERSION file");
            }
            _ => {
                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
                debug!("imported rel creation");
            }
        }
    } else if file_path.starts_with("pg_xact") {
        let slru = SlruKind::Clog;

        if modification.tline.tenant_shard_id.is_shard_zero() {
            import_slru(modification, slru, file_path, reader, len, ctx).await?;
            debug!("imported clog slru");
        }
    } else if file_path.starts_with("pg_multixact/offsets") {
        let slru = SlruKind::MultiXactOffsets;

        if modification.tline.tenant_shard_id.is_shard_zero() {
            import_slru(modification, slru, file_path, reader, len, ctx).await?;
            debug!("imported multixact offsets slru");
        }
    } else if file_path.starts_with("pg_multixact/members") {
        let slru = SlruKind::MultiXactMembers;

        if modification.tline.tenant_shard_id.is_shard_zero() {
            import_slru(modification, slru, file_path, reader, len, ctx).await?;
            debug!("imported multixact members slru");
        }
    } else if file_path.starts_with("pg_twophase") {
        let bytes = read_all_bytes(reader).await?;

        // In PostgreSQL v17, this is a 64-bit FullTransactionId. In previous versions,
        // it's a 32-bit TransactionId, which fits in u64 anyway.
        let xid = u64::from_str_radix(file_name.as_ref(), 16)?;
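        // e.g. a v17 file named "00000000000000AB" parses to xid 171; an older
        // 8-hex-digit name like "000000AB" parses to the same value.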
        modification
            .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
            .await?;
        debug!("imported twophase file");
    } else if file_path.starts_with("pg_wal") {
        debug!("found wal file in base section. ignore it");
    } else if file_path.starts_with("zenith.signal") {
        // Parse zenith signal file to set correct previous LSN
        let bytes = read_all_bytes(reader).await?;
        // zenith.signal format is "PREV LSN: prev_lsn"
        // TODO write serialization and deserialization in the same place.
        let zenith_signal = std::str::from_utf8(&bytes)?.trim();
        let prev_lsn = match zenith_signal {
            "PREV LSN: none" => Lsn(0),
            "PREV LSN: invalid" => Lsn(0),
            other => {
                let split = other.split(':').collect::<Vec<_>>();
                split[1]
                    .trim()
                    .parse::<Lsn>()
                    .context("can't parse zenith.signal")?
            }
        };
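        // e.g. a zenith.signal containing "PREV LSN: 0/16D68D8" parses to
        // Lsn(0x16D68D8).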

        // zenith.signal is not necessarily the last file we handle, but it is
        // OK to call `finish_write()` here, because the final
        // `modification.commit()` will update the LSN once more to the final one.
        let writer = modification.tline.writer().await;
        writer.finish_write(prev_lsn);

        debug!("imported zenith signal {}", prev_lsn);
    } else if file_path.starts_with("pg_tblspc") {
        // TODO Backups exported from neon won't have pg_tblspc, but we will need
        // this to import arbitrary postgres databases.
        bail!("Importing pg_tblspc is not implemented");
    } else {
        debug!(
            "ignoring unrecognized file \"{}\" in tar archive",
            file_path.display()
        );
    }

    Ok(None)
}

async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
    let mut buf: Vec<u8> = vec![];
    reader.read_to_end(&mut buf).await?;
    Ok(Bytes::from(buf))
}