Line data Source code
1 : mod no_leak_child;
2 : /// The IPC protocol that pageserver and walredo process speak over their shared pipe.
3 : mod protocol;
4 :
5 : use self::no_leak_child::NoLeakChild;
6 : use crate::{
7 : config::PageServerConf,
8 : metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
9 : page_cache::PAGE_SZ,
10 : span::debug_assert_current_span_has_tenant_id,
11 : };
12 : use anyhow::Context;
13 : use bytes::Bytes;
14 : use pageserver_api::record::NeonWalRecord;
15 : use pageserver_api::{reltag::RelTag, shard::TenantShardId};
16 : use postgres_ffi::BLCKSZ;
17 : #[cfg(feature = "testing")]
18 : use std::sync::atomic::AtomicUsize;
19 : use std::{
20 : collections::VecDeque,
21 : process::{Command, Stdio},
22 : time::Duration,
23 : };
24 : use tokio::io::{AsyncReadExt, AsyncWriteExt};
25 : use tracing::{debug, error, instrument, Instrument};
26 : use utils::{lsn::Lsn, poison::Poison};
27 :
28 : pub struct WalRedoProcess {
29 : #[allow(dead_code)]
30 : conf: &'static PageServerConf,
31 : #[cfg(feature = "testing")]
32 : tenant_shard_id: TenantShardId,
33 : // Some() on construction, only becomes None on Drop.
34 : child: Option<NoLeakChild>,
35 : stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
36 : stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
37 : /// Counter to separate same sized walredo inputs failing at the same millisecond.
38 : #[cfg(feature = "testing")]
39 : dump_sequence: AtomicUsize,
40 : }
41 :
42 : struct ProcessInput {
43 : stdin: tokio::process::ChildStdin,
44 : n_requests: usize,
45 : }
46 :
47 : struct ProcessOutput {
48 : stdout: tokio::process::ChildStdout,
49 : pending_responses: VecDeque<Option<Bytes>>,
50 : n_processed_responses: usize,
51 : }
52 :
53 : impl WalRedoProcess {
54 : //
55 : // Start postgres binary in special WAL redo mode.
56 : //
57 : #[instrument(skip_all,fields(pg_version=pg_version))]
58 : pub(crate) fn launch(
59 : conf: &'static PageServerConf,
60 : tenant_shard_id: TenantShardId,
61 : pg_version: u32,
62 : ) -> anyhow::Result<Self> {
63 : crate::span::debug_assert_current_span_has_tenant_id();
64 :
65 : let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
66 : let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
67 :
68 : use no_leak_child::NoLeakChildCommandExt;
69 : // Start postgres itself
70 : let child = Command::new(pg_bin_dir_path.join("postgres"))
71 : // the first arg must be --wal-redo so the child process enters into walredo mode
72 : .arg("--wal-redo")
73 : // the child doesn't process this arg, but, having it in the argv helps indentify the
74 : // walredo process for a particular tenant when debugging a pagserver
75 : .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
76 : .stdin(Stdio::piped())
77 : .stderr(Stdio::piped())
78 : .stdout(Stdio::piped())
79 : .env_clear()
80 : .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
81 : .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
82 : .env(
83 : "ASAN_OPTIONS",
84 : std::env::var("ASAN_OPTIONS").unwrap_or_default(),
85 : )
86 : .env(
87 : "UBSAN_OPTIONS",
88 : std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
89 : )
90 : // NB: The redo process is not trusted after we sent it the first
91 : // walredo work. Before that, it is trusted. Specifically, we trust
92 : // it to
93 : // 1. close all file descriptors except stdin, stdout, stderr because
94 : // pageserver might not be 100% diligent in setting FD_CLOEXEC on all
95 : // the files it opens, and
96 : // 2. to use seccomp to sandbox itself before processing the first
97 : // walredo request.
98 : .spawn_no_leak_child(tenant_shard_id)
99 : .context("spawn process")?;
100 : WAL_REDO_PROCESS_COUNTERS.started.inc();
101 0 : let mut child = scopeguard::guard(child, |child| {
102 0 : error!("killing wal-redo-postgres process due to a problem during launch");
103 0 : child.kill_and_wait(WalRedoKillCause::Startup);
104 0 : });
105 :
106 : let stdin = child.stdin.take().unwrap();
107 : let stdout = child.stdout.take().unwrap();
108 : let stderr = child.stderr.take().unwrap();
109 : let stderr = tokio::process::ChildStderr::from_std(stderr)
110 : .context("convert to tokio::ChildStderr")?;
111 : let stdin =
112 : tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
113 : let stdout = tokio::process::ChildStdout::from_std(stdout)
114 : .context("convert to tokio::ChildStdout")?;
115 :
116 : // all fallible operations post-spawn are complete, so get rid of the guard
117 : let child = scopeguard::ScopeGuard::into_inner(child);
118 :
119 : tokio::spawn(
120 20 : async move {
121 20 : scopeguard::defer! {
122 20 : debug!("wal-redo-postgres stderr_logger_task finished");
123 20 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
124 20 : }
125 20 : debug!("wal-redo-postgres stderr_logger_task started");
126 20 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
127 :
128 : use tokio::io::AsyncBufReadExt;
129 20 : let mut stderr_lines = tokio::io::BufReader::new(stderr);
130 20 : let mut buf = Vec::new();
131 4 : let res = loop {
132 60 : buf.clear();
133 60 : // TODO we don't trust the process to cap its stderr length.
134 60 : // Currently it can do unbounded Vec allocation.
135 60 : match stderr_lines.read_until(b'\n', &mut buf).await {
136 4 : Ok(0) => break Ok(()), // eof
137 40 : Ok(num_bytes) => {
138 40 : let output = String::from_utf8_lossy(&buf[..num_bytes]);
139 40 : error!(%output, "received output");
140 : }
141 0 : Err(e) => {
142 0 : break Err(e);
143 : }
144 : }
145 : };
146 4 : match res {
147 4 : Ok(()) => (),
148 0 : Err(e) => {
149 0 : error!(error=?e, "failed to read from walredo stderr");
150 : }
151 : }
152 4 : }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
153 : );
154 :
155 : Ok(Self {
156 : conf,
157 : #[cfg(feature = "testing")]
158 : tenant_shard_id,
159 : child: Some(child),
160 : stdin: tokio::sync::Mutex::new(Poison::new(
161 : "stdin",
162 : ProcessInput {
163 : stdin,
164 : n_requests: 0,
165 : },
166 : )),
167 : stdout: tokio::sync::Mutex::new(Poison::new(
168 : "stdout",
169 : ProcessOutput {
170 : stdout,
171 : pending_responses: VecDeque::new(),
172 : n_processed_responses: 0,
173 : },
174 : )),
175 : #[cfg(feature = "testing")]
176 : dump_sequence: AtomicUsize::default(),
177 : })
178 : }
179 :
180 36 : pub(crate) fn id(&self) -> u32 {
181 36 : self.child
182 36 : .as_ref()
183 36 : .expect("must not call this during Drop")
184 36 : .id()
185 36 : }
186 :
187 : /// Apply given WAL records ('records') over an old page image. Returns
188 : /// new page image.
189 : ///
190 : /// # Cancel-Safety
191 : ///
192 : /// Cancellation safe.
193 : #[instrument(skip_all, fields(pid=%self.id()))]
194 : pub(crate) async fn apply_wal_records(
195 : &self,
196 : rel: RelTag,
197 : blknum: u32,
198 : base_img: &Option<Bytes>,
199 : records: &[(Lsn, NeonWalRecord)],
200 : wal_redo_timeout: Duration,
201 : ) -> anyhow::Result<Bytes> {
202 : debug_assert_current_span_has_tenant_id();
203 :
204 : let tag = protocol::BufferTag { rel, blknum };
205 :
206 : // Serialize all the messages to send the WAL redo process first.
207 : //
208 : // This could be problematic if there are millions of records to replay,
209 : // but in practice the number of records is usually so small that it doesn't
210 : // matter, and it's better to keep this code simple.
211 : //
212 : // Most requests start with a before-image with BLCKSZ bytes, followed by
213 : // by some other WAL records. Start with a buffer that can hold that
214 : // comfortably.
215 : let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
216 : protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
217 : if let Some(img) = base_img {
218 : protocol::build_push_page_msg(tag, img, &mut writebuf);
219 : }
220 : for (lsn, rec) in records.iter() {
221 : if let NeonWalRecord::Postgres {
222 : will_init: _,
223 : rec: postgres_rec,
224 : } = rec
225 : {
226 : protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
227 : } else {
228 : anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
229 : }
230 : }
231 : protocol::build_get_page_msg(tag, &mut writebuf);
232 : WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
233 :
234 : let Ok(res) =
235 : tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
236 : else {
237 : anyhow::bail!("WAL redo timed out");
238 : };
239 :
240 : if res.is_err() {
241 : // not all of these can be caused by this particular input, however these are so rare
242 : // in tests so capture all.
243 : self.record_and_log(&writebuf);
244 : }
245 :
246 : res
247 : }
248 :
249 : /// Do a ping request-response roundtrip.
250 : ///
251 : /// Not used in production, but by Rust benchmarks.
252 4 : pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
253 4 : let mut writebuf: Vec<u8> = Vec::with_capacity(4);
254 4 : protocol::build_ping_msg(&mut writebuf);
255 4 : let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
256 : else {
257 0 : anyhow::bail!("WAL redo ping timed out");
258 : };
259 4 : let response = res?;
260 4 : if response.len() != PAGE_SZ {
261 0 : anyhow::bail!(
262 0 : "WAL redo ping response should respond with page-sized response: {}",
263 0 : response.len()
264 0 : );
265 4 : }
266 4 : Ok(())
267 4 : }
268 :
269 : /// # Cancel-Safety
270 : ///
271 : /// When not polled to completion (e.g. because in `tokio::select!` another
272 : /// branch becomes ready before this future), concurrent and subsequent
273 : /// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls.
274 : /// Dispose of this process instance and create a new one.
275 20 : async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
276 20 : let request_no = {
277 20 : let mut lock_guard = self.stdin.lock().await;
278 20 : let mut poison_guard = lock_guard.check_and_arm()?;
279 20 : let input = poison_guard.data_mut();
280 20 : input
281 20 : .stdin
282 20 : .write_all(writebuf)
283 20 : .await
284 20 : .context("write to walredo stdin")?;
285 20 : let request_no = input.n_requests;
286 20 : input.n_requests += 1;
287 20 : poison_guard.disarm();
288 20 : request_no
289 : };
290 :
291 : // To improve walredo performance we separate sending requests and receiving
292 : // responses. Them are protected by different mutexes (output and input).
293 : // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
294 : // then there is not warranty that T1 will first granted output mutex lock.
295 : // To address this issue we maintain number of sent requests, number of processed
296 : // responses and ring buffer with pending responses. After sending response
297 : // (under input mutex), threads remembers request number. Then it releases
298 : // input mutex, locks output mutex and fetch in ring buffer all responses until
299 : // its stored request number. The it takes correspondent element from
300 : // pending responses ring buffer and truncate all empty elements from the front,
301 : // advancing processed responses number.
302 :
303 20 : let mut lock_guard = self.stdout.lock().await;
304 20 : let mut poison_guard = lock_guard.check_and_arm()?;
305 20 : let output = poison_guard.data_mut();
306 20 : let n_processed_responses = output.n_processed_responses;
307 32 : while n_processed_responses + output.pending_responses.len() <= request_no {
308 : // We expect the WAL redo process to respond with an 8k page image. We read it
309 : // into this buffer.
310 20 : let mut resultbuf = vec![0; BLCKSZ.into()];
311 20 : output
312 20 : .stdout
313 20 : .read_exact(&mut resultbuf)
314 20 : .await
315 20 : .context("read walredo stdout")?;
316 12 : output
317 12 : .pending_responses
318 12 : .push_back(Some(Bytes::from(resultbuf)));
319 : }
320 : // Replace our request's response with None in `pending_responses`.
321 : // Then make space in the ring buffer by clearing out any seqence of contiguous
322 : // `None`'s from the front of `pending_responses`.
323 : // NB: We can't pop_front() because other requests' responses because another
324 : // requester might have grabbed the output mutex before us:
325 : // T1: grab input mutex
326 : // T1: send request_no 23
327 : // T1: release input mutex
328 : // T2: grab input mutex
329 : // T2: send request_no 24
330 : // T2: release input mutex
331 : // T2: grab output mutex
332 : // T2: n_processed_responses + output.pending_responses.len() <= request_no
333 : // 23 0 24
334 : // T2: enters poll loop that reads stdout
335 : // T2: put response for 23 into pending_responses
336 : // T2: put response for 24 into pending_resposnes
337 : // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
338 : // T2: takes its response_24
339 : // pending_responses now looks like this: Front Some(response_23) None Back
340 : // T2: does the while loop below
341 : // pending_responses now looks like this: Front Some(response_23) None Back
342 : // T2: releases output mutex
343 : // T1: grabs output mutex
344 : // T1: n_processed_responses + output.pending_responses.len() > request_no
345 : // 23 2 23
346 : // T1: skips poll loop that reads stdout
347 : // T1: takes its response_23
348 : // pending_responses now looks like this: Front None None Back
349 : // T2: does the while loop below
350 : // pending_responses now looks like this: Front Back
351 : // n_processed_responses now has value 25
352 12 : let res = output.pending_responses[request_no - n_processed_responses]
353 12 : .take()
354 12 : .expect("we own this request_no, nobody else is supposed to take it");
355 24 : while let Some(front) = output.pending_responses.front() {
356 12 : if front.is_none() {
357 12 : output.pending_responses.pop_front();
358 12 : output.n_processed_responses += 1;
359 12 : } else {
360 0 : break;
361 : }
362 : }
363 12 : poison_guard.disarm();
364 12 : Ok(res)
365 20 : }
366 :
/// Dump a failed walredo request buffer into the tenant directory and log the outcome.
///
/// The file name is built from the current wall-clock milliseconds, the buffer
/// length and a per-process sequence number. Both the success and the failure
/// path log at `error` level.
#[cfg(feature = "testing")]
fn record_and_log(&self, writebuf: &[u8]) {
    use std::io::Write;
    use std::sync::atomic::Ordering;

    let millis = std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_millis();

    let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);

    // these files will be collected to an allure report
    let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());

    let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);

    // `create_new` makes the open fail rather than overwrite an existing dump.
    let mut open_opts = std::fs::OpenOptions::new();
    open_opts.write(true).create_new(true).read(true);
    let write_res = match open_opts.open(path) {
        Ok(mut file) => file.write_all(writebuf),
        Err(e) => Err(e),
    };

    // trip up allowed_errors
    match write_res {
        Err(e) => {
            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
        }
        Ok(()) => {
            tracing::error!(filename, "erroring walredo input saved");
        }
    }
}
398 :
399 : #[cfg(not(feature = "testing"))]
400 : fn record_and_log(&self, _: &[u8]) {}
401 : }
402 :
403 : impl Drop for WalRedoProcess {
404 20 : fn drop(&mut self) {
405 20 : self.child
406 20 : .take()
407 20 : .expect("we only do this once")
408 20 : .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
409 20 : // no way to wait for stderr_logger_task from Drop because that is async only
410 20 : }
411 : }
|