use self::no_leak_child::NoLeakChild;
use crate::{
    config::PageServerConf,
    metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
    walrecord::NeonWalRecord,
    walredo::process::{no_leak_child, protocol},
};
use anyhow::Context;
use bytes::Bytes;
use pageserver_api::{reltag::RelTag, shard::TenantShardId};
use postgres_ffi::BLCKSZ;
#[cfg(feature = "testing")]
use std::sync::atomic::AtomicUsize;
use std::{
    collections::VecDeque,
    process::{Command, Stdio},
    time::Duration,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, error, instrument, Instrument};
use utils::{lsn::Lsn, poison::Poison};

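/// Handle to one `postgres --wal-redo` child process. Constructed via
/// [`WalRedoProcess::launch`]; dropping the handle kills the child and waits
/// for it to exit.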
pub struct WalRedoProcess {
    #[allow(dead_code)]
    conf: &'static PageServerConf,
    tenant_shard_id: TenantShardId,
    // Some() on construction, only becomes None on Drop.
    child: Option<NoLeakChild>,
    stdout: tokio::sync::Mutex<Poison<ProcessOutput>>,
    stdin: tokio::sync::Mutex<Poison<ProcessInput>>,
    /// Counter to distinguish same-sized walredo inputs failing at the same millisecond.
    #[cfg(feature = "testing")]
    dump_sequence: AtomicUsize,
}

struct ProcessInput {
    stdin: tokio::process::ChildStdin,
    n_requests: usize,
}

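/// Reader side of the pipelined request/response protocol. Invariant: slot `i`
/// of `pending_responses` holds the response for request number
/// `n_processed_responses + i`; a `None` slot is a response that its requester
/// has already taken.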
struct ProcessOutput {
    stdout: tokio::process::ChildStdout,
    pending_responses: VecDeque<Option<Bytes>>,
    n_processed_responses: usize,
}

impl WalRedoProcess {
    /// Start the postgres binary in special WAL redo mode.
    #[instrument(skip_all, fields(pg_version=pg_version))]
    pub(crate) fn launch(
        conf: &'static PageServerConf,
        tenant_shard_id: TenantShardId,
        pg_version: u32,
    ) -> anyhow::Result<Self> {
        crate::span::debug_assert_current_span_has_tenant_id();

        let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible.
        let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?;

        use no_leak_child::NoLeakChildCommandExt;
        // Start postgres itself
        let child = Command::new(pg_bin_dir_path.join("postgres"))
            // the first arg must be --wal-redo so the child process enters walredo mode
            .arg("--wal-redo")
            // the child doesn't process this arg, but having it in the argv helps identify the
            // walredo process for a particular tenant when debugging a pageserver
            .args(["--tenant-shard-id", &format!("{tenant_shard_id}")])
            .stdin(Stdio::piped())
            .stderr(Stdio::piped())
            .stdout(Stdio::piped())
            .env_clear()
            .env("LD_LIBRARY_PATH", &pg_lib_dir_path)
            .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path)
            // NB: The redo process is trusted only until we send it the first
            // piece of walredo work; after that, it is not. Specifically, before
            // that point we trust it to
            // 1. close all file descriptors except stdin, stdout, stderr, because
            //    pageserver might not be 100% diligent in setting FD_CLOEXEC on all
            //    the files it opens, and
            // 2. use seccomp to sandbox itself before processing the first
            //    walredo request.
            .spawn_no_leak_child(tenant_shard_id)
            .context("spawn process")?;
        WAL_REDO_PROCESS_COUNTERS.started.inc();
        let mut child = scopeguard::guard(child, |child| {
            error!("killing wal-redo-postgres process due to a problem during launch");
            child.kill_and_wait(WalRedoKillCause::Startup);
        });

        let stdin = child.stdin.take().unwrap();
        let stdout = child.stdout.take().unwrap();
        let stderr = child.stderr.take().unwrap();
        let stderr = tokio::process::ChildStderr::from_std(stderr)
            .context("convert to tokio::ChildStderr")?;
        let stdin =
            tokio::process::ChildStdin::from_std(stdin).context("convert to tokio::ChildStdin")?;
        let stdout = tokio::process::ChildStdout::from_std(stdout)
            .context("convert to tokio::ChildStdout")?;

        // all fallible operations post-spawn are complete, so get rid of the guard
        let child = scopeguard::ScopeGuard::into_inner(child);

        tokio::spawn(
            async move {
                scopeguard::defer! {
                    debug!("wal-redo-postgres stderr_logger_task finished");
                    crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc();
                }
                debug!("wal-redo-postgres stderr_logger_task started");
                crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc();

                use tokio::io::AsyncBufReadExt;
                let mut stderr_lines = tokio::io::BufReader::new(stderr);
                let mut buf = Vec::new();
                let res = loop {
                    buf.clear();
                    // TODO we don't trust the process to cap its stderr length.
                    // Currently it can do unbounded Vec allocation.
                    match stderr_lines.read_until(b'\n', &mut buf).await {
                        Ok(0) => break Ok(()), // eof
                        Ok(num_bytes) => {
                            let output = String::from_utf8_lossy(&buf[..num_bytes]);
                            error!(%output, "received output");
                        }
                        Err(e) => {
                            break Err(e);
                        }
                    }
                };
                match res {
                    Ok(()) => (),
                    Err(e) => {
                        error!(error=?e, "failed to read from walredo stderr");
                    }
                }
            }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
        );

        Ok(Self {
            conf,
            tenant_shard_id,
            child: Some(child),
            stdin: tokio::sync::Mutex::new(Poison::new(
                "stdin",
                ProcessInput {
                    stdin,
                    n_requests: 0,
                },
            )),
            stdout: tokio::sync::Mutex::new(Poison::new(
                "stdout",
                ProcessOutput {
                    stdout,
                    pending_responses: VecDeque::new(),
                    n_processed_responses: 0,
                },
            )),
            #[cfg(feature = "testing")]
            dump_sequence: AtomicUsize::default(),
        })
    }

    pub(crate) fn id(&self) -> u32 {
        self.child
            .as_ref()
            .expect("must not call this during Drop")
            .id()
    }

    /// Apply the given WAL records over an old page image. Returns the new
    /// page image.
    ///
    /// # Cancel-Safety
    ///
    /// Cancellation safe.
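    ///
    /// # Example
    ///
    /// A hedged usage sketch; `process`, `rel`, `blknum`, `base_img`, and
    /// `records` are assumed to be in scope, not defined here:
    ///
    /// ```ignore
    /// // `process` is a WalRedoProcess returned by WalRedoProcess::launch().
    /// let page: Bytes = process
    ///     .apply_wal_records(rel, blknum, &Some(base_img), &records, Duration::from_secs(10))
    ///     .await?;
    /// assert_eq!(page.len(), BLCKSZ as usize);
    /// ```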
    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
    pub(crate) async fn apply_wal_records(
        &self,
        rel: RelTag,
        blknum: u32,
        base_img: &Option<Bytes>,
        records: &[(Lsn, NeonWalRecord)],
        wal_redo_timeout: Duration,
    ) -> anyhow::Result<Bytes> {
        let tag = protocol::BufferTag { rel, blknum };

        // Serialize all the messages to send to the WAL redo process first.
        //
        // This could be problematic if there are millions of records to replay,
        // but in practice the number of records is usually so small that it doesn't
        // matter, and it's better to keep this code simple.
        //
        // Most requests start with a before-image of BLCKSZ bytes, followed by
        // some other WAL records. Start with a buffer that can hold that
        // comfortably.
        let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
        protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
        if let Some(img) = base_img {
            protocol::build_push_page_msg(tag, img, &mut writebuf);
        }
        for (lsn, rec) in records.iter() {
            if let NeonWalRecord::Postgres {
                will_init: _,
                rec: postgres_rec,
            } = rec
            {
                protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
            } else {
                anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
            }
        }
        protocol::build_get_page_msg(tag, &mut writebuf);
        WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);

        let Ok(res) =
            tokio::time::timeout(wal_redo_timeout, self.apply_wal_records0(&writebuf)).await
        else {
            anyhow::bail!("WAL redo timed out");
        };

        if res.is_err() {
            // Not all of these errors are necessarily caused by this particular
            // input, but they are so rare in tests that we capture all of them.
            self.record_and_log(&writebuf);
        }

        res
    }

    /// # Cancel-Safety
    ///
    /// When not polled to completion (e.g. because in `tokio::select!` another
    /// branch becomes ready before this future), concurrent and subsequent
    /// calls may fail due to [`utils::poison::Poison::check_and_arm`] calls.
    /// Dispose of this process instance and create a new one.
    async fn apply_wal_records0(&self, writebuf: &[u8]) -> anyhow::Result<Bytes> {
        let request_no = {
            let mut lock_guard = self.stdin.lock().await;
            let mut poison_guard = lock_guard.check_and_arm()?;
            let input = poison_guard.data_mut();
            input
                .stdin
                .write_all(writebuf)
                .await
                .context("write to walredo stdin")?;
            let request_no = input.n_requests;
            input.n_requests += 1;
            poison_guard.disarm();
            request_no
        };

        // To improve walredo performance we separate sending requests and receiving
        // responses. They are protected by different mutexes (input and output).
        // If threads T1, T2, T3 send requests D1, D2, D3 to the walredo process,
        // there is no guarantee that T1 will be granted the output mutex first.
        // To address this we maintain the number of sent requests, the number of
        // processed responses, and a ring buffer of pending responses. After
        // sending a request (under the input mutex), a thread remembers its
        // request number. It then releases the input mutex, locks the output
        // mutex, and reads responses into the ring buffer until its own request
        // number is covered. Then it takes the corresponding element from the
        // pending-responses ring buffer and truncates all empty elements from
        // the front, advancing the processed-responses counter. (See the sketch
        // test module at the bottom of this file for a worked example.)

        let mut lock_guard = self.stdout.lock().await;
        let mut poison_guard = lock_guard.check_and_arm()?;
        let output = poison_guard.data_mut();
        let n_processed_responses = output.n_processed_responses;
        while n_processed_responses + output.pending_responses.len() <= request_no {
            // We expect the WAL redo process to respond with an 8k page image. We read it
            // into this buffer.
            let mut resultbuf = vec![0; BLCKSZ.into()];
            output
                .stdout
                .read_exact(&mut resultbuf)
                .await
                .context("read walredo stdout")?;
            output
                .pending_responses
                .push_back(Some(Bytes::from(resultbuf)));
        }
284 : // Then make space in the ring buffer by clearing out any seqence of contiguous
285 : // `None`'s from the front of `pending_responses`.
286 : // NB: We can't pop_front() because other requests' responses because another
287 : // requester might have grabbed the output mutex before us:
288 : // T1: grab input mutex
289 : // T1: send request_no 23
290 : // T1: release input mutex
291 : // T2: grab input mutex
292 : // T2: send request_no 24
293 : // T2: release input mutex
294 : // T2: grab output mutex
295 : // T2: n_processed_responses + output.pending_responses.len() <= request_no
296 : // 23 0 24
297 : // T2: enters poll loop that reads stdout
298 : // T2: put response for 23 into pending_responses
299 : // T2: put response for 24 into pending_resposnes
300 : // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
301 : // T2: takes its response_24
302 : // pending_responses now looks like this: Front Some(response_23) None Back
303 : // T2: does the while loop below
304 : // pending_responses now looks like this: Front Some(response_23) None Back
305 : // T2: releases output mutex
306 : // T1: grabs output mutex
307 : // T1: n_processed_responses + output.pending_responses.len() > request_no
308 : // 23 2 23
309 : // T1: skips poll loop that reads stdout
310 : // T1: takes its response_23
311 : // pending_responses now looks like this: Front None None Back
312 : // T2: does the while loop below
313 : // pending_responses now looks like this: Front Back
314 : // n_processed_responses now has value 25
        let res = output.pending_responses[request_no - n_processed_responses]
            .take()
            .expect("we own this request_no, nobody else is supposed to take it");
        while let Some(front) = output.pending_responses.front() {
            if front.is_none() {
                output.pending_responses.pop_front();
                output.n_processed_responses += 1;
            } else {
                break;
            }
        }
        poison_guard.disarm();
        Ok(res)
    }

    #[cfg(feature = "testing")]
    fn record_and_log(&self, writebuf: &[u8]) {
        use std::sync::atomic::Ordering;

        let millis = std::time::SystemTime::now()
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_millis();

        let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);

        // these files will be collected into an allure report
        let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());

        let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);

        use std::io::Write;
        let res = std::fs::OpenOptions::new()
            .write(true)
            .create_new(true)
            .read(true)
            .open(path)
            .and_then(|mut f| f.write_all(writebuf));

        // trip up allowed_errors
        if let Err(e) = res {
            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
        } else {
            tracing::error!(filename, "erroring walredo input saved");
        }
    }

    #[cfg(not(feature = "testing"))]
    fn record_and_log(&self, _: &[u8]) {}
}

impl Drop for WalRedoProcess {
    fn drop(&mut self) {
        self.child
            .take()
            .expect("we only do this once")
            .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
        // no way to wait for stderr_logger_task from Drop because that is async only
    }
}
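
// What follows is a test-only sketch added for illustration; the module name
// and its contents are assumptions, not part of the original walredo code. It
// replays the T1/T2 interleaving from the comment in `apply_wal_records0`,
// using small integers in place of page images, to show how the
// `pending_responses` ring buffer and `n_processed_responses` counter interact.
#[cfg(test)]
mod pending_responses_sketch {
    use std::collections::VecDeque;

    /// Take the response for `request_no` out of the ring buffer, then drain
    /// already-taken (`None`) slots from the front, mirroring the bookkeeping
    /// in `apply_wal_records0`.
    fn take_response(
        pending: &mut VecDeque<Option<u32>>,
        n_processed: &mut usize,
        request_no: usize,
    ) -> u32 {
        let res = pending[request_no - *n_processed]
            .take()
            .expect("we own this request_no");
        while let Some(front) = pending.front() {
            if front.is_none() {
                pending.pop_front();
                *n_processed += 1;
            } else {
                break;
            }
        }
        res
    }

    #[test]
    fn out_of_order_takers() {
        let mut pending: VecDeque<Option<u32>> = VecDeque::new();
        // 23 responses have already been fully processed.
        let mut n_processed: usize = 23;
        // T2 (request_no 24) reads both outstanding responses off stdout.
        pending.push_back(Some(23));
        pending.push_back(Some(24));
        // T2 takes its own response; response 23 must stay for T1.
        assert_eq!(take_response(&mut pending, &mut n_processed, 24), 24);
        assert_eq!(n_processed, 23);
        assert_eq!(pending.len(), 2); // Front Some(23) None Back
        // T1 takes response 23; the drain loop now empties the buffer.
        assert_eq!(take_response(&mut pending, &mut n_processed, 23), 23);
        assert_eq!(n_processed, 25);
        assert!(pending.is_empty());
    }
}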