Line data Source code
1 : //! Spawns and kills background processes that are needed by Neon CLI.
2 : //! Applies common set-up such as log and pid files (if needed) to every process.
3 : //!
4 : //! Neon CLI does not run in background, so it needs to store the information about
5 : //! spawned processes, which it does in this module.
6 : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
7 : //! The pid file can be created by the process itself
8 : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
9 : //! or we create such file after starting the process
10 : //! (non-Neon binaries don't necessarily follow our pidfile conventions).
11 : //! The pid stored in the file is later used to stop the service.
12 : //!
13 : //! See the [`lock_file`](utils::lock_file) module for more info.
14 :
15 : use std::ffi::OsStr;
16 : use std::io::Write;
17 : use std::os::unix::prelude::AsRawFd;
18 : use std::os::unix::process::CommandExt;
19 : use std::path::Path;
20 : use std::process::Command;
21 : use std::time::Duration;
22 : use std::{fs, io, thread};
23 :
24 : use anyhow::Context;
25 : use camino::{Utf8Path, Utf8PathBuf};
26 : use nix::errno::Errno;
27 : use nix::fcntl::{FcntlArg, FdFlag};
28 : use nix::sys::signal::{kill, Signal};
29 : use nix::unistd::Pid;
30 : use utils::pid_file::{self, PidFileRead};
31 :
// Tuning knobs for the loops that poll for a process to start or stop.
//
// Both loops poll once per `RETRY_INTERVAL` (100 ms). Waiting for a process to
// stop gives up after `STOP_RETRY_TIMEOUT` (10 seconds); waiting for a process
// to start uses the timeout supplied by the caller instead.
//
// Every `DOT_EVERY_RETRIES` polls (once a second) a dot (".") is printed to
// show the user that we are still waiting. After `NOTICE_AFTER_RETRIES` polls
// (5 seconds) a notice is printed that the operation is taking long, and the
// loop keeps waiting.
const RETRY_INTERVAL: Duration = Duration::from_millis(100);
const STOP_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
const STOP_RETRIES: u128 = STOP_RETRY_TIMEOUT.as_millis() / RETRY_INTERVAL.as_millis();
const DOT_EVERY_RETRIES: u128 = 10;
const NOTICE_AFTER_RETRIES: u128 = 50;
44 :
45 : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
46 : /// it itself.
47 : pub enum InitialPidFile {
48 : /// Create a pidfile, to allow future CLI invocations to manipulate the process.
49 : Create(Utf8PathBuf),
50 : /// The process will create the pidfile itself, need to wait for that event.
51 : Expect(Utf8PathBuf),
52 : }
53 :
54 : /// Start a background child process using the parameters given.
55 : #[allow(clippy::too_many_arguments)]
56 0 : pub async fn start_process<F, Fut, AI, A, EI>(
57 0 : process_name: &str,
58 0 : datadir: &Path,
59 0 : command: &Path,
60 0 : args: AI,
61 0 : envs: EI,
62 0 : initial_pid_file: InitialPidFile,
63 0 : retry_timeout: &Duration,
64 0 : process_status_check: F,
65 0 : ) -> anyhow::Result<()>
66 0 : where
67 0 : F: Fn() -> Fut,
68 0 : Fut: std::future::Future<Output = anyhow::Result<bool>>,
69 0 : AI: IntoIterator<Item = A>,
70 0 : A: AsRef<OsStr>,
71 0 : // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
72 0 : EI: IntoIterator<Item = (String, String)>,
73 0 : {
74 0 : let retries: u128 = retry_timeout.as_millis() / RETRY_INTERVAL.as_millis();
75 0 : if !datadir.metadata().context("stat datadir")?.is_dir() {
76 0 : anyhow::bail!("`datadir` must be a directory when calling this function: {datadir:?}");
77 0 : }
78 0 : let log_path = datadir.join(format!("{process_name}.log"));
79 0 : let process_log_file = fs::OpenOptions::new()
80 0 : .create(true)
81 0 : .append(true)
82 0 : .open(&log_path)
83 0 : .with_context(|| {
84 0 : format!("Could not open {process_name} log file {log_path:?} for writing")
85 0 : })?;
86 0 : let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
87 0 : format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
88 0 : })?;
89 :
90 0 : let mut command = Command::new(command);
91 0 : let background_command = command
92 0 : .stdout(process_log_file)
93 0 : .stderr(same_file_for_stderr)
94 0 : .args(args)
95 0 : // spawn all child processes in their datadir, useful for all kinds of things,
96 0 : // not least cleaning up child processes e.g. after an unclean exit from the test suite:
97 0 : // ```
98 0 : // lsof -d cwd -a +D Users/cs/src/neon/test_output
99 0 : // ```
100 0 : .current_dir(datadir);
101 0 :
102 0 : let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
103 0 : fill_rust_env_vars(background_command),
104 0 : ));
105 0 : filled_cmd.envs(envs);
106 :
107 0 : let pid_file_to_check = match &initial_pid_file {
108 0 : InitialPidFile::Create(path) => {
109 0 : pre_exec_create_pidfile(filled_cmd, path);
110 0 : path
111 : }
112 0 : InitialPidFile::Expect(path) => path,
113 : };
114 :
115 0 : let spawned_process = filled_cmd.spawn().with_context(|| {
116 0 : format!("Could not spawn {process_name}, see console output and log files for details.")
117 0 : })?;
118 0 : let pid = spawned_process.id();
119 0 : let pid = Pid::from_raw(
120 0 : i32::try_from(pid)
121 0 : .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
122 : );
123 : // set up a scopeguard to kill & wait for the child in case we panic or bail below
124 0 : let spawned_process = scopeguard::guard(spawned_process, |mut spawned_process| {
125 0 : println!("SIGKILL & wait the started process");
126 0 : (|| {
127 0 : // TODO: use another signal that can be caught by the child so it can clean up any children it spawned (e..g, walredo).
128 0 : spawned_process.kill().context("SIGKILL child")?;
129 0 : spawned_process.wait().context("wait() for child process")?;
130 0 : anyhow::Ok(())
131 0 : })()
132 0 : .with_context(|| format!("scopeguard kill&wait child {process_name:?}"))
133 0 : .unwrap();
134 0 : });
135 :
136 0 : for retries in 0..retries {
137 0 : match process_started(pid, pid_file_to_check, &process_status_check).await {
138 : Ok(true) => {
139 0 : println!("\n{process_name} started and passed status check, pid: {pid}");
140 0 : // leak the child process, it'll outlive this neon_local invocation
141 0 : drop(scopeguard::ScopeGuard::into_inner(spawned_process));
142 0 : return Ok(());
143 : }
144 : Ok(false) => {
145 0 : if retries == NOTICE_AFTER_RETRIES {
146 0 : // The process is taking a long time to start up. Keep waiting, but
147 0 : // print a message
148 0 : print!("\n{process_name} has not started yet, continuing to wait");
149 0 : }
150 0 : if retries % DOT_EVERY_RETRIES == 0 {
151 0 : print!(".");
152 0 : io::stdout().flush().unwrap();
153 0 : }
154 0 : tokio::time::sleep(RETRY_INTERVAL).await;
155 : }
156 0 : Err(e) => {
157 0 : println!("error starting process {process_name:?}: {e:#}");
158 0 : return Err(e);
159 : }
160 : }
161 : }
162 0 : println!();
163 0 : anyhow::bail!(format!(
164 0 : "{} did not start+pass status checks within {:?} seconds",
165 0 : process_name, retry_timeout
166 0 : ));
167 0 : }
168 :
169 : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
170 0 : pub fn stop_process(
171 0 : immediate: bool,
172 0 : process_name: &str,
173 0 : pid_file: &Utf8Path,
174 0 : ) -> anyhow::Result<()> {
175 0 : let pid = match pid_file::read(pid_file)
176 0 : .with_context(|| format!("read pid_file {pid_file:?}"))?
177 : {
178 : PidFileRead::NotExist => {
179 0 : println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
180 0 : return Ok(());
181 : }
182 : PidFileRead::NotHeldByAnyProcess(_) => {
183 : // Don't try to kill according to file contents beacuse the pid might have been re-used by another process.
184 : // Don't delete the file either, it can race with new pid file creation.
185 : // Read `pid_file` module comment for details.
186 0 : println!(
187 0 : "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
188 0 : );
189 0 : return Ok(());
190 : }
191 0 : PidFileRead::LockedByOtherProcess(pid) => pid,
192 : };
193 : // XXX the pid could become invalid (and recycled) at any time before the kill() below.
194 :
195 : // send signal
196 0 : let sig = if immediate {
197 0 : print!("Stopping {process_name} with pid {pid} immediately..");
198 0 : Signal::SIGQUIT
199 : } else {
200 0 : print!("Stopping {process_name} with pid {pid} gracefully..");
201 0 : Signal::SIGTERM
202 : };
203 0 : io::stdout().flush().unwrap();
204 0 : match kill(pid, sig) {
205 0 : Ok(()) => (),
206 : Err(Errno::ESRCH) => {
207 : // Again, don't delete the pid file. The unlink can race with a new pid file being created.
208 0 : println!(
209 0 : "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
210 0 : );
211 0 : return Ok(());
212 : }
213 0 : Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
214 : }
215 :
216 : // Wait until process is gone
217 0 : wait_until_stopped(process_name, pid)?;
218 0 : Ok(())
219 0 : }
220 :
221 0 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
222 0 : for retries in 0..STOP_RETRIES {
223 0 : match process_has_stopped(pid) {
224 : Ok(true) => {
225 0 : println!("\n{process_name} stopped");
226 0 : return Ok(());
227 : }
228 : Ok(false) => {
229 0 : if retries == NOTICE_AFTER_RETRIES {
230 0 : // The process is taking a long time to start up. Keep waiting, but
231 0 : // print a message
232 0 : print!("\n{process_name} has not stopped yet, continuing to wait");
233 0 : }
234 0 : if retries % DOT_EVERY_RETRIES == 0 {
235 0 : print!(".");
236 0 : io::stdout().flush().unwrap();
237 0 : }
238 0 : thread::sleep(RETRY_INTERVAL);
239 : }
240 0 : Err(e) => {
241 0 : println!("{process_name} with pid {pid} failed to stop: {e:#}");
242 0 : return Err(e);
243 : }
244 : }
245 : }
246 0 : println!();
247 0 : anyhow::bail!(format!(
248 0 : "{} with pid {} did not stop in {:?} seconds",
249 0 : process_name, pid, STOP_RETRY_TIMEOUT
250 0 : ));
251 0 : }
252 :
/// Resets the environment of `cmd` and re-adds the Rust-related variables.
///
/// The command's environment is cleared first. `RUST_BACKTRACE` is then always
/// set: to the caller's value if present, to `1` otherwise. `LLVM_PROFILE_FILE`,
/// `FAILPOINTS` and `RUST_LOG` are copied over only if set in our environment.
fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
    cmd.env_clear();

    // If RUST_BACKTRACE is set, pass it through. But if it's not set, default
    // to RUST_BACKTRACE=1.
    match std::env::var_os("RUST_BACKTRACE") {
        Some(inherited) => cmd.env("RUST_BACKTRACE", inherited),
        None => cmd.env("RUST_BACKTRACE", OsStr::new("1")),
    };

    // Pass through these environment variables to the command
    for name in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
        if let Some(value) = std::env::var_os(name) {
            cmd.env(name, value);
        }
    }

    cmd
}
272 :
/// Copies the remote storage credentials from our environment into `cmd`.
///
/// Each listed variable is forwarded only if it is set. Values are read with
/// `var_os`, so a value that is not valid Unicode (e.g. an unusual `HOME` path)
/// is forwarded unchanged instead of being silently dropped.
fn fill_remote_storage_secrets_vars(cmd: &mut Command) -> &mut Command {
    for env_key in [
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_PROFILE",
        // HOME is needed in combination with `AWS_PROFILE` to pick up the SSO sessions.
        "HOME",
        "AZURE_STORAGE_ACCOUNT",
        "AZURE_STORAGE_ACCESS_KEY",
    ] {
        if let Some(value) = std::env::var_os(env_key) {
            cmd.env(env_key, value);
        }
    }
    cmd
}
289 :
/// Copies every environment variable whose name starts with `NEON_` into `cmd`.
///
/// The environment is walked with `vars_os`: `std::env::vars` panics as soon as
/// it meets any variable that is not valid Unicode, even one unrelated to Neon.
fn fill_env_vars_prefixed_neon(cmd: &mut Command) -> &mut Command {
    for (var, val) in std::env::vars_os() {
        // The prefix is pure ASCII, so a lossy conversion cannot change whether
        // the name starts with it.
        let has_neon_prefix = var.to_string_lossy().starts_with("NEON_");
        if has_neon_prefix {
            cmd.env(var, val);
        }
    }
    cmd
}
298 :
299 : /// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
300 : /// 1. Claims a pidfile with a fcntl lock on it and
301 : /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
302 : /// will remain held until the cmd exits.
303 0 : fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
304 0 : where
305 0 : P: Into<Utf8PathBuf>,
306 0 : {
307 0 : let path: Utf8PathBuf = path.into();
308 0 : // SAFETY:
309 0 : // pre_exec is marked unsafe because it runs between fork and exec.
310 0 : // Why is that dangerous in various ways?
311 0 : // Long answer: https://github.com/rust-lang/rust/issues/39575
312 0 : // Short answer: in a multi-threaded program, other threads may have
313 0 : // been inside of critical sections at the time of fork. In the
314 0 : // original process, that was allright, assuming they protected
315 0 : // the critical sections appropriately, e.g., through locks.
316 0 : // Fork adds another process to the mix that
317 0 : // 1. Has a single thread T
318 0 : // 2. In an exact copy of the address space at the time of fork.
319 0 : // A variety of problems scan occur now:
320 0 : // 1. T tries to grab a lock that was locked at the time of fork.
321 0 : // It will wait forever since in its address space, the lock
322 0 : // is in state 'taken' but the thread that would unlock it is
323 0 : // not there.
324 0 : // 2. A rust object that represented some external resource in the
325 0 : // parent now got implicitly copied by the fork, even though
326 0 : // the object's type is not `Copy`. The parent program may use
327 0 : // non-copyability as way to enforce unique ownership of an
328 0 : // external resource in the typesystem. The fork breaks that
329 0 : // assumption, as now both parent and child process have an
330 0 : // owned instance of the object that represents the same
331 0 : // underlying resource.
332 0 : // While these seem like niche problems, (1) in particular is
333 0 : // highly relevant. For example, `malloc()` may grab a mutex internally,
334 0 : // and so, if we forked while another thread was mallocing' and our
335 0 : // pre_exec closure allocates as well, it will block on the malloc
336 0 : // mutex forever
337 0 : //
338 0 : // The proper solution is to only use C library functions that are marked
339 0 : // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
340 0 : //
341 0 : // With this specific pre_exec() closure, the non-error path doesn't allocate.
342 0 : // The error path uses `anyhow`, and hence does allocate.
343 0 : // We take our chances there, hoping that any potential disaster is constrained
344 0 : // to the child process (e.g., malloc has no state ourside of the child process).
345 0 : // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
346 0 : // Again, we take our chances, making the same assumptions as for malloc.
347 0 : unsafe {
348 0 : cmd.pre_exec(move || {
349 0 : let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
350 0 : // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
351 0 : // remains locked after exec.
352 0 : nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
353 0 : .expect("remove FD_CLOEXEC");
354 0 : // Don't run drop(file), it would close the file before we actually exec.
355 0 : std::mem::forget(file);
356 0 : Ok(())
357 0 : });
358 0 : }
359 0 : cmd
360 0 : }
361 :
362 0 : async fn process_started<F, Fut>(
363 0 : pid: Pid,
364 0 : pid_file_to_check: &Utf8Path,
365 0 : status_check: &F,
366 0 : ) -> anyhow::Result<bool>
367 0 : where
368 0 : F: Fn() -> Fut,
369 0 : Fut: std::future::Future<Output = anyhow::Result<bool>>,
370 0 : {
371 0 : match status_check().await {
372 0 : Ok(true) => match pid_file::read(pid_file_to_check)? {
373 0 : PidFileRead::NotExist => Ok(false),
374 0 : PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
375 0 : PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
376 : },
377 0 : Ok(false) => Ok(false),
378 0 : Err(e) => anyhow::bail!("process failed to start: {e}"),
379 : }
380 0 : }
381 :
382 0 : pub(crate) fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
383 0 : match kill(pid, None) {
384 : // Process exists, keep waiting
385 0 : Ok(_) => Ok(false),
386 : // Process not found, we're done
387 0 : Err(Errno::ESRCH) => Ok(true),
388 0 : Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
389 : }
390 0 : }
|