Line data Source code
1 : use self::no_leak_child::NoLeakChild;
2 : use crate::{
3 : config::PageServerConf,
4 : metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
5 : walrecord::NeonWalRecord,
6 : walredo::process::{no_leak_child, protocol},
7 : };
8 : use anyhow::Context;
9 : use bytes::Bytes;
10 : use nix::poll::{PollFd, PollFlags};
11 : use pageserver_api::{reltag::RelTag, shard::TenantShardId};
12 : use postgres_ffi::BLCKSZ;
13 : use std::os::fd::AsRawFd;
14 : #[cfg(feature = "testing")]
15 : use std::sync::atomic::AtomicUsize;
16 : use std::{
17 : collections::VecDeque,
18 : io::{Read, Write},
19 : process::{ChildStdin, ChildStdout, Command, Stdio},
20 : sync::{Mutex, MutexGuard},
21 : time::Duration,
22 : };
23 : use tracing::{debug, error, instrument, Instrument};
24 : use utils::{lsn::Lsn, nonblock::set_nonblock};
25 :
26 : pub struct WalRedoProcess {
27 : #[allow(dead_code)]
28 : conf: &'static PageServerConf,
29 : tenant_shard_id: TenantShardId,
30 : // Some() on construction, only becomes None on Drop.
31 : child: Option<NoLeakChild>,
32 : stdout: Mutex<ProcessOutput>,
33 : stdin: Mutex<ProcessInput>,
34 : /// Counter to separate same sized walredo inputs failing at the same millisecond.
35 : #[cfg(feature = "testing")]
36 : dump_sequence: AtomicUsize,
37 : }
38 :
/// State guarded by the stdin mutex: the write side of the walredo protocol.
struct ProcessInput {
    /// The child's stdin pipe; switched to non-blocking mode in `launch`.
    stdin: ChildStdin,
    /// Number of requests fully written so far. The value before the increment is the
    /// request's sequence number, used to locate its response in `ProcessOutput`.
    n_requests: usize,
}
43 :
/// State guarded by the stdout mutex: the read side of the walredo protocol.
struct ProcessOutput {
    /// The child's stdout pipe; switched to non-blocking mode in `launch`.
    stdout: ChildStdout,
    /// Responses read from stdout but not yet fully consumed, in request order.
    /// `Some` = read but not yet taken by its requester; `None` = already taken
    /// (and waiting to be trimmed from the front).
    pending_responses: VecDeque<Option<Bytes>>,
    /// Number of responses trimmed from the front of `pending_responses`; equivalently,
    /// the request number of the element currently at the front.
    n_processed_responses: usize,
}
49 :
50 : impl WalRedoProcess {
51 : //
52 : // Start postgres binary in special WAL redo mode.
53 : //
54 8 : #[instrument(skip_all,fields(pg_version=pg_version))]
55 : pub(crate) fn launch(
56 : conf: &'static PageServerConf,
57 : tenant_shard_id: TenantShardId,
58 : pg_version: u32,
59 : ) -> anyhow::Result<Self> {
60 : crate::span::debug_assert_current_span_has_tenant_id();
61 :
62 : let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
63 : let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
64 :
65 : use no_leak_child::NoLeakChildCommandExt;
66 : // Start postgres itself
67 : let child = Command::new(pg_bin_dir_path.join("postgres"))
68 : // the first arg must be --wal-redo so the child process enters into walredo mode
69 : .arg("--wal-redo")
70 : // the child doesn't process this arg, but, having it in the argv helps indentify the
71 : // walredo process for a particular tenant when debugging a pagserver
72 : .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
73 : .stdin(Stdio::piped())
74 : .stderr(Stdio::piped())
75 : .stdout(Stdio::piped())
76 : .env_clear()
77 : .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
78 : .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
79 : // NB: The redo process is not trusted after we sent it the first
80 : // walredo work. Before that, it is trusted. Specifically, we trust
81 : // it to
82 : // 1. close all file descriptors except stdin, stdout, stderr because
83 : // pageserver might not be 100% diligent in setting FD_CLOEXEC on all
84 : // the files it opens, and
85 : // 2. to use seccomp to sandbox itself before processing the first
86 : // walredo request.
87 : .spawn_no_leak_child(tenant_shard_id)
88 : .context("spawn process")?;
89 : WAL_REDO_PROCESS_COUNTERS.started.inc();
90 0 : let mut child = scopeguard::guard(child, |child| {
91 0 : error!("killing wal-redo-postgres process due to a problem during launch");
92 0 : child.kill_and_wait(WalRedoKillCause::Startup);
93 0 : });
94 :
95 : let stdin = child.stdin.take().unwrap();
96 : let stdout = child.stdout.take().unwrap();
97 : let stderr = child.stderr.take().unwrap();
98 : let stderr = tokio::process::ChildStderr::from_std(stderr)
99 : .context("convert to tokio::ChildStderr")?;
100 : macro_rules! set_nonblock_or_log_err {
101 : ($file:ident) => {{
102 : let res = set_nonblock($file.as_raw_fd());
103 : if let Err(e) = &res {
104 : error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed");
105 : }
106 : res
107 : }};
108 : }
109 : set_nonblock_or_log_err!(stdin)?;
110 : set_nonblock_or_log_err!(stdout)?;
111 :
112 : // all fallible operations post-spawn are complete, so get rid of the guard
113 : let child = scopeguard::ScopeGuard::into_inner(child);
114 :
115 : tokio::spawn(
116 0 : async move {
117 : scopeguard::defer! {
118 : debug!("wal-redo-postgres stderr_logger_task finished");
119 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
120 : }
121 0 : debug!("wal-redo-postgres stderr_logger_task started");
122 0 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
123 0 :
124 0 : use tokio::io::AsyncBufReadExt;
125 0 : let mut stderr_lines = tokio::io::BufReader::new(stderr);
126 0 : let mut buf = Vec::new();
127 0 : let res = loop {
128 0 : buf.clear();
129 0 : // TODO we don't trust the process to cap its stderr length.
130 0 : // Currently it can do unbounded Vec allocation.
131 0 : match stderr_lines.read_until(b'\n', &mut buf).await {
132 0 : Ok(0) => break Ok(()), // eof
133 0 : Ok(num_bytes) => {
134 0 : let output = String::from_utf8_lossy(&buf[..num_bytes]);
135 0 : error!(%output, "received output");
136 : }
137 0 : Err(e) => {
138 0 : break Err(e);
139 : }
140 : }
141 : };
142 0 : match res {
143 0 : Ok(()) => (),
144 0 : Err(e) => {
145 0 : error!(error=?e, "failed to read from walredo stderr");
146 : }
147 : }
148 0 : }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
149 : );
150 :
151 : Ok(Self {
152 : conf,
153 : tenant_shard_id,
154 : child: Some(child),
155 : stdin: Mutex::new(ProcessInput {
156 : stdin,
157 : n_requests: 0,
158 : }),
159 : stdout: Mutex::new(ProcessOutput {
160 : stdout,
161 : pending_responses: VecDeque::new(),
162 : n_processed_responses: 0,
163 : }),
164 : #[cfg(feature = "testing")]
165 : dump_sequence: AtomicUsize::default(),
166 : })
167 : }
168 :
169 16 : pub(crate) fn id(&self) -> u32 {
170 16 : self.child
171 16 : .as_ref()
172 16 : .expect("must not call this during Drop")
173 16 : .id()
174 16 : }
175 :
176 : // Apply given WAL records ('records') over an old page image. Returns
177 : // new page image.
178 : //
179 16 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
180 : pub(crate) async fn apply_wal_records(
181 : &self,
182 : rel: RelTag,
183 : blknum: u32,
184 : base_img: &Option<Bytes>,
185 : records: &[(Lsn, NeonWalRecord)],
186 : wal_redo_timeout: Duration,
187 : ) -> anyhow::Result<Bytes> {
188 : let tag = protocol::BufferTag { rel, blknum };
189 : let input = self.stdin.lock().unwrap();
190 :
191 : // Serialize all the messages to send the WAL redo process first.
192 : //
193 : // This could be problematic if there are millions of records to replay,
194 : // but in practice the number of records is usually so small that it doesn't
195 : // matter, and it's better to keep this code simple.
196 : //
197 : // Most requests start with a before-image with BLCKSZ bytes, followed by
198 : // by some other WAL records. Start with a buffer that can hold that
199 : // comfortably.
200 : let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
201 : protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
202 : if let Some(img) = base_img {
203 : protocol::build_push_page_msg(tag, img, &mut writebuf);
204 : }
205 : for (lsn, rec) in records.iter() {
206 : if let NeonWalRecord::Postgres {
207 : will_init: _,
208 : rec: postgres_rec,
209 : } = rec
210 : {
211 : protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
212 : } else {
213 : anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
214 : }
215 : }
216 : protocol::build_get_page_msg(tag, &mut writebuf);
217 : WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
218 :
219 : let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
220 :
221 : if res.is_err() {
222 : // not all of these can be caused by this particular input, however these are so rare
223 : // in tests so capture all.
224 : self.record_and_log(&writebuf);
225 : }
226 :
227 : res
228 : }
229 :
230 8 : fn apply_wal_records0(
231 8 : &self,
232 8 : writebuf: &[u8],
233 8 : input: MutexGuard<ProcessInput>,
234 8 : wal_redo_timeout: Duration,
235 8 : ) -> anyhow::Result<Bytes> {
236 8 : let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small.
237 8 : let mut nwrite = 0usize;
238 :
239 16 : while nwrite < writebuf.len() {
240 8 : let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
241 8 : let n = loop {
242 8 : match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
243 0 : Err(nix::errno::Errno::EINTR) => continue,
244 8 : res => break res,
245 : }
246 0 : }?;
247 :
248 8 : if n == 0 {
249 0 : anyhow::bail!("WAL redo timed out");
250 8 : }
251 8 :
252 8 : // If 'stdin' is writeable, do write.
253 8 : let in_revents = stdin_pollfds[0].revents().unwrap();
254 8 : if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
255 8 : nwrite += proc.stdin.write(&writebuf[nwrite..])?;
256 0 : }
257 8 : if in_revents.contains(PollFlags::POLLHUP) {
258 : // We still have more data to write, but the process closed the pipe.
259 0 : anyhow::bail!("WAL redo process closed its stdin unexpectedly");
260 8 : }
261 : }
262 8 : let request_no = proc.n_requests;
263 8 : proc.n_requests += 1;
264 8 : drop(proc);
265 8 :
266 8 : // To improve walredo performance we separate sending requests and receiving
267 8 : // responses. Them are protected by different mutexes (output and input).
268 8 : // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
269 8 : // then there is not warranty that T1 will first granted output mutex lock.
270 8 : // To address this issue we maintain number of sent requests, number of processed
271 8 : // responses and ring buffer with pending responses. After sending response
272 8 : // (under input mutex), threads remembers request number. Then it releases
273 8 : // input mutex, locks output mutex and fetch in ring buffer all responses until
274 8 : // its stored request number. The it takes correspondent element from
275 8 : // pending responses ring buffer and truncate all empty elements from the front,
276 8 : // advancing processed responses number.
277 8 :
278 8 : let mut output = self.stdout.lock().unwrap();
279 8 : let n_processed_responses = output.n_processed_responses;
280 12 : while n_processed_responses + output.pending_responses.len() <= request_no {
281 : // We expect the WAL redo process to respond with an 8k page image. We read it
282 : // into this buffer.
283 8 : let mut resultbuf = vec![0; BLCKSZ.into()];
284 8 : let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
285 12 : while nresult < BLCKSZ.into() {
286 8 : let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
287 : // We do two things simultaneously: reading response from stdout
288 : // and forward any logging information that the child writes to its stderr to the page server's log.
289 8 : let n = loop {
290 : match nix::poll::poll(
291 8 : &mut stdout_pollfds[..],
292 8 : wal_redo_timeout.as_millis() as i32,
293 : ) {
294 0 : Err(nix::errno::Errno::EINTR) => continue,
295 8 : res => break res,
296 : }
297 0 : }?;
298 :
299 8 : if n == 0 {
300 0 : anyhow::bail!("WAL redo timed out");
301 8 : }
302 8 :
303 8 : // If we have some data in stdout, read it to the result buffer.
304 8 : let out_revents = stdout_pollfds[0].revents().unwrap();
305 8 : if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
306 4 : nresult += output.stdout.read(&mut resultbuf[nresult..])?;
307 4 : }
308 8 : if out_revents.contains(PollFlags::POLLHUP) {
309 4 : anyhow::bail!("WAL redo process closed its stdout unexpectedly");
310 4 : }
311 : }
312 4 : output
313 4 : .pending_responses
314 4 : .push_back(Some(Bytes::from(resultbuf)));
315 : }
316 : // Replace our request's response with None in `pending_responses`.
317 : // Then make space in the ring buffer by clearing out any seqence of contiguous
318 : // `None`'s from the front of `pending_responses`.
319 : // NB: We can't pop_front() because other requests' responses because another
320 : // requester might have grabbed the output mutex before us:
321 : // T1: grab input mutex
322 : // T1: send request_no 23
323 : // T1: release input mutex
324 : // T2: grab input mutex
325 : // T2: send request_no 24
326 : // T2: release input mutex
327 : // T2: grab output mutex
328 : // T2: n_processed_responses + output.pending_responses.len() <= request_no
329 : // 23 0 24
330 : // T2: enters poll loop that reads stdout
331 : // T2: put response for 23 into pending_responses
332 : // T2: put response for 24 into pending_resposnes
333 : // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
334 : // T2: takes its response_24
335 : // pending_responses now looks like this: Front Some(response_23) None Back
336 : // T2: does the while loop below
337 : // pending_responses now looks like this: Front Some(response_23) None Back
338 : // T2: releases output mutex
339 : // T1: grabs output mutex
340 : // T1: n_processed_responses + output.pending_responses.len() > request_no
341 : // 23 2 23
342 : // T1: skips poll loop that reads stdout
343 : // T1: takes its response_23
344 : // pending_responses now looks like this: Front None None Back
345 : // T2: does the while loop below
346 : // pending_responses now looks like this: Front Back
347 : // n_processed_responses now has value 25
348 4 : let res = output.pending_responses[request_no - n_processed_responses]
349 4 : .take()
350 4 : .expect("we own this request_no, nobody else is supposed to take it");
351 8 : while let Some(front) = output.pending_responses.front() {
352 4 : if front.is_none() {
353 4 : output.pending_responses.pop_front();
354 4 : output.n_processed_responses += 1;
355 4 : } else {
356 0 : break;
357 : }
358 : }
359 4 : Ok(res)
360 8 : }
361 :
362 : #[cfg(feature = "testing")]
363 4 : fn record_and_log(&self, writebuf: &[u8]) {
364 4 : use std::sync::atomic::Ordering;
365 4 :
366 4 : let millis = std::time::SystemTime::now()
367 4 : .duration_since(std::time::SystemTime::UNIX_EPOCH)
368 4 : .unwrap()
369 4 : .as_millis();
370 4 :
371 4 : let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
372 4 :
373 4 : // these files will be collected to an allure report
374 4 : let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
375 4 :
376 4 : let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
377 4 :
378 4 : let res = std::fs::OpenOptions::new()
379 4 : .write(true)
380 4 : .create_new(true)
381 4 : .read(true)
382 4 : .open(path)
383 4 : .and_then(|mut f| f.write_all(writebuf));
384 :
385 : // trip up allowed_errors
386 4 : if let Err(e) = res {
387 4 : tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
388 : } else {
389 0 : tracing::error!(filename, "erroring walredo input saved");
390 : }
391 4 : }
392 :
393 : #[cfg(not(feature = "testing"))]
394 : fn record_and_log(&self, _: &[u8]) {}
395 : }
396 :
397 : impl Drop for WalRedoProcess {
398 8 : fn drop(&mut self) {
399 8 : self.child
400 8 : .take()
401 8 : .expect("we only do this once")
402 8 : .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
403 8 : // no way to wait for stderr_logger_task from Drop because that is async only
404 8 : }
405 : }
|