Line data Source code
1 : //!
2 : //! Generate a tarball with files needed to bootstrap ComputeNode.
3 : //!
4 : //! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
5 : //! It could use a better name.
6 : //!
7 : //! Stateless Postgres compute node is launched by sending a tarball
8 : //! which contains non-relational data (multixacts, clog, filenodemaps, twophase files),
9 : //! generated pg_control and dummy segment of WAL.
10 : //! This module is responsible for creation of such tarball
11 : //! from data stored in object storage.
12 : //!
13 : use std::fmt::Write as FmtWrite;
14 : use std::time::{Instant, SystemTime};
15 :
16 : use anyhow::{Context, anyhow};
17 : use bytes::{BufMut, Bytes, BytesMut};
18 : use fail::fail_point;
19 : use pageserver_api::key::{Key, rel_block_to_key};
20 : use pageserver_api::reltag::{RelTag, SlruKind};
21 : use postgres_ffi::pg_constants::{
22 : DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID, PG_HBA, PGDATA_SPECIAL_FILES,
23 : };
24 : use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
25 : use postgres_ffi::{
26 : BLCKSZ, PG_TLI, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName, dispatch_pgversion, pg_constants,
27 : };
28 : use tokio::io;
29 : use tokio::io::AsyncWrite;
30 : use tokio_tar::{Builder, EntryType, Header};
31 : use tracing::*;
32 : use utils::lsn::Lsn;
33 :
34 : use crate::context::RequestContext;
35 : use crate::pgdatadir_mapping::Version;
36 : use crate::tenant::storage_layer::IoConcurrency;
37 : use crate::tenant::timeline::GetVectoredError;
38 : use crate::tenant::{PageReconstructError, Timeline};
39 :
40 : #[derive(Debug, thiserror::Error)]
41 : pub enum BasebackupError {
42 : #[error("basebackup pageserver error {0:#}")]
43 : Server(#[from] anyhow::Error),
44 : #[error("basebackup client error {0:#} when {1}")]
45 : Client(#[source] io::Error, &'static str),
46 : #[error("basebackup during shutdown")]
47 : Shutdown,
48 : }
49 :
50 : impl From<PageReconstructError> for BasebackupError {
51 0 : fn from(value: PageReconstructError) -> Self {
52 0 : match value {
53 0 : PageReconstructError::Cancelled => BasebackupError::Shutdown,
54 0 : err => BasebackupError::Server(err.into()),
55 : }
56 0 : }
57 : }
58 :
59 : impl From<GetVectoredError> for BasebackupError {
60 0 : fn from(value: GetVectoredError) -> Self {
61 0 : match value {
62 0 : GetVectoredError::Cancelled => BasebackupError::Shutdown,
63 0 : err => BasebackupError::Server(err.into()),
64 : }
65 0 : }
66 : }
67 :
68 : /// Create basebackup with non-rel data in it.
69 : /// Only include relational data if 'full_backup' is true.
70 : ///
71 : /// Currently we use empty 'req_lsn' in two cases:
72 : /// * During the basebackup right after timeline creation
73 : /// * When working without safekeepers. In this situation it is important to match the lsn
74 : /// we are taking basebackup on with the lsn that is used in pageserver's walreceiver
75 : /// to start the replication.
76 0 : pub async fn send_basebackup_tarball<'a, W>(
77 0 : write: &'a mut W,
78 0 : timeline: &'a Timeline,
79 0 : req_lsn: Option<Lsn>,
80 0 : prev_lsn: Option<Lsn>,
81 0 : full_backup: bool,
82 0 : replica: bool,
83 0 : ctx: &'a RequestContext,
84 0 : ) -> Result<(), BasebackupError>
85 0 : where
86 0 : W: AsyncWrite + Send + Sync + Unpin,
87 0 : {
88 : // Compute postgres doesn't have any previous WAL files, but the first
89 : // record that it's going to write needs to include the LSN of the
90 : // previous record (xl_prev). We include prev_record_lsn in the
91 : // "zenith.signal" file, so that postgres can read it during startup.
92 : //
93 : // We don't keep full history of record boundaries in the page server,
94 : // however, only the predecessor of the latest record on each
95 : // timeline. So we can only provide prev_record_lsn when you take a
96 : // base backup at the end of the timeline, i.e. at last_record_lsn.
97 : // Even at the end of the timeline, we sometimes don't have a valid
98 : // prev_lsn value; that happens if the timeline was just branched from
99 : // an old LSN and it doesn't have any WAL of its own yet. We will set
100 : // prev_lsn to Lsn(0) if we cannot provide the correct value.
101 0 : let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
102 : // Backup was requested at a particular LSN. The caller should've
103 : // already checked that it's a valid LSN.
104 :
105 : // If the requested point is the end of the timeline, we can
106 : // provide prev_lsn. (get_last_record_rlsn() might return it as
107 : // zero, though, if no WAL has been generated on this timeline
108 : // yet.)
109 0 : let end_of_timeline = timeline.get_last_record_rlsn();
110 0 : if req_lsn == end_of_timeline.last {
111 0 : (end_of_timeline.prev, req_lsn)
112 : } else {
113 0 : (Lsn(0), req_lsn)
114 : }
115 : } else {
116 : // Backup was requested at end of the timeline.
117 0 : let end_of_timeline = timeline.get_last_record_rlsn();
118 0 : (end_of_timeline.prev, end_of_timeline.last)
119 : };
120 :
121 : // Consolidate the derived and the provided prev_lsn values
122 0 : let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
123 0 : if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
124 0 : return Err(BasebackupError::Server(anyhow!(
125 0 : "backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
126 0 : )));
127 0 : }
128 0 : provided_prev_lsn
129 : } else {
130 0 : backup_prev
131 : };
132 :
133 0 : info!(
134 0 : "taking basebackup lsn={}, prev_lsn={} (full_backup={}, replica={})",
135 : backup_lsn, prev_lsn, full_backup, replica
136 : );
137 :
138 0 : let basebackup = Basebackup {
139 0 : ar: Builder::new_non_terminated(write),
140 0 : timeline,
141 0 : lsn: backup_lsn,
142 0 : prev_record_lsn: prev_lsn,
143 0 : full_backup,
144 0 : replica,
145 0 : ctx,
146 0 : io_concurrency: IoConcurrency::spawn_from_conf(
147 0 : timeline.conf,
148 0 : timeline
149 0 : .gate
150 0 : .enter()
151 0 : .map_err(|_| BasebackupError::Shutdown)?,
152 : ),
153 : };
154 0 : basebackup
155 0 : .send_tarball()
156 0 : .instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
157 0 : .await
158 0 : }
159 :
160 : /// This is short-living object only for the time of tarball creation,
161 : /// created mostly to avoid passing a lot of parameters between various functions
162 : /// used for constructing tarball.
163 : struct Basebackup<'a, W>
164 : where
165 : W: AsyncWrite + Send + Sync + Unpin,
166 : {
167 : ar: Builder<&'a mut W>,
168 : timeline: &'a Timeline,
169 : lsn: Lsn,
170 : prev_record_lsn: Lsn,
171 : full_backup: bool,
172 : replica: bool,
173 : ctx: &'a RequestContext,
174 : io_concurrency: IoConcurrency,
175 : }
176 :
177 : /// A sink that accepts SLRU blocks ordered by key and forwards
178 : /// full segments to the archive.
179 : struct SlruSegmentsBuilder<'a, 'b, W>
180 : where
181 : W: AsyncWrite + Send + Sync + Unpin,
182 : {
183 : ar: &'a mut Builder<&'b mut W>,
184 : buf: Vec<u8>,
185 : current_segment: Option<(SlruKind, u32)>,
186 : total_blocks: usize,
187 : }
188 :
189 : impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
190 : where
191 : W: AsyncWrite + Send + Sync + Unpin,
192 : {
193 0 : fn new(ar: &'a mut Builder<&'b mut W>) -> Self {
194 0 : Self {
195 0 : ar,
196 0 : buf: Vec::new(),
197 0 : current_segment: None,
198 0 : total_blocks: 0,
199 0 : }
200 0 : }
201 :
202 0 : async fn add_block(&mut self, key: &Key, block: Bytes) -> Result<(), BasebackupError> {
203 0 : let (kind, segno, _) = key.to_slru_block()?;
204 :
205 0 : match kind {
206 : SlruKind::Clog => {
207 0 : if !(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8) {
208 0 : return Err(BasebackupError::Server(anyhow!(
209 0 : "invalid SlruKind::Clog record: block.len()={}",
210 0 : block.len()
211 0 : )));
212 0 : }
213 : }
214 : SlruKind::MultiXactMembers | SlruKind::MultiXactOffsets => {
215 0 : if block.len() != BLCKSZ as usize {
216 0 : return Err(BasebackupError::Server(anyhow!(
217 0 : "invalid {:?} record: block.len()={}",
218 0 : kind,
219 0 : block.len()
220 0 : )));
221 0 : }
222 : }
223 : }
224 :
225 0 : let segment = (kind, segno);
226 0 : match self.current_segment {
227 0 : None => {
228 0 : self.current_segment = Some(segment);
229 0 : self.buf
230 0 : .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
231 0 : }
232 0 : Some(current_seg) if current_seg == segment => {
233 0 : self.buf
234 0 : .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
235 0 : }
236 : Some(_) => {
237 0 : self.flush().await?;
238 :
239 0 : self.current_segment = Some(segment);
240 0 : self.buf
241 0 : .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
242 : }
243 : }
244 :
245 0 : Ok(())
246 0 : }
247 :
248 0 : async fn flush(&mut self) -> Result<(), BasebackupError> {
249 0 : let nblocks = self.buf.len() / BLCKSZ as usize;
250 0 : let (kind, segno) = self.current_segment.take().unwrap();
251 0 : let segname = format!("{}/{:>04X}", kind.to_str(), segno);
252 0 : let header = new_tar_header(&segname, self.buf.len() as u64)?;
253 0 : self.ar
254 0 : .append(&header, self.buf.as_slice())
255 0 : .await
256 0 : .map_err(|e| BasebackupError::Client(e, "flush"))?;
257 :
258 0 : self.total_blocks += nblocks;
259 0 : debug!("Added to basebackup slru {} relsize {}", segname, nblocks);
260 :
261 0 : self.buf.clear();
262 0 :
263 0 : Ok(())
264 0 : }
265 :
266 0 : async fn finish(mut self) -> Result<(), BasebackupError> {
267 0 : let res = if self.current_segment.is_none() || self.buf.is_empty() {
268 0 : Ok(())
269 : } else {
270 0 : self.flush().await
271 : };
272 :
273 0 : info!("Collected {} SLRU blocks", self.total_blocks);
274 :
275 0 : res
276 0 : }
277 : }
278 :
279 : impl<W> Basebackup<'_, W>
280 : where
281 : W: AsyncWrite + Send + Sync + Unpin,
282 : {
283 0 : async fn send_tarball(mut self) -> Result<(), BasebackupError> {
284 : // TODO include checksum
285 :
286 : // Construct the pg_control file from the persisted checkpoint and pg_control
287 : // information. But we only add this to the tarball at the end, so that if the
288 : // writing is interrupted half-way through, the resulting incomplete tarball will
289 : // be missing the pg_control file, which prevents PostgreSQL from starting up on
290 : // it. With proper error handling, you should never try to start up from an
291 : // incomplete basebackup in the first place, of course, but this is a nice little
292 : // extra safety measure.
293 0 : let checkpoint_bytes = self
294 0 : .timeline
295 0 : .get_checkpoint(self.lsn, self.ctx)
296 0 : .await
297 0 : .context("failed to get checkpoint bytes")?;
298 0 : let pg_control_bytes = self
299 0 : .timeline
300 0 : .get_control_file(self.lsn, self.ctx)
301 0 : .await
302 0 : .context("failed to get control bytes")?;
303 0 : let (pg_control_bytes, system_identifier, was_shutdown) =
304 0 : postgres_ffi::generate_pg_control(
305 0 : &pg_control_bytes,
306 0 : &checkpoint_bytes,
307 0 : self.lsn,
308 0 : self.timeline.pg_version,
309 0 : )?;
310 :
311 0 : let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;
312 :
313 0 : let pgversion = self.timeline.pg_version;
314 0 : let subdirs = dispatch_pgversion!(pgversion, &pgv::bindings::PGDATA_SUBDIRS[..]);
315 :
316 : // Create pgdata subdirs structure
317 0 : for dir in subdirs.iter() {
318 0 : let header = new_tar_header_dir(dir)?;
319 0 : self.ar
320 0 : .append(&header, io::empty())
321 0 : .await
322 0 : .map_err(|e| BasebackupError::Client(e, "send_tarball"))?;
323 : }
324 :
325 : // Send config files.
326 0 : for filepath in PGDATA_SPECIAL_FILES.iter() {
327 0 : if *filepath == "pg_hba.conf" {
328 0 : let data = PG_HBA.as_bytes();
329 0 : let header = new_tar_header(filepath, data.len() as u64)?;
330 0 : self.ar
331 0 : .append(&header, data)
332 0 : .await
333 0 : .map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
334 : } else {
335 0 : let header = new_tar_header(filepath, 0)?;
336 0 : self.ar
337 0 : .append(&header, io::empty())
338 0 : .await
339 0 : .map_err(|e| BasebackupError::Client(e, "send_tarball,add_config_file"))?;
340 : }
341 : }
342 0 : if !lazy_slru_download {
343 : // Gather non-relational files from object storage pages.
344 0 : let slru_partitions = self
345 0 : .timeline
346 0 : .get_slru_keyspace(Version::Lsn(self.lsn), self.ctx)
347 0 : .await?
348 0 : .partition(
349 0 : self.timeline.get_shard_identity(),
350 0 : Timeline::MAX_GET_VECTORED_KEYS * BLCKSZ as u64,
351 0 : );
352 0 :
353 0 : let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);
354 :
355 0 : for part in slru_partitions.parts {
356 0 : let blocks = self
357 0 : .timeline
358 0 : .get_vectored(part, self.lsn, self.io_concurrency.clone(), self.ctx)
359 0 : .await?;
360 :
361 0 : for (key, block) in blocks {
362 0 : let block = block?;
363 0 : slru_builder.add_block(&key, block).await?;
364 : }
365 : }
366 0 : slru_builder.finish().await?;
367 0 : }
368 :
369 0 : let mut min_restart_lsn: Lsn = Lsn::MAX;
370 : // Create tablespace directories
371 0 : for ((spcnode, dbnode), has_relmap_file) in
372 0 : self.timeline.list_dbdirs(self.lsn, self.ctx).await?
373 : {
374 0 : self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
375 :
376 : // If full backup is requested, include all relation files.
377 : // Otherwise only include init forks of unlogged relations.
378 0 : let rels = self
379 0 : .timeline
380 0 : .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
381 0 : .await?;
382 0 : for &rel in rels.iter() {
383 : // Send init fork as main fork to provide well formed empty
384 : // contents of UNLOGGED relations. Postgres copies it in
385 : // `reinit.c` during recovery.
386 0 : if rel.forknum == INIT_FORKNUM {
387 : // I doubt we need _init fork itself, but having it at least
388 : // serves as a marker relation is unlogged.
389 0 : self.add_rel(rel, rel).await?;
390 0 : self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
391 0 : continue;
392 0 : }
393 0 :
394 0 : if self.full_backup {
395 0 : if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
396 : {
397 : // skip this, will include it when we reach the init fork
398 0 : continue;
399 0 : }
400 0 : self.add_rel(rel, rel).await?;
401 0 : }
402 : }
403 : }
404 :
405 0 : let start_time = Instant::now();
406 0 : let aux_files = self
407 0 : .timeline
408 0 : .list_aux_files(self.lsn, self.ctx, self.io_concurrency.clone())
409 0 : .await?;
410 0 : let aux_scan_time = start_time.elapsed();
411 0 : let aux_estimated_size = aux_files
412 0 : .values()
413 0 : .map(|content| content.len())
414 0 : .sum::<usize>();
415 0 : info!(
416 0 : "Scanned {} aux files in {}ms, aux file content size = {}",
417 0 : aux_files.len(),
418 0 : aux_scan_time.as_millis(),
419 : aux_estimated_size
420 : );
421 :
422 0 : for (path, content) in aux_files {
423 0 : if path.starts_with("pg_replslot") {
424 : // Do not create LR slots at standby because they are not used but prevent WAL truncation
425 0 : if self.replica {
426 0 : continue;
427 0 : }
428 0 : let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
429 0 : let restart_lsn = Lsn(u64::from_le_bytes(
430 0 : content[offs..offs + 8].try_into().unwrap(),
431 0 : ));
432 0 : info!("Replication slot {} restart LSN={}", path, restart_lsn);
433 0 : min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
434 0 : } else if path == "pg_logical/replorigin_checkpoint" {
435 : // replorigin_checkoint is written only on compute shutdown, so it contains
436 : // deteriorated values. So we generate our own version of this file for the particular LSN
437 : // based on information about replorigins extracted from transaction commit records.
438 : // In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
439 : // but now we should handle (skip) it for backward compatibility.
440 0 : continue;
441 0 : } else if path == "pg_stat/pgstat.stat" && !was_shutdown {
442 : // Drop statistic in case of abnormal termination, i.e. if we're not starting from the exact LSN
443 : // of a shutdown checkpoint.
444 0 : continue;
445 0 : }
446 0 : let header = new_tar_header(&path, content.len() as u64)?;
447 0 : self.ar
448 0 : .append(&header, &*content)
449 0 : .await
450 0 : .map_err(|e| BasebackupError::Client(e, "send_tarball,add_aux_file"))?;
451 : }
452 :
453 0 : if min_restart_lsn != Lsn::MAX {
454 0 : info!(
455 0 : "Min restart LSN for logical replication is {}",
456 : min_restart_lsn
457 : );
458 0 : let data = min_restart_lsn.0.to_le_bytes();
459 0 : let header = new_tar_header("restart.lsn", data.len() as u64)?;
460 0 : self.ar
461 0 : .append(&header, &data[..])
462 0 : .await
463 0 : .map_err(|e| BasebackupError::Client(e, "send_tarball,restart.lsn"))?;
464 0 : }
465 0 : for xid in self
466 0 : .timeline
467 0 : .list_twophase_files(self.lsn, self.ctx)
468 0 : .await?
469 : {
470 0 : self.add_twophase_file(xid).await?;
471 : }
472 0 : let repl_origins = self
473 0 : .timeline
474 0 : .get_replorigins(self.lsn, self.ctx, self.io_concurrency.clone())
475 0 : .await?;
476 0 : let n_origins = repl_origins.len();
477 0 : if n_origins != 0 {
478 : //
479 : // Construct "pg_logical/replorigin_checkpoint" file based on information about replication origins
480 : // extracted from transaction commit record. We are using this file to pass information about replication
481 : // origins to compute to allow logical replication to restart from proper point.
482 : //
483 0 : let mut content = Vec::with_capacity(n_origins * 16 + 8);
484 0 : content.extend_from_slice(&pg_constants::REPLICATION_STATE_MAGIC.to_le_bytes());
485 0 : for (origin_id, origin_lsn) in repl_origins {
486 0 : content.extend_from_slice(&origin_id.to_le_bytes());
487 0 : content.extend_from_slice(&[0u8; 6]); // align to 8 bytes
488 0 : content.extend_from_slice(&origin_lsn.0.to_le_bytes());
489 0 : }
490 0 : let crc32 = crc32c::crc32c(&content);
491 0 : content.extend_from_slice(&crc32.to_le_bytes());
492 0 : let header = new_tar_header("pg_logical/replorigin_checkpoint", content.len() as u64)?;
493 0 : self.ar.append(&header, &*content).await.map_err(|e| {
494 0 : BasebackupError::Client(e, "send_tarball,pg_logical/replorigin_checkpoint")
495 0 : })?;
496 0 : }
497 :
498 0 : fail_point!("basebackup-before-control-file", |_| {
499 0 : Err(BasebackupError::Server(anyhow!(
500 0 : "failpoint basebackup-before-control-file"
501 0 : )))
502 0 : });
503 :
504 : // Last, add the pg_control file and bootstrap WAL segment.
505 0 : self.add_pgcontrol_file(pg_control_bytes, system_identifier)
506 0 : .await?;
507 0 : self.ar
508 0 : .finish()
509 0 : .await
510 0 : .map_err(|e| BasebackupError::Client(e, "send_tarball,finish"))?;
511 0 : debug!("all tarred up!");
512 0 : Ok(())
513 0 : }
514 :
515 : /// Add contents of relfilenode `src`, naming it as `dst`.
516 0 : async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
517 0 : let nblocks = self
518 0 : .timeline
519 0 : .get_rel_size(src, Version::Lsn(self.lsn), self.ctx)
520 0 : .await?;
521 :
522 : // If the relation is empty, create an empty file
523 0 : if nblocks == 0 {
524 0 : let file_name = dst.to_segfile_name(0);
525 0 : let header = new_tar_header(&file_name, 0)?;
526 0 : self.ar
527 0 : .append(&header, io::empty())
528 0 : .await
529 0 : .map_err(|e| BasebackupError::Client(e, "add_rel,empty"))?;
530 0 : return Ok(());
531 0 : }
532 0 :
533 0 : // Add a file for each chunk of blocks (aka segment)
534 0 : let mut startblk = 0;
535 0 : let mut seg = 0;
536 0 : while startblk < nblocks {
537 0 : let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks);
538 0 :
539 0 : let mut segment_data: Vec<u8> = vec![];
540 0 : for blknum in startblk..endblk {
541 0 : let img = self
542 0 : .timeline
543 0 : // TODO: investigate using get_vectored for the entire startblk..endblk range.
544 0 : // But this code path is not on the critical path for most basebackups (?).
545 0 : .get(rel_block_to_key(src, blknum), self.lsn, self.ctx)
546 0 : .await?;
547 0 : segment_data.extend_from_slice(&img[..]);
548 : }
549 :
550 0 : let file_name = dst.to_segfile_name(seg as u32);
551 0 : let header = new_tar_header(&file_name, segment_data.len() as u64)?;
552 0 : self.ar
553 0 : .append(&header, segment_data.as_slice())
554 0 : .await
555 0 : .map_err(|e| BasebackupError::Client(e, "add_rel,segment"))?;
556 :
557 0 : seg += 1;
558 0 : startblk = endblk;
559 : }
560 :
561 0 : Ok(())
562 0 : }
563 :
564 : //
565 : // Include database/tablespace directories.
566 : //
567 : // Each directory contains a PG_VERSION file, and the default database
568 : // directories also contain pg_filenode.map files.
569 : //
570 0 : async fn add_dbdir(
571 0 : &mut self,
572 0 : spcnode: u32,
573 0 : dbnode: u32,
574 0 : has_relmap_file: bool,
575 0 : ) -> Result<(), BasebackupError> {
576 0 : let relmap_img = if has_relmap_file {
577 0 : let img = self
578 0 : .timeline
579 0 : .get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
580 0 : .await?;
581 :
582 0 : if img.len()
583 0 : != dispatch_pgversion!(self.timeline.pg_version, pgv::bindings::SIZEOF_RELMAPFILE)
584 : {
585 0 : return Err(BasebackupError::Server(anyhow!(
586 0 : "img.len() != SIZE_OF_RELMAPFILE, img.len()={}",
587 0 : img.len(),
588 0 : )));
589 0 : }
590 0 :
591 0 : Some(img)
592 : } else {
593 0 : None
594 : };
595 :
596 0 : if spcnode == GLOBALTABLESPACE_OID {
597 0 : let pg_version_str = match self.timeline.pg_version {
598 0 : 14 | 15 => self.timeline.pg_version.to_string(),
599 0 : ver => format!("{ver}\x0A"),
600 : };
601 0 : let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
602 0 : self.ar
603 0 : .append(&header, pg_version_str.as_bytes())
604 0 : .await
605 0 : .map_err(|e| BasebackupError::Client(e, "add_dbdir,PG_VERSION"))?;
606 :
607 0 : info!("timeline.pg_version {}", self.timeline.pg_version);
608 :
609 0 : if let Some(img) = relmap_img {
610 : // filenode map for global tablespace
611 0 : let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
612 0 : self.ar
613 0 : .append(&header, &img[..])
614 0 : .await
615 0 : .map_err(|e| BasebackupError::Client(e, "add_dbdir,global/pg_filenode.map"))?;
616 : } else {
617 0 : warn!("global/pg_filenode.map is missing");
618 : }
619 : } else {
620 : // User defined tablespaces are not supported. However, as
621 : // a special case, if a tablespace/db directory is
622 : // completely empty, we can leave it out altogether. This
623 : // makes taking a base backup after the 'tablespace'
624 : // regression test pass, because the test drops the
625 : // created tablespaces after the tests.
626 : //
627 : // FIXME: this wouldn't be necessary, if we handled
628 : // XLOG_TBLSPC_DROP records. But we probably should just
629 : // throw an error on CREATE TABLESPACE in the first place.
630 0 : if !has_relmap_file
631 0 : && self
632 0 : .timeline
633 0 : .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
634 0 : .await?
635 0 : .is_empty()
636 : {
637 0 : return Ok(());
638 0 : }
639 0 : // User defined tablespaces are not supported
640 0 : if spcnode != DEFAULTTABLESPACE_OID {
641 0 : return Err(BasebackupError::Server(anyhow!(
642 0 : "spcnode != DEFAULTTABLESPACE_OID, spcnode={spcnode}"
643 0 : )));
644 0 : }
645 0 :
646 0 : // Append dir path for each database
647 0 : let path = format!("base/{}", dbnode);
648 0 : let header = new_tar_header_dir(&path)?;
649 0 : self.ar
650 0 : .append(&header, io::empty())
651 0 : .await
652 0 : .map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;
653 :
654 0 : if let Some(img) = relmap_img {
655 0 : let dst_path = format!("base/{}/PG_VERSION", dbnode);
656 :
657 0 : let pg_version_str = match self.timeline.pg_version {
658 0 : 14 | 15 => self.timeline.pg_version.to_string(),
659 0 : ver => format!("{ver}\x0A"),
660 : };
661 0 : let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
662 0 : self.ar
663 0 : .append(&header, pg_version_str.as_bytes())
664 0 : .await
665 0 : .map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;
666 :
667 0 : let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
668 0 : let header = new_tar_header(&relmap_path, img.len() as u64)?;
669 0 : self.ar
670 0 : .append(&header, &img[..])
671 0 : .await
672 0 : .map_err(|e| BasebackupError::Client(e, "add_dbdir,base/pg_filenode.map"))?;
673 0 : }
674 : };
675 0 : Ok(())
676 0 : }
677 :
678 : //
679 : // Extract twophase state files
680 : //
681 0 : async fn add_twophase_file(&mut self, xid: u64) -> Result<(), BasebackupError> {
682 0 : let img = self
683 0 : .timeline
684 0 : .get_twophase_file(xid, self.lsn, self.ctx)
685 0 : .await?;
686 :
687 0 : let mut buf = BytesMut::new();
688 0 : buf.extend_from_slice(&img[..]);
689 0 : let crc = crc32c::crc32c(&img[..]);
690 0 : buf.put_u32_le(crc);
691 0 : let path = if self.timeline.pg_version < 17 {
692 0 : format!("pg_twophase/{:>08X}", xid)
693 : } else {
694 0 : format!("pg_twophase/{:>016X}", xid)
695 : };
696 0 : let header = new_tar_header(&path, buf.len() as u64)?;
697 0 : self.ar
698 0 : .append(&header, &buf[..])
699 0 : .await
700 0 : .map_err(|e| BasebackupError::Client(e, "add_twophase_file"))?;
701 :
702 0 : Ok(())
703 0 : }
704 :
705 : //
706 : // Add generated pg_control file and bootstrap WAL segment.
707 : // Also send zenith.signal file with extra bootstrap data.
708 : //
709 0 : async fn add_pgcontrol_file(
710 0 : &mut self,
711 0 : pg_control_bytes: Bytes,
712 0 : system_identifier: u64,
713 0 : ) -> Result<(), BasebackupError> {
714 0 : // add zenith.signal file
715 0 : let mut zenith_signal = String::new();
716 0 : if self.prev_record_lsn == Lsn(0) {
717 0 : if self.timeline.is_ancestor_lsn(self.lsn) {
718 0 : write!(zenith_signal, "PREV LSN: none")
719 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
720 : } else {
721 0 : write!(zenith_signal, "PREV LSN: invalid")
722 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
723 : }
724 : } else {
725 0 : write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)
726 0 : .map_err(|e| BasebackupError::Server(e.into()))?;
727 : }
728 0 : self.ar
729 0 : .append(
730 0 : &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
731 0 : zenith_signal.as_bytes(),
732 0 : )
733 0 : .await
734 0 : .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;
735 :
736 : //send pg_control
737 0 : let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
738 0 : self.ar
739 0 : .append(&header, &pg_control_bytes[..])
740 0 : .await
741 0 : .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,pg_control"))?;
742 :
743 : //send wal segment
744 0 : let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
745 0 : let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
746 0 : let wal_file_path = format!("pg_wal/{}", wal_file_name);
747 0 : let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
748 :
749 0 : let wal_seg = postgres_ffi::generate_wal_segment(
750 0 : segno,
751 0 : system_identifier,
752 0 : self.timeline.pg_version,
753 0 : self.lsn,
754 0 : )
755 0 : .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
756 0 : if wal_seg.len() != WAL_SEGMENT_SIZE {
757 0 : return Err(BasebackupError::Server(anyhow!(
758 0 : "wal_seg.len() != WAL_SEGMENT_SIZE, wal_seg.len()={}",
759 0 : wal_seg.len()
760 0 : )));
761 0 : }
762 0 : self.ar
763 0 : .append(&header, &wal_seg[..])
764 0 : .await
765 0 : .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,wal_segment"))?;
766 0 : Ok(())
767 0 : }
768 : }
769 :
770 : //
771 : // Create new tarball entry header
772 : //
773 0 : fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
774 0 : let mut header = Header::new_gnu();
775 0 : header.set_size(size);
776 0 : header.set_path(path)?;
777 0 : header.set_mode(0b110000000); // -rw-------
778 0 : header.set_mtime(
779 0 : // use currenttime as last modified time
780 0 : SystemTime::now()
781 0 : .duration_since(SystemTime::UNIX_EPOCH)
782 0 : .unwrap()
783 0 : .as_secs(),
784 0 : );
785 0 : header.set_cksum();
786 0 : Ok(header)
787 0 : }
788 :
789 0 : fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
790 0 : let mut header = Header::new_gnu();
791 0 : header.set_size(0);
792 0 : header.set_path(path)?;
793 0 : header.set_mode(0o755); // -rw-------
794 0 : header.set_entry_type(EntryType::dir());
795 0 : header.set_mtime(
796 0 : // use currenttime as last modified time
797 0 : SystemTime::now()
798 0 : .duration_since(SystemTime::UNIX_EPOCH)
799 0 : .unwrap()
800 0 : .as_secs(),
801 0 : );
802 0 : header.set_cksum();
803 0 : Ok(header)
804 0 : }
|