Line data Source code
1 : use std::io;
2 : use std::ops::{Deref, DerefMut};
3 : use std::process::{Child, Command};
4 :
5 : use pageserver_api::shard::TenantShardId;
6 : use tracing::{error, info, instrument};
7 :
8 : use crate::metrics::{WAL_REDO_PROCESS_COUNTERS, WalRedoKillCause};
9 :
10 : /// Wrapper type around `std::process::Child` which guarantees that the child
11 : /// will be killed and waited-for by this process before being dropped.
12 : pub(crate) struct NoLeakChild {
13 : pub(crate) tenant_id: TenantShardId,
14 : pub(crate) child: Option<Child>,
15 : }
16 :
17 : impl Deref for NoLeakChild {
18 : type Target = Child;
19 :
20 56 : fn deref(&self) -> &Self::Target {
21 56 : self.child.as_ref().expect("must not use from drop")
22 56 : }
23 : }
24 :
25 : impl DerefMut for NoLeakChild {
26 60 : fn deref_mut(&mut self) -> &mut Self::Target {
27 60 : self.child.as_mut().expect("must not use from drop")
28 60 : }
29 : }
30 :
31 : impl NoLeakChild {
32 20 : pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
33 20 : let child = command.spawn()?;
34 20 : Ok(NoLeakChild {
35 20 : tenant_id,
36 20 : child: Some(child),
37 20 : })
38 20 : }
39 :
40 20 : pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) {
41 20 : let child = match self.child.take() {
42 20 : Some(child) => child,
43 0 : None => return,
44 : };
45 20 : Self::kill_and_wait_impl(child, cause);
46 20 : }
47 :
48 : #[instrument(skip_all, fields(pid=child.id(), ?cause))]
49 : pub(crate) fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
50 : scopeguard::defer! {
51 : WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
52 : }
53 : let res = child.kill();
54 : if let Err(e) = res {
55 : // This branch is very unlikely because:
56 : // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
57 : // - This is the only place that calls .kill()
58 : // - We consume `self`, so, .kill() can't be called twice.
59 : // - If the process exited by itself or was killed by someone else,
60 : // .kill() will still succeed because we haven't wait()'ed yet.
61 : //
62 : // So, if we arrive here, we have really no idea what happened,
63 : // whether the PID stored in self.child is still valid, etc.
64 : // If this function were fallible, we'd return an error, but
65 : // since it isn't, all we can do is log an error and proceed
66 : // with the wait().
67 : error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
68 : }
69 :
70 : match child.wait() {
71 : Ok(exit_status) => {
72 : info!(exit_status = %exit_status, "wait successful");
73 : }
74 : Err(e) => {
75 : error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
76 : }
77 : }
78 : }
79 : }
80 :
81 : impl Drop for NoLeakChild {
82 20 : fn drop(&mut self) {
83 20 : let child = match self.child.take() {
84 0 : Some(child) => child,
85 20 : None => return,
86 : };
87 0 : let tenant_shard_id = self.tenant_id;
88 0 : // Offload the kill+wait of the child process into the background.
89 0 : // If someone stops the runtime, we'll leak the child process.
90 0 : // We can ignore that case because we only stop the runtime on pageserver exit.
91 0 : tokio::runtime::Handle::current().spawn(async move {
92 0 : tokio::task::spawn_blocking(move || {
93 : // Intentionally don't inherit the tracing context from whoever is dropping us.
94 : // This thread here is going to outlive of our dropper.
95 0 : let span = tracing::info_span!(
96 : "walredo",
97 : tenant_id = %tenant_shard_id.tenant_id,
98 0 : shard_id = %tenant_shard_id.shard_slug()
99 : );
100 0 : let _entered = span.enter();
101 0 : Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
102 0 : })
103 0 : .await
104 0 : });
105 20 : }
106 : }
107 :
108 : pub(crate) trait NoLeakChildCommandExt {
109 : fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
110 : }
111 :
112 : impl NoLeakChildCommandExt for Command {
113 20 : fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
114 20 : NoLeakChild::spawn(tenant_id, self)
115 20 : }
116 : }
|