TLA Line data Source code
1 : //! Spawns and kills background processes that are needed by Neon CLI.
2 : //! Applies common set-up such as log and pid files (if needed) to every process.
3 : //!
4 : //! Neon CLI does not run in background, so it needs to store the information about
5 : //! spawned processes, which it does in this module.
6 : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
7 : //! The pid file can be created by the process itself
8 : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
9 : //! or we create such file after starting the process
10 : //! (non-Neon binaries don't necessarily follow our pidfile conventions).
11 : //! The pid stored in the file is later used to stop the service.
12 : //!
13 : //! See the [`lock_file`](utils::lock_file) module for more info.
14 :
15 : use std::ffi::OsStr;
16 : use std::io::Write;
17 : use std::os::unix::prelude::AsRawFd;
18 : use std::os::unix::process::CommandExt;
19 : use std::path::Path;
20 : use std::process::{Child, Command};
21 : use std::time::Duration;
22 : use std::{fs, io, thread};
23 :
24 : use anyhow::Context;
25 : use camino::{Utf8Path, Utf8PathBuf};
26 : use nix::errno::Errno;
27 : use nix::fcntl::{FcntlArg, FdFlag};
28 : use nix::sys::signal::{kill, Signal};
29 : use nix::unistd::Pid;
30 : use utils::pid_file::{self, PidFileRead};
31 :
// Tuning knobs for the polling loops that wait for a process to start or stop.
//
// A loop sleeps RETRY_INTERVAL_MILLIS (100 ms) between polls and gives up after
// RETRIES polls, i.e. roughly RETRY_UNTIL_SECS (10 seconds) worth of sleeping.
// Every DOT_EVERY_RETRIES polls (about once a second) it prints a dot (".") so the
// user can see that it is still waiting. After NOTICE_AFTER_RETRIES polls (about
// 5 seconds) it prints a notice that things are taking long, and keeps waiting.
const RETRY_INTERVAL_MILLIS: u64 = 100;
const RETRY_UNTIL_SECS: u64 = 10;
const RETRIES: u64 = RETRY_UNTIL_SECS * 1000 / RETRY_INTERVAL_MILLIS;
const NOTICE_AFTER_RETRIES: u64 = 50;
const DOT_EVERY_RETRIES: u64 = 10;
44 :
45 : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
46 : /// it itself.
47 : pub enum InitialPidFile {
48 : /// Create a pidfile, to allow future CLI invocations to manipulate the process.
49 : Create(Utf8PathBuf),
50 : /// The process will create the pidfile itself, need to wait for that event.
51 : Expect(Utf8PathBuf),
52 : }
53 :
54 : /// Start a background child process using the parameters given.
55 CBC 1381 : pub async fn start_process<F, Fut, AI, A, EI>(
56 1381 : process_name: &str,
57 1381 : datadir: &Path,
58 1381 : command: &Path,
59 1381 : args: AI,
60 1381 : envs: EI,
61 1381 : initial_pid_file: InitialPidFile,
62 1381 : process_status_check: F,
63 1381 : ) -> anyhow::Result<Child>
64 1381 : where
65 1381 : F: Fn() -> Fut,
66 1381 : Fut: std::future::Future<Output = anyhow::Result<bool>>,
67 1381 : AI: IntoIterator<Item = A>,
68 1381 : A: AsRef<OsStr>,
69 1381 : // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
70 1381 : EI: IntoIterator<Item = (String, String)>,
71 1381 : {
72 1381 : let log_path = datadir.join(format!("{process_name}.log"));
73 1381 : let process_log_file = fs::OpenOptions::new()
74 1381 : .create(true)
75 1381 : .write(true)
76 1381 : .append(true)
77 1381 : .open(&log_path)
78 1381 : .with_context(|| {
79 UBC 0 : format!("Could not open {process_name} log file {log_path:?} for writing")
80 CBC 1381 : })?;
81 1381 : let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
82 UBC 0 : format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
83 CBC 1381 : })?;
84 :
85 1381 : let mut command = Command::new(command);
86 1381 : let background_command = command
87 1381 : .stdout(process_log_file)
88 1381 : .stderr(same_file_for_stderr)
89 1381 : .args(args);
90 1381 : let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
91 1381 : filled_cmd.envs(envs);
92 :
93 1381 : let pid_file_to_check = match &initial_pid_file {
94 339 : InitialPidFile::Create(path) => {
95 339 : pre_exec_create_pidfile(filled_cmd, path);
96 339 : path
97 : }
98 1042 : InitialPidFile::Expect(path) => path,
99 : };
100 :
101 1381 : let mut spawned_process = filled_cmd.spawn().with_context(|| {
102 UBC 0 : format!("Could not spawn {process_name}, see console output and log files for details.")
103 CBC 1381 : })?;
104 1381 : let pid = spawned_process.id();
105 1381 : let pid = Pid::from_raw(
106 1381 : i32::try_from(pid)
107 1381 : .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
108 : );
109 :
110 2442 : for retries in 0..RETRIES {
111 5317 : match process_started(pid, pid_file_to_check, &process_status_check).await {
112 : Ok(true) => {
113 1381 : println!("\n{process_name} started, pid: {pid}");
114 1381 : return Ok(spawned_process);
115 : }
116 : Ok(false) => {
117 1061 : if retries == NOTICE_AFTER_RETRIES {
118 UBC 0 : // The process is taking a long time to start up. Keep waiting, but
119 0 : // print a message
120 0 : print!("\n{process_name} has not started yet, continuing to wait");
121 CBC 1061 : }
122 1061 : if retries % DOT_EVERY_RETRIES == 0 {
123 1043 : print!(".");
124 1043 : io::stdout().flush().unwrap();
125 1043 : }
126 1061 : thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
127 : }
128 UBC 0 : Err(e) => {
129 0 : println!("{process_name} failed to start: {e:#}");
130 0 : if let Err(e) = spawned_process.kill() {
131 0 : println!("Could not stop {process_name} subprocess: {e:#}")
132 0 : };
133 0 : return Err(e);
134 : }
135 : }
136 : }
137 0 : println!();
138 0 : anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds");
139 CBC 1381 : }
140 :
141 : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
142 1418 : pub fn stop_process(
143 1418 : immediate: bool,
144 1418 : process_name: &str,
145 1418 : pid_file: &Utf8Path,
146 1418 : ) -> anyhow::Result<()> {
147 1418 : let pid = match pid_file::read(pid_file)
148 1418 : .with_context(|| format!("read pid_file {pid_file:?}"))?
149 : {
150 : PidFileRead::NotExist => {
151 UBC 0 : println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
152 0 : return Ok(());
153 : }
154 : PidFileRead::NotHeldByAnyProcess(_) => {
155 : // Don't try to kill according to file contents beacuse the pid might have been re-used by another process.
156 : // Don't delete the file either, it can race with new pid file creation.
157 : // Read `pid_file` module comment for details.
158 CBC 48 : println!(
159 48 : "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
160 48 : );
161 48 : return Ok(());
162 : }
163 1370 : PidFileRead::LockedByOtherProcess(pid) => pid,
164 : };
165 : // XXX the pid could become invalid (and recycled) at any time before the kill() below.
166 :
167 : // send signal
168 1370 : let sig = if immediate {
169 1094 : print!("Stopping {process_name} with pid {pid} immediately..");
170 1094 : Signal::SIGQUIT
171 : } else {
172 276 : print!("Stopping {process_name} with pid {pid} gracefully..");
173 276 : Signal::SIGTERM
174 : };
175 1370 : io::stdout().flush().unwrap();
176 1370 : match kill(pid, sig) {
177 1370 : Ok(()) => (),
178 : Err(Errno::ESRCH) => {
179 : // Again, don't delete the pid file. The unlink can race with a new pid file being created.
180 UBC 0 : println!(
181 0 : "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
182 0 : );
183 0 : return Ok(());
184 : }
185 0 : Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
186 : }
187 :
188 : // Wait until process is gone
189 CBC 1370 : wait_until_stopped(process_name, pid)?;
190 1370 : Ok(())
191 1418 : }
192 :
193 1906 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
194 3424 : for retries in 0..RETRIES {
195 3424 : match process_has_stopped(pid) {
196 : Ok(true) => {
197 1906 : println!("\n{process_name} stopped");
198 1906 : return Ok(());
199 : }
200 : Ok(false) => {
201 1518 : if retries == NOTICE_AFTER_RETRIES {
202 UBC 0 : // The process is taking a long time to start up. Keep waiting, but
203 0 : // print a message
204 0 : print!("\n{process_name} has not stopped yet, continuing to wait");
205 CBC 1518 : }
206 1518 : if retries % DOT_EVERY_RETRIES == 0 {
207 1384 : print!(".");
208 1384 : io::stdout().flush().unwrap();
209 1384 : }
210 1518 : thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
211 : }
212 UBC 0 : Err(e) => {
213 0 : println!("{process_name} with pid {pid} failed to stop: {e:#}");
214 0 : return Err(e);
215 : }
216 : }
217 : }
218 0 : println!();
219 0 : anyhow::bail!("{process_name} with pid {pid} did not stop in {RETRY_UNTIL_SECS} seconds");
220 CBC 1906 : }
221 :
/// Clears the command's environment and re-populates it with Rust-related variables.
///
/// `RUST_BACKTRACE` is always set: the current value is passed through if there is one,
/// otherwise it defaults to `1`. `LLVM_PROFILE_FILE`, `FAILPOINTS` and `RUST_LOG` are
/// passed through only when they are set in the current environment.
fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
    // Variables forwarded to the command verbatim, if present.
    const PASSTHROUGH_VARS: [&str; 3] = ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"];

    cmd.env_clear();
    match std::env::var_os("RUST_BACKTRACE") {
        Some(inherited) => cmd.env("RUST_BACKTRACE", inherited),
        None => cmd.env("RUST_BACKTRACE", OsStr::new("1")),
    };

    let inherited = PASSTHROUGH_VARS
        .iter()
        .copied()
        .filter_map(|name| std::env::var_os(name).map(|value| (name, value)));
    cmd.envs(inherited)
}
241 :
/// Passes the remote storage credentials found in the current environment on to `cmd`.
///
/// Each of the listed AWS / Azure variables is forwarded only if it is set. Values are
/// read with `var_os`, so a value that is not valid Unicode is forwarded unchanged instead
/// of being silently dropped.
fn fill_remote_storage_secrets_vars(cmd: &mut Command) -> &mut Command {
    for env_key in [
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_SESSION_TOKEN",
        "AZURE_STORAGE_ACCOUNT",
        "AZURE_STORAGE_ACCESS_KEY",
    ] {
        if let Some(value) = std::env::var_os(env_key) {
            cmd.env(env_key, value);
        }
    }
    cmd
}
256 :
257 : /// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
258 : /// 1. Claims a pidfile with a fcntl lock on it and
259 : /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
260 : /// will remain held until the cmd exits.
261 339 : fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
262 339 : where
263 339 : P: Into<Utf8PathBuf>,
264 339 : {
265 339 : let path: Utf8PathBuf = path.into();
266 339 : // SAFETY:
267 339 : // pre_exec is marked unsafe because it runs between fork and exec.
268 339 : // Why is that dangerous in various ways?
269 339 : // Long answer: https://github.com/rust-lang/rust/issues/39575
270 339 : // Short answer: in a multi-threaded program, other threads may have
271 339 : // been inside of critical sections at the time of fork. In the
272 339 : // original process, that was allright, assuming they protected
273 339 : // the critical sections appropriately, e.g., through locks.
274 339 : // Fork adds another process to the mix that
275 339 : // 1. Has a single thread T
276 339 : // 2. In an exact copy of the address space at the time of fork.
277 339 : // A variety of problems scan occur now:
278 339 : // 1. T tries to grab a lock that was locked at the time of fork.
279 339 : // It will wait forever since in its address space, the lock
280 339 : // is in state 'taken' but the thread that would unlock it is
281 339 : // not there.
282 339 : // 2. A rust object that represented some external resource in the
283 339 : // parent now got implicitly copied by the the fork, even though
284 339 : // the object's type is not `Copy`. The parent program may use
285 339 : // non-copyability as way to enforce unique ownership of an
286 339 : // external resource in the typesystem. The fork breaks that
287 339 : // assumption, as now both parent and child process have an
288 339 : // owned instance of the object that represents the same
289 339 : // underlying resource.
290 339 : // While these seem like niche problems, (1) in particular is
291 339 : // highly relevant. For example, `malloc()` may grab a mutex internally,
292 339 : // and so, if we forked while another thread was mallocing' and our
293 339 : // pre_exec closure allocates as well, it will block on the malloc
294 339 : // mutex forever
295 339 : //
296 339 : // The proper solution is to only use C library functions that are marked
297 339 : // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
298 339 : //
299 339 : // With this specific pre_exec() closure, the non-error path doesn't allocate.
300 339 : // The error path uses `anyhow`, and hence does allocate.
301 339 : // We take our chances there, hoping that any potential disaster is constrained
302 339 : // to the child process (e.g., malloc has no state ourside of the child process).
303 339 : // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
304 339 : // Again, we take our chances, making the same assumptions as for malloc.
305 339 : unsafe {
306 339 : cmd.pre_exec(move || {
307 UBC 0 : let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
308 0 : // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
309 0 : // remains locked after exec.
310 0 : nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
311 0 : .expect("remove FD_CLOEXEC");
312 0 : // Don't run drop(file), it would close the file before we actually exec.
313 0 : std::mem::forget(file);
314 0 : Ok(())
315 CBC 339 : });
316 339 : }
317 339 : cmd
318 339 : }
319 :
320 2442 : async fn process_started<F, Fut>(
321 2442 : pid: Pid,
322 2442 : pid_file_to_check: &Utf8Path,
323 2442 : status_check: &F,
324 2442 : ) -> anyhow::Result<bool>
325 2442 : where
326 2442 : F: Fn() -> Fut,
327 2442 : Fut: std::future::Future<Output = anyhow::Result<bool>>,
328 2442 : {
329 5317 : match status_check().await {
330 1381 : Ok(true) => match pid_file::read(pid_file_to_check)? {
331 UBC 0 : PidFileRead::NotExist => Ok(false),
332 CBC 1381 : PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
333 UBC 0 : PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
334 : },
335 CBC 1061 : Ok(false) => Ok(false),
336 UBC 0 : Err(e) => anyhow::bail!("process failed to start: {e}"),
337 : }
338 CBC 2442 : }
339 :
340 3424 : fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
341 3424 : match kill(pid, None) {
342 : // Process exists, keep waiting
343 1518 : Ok(_) => Ok(false),
344 : // Process not found, we're done
345 1906 : Err(Errno::ESRCH) => Ok(true),
346 UBC 0 : Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
347 : }
348 CBC 3424 : }
|