LCOV - code coverage report
Current view: top level - pageserver/src/walredo/process - no_leak_child.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 63.8 % 47 30
Test Date: 2024-02-07 07:37:29 Functions: 69.2 % 13 9

            Line data    Source code
       1              : use tracing;
       2              : use tracing::error;
       3              : use tracing::info;
       4              : use tracing::instrument;
       5              : 
       6              : use crate::metrics::WalRedoKillCause;
       7              : use crate::metrics::WAL_REDO_PROCESS_COUNTERS;
       8              : 
       9              : use std::io;
      10              : use std::process::Command;
      11              : 
      12              : use std::ops::DerefMut;
      13              : 
      14              : use std::ops::Deref;
      15              : 
      16              : use std::process::Child;
      17              : 
      18              : use pageserver_api::shard::TenantShardId;
      19              : 
      20              : /// Wrapper type around `std::process::Child` which guarantees that the child
      21              : /// will be killed and waited-for by this process before being dropped.
      22              : pub(crate) struct NoLeakChild {
      23              :     pub(crate) tenant_id: TenantShardId,
      24              :     pub(crate) child: Option<Child>,
      25              : }
      26              : 
      27              : impl Deref for NoLeakChild {
      28              :     type Target = Child;
      29              : 
      30      2858010 :     fn deref(&self) -> &Self::Target {
      31      2858010 :         self.child.as_ref().expect("must not use from drop")
      32      2858010 :     }
      33              : }
      34              : 
      35              : impl DerefMut for NoLeakChild {
      36         1812 :     fn deref_mut(&mut self) -> &mut Self::Target {
      37         1812 :         self.child.as_mut().expect("must not use from drop")
      38         1812 :     }
      39              : }
      40              : 
      41              : impl NoLeakChild {
      42          604 :     pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
      43          604 :         let child = command.spawn()?;
      44          604 :         Ok(NoLeakChild {
      45          604 :             tenant_id,
      46          604 :             child: Some(child),
      47          604 :         })
      48          604 :     }
      49              : 
      50          289 :     pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) {
      51          289 :         let child = match self.child.take() {
      52          289 :             Some(child) => child,
      53            0 :             None => return,
      54              :         };
      55          289 :         Self::kill_and_wait_impl(child, cause);
      56          289 :     }
      57              : 
      58          289 :     #[instrument(skip_all, fields(pid=child.id(), ?cause))]
      59              :     pub(crate) fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
      60          289 :         scopeguard::defer! {
      61          289 :             WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
      62          289 :         }
      63              :         let res = child.kill();
      64              :         if let Err(e) = res {
      65              :             // This branch is very unlikely because:
      66              :             // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
      67              :             // - This is the only place that calls .kill()
      68              :             // - We consume `child`, so, .kill() can't be called twice.
      69              :             // - If the process exited by itself or was killed by someone else,
      70              :             //   .kill() will still succeed because we haven't wait()'ed yet.
      71              :             //
      72              :             // So, if we arrive here, we have really no idea what happened,
      73              :             // whether the PID stored in `child` is still valid, etc.
      74              :             // If this function were fallible, we'd return an error, but
      75              :             // since it isn't, all we can do is log an error and proceed
      76              :             // with the wait().
      77            0 :             error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
      78              :         }
      79              : 
      80              :         match child.wait() {
      81              :             Ok(exit_status) => {
      82          289 :                 info!(exit_status = %exit_status, "wait successful");
      83              :             }
      84              :             Err(e) => {
      85            0 :                 error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
      86              :             }
      87              :         }
      88              :     }
      89              : }
      90              : 
      91              : impl Drop for NoLeakChild {
      92          289 :     fn drop(&mut self) {
      93          289 :         let child = match self.child.take() {
      94            0 :             Some(child) => child,
      95          289 :             None => return,
      96              :         };
      97            0 :         let tenant_shard_id = self.tenant_id;
      98            0 :         // Offload the kill+wait of the child process into the background.
      99            0 :         // If someone stops the runtime, we'll leak the child process.
     100            0 :         // We can ignore that case because we only stop the runtime on pageserver exit.
     101            0 :         tokio::runtime::Handle::current().spawn(async move {
     102            0 :             tokio::task::spawn_blocking(move || {
     103              :                 // Intentionally don't inherit the tracing context from whoever is dropping us.
     104              :                 // This thread here is going to outlive our dropper.
     105            0 :                 let span = tracing::info_span!(
     106              :                     "walredo",
     107              :                     tenant_id = %tenant_shard_id.tenant_id,
     108            0 :                     shard_id = %tenant_shard_id.shard_slug()
     109              :                 );
     110            0 :                 let _entered = span.enter();
     111            0 :                 Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
     112            0 :             })
     113            0 :             .await
     114            0 :         });
     115          289 :     }
     116              : }
     117              : 
     118              : pub(crate) trait NoLeakChildCommandExt {
     119              :     fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
     120              : }
     121              : 
     122              : impl NoLeakChildCommandExt for Command {
     123          604 :     fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
     124          604 :         NoLeakChild::spawn(tenant_id, self)
     125          604 :     }
     126              : }
        

Generated by: LCOV version 2.1-beta