Line data Source code
1 : //!
2 : //! Generate a tarball with files needed to bootstrap ComputeNode.
3 : //!
4 : //! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
5 : //! It could use a better name.
6 : //!
7 : //! Stateless Postgres compute node is launched by sending a tarball
8 : //! which contains non-relational data (multixacts, clog, filenodemaps, twophase files),
9 : //! generated pg_control and dummy segment of WAL.
10 : //! This module is responsible for creation of such tarball
11 : //! from data stored in object storage.
12 : //!
13 : use anyhow::{anyhow, Context};
14 : use bytes::{BufMut, Bytes, BytesMut};
15 : use fail::fail_point;
16 : use pageserver_api::key::Key;
17 : use postgres_ffi::pg_constants;
18 : use std::fmt::Write as FmtWrite;
19 : use std::time::SystemTime;
20 : use tokio::io;
21 : use tokio::io::AsyncWrite;
22 : use tracing::*;
23 :
24 : use tokio_tar::{Builder, EntryType, Header};
25 :
26 : use crate::context::RequestContext;
27 : use crate::pgdatadir_mapping::Version;
28 : use crate::tenant::Timeline;
29 : use pageserver_api::reltag::{RelTag, SlruKind};
30 :
31 : use postgres_ffi::dispatch_pgversion;
32 : use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
33 : use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
34 : use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
35 : use postgres_ffi::TransactionId;
36 : use postgres_ffi::XLogFileName;
37 : use postgres_ffi::PG_TLI;
38 : use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
39 : use utils::lsn::Lsn;
40 :
41 0 : #[derive(Debug, thiserror::Error)]
42 : pub enum BasebackupError {
43 : #[error("basebackup pageserver error {0:#}")]
44 : Server(#[from] anyhow::Error),
45 : #[error("basebackup client error {0:#}")]
46 : Client(#[source] io::Error),
47 : }
48 :
49 : /// Create basebackup with non-rel data in it.
50 : /// Only include relational data if 'full_backup' is true.
51 : ///
52 : /// Currently we use empty 'req_lsn' in two cases:
53 : /// * During the basebackup right after timeline creation
54 : /// * When working without safekeepers. In this situation it is important to match the lsn
55 : /// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
56 : /// to start the replication.
57 0 : pub async fn send_basebackup_tarball<'a, W>(
58 0 : write: &'a mut W,
59 0 : timeline: &'a Timeline,
60 0 : req_lsn: Option<Lsn>,
61 0 : prev_lsn: Option<Lsn>,
62 0 : full_backup: bool,
63 0 : ctx: &'a RequestContext,
64 0 : ) -> Result<(), BasebackupError>
65 0 : where
66 0 : W: AsyncWrite + Send + Sync + Unpin,
67 0 : {
68 : // Compute postgres doesn't have any previous WAL files, but the first
69 : // record that it's going to write needs to include the LSN of the
70 : // previous record (xl_prev). We include prev_record_lsn in the
71 : // "zenith.signal" file, so that postgres can read it during startup.
72 : //
73 : // We don't keep full history of record boundaries in the page server,
74 : // however, only the predecessor of the latest record on each
75 : // timeline. So we can only provide prev_record_lsn when you take a
76 : // base backup at the end of the timeline, i.e. at last_record_lsn.
77 : // Even at the end of the timeline, we sometimes don't have a valid
78 : // prev_lsn value; that happens if the timeline was just branched from
79 : // an old LSN and it doesn't have any WAL of its own yet. We will set
80 : // prev_lsn to Lsn(0) if we cannot provide the correct value.
81 0 : let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
82 : // Backup was requested at a particular LSN. The caller should've
83 : // already checked that it's a valid LSN.
84 :
85 : // If the requested point is the end of the timeline, we can
86 : // provide prev_lsn. (get_last_record_rlsn() might return it as
87 : // zero, though, if no WAL has been generated on this timeline
88 : // yet.)
89 0 : let end_of_timeline = timeline.get_last_record_rlsn();
90 0 : if req_lsn == end_of_timeline.last {
91 0 : (end_of_timeline.prev, req_lsn)
92 : } else {
93 0 : (Lsn(0), req_lsn)
94 : }
95 : } else {
96 : // Backup was requested at end of the timeline.
97 0 : let end_of_timeline = timeline.get_last_record_rlsn();
98 0 : (end_of_timeline.prev, end_of_timeline.last)
99 : };
100 :
101 : // Consolidate the derived and the provided prev_lsn values
102 0 : let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
103 0 : if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
104 0 : return Err(BasebackupError::Server(anyhow!(
105 0 : "backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
106 0 : )));
107 0 : }
108 0 : provided_prev_lsn
109 : } else {
110 0 : backup_prev
111 : };
112 :
113 0 : info!(
114 0 : "taking basebackup lsn={}, prev_lsn={} (full_backup={})",
115 : backup_lsn, prev_lsn, full_backup
116 : );
117 :
118 0 : let basebackup = Basebackup {
119 0 : ar: Builder::new_non_terminated(write),
120 0 : timeline,
121 0 : lsn: backup_lsn,
122 0 : prev_record_lsn: prev_lsn,
123 0 : full_backup,
124 0 : ctx,
125 0 : };
126 0 : basebackup
127 0 : .send_tarball()
128 0 : .instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
129 0 : .await
130 0 : }
131 :
132 : /// This is short-living object only for the time of tarball creation,
133 : /// created mostly to avoid passing a lot of parameters between various functions
134 : /// used for constructing tarball.
135 : struct Basebackup<'a, W>
136 : where
137 : W: AsyncWrite + Send + Sync + Unpin,
138 : {
139 : ar: Builder<&'a mut W>,
140 : timeline: &'a Timeline,
141 : lsn: Lsn,
142 : prev_record_lsn: Lsn,
143 : full_backup: bool,
144 : ctx: &'a RequestContext,
145 : }
146 :
147 : /// A sink that accepts SLRU blocks ordered by key and forwards
148 : /// full segments to the archive.
149 : struct SlruSegmentsBuilder<'a, 'b, W>
150 : where
151 : W: AsyncWrite + Send + Sync + Unpin,
152 : {
153 : ar: &'a mut Builder<&'b mut W>,
154 : buf: Vec<u8>,
155 : current_segment: Option<(SlruKind, u32)>,
156 : total_blocks: usize,
157 : }
158 :
159 : impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
160 : where
161 : W: AsyncWrite + Send + Sync + Unpin,
162 : {
163 0 : fn new(ar: &'a mut Builder<&'b mut W>) -> Self {
164 0 : Self {
165 0 : ar,
166 0 : buf: Vec::new(),
167 0 : current_segment: None,
168 0 : total_blocks: 0,
169 0 : }
170 0 : }
171 :
172 0 : async fn add_block(&mut self, key: &Key, block: Bytes) -> Result<(), BasebackupError> {
173 0 : let (kind, segno, _) = key.to_slru_block()?;
174 :
175 0 : match kind {
176 : SlruKind::Clog => {
177 0 : if !(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8) {
178 0 : return Err(BasebackupError::Server(anyhow!(
179 0 : "invalid SlruKind::Clog record: block.len()={}",
180 0 : block.len()
181 0 : )));
182 0 : }
183 : }
184 : SlruKind::MultiXactMembers | SlruKind::MultiXactOffsets => {
185 0 : if block.len() != BLCKSZ as usize {
186 0 : return Err(BasebackupError::Server(anyhow!(
187 0 : "invalid {:?} record: block.len()={}",
188 0 : kind,
189 0 : block.len()
190 0 : )));
191 0 : }
192 : }
193 : }
194 :
195 0 : let segment = (kind, segno);
196 0 : match self.current_segment {
197 0 : None => {
198 0 : self.current_segment = Some(segment);
199 0 : self.buf
200 0 : .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
201 0 : }
202 0 : Some(current_seg) if current_seg == segment => {
203 0 : self.buf
204 0 : .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
205 0 : }
206 : Some(_) => {
207 0 : self.flush().await?;
208 :
209 0 : self.current_segment = Some(segment);
210 0 : self.buf
211 0 : .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
212 : }
213 : }
214 :
215 0 : Ok(())
216 0 : }
217 :
218 0 : async fn flush(&mut self) -> Result<(), BasebackupError> {
219 0 : let nblocks = self.buf.len() / BLCKSZ as usize;
220 0 : let (kind, segno) = self.current_segment.take().unwrap();
221 0 : let segname = format!("{}/{:>04X}", kind.to_str(), segno);
222 0 : let header = new_tar_header(&segname, self.buf.len() as u64)?;
223 0 : self.ar
224 0 : .append(&header, self.buf.as_slice())
225 0 : .await
226 0 : .map_err(BasebackupError::Client)?;
227 :
228 0 : self.total_blocks += nblocks;
229 0 : debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
230 :
231 0 : self.buf.clear();
232 0 :
233 0 : Ok(())
234 0 : }
235 :
236 0 : async fn finish(mut self) -> Result<(), BasebackupError> {
237 0 : let res = if self.current_segment.is_none() || self.buf.is_empty() {
238 0 : Ok(())
239 : } else {
240 0 : self.flush().await
241 : };
242 :
243 0 : info!("Collected {} SLRU blocks", self.total_blocks);
244 :
245 0 : res
246 0 : }
247 : }
248 :
249 : impl<'a, W> Basebackup<'a, W>
250 : where
251 : W: AsyncWrite + Send + Sync + Unpin,
252 : {
253 0 : async fn send_tarball(mut self) -> Result<(), BasebackupError> {
254 : // TODO include checksum
255 :
256 0 : let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
257 :
258 : // Create pgdata subdirs structure
259 0 : for dir in PGDATA_SUBDIRS.iter() {
260 0 : let header = new_tar_header_dir(dir)?;
261 0 : self.ar
262 0 : .append(&header, &mut io::empty())
263 0 : .await
264 0 : .context("could not add directory to basebackup tarball")?;
265 : }
266 :
267 : // Send config files.
268 0 : for filepath in PGDATA_SPECIAL_FILES.iter() {
269 0 : if *filepath == "pg_hba.conf" {
270 0 : let data = PG_HBA.as_bytes();
271 0 : let header = new_tar_header(filepath, data.len() as u64)?;
272 0 : self.ar
273 0 : .append(&header, data)
274 0 : .await
275 0 : .context("could not add config file to basebackup tarball")?;
276 : } else {
277 0 : let header = new_tar_header(filepath, 0)?;
278 0 : self.ar
279 0 : .append(&header, &mut io::empty())
280 0 : .await
281 0 : .context("could not add config file to basebackup tarball")?;
282 : }
283 : }
284 0 : if !lazy_slru_download {
285 : // Gather non-relational files from object storage pages.
286 0 : let slru_partitions = self
287 0 : .timeline
288 0 : .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
289 0 : .await
290 0 : .map_err(|e| BasebackupError::Server(e.into()))?
291 0 : .partition(
292 0 : self.timeline.get_shard_identity(),
293 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
294 0 : );
295 0 :
296 0 : let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
297 :
298 0 : for part in slru_partitions.parts {
299 0 : let blocks = self
300 0 : .timeline
301 0 : .get_vectored(part, self.lsn, self.ctx)
302 0 : .await
303 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
304 :
305 0 : for (key, block) in blocks {
306 0 : let block = block.map_err(|e| BasebackupError::Server(e.into()))?;
307 0 : slru_builder.add_block(&key, block).await?;
308 : }
309 : }
310 0 : slru_builder.finish().await?;
311 0 : }
312 :
313 0 : let mut min_restart_lsn: Lsn = Lsn::MAX;
314 : // Create tablespace directories
315 0 : for ((spcnode, dbnode), has_relmap_file) in self
316 0 : .timeline
317 0 : .list_dbdirs(self.lsn, self.ctx)
318 0 : .await
319 0 : .map_err(|e| BasebackupError::Server(e.into()))?
320 : {
321 0 : self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
322 :
323 : // If full backup is requested, include all relation files.
324 : // Otherwise only include init forks of unlogged relations.
325 0 : let rels = self
326 0 : .timeline
327 0 : .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
328 0 : .await
329 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
330 0 : for &rel in rels.iter() {
331 : // Send init fork as main fork to provide well formed empty
332 : // contents of UNLOGGED relations. Postgres copies it in
333 : // `reinit.c` during recovery.
334 0 : if rel.forknum == INIT_FORKNUM {
335 : // I doubt we need _init fork itself, but having it at least
336 : // serves as a marker relation is unlogged.
337 0 : self.add_rel(rel, rel).await?;
338 0 : self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
339 0 : continue;
340 0 : }
341 0 :
342 0 : if self.full_backup {
343 0 : if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
344 : {
345 : // skip this, will include it when we reach the init fork
346 0 : continue;
347 0 : }
348 0 : self.add_rel(rel, rel).await?;
349 0 : }
350 : }
351 : }
352 :
353 0 : for (path, content) in self
354 0 : .timeline
355 0 : .list_aux_files(self.lsn, self.ctx)
356 0 : .await
357 0 : .map_err(|e| BasebackupError::Server(e.into()))?
358 : {
359 0 : if path.starts_with("pg_replslot") {
360 0 : let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
361 0 : let restart_lsn = Lsn(u64::from_le_bytes(
362 0 : content[offs..offs + 8].try_into().unwrap(),
363 0 : ));
364 0 : info!("Replication slot {} restart LSN={}", path, restart_lsn);
365 0 : min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
366 0 : } else if path == "pg_logical/replorigin_checkpoint" {
367 : // replorigin_checkoint is written only on compute shutdown, so it contains
368 : // deteriorated values. So we generate our own version of this file for the particular LSN
369 : // based on information about replorigins extracted from transaction commit records.
370 : // In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
371 : // but now we should handle (skip) it for backward compatibility.
372 0 : continue;
373 0 : }
374 0 : let header = new_tar_header(&path, content.len() as u64)?;
375 0 : self.ar
376 0 : .append(&header, &*content)
377 0 : .await
378 0 : .context("could not add aux file to basebackup tarball")?;
379 : }
380 :
381 0 : if min_restart_lsn != Lsn::MAX {
382 0 : info!(
383 0 : "Min restart LSN for logical replication is {}",
384 : min_restart_lsn
385 : );
386 0 : let data = min_restart_lsn.0.to_le_bytes();
387 0 : let header = new_tar_header("restart.lsn", data.len() as u64)?;
388 0 : self.ar
389 0 : .append(&header, &data[..])
390 0 : .await
391 0 : .context("could not add restart.lsn file to basebackup tarball")?;
392 0 : }
393 0 : for xid in self
394 0 : .timeline
395 0 : .list_twophase_files(self.lsn, self.ctx)
396 0 : .await
397 0 : .map_err(|e| BasebackupError::Server(e.into()))?
398 : {
399 0 : self.add_twophase_file(xid).await?;
400 : }
401 0 : let repl_origins = self
402 0 : .timeline
403 0 : .get_replorigins(self.lsn, self.ctx)
404 0 : .await
405 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
406 0 : let n_origins = repl_origins.len();
407 0 : if n_origins != 0 {
408 : //
409 : // Construct "pg_logical/replorigin_checkpoint" file based on information about replication origins
410 : // extracted from transaction commit record. We are using this file to pass information about replication
411 : // origins to compute to allow logical replication to restart from proper point.
412 : //
413 0 : let mut content = Vec::with_capacity(n_origins * 16 + 8);
414 0 : content.extend_from_slice(&pg_constants::REPLICATION_STATE_MAGIC.to_le_bytes());
415 0 : for (origin_id, origin_lsn) in repl_origins {
416 0 : content.extend_from_slice(&origin_id.to_le_bytes());
417 0 : content.extend_from_slice(&[0u8; 6]); // align to 8 bytes
418 0 : content.extend_from_slice(&origin_lsn.0.to_le_bytes());
419 0 : }
420 0 : let crc32 = crc32c::crc32c(&content);
421 0 : content.extend_from_slice(&crc32.to_le_bytes());
422 0 : let header = new_tar_header("pg_logical/replorigin_checkpoint", content.len() as u64)?;
423 0 : self.ar.append(&header, &*content).await.context(
424 0 : "could not add pg_logical/replorigin_checkpoint file to basebackup tarball",
425 0 : )?;
426 0 : }
427 :
428 0 : fail_point!("basebackup-before-control-file", |_| {
429 0 : Err(BasebackupError::Server(anyhow!(
430 0 : "failpoint basebackup-before-control-file"
431 0 : )))
432 0 : });
433 :
434 : // Generate pg_control and bootstrap WAL segment.
435 0 : self.add_pgcontrol_file().await?;
436 0 : self.ar.finish().await.map_err(BasebackupError::Client)?;
437 0 : debug!("all tarred up!");
438 0 : Ok(())
439 0 : }
440 :
441 : /// Add contents of relfilenode `src`, naming it as `dst`.
442 0 : async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
443 0 : let nblocks = self
444 0 : .timeline
445 0 : .get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
446 0 : .await
447 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
448 :
449 : // If the relation is empty, create an empty file
450 0 : if nblocks == 0 {
451 0 : let file_name = dst.to_segfile_name(0);
452 0 : let header = new_tar_header(&file_name, 0)?;
453 0 : self.ar
454 0 : .append(&header, &mut io::empty())
455 0 : .await
456 0 : .map_err(BasebackupError::Client)?;
457 0 : return Ok(());
458 0 : }
459 0 :
460 0 : // Add a file for each chunk of blocks (aka segment)
461 0 : let mut startblk = 0;
462 0 : let mut seg = 0;
463 0 : while startblk < nblocks {
464 0 : let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks);
465 0 :
466 0 : let mut segment_data: Vec<u8> = vec![];
467 0 : for blknum in startblk..endblk {
468 0 : let img = self
469 0 : .timeline
470 0 : .get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), self.ctx)
471 0 : .await
472 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
473 0 : segment_data.extend_from_slice(&img[..]);
474 : }
475 :
476 0 : let file_name = dst.to_segfile_name(seg as u32);
477 0 : let header = new_tar_header(&file_name, segment_data.len() as u64)?;
478 0 : self.ar
479 0 : .append(&header, segment_data.as_slice())
480 0 : .await
481 0 : .map_err(BasebackupError::Client)?;
482 :
483 0 : seg += 1;
484 0 : startblk = endblk;
485 : }
486 :
487 0 : Ok(())
488 0 : }
489 :
490 : //
491 : // Include database/tablespace directories.
492 : //
493 : // Each directory contains a PG_VERSION file, and the default database
494 : // directories also contain pg_filenode.map files.
495 : //
496 0 : async fn add_dbdir(
497 0 : &mut self,
498 0 : spcnode: u32,
499 0 : dbnode: u32,
500 0 : has_relmap_file: bool,
501 0 : ) -> Result<(), BasebackupError> {
502 0 : let relmap_img = if has_relmap_file {
503 0 : let img = self
504 0 : .timeline
505 0 : .get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
506 0 : .await
507 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
508 :
509 0 : if img.len()
510 0 : != dispatch_pgversion!(self.timeline.pg_version, pgv::bindings::SIZEOF_RELMAPFILE)
511 : {
512 0 : return Err(BasebackupError::Server(anyhow!(
513 0 : "img.len() != SIZE_OF_RELMAPFILE, img.len()={}",
514 0 : img.len(),
515 0 : )));
516 0 : }
517 0 :
518 0 : Some(img)
519 : } else {
520 0 : None
521 : };
522 :
523 0 : if spcnode == GLOBALTABLESPACE_OID {
524 0 : let pg_version_str = match self.timeline.pg_version {
525 0 : 14 | 15 => self.timeline.pg_version.to_string(),
526 0 : ver => format!("{ver}\x0A"),
527 : };
528 0 : let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
529 0 : self.ar
530 0 : .append(&header, pg_version_str.as_bytes())
531 0 : .await
532 0 : .map_err(BasebackupError::Client)?;
533 :
534 0 : info!("timeline.pg_version {}", self.timeline.pg_version);
535 :
536 0 : if let Some(img) = relmap_img {
537 : // filenode map for global tablespace
538 0 : let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
539 0 : self.ar
540 0 : .append(&header, &img[..])
541 0 : .await
542 0 : .map_err(BasebackupError::Client)?;
543 : } else {
544 0 : warn!("global/pg_filenode.map is missing");
545 : }
546 : } else {
547 : // User defined tablespaces are not supported. However, as
548 : // a special case, if a tablespace/db directory is
549 : // completely empty, we can leave it out altogether. This
550 : // makes taking a base backup after the 'tablespace'
551 : // regression test pass, because the test drops the
552 : // created tablespaces after the tests.
553 : //
554 : // FIXME: this wouldn't be necessary, if we handled
555 : // XLOG_TBLSPC_DROP records. But we probably should just
556 : // throw an error on CREATE TABLESPACE in the first place.
557 0 : if !has_relmap_file
558 0 : && self
559 0 : .timeline
560 0 : .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
561 0 : .await
562 0 : .map_err(|e| BasebackupError::Server(e.into()))?
563 0 : .is_empty()
564 : {
565 0 : return Ok(());
566 0 : }
567 0 : // User defined tablespaces are not supported
568 0 : if spcnode != DEFAULTTABLESPACE_OID {
569 0 : return Err(BasebackupError::Server(anyhow!(
570 0 : "spcnode != DEFAULTTABLESPACE_OID, spcnode={spcnode}"
571 0 : )));
572 0 : }
573 0 :
574 0 : // Append dir path for each database
575 0 : let path = format!("base/{}", dbnode);
576 0 : let header = new_tar_header_dir(&path)?;
577 0 : self.ar
578 0 : .append(&header, &mut io::empty())
579 0 : .await
580 0 : .map_err(BasebackupError::Client)?;
581 :
582 0 : if let Some(img) = relmap_img {
583 0 : let dst_path = format!("base/{}/PG_VERSION", dbnode);
584 :
585 0 : let pg_version_str = match self.timeline.pg_version {
586 0 : 14 | 15 => self.timeline.pg_version.to_string(),
587 0 : ver => format!("{ver}\x0A"),
588 : };
589 0 : let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
590 0 : self.ar
591 0 : .append(&header, pg_version_str.as_bytes())
592 0 : .await
593 0 : .map_err(BasebackupError::Client)?;
594 :
595 0 : let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
596 0 : let header = new_tar_header(&relmap_path, img.len() as u64)?;
597 0 : self.ar
598 0 : .append(&header, &img[..])
599 0 : .await
600 0 : .map_err(BasebackupError::Client)?;
601 0 : }
602 : };
603 0 : Ok(())
604 0 : }
605 :
606 : //
607 : // Extract twophase state files
608 : //
609 0 : async fn add_twophase_file(&mut self, xid: TransactionId) -> Result<(), BasebackupError> {
610 0 : let img = self
611 0 : .timeline
612 0 : .get_twophase_file(xid, self.lsn, self.ctx)
613 0 : .await
614 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
615 :
616 0 : let mut buf = BytesMut::new();
617 0 : buf.extend_from_slice(&img[..]);
618 0 : let crc = crc32c::crc32c(&img[..]);
619 0 : buf.put_u32_le(crc);
620 0 : let path = format!("pg_twophase/{:>08X}", xid);
621 0 : let header = new_tar_header(&path, buf.len() as u64)?;
622 0 : self.ar
623 0 : .append(&header, &buf[..])
624 0 : .await
625 0 : .map_err(BasebackupError::Client)?;
626 :
627 0 : Ok(())
628 0 : }
629 :
630 : //
631 : // Add generated pg_control file and bootstrap WAL segment.
632 : // Also send zenith.signal file with extra bootstrap data.
633 : //
634 0 : async fn add_pgcontrol_file(&mut self) -> Result<(), BasebackupError> {
635 0 : // add zenith.signal file
636 0 : let mut zenith_signal = String::new();
637 0 : if self.prev_record_lsn == Lsn(0) {
638 0 : if self.timeline.is_ancestor_lsn(self.lsn) {
639 0 : write!(zenith_signal, "PREV LSN: none")
640 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
641 : } else {
642 0 : write!(zenith_signal, "PREV LSN: invalid")
643 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
644 : }
645 : } else {
646 0 : write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)
647 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
648 : }
649 0 : self.ar
650 0 : .append(
651 0 : &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
652 0 : zenith_signal.as_bytes(),
653 : )
654 0 : .await
655 0 : .map_err(BasebackupError::Client)?;
656 :
657 0 : let checkpoint_bytes = self
658 0 : .timeline
659 0 : .get_checkpoint(self.lsn, self.ctx)
660 0 : .await
661 0 : .context("failed to get checkpoint bytes")?;
662 0 : let pg_control_bytes = self
663 0 : .timeline
664 0 : .get_control_file(self.lsn, self.ctx)
665 0 : .await
666 0 : .context("failed get control bytes")?;
667 :
668 0 : let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
669 0 : &pg_control_bytes,
670 0 : &checkpoint_bytes,
671 0 : self.lsn,
672 0 : self.timeline.pg_version,
673 0 : )?;
674 :
675 : //send pg_control
676 0 : let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
677 0 : self.ar
678 0 : .append(&header, &pg_control_bytes[..])
679 0 : .await
680 0 : .map_err(BasebackupError::Client)?;
681 :
682 : //send wal segment
683 0 : let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
684 0 : let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
685 0 : let wal_file_path = format!("pg_wal/{}", wal_file_name);
686 0 : let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
687 :
688 0 : let wal_seg = postgres_ffi::generate_wal_segment(
689 0 : segno,
690 0 : system_identifier,
691 0 : self.timeline.pg_version,
692 0 : self.lsn,
693 0 : )
694 0 : .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
695 0 : if wal_seg.len() != WAL_SEGMENT_SIZE {
696 0 : return Err(BasebackupError::Server(anyhow!(
697 0 : "wal_seg.len() != WAL_SEGMENT_SIZE, wal_seg.len()={}",
698 0 : wal_seg.len()
699 0 : )));
700 0 : }
701 0 : self.ar
702 0 : .append(&header, &wal_seg[..])
703 0 : .await
704 0 : .map_err(BasebackupError::Client)?;
705 0 : Ok(())
706 0 : }
707 : }
708 :
709 : //
710 : // Create new tarball entry header
711 : //
712 0 : fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
713 0 : let mut header = Header::new_gnu();
714 0 : header.set_size(size);
715 0 : header.set_path(path)?;
716 0 : header.set_mode(0b110000000); // -rw-------
717 0 : header.set_mtime(
718 0 : // use currenttime as last modified time
719 0 : SystemTime::now()
720 0 : .duration_since(SystemTime::UNIX_EPOCH)
721 0 : .unwrap()
722 0 : .as_secs(),
723 0 : );
724 0 : header.set_cksum();
725 0 : Ok(header)
726 0 : }
727 :
728 0 : fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
729 0 : let mut header = Header::new_gnu();
730 0 : header.set_size(0);
731 0 : header.set_path(path)?;
732 0 : header.set_mode(0o755); // -rw-------
733 0 : header.set_entry_type(EntryType::dir());
734 0 : header.set_mtime(
735 0 : // use currenttime as last modified time
736 0 : SystemTime::now()
737 0 : .duration_since(SystemTime::UNIX_EPOCH)
738 0 : .unwrap()
739 0 : .as_secs(),
740 0 : );
741 0 : header.set_cksum();
742 0 : Ok(header)
743 0 : }
|