LCOV - code coverage report
Current view: top level - pageserver/src - walredo.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 79.6 % 530 422
Test Date: 2023-09-06 10:18:01 Functions: 47.1 % 70 33

            Line data    Source code
       1              : //!
       2              : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
       3              : //! to apply given WAL records over an old page image and return new
       4              : //! page image.
       5              : //!
       6              : //! We rely on Postgres to perform WAL redo for us. We launch a
       7              : //! postgres process in special "wal redo" mode that's similar to
       8              : //! single-user mode. We then pass the previous page image, if any,
       9              : //! and all the WAL records we want to apply, to the postgres
      10              : //! process. Then we get the page image back. Communication with the
      11              : //! postgres process happens via stdin/stdout.
      12              : //!
      13              : //! See pgxn/neon_walredo/walredoproc.c for the other side of
      14              : //! this communication.
      15              : //!
      16              : //! The Postgres process is assumed to be secure against malicious WAL
      17              : //! records. It achieves this by dropping privileges before replaying
      18              : //! any WAL records, so that even if an attacker hijacks the Postgres
      19              : //! process, they cannot escape from it.
      20              : //!
      21              : use byteorder::{ByteOrder, LittleEndian};
      22              : use bytes::{BufMut, Bytes, BytesMut};
      23              : use nix::poll::*;
      24              : use serde::Serialize;
      25              : use std::collections::VecDeque;
      26              : use std::io::prelude::*;
      27              : use std::io::{Error, ErrorKind};
      28              : use std::ops::{Deref, DerefMut};
      29              : use std::os::unix::io::{AsRawFd, RawFd};
      30              : use std::os::unix::prelude::CommandExt;
      31              : use std::process::Stdio;
      32              : use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
      33              : use std::sync::{Mutex, MutexGuard};
      34              : use std::time::Duration;
      35              : use std::time::Instant;
      36              : use std::{fs, io};
      37              : use tracing::*;
      38              : use utils::crashsafe::path_with_suffix_extension;
      39              : use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
      40              : 
      41              : use crate::metrics::{
      42              :     WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
      43              :     WAL_REDO_WAIT_TIME,
      44              : };
      45              : use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
      46              : use crate::repository::Key;
      47              : use crate::task_mgr::BACKGROUND_RUNTIME;
      48              : use crate::walrecord::NeonWalRecord;
      49              : use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
      50              : use pageserver_api::reltag::{RelTag, SlruKind};
      51              : use postgres_ffi::pg_constants;
      52              : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
      53              : use postgres_ffi::v14::nonrelfile_utils::{
      54              :     mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
      55              :     transaction_id_set_status,
      56              : };
      57              : use postgres_ffi::BLCKSZ;
      58              : 
      59              : ///
      60              : /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
      61              : ///
      62              : /// In Postgres `BufferTag` structure is used for exactly the same purpose.
      63              : /// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
      64              : ///
      65      6405409 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)]
      66              : pub struct BufferTag {
      67              :     pub rel: RelTag,
      68              :     pub blknum: u32,
      69              : }
      70              : 
      71              : ///
      72              : /// WAL Redo Manager is responsible for replaying WAL records.
      73              : ///
      74              : /// Callers use the WAL redo manager through this abstract interface,
      75              : /// which makes it easy to mock it in tests.
      76              : pub trait WalRedoManager: Send + Sync {
      77              :     /// Apply some WAL records.
      78              :     ///
      79              :     /// The caller passes an old page image, and WAL records that should be
      80              :     /// applied over it. The return value is a new page image, after applying
      81              :     /// the records.
      82              :     fn request_redo(
      83              :         &self,
      84              :         key: Key,
      85              :         lsn: Lsn,
      86              :         base_img: Option<(Lsn, Bytes)>,
      87              :         records: Vec<(Lsn, NeonWalRecord)>,
      88              :         pg_version: u32,
      89              :     ) -> Result<Bytes, WalRedoError>;
      90              : }
      91              : 
      92              : struct ProcessInput {
      93              :     child: NoLeakChild,
      94              :     stdin: ChildStdin,
      95              :     stderr_fd: RawFd,
      96              :     stdout_fd: RawFd,
      97              :     n_requests: usize,
      98              : }
      99              : 
     100              : struct ProcessOutput {
     101              :     stdout: ChildStdout,
     102              :     pending_responses: VecDeque<Option<Bytes>>,
     103              :     n_processed_responses: usize,
     104              : }
     105              : 
     106              : ///
     107              : /// This is the real implementation that uses a Postgres process to
     108              : /// perform WAL replay. Only one thread can use the process at a time,
     109              : /// that is controlled by the Mutex. In the future, we might want to
     110              : /// launch a pool of processes to allow concurrent replay of multiple
     111              : /// records.
     112              : ///
     113              : pub struct PostgresRedoManager {
     114              :     tenant_id: TenantId,
     115              :     conf: &'static PageServerConf,
     116              : 
     117              :     stdout: Mutex<Option<ProcessOutput>>,
     118              :     stdin: Mutex<Option<ProcessInput>>,
     119              :     stderr: Mutex<Option<ChildStderr>>,
     120              : }
     121              : 
     122              : /// Can this request be served by neon redo functions
     123              : /// or we need to pass it to wal-redo postgres process?
     124    207480142 : fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
     125    207480142 :     // Currently, we don't have bespoke Rust code to replay any
     126    207480142 :     // Postgres WAL records. But everything else is handled in neon.
     127    207480142 :     #[allow(clippy::match_like_matches_macro)]
     128    207480142 :     match rec {
     129              :         NeonWalRecord::Postgres {
     130              :             will_init: _,
     131              :             rec: _,
     132    179396673 :         } => false,
     133     28083469 :         _ => true,
     134              :     }
     135    207480142 : }
     136              : 
     137              : /// An error happened in WAL redo
     138            0 : #[derive(Debug, thiserror::Error)]
     139              : pub enum WalRedoError {
     140              :     #[error(transparent)]
     141              :     IoError(#[from] std::io::Error),
     142              : 
     143              :     #[error("cannot perform WAL redo now")]
     144              :     InvalidState,
     145              :     #[error("cannot perform WAL redo for this request")]
     146              :     InvalidRequest,
     147              :     #[error("cannot perform WAL redo for this record")]
     148              :     InvalidRecord,
     149              : }
     150              : 
     151              : ///
     152              : /// Public interface of WAL redo manager
     153              : ///
     154              : impl WalRedoManager for PostgresRedoManager {
     155              :     ///
     156              :     /// Request the WAL redo manager to apply some WAL records
     157              :     ///
     158              :     /// The WAL redo is handled by a separate thread, so this just sends a request
     159              :     /// to the thread and waits for response.
     160              :     ///
     161      2762175 :     fn request_redo(
     162      2762175 :         &self,
     163      2762175 :         key: Key,
     164      2762175 :         lsn: Lsn,
     165      2762175 :         base_img: Option<(Lsn, Bytes)>,
     166      2762175 :         records: Vec<(Lsn, NeonWalRecord)>,
     167      2762175 :         pg_version: u32,
     168      2762175 :     ) -> Result<Bytes, WalRedoError> {
     169      2762175 :         if records.is_empty() {
     170            0 :             error!("invalid WAL redo request with no records");
     171            0 :             return Err(WalRedoError::InvalidRequest);
     172      2762175 :         }
     173      2762175 : 
     174      2762175 :         let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
     175      2762175 :         let mut img = base_img.map(|p| p.1);
     176      2762175 :         let mut batch_neon = can_apply_in_neon(&records[0].1);
     177      2762175 :         let mut batch_start = 0;
     178    204717967 :         for (i, record) in records.iter().enumerate().skip(1) {
     179    204717967 :             let rec_neon = can_apply_in_neon(&record.1);
     180    204717967 : 
     181    204717967 :             if rec_neon != batch_neon {
     182            9 :                 let result = if batch_neon {
     183            7 :                     self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
     184              :                 } else {
     185            2 :                     self.apply_batch_postgres(
     186            2 :                         key,
     187            2 :                         lsn,
     188            2 :                         img,
     189            2 :                         base_img_lsn,
     190            2 :                         &records[batch_start..i],
     191            2 :                         self.conf.wal_redo_timeout,
     192            2 :                         pg_version,
     193            2 :                     )
     194              :                 };
     195            9 :                 img = Some(result?);
     196              : 
     197            9 :                 batch_neon = rec_neon;
     198            9 :                 batch_start = i;
     199    204717958 :             }
     200              :         }
     201              :         // last batch
     202      2762175 :         if batch_neon {
     203         4150 :             self.apply_batch_neon(key, lsn, img, &records[batch_start..])
     204              :         } else {
     205      2758025 :             self.apply_batch_postgres(
     206      2758025 :                 key,
     207      2758025 :                 lsn,
     208      2758025 :                 img,
     209      2758025 :                 base_img_lsn,
     210      2758025 :                 &records[batch_start..],
     211      2758025 :                 self.conf.wal_redo_timeout,
     212      2758025 :                 pg_version,
     213      2758025 :             )
     214              :         }
     215      2762175 :     }
     216              : }
     217              : 
     218              : impl PostgresRedoManager {
     219              :     ///
     220              :     /// Create a new PostgresRedoManager.
     221              :     ///
     222          741 :     pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
     223          741 :         // The actual process is launched lazily, on first request.
     224          741 :         PostgresRedoManager {
     225          741 :             tenant_id,
     226          741 :             conf,
     227          741 :             stdin: Mutex::new(None),
     228          741 :             stdout: Mutex::new(None),
     229          741 :             stderr: Mutex::new(None),
     230          741 :         }
     231          741 :     }
     232              : 
     233              :     /// Launch process pre-emptively. Should not be needed except for benchmarking.
     234            0 :     pub fn launch_process(&self, pg_version: u32) -> anyhow::Result<()> {
     235            0 :         let mut proc = self.stdin.lock().unwrap();
     236            0 :         if proc.is_none() {
     237            0 :             self.launch(&mut proc, pg_version)?;
     238            0 :         }
     239            0 :         Ok(())
     240            0 :     }
     241              : 
     242              :     ///
     243              :     /// Process one request for WAL redo using wal-redo postgres
     244              :     ///
     245              :     #[allow(clippy::too_many_arguments)]
     246      2758027 :     fn apply_batch_postgres(
     247      2758027 :         &self,
     248      2758027 :         key: Key,
     249      2758027 :         lsn: Lsn,
     250      2758027 :         base_img: Option<Bytes>,
     251      2758027 :         base_img_lsn: Lsn,
     252      2758027 :         records: &[(Lsn, NeonWalRecord)],
     253      2758027 :         wal_redo_timeout: Duration,
     254      2758027 :         pg_version: u32,
     255      2758027 :     ) -> Result<Bytes, WalRedoError> {
     256      2758027 :         let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
     257              :         const MAX_RETRY_ATTEMPTS: u32 = 1;
     258      2758027 :         let start_time = Instant::now();
     259      2758027 :         let mut n_attempts = 0u32;
     260      2758027 :         loop {
     261      2758027 :             let mut proc = self.stdin.lock().unwrap();
     262      2758027 :             let lock_time = Instant::now();
     263      2758027 : 
     264      2758027 :             // launch the WAL redo process on first use
     265      2758027 :             if proc.is_none() {
     266          402 :                 self.launch(&mut proc, pg_version)?;
     267      2757625 :             }
     268      2758027 :             WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
     269      2758027 : 
     270      2758027 :             // Relational WAL records are applied using wal-redo-postgres
     271      2758027 :             let buf_tag = BufferTag { rel, blknum };
     272      2758027 :             let result = self
     273      2758027 :                 .apply_wal_records(proc, buf_tag, &base_img, records, wal_redo_timeout)
     274      2758027 :                 .map_err(WalRedoError::IoError);
     275      2758027 : 
     276      2758027 :             let end_time = Instant::now();
     277      2758027 :             let duration = end_time.duration_since(lock_time);
     278      2758027 : 
     279      2758027 :             let len = records.len();
     280    179396673 :             let nbytes = records.iter().fold(0, |acumulator, record| {
     281    179396673 :                 acumulator
     282    179396673 :                     + match &record.1 {
     283    179396673 :                         NeonWalRecord::Postgres { rec, .. } => rec.len(),
     284            0 :                         _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
     285              :                     }
     286    179396673 :             });
     287      2758027 : 
     288      2758027 :             WAL_REDO_TIME.observe(duration.as_secs_f64());
     289      2758027 :             WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
     290      2758027 :             WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
     291      2758027 : 
     292      2758027 :             debug!(
     293            0 :                                 "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
     294            0 :                                 len,
     295            0 :                                 nbytes,
     296            0 :                                 duration.as_micros(),
     297            0 :                                 lsn
     298            0 :                         );
     299              : 
     300              :             // If something went wrong, don't try to reuse the process. Kill it, and
     301              :             // next request will launch a new one.
     302      2758027 :             if result.is_err() {
     303            0 :                 error!(
     304            0 :                 "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}",
     305            0 :                 records.len(),
     306            0 :                                 records.first().map(|p| p.0).unwrap_or(Lsn(0)),
     307            0 :                                 records.last().map(|p| p.0).unwrap_or(Lsn(0)),
     308            0 :                 nbytes,
     309            0 :                                 base_img_lsn,
     310            0 :                 lsn
     311            0 :             );
     312              :                 // self.stdin only holds stdout & stderr as_raw_fd().
     313              :                 // Dropping it as part of take() doesn't close them.
     314              :                 // The owning objects (ChildStdout and ChildStderr) are stored in
     315              :                 // self.stdout and self.stderr, respectively.
     316              :                 // We intentionally keep them open here to avoid a race between
     317              :                 // currently running `apply_wal_records()` and a `launch()` call
     318              :                 // after we return here.
     319              :                 // The currently running `apply_wal_records()` must not read from
     320              :                 // the newly launched process.
     321              :                 // By keeping self.stdout and self.stderr open here, `launch()` will
     322              :                 // get other file descriptors for the new child's stdout and stderr,
     323              :                 // and hence the current `apply_wal_records()` calls will observe
     324              :                 //  `output.stdout.as_raw_fd() != stdout_fd` .
     325            0 :                 if let Some(proc) = self.stdin.lock().unwrap().take() {
     326            0 :                     proc.child.kill_and_wait();
     327            0 :                 }
     328      2758027 :             }
     329      2758027 :             n_attempts += 1;
     330      2758027 :             if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
     331      2758027 :                 return result;
     332            0 :             }
     333              :         }
     334      2758027 :     }
     335              : 
     336              :     ///
     337              :     /// Process a batch of WAL records using bespoke Neon code.
     338              :     ///
     339         4157 :     fn apply_batch_neon(
     340         4157 :         &self,
     341         4157 :         key: Key,
     342         4157 :         lsn: Lsn,
     343         4157 :         base_img: Option<Bytes>,
     344         4157 :         records: &[(Lsn, NeonWalRecord)],
     345         4157 :     ) -> Result<Bytes, WalRedoError> {
     346         4157 :         let start_time = Instant::now();
     347         4157 : 
     348         4157 :         let mut page = BytesMut::new();
     349         4157 :         if let Some(fpi) = base_img {
     350         4157 :             // If full-page image is provided, then use it...
     351         4157 :             page.extend_from_slice(&fpi[..]);
     352         4157 :         } else {
     353              :             // All the current WAL record types that we can handle require a base image.
     354            0 :             error!("invalid neon WAL redo request with no base image");
     355            0 :             return Err(WalRedoError::InvalidRequest);
     356              :         }
     357              : 
     358              :         // Apply all the WAL records in the batch
     359     28083469 :         for (record_lsn, record) in records.iter() {
     360     28083469 :             self.apply_record_neon(key, &mut page, *record_lsn, record)?;
     361              :         }
     362              :         // Success!
     363         4157 :         let end_time = Instant::now();
     364         4157 :         let duration = end_time.duration_since(start_time);
     365         4157 :         WAL_REDO_TIME.observe(duration.as_secs_f64());
     366         4157 : 
     367         4157 :         debug!(
     368            0 :             "neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
     369            0 :             records.len(),
     370            0 :             duration.as_micros(),
     371            0 :             lsn
     372            0 :         );
     373              : 
     374         4157 :         Ok(page.freeze())
     375         4157 :     }
     376              : 
     377     28083469 :     fn apply_record_neon(
     378     28083469 :         &self,
     379     28083469 :         key: Key,
     380     28083469 :         page: &mut BytesMut,
     381     28083469 :         _record_lsn: Lsn,
     382     28083469 :         record: &NeonWalRecord,
     383     28083469 :     ) -> Result<(), WalRedoError> {
     384     28083469 :         match record {
     385              :             NeonWalRecord::Postgres {
     386              :                 will_init: _,
     387              :                 rec: _,
     388              :             } => {
     389            0 :                 error!("tried to pass postgres wal record to neon WAL redo");
     390            0 :                 return Err(WalRedoError::InvalidRequest);
     391              :             }
     392              :             NeonWalRecord::ClearVisibilityMapFlags {
     393         3230 :                 new_heap_blkno,
     394         3230 :                 old_heap_blkno,
     395         3230 :                 flags,
     396              :             } => {
     397              :                 // sanity check that this is modifying the correct relation
     398         3230 :                 let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
     399         3230 :                 assert!(
     400         3230 :                     rel.forknum == VISIBILITYMAP_FORKNUM,
     401            0 :                     "ClearVisibilityMapFlags record on unexpected rel {}",
     402              :                     rel
     403              :                 );
     404         3230 :                 if let Some(heap_blkno) = *new_heap_blkno {
     405              :                     // Calculate the VM block and offset that corresponds to the heap block.
     406          366 :                     let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
     407          366 :                     let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
     408          366 :                     let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
     409          366 : 
     410          366 :                     // Check that we're modifying the correct VM block.
     411          366 :                     assert!(map_block == blknum);
     412              : 
     413              :                     // equivalent to PageGetContents(page)
     414          366 :                     let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
     415          366 : 
     416          366 :                     map[map_byte as usize] &= !(flags << map_offset);
     417         2864 :                 }
     418              : 
     419              :                 // Repeat for 'old_heap_blkno', if any
     420         3230 :                 if let Some(heap_blkno) = *old_heap_blkno {
     421         2867 :                     let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
     422         2867 :                     let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
     423         2867 :                     let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
     424         2867 : 
     425         2867 :                     assert!(map_block == blknum);
     426              : 
     427         2867 :                     let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
     428         2867 : 
     429         2867 :                     map[map_byte as usize] &= !(flags << map_offset);
     430          363 :                 }
     431              :             }
     432              :             // Non-relational WAL records are handled here, with custom code that has the
     433              :             // same effects as the corresponding Postgres WAL redo function.
     434     27983072 :             NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
     435     27983072 :                 let (slru_kind, segno, blknum) =
     436     27983072 :                     key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
     437     27983072 :                 assert_eq!(
     438              :                     slru_kind,
     439              :                     SlruKind::Clog,
     440            0 :                     "ClogSetCommitted record with unexpected key {}",
     441              :                     key
     442              :                 );
     443     56066157 :                 for &xid in xids {
     444     28083085 :                     let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     445     28083085 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     446     28083085 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     447     28083085 : 
     448     28083085 :                     // Check that we're modifying the correct CLOG block.
     449     28083085 :                     assert!(
     450     28083085 :                         segno == expected_segno,
     451            0 :                         "ClogSetCommitted record for XID {} with unexpected key {}",
     452              :                         xid,
     453              :                         key
     454              :                     );
     455     28083085 :                     assert!(
     456     28083085 :                         blknum == expected_blknum,
     457            0 :                         "ClogSetCommitted record for XID {} with unexpected key {}",
     458              :                         xid,
     459              :                         key
     460              :                     );
     461              : 
     462     28083085 :                     transaction_id_set_status(
     463     28083085 :                         xid,
     464     28083085 :                         pg_constants::TRANSACTION_STATUS_COMMITTED,
     465     28083085 :                         page,
     466     28083085 :                     );
     467              :                 }
     468              : 
     469              :                 // Append the timestamp
     470     27983072 :                 if page.len() == BLCKSZ as usize + 8 {
     471     27979693 :                     page.truncate(BLCKSZ as usize);
     472     27979693 :                 }
     473     27983072 :                 if page.len() == BLCKSZ as usize {
     474     27983072 :                     page.extend_from_slice(&timestamp.to_be_bytes());
     475     27983072 :                 } else {
     476            0 :                     warn!(
     477            0 :                         "CLOG blk {} in seg {} has invalid size {}",
     478            0 :                         blknum,
     479            0 :                         segno,
     480            0 :                         page.len()
     481            0 :                     );
     482              :                 }
     483              :             }
     484         1295 :             NeonWalRecord::ClogSetAborted { xids } => {
     485         1295 :                 let (slru_kind, segno, blknum) =
     486         1295 :                     key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
     487         1295 :                 assert_eq!(
     488              :                     slru_kind,
     489              :                     SlruKind::Clog,
     490            0 :                     "ClogSetAborted record with unexpected key {}",
     491              :                     key
     492              :                 );
     493         2597 :                 for &xid in xids {
     494         1302 :                     let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     495         1302 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     496         1302 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     497         1302 : 
     498         1302 :                     // Check that we're modifying the correct CLOG block.
     499         1302 :                     assert!(
     500         1302 :                         segno == expected_segno,
     501            0 :                         "ClogSetAborted record for XID {} with unexpected key {}",
     502              :                         xid,
     503              :                         key
     504              :                     );
     505         1302 :                     assert!(
     506         1302 :                         blknum == expected_blknum,
     507            0 :                         "ClogSetAborted record for XID {} with unexpected key {}",
     508              :                         xid,
     509              :                         key
     510              :                     );
     511              : 
     512         1302 :                     transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
     513              :                 }
     514              :             }
     515        47662 :             NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
     516        47662 :                 let (slru_kind, segno, blknum) =
     517        47662 :                     key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
     518        47662 :                 assert_eq!(
     519              :                     slru_kind,
     520              :                     SlruKind::MultiXactOffsets,
     521            0 :                     "MultixactOffsetCreate record with unexpected key {}",
     522              :                     key
     523              :                 );
     524              :                 // Compute the block and offset to modify.
     525              :                 // See RecordNewMultiXact in PostgreSQL sources.
     526        47662 :                 let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     527        47662 :                 let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     528        47662 :                 let offset = (entryno * 4) as usize;
     529        47662 : 
     530        47662 :                 // Check that we're modifying the correct multixact-offsets block.
     531        47662 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     532        47662 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     533        47662 :                 assert!(
     534        47662 :                     segno == expected_segno,
     535            0 :                     "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     536              :                     mid,
     537              :                     key
     538              :                 );
     539        47662 :                 assert!(
     540        47662 :                     blknum == expected_blknum,
     541            0 :                     "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     542              :                     mid,
     543              :                     key
     544              :                 );
     545              : 
     546        47662 :                 LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
     547              :             }
     548        48210 :             NeonWalRecord::MultixactMembersCreate { moff, members } => {
     549        48210 :                 let (slru_kind, segno, blknum) =
     550        48210 :                     key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?;
     551        48210 :                 assert_eq!(
     552              :                     slru_kind,
     553              :                     SlruKind::MultiXactMembers,
     554            0 :                     "MultixactMembersCreate record with unexpected key {}",
     555              :                     key
     556              :                 );
     557       945038 :                 for (i, member) in members.iter().enumerate() {
     558       945038 :                     let offset = moff + i as u32;
     559       945038 : 
     560       945038 :                     // Compute the block and offset to modify.
     561       945038 :                     // See RecordNewMultiXact in PostgreSQL sources.
     562       945038 :                     let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     563       945038 :                     let memberoff = mx_offset_to_member_offset(offset);
     564       945038 :                     let flagsoff = mx_offset_to_flags_offset(offset);
     565       945038 :                     let bshift = mx_offset_to_flags_bitshift(offset);
     566       945038 : 
     567       945038 :                     // Check that we're modifying the correct multixact-members block.
     568       945038 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     569       945038 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     570       945038 :                     assert!(
     571       945038 :                         segno == expected_segno,
     572            0 :                         "MultiXactMembersCreate record for offset {} with unexpected key {}",
     573              :                         moff,
     574              :                         key
     575              :                     );
     576       945038 :                     assert!(
     577       945038 :                         blknum == expected_blknum,
     578            0 :                         "MultiXactMembersCreate record for offset {} with unexpected key {}",
     579              :                         moff,
     580              :                         key
     581              :                     );
     582              : 
     583       945038 :                     let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
     584       945038 :                     flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
     585       945038 :                     flagsval |= member.status << bshift;
     586       945038 :                     LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
     587       945038 :                     LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
     588              :                 }
     589              :             }
     590              :         }
     591              : 
     592     28083469 :         Ok(())
     593     28083469 :     }
     594              : }
     595              : 
     596              : ///
     597              : /// `Command` extension that avoids handing all open file descriptors to the child process
     598              : ///
     599              : trait CloseFileDescriptors: CommandExt {
     600              :     ///
     601              :     /// Close file descriptors (other than stdin, stdout, stderr) in child process
     602              :     ///
     603              :     fn close_fds(&mut self) -> &mut Command;
     604              : }
     605              : 
     606              : impl<C: CommandExt> CloseFileDescriptors for C {
     607          402 :     fn close_fds(&mut self) -> &mut Command {
     608          402 :         unsafe {
     609          402 :             self.pre_exec(move || {
     610            0 :                 // SAFETY: Code executed inside pre_exec should have async-signal-safety,
     611            0 :                 // which means it should be safe to execute inside a signal handler.
     612            0 :                 // The precise meaning depends on platform. See `man signal-safety`
     613            0 :                 // for the linux definition.
     614            0 :                 //
     615            0 :                 // The set_fds_cloexec_threadsafe function is documented to be
     616            0 :                 // async-signal-safe.
     617            0 :                 //
     618            0 :                 // Aside from this function, the rest of the code is re-entrant and
     619            0 :                 // doesn't make any syscalls. We're just passing constants.
     620            0 :                 //
     621            0 :                 // NOTE: It's easy to indirectly cause a malloc or lock a mutex,
     622            0 :                 // which is not async-signal-safe. Be careful.
     623            0 :                 close_fds::set_fds_cloexec_threadsafe(3, &[]);
     624            0 :                 Ok(())
     625          402 :             })
     626          402 :         }
     627          402 :     }
     628              : }
     629              : 
     630              : impl PostgresRedoManager {
     631              :     //
     632              :     // Start postgres binary in special WAL redo mode.
     633              :     //
     634          402 :     #[instrument(skip_all,fields(tenant_id=%self.tenant_id, pg_version=pg_version))]
     635              :     fn launch(
     636              :         &self,
     637              :         input: &mut MutexGuard<Option<ProcessInput>>,
     638              :         pg_version: u32,
     639              :     ) -> Result<(), Error> {
     640              :         // Previous versions of wal-redo required a data directory, and those directories
     641              :         // occupied some space on disk. Remove the directory if we find one.
     642              :         //
     643              :         // This code could be dropped after one release cycle.
     644              :         let legacy_datadir = path_with_suffix_extension(
     645              :             self.conf
     646              :                 .tenant_path(&self.tenant_id)
     647              :                 .join("wal-redo-datadir"),
     648              :             TEMP_FILE_SUFFIX,
     649              :         );
     650              :         if legacy_datadir.exists() {
     651            0 :             info!("legacy wal-redo datadir {legacy_datadir:?} exists, removing");
     652            0 :             fs::remove_dir_all(&legacy_datadir).map_err(|e| {
     653            0 :                 Error::new(
     654            0 :                     e.kind(),
     655            0 :                     format!("legacy wal-redo datadir {legacy_datadir:?} removal failure: {e}"),
     656            0 :                 )
     657            0 :             })?;
     658              :         }
     659              : 
     660              :         let pg_bin_dir_path = self
     661              :             .conf
     662              :             .pg_bin_dir(pg_version)
     663            0 :             .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_bin_dir path: {e}")))?;
     664              :         let pg_lib_dir_path = self
     665              :             .conf
     666              :             .pg_lib_dir(pg_version)
     667            0 :             .map_err(|e| Error::new(ErrorKind::Other, format!("incorrect pg_lib_dir path: {e}")))?;
     668              : 
     669              :         // Start postgres itself
     670              :         let child = Command::new(pg_bin_dir_path.join("postgres"))
     671              :             .arg("--wal-redo")
     672              :             .stdin(Stdio::piped())
     673              :             .stderr(Stdio::piped())
     674              :             .stdout(Stdio::piped())
     675              :             .env_clear()
     676              :             .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
     677              :             .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
     678              :             // The redo process is not trusted, and runs in seccomp mode that
     679              :             // doesn't allow it to open any files. We have to also make sure it
     680              :             // doesn't inherit any file descriptors from the pageserver, that
     681              :             // would allow an attacker to read any files that happen to be open
     682              :             // in the pageserver.
     683              :             //
     684              :             // The Rust standard library makes sure to mark any file descriptors
     685              :             // as close-on-exec by default, but that's not enough, since we use
     686              :             // libraries that directly call libc open without setting that flag.
     687              :             .close_fds()
     688              :             .spawn_no_leak_child(self.tenant_id)
     689            0 :             .map_err(|e| {
     690            0 :                 Error::new(
     691            0 :                     e.kind(),
     692            0 :                     format!("postgres --wal-redo command failed to start: {}", e),
     693            0 :                 )
     694            0 :             })?;
     695              : 
     696            0 :         let mut child = scopeguard::guard(child, |child| {
     697            0 :             error!("killing wal-redo-postgres process due to a problem during launch");
     698            0 :             child.kill_and_wait();
     699            0 :         });
     700              : 
     701              :         let stdin = child.stdin.take().unwrap();
     702              :         let stdout = child.stdout.take().unwrap();
     703              :         let stderr = child.stderr.take().unwrap();
     704              : 
     705              :         macro_rules! set_nonblock_or_log_err {
     706              :             ($file:ident) => {{
     707              :                 let res = set_nonblock($file.as_raw_fd());
     708              :                 if let Err(e) = &res {
     709              :                     error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
     710              :                 }
     711              :                 res
     712              :             }};
     713              :         }
     714            0 :         set_nonblock_or_log_err!(stdin)?;
     715            0 :         set_nonblock_or_log_err!(stdout)?;
     716            0 :         set_nonblock_or_log_err!(stderr)?;
     717              : 
     718              :         // all fallible operations post-spawn are complete, so get rid of the guard
     719              :         let child = scopeguard::ScopeGuard::into_inner(child);
     720              : 
     721              :         **input = Some(ProcessInput {
     722              :             child,
     723              :             stdout_fd: stdout.as_raw_fd(),
     724              :             stderr_fd: stderr.as_raw_fd(),
     725              :             stdin,
     726              :             n_requests: 0,
     727              :         });
     728              : 
     729              :         *self.stdout.lock().unwrap() = Some(ProcessOutput {
     730              :             stdout,
     731              :             pending_responses: VecDeque::new(),
     732              :             n_processed_responses: 0,
     733              :         });
     734              :         *self.stderr.lock().unwrap() = Some(stderr);
     735              : 
     736              :         Ok(())
     737              :     }
     738              : 
     739              :     // Apply given WAL records ('records') over an old page image. Returns
     740              :     // new page image.
     741              :     //
     742      2758027 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
     743              :     fn apply_wal_records(
     744              :         &self,
     745              :         mut input: MutexGuard<Option<ProcessInput>>,
     746              :         tag: BufferTag,
     747              :         base_img: &Option<Bytes>,
     748              :         records: &[(Lsn, NeonWalRecord)],
     749              :         wal_redo_timeout: Duration,
     750              :     ) -> Result<Bytes, std::io::Error> {
     751              :         // Serialize all the messages to send the WAL redo process first.
     752              :         //
     753              :         // This could be problematic if there are millions of records to replay,
     754              :         // but in practice the number of records is usually so small that it doesn't
     755              :         // matter, and it's better to keep this code simple.
     756              :         //
     757              :         // Most requests start with a before-image with BLCKSZ bytes, followed by
     758              :         // some other WAL records. Start with a buffer that can hold that
     759              :         // comfortably.
     760              :         let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
     761              :         build_begin_redo_for_block_msg(tag, &mut writebuf);
     762              :         if let Some(img) = base_img {
     763              :             build_push_page_msg(tag, img, &mut writebuf);
     764              :         }
     765              :         for (lsn, rec) in records.iter() {
     766              :             if let NeonWalRecord::Postgres {
     767              :                 will_init: _,
     768              :                 rec: postgres_rec,
     769              :             } = rec
     770              :             {
     771              :                 build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
     772              :             } else {
     773              :                 return Err(Error::new(
     774              :                     ErrorKind::Other,
     775              :                     "tried to pass neon wal record to postgres WAL redo",
     776              :                 ));
     777              :             }
     778              :         }
     779              :         build_get_page_msg(tag, &mut writebuf);
     780              :         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
     781              : 
     782              :         let proc = input.as_mut().unwrap();
     783              :         let mut nwrite = 0usize;
     784              :         let stdout_fd = proc.stdout_fd;
     785              : 
     786              :         // Prepare for calling poll()
     787              :         let mut pollfds = [
     788              :             PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
     789              :             PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
     790              :             PollFd::new(stdout_fd, PollFlags::POLLIN),
     791              :         ];
     792              : 
     793              :         // We do two things simultaneously: send the old base image and WAL records to
     794              :         // the child process's stdin and forward any logging
     795              :         // information that the child writes to its stderr to the page server's log.
     796              :         while nwrite < writebuf.len() {
     797              :             let n = loop {
     798              :                 match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
     799              :                     Err(e) if e == nix::errno::Errno::EINTR => continue,
     800              :                     res => break res,
     801              :                 }
     802              :             }?;
     803              : 
     804              :             if n == 0 {
     805              :                 return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
     806              :             }
     807              : 
     808              :             // If we have some messages in stderr, forward them to the log.
     809              :             let err_revents = pollfds[1].revents().unwrap();
     810              :             if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
     811              :                 let mut errbuf: [u8; 16384] = [0; 16384];
     812              :                 let mut stderr_guard = self.stderr.lock().unwrap();
     813              :                 let stderr = stderr_guard.as_mut().unwrap();
     814              :                 let len = stderr.read(&mut errbuf)?;
     815              : 
     816              :                 // The message might not be split correctly into lines here. But this is
     817              :                 // good enough, the important thing is to get the message to the log.
     818              :                 if len > 0 {
     819            0 :                     error!(
     820            0 :                         "wal-redo-postgres: {}",
     821            0 :                         String::from_utf8_lossy(&errbuf[0..len])
     822            0 :                     );
     823              : 
     824              :                     // To make sure we capture all log from the process if it fails, keep
     825              :                     // reading from the stderr, before checking the stdout.
     826              :                     continue;
     827              :                 }
     828              :             } else if err_revents.contains(PollFlags::POLLHUP) {
     829              :                 return Err(Error::new(
     830              :                     ErrorKind::BrokenPipe,
     831              :                     "WAL redo process closed its stderr unexpectedly",
     832              :                 ));
     833              :             }
     834              : 
     835              :             // If 'stdin' is writeable, do write.
     836              :             let in_revents = pollfds[0].revents().unwrap();
     837              :             if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
     838              :                 nwrite += proc.stdin.write(&writebuf[nwrite..])?;
     839              :             } else if in_revents.contains(PollFlags::POLLHUP) {
     840              :                 // We still have more data to write, but the process closed the pipe.
     841              :                 return Err(Error::new(
     842              :                     ErrorKind::BrokenPipe,
     843              :                     "WAL redo process closed its stdin unexpectedly",
     844              :                 ));
     845              :             }
     846              :         }
     847              :         let request_no = proc.n_requests;
     848              :         proc.n_requests += 1;
     849              :         drop(input);
     850              : 
     851              :         // To improve walredo performance we separate sending requests and receiving
     852              :         // responses. They are protected by different mutexes (input and output).
     853              :         // If threads T1, T2, T3 send requests D1, D2, D3 to the walredo process,
     854              :         // there is no guarantee that T1 will be granted the output mutex first.
     855              :         // To address this we maintain the number of sent requests, the number of
     856              :         // processed responses, and a ring buffer of pending responses. After sending
     857              :         // its request (under the input mutex), a thread remembers its request number.
     858              :         // It then releases the input mutex, locks the output mutex, and reads responses
     859              :         // into the ring buffer up to and including its own request number. Then it
     860              :         // takes the corresponding element from the ring buffer and removes all empty
     861              :         // elements from the front, advancing the number of processed responses.
     862              : 
     863              :         let mut output_guard = self.stdout.lock().unwrap();
     864              :         let output = output_guard.as_mut().unwrap();
     865              :         if output.stdout.as_raw_fd() != stdout_fd {
     866              :             // If the stdout file descriptor has changed, the walredo process has crashed and been restarted.
     867              :             // Since ProcessInput and ProcessOutput are protected by different mutexes,
     868              :             // it can happen that we send a request to one process and wait for the response from another.
     869              :             // To prevent that situation we compare the stdout file descriptors.
     870              :             // Because the old stdout pipe is destroyed only after the new one is created,
     871              :             // the new pipe cannot reuse the same file descriptor, so this check is safe.
     872              :             //
     873              :             // Cross-read this with the comment in apply_batch_postgres if result.is_err().
     874              :             // That's where we kill the child process.
     875              :             return Err(Error::new(
     876              :                 ErrorKind::BrokenPipe,
     877              :                 "WAL redo process closed its stdout unexpectedly",
     878              :             ));
     879              :         }
     880              :         let n_processed_responses = output.n_processed_responses;
     881              :         while n_processed_responses + output.pending_responses.len() <= request_no {
     882              :             // We expect the WAL redo process to respond with an 8k page image. We read it
     883              :             // into this buffer.
     884              :             let mut resultbuf = vec![0; BLCKSZ.into()];
     885              :             let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
     886              :             while nresult < BLCKSZ.into() {
     887              :                 // We do two things simultaneously: reading response from stdout
     888              :                 // and forward any logging information that the child writes to its stderr to the page server's log.
     889              :                 let n = loop {
     890              :                     match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
     891              :                         Err(e) if e == nix::errno::Errno::EINTR => continue,
     892              :                         res => break res,
     893              :                     }
     894              :                 }?;
     895              : 
     896              :                 if n == 0 {
     897              :                     return Err(Error::new(ErrorKind::Other, "WAL redo timed out"));
     898              :                 }
     899              : 
     900              :                 // If we have some messages in stderr, forward them to the log.
     901              :                 let err_revents = pollfds[1].revents().unwrap();
     902              :                 if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
     903              :                     let mut errbuf: [u8; 16384] = [0; 16384];
     904              :                     let mut stderr_guard = self.stderr.lock().unwrap();
     905              :                     let stderr = stderr_guard.as_mut().unwrap();
     906              :                     let len = stderr.read(&mut errbuf)?;
     907              : 
     908              :                     // The message might not be split correctly into lines here. But this is
     909              :                     // good enough, the important thing is to get the message to the log.
     910              :                     if len > 0 {
     911            2 :                         error!(
     912            2 :                             "wal-redo-postgres: {}",
     913            2 :                             String::from_utf8_lossy(&errbuf[0..len])
     914            2 :                         );
     915              : 
     916              :                         // To make sure we capture all log from the process if it fails, keep
     917              :                         // reading from the stderr, before checking the stdout.
     918              :                         continue;
     919              :                     }
     920              :                 } else if err_revents.contains(PollFlags::POLLHUP) {
     921              :                     return Err(Error::new(
     922              :                         ErrorKind::BrokenPipe,
     923              :                         "WAL redo process closed its stderr unexpectedly",
     924              :                     ));
     925              :                 }
     926              : 
     927              :                 // If we have some data in stdout, read it to the result buffer.
     928              :                 let out_revents = pollfds[2].revents().unwrap();
     929              :                 if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
     930              :                     nresult += output.stdout.read(&mut resultbuf[nresult..])?;
     931              :                 } else if out_revents.contains(PollFlags::POLLHUP) {
     932              :                     return Err(Error::new(
     933              :                         ErrorKind::BrokenPipe,
     934              :                         "WAL redo process closed its stdout unexpectedly",
     935              :                     ));
     936              :                 }
     937              :             }
     938              :             output
     939              :                 .pending_responses
     940              :                 .push_back(Some(Bytes::from(resultbuf)));
     941              :         }
     942              :         // Replace our request's response with None in `pending_responses`.
     943              :         // Then make space in the ring buffer by clearing out any sequence of contiguous
     944              :         // `None`s from the front of `pending_responses`.
     945              :         // NB: We can't just pop_front(): the front may hold another request's response, because another
     946              :         // requester might have grabbed the output mutex before us:
     947              :         // T1: grab input mutex
     948              :         // T1: send request_no 23
     949              :         // T1: release input mutex
     950              :         // T2: grab input mutex
     951              :         // T2: send request_no 24
     952              :         // T2: release input mutex
     953              :         // T2: grab output mutex
     954              :         // T2: n_processed_responses + output.pending_responses.len() <= request_no
     955              :         //            23                                0                   24
     956              :         // T2: enters poll loop that reads stdout
     957              :         // T2: put response for 23 into pending_responses
     958              :         // T2: put response for 24 into pending_responses
     959              :         // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
     960              :         // T2: takes its response_24
     961              :         // pending_responses now looks like this: Front Some(response_23) None Back
     962              :         // T2: does the while loop below
     963              :         // pending_responses now looks like this: Front Some(response_23) None Back
     964              :         // T2: releases output mutex
     965              :         // T1: grabs output mutex
     966              :         // T1: n_processed_responses + output.pending_responses.len() > request_no
     967              :         //            23                                2                   23
     968              :         // T1: skips poll loop that reads stdout
     969              :         // T1: takes its response_23
     970              :         // pending_responses now looks like this: Front None None Back
     971              :         // T1: does the while loop below
     972              :         // pending_responses now looks like this: Front Back
     973              :         // n_processed_responses now has value 25
     974              :         let res = output.pending_responses[request_no - n_processed_responses]
     975              :             .take()
     976              :             .expect("we own this request_no, nobody else is supposed to take it");
     977              :         while let Some(front) = output.pending_responses.front() {
     978              :             if front.is_none() {
     979              :                 output.pending_responses.pop_front();
     980              :                 output.n_processed_responses += 1;
     981              :             } else {
     982              :                 break;
     983              :             }
     984              :         }
     985              :         Ok(res)
     986              :     }
     987              : }
     988              : 
     989              : /// Wrapper type around `std::process::Child` which guarantees that the child
     990              : /// will be killed and waited-for by this process before being dropped.
     991              : struct NoLeakChild {
     992              :     tenant_id: TenantId,
     993              :     child: Option<Child>,
     994              : }
     995              : 
     996              : impl Deref for NoLeakChild {
     997              :     type Target = Child;
     998              : 
     999      2758027 :     fn deref(&self) -> &Self::Target {
    1000      2758027 :         self.child.as_ref().expect("must not use from drop")
    1001      2758027 :     }
    1002              : }
    1003              : 
    1004              : impl DerefMut for NoLeakChild {
    1005         1206 :     fn deref_mut(&mut self) -> &mut Self::Target {
    1006         1206 :         self.child.as_mut().expect("must not use from drop")
    1007         1206 :     }
    1008              : }
    1009              : 
    1010              : impl NoLeakChild {
    1011          402 :     fn spawn(tenant_id: TenantId, command: &mut Command) -> io::Result<Self> {
    1012          402 :         let child = command.spawn()?;
    1013          402 :         Ok(NoLeakChild {
    1014          402 :             tenant_id,
    1015          402 :             child: Some(child),
    1016          402 :         })
    1017          402 :     }
    1018              : 
    1019            0 :     fn kill_and_wait(mut self) {
    1020            0 :         let child = match self.child.take() {
    1021            0 :             Some(child) => child,
    1022            0 :             None => return,
    1023              :         };
    1024            0 :         Self::kill_and_wait_impl(child);
    1025            0 :     }
    1026              : 
    1027           65 :     #[instrument(skip_all, fields(pid=child.id()))]
    1028              :     fn kill_and_wait_impl(mut child: Child) {
    1029              :         let res = child.kill();
    1030              :         if let Err(e) = res {
    1031              :             // This branch is very unlikely because:
    1032              :             // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
    1033              :             // - This is the only place that calls .kill()
    1034              :             // - We consume `self`, so, .kill() can't be called twice.
    1035              :             // - If the process exited by itself or was killed by someone else,
    1036              :             //   .kill() will still succeed because we haven't wait()'ed yet.
    1037              :             //
    1038              :             // So, if we arrive here, we have really no idea what happened,
    1039              :             // whether the PID stored in self.child is still valid, etc.
    1040              :             // If this function were fallible, we'd return an error, but
    1041              :             // since it isn't, all we can do is log an error and proceed
    1042              :             // with the wait().
    1043            0 :             error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
    1044              :         }
    1045              : 
    1046              :         match child.wait() {
    1047              :             Ok(exit_status) => {
    1048           65 :                 info!(exit_status = %exit_status, "wait successful");
    1049              :             }
    1050              :             Err(e) => {
    1051            0 :                 error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
    1052              :             }
    1053              :         }
    1054              :     }
    1055              : }
    1056              : 
    1057              : impl Drop for NoLeakChild {
    1058           65 :     fn drop(&mut self) {
    1059           65 :         let child = match self.child.take() {
    1060           65 :             Some(child) => child,
    1061            0 :             None => return,
    1062              :         };
    1063           65 :         let tenant_id = self.tenant_id;
    1064           65 :         // Offload the kill+wait of the child process into the background.
    1065           65 :         // If someone stops the runtime, we'll leak the child process.
    1066           65 :         // We can ignore that case because we only stop the runtime on pageserver exit.
    1067           65 :         BACKGROUND_RUNTIME.spawn(async move {
    1068           65 :             tokio::task::spawn_blocking(move || {
    1069              :                 // Intentionally don't inherit the tracing context from whoever is dropping us.
    1070              :                 // This thread here is going to outlive our dropper.
    1071           65 :                 let span = tracing::info_span!("walredo", %tenant_id);
    1072           65 :                 let _entered = span.enter();
    1073           65 :                 Self::kill_and_wait_impl(child);
    1074           65 :             })
    1075           65 :             .await
    1076           65 :         });
    1077           65 :     }
    1078              : }
    1079              : 
    1080              : trait NoLeakChildCommandExt {
    1081              :     fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild>;
    1082              : }
    1083              : 
    1084              : impl NoLeakChildCommandExt for Command {
    1085          402 :     fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild> {
    1086          402 :         NoLeakChild::spawn(tenant_id, self)
    1087          402 :     }
    1088              : }
    1089              : 
    1090              : // Functions for constructing messages to send to the postgres WAL redo
    1091              : // process. See pgxn/neon_walredo/walredoproc.c for
    1092              : // explanation of the protocol.
    1093              : 
    1094      2758027 : fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
    1095      2758027 :     let len = 4 + 1 + 4 * 4;
    1096      2758027 : 
    1097      2758027 :     buf.put_u8(b'B');
    1098      2758027 :     buf.put_u32(len as u32);
    1099      2758027 : 
    1100      2758027 :     tag.ser_into(buf)
    1101      2758027 :         .expect("serialize BufferTag should always succeed");
    1102      2758027 : }
    1103              : 
    1104       889355 : fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
    1105       889355 :     assert!(base_img.len() == 8192);
    1106              : 
    1107       889355 :     let len = 4 + 1 + 4 * 4 + base_img.len();
    1108       889355 : 
    1109       889355 :     buf.put_u8(b'P');
    1110       889355 :     buf.put_u32(len as u32);
    1111       889355 :     tag.ser_into(buf)
    1112       889355 :         .expect("serialize BufferTag should always succeed");
    1113       889355 :     buf.put(base_img);
    1114       889355 : }
    1115              : 
    1116    179396673 : fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
    1117    179396673 :     let len = 4 + 8 + rec.len();
    1118    179396673 : 
    1119    179396673 :     buf.put_u8(b'A');
    1120    179396673 :     buf.put_u32(len as u32);
    1121    179396673 :     buf.put_u64(endlsn.0);
    1122    179396673 :     buf.put(rec);
    1123    179396673 : }
    1124              : 
    1125      2758027 : fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
    1126      2758027 :     let len = 4 + 1 + 4 * 4;
    1127      2758027 : 
    1128      2758027 :     buf.put_u8(b'G');
    1129      2758027 :     buf.put_u32(len as u32);
    1130      2758027 :     tag.ser_into(buf)
    1131      2758027 :         .expect("serialize BufferTag should always succeed");
    1132      2758027 : }
    1133              : 
    1134              : #[cfg(test)]
    1135              : mod tests {
    1136              :     use super::{PostgresRedoManager, WalRedoManager};
    1137              :     use crate::repository::Key;
    1138              :     use crate::{config::PageServerConf, walrecord::NeonWalRecord};
    1139              :     use bytes::Bytes;
    1140              :     use std::str::FromStr;
    1141              :     use utils::{id::TenantId, lsn::Lsn};
    1142              : 
    1143            1 :     #[test]
    1144            1 :     fn short_v14_redo() {
    1145            1 :         let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap();
    1146            1 : 
    1147            1 :         let h = RedoHarness::new().unwrap();
    1148            1 : 
    1149            1 :         let page = h
    1150            1 :             .manager
    1151            1 :             .request_redo(
    1152            1 :                 Key {
    1153            1 :                     field1: 0,
    1154            1 :                     field2: 1663,
    1155            1 :                     field3: 13010,
    1156            1 :                     field4: 1259,
    1157            1 :                     field5: 0,
    1158            1 :                     field6: 0,
    1159            1 :                 },
    1160            1 :                 Lsn::from_str("0/16E2408").unwrap(),
    1161            1 :                 None,
    1162            1 :                 short_records(),
    1163            1 :                 14,
    1164            1 :             )
    1165            1 :             .unwrap();
    1166            1 : 
    1167            1 :         assert_eq!(&expected, &*page);
    1168            1 :     }
    1169              : 
    1170            1 :     #[test]
    1171            1 :     fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
    1172            1 :         let h = RedoHarness::new().unwrap();
    1173            1 : 
    1174            1 :         let page = h
    1175            1 :             .manager
    1176            1 :             .request_redo(
    1177            1 :                 Key {
    1178            1 :                     field1: 0,
    1179            1 :                     field2: 1663,
    1180            1 :                     // key should be 13010
    1181            1 :                     field3: 13130,
    1182            1 :                     field4: 1259,
    1183            1 :                     field5: 0,
    1184            1 :                     field6: 0,
    1185            1 :                 },
    1186            1 :                 Lsn::from_str("0/16E2408").unwrap(),
    1187            1 :                 None,
    1188            1 :                 short_records(),
    1189            1 :                 14,
    1190            1 :             )
    1191            1 :             .unwrap();
    1192            1 : 
    1193            1 :         // TODO: there will be some stderr printout, which is forwarded to tracing that could
    1194            1 :         // perhaps be captured as long as it's in the same thread.
    1195            1 :         assert_eq!(page, crate::ZERO_PAGE);
    1196            1 :     }
    1197              : 
    1198              :     #[allow(clippy::octal_escapes)]
    1199            2 :     fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
    1200            2 :         vec![
    1201            2 :             (
    1202            2 :                 Lsn::from_str("0/16A9388").unwrap(),
    1203            2 :                 NeonWalRecord::Postgres {
    1204            2 :                     will_init: true,
    1205            2 :                     rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
    1206            2 :                 }
    1207            2 :             ),
    1208            2 :             (
    1209            2 :                 Lsn::from_str("0/16D4080").unwrap(),
    1210            2 :                 NeonWalRecord::Postgres {
    1211            2 :                     will_init: false,
    1212            2 :                     rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
    1213            2 :                 }
    1214            2 :             )
    1215            2 :         ]
    1216            2 :     }
    1217              : 
    1218              :     struct RedoHarness {
    1219              :         // underscored because unused, except for removal at drop
    1220              :         _repo_dir: tempfile::TempDir,
    1221              :         manager: PostgresRedoManager,
    1222              :     }
    1223              : 
    1224              :     impl RedoHarness {
    1225            2 :         fn new() -> anyhow::Result<Self> {
    1226            2 :             let repo_dir = tempfile::tempdir()?;
    1227            2 :             let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
    1228            2 :             let conf = Box::leak(Box::new(conf));
    1229            2 :             let tenant_id = TenantId::generate();
    1230            2 : 
    1231            2 :             let manager = PostgresRedoManager::new(conf, tenant_id);
    1232            2 : 
    1233            2 :             Ok(RedoHarness {
    1234            2 :                 _repo_dir: repo_dir,
    1235            2 :                 manager,
    1236            2 :             })
    1237            2 :         }
    1238              :     }
    1239              : }
        

Generated by: LCOV version 2.1-beta