//!
//! Generate a tarball with files needed to bootstrap ComputeNode.
//!
//! TODO: this module has nothing to do with PostgreSQL pg_basebackup.
//! It could use a better name.
//!
//! A stateless Postgres compute node is launched by sending it a tarball that
//! contains non-relational data (multixacts, clog, filenode maps, twophase
//! files), a generated pg_control file, and a dummy WAL segment.
//! This module is responsible for creating such a tarball from data stored
//! in object storage.
//!
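//! As an illustrative sketch (derived from the code below, with placeholder
//! names in angle brackets; the exact set of entries depends on the timeline
//! contents), the produced tarball contains entries such as:
//!
//! ```text
//! global/pg_control              generated control file
//! global/pg_filenode.map         filenode map for the global tablespace
//! base/<dbnode>/PG_VERSION       per-database version marker
//! base/<dbnode>/pg_filenode.map  per-database filenode map
//! pg_xact/..., pg_multixact/...  SLRU segments
//! pg_twophase/<xid>              twophase state files
//! pg_wal/<segname>               one generated (dummy) WAL segment
//! zenith.signal                  previous record LSN for bootstrap
//! ```
//!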
use anyhow::{anyhow, bail, ensure, Context};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use std::fmt::Write as FmtWrite;
use std::time::SystemTime;
use tokio::io;
use tokio::io::AsyncWrite;
use tracing::*;

use tokio_tar::{Builder, EntryType, Header};

use crate::context::RequestContext;
use crate::tenant::Timeline;
use pageserver_api::reltag::{RelTag, SlruKind};

use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;

/// Create a basebackup with non-relational data in it.
/// Relational data is only included if 'full_backup' is true.
///
/// Currently we pass an empty 'req_lsn' in two cases:
/// * During the basebackup taken right after timeline creation
/// * When working without safekeepers. In this situation it is important that
///   the LSN we take the basebackup at matches the LSN that pageserver's
///   walreceiver uses to start replication.
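///
/// A minimal usage sketch (hypothetical caller; `timeline` and `ctx` are
/// assumed to come from the surrounding pageserver code):
///
/// ```ignore
/// // Stream a basebackup at the current end of the timeline, letting the
/// // function derive prev_lsn itself, without relational data.
/// let mut out: Vec<u8> = Vec::new();
/// send_basebackup_tarball(&mut out, &timeline, None, None, false, &ctx).await?;
/// ```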
pub async fn send_basebackup_tarball<'a, W>(
    write: &'a mut W,
    timeline: &'a Timeline,
    req_lsn: Option<Lsn>,
    prev_lsn: Option<Lsn>,
    full_backup: bool,
    ctx: &'a RequestContext,
) -> anyhow::Result<()>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    // Compute postgres doesn't have any previous WAL files, but the first
    // record that it's going to write needs to include the LSN of the
    // previous record (xl_prev). We include prev_record_lsn in the
    // "zenith.signal" file, so that postgres can read it during startup.
    //
    // We don't keep full history of record boundaries in the page server,
    // however, only the predecessor of the latest record on each
    // timeline. So we can only provide prev_record_lsn when you take a
    // base backup at the end of the timeline, i.e. at last_record_lsn.
    // Even at the end of the timeline, we sometimes don't have a valid
    // prev_lsn value; that happens if the timeline was just branched from
    // an old LSN and it doesn't have any WAL of its own yet. We will set
    // prev_lsn to Lsn(0) if we cannot provide the correct value.
    let (backup_prev, backup_lsn) = if let Some(req_lsn) = req_lsn {
        // Backup was requested at a particular LSN. The caller should've
        // already checked that it's a valid LSN.

        // If the requested point is the end of the timeline, we can
        // provide prev_lsn. (get_last_record_rlsn() might return it as
        // zero, though, if no WAL has been generated on this timeline
        // yet.)
        let end_of_timeline = timeline.get_last_record_rlsn();
        if req_lsn == end_of_timeline.last {
            (end_of_timeline.prev, req_lsn)
        } else {
            (Lsn(0), req_lsn)
        }
    } else {
        // Backup was requested at end of the timeline.
        let end_of_timeline = timeline.get_last_record_rlsn();
        (end_of_timeline.prev, end_of_timeline.last)
    };

    // Consolidate the derived and the provided prev_lsn values
    let prev_lsn = if let Some(provided_prev_lsn) = prev_lsn {
        if backup_prev != Lsn(0) {
            ensure!(backup_prev == provided_prev_lsn);
        }
        provided_prev_lsn
    } else {
        backup_prev
    };

    info!(
        "taking basebackup lsn={}, prev_lsn={} (full_backup={})",
        backup_lsn, prev_lsn, full_backup
    );

    let basebackup = Basebackup {
        ar: Builder::new_non_terminated(write),
        timeline,
        lsn: backup_lsn,
        prev_record_lsn: prev_lsn,
        full_backup,
        ctx,
    };
    basebackup
        .send_tarball()
        .instrument(info_span!("send_tarball", backup_lsn=%backup_lsn))
        .await
}

/// A short-lived object that only exists for the duration of tarball creation.
/// It exists mostly to avoid passing a lot of parameters between the various
/// functions used for constructing the tarball.
struct Basebackup<'a, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    ar: Builder<&'a mut W>,
    timeline: &'a Timeline,
    lsn: Lsn,
    prev_record_lsn: Lsn,
    full_backup: bool,
    ctx: &'a RequestContext,
}

impl<'a, W> Basebackup<'a, W>
where
    W: AsyncWrite + Send + Sync + Unpin,
{
    async fn send_tarball(mut self) -> anyhow::Result<()> {
        // TODO include checksum

        // Create pgdata subdirs structure
        for dir in PGDATA_SUBDIRS.iter() {
            let header = new_tar_header_dir(dir)?;
            self.ar
                .append(&header, &mut io::empty())
                .await
                .context("could not add directory to basebackup tarball")?;
        }

        // Send config files.
        for filepath in PGDATA_SPECIAL_FILES.iter() {
            if *filepath == "pg_hba.conf" {
                let data = PG_HBA.as_bytes();
                let header = new_tar_header(filepath, data.len() as u64)?;
                self.ar
                    .append(&header, data)
                    .await
                    .context("could not add config file to basebackup tarball")?;
            } else {
                let header = new_tar_header(filepath, 0)?;
                self.ar
                    .append(&header, &mut io::empty())
                    .await
                    .context("could not add config file to basebackup tarball")?;
            }
        }

        // Gather non-relational files from object storage pages.
        for kind in [
            SlruKind::Clog,
            SlruKind::MultiXactOffsets,
            SlruKind::MultiXactMembers,
        ] {
            for segno in self
                .timeline
                .list_slru_segments(kind, self.lsn, self.ctx)
                .await?
            {
                self.add_slru_segment(kind, segno).await?;
            }
        }

        // Create tablespace directories
        for ((spcnode, dbnode), has_relmap_file) in
            self.timeline.list_dbdirs(self.lsn, self.ctx).await?
        {
            self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;

            // If full backup is requested, include all relation files.
            // Otherwise only include init forks of unlogged relations.
            let rels = self
                .timeline
                .list_rels(spcnode, dbnode, self.lsn, self.ctx)
                .await?;
            for &rel in rels.iter() {
                // Send the init fork as the main fork to provide well-formed
                // empty contents of UNLOGGED relations. Postgres copies it in
                // `reinit.c` during recovery.
                if rel.forknum == INIT_FORKNUM {
                    // I doubt we need the _init fork itself, but having it at
                    // least serves as a marker that the relation is unlogged.
                    self.add_rel(rel, rel).await?;
                    self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
                    continue;
                }

                if self.full_backup {
                    if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
                    {
                        // Skip this; it will be included when we reach the init fork.
                        continue;
                    }
                    self.add_rel(rel, rel).await?;
                }
            }
        }
        for xid in self
            .timeline
            .list_twophase_files(self.lsn, self.ctx)
            .await?
        {
            self.add_twophase_file(xid).await?;
        }

        fail_point!("basebackup-before-control-file", |_| {
            bail!("failpoint basebackup-before-control-file")
        });

        // Generate pg_control and bootstrap WAL segment.
        self.add_pgcontrol_file().await?;
        self.ar.finish().await?;
        debug!("all tarred up!");
        Ok(())
    }

    /// Add contents of relfilenode `src`, naming it as `dst`.
    async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
        let nblocks = self
            .timeline
            .get_rel_size(src, self.lsn, false, self.ctx)
            .await?;

        // If the relation is empty, create an empty file
        if nblocks == 0 {
            let file_name = dst.to_segfile_name(0);
            let header = new_tar_header(&file_name, 0)?;
            self.ar.append(&header, &mut io::empty()).await?;
            return Ok(());
        }

        // Add a file for each chunk of blocks (aka segment)
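        // (Illustration: a segment holds up to RELSEG_SIZE blocks; with
        // PostgreSQL's default 8 KiB block size and 131072-block segments,
        // that is 1 GiB per segment file, matching PostgreSQL's on-disk
        // layout for large relations.)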
        let mut startblk = 0;
        let mut seg = 0;
        while startblk < nblocks {
            let endblk = std::cmp::min(startblk + RELSEG_SIZE, nblocks);

            let mut segment_data: Vec<u8> = vec![];
            for blknum in startblk..endblk {
                let img = self
                    .timeline
                    .get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
                    .await?;
                segment_data.extend_from_slice(&img[..]);
            }

            let file_name = dst.to_segfile_name(seg as u32);
            let header = new_tar_header(&file_name, segment_data.len() as u64)?;
            self.ar.append(&header, segment_data.as_slice()).await?;

            seg += 1;
            startblk = endblk;
        }

        Ok(())
    }

    //
    // Generate SLRU segment files from repository.
    //
    async fn add_slru_segment(&mut self, slru: SlruKind, segno: u32) -> anyhow::Result<()> {
        let nblocks = self
            .timeline
            .get_slru_segment_size(slru, segno, self.lsn, self.ctx)
            .await?;

        let mut slru_buf: Vec<u8> = Vec::with_capacity(nblocks as usize * BLCKSZ as usize);
        for blknum in 0..nblocks {
            let img = self
                .timeline
                .get_slru_page_at_lsn(slru, segno, blknum, self.lsn, self.ctx)
                .await?;

            if slru == SlruKind::Clog {
                ensure!(img.len() == BLCKSZ as usize || img.len() == BLCKSZ as usize + 8);
            } else {
                ensure!(img.len() == BLCKSZ as usize);
            }

            slru_buf.extend_from_slice(&img[..BLCKSZ as usize]);
        }

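        // The entry is named after the SLRU directory and the segment number
        // in zero-padded hex, e.g. "pg_xact/0000" for the first clog segment.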
        let segname = format!("{}/{:>04X}", slru.to_str(), segno);
        let header = new_tar_header(&segname, slru_buf.len() as u64)?;
        self.ar.append(&header, slru_buf.as_slice()).await?;

        trace!("Added to basebackup slru {} relsize {}", segname, nblocks);
        Ok(())
    }

    //
    // Include database/tablespace directories.
    //
    // Each directory contains a PG_VERSION file, and the default database
    // directories also contain pg_filenode.map files.
    //
    async fn add_dbdir(
        &mut self,
        spcnode: u32,
        dbnode: u32,
        has_relmap_file: bool,
    ) -> anyhow::Result<()> {
        let relmap_img = if has_relmap_file {
            let img = self
                .timeline
                .get_relmap_file(spcnode, dbnode, self.lsn, self.ctx)
                .await?;
            ensure!(img.len() == 512);
            Some(img)
        } else {
            None
        };

        if spcnode == GLOBALTABLESPACE_OID {
            let pg_version_str = self.timeline.pg_version.to_string();
            let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
            self.ar.append(&header, pg_version_str.as_bytes()).await?;

            info!("timeline.pg_version {}", self.timeline.pg_version);

            if let Some(img) = relmap_img {
                // filenode map for global tablespace
                let header = new_tar_header("global/pg_filenode.map", img.len() as u64)?;
                self.ar.append(&header, &img[..]).await?;
            } else {
                warn!("global/pg_filenode.map is missing");
            }
        } else {
            // User defined tablespaces are not supported. However, as
            // a special case, if a tablespace/db directory is
            // completely empty, we can leave it out altogether. This
            // makes taking a base backup after the 'tablespace'
            // regression test pass, because the test drops the
            // created tablespaces after the tests.
            //
            // FIXME: this wouldn't be necessary if we handled
            // XLOG_TBLSPC_DROP records. But we probably should just
            // throw an error on CREATE TABLESPACE in the first place.
            if !has_relmap_file
                && self
                    .timeline
                    .list_rels(spcnode, dbnode, self.lsn, self.ctx)
                    .await?
                    .is_empty()
            {
                return Ok(());
            }
            // User defined tablespaces are not supported
            ensure!(spcnode == DEFAULTTABLESPACE_OID);

            // Append dir path for each database
            let path = format!("base/{}", dbnode);
            let header = new_tar_header_dir(&path)?;
            self.ar.append(&header, &mut io::empty()).await?;

            if let Some(img) = relmap_img {
                let dst_path = format!("base/{}/PG_VERSION", dbnode);

                let pg_version_str = self.timeline.pg_version.to_string();
                let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
                self.ar.append(&header, pg_version_str.as_bytes()).await?;

                let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
                let header = new_tar_header(&relmap_path, img.len() as u64)?;
                self.ar.append(&header, &img[..]).await?;
            }
        };
        Ok(())
    }

    //
    // Extract twophase state files
    //
393 2 : let img = self
394 2 : .timeline
395 2 : .get_twophase_file(xid, self.lsn, self.ctx)
396 0 : .await?;
397 :
398 2 : let mut buf = BytesMut::new();
399 2 : buf.extend_from_slice(&img[..]);
400 2 : let crc = crc32c::crc32c(&img[..]);
401 2 : buf.put_u32_le(crc);
402 2 : let path = format!("pg_twophase/{:>08X}", xid);
403 2 : let header = new_tar_header(&path, buf.len() as u64)?;
404 2 : self.ar.append(&header, &buf[..]).await?;
405 :
406 2 : Ok(())
407 2 : }
408 :
    //
    // Add generated pg_control file and bootstrap WAL segment.
    // Also send zenith.signal file with extra bootstrap data.
    //
414 660 : // add zenith.signal file
415 660 : let mut zenith_signal = String::new();
416 660 : if self.prev_record_lsn == Lsn(0) {
417 96 : if self.lsn == self.timeline.get_ancestor_lsn() {
418 22 : write!(zenith_signal, "PREV LSN: none")?;
419 : } else {
420 74 : write!(zenith_signal, "PREV LSN: invalid")?;
421 : }
422 : } else {
423 564 : write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
424 : }
425 : self.ar
426 : .append(
427 660 : &new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
428 660 : zenith_signal.as_bytes(),
429 : )
430 0 : .await?;
431 :
432 660 : let checkpoint_bytes = self
433 660 : .timeline
434 660 : .get_checkpoint(self.lsn, self.ctx)
435 11 : .await
436 660 : .context("failed to get checkpoint bytes")?;
437 660 : let pg_control_bytes = self
438 660 : .timeline
439 660 : .get_control_file(self.lsn, self.ctx)
440 18 : .await
441 660 : .context("failed get control bytes")?;
442 :
443 660 : let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
444 660 : &pg_control_bytes,
445 660 : &checkpoint_bytes,
446 660 : self.lsn,
447 660 : self.timeline.pg_version,
448 660 : )?;
449 :
450 : //send pg_control
451 660 : let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
452 660 : self.ar.append(&header, &pg_control_bytes[..]).await?;
453 :
454 : //send wal segment
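        // (For example, with the standard 16 MB segment size, an LSN that
        // falls into segment 2 of timeline 1 yields the conventional
        // PostgreSQL file name "000000010000000000000002".)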
        let segno = self.lsn.segment_number(WAL_SEGMENT_SIZE);
        let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
        let wal_file_path = format!("pg_wal/{}", wal_file_name);
        let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;

        let wal_seg = postgres_ffi::generate_wal_segment(
            segno,
            system_identifier,
            self.timeline.pg_version,
            self.lsn,
        )
        .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
        ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
        self.ar.append(&header, &wal_seg[..]).await?;
        Ok(())
    }
}

//
// Create new tarball entry header
//
fn new_tar_header(path: &str, size: u64) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(size);
    header.set_path(path)?;
    header.set_mode(0b110000000); // 0o600, i.e. -rw-------
    header.set_mtime(
        // use current time as last modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    );
    header.set_cksum();
    Ok(header)
}

fn new_tar_header_dir(path: &str) -> anyhow::Result<Header> {
    let mut header = Header::new_gnu();
    header.set_size(0);
    header.set_path(path)?;
    header.set_mode(0o755); // drwxr-xr-x
    header.set_entry_type(EntryType::dir());
    header.set_mtime(
        // use current time as last modified time
        SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    );
    header.set_cksum();
    Ok(header)
}
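
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sanity-check sketch (hypothetical test, not part of the
    // original module): it asserts the size and permission bits that
    // new_tar_header() encodes into a tar entry header.
    #[test]
    fn tar_header_mode_and_size() {
        let header = new_tar_header("global/pg_control", 8192).unwrap();
        assert_eq!(header.size().unwrap(), 8192);
        // 0b110000000 == 0o600, i.e. -rw-------
        assert_eq!(header.mode().unwrap() & 0o777, 0o600);
    }
}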