mod no_leak_child;
/// The IPC protocol that pageserver and walredo process speak over their shared pipe.
mod protocol;

use self::no_leak_child::NoLeakChild;
use crate::{
    config::PageServerConf,
    metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
    span::debug_assert_current_span_has_tenant_id,
    walrecord::NeonWalRecord,
};
use anyhow::Context;
use bytes::Bytes;
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
    collections::VecDeque,
    process::{Command, Stdio},
    time::Duration,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, poison::Poison};

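/// Handle to a spawned `postgres --wal-redo` child process and the
/// mutex-protected pipe ends used to pipeline requests to it.
///
/// A minimal usage sketch (illustrative only; `conf`, `tenant_shard_id`, and the
/// page inputs are assumed to come from the surrounding tenant code, and error
/// handling is elided):
///
/// ```ignore
/// let process = WalRedoProcess::launch(conf, tenant_shard_id, pg_version)?;
/// let page: Bytes = process
///     .apply_wal_records(rel, blknum, &base_img, &records, Duration::from_secs(10))
///     .await?;
/// assert_eq!(page.len(), BLCKSZ as usize);
/// ```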
pub struct WalRedoProcess {
    #[allow(dead_code)]
    conf: &'static PageServerConf,
    #[cfg(feature = "testing")]
    tenant_shard_id: TenantShardId,
    // Some() on construction, only becomes None on Drop.
    child: Option<NoLeakChild>,
    stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
    stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
    /// Counter to disambiguate same-sized walredo inputs failing within the same millisecond.
    #[cfg(feature = "testing")]
    dump_sequence: AtomicUsize,
}

struct ProcessInput {
    stdin: tokio::process::ChildStdin,
    /// Number of requests sent so far; doubles as the next request number.
    n_requests: usize,
}

struct ProcessOutput {
    stdout: tokio::process::ChildStdout,
    /// Ring buffer of responses in request order; `None` marks responses
    /// already taken by their requester.
    pending_responses: VecDeque<Option<Bytes>>,
    /// Number of responses popped off the front of `pending_responses`.
    n_processed_responses: usize,
}

impl WalRedoProcess {
    /// Start the postgres binary in special WAL redo mode.
    #[instrument(skip_all, fields(pg_version=pg_version))]
    pub(crate) fn launch(
        conf: &'static PageServerConf,
        tenant_shard_id: TenantShardId,
        pg_version: u32,
    ) -> anyhow::Result<Self> {
        crate::span::debug_assert_current_span_has_tenant_id();

        let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
        let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;

        use no_leak_child::NoLeakChildCommandExt;
        // Start postgres itself
        let child = Command::new(pg_bin_dir_path.join("postgres"))
            // the first arg must be --wal-redo so the child process enters walredo mode
            .arg("--wal-redo")
            // the child doesn't process this arg, but having it in the argv helps identify the
            // walredo process for a particular tenant when debugging the pageserver
            .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
            .stdin(Stdio::piped())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .env_clear()
            .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
            .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
            // NB: The redo process is only trusted before we have sent it the
            // first walredo work; after that, it is untrusted. Specifically, we
            // trust it to
            // 1. close all file descriptors except stdin, stdout, stderr, because
            //    the pageserver might not be 100% diligent in setting FD_CLOEXEC
            //    on all the files it opens, and
            // 2. use seccomp to sandbox itself before processing the first
            //    walredo request.
            .spawn_no_leak_child(tenant_shard_id)
            .context("spawn process")?;
        WAL_REDO_PROCESS_COUNTERS.started.inc();
        let mut child = scopeguard::guard(child, |child| {
            error!("killing wal-redo-postgres process due to a problem during launch");
            child.kill_and_wait(WalRedoKillCause::Startup);
        });

        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        let stderr = child.stderr.take().unwrap();
        let stderr = tokio::process::ChildStderr::from_std(stderr)
            .context("convert to tokio::ChildStderr")?;
        let stdin =
            tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
        let stdout = tokio::process::ChildStdout::from_std(stdout)
            .context("convert to tokio::ChildStdout")?;

        // all fallible operations post-spawn are complete, so get rid of the guard
        let child = scopeguard::ScopeGuard::into_inner(child);

        tokio::spawn(
            async move {
                scopeguard::defer! {
                    debug!("wal-redo-postgres stderr_logger_task finished");
                    crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
                }
                debug!("wal-redo-postgres stderr_logger_task started");
                crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();

                use tokio::io::AsyncBufReadExt;
                let mut stderr_lines = tokio::io::BufReader::new(stderr);
                let mut buf = Vec::new();
                let res = loop {
                    buf.clear();
                    // TODO: we don't trust the process to cap its stderr length.
                    // Currently it can do unbounded Vec allocation.
                    match stderr_lines.read_until(b'\n', &mut buf).await {
                        Ok(0) => break Ok(()), // eof
                        Ok(num_bytes) => {
                            let output = String::from_utf8_lossy(&buf[..num_bytes]);
                            error!(%output, "received output");
                        }
                        Err(e) => {
                            break Err(e);
                        }
                    }
                };
                match res {
                    Ok(()) => (),
                    Err(e) => {
                        error!(error=?e, "failed to read from walredo stderr");
                    }
                }
            }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
        );

        Ok(Self {
            conf,
            #[cfg(feature = "testing")]
            tenant_shard_id,
            child: Some(child),
            stdin: tokio::sync::Mutex::new(Poison::new(
                "stdin",
                ProcessInput {
                    stdin,
                    n_requests: 0,
                },
            )),
            stdout: tokio::sync::Mutex::new(Poison::new(
                "stdout",
                ProcessOutput {
                    stdout,
                    pending_responses: VecDeque::new(),
                    n_processed_responses: 0,
                },
            )),
            #[cfg(feature = "testing")]
            dump_sequence: AtomicUsize::default(),
        })
    }

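    /// The OS process id (PID) of the wal-redo child process.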
    pub(crate) fn id(&self) -> u32 {
        self.child
            .as_ref()
            .expect("must not call this during Drop")
            .id()
    }

    /// Apply the given WAL records (`records`) over an old page image and
    /// return the new page image.
    ///
    /// # Cancel-Safety
    ///
    /// Cancellation safe.
    #[instrument(skip_all, fields(pid=%self.id()))]
    pub(crate) async fn apply_wal_records(
        &self,
        rel: RelTag,
        blknum: u32,
        base_img: &Option<Bytes>,
        records: &[(Lsn, NeonWalRecord)],
        wal_redo_timeout: Duration,
    ) -> anyhow::Result<Bytes> {
        debug_assert_current_span_has_tenant_id();

        let tag = protocol::BufferTag { rel, blknum };

        // Serialize all the messages to send to the WAL redo process first.
        //
        // This could be problematic if there are millions of records to replay,
        // but in practice the number of records is usually so small that it doesn't
        // matter, and it's better to keep this code simple.
        //
        // Most requests start with a before-image of BLCKSZ bytes, followed by
        // some other WAL records. Start with a buffer that can hold that
        // comfortably.
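        //
        // For example, a request with a base image and two Postgres records is
        // serialized as the message sequence: BeginRedoForBlock(tag),
        // PushPage(tag, base_img), ApplyRecord(lsn_1, rec_1),
        // ApplyRecord(lsn_2, rec_2), GetPage(tag).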
        let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
        protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
        if let Some(img) = base_img {
            protocol::build_push_page_msg(tag, img, &mut writebuf);
        }
        for (lsn, rec) in records.iter() {
            if let NeonWalRecord::Postgres {
                will_init: _,
                rec: postgres_rec,
            } = rec
            {
                protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
            } else {
                anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
            }
        }
        protocol::build_get_page_msg(tag, &mut writebuf);
        WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

        let Ok(res) =
            tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
        else {
            anyhow::bail!("WAL redo timed out");
        };

        if res.is_err() {
            // Not every failure is necessarily caused by this particular input,
            // but failures are so rare in tests that we capture them all.
            self.record_and_log(&writebuf);
        }

        res
    }

    /// # Cancel-Safety
    ///
    /// When not polled to completion (e.g. because in `tokio::select!` another
    /// branch becomes ready before this future), concurrent and subsequent
    /// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls.
    /// Dispose of this process instance and create a new one.
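    ///
    /// A sketch of the hazard (illustrative; `cancel` stands in for whatever
    /// cancellation signal the caller uses):
    ///
    /// ```ignore
    /// tokio::select! {
    ///     res = process.apply_wal_records0(&writebuf) => { /* use res */ }
    ///     // If this branch wins, the stdin/stdout `Poison` may be left armed,
    ///     // and later `check_and_arm` calls on this instance will fail.
    ///     _ = cancel.cancelled() => { /* drop `process` and launch a new one */ }
    /// }
    /// ```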
    async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
        let request_no = {
            let mut lock_guard = self.stdin.lock().await;
            let mut poison_guard = lock_guard.check_and_arm()?;
            let input = poison_guard.data_mut();
            input
                .stdin
                .write_all(writebuf)
                .await
                .context("write to walredo stdin")?;
            let request_no = input.n_requests;
            input.n_requests += 1;
            poison_guard.disarm();
            request_no
        };

        // To improve walredo performance, we separate sending requests from
        // receiving responses; they are protected by different mutexes (input
        // and output). If threads T1, T2, T3 send requests R1, R2, R3 to the
        // walredo process, there is no guarantee that T1 will be granted the
        // output mutex first. To address this, we track the number of sent
        // requests and the number of processed responses, and keep a ring buffer
        // of pending responses. After sending a request (under the input mutex),
        // each thread remembers its request number. It then releases the input
        // mutex, takes the output mutex, and reads responses into the ring
        // buffer until its own request number is covered. It then takes the
        // corresponding element from the ring buffer and pops any leading `None`
        // entries from the front, advancing the processed-responses counter.
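        //
        // Invariant: `pending_responses[i]`, if present, holds the response for
        // request number `n_processed_responses + i`.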

        let mut lock_guard = self.stdout.lock().await;
        let mut poison_guard = lock_guard.check_and_arm()?;
        let output = poison_guard.data_mut();
        let n_processed_responses = output.n_processed_responses;
        while n_processed_responses + output.pending_responses.len() <= request_no {
            // We expect the WAL redo process to respond with an 8k page image. We read it
            // into this buffer.
            let mut resultbuf = vec![0; BLCKSZ.into()];
            output
                .stdout
                .read_exact(&mut resultbuf)
                .await
                .context("read walredo stdout")?;
            output
                .pending_responses
                .push_back(Some(Bytes::from(resultbuf)));
        }
        // Replace our request's response with None in `pending_responses`.
        // Then make space in the ring buffer by clearing out any sequence of
        // contiguous `None`'s from the front of `pending_responses`.
        // NB: We can't pop_front() other requests' responses, because another
        // requester might have grabbed the output mutex before us:
        // T1: grab input mutex
        // T1: send request_no 23
        // T1: release input mutex
        // T2: grab input mutex
        // T2: send request_no 24
        // T2: release input mutex
        // T2: grab output mutex
        // T2: n_processed_responses + output.pending_responses.len() <= request_no
        //              23            +              0               <=     24
        // T2: enters poll loop that reads stdout
        // T2: put response for 23 into pending_responses
        // T2: put response for 24 into pending_responses
        //     pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
        // T2: takes its response_24
        //     pending_responses now looks like this: Front Some(response_23) None Back
        // T2: does the while loop below
        //     pending_responses still looks like this: Front Some(response_23) None Back
        // T2: releases output mutex
        // T1: grabs output mutex
        // T1: n_processed_responses + output.pending_responses.len() > request_no
        //              23            +              2               >      23
        // T1: skips poll loop that reads stdout
        // T1: takes its response_23
        //     pending_responses now looks like this: Front None None Back
        // T1: does the while loop below
        //     pending_responses now looks like this: Front Back
        //     n_processed_responses now has value 25
        let res = output.pending_responses[request_no - n_processed_responses]
            .take()
            .expect("we own this request_no, nobody else is supposed to take it");
        while let Some(front) = output.pending_responses.front() {
            if front.is_none() {
                output.pending_responses.pop_front();
                output.n_processed_responses += 1;
            } else {
                break;
            }
        }
        poison_guard.disarm();
        Ok(res)
    }

    #[cfg(feature = "testing")]
    fn record_and_log(&self, writebuf: &[u8]) {
        use std::sync::atomic::Ordering;

        let millis = std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);

        // these files will be collected to an allure report
        let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());

        let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);

        use std::io::Write;
        let res = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .read(true)
            .open(path)
            .and_then(|mut f| f.write_all(writebuf));

        // trip up allowed_errors
        if let Err(e) = res {
            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
        } else {
            tracing::error!(filename, "erroring walredo input saved");
        }
    }

    #[cfg(not(feature = "testing"))]
    fn record_and_log(&self, _: &[u8]) {}
}

impl Drop for WalRedoProcess {
    fn drop(&mut self) {
        self.child
            .take()
            .expect("we only do this once")
            .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
        // no way to wait for stderr_logger_task from Drop because that is async only
    }
}