Line data Source code
1 : mod no_leak_child;
2 : /// The IPC protocol that pageserver and walredo process speak over their shared pipe.
3 : mod protocol;
4 :
5 : use self::no_leak_child::NoLeakChild;
6 : use crate::{
7 : config::PageServerConf,
8 : metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
9 : page_cache::PAGE_SZ,
10 : span::debug_assert_current_span_has_tenant_id,
11 : };
12 : use anyhow::Context;
13 : use bytes::Bytes;
14 : use pageserver_api::record::NeonWalRecord;
15 : use pageserver_api::{reltag::RelTag, shard::TenantShardId};
16 : use postgres_ffi::BLCKSZ;
17 : #[cfg(feature = "testing")]
18 : use std::sync::atomic::AtomicUsize;
19 : use std::{
20 : collections::VecDeque,
21 : process::{Command, Stdio},
22 : time::Duration,
23 : };
24 : use tokio::io::{AsyncReadExt, AsyncWriteExt};
25 : use tracing::{debug, error, instrument, Instrument};
26 : use utils::{lsn::Lsn, poison::Poison};
27 :
/// Handle to one spawned `postgres --wal-redo` child process, together with the
/// two pipe ends used to exchange requests and responses with it.
///
/// Constructed by [`WalRedoProcess::launch`]. The `Drop` impl takes `child` and
/// calls `kill_and_wait` on it.
pub struct WalRedoProcess {
    // Within this file `conf` is only read by the `testing`-feature variant of
    // `record_and_log`, hence the allow for non-testing builds.
    #[allow(dead_code)]
    conf: &'static PageServerConf,
    /// Used to build the tenant directory path when dumping a failed walredo input.
    #[cfg(feature = "testing")]
    tenant_shard_id: TenantShardId,
    // Some() on construction, only becomes None on Drop.
    child: Option<NoLeakChild>,
    /// Response side of the pipe. Guarded by its own mutex, separate from `stdin`.
    stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
    /// Request side of the pipe. Guarded by its own mutex, separate from `stdout`.
    stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
    /// Counter to separate same sized walredo inputs failing at the same millisecond.
    #[cfg(feature = "testing")]
    dump_sequence: AtomicUsize,
}
41 :
/// State protected by the `stdin` mutex of [`WalRedoProcess`].
struct ProcessInput {
    /// Pipe on which serialized requests are written to the child.
    stdin: tokio::process::ChildStdin,
    /// Number of requests written so far. `apply_wal_records0` uses the value
    /// before incrementing as the sequence number of the request it just wrote.
    n_requests: usize,
}
46 :
/// State protected by the `stdout` mutex of [`WalRedoProcess`].
struct ProcessOutput {
    /// Pipe from which page images are read back from the child.
    stdout: tokio::process::ChildStdout,
    /// Responses already read from `stdout`. The element at index `i` belongs to
    /// request number `n_processed_responses + i`; it is `Some` until the
    /// requester takes it, then `None`.
    pending_responses: VecDeque<Option<Bytes>>,
    /// Number of responses that have been popped off the front of
    /// `pending_responses`.
    n_processed_responses: usize,
}
52 :
53 : impl WalRedoProcess {
54 : //
55 : // Start postgres binary in special WAL redo mode.
56 : //
57 10 : #[instrument(skip_all,fields(pg_version=pg_version))]
58 : pub(crate) fn launch(
59 : conf: &'static PageServerConf,
60 : tenant_shard_id: TenantShardId,
61 : pg_version: u32,
62 : ) -> anyhow::Result<Self> {
63 : crate::span::debug_assert_current_span_has_tenant_id();
64 :
65 : let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
66 : let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;
67 :
68 : use no_leak_child::NoLeakChildCommandExt;
69 : // Start postgres itself
70 : let child = Command::new(pg_bin_dir_path.join("postgres"))
71 : // the first arg must be --wal-redo so the child process enters into walredo mode
72 : .arg("--wal-redo")
73 : // the child doesn't process this arg, but, having it in the argv helps indentify the
74 : // walredo process for a particular tenant when debugging a pagserver
75 : .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
76 : .stdin(Stdio::piped())
77 : .stderr(Stdio::piped())
78 : .stdout(Stdio::piped())
79 : .env_clear()
80 : .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
81 : .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
82 : // NB: The redo process is not trusted after we sent it the first
83 : // walredo work. Before that, it is trusted. Specifically, we trust
84 : // it to
85 : // 1. close all file descriptors except stdin, stdout, stderr because
86 : // pageserver might not be 100% diligent in setting FD_CLOEXEC on all
87 : // the files it opens, and
88 : // 2. to use seccomp to sandbox itself before processing the first
89 : // walredo request.
90 : .spawn_no_leak_child(tenant_shard_id)
91 : .context("spawn process")?;
92 : WAL_REDO_PROCESS_COUNTERS.started.inc();
93 0 : let mut child = scopeguard::guard(child, |child| {
94 0 : error!("killing wal-redo-postgres process due to a problem during launch");
95 0 : child.kill_and_wait(WalRedoKillCause::Startup);
96 0 : });
97 :
98 : let stdin = child.stdin.take().unwrap();
99 : let stdout = child.stdout.take().unwrap();
100 : let stderr = child.stderr.take().unwrap();
101 : let stderr = tokio::process::ChildStderr::from_std(stderr)
102 : .context("convert to tokio::ChildStderr")?;
103 : let stdin =
104 : tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
105 : let stdout = tokio::process::ChildStdout::from_std(stdout)
106 : .context("convert to tokio::ChildStdout")?;
107 :
108 : // all fallible operations post-spawn are complete, so get rid of the guard
109 : let child = scopeguard::ScopeGuard::into_inner(child);
110 :
111 : tokio::spawn(
112 10 : async move {
113 10 : scopeguard::defer! {
114 10 : debug!("wal-redo-postgres stderr_logger_task finished");
115 10 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
116 10 : }
117 10 : debug!("wal-redo-postgres stderr_logger_task started");
118 10 : crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();
119 :
120 : use tokio::io::AsyncBufReadExt;
121 10 : let mut stderr_lines = tokio::io::BufReader::new(stderr);
122 10 : let mut buf = Vec::new();
123 2 : let res = loop {
124 18 : buf.clear();
125 18 : // TODO we don't trust the process to cap its stderr length.
126 18 : // Currently it can do unbounded Vec allocation.
127 18 : match stderr_lines.read_until(b'\n', &mut buf).await {
128 2 : Ok(0) => break Ok(()), // eof
129 8 : Ok(num_bytes) => {
130 8 : let output = String::from_utf8_lossy(&buf[..num_bytes]);
131 8 : error!(%output, "received output");
132 : }
133 0 : Err(e) => {
134 0 : break Err(e);
135 : }
136 : }
137 : };
138 2 : match res {
139 2 : Ok(()) => (),
140 0 : Err(e) => {
141 0 : error!(error=?e, "failed to read from walredo stderr");
142 : }
143 : }
144 2 : }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
145 : );
146 :
147 : Ok(Self {
148 : conf,
149 : #[cfg(feature = "testing")]
150 : tenant_shard_id,
151 : child: Some(child),
152 : stdin: tokio::sync::Mutex::new(Poison::new(
153 : "stdin",
154 : ProcessInput {
155 : stdin,
156 : n_requests: 0,
157 : },
158 : )),
159 : stdout: tokio::sync::Mutex::new(Poison::new(
160 : "stdout",
161 : ProcessOutput {
162 : stdout,
163 : pending_responses: VecDeque::new(),
164 : n_processed_responses: 0,
165 : },
166 : )),
167 : #[cfg(feature = "testing")]
168 : dump_sequence: AtomicUsize::default(),
169 : })
170 : }
171 :
172 18 : pub(crate) fn id(&self) -> u32 {
173 18 : self.child
174 18 : .as_ref()
175 18 : .expect("must not call this during Drop")
176 18 : .id()
177 18 : }
178 :
179 : /// Apply given WAL records ('records') over an old page image. Returns
180 : /// new page image.
181 : ///
182 : /// # Cancel-Safety
183 : ///
184 : /// Cancellation safe.
185 8 : #[instrument(skip_all, fields(pid=%self.id()))]
186 : pub(crate) async fn apply_wal_records(
187 : &self,
188 : rel: RelTag,
189 : blknum: u32,
190 : base_img: &Option<Bytes>,
191 : records: &[(Lsn, NeonWalRecord)],
192 : wal_redo_timeout: Duration,
193 : ) -> anyhow::Result<Bytes> {
194 : debug_assert_current_span_has_tenant_id();
195 :
196 : let tag = protocol::BufferTag { rel, blknum };
197 :
198 : // Serialize all the messages to send the WAL redo process first.
199 : //
200 : // This could be problematic if there are millions of records to replay,
201 : // but in practice the number of records is usually so small that it doesn't
202 : // matter, and it's better to keep this code simple.
203 : //
204 : // Most requests start with a before-image with BLCKSZ bytes, followed by
205 : // by some other WAL records. Start with a buffer that can hold that
206 : // comfortably.
207 : let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
208 : protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
209 : if let Some(img) = base_img {
210 : protocol::build_push_page_msg(tag, img, &mut writebuf);
211 : }
212 : for (lsn, rec) in records.iter() {
213 : if let NeonWalRecord::Postgres {
214 : will_init: _,
215 : rec: postgres_rec,
216 : } = rec
217 : {
218 : protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
219 : } else {
220 : anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
221 : }
222 : }
223 : protocol::build_get_page_msg(tag, &mut writebuf);
224 : WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
225 :
226 : let Ok(res) =
227 : tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
228 : else {
229 : anyhow::bail!("WAL redo timed out");
230 : };
231 :
232 : if res.is_err() {
233 : // not all of these can be caused by this particular input, however these are so rare
234 : // in tests so capture all.
235 : self.record_and_log(&writebuf);
236 : }
237 :
238 : res
239 : }
240 :
241 : /// Do a ping request-response roundtrip.
242 : ///
243 : /// Not used in production, but by Rust benchmarks.
244 2 : pub(crate) async fn ping(&self, timeout: Duration) -> anyhow::Result<()> {
245 2 : let mut writebuf: Vec<u8> = Vec::with_capacity(4);
246 2 : protocol::build_ping_msg(&mut writebuf);
247 4 : let Ok(res) = tokio::time::timeout(timeout, self.apply_wal_records0(&writebuf)).await
248 : else {
249 0 : anyhow::bail!("WAL redo ping timed out");
250 : };
251 2 : let response = res?;
252 2 : if response.len() != PAGE_SZ {
253 0 : anyhow::bail!(
254 0 : "WAL redo ping response should respond with page-sized response: {}",
255 0 : response.len()
256 0 : );
257 2 : }
258 2 : Ok(())
259 2 : }
260 :
261 : /// # Cancel-Safety
262 : ///
263 : /// When not polled to completion (e.g. because in `tokio::select!` another
264 : /// branch becomes ready before this future), concurrent and subsequent
265 : /// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls.
266 : /// Dispose of this process instance and create a new one.
267 10 : async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
268 10 : let request_no = {
269 10 : let mut lock_guard = self.stdin.lock().await;
270 10 : let mut poison_guard = lock_guard.check_and_arm()?;
271 10 : let input = poison_guard.data_mut();
272 10 : input
273 10 : .stdin
274 10 : .write_all(writebuf)
275 10 : .await
276 10 : .context("write to walredo stdin")?;
277 10 : let request_no = input.n_requests;
278 10 : input.n_requests += 1;
279 10 : poison_guard.disarm();
280 10 : request_no
281 : };
282 :
283 : // To improve walredo performance we separate sending requests and receiving
284 : // responses. Them are protected by different mutexes (output and input).
285 : // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process
286 : // then there is not warranty that T1 will first granted output mutex lock.
287 : // To address this issue we maintain number of sent requests, number of processed
288 : // responses and ring buffer with pending responses. After sending response
289 : // (under input mutex), threads remembers request number. Then it releases
290 : // input mutex, locks output mutex and fetch in ring buffer all responses until
291 : // its stored request number. The it takes correspondent element from
292 : // pending responses ring buffer and truncate all empty elements from the front,
293 : // advancing processed responses number.
294 :
295 10 : let mut lock_guard = self.stdout.lock().await;
296 10 : let mut poison_guard = lock_guard.check_and_arm()?;
297 10 : let output = poison_guard.data_mut();
298 10 : let n_processed_responses = output.n_processed_responses;
299 16 : while n_processed_responses + output.pending_responses.len() <= request_no {
300 : // We expect the WAL redo process to respond with an 8k page image. We read it
301 : // into this buffer.
302 10 : let mut resultbuf = vec![0; BLCKSZ.into()];
303 10 : output
304 10 : .stdout
305 10 : .read_exact(&mut resultbuf)
306 10 : .await
307 10 : .context("read walredo stdout")?;
308 6 : output
309 6 : .pending_responses
310 6 : .push_back(Some(Bytes::from(resultbuf)));
311 : }
312 : // Replace our request's response with None in `pending_responses`.
313 : // Then make space in the ring buffer by clearing out any seqence of contiguous
314 : // `None`'s from the front of `pending_responses`.
315 : // NB: We can't pop_front() because other requests' responses because another
316 : // requester might have grabbed the output mutex before us:
317 : // T1: grab input mutex
318 : // T1: send request_no 23
319 : // T1: release input mutex
320 : // T2: grab input mutex
321 : // T2: send request_no 24
322 : // T2: release input mutex
323 : // T2: grab output mutex
324 : // T2: n_processed_responses + output.pending_responses.len() <= request_no
325 : // 23 0 24
326 : // T2: enters poll loop that reads stdout
327 : // T2: put response for 23 into pending_responses
328 : // T2: put response for 24 into pending_resposnes
329 : // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
330 : // T2: takes its response_24
331 : // pending_responses now looks like this: Front Some(response_23) None Back
332 : // T2: does the while loop below
333 : // pending_responses now looks like this: Front Some(response_23) None Back
334 : // T2: releases output mutex
335 : // T1: grabs output mutex
336 : // T1: n_processed_responses + output.pending_responses.len() > request_no
337 : // 23 2 23
338 : // T1: skips poll loop that reads stdout
339 : // T1: takes its response_23
340 : // pending_responses now looks like this: Front None None Back
341 : // T2: does the while loop below
342 : // pending_responses now looks like this: Front Back
343 : // n_processed_responses now has value 25
344 6 : let res = output.pending_responses[request_no - n_processed_responses]
345 6 : .take()
346 6 : .expect("we own this request_no, nobody else is supposed to take it");
347 12 : while let Some(front) = output.pending_responses.front() {
348 6 : if front.is_none() {
349 6 : output.pending_responses.pop_front();
350 6 : output.n_processed_responses += 1;
351 6 : } else {
352 0 : break;
353 : }
354 : }
355 6 : poison_guard.disarm();
356 6 : Ok(res)
357 10 : }
358 :
/// Dump the serialized walredo input that led to an error into the tenant
/// directory and log the outcome at error level.
///
/// The file is named `walredo-{millis}-{len}-{seq}.walredo`, where `millis` is
/// the current Unix time in milliseconds, `len` is `writebuf.len()`, and `seq`
/// is a per-process counter that separates same-sized inputs failing within
/// the same millisecond. An existing file is never overwritten (`create_new`).
///
/// This is a best-effort diagnostics helper: it never returns an error and
/// must not panic, so that it cannot mask the walredo failure being reported.
#[cfg(feature = "testing")]
fn record_and_log(&self, writebuf: &[u8]) {
    use std::io::Write;
    use std::sync::atomic::Ordering;
    use std::time::{SystemTime, UNIX_EPOCH};

    // `duration_since` fails if the system clock is set before the epoch.
    // Fall back to 0 instead of panicking; `seq` still keeps names distinct.
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();

    let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);

    // these files will be collected to an allure report
    let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());

    let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);

    // The file is only ever written here, so it is opened write-only.
    let res = std::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(path)
        .and_then(|mut f| f.write_all(writebuf));

    // Both outcomes are logged at error level on purpose: trip up allowed_errors
    if let Err(e) = res {
        tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
    } else {
        tracing::error!(filename, "erroring walredo input saved");
    }
}
390 :
391 : #[cfg(not(feature = "testing"))]
392 : fn record_and_log(&self, _: &[u8]) {}
393 : }
394 :
395 : impl Drop for WalRedoProcess {
396 10 : fn drop(&mut self) {
397 10 : self.child
398 10 : .take()
399 10 : .expect("we only do this once")
400 10 : .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
401 10 : // no way to wait for stderr_logger_task from Drop because that is async only
402 10 : }
403 : }
|