use self::no_leak_child::NoLeakChild;
use crate::{
    config::PageServerConf,
    metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
    walrecord::NeonWalRecord,
};
use anyhow::Context;
use bytes::Bytes;
use nix::poll::{PollFd, PollFlags};
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
use std::os::fd::AsRawFd;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
    collections::VecDeque,
    io::{Read, Write},
    process::{ChildStdin, ChildStdout, Command, Stdio},
    sync::{Mutex, MutexGuard},
    time::Duration,
};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, nonblock::set_nonblock};

mod no_leak_child;
/// The IPC protocol that pageserver and walredo process speak over their shared pipe.
mod protocol;
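// A rough sketch of one request/response exchange, as issued by `apply_wal_records`
// below (the exact wire encoding lives in the `protocol` module; the message names
// here just mirror the `build_*_msg` helpers):
//
//   pageserver -> walredo:  BeginRedoForBlock(tag)
//                           PushPage(tag, base_img)    -- only if a base image exists
//                           ApplyRecord(lsn, record)   -- one per WAL record
//                           GetPage(tag)
//   walredo -> pageserver:  a single BLCKSZ-byte page image on stdout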

pub struct WalRedoProcess {
    #[allow(dead_code)]
    conf: &'static PageServerConf,
    tenant_shard_id: TenantShardId,
    // Some() on construction, only becomes None on Drop.
    child: Option<NoLeakChild>,
    stdout: Mutex<ProcessOutput>,
    stdin: Mutex<ProcessInput>,
    /// Counter to separate same-sized walredo inputs failing at the same millisecond.
    #[cfg(feature = "testing")]
    dump_sequence: AtomicUsize,
}

struct ProcessInput {
    stdin: ChildStdin,
    n_requests: usize,
}

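/// Response-side state, kept under its own mutex so that a new request can be
/// written to the child (under [`ProcessInput`]'s lock) while another thread is
/// still collecting an earlier response.
///
/// Invariant, as read from `apply_wal_records0` below (not a documented guarantee):
/// `pending_responses[i]` holds the response for request number
/// `n_processed_responses + i`. An entry is replaced with `None` once its owner has
/// taken it, and leading `None`s are popped to advance `n_processed_responses`.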
struct ProcessOutput {
    stdout: ChildStdout,
    pending_responses: VecDeque<Option<Bytes>>,
    n_processed_responses: usize,
}

impl WalRedoProcess {
    //
    // Start the postgres binary in special WAL redo mode.
    //
    #[instrument(skip_all, fields(pg_version=pg_version))]
    pub(crate) fn launch(
        conf: &'static PageServerConf,
        tenant_shard_id: TenantShardId,
        pg_version: u32,
    ) -> anyhow::Result<Self> {
        crate::span::debug_assert_current_span_has_tenant_id();

        let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
        let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;

        use no_leak_child::NoLeakChildCommandExt;
        // Start postgres itself
        let child = Command::new(pg_bin_dir_path.join("postgres"))
            // the first arg must be --wal-redo so the child process enters walredo mode
            .arg("--wal-redo")
            // the child doesn't process this arg, but having it in the argv helps identify the
            // walredo process for a particular tenant when debugging a pageserver
            .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
            .stdin(Stdio::piped())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .env_clear()
            .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
            .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
            // NB: The redo process is not trusted after we have sent it the first
            // walredo work. Before that, it is trusted. Specifically, we trust it to
            // 1. close all file descriptors except stdin, stdout and stderr, because
            //    the pageserver might not be 100% diligent in setting FD_CLOEXEC on all
            //    the files it opens, and
            // 2. use seccomp to sandbox itself before processing the first
            //    walredo request.
            .spawn_no_leak_child(tenant_shard_id)
            .context("spawn process")?;
        WAL_REDO_PROCESS_COUNTERS.started.inc();
        let mut child = scopeguard::guard(child, |child| {
            error!("killing wal-redo-postgres process due to a problem during launch");
            child.kill_and_wait(WalRedoKillCause::Startup);
        });

        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        let stderr = child.stderr.take().unwrap();
        let stderr = tokio::process::ChildStderr::from_std(stderr)
            .context("convert to tokio::ChildStderr")?;
        macro_rules! set_nonblock_or_log_err {
            ($file:ident) => {{
                let res = set_nonblock($file.as_raw_fd());
                if let Err(e) = &res {
                    error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
                }
                res
            }};
        }
        set_nonblock_or_log_err!(stdin)?;
        set_nonblock_or_log_err!(stdout)?;

        // all fallible operations post-spawn are complete, so get rid of the guard
        let child = scopeguard::ScopeGuard::into_inner(child);

        tokio::spawn(
            async move {
                scopeguard::defer! {
                    debug!("wal-redo-postgres stderr_logger_task finished");
                    crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
                }
                debug!("wal-redo-postgres stderr_logger_task started");
                crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();

                use tokio::io::AsyncBufReadExt;
                let mut stderr_lines = tokio::io::BufReader::new(stderr);
                let mut buf = Vec::new();
                let res = loop {
                    buf.clear();
                    // TODO we don't trust the process to cap its stderr length.
                    // Currently it can do unbounded Vec allocation.
                    match stderr_lines.read_until(b'\n', &mut buf).await {
                        Ok(0) => break Ok(()), // eof
                        Ok(num_bytes) => {
                            let output = String::from_utf8_lossy(&buf[..num_bytes]);
                            error!(%output, "received output");
                        }
                        Err(e) => {
                            break Err(e);
                        }
                    }
                };
                match res {
                    Ok(()) => (),
                    Err(e) => {
                        error!(error=?e, "failed to read from walredo stderr");
                    }
                }
            }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
        );

        Ok(Self {
            conf,
            tenant_shard_id,
            child: Some(child),
            stdin: Mutex::new(ProcessInput {
                stdin,
                n_requests: 0,
            }),
            stdout: Mutex::new(ProcessOutput {
                stdout,
                pending_responses: VecDeque::new(),
                n_processed_responses: 0,
            }),
            #[cfg(feature = "testing")]
            dump_sequence: AtomicUsize::default(),
        })
    }

    pub(crate) fn id(&self) -> u32 {
        self.child
            .as_ref()
            .expect("must not call this during Drop")
            .id()
    }

    // Apply the given WAL records ('records') over an old page image. Returns
    // the new page image.
    //
    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
    pub(crate) fn apply_wal_records(
        &self,
        rel: RelTag,
        blknum: u32,
        base_img: &Option<Bytes>,
        records: &[(Lsn, NeonWalRecord)],
        wal_redo_timeout: Duration,
    ) -> anyhow::Result<Bytes> {
        let tag = protocol::BufferTag { rel, blknum };
        let input = self.stdin.lock().unwrap();

        // Serialize all the messages to send to the WAL redo process first.
        //
        // This could be problematic if there are millions of records to replay,
        // but in practice the number of records is usually so small that it doesn't
        // matter, and it's better to keep this code simple.
        //
        // Most requests start with a before-image of BLCKSZ bytes, followed by
        // some other WAL records. Start with a buffer that can hold that
        // comfortably.
        let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
        protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
        if let Some(img) = base_img {
            protocol::build_push_page_msg(tag, img, &mut writebuf);
        }
        for (lsn, rec) in records.iter() {
            if let NeonWalRecord::Postgres {
                will_init: _,
                rec: postgres_rec,
            } = rec
            {
                protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
            } else {
                anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
            }
        }
        protocol::build_get_page_msg(tag, &mut writebuf);
        WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

        let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);

        if res.is_err() {
            // Not all of these errors are necessarily caused by this particular input,
            // but they are so rare in tests that we capture all of them.
            self.record_and_log(&writebuf);
        }

        res
    }

    fn apply_wal_records0(
        &self,
        writebuf: &[u8],
        input: MutexGuard<ProcessInput>,
        wal_redo_timeout: Duration,
    ) -> anyhow::Result<Bytes> {
        let mut proc = { input }; // TODO: remove this legacy rename, but it keeps the patch small.
        let mut nwrite = 0usize;

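        // Write the request. `launch` put stdin into non-blocking mode, so poll for
        // writability (bounded by `wal_redo_timeout`) and write as much as the pipe
        // accepts on each iteration, until the whole buffer has been sent.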
        while nwrite < writebuf.len() {
            let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
            let n = loop {
                match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
                    Err(nix::errno::Errno::EINTR) => continue,
                    res => break res,
                }
            }?;

            if n == 0 {
                anyhow::bail!("WAL redo timed out");
            }

            // If 'stdin' is writable, write to it.
            let in_revents = stdin_pollfds[0].revents().unwrap();
            if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
                nwrite += proc.stdin.write(&writebuf[nwrite..])?;
            }
            if in_revents.contains(PollFlags::POLLHUP) {
                // We still have more data to write, but the process closed the pipe.
                anyhow::bail!("WAL redo process closed its stdin unexpectedly");
            }
        }
        let request_no = proc.n_requests;
        proc.n_requests += 1;
        drop(proc);

        // To improve walredo performance we separate sending requests from receiving
        // responses. They are protected by different mutexes (input and output).
        // If threads T1, T2, T3 send requests D1, D2, D3 to the walredo process,
        // there is no guarantee that T1 will be the first to be granted the output
        // mutex. To address this we maintain the number of sent requests, the number
        // of processed responses, and a ring buffer of pending responses. After
        // sending a request (under the input mutex), a thread remembers its request
        // number. It then releases the input mutex, locks the output mutex, and
        // fetches responses into the ring buffer up to its own request number.
        // Then it takes the corresponding element from the pending-responses ring
        // buffer and truncates any empty elements from the front, advancing the
        // processed-responses counter.
        // (A tiny standalone model of this bookkeeping lives in the
        // `pending_responses_sketch` test module at the bottom of this file.)

        let mut output = self.stdout.lock().unwrap();
        let n_processed_responses = output.n_processed_responses;
        while n_processed_responses + output.pending_responses.len() <= request_no {
            // We expect the WAL redo process to respond with an 8k page image. We read it
            // into this buffer.
            let mut resultbuf = vec![0; BLCKSZ.into()];
            let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
            while nresult < BLCKSZ.into() {
                let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
                // We do two things simultaneously: read the response from stdout
                // and forward any logging information that the child writes to its stderr to the page server's log.
                let n = loop {
                    match nix::poll::poll(
                        &mut stdout_pollfds[..],
                        wal_redo_timeout.as_millis() as i32,
                    ) {
                        Err(nix::errno::Errno::EINTR) => continue,
                        res => break res,
                    }
                }?;

                if n == 0 {
                    anyhow::bail!("WAL redo timed out");
                }

                // If we have some data in stdout, read it into the result buffer.
                let out_revents = stdout_pollfds[0].revents().unwrap();
                if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
                    nresult += output.stdout.read(&mut resultbuf[nresult..])?;
                }
                if out_revents.contains(PollFlags::POLLHUP) {
                    anyhow::bail!("WAL redo process closed its stdout unexpectedly");
                }
            }
            output
                .pending_responses
                .push_back(Some(Bytes::from(resultbuf)));
        }
        // Replace our request's response with None in `pending_responses`.
        // Then make space in the ring buffer by clearing out any sequence of contiguous
        // `None`s from the front of `pending_responses`.
        // NB: We can't pop_front() other requests' responses, because another
        // requester might have grabbed the output mutex before us:
        // T1: grab input mutex
        // T1: send request_no 23
        // T1: release input mutex
        // T2: grab input mutex
        // T2: send request_no 24
        // T2: release input mutex
        // T2: grab output mutex
        // T2: n_processed_responses + output.pending_responses.len() <= request_no
        //                23         +              0                 <=     24
        // T2: enters poll loop that reads stdout
        // T2: put response for 23 into pending_responses
        // T2: put response for 24 into pending_responses
        //     pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
        // T2: takes its response_24
        //     pending_responses now looks like this: Front Some(response_23) None Back
        // T2: does the while loop below
        //     pending_responses now looks like this: Front Some(response_23) None Back
        // T2: releases output mutex
        // T1: grabs output mutex
        // T1: n_processed_responses + output.pending_responses.len() > request_no
        //                23         +              2                 >      23
        // T1: skips poll loop that reads stdout
        // T1: takes its response_23
        //     pending_responses now looks like this: Front None None Back
        // T1: does the while loop below
        //     pending_responses now looks like this: Front Back
        //     n_processed_responses now has value 25
        let res = output.pending_responses[request_no - n_processed_responses]
            .take()
            .expect("we own this request_no, nobody else is supposed to take it");
        while let Some(front) = output.pending_responses.front() {
            if front.is_none() {
                output.pending_responses.pop_front();
                output.n_processed_responses += 1;
            } else {
                break;
            }
        }
        Ok(res)
    }

    #[cfg(feature = "testing")]
    fn record_and_log(&self, writebuf: &[u8]) {
        use std::sync::atomic::Ordering;

        let millis = std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);

        // these files will be collected into an Allure report
        let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());

        let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);

        let res = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .read(true)
            .open(path)
            .and_then(|mut f| f.write_all(writebuf));

        // trip up allowed_errors
        if let Err(e) = res {
            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
        } else {
            tracing::error!(filename, "erroring walredo input saved");
        }
    }

    #[cfg(not(feature = "testing"))]
    fn record_and_log(&self, _: &[u8]) {}
}

impl Drop for WalRedoProcess {
    fn drop(&mut self) {
        self.child
            .take()
            .expect("we only do this once")
            .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
        // no way to wait for stderr_logger_task from Drop because that is async only
    }
}
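
// A minimal, self-contained sketch (test-only; the module, struct, and helper names
// below are hypothetical and exist only for illustration) of the `pending_responses`
// bookkeeping performed at the end of `apply_wal_records0` above, using plain
// `Vec<u8>` payloads instead of real page images.
#[cfg(test)]
mod pending_responses_sketch {
    use std::collections::VecDeque;

    struct Output {
        pending_responses: VecDeque<Option<Vec<u8>>>,
        n_processed_responses: usize,
    }

    // Mirrors the take-and-truncate step: take our own slot, then pop leading `None`s.
    fn take_response(out: &mut Output, request_no: usize) -> Vec<u8> {
        let res = out.pending_responses[request_no - out.n_processed_responses]
            .take()
            .expect("we own this request_no");
        while let Some(None) = out.pending_responses.front() {
            out.pending_responses.pop_front();
            out.n_processed_responses += 1;
        }
        res
    }

    #[test]
    fn out_of_order_take() {
        // Requests 0 and 1 were sent; both responses have been read into the ring
        // buffer, but request 1's owner wins the output mutex and takes its response first.
        let mut out = Output {
            pending_responses: VecDeque::from([Some(vec![0u8]), Some(vec![1u8])]),
            n_processed_responses: 0,
        };
        assert_eq!(take_response(&mut out, 1), vec![1u8]);
        // Request 0's response is still pending, so nothing could be popped yet.
        assert_eq!(out.n_processed_responses, 0);
        assert_eq!(take_response(&mut out, 0), vec![0u8]);
        // Both slots are now empty and the processed counter has advanced past them.
        assert_eq!(out.n_processed_responses, 2);
        assert!(out.pending_responses.is_empty());
    }
}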