LCOV - differential code coverage report
Current view: top level - pageserver/src - walredo.rs (source / functions) Coverage Total Hit UBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 76.2 % 673 513 160 513
Current Date: 2023-10-19 02:04:12 Functions: 54.5 % 66 36 30 2 34 2
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //!
       2                 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
       3                 : //! to apply given WAL records over an old page image and return new
       4                 : //! page image.
       5                 : //!
       6                 : //! We rely on Postgres to perform WAL redo for us. We launch a
       7                 : //! postgres process in special "wal redo" mode that's similar to
       8                 : //! single-user mode. We then pass the previous page image, if any,
       9                 : //! and all the WAL records we want to apply, to the postgres
      10                 : //! process. Then we get the page image back. Communication with the
      11                 : //! postgres process happens via stdin/stdout
      12                 : //!
      13                 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
      14                 : //! this communication.
      15                 : //!
      16                 : //! The Postgres process is assumed to be secure against malicious WAL
      17                 : //! records. It achieves it by dropping privileges before replaying
      18                 : //! any WAL records, so that even if an attacker hijacks the Postgres
      19                 : //! process, he cannot escape out of it.
      20                 : //!
      21                 : use anyhow::Context;
      22                 : use byteorder::{ByteOrder, LittleEndian};
      23                 : use bytes::{BufMut, Bytes, BytesMut};
      24                 : use nix::poll::*;
      25                 : use serde::Serialize;
      26                 : use std::collections::VecDeque;
      27                 : use std::io;
      28                 : use std::io::prelude::*;
      29                 : use std::ops::{Deref, DerefMut};
      30                 : use std::os::unix::io::{AsRawFd, RawFd};
      31                 : use std::os::unix::prelude::CommandExt;
      32                 : use std::process::Stdio;
      33                 : use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
      34                 : use std::sync::{Arc, Mutex, MutexGuard, RwLock};
      35                 : use std::time::Duration;
      36                 : use std::time::Instant;
      37                 : use tracing::*;
      38                 : use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
      39                 : 
      40                 : #[cfg(feature = "testing")]
      41                 : use std::sync::atomic::{AtomicUsize, Ordering};
      42                 : 
      43                 : use crate::config::PageServerConf;
      44                 : use crate::metrics::{
      45                 :     WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
      46                 :     WAL_REDO_WAIT_TIME,
      47                 : };
      48                 : use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
      49                 : use crate::repository::Key;
      50                 : use crate::task_mgr::BACKGROUND_RUNTIME;
      51                 : use crate::walrecord::NeonWalRecord;
      52                 : use pageserver_api::reltag::{RelTag, SlruKind};
      53                 : use postgres_ffi::pg_constants;
      54                 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
      55                 : use postgres_ffi::v14::nonrelfile_utils::{
      56                 :     mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
      57                 :     transaction_id_set_status,
      58                 : };
      59                 : use postgres_ffi::BLCKSZ;
      60                 : 
      61                 : ///
      62                 : /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
      63                 : ///
      64                 : /// In Postgres `BufferTag` structure is used for exactly the same purpose.
      65                 : /// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
      66                 : ///
      67 CBC     5003353 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)]
      68                 : pub(crate) struct BufferTag {
      69                 :     pub rel: RelTag,
      70                 :     pub blknum: u32,
      71                 : }
      72                 : 
      73                 : struct ProcessInput {
      74                 :     stdin: ChildStdin,
      75                 :     stderr_fd: RawFd,
      76                 :     stdout_fd: RawFd,
      77                 :     n_requests: usize,
      78                 : }
      79                 : 
      80                 : struct ProcessOutput {
      81                 :     stdout: ChildStdout,
      82                 :     pending_responses: VecDeque<Option<Bytes>>,
      83                 :     n_processed_responses: usize,
      84                 : }
      85                 : 
      86                 : ///
      87                 : /// This is the real implementation that uses a Postgres process to
      88                 : /// perform WAL replay. Only one thread can use the process at a time,
      89                 : /// that is controlled by the Mutex. In the future, we might want to
      90                 : /// launch a pool of processes to allow concurrent replay of multiple
      91                 : /// records.
      92                 : ///
      93                 : pub struct PostgresRedoManager {
      94                 :     tenant_id: TenantId,
      95                 :     conf: &'static PageServerConf,
      96                 :     redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
      97                 : }
      98                 : 
      99                 : /// Can this request be served by neon redo functions
     100                 : /// or we need to pass it to wal-redo postgres process?
     101       159709630 : fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
     102       159709630 :     // Currently, we don't have bespoken Rust code to replay any
     103       159709630 :     // Postgres WAL records. But everything else is handled in neon.
     104       159709630 :     #[allow(clippy::match_like_matches_macro)]
     105       159709630 :     match rec {
     106                 :         NeonWalRecord::Postgres {
     107                 :             will_init: _,
     108                 :             rec: _,
     109       136659412 :         } => false,
     110        23050218 :         _ => true,
     111                 :     }
     112       159709630 : }
     113                 : 
     114                 : ///
     115                 : /// Public interface of WAL redo manager
     116                 : ///
     117                 : impl PostgresRedoManager {
     118                 :     ///
     119                 :     /// Request the WAL redo manager to apply some WAL records
     120                 :     ///
     121                 :     /// The WAL redo is handled by a separate thread, so this just sends a request
     122                 :     /// to the thread and waits for response.
     123                 :     ///
     124         2255850 :     pub async fn request_redo(
     125         2255850 :         &self,
     126         2255850 :         key: Key,
     127         2255850 :         lsn: Lsn,
     128         2255850 :         base_img: Option<(Lsn, Bytes)>,
     129         2255850 :         records: Vec<(Lsn, NeonWalRecord)>,
     130         2255850 :         pg_version: u32,
     131         2255852 :     ) -> anyhow::Result<Bytes> {
     132         2255852 :         if records.is_empty() {
     133 UBC           0 :             anyhow::bail!("invalid WAL redo request with no records");
     134 CBC     2255852 :         }
     135         2255852 : 
     136         2255852 :         let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
     137         2255852 :         let mut img = base_img.map(|p| p.1);
     138         2255852 :         let mut batch_neon = can_apply_in_neon(&records[0].1);
     139         2255852 :         let mut batch_start = 0;
     140       157453869 :         for (i, record) in records.iter().enumerate().skip(1) {
     141       157453869 :             let rec_neon = can_apply_in_neon(&record.1);
     142       157453869 : 
     143       157453869 :             if rec_neon != batch_neon {
     144              16 :                 let result = if batch_neon {
     145              13 :                     self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
     146                 :                 } else {
     147               3 :                     self.apply_batch_postgres(
     148               3 :                         key,
     149               3 :                         lsn,
     150               3 :                         img,
     151               3 :                         base_img_lsn,
     152               3 :                         &records[batch_start..i],
     153               3 :                         self.conf.wal_redo_timeout,
     154               3 :                         pg_version,
     155               3 :                     )
     156                 :                 };
     157              16 :                 img = Some(result?);
     158                 : 
     159              16 :                 batch_neon = rec_neon;
     160              16 :                 batch_start = i;
     161       157453853 :             }
     162                 :         }
     163                 :         // last batch
     164         2255852 :         if batch_neon {
     165            4038 :             self.apply_batch_neon(key, lsn, img, &records[batch_start..])
     166                 :         } else {
     167         2251814 :             self.apply_batch_postgres(
     168         2251814 :                 key,
     169         2251814 :                 lsn,
     170         2251814 :                 img,
     171         2251814 :                 base_img_lsn,
     172         2251814 :                 &records[batch_start..],
     173         2251814 :                 self.conf.wal_redo_timeout,
     174         2251814 :                 pg_version,
     175         2251814 :             )
     176                 :         }
     177         2255852 :     }
     178                 : }
     179                 : 
     180                 : impl PostgresRedoManager {
     181                 :     ///
     182                 :     /// Create a new PostgresRedoManager.
     183                 :     ///
     184             708 :     pub fn new(conf: &'static PageServerConf, tenant_id: TenantId) -> PostgresRedoManager {
     185             708 :         // The actual process is launched lazily, on first request.
     186             708 :         PostgresRedoManager {
     187             708 :             tenant_id,
     188             708 :             conf,
     189             708 :             redo_process: RwLock::new(None),
     190             708 :         }
     191             708 :     }
     192                 : 
     193                 :     ///
     194                 :     /// Process one request for WAL redo using wal-redo postgres
     195                 :     ///
     196                 :     #[allow(clippy::too_many_arguments)]
     197         2251815 :     fn apply_batch_postgres(
     198         2251815 :         &self,
     199         2251815 :         key: Key,
     200         2251815 :         lsn: Lsn,
     201         2251815 :         base_img: Option<Bytes>,
     202         2251815 :         base_img_lsn: Lsn,
     203         2251815 :         records: &[(Lsn, NeonWalRecord)],
     204         2251815 :         wal_redo_timeout: Duration,
     205         2251815 :         pg_version: u32,
     206         2251815 :     ) -> anyhow::Result<Bytes> {
     207         2251815 :         let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
     208                 :         const MAX_RETRY_ATTEMPTS: u32 = 1;
     209         2251815 :         let start_time = Instant::now();
     210         2251815 :         let mut n_attempts = 0u32;
     211         2251815 :         loop {
     212         2251815 :             let lock_time = Instant::now();
     213                 : 
     214                 :             // launch the WAL redo process on first use
     215         2251815 :             let proc: Arc<WalRedoProcess> = {
     216         2251815 :                 let proc_guard = self.redo_process.read().unwrap();
     217         2251815 :                 match &*proc_guard {
     218                 :                     None => {
     219                 :                         // "upgrade" to write lock to launch the process
     220             386 :                         drop(proc_guard);
     221             386 :                         let mut proc_guard = self.redo_process.write().unwrap();
     222             386 :                         match &*proc_guard {
     223                 :                             None => {
     224             386 :                                 let proc = Arc::new(
     225             386 :                                     WalRedoProcess::launch(self.conf, self.tenant_id, pg_version)
     226             386 :                                         .context("launch walredo process")?,
     227                 :                                 );
     228             386 :                                 *proc_guard = Some(Arc::clone(&proc));
     229             386 :                                 proc
     230                 :                             }
     231 UBC           0 :                             Some(proc) => Arc::clone(proc),
     232                 :                         }
     233                 :                     }
     234 CBC     2251429 :                     Some(proc) => Arc::clone(proc),
     235                 :                 }
     236                 :             };
     237                 : 
     238         2251815 :             WAL_REDO_WAIT_TIME.observe(lock_time.duration_since(start_time).as_secs_f64());
     239         2251815 : 
     240         2251815 :             // Relational WAL records are applied using wal-redo-postgres
     241         2251815 :             let buf_tag = BufferTag { rel, blknum };
     242         2251815 :             let result = proc
     243         2251815 :                 .apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
     244         2251815 :                 .context("apply_wal_records");
     245         2251815 : 
     246         2251815 :             let end_time = Instant::now();
     247         2251815 :             let duration = end_time.duration_since(lock_time);
     248         2251815 : 
     249         2251815 :             let len = records.len();
     250       136658410 :             let nbytes = records.iter().fold(0, |acumulator, record| {
     251       136658410 :                 acumulator
     252       136658410 :                     + match &record.1 {
     253       136658410 :                         NeonWalRecord::Postgres { rec, .. } => rec.len(),
     254 UBC           0 :                         _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
     255                 :                     }
     256 CBC   136658410 :             });
     257         2251815 : 
     258         2251815 :             WAL_REDO_TIME.observe(duration.as_secs_f64());
     259         2251815 :             WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
     260         2251815 :             WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
     261         2251815 : 
     262         2251815 :             debug!(
     263 UBC           0 :                 "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
     264               0 :                 len,
     265               0 :                 nbytes,
     266               0 :                 duration.as_micros(),
     267               0 :                 lsn
     268               0 :             );
     269                 : 
     270                 :             // If something went wrong, don't try to reuse the process. Kill it, and
     271                 :             // next request will launch a new one.
     272 CBC     2251813 :             if let Err(e) = result.as_ref() {
     273 UBC           0 :                 error!(
     274               0 :                     "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
     275               0 :                     records.len(),
     276               0 :                     records.first().map(|p| p.0).unwrap_or(Lsn(0)),
     277               0 :                     records.last().map(|p| p.0).unwrap_or(Lsn(0)),
     278               0 :                     nbytes,
     279               0 :                     base_img_lsn,
     280               0 :                     lsn,
     281               0 :                     n_attempts,
     282               0 :                     e,
     283               0 :                 );
     284                 :                 // Avoid concurrent callers hitting the same issue.
     285                 :                 // We can't prevent it from happening because we want to enable parallelism.
     286               0 :                 let mut guard = self.redo_process.write().unwrap();
     287               0 :                 match &*guard {
     288               0 :                     Some(current_field_value) => {
     289               0 :                         if Arc::ptr_eq(current_field_value, &proc) {
     290               0 :                             // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
     291               0 :                             *guard = None;
     292               0 :                         }
     293                 :                     }
     294               0 :                     None => {
     295               0 :                         // Another thread was faster to observe the error, and already took the process out of rotation.
     296               0 :                     }
     297                 :                 }
     298               0 :                 drop(guard);
     299               0 :                 // NB: there may still be other concurrent threads using `proc`.
     300               0 :                 // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
     301               0 :                 // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
     302               0 :                 // holding the lock while waiting for the process to exit.
     303               0 :                 // NB: the drop impl blocks the current threads with a wait() system call for
     304               0 :                 // the child process. We dropped the `guard` above so that other threads aren't
     305               0 :                 // affected. But, it's good that the current thread _does_ block to wait.
     306               0 :                 // If we instead deferred the waiting into the background / to tokio, it could
     307               0 :                 // happen that if walredo always fails immediately, we spawn processes faster
     308               0 :                 // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
     309               0 :                 // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
     310               0 :                 // This probably needs revisiting at some later point.
     311               0 :                 drop(proc);
     312 CBC     2251813 :             } else if n_attempts != 0 {
     313 UBC           0 :                 info!(n_attempts, "retried walredo succeeded");
     314 CBC     2251813 :             }
     315         2251813 :             n_attempts += 1;
     316         2251813 :             if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
     317         2251813 :                 return result;
     318 UBC           0 :             }
     319                 :         }
     320 CBC     2251813 :     }
     321                 : 
     322                 :     ///
     323                 :     /// Process a batch of WAL records using bespoken Neon code.
     324                 :     ///
     325            4051 :     fn apply_batch_neon(
     326            4051 :         &self,
     327            4051 :         key: Key,
     328            4051 :         lsn: Lsn,
     329            4051 :         base_img: Option<Bytes>,
     330            4051 :         records: &[(Lsn, NeonWalRecord)],
     331            4051 :     ) -> anyhow::Result<Bytes> {
     332            4051 :         let start_time = Instant::now();
     333            4051 : 
     334            4051 :         let mut page = BytesMut::new();
     335            4051 :         if let Some(fpi) = base_img {
     336            4051 :             // If full-page image is provided, then use it...
     337            4051 :             page.extend_from_slice(&fpi[..]);
     338            4051 :         } else {
     339                 :             // All the current WAL record types that we can handle require a base image.
     340 UBC           0 :             anyhow::bail!("invalid neon WAL redo request with no base image");
     341                 :         }
     342                 : 
     343                 :         // Apply all the WAL records in the batch
     344 CBC    23050218 :         for (record_lsn, record) in records.iter() {
     345        23050218 :             self.apply_record_neon(key, &mut page, *record_lsn, record)?;
     346                 :         }
     347                 :         // Success!
     348            4051 :         let end_time = Instant::now();
     349            4051 :         let duration = end_time.duration_since(start_time);
     350            4051 :         WAL_REDO_TIME.observe(duration.as_secs_f64());
     351            4051 : 
     352            4051 :         debug!(
     353 UBC           0 :             "neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
     354               0 :             records.len(),
     355               0 :             duration.as_micros(),
     356               0 :             lsn
     357               0 :         );
     358                 : 
     359 CBC        4051 :         Ok(page.freeze())
     360            4051 :     }
     361                 : 
     362        23050218 :     fn apply_record_neon(
     363        23050218 :         &self,
     364        23050218 :         key: Key,
     365        23050218 :         page: &mut BytesMut,
     366        23050218 :         _record_lsn: Lsn,
     367        23050218 :         record: &NeonWalRecord,
     368        23050218 :     ) -> anyhow::Result<()> {
     369        23050218 :         match record {
     370                 :             NeonWalRecord::Postgres {
     371                 :                 will_init: _,
     372                 :                 rec: _,
     373                 :             } => {
     374 UBC           0 :                 anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
     375                 :             }
     376                 :             NeonWalRecord::ClearVisibilityMapFlags {
     377 CBC       10637 :                 new_heap_blkno,
     378           10637 :                 old_heap_blkno,
     379           10637 :                 flags,
     380                 :             } => {
     381                 :                 // sanity check that this is modifying the correct relation
     382           10637 :                 let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
     383           10637 :                 assert!(
     384           10637 :                     rel.forknum == VISIBILITYMAP_FORKNUM,
     385 UBC           0 :                     "ClearVisibilityMapFlags record on unexpected rel {}",
     386                 :                     rel
     387                 :                 );
     388 CBC       10637 :                 if let Some(heap_blkno) = *new_heap_blkno {
     389                 :                     // Calculate the VM block and offset that corresponds to the heap block.
     390             676 :                     let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
     391             676 :                     let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
     392             676 :                     let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
     393             676 : 
     394             676 :                     // Check that we're modifying the correct VM block.
     395             676 :                     assert!(map_block == blknum);
     396                 : 
     397                 :                     // equivalent to PageGetContents(page)
     398             676 :                     let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
     399             676 : 
     400             676 :                     map[map_byte as usize] &= !(flags << map_offset);
     401            9961 :                 }
     402                 : 
     403                 :                 // Repeat for 'old_heap_blkno', if any
     404           10637 :                 if let Some(heap_blkno) = *old_heap_blkno {
     405            9965 :                     let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
     406            9965 :                     let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
     407            9965 :                     let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
     408            9965 : 
     409            9965 :                     assert!(map_block == blknum);
     410                 : 
     411            9965 :                     let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
     412            9965 : 
     413            9965 :                     map[map_byte as usize] &= !(flags << map_offset);
     414             672 :                 }
     415                 :             }
     416                 :             // Non-relational WAL records are handled here, with custom code that has the
     417                 :             // same effects as the corresponding Postgres WAL redo function.
     418        22942379 :             NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
     419        22942379 :                 let (slru_kind, segno, blknum) =
     420        22942379 :                     key_to_slru_block(key).context("invalid record")?;
     421        22942379 :                 assert_eq!(
     422                 :                     slru_kind,
     423                 :                     SlruKind::Clog,
     424 UBC           0 :                     "ClogSetCommitted record with unexpected key {}",
     425                 :                     key
     426                 :                 );
     427 CBC    45984771 :                 for &xid in xids {
     428        23042392 :                     let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     429        23042392 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     430        23042392 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     431        23042392 : 
     432        23042392 :                     // Check that we're modifying the correct CLOG block.
     433        23042392 :                     assert!(
     434        23042392 :                         segno == expected_segno,
     435 UBC           0 :                         "ClogSetCommitted record for XID {} with unexpected key {}",
     436                 :                         xid,
     437                 :                         key
     438                 :                     );
     439 CBC    23042392 :                     assert!(
     440        23042392 :                         blknum == expected_blknum,
     441 UBC           0 :                         "ClogSetCommitted record for XID {} with unexpected key {}",
     442                 :                         xid,
     443                 :                         key
     444                 :                     );
     445                 : 
     446 CBC    23042392 :                     transaction_id_set_status(
     447        23042392 :                         xid,
     448        23042392 :                         pg_constants::TRANSACTION_STATUS_COMMITTED,
     449        23042392 :                         page,
     450        23042392 :                     );
     451                 :                 }
     452                 : 
     453                 :                 // Append the timestamp
     454        22942379 :                 if page.len() == BLCKSZ as usize + 8 {
     455        22939144 :                     page.truncate(BLCKSZ as usize);
     456        22939144 :                 }
     457        22942379 :                 if page.len() == BLCKSZ as usize {
     458        22942379 :                     page.extend_from_slice(&timestamp.to_be_bytes());
     459        22942379 :                 } else {
     460 UBC           0 :                     warn!(
     461               0 :                         "CLOG blk {} in seg {} has invalid size {}",
     462               0 :                         blknum,
     463               0 :                         segno,
     464               0 :                         page.len()
     465               0 :                     );
     466                 :                 }
     467                 :             }
     468 CBC        1330 :             NeonWalRecord::ClogSetAborted { xids } => {
     469            1330 :                 let (slru_kind, segno, blknum) =
     470            1330 :                     key_to_slru_block(key).context("invalid record")?;
     471            1330 :                 assert_eq!(
     472                 :                     slru_kind,
     473                 :                     SlruKind::Clog,
     474 UBC           0 :                     "ClogSetAborted record with unexpected key {}",
     475                 :                     key
     476                 :                 );
     477 CBC        2667 :                 for &xid in xids {
     478            1337 :                     let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     479            1337 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     480            1337 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     481            1337 : 
     482            1337 :                     // Check that we're modifying the correct CLOG block.
     483            1337 :                     assert!(
     484            1337 :                         segno == expected_segno,
     485 UBC           0 :                         "ClogSetAborted record for XID {} with unexpected key {}",
     486                 :                         xid,
     487                 :                         key
     488                 :                     );
     489 CBC        1337 :                     assert!(
     490            1337 :                         blknum == expected_blknum,
     491 UBC           0 :                         "ClogSetAborted record for XID {} with unexpected key {}",
     492                 :                         xid,
     493                 :                         key
     494                 :                     );
     495                 : 
     496 CBC        1337 :                     transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
     497                 :                 }
     498                 :             }
     499           47662 :             NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
     500           47662 :                 let (slru_kind, segno, blknum) =
     501           47662 :                     key_to_slru_block(key).context("invalid record")?;
     502           47662 :                 assert_eq!(
     503                 :                     slru_kind,
     504                 :                     SlruKind::MultiXactOffsets,
     505 UBC           0 :                     "MultixactOffsetCreate record with unexpected key {}",
     506                 :                     key
     507                 :                 );
     508                 :                 // Compute the block and offset to modify.
     509                 :                 // See RecordNewMultiXact in PostgreSQL sources.
     510 CBC       47662 :                 let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     511           47662 :                 let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     512           47662 :                 let offset = (entryno * 4) as usize;
     513           47662 : 
     514           47662 :                 // Check that we're modifying the correct multixact-offsets block.
     515           47662 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     516           47662 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     517           47662 :                 assert!(
     518           47662 :                     segno == expected_segno,
     519 UBC           0 :                     "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     520                 :                     mid,
     521                 :                     key
     522                 :                 );
     523 CBC       47662 :                 assert!(
     524           47662 :                     blknum == expected_blknum,
     525 UBC           0 :                     "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     526                 :                     mid,
     527                 :                     key
     528                 :                 );
     529                 : 
     530 CBC       47662 :                 LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
     531                 :             }
     532           48210 :             NeonWalRecord::MultixactMembersCreate { moff, members } => {
     533           48210 :                 let (slru_kind, segno, blknum) =
     534           48210 :                     key_to_slru_block(key).context("invalid record")?;
     535           48210 :                 assert_eq!(
     536                 :                     slru_kind,
     537                 :                     SlruKind::MultiXactMembers,
     538 UBC           0 :                     "MultixactMembersCreate record with unexpected key {}",
     539                 :                     key
     540                 :                 );
     541 CBC      945038 :                 for (i, member) in members.iter().enumerate() {
     542          945038 :                     let offset = moff + i as u32;
     543          945038 : 
     544          945038 :                     // Compute the block and offset to modify.
     545          945038 :                     // See RecordNewMultiXact in PostgreSQL sources.
     546          945038 :                     let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     547          945038 :                     let memberoff = mx_offset_to_member_offset(offset);
     548          945038 :                     let flagsoff = mx_offset_to_flags_offset(offset);
     549          945038 :                     let bshift = mx_offset_to_flags_bitshift(offset);
     550          945038 : 
     551          945038 :                     // Check that we're modifying the correct multixact-members block.
     552          945038 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     553          945038 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     554          945038 :                     assert!(
     555          945038 :                         segno == expected_segno,
     556 UBC           0 :                         "MultiXactMembersCreate record for offset {} with unexpected key {}",
     557                 :                         moff,
     558                 :                         key
     559                 :                     );
     560 CBC      945038 :                     assert!(
     561          945038 :                         blknum == expected_blknum,
     562 UBC           0 :                         "MultiXactMembersCreate record for offset {} with unexpected key {}",
     563                 :                         moff,
     564                 :                         key
     565                 :                     );
     566                 : 
     567 CBC      945038 :                     let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
     568          945038 :                     flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
     569          945038 :                     flagsval |= member.status << bshift;
     570          945038 :                     LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
     571          945038 :                     LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
     572                 :                 }
     573                 :             }
     574                 :         }
     575                 : 
     576        23050218 :         Ok(())
     577        23050218 :     }
     578                 : }
     579                 : 
     580                 : ///
     581                 : /// Extension trait on top of `CommandExt`: lets a command avoid handing all file descriptors to the child process
     582                 : ///
     583                 : trait CloseFileDescriptors: CommandExt {
     584                 :     ///
     585                 :     /// Arrange for file descriptors other than stdin, stdout and stderr to be closed in the child process
     586                 :     ///
     587                 :     fn close_fds(&mut self) -> &mut Command;
     588                 : }
     589                 : 
     590                 : impl<C: CommandExt> CloseFileDescriptors for C {
     591             386 :     fn close_fds(&mut self) -> &mut Command {
     592             386 :         unsafe {
     593             386 :             self.pre_exec(move || {
     594 UBC           0 :                 // SAFETY: Code executed inside pre_exec must be async-signal-safe,
     595               0 :                 // which means it must be safe to execute inside a signal handler.
     596               0 :                 // The precise meaning depends on the platform. See `man signal-safety`
     597               0 :                 // for the Linux definition.
     598               0 :                 //
     599               0 :                 // The `close_fds::set_fds_cloexec_threadsafe` function is documented to be
     600               0 :                 // async-signal-safe.
     601               0 :                 //
     602               0 :                 // Aside from this function, the rest of the code is re-entrant and
     603               0 :                 // doesn't make any syscalls. We're just passing constants.
     604               0 :                 //
     605               0 :                 // NOTE: It's easy to indirectly cause a malloc or lock a mutex,
     606               0 :                 // neither of which is async-signal-safe. Be careful.
     607               0 :                 close_fds::set_fds_cloexec_threadsafe(3, &[]);
     608               0 :                 Ok(())
     609 CBC         386 :             })
     610             386 :         }
     611             386 :     }
     612                 : }
     613                 : 
     614                 : struct WalRedoProcess {
     615                 :     #[allow(dead_code)]
     616                 :     conf: &'static PageServerConf,
     617                 :     tenant_id: TenantId,
     618                 :     // `Some` on construction; only becomes `None` on Drop.
     619                 :     child: Option<NoLeakChild>,
     620                 :     stdout: Mutex<ProcessOutput>,
     621                 :     stdin: Mutex<ProcessInput>,
     622                 :     stderr: Mutex<ChildStderr>,
     623                 :     /// Counter that distinguishes same-sized walredo inputs failing in the same millisecond.
     624                 :     #[cfg(feature = "testing")]
     625                 :     dump_sequence: AtomicUsize,
     626                 : }
     627                 : 
     628                 : impl WalRedoProcess {
     629                 :     //
     630                 :     // Start the postgres binary in special WAL redo mode, with piped stdin, stdout and
     631                 :     // stderr that are switched to non-blocking mode before the process is returned.
     632             386 :     #[instrument(skip_all,fields(tenant_id=%tenant_id, pg_version=pg_version))]
     633                 :     fn launch(
     634                 :         conf: &'static PageServerConf,
     635                 :         tenant_id: TenantId,
     636                 :         pg_version: u32,
     637                 :     ) -> anyhow::Result<Self> {
     638                 :         let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
     639                 :         let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
     640                 : 
     641                 :         // Start postgres itself
     642                 :         let child = Command::new(pg_bin_dir_path.join("postgres"))
     643                 :             .arg("--wal-redo")
     644                 :             .stdin(Stdio::piped())
     645                 :             .stderr(Stdio::piped())
     646                 :             .stdout(Stdio::piped())
     647                 :             .env_clear()
     648                 :             .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
     649                 :             .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
     650                 :             // The redo process is not trusted, and runs in seccomp mode that
     651                 :             // doesn't allow it to open any files. We also have to make sure it
     652                 :             // doesn't inherit any file descriptors from the pageserver, since that
     653                 :             // would allow an attacker to read any files that happen to be open
     654                 :             // in the pageserver.
     655                 :             //
     656                 :             // The Rust standard library makes sure to mark any file descriptors
     657                 :             // as close-on-exec by default, but that's not enough, since we use
     658                 :             // libraries that directly call libc open without setting that flag.
     659                 :             .close_fds()
     660                 :             .spawn_no_leak_child(tenant_id)
     661                 :             .context("spawn process")?;
     662                 : 
     663 UBC           0 :         let mut child = scopeguard::guard(child, |child| {
     664               0 :             error!("killing wal-redo-postgres process due to a problem during launch");
     665               0 :             child.kill_and_wait();
     666               0 :         });
     667                 : 
     668                 :         let stdin = child.stdin.take().unwrap();
     669                 :         let stdout = child.stdout.take().unwrap();
     670                 :         let stderr = child.stderr.take().unwrap();
     671                 : 
     672                 :         macro_rules! set_nonblock_or_log_err {
     673                 :             ($file:ident) => {{
     674                 :                 let res = set_nonblock($file.as_raw_fd());
     675                 :                 if let Err(e) = &res {
     676                 :                     error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
     677                 :                 }
     678                 :                 res
     679                 :             }};
     680                 :         }
     681               0 :         set_nonblock_or_log_err!(stdin)?;
     682               0 :         set_nonblock_or_log_err!(stdout)?;
     683               0 :         set_nonblock_or_log_err!(stderr)?;
     684                 : 
     685                 :         // All fallible operations post-spawn are complete, so get rid of the guard.
     686                 :         let child = scopeguard::ScopeGuard::into_inner(child);
     687                 : 
     688                 :         Ok(Self {
     689                 :             conf,
     690                 :             tenant_id,
     691                 :             child: Some(child),
     692                 :             stdin: Mutex::new(ProcessInput {
     693                 :                 stdout_fd: stdout.as_raw_fd(),
     694                 :                 stderr_fd: stderr.as_raw_fd(),
     695                 :                 stdin,
     696                 :                 n_requests: 0,
     697                 :             }),
     698                 :             stdout: Mutex::new(ProcessOutput {
     699                 :                 stdout,
     700                 :                 pending_responses: VecDeque::new(),
     701                 :                 n_processed_responses: 0,
     702                 :             }),
     703                 :             stderr: Mutex::new(stderr),
     704                 :             #[cfg(feature = "testing")]
     705                 :             dump_sequence: AtomicUsize::default(),
     706                 :         })
     707                 :     }
     708                 :     // Id of the wal-redo child process. Panics if `child` is `None`, i.e. when called during Drop.
     709 CBC     2251815 :     fn id(&self) -> u32 {
     710         2251815 :         self.child
     711         2251815 :             .as_ref()
     712         2251815 :             .expect("must not call this during Drop")
     713         2251815 :             .id()
     714         2251815 :     }
     715                 : 
     716                 :     // Apply the given WAL records ('records') over an old page image. Returns the
     717                 :     // new page image. Only `NeonWalRecord::Postgres` records are accepted; any other
     718                 :     // variant makes this function bail out with an error.
     719         2251815 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%self.id()))]
     720                 :     fn apply_wal_records(
     721                 :         &self,
     722                 :         tag: BufferTag,
     723                 :         base_img: &Option<Bytes>,
     724                 :         records: &[(Lsn, NeonWalRecord)],
     725                 :         wal_redo_timeout: Duration,
     726                 :     ) -> anyhow::Result<Bytes> {
     727                 :         let input = self.stdin.lock().unwrap();
     728                 : 
     729                 :         // Serialize all the messages to send the WAL redo process first.
     730                 :         //
     731                 :         // This could be problematic if there are millions of records to replay,
     732                 :         // but in practice the number of records is usually so small that it doesn't
     733                 :         // matter, and it's better to keep this code simple.
     734                 :         //
     735                 :         // Most requests start with a before-image with BLCKSZ bytes, followed by
     736                 :         // some other WAL records. Start with a buffer that can hold that
     737                 :         // comfortably.
     738                 :         let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
     739                 :         build_begin_redo_for_block_msg(tag, &mut writebuf);
     740                 :         if let Some(img) = base_img {
     741                 :             build_push_page_msg(tag, img, &mut writebuf);
     742                 :         }
     743                 :         for (lsn, rec) in records.iter() {
     744                 :             if let NeonWalRecord::Postgres {
     745                 :                 will_init: _,
     746                 :                 rec: postgres_rec,
     747                 :             } = rec
     748                 :             {
     749                 :                 build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
     750                 :             } else {
     751                 :                 anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
     752                 :             }
     753                 :         }
     754                 :         build_get_page_msg(tag, &mut writebuf);
     755                 :         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
     756                 : 
     757                 :         let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
     758                 : 
     759                 :         if res.is_err() {
     760                 :             // Not all of these errors can be caused by this particular input, but they are so
     761                 :             // rare in tests that we capture all of them.
     762                 :             self.record_and_log(&writebuf);
     763                 :         }
     764                 : 
     765                 :         res
     766                 :     }
     767                 : 
     768         2251814 :     fn apply_wal_records0(
     769         2251814 :         &self,
     770         2251814 :         writebuf: &[u8],
     771         2251814 :         input: MutexGuard<ProcessInput>,
     772         2251814 :         wal_redo_timeout: Duration,
     773         2251814 :     ) -> anyhow::Result<Bytes> {
     774         2251814 :         let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
     775         2251814 :         let mut nwrite = 0usize;
     776         2251814 : 
     777         2251814 :         // Prepare for calling poll()
     778         2251814 :         let mut pollfds = [
     779         2251814 :             PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT),
     780         2251814 :             PollFd::new(proc.stderr_fd, PollFlags::POLLIN),
     781         2251814 :             PollFd::new(proc.stdout_fd, PollFlags::POLLIN),
     782         2251814 :         ];
     783                 : 
     784                 :         // We do two things simultaneously: send the old base image and WAL records to
     785                 :         // the child process's stdin and forward any logging
     786                 :         // information that the child writes to its stderr to the page server's log.
     787         4547832 :         while nwrite < writebuf.len() {
     788         2296018 :             let n = loop {
     789         2296018 :                 match nix::poll::poll(&mut pollfds[0..2], wal_redo_timeout.as_millis() as i32) {
     790 UBC           0 :                     Err(nix::errno::Errno::EINTR) => continue,
     791 CBC     2296018 :                     res => break res,
     792                 :                 }
     793 UBC           0 :             }?;
     794                 : 
     795 CBC     2296018 :             if n == 0 {
     796 UBC           0 :                 anyhow::bail!("WAL redo timed out");
     797 CBC     2296018 :             }
     798         2296018 : 
     799         2296018 :             // If we have some messages in stderr, forward them to the log.
     800         2296018 :             let err_revents = pollfds[1].revents().unwrap();
     801         2296018 :             if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
     802 UBC           0 :                 let mut errbuf: [u8; 16384] = [0; 16384];
     803               0 :                 let mut stderr = self.stderr.lock().unwrap();
     804               0 :                 let len = stderr.read(&mut errbuf)?;
     805                 : 
     806                 :                 // The message might not be split correctly into lines here. But this is
     807                 :                 // good enough, the important thing is to get the message to the log.
     808               0 :                 if len > 0 {
     809               0 :                     error!(
     810               0 :                         "wal-redo-postgres: {}",
     811               0 :                         String::from_utf8_lossy(&errbuf[0..len])
     812               0 :                     );
     813                 : 
     814                 :                     // To make sure we capture all log from the process if it fails, keep
     815                 :                     // reading from the stderr, before checking the stdout.
     816               0 :                     continue;
     817               0 :                 }
     818 CBC     2296018 :             } else if err_revents.contains(PollFlags::POLLHUP) {
     819 UBC           0 :                 anyhow::bail!("WAL redo process closed its stderr unexpectedly");
     820 CBC     2296018 :             }
     821                 : 
     822                 :             // If 'stdin' is writeable, do write.
     823         2296018 :             let in_revents = pollfds[0].revents().unwrap();
     824         2296018 :             if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
     825         2296018 :                 nwrite += proc.stdin.write(&writebuf[nwrite..])?;
     826 UBC           0 :             } else if in_revents.contains(PollFlags::POLLHUP) {
     827                 :                 // We still have more data to write, but the process closed the pipe.
     828               0 :                 anyhow::bail!("WAL redo process closed its stdin unexpectedly");
     829               0 :             }
     830                 :         }
     831 CBC     2251814 :         let request_no = proc.n_requests;
     832         2251814 :         proc.n_requests += 1;
     833         2251814 :         drop(proc);
     834         2251814 : 
     835         2251814 :         // To improve walredo performance we separate sending requests and receiving
     836         2251814 :         // responses. Them are protected by different mutexes (output and input).
     837         2251814 :         // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
     838         2251814 :         // then there is not warranty that T1 will first granted output mutex lock.
     839         2251814 :         // To address this issue we maintain number of sent requests, number of processed
     840         2251814 :         // responses and ring buffer with pending responses. After sending response
     841         2251814 :         // (under input mutex), threads remembers request number. Then it releases
     842         2251814 :         // input mutex, locks output mutex and fetch in ring buffer all responses until
     843         2251814 :         // its stored request number. The it takes correspondent element from
     844         2251814 :         // pending responses ring buffer and truncate all empty elements from the front,
     845         2251814 :         // advancing processed responses number.
     846         2251814 : 
     847         2251814 :         let mut output = self.stdout.lock().unwrap();
     848         2251814 :         let n_processed_responses = output.n_processed_responses;
     849         4503627 :         while n_processed_responses + output.pending_responses.len() <= request_no {
     850                 :             // We expect the WAL redo process to respond with an 8k page image. We read it
     851                 :             // into this buffer.
     852         2251814 :             let mut resultbuf = vec![0; BLCKSZ.into()];
     853         2251814 :             let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
     854         4503629 :             while nresult < BLCKSZ.into() {
     855                 :                 // We do two things simultaneously: reading response from stdout
     856                 :                 // and forward any logging information that the child writes to its stderr to the page server's log.
     857         2251815 :                 let n = loop {
     858         2251816 :                     match nix::poll::poll(&mut pollfds[1..3], wal_redo_timeout.as_millis() as i32) {
     859 UBC           0 :                         Err(nix::errno::Errno::EINTR) => continue,
     860 CBC     2251815 :                         res => break res,
     861                 :                     }
     862 UBC           0 :                 }?;
     863                 : 
     864 CBC     2251815 :                 if n == 0 {
     865 UBC           0 :                     anyhow::bail!("WAL redo timed out");
     866 CBC     2251815 :                 }
     867         2251815 : 
     868         2251815 :                 // If we have some messages in stderr, forward them to the log.
     869         2251815 :                 let err_revents = pollfds[1].revents().unwrap();
     870         2251815 :                 if err_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
     871               2 :                     let mut errbuf: [u8; 16384] = [0; 16384];
     872               2 :                     let mut stderr = self.stderr.lock().unwrap();
     873               2 :                     let len = stderr.read(&mut errbuf)?;
     874                 : 
     875                 :                     // The message might not be split correctly into lines here. But this is
     876                 :                     // good enough, the important thing is to get the message to the log.
     877               2 :                     if len > 0 {
     878               2 :                         error!(
     879               2 :                             "wal-redo-postgres: {}",
     880               2 :                             String::from_utf8_lossy(&errbuf[0..len])
     881               2 :                         );
     882                 : 
     883                 :                         // To make sure we capture all log from the process if it fails, keep
     884                 :                         // reading from the stderr, before checking the stdout.
     885               2 :                         continue;
     886 UBC           0 :                     }
     887 CBC     2251813 :                 } else if err_revents.contains(PollFlags::POLLHUP) {
     888 UBC           0 :                     anyhow::bail!("WAL redo process closed its stderr unexpectedly");
     889 CBC     2251813 :                 }
     890                 : 
     891                 :                 // If we have some data in stdout, read it to the result buffer.
     892         2251813 :                 let out_revents = pollfds[2].revents().unwrap();
     893         2251813 :                 if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
     894         2251813 :                     nresult += output.stdout.read(&mut resultbuf[nresult..])?;
     895 UBC           0 :                 } else if out_revents.contains(PollFlags::POLLHUP) {
     896               0 :                     anyhow::bail!("WAL redo process closed its stdout unexpectedly");
     897               0 :                 }
     898                 :             }
     899 CBC     2251813 :             output
     900         2251813 :                 .pending_responses
     901         2251813 :                 .push_back(Some(Bytes::from(resultbuf)));
     902                 :         }
     903                 :         // Replace our request's response with None in `pending_responses`.
     904                 :         // Then make space in the ring buffer by clearing out any seqence of contiguous
     905                 :         // `None`'s from the front of `pending_responses`.
     906                 :         // NB: We can't pop_front() because other requests' responses because another
     907                 :         // requester might have grabbed the output mutex before us:
     908                 :         // T1: grab input mutex
     909                 :         // T1: send request_no 23
     910                 :         // T1: release input mutex
     911                 :         // T2: grab input mutex
     912                 :         // T2: send request_no 24
     913                 :         // T2: release input mutex
     914                 :         // T2: grab output mutex
     915                 :         // T2: n_processed_responses + output.pending_responses.len() <= request_no
     916                 :         //            23                                0                   24
     917                 :         // T2: enters poll loop that reads stdout
     918                 :         // T2: put response for 23 into pending_responses
     919                 :         // T2: put response for 24 into pending_resposnes
     920                 :         // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
     921                 :         // T2: takes its response_24
     922                 :         // pending_responses now looks like this: Front Some(response_23) None Back
     923                 :         // T2: does the while loop below
     924                 :         // pending_responses now looks like this: Front Some(response_23) None Back
     925                 :         // T2: releases output mutex
     926                 :         // T1: grabs output mutex
     927                 :         // T1: n_processed_responses + output.pending_responses.len() > request_no
     928                 :         //            23                                2                   23
     929                 :         // T1: skips poll loop that reads stdout
     930                 :         // T1: takes its response_23
     931                 :         // pending_responses now looks like this: Front None None Back
     932                 :         // T1: does the while loop below
     933                 :         // pending_responses now looks like this: Front Back
     934                 :         // n_processed_responses now has value 25
     935         2251813 :         let res = output.pending_responses[request_no - n_processed_responses]
     936         2251813 :             .take()
     937         2251813 :             .expect("we own this request_no, nobody else is supposed to take it");
     938         4503626 :         while let Some(front) = output.pending_responses.front() {
     939         2261161 :             if front.is_none() {
     940         2251813 :                 output.pending_responses.pop_front();
     941         2251813 :                 output.n_processed_responses += 1;
     942         2251813 :             } else {
     943            9348 :                 break;
     944                 :             }
     945                 :         }
     946         2251813 :         Ok(res)
     947         2251813 :     }
     948                 : 
     949                 :     #[cfg(feature = "testing")]
     950 UBC           0 :     fn record_and_log(&self, writebuf: &[u8]) {
     951               0 :         let millis = std::time::SystemTime::now()
     952               0 :             .duration_since(std::time::SystemTime::UNIX_EPOCH)
     953               0 :             .unwrap()
     954               0 :             .as_millis();
     955               0 : 
     956               0 :         let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
     957               0 : 
     958               0 :         // these files will be collected to an allure report
     959               0 :         let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
     960               0 : 
     961               0 :         let path = self.conf.tenant_path(&self.tenant_id).join(&filename);
     962               0 : 
     963               0 :         let res = std::fs::OpenOptions::new()
     964               0 :             .write(true)
     965               0 :             .create_new(true)
     966               0 :             .read(true)
     967               0 :             .open(path)
     968               0 :             .and_then(|mut f| f.write_all(writebuf));
     969                 : 
     970                 :         // trip up allowed_errors
     971               0 :         if let Err(e) = res {
     972               0 :             tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
     973                 :         } else {
     974               0 :             tracing::error!(filename, "erroring walredo input saved");
     975                 :         }
     976               0 :     }
     977                 : 
     978                 :     #[cfg(not(feature = "testing"))]
     979                 :     fn record_and_log(&self, _: &[u8]) {}
     980                 : }
     981                 : 
     982                 : impl Drop for WalRedoProcess {
     983 CBC          50 :     fn drop(&mut self) {
     984              50 :         self.child
     985              50 :             .take()
     986              50 :             .expect("we only do this once")
     987              50 :             .kill_and_wait();
     988              50 :     }
     989                 : }
     990                 : 
     991                 : /// Wrapper type around `std::process::Child` which guarantees that the child
     992                 : /// will be killed and waited-for by this process before being dropped.
     993                 : struct NoLeakChild {
     994                 :     tenant_id: TenantId,
     995                 :     child: Option<Child>,
     996                 : }
     997                 : 
     998                 : impl Deref for NoLeakChild {
     999                 :     type Target = Child;
    1000                 : 
    1001         2251815 :     fn deref(&self) -> &Self::Target {
    1002         2251815 :         self.child.as_ref().expect("must not use from drop")
    1003         2251815 :     }
    1004                 : }
    1005                 : 
    1006                 : impl DerefMut for NoLeakChild {
    1007            1158 :     fn deref_mut(&mut self) -> &mut Self::Target {
    1008            1158 :         self.child.as_mut().expect("must not use from drop")
    1009            1158 :     }
    1010                 : }
    1011                 : 
    1012                 : impl NoLeakChild {
    1013             386 :     fn spawn(tenant_id: TenantId, command: &mut Command) -> io::Result<Self> {
    1014             386 :         let child = command.spawn()?;
    1015             386 :         Ok(NoLeakChild {
    1016             386 :             tenant_id,
    1017             386 :             child: Some(child),
    1018             386 :         })
    1019             386 :     }
    1020                 : 
    1021              50 :     fn kill_and_wait(mut self) {
    1022              50 :         let child = match self.child.take() {
    1023              50 :             Some(child) => child,
    1024 UBC           0 :             None => return,
    1025                 :         };
    1026 CBC          50 :         Self::kill_and_wait_impl(child);
    1027              50 :     }
    1028                 : 
    1029              50 :     #[instrument(skip_all, fields(pid=child.id()))]
    1030                 :     fn kill_and_wait_impl(mut child: Child) {
    1031                 :         let res = child.kill();
    1032                 :         if let Err(e) = res {
    1033                 :             // This branch is very unlikely because:
    1034                 :             // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
    1035                 :             // - This is the only place that calls .kill()
    1036                 :             // - We consume `self`, so, .kill() can't be called twice.
    1037                 :             // - If the process exited by itself or was killed by someone else,
    1038                 :             //   .kill() will still succeed because we haven't wait()'ed yet.
    1039                 :             //
    1040                 :             // So, if we arrive here, we have really no idea what happened,
    1041                 :             // whether the PID stored in self.child is still valid, etc.
    1042                 :             // If this function were fallible, we'd return an error, but
    1043                 :             // since it isn't, all we can do is log an error and proceed
    1044                 :             // with the wait().
    1045 UBC           0 :             error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
    1046                 :         }
    1047                 : 
    1048                 :         match child.wait() {
    1049                 :             Ok(exit_status) => {
    1050 CBC          50 :                 info!(exit_status = %exit_status, "wait successful");
    1051                 :             }
    1052                 :             Err(e) => {
    1053 UBC           0 :                 error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
    1054                 :             }
    1055                 :         }
    1056                 :     }
    1057                 : }
    1058                 : 
    1059                 : impl Drop for NoLeakChild {
    1060 CBC          50 :     fn drop(&mut self) {
    1061              50 :         let child = match self.child.take() {
    1062 UBC           0 :             Some(child) => child,
    1063 CBC          50 :             None => return,
    1064                 :         };
    1065 UBC           0 :         let tenant_id = self.tenant_id;
    1066               0 :         // Offload the kill+wait of the child process into the background.
    1067               0 :         // If someone stops the runtime, we'll leak the child process.
    1068               0 :         // We can ignore that case because we only stop the runtime on pageserver exit.
    1069               0 :         BACKGROUND_RUNTIME.spawn(async move {
    1070               0 :             tokio::task::spawn_blocking(move || {
    1071                 :                 // Intentionally don't inherit the tracing context from whoever is dropping us.
    1072                 :                 // This thread here is going to outlive our dropper.
    1073               0 :                 let span = tracing::info_span!("walredo", %tenant_id);
    1074               0 :                 let _entered = span.enter();
    1075               0 :                 Self::kill_and_wait_impl(child);
    1076               0 :             })
    1077               0 :             .await
    1078               0 :         });
    1079 CBC          50 :     }
    1080                 : }
    1081                 : 
    1082                 : trait NoLeakChildCommandExt {
    1083                 :     fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild>;
    1084                 : }
    1085                 : 
    1086                 : impl NoLeakChildCommandExt for Command {
    1087             386 :     fn spawn_no_leak_child(&mut self, tenant_id: TenantId) -> io::Result<NoLeakChild> {
    1088             386 :         NoLeakChild::spawn(tenant_id, self)
    1089             386 :     }
    1090                 : }
    1091                 : 
    1092                 : // Functions for constructing messages to send to the postgres WAL redo
    1093                 : // process. See pgxn/neon_walredo/walredoproc.c for
    1094                 : // explanation of the protocol.
    1095                 : 
    1096         2251814 : fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
    1097         2251814 :     let len = 4 + 1 + 4 * 4;
    1098         2251814 : 
    1099         2251814 :     buf.put_u8(b'B');
    1100         2251814 :     buf.put_u32(len as u32);
    1101         2251814 : 
    1102         2251814 :     tag.ser_into(buf)
    1103         2251814 :         .expect("serialize BufferTag should always succeed");
    1104         2251814 : }
    1105                 : 
    1106          499721 : fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
    1107          499721 :     assert!(base_img.len() == 8192);
    1108                 : 
    1109          499721 :     let len = 4 + 1 + 4 * 4 + base_img.len();
    1110          499721 : 
    1111          499721 :     buf.put_u8(b'P');
    1112          499721 :     buf.put_u32(len as u32);
    1113          499721 :     tag.ser_into(buf)
    1114          499721 :         .expect("serialize BufferTag should always succeed");
    1115          499721 :     buf.put(base_img);
    1116          499721 : }
    1117                 : 
    1118       136659088 : fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
    1119       136659088 :     let len = 4 + 8 + rec.len();
    1120       136659088 : 
    1121       136659088 :     buf.put_u8(b'A');
    1122       136659088 :     buf.put_u32(len as u32);
    1123       136659088 :     buf.put_u64(endlsn.0);
    1124       136659088 :     buf.put(rec);
    1125       136659088 : }
    1126                 : 
    1127         2251814 : fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
    1128         2251814 :     let len = 4 + 1 + 4 * 4;
    1129         2251814 : 
    1130         2251814 :     buf.put_u8(b'G');
    1131         2251814 :     buf.put_u32(len as u32);
    1132         2251814 :     tag.ser_into(buf)
    1133         2251814 :         .expect("serialize BufferTag should always succeed");
    1134         2251814 : }
    1135                 : 
    1136                 : #[cfg(test)]
    1137                 : mod tests {
    1138                 :     use super::PostgresRedoManager;
    1139                 :     use crate::repository::Key;
    1140                 :     use crate::{config::PageServerConf, walrecord::NeonWalRecord};
    1141                 :     use bytes::Bytes;
    1142                 :     use std::str::FromStr;
    1143                 :     use utils::{id::TenantId, lsn::Lsn};
    1144                 : 
    1145               1 :     #[tokio::test]
    1146               1 :     async fn short_v14_redo() {
    1147               1 :         let expected = std::fs::read("fixtures/short_v14_redo.page").unwrap();
    1148               1 : 
    1149               1 :         let h = RedoHarness::new().unwrap();
    1150                 : 
    1151               1 :         let page = h
    1152               1 :             .manager
    1153               1 :             .request_redo(
    1154               1 :                 Key {
    1155               1 :                     field1: 0,
    1156               1 :                     field2: 1663,
    1157               1 :                     field3: 13010,
    1158               1 :                     field4: 1259,
    1159               1 :                     field5: 0,
    1160               1 :                     field6: 0,
    1161               1 :                 },
    1162               1 :                 Lsn::from_str("0/16E2408").unwrap(),
    1163               1 :                 None,
    1164               1 :                 short_records(),
    1165               1 :                 14,
    1166               1 :             )
    1167 UBC           0 :             .await
    1168 CBC           1 :             .unwrap();
    1169               1 : 
    1170               1 :         assert_eq!(&expected, &*page);
    1171                 :     }
    1172                 : 
    1173               1 :     #[tokio::test]
    1174               1 :     async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
    1175               1 :         let h = RedoHarness::new().unwrap();
    1176                 : 
    1177               1 :         let page = h
    1178               1 :             .manager
    1179               1 :             .request_redo(
    1180               1 :                 Key {
    1181               1 :                     field1: 0,
    1182               1 :                     field2: 1663,
    1183               1 :                     // key should be 13010
    1184               1 :                     field3: 13130,
    1185               1 :                     field4: 1259,
    1186               1 :                     field5: 0,
    1187               1 :                     field6: 0,
    1188               1 :                 },
    1189               1 :                 Lsn::from_str("0/16E2408").unwrap(),
    1190               1 :                 None,
    1191               1 :                 short_records(),
    1192               1 :                 14,
    1193               1 :             )
    1194 UBC           0 :             .await
    1195 CBC           1 :             .unwrap();
    1196               1 : 
    1197               1 :         // TODO: there will be some stderr printout, which is forwarded to tracing that could
    1198               1 :         // perhaps be captured as long as it's in the same thread.
    1199               1 :         assert_eq!(page, crate::ZERO_PAGE);
    1200                 :     }
    1201                 : 
    1202                 :     #[allow(clippy::octal_escapes)]
    1203               2 :     fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
    1204               2 :         vec![
    1205               2 :             (
    1206               2 :                 Lsn::from_str("0/16A9388").unwrap(),
    1207               2 :                 NeonWalRecord::Postgres {
    1208               2 :                     will_init: true,
    1209               2 :                     rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
    1210               2 :                 }
    1211               2 :             ),
    1212               2 :             (
    1213               2 :                 Lsn::from_str("0/16D4080").unwrap(),
    1214               2 :                 NeonWalRecord::Postgres {
    1215               2 :                     will_init: false,
    1216               2 :                     rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
    1217               2 :                 }
    1218               2 :             )
    1219               2 :         ]
    1220               2 :     }
    1221                 : 
    1222                 :     struct RedoHarness {
    1223                 :         // underscored because unused, except for removal at drop
    1224                 :         _repo_dir: camino_tempfile::Utf8TempDir,
    1225                 :         manager: PostgresRedoManager,
    1226                 :     }
    1227                 : 
    1228                 :     impl RedoHarness {
    1229               2 :         fn new() -> anyhow::Result<Self> {
    1230               2 :             let repo_dir = camino_tempfile::tempdir()?;
    1231               2 :             let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
    1232               2 :             let conf = Box::leak(Box::new(conf));
    1233               2 :             let tenant_id = TenantId::generate();
    1234               2 : 
    1235               2 :             let manager = PostgresRedoManager::new(conf, tenant_id);
    1236               2 : 
    1237               2 :             Ok(RedoHarness {
    1238               2 :                 _repo_dir: repo_dir,
    1239               2 :                 manager,
    1240               2 :             })
    1241               2 :         }
    1242                 :     }
    1243                 : }
        

Generated by: LCOV version 2.1-beta