Line data Source code
//! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
//! handling in case the instance can't be launched.
3 : //!
4 : //! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
5 : //! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
6 : //! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
7 :
8 : use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9 : use std::sync::Arc;
10 :
11 : use tokio_util::sync::CancellationToken;
12 : use tracing::{error, info, info_span, warn, Instrument};
13 : use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
14 :
15 : use tokio_epoll_uring::{System, SystemHandle};
16 :
17 : use crate::virtual_file::on_fatal_io_error;
18 :
19 : use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE};
20 :
/// Shared, cheaply cloneable reference to one thread's tokio-epoll-uring state.
///
/// One instance is created per thread in the `THREAD_LOCAL` thread-local; callers clone the
/// `Arc` out of it (see `thread_local_system`), so the state can outlive the thread that
/// created it.
#[derive(Clone)]
struct ThreadLocalState(Arc<ThreadLocalStateInner>);

/// The data behind [`ThreadLocalState`].
struct ThreadLocalStateInner {
    /// The launched system. Stays empty until a launch succeeds, so after a failed launch
    /// the next caller of `thread_local_system` attempts the launch again.
    cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
    /// Number of launch attempts started so far; passed as the attempt number to
    /// `utils::backoff::exponential_backoff` to rate-limit retries.
    launch_attempts: AtomicU32,
    /// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]; used as the key when
    /// registering/removing this state's per-system metrics.
    thread_local_state_id: u64,
}
30 :
31 : impl Drop for ThreadLocalStateInner {
32 102 : fn drop(&mut self) {
33 102 : THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
34 102 : }
35 : }
36 :
37 : impl ThreadLocalState {
38 367 : pub fn new() -> Self {
39 367 : Self(Arc::new(ThreadLocalStateInner {
40 367 : cell: tokio::sync::OnceCell::default(),
41 367 : launch_attempts: AtomicU32::new(0),
42 367 : thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
43 367 : }))
44 367 : }
45 :
46 367 : pub fn make_id_string(&self) -> String {
47 367 : format!("{}", self.0.thread_local_state_id)
48 367 : }
49 : }
50 :
/// Counter from which each [`ThreadLocalState`] draws its id (via `fetch_add` in
/// `ThreadLocalState::new`).
static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);

thread_local! {
    /// This thread's state; initialized lazily on first access from each thread.
    static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
}
56 :
57 : /// Panics if we cannot [`System::launch`].
58 1047705 : pub async fn thread_local_system() -> Handle {
59 1047705 : let fake_cancel = CancellationToken::new();
60 1047705 : loop {
61 1047705 : let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
62 1047705 : let inner = &thread_local_state.0;
63 1047705 : let get_or_init_res = inner
64 1047705 : .cell
65 1047705 : .get_or_try_init(|| async {
66 367 : let attempt_no = inner
67 367 : .launch_attempts
68 367 : .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
69 367 : let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
70 367 : async {
71 367 : // Rate-limit retries per thread-local.
72 367 : // NB: doesn't yield to executor at attempt_no=0.
73 367 : utils::backoff::exponential_backoff(
74 367 : attempt_no,
75 367 : DEFAULT_BASE_BACKOFF_SECONDS,
76 367 : DEFAULT_MAX_BACKOFF_SECONDS,
77 367 : &fake_cancel,
78 367 : )
79 0 : .await;
80 367 : let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
81 367 : let res = System::launch_with_metrics(per_system_metrics)
82 : // this might move us to another executor thread => loop outside the get_or_try_init, not inside it
83 367 : .await;
84 0 : match res {
85 367 : Ok(system) => {
86 367 : info!("successfully launched system");
87 367 : metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
88 367 : Ok(system)
89 : }
90 0 : Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
91 0 : warn!("not enough locked memory to tokio-epoll-uring, will retry");
92 0 : info_span!("stats").in_scope(|| {
93 0 : emit_launch_failure_process_stats();
94 0 : });
95 0 : metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
96 0 : metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
97 0 : Err(())
98 : }
99 : // abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
100 : // This is equivalent to a fatal IO error.
101 0 : Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
102 0 : error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
103 0 : info_span!("stats").in_scope(|| {
104 0 : emit_launch_failure_process_stats();
105 0 : });
106 0 : on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
107 : },
108 : }
109 367 : }
110 367 : .instrument(span)
111 367 : .await
112 1047705 : })
113 367 : .await;
114 1047705 : if get_or_init_res.is_ok() {
115 1047705 : return Handle(thread_local_state);
116 0 : }
117 : }
118 1047705 : }
119 :
120 0 : fn emit_launch_failure_process_stats() {
121 0 : // tokio-epoll-uring stats
122 0 : // vmlck + rlimit
123 0 : // number of threads
124 0 : // rss / system memory usage generally
125 0 :
126 0 : let tokio_epoll_uring::metrics::GlobalMetrics {
127 0 : systems_created,
128 0 : systems_destroyed,
129 0 : } = tokio_epoll_uring::metrics::global();
130 0 : info!(systems_created, systems_destroyed, "tokio-epoll-uring");
131 :
132 0 : match procfs::process::Process::myself() {
133 0 : Ok(myself) => {
134 0 : match myself.limits() {
135 0 : Ok(limits) => {
136 0 : info!(?limits.max_locked_memory, "/proc/self/limits");
137 : }
138 0 : Err(error) => {
139 0 : info!(%error, "no limit stats due to error");
140 : }
141 : }
142 :
143 0 : match myself.status() {
144 0 : Ok(status) => {
145 0 : let procfs::process::Status {
146 0 : vmsize,
147 0 : vmlck,
148 0 : vmpin,
149 0 : vmrss,
150 0 : rssanon,
151 0 : rssfile,
152 0 : rssshmem,
153 0 : vmdata,
154 0 : vmstk,
155 0 : vmexe,
156 0 : vmlib,
157 0 : vmpte,
158 0 : threads,
159 0 : ..
160 0 : } = status;
161 0 : info!(
162 : vmsize,
163 : vmlck,
164 : vmpin,
165 : vmrss,
166 : rssanon,
167 : rssfile,
168 : rssshmem,
169 : vmdata,
170 : vmstk,
171 : vmexe,
172 : vmlib,
173 : vmpte,
174 : threads,
175 0 : "/proc/self/status"
176 : );
177 : }
178 0 : Err(error) => {
179 0 : info!(%error, "no status status due to error");
180 : }
181 : }
182 : }
183 0 : Err(error) => {
184 0 : info!(%error, "no process stats due to error");
185 : }
186 : };
187 0 : }
188 :
189 : #[derive(Clone)]
190 : pub struct Handle(ThreadLocalState);
191 :
192 : impl std::ops::Deref for Handle {
193 : type Target = SystemHandle<metrics::ThreadLocalMetrics>;
194 :
195 1047705 : fn deref(&self) -> &Self::Target {
196 1047705 : self.0
197 1047705 : .0
198 1047705 : .cell
199 1047705 : .get()
200 1047705 : .expect("must be already initialized when using this")
201 1047705 : }
202 : }
|