//!
//! Generate a tarball with the files needed to bootstrap a ComputeNode.
//!
//! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
//! It could use a better name.
//!
//! A stateless Postgres compute node is launched by sending it a tarball
//! that contains non-relational data (multixacts, clog, filenode maps,
//! twophase files), a generated pg_control file, and a dummy WAL segment.
//! This module is responsible for creating such a tarball from the data
//! stored in object storage.
//!
use anyhow::{anyhow, bail, ensure, Context};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::SystemTime;
use tokio::io;
use tokio::io::AsyncWrite;
use tracing::*;

use tokio_tar::{Builder, EntryType, Header};

use crate::context::RequestContext;
use crate::pgdatadir_mapping::Version;
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};

use postgres_ffi::dispatch_pgversion;
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;

/// Create a basebackup with non-relational data in it.
/// Relational data is only included if 'full_backup' is true.
///
/// Currently we use an empty 'req_lsn' in two cases:
/// * During the basebackup right after timeline creation
/// * When working without safekeepers. In this situation it is important that the LSN
///   we take the basebackup at matches the LSN used by the pageserver's walreceiver
///   to start replication.
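///
/// A minimal usage sketch (hypothetical caller: `socket`, `timeline`,
/// `request_lsn`, and `ctx` are assumed to come from the surrounding
/// request handler):
///
/// ```ignore
/// let mut writer = tokio::io::BufWriter::new(socket);
/// send_basebackup_tarball(
///     &mut writer,
///     &timeline,
///     Some(request_lsn), // or None to back up at the end of the timeline
///     None,              // let the pageserver derive prev_lsn
///     false,             // non-relational data only
///     &ctx,
/// )
/// .await?;
/// ```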
pub async fn send_basebackup_tarball<'a, W>(
    write: &'a mut W,
    timeline: &'a Timeline,
    req_lsn: Option<Lsn>,
    prev_lsn: Option<Lsn>,
    full_backup: bool,
    ctx: &'a RequestContext,
) -> anyhow::Result<()>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    // Compute postgres doesn't have any previous WAL files, but the first
    // record that it's going to write needs to include the LSN of the
    // previous record (xl_prev). We include prev_record_lsn in the
    // "zenith.signal" file, so that postgres can read it during startup.
    //
    // We don't keep full history of record boundaries in the page server,
    // however, only the predecessor of the latest record on each
    // timeline. So we can only provide prev_record_lsn when you take a
    // base backup at the end of the timeline, i.e. at last_record_lsn.
    // Even at the end of the timeline, we sometimes don't have a valid
    // prev_lsn value; that happens if the timeline was just branched from
    // an old LSN and it doesn't have any WAL of its own yet. We will set
    // prev_lsn to Lsn(0) if we cannot provide the correct value.
    let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
        // Backup was requested at a particular LSN. The caller should've
        // already checked that it's a valid LSN.

        // If the requested point is the end of the timeline, we can
        // provide prev_lsn. (get_last_record_rlsn() might return it as
        // zero, though, if no WAL has been generated on this timeline
        // yet.)
        let end_of_timeline = timeline.get_last_record_rlsn();
        if req_lsn == end_of_timeline.last {
            (end_of_timeline.prev, req_lsn)
        } else {
            (Lsn(0), req_lsn)
        }
    } else {
        // Backup was requested at end of the timeline.
        let end_of_timeline = timeline.get_last_record_rlsn();
        (end_of_timeline.prev, end_of_timeline.last)
    };

    // Consolidate the derived and the provided prev_lsn values
    let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
        if backup_prev != Lsn(0) {
            ensure!(backup_prev == provided_prev_lsn);
        }
        provided_prev_lsn
    } else {
        backup_prev
    };

    info!(
        "taking basebackup lsn={}, prev_lsn={} (full_backup={})",
        backup_lsn, prev_lsn, full_backup
    );

    let basebackup = Basebackup {
        ar: Builder::new_non_terminated(write),
        timeline,
        lsn: backup_lsn,
        prev_record_lsn: prev_lsn,
        full_backup,
        ctx,
    };
    basebackup
        .send_tarball()
        .instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
        .await
}

/// A short-lived object that exists only for the duration of tarball
/// creation; it mainly avoids passing a lot of parameters between the
/// various functions used to construct the tarball.
struct Basebackup<'a, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    ar: Builder<&'a mut W>,
    timeline: &'a Timeline,
    lsn: Lsn,
    prev_record_lsn: Lsn,
    full_backup: bool,
    ctx: &'a RequestContext,
}

impl<'a, W> Basebackup<'a, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    async fn send_tarball(mut self) -> anyhow::Result<()> {
        // TODO include checksum

        // Create pgdata subdirs structure
        for dir in PGDATA_SUBDIRS.iter() {
            let header = new_tar_header_dir(dir)?;
            self.ar
                .append(&header, &mut io::empty())
                .await
                .context("could not add directory to basebackup tarball")?;
        }

        // Send config files.
        for filepath in PGDATA_SPECIAL_FILES.iter() {
            if *filepath == "pg_hba.conf" {
                let data = PG_HBA.as_bytes();
                let header = new_tar_header(filepath, data.len() as u64)?;
                self.ar
                    .append(&header, data)
                    .await
                    .context("could not add config file to basebackup tarball")?;
            } else {
                let header = new_tar_header(filepath, 0)?;
                self.ar
                    .append(&header, &mut io::empty())
                    .await
                    .context("could not add config file to basebackup tarball")?;
            }
        }

        // Gather non-relational files from object storage pages.
        for kind in [
            SlruKind::Clog,
            SlruKind::MultiXactOffsets,
            SlruKind::MultiXactMembers,
        ] {
            for segno in self
                .timeline
                .list_slru_segments(kind, Version::Lsn(self.lsn), self.ctx)
                .await?
            {
                self.add_slru_segment(kind, segno).await?;
            }
        }

        let mut min_restart_lsn: Lsn = Lsn::MAX;
        // Create tablespace directories
        for ((spcnode, dbnode), has_relmap_file) in
            self.timeline.list_dbdirs(self.lsn, self.ctx).await?
        {
            self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;

            // If full backup is requested, include all relation files.
            // Otherwise only include init forks of unlogged relations.
            let rels = self
                .timeline
                .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
                .await?;
            for &rel in rels.iter() {
                // Send the init fork as the main fork to provide well-formed,
                // empty contents for UNLOGGED relations. Postgres copies it in
                // `reinit.c` during recovery.
                if rel.forknum == INIT_FORKNUM {
                    // We may not need the _init fork itself, but including it
                    // at least serves as a marker that the relation is unlogged.
                    self.add_rel(rel, rel).await?;
                    self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
                    continue;
                }

                if self.full_backup {
                    if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
                    {
                        // skip this, will include it when we reach the init fork
                        continue;
                    }
                    self.add_rel(rel, rel).await?;
                }
            }

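            // Scan the aux files for replication slot state and track the
            // minimum restart LSN across all slots; it is emitted below as
            // "restart.lsn" (presumably consumed by the compute node to
            // retain the WAL that logical replication still needs).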
            for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
                if path.starts_with("pg_replslot") {
                    let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
                    let restart_lsn = Lsn(u64::from_le_bytes(
                        content[offs..offs + 8].try_into().unwrap(),
                    ));
                    info!("Replication slot {} restart LSN={}", path, restart_lsn);
                    min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
                }
                let header = new_tar_header(&path, content.len() as u64)?;
                self.ar
                    .append(&header, &*content)
                    .await
                    .context("could not add aux file to basebackup tarball")?;
            }
        }
        if min_restart_lsn != Lsn::MAX {
            info!(
                "Min restart LSN for logical replication is {}",
                min_restart_lsn
            );
            let data = min_restart_lsn.0.to_le_bytes();
            let header = new_tar_header("restart.lsn", data.len() as u64)?;
            self.ar
                .append(&header, &data[..])
                .await
                .context("could not add restart.lsn file to basebackup tarball")?;
        }
        for xid in self
            .timeline
            .list_twophase_files(self.lsn, self.ctx)
            .await?
        {
            self.add_twophase_file(xid).await?;
        }

        fail_point!("basebackup-before-control-file", |_| {
            bail!("failpoint basebackup-before-control-file")
        });

        // Generate pg_control and bootstrap WAL segment.
        self.add_pgcontrol_file().await?;
        self.ar.finish().await?;
        debug!("all tarred up!");
        Ok(())
    }

    /// Add contents of relfilenode `src`, naming it as `dst`.
    async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
        let nblocks = self
            .timeline
            .get_rel_size(src, Version::Lsn(self.lsn), false, self.ctx)
            .await?;

        // If the relation is empty, create an empty file
        if nblocks == 0 {
            let file_name = dst.to_segfile_name(0);
            let header = new_tar_header(&file_name, 0)?;
            self.ar.append(&header, &mut io::empty()).await?;
            return Ok(());
        }

        // Add a file for each chunk of blocks (aka segment)
        let mut startblk = 0;
        let mut seg = 0;
        while startblk < nblocks {
            let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks);

            let mut segment_data: Vec<u8> = vec![];
            for blknum in startblk..endblk {
                let img = self
                    .timeline
                    .get_rel_page_at_lsn(src, blknum, Version::Lsn(self.lsn), false, self.ctx)
                    .await?;
                segment_data.extend_from_slice(&img[..]);
            }

            let file_name = dst.to_segfile_name(seg as u32);
            let header = new_tar_header(&file_name, segment_data.len() as u64)?;
            self.ar.append(&header, segment_data.as_slice()).await?;

            seg += 1;
            startblk = endblk;
        }

        Ok(())
    }

    //
    // Generate SLRU segment files from repository.
    //
    async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
        let nblocks = self
            .timeline
            .get_slru_segment_size(slru, segno, Version::Lsn(self.lsn), self.ctx)
            .await?;

        let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
        for blknum in 0..nblocks {
            let img = self
                .timeline
                .get_slru_page_at_lsn(slru, segno, blknum, self.lsn, self.ctx)
                .await?;

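            // A Clog page image may carry 8 extra trailing bytes (pageserver-
            // side metadata, as far as we can tell); only the first BLCKSZ
            // bytes are copied into the segment file below.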
            if slru == SlruKind::Clog {
                ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
            } else {
                ensure!(img.len() == BLCKSZ as usize);
            }

            slru_buf.extend_from_slice(&img[..BLCKSZ as usize]);
        }

        let segname = format!("{}/{:>04X}", slru.to_str(), segno);
        let header = new_tar_header(&segname, slru_buf.len() as u64)?;
        self.ar.append(&header, slru_buf.as_slice()).await?;

        trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
        Ok(())
    }

    //
    // Include database/tablespace directories.
    //
    // Each directory contains a PG_VERSION file, and the default database
    // directories also contain pg_filenode.map files.
    //
    async fn add_dbdir(
        &mut self,
        spcnode: u32,
        dbnode: u32,
        has_relmap_file: bool,
    ) -> anyhow::Result<()> {
        let relmap_img = if has_relmap_file {
            let img = self
                .timeline
                .get_relmap_file(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
                .await?;

            ensure!(
                img.len()
                    == dispatch_pgversion!(
                        self.timeline.pg_version,
                        pgv::bindings::SIZEOF_RELMAPFILE
                    )
            );

            Some(img)
        } else {
            None
        };

        if spcnode == GLOBALTABLESPACE_OID {
            let pg_version_str = match self.timeline.pg_version {
                14 | 15 => self.timeline.pg_version.to_string(),
                ver => format!("{ver}\x0A"),
            };
            let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
            self.ar.append(&header, pg_version_str.as_bytes()).await?;

            info!("timeline.pg_version {}", self.timeline.pg_version);

            if let Some(img) = relmap_img {
                // filenode map for global tablespace
                let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
                self.ar.append(&header, &img[..]).await?;
            } else {
                warn!("global/pg_filenode.map is missing");
            }
        } else {
            // User-defined tablespaces are not supported. However, as
            // a special case, if a tablespace/db directory is
            // completely empty, we can leave it out altogether. This
            // lets a base backup taken after the 'tablespace'
            // regression test succeed, because the test drops the
            // tablespaces it created before it finishes.
            //
            // FIXME: this wouldn't be necessary if we handled
            // XLOG_TBLSPC_DROP records. But we probably should just
            // throw an error on CREATE TABLESPACE in the first place.
            if !has_relmap_file
                && self
                    .timeline
                    .list_rels(spcnode, dbnode, Version::Lsn(self.lsn), self.ctx)
                    .await?
                    .is_empty()
            {
                return Ok(());
            }
            ensure!(spcnode == DEFAULTTABLESPACE_OID);

            // Append dir path for each database
            let path = format!("base/{}", dbnode);
            let header = new_tar_header_dir(&path)?;
            self.ar.append(&header, &mut io::empty()).await?;

            if let Some(img) = relmap_img {
                let dst_path = format!("base/{}/PG_VERSION", dbnode);

                let pg_version_str = match self.timeline.pg_version {
                    14 | 15 => self.timeline.pg_version.to_string(),
                    ver => format!("{ver}\x0A"),
                };
                let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
                self.ar.append(&header, pg_version_str.as_bytes()).await?;

                let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
                let header = new_tar_header(&relmap_path, img.len() as u64)?;
                self.ar.append(&header, &img[..]).await?;
            }
        };
        Ok(())
    }

    //
    // Extract twophase state files
    //
    async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
        let img = self
            .timeline
            .get_twophase_file(xid, self.lsn, self.ctx)
            .await?;

        let mut buf = BytesMut::new();
        buf.extend_from_slice(&img[..]);
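        // Postgres validates a CRC-32C checksum at the end of each twophase
        // state file, so append one after the contents.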
        let crc = crc32c::crc32c(&img[..]);
        buf.put_u32_le(crc);
        let path = format!("pg_twophase/{:>08X}", xid);
        let header = new_tar_header(&path, buf.len() as u64)?;
        self.ar.append(&header, &buf[..]).await?;

        Ok(())
    }

    //
    // Add generated pg_control file and bootstrap WAL segment.
    // Also send zenith.signal file with extra bootstrap data.
    //
    async fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
        // add zenith.signal file
        let mut zenith_signal = String::new();
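        // The file holds a single line: "PREV LSN: none" when we are at
        // the ancestor branch point (no previous record exists),
        // "PREV LSN: invalid" when the previous record's LSN is unknown,
        // or the actual previous record LSN.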
        if self.prev_record_lsn == Lsn(0) {
            if self.lsn == self.timeline.get_ancestor_lsn() {
                write!(zenith_signal, "PREV LSN: none")?;
            } else {
                write!(zenith_signal, "PREV LSN: invalid")?;
            }
        } else {
            write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
        }
        self.ar
            .append(
                &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
                zenith_signal.as_bytes(),
            )
            .await?;

        let checkpoint_bytes = self
            .timeline
            .get_checkpoint(self.lsn, self.ctx)
            .await
            .context("failed to get checkpoint bytes")?;
        let pg_control_bytes = self
            .timeline
            .get_control_file(self.lsn, self.ctx)
            .await
            .context("failed to get control bytes")?;

        let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
            &pg_control_bytes,
            &checkpoint_bytes,
            self.lsn,
            self.timeline.pg_version,
        )?;

        // Send pg_control.
        let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
        self.ar.append(&header, &pg_control_bytes[..]).await?;

        // Send the WAL segment.
        let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
        let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
        let wal_file_path = format!("pg_wal/{}", wal_file_name);
        let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;

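        // Generate a fresh, mostly empty bootstrap segment so the compute
        // node can start appending WAL at `self.lsn` without needing any
        // real WAL history behind it.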
        let wal_seg = postgres_ffi::generate_wal_segment(
            segno,
            system_identifier,
            self.timeline.pg_version,
            self.lsn,
        )
        .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
        ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
        self.ar.append(&header, &wal_seg[..]).await?;
        Ok(())
    }
}

//
// Create new tarball entry header
//
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(size);
    header.set_path(path)?;
    header.set_mode(0o600); // -rw-------
    header.set_mtime(
        // use the current time as the last-modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    );
    header.set_cksum();
    Ok(header)
}

fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(0);
    header.set_path(path)?;
    header.set_mode(0o755); // drwxr-xr-x
    header.set_entry_type(EntryType::dir());
    header.set_mtime(
        // use the current time as the last-modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    );
    header.set_cksum();
    Ok(header)
}