Line data Source code
1 : //!
2 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
3 : //! to apply given WAL records over an old page image and return new
4 : //! page image.
5 : //!
6 : //! We rely on Postgres to perform WAL redo for us. We launch a
7 : //! postgres process in special "wal redo" mode that's similar to
8 : //! single-user mode. We then pass the previous page image, if any,
9 : //! and all the WAL records we want to apply, to the postgres
10 : //! process. Then we get the page image back. Communication with the
11 : //! postgres process happens via stdin/stdout
12 : //!
13 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
14 : //! this communication.
15 : //!
16 : //! The Postgres process is assumed to be secure against malicious WAL
17 : //! records. It achieves this by dropping privileges before replaying
18 : //! any WAL records, so that even if an attacker hijacks the Postgres
19 : //! process, they cannot escape out of it.
20 : //!
21 : use byteorder::{ByteOrder, LittleEndian};
22 : use bytes::{BufMut, Bytes, BytesMut};
23 : use nix::poll::*;
24 : use serde::Serialize;
25 : use std::collections::VecDeque;
26 : use std::io::prelude::*;
27 : use std::io::{Error, ErrorKind};
28 : use std::ops::{Deref, DerefMut};
29 : use std::os::unix::io::{AsRawFd, RawFd};
30 : use std::os::unix::prelude::CommandExt;
31 : use std::process::Stdio;
32 : use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
33 : use std::sync::{Mutex, MutexGuard};
34 : use std::time::Duration;
35 : use std::time::Instant;
36 : use std::{fs, io};
37 : use tracing::*;
38 : use utils::crashsafe::path_with_suffix_extension;
39 : use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
40 :
41 : use crate::metrics::{
42 : WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
43 : WAL_REDO_WAIT_TIME,
44 : };
45 : use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
46 : use crate::repository::Key;
47 : use crate::task_mgr::BACKGROUND_RUNTIME;
48 : use crate::walrecord::NeonWalRecord;
49 : use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
50 : use pageserver_api::reltag::{RelTag, SlruKind};
51 : use postgres_ffi::pg_constants;
52 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
53 : use postgres_ffi::v14::nonrelfile_utils::{
54 : mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
55 : transaction_id_set_status,
56 : };
57 : use postgres_ffi::BLCKSZ;
58 :
59 : ///
60 : /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
61 : ///
62 : /// In Postgres `BufferTag` structure is used for exactly the same purpose.
63 : /// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
64 : ///
65 6405409 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)]
66 : pub struct BufferTag {
67 : pub rel: RelTag,
68 : pub blknum: u32,
69 : }
70 :
71 : ///
72 : /// WAL Redo Manager is responsible for replaying WAL records.
73 : ///
74 : /// Callers use the WAL redo manager through this abstract interface,
75 : /// which makes it easy to mock it in tests.
76 : pub trait WalRedoManager: Send + Sync {
77 : /// Apply some WAL records.
78 : ///
79 : /// The caller passes an old page image, and WAL records that should be
80 : /// applied over it. The return value is a new page image, after applying
81 : /// the reords.
82 : fn request_redo(
83 : &self,
84 : key: Key,
85 : lsn: Lsn,
86 : base_img: Option<(Lsn, Bytes)>,
87 : records: Vec<(Lsn, NeonWalRecord)>,
88 : pg_version: u32,
89 : ) -> Result<Bytes, WalRedoError>;
90 : }
91 :
92 : struct ProcessInput {
93 : child: NoLeakChild,
94 : stdin: ChildStdin,
95 : stderr_fd: RawFd,
96 : stdout_fd: RawFd,
97 : n_requests: usize,
98 : }
99 :
100 : struct ProcessOutput {
101 : stdout: ChildStdout,
102 : pending_responses: VecDeque<Option<Bytes>>,
103 : n_processed_responses: usize,
104 : }
105 :
106 : ///
107 : /// This is the real implementation that uses a Postgres process to
108 : /// perform WAL replay. Only one thread can use the process at a time,
109 : /// that is controlled by the Mutex. In the future, we might want to
110 : /// launch a pool of processes to allow concurrent replay of multiple
111 : /// records.
112 : ///
113 : pub struct PostgresRedoManager {
114 : tenant_id: TenantId,
115 : conf: &'static PageServerConf,
116 :
117 : stdout: Mutex<Option<ProcessOutput>>,
118 : stdin: Mutex<Option<ProcessInput>>,
119 : stderr: Mutex<Option<ChildStderr>>,
120 : }
121 :
122 : /// Can this request be served by neon redo functions
123 : /// or we need to pass it to wal-redo postgres process?
124 207480142 : fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
125 207480142 : // Currently, we don't have bespoken Rust code to replay any
126 207480142 : // Postgres WAL records. But everything else is handled in neon.
127 207480142 : #[allow(clippy::match_like_matches_macro)]
128 207480142 : match rec {
129 : NeonWalRecord::Postgres {
130 : will_init: _,
131 : rec: _,
132 179396673 : } => false,
133 28083469 : _ => true,
134 : }
135 207480142 : }
136 :
137 : /// An error happened in WAL redo
138 0 : #[derive(Debug, thiserror::Error)]
139 : pub enum WalRedoError {
140 : #[error(transparent)]
141 : IoError(#[from] std::io::Error),
142 :
143 : #[error("cannot perform WAL redo now")]
144 : InvalidState,
145 : #[error("cannot perform WAL redo for this request")]
146 : InvalidRequest,
147 : #[error("cannot perform WAL redo for this record")]
148 : InvalidRecord,
149 : }
150 :
151 : ///
152 : /// Public interface of WAL redo manager
153 : ///
154 : impl WalRedoManager for PostgresRedoManager {
155 : ///
156 : /// Request the WAL redo manager to apply some WAL records
157 : ///
158 : /// The WAL redo is handled by a separate thread, so this just sends a request
159 : /// to the thread and waits for response.
160 : ///
161 2762175 : fn request_redo(
162 2762175 : &self,
163 2762175 : key: Key,
164 2762175 : lsn: Lsn,
165 2762175 : base_img: Option<(Lsn, Bytes)>,
166 2762175 : records: Vec<(Lsn, NeonWalRecord)>,
167 2762175 : pg_version: u32,
168 2762175 : ) -> Result<Bytes, WalRedoError> {
169 2762175 : if records.is_empty() {
170 0 : error!("invalid WAL redo request with no records");
171 0 : return Err(WalRedoError::InvalidRequest);
172 2762175 : }
173 2762175 :
174 2762175 : let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
175 2762175 : let mut img = base_img.map(|p| p.1);
176 2762175 : let mut batch_neon = can_apply_in_neon(&records[0].1);
177 2762175 : let mut batch_start = 0;
178 204717967 : for (i, record) in records.iter().enumerate().skip(1) {
179 204717967 : let rec_neon = can_apply_in_neon(&record.1);
180 204717967 :
181 204717967 : if rec_neon != batch_neon {
182 9 : let result = if batch_neon {
183 7 : self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
184 : } else {
185 2 : self.apply_batch_postgres(
186 2 : key,
187 2 : lsn,
188 2 : img,
189 2 : base_img_lsn,
190 2 : &records[batch_start..i],
191 2 : self.conf.wal_redo_timeout,
192 2 : pg_version,
193 2 : )
194 : };
195 9 : img = Some(result?);
196 :
197 9 : batch_neon = rec_neon;
198 9 : batch_start = i;
199 204717958 : }
200 : }
201 : // last batch
202 2762175 : if batch_neon {
203 4150 : self.apply_batch_neon(key, lsn, img, &records[batch_start..])
204 : } else {
205 2758025 : self.apply_batch_postgres(
206 2758025 : key,
207 2758025 : lsn,
208 2758025 : img,
209 2758025 : base_img_lsn,
210 2758025 : &records[batch_start..],
211 2758025 : self.conf.wal_redo_timeout,
212 2758025 : pg_version,
213 2758025 : )
214 : }
215 2762175 : }
216 : }
217 :
218 : impl PostgresRedoManager {
219 : ///
220 : /// Create a new PostgresRedoManager.
221 : ///
222 741 : pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
223 741 : // The actual process is launched lazily, on first request.
224 741 : PostgresRedoManager {
225 741 : tenant_id,
226 741 : conf,
227 741 : stdin: Mutex::new(None),
228 741 : stdout: Mutex::new(None),
229 741 : stderr: Mutex::new(None),
230 741 : }
231 741 : }
232 :
233 : /// Launch process pre-emptively. Should not be needed except for benchmarking.
234 0 : pub fn launch_process(&self, pg_version: u32) -> anyhow::Result<()> {
235 0 : let mut proc = self.stdin.lock().unwrap();
236 0 : if proc.is_none() {
237 0 : self.launch(&mut proc, pg_version)?;
238 0 : }
239 0 : Ok(())
240 0 : }
241 :
242 : ///
243 : /// Process one request for WAL redo using wal-redo postgres
244 : ///
245 : #[allow(clippy::too_many_arguments)]
246 2758027 : fn apply_batch_postgres(
247 2758027 : &self,
248 2758027 : key: Key,
249 2758027 : lsn: Lsn,
250 2758027 : base_img: Option<Bytes>,
251 2758027 : base_img_lsn: Lsn,
252 2758027 : records: &[(Lsn, NeonWalRecord)],
253 2758027 : wal_redo_timeout: Duration,
254 2758027 : pg_version: u32,
255 2758027 : ) -> Result<Bytes, WalRedoError> {
256 2758027 : let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
257 : const MAX_RETRY_ATTEMPTS: u32 = 1;
258 2758027 : let start_time = Instant::now();
259 2758027 : let mut n_attempts = 0u32;
260 2758027 : loop {
261 2758027 : let mut proc = self.stdin.lock().unwrap();
262 2758027 : let lock_time = Instant::now();
263 2758027 :
264 2758027 : // launch the WAL redo process on first use
265 2758027 : if proc.is_none() {
266 402 : self.launch(&mut proc, pg_version)?;
267 2757625 : }
268 2758027 : WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
269 2758027 :
270 2758027 : // Relational WAL records are applied using wal-redo-postgres
271 2758027 : let buf_tag = BufferTag { rel, blknum };
272 2758027 : let result = self
273 2758027 : .apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
274 2758027 : .map_err(WalRedoError::IoError);
275 2758027 :
276 2758027 : let end_time = Instant::now();
277 2758027 : let duration = end_time.duration_since(lock_time);
278 2758027 :
279 2758027 : let len = records.len();
280 179396673 : let nbytes = records.iter().fold(0, |acumulator, record| {
281 179396673 : acumulator
282 179396673 : + match &record.1 {
283 179396673 : NeonWalRecord::Postgres { rec, .. } => rec.len(),
284 0 : _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
285 : }
286 179396673 : });
287 2758027 :
288 2758027 : WAL_REDO_TIME.observe(duration.as_secs_f64());
289 2758027 : WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
290 2758027 : WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
291 2758027 :
292 2758027 : debug!(
293 0 : "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
294 0 : len,
295 0 : nbytes,
296 0 : duration.as_micros(),
297 0 : lsn
298 0 : );
299 :
300 : // If something went wrong, don't try to reuse the process. Kill it, and
301 : // next request will launch a new one.
302 2758027 : if result.is_err() {
303 0 : error!(
304 0 : "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}",
305 0 : records.len(),
306 0 : records.first().map(|p| p.0).unwrap_or(Lsn(0)),
307 0 : records.last().map(|p| p.0).unwrap_or(Lsn(0)),
308 0 : nbytes,
309 0 : base_img_lsn,
310 0 : lsn
311 0 : );
312 : // self.stdin only holds stdin & stderr as_raw_fd().
313 : // Dropping it as part of take() doesn't close them.
314 : // The owning objects (ChildStdout and ChildStderr) are stored in
315 : // self.stdout and self.stderr, respsectively.
316 : // We intentionally keep them open here to avoid a race between
317 : // currently running `apply_wal_records()` and a `launch()` call
318 : // after we return here.
319 : // The currently running `apply_wal_records()` must not read from
320 : // the newly launched process.
321 : // By keeping self.stdout and self.stderr open here, `launch()` will
322 : // get other file descriptors for the new child's stdout and stderr,
323 : // and hence the current `apply_wal_records()` calls will observe
324 : // `output.stdout.as_raw_fd() != stdout_fd` .
325 0 : if let Some(proc) = self.stdin.lock().unwrap().take() {
326 0 : proc.child.kill_and_wait();
327 0 : }
328 2758027 : }
329 2758027 : n_attempts += 1;
330 2758027 : if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
331 2758027 : return result;
332 0 : }
333 : }
334 2758027 : }
335 :
336 : ///
337 : /// Process a batch of WAL records using bespoken Neon code.
338 : ///
339 4157 : fn apply_batch_neon(
340 4157 : &self,
341 4157 : key: Key,
342 4157 : lsn: Lsn,
343 4157 : base_img: Option<Bytes>,
344 4157 : records: &[(Lsn, NeonWalRecord)],
345 4157 : ) -> Result<Bytes, WalRedoError> {
346 4157 : let start_time = Instant::now();
347 4157 :
348 4157 : let mut page = BytesMut::new();
349 4157 : if let Some(fpi) = base_img {
350 4157 : // If full-page image is provided, then use it...
351 4157 : page.extend_from_slice(&fpi[..]);
352 4157 : } else {
353 : // All the current WAL record types that we can handle require a base image.
354 0 : error!("invalid neon WAL redo request with no base image");
355 0 : return Err(WalRedoError::InvalidRequest);
356 : }
357 :
358 : // Apply all the WAL records in the batch
359 28083469 : for (record_lsn, record) in records.iter() {
360 28083469 : self.apply_record_neon(key, &mut page, *record_lsn, record)?;
361 : }
362 : // Success!
363 4157 : let end_time = Instant::now();
364 4157 : let duration = end_time.duration_since(start_time);
365 4157 : WAL_REDO_TIME.observe(duration.as_secs_f64());
366 4157 :
367 4157 : debug!(
368 0 : "neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
369 0 : records.len(),
370 0 : duration.as_micros(),
371 0 : lsn
372 0 : );
373 :
374 4157 : Ok(page.freeze())
375 4157 : }
376 :
377 28083469 : fn apply_record_neon(
378 28083469 : &self,
379 28083469 : key: Key,
380 28083469 : page: &mut BytesMut,
381 28083469 : _record_lsn: Lsn,
382 28083469 : record: &NeonWalRecord,
383 28083469 : ) -> Result<(), WalRedoError> {
384 28083469 : match record {
385 : NeonWalRecord::Postgres {
386 : will_init: _,
387 : rec: _,
388 : } => {
389 0 : error!("tried to pass postgres wal record to neon WAL redo");
390 0 : return Err(WalRedoError::InvalidRequest);
391 : }
392 : NeonWalRecord::ClearVisibilityMapFlags {
393 3230 : new_heap_blkno,
394 3230 : old_heap_blkno,
395 3230 : flags,
396 : } => {
397 : // sanity check that this is modifying the correct relation
398 3230 : let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
399 3230 : assert!(
400 3230 : rel.forknum == VISIBILITYMAP_FORKNUM,
401 0 : "ClearVisibilityMapFlags record on unexpected rel {}",
402 : rel
403 : );
404 3230 : if let Some(heap_blkno) = *new_heap_blkno {
405 : // Calculate the VM block and offset that corresponds to the heap block.
406 366 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
407 366 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
408 366 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
409 366 :
410 366 : // Check that we're modifying the correct VM block.
411 366 : assert!(map_block == blknum);
412 :
413 : // equivalent to PageGetContents(page)
414 366 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
415 366 :
416 366 : map[map_byte as usize] &= !(flags << map_offset);
417 2864 : }
418 :
419 : // Repeat for 'old_heap_blkno', if any
420 3230 : if let Some(heap_blkno) = *old_heap_blkno {
421 2867 : let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
422 2867 : let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
423 2867 : let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
424 2867 :
425 2867 : assert!(map_block == blknum);
426 :
427 2867 : let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
428 2867 :
429 2867 : map[map_byte as usize] &= !(flags << map_offset);
430 363 : }
431 : }
432 : // Non-relational WAL records are handled here, with custom code that has the
433 : // same effects as the corresponding Postgres WAL redo function.
434 27983072 : NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
435 27983072 : let (slru_kind, segno, blknum) =
436 27983072 : key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
437 27983072 : assert_eq!(
438 : slru_kind,
439 : SlruKind::Clog,
440 0 : "ClogSetCommitted record with unexpected key {}",
441 : key
442 : );
443 56066157 : for &xid in xids {
444 28083085 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
445 28083085 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
446 28083085 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
447 28083085 :
448 28083085 : // Check that we're modifying the correct CLOG block.
449 28083085 : assert!(
450 28083085 : segno == expected_segno,
451 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
452 : xid,
453 : key
454 : );
455 28083085 : assert!(
456 28083085 : blknum == expected_blknum,
457 0 : "ClogSetCommitted record for XID {} with unexpected key {}",
458 : xid,
459 : key
460 : );
461 :
462 28083085 : transaction_id_set_status(
463 28083085 : xid,
464 28083085 : pg_constants::TRANSACTION_STATUS_COMMITTED,
465 28083085 : page,
466 28083085 : );
467 : }
468 :
469 : // Append the timestamp
470 27983072 : if page.len() == BLCKSZ as usize + 8 {
471 27979693 : page.truncate(BLCKSZ as usize);
472 27979693 : }
473 27983072 : if page.len() == BLCKSZ as usize {
474 27983072 : page.extend_from_slice(×tamp.to_be_bytes());
475 27983072 : } else {
476 0 : warn!(
477 0 : "CLOG blk {} in seg {} has invalid size {}",
478 0 : blknum,
479 0 : segno,
480 0 : page.len()
481 0 : );
482 : }
483 : }
484 1295 : NeonWalRecord::ClogSetAborted { xids } => {
485 1295 : let (slru_kind, segno, blknum) =
486 1295 : key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
487 1295 : assert_eq!(
488 : slru_kind,
489 : SlruKind::Clog,
490 0 : "ClogSetAborted record with unexpected key {}",
491 : key
492 : );
493 2597 : for &xid in xids {
494 1302 : let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
495 1302 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
496 1302 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
497 1302 :
498 1302 : // Check that we're modifying the correct CLOG block.
499 1302 : assert!(
500 1302 : segno == expected_segno,
501 0 : "ClogSetAborted record for XID {} with unexpected key {}",
502 : xid,
503 : key
504 : );
505 1302 : assert!(
506 1302 : blknum == expected_blknum,
507 0 : "ClogSetAborted record for XID {} with unexpected key {}",
508 : xid,
509 : key
510 : );
511 :
512 1302 : transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
513 : }
514 : }
515 47662 : NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
516 47662 : let (slru_kind, segno, blknum) =
517 47662 : key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
518 47662 : assert_eq!(
519 : slru_kind,
520 : SlruKind::MultiXactOffsets,
521 0 : "MultixactOffsetCreate record with unexpected key {}",
522 : key
523 : );
524 : // Compute the block and offset to modify.
525 : // See RecordNewMultiXact in PostgreSQL sources.
526 47662 : let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
527 47662 : let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
528 47662 : let offset = (entryno * 4) as usize;
529 47662 :
530 47662 : // Check that we're modifying the correct multixact-offsets block.
531 47662 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
532 47662 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
533 47662 : assert!(
534 47662 : segno == expected_segno,
535 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
536 : mid,
537 : key
538 : );
539 47662 : assert!(
540 47662 : blknum == expected_blknum,
541 0 : "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
542 : mid,
543 : key
544 : );
545 :
546 47662 : LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
547 : }
548 48210 : NeonWalRecord::MultixactMembersCreate { moff, members } => {
549 48210 : let (slru_kind, segno, blknum) =
550 48210 : key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
551 48210 : assert_eq!(
552 : slru_kind,
553 : SlruKind::MultiXactMembers,
554 0 : "MultixactMembersCreate record with unexpected key {}",
555 : key
556 : );
557 945038 : for (i, member) in members.iter().enumerate() {
558 945038 : let offset = moff + i as u32;
559 945038 :
560 945038 : // Compute the block and offset to modify.
561 945038 : // See RecordNewMultiXact in PostgreSQL sources.
562 945038 : let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
563 945038 : let memberoff = mx_offset_to_member_offset(offset);
564 945038 : let flagsoff = mx_offset_to_flags_offset(offset);
565 945038 : let bshift = mx_offset_to_flags_bitshift(offset);
566 945038 :
567 945038 : // Check that we're modifying the correct multixact-members block.
568 945038 : let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
569 945038 : let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
570 945038 : assert!(
571 945038 : segno == expected_segno,
572 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
573 : moff,
574 : key
575 : );
576 945038 : assert!(
577 945038 : blknum == expected_blknum,
578 0 : "MultiXactMembersCreate record for offset {} with unexpected key {}",
579 : moff,
580 : key
581 : );
582 :
583 945038 : let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
584 945038 : flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
585 945038 : flagsval |= member.status << bshift;
586 945038 : LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
587 945038 : LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
588 : }
589 : }
590 : }
591 :
592 28083469 : Ok(())
593 28083469 : }
594 : }
595 :
/// Extension of [`CommandExt`] that lets a command avoid handing all of the
/// parent's file descriptors to the child process.
trait CloseFileDescriptors: CommandExt {
    /// Close file descriptors (other than stdin, stdout, stderr) in child process.
    ///
    /// Returns the command, so that the call can be chained with other
    /// builder methods.
    fn close_fds(&mut self) -> &mut Command;
}
605 :
606 : impl<C: CommandExt> CloseFileDescriptors for C {
607 402 : fn close_fds(&mut self) -> &mut Command {
608 402 : unsafe {
609 402 : self.pre_exec(move || {
610 0 : // SAFETY: Code executed inside pre_exec should have async-signal-safety,
611 0 : // which means it should be safe to execute inside a signal handler.
612 0 : // The precise meaning depends on platform. See `man signal-safety`
613 0 : // for the linux definition.
614 0 : //
615 0 : // The set_fds_cloexec_threadsafe function is documented to be
616 0 : // async-signal-safe.
617 0 : //
618 0 : // Aside from this function, the rest of the code is re-entrant and
619 0 : // doesn't make any syscalls. We're just passing constants.
620 0 : //
621 0 : // NOTE: It's easy to indirectly cause a malloc or lock a mutex,
622 0 : // which is not async-signal-safe. Be careful.
623 0 : close_fds::set_fds_cloexec_threadsafe(3, &[]);
624 0 : Ok(())
625 402 : })
626 402 : }
627 402 : }
628 : }
629 :
630 : impl PostgresRedoManager {
631 : //
632 : // Start postgres binary in special WAL redo mode.
633 : //
634 402 : #[instrument(skip_all,fields(tenant_id=%self.tenant_id, pg_version=pg_version))]
635 : fn launch(
636 : &self,
637 : input: &mut MutexGuard<Option<ProcessInput>>,
638 : pg_version: u32,
639 : ) -> Result<(), Error> {
640 : // Previous versions of wal-redo required data directory and that directories
641 : // occupied some space on disk. Remove it if we face it.
642 : //
643 : // This code could be dropped after one release cycle.
644 : let legacy_datadir = path_with_suffix_extension(
645 : self.conf
646 : .tenant_path(&self.tenant_id)
647 : .join("wal-redo-datadir"),
648 : TEMP_FILE_SUFFIX,
649 : );
650 : if legacy_datadir.exists() {
651 0 : info!("legacy wal-redo datadir {legacy_datadir:?} exists, removing");
652 0 : fs::remove_dir_all(&legacy_datadir).map_err(|e| {
653 0 : Error::new(
654 0 : e.kind(),
655 0 : format!("legacy wal-redo datadir {legacy_datadir:?} removal failure: {e}"),
656 0 : )
657 0 : })?;
658 : }
659 :
660 : let pg_bin_dir_path = self
661 : .conf
662 : .pg_bin_dir(pg_version)
663 0 : .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
664 : let pg_lib_dir_path = self
665 : .conf
666 : .pg_lib_dir(pg_version)
667 0 : .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
668 :
669 : // Start postgres itself
670 : let child = Command::new(pg_bin_dir_path.join("postgres"))
671 : .arg("--wal-redo")
672 : .stdin(Stdio::piped())
673 : .stderr(Stdio::piped())
674 : .stdout(Stdio::piped())
675 : .env_clear()
676 : .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
677 : .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
678 : // The redo process is not trusted, and runs in seccomp mode that
679 : // doesn't allow it to open any files. We have to also make sure it
680 : // doesn't inherit any file descriptors from the pageserver, that
681 : // would allow an attacker to read any files that happen to be open
682 : // in the pageserver.
683 : //
684 : // The Rust standard library makes sure to mark any file descriptors with
685 : // as close-on-exec by default, but that's not enough, since we use
686 : // libraries that directly call libc open without setting that flag.
687 : .close_fds()
688 : .spawn_no_leak_child(self.tenant_id)
689 0 : .map_err(|e| {
690 0 : Error::new(
691 0 : e.kind(),
692 0 : format!("postgres --wal-redo command failed to start: {}", e),
693 0 : )
694 0 : })?;
695 :
696 0 : let mut child = scopeguard::guard(child, |child| {
697 0 : error!("killing wal-redo-postgres process due to a problem during launch");
698 0 : child.kill_and_wait();
699 0 : });
700 :
701 : let stdin = child.stdin.take().unwrap();
702 : let stdout = child.stdout.take().unwrap();
703 : let stderr = child.stderr.take().unwrap();
704 :
705 : macro_rules! set_nonblock_or_log_err {
706 : ($file:ident) => {{
707 : let res = set_nonblock($file.as_raw_fd());
708 : if let Err(e) = &res {
709 : error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
710 : }
711 : res
712 : }};
713 : }
714 0 : set_nonblock_or_log_err!(stdin)?;
715 0 : set_nonblock_or_log_err!(stdout)?;
716 0 : set_nonblock_or_log_err!(stderr)?;
717 :
718 : // all fallible operations post-spawn are complete, so get rid of the guard
719 : let child = scopeguard::ScopeGuard::into_inner(child);
720 :
721 : **input = Some(ProcessInput {
722 : child,
723 : stdout_fd: stdout.as_raw_fd(),
724 : stderr_fd: stderr.as_raw_fd(),
725 : stdin,
726 : n_requests: 0,
727 : });
728 :
729 : *self.stdout.lock().unwrap() = Some(ProcessOutput {
730 : stdout,
731 : pending_responses: VecDeque::new(),
732 : n_processed_responses: 0,
733 : });
734 : *self.stderr.lock().unwrap() = Some(stderr);
735 :
736 : Ok(())
737 : }
738 :
739 : // Apply given WAL records ('records') over an old page image. Returns
740 : // new page image.
741 : //
742 2758027 : #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
743 : fn apply_wal_records(
744 : &self,
745 : mut input: MutexGuard<Option<ProcessInput>>,
746 : tag: BufferTag,
747 : base_img: &Option<Bytes>,
748 : records: &[(Lsn, NeonWalRecord)],
749 : wal_redo_timeout: Duration,
750 : ) -> Result<Bytes, std::io::Error> {
751 : // Serialize all the messages to send the WAL redo process first.
752 : //
753 : // This could be problematic if there are millions of records to replay,
754 : // but in practice the number of records is usually so small that it doesn't
755 : // matter, and it's better to keep this code simple.
756 : //
757 : // Most requests start with a before-image with BLCKSZ bytes, followed by
758 : // by some other WAL records. Start with a buffer that can hold that
759 : // comfortably.
760 : let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
761 : build_begin_redo_for_block_msg(tag, &mut writebuf);
762 : if let Some(img) = base_img {
763 : build_push_page_msg(tag, img, &mut writebuf);
764 : }
765 : for (lsn, rec) in records.iter() {
766 : if let NeonWalRecord::Postgres {
767 : will_init: _,
768 : rec: postgres_rec,
769 : } = rec
770 : {
771 : build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
772 : } else {
773 : return Err(Error::new(
774 : ErrorKind::Other,
775 : "tried to pass neon wal record to postgres WAL redo",
776 : ));
777 : }
778 : }
779 : build_get_page_msg(tag, &mut writebuf);
780 : WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
781 :
782 : let proc = input.as_mut().unwrap();
783 : let mut nwrite = 0usize;
784 : let stdout_fd = proc.stdout_fd;
785 :
786 : // Prepare for calling poll()
787 : let mut pollfds = [
788 : PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
789 : PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
790 : PollFd::new(stdout_fd, PollFlags::POLLIN),
791 : ];
792 :
793 : // We do two things simultaneously: send the old base image and WAL records to
794 : // the child process's stdin and forward any logging
795 : // information that the child writes to its stderr to the page server's log.
796 : while nwrite < writebuf.len() {
797 : let n = loop {
798 : match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
799 : Err(e) if e == nix::errno::Errno::EINTR => continue,
800 : res => break res,
801 : }
802 : }?;
803 :
804 : if n == 0 {
805 : return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
806 : }
807 :
808 : // If we have some messages in stderr, forward them to the log.
809 : let err_revents = pollfds[1].revents().unwrap();
810 : if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
811 : let mut errbuf: [u8; 16384] = [0; 16384];
812 : let mut stderr_guard = self.stderr.lock().unwrap();
813 : let stderr = stderr_guard.as_mut().unwrap();
814 : let len = stderr.read(&mut errbuf)?;
815 :
816 : // The message might not be split correctly into lines here. But this is
817 : // good enough, the important thing is to get the message to the log.
818 : if len > 0 {
819 0 : error!(
820 0 : "wal-redo-postgres: {}",
821 0 : String::from_utf8_lossy(&errbuf[0..len])
822 0 : );
823 :
824 : // To make sure we capture all log from the process if it fails, keep
825 : // reading from the stderr, before checking the stdout.
826 : continue;
827 : }
828 : } else if err_revents.contains(PollFlags::POLLHUP) {
829 : return Err(Error::new(
830 : ErrorKind::BrokenPipe,
831 : "WAL redo process closed its stderr unexpectedly",
832 : ));
833 : }
834 :
835 : // If 'stdin' is writeable, do write.
836 : let in_revents = pollfds[0].revents().unwrap();
837 : if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
838 : nwrite += proc.stdin.write(&writebuf[nwrite..])?;
839 : } else if in_revents.contains(PollFlags::POLLHUP) {
840 : // We still have more data to write, but the process closed the pipe.
841 : return Err(Error::new(
842 : ErrorKind::BrokenPipe,
843 : "WAL redo process closed its stdin unexpectedly",
844 : ));
845 : }
846 : }
847 : let request_no = proc.n_requests;
848 : proc.n_requests += 1;
849 : drop(input);
850 :
851 : // To improve walredo performance we separate sending requests and receiving
852 : // responses. Them are protected by different mutexes (output and input).
853 : // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
854 : // then there is not warranty that T1 will first granted output mutex lock.
855 : // To address this issue we maintain number of sent requests, number of processed
856 : // responses and ring buffer with pending responses. After sending response
857 : // (under input mutex), threads remembers request number. Then it releases
858 : // input mutex, locks output mutex and fetch in ring buffer all responses until
859 : // its stored request number. The it takes correspondent element from
860 : // pending responses ring buffer and truncate all empty elements from the front,
861 : // advancing processed responses number.
862 :
863 : let mut output_guard = self.stdout.lock().unwrap();
864 : let output = output_guard.as_mut().unwrap();
865 : if output.stdout.as_raw_fd() != stdout_fd {
866 : // If stdout file descriptor is changed then it means that walredo process is crashed and restarted.
867 : // As far as ProcessInput and ProcessOutout are protected by different mutexes,
868 : // it can happen that we send request to one process and waiting response from another.
869 : // To prevent such situation we compare stdout file descriptors.
870 : // As far as old stdout pipe is destroyed only after new one is created,
871 : // it can not reuse the same file descriptor, so this check is safe.
872 : //
873 : // Cross-read this with the comment in apply_batch_postgres if result.is_err().
874 : // That's where we kill the child process.
875 : return Err(Error::new(
876 : ErrorKind::BrokenPipe,
877 : "WAL redo process closed its stdout unexpectedly",
878 : ));
879 : }
880 : let n_processed_responses = output.n_processed_responses;
881 : while n_processed_responses + output.pending_responses.len() <= request_no {
882 : // We expect the WAL redo process to respond with an 8k page image. We read it
883 : // into this buffer.
884 : let mut resultbuf = vec![0; BLCKSZ.into()];
885 : let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
886 : while nresult < BLCKSZ.into() {
887 : // We do two things simultaneously: reading response from stdout
888 : // and forward any logging information that the child writes to its stderr to the page server's log.
889 : let n = loop {
890 : match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
891 : Err(e) if e == nix::errno::Errno::EINTR => continue,
892 : res => break res,
893 : }
894 : }?;
895 :
896 : if n == 0 {
897 : return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
898 : }
899 :
900 : // If we have some messages in stderr, forward them to the log.
901 : let err_revents = pollfds[1].revents().unwrap();
902 : if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
903 : let mut errbuf: [u8; 16384] = [0; 16384];
904 : let mut stderr_guard = self.stderr.lock().unwrap();
905 : let stderr = stderr_guard.as_mut().unwrap();
906 : let len = stderr.read(&mut errbuf)?;
907 :
908 : // The message might not be split correctly into lines here. But this is
909 : // good enough, the important thing is to get the message to the log.
910 : if len > 0 {
911 2 : error!(
912 2 : "wal-redo-postgres: {}",
913 2 : String::from_utf8_lossy(&errbuf[0..len])
914 2 : );
915 :
916 : // To make sure we capture all log from the process if it fails, keep
917 : // reading from the stderr, before checking the stdout.
918 : continue;
919 : }
920 : } else if err_revents.contains(PollFlags::POLLHUP) {
921 : return Err(Error::new(
922 : ErrorKind::BrokenPipe,
923 : "WAL redo process closed its stderr unexpectedly",
924 : ));
925 : }
926 :
927 : // If we have some data in stdout, read it to the result buffer.
928 : let out_revents = pollfds[2].revents().unwrap();
929 : if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
930 : nresult += output.stdout.read(&mut resultbuf[nresult..])?;
931 : } else if out_revents.contains(PollFlags::POLLHUP) {
932 : return Err(Error::new(
933 : ErrorKind::BrokenPipe,
934 : "WAL redo process closed its stdout unexpectedly",
935 : ));
936 : }
937 : }
938 : output
939 : .pending_responses
940 : .push_back(Some(Bytes::from(resultbuf)));
941 : }
942 : // Replace our request's response with None in `pending_responses`.
943 : // Then make space in the ring buffer by clearing out any seqence of contiguous
944 : // `None`'s from the front of `pending_responses`.
945 : // NB: We can't pop_front() because other requests' responses because another
946 : // requester might have grabbed the output mutex before us:
947 : // T1: grab input mutex
948 : // T1: send request_no 23
949 : // T1: release input mutex
950 : // T2: grab input mutex
951 : // T2: send request_no 24
952 : // T2: release input mutex
953 : // T2: grab output mutex
954 : // T2: n_processed_responses + output.pending_responses.len() <= request_no
955 : // 23 0 24
956 : // T2: enters poll loop that reads stdout
957 : // T2: put response for 23 into pending_responses
958 : // T2: put response for 24 into pending_resposnes
959 : // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
960 : // T2: takes its response_24
961 : // pending_responses now looks like this: Front Some(response_23) None Back
962 : // T2: does the while loop below
963 : // pending_responses now looks like this: Front Some(response_23) None Back
964 : // T2: releases output mutex
965 : // T1: grabs output mutex
966 : // T1: n_processed_responses + output.pending_responses.len() > request_no
967 : // 23 2 23
968 : // T1: skips poll loop that reads stdout
969 : // T1: takes its response_23
970 : // pending_responses now looks like this: Front None None Back
971 : // T2: does the while loop below
972 : // pending_responses now looks like this: Front Back
973 : // n_processed_responses now has value 25
974 : let res = output.pending_responses[request_no - n_processed_responses]
975 : .take()
976 : .expect("we own this request_no, nobody else is supposed to take it");
977 : while let Some(front) = output.pending_responses.front() {
978 : if front.is_none() {
979 : output.pending_responses.pop_front();
980 : output.n_processed_responses += 1;
981 : } else {
982 : break;
983 : }
984 : }
985 : Ok(res)
986 : }
987 : }
988 :
/// Wrapper type around `std::process::Child` which guarantees that the child
/// will be killed and waited-for by this process instead of being leaked.
///
/// The kill+wait happens either synchronously via `kill_and_wait`, or, if the
/// wrapper is simply dropped, in a background task spawned from `Drop`.
struct NoLeakChild {
    /// Tenant the process was spawned for. Used to label the tracing span
    /// under which the kill+wait performed from `Drop` is logged.
    tenant_id: TenantId,
    /// The wrapped process. Becomes `None` once the `Child` has been taken out
    /// for kill+wait, which makes a second kill+wait a no-op.
    child: Option<Child>,
}
995 :
996 : impl Deref for NoLeakChild {
997 : type Target = Child;
998 :
999 2758027 : fn deref(&self) -> &Self::Target {
1000 2758027 : self.child.as_ref().expect("must not use from drop")
1001 2758027 : }
1002 : }
1003 :
1004 : impl DerefMut for NoLeakChild {
1005 1206 : fn deref_mut(&mut self) -> &mut Self::Target {
1006 1206 : self.child.as_mut().expect("must not use from drop")
1007 1206 : }
1008 : }
1009 :
1010 : impl NoLeakChild {
1011 402 : fn spawn(tenant_id: TenantId, command: &mut Command) -> io::Result<Self> {
1012 402 : let child = command.spawn()?;
1013 402 : Ok(NoLeakChild {
1014 402 : tenant_id,
1015 402 : child: Some(child),
1016 402 : })
1017 402 : }
1018 :
1019 0 : fn kill_and_wait(mut self) {
1020 0 : let child = match self.child.take() {
1021 0 : Some(child) => child,
1022 0 : None => return,
1023 : };
1024 0 : Self::kill_and_wait_impl(child);
1025 0 : }
1026 :
1027 65 : #[instrument(skip_all, fields(pid=child.id()))]
1028 : fn kill_and_wait_impl(mut child: Child) {
1029 : let res = child.kill();
1030 : if let Err(e) = res {
1031 : // This branch is very unlikely because:
1032 : // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
1033 : // - This is the only place that calls .kill()
1034 : // - We consume `self`, so, .kill() can't be called twice.
1035 : // - If the process exited by itself or was killed by someone else,
1036 : // .kill() will still succeed because we haven't wait()'ed yet.
1037 : //
1038 : // So, if we arrive here, we have really no idea what happened,
1039 : // whether the PID stored in self.child is still valid, etc.
1040 : // If this function were fallible, we'd return an error, but
1041 : // since it isn't, all we can do is log an error and proceed
1042 : // with the wait().
1043 0 : error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
1044 : }
1045 :
1046 : match child.wait() {
1047 : Ok(exit_status) => {
1048 65 : info!(exit_status = %exit_status, "wait successful");
1049 : }
1050 : Err(e) => {
1051 0 : error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
1052 : }
1053 : }
1054 : }
1055 : }
1056 :
1057 : impl Drop for NoLeakChild {
1058 65 : fn drop(&mut self) {
1059 65 : let child = match self.child.take() {
1060 65 : Some(child) => child,
1061 0 : None => return,
1062 : };
1063 65 : let tenant_id = self.tenant_id;
1064 65 : // Offload the kill+wait of the child process into the background.
1065 65 : // If someone stops the runtime, we'll leak the child process.
1066 65 : // We can ignore that case because we only stop the runtime on pageserver exit.
1067 65 : BACKGROUND_RUNTIME.spawn(async move {
1068 65 : tokio::task::spawn_blocking(move || {
1069 : // Intentionally don't inherit the tracing context from whoever is dropping us.
1070 : // This thread here is going to outlive of our dropper.
1071 65 : let span = tracing::info_span!("walredo", %tenant_id);
1072 65 : let _entered = span.enter();
1073 65 : Self::kill_and_wait_impl(child);
1074 65 : })
1075 65 : .await
1076 65 : });
1077 65 : }
1078 : }
1079 :
/// Extension trait for spawning a process directly into a `NoLeakChild`.
trait NoLeakChildCommandExt {
    /// Spawns the process and returns it wrapped in a `NoLeakChild` associated
    /// with `tenant_id`. Returns the I/O error if spawning fails.
    fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild>;
}
1083 :
/// Lets callers write `command.spawn_no_leak_child(tenant_id)` on a
/// `std::process::Command`.
impl NoLeakChildCommandExt for Command {
    /// Delegates to `NoLeakChild::spawn`, passing this command.
    fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild> {
        NoLeakChild::spawn(tenant_id, self)
    }
}
1089 :
1090 : // Functions for constructing messages to send to the postgres WAL redo
1091 : // process. See pgxn/neon_walredo/walredoproc.c for
1092 : // explanation of the protocol.
1093 :
1094 2758027 : fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
1095 2758027 : let len = 4 + 1 + 4 * 4;
1096 2758027 :
1097 2758027 : buf.put_u8(b'B');
1098 2758027 : buf.put_u32(len as u32);
1099 2758027 :
1100 2758027 : tag.ser_into(buf)
1101 2758027 : .expect("serialize BufferTag should always succeed");
1102 2758027 : }
1103 :
1104 889355 : fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
1105 889355 : assert!(base_img.len() == 8192);
1106 :
1107 889355 : let len = 4 + 1 + 4 * 4 + base_img.len();
1108 889355 :
1109 889355 : buf.put_u8(b'P');
1110 889355 : buf.put_u32(len as u32);
1111 889355 : tag.ser_into(buf)
1112 889355 : .expect("serialize BufferTag should always succeed");
1113 889355 : buf.put(base_img);
1114 889355 : }
1115 :
1116 179396673 : fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
1117 179396673 : let len = 4 + 8 + rec.len();
1118 179396673 :
1119 179396673 : buf.put_u8(b'A');
1120 179396673 : buf.put_u32(len as u32);
1121 179396673 : buf.put_u64(endlsn.0);
1122 179396673 : buf.put(rec);
1123 179396673 : }
1124 :
1125 2758027 : fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
1126 2758027 : let len = 4 + 1 + 4 * 4;
1127 2758027 :
1128 2758027 : buf.put_u8(b'G');
1129 2758027 : buf.put_u32(len as u32);
1130 2758027 : tag.ser_into(buf)
1131 2758027 : .expect("serialize BufferTag should always succeed");
1132 2758027 : }
1133 :
1134 : #[cfg(test)]
1135 : mod tests {
1136 : use super::{PostgresRedoManager, WalRedoManager};
1137 : use crate::repository::Key;
1138 : use crate::{config::PageServerConf, walrecord::NeonWalRecord};
1139 : use bytes::Bytes;
1140 : use std::str::FromStr;
1141 : use utils::{id::TenantId, lsn::Lsn};
1142 :
/// Replays the short v14 record set against the key it belongs to and checks
/// the result against the page image stored in the fixture file.
#[test]
fn short_v14_redo() {
    let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap();

    let harness = RedoHarness::new().unwrap();

    let key = Key {
        field1: 0,
        field2: 1663,
        field3: 13010,
        field4: 1259,
        field5: 0,
        field6: 0,
    };
    let lsn = Lsn::from_str("0/16E2408").unwrap();

    let redone = harness
        .manager
        .request_redo(key, lsn, None, short_records(), 14);
    let page = redone.unwrap();

    assert_eq!(&expected, &*page);
}
1169 :
/// Replays the short v14 record set against a key it does not belong to and
/// checks that the request still succeeds, yielding an all-zero page.
#[test]
fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
    let harness = RedoHarness::new().unwrap();

    let wrong_key = Key {
        field1: 0,
        field2: 1663,
        // key should be 13010
        field3: 13130,
        field4: 1259,
        field5: 0,
        field6: 0,
    };
    let lsn = Lsn::from_str("0/16E2408").unwrap();

    let redone = harness
        .manager
        .request_redo(wrong_key, lsn, None, short_records(), 14);
    let page = redone.unwrap();

    // TODO: there will be some stderr printout, which is forwarded to tracing that could
    // perhaps be captured as long as it's in the same thread.
    assert_eq!(page, crate::ZERO_PAGE);
}
1197 :
1198 : #[allow(clippy::octal_escapes)]
1199 2 : fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
1200 2 : vec![
1201 2 : (
1202 2 : Lsn::from_str("0/16A9388").unwrap(),
1203 2 : NeonWalRecord::Postgres {
1204 2 : will_init: true,
1205 2 : rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
1206 2 : }
1207 2 : ),
1208 2 : (
1209 2 : Lsn::from_str("0/16D4080").unwrap(),
1210 2 : NeonWalRecord::Postgres {
1211 2 : will_init: false,
1212 2 : rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
1213 2 : }
1214 2 : )
1215 2 : ]
1216 2 : }
1217 :
/// Test fixture: a `PostgresRedoManager` together with the temporary
/// directory that its configuration was built from.
struct RedoHarness {
    // underscored because unused, except for removal at drop
    _repo_dir: tempfile::TempDir,
    // The redo manager exercised by the tests.
    manager: PostgresRedoManager,
}
1223 :
1224 : impl RedoHarness {
1225 2 : fn new() -> anyhow::Result<Self> {
1226 2 : let repo_dir = tempfile::tempdir()?;
1227 2 : let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
1228 2 : let conf = Box::leak(Box::new(conf));
1229 2 : let tenant_id = TenantId::generate();
1230 2 :
1231 2 : let manager = PostgresRedoManager::new(conf, tenant_id);
1232 2 :
1233 2 : Ok(RedoHarness {
1234 2 : _repo_dir: repo_dir,
1235 2 : manager,
1236 2 : })
1237 2 : }
1238 : }
1239 : }
|