TLA Line data Source code
1 : //! Spawns and kills background processes that are needed by Neon CLI.
2 : //! Applies common set-up such as log and pid files (if needed) to every process.
3 : //!
4 : //! Neon CLI does not run in background, so it needs to store the information about
5 : //! spawned processes, which it does in this module.
6 : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
7 : //! The pid file can be created by the process itself
8 : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
9 : //! or we create such file after starting the process
10 : //! (non-Neon binaries don't necessarily follow our pidfile conventions).
11 : //! The pid stored in the file is later used to stop the service.
12 : //!
13 : //! See the [`lock_file`](utils::lock_file) module for more info.
14 :
15 : use std::ffi::OsStr;
16 : use std::io::Write;
17 : use std::os::unix::prelude::AsRawFd;
18 : use std::os::unix::process::CommandExt;
19 : use std::path::Path;
20 : use std::process::{Child, Command};
21 : use std::time::Duration;
22 : use std::{fs, io, thread};
23 :
24 : use anyhow::Context;
25 : use camino::{Utf8Path, Utf8PathBuf};
26 : use nix::errno::Errno;
27 : use nix::fcntl::{FcntlArg, FdFlag};
28 : use nix::sys::signal::{kill, Signal};
29 : use nix::unistd::Pid;
30 : use utils::pid_file::{self, PidFileRead};
31 :
// These constants control the loop used to poll for process start / stop.
//
// The loop polls every RETRY_INTERVAL_MILLIS (100 ms) and gives up after
// RETRY_UNTIL_SECS (10) seconds. Roughly once a second it prints a dot (".")
// so the user can see it is still waiting. If the process hasn't
// started/stopped after 5 seconds, it prints a notice that it's taking long,
// but keeps waiting.
const RETRY_INTERVAL_MILLIS: u64 = 100;
const RETRY_UNTIL_SECS: u64 = 10;
/// Total number of polls before giving up.
const RETRIES: u64 = RETRY_UNTIL_SECS * 1000 / RETRY_INTERVAL_MILLIS;
/// Number of polls between two progress dots (one second's worth).
const DOT_EVERY_RETRIES: u64 = 1000 / RETRY_INTERVAL_MILLIS;
/// Poll index at which the "taking long" notice is printed (five seconds in).
const NOTICE_AFTER_RETRIES: u64 = 5 * DOT_EVERY_RETRIES;
44 :
45 : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
46 : /// it itself.
47 : pub enum InitialPidFile<'t> {
48 : /// Create a pidfile, to allow future CLI invocations to manipulate the process.
49 : Create(&'t Utf8Path),
50 : /// The process will create the pidfile itself, need to wait for that event.
51 : Expect(&'t Utf8Path),
52 : }
53 :
54 : /// Start a background child process using the parameters given.
55 CBC 1078 : pub fn start_process<F, AI, A, EI>(
56 1078 : process_name: &str,
57 1078 : datadir: &Path,
58 1078 : command: &Path,
59 1078 : args: AI,
60 1078 : envs: EI,
61 1078 : initial_pid_file: InitialPidFile,
62 1078 : process_status_check: F,
63 1078 : ) -> anyhow::Result<Child>
64 1078 : where
65 1078 : F: Fn() -> anyhow::Result<bool>,
66 1078 : AI: IntoIterator<Item = A>,
67 1078 : A: AsRef<OsStr>,
68 1078 : // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
69 1078 : EI: IntoIterator<Item = (String, String)>,
70 1078 : {
71 1078 : let log_path = datadir.join(format!("{process_name}.log"));
72 1078 : let process_log_file = fs::OpenOptions::new()
73 1078 : .create(true)
74 1078 : .write(true)
75 1078 : .append(true)
76 1078 : .open(&log_path)
77 1078 : .with_context(|| {
78 UBC 0 : format!("Could not open {process_name} log file {log_path:?} for writing")
79 CBC 1078 : })?;
80 1078 : let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
81 UBC 0 : format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
82 CBC 1078 : })?;
83 :
84 1078 : let mut command = Command::new(command);
85 1078 : let background_command = command
86 1078 : .stdout(process_log_file)
87 1078 : .stderr(same_file_for_stderr)
88 1078 : .args(args);
89 1078 : let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
90 1078 : filled_cmd.envs(envs);
91 :
92 1078 : let pid_file_to_check = match initial_pid_file {
93 18 : InitialPidFile::Create(path) => {
94 18 : pre_exec_create_pidfile(filled_cmd, path);
95 18 : path
96 : }
97 1060 : InitialPidFile::Expect(path) => path,
98 : };
99 :
100 1078 : let mut spawned_process = filled_cmd.spawn().with_context(|| {
101 UBC 0 : format!("Could not spawn {process_name}, see console output and log files for details.")
102 CBC 1078 : })?;
103 1078 : let pid = spawned_process.id();
104 1078 : let pid = Pid::from_raw(
105 1078 : i32::try_from(pid)
106 1078 : .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
107 : );
108 :
109 2195 : for retries in 0..RETRIES {
110 2195 : match process_started(pid, Some(pid_file_to_check), &process_status_check) {
111 : Ok(true) => {
112 1078 : println!("\n{process_name} started, pid: {pid}");
113 1078 : return Ok(spawned_process);
114 : }
115 : Ok(false) => {
116 1117 : if retries == NOTICE_AFTER_RETRIES {
117 UBC 0 : // The process is taking a long time to start up. Keep waiting, but
118 0 : // print a message
119 0 : print!("\n{process_name} has not started yet, continuing to wait");
120 CBC 1117 : }
121 1117 : if retries % DOT_EVERY_RETRIES == 0 {
122 1063 : print!(".");
123 1063 : io::stdout().flush().unwrap();
124 1063 : }
125 1117 : thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
126 : }
127 UBC 0 : Err(e) => {
128 0 : println!("{process_name} failed to start: {e:#}");
129 0 : if let Err(e) = spawned_process.kill() {
130 0 : println!("Could not stop {process_name} subprocess: {e:#}")
131 0 : };
132 0 : return Err(e);
133 : }
134 : }
135 : }
136 0 : println!();
137 0 : anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds");
138 CBC 1078 : }
139 :
140 : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
141 1112 : pub fn stop_process(
142 1112 : immediate: bool,
143 1112 : process_name: &str,
144 1112 : pid_file: &Utf8Path,
145 1112 : ) -> anyhow::Result<()> {
146 1112 : let pid = match pid_file::read(pid_file)
147 1112 : .with_context(|| format!("read pid_file {pid_file:?}"))?
148 : {
149 : PidFileRead::NotExist => {
150 UBC 0 : println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
151 0 : return Ok(());
152 : }
153 : PidFileRead::NotHeldByAnyProcess(_) => {
154 : // Don't try to kill according to file contents beacuse the pid might have been re-used by another process.
155 : // Don't delete the file either, it can race with new pid file creation.
156 : // Read `pid_file` module comment for details.
157 CBC 44 : println!(
158 44 : "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
159 44 : );
160 44 : return Ok(());
161 : }
162 1068 : PidFileRead::LockedByOtherProcess(pid) => pid,
163 : };
164 : // XXX the pid could become invalid (and recycled) at any time before the kill() below.
165 :
166 : // send signal
167 1068 : let sig = if immediate {
168 810 : print!("Stopping {process_name} with pid {pid} immediately..");
169 810 : Signal::SIGQUIT
170 : } else {
171 258 : print!("Stopping {process_name} with pid {pid} gracefully..");
172 258 : Signal::SIGTERM
173 : };
174 1068 : io::stdout().flush().unwrap();
175 1068 : match kill(pid, sig) {
176 1068 : Ok(()) => (),
177 : Err(Errno::ESRCH) => {
178 : // Again, don't delete the pid file. The unlink can race with a new pid file being created.
179 UBC 0 : println!(
180 0 : "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
181 0 : );
182 0 : return Ok(());
183 : }
184 0 : Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
185 : }
186 :
187 : // Wait until process is gone
188 CBC 1068 : wait_until_stopped(process_name, pid)?;
189 1068 : Ok(())
190 1112 : }
191 :
192 1681 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
193 3005 : for retries in 0..RETRIES {
194 3005 : match process_has_stopped(pid) {
195 : Ok(true) => {
196 1681 : println!("\n{process_name} stopped");
197 1681 : return Ok(());
198 : }
199 : Ok(false) => {
200 1324 : if retries == NOTICE_AFTER_RETRIES {
201 UBC 0 : // The process is taking a long time to start up. Keep waiting, but
202 0 : // print a message
203 0 : print!("\n{process_name} has not stopped yet, continuing to wait");
204 CBC 1324 : }
205 1324 : if retries % DOT_EVERY_RETRIES == 0 {
206 1116 : print!(".");
207 1116 : io::stdout().flush().unwrap();
208 1116 : }
209 1324 : thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
210 : }
211 UBC 0 : Err(e) => {
212 0 : println!("{process_name} with pid {pid} failed to stop: {e:#}");
213 0 : return Err(e);
214 : }
215 : }
216 : }
217 0 : println!();
218 0 : anyhow::bail!("{process_name} with pid {pid} did not stop in {RETRY_UNTIL_SECS} seconds");
219 CBC 1681 : }
220 :
/// Clears `cmd`'s inherited environment and sets up the Rust runtime variables.
///
/// `RUST_BACKTRACE` is taken from the CLI's own environment and defaults to `1`
/// when unset. `LLVM_PROFILE_FILE`, `FAILPOINTS` and `RUST_LOG` are passed through
/// only when set. Returns `cmd` for chaining.
fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
    const PASSTHROUGH_VARS: [&str; 3] = ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"];

    let backtrace =
        std::env::var_os("RUST_BACKTRACE").unwrap_or_else(|| OsStr::new("1").to_os_string());

    cmd.env_clear();
    cmd.env("RUST_BACKTRACE", backtrace);
    cmd.envs(
        PASSTHROUGH_VARS
            .iter()
            .filter_map(|&name| std::env::var_os(name).map(|value| (name, value))),
    );
    cmd
}
240 :
/// Passes the remote-storage credentials (AWS and Azure) from the CLI's own
/// environment through to `cmd`, skipping any that are unset.
///
/// Values are read with `var_os`, so credentials that are not valid UTF-8 are
/// still forwarded rather than silently dropped. This matches how
/// `fill_rust_env_vars` passes its variables through. Returns `cmd` for chaining.
fn fill_remote_storage_secrets_vars(cmd: &mut Command) -> &mut Command {
    for env_key in [
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY",
        "AWS_SESSION_TOKEN",
        "AZURE_STORAGE_ACCOUNT",
        "AZURE_STORAGE_ACCESS_KEY",
    ] {
        if let Some(value) = std::env::var_os(env_key) {
            cmd.env(env_key, value);
        }
    }
    cmd
}
255 :
256 : /// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
257 : /// 1. Claims a pidfile with a fcntl lock on it and
258 : /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
259 : /// will remain held until the cmd exits.
260 18 : fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
261 18 : where
262 18 : P: Into<Utf8PathBuf>,
263 18 : {
264 18 : let path: Utf8PathBuf = path.into();
265 18 : // SAFETY
266 18 : // pre_exec is marked unsafe because it runs between fork and exec.
267 18 : // Why is that dangerous in various ways?
268 18 : // Long answer: https://github.com/rust-lang/rust/issues/39575
269 18 : // Short answer: in a multi-threaded program, other threads may have
270 18 : // been inside of critical sections at the time of fork. In the
271 18 : // original process, that was allright, assuming they protected
272 18 : // the critical sections appropriately, e.g., through locks.
273 18 : // Fork adds another process to the mix that
274 18 : // 1. Has a single thread T
275 18 : // 2. In an exact copy of the address space at the time of fork.
276 18 : // A variety of problems scan occur now:
277 18 : // 1. T tries to grab a lock that was locked at the time of fork.
278 18 : // It will wait forever since in its address space, the lock
279 18 : // is in state 'taken' but the thread that would unlock it is
280 18 : // not there.
281 18 : // 2. A rust object that represented some external resource in the
282 18 : // parent now got implicitly copied by the the fork, even though
283 18 : // the object's type is not `Copy`. The parent program may use
284 18 : // non-copyability as way to enforce unique ownership of an
285 18 : // external resource in the typesystem. The fork breaks that
286 18 : // assumption, as now both parent and child process have an
287 18 : // owned instance of the object that represents the same
288 18 : // underlying resource.
289 18 : // While these seem like niche problems, (1) in particular is
290 18 : // highly relevant. For example, `malloc()` may grab a mutex internally,
291 18 : // and so, if we forked while another thread was mallocing' and our
292 18 : // pre_exec closure allocates as well, it will block on the malloc
293 18 : // mutex forever
294 18 : //
295 18 : // The proper solution is to only use C library functions that are marked
296 18 : // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
297 18 : //
298 18 : // With this specific pre_exec() closure, the non-error path doesn't allocate.
299 18 : // The error path uses `anyhow`, and hence does allocate.
300 18 : // We take our chances there, hoping that any potential disaster is constrained
301 18 : // to the child process (e.g., malloc has no state ourside of the child process).
302 18 : // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
303 18 : // Again, we take our chances, making the same assumptions as for malloc.
304 18 : unsafe {
305 18 : cmd.pre_exec(move || {
306 UBC 0 : let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
307 0 : // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
308 0 : // remains locked after exec.
309 0 : nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
310 0 : .expect("remove FD_CLOEXEC");
311 0 : // Don't run drop(file), it would close the file before we actually exec.
312 0 : std::mem::forget(file);
313 0 : Ok(())
314 CBC 18 : });
315 18 : }
316 18 : cmd
317 18 : }
318 :
319 2195 : fn process_started<F>(
320 2195 : pid: Pid,
321 2195 : pid_file_to_check: Option<&Utf8Path>,
322 2195 : status_check: &F,
323 2195 : ) -> anyhow::Result<bool>
324 2195 : where
325 2195 : F: Fn() -> anyhow::Result<bool>,
326 2195 : {
327 2195 : match status_check() {
328 1078 : Ok(true) => match pid_file_to_check {
329 1078 : Some(pid_file_path) => match pid_file::read(pid_file_path)? {
330 UBC 0 : PidFileRead::NotExist => Ok(false),
331 CBC 1078 : PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
332 UBC 0 : PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
333 : },
334 0 : None => Ok(true),
335 : },
336 CBC 1117 : Ok(false) => Ok(false),
337 UBC 0 : Err(e) => anyhow::bail!("process failed to start: {e}"),
338 : }
339 CBC 2195 : }
340 :
341 : fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
342 3005 : match kill(pid, None) {
343 : // Process exists, keep waiting
344 1324 : Ok(_) => Ok(false),
345 : // Process not found, we're done
346 1681 : Err(Errno::ESRCH) => Ok(true),
347 UBC 0 : Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
348 : }
349 CBC 3005 : }
|