LCOV - code coverage report
Current view: top level - pageserver/src/walredo/process - no_leak_child.rs (source / functions) Coverage Total Hit
Test: 960803fca14b2e843c565dddf575f7017d250bc3.info Lines: 63.4 % 41 26
Test Date: 2024-06-22 23:41:44 Functions: 77.8 % 9 7

            Line data    Source code
       1              : use tracing::instrument;
       2              : use tracing::{error, info};
       3              : 
       4              : use crate::metrics::WalRedoKillCause;
       5              : use crate::metrics::WAL_REDO_PROCESS_COUNTERS;
       6              : 
       7              : use std::io;
       8              : use std::process::Command;
       9              : 
      10              : use std::ops::DerefMut;
      11              : 
      12              : use std::ops::Deref;
      13              : 
      14              : use std::process::Child;
      15              : 
      16              : use pageserver_api::shard::TenantShardId;
      17              : 
      18              : /// Wrapper type around `std::process::Child` which guarantees that the child
      19              : /// will be killed and waited-for by this process before being dropped.
      20              : pub(crate) struct NoLeakChild {
      21              :     pub(crate) tenant_id: TenantShardId,
      22              :     pub(crate) child: Option<Child>,
      23              : }
      24              : 
      25              : impl Deref for NoLeakChild {
      26              :     type Target = Child;
      27              : 
      28           24 :     fn deref(&self) -> &Self::Target {
      29           24 :         self.child.as_ref().expect("must not use from drop")
      30           24 :     }
      31              : }
      32              : 
      33              : impl DerefMut for NoLeakChild {
      34           24 :     fn deref_mut(&mut self) -> &mut Self::Target {
      35           24 :         self.child.as_mut().expect("must not use from drop")
      36           24 :     }
      37              : }
      38              : 
      39              : impl NoLeakChild {
      40            8 :     pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
      41            8 :         let child = command.spawn()?;
      42            8 :         Ok(NoLeakChild {
      43            8 :             tenant_id,
      44            8 :             child: Some(child),
      45            8 :         })
      46            8 :     }
      47              : 
      48            8 :     pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) {
      49            8 :         let child = match self.child.take() {
      50            8 :             Some(child) => child,
      51            0 :             None => return,
      52              :         };
      53            8 :         Self::kill_and_wait_impl(child, cause);
      54            8 :     }
      55              : 
      56            8 :     #[instrument(skip_all, fields(pid=child.id(), ?cause))]
      57              :     pub(crate) fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
      58              :         scopeguard::defer! {
      59              :             WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
      60              :         }
      61              :         let res = child.kill();
      62              :         if let Err(e) = res {
      63              :             // This branch is very unlikely because:
      64              :             // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
      65              :             // - This is the only place that calls .kill()
      66              :             // - We take `child` by value, so, .kill() can't be called twice on it.
      67              :             // - If the process exited by itself or was killed by someone else,
      68              :             //   .kill() will still succeed because we haven't wait()'ed yet.
      69              :             //
      70              :             // So, if we arrive here, we have really no idea what happened,
      71              :             // whether the PID stored in `child` is still valid, etc.
      72              :             // If this function were fallible, we'd return an error, but
      73              :             // since it isn't, all we can do is log an error and proceed
      74              :             // with the wait().
      75              :             error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
      76              :         }
      77              : 
      78              :         match child.wait() {
      79              :             Ok(exit_status) => {
      80              :                 info!(exit_status = %exit_status, "wait successful");
      81              :             }
      82              :             Err(e) => {
      83              :                 error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
      84              :             }
      85              :         }
      86              :     }
      87              : }
      88              : 
      89              : impl Drop for NoLeakChild {
      90            8 :     fn drop(&mut self) {
      91            8 :         let child = match self.child.take() {
      92            0 :             Some(child) => child,
      93            8 :             None => return,
      94              :         };
      95            0 :         let tenant_shard_id = self.tenant_id;
      96            0 :         // Offload the kill+wait of the child process into the background.
      97            0 :         // If someone stops the runtime, we'll leak the child process.
      98            0 :         // We can ignore that case because we only stop the runtime on pageserver exit.
      99            0 :         tokio::runtime::Handle::current().spawn(async move {
     100            0 :             tokio::task::spawn_blocking(move || {
     101              :                 // Intentionally don't inherit the tracing context from whoever is dropping us.
     102              :                 // This thread here is going to outlive our dropper.
     103            0 :                 let span = tracing::info_span!(
     104              :                     "walredo",
     105              :                     tenant_id = %tenant_shard_id.tenant_id,
     106            0 :                     shard_id = %tenant_shard_id.shard_slug()
     107              :                 );
     108            0 :                 let _entered = span.enter();
     109            0 :                 Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
     110            0 :             })
     111            0 :             .await
     112            0 :         });
     113            8 :     }
     114              : }
     115              : 
     116              : pub(crate) trait NoLeakChildCommandExt {
     117              :     fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
     118              : }
     119              : 
     120              : impl NoLeakChildCommandExt for Command {
     121            8 :     fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
     122            8 :         NoLeakChild::spawn(tenant_id, self)
     123            8 :     }
     124              : }
        

Generated by: LCOV version 2.1-beta