|             Line data    Source code 
       1              : mod no_leak_child;
       2              : /// The IPC protocol that pageserver and walredo process speak over their shared pipe.
       3              : mod protocol;
       4              : 
       5              : use std::collections::VecDeque;
       6              : use std::process::{Command, Stdio};
       7              : #[cfg(feature = "testing")]
       8              : use std::sync::atomic::AtomicUsize;
       9              : use std::time::Duration;
      10              : 
      11              : use anyhow::Context;
      12              : use bytes::Bytes;
      13              : use pageserver_api::reltag::RelTag;
      14              : use pageserver_api::shard::TenantShardId;
      15              : use postgres_ffi::{BLCKSZ, PgMajorVersion};
      16              : use tokio::io::{AsyncReadExt, AsyncWriteExt};
      17              : use tracing::{Instrument, debug, error, instrument};
      18              : use utils::lsn::Lsn;
      19              : use utils::poison::Poison;
      20              : use wal_decoder::models::record::NeonWalRecord;
      21              : 
      22              : use self::no_leak_child::NoLeakChild;
      23              : use crate::config::PageServerConf;
      24              : use crate::metrics::{WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER, WalRedoKillCause};
      25              : use crate::page_cache::PAGE_SZ;
      26              : use crate::span::debug_assert_current_span_has_tenant_id;
      27              : 
      28              : pub struct WalRedoProcess { // handle to one spawned `postgres --wal-redo` child plus its stdin/stdout pipes
      29              :     #[allow(dead_code)] // in the code visible here, `conf` is only read by the testing-only `record_and_log`
      30              :     conf: &'static PageServerConf,
      31              :     #[cfg(feature = "testing")]
      32              :     tenant_shard_id: TenantShardId, // selects the tenant directory that failed walredo inputs are dumped into
      33              :     // Some() on construction, only becomes None on Drop.
      34              :     child: Option<NoLeakChild>,
      35              :     stdout: tokio::sync::Mutex<Poison<ProcessOutput>>, // response side; locked while reading replies
      36              :     stdin: tokio::sync::Mutex<Poison<ProcessInput>>, // request side; locked only while writing one request
      37              :     /// Counter to separate same sized walredo inputs failing at the same millisecond.
      38              :     #[cfg(feature = "testing")]
      39              :     dump_sequence: AtomicUsize,
      40              : }
      41              : 
      42              : struct ProcessInput { // state guarded by the `stdin` mutex of `WalRedoProcess`
      43              :     stdin: tokio::process::ChildStdin, // pipe into the walredo child
      44              :     n_requests: usize, // requests written so far; its pre-increment value is the sequence number of a request
      45              : }
      46              : 
      47              : struct ProcessOutput { // state guarded by the `stdout` mutex of `WalRedoProcess`
      48              :     stdout: tokio::process::ChildStdout, // pipe out of the walredo child
      49              :     pending_responses: VecDeque<Option<Bytes>>, // responses read but not yet popped; a slot becomes None once its requester took it
      50              :     n_processed_responses: usize, // number of responses already popped from the front of `pending_responses`
      51              : }
      52              : 
      53              : impl WalRedoProcess {
      54              :     // Start postgres binary in special WAL redo mode and return a handle to it.
      55              :     // Also spawns a detached tokio task that forwards the child's stderr to the tracing log.
      56              :     // Fails if the pg bin/lib dirs can't be resolved, the spawn fails, or a pipe can't be converted to tokio.
      57              :     #[instrument(skip_all,fields(pg_version=pg_version.major_version_num()))]
      58              :     pub(crate) fn launch(
      59              :         conf: &'static PageServerConf,
      60              :         tenant_shard_id: TenantShardId,
      61              :         pg_version: PgMajorVersion,
      62              :     ) -> anyhow::Result<Self> {
      63              :         crate::span::debug_assert_current_span_has_tenant_id();
      64              : 
      65              :         let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
      66              :         let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
      67              : 
      68              :         use no_leak_child::NoLeakChildCommandExt;
      69              :         // Start postgres itself
      70              :         let child = Command::new(pg_bin_dir_path.join("postgres"))
      71              :             // the first arg must be --wal-redo so the child process enters into walredo mode
      72              :             .arg("--wal-redo")
      73              :             // the child doesn't process this arg, but having it in the argv helps identify the
      74              :             // walredo process for a particular tenant when debugging a pageserver
      75              :             .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
      76              :             .stdin(Stdio::piped())
      77              :             .stderr(Stdio::piped())
      78              :             .stdout(Stdio::piped())
      79              :             .env_clear() // drop the inherited environment; only the variables set below reach the child
      80              :             .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
      81              :             .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
      82              :             .env(
      83              :                 "ASAN_OPTIONS",
      84              :                 std::env::var("ASAN_OPTIONS").unwrap_or_default(),
      85              :             )
      86              :             .env(
      87              :                 "UBSAN_OPTIONS",
      88              :                 std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
      89              :             )
      90              :             // NB: The redo process is not trusted after we sent it the first
      91              :             // walredo work. Before that, it is trusted. Specifically, we trust
      92              :             // it to
      93              :             // 1. close all file descriptors except stdin, stdout, stderr because
      94              :             //    pageserver might not be 100% diligent in setting FD_CLOEXEC on all
      95              :             //    the files it opens, and
      96              :             // 2. to use seccomp to sandbox itself before processing the first
      97              :             //    walredo request.
      98              :             .spawn_no_leak_child(tenant_shard_id)
      99              :             .context("spawn process")?;
     100              :         WAL_REDO_PROCESS_COUNTERS.started.inc();
     101            0 :         let mut child = scopeguard::guard(child, |child| { // closure runs only on an early return before `into_inner` below
     102            0 :             error!("killing wal-redo-postgres process due to a problem during launch");
     103            0 :             child.kill_and_wait(WalRedoKillCause::Startup);
     104            0 :         });
     105              : 
     106              :         let stdin = child.stdin.take().unwrap(); // presumably always Some: all three pipes were set to Stdio::piped() above
     107              :         let stdout = child.stdout.take().unwrap();
     108              :         let stderr = child.stderr.take().unwrap();
     109              :         let stderr = tokio::process::ChildStderr::from_std(stderr)
     110              :             .context("convert to tokio::ChildStderr")?;
     111              :         let stdin =
     112              :             tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
     113              :         let stdout = tokio::process::ChildStdout::from_std(stdout)
     114              :             .context("convert to tokio::ChildStdout")?;
     115              : 
     116              :         // all fallible operations post-spawn are complete, so get rid of the guard
     117              :         let child = scopeguard::ScopeGuard::into_inner(child);
     118              : 
     119              :         tokio::spawn(
     120            6 :             async move {
     121            6 :                 scopeguard::defer! {
     122              :                     debug!("wal-redo-postgres stderr_logger_task finished");
     123              :                     crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
     124              :                 }
     125            6 :                 debug!("wal-redo-postgres stderr_logger_task started");
     126            6 :                 crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
     127              : 
     128              :                 use tokio::io::AsyncBufReadExt;
     129            6 :                 let mut stderr_lines = tokio::io::BufReader::new(stderr);
     130            6 :                 let mut buf = Vec::new();
     131            2 :                 let res = loop {
     132           16 :                     buf.clear();
     133              :                     // TODO we don't trust the process to cap its stderr length.
     134              :                     // Currently it can do unbounded Vec allocation.
     135           16 :                     match stderr_lines.read_until(b'\n', &mut buf).await {
     136            2 :                         Ok(0) => break Ok(()), // eof
     137           10 :                         Ok(num_bytes) => {
     138           10 :                             let output = String::from_utf8_lossy(&buf[..num_bytes]);
     139           10 :                             if !output.contains("LOG:") { // lines containing "LOG:" are dropped; everything else is logged as an error
     140            8 :                                error!(%output, "received output");
     141            2 :                             }
     142              :                         }
     143            0 :                         Err(e) => {
     144            0 :                             break Err(e);
     145              :                         }
     146              :                     }
     147              :                 };
     148            2 :                 match res {
     149            2 :                     Ok(()) => (),
     150            0 :                     Err(e) => {
     151            0 :                         error!(error=?e, "failed to read from walredo stderr");
     152              :                     }
     153              :                 }
     154            2 :             }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version)) // `parent: None` makes this a root span rather than a child of the `launch` span
     155              :         );
     156              : 
     157              :         Ok(Self {
     158              :             conf,
     159              :             #[cfg(feature = "testing")]
     160              :             tenant_shard_id,
     161              :             child: Some(child),
     162              :             stdin: tokio::sync::Mutex::new(Poison::new(
     163              :                 "stdin",
     164              :                 ProcessInput {
     165              :                     stdin,
     166              :                     n_requests: 0,
     167              :                 },
     168              :             )),
     169              :             stdout: tokio::sync::Mutex::new(Poison::new(
     170              :                 "stdout",
     171              :                 ProcessOutput {
     172              :                     stdout,
     173              :                     pending_responses: VecDeque::new(),
     174              :                     n_processed_responses: 0,
     175              :                 },
     176              :             )),
     177              :             #[cfg(feature = "testing")]
     178              :             dump_sequence: AtomicUsize::default(),
     179              :         })
     180              :     }
     181              : 
     182           11 :     pub(crate) fn id(&self) -> u32 { // id reported by the child handle (used as `pid` in tracing spans); panics once Drop has taken the child
     183           11 :         self.child
     184           11 :             .as_ref()
     185           11 :             .expect("must not call this during Drop")
     186           11 :             .id()
     187           11 :     }
     188              : 
     189              :     /// Apply given WAL records ('records') over an old page image. Returns
     190              :     /// new page image, or an error on timeout, on pipe I/O failure, or if a non-Postgres record is passed.
     191              :     ///
     192              :     /// # Cancel-Safety
     193              :     ///
     194              :     /// Cancellation safe. NOTE(review): the timeout below drops `apply_wal_records0` mid-flight, which per its docs may poison this process - confirm callers replace the process afterwards.
     195              :     #[instrument(skip_all, fields(pid=%self.id()))]
     196              :     pub(crate) async fn apply_wal_records(
     197              :         &self,
     198              :         rel: RelTag,
     199              :         blknum: u32,
     200              :         base_img: &Option<Bytes>,
     201              :         records: &[(Lsn, NeonWalRecord)],
     202              :         wal_redo_timeout: Duration,
     203              :     ) -> anyhow::Result<Bytes> {
     204              :         debug_assert_current_span_has_tenant_id();
     205              : 
     206              :         let tag = protocol::BufferTag { rel, blknum };
     207              : 
     208              :         // Serialize all the messages to send the WAL redo process first.
     209              :         //
     210              :         // This could be problematic if there are millions of records to replay,
     211              :         // but in practice the number of records is usually so small that it doesn't
     212              :         // matter, and it's better to keep this code simple.
     213              :         //
     214              :         // Most requests start with a before-image with BLCKSZ bytes, followed
     215              :         // by some other WAL records. Start with a buffer that can hold that
     216              :         // comfortably.
     217              :         let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
     218              :         protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
     219              :         if let Some(img) = base_img {
     220              :             protocol::build_push_page_msg(tag, img, &mut writebuf);
     221              :         }
     222              :         for (lsn, rec) in records.iter() {
     223              :             if let NeonWalRecord::Postgres {
     224              :                 will_init: _,
     225              :                 rec: postgres_rec,
     226              :             } = rec
     227              :             {
     228              :                 protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
     229              :             } else {
     230              :                 anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
     231              :             }
     232              :         }
     233              :         protocol::build_get_page_msg(tag, &mut writebuf);
     234              :         WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
     235              : 
     236              :         let Ok(res) =
     237              :             tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
     238              :         else {
     239              :             anyhow::bail!("WAL redo timed out");
     240              :         };
     241              : 
     242              :         if res.is_err() {
     243              :             // not all of these can be caused by this particular input, however these are so rare
     244              :             // in tests so capture all. NB: the timeout path above bails earlier, so timed-out inputs are not dumped.
     245              :             self.record_and_log(&writebuf);
     246              :         }
     247              : 
     248              :         res
     249              :     }
     250              : 
     251              :     /// Do a ping request-response roundtrip.
     252              :     ///
     253              :     /// Not used in production, but by Rust benchmarks. Fails on timeout, on pipe I/O error, or if the response is not `PAGE_SZ` bytes long.
     254            1 :     pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
     255            1 :         let mut writebuf: Vec<u8> = Vec::with_capacity(4);
     256            1 :         protocol::build_ping_msg(&mut writebuf);
     257            1 :         let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
     258              :         else {
     259            0 :             anyhow::bail!("WAL redo ping timed out");
     260              :         };
     261            1 :         let response = res?;
     262            1 :         if response.len() != PAGE_SZ {
     263            0 :             anyhow::bail!(
     264            0 :                 "WAL redo ping response should respond with page-sized response: {}",
     265            0 :                 response.len()
     266              :             );
     267            1 :         }
     268            1 :         Ok(())
     269            1 :     }
     270              : 
     271              :     /// # Cancel-Safety
     272              :     ///
     273              :     /// When not polled to completion (e.g. because in `tokio::select!` another
     274              :     /// branch becomes ready before this future), concurrent and subsequent
     275              :     /// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls.
     276              :     /// In that case, dispose of this process instance and create a new one.
     277            6 :     async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> { // writes one request to stdin, then reads stdout until its own response is available
     278            6 :         let request_no = {
     279            6 :             let mut lock_guard = self.stdin.lock().await;
     280            6 :             let mut poison_guard = lock_guard.check_and_arm()?;
     281            6 :             let input = poison_guard.data_mut();
     282            6 :             input
     283            6 :                 .stdin
     284            6 :                 .write_all(writebuf)
     285            6 :                 .await
     286            6 :                 .context("write to walredo stdin")?;
     287            6 :             let request_no = input.n_requests;
     288            6 :             input.n_requests += 1;
     289            6 :             poison_guard.disarm();
     290            6 :             request_no
     291              :         };
     292              : 
     293              :         // To improve walredo performance we separate sending requests and receiving
     294              :         // responses. They are protected by different mutexes (output and input).
     295              :         // If threads T1, T2, T3 send requests D1, D2, D3 to the walredo process
     296              :         // then there is no guarantee that T1 will be granted the output mutex first.
     297              :         // To address this issue we maintain the number of sent requests, the number of processed
     298              :         // responses and a ring buffer with pending responses. After sending its request
     299              :         // (under the input mutex), a thread remembers its request number. Then it releases
     300              :         // the input mutex, locks the output mutex and reads responses into the ring buffer up to
     301              :         // its stored request number. Then it takes the corresponding element from the
     302              :         // pending responses ring buffer and truncates all empty elements from the front,
     303              :         // advancing the processed responses number.
     304              : 
     305            6 :         let mut lock_guard = self.stdout.lock().await;
     306            6 :         let mut poison_guard = lock_guard.check_and_arm()?;
     307            6 :         let output = poison_guard.data_mut();
     308            6 :         let n_processed_responses = output.n_processed_responses;
     309            9 :         while n_processed_responses + output.pending_responses.len() <= request_no {
     310              :             // We expect the WAL redo process to respond with an 8k page image. We read it
     311              :             // into this buffer.
     312            6 :             let mut resultbuf = vec![0; BLCKSZ.into()];
     313            6 :             output
     314            6 :                 .stdout
     315            6 :                 .read_exact(&mut resultbuf)
     316            6 :                 .await
     317            6 :                 .context("read walredo stdout")?;
     318            3 :             output
     319            3 :                 .pending_responses
     320            3 :                 .push_back(Some(Bytes::from(resultbuf)));
     321              :         }
     322              :         // Replace our request's response with None in `pending_responses`.
     323              :         // Then make space in the ring buffer by clearing out any sequence of contiguous
     324              :         // `None`'s from the front of `pending_responses`.
     325              :         // NB: We can't just pop_front() our response, because the front may hold other requests'
     326              :         // responses: another requester might have grabbed the output mutex before us:
     327              :         // T1: grab input mutex
     328              :         // T1: send request_no 23
     329              :         // T1: release input mutex
     330              :         // T2: grab input mutex
     331              :         // T2: send request_no 24
     332              :         // T2: release input mutex
     333              :         // T2: grab output mutex
     334              :         // T2: n_processed_responses + output.pending_responses.len() <= request_no
     335              :         //            23                                0                   24
     336              :         // T2: enters poll loop that reads stdout
     337              :         // T2: put response for 23 into pending_responses
     338              :         // T2: put response for 24 into pending_responses
     339              :         // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
     340              :         // T2: takes its response_24
     341              :         // pending_responses now looks like this: Front Some(response_23) None Back
     342              :         // T2: does the while loop below (a no-op here: the front is still Some)
     343              :         // pending_responses now looks like this: Front Some(response_23) None Back
     344              :         // T2: releases output mutex
     345              :         // T1: grabs output mutex
     346              :         // T1: n_processed_responses + output.pending_responses.len() > request_no
     347              :         //            23                                2                   23
     348              :         // T1: skips poll loop that reads stdout
     349              :         // T1: takes its response_23
     350              :         // pending_responses now looks like this: Front None None Back
     351              :         // T1: does the while loop below
     352              :         // pending_responses now looks like this: Front Back
     353              :         // n_processed_responses now has value 25
     354            3 :         let res = output.pending_responses[request_no - n_processed_responses]
     355            3 :             .take()
     356            3 :             .expect("we own this request_no, nobody else is supposed to take it");
     357            6 :         while let Some(front) = output.pending_responses.front() {
     358            3 :             if front.is_none() {
     359            3 :                 output.pending_responses.pop_front();
     360            3 :                 output.n_processed_responses += 1;
     361            3 :             } else {
     362            0 :                 break;
     363              :             }
     364              :         }
     365            3 :         poison_guard.disarm();
     366            3 :         Ok(res)
     367            6 :     }
     368              : 
     369              :     #[cfg(feature = "testing")]
     370            3 :     fn record_and_log(&self, writebuf: &[u8]) { // dump the failed request bytes to a uniquely named file in the tenant directory and log the outcome
     371              :         use std::sync::atomic::Ordering;
     372              : 
     373            3 :         let millis = std::time::SystemTime::now()
     374            3 :             .duration_since(std::time::SystemTime::UNIX_EPOCH)
     375            3 :             .unwrap()
     376            3 :             .as_millis();
     377              : 
     378            3 :         let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
     379              : 
     380              :         // these files will be collected to an allure report; name = timestamp (ms), input length, per-process sequence number
     381            3 :         let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
     382              : 
     383            3 :         let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
     384              : 
     385              :         use std::io::Write;
     386            3 :         let res = std::fs::OpenOptions::new()
     387            3 :             .write(true)
     388            3 :             .create_new(true) // fail instead of overwriting an already existing dump
     389            3 :             .read(true)
     390            3 :             .open(path)
     391            3 :             .and_then(|mut f| f.write_all(writebuf));
     392              : 
     393              :         // trip up allowed_errors: both outcomes are logged at error level
     394            3 :         if let Err(e) = res {
     395            3 :             tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
     396              :         } else {
     397            0 :             tracing::error!(filename, "erroring walredo input saved");
     398              :         }
     399            3 :     }
     400              : 
     401              :     #[cfg(not(feature = "testing"))]
     402              :     fn record_and_log(&self, _: &[u8]) {} // no-op when the `testing` feature is disabled
     403              : }
     404              : 
     405              : impl Drop for WalRedoProcess {
     406            6 :     fn drop(&mut self) { // takes the child out of `self` (leaving None) and calls `kill_and_wait` with cause WalRedoProcessDrop
     407            6 :         self.child
     408            6 :             .take()
     409            6 :             .expect("we only do this once")
     410            6 :             .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
     411              :         // no way to wait for stderr_logger_task from Drop because that is async only
     412            6 :     }
     413              : }
         |