LCOV - code coverage report
Current view: top level - pageserver/src/walredo - process.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 85.7 % 154 132
Test Date: 2024-02-07 07:37:29 Functions: 47.4 % 19 9

            Line data    Source code
       1              : use self::no_leak_child::NoLeakChild;
       2              : use crate::{
       3              :     config::PageServerConf,
       4              :     metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
       5              :     walrecord::NeonWalRecord,
       6              : };
       7              : use anyhow::Context;
       8              : use bytes::Bytes;
       9              : use nix::poll::{PollFd, PollFlags};
      10              : use pageserver_api::{reltag::RelTag, shard::TenantShardId};
      11              : use postgres_ffi::BLCKSZ;
      12              : use std::os::fd::AsRawFd;
      13              : #[cfg(feature = "testing")]
      14              : use std::sync::atomic::AtomicUsize;
      15              : use std::{
      16              :     collections::VecDeque,
      17              :     io::{Read, Write},
      18              :     process::{ChildStdin, ChildStdout, Command, Stdio},
      19              :     sync::{Mutex, MutexGuard},
      20              :     time::Duration,
      21              : };
      22              : use tracing::{debug, error, instrument, Instrument};
      23              : use utils::{lsn::Lsn, nonblock::set_nonblock};
      24              : 
      25              : mod no_leak_child;
      26              : /// The IPC protocol that pageserver and walredo process speak over their shared pipe.
      27              : mod protocol;
      28              : 
      29              : pub struct WalRedoProcess {
      30              :     #[allow(dead_code)]
      31              :     conf: &'static PageServerConf,
      32              :     tenant_shard_id: TenantShardId,
      33              :     // Some() on construction, only becomes None on Drop.
      34              :     child: Option<NoLeakChild>,
      35              :     stdout: Mutex<ProcessOutput>,
      36              :     stdin: Mutex<ProcessInput>,
      37              :     /// Counter to separate same sized walredo inputs failing at the same millisecond.
      38              :     #[cfg(feature = "testing")]
      39              :     dump_sequence: AtomicUsize,
      40              : }
      41              : 
      42              : struct ProcessInput {
      43              :     stdin: ChildStdin,
      44              :     n_requests: usize,
      45              : }
      46              : 
      47              : struct ProcessOutput {
      48              :     stdout: ChildStdout,
      49              :     pending_responses: VecDeque<Option<Bytes>>,
      50              :     n_processed_responses: usize,
      51              : }
      52              : 
      53              : impl WalRedoProcess {
      54              :     //
      55              :     // Start postgres binary in special WAL redo mode.
      56              :     //
      57          604 :     #[instrument(skip_all,fields(pg_version=pg_version))]
      58              :     pub(crate) fn launch(
      59              :         conf: &'static PageServerConf,
      60              :         tenant_shard_id: TenantShardId,
      61              :         pg_version: u32,
      62              :     ) -> anyhow::Result<Self> {
      63              :         crate::span::debug_assert_current_span_has_tenant_id();
      64              : 
      65              :         let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
      66              :         let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
      67              : 
      68              :         use no_leak_child::NoLeakChildCommandExt;
      69              :         // Start postgres itself
      70              :         let child = Command::new(pg_bin_dir_path.join("postgres"))
      71              :             // the first arg must be --wal-redo so the child process enters into walredo mode
      72              :             .arg("--wal-redo")
      73              :             // the child doesn't process this arg, but having it in the argv helps identify the
      74              :             // walredo process for a particular tenant when debugging a pageserver
      75              :             .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
      76              :             .stdin(Stdio::piped())
      77              :             .stderr(Stdio::piped())
      78              :             .stdout(Stdio::piped())
      79              :             .env_clear()
      80              :             .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
      81              :             .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
      82              :             // NB: The redo process is not trusted after we sent it the first
      83              :             // walredo work. Before that, it is trusted. Specifically, we trust
      84              :             // it to
      85              :             // 1. close all file descriptors except stdin, stdout, stderr because
      86              :             //    pageserver might not be 100% diligent in setting FD_CLOEXEC on all
      87              :             //    the files it opens, and
      88              :             // 2. to use seccomp to sandbox itself before processing the first
      89              :             //    walredo request.
      90              :             .spawn_no_leak_child(tenant_shard_id)
      91              :             .context("spawn process")?;
      92              :         WAL_REDO_PROCESS_COUNTERS.started.inc();
      93            0 :         let mut child = scopeguard::guard(child, |child| {
      94            0 :             error!("killing wal-redo-postgres process due to a problem during launch");
      95            0 :             child.kill_and_wait(WalRedoKillCause::Startup);
      96            0 :         });
      97              : 
      98              :         let stdin = child.stdin.take().unwrap();
      99              :         let stdout = child.stdout.take().unwrap();
     100              :         let stderr = child.stderr.take().unwrap();
     101              :         let stderr = tokio::process::ChildStderr::from_std(stderr)
     102              :             .context("convert to tokio::ChildStderr")?;
     103              :         macro_rules! set_nonblock_or_log_err {
     104              :         ($file:ident) => {{
     105              :             let res = set_nonblock($file.as_raw_fd());
     106              :             if let Err(e) = &res {
     107              :                 error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
     108              :             }
     109              :             res
     110              :         }};
     111              :     }
     112            0 :         set_nonblock_or_log_err!(stdin)?;
     113            0 :         set_nonblock_or_log_err!(stdout)?;
     114              : 
     115              :         // all fallible operations post-spawn are complete, so get rid of the guard
     116              :         let child = scopeguard::ScopeGuard::into_inner(child);
     117              : 
     118              :         tokio::spawn(
     119          596 :         async move {
     120          281 :             scopeguard::defer! {
     121          281 :                 debug!("wal-redo-postgres stderr_logger_task finished");
     122          281 :                 crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
     123              :             }
     124          596 :             debug!("wal-redo-postgres stderr_logger_task started");
     125          596 :             crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
     126          596 : 
     127          596 :             use tokio::io::AsyncBufReadExt;
     128          596 :             let mut stderr_lines = tokio::io::BufReader::new(stderr);
     129          596 :             let mut buf = Vec::new();
     130          596 :             let res = loop {
     131          596 :                 buf.clear();
     132          596 :                 // TODO we don't trust the process to cap its stderr length.
     133          596 :                 // Currently it can do unbounded Vec allocation.
     134          596 :                 match stderr_lines.read_until(b'\n', &mut buf).await {
     135          281 :                     Ok(0) => break Ok(()), // eof
     136            0 :                     Ok(num_bytes) => {
     137            0 :                         let output = String::from_utf8_lossy(&buf[..num_bytes]);
     138            0 :                         error!(%output, "received output");
     139              :                     }
     140            0 :                     Err(e) => {
     141            0 :                         break Err(e);
     142              :                     }
     143              :                 }
     144              :             };
     145          281 :             match res {
     146          281 :                 Ok(()) => (),
     147            0 :                 Err(e) => {
     148            0 :                     error!(error=?e, "failed to read from walredo stderr");
     149              :                 }
     150              :             }
     151          281 :         }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
     152              :     );
     153              : 
     154              :         Ok(Self {
     155              :             conf,
     156              :             tenant_shard_id,
     157              :             child: Some(child),
     158              :             stdin: Mutex::new(ProcessInput {
     159              :                 stdin,
     160              :                 n_requests: 0,
     161              :             }),
     162              :             stdout: Mutex::new(ProcessOutput {
     163              :                 stdout,
     164              :                 pending_responses: VecDeque::new(),
     165              :                 n_processed_responses: 0,
     166              :             }),
     167              :             #[cfg(feature = "testing")]
     168              :             dump_sequence: AtomicUsize::default(),
     169              :         })
     170              :     }
     171              : 
     172      2857406 :     pub(crate) fn id(&self) -> u32 {
     173      2857406 :         self.child
     174      2857406 :             .as_ref()
     175      2857406 :             .expect("must not call this during Drop")
     176      2857406 :             .id()
     177      2857406 :     }
     178              : 
     179              :     // Apply given WAL records ('records') over an old page image. Returns
     180              :     // new page image.
     181              :     //
     182      2856626 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
     183              :     pub(crate) fn apply_wal_records(
     184              :         &self,
     185              :         rel: RelTag,
     186              :         blknum: u32,
     187              :         base_img: &Option<Bytes>,
     188              :         records: &[(Lsn, NeonWalRecord)],
     189              :         wal_redo_timeout: Duration,
     190              :     ) -> anyhow::Result<Bytes> {
     191              :         let tag = protocol::BufferTag { rel, blknum };
     192              :         let input = self.stdin.lock().unwrap();
     193              : 
     194              :         // Serialize all the messages to send the WAL redo process first.
     195              :         //
     196              :         // This could be problematic if there are millions of records to replay,
     197              :         // but in practice the number of records is usually so small that it doesn't
     198              :         // matter, and it's better to keep this code simple.
     199              :         //
     200              :         // Most requests start with a before-image with BLCKSZ bytes, followed
     201              :         // by some other WAL records. Start with a buffer that can hold that
     202              :         // comfortably.
     203              :         let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
     204              :         protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
     205              :         if let Some(img) = base_img {
     206              :             protocol::build_push_page_msg(tag, img, &mut writebuf);
     207              :         }
     208              :         for (lsn, rec) in records.iter() {
     209              :             if let NeonWalRecord::Postgres {
     210              :                 will_init: _,
     211              :                 rec: postgres_rec,
     212              :             } = rec
     213              :             {
     214              :                 protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
     215              :             } else {
     216              :                 anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
     217              :             }
     218              :         }
     219              :         protocol::build_get_page_msg(tag, &mut writebuf);
     220              :         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
     221              : 
     222              :         let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
     223              : 
     224              :         if res.is_err() {
     225              :             // not all of these can be caused by this particular input, however these are so rare
     226              :             // in tests so capture all.
     227              :             self.record_and_log(&writebuf);
     228              :         }
     229              : 
     230              :         res
     231              :     }
     232              : 
     233      2856626 :     fn apply_wal_records0(
     234      2856626 :         &self,
     235      2856626 :         writebuf: &[u8],
     236      2856626 :         input: MutexGuard<ProcessInput>,
     237      2856626 :         wal_redo_timeout: Duration,
     238      2856626 :     ) -> anyhow::Result<Bytes> {
     239      2856626 :         let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
     240      2856626 :         let mut nwrite = 0usize;
     241              : 
     242      5721570 :         while nwrite < writebuf.len() {
     243      2864944 :             let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
     244      2864944 :             let n = loop {
     245      2864944 :                 match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
     246            0 :                     Err(nix::errno::Errno::EINTR) => continue,
     247      2864944 :                     res => break res,
     248              :                 }
     249            0 :             }?;
     250              : 
     251      2864944 :             if n == 0 {
     252            0 :                 anyhow::bail!("WAL redo timed out");
     253      2864944 :             }
     254      2864944 : 
     255      2864944 :             // If 'stdin' is writeable, do write.
     256      2864944 :             let in_revents = stdin_pollfds[0].revents().unwrap();
     257      2864944 :             if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
     258      2864944 :                 nwrite += proc.stdin.write(&writebuf[nwrite..])?;
     259            0 :             }
     260      2864944 :             if in_revents.contains(PollFlags::POLLHUP) {
     261              :                 // We still have more data to write, but the process closed the pipe.
     262            0 :                 anyhow::bail!("WAL redo process closed its stdin unexpectedly");
     263      2864944 :             }
     264              :         }
     265      2856626 :         let request_no = proc.n_requests;
     266      2856626 :         proc.n_requests += 1;
     267      2856626 :         drop(proc);
     268      2856626 : 
     269      2856626 :         // To improve walredo performance we separate sending requests from receiving
     270      2856626 :         // responses. They are protected by different mutexes (input and output).
     271      2856626 :         // If threads T1, T2, T3 send requests D1, D2, D3 to the walredo process,
     272      2856626 :         // there is no guarantee that T1 will be the first to be granted the output mutex.
     273      2856626 :         // To address this we maintain the number of sent requests, the number of processed
     274      2856626 :         // responses, and a ring buffer of pending responses. After sending its request
     275      2856626 :         // (under the input mutex), a thread remembers its request number. Then it releases
     276      2856626 :         // the input mutex, locks the output mutex, and reads responses into the ring buffer
     277      2856626 :         // up to and including its stored request number. Then it takes the corresponding
     278      2856626 :         // element from the pending responses ring buffer and removes all empty elements
     279      2856626 :         // from the front, advancing the number of processed responses.
     280      2856626 : 
     281      2856626 :         let mut output = self.stdout.lock().unwrap();
     282      2856626 :         let n_processed_responses = output.n_processed_responses;
     283      5713248 :         while n_processed_responses + output.pending_responses.len() <= request_no {
     284              :             // We expect the WAL redo process to respond with an 8k page image. We read it
     285              :             // into this buffer.
     286      2856626 :             let mut resultbuf = vec![0; BLCKSZ.into()];
     287      2856626 :             let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
     288      5713248 :             while nresult < BLCKSZ.into() {
     289      2856626 :                 let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
     290              :                 // Wait for the response on stdout. The child's stderr is not polled here;
     291              :                 // it is forwarded to the page server's log by the stderr logger task spawned in `launch`.
     292      2856626 :                 let n = loop {
     293              :                     match nix::poll::poll(
     294      2856626 :                         &mut stdout_pollfds[..],
     295      2856626 :                         wal_redo_timeout.as_millis() as i32,
     296              :                     ) {
     297            0 :                         Err(nix::errno::Errno::EINTR) => continue,
     298      2856626 :                         res => break res,
     299              :                     }
     300            0 :                 }?;
     301              : 
     302      2856626 :                 if n == 0 {
     303            0 :                     anyhow::bail!("WAL redo timed out");
     304      2856626 :                 }
     305      2856626 : 
     306      2856626 :                 // If we have some data in stdout, read it to the result buffer.
     307      2856626 :                 let out_revents = stdout_pollfds[0].revents().unwrap();
     308      2856626 :                 if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
     309      2856622 :                     nresult += output.stdout.read(&mut resultbuf[nresult..])?;
     310            4 :                 }
     311      2856626 :                 if out_revents.contains(PollFlags::POLLHUP) {
     312            4 :                     anyhow::bail!("WAL redo process closed its stdout unexpectedly");
     313      2856622 :                 }
     314              :             }
     315      2856622 :             output
     316      2856622 :                 .pending_responses
     317      2856622 :                 .push_back(Some(Bytes::from(resultbuf)));
     318              :         }
     319              :         // Replace our request's response with None in `pending_responses`.
     320              :         // Then make space in the ring buffer by clearing out any sequence of contiguous
     321              :         // `None`'s from the front of `pending_responses`.
     322              :         // NB: We can't just pop_front(), because the front may hold other requests' responses:
     323              :         // another requester might have grabbed the output mutex before us:
     324              :         // T1: grab input mutex
     325              :         // T1: send request_no 23
     326              :         // T1: release input mutex
     327              :         // T2: grab input mutex
     328              :         // T2: send request_no 24
     329              :         // T2: release input mutex
     330              :         // T2: grab output mutex
     331              :         // T2: n_processed_responses + output.pending_responses.len() <= request_no
     332              :         //            23                                0                   24
     333              :         // T2: enters poll loop that reads stdout
     334              :         // T2: put response for 23 into pending_responses
     335              :         // T2: put response for 24 into pending_responses
     336              :         // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
     337              :         // T2: takes its response_24
     338              :         // pending_responses now looks like this: Front Some(response_23) None Back
     339              :         // T2: does the while loop below
     340              :         // pending_responses now looks like this: Front Some(response_23) None Back
     341              :         // T2: releases output mutex
     342              :         // T1: grabs output mutex
     343              :         // T1: n_processed_responses + output.pending_responses.len() > request_no
     344              :         //            23                                2                   23
     345              :         // T1: skips poll loop that reads stdout
     346              :         // T1: takes its response_23
     347              :         // pending_responses now looks like this: Front None None Back
     348              :         // T1: does the while loop below
     349              :         // pending_responses now looks like this: Front Back
     350              :         // n_processed_responses now has value 25
     351      2856622 :         let res = output.pending_responses[request_no - n_processed_responses]
     352      2856622 :             .take()
     353      2856622 :             .expect("we own this request_no, nobody else is supposed to take it");
     354      5713244 :         while let Some(front) = output.pending_responses.front() {
     355      2892693 :             if front.is_none() {
     356      2856622 :                 output.pending_responses.pop_front();
     357      2856622 :                 output.n_processed_responses += 1;
     358      2856622 :             } else {
     359        36071 :                 break;
     360              :             }
     361              :         }
     362      2856622 :         Ok(res)
     363      2856626 :     }
     364              : 
     365              :     #[cfg(feature = "testing")]
     366            4 :     fn record_and_log(&self, writebuf: &[u8]) {
     367            4 :         use std::sync::atomic::Ordering;
     368            4 : 
     369            4 :         let millis = std::time::SystemTime::now()
     370            4 :             .duration_since(std::time::SystemTime::UNIX_EPOCH)
     371            4 :             .unwrap()
     372            4 :             .as_millis();
     373            4 : 
     374            4 :         let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
     375            4 : 
     376            4 :         // these files will be collected to an allure report
     377            4 :         let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
     378            4 : 
     379            4 :         let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
     380            4 : 
     381            4 :         let res = std::fs::OpenOptions::new()
     382            4 :             .write(true)
     383            4 :             .create_new(true)
     384            4 :             .read(true)
     385            4 :             .open(path)
     386            4 :             .and_then(|mut f| f.write_all(writebuf));
     387              : 
     388              :         // trip up allowed_errors
     389            4 :         if let Err(e) = res {
     390            4 :             tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
     391              :         } else {
     392            0 :             tracing::error!(filename, "erroring walredo input saved");
     393              :         }
     394            4 :     }
     395              : 
     396              :     #[cfg(not(feature = "testing"))]
     397              :     fn record_and_log(&self, _: &[u8]) {}
     398              : }
     399              : 
     400              : impl Drop for WalRedoProcess {
     401          289 :     fn drop(&mut self) {
     402          289 :         self.child
     403          289 :             .take()
     404          289 :             .expect("we only do this once")
     405          289 :             .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
     406          289 :         // no way to wait for stderr_logger_task from Drop because that is async only
     407          289 :     }
     408              : }
        

Generated by: LCOV version 2.1-beta