//!
//! Import data and WAL from a PostgreSQL data directory and WAL segments into
//! a neon Timeline.
//!
use std::path::{Path, PathBuf};

use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use camino::Utf8Path;
use futures::StreamExt;
use pageserver_api::key::rel_block_to_key;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_tar::Archive;
use tracing::*;
use walkdir::WalkDir;

use crate::context::RequestContext;
use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::decode_wal_record;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::ControlFileData;
use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;

/// Returns the checkpoint LSN from the control file (`pg_control`).
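///
/// # Example
///
/// A minimal usage sketch; the data directory path is hypothetical and must
/// contain a valid `global/pg_control` file:
///
/// ```no_run
/// use camino::Utf8Path;
///
/// # fn main() -> anyhow::Result<()> {
/// let lsn = pageserver::import_datadir::get_lsn_from_controlfile(
///     Utf8Path::new("/tmp/pgdata"),
/// )?;
/// println!("checkpoint LSN: {lsn}");
/// # Ok(())
/// # }
/// ```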
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
    // Read control file to extract the LSN
    let controlfile_path = path.join("global").join("pg_control");
    let controlfile_buf = std::fs::read(&controlfile_path)
        .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
    let controlfile = ControlFileData::decode(&controlfile_buf)?;
    let lsn = controlfile.checkPoint;

    Ok(Lsn(lsn))
}

///
/// Import all relation data pages from local disk into the repository.
///
/// This is currently only used to import a cluster freshly created by initdb.
/// The code that deals with the checkpoint would not work right if the
/// cluster was not shut down cleanly.
pub async fn import_timeline_from_postgres_datadir(
    tline: &Timeline,
    pgdata_path: &Utf8Path,
    pgdata_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    let mut pg_control: Option<ControlFileData> = None;

    // TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
    // Then fishing out pg_control would be unnecessary
    let mut modification = tline.begin_modification(pgdata_lsn);
    modification.init_empty()?;

    // Import all but pg_wal
    let all_but_wal = WalkDir::new(pgdata_path)
        .into_iter()
        .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
    for entry in all_but_wal {
        let entry = entry?;
        let metadata = entry.metadata().expect("error getting dir entry metadata");
        if metadata.is_file() {
            let absolute_path = entry.path();
            let relative_path = absolute_path.strip_prefix(pgdata_path)?;

            let mut file = tokio::fs::File::open(absolute_path).await?;
            let len = metadata.len() as usize;
            if let Some(control_file) =
                import_file(&mut modification, relative_path, &mut file, len, ctx).await?
            {
                pg_control = Some(control_file);
            }
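            // Flush the writes accumulated for this file, keeping the
            // modification's in-memory buffer small; the single commit() below
            // publishes the final LSN for the whole import.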
            modification.flush(ctx).await?;
        }
    }

    // We're done importing all the data files.
    modification.commit(ctx).await?;

    // We expect the Postgres server to be shut down cleanly.
    let pg_control = pg_control.context("pg_control file not found")?;
    ensure!(
        pg_control.state == DBState_DB_SHUTDOWNED,
        "Postgres cluster was not shut down cleanly"
    );
    ensure!(
        pg_control.checkPointCopy.redo == pgdata_lsn.0,
        "unexpected checkpoint REDO pointer"
    );

    // Import WAL. This is needed even when starting from a shutdown checkpoint, because
    // this reads the checkpoint record itself, advancing the tip of the timeline to
    // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
    import_wal(
        &pgdata_path.join("pg_wal"),
        tline,
        Lsn(pg_control.checkPointCopy.redo),
        pgdata_lsn,
        ctx,
    )
    .await?;

    Ok(())
}

// Subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
async fn import_rel(
    modification: &mut DatadirModification<'_>,
    path: &Path,
    spcoid: Oid,
    dboid: Oid,
    reader: &mut (impl AsyncRead + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    // Does it look like a relation file?
    trace!("importing rel file {}", path.display());

    let filename = &path
        .file_name()
        .expect("missing rel filename")
        .to_string_lossy();
    let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
        warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
        e
    })?;

    let mut buf: [u8; 8192] = [0u8; 8192];

    ensure!(len % BLCKSZ as usize == 0);
    let nblocks = len / BLCKSZ as usize;

    let rel = RelTag {
        spcnode: spcoid,
        dbnode: dboid,
        relnode,
        forknum,
    };

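    // Relation segment files are 1 GiB each, so the first block of this
    // segment is segno times the blocks per segment; with the standard
    // 8192-byte BLCKSZ that is segno * 131072.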
    let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);

    // Call put_rel_creation for every segment of the relation,
    // because there is no guarantee about the order in which we are processing segments.
    // Ignore the "relation already exists" error.
    //
    // FIXME: Keep track of which relations we've already created?
    // https://github.com/neondatabase/neon/issues/3309
    if let Err(e) = modification
        .put_rel_creation(rel, nblocks as u32, ctx)
        .await
    {
        match e {
            RelationError::AlreadyExists => {
                debug!("Relation {} already exists. We must be extending it.", rel)
            }
            _ => return Err(e.into()),
        }
    }

    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                let key = rel_block_to_key(rel, blknum);
                if modification.tline.get_shard_identity().is_key_local(&key) {
                    modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
                }
            }

            // TODO: UnexpectedEof is expected
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // Reached EOF. That's expected.
                    let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
                    ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        blknum += 1;
    }

    // Update relation size
    //
    // If we process rel segments out of order,
    // put_rel_extend will skip the update.
    modification.put_rel_extend(rel, blknum, ctx).await?;

    Ok(())
}

/// Import an SLRU segment file
///
async fn import_slru(
    modification: &mut DatadirModification<'_>,
    slru: SlruKind,
    path: &Path,
    reader: &mut (impl AsyncRead + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    info!("importing slru file {path:?}");

    let mut buf: [u8; 8192] = [0u8; 8192];
    let filename = &path
        .file_name()
        .with_context(|| format!("missing slru filename for path {path:?}"))?
        .to_string_lossy();
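    // SLRU segment files are named with the segment number in hexadecimal,
    // e.g. pg_xact/0000 is the first CLOG segment.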
    let segno = u32::from_str_radix(filename, 16)?;

    ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
    let nblocks = len / BLCKSZ as usize;

    ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);

    modification
        .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
        .await?;

    let mut rpageno = 0;
    loop {
        let r = reader.read_exact(&mut buf).await;
        match r {
            Ok(_) => {
                modification.put_slru_page_image(
                    slru,
                    segno,
                    rpageno,
                    Bytes::copy_from_slice(&buf),
                )?;
            }

            // TODO: UnexpectedEof is expected
            Err(err) => match err.kind() {
                std::io::ErrorKind::UnexpectedEof => {
                    // Reached EOF. That's expected.
                    ensure!(rpageno == nblocks as u32, "unexpected EOF");
                    break;
                }
                _ => {
                    bail!("error reading file {}: {:#}", path.display(), err);
                }
            },
        };
        rpageno += 1;
    }

    Ok(())
}

/// Scan PostgreSQL WAL files in the given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
async fn import_wal(
    walpath: &Utf8Path,
    tline: &Timeline,
    startpoint: Lsn,
    endpoint: Lsn,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);

    let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = startpoint;

    let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;

    while last_lsn <= endpoint {
        // FIXME: assume postgresql tli 1 for now
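        // XLogFileName produces the canonical 24-hex-digit WAL segment name
        // from (timeline, segment number, segment size); e.g. segment 1 on
        // timeline 1 with 16 MB segments is "000000010000000000000001".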
        let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
        let mut buf = Vec::new();

        // Read local file
        let mut path = walpath.join(&filename);

        // The segment may exist only as a .partial file
        if !PathBuf::from(&path).exists() {
            path = walpath.join(filename + ".partial");
        }

        // Slurp the WAL file
        let mut file = std::fs::File::open(&path)?;

        if offset > 0 {
            use std::io::Seek;
            file.seek(std::io::SeekFrom::Start(offset as u64))?;
        }

        use std::io::Read;
        let nread = file.read_to_end(&mut buf)?;
        if nread != WAL_SEGMENT_SIZE - offset {
            // Maybe allow this for .partial files?
            error!("read only {} bytes from WAL file", nread);
        }

        waldecoder.feed_bytes(&buf);

        let mut nrecords = 0;
        let mut modification = tline.begin_modification(last_lsn);
        while last_lsn <= endpoint {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                let mut decoded = DecodedWALRecord::default();
                decode_wal_record(recdata, &mut decoded, tline.pg_version)?;

                walingest
                    .ingest_record(decoded, lsn, &mut modification, ctx)
                    .await?;
                WAL_INGEST.records_committed.inc();

                modification.commit(ctx).await?;
                last_lsn = lsn;

                nrecords += 1;

                trace!("imported record at {} (end {})", lsn, endpoint);
            } else {
                // The decoder needs more bytes; break out to read the next segment.
                break;
            }
        }

        debug!("imported {} records up to {}", nrecords, last_lsn);

        segno += 1;
        offset = 0;
    }

    if last_lsn != startpoint {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("no WAL to import at {}", last_lsn);
    }

    Ok(())
}

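/// Import a basebackup tar archive into an empty timeline at `base_lsn`.
///
/// Each regular file entry in the archive is imported via `import_file`. The
/// archive must contain `global/pg_control`, or the import fails.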
pub async fn import_basebackup_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    base_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    info!("importing base at {base_lsn}");
    let mut modification = tline.begin_modification(base_lsn);
    modification.init_empty()?;

    let mut pg_control: Option<ControlFileData> = None;

    // Import base
    let mut entries = Archive::new(reader).entries()?;
    while let Some(base_tar_entry) = entries.next().await {
        let mut entry = base_tar_entry?;
        let header = entry.header();
        let len = header.entry_size()? as usize;
        let file_path = header.path()?.into_owned();

        match header.entry_type() {
            tokio_tar::EntryType::Regular => {
                if let Some(res) =
                    import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
                {
                    // We found the pg_control file.
                    pg_control = Some(res);
                }
                modification.flush(ctx).await?;
            }
            tokio_tar::EntryType::Directory => {
                debug!("directory {:?}", file_path);
            }
            _ => {
                bail!(
                    "entry {} in backup tar archive is of unexpected type: {:?}",
                    file_path.display(),
                    header.entry_type()
                );
            }
        }
    }

    // Sanity check: ensure that pg_control was loaded
    let _pg_control = pg_control.context("pg_control file not found")?;

    modification.commit(ctx).await?;
    Ok(())
}

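/// Import WAL from a tar archive of WAL segment files, ingesting all records
/// between `start_lsn` and `end_lsn`.
///
/// The segments must appear in the archive in order, under their canonical
/// WAL segment file names.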
pub async fn import_wal_from_tar(
    tline: &Timeline,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    start_lsn: Lsn,
    end_lsn: Lsn,
    ctx: &RequestContext,
) -> Result<()> {
    // Set up walingest mutable state
    let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
    let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
    let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
    let mut last_lsn = start_lsn;
    let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;

    // Ingest wal until end_lsn
    info!("importing wal until {}", end_lsn);
    let mut pg_wal_tar = Archive::new(reader);
    let mut pg_wal_entries = pg_wal_tar.entries()?;
    while last_lsn <= end_lsn {
        let bytes = {
            let mut entry = pg_wal_entries
                .next()
                .await
                .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
            let header = entry.header();
            let file_path = header.path()?.into_owned();

            match header.entry_type() {
                tokio_tar::EntryType::Regular => {
                    // FIXME: assume postgresql tli 1 for now
                    let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
                    let file_name = file_path
                        .file_name()
                        .expect("missing wal filename")
                        .to_string_lossy();
                    ensure!(expected_filename == file_name);

                    debug!("processing wal file {:?}", file_path);
                    read_all_bytes(&mut entry).await?
                }
                tokio_tar::EntryType::Directory => {
                    debug!("directory {:?}", file_path);
                    continue;
                }
                _ => {
                    bail!(
                        "entry {} in WAL tar archive is of unexpected type: {:?}",
                        file_path.display(),
                        header.entry_type()
                    );
                }
            }
        };

        waldecoder.feed_bytes(&bytes[offset..]);

        let mut modification = tline.begin_modification(last_lsn);
        while last_lsn <= end_lsn {
            if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                let mut decoded = DecodedWALRecord::default();
                decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
                walingest
                    .ingest_record(decoded, lsn, &mut modification, ctx)
                    .await?;
                modification.commit(ctx).await?;
                last_lsn = lsn;

                debug!("imported record at {} (end {})", lsn, end_lsn);
            } else {
                // The decoder needs more bytes; break out to read the next segment.
                break;
            }
        }

        debug!("imported records up to {}", last_lsn);
        segno += 1;
        offset = 0;
    }

    if last_lsn != start_lsn {
        info!("reached end of WAL at {}", last_lsn);
    } else {
        info!("there was no WAL to import at {}", last_lsn);
    }

    // Log any extra unused files
    while let Some(e) = pg_wal_entries.next().await {
        let entry = e?;
        let header = entry.header();
        let file_path = header.path()?.into_owned();
        info!("skipping {:?}", file_path);
    }

    Ok(())
}

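/// Import a single file from a data directory or basebackup archive,
/// dispatching on its path: relation files, SLRU segments, `pg_control`,
/// relmap and twophase files are each handled appropriately, and
/// unrecognized files are ignored.
///
/// Returns the parsed `ControlFileData` if the file was `pg_control`,
/// `None` otherwise.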
async fn import_file(
    modification: &mut DatadirModification<'_>,
    file_path: &Path,
    reader: &mut (impl AsyncRead + Send + Sync + Unpin),
    len: usize,
    ctx: &RequestContext,
) -> Result<Option<ControlFileData>> {
    let file_name = match file_path.file_name() {
        Some(name) => name.to_string_lossy(),
        None => return Ok(None),
    };

    if file_name.starts_with('.') {
        // Tar archives created on macOS without the COPYFILE_DISABLE=1 env var
        // will contain "fork files"; skip them.
        return Ok(None);
    }

    if file_path.starts_with("global") {
        let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
        let dbnode = 0;

        match file_name.as_ref() {
            "pg_control" => {
                let bytes = read_all_bytes(reader).await?;

                // Extract the checkpoint record and import it separately.
                let pg_control = ControlFileData::decode(&bytes[..])?;
                let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
                modification.put_checkpoint(checkpoint_bytes)?;
                debug!("imported control file");

                // Import it as ControlFile
                modification.put_control_file(bytes)?;
                return Ok(Some(pg_control));
            }
            "pg_filenode.map" => {
                let bytes = read_all_bytes(reader).await?;
                modification
                    .put_relmap_file(spcnode, dbnode, bytes, ctx)
                    .await?;
                debug!("imported relmap file")
            }
            "PG_VERSION" => {
                debug!("ignored PG_VERSION file");
            }
            _ => {
                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
                debug!("imported rel creation");
            }
        }
    } else if file_path.starts_with("base") {
        let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
        let dbnode: u32 = file_path
            .iter()
            .nth(1)
            .expect("invalid file path, expected dbnode")
            .to_string_lossy()
            .parse()?;

        match file_name.as_ref() {
            "pg_filenode.map" => {
                let bytes = read_all_bytes(reader).await?;
                modification
                    .put_relmap_file(spcnode, dbnode, bytes, ctx)
                    .await?;
                debug!("imported relmap file")
            }
            "PG_VERSION" => {
                debug!("ignored PG_VERSION file");
            }
            _ => {
                import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
                debug!("imported rel creation");
            }
        }
    } else if file_path.starts_with("pg_xact") {
        let slru = SlruKind::Clog;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported clog slru");
    } else if file_path.starts_with("pg_multixact/offsets") {
        let slru = SlruKind::MultiXactOffsets;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported multixact offsets slru");
    } else if file_path.starts_with("pg_multixact/members") {
        let slru = SlruKind::MultiXactMembers;

        import_slru(modification, slru, file_path, reader, len, ctx).await?;
        debug!("imported multixact members slru");
    } else if file_path.starts_with("pg_twophase") {
        let bytes = read_all_bytes(reader).await?;

        // In PostgreSQL v17, this is a 64-bit FullTransactionId. In previous versions,
        // it's a 32-bit TransactionId, which fits in u64 anyway.
        let xid = u64::from_str_radix(file_name.as_ref(), 16)?;
        modification
            .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
            .await?;
        debug!("imported twophase file");
    } else if file_path.starts_with("pg_wal") {
        debug!("found wal file in base section. ignore it");
    } else if file_path.starts_with("zenith.signal") {
        // Parse zenith signal file to set correct previous LSN
        let bytes = read_all_bytes(reader).await?;
        // zenith.signal format is "PREV LSN: prev_lsn"
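        // e.g. "PREV LSN: 0/1696070" (a hypothetical value), or the literal
        // strings "none" / "invalid" handled below.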
        // TODO write serialization and deserialization in the same place.
        let zenith_signal = std::str::from_utf8(&bytes)?.trim();
        let prev_lsn = match zenith_signal {
            "PREV LSN: none" => Lsn(0),
            "PREV LSN: invalid" => Lsn(0),
            other => {
                let split = other.split(':').collect::<Vec<_>>();
                split[1]
                    .trim()
                    .parse::<Lsn>()
                    .context("can't parse zenith.signal")?
            }
        };

        // zenith.signal is not necessarily the last file that we handle,
        // but it is ok to call `finish_write()`, because the final
        // `modification.commit()` will update the lsn once more to the final one.
        let writer = modification.tline.writer().await;
        writer.finish_write(prev_lsn);

        debug!("imported zenith signal {}", prev_lsn);
    } else if file_path.starts_with("pg_tblspc") {
        // TODO Backups exported from neon won't have pg_tblspc, but we will need
        // this to import arbitrary postgres databases.
        bail!("Importing pg_tblspc is not implemented");
    } else {
        debug!(
            "ignoring unrecognized file \"{}\" in tar archive",
            file_path.display()
        );
    }

    Ok(None)
}

async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
    let mut buf: Vec<u8> = vec![];
    reader.read_to_end(&mut buf).await?;
    Ok(Bytes::from(buf))
}