LCOV - code coverage report
Current view: top level - pageserver/src - walredo.rs (source / functions)
Test:      ccf45ed1c149555259baec52d6229a81013dcd6a.info
Test Date: 2024-08-21 17:32:46
Coverage:  Lines: 65.2 % (227 of 348 hit)    Functions: 52.9 % (18 of 34 hit)

            Line data    Source code
       1              : //!
       2              : //! WAL redo. This service runs PostgreSQL in a special wal_redo mode
       3              : //! to apply given WAL records over an old page image and return new
       4              : //! page image.
       5              : //!
       6              : //! We rely on Postgres to perform WAL redo for us. We launch a
       7              : //! postgres process in special "wal redo" mode that's similar to
       8              : //! single-user mode. We then pass the previous page image, if any,
       9              : //! and all the WAL records we want to apply, to the postgres
      10              : //! process. Then we get the page image back. Communication with the
      11              : //! postgres process happens via stdin/stdout
      12              : //!
      13              : //! See pgxn/neon_walredo/walredoproc.c for the other side of
      14              : //! this communication.
      15              : //!
       16              : //! The Postgres process is assumed to be secure against malicious WAL
       17              : //! records. It achieves this by dropping privileges before replaying
       18              : //! any WAL records, so that even if an attacker hijacks the Postgres
       19              : //! process, they cannot escape from it.
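                       : //!
                       : //! A minimal usage sketch (illustrative only; assumes a `PageServerConf`, a
                       : //! key/LSN pair, and a batch of `NeonWalRecord`s are already at hand):
                       : //!
                       : //! ```ignore
                       : //! let manager = PostgresRedoManager::new(conf, tenant_shard_id);
                       : //! // Apply `records` on top of the optional base image to reconstruct
                       : //! // the page at `lsn`.
                       : //! let page: bytes::Bytes = manager
                       : //!     .request_redo(key, lsn, Some((base_lsn, base_img)), records, pg_version)
                       : //!     .await?;
                       : //! ```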
      20              : 
       21              : /// Process lifecycle and abstraction for the IPC protocol.
      22              : mod process;
      23              : 
      24              : /// Code to apply [`NeonWalRecord`]s.
      25              : pub(crate) mod apply_neon;
      26              : 
      27              : use crate::config::PageServerConf;
      28              : use crate::metrics::{
      29              :     WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM,
      30              :     WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_TIME,
      31              : };
      32              : use crate::repository::Key;
      33              : use crate::walrecord::NeonWalRecord;
      34              : use anyhow::Context;
      35              : use bytes::{Bytes, BytesMut};
      36              : use pageserver_api::models::{WalRedoManagerProcessStatus, WalRedoManagerStatus};
      37              : use pageserver_api::shard::TenantShardId;
      38              : use std::sync::Arc;
      39              : use std::time::Duration;
      40              : use std::time::Instant;
      41              : use tracing::*;
      42              : use utils::lsn::Lsn;
      43              : use utils::sync::gate::GateError;
      44              : use utils::sync::heavier_once_cell;
      45              : 
      46              : ///
      47              : /// This is the real implementation that uses a Postgres process to
      48              : /// perform WAL replay. Only one thread can use the process at a time,
       49              : /// which is enforced by the Mutex. In the future, we might want to
      50              : /// launch a pool of processes to allow concurrent replay of multiple
      51              : /// records.
      52              : ///
      53              : pub struct PostgresRedoManager {
      54              :     tenant_shard_id: TenantShardId,
      55              :     conf: &'static PageServerConf,
      56              :     last_redo_at: std::sync::Mutex<Option<Instant>>,
      57              :     /// We use [`heavier_once_cell`] for
      58              :     ///
      59              :     /// 1. coalescing the lazy spawning of walredo processes ([`ProcessOnceCell::Spawned`])
       60              :     /// 2. preventing new processes from being spawned on [`Self::shutdown`] (=> [`ProcessOnceCell::ManagerShutDown`]).
      61              :     ///
      62              :     /// # Spawning
      63              :     ///
       64              :     /// Redo requests use the once cell to coalesce onto one call to [`process::WalRedoProcess::launch`] (see the sketch after this struct definition).
      65              :     ///
       66              :     /// Notably, requests don't use the [`heavier_once_cell::Guard`] to keep hold of
       67              :     /// their process object; we use [`Arc::clone`] for that.
       68              :     ///
       69              :     /// This is primarily because earlier implementations that didn't use [`heavier_once_cell`]
       70              :     /// had that behavior; it's probably unnecessary.
       71              :     /// The only merit of it is that if one walredo process encounters an error,
       72              :     /// it can be taken out of rotation (= using [`heavier_once_cell::Guard::take_and_deinit`])
       73              :     /// and redo retried, thereby starting a new process, while other redo tasks might
       74              :     /// still be using the old redo process. But those other tasks will most likely
       75              :     /// encounter an error as well, and errors are an unexpected condition anyway.
       76              :     /// So, we could probably get rid of the `Arc` in the future.
      77              :     ///
      78              :     /// # Shutdown
      79              :     ///
      80              :     /// See [`Self::launched_processes`].
      81              :     redo_process: heavier_once_cell::OnceCell<ProcessOnceCell>,
      82              : 
      83              :     /// Gate that is entered when launching a walredo process and held open
      84              :     /// until the process has been `kill()`ed and `wait()`ed upon.
      85              :     ///
      86              :     /// Manager shutdown waits for this gate to close after setting the
      87              :     /// [`ProcessOnceCell::ManagerShutDown`] state in [`Self::redo_process`].
      88              :     ///
       89              :     /// This type of usage is a bit unusual because gates usually keep track of
       90              :     /// concurrent operations, e.g., every [`Self::request_redo`] that is in flight.
       91              :     /// But we use it here to keep track of the _processes_ that we have launched,
       92              :     /// which may outlive any individual redo request because
       93              :     /// - we keep the walredo process around until it's quiesced, to amortize spawn cost, and
       94              :     /// - the Arc may be held by multiple concurrent redo requests, so just because
       95              :     ///   you replace the [`Self::redo_process`] cell's content doesn't mean the
       96              :     ///   process gets killed immediately.
      97              :     ///
      98              :     /// We could simplify this by getting rid of the [`Arc`].
      99              :     /// See the comment on [`Self::redo_process`] for more details.
     100              :     launched_processes: utils::sync::gate::Gate,
     101              : }
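                       : 
                       : // Illustrative sketch (not compiled) of the coalescing contract this module
                       : // relies on from `heavier_once_cell`: while the cell is empty, exactly one
                       : // caller of `get_or_init_detached()` receives the init permit (`Err(permit)`);
                       : // all other callers wait and then observe whatever value the permit holder
                       : // publishes with `set()`.
                       : //
                       : //     match cell.get_or_init_detached().await {
                       : //         Ok(guard) => { /* already initialized: reuse the contents */ }
                       : //         Err(permit) => {
                       : //             // we won the race: do the expensive init once, then publish it
                       : //             cell.set(expensive_init(), permit);
                       : //         }
                       : //     }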
     102              : 
     103              : /// See [`PostgresRedoManager::redo_process`].
     104              : enum ProcessOnceCell {
     105              :     Spawned(Arc<Process>),
     106              :     ManagerShutDown,
     107              : }
     108              : 
     109              : struct Process {
     110              :     process: process::WalRedoProcess,
      111              :     /// This field is last in this struct so the guard gets dropped _after_ [`Self::process`] (see the note on drop order after this struct).
      112              :     /// (Reminder: dropping [`Self::process`] synchronously sends SIGKILL and then `wait()`s for it to exit.)
     113              :     _launched_processes_guard: utils::sync::gate::GateGuard,
     114              : }
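                       : 
                       : // The field-ordering comment above leans on a language guarantee: struct
                       : // fields are dropped in declaration order. A minimal reminder (not compiled):
                       : //
                       : //     struct S { a: A, b: B }
                       : //     // dropping an `S` runs `a`'s destructor before `b`'s
                       : //
                       : // So `process` is SIGKILLed and waited upon first; only then is the gate
                       : // guard released, which is what lets `launched_processes.close()` await the
                       : // actual death of every child process.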
     115              : 
     116              : impl std::ops::Deref for Process {
     117              :     type Target = process::WalRedoProcess;
     118              : 
     119           16 :     fn deref(&self) -> &Self::Target {
     120           16 :         &self.process
     121           16 :     }
     122              : }
     123              : 
     124            0 : #[derive(Debug, thiserror::Error)]
     125              : pub enum Error {
     126              :     #[error("cancelled")]
     127              :     Cancelled,
     128              :     #[error(transparent)]
     129              :     Other(#[from] anyhow::Error),
     130              : }
     131              : 
     132              : macro_rules! bail {
     133              :     ($($arg:tt)*) => {
     134              :         return Err($crate::walredo::Error::Other(::anyhow::anyhow!($($arg)*)));
     135              :     }
     136              : }
     137              : 
     138              : ///
     139              : /// Public interface of WAL redo manager
     140              : ///
     141              : impl PostgresRedoManager {
     142              :     ///
     143              :     /// Request the WAL redo manager to apply some WAL records
     144              :     ///
     145              :     /// The WAL redo is handled by a separate thread, so this just sends a request
      146              :     /// to the thread and waits for the response.
     147              :     ///
     148              :     /// # Cancel-Safety
     149              :     ///
     150              :     /// This method is cancellation-safe.
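                       :     ///
                       :     /// A sketch of what cancellation safety permits (illustrative only; `cancel`
                       :     /// is assumed to be a `tokio_util::sync::CancellationToken` held by the caller):
                       :     ///
                       :     /// ```ignore
                       :     /// tokio::select! {
                       :     ///     res = manager.request_redo(key, lsn, base_img, records, pg_version) => {
                       :     ///         // use `res`
                       :     ///     }
                       :     ///     _ = cancel.cancelled() => {
                       :     ///         // dropping the redo future here is safe
                       :     ///     }
                       :     /// }
                       :     /// ```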
     151            6 :     pub async fn request_redo(
     152            6 :         &self,
     153            6 :         key: Key,
     154            6 :         lsn: Lsn,
     155            6 :         base_img: Option<(Lsn, Bytes)>,
     156            6 :         records: Vec<(Lsn, NeonWalRecord)>,
     157            6 :         pg_version: u32,
     158            6 :     ) -> Result<Bytes, Error> {
     159            6 :         if records.is_empty() {
     160            0 :             bail!("invalid WAL redo request with no records");
     161            6 :         }
     162            6 : 
     163            6 :         let base_img_lsn = base_img.as_ref().map(|p| p.0).unwrap_or(Lsn::INVALID);
     164            6 :         let mut img = base_img.map(|p| p.1);
     165            6 :         let mut batch_neon = apply_neon::can_apply_in_neon(&records[0].1);
     166            6 :         let mut batch_start = 0;
     167            6 :         for (i, record) in records.iter().enumerate().skip(1) {
     168            6 :             let rec_neon = apply_neon::can_apply_in_neon(&record.1);
     169            6 : 
     170            6 :             if rec_neon != batch_neon {
     171            0 :                 let result = if batch_neon {
     172            0 :                     self.apply_batch_neon(key, lsn, img, &records[batch_start..i])
     173              :                 } else {
     174            0 :                     self.apply_batch_postgres(
     175            0 :                         key,
     176            0 :                         lsn,
     177            0 :                         img,
     178            0 :                         base_img_lsn,
     179            0 :                         &records[batch_start..i],
     180            0 :                         self.conf.wal_redo_timeout,
     181            0 :                         pg_version,
     182            0 :                     )
     183            0 :                     .await
     184              :                 };
     185            0 :                 img = Some(result?);
     186              : 
     187            0 :                 batch_neon = rec_neon;
     188            0 :                 batch_start = i;
     189            6 :             }
     190              :         }
     191              :         // last batch
     192            6 :         if batch_neon {
     193            0 :             self.apply_batch_neon(key, lsn, img, &records[batch_start..])
     194              :         } else {
     195            6 :             self.apply_batch_postgres(
     196            6 :                 key,
     197            6 :                 lsn,
     198            6 :                 img,
     199            6 :                 base_img_lsn,
     200            6 :                 &records[batch_start..],
     201            6 :                 self.conf.wal_redo_timeout,
     202            6 :                 pg_version,
     203            6 :             )
     204           16 :             .await
     205              :         }
     206            6 :     }
     207              : 
     208            0 :     pub fn status(&self) -> WalRedoManagerStatus {
     209            0 :         WalRedoManagerStatus {
     210            0 :             last_redo_at: {
     211            0 :                 let at = *self.last_redo_at.lock().unwrap();
     212            0 :                 at.and_then(|at| {
     213            0 :                     let age = at.elapsed();
     214            0 :                     // map any chrono errors silently to None here
     215            0 :                     chrono::Utc::now().checked_sub_signed(chrono::Duration::from_std(age).ok()?)
     216            0 :                 })
     217            0 :             },
     218            0 :             process: self.redo_process.get().and_then(|p| match &*p {
     219            0 :                 ProcessOnceCell::Spawned(p) => Some(WalRedoManagerProcessStatus { pid: p.id() }),
     220            0 :                 ProcessOnceCell::ManagerShutDown => None,
     221            0 :             }),
     222            0 :         }
     223            0 :     }
     224              : }
     225              : 
     226              : impl PostgresRedoManager {
     227              :     ///
     228              :     /// Create a new PostgresRedoManager.
     229              :     ///
     230            6 :     pub fn new(
     231            6 :         conf: &'static PageServerConf,
     232            6 :         tenant_shard_id: TenantShardId,
     233            6 :     ) -> PostgresRedoManager {
     234            6 :         // The actual process is launched lazily, on first request.
     235            6 :         PostgresRedoManager {
     236            6 :             tenant_shard_id,
     237            6 :             conf,
     238            6 :             last_redo_at: std::sync::Mutex::default(),
     239            6 :             redo_process: heavier_once_cell::OnceCell::default(),
     240            6 :             launched_processes: utils::sync::gate::Gate::default(),
     241            6 :         }
     242            6 :     }
     243              : 
     244              :     /// Shut down the WAL redo manager.
     245              :     ///
     246              :     /// Returns `true` if this call was the one that initiated shutdown.
      247              :     /// Note that no caller may observe `true` if the first caller stops polling before completion.
     248              :     ///
     249              :     /// After this future completes
     250              :     /// - no redo process is running
     251              :     /// - no new redo process will be spawned
     252              :     /// - redo requests that need walredo process will fail with [`Error::Cancelled`]
     253              :     /// - [`apply_neon`]-only redo requests may still work, but this may change in the future
     254              :     ///
     255              :     /// # Cancel-Safety
     256              :     ///
     257              :     /// This method is cancellation-safe.
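                       :     ///
                       :     /// Intended call pattern, as a sketch (illustrative only):
                       :     ///
                       :     /// ```ignore
                       :     /// let initiated_by_us = manager.shutdown().await;
                       :     /// // From here on, redo requests that need a walredo process
                       :     /// // fail with `Error::Cancelled`.
                       :     /// ```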
     258            0 :     pub async fn shutdown(&self) -> bool {
     259              :         // prevent new processes from being spawned
     260            0 :         let maybe_permit = match self.redo_process.get_or_init_detached().await {
     261            0 :             Ok(guard) => {
     262            0 :                 if matches!(&*guard, ProcessOnceCell::ManagerShutDown) {
     263            0 :                     None
     264              :                 } else {
     265            0 :                     let (proc, permit) = guard.take_and_deinit();
      266            0 :                     drop(proc); // this just drops the Arc; its refcount may not be zero yet
     267            0 :                     Some(permit)
     268              :                 }
     269              :             }
     270            0 :             Err(permit) => Some(permit),
     271              :         };
     272            0 :         let it_was_us = if let Some(permit) = maybe_permit {
     273            0 :             self.redo_process
     274            0 :                 .set(ProcessOnceCell::ManagerShutDown, permit);
     275            0 :             true
     276              :         } else {
     277            0 :             false
     278              :         };
      279              :         // Wait for ongoing requests to drain and for the refcounts of all Arc<WalRedoProcess>
      280              :         // that we ever launched to drop to zero; when that happens, the last drop
      281              :         // synchronously kill()s & wait()s for the underlying process.
     282            0 :         self.launched_processes.close().await;
     283            0 :         it_was_us
     284            0 :     }
     285              : 
     286              :     /// This type doesn't have its own background task to check for idleness: we
     287              :     /// rely on our owner calling this function periodically in its own housekeeping
     288              :     /// loops.
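                       :     ///
                       :     /// A sketch of such a loop (illustrative only; the interval and timeout are
                       :     /// the owner's choice):
                       :     ///
                       :     /// ```ignore
                       :     /// loop {
                       :     ///     tokio::time::sleep(std::time::Duration::from_secs(10)).await;
                       :     ///     redo_manager.maybe_quiesce(idle_timeout);
                       :     /// }
                       :     /// ```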
     289            0 :     pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
     290            0 :         if let Ok(g) = self.last_redo_at.try_lock() {
     291            0 :             if let Some(last_redo_at) = *g {
     292            0 :                 if last_redo_at.elapsed() >= idle_timeout {
     293            0 :                     drop(g);
     294            0 :                     drop(self.redo_process.get().map(|guard| guard.take_and_deinit()));
     295            0 :                 }
     296            0 :             }
     297            0 :         }
     298            0 :     }
     299              : 
     300              :     ///
     301              :     /// Process one request for WAL redo using wal-redo postgres
     302              :     ///
     303              :     /// # Cancel-Safety
     304              :     ///
     305              :     /// Cancellation safe.
     306              :     #[allow(clippy::too_many_arguments)]
     307            6 :     async fn apply_batch_postgres(
     308            6 :         &self,
     309            6 :         key: Key,
     310            6 :         lsn: Lsn,
     311            6 :         base_img: Option<Bytes>,
     312            6 :         base_img_lsn: Lsn,
     313            6 :         records: &[(Lsn, NeonWalRecord)],
     314            6 :         wal_redo_timeout: Duration,
     315            6 :         pg_version: u32,
     316            6 :     ) -> Result<Bytes, Error> {
     317            6 :         *(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
     318              : 
     319            6 :         let (rel, blknum) = key.to_rel_block().context("invalid record")?;
     320              :         const MAX_RETRY_ATTEMPTS: u32 = 1;
     321            6 :         let mut n_attempts = 0u32;
     322              :         loop {
     323            8 :             let proc: Arc<Process> = match self.redo_process.get_or_init_detached().await {
     324            0 :                 Ok(guard) => match &*guard {
     325            0 :                     ProcessOnceCell::Spawned(proc) => Arc::clone(proc),
     326              :                     ProcessOnceCell::ManagerShutDown => {
     327            0 :                         return Err(Error::Cancelled);
     328              :                     }
     329              :                 },
     330            8 :                 Err(permit) => {
     331            8 :                     let start = Instant::now();
     332              :                     // acquire guard before spawning process, so that we don't spawn new processes
     333              :                     // if the gate is already closed.
     334            8 :                     let _launched_processes_guard = match self.launched_processes.enter() {
     335            8 :                                 Ok(guard) => guard,
     336            0 :                                 Err(GateError::GateClosed) => unreachable!(
     337            0 :                                     "shutdown sets the once cell to `ManagerShutDown` state before closing the gate"
     338            0 :                                 ),
     339              :                             };
     340            8 :                     let proc = Arc::new(Process {
     341            8 :                         process: process::WalRedoProcess::launch(
     342            8 :                             self.conf,
     343            8 :                             self.tenant_shard_id,
     344            8 :                             pg_version,
     345            8 :                         )
     346            8 :                         .context("launch walredo process")?,
     347            8 :                         _launched_processes_guard,
     348            8 :                     });
     349            8 :                     let duration = start.elapsed();
     350            8 :                     WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM.observe(duration.as_secs_f64());
     351            8 :                     info!(
     352            0 :                         duration_ms = duration.as_millis(),
     353            0 :                         pid = proc.id(),
     354            0 :                         "launched walredo process"
     355              :                     );
     356            8 :                     self.redo_process
     357            8 :                         .set(ProcessOnceCell::Spawned(Arc::clone(&proc)), permit);
     358            8 :                     proc
     359              :                 }
     360              :             };
     361              : 
     362            8 :             let started_at = std::time::Instant::now();
     363              : 
     364              :             // Relational WAL records are applied using wal-redo-postgres
     365            8 :             let result = proc
     366            8 :                 .apply_wal_records(rel, blknum, &base_img, records, wal_redo_timeout)
     367           16 :                 .await
     368            8 :                 .context("apply_wal_records");
     369            8 : 
     370            8 :             let duration = started_at.elapsed();
     371            8 : 
     372            8 :             let len = records.len();
      373           16 :             let nbytes = records.iter().fold(0, |accumulator, record| {
      374           16 :                 accumulator
     375           16 :                     + match &record.1 {
     376           16 :                         NeonWalRecord::Postgres { rec, .. } => rec.len(),
     377            0 :                         _ => unreachable!("Only PostgreSQL records are accepted in this batch"),
     378              :                     }
     379           16 :             });
     380            8 : 
     381            8 :             WAL_REDO_TIME.observe(duration.as_secs_f64());
     382            8 :             WAL_REDO_RECORDS_HISTOGRAM.observe(len as f64);
     383            8 :             WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
     384            8 : 
     385            8 :             debug!(
     386            0 :                 "postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
     387            0 :                 len,
     388            0 :                 nbytes,
     389            0 :                 duration.as_micros(),
     390              :                 lsn
     391              :             );
     392              : 
     393              :             // If something went wrong, don't try to reuse the process. Kill it, and
      394              :             // the next request will launch a new one.
     395            8 :             if let Err(e) = result.as_ref() {
     396            4 :                 error!(
     397            0 :                     "error applying {} WAL records {}..{} ({} bytes) to key {key}, from base image with LSN {} to reconstruct page image at LSN {} n_attempts={}: {:?}",
     398            0 :                     records.len(),
     399            4 :                     records.first().map(|p| p.0).unwrap_or(Lsn(0)),
     400            4 :                     records.last().map(|p| p.0).unwrap_or(Lsn(0)),
     401              :                     nbytes,
     402              :                     base_img_lsn,
     403              :                     lsn,
     404              :                     n_attempts,
     405              :                     e,
     406              :                 );
     407              :                 // Avoid concurrent callers hitting the same issue by taking `proc` out of the rotation.
     408              :                 // Note that there may be other tasks concurrent with us that also hold `proc`.
     409              :                 // We have to deal with that here.
     410              :                 // Also read the doc comment on field `self.redo_process`.
     411              :                 //
     412              :                 // NB: there may still be other concurrent threads using `proc`.
     413              :                 // The last one will send SIGKILL when the underlying Arc reaches refcount 0.
     414              :                 //
      415              :                 // NB: the drop impl blocks the dropping thread with a wait() system call for
      416              :                 // the child process. In some ways the blocking is actually good: if we
      417              :                 // deferred the waiting into the background (e.g., to tokio, had we used `tokio::process`),
      418              :                 // then, if walredo always fails immediately, we could spawn processes faster
      419              :                 // than we can SIGKILL & `wait` for them to exit. By doing it the way we do here,
      420              :                 // we limit this runaway risk to at most $num_runtimes * $num_executor_threads.
      421              :                 // This probably needs revisiting at some later point.
     422            4 :                 match self.redo_process.get() {
     423            0 :                     None => (),
     424            4 :                     Some(guard) => {
     425            4 :                         match &*guard {
     426            0 :                             ProcessOnceCell::ManagerShutDown => {}
     427            4 :                             ProcessOnceCell::Spawned(guard_proc) => {
     428            4 :                                 if Arc::ptr_eq(&proc, guard_proc) {
      429            4 :                                     // We're the first to observe an error from `proc`; it's our job to take it out of rotation.
     430            4 :                                     guard.take_and_deinit();
     431            4 :                                 } else {
     432            0 :                                     // Another task already spawned another redo process (further up in this method)
     433            0 :                                     // and put it into `redo_process`. Do nothing, our view of the world is behind.
     434            0 :                                 }
     435              :                             }
     436              :                         }
     437              :                     }
     438              :                 }
     439              :                 // The last task that does this `drop()` of `proc` will do a blocking `wait()` syscall.
     440            4 :                 drop(proc);
     441            4 :             } else if n_attempts != 0 {
     442            0 :                 info!(n_attempts, "retried walredo succeeded");
     443            4 :             }
     444            8 :             n_attempts += 1;
     445            8 :             if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
     446            6 :                 return result.map_err(Error::Other);
     447            2 :             }
     448              :         }
     449            6 :     }
     450              : 
     451              :     ///
      452              :     /// Process a batch of WAL records using bespoke Neon code.
     453              :     ///
     454            0 :     fn apply_batch_neon(
     455            0 :         &self,
     456            0 :         key: Key,
     457            0 :         lsn: Lsn,
     458            0 :         base_img: Option<Bytes>,
     459            0 :         records: &[(Lsn, NeonWalRecord)],
     460            0 :     ) -> Result<Bytes, Error> {
     461            0 :         let start_time = Instant::now();
     462            0 : 
     463            0 :         let mut page = BytesMut::new();
     464            0 :         if let Some(fpi) = base_img {
      465            0 :             // If a full-page image is provided, use it...
     466            0 :             page.extend_from_slice(&fpi[..]);
     467            0 :         } else {
     468              :             // All the current WAL record types that we can handle require a base image.
     469            0 :             bail!("invalid neon WAL redo request with no base image");
     470              :         }
     471              : 
     472              :         // Apply all the WAL records in the batch
     473            0 :         for (record_lsn, record) in records.iter() {
     474            0 :             self.apply_record_neon(key, &mut page, *record_lsn, record)?;
     475              :         }
     476              :         // Success!
     477            0 :         let duration = start_time.elapsed();
     478            0 :         // FIXME: using the same metric here creates a bimodal distribution by default, and because
     479            0 :         // there could be multiple batch sizes this would be N+1 modal.
     480            0 :         WAL_REDO_TIME.observe(duration.as_secs_f64());
     481            0 : 
     482            0 :         debug!(
     483            0 :             "neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
     484            0 :             records.len(),
     485            0 :             duration.as_micros(),
     486              :             lsn
     487              :         );
     488              : 
     489            0 :         Ok(page.freeze())
     490            0 :     }
     491              : 
     492            0 :     fn apply_record_neon(
     493            0 :         &self,
     494            0 :         key: Key,
     495            0 :         page: &mut BytesMut,
     496            0 :         record_lsn: Lsn,
     497            0 :         record: &NeonWalRecord,
     498            0 :     ) -> anyhow::Result<()> {
     499            0 :         apply_neon::apply_in_neon(record, record_lsn, key, page)?;
     500              : 
     501            0 :         Ok(())
     502            0 :     }
     503              : }
     504              : 
     505              : #[cfg(test)]
     506              : mod tests {
     507              :     use super::PostgresRedoManager;
     508              :     use crate::repository::Key;
     509              :     use crate::{config::PageServerConf, walrecord::NeonWalRecord};
     510              :     use bytes::Bytes;
     511              :     use pageserver_api::shard::TenantShardId;
     512              :     use std::str::FromStr;
     513              :     use tracing::Instrument;
     514              :     use utils::{id::TenantId, lsn::Lsn};
     515              : 
     516              :     #[tokio::test]
     517            2 :     async fn short_v14_redo() {
     518            2 :         let expected = std::fs::read("test_data/short_v14_redo.page").unwrap();
     519            2 : 
     520            2 :         let h = RedoHarness::new().unwrap();
     521            2 : 
     522            2 :         let page = h
     523            2 :             .manager
     524            2 :             .request_redo(
     525            2 :                 Key {
     526            2 :                     field1: 0,
     527            2 :                     field2: 1663,
     528            2 :                     field3: 13010,
     529            2 :                     field4: 1259,
     530            2 :                     field5: 0,
     531            2 :                     field6: 0,
     532            2 :                 },
     533            2 :                 Lsn::from_str("0/16E2408").unwrap(),
     534            2 :                 None,
     535            2 :                 short_records(),
     536            2 :                 14,
     537            2 :             )
     538            2 :             .instrument(h.span())
     539            4 :             .await
     540            2 :             .unwrap();
     541            2 : 
     542            2 :         assert_eq!(&expected, &*page);
     543            2 :     }
     544              : 
     545              :     #[tokio::test]
     546            2 :     async fn short_v14_fails_for_wrong_key_but_returns_zero_page() {
     547            2 :         let h = RedoHarness::new().unwrap();
     548            2 : 
     549            2 :         let page = h
     550            2 :             .manager
     551            2 :             .request_redo(
     552            2 :                 Key {
     553            2 :                     field1: 0,
     554            2 :                     field2: 1663,
     555            2 :                     // key should be 13010
     556            2 :                     field3: 13130,
     557            2 :                     field4: 1259,
     558            2 :                     field5: 0,
     559            2 :                     field6: 0,
     560            2 :                 },
     561            2 :                 Lsn::from_str("0/16E2408").unwrap(),
     562            2 :                 None,
     563            2 :                 short_records(),
     564            2 :                 14,
     565            2 :             )
     566            2 :             .instrument(h.span())
     567            4 :             .await
     568            2 :             .unwrap();
     569            2 : 
      570            2 :         // TODO: there will be some stderr printout, which is forwarded to tracing and could
      571            2 :         // perhaps be captured as long as it's in the same thread.
     572            2 :         assert_eq!(page, crate::ZERO_PAGE);
     573            2 :     }
     574              : 
     575              :     #[tokio::test]
     576            2 :     async fn test_stderr() {
     577            2 :         let h = RedoHarness::new().unwrap();
     578            2 :         h
     579            2 :             .manager
     580            2 :             .request_redo(
     581            2 :                 Key::from_i128(0),
     582            2 :                 Lsn::INVALID,
     583            2 :                 None,
     584            2 :                 short_records(),
     585            2 :                 16, /* 16 currently produces stderr output on startup, which adds a nice extra edge */
     586            2 :             )
     587            2 :             .instrument(h.span())
     588            8 :             .await
     589            2 :             .unwrap_err();
     590            2 :     }
     591              : 
     592              :     #[allow(clippy::octal_escapes)]
     593            6 :     fn short_records() -> Vec<(Lsn, NeonWalRecord)> {
     594            6 :         vec![
     595            6 :             (
     596            6 :                 Lsn::from_str("0/16A9388").unwrap(),
     597            6 :                 NeonWalRecord::Postgres {
     598            6 :                     will_init: true,
     599            6 :                     rec: Bytes::from_static(b"j\x03\0\0\0\x04\0\0\xe8\x7fj\x01\0\0\0\0\0\n\0\0\xd0\x16\x13Y\0\x10\0\04\x03\xd4\0\x05\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x03\0\0\0\0\x80\xeca\x01\0\0\x01\0\xd4\0\xa0\x1d\0 \x04 \0\0\0\0/\0\x01\0\xa0\x9dX\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\00\x9f\x9a\x01P\x9e\xb2\x01\0\x04\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x02\0!\0\x01\x08 \xff\xff\xff?\0\0\0\0\0\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\0\0\0\0\0\0\x80\xbf\0\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\0\0\0\0\x0c\x02\0\0\0\0\0\0\0\0\0\0\0\0\0\0/\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0\xdf\x04\0\0pg_type\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0G\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0\0\0\0\0\0\x0e\0\0\0\0@\x16D\x0e\0\0\0K\x10\0\0\x01\0pr \0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0[\x01\0\0\0\0\0\0\0\t\x04\0\0\x02\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0\0\0C\x01\0\0\x15\x01\0\0\0\0\0\0\0\0\0\0\0\0\0\0.\0!\x80\x03+ \xff\xff\xff\x7f\0\0\0\0\0;\n\0\0pg_statistic\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x0b\0\0\0\xfd.\0\0\0\0\0\0\n\0\0\0\x02\0\0\0;\n\0\0\0\0\0\0\x13\0\0\0\0\0\xcbC\x13\0\0\0\x18\x0b\0\0\x01\0pr\x1f\0\0\0\0\0\0\0\0\x01n\0\0\0\0\0\xd6\x02\0\0\x01\0\0\0C\x01\0\0\0\0\0\0\0\t\x04\0\0\x01\0\0\0\x01\0\0\0\n\0\0\0\n\0\0\0\x7f\0\0\0\0\0\0\x02\0\x01")
     600            6 :                 }
     601            6 :             ),
     602            6 :             (
     603            6 :                 Lsn::from_str("0/16D4080").unwrap(),
     604            6 :                 NeonWalRecord::Postgres {
     605            6 :                     will_init: false,
     606            6 :                     rec: Bytes::from_static(b"\xbc\0\0\0\0\0\0\0h?m\x01\0\0\0\0p\n\0\09\x08\xa3\xea\0 \x8c\0\x7f\x06\0\0\xd22\0\0\xeb\x04\0\0\0\0\0\0\xff\x02\0@\0\0another_table\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\x98\x08\0\0\x02@\0\0\0\0\0\0\n\0\0\0\x02\0\0\0\0@\0\0\0\0\0\0\x05\0\0\0\0@zD\x05\0\0\0\0\0\0\0\0\0pr\x01\0\0\0\0\0\0\0\0\x01d\0\0\0\0\0\0\x04\0\0\x01\0\0\0\x02\0")
     607            6 :                 }
     608            6 :             )
     609            6 :         ]
     610            6 :     }
     611              : 
     612              :     struct RedoHarness {
     613              :         // underscored because unused, except for removal at drop
     614              :         _repo_dir: camino_tempfile::Utf8TempDir,
     615              :         manager: PostgresRedoManager,
     616              :         tenant_shard_id: TenantShardId,
     617              :     }
     618              : 
     619              :     impl RedoHarness {
     620            6 :         fn new() -> anyhow::Result<Self> {
     621            6 :             crate::tenant::harness::setup_logging();
     622              : 
     623            6 :             let repo_dir = camino_tempfile::tempdir()?;
     624            6 :             let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
     625            6 :             let conf = Box::leak(Box::new(conf));
     626            6 :             let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
     627            6 : 
     628            6 :             let manager = PostgresRedoManager::new(conf, tenant_shard_id);
     629            6 : 
     630            6 :             Ok(RedoHarness {
     631            6 :                 _repo_dir: repo_dir,
     632            6 :                 manager,
     633            6 :                 tenant_shard_id,
     634            6 :             })
     635            6 :         }
     636            6 :         fn span(&self) -> tracing::Span {
     637            6 :             tracing::info_span!("RedoHarness", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug())
     638            6 :         }
     639              :     }
     640              : }
        

Generated by: LCOV version 2.1-beta