//!
//! Generate a tarball with files needed to bootstrap ComputeNode.
//!
//! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
//! It could use a better name.
//!
//! A stateless Postgres compute node is launched by sending it a tarball
//! that contains non-relational data (multixacts, CLOG, filenode maps,
//! twophase files), a generated pg_control file, and a dummy WAL segment.
//! This module is responsible for creating such a tarball from the data
//! stored in object storage.
//!
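//! For orientation, the tarball produced below contains roughly the
//! following entries (an illustrative sketch, not an exhaustive list):
//!
//! ```text
//! base/, global/, pg_wal/, ...    -- standard PGDATA subdirectories
//! pg_hba.conf                     -- config files (the others are empty)
//! pg_xact/, pg_multixact/...      -- SLRU segments
//! base/<dbnode>/PG_VERSION        -- per-database version file
//! base/<dbnode>/pg_filenode.map   -- per-database filenode map
//! pg_twophase/<xid>               -- prepared-transaction state files
//! zenith.signal                   -- prev_record_lsn for the compute node
//! global/pg_control               -- generated control file
//! pg_wal/<segment>                -- generated bootstrap WAL segment
//! ```
//!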
use anyhow::{anyhow, bail, ensure, Context};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use postgres_ffi::pg_constants;
use std::fmt::Write as FmtWrite;
use std::time::SystemTime;
use tokio::io;
use tokio::io::AsyncWrite;
use tracing::*;

use tokio_tar::{Builder, EntryType, Header};

use crate::context::RequestContext;
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};

use postgres_ffi::dispatch_pgversion;
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;

/// Create a basebackup with non-relational data in it.
/// Relational data is only included if 'full_backup' is true.
///
/// Currently 'req_lsn' is `None` in two cases:
/// * During the basebackup right after timeline creation
/// * When working without safekeepers. In this situation it is important to match the LSN
///   we are taking the basebackup at with the LSN that is used in the pageserver's walreceiver
///   to start the replication.
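///
/// A minimal calling sketch (hypothetical caller; assumes `timeline` and
/// `ctx` come from the surrounding pageserver code and that the tarball is
/// buffered in memory):
/// ```ignore
/// let mut buf: Vec<u8> = Vec::new();
/// send_basebackup_tarball(&mut buf, &timeline, None, None, false, &ctx).await?;
/// // `buf` now holds the complete tar stream for the compute node.
/// ```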
pub async fn send_basebackup_tarball<'a, W>(
    write: &'a mut W,
    timeline: &'a Timeline,
    req_lsn: Option<Lsn>,
    prev_lsn: Option<Lsn>,
    full_backup: bool,
    ctx: &'a RequestContext,
) -> anyhow::Result<()>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    // The compute Postgres node doesn't have any previous WAL files, but the
    // first record that it's going to write needs to include the LSN of the
    // previous record (xl_prev). We include prev_record_lsn in the
    // "zenith.signal" file, so that postgres can read it during startup.
    //
    // However, we don't keep a full history of record boundaries in the
    // page server, only the predecessor of the latest record on each
    // timeline. So we can only provide prev_record_lsn when you take a
    // base backup at the end of the timeline, i.e. at last_record_lsn.
    // Even at the end of the timeline, we sometimes don't have a valid
    // prev_lsn value; that happens if the timeline was just branched from
    // an old LSN and it doesn't have any WAL of its own yet. We will set
    // prev_lsn to Lsn(0) if we cannot provide the correct value.
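    //
    // To summarize the cases handled below (illustrative):
    //   req_lsn == Some(last_record_lsn) -> (end_of_timeline.prev, req_lsn)
    //   req_lsn == Some(older_lsn)       -> (Lsn(0), req_lsn)
    //   req_lsn == None                  -> (end_of_timeline.prev, end_of_timeline.last)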
    let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
        // Backup was requested at a particular LSN. The caller should've
        // already checked that it's a valid LSN.

        // If the requested point is the end of the timeline, we can
        // provide prev_lsn. (get_last_record_rlsn() might return it as
        // zero, though, if no WAL has been generated on this timeline
        // yet.)
        let end_of_timeline = timeline.get_last_record_rlsn();
        if req_lsn == end_of_timeline.last {
            (end_of_timeline.prev, req_lsn)
        } else {
            (Lsn(0), req_lsn)
        }
    } else {
        // Backup was requested at end of the timeline.
        let end_of_timeline = timeline.get_last_record_rlsn();
        (end_of_timeline.prev, end_of_timeline.last)
    };

    // Consolidate the derived and the provided prev_lsn values
    let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
        if backup_prev != Lsn(0) {
            ensure!(backup_prev == provided_prev_lsn);
        }
        provided_prev_lsn
    } else {
        backup_prev
    };

    info!(
        "taking basebackup lsn={}, prev_lsn={} (full_backup={})",
        backup_lsn, prev_lsn, full_backup
    );

    let basebackup = Basebackup {
        ar: Builder::new_non_terminated(write),
        timeline,
        lsn: backup_lsn,
        prev_record_lsn: prev_lsn,
        full_backup,
        ctx,
    };
    basebackup
        .send_tarball()
        .instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
        .await
}

/// This is a short-lived object, existing only for the duration of tarball
/// creation, mostly to avoid passing a lot of parameters between the various
/// functions used to construct the tarball.
struct Basebackup<'a, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    ar: Builder<&'a mut W>,
    timeline: &'a Timeline,
    lsn: Lsn,
    prev_record_lsn: Lsn,
    full_backup: bool,
    ctx: &'a RequestContext,
}

impl<'a, W> Basebackup<'a, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    async fn send_tarball(mut self) -> anyhow::Result<()> {
        // TODO include checksum

        // Create pgdata subdirs structure
        for dir in PGDATA_SUBDIRS.iter() {
            let header = new_tar_header_dir(dir)?;
            self.ar
                .append(&header, &mut io::empty())
                .await
                .context("could not add directory to basebackup tarball")?;
        }

        // Send config files.
        for filepath in PGDATA_SPECIAL_FILES.iter() {
            if *filepath == "pg_hba.conf" {
                let data = PG_HBA.as_bytes();
                let header = new_tar_header(filepath, data.len() as u64)?;
                self.ar
                    .append(&header, data)
                    .await
                    .context("could not add config file to basebackup tarball")?;
            } else {
                let header = new_tar_header(filepath, 0)?;
                self.ar
                    .append(&header, &mut io::empty())
                    .await
                    .context("could not add config file to basebackup tarball")?;
            }
        }

        // Gather non-relational files from object storage pages.
        for kind in [
            SlruKind::Clog,
            SlruKind::MultiXactOffsets,
            SlruKind::MultiXactMembers,
        ] {
            for segno in self
                .timeline
                .list_slru_segments(kind, self.lsn, self.ctx)
                .await?
            {
                self.add_slru_segment(kind, segno).await?;
            }
        }

        let mut min_restart_lsn: Lsn = Lsn::MAX;
        // Create tablespace directories
        for ((spcnode, dbnode), has_relmap_file) in
            self.timeline.list_dbdirs(self.lsn, self.ctx).await?
        {
            self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;

            // If full backup is requested, include all relation files.
            // Otherwise only include init forks of unlogged relations.
            let rels = self
                .timeline
                .list_rels(spcnode, dbnode, self.lsn, self.ctx)
                .await?;
            for &rel in rels.iter() {
                // Send the init fork as the main fork to provide well-formed empty
                // contents for UNLOGGED relations. Postgres copies it in
                // `reinit.c` during recovery.
                if rel.forknum == INIT_FORKNUM {
                    // I doubt we need the _init fork itself, but having it at least
                    // serves as a marker that the relation is unlogged.
                    self.add_rel(rel, rel).await?;
                    self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
                    continue;
                }

                if self.full_backup {
                    if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
                    {
                        // skip this, will include it when we reach the init fork
                        continue;
                    }
                    self.add_rel(rel, rel).await?;
                }
            }

            for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
                if path.starts_with("pg_replslot") {
                    let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
                    let restart_lsn = Lsn(u64::from_le_bytes(
                        content[offs..offs + 8].try_into().unwrap(),
                    ));
                    info!("Replication slot {} restart LSN={}", path, restart_lsn);
                    min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
                }
                let header = new_tar_header(&path, content.len() as u64)?;
                self.ar
                    .append(&header, &*content)
                    .await
                    .context("could not add aux file to basebackup tarball")?;
            }
        }
        if min_restart_lsn != Lsn::MAX {
            info!(
                "Min restart LSN for logical replication is {}",
                min_restart_lsn
            );
            let data = min_restart_lsn.0.to_le_bytes();
            let header = new_tar_header("restart.lsn", data.len() as u64)?;
            self.ar
                .append(&header, &data[..])
                .await
                .context("could not add restart.lsn file to basebackup tarball")?;
        }
        for xid in self
            .timeline
            .list_twophase_files(self.lsn, self.ctx)
            .await?
        {
            self.add_twophase_file(xid).await?;
        }

        fail_point!("basebackup-before-control-file", |_| {
            bail!("failpoint basebackup-before-control-file")
        });

        // Generate pg_control and bootstrap WAL segment.
        self.add_pgcontrol_file().await?;
        self.ar.finish().await?;
        debug!("all tarred up!");
        Ok(())
    }

    /// Add contents of relfilenode `src`, naming it as `dst`.
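    ///
    /// Large relations are split into segments of `RELSEG_SIZE` blocks, just
    /// as in a regular Postgres data directory. For example (an illustrative
    /// calculation with the default `RELSEG_SIZE` of 131072 blocks, i.e.
    /// 1 GiB of 8 KiB pages): a 300000-block relation is emitted as three
    /// files, segments 0 and 1 holding 131072 blocks each and segment 2
    /// holding the remaining 37856 blocks.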
    async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
        let nblocks = self
            .timeline
            .get_rel_size(src, self.lsn, false, self.ctx)
            .await?;

        // If the relation is empty, create an empty file
        if nblocks == 0 {
            let file_name = dst.to_segfile_name(0);
            let header = new_tar_header(&file_name, 0)?;
            self.ar.append(&header, &mut io::empty()).await?;
            return Ok(());
        }

        // Add a file for each chunk of blocks (aka segment)
        let mut startblk = 0;
        let mut seg = 0;
        while startblk < nblocks {
            let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks);

            let mut segment_data: Vec<u8> = vec![];
            for blknum in startblk..endblk {
                let img = self
                    .timeline
                    .get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
                    .await?;
                segment_data.extend_from_slice(&img[..]);
            }

            let file_name = dst.to_segfile_name(seg as u32);
            let header = new_tar_header(&file_name, segment_data.len() as u64)?;
            self.ar.append(&header, segment_data.as_slice()).await?;

            seg += 1;
            startblk = endblk;
        }

        Ok(())
    }

    //
    // Generate SLRU segment files from repository.
    //
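    // The segment file name follows the usual Postgres convention: the
    // segment number in upper-case hex, zero-padded to four digits. For
    // example (illustrative), CLOG segment 0 becomes "pg_xact/0000" and
    // segment 26 becomes "pg_xact/001A".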
    async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
        let nblocks = self
            .timeline
            .get_slru_segment_size(slru, segno, self.lsn, self.ctx)
            .await?;

        let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
        for blknum in 0..nblocks {
            let img = self
                .timeline
                .get_slru_page_at_lsn(slru, segno, blknum, self.lsn, self.ctx)
                .await?;

            if slru == SlruKind::Clog {
                ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
            } else {
                ensure!(img.len() == BLCKSZ as usize);
            }

            slru_buf.extend_from_slice(&img[..BLCKSZ as usize]);
        }

        let segname = format!("{}/{:>04X}", slru.to_str(), segno);
        let header = new_tar_header(&segname, slru_buf.len() as u64)?;
        self.ar.append(&header, slru_buf.as_slice()).await?;

        trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
        Ok(())
    }

    //
    // Include database/tablespace directories.
    //
    // Each directory contains a PG_VERSION file, and the default database
    // directories also contain pg_filenode.map files.
    //
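    // For a database in the default tablespace the entries look like, e.g.
    // (hypothetical dbnode 16384): "base/16384/", "base/16384/PG_VERSION"
    // and "base/16384/pg_filenode.map". Note that the PG_VERSION contents
    // are written without a trailing newline for pg14/pg15 and with one for
    // later versions, per the match expressions below.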
    async fn add_dbdir(
        &mut self,
        spcnode: u32,
        dbnode: u32,
        has_relmap_file: bool,
    ) -> anyhow::Result<()> {
        let relmap_img = if has_relmap_file {
            let img = self
                .timeline
                .get_relmap_file(spcnode, dbnode, self.lsn, self.ctx)
                .await?;

            ensure!(
                img.len()
                    == dispatch_pgversion!(
                        self.timeline.pg_version,
                        pgv::bindings::SIZEOF_RELMAPFILE
                    )
            );

            Some(img)
        } else {
            None
        };

        if spcnode == GLOBALTABLESPACE_OID {
            let pg_version_str = match self.timeline.pg_version {
                14 | 15 => self.timeline.pg_version.to_string(),
                ver => format!("{ver}\x0A"),
            };
            let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
            self.ar.append(&header, pg_version_str.as_bytes()).await?;

            info!("timeline.pg_version {}", self.timeline.pg_version);

            if let Some(img) = relmap_img {
                // filenode map for global tablespace
                let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
                self.ar.append(&header, &img[..]).await?;
            } else {
                warn!("global/pg_filenode.map is missing");
            }
        } else {
            // User-defined tablespaces are not supported. However, as
            // a special case, if a tablespace/db directory is
            // completely empty, we can leave it out altogether. This
            // makes it possible to take a base backup after running the
            // 'tablespace' regression test, because the test drops the
            // created tablespaces at the end.
            //
            // FIXME: this wouldn't be necessary, if we handled
            // XLOG_TBLSPC_DROP records. But we probably should just
            // throw an error on CREATE TABLESPACE in the first place.
            if !has_relmap_file
                && self
                    .timeline
                    .list_rels(spcnode, dbnode, self.lsn, self.ctx)
                    .await?
                    .is_empty()
            {
                return Ok(());
            }
            // User-defined tablespaces are not supported
            ensure!(spcnode == DEFAULTTABLESPACE_OID);

            // Append dir path for each database
            let path = format!("base/{}", dbnode);
            let header = new_tar_header_dir(&path)?;
            self.ar.append(&header, &mut io::empty()).await?;

            if let Some(img) = relmap_img {
                let dst_path = format!("base/{}/PG_VERSION", dbnode);

                let pg_version_str = match self.timeline.pg_version {
                    14 | 15 => self.timeline.pg_version.to_string(),
                    ver => format!("{ver}\x0A"),
                };
                let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
                self.ar.append(&header, pg_version_str.as_bytes()).await?;

                let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
                let header = new_tar_header(&relmap_path, img.len() as u64)?;
                self.ar.append(&header, &img[..]).await?;
            }
        };
        Ok(())
    }

    //
    // Extract twophase state files
    //
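    // The on-disk name matches how Postgres references these files: the
    // transaction id in upper-case hex, zero-padded to eight digits. For
    // example (illustrative), xid 532 is stored as "pg_twophase/00000214",
    // with a little-endian CRC32C of the state data appended at the end.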
    async fn add_twophase_file(&mut self, xid: TransactionId) -> anyhow::Result<()> {
        let img = self
            .timeline
            .get_twophase_file(xid, self.lsn, self.ctx)
            .await?;

        let mut buf = BytesMut::new();
        buf.extend_from_slice(&img[..]);
        let crc = crc32c::crc32c(&img[..]);
        buf.put_u32_le(crc);
        let path = format!("pg_twophase/{:>08X}", xid);
        let header = new_tar_header(&path, buf.len() as u64)?;
        self.ar.append(&header, &buf[..]).await?;

        Ok(())
    }

    //
    // Add generated pg_control file and bootstrap WAL segment.
    // Also send zenith.signal file with extra bootstrap data.
    //
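    // The zenith.signal file is a one-line text file; depending on what we
    // know about the previous record it contains one of (the LSN value is
    // illustrative):
    //
    //   PREV LSN: none          -- backup taken at the timeline's ancestor LSN
    //   PREV LSN: invalid       -- previous record LSN is unknown
    //   PREV LSN: 0/16B2D50     -- the actual previous record LSN
    //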
    async fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
        // add zenith.signal file
        let mut zenith_signal = String::new();
        if self.prev_record_lsn == Lsn(0) {
            if self.lsn == self.timeline.get_ancestor_lsn() {
                write!(zenith_signal, "PREV LSN: none")?;
            } else {
                write!(zenith_signal, "PREV LSN: invalid")?;
            }
        } else {
            write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
        }
        self.ar
            .append(
                &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
                zenith_signal.as_bytes(),
            )
            .await?;

        let checkpoint_bytes = self
            .timeline
            .get_checkpoint(self.lsn, self.ctx)
            .await
            .context("failed to get checkpoint bytes")?;
        let pg_control_bytes = self
            .timeline
            .get_control_file(self.lsn, self.ctx)
            .await
            .context("failed to get control file bytes")?;

        let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
            &pg_control_bytes,
            &checkpoint_bytes,
            self.lsn,
            self.timeline.pg_version,
        )?;

        // send pg_control
        let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
        self.ar.append(&header, &pg_control_bytes[..]).await?;

        // send WAL segment
        let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
        let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
        let wal_file_path = format!("pg_wal/{}", wal_file_name);
        let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;

        let wal_seg = postgres_ffi::generate_wal_segment(
            segno,
            system_identifier,
            self.timeline.pg_version,
            self.lsn,
        )
        .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
        ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
        self.ar.append(&header, &wal_seg[..]).await?;
        Ok(())
    }
}

//
// Create new tarball entry header
//
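// A sketch of how these helpers are used throughout this module (`ar` and
// `data` stand for any tar builder and payload):
//
//   let header = new_tar_header("global/pg_control", data.len() as u64)?;
//   ar.append(&header, &data[..]).await?;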
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(size);
    header.set_path(path)?;
    header.set_mode(0b110000000); // -rw------- (0o600)
    header.set_mtime(
        // use current time as last modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    );
    header.set_cksum();
    Ok(header)
}

fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(0);
    header.set_path(path)?;
    header.set_mode(0o755); // drwxr-xr-x
    header.set_entry_type(EntryType::dir());
    header.set_mtime(
        // use current time as last modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    );
    header.set_cksum();
    Ok(header)
}