TLA Line data Source code
1 : //!
2 : //! Import data and WAL from a PostgreSQL data directory and WAL segments into
3 : //! a neon Timeline.
4 : //!
5 : use std::io::SeekFrom;
6 : use std::path::{Path, PathBuf};
7 :
8 : use anyhow::{bail, ensure, Context, Result};
9 : use async_compression::tokio::bufread::ZstdDecoder;
10 : use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};
11 : use bytes::Bytes;
12 : use camino::Utf8Path;
13 : use futures::StreamExt;
14 : use nix::NixPath;
15 : use tokio::fs::{File, OpenOptions};
16 : use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
17 : use tokio_tar::Archive;
18 : use tokio_tar::Builder;
19 : use tokio_tar::HeaderMode;
20 : use tracing::*;
21 : use walkdir::WalkDir;
22 :
23 : use crate::context::RequestContext;
24 : use crate::metrics::WAL_INGEST;
25 : use crate::pgdatadir_mapping::*;
26 : use crate::tenant::remote_timeline_client::INITDB_PATH;
27 : use crate::tenant::Timeline;
28 : use crate::walingest::WalIngest;
29 : use crate::walrecord::DecodedWALRecord;
30 : use pageserver_api::reltag::{RelTag, SlruKind};
31 : use postgres_ffi::pg_constants;
32 : use postgres_ffi::relfile_utils::*;
33 : use postgres_ffi::waldecoder::WalStreamDecoder;
34 : use postgres_ffi::ControlFileData;
35 : use postgres_ffi::DBState_DB_SHUTDOWNED;
36 : use postgres_ffi::Oid;
37 : use postgres_ffi::XLogFileName;
38 : use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
39 : use utils::lsn::Lsn;
40 :
41 : // Returns the checkpoint LSN from the control file
42 CBC 525 : pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
43 525 : // Read control file to extract the LSN
44 525 : let controlfile_path = path.join("global").join("pg_control");
45 525 : let controlfile_buf = std::fs::read(&controlfile_path)
46 525 : .with_context(|| format!("reading controlfile: {controlfile_path}"))?;
47 525 : let controlfile = ControlFileData::decode(&controlfile_buf)?;
48 525 : let lsn = controlfile.checkPoint;
49 525 :
50 525 : Ok(Lsn(lsn))
51 525 : }
52 :
53 : ///
54 : /// Import all relation data pages from local disk into the repository.
55 : ///
56 : /// This is currently only used to import a cluster freshly created by initdb.
57 : /// The code that deals with the checkpoint would not work right if the
58 : /// cluster was not shut down cleanly.
59 524 : pub async fn import_timeline_from_postgres_datadir(
60 524 : tline: &Timeline,
61 524 : pgdata_path: &Utf8Path,
62 524 : pgdata_lsn: Lsn,
63 524 : ctx: &RequestContext,
64 524 : ) -> Result<()> {
65 524 : let mut pg_control: Option<ControlFileData> = None;
66 524 :
67 524 : // TODO this should be start_lsn, which is not necessarily equal to end_lsn (aka lsn)
68 524 : // Then fishing out pg_control would be unnecessary
69 524 : let mut modification = tline.begin_modification(pgdata_lsn);
70 524 : modification.init_empty()?;
71 :
72 : // Import all but pg_wal
73 524 : let all_but_wal = WalkDir::new(pgdata_path)
74 524 : .into_iter()
75 510392 : .filter_entry(|entry| !entry.path().ends_with("pg_wal"));
76 510392 : for entry in all_but_wal {
77 509868 : let entry = entry?;
78 509868 : let metadata = entry.metadata().expect("error getting dir entry metadata");
79 509868 : if metadata.is_file() {
80 497292 : let absolute_path = entry.path();
81 497292 : let relative_path = absolute_path.strip_prefix(pgdata_path)?;
82 :
83 497292 : let mut file = tokio::fs::File::open(absolute_path).await?;
84 497292 : let len = metadata.len() as usize;
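// import_file() returns the parsed control file only when it encounters
// global/pg_control; keep it around for the sanity checks below.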
85 524 : if let Some(control_file) =
86 2107906 : import_file(&mut modification, relative_path, &mut file, len, ctx).await?
87 524 : {
88 524 : pg_control = Some(control_file);
89 496768 : }
90 497292 : modification.flush(ctx).await?;
91 12576 : }
92 : }
93 :
94 : // We're done importing all the data files.
95 12573 : modification.commit(ctx).await?;
96 :
97 : // We expect the Postgres server to be shut down cleanly.
98 524 : let pg_control = pg_control.context("pg_control file not found")?;
99 : ensure!(
100 524 : pg_control.state == DBState_DB_SHUTDOWNED,
101 UBC 0 : "Postgres cluster was not shut down cleanly"
102 : );
103 : ensure!(
104 CBC 524 : pg_control.checkPointCopy.redo == pgdata_lsn.0,
105 UBC 0 : "unexpected checkpoint REDO pointer"
106 : );
107 :
108 : // Import WAL. This is needed even when starting from a shutdown checkpoint, because
109 : // this reads the checkpoint record itself, advancing the tip of the timeline to
110 : // *after* the checkpoint record. And crucially, it initializes the 'prev_lsn'.
111 CBC 524 : import_wal(
112 524 : &pgdata_path.join("pg_wal"),
113 524 : tline,
114 524 : Lsn(pg_control.checkPointCopy.redo),
115 524 : pgdata_lsn,
116 524 : ctx,
117 524 : )
118 UBC 0 : .await?;
119 :
120 CBC 524 : Ok(())
121 524 : }
122 :
123 : // subroutine of import_timeline_from_postgres_datadir(), to load one relation file.
124 497201 : async fn import_rel(
125 497201 : modification: &mut DatadirModification<'_>,
126 497201 : path: &Path,
127 497201 : spcoid: Oid,
128 497201 : dboid: Oid,
129 497201 : reader: &mut (impl AsyncRead + Unpin),
130 497201 : len: usize,
131 497201 : ctx: &RequestContext,
132 497201 : ) -> anyhow::Result<()> {
133 : // Does it look like a relation file?
134 UBC 0 : trace!("importing rel file {}", path.display());
135 :
136 CBC 497201 : let filename = &path
137 497201 : .file_name()
138 497201 : .expect("missing rel filename")
139 497201 : .to_string_lossy();
140 497201 : let (relnode, forknum, segno) = parse_relfilename(filename).map_err(|e| {
141 UBC 0 : warn!("unrecognized file in postgres datadir: {:?} ({})", path, e);
142 0 : e
143 CBC 497201 : })?;
144 :
145 497201 : let mut buf: [u8; 8192] = [0u8; 8192];
146 497201 :
147 497201 : ensure!(len % BLCKSZ as usize == 0);
148 497201 : let nblocks = len / BLCKSZ as usize;
149 497201 :
150 497201 : let rel = RelTag {
151 497201 : spcnode: spcoid,
152 497201 : dbnode: dboid,
153 497201 : relnode,
154 497201 : forknum,
155 497201 : };
156 497201 :
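// PostgreSQL splits relations larger than 1 GiB into 1 GiB segment files,
// so segment `segno` starts at block number segno * (1 GiB / BLCKSZ).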
157 497201 : let mut blknum: u32 = segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
158 :
159 : // Call put_rel_creation for every segment of the relation,
160 : // because there is no guarantee about the order in which we are processing segments.
161 : // ignore "relation already exists" error
162 : //
163 : // FIXME: Keep track of which relations we've already created?
164 : // https://github.com/neondatabase/neon/issues/3309
165 497201 : if let Err(e) = modification
166 497201 : .put_rel_creation(rel, nblocks as u32, ctx)
167 UBC 0 : .await
168 : {
169 0 : match e {
170 : RelationError::AlreadyExists => {
171 0 : debug!("Relation {} already exists. We must be extending it.", rel)
172 : }
173 0 : _ => return Err(e.into()),
174 : }
175 CBC 497201 : }
176 :
177 : loop {
178 2159304 : let r = reader.read_exact(&mut buf).await;
179 2159304 : match r {
180 : Ok(_) => {
181 1662103 : modification.put_rel_page_image(rel, blknum, Bytes::copy_from_slice(&buf))?;
182 : }
183 :
184 : // TODO: UnexpectedEof is expected
185 497201 : Err(err) => match err.kind() {
186 : std::io::ErrorKind::UnexpectedEof => {
187 : // reached EOF. That's expected.
188 497201 : let relative_blknum = blknum - segno * (1024 * 1024 * 1024 / BLCKSZ as u32);
189 497201 : ensure!(relative_blknum == nblocks as u32, "unexpected EOF");
190 497201 : break;
191 : }
192 : _ => {
193 UBC 0 : bail!("error reading file {}: {:#}", path.display(), err);
194 : }
195 : },
196 : };
197 CBC 1662103 : blknum += 1;
198 : }
199 :
200 : // Update relation size
201 : //
202 : // If we process rel segments out of order,
203 : // put_rel_extend will skip the update.
204 497201 : modification.put_rel_extend(rel, blknum, ctx).await?;
205 :
206 497201 : Ok(())
207 497201 : }
208 :
209 : /// Import an SLRU segment file
210 : ///
211 1602 : async fn import_slru(
212 1602 : modification: &mut DatadirModification<'_>,
213 1602 : slru: SlruKind,
214 1602 : path: &Path,
215 1602 : reader: &mut (impl AsyncRead + Unpin),
216 1602 : len: usize,
217 1602 : ctx: &RequestContext,
218 1602 : ) -> anyhow::Result<()> {
219 1602 : info!("importing slru file {path:?}");
220 :
221 1602 : let mut buf: [u8; 8192] = [0u8; 8192];
222 1602 : let filename = &path
223 1602 : .file_name()
224 1602 : .with_context(|| format!("missing slru filename for path {path:?}"))?
225 1602 : .to_string_lossy();
226 1602 : let segno = u32::from_str_radix(filename, 16)?;
227 :
228 1602 : ensure!(len % BLCKSZ as usize == 0); // we assume SLRU block size is the same as BLCKSZ
229 1602 : let nblocks = len / BLCKSZ as usize;
230 1602 :
231 1602 : ensure!(nblocks <= pg_constants::SLRU_PAGES_PER_SEGMENT as usize);
232 :
233 1602 : modification
234 1602 : .put_slru_segment_creation(slru, segno, nblocks as u32, ctx)
235 UBC 0 : .await?;
236 :
237 CBC 1602 : let mut rpageno = 0;
238 : loop {
239 3204 : let r = reader.read_exact(&mut buf).await;
240 3204 : match r {
241 : Ok(_) => {
242 1602 : modification.put_slru_page_image(
243 1602 : slru,
244 1602 : segno,
245 1602 : rpageno,
246 1602 : Bytes::copy_from_slice(&buf),
247 1602 : )?;
248 : }
249 :
250 : // TODO: UnexpectedEof is expected
251 1602 : Err(err) => match err.kind() {
252 : std::io::ErrorKind::UnexpectedEof => {
253 : // reached EOF. That's expected.
254 1602 : ensure!(rpageno == nblocks as u32, "unexpected EOF");
255 1602 : break;
256 : }
257 : _ => {
258 UBC 0 : bail!("error reading file {}: {:#}", path.display(), err);
259 : }
260 : },
261 : };
262 CBC 1602 : rpageno += 1;
263 : }
264 :
265 1602 : Ok(())
266 1602 : }
267 :
268 : /// Scan PostgreSQL WAL files in the given directory and load all records between
269 : /// 'startpoint' and 'endpoint' into the repository.
270 524 : async fn import_wal(
271 524 : walpath: &Utf8Path,
272 524 : tline: &Timeline,
273 524 : startpoint: Lsn,
274 524 : endpoint: Lsn,
275 524 : ctx: &RequestContext,
276 524 : ) -> anyhow::Result<()> {
277 524 : let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
278 524 :
279 524 : let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
280 524 : let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
281 524 : let mut last_lsn = startpoint;
282 :
283 524 : let mut walingest = WalIngest::new(tline, startpoint, ctx).await?;
284 :
285 1048 : while last_lsn <= endpoint {
286 : // FIXME: assume postgresql tli 1 for now
287 524 : let filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
288 524 : let mut buf = Vec::new();
289 524 :
290 524 : // Read local file
291 524 : let mut path = walpath.join(&filename);
292 524 :
293 524 : // The segment might only exist with a .partial suffix
294 524 : if !PathBuf::from(&path).exists() {
295 UBC 0 : path = walpath.join(filename + ".partial");
296 CBC 524 : }
297 :
298 : // Slurp the WAL file
299 524 : let mut file = std::fs::File::open(&path)?;
300 :
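// Only the first segment is read starting at the offset of 'startpoint';
// 'offset' is reset to 0 after each iteration, so later segments are read whole.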
301 524 : if offset > 0 {
302 : use std::io::Seek;
303 524 : file.seek(std::io::SeekFrom::Start(offset as u64))?;
304 UBC 0 : }
305 :
306 : use std::io::Read;
307 CBC 524 : let nread = file.read_to_end(&mut buf)?;
308 524 : if nread != WAL_SEGMENT_SIZE - offset {
309 : // Maybe allow this for .partial files?
310 UBC 0 : error!("read only {} bytes from WAL file", nread);
311 CBC 524 : }
312 :
313 524 : waldecoder.feed_bytes(&buf);
314 524 :
315 524 : let mut nrecords = 0;
316 524 : let mut modification = tline.begin_modification(last_lsn);
317 524 : let mut decoded = DecodedWALRecord::default();
318 1048 : while last_lsn <= endpoint {
319 524 : if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
320 524 : walingest
321 524 : .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
322 UBC 0 : .await?;
323 CBC 524 : WAL_INGEST.records_committed.inc();
324 524 :
325 524 : modification.commit(ctx).await?;
326 524 : last_lsn = lsn;
327 524 :
328 524 : nrecords += 1;
329 :
330 UBC 0 : trace!("imported record at {} (end {})", lsn, endpoint);
331 0 : }
332 : }
333 :
334 0 : debug!("imported {} records up to {}", nrecords, last_lsn);
335 :
336 CBC 524 : segno += 1;
337 524 : offset = 0;
338 : }
339 :
340 524 : if last_lsn != startpoint {
341 524 : info!("reached end of WAL at {}", last_lsn);
342 : } else {
343 UBC 0 : info!("no WAL to import at {}", last_lsn);
344 : }
345 :
346 CBC 524 : Ok(())
347 524 : }
348 :
349 11 : pub async fn import_basebackup_from_tar(
350 11 : tline: &Timeline,
351 11 : reader: &mut (impl AsyncRead + Send + Sync + Unpin),
352 11 : base_lsn: Lsn,
353 11 : ctx: &RequestContext,
354 11 : ) -> Result<()> {
355 11 : info!("importing base at {base_lsn}");
356 11 : let mut modification = tline.begin_modification(base_lsn);
357 11 : modification.init_empty()?;
358 :
359 11 : let mut pg_control: Option<ControlFileData> = None;
360 :
361 : // Import base
362 11 : let mut entries = Archive::new(reader).entries()?;
363 9781 : while let Some(base_tar_entry) = entries.next().await {
364 9770 : let mut entry = base_tar_entry?;
365 9770 : let header = entry.header();
366 9770 : let len = header.entry_size()? as usize;
367 9770 : let file_path = header.path()?.into_owned();
368 9770 :
369 9770 : match header.entry_type() {
370 : tokio_tar::EntryType::Regular => {
371 9 : if let Some(res) =
372 9520 : import_file(&mut modification, file_path.as_ref(), &mut entry, len, ctx).await?
373 9 : {
374 9 : // We found the pg_control file.
375 9 : pg_control = Some(res);
376 9511 : }
377 9520 : modification.flush(ctx).await?;
378 : }
379 : tokio_tar::EntryType::Directory => {
380 UBC 0 : debug!("directory {:?}", file_path);
381 : }
382 : _ => {
383 0 : bail!(
384 0 : "entry {} in backup tar archive is of unexpected type: {:?}",
385 0 : file_path.display(),
386 0 : header.entry_type()
387 0 : );
388 : }
389 : }
390 : }
391 :
392 : // sanity check: ensure that pg_control is loaded
393 CBC 11 : let _pg_control = pg_control.context("pg_control file not found")?;
394 :
395 259 : modification.commit(ctx).await?;
396 9 : Ok(())
397 11 : }
398 :
399 2 : pub async fn import_wal_from_tar(
400 2 : tline: &Timeline,
401 2 : reader: &mut (impl AsyncRead + Send + Sync + Unpin),
402 2 : start_lsn: Lsn,
403 2 : end_lsn: Lsn,
404 2 : ctx: &RequestContext,
405 2 : ) -> Result<()> {
406 2 : // Set up walingest mutable state
407 2 : let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
408 2 : let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
409 2 : let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
410 2 : let mut last_lsn = start_lsn;
411 2 : let mut walingest = WalIngest::new(tline, start_lsn, ctx).await?;
412 :
413 : // Ingest wal until end_lsn
414 2 : info!("importing wal until {}", end_lsn);
415 2 : let mut pg_wal_tar = Archive::new(reader);
416 2 : let mut pg_wal_entries = pg_wal_tar.entries()?;
417 4 : while last_lsn <= end_lsn {
418 2 : let bytes = {
419 2 : let mut entry = pg_wal_entries
420 2 : .next()
421 2 : .await
422 2 : .ok_or_else(|| anyhow::anyhow!("expected more wal"))??;
423 2 : let header = entry.header();
424 2 : let file_path = header.path()?.into_owned();
425 2 :
426 2 : match header.entry_type() {
427 : tokio_tar::EntryType::Regular => {
428 : // FIXME: assume postgresql tli 1 for now
429 2 : let expected_filename = XLogFileName(1, segno, WAL_SEGMENT_SIZE);
430 2 : let file_name = file_path
431 2 : .file_name()
432 2 : .expect("missing wal filename")
433 2 : .to_string_lossy();
434 2 : ensure!(expected_filename == file_name);
435 :
436 UBC 0 : debug!("processing wal file {:?}", file_path);
437 CBC 3773 : read_all_bytes(&mut entry).await?
438 : }
439 : tokio_tar::EntryType::Directory => {
440 UBC 0 : debug!("directory {:?}", file_path);
441 0 : continue;
442 : }
443 : _ => {
444 0 : bail!(
445 0 : "entry {} in WAL tar archive is of unexpected type: {:?}",
446 0 : file_path.display(),
447 0 : header.entry_type()
448 0 : );
449 : }
450 : }
451 : };
452 :
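// Skip the part of the first segment that precedes 'start_lsn'; 'offset' is
// reset to 0 below, so subsequent segments are fed in full.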
453 CBC 2 : waldecoder.feed_bytes(&bytes[offset..]);
454 2 :
455 2 : let mut modification = tline.begin_modification(last_lsn);
456 2 : let mut decoded = DecodedWALRecord::default();
457 10 : while last_lsn <= end_lsn {
458 8 : if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
459 8 : walingest
460 8 : .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
461 UBC 0 : .await?;
462 CBC 8 : modification.commit(ctx).await?;
463 8 : last_lsn = lsn;
464 :
465 UBC 0 : debug!("imported record at {} (end {})", lsn, end_lsn);
466 0 : }
467 : }
468 :
469 0 : debug!("imported records up to {}", last_lsn);
470 CBC 2 : segno += 1;
471 2 : offset = 0;
472 : }
473 :
474 2 : if last_lsn != start_lsn {
475 2 : info!("reached end of WAL at {}", last_lsn);
476 : } else {
477 UBC 0 : info!("there was no WAL to import at {}", last_lsn);
478 : }
479 :
480 : // Log any extra unused files
481 CBC 2 : while let Some(e) = pg_wal_entries.next().await {
482 UBC 0 : let entry = e?;
483 0 : let header = entry.header();
484 0 : let file_path = header.path()?.into_owned();
485 0 : info!("skipping {:?}", file_path);
486 : }
487 :
488 CBC 2 : Ok(())
489 2 : }
490 :
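// Import a single file from a data directory or base backup into the repository.
// Returns the parsed ControlFileData when the file is global/pg_control, and
// None for every other (imported or ignored) file.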
491 506812 : async fn import_file(
492 506812 : modification: &mut DatadirModification<'_>,
493 506812 : file_path: &Path,
494 506812 : reader: &mut (impl AsyncRead + Send + Sync + Unpin),
495 506812 : len: usize,
496 506812 : ctx: &RequestContext,
497 506812 : ) -> Result<Option<ControlFileData>> {
498 506812 : let file_name = match file_path.file_name() {
499 506812 : Some(name) => name.to_string_lossy(),
500 UBC 0 : None => return Ok(None),
501 : };
502 :
503 CBC 506812 : if file_name.starts_with('.') {
504 : // tar archives on macOs, created without COPYFILE_DISABLE=1 env var
505 : // will contain "fork files", skip them.
506 UBC 0 : return Ok(None);
507 CBC 506812 : }
508 506812 :
509 506812 : if file_path.starts_with("global") {
510 30440 : let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
511 30440 : let dbnode = 0;
512 30440 :
513 30440 : match file_name.as_ref() {
514 30440 : "pg_control" => {
515 5203 : let bytes = read_all_bytes(reader).await?;
516 :
517 : // Extract the checkpoint record and import it separately.
518 533 : let pg_control = ControlFileData::decode(&bytes[..])?;
519 533 : let checkpoint_bytes = pg_control.checkPointCopy.encode()?;
520 533 : modification.put_checkpoint(checkpoint_bytes)?;
521 UBC 0 : debug!("imported control file");
522 :
523 : // Import it as ControlFile
524 CBC 533 : modification.put_control_file(bytes)?;
525 533 : return Ok(Some(pg_control));
526 : }
527 29907 : "pg_filenode.map" => {
528 3117 : let bytes = read_all_bytes(reader).await?;
529 534 : modification
530 534 : .put_relmap_file(spcnode, dbnode, bytes, ctx)
531 UBC 0 : .await?;
532 0 : debug!("imported relmap file")
533 : }
534 CBC 29373 : "PG_VERSION" => {
535 UBC 0 : debug!("ignored PG_VERSION file");
536 : }
537 : _ => {
538 CBC 62846 : import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
539 UBC 0 : debug!("imported rel creation");
540 : }
541 : }
542 CBC 476372 : } else if file_path.starts_with("base") {
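// Files under base/ belong to the default tablespace; the layout is
// base/<dboid>/<relfilenode>, so the second path component is the database OID.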
543 471032 : let spcnode = pg_constants::DEFAULTTABLESPACE_OID;
544 471032 : let dbnode: u32 = file_path
545 471032 : .iter()
546 471032 : .nth(1)
547 471032 : .expect("invalid file path, expected dbnode")
548 471032 : .to_string_lossy()
549 471032 : .parse()?;
550 :
551 471032 : match file_name.as_ref() {
552 471032 : "pg_filenode.map" => {
553 9316 : let bytes = read_all_bytes(reader).await?;
554 1602 : modification
555 1602 : .put_relmap_file(spcnode, dbnode, bytes, ctx)
556 UBC 0 : .await?;
557 0 : debug!("imported relmap file")
558 : }
559 CBC 469430 : "PG_VERSION" => {
560 UBC 0 : debug!("ignored PG_VERSION file");
561 : }
562 : _ => {
563 CBC 2032950 : import_rel(modification, file_path, spcnode, dbnode, reader, len, ctx).await?;
564 UBC 0 : debug!("imported rel creation");
565 : }
566 : }
567 CBC 5340 : } else if file_path.starts_with("pg_xact") {
568 534 : let slru = SlruKind::Clog;
569 534 :
570 1041 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
571 UBC 0 : debug!("imported clog slru");
572 CBC 4806 : } else if file_path.starts_with("pg_multixact/offsets") {
573 534 : let slru = SlruKind::MultiXactOffsets;
574 534 :
575 1041 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
576 UBC 0 : debug!("imported multixact offsets slru");
577 CBC 4272 : } else if file_path.starts_with("pg_multixact/members") {
578 534 : let slru = SlruKind::MultiXactMembers;
579 534 :
580 1042 : import_slru(modification, slru, file_path, reader, len, ctx).await?;
581 UBC 0 : debug!("imported multixact members slru");
582 CBC 3738 : } else if file_path.starts_with("pg_twophase") {
583 UBC 0 : let xid = u32::from_str_radix(file_name.as_ref(), 16)?;
584 :
585 0 : let bytes = read_all_bytes(reader).await?;
586 0 : modification
587 0 : .put_twophase_file(xid, Bytes::copy_from_slice(&bytes[..]), ctx)
588 0 : .await?;
589 0 : debug!("imported twophase file");
590 CBC 3738 : } else if file_path.starts_with("pg_wal") {
591 UBC 0 : debug!("found wal file in base section. ignore it");
592 CBC 3731 : } else if file_path.starts_with("zenith.signal") {
593 : // Parse the zenith.signal file to set the correct previous LSN
594 7 : let bytes = read_all_bytes(reader).await?;
595 : // zenith.signal format is "PREV LSN: prev_lsn"
596 : // TODO write serialization and deserialization in the same place.
597 7 : let zenith_signal = std::str::from_utf8(&bytes)?.trim();
598 7 : let prev_lsn = match zenith_signal {
599 7 : "PREV LSN: none" => Lsn(0),
600 7 : "PREV LSN: invalid" => Lsn(0),
601 7 : other => {
602 7 : let split = other.split(':').collect::<Vec<_>>();
603 7 : split[1]
604 7 : .trim()
605 7 : .parse::<Lsn>()
606 7 : .context("can't parse zenith.signal")?
607 : }
608 : };
609 :
610 : // zenith.signal is not necessarily the last file that we handle,
611 : // but it is OK to call `finish_write()` here, because the final `modification.commit()`
612 : // will update the LSN once more to the final value.
613 7 : let writer = modification.tline.writer().await;
614 7 : writer.finish_write(prev_lsn);
615 :
616 UBC 0 : debug!("imported zenith signal {}", prev_lsn);
617 CBC 3724 : } else if file_path.starts_with("pg_tblspc") {
618 : // TODO Backups exported from neon won't have pg_tblspc, but we will need
619 : // this to import arbitrary postgres databases.
620 UBC 0 : bail!("Importing pg_tblspc is not implemented");
621 : } else {
622 0 : debug!(
623 0 : "ignoring unrecognized file \"{}\" in tar archive",
624 0 : file_path.display()
625 0 : );
626 : }
627 :
628 CBC 506279 : Ok(None)
629 506812 : }
630 :
631 2678 : async fn read_all_bytes(reader: &mut (impl AsyncRead + Unpin)) -> Result<Bytes> {
632 2678 : let mut buf: Vec<u8> = vec![];
633 21409 : reader.read_to_end(&mut buf).await?;
634 2678 : Ok(Bytes::from(buf))
635 2678 : }
636 :
637 522 : pub async fn create_tar_zst(pgdata_path: &Utf8Path, tmp_path: &Utf8Path) -> Result<(File, u64)> {
638 522 : let file = OpenOptions::new()
639 522 : .create(true)
640 522 : .truncate(true)
641 522 : .read(true)
642 522 : .write(true)
643 522 : .open(&tmp_path)
644 938 : .await
645 522 : .with_context(|| format!("tempfile creation {tmp_path}"))?;
646 :
647 522 : let mut paths = Vec::new();
648 509472 : for entry in WalkDir::new(pgdata_path) {
649 509472 : let entry = entry?;
650 509472 : let metadata = entry.metadata().expect("error getting dir entry metadata");
651 509472 : // Allow directories too, so that empty directories are also captured
652 509472 : if !(metadata.is_file() || metadata.is_dir()) {
653 UBC 0 : continue;
654 CBC 509472 : }
655 509472 : let path = entry.into_path();
656 509472 : paths.push(path);
657 : }
658 : // Do a sort to get a more consistent listing
659 522 : paths.sort_unstable();
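// Long-distance matching lets zstd find matches across a larger window,
// which typically improves the compression ratio on inputs of this size.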
660 522 : let zstd = ZstdEncoder::with_quality_and_params(
661 522 : file,
662 522 : Level::Default,
663 522 : &[CParameter::enable_long_distance_matching(true)],
664 522 : );
665 522 : let mut builder = Builder::new(zstd);
666 522 : // Use reproducible header mode
667 522 : builder.mode(HeaderMode::Deterministic);
668 509994 : for path in paths {
669 509472 : let rel_path = path.strip_prefix(pgdata_path)?;
670 509472 : if rel_path.is_empty() {
671 : // The top directory itself should not be added to the archive;
672 : // the tar crate doesn't like an empty relative path.
673 522 : continue;
674 508950 : }
675 4267999 : builder.append_path_with_name(&path, rel_path).await?;
676 : }
677 522 : let mut zstd = builder.into_inner().await?;
678 522 : zstd.shutdown().await?;
679 522 : let mut compressed = zstd.into_inner();
680 522 : let compressed_len = compressed.metadata().await?.len();
681 522 : const INITDB_TAR_ZST_WARN_LIMIT: u64 = 2 * 1024 * 1024;
682 522 : if compressed_len > INITDB_TAR_ZST_WARN_LIMIT {
683 UBC 0 : warn!("compressed {INITDB_PATH} size of {compressed_len} is above limit {INITDB_TAR_ZST_WARN_LIMIT}.");
684 CBC 522 : }
685 522 : compressed.seek(SeekFrom::Start(0)).await?;
686 522 : Ok((compressed, compressed_len))
687 522 : }
688 :
689 3 : pub async fn extract_tar_zst(
690 3 : pgdata_path: &Utf8Path,
691 3 : tar_zst: impl AsyncBufRead + Unpin,
692 3 : ) -> Result<()> {
693 3 : let tar = Box::pin(ZstdDecoder::new(tar_zst));
694 3 : let mut archive = Archive::new(tar);
695 16100 : archive.unpack(pgdata_path).await?;
696 3 : Ok(())
697 3 : }