LCOV - code coverage report
Current view: top level - pageserver/src - walredo.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 69.5 % 370 257
Test Date: 2025-07-16 12:29:03 Functions: 76.3 % 38 29

            Line data    Source code
       1              : //!
       2              : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
       3              : //! to apply given WAL records over an old page image and return new
       4              : //! page image.
       5              : //!
       6              : //! We rely on Postgres to perform WAL redo for us. We launch a
       7              : //! postgres process in special "wal redo" mode that's similar to
       8              : //! single-user mode. We then pass the previous page image, if any,
       9              : //! and all the WAL records we want to apply, to the postgres
      10              : //! process. Then we get the page image back. Communication with the
      11              : //! postgres process happens via stdin/stdout
      12              : //!
      13              : //! See pgxn/neon_walredo/walredoproc.c for the other side of
      14              : //! this communication.
      15              : //!
      16              : //! The Postgres process is assumed to be secure against malicious WAL
      17              : //! records. It achieves it by dropping privileges before replaying
      18              : //! any WAL records, so that even if an attacker hijacks the Postgres
      19              : //! process, he cannot escape out of it.
      20              : 
      21              : /// Process lifecycle and abstraction for the IPC protocol.
      22              : mod process;
      23              : 
      24              : /// Code to apply [`NeonWalRecord`]s.
      25              : pub(crate) mod apply_neon;
      26              : 
      27              : use std::future::Future;
      28              : use std::sync::Arc;
      29              : use std::time::{Duration, Instant};
      30              : 
      31              : use anyhow::Context;
      32              : use bytes::{Bytes, BytesMut};
      33              : use pageserver_api::key::Key;
      34              : use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
      35              : use pageserver_api::shard::TenantShardId;
      36              : use postgres_ffi::PgMajorVersion;
      37              : use tracing::*;
      38              : use utils::lsn::Lsn;
      39              : use utils::sync::gate::GateError;
      40              : use utils::sync::heavier_once_cell;
      41              : use wal_decoder::models::record::NeonWalRecord;
      42              : 
      43              : use crate::config::PageServerConf;
      44              : use crate::metrics::{
      45              :     WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
      46              :     WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
      47              : };
      48              : 
/// The real implementation that uses a Postgres process to
/// perform WAL replay.
///
/// Only one thread can use the process at a time, that is controlled by the
/// Mutex. In the future, we might want to launch a pool of processes to allow
/// concurrent replay of multiple records.
///
/// NOTE(review): no mutex guarding the process is visible on this struct;
/// presumably it lives inside [`process::WalRedoProcess`] -- confirm.
pub struct PostgresRedoManager {
    /// Tenant shard this manager serves; handed to [`process::WalRedoProcess::launch`].
    tenant_shard_id: TenantShardId,
    /// Pageserver configuration; handed to the process launcher and the source of
    /// `wal_redo_timeout`.
    conf: &'static PageServerConf,
    /// When the most recent Postgres-backed redo batch was started, if any.
    /// Read by [`Self::status`] and [`Self::maybe_quiesce`].
    last_redo_at: std::sync::Mutex<Option<Instant>>,
    /// We use [`heavier_once_cell`] for
    ///
    /// 1. coalescing the lazy spawning of walredo processes ([`ProcessOnceCell::Spawned`])
    /// 2. preventing new processes from being spawned on [`Self::shutdown`] (=> [`ProcessOnceCell::ManagerShutDown`]).
    ///
    /// # Spawning
    ///
    /// Redo requests use the once cell to coalesce onto one call to [`process::WalRedoProcess::launch`].
    ///
    /// Notably, requests don't use the [`heavier_once_cell::Guard`] to keep hold of
    /// their process object; we use [`Arc::clone`] for that.
    ///
    /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
    /// had that behavior; it's probably unnecessary.
    /// The only merit of it is that if one walredo process encounters an error,
    /// it can take it out of rotation (= using [`heavier_once_cell::Guard::take_and_deinit`])
    /// and retry redo, thereby starting the new process, while other redo tasks might
    /// still be using the old redo process. But, those other tasks will most likely
    /// encounter an error as well, and errors are an unexpected condition anyway.
    /// So, probably we could get rid of the `Arc` in the future.
    ///
    /// # Shutdown
    ///
    /// See [`Self::launched_processes`].
    redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,

    /// Gate that is entered when launching a walredo process and held open
    /// until the process has been `kill()`ed and `wait()`ed upon.
    ///
    /// Manager shutdown waits for this gate to close after setting the
    /// [`ProcessOnceCell::ManagerShutDown`] state in [`Self::redo_process`].
    ///
    /// This type of usage is a bit unusual because gates usually keep track of
    /// concurrent operations, e.g., every [`Self::request_redo`] that is inflight.
    /// But we use it here to keep track of the _processes_ that we have launched,
    /// which may outlive any individual redo request because
    /// - we keep the walredo process around until it's quiesced to amortize spawn cost and
    /// - the Arc may be held by multiple concurrent redo requests, so, just because
    ///   you replace the [`Self::redo_process`] cell's content doesn't mean the
    ///   process gets killed immediately.
    ///
    /// We could simplify this by getting rid of the [`Arc`].
    /// See the comment on [`Self::redo_process`] for more details.
    launched_processes: utils::sync::gate::Gate,
}
     104              : 
/// Content of the once cell that tracks the current walredo process.
///
/// See [`PostgresRedoManager::redo_process`].
enum ProcessOnceCell {
    /// A walredo process has been launched; redo requests clone the `Arc` to use it.
    Spawned(Arc<Process>),
    /// [`PostgresRedoManager::shutdown`] has run; requests that need a process
    /// fail with [`Error::Cancelled`] instead of spawning a new one.
    ManagerShutDown,
}
     110              : 
/// A launched walredo process bundled with the gate guard that records it as
/// "still alive" in [`PostgresRedoManager::launched_processes`].
struct Process {
    /// The handle to the walredo process itself.
    process: process::WalRedoProcess,
    /// This field is last in this struct so the guard gets dropped _after_ [`Self::process`].
    /// (Reminder: dropping [`Self::process`] synchronously sends SIGKILL and then `wait()`s for it to exit).
    _launched_processes_guard: utils::sync::gate::GateGuard,
}
     117              : 
/// Lets callers invoke [`process::WalRedoProcess`] methods directly on a
/// [`Process`] without naming the inner field.
impl std::ops::Deref for Process {
    type Target = process::WalRedoProcess;

    /// Borrows the wrapped walredo process.
    fn deref(&self) -> &Self::Target {
        &self.process
    }
}
     125              : 
/// Errors returned by the WAL redo manager.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The manager has been shut down, so no walredo process is available
    /// for the request.
    #[error("cancelled")]
    Cancelled,
    /// Any other failure; displayed transparently as the wrapped [`anyhow::Error`].
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}
     133              : 
/// Local counterpart of `anyhow::bail!`: formats the arguments into an
/// [`anyhow::Error`] and returns early with it wrapped in [`Error::Other`].
macro_rules! bail {
    ($($arg:tt)*) => {
        return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
    }
}
     139              : 
/// Why a redo is being requested. Selects the retry budget and the log level
/// used when applying records in the walredo process fails.
#[derive(Debug, Clone, Copy)]
pub enum RedoAttemptType {
    /// Used for the read path. Will fire critical errors and retry twice if failure.
    ReadPage,
    /// Used for legacy compaction (only used in image compaction). Will fire critical errors and retry once if failure.
    LegacyCompaction,
    /// Used for gc compaction. Will not fire critical errors and not retry.
    GcCompaction,
}
     149              : 
     150              : impl std::fmt::Display for RedoAttemptType {
     151            3 :     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     152            3 :         match self {
     153            3 :             RedoAttemptType::ReadPage => write!(f, "read page"),
     154            0 :             RedoAttemptType::LegacyCompaction => write!(f, "legacy compaction"),
     155            0 :             RedoAttemptType::GcCompaction => write!(f, "gc compaction"),
     156              :         }
     157            3 :     }
     158              : }
     159              : 
     160              : ///
     161              : /// Public interface of WAL redo manager
     162              : ///
     163              : impl PostgresRedoManager {
     164              :     ///
     165              :     /// Request the WAL redo manager to apply some WAL records
     166              :     ///
     167              :     /// The WAL redo is handled by a separate thread, so this just sends a request
     168              :     /// to the thread and waits for response.
     169              :     ///
     170              :     /// # Cancel-Safety
     171              :     ///
     172              :     /// This method is cancellation-safe.
     173            3 :     pub async fn request_redo(
     174            3 :         &self,
     175            3 :         key: Key,
     176            3 :         lsn: Lsn,
     177            3 :         base_img: Option<(Lsn, Bytes)>,
     178            3 :         records: Vec<(Lsn, NeonWalRecord)>,
     179            3 :         pg_version: PgMajorVersion,
     180            3 :         redo_attempt_type: RedoAttemptType,
     181            3 :     ) -> Result<Bytes, Error> {
     182            3 :         if records.is_empty() {
     183            0 :             bail!("invalid WAL redo request with no records");
     184            3 :         }
     185              : 
     186            3 :         let max_retry_attempts = match redo_attempt_type {
     187            3 :             RedoAttemptType::ReadPage => 2,
     188            0 :             RedoAttemptType::LegacyCompaction => 1,
     189            0 :             RedoAttemptType::GcCompaction => 0,
     190              :         };
     191              : 
     192            3 :         let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
     193            3 :         let mut img = base_img.map(|p| p.1);
     194            3 :         let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
     195            3 :         let mut batch_start = 0;
     196            3 :         for (i, record) in records.iter().enumerate().skip(1) {
     197            3 :             let rec_neon = apply_neon::can_apply_in_neon(&record.1);
     198              : 
     199            3 :             if rec_neon != batch_neon {
     200            0 :                 let result = if batch_neon {
     201            0 :                     self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
     202              :                 } else {
     203            0 :                     self.apply_batch_postgres(
     204            0 :                         key,
     205            0 :                         lsn,
     206            0 :                         img,
     207            0 :                         base_img_lsn,
     208            0 :                         &records[batch_start..i],
     209            0 :                         self.conf.wal_redo_timeout,
     210            0 :                         pg_version,
     211            0 :                         max_retry_attempts,
     212            0 :                         redo_attempt_type,
     213            0 :                     )
     214            0 :                     .await
     215              :                 };
     216            0 :                 img = Some(result?);
     217              : 
     218            0 :                 batch_neon = rec_neon;
     219            0 :                 batch_start = i;
     220            3 :             }
     221              :         }
     222              :         // last batch
     223            3 :         if batch_neon {
     224            0 :             self.apply_batch_neon(key, lsn, img, &records[batch_start..])
     225              :         } else {
     226            3 :             self.apply_batch_postgres(
     227            3 :                 key,
     228            3 :                 lsn,
     229            3 :                 img,
     230            3 :                 base_img_lsn,
     231            3 :                 &records[batch_start..],
     232            3 :                 self.conf.wal_redo_timeout,
     233            3 :                 pg_version,
     234            3 :                 max_retry_attempts,
     235            3 :                 redo_attempt_type,
     236            3 :             )
     237            3 :             .await
     238              :         }
     239            3 :     }
     240              : 
     241              :     /// Do a ping request-response roundtrip.
     242              :     ///
     243              :     /// Not used in production, but by Rust benchmarks.
     244              :     ///
     245              :     /// # Cancel-Safety
     246              :     ///
     247              :     /// This method is cancellation-safe.
     248            1 :     pub async fn ping(&self, pg_version: PgMajorVersion) -> Result<(), Error> {
     249            1 :         self.do_with_walredo_process(pg_version, |proc| async move {
     250            1 :             proc.ping(Duration::from_secs(1))
     251            1 :                 .await
     252            1 :                 .map_err(Error::Other)
     253            2 :         })
     254            1 :         .await
     255            1 :     }
     256              : 
     257            0 :     pub fn status(&self) -> WalRedoManagerStatus {
     258              :         WalRedoManagerStatus {
     259              :             last_redo_at: {
     260            0 :                 let at = *self.last_redo_at.lock().unwrap();
     261            0 :                 at.and_then(|at| {
     262            0 :                     let age = at.elapsed();
     263              :                     // map any chrono errors silently to None here
     264            0 :                     chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
     265            0 :                 })
     266              :             },
     267            0 :             process: self.redo_process.get().and_then(|p| match &*p {
     268            0 :                 ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
     269            0 :                 ProcessOnceCell::ManagerShutDown => None,
     270            0 :             }),
     271              :         }
     272            0 :     }
     273              : }
     274              : 
     275              : impl PostgresRedoManager {
     276              :     ///
     277              :     /// Create a new PostgresRedoManager.
     278              :     ///
     279            4 :     pub fn new(
     280            4 :         conf: &'static PageServerConf,
     281            4 :         tenant_shard_id: TenantShardId,
     282            4 :     ) -> PostgresRedoManager {
     283              :         // The actual process is launched lazily, on first request.
     284            4 :         PostgresRedoManager {
     285            4 :             tenant_shard_id,
     286            4 :             conf,
     287            4 :             last_redo_at: std::sync::Mutex::default(),
     288            4 :             redo_process: heavier_once_cell::OnceCell::default(),
     289            4 :             launched_processes: utils::sync::gate::Gate::default(),
     290            4 :         }
     291            4 :     }
     292              : 
    /// Shut down the WAL redo manager.
    ///
    /// Returns `true` if this call was the one that initiated shutdown.
    /// `true` may be observed by no caller if the first caller stops polling.
    ///
    /// After this future completes
    /// - no redo process is running
    /// - no new redo process will be spawned
    /// - redo requests that need walredo process will fail with [`Error::Cancelled`]
    /// - [`apply_neon`]-only redo requests may still work, but this may change in the future
    ///
    /// # Cancel-Safety
    ///
    /// This method is cancellation-safe.
    pub async fn shutdown(&self) -> bool {
        // prevent new processes from being spawned
        let maybe_permit = match self.redo_process.get_or_init_detached().await {
            Ok(guard) => {
                if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
                    // An earlier call already put the cell into the shutdown state.
                    None
                } else {
                    // A process is currently in rotation: take it out of the cell so
                    // that the cell can be re-set to the shutdown state below.
                    let (proc, permit) = guard.take_and_deinit();
                    drop(proc); // this just drops the Arc, its refcount may not be zero yet
                    Some(permit)
                }
            }
            // The cell holds no value; the permit lets us set it below.
            Err(permit) => Some(permit),
        };
        // Holding a permit means it is this call that flips the cell to `ManagerShutDown`.
        let it_was_us = if let Some(permit) = maybe_permit {
            self.redo_process
                .set(ProcessOnceCell::ManagerShutDown, permit);
            true
        } else {
            false
        };
        // wait for ongoing requests to drain and the refcounts of all Arc<WalRedoProcess> that
        // we ever launched to drop to zero, which when it happens synchronously kill()s & wait()s
        // for the underlying process.
        self.launched_processes.close().await;
        it_was_us
    }
     334              : 
     335              :     /// This type doesn't have its own background task to check for idleness: we
     336              :     /// rely on our owner calling this function periodically in its own housekeeping
     337              :     /// loops.
     338            0 :     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
     339            0 :         if let Ok(g) = self.last_redo_at.try_lock() {
     340            0 :             if let Some(last_redo_at) = *g {
     341            0 :                 if last_redo_at.elapsed() >= idle_timeout {
     342            0 :                     drop(g);
     343            0 :                     drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
     344            0 :                 }
     345            0 :             }
     346            0 :         }
     347            0 :     }
     348              : 
    /// Run `closure` against the current walredo process, launching a process
    /// first if the once cell holds none.
    ///
    /// If `closure` returns an error, the process it ran against is taken out
    /// of [`Self::redo_process`] (unless another task already replaced it), so
    /// that the next caller launches a fresh one. The closure's result is
    /// returned unchanged.
    ///
    /// # Errors
    ///
    /// - [`Error::Cancelled`] if the manager has been shut down.
    /// - [`Error::Other`] if launching the process fails.
    /// - Whatever error `closure` returns.
    ///
    /// # Cancel-Safety
    ///
    /// This method is cancel-safe iff `closure` is cancel-safe.
    async fn do_with_walredo_process<
        F: FnOnce(Arc<Process>) -> Fut,
        Fut: Future<Output = Result<O, Error>>,
        O,
    >(
        &self,
        pg_version: PgMajorVersion,
        closure: F,
    ) -> Result<O, Error> {
        let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
            // The cell already has a value: either reuse the process or refuse after shutdown.
            Ok(guard) => match &*guard {
                ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
                ProcessOnceCell::ManagerShutDown => {
                    return Err(Error::Cancelled);
                }
            },
            // The cell is empty and we hold the permit to fill it: launch a process.
            Err(permit) => {
                let start = Instant::now();
                // acquire guard before spawning process, so that we don't spawn new processes
                // if the gate is already closed.
                let _launched_processes_guard = match self.launched_processes.enter() {
                    Ok(guard) => guard,
                    Err(GateError::GateClosed) => unreachable!(
                        "shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
                    ),
                };
                // NOTE(review): on launch failure the `?` returns early and `permit` is
                // dropped without `set`; presumably that leaves the cell empty so a later
                // caller retries the launch -- confirm against `heavier_once_cell`.
                let proc = Arc::new(Process {
                    process: process::WalRedoProcess::launch(
                        self.conf,
                        self.tenant_shard_id,
                        pg_version,
                    )
                    .context("launch walredo process")?,
                    _launched_processes_guard,
                });
                let duration = start.elapsed();
                WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
                info!(
                    elapsed_ms = duration.as_millis(),
                    pid = proc.id(),
                    "launched walredo process"
                );
                // Publish the new process for other callers; we keep our own clone.
                self.redo_process
                    .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
                proc
            }
        };

        // async closures are unstable, would support &Process
        let result = closure(proc.clone()).await;

        if result.is_err() {
            // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation.
            // Note that there may be other tasks concurrent with us that also hold `proc`.
            // We have to deal with that here.
            // Also read the doc comment on field `self.redo_process`.
            //
            // NB: there may still be other concurrent threads using `proc`.
            // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
            //
            // NB: the drop impl blocks the dropping thread with a wait() system call for
            // the child process. In some ways the blocking is actually good: if we
            // deferred the waiting into the background / to tokio if we used `tokio::process`,
            // it could happen that if walredo always fails immediately, we spawn processes faster
            // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
            // we limit this risk of run-away to at most $num_runtimes * $num_executor_threads.
            // This probably needs revisiting at some later point.
            match self.redo_process.get() {
                // The cell is already empty: nothing to take out.
                None => (),
                Some(guard) => {
                    match &*guard {
                        // Shutdown happened in the meantime; leave its marker in place.
                        ProcessOnceCell::ManagerShutDown => {}
                        ProcessOnceCell::Spawned(guard_proc) => {
                            if Arc::ptr_eq(&proc, guard_proc) {
                                // We're the first to observe an error from `proc`, it's our job to take it out of rotation.
                                guard.take_and_deinit();
                            } else {
                                // Another task already spawned another redo process (further up in this method)
                                // and put it into `redo_process`. Do nothing, our view of the world is behind.
                            }
                        }
                    }
                }
            }
            // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall.
            drop(proc);
        }

        result
    }
     442              : 
     443              :     ///
     444              :     /// Process one request for WAL redo using wal-redo postgres
     445              :     ///
     446              :     /// # Cancel-Safety
     447              :     ///
     448              :     /// Cancellation safe.
     449              :     #[allow(clippy::too_many_arguments)]
     450            3 :     async fn apply_batch_postgres(
     451            3 :         &self,
     452            3 :         key: Key,
     453            3 :         lsn: Lsn,
     454            3 :         base_img: Option<Bytes>,
     455            3 :         base_img_lsn: Lsn,
     456            3 :         records: &[(Lsn, NeonWalRecord)],
     457            3 :         wal_redo_timeout: Duration,
     458            3 :         pg_version: PgMajorVersion,
     459            3 :         max_retry_attempts: u32,
     460            3 :         redo_attempt_type: RedoAttemptType,
     461            3 :     ) -> Result<Bytes, Error> {
     462            3 :         *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
     463              : 
     464            3 :         let (rel, blknum) = key.to_rel_block().context("invalid record")?;
     465            3 :         let mut n_attempts = 0u32;
     466              :         loop {
     467            5 :             let base_img = &base_img;
     468            5 :             let closure = |proc: Arc<Process>| async move {
     469            5 :                 let started_at = std::time::Instant::now();
     470              : 
     471              :                 // Relational WAL records are applied using wal-redo-postgres
     472            5 :                 let result = proc
     473            5 :                     .apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
     474            5 :                     .await
     475            5 :                     .context("apply_wal_records");
     476              : 
     477            5 :                 let duration = started_at.elapsed();
     478              : 
     479            5 :                 let len = records.len();
     480           10 :                 let nbytes = records.iter().fold(0, |acumulator, record| {
     481           10 :                     acumulator
     482           10 :                         + match &record.1 {
     483           10 :                             NeonWalRecord::Postgres { rec, .. } => rec.len(),
     484            0 :                             _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
     485              :                         }
     486           10 :                 });
     487              : 
     488            5 :                 WAL_REDO_TIME.observe(duration.as_secs_f64());
     489            5 :                 WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
     490            5 :                 WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
     491              : 
     492            5 :                 debug!(
     493            0 :                     "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
     494              :                     len,
     495              :                     nbytes,
     496            0 :                     duration.as_micros(),
     497              :                     lsn
     498              :                 );
     499              : 
     500            5 :                 if let Err(e) = result.as_ref() {
     501              :                     macro_rules! message {
     502              :                         ($level:tt) => {
     503              :                             $level!(
     504              :                                 "error applying {} WAL records {}..{} ({} bytes) to key {} during {}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
     505              :                                 records.len(),
     506              :                                 records.first().map(|p| p.0).unwrap_or(Lsn(0)),
     507              :                                 records.last().map(|p| p.0).unwrap_or(Lsn(0)),
     508              :                                 nbytes,
     509              :                                 key,
     510              :                                 redo_attempt_type,
     511              :                                 base_img_lsn,
     512              :                                 lsn,
     513              :                                 n_attempts,
     514              :                                 e,
     515              :                             )
     516              :                         }
     517              :                     }
     518            3 :                     match redo_attempt_type {
     519            3 :                         RedoAttemptType::ReadPage => message!(error),
     520            0 :                         RedoAttemptType::LegacyCompaction => message!(error),
     521            0 :                         RedoAttemptType::GcCompaction => message!(warn),
     522              :                     }
     523            2 :                 }
     524              : 
     525            5 :                 result.map_err(Error::Other)
     526           10 :             };
     527            5 :             let result = self.do_with_walredo_process(pg_version, closure).await;
     528              : 
     529            5 :             if result.is_ok() && n_attempts != 0 {
     530            0 :                 info!(n_attempts, "retried walredo succeeded");
     531            5 :             }
     532            5 :             n_attempts += 1;
     533            5 :             if n_attempts > max_retry_attempts || result.is_ok() {
     534            3 :                 return result;
     535            2 :             }
     536              :         }
     537            3 :     }
     538              : 
     539              :     ///
     540              :     /// Process a batch of WAL records using bespoke Neon code.
     541              :     ///
     542            0 :     fn apply_batch_neon(
     543            0 :         &self,
     544            0 :         key: Key,
     545            0 :         lsn: Lsn,
     546            0 :         base_img: Option<Bytes>,
     547            0 :         records: &[(Lsn, NeonWalRecord)],
     548            0 :     ) -> Result<Bytes, Error> {
     549            0 :         let start_time = Instant::now();
     550              : 
     551            0 :         let mut page = BytesMut::new();
     552            0 :         if let Some(fpi) = base_img {
     553            0 :             // If full-page image is provided, then use it...
     554            0 :             page.extend_from_slice(&fpi[..]);
     555            0 :         } else {
     556              :             // All the current WAL record types that we can handle require a base image.
     557            0 :             bail!("invalid neon WAL redo request with no base image");
     558              :         }
     559              : 
     560              :         // Apply all the WAL records in the batch
     561            0 :         for (record_lsn, record) in records.iter() {
     562            0 :             self.apply_record_neon(key, &mut page, *record_lsn, record)?;
     563              :         }
     564              :         // Success!
     565            0 :         let duration = start_time.elapsed();
     566              :         // FIXME: using the same metric here creates a bimodal distribution by default, and because
     567              :         // there could be multiple batch sizes this would be N+1 modal.
     568            0 :         WAL_REDO_TIME.observe(duration.as_secs_f64());
     569              : 
     570            0 :         debug!(
     571            0 :             "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
     572            0 :             records.len(),
     573            0 :             duration.as_micros(),
     574              :             lsn
     575              :         );
     576              : 
     577            0 :         Ok(page.freeze())
     578            0 :     }
     579              : 
     580            0 :     fn apply_record_neon(
     581            0 :         &self,
     582            0 :         key: Key,
     583            0 :         page: &mut BytesMut,
     584            0 :         record_lsn: Lsn,
     585            0 :         record: &NeonWalRecord,
     586            0 :     ) -> anyhow::Result<()> {
     587            0 :         apply_neon::apply_in_neon(record, record_lsn, key, page)?;
     588              : 
     589            0 :         Ok(())
     590            0 :     }
     591              : }
     592              : 
     593              : #[cfg(test)]
     594              : pub(crate) mod harness {
     595              :     use super::PostgresRedoManager;
     596              :     use crate::config::PageServerConf;
     597              :     use utils::{id::TenantId, shard::TenantShardId};
     598              : 
     599              :     pub struct RedoHarness {
     600              :         // underscored because unused, except for removal at drop
     601              :         _repo_dir: camino_tempfile::Utf8TempDir,
     602              :         pub manager: PostgresRedoManager,
     603              :         tenant_shard_id: TenantShardId,
     604              :     }
     605              : 
     606              :     impl RedoHarness {
     607            4 :         pub fn new() -> anyhow::Result<Self> {
     608            4 :             crate::tenant::harness::setup_logging();
     609              : 
     610            4 :             let repo_dir = camino_tempfile::tempdir()?;
     611            4 :             let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
     612            4 :             let conf = Box::leak(Box::new(conf));
     613            4 :             let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
     614              : 
     615            4 :             let manager = PostgresRedoManager::new(conf, tenant_shard_id);
     616              : 
     617            4 :             Ok(RedoHarness {
     618            4 :                 _repo_dir: repo_dir,
     619            4 :                 manager,
     620            4 :                 tenant_shard_id,
     621            4 :             })
     622            4 :         }
     623            4 :         pub fn span(&self) -> tracing::Span {
     624            4 :             tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
     625            4 :         }
     626              :     }
     627              : }
     628              : 
     629              : #[cfg(test)]
     630              : mod tests {
     631              :     use std::str::FromStr;
     632              : 
     633              :     use bytes::Bytes;
     634              :     use pageserver_api::key::Key;
     635              :     use postgres_ffi::PgMajorVersion;
     636              :     use tracing::Instrument;
     637              :     use utils::lsn::Lsn;
     638              :     use wal_decoder::models::record::NeonWalRecord;
     639              : 
     640              :     use crate::walredo::RedoAttemptType;
     641              :     use crate::walredo::harness::RedoHarness;
     642              : 
     643              :     #[tokio::test]
     644            1 :     async fn test_ping() {
     645            1 :         let h = RedoHarness::new().unwrap();
     646              : 
     647            1 :         h.manager
     648            1 :             .ping(PgMajorVersion::PG14)
     649            1 :             .instrument(h.span())
     650            1 :             .await
     651            1 :             .expect("ping should work");
     652            1 :     }
     653              : 
     654              :     #[tokio::test]
     655            1 :     async fn short_v14_redo() {
     656            1 :         let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
     657              : 
     658            1 :         let h = RedoHarness::new().unwrap();
     659              : 
     660            1 :         let page = h
     661            1 :             .manager
     662            1 :             .request_redo(
     663            1 :                 Key {
     664            1 :                     field1: 0,
     665            1 :                     field2: 1663,
     666            1 :                     field3: 13010,
     667            1 :                     field4: 1259,
     668            1 :                     field5: 0,
     669            1 :                     field6: 0,
     670            1 :                 },
     671            1 :                 Lsn::from_str("0/16E2408").unwrap(),
     672            1 :                 None,
     673            1 :                 short_records(),
     674            1 :                 PgMajorVersion::PG14,
     675            1 :                 RedoAttemptType::ReadPage,
     676            1 :             )
     677            1 :             .instrument(h.span())
     678            1 :             .await
     679            1 :             .unwrap();
     680              : 
     681            1 :         assert_eq!(&expected, &*page);
     682            1 :     }
     683              : 
     684              :     #[tokio::test]
     685            1 :     async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
     686            1 :         let h = RedoHarness::new().unwrap();
     687              : 
     688            1 :         let page = h
     689            1 :             .manager
     690            1 :             .request_redo(
     691            1 :                 Key {
     692            1 :                     field1: 0,
     693            1 :                     field2: 1663,
     694            1 :                     // key should be 13010
     695            1 :                     field3: 13130,
     696            1 :                     field4: 1259,
     697            1 :                     field5: 0,
     698            1 :                     field6: 0,
     699            1 :                 },
     700            1 :                 Lsn::from_str("0/16E2408").unwrap(),
     701            1 :                 None,
     702            1 :                 short_records(),
     703            1 :                 PgMajorVersion::PG14,
     704            1 :                 RedoAttemptType::ReadPage,
     705            1 :             )
     706            1 :             .instrument(h.span())
     707            1 :             .await
     708            1 :             .unwrap();
     709              : 
     710              :         // TODO: there will be some stderr printout, which is forwarded to tracing that could
     711              :         // perhaps be captured as long as it's in the same thread.
     712            1 :         assert_eq!(page, crate::ZERO_PAGE);
     713            1 :     }
     714              : 
     715              :     #[tokio::test]
     716            1 :     async fn test_stderr() {
     717            1 :         let h = RedoHarness::new().unwrap();
     718            1 :         h
     719            1 :             .manager
     720            1 :             .request_redo(
     721            1 :                 Key::from_i128(0),
     722            1 :                 Lsn::INVALID,
     723            1 :                 None,
     724            1 :                 short_records(),
     725            1 :                 PgMajorVersion::PG16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
     726            1 :                 RedoAttemptType::ReadPage,
     727            1 :             )
     728            1 :             .instrument(h.span())
     729            1 :             .await
     730            1 :             .unwrap_err();
     731            1 :     }
     732              : 
     733              :     #[allow(clippy::octal_escapes)]
     734            3 :     fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
     735            3 :         vec![
     736            3 :             (
     737            3 :                 Lsn::from_str("0/16A9388").unwrap(),
     738            3 :                 NeonWalRecord::Postgres {
     739            3 :                     will_init: true,
     740            3 :                     rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
     741            3 :                 }
     742            3 :             ),
     743            3 :             (
     744            3 :                 Lsn::from_str("0/16D4080").unwrap(),
     745            3 :                 NeonWalRecord::Postgres {
     746            3 :                     will_init: false,
     747            3 :                     rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
     748            3 :                 }
     749            3 :             )
     750              :         ]
     751            3 :     }
     752              : }
        

Generated by: LCOV version 2.1-beta