LCOV - differential code coverage report
Current view: top level - pageserver/src - walredo.rs (source / functions)

               Coverage   Total   Hit   UBC   CBC
Lines:           87.9 %     687   604    83   604
Functions:       63.0 %      73    46    27    46

Current:  cd44433dd675caa99df17a61b18949c8387e2242.info (2024-01-09 02:06:09)
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info (2024-01-08 15:34:46)

           TLA  Line data    Source code
       1                 : //!
       2                 : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
       3                 : //! to apply given WAL records over an old page image and return new
       4                 : //! page image.
       5                 : //!
       6                 : //! We rely on Postgres to perform WAL redo for us. We launch a
       7                 : //! postgres process in special "wal redo" mode that's similar to
       8                 : //! single-user mode. We then pass the previous page image, if any,
       9                 : //! and all the WAL records we want to apply, to the postgres
      10                 : //! process. Then we get the page image back. Communication with the
       11                 : //! postgres process happens via stdin/stdout.
      12                 : //!
      13                 : //! See pgxn/neon_walredo/walredoproc.c for the other side of
      14                 : //! this communication.
      15                 : //!
      16                 : //! The Postgres process is assumed to be secure against malicious WAL
       17                 : //! records. It achieves this by dropping privileges before replaying
      18                 : //! any WAL records, so that even if an attacker hijacks the Postgres
       19                 : //! process, they cannot escape from it.
      20                 : //!
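As a rough sketch of the round trip described above (illustrative only; the
real spawn and framing logic lives in WalRedoProcess::launch and
apply_wal_records below, and the request bytes follow the protocol defined in
pgxn/neon_walredo/walredoproc.c):

    // Minimal sketch, assuming a postgres binary with wal-redo support on
    // $PATH and a pre-serialized request buffer; error handling elided.
    use std::io::{Read, Write};
    use std::process::{Command, Stdio};

    fn walredo_roundtrip_sketch(request: &[u8]) -> std::io::Result<Vec<u8>> {
        let mut child = Command::new("postgres")
            .arg("--wal-redo")
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()?;
        // Send the base image plus WAL records over stdin...
        child.stdin.as_mut().unwrap().write_all(request)?;
        // ...and read the reconstructed 8 KiB page image back from stdout.
        let mut page = vec![0u8; 8192];
        child.stdout.as_mut().unwrap().read_exact(&mut page)?;
        Ok(page)
    }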
      21                 : use anyhow::Context;
      22                 : use byteorder::{ByteOrder, LittleEndian};
      23                 : use bytes::{BufMut, Bytes, BytesMut};
      24                 : use nix::poll::*;
      25                 : use pageserver_api::shard::TenantShardId;
      26                 : use serde::Serialize;
      27                 : use std::collections::VecDeque;
      28                 : use std::io;
      29                 : use std::io::prelude::*;
      30                 : use std::ops::{Deref, DerefMut};
      31                 : use std::os::unix::io::AsRawFd;
      32                 : use std::os::unix::prelude::CommandExt;
      33                 : use std::process::Stdio;
      34                 : use std::process::{Child, ChildStdin, ChildStdout, Command};
      35                 : use std::sync::{Arc, Mutex, MutexGuard, RwLock};
      36                 : use std::time::Duration;
      37                 : use std::time::Instant;
      38                 : use tracing::*;
      39                 : use utils::{bin_ser::BeSer, lsn::Lsn, nonblock::set_nonblock};
      40                 : 
      41                 : #[cfg(feature = "testing")]
      42                 : use std::sync::atomic::{AtomicUsize, Ordering};
      43                 : 
      44                 : use crate::config::PageServerConf;
      45                 : use crate::metrics::{
      46                 :     WalRedoKillCause, WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_COUNTERS,
      47                 :     WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM,
      48                 :     WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
      49                 : };
      50                 : use crate::pgdatadir_mapping::{key_to_rel_block, key_to_slru_block};
      51                 : use crate::repository::Key;
      52                 : use crate::walrecord::NeonWalRecord;
      53                 : use pageserver_api::reltag::{RelTag, SlruKind};
      54                 : use postgres_ffi::pg_constants;
      55                 : use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
      56                 : use postgres_ffi::v14::nonrelfile_utils::{
      57                 :     mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
      58                 :     transaction_id_set_status,
      59                 : };
      60                 : use postgres_ffi::BLCKSZ;
      61                 : 
      62                 : ///
      63                 : /// `RelTag` + block number (`blknum`) gives us a unique id of the page in the cluster.
      64                 : ///
       65                 : //! In Postgres, the `BufferTag` structure is used for exactly the same purpose.
      66                 : /// [See more related comments here](https://github.com/postgres/postgres/blob/99c5852e20a0987eca1c38ba0c09329d4076b6a0/src/include/storage/buf_internals.h#L91).
      67                 : ///
      68 CBC     4544070 : #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Serialize)]
      69                 : pub(crate) struct BufferTag {
      70                 :     pub rel: RelTag,
      71                 :     pub blknum: u32,
      72                 : }
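For illustration, constructing a tag for one page (a sketch; the RelTag field
values below are hypothetical, with field names as in pageserver_api::reltag):

    // Hypothetical identifiers: block 7 of the main fork of relation 16384
    // in database 13010, tablespace 1663.
    let tag = BufferTag {
        rel: RelTag {
            spcnode: 1663,
            dbnode: 13010,
            relnode: 16384,
            forknum: 0, // MAIN_FORKNUM
        },
        blknum: 7,
    };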
      73                 : 
      74                 : struct ProcessInput {
      75                 :     stdin: ChildStdin,
      76                 :     n_requests: usize,
      77                 : }
      78                 : 
      79                 : struct ProcessOutput {
      80                 :     stdout: ChildStdout,
      81                 :     pending_responses: VecDeque<Option<Bytes>>,
      82                 :     n_processed_responses: usize,
      83                 : }
      84                 : 
      85                 : ///
      86                 : /// This is the real implementation that uses a Postgres process to
       87                 : //! perform WAL replay. Only one thread can use the process at a time;
       88                 : //! this is enforced by a Mutex. In the future, we might want to
      89                 : /// launch a pool of processes to allow concurrent replay of multiple
      90                 : /// records.
      91                 : ///
      92                 : pub struct PostgresRedoManager {
      93                 :     tenant_shard_id: TenantShardId,
      94                 :     conf: &'static PageServerConf,
      95                 :     last_redo_at: std::sync::Mutex<Option<Instant>>,
      96                 :     redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
      97                 : }
      98                 : 
      99                 : /// Can this request be served by neon redo functions
      100                 : /// or does it need to be passed to the wal-redo postgres process?
     101        72749603 : fn can_apply_in_neon(rec: &NeonWalRecord) -> bool {
      102        72749603 :     // Currently, we don't have bespoke Rust code to replay any
     103        72749603 :     // Postgres WAL records. But everything else is handled in neon.
     104        72749603 :     #[allow(clippy::match_like_matches_macro)]
     105        72749603 :     match rec {
     106                 :         NeonWalRecord::Postgres {
     107                 :             will_init: _,
     108                 :             rec: _,
     109        56589789 :         } => false,
     110        16159814 :         _ => true,
     111                 :     }
     112        72749603 : }
     113                 : 
     114                 : ///
     115                 : /// Public interface of WAL redo manager
     116                 : ///
     117                 : impl PostgresRedoManager {
     118                 :     ///
     119                 :     /// Request the WAL redo manager to apply some WAL records
     120                 :     ///
     121                 :     /// The WAL redo is handled by a separate thread, so this just sends a request
      122                 :     /// to the thread and waits for the response.
     123                 :     ///
     124                 :     /// # Cancel-Safety
     125                 :     ///
     126                 :     /// This method is cancellation-safe.
     127         2060221 :     pub async fn request_redo(
     128         2060221 :         &self,
     129         2060221 :         key: Key,
     130         2060221 :         lsn: Lsn,
     131         2060221 :         base_img: Option<(Lsn, Bytes)>,
     132         2060221 :         records: Vec<(Lsn, NeonWalRecord)>,
     133         2060221 :         pg_version: u32,
     134         2060221 :     ) -> anyhow::Result<Bytes> {
     135         2060221 :         if records.is_empty() {
     136 UBC           0 :             anyhow::bail!("invalid WAL redo request with no records");
     137 CBC     2060221 :         }
     138         2060221 : 
     139         2060221 :         let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
     140         2060221 :         let mut img = base_img.map(|p| p.1);
     141         2060221 :         let mut batch_neon = can_apply_in_neon(&records[0].1);
     142         2060221 :         let mut batch_start = 0;
     143        70689382 :         for (i, record) in records.iter().enumerate().skip(1) {
     144        70689382 :             let rec_neon = can_apply_in_neon(&record.1);
     145        70689382 : 
     146        70689382 :             if rec_neon != batch_neon {
     147              13 :                 let result = if batch_neon {
     148              10 :                     self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
     149                 :                 } else {
     150               3 :                     self.apply_batch_postgres(
     151               3 :                         key,
     152               3 :                         lsn,
     153               3 :                         img,
     154               3 :                         base_img_lsn,
     155               3 :                         &records[batch_start..i],
     156               3 :                         self.conf.wal_redo_timeout,
     157               3 :                         pg_version,
     158               3 :                     )
     159                 :                 };
     160              13 :                 img = Some(result?);
     161                 : 
     162              13 :                 batch_neon = rec_neon;
     163              13 :                 batch_start = i;
     164        70689369 :             }
     165                 :         }
     166                 :         // last batch
     167         2060221 :         if batch_neon {
     168            3974 :             self.apply_batch_neon(key, lsn, img, &records[batch_start..])
     169                 :         } else {
     170         2056247 :             self.apply_batch_postgres(
     171         2056247 :                 key,
     172         2056247 :                 lsn,
     173         2056247 :                 img,
     174         2056247 :                 base_img_lsn,
     175         2056247 :                 &records[batch_start..],
     176         2056247 :                 self.conf.wal_redo_timeout,
     177         2056247 :                 pg_version,
     178         2056247 :             )
     179                 :         }
     180         2060221 :     }
     181                 : }
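A caller-side sketch of this interface (the key, LSNs, base image, and records
are assumed to come from the calling context):

    // Reconstruct one page image from an optional base image plus records.
    async fn reconstruct_page(
        manager: &PostgresRedoManager,
        key: Key,
        lsn: Lsn,
        base: Option<(Lsn, Bytes)>,
        records: Vec<(Lsn, NeonWalRecord)>,
    ) -> anyhow::Result<Bytes> {
        // pg_version 14 is an illustrative value.
        manager.request_redo(key, lsn, base, records, 14).await
    }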
     182                 : 
     183                 : impl PostgresRedoManager {
     184                 :     ///
     185                 :     /// Create a new PostgresRedoManager.
     186                 :     ///
     187             739 :     pub fn new(
     188             739 :         conf: &'static PageServerConf,
     189             739 :         tenant_shard_id: TenantShardId,
     190             739 :     ) -> PostgresRedoManager {
     191             739 :         // The actual process is launched lazily, on first request.
     192             739 :         PostgresRedoManager {
     193             739 :             tenant_shard_id,
     194             739 :             conf,
     195             739 :             last_redo_at: std::sync::Mutex::default(),
     196             739 :             redo_process: RwLock::new(None),
     197             739 :         }
     198             739 :     }
     199                 : 
     200                 :     /// This type doesn't have its own background task to check for idleness: we
     201                 :     /// rely on our owner calling this function periodically in its own housekeeping
     202                 :     /// loops.
     203             727 :     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
     204             727 :         if let Ok(g) = self.last_redo_at.try_lock() {
     205             727 :             if let Some(last_redo_at) = *g {
     206             344 :                 if last_redo_at.elapsed() >= idle_timeout {
     207             192 :                     drop(g);
     208             192 :                     let mut guard = self.redo_process.write().unwrap();
     209             192 :                     *guard = None;
     210             192 :                 }
     211             383 :             }
     212 UBC           0 :         }
     213 CBC         727 :     }
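For example, an owner's housekeeping loop might look like this (a sketch; the
one-minute tick and ten-minute idle timeout are illustrative values):

    async fn housekeeping_loop(manager: std::sync::Arc<PostgresRedoManager>) {
        let mut ticker = tokio::time::interval(Duration::from_secs(60));
        loop {
            ticker.tick().await;
            // Kill the walredo process if it has been idle >= 10 minutes.
            manager.maybe_quiesce(Duration::from_secs(600));
        }
    }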
     214                 : 
     215                 :     ///
     216                 :     /// Process one request for WAL redo using wal-redo postgres
     217                 :     ///
     218                 :     #[allow(clippy::too_many_arguments)]
     219         2056250 :     fn apply_batch_postgres(
     220         2056250 :         &self,
     221         2056250 :         key: Key,
     222         2056250 :         lsn: Lsn,
     223         2056250 :         base_img: Option<Bytes>,
     224         2056250 :         base_img_lsn: Lsn,
     225         2056250 :         records: &[(Lsn, NeonWalRecord)],
     226         2056250 :         wal_redo_timeout: Duration,
     227         2056250 :         pg_version: u32,
     228         2056250 :     ) -> anyhow::Result<Bytes> {
     229         2056250 :         *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
     230                 : 
     231         2056250 :         let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
     232                 :         const MAX_RETRY_ATTEMPTS: u32 = 1;
     233         2056250 :         let mut n_attempts = 0u32;
     234                 :         loop {
     235                 :             // launch the WAL redo process on first use
     236         2056251 :             let proc: Arc<WalRedoProcess> = {
     237         2056251 :                 let proc_guard = self.redo_process.read().unwrap();
     238         2056251 :                 match &*proc_guard {
     239                 :                     None => {
     240                 :                         // "upgrade" to write lock to launch the process
     241             547 :                         drop(proc_guard);
     242             547 :                         let mut proc_guard = self.redo_process.write().unwrap();
     243             547 :                         match &*proc_guard {
     244                 :                             None => {
     245             544 :                                 let timer =
     246             544 :                                     WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.start_timer();
     247             544 :                                 let proc = Arc::new(
     248             544 :                                     WalRedoProcess::launch(
     249             544 :                                         self.conf,
     250             544 :                                         self.tenant_shard_id,
     251             544 :                                         pg_version,
     252             544 :                                     )
     253             544 :                                     .context("launch walredo process")?,
     254                 :                                 );
     255             544 :                                 timer.observe_duration();
     256             544 :                                 *proc_guard = Some(Arc::clone(&proc));
     257             544 :                                 proc
     258                 :                             }
     259               3 :                             Some(proc) => Arc::clone(proc),
     260                 :                         }
     261                 :                     }
     262         2055704 :                     Some(proc) => Arc::clone(proc),
     263                 :                 }
     264                 :             };
     265                 : 
     266         2056251 :             let started_at = std::time::Instant::now();
     267         2056251 : 
     268         2056251 :             // Relational WAL records are applied using wal-redo-postgres
     269         2056251 :             let buf_tag = BufferTag { rel, blknum };
     270         2056251 :             let result = proc
     271         2056251 :                 .apply_wal_records(buf_tag, &base_img, records, wal_redo_timeout)
     272         2056251 :                 .context("apply_wal_records");
     273         2056251 : 
     274         2056251 :             let duration = started_at.elapsed();
     275         2056251 : 
     276         2056251 :             let len = records.len();
      277        56589791 :             let nbytes = records.iter().fold(0, |accumulator, record| {
      278        56589791 :                 accumulator
     279        56589791 :                     + match &record.1 {
     280        56589791 :                         NeonWalRecord::Postgres { rec, .. } => rec.len(),
     281 UBC           0 :                         _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
     282                 :                     }
     283 CBC    56589791 :             });
     284         2056251 : 
     285         2056251 :             WAL_REDO_TIME.observe(duration.as_secs_f64());
     286         2056251 :             WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
     287         2056251 :             WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
     288         2056251 : 
     289         2056251 :             debug!(
     290 UBC           0 :                 "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
     291               0 :                 len,
     292               0 :                 nbytes,
     293               0 :                 duration.as_micros(),
     294               0 :                 lsn
     295               0 :             );
     296                 : 
     297                 :             // If something went wrong, don't try to reuse the process. Kill it, and
     298                 :             // next request will launch a new one.
     299 CBC     2056251 :             if let Err(e) = result.as_ref() {
     300               2 :                 error!(
     301               2 :                     "error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
     302               2 :                     records.len(),
     303               2 :                     records.first().map(|p| p.0).unwrap_or(Lsn(0)),
     304               2 :                     records.last().map(|p| p.0).unwrap_or(Lsn(0)),
     305               2 :                     nbytes,
     306               2 :                     base_img_lsn,
     307               2 :                     lsn,
     308               2 :                     n_attempts,
     309               2 :                     e,
     310               2 :                 );
     311                 :                 // Avoid concurrent callers hitting the same issue.
     312                 :                 // We can't prevent it from happening because we want to enable parallelism.
     313                 :                 {
     314               2 :                     let mut guard = self.redo_process.write().unwrap();
     315               2 :                     match &*guard {
     316               2 :                         Some(current_field_value) => {
     317               2 :                             if Arc::ptr_eq(current_field_value, &proc) {
     318               2 :                                 // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
     319               2 :                                 *guard = None;
     320               2 :                             }
     321                 :                         }
     322 UBC           0 :                         None => {
     323               0 :                             // Another thread was faster to observe the error, and already took the process out of rotation.
     324               0 :                         }
     325                 :                     }
     326                 :                 }
     327                 :                 // NB: there may still be other concurrent threads using `proc`.
     328                 :                 // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
     329                 :                 // NB: it's important to drop(proc) after drop(guard). Otherwise we'd keep
     330                 :                 // holding the lock while waiting for the process to exit.
      331                 :                 // NB: the drop impl blocks the current thread with a wait() system call for
     332                 :                 // the child process. We dropped the `guard` above so that other threads aren't
     333                 :                 // affected. But, it's good that the current thread _does_ block to wait.
     334                 :                 // If we instead deferred the waiting into the background / to tokio, it could
     335                 :                 // happen that if walredo always fails immediately, we spawn processes faster
     336                 :                 // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
     337                 :                 // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
     338                 :                 // This probably needs revisiting at some later point.
     339 CBC           2 :                 drop(proc);
     340         2056249 :             } else if n_attempts != 0 {
     341 UBC           0 :                 info!(n_attempts, "retried walredo succeeded");
     342 CBC     2056249 :             }
     343         2056251 :             n_attempts += 1;
     344         2056251 :             if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
     345         2056250 :                 return result;
     346               1 :             }
     347                 :         }
     348         2056250 :     }
     349                 : 
     350                 :     ///
      351                 :     /// Process a batch of WAL records using bespoke Neon code.
     352                 :     ///
     353            3984 :     fn apply_batch_neon(
     354            3984 :         &self,
     355            3984 :         key: Key,
     356            3984 :         lsn: Lsn,
     357            3984 :         base_img: Option<Bytes>,
     358            3984 :         records: &[(Lsn, NeonWalRecord)],
     359            3984 :     ) -> anyhow::Result<Bytes> {
     360            3984 :         let start_time = Instant::now();
     361            3984 : 
     362            3984 :         let mut page = BytesMut::new();
     363            3984 :         if let Some(fpi) = base_img {
     364            3984 :             // If full-page image is provided, then use it...
     365            3984 :             page.extend_from_slice(&fpi[..]);
     366            3984 :         } else {
     367                 :             // All the current WAL record types that we can handle require a base image.
     368 UBC           0 :             anyhow::bail!("invalid neon WAL redo request with no base image");
     369                 :         }
     370                 : 
     371                 :         // Apply all the WAL records in the batch
     372 CBC    16159814 :         for (record_lsn, record) in records.iter() {
     373        16159814 :             self.apply_record_neon(key, &mut page, *record_lsn, record)?;
     374                 :         }
     375                 :         // Success!
     376            3984 :         let duration = start_time.elapsed();
     377            3984 :         // FIXME: using the same metric here creates a bimodal distribution by default, and because
     378            3984 :         // there could be multiple batch sizes this would be N+1 modal.
     379            3984 :         WAL_REDO_TIME.observe(duration.as_secs_f64());
     380            3984 : 
     381            3984 :         debug!(
     382 UBC           0 :             "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
     383               0 :             records.len(),
     384               0 :             duration.as_micros(),
     385               0 :             lsn
     386               0 :         );
     387                 : 
     388 CBC        3984 :         Ok(page.freeze())
     389            3984 :     }
     390                 : 
     391        16159814 :     fn apply_record_neon(
     392        16159814 :         &self,
     393        16159814 :         key: Key,
     394        16159814 :         page: &mut BytesMut,
     395        16159814 :         _record_lsn: Lsn,
     396        16159814 :         record: &NeonWalRecord,
     397        16159814 :     ) -> anyhow::Result<()> {
     398        16159814 :         match record {
     399                 :             NeonWalRecord::Postgres {
     400                 :                 will_init: _,
     401                 :                 rec: _,
     402                 :             } => {
     403 UBC           0 :                 anyhow::bail!("tried to pass postgres wal record to neon WAL redo");
     404                 :             }
     405                 :             NeonWalRecord::ClearVisibilityMapFlags {
     406 CBC         870 :                 new_heap_blkno,
     407             870 :                 old_heap_blkno,
     408             870 :                 flags,
     409                 :             } => {
     410                 :                 // sanity check that this is modifying the correct relation
     411             870 :                 let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
     412             870 :                 assert!(
     413             870 :                     rel.forknum == VISIBILITYMAP_FORKNUM,
     414 UBC           0 :                     "ClearVisibilityMapFlags record on unexpected rel {}",
     415                 :                     rel
     416                 :                 );
     417 CBC         870 :                 if let Some(heap_blkno) = *new_heap_blkno {
     418                 :                     // Calculate the VM block and offset that corresponds to the heap block.
     419             398 :                     let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
     420             398 :                     let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
     421             398 :                     let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
     422                 : 
     423                 :                     // Check that we're modifying the correct VM block.
     424             398 :                     assert!(map_block == blknum);
     425                 : 
     426                 :                     // equivalent to PageGetContents(page)
     427             398 :                     let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
     428             398 : 
     429             398 :                     map[map_byte as usize] &= !(flags << map_offset);
     430             472 :                 }
     431                 : 
     432                 :                 // Repeat for 'old_heap_blkno', if any
     433             870 :                 if let Some(heap_blkno) = *old_heap_blkno {
     434             474 :                     let map_block = pg_constants::HEAPBLK_TO_MAPBLOCK(heap_blkno);
     435             474 :                     let map_byte = pg_constants::HEAPBLK_TO_MAPBYTE(heap_blkno);
     436             474 :                     let map_offset = pg_constants::HEAPBLK_TO_OFFSET(heap_blkno);
     437                 : 
     438             474 :                     assert!(map_block == blknum);
     439                 : 
     440             474 :                     let map = &mut page[pg_constants::MAXALIGN_SIZE_OF_PAGE_HEADER_DATA..];
     441             474 : 
     442             474 :                     map[map_byte as usize] &= !(flags << map_offset);
     443             396 :                 }
     444                 :             }
     445                 :             // Non-relational WAL records are handled here, with custom code that has the
     446                 :             // same effects as the corresponding Postgres WAL redo function.
     447        16061763 :             NeonWalRecord::ClogSetCommitted { xids, timestamp } => {
     448        16061763 :                 let (slru_kind, segno, blknum) =
     449        16061763 :                     key_to_slru_block(key).context("invalid record")?;
     450        16061763 :                 assert_eq!(
     451                 :                     slru_kind,
     452                 :                     SlruKind::Clog,
     453 UBC           0 :                     "ClogSetCommitted record with unexpected key {}",
     454                 :                     key
     455                 :                 );
     456 CBC    32223539 :                 for &xid in xids {
     457        16161776 :                     let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     458        16161776 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     459        16161776 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     460                 : 
     461                 :                     // Check that we're modifying the correct CLOG block.
     462        16161776 :                     assert!(
     463        16161776 :                         segno == expected_segno,
     464 UBC           0 :                         "ClogSetCommitted record for XID {} with unexpected key {}",
     465                 :                         xid,
     466                 :                         key
     467                 :                     );
     468 CBC    16161776 :                     assert!(
     469        16161776 :                         blknum == expected_blknum,
     470 UBC           0 :                         "ClogSetCommitted record for XID {} with unexpected key {}",
     471                 :                         xid,
     472                 :                         key
     473                 :                     );
     474                 : 
     475 CBC    16161776 :                     transaction_id_set_status(
     476        16161776 :                         xid,
     477        16161776 :                         pg_constants::TRANSACTION_STATUS_COMMITTED,
     478        16161776 :                         page,
     479        16161776 :                     );
     480                 :                 }
     481                 : 
     482                 :                 // Append the timestamp
     483        16061763 :                 if page.len() == BLCKSZ as usize + 8 {
     484        16058614 :                     page.truncate(BLCKSZ as usize);
     485        16058614 :                 }
     486        16061763 :                 if page.len() == BLCKSZ as usize {
     487        16061763 :                     page.extend_from_slice(&timestamp.to_be_bytes());
     488        16061763 :                 } else {
     489 UBC           0 :                     warn!(
     490               0 :                         "CLOG blk {} in seg {} has invalid size {}",
     491               0 :                         blknum,
     492               0 :                         segno,
     493               0 :                         page.len()
     494               0 :                     );
     495                 :                 }
     496                 :             }
     497 CBC        1309 :             NeonWalRecord::ClogSetAborted { xids } => {
     498            1309 :                 let (slru_kind, segno, blknum) =
     499            1309 :                     key_to_slru_block(key).context("invalid record")?;
     500            1309 :                 assert_eq!(
     501                 :                     slru_kind,
     502                 :                     SlruKind::Clog,
     503 UBC           0 :                     "ClogSetAborted record with unexpected key {}",
     504                 :                     key
     505                 :                 );
     506 CBC        2625 :                 for &xid in xids {
     507            1316 :                     let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
     508            1316 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     509            1316 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     510                 : 
     511                 :                     // Check that we're modifying the correct CLOG block.
     512            1316 :                     assert!(
     513            1316 :                         segno == expected_segno,
     514 UBC           0 :                         "ClogSetAborted record for XID {} with unexpected key {}",
     515                 :                         xid,
     516                 :                         key
     517                 :                     );
     518 CBC        1316 :                     assert!(
     519            1316 :                         blknum == expected_blknum,
     520 UBC           0 :                         "ClogSetAborted record for XID {} with unexpected key {}",
     521                 :                         xid,
     522                 :                         key
     523                 :                     );
     524                 : 
     525 CBC        1316 :                     transaction_id_set_status(xid, pg_constants::TRANSACTION_STATUS_ABORTED, page);
     526                 :                 }
     527                 :             }
     528           47662 :             NeonWalRecord::MultixactOffsetCreate { mid, moff } => {
     529           47662 :                 let (slru_kind, segno, blknum) =
     530           47662 :                     key_to_slru_block(key).context("invalid record")?;
     531           47662 :                 assert_eq!(
     532                 :                     slru_kind,
     533                 :                     SlruKind::MultiXactOffsets,
     534 UBC           0 :                     "MultixactOffsetCreate record with unexpected key {}",
     535                 :                     key
     536                 :                 );
     537                 :                 // Compute the block and offset to modify.
     538                 :                 // See RecordNewMultiXact in PostgreSQL sources.
     539 CBC       47662 :                 let pageno = mid / pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     540           47662 :                 let entryno = mid % pg_constants::MULTIXACT_OFFSETS_PER_PAGE as u32;
     541           47662 :                 let offset = (entryno * 4) as usize;
     542           47662 : 
     543           47662 :                 // Check that we're modifying the correct multixact-offsets block.
     544           47662 :                 let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     545           47662 :                 let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     546           47662 :                 assert!(
     547           47662 :                     segno == expected_segno,
     548 UBC           0 :                     "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     549                 :                     mid,
     550                 :                     key
     551                 :                 );
     552 CBC       47662 :                 assert!(
     553           47662 :                     blknum == expected_blknum,
     554 UBC           0 :                     "MultiXactOffsetsCreate record for multi-xid {} with unexpected key {}",
     555                 :                     mid,
     556                 :                     key
     557                 :                 );
     558                 : 
     559 CBC       47662 :                 LittleEndian::write_u32(&mut page[offset..offset + 4], *moff);
     560                 :             }
     561           48210 :             NeonWalRecord::MultixactMembersCreate { moff, members } => {
     562           48210 :                 let (slru_kind, segno, blknum) =
     563           48210 :                     key_to_slru_block(key).context("invalid record")?;
     564           48210 :                 assert_eq!(
     565                 :                     slru_kind,
     566                 :                     SlruKind::MultiXactMembers,
     567 UBC           0 :                     "MultixactMembersCreate record with unexpected key {}",
     568                 :                     key
     569                 :                 );
     570 CBC      945038 :                 for (i, member) in members.iter().enumerate() {
     571          945038 :                     let offset = moff + i as u32;
     572          945038 : 
     573          945038 :                     // Compute the block and offset to modify.
     574          945038 :                     // See RecordNewMultiXact in PostgreSQL sources.
     575          945038 :                     let pageno = offset / pg_constants::MULTIXACT_MEMBERS_PER_PAGE as u32;
     576          945038 :                     let memberoff = mx_offset_to_member_offset(offset);
     577          945038 :                     let flagsoff = mx_offset_to_flags_offset(offset);
     578          945038 :                     let bshift = mx_offset_to_flags_bitshift(offset);
     579          945038 : 
     580          945038 :                     // Check that we're modifying the correct multixact-members block.
     581          945038 :                     let expected_segno = pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
     582          945038 :                     let expected_blknum = pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
     583          945038 :                     assert!(
     584          945038 :                         segno == expected_segno,
     585 UBC           0 :                         "MultiXactMembersCreate record for offset {} with unexpected key {}",
     586                 :                         moff,
     587                 :                         key
     588                 :                     );
     589 CBC      945038 :                     assert!(
     590          945038 :                         blknum == expected_blknum,
     591 UBC           0 :                         "MultiXactMembersCreate record for offset {} with unexpected key {}",
     592                 :                         moff,
     593                 :                         key
     594                 :                     );
     595                 : 
     596 CBC      945038 :                     let mut flagsval = LittleEndian::read_u32(&page[flagsoff..flagsoff + 4]);
     597          945038 :                     flagsval &= !(((1 << pg_constants::MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift);
     598          945038 :                     flagsval |= member.status << bshift;
     599          945038 :                     LittleEndian::write_u32(&mut page[flagsoff..flagsoff + 4], flagsval);
     600          945038 :                     LittleEndian::write_u32(&mut page[memberoff..memberoff + 4], member.xid);
     601                 :                 }
     602                 :             }
     603                 :         }
     604                 : 
     605        16159814 :         Ok(())
     606        16159814 :     }
     607                 : }
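To make the SLRU addressing in the arms above concrete, here is a worked
example for the CLOG case (assuming the usual PostgreSQL constants for 8 KiB
blocks: CLOG_XACTS_PER_PAGE = 32768, SLRU_PAGES_PER_SEGMENT = 32):

    // For xid = 1_000_000:
    //   pageno          = 1_000_000 / 32768 = 30
    //   expected_segno  = 30 / 32           = 0
    //   expected_blknum = 30 % 32           = 30
    // so a ClogSetCommitted record for this XID must land on block 30 of
    // CLOG segment 0, and the asserts above check exactly that.
    let xid: u32 = 1_000_000;
    let pageno = xid / pg_constants::CLOG_XACTS_PER_PAGE;
    assert_eq!(pageno / pg_constants::SLRU_PAGES_PER_SEGMENT, 0);
    assert_eq!(pageno % pg_constants::SLRU_PAGES_PER_SEGMENT, 30);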
     608                 : 
     609                 : ///
     610                 : /// Command with ability not to give all file descriptors to child process
     611                 : ///
     612                 : trait CloseFileDescriptors: CommandExt {
     613                 :     ///
     614                 :     /// Close file descriptors (other than stdin, stdout, stderr) in child process
     615                 :     ///
     616                 :     fn close_fds(&mut self) -> &mut Command;
     617                 : }
     618                 : 
     619                 : impl<C: CommandExt> CloseFileDescriptors for C {
     620             544 :     fn close_fds(&mut self) -> &mut Command {
     621             544 :         // SAFETY: Code executed inside pre_exec should have async-signal-safety,
     622             544 :         // which means it should be safe to execute inside a signal handler.
     623             544 :         // The precise meaning depends on platform. See `man signal-safety`
     624             544 :         // for the linux definition.
     625             544 :         //
     626             544 :         // The set_fds_cloexec_threadsafe function is documented to be
     627             544 :         // async-signal-safe.
     628             544 :         //
     629             544 :         // Aside from this function, the rest of the code is re-entrant and
     630             544 :         // doesn't make any syscalls. We're just passing constants.
     631             544 :         //
     632             544 :         // NOTE: It's easy to indirectly cause a malloc or lock a mutex,
     633             544 :         // which is not async-signal-safe. Be careful.
     634             544 :         unsafe {
     635             544 :             self.pre_exec(move || {
     636 UBC           0 :                 close_fds::set_fds_cloexec_threadsafe(3, &[]);
     637               0 :                 Ok(())
     638 CBC         544 :             })
     639             544 :         }
     640             544 :     }
     641                 : }
     642                 : 
     643                 : struct WalRedoProcess {
     644                 :     #[allow(dead_code)]
     645                 :     conf: &'static PageServerConf,
     646                 :     tenant_shard_id: TenantShardId,
     647                 :     // Some() on construction, only becomes None on Drop.
     648                 :     child: Option<NoLeakChild>,
     649                 :     stdout: Mutex<ProcessOutput>,
     650                 :     stdin: Mutex<ProcessInput>,
     651                 :     /// Counter to separate same sized walredo inputs failing at the same millisecond.
     652                 :     #[cfg(feature = "testing")]
     653                 :     dump_sequence: AtomicUsize,
     654                 : }
     655                 : 
     656                 : impl WalRedoProcess {
     657                 :     //
     658                 :     // Start postgres binary in special WAL redo mode.
     659                 :     //
     660             544 :     #[instrument(skip_all,fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), pg_version=pg_version))]
     661                 :     fn launch(
     662                 :         conf: &'static PageServerConf,
     663                 :         tenant_shard_id: TenantShardId,
     664                 :         pg_version: u32,
     665                 :     ) -> anyhow::Result<Self> {
     666                 :         let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
     667                 :         let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
     668                 : 
     669                 :         // Start postgres itself
     670                 :         let child = Command::new(pg_bin_dir_path.join("postgres"))
     671                 :             .arg("--wal-redo")
     672                 :             .stdin(Stdio::piped())
     673                 :             .stderr(Stdio::piped())
     674                 :             .stdout(Stdio::piped())
     675                 :             .env_clear()
     676                 :             .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
     677                 :             .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
     678                 :             // The redo process is not trusted, and runs in seccomp mode that
     679                 :             // doesn't allow it to open any files. We have to also make sure it
     680                 :             // doesn't inherit any file descriptors from the pageserver, that
     681                 :             // would allow an attacker to read any files that happen to be open
     682                 :             // in the pageserver.
     683                 :             //
      684                 :             // The Rust standard library makes sure to mark any file descriptors it
      685                 :             // opens as close-on-exec by default, but that's not enough, since we use
     686                 :             // libraries that directly call libc open without setting that flag.
     687                 :             .close_fds()
     688                 :             .spawn_no_leak_child(tenant_shard_id)
     689                 :             .context("spawn process")?;
     690                 :         WAL_REDO_PROCESS_COUNTERS.started.inc();
     691 UBC           0 :         let mut child = scopeguard::guard(child, |child| {
     692               0 :             error!("killing wal-redo-postgres process due to a problem during launch");
     693               0 :             child.kill_and_wait(WalRedoKillCause::Startup);
     694               0 :         });
     695                 : 
     696                 :         let stdin = child.stdin.take().unwrap();
     697                 :         let stdout = child.stdout.take().unwrap();
     698                 :         let stderr = child.stderr.take().unwrap();
     699                 :         let stderr = tokio::process::ChildStderr::from_std(stderr)
     700                 :             .context("convert to tokio::ChildStderr")?;
     701                 :         macro_rules! set_nonblock_or_log_err {
     702                 :             ($file:ident) => {{
     703                 :                 let res = set_nonblock($file.as_raw_fd());
     704                 :                 if let Err(e) = &res {
     705                 :                     error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
     706                 :                 }
     707                 :                 res
     708                 :             }};
     709                 :         }
     710               0 :         set_nonblock_or_log_err!(stdin)?;
     711               0 :         set_nonblock_or_log_err!(stdout)?;
     712                 : 
     713                 :         // all fallible operations post-spawn are complete, so get rid of the guard
     714                 :         let child = scopeguard::ScopeGuard::into_inner(child);
     715                 : 
     716                 :         tokio::spawn(
     717 CBC         540 :             async move {
     718             265 :                 scopeguard::defer! {
     719             265 :                     debug!("wal-redo-postgres stderr_logger_task finished");
     720             265 :                     crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
     721                 :                 }
     722             540 :                 debug!("wal-redo-postgres stderr_logger_task started");
     723             540 :                 crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
     724             540 : 
     725             540 :                 use tokio::io::AsyncBufReadExt;
     726             540 :                 let mut stderr_lines = tokio::io::BufReader::new(stderr);
     727             540 :                 let mut buf = Vec::new();
     728             540 :                 let res = loop {
     729             540 :                     buf.clear();
     730             540 :                     // TODO we don't trust the process to cap its stderr length.
     731             540 :                     // Currently it can do unbounded Vec allocation.
     732             540 :                     match stderr_lines.read_until(b'\n', &mut buf).await {
     733             265 :                         Ok(0) => break Ok(()), // eof
     734 UBC           0 :                         Ok(num_bytes) => {
     735               0 :                             let output = String::from_utf8_lossy(&buf[..num_bytes]);
     736               0 :                             error!(%output, "received output");
     737                 :                         }
     738               0 :                         Err(e) => {
     739               0 :                             break Err(e);
     740                 :                         }
     741                 :                     }
     742                 :                 };
     743 CBC         265 :                 match res {
     744             265 :                     Ok(()) => (),
     745 UBC           0 :                     Err(e) => {
     746               0 :                         error!(error=?e, "failed to read from walredo stderr");
     747                 :                     }
     748                 :                 }
     749 CBC         265 :             }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
     750                 :         );
     751                 : 
     752                 :         Ok(Self {
     753                 :             conf,
     754                 :             tenant_shard_id,
     755                 :             child: Some(child),
     756                 :             stdin: Mutex::new(ProcessInput {
     757                 :                 stdin,
     758                 :                 n_requests: 0,
     759                 :             }),
     760                 :             stdout: Mutex::new(ProcessOutput {
     761                 :                 stdout,
     762                 :                 pending_responses: VecDeque::new(),
     763                 :                 n_processed_responses: 0,
     764                 :             }),
     765                 :             #[cfg(feature = "testing")]
     766                 :             dump_sequence: AtomicUsize::default(),
     767                 :         })
     768                 :     }
     769                 : 
     770         2056251 :     fn id(&self) -> u32 {
     771         2056251 :         self.child
     772         2056251 :             .as_ref()
     773         2056251 :             .expect("must not call this during Drop")
     774         2056251 :             .id()
     775         2056251 :     }
     776                 : 
     777                 :     // Apply given WAL records ('records') over an old page image. Returns
     778                 :     // new page image.
     779                 :     //
     780         2056251 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
     781                 :     fn apply_wal_records(
     782                 :         &self,
     783                 :         tag: BufferTag,
     784                 :         base_img: &Option<Bytes>,
     785                 :         records: &[(Lsn, NeonWalRecord)],
     786                 :         wal_redo_timeout: Duration,
     787                 :     ) -> anyhow::Result<Bytes> {
     788                 :         let input = self.stdin.lock().unwrap();
     789                 : 
     790                 :         // Serialize all the messages to send the WAL redo process first.
     791                 :         //
     792                 :         // This could be problematic if there are millions of records to replay,
     793                 :         // but in practice the number of records is usually so small that it doesn't
     794                 :         // matter, and it's better to keep this code simple.
     795                 :         //
     796                 :         // Most requests start with a before-image with BLCKSZ bytes, followed by
      797                 :         // some other WAL records. Start with a buffer that can hold that
     798                 :         // comfortably.
     799                 :         let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
     800                 :         build_begin_redo_for_block_msg(tag, &mut writebuf);
     801                 :         if let Some(img) = base_img {
     802                 :             build_push_page_msg(tag, img, &mut writebuf);
     803                 :         }
     804                 :         for (lsn, rec) in records.iter() {
     805                 :             if let NeonWalRecord::Postgres {
     806                 :                 will_init: _,
     807                 :                 rec: postgres_rec,
     808                 :             } = rec
     809                 :             {
     810                 :                 build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
     811                 :             } else {
     812                 :                 anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
     813                 :             }
     814                 :         }
     815                 :         build_get_page_msg(tag, &mut writebuf);
     816                 :         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
     817                 : 
     818                 :         let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
     819                 : 
     820                 :         if res.is_err() {
     821                 :             // not all of these can be caused by this particular input, however these are so rare
     822                 :             // in tests so capture all.
     823                 :             self.record_and_log(&writebuf);
     824                 :         }
     825                 : 
     826                 :         res
     827                 :     }
     828                 : 
     829         2056251 :     fn apply_wal_records0(
     830         2056251 :         &self,
     831         2056251 :         writebuf: &[u8],
     832         2056251 :         input: MutexGuard<ProcessInput>,
     833         2056251 :         wal_redo_timeout: Duration,
     834         2056251 :     ) -> anyhow::Result<Bytes> {
      835         2056251 :         let mut proc = { input }; // TODO: remove this legacy rename, but this keeps the patch small.
     836         2056251 :         let mut nwrite = 0usize;
     837         2056251 : 
     838         2056251 :         let mut stdin_pollfds = [PollFd::new(proc.stdin.as_raw_fd(), PollFlags::POLLOUT)];
     839                 : 
     840         4118051 :         while nwrite < writebuf.len() {
     841         2061800 :             let n = loop {
     842         2061800 :                 match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
     843 UBC           0 :                     Err(nix::errno::Errno::EINTR) => continue,
     844 CBC     2061800 :                     res => break res,
     845                 :                 }
     846 UBC           0 :             }?;
     847                 : 
     848 CBC     2061800 :             if n == 0 {
     849 UBC           0 :                 anyhow::bail!("WAL redo timed out");
     850 CBC     2061800 :             }
     851         2061800 : 
      852         2061800 :         // If 'stdin' is writable, write more of the request.
     853         2061800 :             let in_revents = stdin_pollfds[0].revents().unwrap();
     854         2061800 :             if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
     855         2061800 :                 nwrite += proc.stdin.write(&writebuf[nwrite..])?;
     856 UBC           0 :             }
     857 CBC     2061800 :             if in_revents.contains(PollFlags::POLLHUP) {
     858                 :                 // We still have more data to write, but the process closed the pipe.
     859 UBC           0 :                 anyhow::bail!("WAL redo process closed its stdin unexpectedly");
     860 CBC     2061800 :             }
     861                 :         }
     862         2056251 :         let request_no = proc.n_requests;
     863         2056251 :         proc.n_requests += 1;
     864         2056251 :         drop(proc);
     865         2056251 : 
      866         2056251 :         // To improve walredo performance, we separate sending requests from
      867         2056251 :         // receiving responses; they are protected by different mutexes (input
      868         2056251 :         // and output). If threads T1, T2, T3 send requests D1, D2, D3 to the
      869         2056251 :         // walredo process, there is no guarantee that T1 will be granted the
      870         2056251 :         // output mutex first. To address this, we maintain the number of sent
      871         2056251 :         // requests, the number of processed responses, and a ring buffer of
      872         2056251 :         // pending responses. After sending a request (under the input mutex),
      873         2056251 :         // each thread remembers its request number, releases the input mutex,
      874         2056251 :         // locks the output mutex, and reads responses into the ring buffer
      875         2056251 :         // until its own request number is reached. It then takes the matching
      876         2056251 :         // element and pops any leading `None` entries, advancing the counter.
     877         2056251 : 
     878         2056251 :         let mut output = self.stdout.lock().unwrap();
     879         2056251 :         let mut stdout_pollfds = [PollFd::new(output.stdout.as_raw_fd(), PollFlags::POLLIN)];
     880         2056251 :         let n_processed_responses = output.n_processed_responses;
     881         4112500 :         while n_processed_responses + output.pending_responses.len() <= request_no {
     882                 :             // We expect the WAL redo process to respond with an 8k page image. We read it
     883                 :             // into this buffer.
     884         2056251 :             let mut resultbuf = vec![0; BLCKSZ.into()];
     885         2056251 :             let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
     886         4112500 :             while nresult < BLCKSZ.into() {
      887                 :                 // We do two things simultaneously: read the response from stdout,
      888                 :                 // and forward any logging that the child writes to its stderr to the page server's log.
     889         2056251 :                 let n = loop {
     890                 :                     match nix::poll::poll(
     891         2056251 :                         &mut stdout_pollfds[..],
     892         2056251 :                         wal_redo_timeout.as_millis() as i32,
     893                 :                     ) {
     894 UBC           0 :                         Err(nix::errno::Errno::EINTR) => continue,
     895 CBC     2056251 :                         res => break res,
     896                 :                     }
     897 UBC           0 :                 }?;
     898                 : 
     899 CBC     2056251 :                 if n == 0 {
     900 UBC           0 :                     anyhow::bail!("WAL redo timed out");
     901 CBC     2056251 :                 }
     902         2056251 : 
     903         2056251 :                 // If we have some data in stdout, read it to the result buffer.
     904         2056251 :                 let out_revents = stdout_pollfds[0].revents().unwrap();
     905         2056251 :                 if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
     906         2056249 :                     nresult += output.stdout.read(&mut resultbuf[nresult..])?;
     907               2 :                 }
     908         2056251 :                 if out_revents.contains(PollFlags::POLLHUP) {
     909               2 :                     anyhow::bail!("WAL redo process closed its stdout unexpectedly");
     910         2056249 :                 }
     911                 :             }
     912         2056249 :             output
     913         2056249 :                 .pending_responses
     914         2056249 :                 .push_back(Some(Bytes::from(resultbuf)));
     915                 :         }
     916                 :         // Replace our request's response with None in `pending_responses`.
      917                 :         // Then make space in the ring buffer by clearing out any sequence of contiguous
      918                 :         // `None`'s from the front of `pending_responses`.
      919                 :         // NB: We can't simply pop_front() other requests' responses, because another
     920                 :         // requester might have grabbed the output mutex before us:
     921                 :         // T1: grab input mutex
     922                 :         // T1: send request_no 23
     923                 :         // T1: release input mutex
     924                 :         // T2: grab input mutex
     925                 :         // T2: send request_no 24
     926                 :         // T2: release input mutex
     927                 :         // T2: grab output mutex
     928                 :         // T2: n_processed_responses + output.pending_responses.len() <= request_no
     929                 :         //            23                                0                   24
     930                 :         // T2: enters poll loop that reads stdout
     931                 :         // T2: put response for 23 into pending_responses
      932                 :         // T2: put response for 24 into pending_responses
     933                 :         // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
     934                 :         // T2: takes its response_24
     935                 :         // pending_responses now looks like this: Front Some(response_23) None Back
     936                 :         // T2: does the while loop below
     937                 :         // pending_responses now looks like this: Front Some(response_23) None Back
     938                 :         // T2: releases output mutex
     939                 :         // T1: grabs output mutex
     940                 :         // T1: n_processed_responses + output.pending_responses.len() > request_no
     941                 :         //            23                                2                   23
     942                 :         // T1: skips poll loop that reads stdout
     943                 :         // T1: takes its response_23
     944                 :         // pending_responses now looks like this: Front None None Back
      945                 :         // T1: does the while loop below
     946                 :         // pending_responses now looks like this: Front Back
     947                 :         // n_processed_responses now has value 25
     948         2056249 :         let res = output.pending_responses[request_no - n_processed_responses]
     949         2056249 :             .take()
     950         2056249 :             .expect("we own this request_no, nobody else is supposed to take it");
     951         4112498 :         while let Some(front) = output.pending_responses.front() {
     952         2088372 :             if front.is_none() {
     953         2056249 :                 output.pending_responses.pop_front();
     954         2056249 :                 output.n_processed_responses += 1;
     955         2056249 :             } else {
     956           32123 :                 break;
     957                 :             }
     958                 :         }
     959         2056249 :         Ok(res)
     960         2056251 :     }
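
The pending-responses bookkeeping is easier to see in isolation. Here is a minimal single-threaded model (types and names local to this sketch, not the actual module types) of the take-and-truncate step each requester performs under the output mutex:

    use std::collections::VecDeque;

    struct OutputModel {
        pending_responses: VecDeque<Option<Vec<u8>>>,
        n_processed_responses: usize,
    }

    impl OutputModel {
        // Take our own slot (leaving a None behind), then pop any contiguous
        // run of None's from the front, advancing the processed counter.
        fn take_response(&mut self, request_no: usize) -> Vec<u8> {
            let res = self.pending_responses[request_no - self.n_processed_responses]
                .take()
                .expect("we own this request_no");
            while let Some(front) = self.pending_responses.front() {
                if front.is_none() {
                    self.pending_responses.pop_front();
                    self.n_processed_responses += 1;
                } else {
                    break;
                }
            }
            res
        }
    }

In the T1/T2 scenario above: after T2 takes response 24, the buffer holds [Some(r23), None] and nothing can be popped; once T1 takes response 23, both slots drain and n_processed_responses jumps from 23 to 25.
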
     961                 : 
     962                 :     #[cfg(feature = "testing")]
     963               2 :     fn record_and_log(&self, writebuf: &[u8]) {
     964               2 :         let millis = std::time::SystemTime::now()
     965               2 :             .duration_since(std::time::SystemTime::UNIX_EPOCH)
     966               2 :             .unwrap()
     967               2 :             .as_millis();
     968               2 : 
     969               2 :         let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
     970               2 : 
      971               2 :         // these files will be collected into an Allure report
     972               2 :         let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
     973               2 : 
     974               2 :         let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
     975               2 : 
     976               2 :         let res = std::fs::OpenOptions::new()
     977               2 :             .write(true)
     978               2 :             .create_new(true)
     979               2 :             .read(true)
     980               2 :             .open(path)
     981               2 :             .and_then(|mut f| f.write_all(writebuf));
     982                 : 
     983                 :         // trip up allowed_errors
     984               2 :         if let Err(e) = res {
     985               2 :             tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
     986                 :         } else {
     987 UBC           0 :             tracing::error!(filename, "erroring walredo input saved");
     988                 :         }
     989 CBC           2 :     }
     990                 : 
     991                 :     #[cfg(not(feature = "testing"))]
     992                 :     fn record_and_log(&self, _: &[u8]) {}
     993                 : }
     994                 : 
     995                 : impl Drop for WalRedoProcess {
     996             269 :     fn drop(&mut self) {
     997             269 :         self.child
     998             269 :             .take()
     999             269 :             .expect("we only do this once")
    1000             269 :             .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
     1001             269 :         // no way to wait for stderr_logger_task from Drop, because that would require async
    1002             269 :     }
    1003                 : }
    1004                 : 
     1005                 : /// Wrapper type around `std::process::Child` which guarantees that the child
     1006                 : /// is killed and waited for by this process, at the latest when the wrapper is dropped.
    1007                 : struct NoLeakChild {
    1008                 :     tenant_id: TenantShardId,
    1009                 :     child: Option<Child>,
    1010                 : }
    1011                 : 
    1012                 : impl Deref for NoLeakChild {
    1013                 :     type Target = Child;
    1014                 : 
    1015         2056795 :     fn deref(&self) -> &Self::Target {
    1016         2056795 :         self.child.as_ref().expect("must not use from drop")
    1017         2056795 :     }
    1018                 : }
    1019                 : 
    1020                 : impl DerefMut for NoLeakChild {
    1021            1632 :     fn deref_mut(&mut self) -> &mut Self::Target {
    1022            1632 :         self.child.as_mut().expect("must not use from drop")
    1023            1632 :     }
    1024                 : }
    1025                 : 
    1026                 : impl NoLeakChild {
    1027             544 :     fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
    1028             544 :         let child = command.spawn()?;
    1029             544 :         Ok(NoLeakChild {
    1030             544 :             tenant_id,
    1031             544 :             child: Some(child),
    1032             544 :         })
    1033             544 :     }
    1034                 : 
    1035             269 :     fn kill_and_wait(mut self, cause: WalRedoKillCause) {
    1036             269 :         let child = match self.child.take() {
    1037             269 :             Some(child) => child,
    1038 UBC           0 :             None => return,
    1039                 :         };
    1040 CBC         269 :         Self::kill_and_wait_impl(child, cause);
    1041             269 :     }
    1042                 : 
    1043             269 :     #[instrument(skip_all, fields(pid=child.id(), ?cause))]
    1044                 :     fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
    1045             269 :         scopeguard::defer! {
    1046             269 :             WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
    1047             269 :         }
    1048                 :         let res = child.kill();
    1049                 :         if let Err(e) = res {
    1050                 :             // This branch is very unlikely because:
    1051                 :             // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
    1052                 :             // - This is the only place that calls .kill()
    1053                 :             // - We consume `self`, so, .kill() can't be called twice.
    1054                 :             // - If the process exited by itself or was killed by someone else,
    1055                 :             //   .kill() will still succeed because we haven't wait()'ed yet.
    1056                 :             //
    1057                 :             // So, if we arrive here, we have really no idea what happened,
    1058                 :             // whether the PID stored in self.child is still valid, etc.
    1059                 :             // If this function were fallible, we'd return an error, but
    1060                 :             // since it isn't, all we can do is log an error and proceed
    1061                 :             // with the wait().
    1062 UBC           0 :             error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
    1063                 :         }
    1064                 : 
    1065                 :         match child.wait() {
    1066                 :             Ok(exit_status) => {
    1067 CBC         269 :                 info!(exit_status = %exit_status, "wait successful");
    1068                 :             }
    1069                 :             Err(e) => {
    1070 UBC           0 :                 error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
    1071                 :             }
    1072                 :         }
    1073                 :     }
    1074                 : }
    1075                 : 
    1076                 : impl Drop for NoLeakChild {
    1077 CBC         269 :     fn drop(&mut self) {
    1078             269 :         let child = match self.child.take() {
    1079 UBC           0 :             Some(child) => child,
    1080 CBC         269 :             None => return,
    1081                 :         };
    1082 UBC           0 :         let tenant_shard_id = self.tenant_id;
    1083               0 :         // Offload the kill+wait of the child process into the background.
    1084               0 :         // If someone stops the runtime, we'll leak the child process.
    1085               0 :         // We can ignore that case because we only stop the runtime on pageserver exit.
    1086               0 :         tokio::runtime::Handle::current().spawn(async move {
    1087               0 :             tokio::task::spawn_blocking(move || {
    1088                 :                 // Intentionally don't inherit the tracing context from whoever is dropping us.
    1089                 :                 // This thread here is going to outlive of our dropper.
    1090               0 :                 let span = tracing::info_span!(
    1091                 :                     "walredo",
    1092                 :                     tenant_id = %tenant_shard_id.tenant_id,
    1093               0 :                     shard_id = %tenant_shard_id.shard_slug()
    1094                 :                 );
    1095               0 :                 let _entered = span.enter();
    1096               0 :                 Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
    1097               0 :             })
    1098               0 :             .await
    1099               0 :         });
    1100 CBC         269 :     }
    1101                 : }
    1102                 : 
    1103                 : trait NoLeakChildCommandExt {
    1104                 :     fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
    1105                 : }
    1106                 : 
    1107                 : impl NoLeakChildCommandExt for Command {
    1108             544 :     fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
    1109             544 :         NoLeakChild::spawn(tenant_id, self)
    1110             544 :     }
    1111                 : }
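
A hypothetical usage sketch of this extension trait; the binary path and flag below are invented for illustration, and the real spawn site configures pipes, environment, and wal-redo arguments beyond what is shown:

    use std::io;
    use std::process::{Command, Stdio};

    fn spawn_walredo_example(tenant_shard_id: TenantShardId) -> io::Result<()> {
        let child = Command::new("/usr/local/bin/postgres") // hypothetical path
            .arg("--wal-redo")                              // hypothetical flag
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn_no_leak_child(tenant_shard_id)?;
        // NoLeakChild derefs to Child, so the usual accessors work:
        let _pid = child.id();
        // Explicit teardown; merely dropping `child` would also kill and wait,
        // offloaded to the background runtime by the Drop impl above.
        child.kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
        Ok(())
    }
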
    1112                 : 
    1113                 : // Functions for constructing messages to send to the postgres WAL redo
    1114                 : // process. See pgxn/neon_walredo/walredoproc.c for
    1115                 : // explanation of the protocol.
    1116                 : 
    1117         2056251 : fn build_begin_redo_for_block_msg(tag: BufferTag, buf: &mut Vec<u8>) {
    1118         2056251 :     let len = 4 + 1 + 4 * 4;
    1119         2056251 : 
    1120         2056251 :     buf.put_u8(b'B');
    1121         2056251 :     buf.put_u32(len as u32);
    1122         2056251 : 
    1123         2056251 :     tag.ser_into(buf)
    1124         2056251 :         .expect("serialize BufferTag should always succeed");
    1125         2056251 : }
    1126                 : 
    1127          431568 : fn build_push_page_msg(tag: BufferTag, base_img: &[u8], buf: &mut Vec<u8>) {
    1128          431568 :     assert!(base_img.len() == 8192);
    1129                 : 
    1130          431568 :     let len = 4 + 1 + 4 * 4 + base_img.len();
    1131          431568 : 
    1132          431568 :     buf.put_u8(b'P');
    1133          431568 :     buf.put_u32(len as u32);
    1134          431568 :     tag.ser_into(buf)
    1135          431568 :         .expect("serialize BufferTag should always succeed");
    1136          431568 :     buf.put(base_img);
    1137          431568 : }
    1138                 : 
    1139        56589791 : fn build_apply_record_msg(endlsn: Lsn, rec: &[u8], buf: &mut Vec<u8>) {
    1140        56589791 :     let len = 4 + 8 + rec.len();
    1141        56589791 : 
    1142        56589791 :     buf.put_u8(b'A');
    1143        56589791 :     buf.put_u32(len as u32);
    1144        56589791 :     buf.put_u64(endlsn.0);
    1145        56589791 :     buf.put(rec);
    1146        56589791 : }
    1147                 : 
    1148         2056251 : fn build_get_page_msg(tag: BufferTag, buf: &mut Vec<u8>) {
    1149         2056251 :     let len = 4 + 1 + 4 * 4;
    1150         2056251 : 
    1151         2056251 :     buf.put_u8(b'G');
    1152         2056251 :     buf.put_u32(len as u32);
    1153         2056251 :     tag.ser_into(buf)
    1154         2056251 :         .expect("serialize BufferTag should always succeed");
    1155         2056251 : }
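
Putting the constants above together, a worked sketch of the total request size (assuming BufferTag serializes to 4 * 4 = 16 bytes, as the builders imply): 'B' and 'G' are 21 bytes on the wire, 'P' is 8213, and each 'A' is 13 bytes plus the record.

    // Sketch: total bytes of one serialized redo request, matching what
    // apply_wal_records accumulates into its writebuf.
    fn request_len(has_base_img: bool, record_lens: &[usize]) -> usize {
        let b = 1 + 4 + 16; // 'B': tag byte + length field + BufferTag
        let p = if has_base_img { 1 + 4 + 16 + 8192 } else { 0 }; // 'P'
        let a: usize = record_lens.iter().map(|rec| 1 + 4 + 8 + rec).sum();
        let g = 1 + 4 + 16; // 'G'
        b + p + a + g
    }

A typical request, one base image plus a handful of small records, lands well under the BLCKSZ * 3 capacity reserved for the write buffer above.
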
    1156                 : 
    1157                 : #[cfg(test)]
    1158                 : mod tests {
    1159                 :     use super::PostgresRedoManager;
    1160                 :     use crate::repository::Key;
    1161                 :     use crate::{config::PageServerConf, walrecord::NeonWalRecord};
    1162                 :     use bytes::Bytes;
    1163                 :     use pageserver_api::shard::TenantShardId;
    1164                 :     use std::str::FromStr;
    1165                 :     use utils::{id::TenantId, lsn::Lsn};
    1166                 : 
    1167               1 :     #[tokio::test]
    1168               1 :     async fn short_v14_redo() {
    1169               1 :         let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
    1170               1 : 
    1171               1 :         let h = RedoHarness::new().unwrap();
    1172                 : 
    1173               1 :         let page = h
    1174               1 :             .manager
    1175               1 :             .request_redo(
    1176               1 :                 Key {
    1177               1 :                     field1: 0,
    1178               1 :                     field2: 1663,
    1179               1 :                     field3: 13010,
    1180               1 :                     field4: 1259,
    1181               1 :                     field5: 0,
    1182               1 :                     field6: 0,
    1183               1 :                 },
    1184               1 :                 Lsn::from_str("0/16E2408").unwrap(),
    1185               1 :                 None,
    1186               1 :                 short_records(),
    1187               1 :                 14,
    1188               1 :             )
    1189 UBC           0 :             .await
    1190 CBC           1 :             .unwrap();
    1191               1 : 
    1192               1 :         assert_eq!(&expected, &*page);
    1193                 :     }
    1194                 : 
    1195               1 :     #[tokio::test]
    1196               1 :     async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
    1197               1 :         let h = RedoHarness::new().unwrap();
    1198                 : 
    1199               1 :         let page = h
    1200               1 :             .manager
    1201               1 :             .request_redo(
    1202               1 :                 Key {
    1203               1 :                     field1: 0,
    1204               1 :                     field2: 1663,
    1205               1 :                     // key should be 13010
    1206               1 :                     field3: 13130,
    1207               1 :                     field4: 1259,
    1208               1 :                     field5: 0,
    1209               1 :                     field6: 0,
    1210               1 :                 },
    1211               1 :                 Lsn::from_str("0/16E2408").unwrap(),
    1212               1 :                 None,
    1213               1 :                 short_records(),
    1214               1 :                 14,
    1215               1 :             )
    1216 UBC           0 :             .await
    1217 CBC           1 :             .unwrap();
    1218               1 : 
     1219               1 :         // TODO: there will be some stderr printout, which is forwarded to tracing and could
    1220               1 :         // perhaps be captured as long as it's in the same thread.
    1221               1 :         assert_eq!(page, crate::ZERO_PAGE);
    1222                 :     }
    1223                 : 
    1224               1 :     #[tokio::test]
    1225               1 :     async fn test_stderr() {
    1226               1 :         let h = RedoHarness::new().unwrap();
    1227               1 :         h
    1228               1 :             .manager
    1229               1 :             .request_redo(
    1230               1 :                 Key::from_i128(0),
    1231               1 :                 Lsn::INVALID,
    1232               1 :                 None,
    1233               1 :                 short_records(),
    1234               1 :                 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
    1235               1 :             )
    1236 UBC           0 :             .await
    1237 CBC           1 :             .unwrap_err();
    1238                 :     }
    1239                 : 
    1240                 :     #[allow(clippy::octal_escapes)]
    1241               3 :     fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
    1242               3 :         vec![
    1243               3 :             (
    1244               3 :                 Lsn::from_str("0/16A9388").unwrap(),
    1245               3 :                 NeonWalRecord::Postgres {
    1246               3 :                     will_init: true,
    1247               3 :                     rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
    1248               3 :                 }
    1249               3 :             ),
    1250               3 :             (
    1251               3 :                 Lsn::from_str("0/16D4080").unwrap(),
    1252               3 :                 NeonWalRecord::Postgres {
    1253               3 :                     will_init: false,
    1254               3 :                     rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
    1255               3 :                 }
    1256               3 :             )
    1257               3 :         ]
    1258               3 :     }
    1259                 : 
    1260                 :     struct RedoHarness {
    1261                 :         // underscored because unused, except for removal at drop
    1262                 :         _repo_dir: camino_tempfile::Utf8TempDir,
    1263                 :         manager: PostgresRedoManager,
    1264                 :     }
    1265                 : 
    1266                 :     impl RedoHarness {
    1267               3 :         fn new() -> anyhow::Result<Self> {
    1268               3 :             crate::tenant::harness::setup_logging();
    1269                 : 
    1270               3 :             let repo_dir = camino_tempfile::tempdir()?;
    1271               3 :             let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
    1272               3 :             let conf = Box::leak(Box::new(conf));
    1273               3 :             let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
    1274               3 : 
    1275               3 :             let manager = PostgresRedoManager::new(conf, tenant_shard_id);
    1276               3 : 
    1277               3 :             Ok(RedoHarness {
    1278               3 :                 _repo_dir: repo_dir,
    1279               3 :                 manager,
    1280               3 :             })
    1281               3 :         }
    1282                 :     }
    1283                 : }
        

Generated by: LCOV version 2.1-beta