//!
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a neon Timeline.
//!
use std::path::{Path, PathBuf};

use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use camino::Utf8Path;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_tar::Archive;
use tracing::*;
use walkdir::WalkDir;

use crate::context::RequestContext;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::ControlFileData;
use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;

/// Returns the checkpoint LSN from the control file.
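///
/// A minimal usage sketch (the path here is hypothetical; any extracted
/// PGDATA directory works):
///
/// ```ignore
/// let lsn = get_lsn_from_controlfile(Utf8Path::new("/tmp/pgdata"))?;
/// println!("checkpoint LSN at import: {lsn}");
/// ```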
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
    // Read control file to extract the LSN
    let controlfile_path = path.join("global").join("pg_control");
    let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
    let lsn = controlfile.checkPoint;

    Ok(Lsn(lsn))
}

///
/// Import all relation data pages from local disk into the repository.
///
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
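///
/// A call sketch (`timeline` and `ctx` are assumed to be set up by the caller;
/// the LSN comes from the control file of a cleanly shut-down cluster):
///
/// ```ignore
/// let lsn = get_lsn_from_controlfile(pgdata_path)?;
/// import_timeline_from_postgres_datadir(&timeline, pgdata_path, lsn, &ctx).await?;
/// ```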
pub async fn import_timeline_from_postgres_datadir(
    tline: &Timeline,
    pgdata_path: &Utf8Path,
    pgdata_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    let mut pg_control: Option<ControlFileData> = None;

    // TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
    // Then fishing out pg_control would be unnecessary
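    // Everything below is imported at this single LSN, starting from an empty
    // timeline (init_empty).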
    let mut modification = tline.begin_modification(pgdata_lsn);
    modification.init_empty()?;

    // Import all but pg_wal
    let all_but_wal = WalkDir::new(pgdata_path)
        .into_iter()
        .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
    for entry in all_but_wal {
        let entry = entry?;
        let metadata = entry.metadata().expect("error getting dir entry metadata");
        if metadata.is_file() {
            let absolute_path = entry.path();
            let relative_path = absolute_path.strip_prefix(pgdata_path)?;

            let mut file = tokio::fs::File::open(absolute_path).await?;
            let len = metadata.len() as usize;
            if let Some(control_file) =
                import_file(&mut modification, relative_path, &mut file, len, ctx).await?
            {
                pg_control = Some(control_file);
            }
            modification.flush(ctx).await?;
        }
    }

    // We're done importing all the data files.
    modification.commit(ctx).await?;

    // We expect the Postgres server to be shut down cleanly.
    let pg_control = pg_control.context("pg_control file not found")?;
    ensure!(
        pg_control.state == DBState_DB_SHUTDOWNED,
        "Postgres cluster was not shut down cleanly"
    );
    ensure!(
        pg_control.checkPointCopy.redo == pgdata_lsn.0,
        "unexpected checkpoint REDO pointer"
    );

    // Import WAL. This is needed even when starting from a shutdown checkpoint, because
    // this reads the checkpoint record itself, advancing the tip of the timeline to
    // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
    import_wal(
        &pgdata_path.join("pg_wal"),
        tline,
        Lsn(pg_control.checkPointCopy.redo),
        pgdata_lsn,
        ctx,
    )
    .await?;

    Ok(())
}

// subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
async fn import_rel(
    modification: &mut DatadirModification<'_>,
    path: &Path,
    spcoid: Oid,
    dboid: Oid,
    reader: &mut (impl AsyncRead + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // Does it look like a relation file?
    trace!("importing rel file {}", path.display());

    let filename = &path
        .file_name()
        .expect("missing rel filename")
        .to_string_lossy();
    let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
        warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
        e
    })?;

    let mut buf: [u8; 8192] = [0u8; 8192];

    ensure!(len % BLCKSZ as usize == 0);
    let nblocks = len / BLCKSZ as usize;

    let rel = RelTag {
        spcnode: spcoid,
        dbnode: dboid,
        relnode,
        forknum,
    };

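    // Relations larger than 1 GiB are split into 1 GiB segment files. Compute
    // the block number at which this segment starts.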
    let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);

    // Call put_rel_creation for every segment of the relation,
    // because there is no guarantee about the order in which we are processing segments.
    // Ignore the "relation already exists" error.
    //
    // FIXME: Keep track of which relations we've already created?
    // https://github.com/neondatabase/neon/issues/3309
    if let Err(e) = modification
        .put_rel_creation(rel, nblocks as u32, ctx)
        .await
    {
        match e {
            RelationError::AlreadyExists => {
                debug!("Relation {} already exists. We must be extending it.", rel)
            }
            _ => return Err(e.into()),
        }
    }

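    // Copy the relation data one BLCKSZ-sized page at a time until EOF.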
    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
            }

            // TODO: UnexpectedEof is expected
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // reached EOF. That's expected.
                    let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
                    ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        blknum += 1;
    }

    // Update relation size
    //
    // If we process rel segments out of order,
    // put_rel_extend will skip the update.
    modification.put_rel_extend(rel, blknum, ctx).await?;

    Ok(())
}

/// Import an SLRU segment file
///
async fn import_slru(
    modification: &mut DatadirModification<'_>,
    slru: SlruKind,
    path: &Path,
    reader: &mut (impl AsyncRead + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    info!("importing slru file {path:?}");

    let mut buf: [u8; 8192] = [0u8; 8192];
    let filename = &path
        .file_name()
        .with_context(|| format!("missing slru filename for path {path:?}"))?
        .to_string_lossy();
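    // SLRU segment files are named with the segment number in hexadecimal.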
    let segno = u32::from_str_radix(filename, 16)?;

    ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
    let nblocks = len / BLCKSZ as usize;

    ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);

    modification
        .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
        .await?;

    let mut rpageno = 0;
    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                modification.put_slru_page_image(
                    slru,
                    segno,
                    rpageno,
                    Bytes::copy_from_slice(&buf),
                )?;
            }

            // TODO: UnexpectedEof is expected
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // reached EOF. That's expected.
                    ensure!(rpageno == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        rpageno += 1;
    }

    Ok(())
}

/// Scan PostgreSQL WAL files in the given directory and load all records
/// between 'startpoint' and 'endpoint' into the repository.
async fn import_wal(
    walpath: &Utf8Path,
    tline: &Timeline,
    startpoint: Lsn,
    endpoint: Lsn,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);

    let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = startpoint;

    let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;

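    // Process one WAL segment at a time: slurp it from disk, feed it to the
    // decoder, and ingest the decoded records.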
    while last_lsn <= endpoint {
        // FIXME: assume postgresql tli 1 for now
        let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
        let mut buf = Vec::new();

        // Read local file
        let mut path = walpath.join(&filename);

        // It could be a .partial file
        if !PathBuf::from(&path).exists() {
            path = walpath.join(filename + ".partial");
        }

        // Slurp the WAL file
        let mut file = std::fs::File::open(&path)?;

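        // 'startpoint' may be in the middle of the first segment; skip the
        // portion before it.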
        if offset > 0 {
            use std::io::Seek;
            file.seek(std::io::SeekFrom::Start(offset as u64))?;
        }

        use std::io::Read;
        let nread = file.read_to_end(&mut buf)?;
        if nread != WAL_SEGMENT_SIZE - offset {
            // Maybe allow this for .partial files?
            error!("read only {} bytes from WAL file", nread);
        }

        waldecoder.feed_bytes(&buf);

        let mut nrecords = 0;
        let mut modification = tline.begin_modification(endpoint);
        let mut decoded = DecodedWALRecord::default();
        while last_lsn <= endpoint {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                walingest
                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
                    .await?;
                last_lsn = lsn;

                nrecords += 1;

                trace!("imported record at {} (end {})", lsn, endpoint);
            }
        }

        debug!("imported {} records up to {}", nrecords, last_lsn);

        segno += 1;
        offset = 0;
    }

    if last_lsn != startpoint {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("no WAL to import at {}", last_lsn);
    }

    Ok(())
}

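/// Import a basebackup tar archive into an empty timeline. All files are
/// imported at 'base_lsn'; the import fails if the archive does not contain
/// a pg_control file.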
pub async fn import_basebackup_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    base_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    info!("importing base at {base_lsn}");
    let mut modification = tline.begin_modification(base_lsn);
    modification.init_empty()?;

    let mut pg_control: Option<ControlFileData> = None;

    // Import base
    let mut entries = Archive::new(reader).entries()?;
    while let Some(base_tar_entry) = entries.next().await {
        let mut entry = base_tar_entry?;
        let header = entry.header();
        let len = header.entry_size()? as usize;
        let file_path = header.path()?.into_owned();

        match header.entry_type() {
            tokio_tar::EntryType::Regular => {
                if let Some(res) =
                    import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
                {
                    // We found the pg_control file.
                    pg_control = Some(res);
                }
                modification.flush(ctx).await?;
            }
            tokio_tar::EntryType::Directory => {
                debug!("directory {:?}", file_path);
            }
            _ => {
                bail!(
                    "entry {} in backup tar archive is of unexpected type: {:?}",
                    file_path.display(),
                    header.entry_type()
                );
            }
        }
    }

    // sanity check: ensure that pg_control is loaded
    let _pg_control = pg_control.context("pg_control file not found")?;

    modification.commit(ctx).await?;
    Ok(())
}

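/// Import WAL from a tar archive of pg_wal, ingesting all records between
/// 'start_lsn' and 'end_lsn'. Segments must appear in the archive in WAL order.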
pub async fn import_wal_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    start_lsn: Lsn,
    end_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    // Set up walingest mutable state
    let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
    let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = start_lsn;
    let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;

    // Ingest wal until end_lsn
    info!("importing wal until {}", end_lsn);
    let mut pg_wal_tar = Archive::new(reader);
    let mut pg_wal_entries = pg_wal_tar.entries()?;
    while last_lsn <= end_lsn {
        let bytes = {
            let mut entry = pg_wal_entries
                .next()
                .await
                .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
            let header = entry.header();
            let file_path = header.path()?.into_owned();

            match header.entry_type() {
                tokio_tar::EntryType::Regular => {
                    // FIXME: assume postgresql tli 1 for now
                    let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
                    let file_name = file_path
                        .file_name()
                        .expect("missing wal filename")
                        .to_string_lossy();
                    ensure!(expected_filename == file_name);

                    debug!("processing wal file {:?}", file_path);
                    read_all_bytes(&mut entry).await?
                }
                tokio_tar::EntryType::Directory => {
                    debug!("directory {:?}", file_path);
                    continue;
                }
                _ => {
                    bail!(
                        "entry {} in WAL tar archive is of unexpected type: {:?}",
                        file_path.display(),
                        header.entry_type()
                    );
                }
            }
        };

        waldecoder.feed_bytes(&bytes[offset..]);

        let mut modification = tline.begin_modification(end_lsn);
        let mut decoded = DecodedWALRecord::default();
        while last_lsn <= end_lsn {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                walingest
                    .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
                    .await?;
                last_lsn = lsn;

                debug!("imported record at {} (end {})", lsn, end_lsn);
            }
        }

        debug!("imported records up to {}", last_lsn);
        segno += 1;
        offset = 0;
    }

    if last_lsn != start_lsn {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("there was no WAL to import at {}", last_lsn);
    }

    // Log any extra unused files
    while let Some(e) = pg_wal_entries.next().await {
        let entry = e?;
        let header = entry.header();
        let file_path = header.path()?.into_owned();
        info!("skipping {:?}", file_path);
    }

    Ok(())
}

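/// Import one file from a data directory or basebackup archive, dispatching on
/// its path: relation files, SLRU segments, pg_control, pg_filenode.map and
/// pg_twophase files each get their own handling. Returns the parsed
/// ControlFileData if the file was pg_control, None otherwise.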
async fn import_file(
    modification: &mut DatadirModification<'_>,
    file_path: &Path,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> Result<Option<ControlFileData>> {
    let file_name = match file_path.file_name() {
        Some(name) => name.to_string_lossy(),
        None => return Ok(None),
    };

    if file_name.starts_with('.') {
        // Tar archives created on macOS without the COPYFILE_DISABLE=1 env var
        // contain "fork files"; skip them.
        return Ok(None);
    }

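    // Dispatch on the top-level directory. "global" holds the shared catalogs,
    // keyed under the global tablespace OID.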
494 548538 : if file_path.starts_with("global") {
495 32945 : let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
496 32945 : let dbnode = 0;
497 32945 :
498 32945 : match file_name.as_ref() {
499 32945 : "pg_control" => {
500 5668 : let bytes = read_all_bytes(reader).await?;
501 :
502 : // Extract the checkpoint record and import it separately.
503 577 : let pg_control = ControlFileData::decode(&bytes[..])?;
504 577 : let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
505 577 : modification.put_checkpoint(checkpoint_bytes)?;
506 UBC 0 : debug!("imported control file");
507 :
508 : // Import it as ControlFile
509 CBC 577 : modification.put_control_file(bytes)?;
510 577 : return Ok(Some(pg_control));
511 : }
512 32368 : "pg_filenode.map" => {
513 3408 : let bytes = read_all_bytes(reader).await?;
514 578 : modification
515 578 : .put_relmap_file(spcnode, dbnode, bytes, ctx)
516 UBC 0 : .await?;
517 0 : debug!("imported relmap file")
518 : }
519 CBC 31790 : "PG_VERSION" => {
520 UBC 0 : debug!("ignored PG_VERSION file");
521 : }
522 : _ => {
523 CBC 68482 : import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
524 UBC 0 : debug!("imported rel creation");
525 : }
526 : }
527 CBC 515593 : } else if file_path.starts_with("base") {
528 509808 : let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
529 509808 : let dbnode: u32 = file_path
530 509808 : .iter()
531 509808 : .nth(1)
532 509808 : .expect("invalid file path, expected dbnode")
533 509808 : .to_string_lossy()
534 509808 : .parse()?;
535 :
536 509808 : match file_name.as_ref() {
537 509808 : "pg_filenode.map" => {
538 10183 : let bytes = read_all_bytes(reader).await?;
539 1734 : modification
540 1734 : .put_relmap_file(spcnode, dbnode, bytes, ctx)
541 UBC 0 : .await?;
542 0 : debug!("imported relmap file")
543 : }
544 CBC 508074 : "PG_VERSION" => {
545 UBC 0 : debug!("ignored PG_VERSION file");
546 : }
547 : _ => {
548 CBC 2211852 : import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
549 UBC 0 : debug!("imported rel creation");
550 : }
551 : }
552 CBC 5785 : } else if file_path.starts_with("pg_xact") {
553 578 : let slru = SlruKind::Clog;
554 578 :
555 1137 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
556 UBC 0 : debug!("imported clog slru");
557 CBC 5207 : } else if file_path.starts_with("pg_multixact/offsets") {
558 578 : let slru = SlruKind::MultiXactOffsets;
559 578 :
560 1148 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
561 UBC 0 : debug!("imported multixact offsets slru");
562 CBC 4629 : } else if file_path.starts_with("pg_multixact/members") {
563 578 : let slru = SlruKind::MultiXactMembers;
564 578 :
565 1148 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
566 UBC 0 : debug!("imported multixact members slru");
567 CBC 4051 : } else if file_path.starts_with("pg_twophase") {
568 UBC 0 : let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
569 :
570 0 : let bytes = read_all_bytes(reader).await?;
571 0 : modification
572 0 : .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
573 0 : .await?;
574 0 : debug!("imported twophase file");
575 CBC 4051 : } else if file_path.starts_with("pg_wal") {
576 UBC 0 : debug!("found wal file in base section. ignore it");
577 CBC 4050 : } else if file_path.starts_with("zenith.signal") {
578 : // Parse zenith signal file to set correct previous LSN
579 1 : let bytes = read_all_bytes(reader).await?;
580 : // zenith.signal format is "PREV LSN: prev_lsn"
581 : // TODO write serialization and deserialization in the same place.
582 1 : let zenith_signal = std::str::from_utf8(&bytes)?.trim();
583 1 : let prev_lsn = match zenith_signal {
584 1 : "PREV LSN: none" => Lsn(0),
585 1 : "PREV LSN: invalid" => Lsn(0),
586 1 : other => {
587 1 : let split = other.split(':').collect::<Vec<_>>();
588 1 : split[1]
589 1 : .trim()
590 1 : .parse::<Lsn>()
591 1 : .context("can't parse zenith.signal")?
592 : }
593 : };
594 :
595 : // zenith.signal is not necessarily the last file, that we handle
596 : // but it is ok to call `finish_write()`, because final `modification.commit()`
597 : // will update lsn once more to the final one.
598 1 : let writer = modification.tline.writer().await;
599 1 : writer.finish_write(prev_lsn);
600 :
601 UBC 0 : debug!("imported zenith signal {}", prev_lsn);
602 CBC 4049 : } else if file_path.starts_with("pg_tblspc") {
603 : // TODO Backups exported from neon won't have pg_tblspc, but we will need
604 : // this to import arbitrary postgres databases.
605 UBC 0 : bail!("Importing pg_tblspc is not implemented");
606 : } else {
607 0 : debug!(
608 0 : "ignoring unrecognized file \"{}\" in tar archive",
609 0 : file_path.display()
610 0 : );
611 : }
612 :
613 CBC 547961 : Ok(None)
614 548538 : }
615 :
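/// Read the remainder of an async stream into an in-memory Bytes buffer.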
async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
    let mut buf: Vec<u8> = vec![];
    reader.read_to_end(&mut buf).await?;
    Ok(Bytes::from(buf))
}