LCOV - differential code coverage report
Current view: top level - control_plane/src - background_process.rs (source / functions) Coverage Total Hit UBC CBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 81.6 % 228 186 42 186
Current Date: 2023-10-19 02:04:12 Functions: 42.4 % 33 14 19 14
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Spawns and kills background processes that are needed by Neon CLI.
       2                 : //! Applies common set-up such as log and pid files (if needed) to every process.
       3                 : //!
       4                 : //! Neon CLI does not run in background, so it needs to store the information about
       5                 : //! spawned processes, which it does in this module.
       6                 : //! We do that by storing the pid of the process in the "${process_name}.pid" file.
       7                 : //! The pid file can be created by the process itself
       8                 : //! (Neon storage binaries do that and also ensure that a lock is taken onto that file)
       9                 : //! or we create such file after starting the process
      10                 : //! (non-Neon binaries don't necessarily follow our pidfile conventions).
      11                 : //! The pid stored in the file is later used to stop the service.
      12                 : //!
      13                 : //! See the [`lock_file`](utils::lock_file) module for more info.
      14                 : 
      15                 : use std::ffi::OsStr;
      16                 : use std::io::Write;
      17                 : use std::os::unix::prelude::AsRawFd;
      18                 : use std::os::unix::process::CommandExt;
      19                 : use std::path::Path;
      20                 : use std::process::{Child, Command};
      21                 : use std::time::Duration;
      22                 : use std::{fs, io, thread};
      23                 : 
      24                 : use anyhow::Context;
      25                 : use camino::{Utf8Path, Utf8PathBuf};
      26                 : use nix::errno::Errno;
      27                 : use nix::fcntl::{FcntlArg, FdFlag};
      28                 : use nix::sys::signal::{kill, Signal};
      29                 : use nix::unistd::Pid;
      30                 : use utils::pid_file::{self, PidFileRead};
      31                 : 
      32                 : // These constants control the loop used to poll for process start / stop.
      33                 : //
      34                 : // The loop waits for at most 10 seconds, polling every 100 ms.
      35                 : // Once a second, it prints a dot ("."), to give the user an indication that
      36                 : // it's waiting. If the process hasn't started/stopped after 5 seconds,
      37                 : // it prints a notice that it's taking long, but keeps waiting.
      38                 : //
      39                 : const RETRY_UNTIL_SECS: u64 = 10;
      40                 : const RETRIES: u64 = (RETRY_UNTIL_SECS * 1000) / RETRY_INTERVAL_MILLIS;
      41                 : const RETRY_INTERVAL_MILLIS: u64 = 100;
      42                 : const DOT_EVERY_RETRIES: u64 = 10;
      43                 : const NOTICE_AFTER_RETRIES: u64 = 50;
      44                 : 
      45                 : /// Argument to `start_process`, to indicate whether it should create pidfile or if the process creates
      46                 : /// it itself.
      47                 : pub enum InitialPidFile<'t> {
      48                 :     /// Create a pidfile, to allow future CLI invocations to manipulate the process.
      49                 :     Create(&'t Utf8Path),
      50                 :     /// The process will create the pidfile itself, need to wait for that event.
      51                 :     Expect(&'t Utf8Path),
      52                 : }
      53                 : 
      54                 : /// Start a background child process using the parameters given.
      55 CBC        1078 : pub fn start_process<F, AI, A, EI>(
      56            1078 :     process_name: &str,
      57            1078 :     datadir: &Path,
      58            1078 :     command: &Path,
      59            1078 :     args: AI,
      60            1078 :     envs: EI,
      61            1078 :     initial_pid_file: InitialPidFile,
      62            1078 :     process_status_check: F,
      63            1078 : ) -> anyhow::Result<Child>
      64            1078 : where
      65            1078 :     F: Fn() -> anyhow::Result<bool>,
      66            1078 :     AI: IntoIterator<Item = A>,
      67            1078 :     A: AsRef<OsStr>,
      68            1078 :     // Not generic AsRef<OsStr>, otherwise empty `envs` prevents type inference
      69            1078 :     EI: IntoIterator<Item = (String, String)>,
      70            1078 : {
      71            1078 :     let log_path = datadir.join(format!("{process_name}.log"));
      72            1078 :     let process_log_file = fs::OpenOptions::new()
      73            1078 :         .create(true)
      74            1078 :         .write(true)
      75            1078 :         .append(true)
      76            1078 :         .open(&log_path)
      77            1078 :         .with_context(|| {
      78 UBC           0 :             format!("Could not open {process_name} log file {log_path:?} for writing")
      79 CBC        1078 :         })?;
      80            1078 :     let same_file_for_stderr = process_log_file.try_clone().with_context(|| {
      81 UBC           0 :         format!("Could not reuse {process_name} log file {log_path:?} for writing stderr")
      82 CBC        1078 :     })?;
      83                 : 
      84            1078 :     let mut command = Command::new(command);
      85            1078 :     let background_command = command
      86            1078 :         .stdout(process_log_file)
      87            1078 :         .stderr(same_file_for_stderr)
      88            1078 :         .args(args);
      89            1078 :     let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
      90            1078 :     filled_cmd.envs(envs);
      91                 : 
      92            1078 :     let pid_file_to_check = match initial_pid_file {
      93              18 :         InitialPidFile::Create(path) => {
      94              18 :             pre_exec_create_pidfile(filled_cmd, path);
      95              18 :             path
      96                 :         }
      97            1060 :         InitialPidFile::Expect(path) => path,
      98                 :     };
      99                 : 
     100            1078 :     let mut spawned_process = filled_cmd.spawn().with_context(|| {
     101 UBC           0 :         format!("Could not spawn {process_name}, see console output and log files for details.")
     102 CBC        1078 :     })?;
     103            1078 :     let pid = spawned_process.id();
     104            1078 :     let pid = Pid::from_raw(
     105            1078 :         i32::try_from(pid)
     106            1078 :             .with_context(|| format!("Subprocess {process_name} has invalid pid {pid}"))?,
     107                 :     );
     108                 : 
     109            2195 :     for retries in 0..RETRIES {
     110            2195 :         match process_started(pid, Some(pid_file_to_check), &process_status_check) {
     111                 :             Ok(true) => {
     112            1078 :                 println!("\n{process_name} started, pid: {pid}");
     113            1078 :                 return Ok(spawned_process);
     114                 :             }
     115                 :             Ok(false) => {
     116            1117 :                 if retries == NOTICE_AFTER_RETRIES {
     117 UBC           0 :                     // The process is taking a long time to start up. Keep waiting, but
     118               0 :                     // print a message
     119               0 :                     print!("\n{process_name} has not started yet, continuing to wait");
     120 CBC        1117 :                 }
     121            1117 :                 if retries % DOT_EVERY_RETRIES == 0 {
     122            1063 :                     print!(".");
     123            1063 :                     io::stdout().flush().unwrap();
     124            1063 :                 }
     125            1117 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     126                 :             }
     127 UBC           0 :             Err(e) => {
     128               0 :                 println!("{process_name} failed to start: {e:#}");
     129               0 :                 if let Err(e) = spawned_process.kill() {
     130               0 :                     println!("Could not stop {process_name} subprocess: {e:#}")
     131               0 :                 };
     132               0 :                 return Err(e);
     133                 :             }
     134                 :         }
     135                 :     }
     136               0 :     println!();
     137               0 :     anyhow::bail!("{process_name} did not start in {RETRY_UNTIL_SECS} seconds");
     138 CBC        1078 : }
     139                 : 
     140                 : /// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
     141            1112 : pub fn stop_process(
     142            1112 :     immediate: bool,
     143            1112 :     process_name: &str,
     144            1112 :     pid_file: &Utf8Path,
     145            1112 : ) -> anyhow::Result<()> {
     146            1112 :     let pid = match pid_file::read(pid_file)
     147            1112 :         .with_context(|| format!("read pid_file {pid_file:?}"))?
     148                 :     {
     149                 :         PidFileRead::NotExist => {
     150 UBC           0 :             println!("{process_name} is already stopped: no pid file present at {pid_file:?}");
     151               0 :             return Ok(());
     152                 :         }
     153                 :         PidFileRead::NotHeldByAnyProcess(_) => {
     154                 :             // Don't try to kill according to file contents because the pid might have been re-used by another process.
     155                 :             // Don't delete the file either, it can race with new pid file creation.
     156                 :             // Read `pid_file` module comment for details.
     157 CBC          44 :             println!(
     158              44 :                 "No process is holding the pidfile. The process must have already exited. Leave in place to avoid race conditions: {pid_file:?}"
     159              44 :             );
     160              44 :             return Ok(());
     161                 :         }
     162            1068 :         PidFileRead::LockedByOtherProcess(pid) => pid,
     163                 :     };
     164                 :     // XXX the pid could become invalid (and recycled) at any time before the kill() below.
     165                 : 
     166                 :     // send signal
     167            1068 :     let sig = if immediate {
     168             810 :         print!("Stopping {process_name} with pid {pid} immediately..");
     169             810 :         Signal::SIGQUIT
     170                 :     } else {
     171             258 :         print!("Stopping {process_name} with pid {pid} gracefully..");
     172             258 :         Signal::SIGTERM
     173                 :     };
     174            1068 :     io::stdout().flush().unwrap();
     175            1068 :     match kill(pid, sig) {
     176            1068 :         Ok(()) => (),
     177                 :         Err(Errno::ESRCH) => {
     178                 :             // Again, don't delete the pid file. The unlink can race with a new pid file being created.
     179 UBC           0 :             println!(
     180               0 :                 "{process_name} with pid {pid} does not exist, but a pid file {pid_file:?} was found. Likely the pid got recycled. Lucky we didn't harm anyone."
     181               0 :             );
     182               0 :             return Ok(());
     183                 :         }
     184               0 :         Err(e) => anyhow::bail!("Failed to send signal to {process_name} with pid {pid}: {e}"),
     185                 :     }
     186                 : 
     187                 :     // Wait until process is gone
     188 CBC        1068 :     wait_until_stopped(process_name, pid)?;
     189            1068 :     Ok(())
     190            1112 : }
     191                 : 
     192            1681 : pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
     193            3005 :     for retries in 0..RETRIES {
     194            3005 :         match process_has_stopped(pid) {
     195                 :             Ok(true) => {
     196            1681 :                 println!("\n{process_name} stopped");
     197            1681 :                 return Ok(());
     198                 :             }
     199                 :             Ok(false) => {
     200            1324 :                 if retries == NOTICE_AFTER_RETRIES {
     201 UBC           0 :                     // The process is taking a long time to stop. Keep waiting, but
     202               0 :                     // print a message
     203               0 :                     print!("\n{process_name} has not stopped yet, continuing to wait");
     204 CBC        1324 :                 }
     205            1324 :                 if retries % DOT_EVERY_RETRIES == 0 {
     206            1116 :                     print!(".");
     207            1116 :                     io::stdout().flush().unwrap();
     208            1116 :                 }
     209            1324 :                 thread::sleep(Duration::from_millis(RETRY_INTERVAL_MILLIS));
     210                 :             }
     211 UBC           0 :             Err(e) => {
     212               0 :                 println!("{process_name} with pid {pid} failed to stop: {e:#}");
     213               0 :                 return Err(e);
     214                 :             }
     215                 :         }
     216                 :     }
     217               0 :     println!();
     218               0 :     anyhow::bail!("{process_name} with pid {pid} did not stop in {RETRY_UNTIL_SECS} seconds");
     219 CBC        1681 : }
     220                 : 
     221            1078 : fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command {
     222            1078 :     // If RUST_BACKTRACE is set, pass it through. But if it's not set, default
     223            1078 :     // to RUST_BACKTRACE=1.
     224            1078 :     let backtrace_setting = std::env::var_os("RUST_BACKTRACE");
     225            1078 :     let backtrace_setting = backtrace_setting
     226            1078 :         .as_deref()
     227            1078 :         .unwrap_or_else(|| OsStr::new("1"));
     228            1078 : 
     229            1078 :     let mut filled_cmd = cmd.env_clear().env("RUST_BACKTRACE", backtrace_setting);
     230                 : 
     231                 :     // Pass through these environment variables to the command
     232            3234 :     for var in ["LLVM_PROFILE_FILE", "FAILPOINTS", "RUST_LOG"] {
     233            3234 :         if let Some(val) = std::env::var_os(var) {
     234            1089 :             filled_cmd = filled_cmd.env(var, val);
     235            2145 :         }
     236                 :     }
     237                 : 
     238            1078 :     filled_cmd
     239            1078 : }
     240                 : 
     241            1078 : fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
     242            5390 :     for env_key in [
     243                 :         "AWS_ACCESS_KEY_ID",
     244            1078 :         "AWS_SECRET_ACCESS_KEY",
     245            1078 :         "AWS_SESSION_TOKEN",
     246            1078 :         "AZURE_STORAGE_ACCOUNT",
     247            1078 :         "AZURE_STORAGE_ACCESS_KEY",
     248                 :     ] {
     249            5390 :         if let Ok(value) = std::env::var(env_key) {
     250            2156 :             cmd = cmd.env(env_key, value);
     251            3234 :         }
     252                 :     }
     253            1078 :     cmd
     254            1078 : }
     255                 : 
     256                 : /// Add a `pre_exec` to the cmd that, in between fork() and exec(),
     257                 : /// 1. Claims a pidfile with a fcntl lock on it and
     258                 : /// 2. Sets up the pidfile's file descriptor so that it (and the lock)
     259                 : ///    will remain held until the cmd exits.
     260              18 : fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
     261              18 : where
     262              18 :     P: Into<Utf8PathBuf>,
     263              18 : {
     264              18 :     let path: Utf8PathBuf = path.into();
     265              18 :     // SAFETY
     266              18 :     // pre_exec is marked unsafe because it runs between fork and exec.
     267              18 :     // Why is that dangerous in various ways?
     268              18 :     // Long answer:  https://github.com/rust-lang/rust/issues/39575
     269              18 :     // Short answer: in a multi-threaded program, other threads may have
     270              18 :     // been inside of critical sections at the time of fork. In the
     271              18 :     // original process, that was all right, assuming they protected
     272              18 :     // the critical sections appropriately, e.g., through locks.
     273              18 :     // Fork adds another process to the mix that
     274              18 :     //   1. Has a single thread T
     275              18 :     //   2. In an exact copy of the address space at the time of fork.
     276              18 :     // A variety of problems can occur now:
     277              18 :     //   1. T tries to grab a lock that was locked at the time of fork.
     278              18 :     //      It will wait forever since in its address space, the lock
     279              18 :     //      is in state 'taken' but the thread that would unlock it is
     280              18 :     //      not there.
     281              18 :     //   2. A rust object that represented some external resource in the
     282              18 :     //      parent now got implicitly copied by the fork, even though
     283              18 :     //      the object's type is not `Copy`. The parent program may use
     284              18 :     //      non-copyability as way to enforce unique ownership of an
     285              18 :     //      external resource in the typesystem. The fork breaks that
     286              18 :     //      assumption, as now both parent and child process have an
     287              18 :     //      owned instance of the object that represents the same
     288              18 :     //      underlying resource.
     289              18 :     // While these seem like niche problems, (1) in particular is
     290              18 :     // highly relevant. For example, `malloc()` may grab a mutex internally,
     291              18 :     // and so, if we forked while another thread was mallocing and our
     292              18 :     // pre_exec closure allocates as well, it will block on the malloc
     293              18 :     // mutex forever
     294              18 :     //
     295              18 :     // The proper solution is to only use C library functions that are marked
     296              18 :     // "async-signal-safe": https://man7.org/linux/man-pages/man7/signal-safety.7.html
     297              18 :     //
     298              18 :     // With this specific pre_exec() closure, the non-error path doesn't allocate.
     299              18 :     // The error path uses `anyhow`, and hence does allocate.
     300              18 :     // We take our chances there, hoping that any potential disaster is constrained
     301              18 :     // to the child process (e.g., malloc has no state outside of the child process).
     302              18 :     // Last, `expect` prints to stderr, and stdio is not async-signal-safe.
     303              18 :     // Again, we take our chances, making the same assumptions as for malloc.
     304              18 :     unsafe {
     305              18 :         cmd.pre_exec(move || {
     306 UBC           0 :             let file = pid_file::claim_for_current_process(&path).expect("claim pid file");
     307               0 :             // Remove the FD_CLOEXEC flag on the pidfile descriptor so that the pidfile
     308               0 :             // remains locked after exec.
     309               0 :             nix::fcntl::fcntl(file.as_raw_fd(), FcntlArg::F_SETFD(FdFlag::empty()))
     310               0 :                 .expect("remove FD_CLOEXEC");
     311               0 :             // Don't run drop(file), it would close the file before we actually exec.
     312               0 :             std::mem::forget(file);
     313               0 :             Ok(())
     314 CBC          18 :         });
     315              18 :     }
     316              18 :     cmd
     317              18 : }
     318                 : 
     319            2195 : fn process_started<F>(
     320            2195 :     pid: Pid,
     321            2195 :     pid_file_to_check: Option<&Utf8Path>,
     322            2195 :     status_check: &F,
     323            2195 : ) -> anyhow::Result<bool>
     324            2195 : where
     325            2195 :     F: Fn() -> anyhow::Result<bool>,
     326            2195 : {
     327            2195 :     match status_check() {
     328            1078 :         Ok(true) => match pid_file_to_check {
     329            1078 :             Some(pid_file_path) => match pid_file::read(pid_file_path)? {
     330 UBC           0 :                 PidFileRead::NotExist => Ok(false),
     331 CBC        1078 :                 PidFileRead::LockedByOtherProcess(pid_in_file) => Ok(pid_in_file == pid),
     332 UBC           0 :                 PidFileRead::NotHeldByAnyProcess(_) => Ok(false),
     333                 :             },
     334               0 :             None => Ok(true),
     335                 :         },
     336 CBC        1117 :         Ok(false) => Ok(false),
     337 UBC           0 :         Err(e) => anyhow::bail!("process failed to start: {e}"),
     338                 :     }
     339 CBC        2195 : }
     340                 : 
     341                 : fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
     342            3005 :     match kill(pid, None) {
     343                 :         // Process exists, keep waiting
     344            1324 :         Ok(_) => Ok(false),
     345                 :         // Process not found, we're done
     346            1681 :         Err(Errno::ESRCH) => Ok(true),
     347 UBC           0 :         Err(err) => anyhow::bail!("Failed to send signal to process with pid {pid}: {err}"),
     348                 :     }
     349 CBC        3005 : }
        

Generated by: LCOV version 2.1-beta