Line data Source code
1 : use tracing;
2 : use tracing::error;
3 : use tracing::info;
4 : use tracing::instrument;
5 :
6 : use crate::metrics::WalRedoKillCause;
7 : use crate::metrics::WAL_REDO_PROCESS_COUNTERS;
8 :
9 : use std::io;
10 : use std::process::Command;
11 :
12 : use std::ops::DerefMut;
13 :
14 : use std::ops::Deref;
15 :
16 : use std::process::Child;
17 :
18 : use pageserver_api::shard::TenantShardId;
19 :
20 : /// Wrapper type around `std::process::Child` which guarantees that the child
21 : /// will be killed and waited-for by this process before being dropped.
22 : pub(crate) struct NoLeakChild {
23 : pub(crate) tenant_id: TenantShardId,
24 : pub(crate) child: Option<Child>,
25 : }
26 :
27 : impl Deref for NoLeakChild {
28 : type Target = Child;
29 :
30 2858010 : fn deref(&self) -> &Self::Target {
31 2858010 : self.child.as_ref().expect("must not use from drop")
32 2858010 : }
33 : }
34 :
35 : impl DerefMut for NoLeakChild {
36 1812 : fn deref_mut(&mut self) -> &mut Self::Target {
37 1812 : self.child.as_mut().expect("must not use from drop")
38 1812 : }
39 : }
40 :
41 : impl NoLeakChild {
42 604 : pub(crate) fn spawn(tenant_id: TenantShardId, command: &mut Command) -> io::Result<Self> {
43 604 : let child = command.spawn()?;
44 604 : Ok(NoLeakChild {
45 604 : tenant_id,
46 604 : child: Some(child),
47 604 : })
48 604 : }
49 :
50 289 : pub(crate) fn kill_and_wait(mut self, cause: WalRedoKillCause) {
51 289 : let child = match self.child.take() {
52 289 : Some(child) => child,
53 0 : None => return,
54 : };
55 289 : Self::kill_and_wait_impl(child, cause);
56 289 : }
57 :
58 289 : #[instrument(skip_all, fields(pid=child.id(), ?cause))]
59 : pub(crate) fn kill_and_wait_impl(mut child: Child, cause: WalRedoKillCause) {
60 289 : scopeguard::defer! {
61 289 : WAL_REDO_PROCESS_COUNTERS.killed_by_cause[cause].inc();
62 289 : }
63 : let res = child.kill();
64 : if let Err(e) = res {
65 : // This branch is very unlikely because:
66 : // - We (= pageserver) spawned this process successfully, so, we're allowed to kill it.
67 : // - This is the only place that calls .kill()
68 : // - We consume `self`, so, .kill() can't be called twice.
69 : // - If the process exited by itself or was killed by someone else,
70 : // .kill() will still succeed because we haven't wait()'ed yet.
71 : //
72 : // So, if we arrive here, we have really no idea what happened,
73 : // whether the PID stored in self.child is still valid, etc.
74 : // If this function were fallible, we'd return an error, but
75 : // since it isn't, all we can do is log an error and proceed
76 : // with the wait().
77 0 : error!(error = %e, "failed to SIGKILL; subsequent wait() might fail or wait for wrong process");
78 : }
79 :
80 : match child.wait() {
81 : Ok(exit_status) => {
82 289 : info!(exit_status = %exit_status, "wait successful");
83 : }
84 : Err(e) => {
85 0 : error!(error = %e, "wait error; might leak the child process; it will show as zombie (defunct)");
86 : }
87 : }
88 : }
89 : }
90 :
91 : impl Drop for NoLeakChild {
92 289 : fn drop(&mut self) {
93 289 : let child = match self.child.take() {
94 0 : Some(child) => child,
95 289 : None => return,
96 : };
97 0 : let tenant_shard_id = self.tenant_id;
98 0 : // Offload the kill+wait of the child process into the background.
99 0 : // If someone stops the runtime, we'll leak the child process.
100 0 : // We can ignore that case because we only stop the runtime on pageserver exit.
101 0 : tokio::runtime::Handle::current().spawn(async move {
102 0 : tokio::task::spawn_blocking(move || {
103 : // Intentionally don't inherit the tracing context from whoever is dropping us.
104 : // This thread here is going to outlive of our dropper.
105 0 : let span = tracing::info_span!(
106 : "walredo",
107 : tenant_id = %tenant_shard_id.tenant_id,
108 0 : shard_id = %tenant_shard_id.shard_slug()
109 : );
110 0 : let _entered = span.enter();
111 0 : Self::kill_and_wait_impl(child, WalRedoKillCause::NoLeakChildDrop);
112 0 : })
113 0 : .await
114 0 : });
115 289 : }
116 : }
117 :
118 : pub(crate) trait NoLeakChildCommandExt {
119 : fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild>;
120 : }
121 :
122 : impl NoLeakChildCommandExt for Command {
123 604 : fn spawn_no_leak_child(&mut self, tenant_id: TenantShardId) -> io::Result<NoLeakChild> {
124 604 : NoLeakChild::spawn(tenant_id, self)
125 604 : }
126 : }
|