//!
//! Generate a tarball with files needed to bootstrap ComputeNode.
//!
//! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
//! It could use a better name.
//!
//! A stateless Postgres compute node is launched by sending it a tarball
//! that contains non-relational data (multixacts, clog, filenodemaps, twophase files),
//! a generated pg_control file, and a dummy WAL segment.
//! This module is responsible for creating such a tarball
//! from data stored in object storage.
//!
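//! A minimal usage sketch (hypothetical caller; the `timeline`, `ctx`, and
//! output writer come from the surrounding pageserver code):
//!
//! ```ignore
//! send_basebackup_tarball(
//!     &mut output,      // impl AsyncWrite + Send + Sync + Unpin
//!     &timeline,
//!     None,             // req_lsn: None = back up at the end of the timeline
//!     None,             // prev_lsn: None = derive from timeline state
//!     false,            // full_backup: non-rel data only
//!     false,            // replica
//!     Some(async_compression::Level::Default),
//!     &ctx,
//! )
//! .await?;
//! ```
//!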
use std::fmt::Write as FmtWrite;
use std::sync::Arc;
use std::time::{Instant, SystemTime};

use anyhow::{Context, anyhow};
use async_compression::tokio::write::GzipEncoder;
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};
use postgres_ffi::{
    BLCKSZ, PG_TLI, PgMajorVersion, RELSEG_SIZE, WAL_SEGMENT_SIZE, XLogFileName,
    dispatch_pgversion, pg_constants,
};
use postgres_ffi_types::constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi_types::forknum::{INIT_FORKNUM, MAIN_FORKNUM};
use tokio::io::{self, AsyncWrite, AsyncWriteExt as _};
use tokio_tar::{Builder, EntryType, Header};
use tracing::*;
use utils::lsn::Lsn;

use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
use crate::tenant::storage_layer::IoConcurrency;
use crate::tenant::timeline::{GetVectoredError, VersionedKeySpaceQuery};
use crate::tenant::{PageReconstructError, Timeline};

#[derive(Debug, thiserror::Error)]
pub enum BasebackupError {
    #[error("basebackup pageserver error {0:#}")]
    Server(#[from] anyhow::Error),
    #[error("basebackup client error {0:#} when {1}")]
    Client(#[source] io::Error, &'static str),
    #[error("basebackup during shutdown")]
    Shutdown,
}

impl From<PageReconstructError> for BasebackupError {
    fn from(value: PageReconstructError) -> Self {
        match value {
            PageReconstructError::Cancelled => BasebackupError::Shutdown,
            err => BasebackupError::Server(err.into()),
        }
    }
}

impl From<GetVectoredError> for BasebackupError {
    fn from(value: GetVectoredError) -> Self {
        match value {
            GetVectoredError::Cancelled => BasebackupError::Shutdown,
            err => BasebackupError::Server(err.into()),
        }
    }
}

impl From<BasebackupError> for postgres_backend::QueryError {
    fn from(err: BasebackupError) -> Self {
        use postgres_backend::QueryError;
        use pq_proto::framed::ConnectionError;
        match err {
            BasebackupError::Client(err, _) => QueryError::Disconnected(ConnectionError::Io(err)),
            BasebackupError::Server(err) => QueryError::Other(err),
            BasebackupError::Shutdown => QueryError::Shutdown,
        }
    }
}

impl From<BasebackupError> for tonic::Status {
    fn from(err: BasebackupError) -> Self {
        use tonic::Code;
        let code = match &err {
            BasebackupError::Client(_, _) => Code::Cancelled,
            BasebackupError::Server(_) => Code::Internal,
            BasebackupError::Shutdown => Code::Unavailable,
        };
        tonic::Status::new(code, err.to_string())
    }
}

/// Create a basebackup with non-rel data in it.
/// Only include relational data if 'full_backup' is true.
///
/// Currently we use an empty 'req_lsn' in two cases:
/// * During the basebackup right after timeline creation
/// * When working without safekeepers. In this situation it is important to match the LSN
///   we take the basebackup at with the LSN that pageserver's walreceiver uses to start
///   replication.
#[allow(clippy::too_many_arguments)]
pub async fn send_basebackup_tarball<'a, W>(
    write: &'a mut W,
    timeline: &'a Timeline,
    req_lsn: Option<Lsn>,
    prev_lsn: Option<Lsn>,
    full_backup: bool,
    replica: bool,
    gzip_level: Option<async_compression::Level>,
    ctx: &'a RequestContext,
) -> Result<(), BasebackupError>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    // The compute Postgres doesn't have any previous WAL files, but the first
    // record that it's going to write needs to include the LSN of the
    // previous record (xl_prev). We include prev_record_lsn in the
    // "neon.signal" file, so that postgres can read it during startup.
    //
    // We don't keep a full history of record boundaries in the pageserver,
    // however; only the predecessor of the latest record on each
    // timeline. So we can only provide prev_record_lsn when you take a
    // base backup at the end of the timeline, i.e. at last_record_lsn.
    // Even at the end of the timeline, we sometimes don't have a valid
    // prev_lsn value; that happens if the timeline was just branched from
    // an old LSN and it doesn't have any WAL of its own yet. We will set
    // prev_lsn to Lsn(0) if we cannot provide the correct value.
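    //
    // To summarize the cases handled below: a backup taken at last_record_lsn
    // gets the tracked prev LSN; a backup at any older LSN gets Lsn(0), unless
    // the caller supplies an explicit prev_lsn.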
    let (backup_prev, lsn) = if let Some(req_lsn) = req_lsn {
        // Backup was requested at a particular LSN. The caller should've
        // already checked that it's a valid LSN.

        // If the requested point is the end of the timeline, we can
        // provide prev_lsn. (get_last_record_rlsn() might return it as
        // zero, though, if no WAL has been generated on this timeline
        // yet.)
        let end_of_timeline = timeline.get_last_record_rlsn();
        if req_lsn == end_of_timeline.last {
            (end_of_timeline.prev, req_lsn)
        } else {
            (Lsn(0), req_lsn)
        }
    } else {
        // Backup was requested at end of the timeline.
        let end_of_timeline = timeline.get_last_record_rlsn();
        (end_of_timeline.prev, end_of_timeline.last)
    };

    // Consolidate the derived and the provided prev_lsn values
    let prev_record_lsn = if let Some(provided_prev_lsn) = prev_lsn {
        if backup_prev != Lsn(0) && backup_prev != provided_prev_lsn {
            return Err(BasebackupError::Server(anyhow!(
                "backup_prev {backup_prev} != provided_prev_lsn {provided_prev_lsn}"
            )));
        }
        provided_prev_lsn
    } else {
        backup_prev
    };

    info!(
        "taking basebackup lsn={lsn}, prev_lsn={prev_record_lsn} \
         (full_backup={full_backup}, replica={replica}, gzip={gzip_level:?})",
    );
    let span = info_span!("send_tarball", backup_lsn=%lsn);

    let io_concurrency = IoConcurrency::spawn_from_conf(
        timeline.conf.get_vectored_concurrent_io,
        timeline
            .gate
            .enter()
            .map_err(|_| BasebackupError::Shutdown)?,
    );

    if let Some(gzip_level) = gzip_level {
        let mut encoder = GzipEncoder::with_quality(write, gzip_level);
        Basebackup {
            ar: Builder::new_non_terminated(&mut encoder),
            timeline,
            lsn,
            prev_record_lsn,
            full_backup,
            replica,
            ctx,
            io_concurrency,
        }
        .send_tarball()
        .instrument(span)
        .await?;
        encoder
            .shutdown()
            .await
            .map_err(|err| BasebackupError::Client(err, "gzip"))?;
    } else {
        Basebackup {
            ar: Builder::new_non_terminated(write),
            timeline,
            lsn,
            prev_record_lsn,
            full_backup,
            replica,
            ctx,
            io_concurrency,
        }
        .send_tarball()
        .instrument(span)
        .await?;
    }

    Ok(())
}

/// A short-lived object that exists only for the duration of tarball creation;
/// it is created mostly to avoid passing a lot of parameters between the
/// various functions used to construct the tarball.
struct Basebackup<'a, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    ar: Builder<&'a mut W>,
    timeline: &'a Timeline,
    lsn: Lsn,
    prev_record_lsn: Lsn,
    full_backup: bool,
    replica: bool,
    ctx: &'a RequestContext,
    io_concurrency: IoConcurrency,
}

/// A sink that accepts SLRU blocks ordered by key and forwards
/// full segments to the archive.
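///
/// A sketch of the expected call sequence (this is how `send_tarball` below
/// drives it): `new(ar)`, then `add_block()` for each block in key order,
/// then `finish()` to flush the final segment.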
struct SlruSegmentsBuilder<'a, 'b, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    ar: &'a mut Builder<&'b mut W>,
    buf: Vec<u8>,
    current_segment: Option<(SlruKind, u32)>,
    total_blocks: usize,
}

impl<'a, 'b, W> SlruSegmentsBuilder<'a, 'b, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    fn new(ar: &'a mut Builder<&'b mut W>) -> Self {
        Self {
            ar,
            buf: Vec::new(),
            current_segment: None,
            total_blocks: 0,
        }
    }

    async fn add_block(&mut self, key: &Key, block: Bytes) -> Result<(), BasebackupError> {
        let (kind, segno, _) = key.to_slru_block()?;

        match kind {
            SlruKind::Clog => {
                if !(block.len() == BLCKSZ as usize || block.len() == BLCKSZ as usize + 8) {
                    return Err(BasebackupError::Server(anyhow!(
                        "invalid SlruKind::Clog record: block.len()={}",
                        block.len()
                    )));
                }
            }
            SlruKind::MultiXactMembers | SlruKind::MultiXactOffsets => {
                if block.len() != BLCKSZ as usize {
                    return Err(BasebackupError::Server(anyhow!(
                        "invalid {:?} record: block.len()={}",
                        kind,
                        block.len()
                    )));
                }
            }
        }

        let segment = (kind, segno);
        match self.current_segment {
            None => {
                self.current_segment = Some(segment);
                self.buf
                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
            }
            Some(current_seg) if current_seg == segment => {
                self.buf
                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
            }
            Some(_) => {
                self.flush().await?;

                self.current_segment = Some(segment);
                self.buf
                    .extend_from_slice(block.slice(..BLCKSZ as usize).as_ref());
            }
        }

        Ok(())
    }

    async fn flush(&mut self) -> Result<(), BasebackupError> {
        let nblocks = self.buf.len() / BLCKSZ as usize;
        let (kind, segno) = self.current_segment.take().unwrap();
        let segname = format!("{kind}/{segno:>04X}");
        let header = new_tar_header(&segname, self.buf.len() as u64)?;
        self.ar
            .append(&header, self.buf.as_slice())
            .await
            .map_err(|e| BasebackupError::Client(e, "flush"))?;

        self.total_blocks += nblocks;
        debug!("Added to basebackup slru {} relsize {}", segname, nblocks);

        self.buf.clear();

        Ok(())
    }

    async fn finish(mut self) -> Result<(), BasebackupError> {
        let res = if self.current_segment.is_none() || self.buf.is_empty() {
            Ok(())
        } else {
            self.flush().await
        };

        info!("Collected {} SLRU blocks", self.total_blocks);

        res
    }
}

impl<W> Basebackup<'_, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    async fn send_tarball(mut self) -> Result<(), BasebackupError> {
        // TODO include checksum

        // Construct the pg_control file from the persisted checkpoint and pg_control
        // information. But we only add this to the tarball at the end, so that if the
        // writing is interrupted half-way through, the resulting incomplete tarball will
        // be missing the pg_control file, which prevents PostgreSQL from starting up on
        // it. With proper error handling, you should never try to start up from an
        // incomplete basebackup in the first place, of course, but this is a nice little
        // extra safety measure.
        let checkpoint_bytes = self
            .timeline
            .get_checkpoint(self.lsn, self.ctx)
            .await
            .context("failed to get checkpoint bytes")?;
        let pg_control_bytes = self
            .timeline
            .get_control_file(self.lsn, self.ctx)
            .await
            .context("failed to get control bytes")?;
        let (pg_control_bytes, system_identifier, was_shutdown) =
            postgres_ffi::generate_pg_control(
                &pg_control_bytes,
                &checkpoint_bytes,
                self.lsn,
                self.timeline.pg_version,
            )?;

        let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;

        let pgversion = self.timeline.pg_version;
        let subdirs = dispatch_pgversion!(pgversion, &pgv::bindings::PGDATA_SUBDIRS[..]);

        // Create pgdata subdirs structure
        for dir in subdirs.iter() {
            let header = new_tar_header_dir(dir)?;
            self.ar
                .append(&header, io::empty())
                .await
                .map_err(|e| BasebackupError::Client(e, "send_tarball"))?;
        }

        // Send config files.
        for filepath in PGDATA_SPECIAL_FILES.iter() {
            if *filepath == "pg_hba.conf" {
                let data = PG_HBA.as_bytes();
                let header = new_tar_header(filepath, data.len() as u64)?;
                self.ar
                    .append(&header, data)
                    .await
                    .map_err(|e| BasebackupError::Client(e, "send_tarball,pg_hba.conf"))?;
            } else {
                let header = new_tar_header(filepath, 0)?;
                self.ar
                    .append(&header, io::empty())
                    .await
                    .map_err(|e| BasebackupError::Client(e, "send_tarball,add_config_file"))?;
            }
        }
        if !lazy_slru_download {
            // Gather non-relational files from object storage pages.
            let slru_partitions = self
                .timeline
                .get_slru_keyspace(Version::at(self.lsn), self.ctx)
                .await?
                .partition(
                    self.timeline.get_shard_identity(),
                    self.timeline.conf.max_get_vectored_keys.get() as u64 * BLCKSZ as u64,
                    BLCKSZ as u64,
                );

            let mut slru_builder = SlruSegmentsBuilder::new(&mut self.ar);

            for part in slru_partitions.parts {
                let query = VersionedKeySpaceQuery::uniform(part, self.lsn);
                let blocks = self
                    .timeline
                    .get_vectored(query, self.io_concurrency.clone(), self.ctx)
                    .await?;

                for (key, block) in blocks {
                    let block = block?;
                    slru_builder.add_block(&key, block).await?;
                }
            }
            slru_builder.finish().await?;
        }

        let mut min_restart_lsn: Lsn = Lsn::MAX;

        let mut dbdir_cnt = 0;
        let mut rel_cnt = 0;

        // Create tablespace directories
        for ((spcnode, dbnode), has_relmap_file) in
            self.timeline.list_dbdirs(self.lsn, self.ctx).await?
        {
            self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
            dbdir_cnt += 1;
            // If full backup is requested, include all relation files.
            // Otherwise only include init forks of unlogged relations.
            let rels = self
                .timeline
                .list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
                .await?;
            for &rel in rels.iter() {
                rel_cnt += 1;
                // Send the init fork as the main fork to provide well-formed, empty
                // contents for UNLOGGED relations. Postgres copies it in
                // `reinit.c` during recovery.
                if rel.forknum == INIT_FORKNUM {
                    // We may not need the _init fork itself, but having it at least
                    // serves as a marker that the relation is unlogged.
                    self.add_rel(rel, rel).await?;
                    self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
                    continue;
                }

                if self.full_backup {
                    if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
                    {
                        // skip this, will include it when we reach the init fork
                        continue;
                    }
                    self.add_rel(rel, rel).await?;
                }
            }
        }

        self.timeline
            .db_rel_count
            .store(Some(Arc::new((dbdir_cnt, rel_cnt))));

        let start_time = Instant::now();
        let aux_files = self
            .timeline
            .list_aux_files(self.lsn, self.ctx, self.io_concurrency.clone())
            .await?;
        let aux_scan_time = start_time.elapsed();
        let aux_estimated_size = aux_files
            .values()
            .map(|content| content.len())
            .sum::<usize>();
        info!(
            "Scanned {} aux files in {}ms, aux file content size = {}",
            aux_files.len(),
            aux_scan_time.as_millis(),
            aux_estimated_size
        );

        for (path, content) in aux_files {
            if path.starts_with("pg_replslot") {
                // Do not create logical replication slots on a standby: they are not
                // used there, but they would prevent WAL truncation.
                if self.replica {
                    continue;
                }
                let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
                let restart_lsn = Lsn(u64::from_le_bytes(
                    content[offs..offs + 8].try_into().unwrap(),
                ));
                info!("Replication slot {} restart LSN={}", path, restart_lsn);
                min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
            } else if path == "pg_logical/replorigin_checkpoint" {
                // replorigin_checkpoint is written only on compute shutdown, so it would
                // contain stale values. Instead, we generate our own version of this file
                // for the particular LSN, based on information about replorigins extracted
                // from transaction commit records. In the future we will not generate an
                // AUX record for "pg_logical/replorigin_checkpoint" at all, but for now we
                // have to handle (skip) it for backward compatibility.
                continue;
            } else if path == "pg_stat/pgstat.stat" && !was_shutdown {
                // Drop statistics in case of abnormal termination, i.e. if we're not
                // starting from the exact LSN of a shutdown checkpoint.
                continue;
            }
            let header = new_tar_header(&path, content.len() as u64)?;
            self.ar
                .append(&header, &*content)
                .await
                .map_err(|e| BasebackupError::Client(e, "send_tarball,add_aux_file"))?;
        }

        if min_restart_lsn != Lsn::MAX {
            info!(
                "Min restart LSN for logical replication is {}",
                min_restart_lsn
            );
            let data = min_restart_lsn.0.to_le_bytes();
            let header = new_tar_header("restart.lsn", data.len() as u64)?;
            self.ar
                .append(&header, &data[..])
                .await
                .map_err(|e| BasebackupError::Client(e, "send_tarball,restart.lsn"))?;
        }
        for xid in self
            .timeline
            .list_twophase_files(self.lsn, self.ctx)
            .await?
        {
            self.add_twophase_file(xid).await?;
        }
        let repl_origins = self
            .timeline
            .get_replorigins(self.lsn, self.ctx, self.io_concurrency.clone())
            .await?;
        let n_origins = repl_origins.len();
        if n_origins != 0 {
            //
            // Construct the "pg_logical/replorigin_checkpoint" file based on information
            // about replication origins extracted from transaction commit records. We use
            // this file to pass replication-origin information to the compute, so that
            // logical replication can restart from the proper point.
            //
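            // On-disk layout (as constructed below): a 4-byte magic, then for
            // each origin a 2-byte origin id, 6 bytes of padding for 8-byte
            // alignment, and an 8-byte LSN; finally a 4-byte CRC32C over
            // everything that precedes it.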
            let mut content = Vec::with_capacity(n_origins * 16 + 8);
            content.extend_from_slice(&pg_constants::REPLICATION_STATE_MAGIC.to_le_bytes());
            for (origin_id, origin_lsn) in repl_origins {
                content.extend_from_slice(&origin_id.to_le_bytes());
                content.extend_from_slice(&[0u8; 6]); // align to 8 bytes
                content.extend_from_slice(&origin_lsn.0.to_le_bytes());
            }
            let crc32 = crc32c::crc32c(&content);
            content.extend_from_slice(&crc32.to_le_bytes());
            let header = new_tar_header("pg_logical/replorigin_checkpoint", content.len() as u64)?;
            self.ar.append(&header, &*content).await.map_err(|e| {
                BasebackupError::Client(e, "send_tarball,pg_logical/replorigin_checkpoint")
            })?;
        }

        fail_point!("basebackup-before-control-file", |_| {
            Err(BasebackupError::Server(anyhow!(
                "failpoint basebackup-before-control-file"
            )))
        });

        // Last, add the pg_control file and bootstrap WAL segment.
        self.add_pgcontrol_file(pg_control_bytes, system_identifier)
            .await?;
        self.ar
            .finish()
            .await
            .map_err(|e| BasebackupError::Client(e, "send_tarball,finish"))?;
        debug!("all tarred up!");
        Ok(())
    }

    /// Add the contents of relfilenode `src`, naming it `dst`.
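    ///
    /// `src` and `dst` usually match; they differ when the init fork of an
    /// unlogged relation is also sent under the main fork's name (see
    /// `send_tarball` above).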
    async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> Result<(), BasebackupError> {
        let nblocks = self
            .timeline
            .get_rel_size(src, Version::at(self.lsn), self.ctx)
            .await?;

        // If the relation is empty, create an empty file
        if nblocks == 0 {
            let file_name = dst.to_segfile_name(0);
            let header = new_tar_header(&file_name, 0)?;
            self.ar
                .append(&header, io::empty())
                .await
                .map_err(|e| BasebackupError::Client(e, "add_rel,empty"))?;
            return Ok(());
        }

        // Add a file for each chunk of blocks (aka segment)
        let mut startblk = 0;
        let mut seg = 0;
        while startblk < nblocks {
            let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks);

            let mut segment_data: Vec<u8> = vec![];
            for blknum in startblk..endblk {
                let img = self
                    .timeline
                    // TODO: investigate using get_vectored for the entire startblk..endblk range.
                    // But this code path is not on the critical path for most basebackups (?).
                    .get(rel_block_to_key(src, blknum), self.lsn, self.ctx)
                    .await?;
                segment_data.extend_from_slice(&img[..]);
            }

            let file_name = dst.to_segfile_name(seg as u32);
            let header = new_tar_header(&file_name, segment_data.len() as u64)?;
            self.ar
                .append(&header, segment_data.as_slice())
                .await
                .map_err(|e| BasebackupError::Client(e, "add_rel,segment"))?;

            seg += 1;
            startblk = endblk;
        }

        Ok(())
    }

    //
    // Include database/tablespace directories.
    //
    // Each directory contains a PG_VERSION file, and the default database
    // directories also contain pg_filenode.map files.
    //
    async fn add_dbdir(
        &mut self,
        spcnode: u32,
        dbnode: u32,
        has_relmap_file: bool,
    ) -> Result<(), BasebackupError> {
        let relmap_img = if has_relmap_file {
            let img = self
                .timeline
                .get_relmap_file(spcnode, dbnode, Version::at(self.lsn), self.ctx)
                .await?;

            if img.len()
                != dispatch_pgversion!(self.timeline.pg_version, pgv::bindings::SIZEOF_RELMAPFILE)
            {
                return Err(BasebackupError::Server(anyhow!(
                    "img.len() != SIZE_OF_RELMAPFILE, img.len()={}",
                    img.len(),
                )));
            }

            Some(img)
        } else {
            None
        };

        if spcnode == GLOBALTABLESPACE_OID {
            let pg_version_str = self.timeline.pg_version.versionfile_string();
            let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
            self.ar
                .append(&header, pg_version_str.as_bytes())
                .await
                .map_err(|e| BasebackupError::Client(e, "add_dbdir,PG_VERSION"))?;

            info!("timeline.pg_version {}", self.timeline.pg_version);

            if let Some(img) = relmap_img {
                // filenode map for global tablespace
                let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
                self.ar
                    .append(&header, &img[..])
                    .await
                    .map_err(|e| BasebackupError::Client(e, "add_dbdir,global/pg_filenode.map"))?;
            } else {
                warn!("global/pg_filenode.map is missing");
            }
        } else {
            // User defined tablespaces are not supported. However, as
            // a special case, if a tablespace/db directory is
            // completely empty, we can leave it out altogether. This
            // makes taking a base backup after the 'tablespace'
            // regression test pass, because the test drops the
            // created tablespaces after the tests.
            //
            // FIXME: this wouldn't be necessary, if we handled
            // XLOG_TBLSPC_DROP records. But we probably should just
            // throw an error on CREATE TABLESPACE in the first place.
            if !has_relmap_file
                && self
                    .timeline
                    .list_rels(spcnode, dbnode, Version::at(self.lsn), self.ctx)
                    .await?
                    .is_empty()
            {
                return Ok(());
            }
            // User defined tablespaces are not supported
            if spcnode != DEFAULTTABLESPACE_OID {
                return Err(BasebackupError::Server(anyhow!(
                    "spcnode != DEFAULTTABLESPACE_OID, spcnode={spcnode}"
                )));
            }

            // Append dir path for each database
            let path = format!("base/{dbnode}");
            let header = new_tar_header_dir(&path)?;
            self.ar
                .append(&header, io::empty())
                .await
                .map_err(|e| BasebackupError::Client(e, "add_dbdir,base"))?;

            if let Some(img) = relmap_img {
                let dst_path = format!("base/{dbnode}/PG_VERSION");

                let pg_version_str = self.timeline.pg_version.versionfile_string();
                let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
                self.ar
                    .append(&header, pg_version_str.as_bytes())
                    .await
                    .map_err(|e| BasebackupError::Client(e, "add_dbdir,base/PG_VERSION"))?;

                let relmap_path = format!("base/{dbnode}/pg_filenode.map");
                let header = new_tar_header(&relmap_path, img.len() as u64)?;
                self.ar
                    .append(&header, &img[..])
                    .await
                    .map_err(|e| BasebackupError::Client(e, "add_dbdir,base/pg_filenode.map"))?;
            }
        };
        Ok(())
    }

    //
    // Extract twophase state files
    //
    async fn add_twophase_file(&mut self, xid: u64) -> Result<(), BasebackupError> {
        let img = self
            .timeline
            .get_twophase_file(xid, self.lsn, self.ctx)
            .await?;

        let mut buf = BytesMut::new();
        buf.extend_from_slice(&img[..]);
        let crc = crc32c::crc32c(&img[..]);
        buf.put_u32_le(crc);
        let path = if self.timeline.pg_version < PgMajorVersion::PG17 {
            format!("pg_twophase/{xid:>08X}")
        } else {
            format!("pg_twophase/{xid:>016X}")
        };
        let header = new_tar_header(&path, buf.len() as u64)?;
        self.ar
            .append(&header, &buf[..])
            .await
            .map_err(|e| BasebackupError::Client(e, "add_twophase_file"))?;

        Ok(())
    }

    //
    // Add the generated pg_control file and bootstrap WAL segment.
    // Also send the neon.signal and zenith.signal files with extra bootstrap data.
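    // The signal files contain a single line: "PREV LSN: none",
    // "PREV LSN: invalid", or "PREV LSN: <lsn>" (see the neon_signal
    // construction below).
    //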
    async fn add_pgcontrol_file(
        &mut self,
        pg_control_bytes: Bytes,
        system_identifier: u64,
    ) -> Result<(), BasebackupError> {
        // add neon.signal file
        let mut neon_signal = String::new();
        if self.prev_record_lsn == Lsn(0) {
            if self.timeline.is_ancestor_lsn(self.lsn) {
                write!(neon_signal, "PREV LSN: none")
                    .map_err(|e| BasebackupError::Server(e.into()))?;
            } else {
                write!(neon_signal, "PREV LSN: invalid")
                    .map_err(|e| BasebackupError::Server(e.into()))?;
            }
        } else {
            write!(neon_signal, "PREV LSN: {}", self.prev_record_lsn)
                .map_err(|e| BasebackupError::Server(e.into()))?;
        }

        // TODO: Remove zenith.signal once all historical computes have been replaced
        // ... and thus support the neon.signal file.
        for signalfilename in ["neon.signal", "zenith.signal"] {
            self.ar
                .append(
                    &new_tar_header(signalfilename, neon_signal.len() as u64)?,
                    neon_signal.as_bytes(),
                )
                .await
                .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,neon.signal"))?;
        }

        // send pg_control
        let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
        self.ar
            .append(&header, &pg_control_bytes[..])
            .await
            .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,pg_control"))?;

        // send wal segment
        let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
        let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
        let wal_file_path = format!("pg_wal/{wal_file_name}");
        let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;

        let wal_seg = postgres_ffi::generate_wal_segment(
            segno,
            system_identifier,
            self.timeline.pg_version,
            self.lsn,
        )
        .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
        if wal_seg.len() != WAL_SEGMENT_SIZE {
            return Err(BasebackupError::Server(anyhow!(
                "wal_seg.len() != WAL_SEGMENT_SIZE, wal_seg.len()={}",
                wal_seg.len()
            )));
        }
        self.ar
            .append(&header, &wal_seg[..])
            .await
            .map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,wal_segment"))?;
        Ok(())
    }
}

//
// Create a new tarball entry header
//
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(size);
    header.set_path(path)?;
    header.set_mode(0b110000000); // -rw-------
    header.set_mtime(
        // use the current time as the last-modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    );
    header.set_cksum();
    Ok(header)
}

fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(0);
    header.set_path(path)?;
    header.set_mode(0o755); // drwxr-xr-x
    header.set_entry_type(EntryType::dir());
    header.set_mtime(
        // use the current time as the last-modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    );
    header.set_cksum();
    Ok(header)
}
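
// A minimal sanity check for the header helpers above: a hypothetical test
// sketch, not part of the original file. It assumes the `tar`-crate-style
// accessors (`size()`, `path()`, `mode()`, `entry_type()`) that tokio-tar's
// Header provides.
#[cfg(test)]
mod tar_header_tests {
    use super::*;

    #[test]
    fn tar_headers_are_well_formed() {
        // Regular file header: size, path, and mode should round-trip.
        let h = new_tar_header("global/pg_control", 8192).unwrap();
        assert_eq!(h.size().unwrap(), 8192);
        assert_eq!(h.path().unwrap().to_str().unwrap(), "global/pg_control");
        assert_eq!(h.mode().unwrap(), 0o600); // 0b110000000 == -rw-------

        // Directory header: zero size and a directory entry type.
        let d = new_tar_header_dir("base/5").unwrap();
        assert_eq!(d.size().unwrap(), 0);
        assert_eq!(d.entry_type(), EntryType::dir());
    }
}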