Line data Source code
1 : mod no_leak_child;
2 : /// The IPC protocol that pageserver and walredo process speak over their shared pipe.
3 : mod protocol;
4 :
5 : use std::collections::VecDeque;
6 : use std::process::{Command, Stdio};
7 : #[cfg(feature = "testing")]
8 : use std::sync::atomic::AtomicUsize;
9 : use std::time::Duration;
10 :
11 : use anyhow::Context;
12 : use bytes::Bytes;
13 : use pageserver_api::record::NeonWalRecord;
14 : use pageserver_api::reltag::RelTag;
15 : use pageserver_api::shard::TenantShardId;
16 : use postgres_ffi::BLCKSZ;
17 : use tokio::io::{AsyncReadExt, AsyncWriteExt};
18 : use tracing::{Instrument, debug, error, instrument};
19 : use utils::lsn::Lsn;
20 : use utils::poison::Poison;
21 :
22 : use self::no_leak_child::NoLeakChild;
23 : use crate::config::PageServerConf;
24 : use crate::metrics::{WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER, WalRedoKillCause};
25 : use crate::page_cache::PAGE_SZ;
26 : use crate::span::debug_assert_current_span_has_tenant_id;
27 :
28 : pub struct WalRedoProcess {
29 : #[allow(dead_code)]
30 : conf: &'static PageServerConf,
31 : #[cfg(feature = "testing")]
32 : tenant_shard_id: TenantShardId,
33 : // Some() on construction, only becomes None on Drop.
34 : child: Option<NoLeakChild>,
35 : stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
36 : stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
37 : /// Counter to separate same sized walredo inputs failing at the same millisecond.
38 : #[cfg(feature = "testing")]
39 : dump_sequence: AtomicUsize,
40 : }
41 :
42 : struct ProcessInput {
43 : stdin: tokio::process::ChildStdin,
44 : n_requests: usize,
45 : }
46 :
47 : struct ProcessOutput {
48 : stdout: tokio::process::ChildStdout,
49 : pending_responses: VecDeque<Option<Bytes>>,
50 : n_processed_responses: usize,
51 : }
52 :
53 : impl WalRedoProcess {
54 : //
55 : // Start postgres binary in special WAL redo mode.
56 : //
57 : #[instrument(skip_all,fields(pg_version=pg_version))]
58 : pub(crate) fn launch(
59 : conf: &'static PageServerConf,
60 : tenant_shard_id: TenantShardId,
61 : pg_version: u32,
62 : ) -> anyhow::Result<Self> {
63 : crate::span::debug_assert_current_span_has_tenant_id();
64 :
65 : let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
66 : let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
67 :
68 : use no_leak_child::NoLeakChildCommandExt;
69 : // Start postgres itself
70 : let child = Command::new(pg_bin_dir_path.join("postgres"))
71 : // the first arg must be --wal-redo so the child process enters into walredo mode
72 : .arg("--wal-redo")
73 : // the child doesn't process this arg, but, having it in the argv helps indentify the
74 : // walredo process for a particular tenant when debugging a pagserver
75 : .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
76 : .stdin(Stdio::piped())
77 : .stderr(Stdio::piped())
78 : .stdout(Stdio::piped())
79 : .env_clear()
80 : .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
81 : .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
82 : .env(
83 : "ASAN_OPTIONS",
84 : std::env::var("ASAN_OPTIONS").unwrap_or_default(),
85 : )
86 : .env(
87 : "UBSAN_OPTIONS",
88 : std::env::var("UBSAN_OPTIONS").unwrap_or_default(),
89 : )
90 : // NB: The redo process is not trusted after we sent it the first
91 : // walredo work. Before that, it is trusted. Specifically, we trust
92 : // it to
93 : // 1. close all file descriptors except stdin, stdout, stderr because
94 : // pageserver might not be 100% diligent in setting FD_CLOEXEC on all
95 : // the files it opens, and
96 : // 2. to use seccomp to sandbox itself before processing the first
97 : // walredo request.
98 : .spawn_no_leak_child(tenant_shard_id)
99 : .context("spawn process")?;
100 : WAL_REDO_PROCESS_COUNTERS.started.inc();
101 0 : let mut child = scopeguard::guard(child, |child| {
102 0 : error!("killing wal-redo-postgres process due to a problem during launch");
103 0 : child.kill_and_wait(WalRedoKillCause::Startup);
104 0 : });
105 :
106 : let stdin = child.stdin.take().unwrap();
107 : let stdout = child.stdout.take().unwrap();
108 : let stderr = child.stderr.take().unwrap();
109 : let stderr = tokio::process::ChildStderr::from_std(stderr)
110 : .context("convert to tokio::ChildStderr")?;
111 : let stdin =
112 : tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
113 : let stdout = tokio::process::ChildStdout::from_std(stdout)
114 : .context("convert to tokio::ChildStdout")?;
115 :
116 : // all fallible operations post-spawn are complete, so get rid of the guard
117 : let child = scopeguard::ScopeGuard::into_inner(child);
118 :
119 : tokio::spawn(
120 20 : async move {
121 20 : scopeguard::defer! {
122 20 : debug!("wal-redo-postgres stderr_logger_task finished");
123 20 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
124 20 : }
125 20 : debug!("wal-redo-postgres stderr_logger_task started");
126 20 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
127 :
128 : use tokio::io::AsyncBufReadExt;
129 20 : let mut stderr_lines = tokio::io::BufReader::new(stderr);
130 20 : let mut buf = Vec::new();
131 5 : let res = loop {
132 60 : buf.clear();
133 60 : // TODO we don't trust the process to cap its stderr length.
134 60 : // Currently it can do unbounded Vec allocation.
135 60 : match stderr_lines.read_until(b'\n', &mut buf).await {
136 5 : Ok(0) => break Ok(()), // eof
137 40 : Ok(num_bytes) => {
138 40 : let output = String::from_utf8_lossy(&buf[..num_bytes]);
139 40 : if !output.contains("LOG:") {
140 32 : error!(%output, "received output");
141 8 : }
142 : }
143 0 : Err(e) => {
144 0 : break Err(e);
145 : }
146 : }
147 : };
148 5 : match res {
149 5 : Ok(()) => (),
150 0 : Err(e) => {
151 0 : error!(error=?e, "failed to read from walredo stderr");
152 : }
153 : }
154 5 : }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
155 : );
156 :
157 : Ok(Self {
158 : conf,
159 : #[cfg(feature = "testing")]
160 : tenant_shard_id,
161 : child: Some(child),
162 : stdin: tokio::sync::Mutex::new(Poison::new(
163 : "stdin",
164 : ProcessInput {
165 : stdin,
166 : n_requests: 0,
167 : },
168 : )),
169 : stdout: tokio::sync::Mutex::new(Poison::new(
170 : "stdout",
171 : ProcessOutput {
172 : stdout,
173 : pending_responses: VecDeque::new(),
174 : n_processed_responses: 0,
175 : },
176 : )),
177 : #[cfg(feature = "testing")]
178 : dump_sequence: AtomicUsize::default(),
179 : })
180 : }
181 :
182 36 : pub(crate) fn id(&self) -> u32 {
183 36 : self.child
184 36 : .as_ref()
185 36 : .expect("must not call this during Drop")
186 36 : .id()
187 36 : }
188 :
189 : /// Apply given WAL records ('records') over an old page image. Returns
190 : /// new page image.
191 : ///
192 : /// # Cancel-Safety
193 : ///
194 : /// Cancellation safe.
195 : #[instrument(skip_all, fields(pid=%self.id()))]
196 : pub(crate) async fn apply_wal_records(
197 : &self,
198 : rel: RelTag,
199 : blknum: u32,
200 : base_img: &Option<Bytes>,
201 : records: &[(Lsn, NeonWalRecord)],
202 : wal_redo_timeout: Duration,
203 : ) -> anyhow::Result<Bytes> {
204 : debug_assert_current_span_has_tenant_id();
205 :
206 : let tag = protocol::BufferTag { rel, blknum };
207 :
208 : // Serialize all the messages to send the WAL redo process first.
209 : //
210 : // This could be problematic if there are millions of records to replay,
211 : // but in practice the number of records is usually so small that it doesn't
212 : // matter, and it's better to keep this code simple.
213 : //
214 : // Most requests start with a before-image with BLCKSZ bytes, followed by
215 : // by some other WAL records. Start with a buffer that can hold that
216 : // comfortably.
217 : let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
218 : protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
219 : if let Some(img) = base_img {
220 : protocol::build_push_page_msg(tag, img, &mut writebuf);
221 : }
222 : for (lsn, rec) in records.iter() {
223 : if let NeonWalRecord::Postgres {
224 : will_init: _,
225 : rec: postgres_rec,
226 : } = rec
227 : {
228 : protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
229 : } else {
230 : anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
231 : }
232 : }
233 : protocol::build_get_page_msg(tag, &mut writebuf);
234 : WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
235 :
236 : let Ok(res) =
237 : tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
238 : else {
239 : anyhow::bail!("WAL redo timed out");
240 : };
241 :
242 : if res.is_err() {
243 : // not all of these can be caused by this particular input, however these are so rare
244 : // in tests so capture all.
245 : self.record_and_log(&writebuf);
246 : }
247 :
248 : res
249 : }
250 :
251 : /// Do a ping request-response roundtrip.
252 : ///
253 : /// Not used in production, but by Rust benchmarks.
254 4 : pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
255 4 : let mut writebuf: Vec<u8> = Vec::with_capacity(4);
256 4 : protocol::build_ping_msg(&mut writebuf);
257 4 : let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
258 : else {
259 0 : anyhow::bail!("WAL redo ping timed out");
260 : };
261 4 : let response = res?;
262 4 : if response.len() != PAGE_SZ {
263 0 : anyhow::bail!(
264 0 : "WAL redo ping response should respond with page-sized response: {}",
265 0 : response.len()
266 0 : );
267 4 : }
268 4 : Ok(())
269 4 : }
270 :
271 : /// # Cancel-Safety
272 : ///
273 : /// When not polled to completion (e.g. because in `tokio::select!` another
274 : /// branch becomes ready before this future), concurrent and subsequent
275 : /// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls.
276 : /// Dispose of this process instance and create a new one.
277 20 : async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
278 20 : let request_no = {
279 20 : let mut lock_guard = self.stdin.lock().await;
280 20 : let mut poison_guard = lock_guard.check_and_arm()?;
281 20 : let input = poison_guard.data_mut();
282 20 : input
283 20 : .stdin
284 20 : .write_all(writebuf)
285 20 : .await
286 20 : .context("write to walredo stdin")?;
287 20 : let request_no = input.n_requests;
288 20 : input.n_requests += 1;
289 20 : poison_guard.disarm();
290 20 : request_no
291 : };
292 :
293 : // To improve walredo performance we separate sending requests and receiving
294 : // responses. Them are protected by different mutexes (output and input).
295 : // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
296 : // then there is not warranty that T1 will first granted output mutex lock.
297 : // To address this issue we maintain number of sent requests, number of processed
298 : // responses and ring buffer with pending responses. After sending response
299 : // (under input mutex), threads remembers request number. Then it releases
300 : // input mutex, locks output mutex and fetch in ring buffer all responses until
301 : // its stored request number. The it takes correspondent element from
302 : // pending responses ring buffer and truncate all empty elements from the front,
303 : // advancing processed responses number.
304 :
305 20 : let mut lock_guard = self.stdout.lock().await;
306 20 : let mut poison_guard = lock_guard.check_and_arm()?;
307 20 : let output = poison_guard.data_mut();
308 20 : let n_processed_responses = output.n_processed_responses;
309 32 : while n_processed_responses + output.pending_responses.len() <= request_no {
310 : // We expect the WAL redo process to respond with an 8k page image. We read it
311 : // into this buffer.
312 20 : let mut resultbuf = vec![0; BLCKSZ.into()];
313 20 : output
314 20 : .stdout
315 20 : .read_exact(&mut resultbuf)
316 20 : .await
317 20 : .context("read walredo stdout")?;
318 12 : output
319 12 : .pending_responses
320 12 : .push_back(Some(Bytes::from(resultbuf)));
321 : }
322 : // Replace our request's response with None in `pending_responses`.
323 : // Then make space in the ring buffer by clearing out any seqence of contiguous
324 : // `None`'s from the front of `pending_responses`.
325 : // NB: We can't pop_front() because other requests' responses because another
326 : // requester might have grabbed the output mutex before us:
327 : // T1: grab input mutex
328 : // T1: send request_no 23
329 : // T1: release input mutex
330 : // T2: grab input mutex
331 : // T2: send request_no 24
332 : // T2: release input mutex
333 : // T2: grab output mutex
334 : // T2: n_processed_responses + output.pending_responses.len() <= request_no
335 : // 23 0 24
336 : // T2: enters poll loop that reads stdout
337 : // T2: put response for 23 into pending_responses
338 : // T2: put response for 24 into pending_resposnes
339 : // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
340 : // T2: takes its response_24
341 : // pending_responses now looks like this: Front Some(response_23) None Back
342 : // T2: does the while loop below
343 : // pending_responses now looks like this: Front Some(response_23) None Back
344 : // T2: releases output mutex
345 : // T1: grabs output mutex
346 : // T1: n_processed_responses + output.pending_responses.len() > request_no
347 : // 23 2 23
348 : // T1: skips poll loop that reads stdout
349 : // T1: takes its response_23
350 : // pending_responses now looks like this: Front None None Back
351 : // T2: does the while loop below
352 : // pending_responses now looks like this: Front Back
353 : // n_processed_responses now has value 25
354 12 : let res = output.pending_responses[request_no - n_processed_responses]
355 12 : .take()
356 12 : .expect("we own this request_no, nobody else is supposed to take it");
357 24 : while let Some(front) = output.pending_responses.front() {
358 12 : if front.is_none() {
359 12 : output.pending_responses.pop_front();
360 12 : output.n_processed_responses += 1;
361 12 : } else {
362 0 : break;
363 : }
364 : }
365 12 : poison_guard.disarm();
366 12 : Ok(res)
367 20 : }
368 :
369 : #[cfg(feature = "testing")]
370 8 : fn record_and_log(&self, writebuf: &[u8]) {
371 : use std::sync::atomic::Ordering;
372 :
373 8 : let millis = std::time::SystemTime::now()
374 8 : .duration_since(std::time::SystemTime::UNIX_EPOCH)
375 8 : .unwrap()
376 8 : .as_millis();
377 8 :
378 8 : let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
379 8 :
380 8 : // these files will be collected to an allure report
381 8 : let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
382 8 :
383 8 : let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
384 :
385 : use std::io::Write;
386 8 : let res = std::fs::OpenOptions::new()
387 8 : .write(true)
388 8 : .create_new(true)
389 8 : .read(true)
390 8 : .open(path)
391 8 : .and_then(|mut f| f.write_all(writebuf));
392 :
393 : // trip up allowed_errors
394 8 : if let Err(e) = res {
395 8 : tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
396 : } else {
397 0 : tracing::error!(filename, "erroring walredo input saved");
398 : }
399 8 : }
400 :
401 : #[cfg(not(feature = "testing"))]
402 : fn record_and_log(&self, _: &[u8]) {}
403 : }
404 :
405 : impl Drop for WalRedoProcess {
406 20 : fn drop(&mut self) {
407 20 : self.child
408 20 : .take()
409 20 : .expect("we only do this once")
410 20 : .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
411 20 : // no way to wait for stderr_logger_task from Drop because that is async only
412 20 : }
413 : }
|