Line data Source code
//! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
//! handling in case the instance can't be launched.
3 : //!
4 : //! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
5 : //! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
6 : //! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
7 :
8 : use std::sync::Arc;
9 : use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
10 :
11 : use tokio_epoll_uring::{System, SystemHandle};
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{Instrument, error, info, info_span, warn};
14 : use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
15 :
16 : use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE};
17 : use crate::virtual_file::on_fatal_io_error;
18 :
19 : #[derive(Clone)]
20 : struct ThreadLocalState(Arc<ThreadLocalStateInner>);
21 :
22 : struct ThreadLocalStateInner {
23 : cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
24 : launch_attempts: AtomicU32,
25 : /// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
26 : thread_local_state_id: u64,
27 : }
28 :
29 : impl Drop for ThreadLocalStateInner {
30 238 : fn drop(&mut self) {
31 238 : THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
32 238 : }
33 : }
34 :
35 : impl ThreadLocalState {
36 1100 : pub fn new() -> Self {
37 1100 : Self(Arc::new(ThreadLocalStateInner {
38 1100 : cell: tokio::sync::OnceCell::default(),
39 1100 : launch_attempts: AtomicU32::new(0),
40 1100 : thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
41 1100 : }))
42 1100 : }
43 :
44 1100 : pub fn make_id_string(&self) -> String {
45 1100 : format!("{}", self.0.thread_local_state_id)
46 1100 : }
47 : }
48 :
49 : static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
50 :
51 : thread_local! {
52 : static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
53 : }
54 :
55 : /// Panics if we cannot [`System::launch`].
56 1819258 : pub async fn thread_local_system() -> Handle {
57 1819258 : let fake_cancel = CancellationToken::new();
58 1819258 : loop {
59 1819258 : let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
60 1819258 : let inner = &thread_local_state.0;
61 1819258 : let get_or_init_res = inner
62 1819258 : .cell
63 1819258 : .get_or_try_init(|| async {
64 1100 : let attempt_no = inner
65 1100 : .launch_attempts
66 1100 : .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
67 1100 : let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
68 1100 : async {
69 1100 : // Rate-limit retries per thread-local.
70 1100 : // NB: doesn't yield to executor at attempt_no=0.
71 1100 : utils::backoff::exponential_backoff(
72 1100 : attempt_no,
73 1100 : DEFAULT_BASE_BACKOFF_SECONDS,
74 1100 : DEFAULT_MAX_BACKOFF_SECONDS,
75 1100 : &fake_cancel,
76 1100 : )
77 1100 : .await;
78 1100 : let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
79 1100 : let res = System::launch_with_metrics(per_system_metrics)
80 1100 : // this might move us to another executor thread => loop outside the get_or_try_init, not inside it
81 1100 : .await;
82 0 : match res {
83 1100 : Ok(system) => {
84 1100 : info!("successfully launched system");
85 1100 : metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
86 1100 : Ok(system)
87 : }
88 0 : Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
89 0 : warn!("not enough locked memory to tokio-epoll-uring, will retry");
90 0 : info_span!("stats").in_scope(|| {
91 0 : emit_launch_failure_process_stats();
92 0 : });
93 0 : metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
94 0 : metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
95 0 : Err(())
96 : }
97 : // abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
98 : // This is equivalent to a fatal IO error.
99 0 : Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
100 0 : error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
101 0 : info_span!("stats").in_scope(|| {
102 0 : emit_launch_failure_process_stats();
103 0 : });
104 0 : on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
105 : },
106 : }
107 1100 : }
108 1100 : .instrument(span)
109 1100 : .await
110 1819258 : })
111 1819258 : .await;
112 1819258 : if get_or_init_res.is_ok() {
113 1819258 : return Handle(thread_local_state);
114 0 : }
115 : }
116 1819258 : }
117 :
118 0 : fn emit_launch_failure_process_stats() {
119 0 : // tokio-epoll-uring stats
120 0 : // vmlck + rlimit
121 0 : // number of threads
122 0 : // rss / system memory usage generally
123 0 :
124 0 : let tokio_epoll_uring::metrics::GlobalMetrics {
125 0 : systems_created,
126 0 : systems_destroyed,
127 0 : } = tokio_epoll_uring::metrics::global();
128 0 : info!(systems_created, systems_destroyed, "tokio-epoll-uring");
129 :
130 0 : match procfs::process::Process::myself() {
131 0 : Ok(myself) => {
132 0 : match myself.limits() {
133 0 : Ok(limits) => {
134 0 : info!(?limits.max_locked_memory, "/proc/self/limits");
135 : }
136 0 : Err(error) => {
137 0 : info!(%error, "no limit stats due to error");
138 : }
139 : }
140 :
141 0 : match myself.status() {
142 0 : Ok(status) => {
143 0 : let procfs::process::Status {
144 0 : vmsize,
145 0 : vmlck,
146 0 : vmpin,
147 0 : vmrss,
148 0 : rssanon,
149 0 : rssfile,
150 0 : rssshmem,
151 0 : vmdata,
152 0 : vmstk,
153 0 : vmexe,
154 0 : vmlib,
155 0 : vmpte,
156 0 : threads,
157 0 : ..
158 0 : } = status;
159 0 : info!(
160 : vmsize,
161 : vmlck,
162 : vmpin,
163 : vmrss,
164 : rssanon,
165 : rssfile,
166 : rssshmem,
167 : vmdata,
168 : vmstk,
169 : vmexe,
170 : vmlib,
171 : vmpte,
172 : threads,
173 0 : "/proc/self/status"
174 : );
175 : }
176 0 : Err(error) => {
177 0 : info!(%error, "no status status due to error");
178 : }
179 : }
180 : }
181 0 : Err(error) => {
182 0 : info!(%error, "no process stats due to error");
183 : }
184 : };
185 0 : }
186 :
187 : #[derive(Clone)]
188 : pub struct Handle(ThreadLocalState);
189 :
190 : impl std::ops::Deref for Handle {
191 : type Target = SystemHandle<metrics::ThreadLocalMetrics>;
192 :
193 1819258 : fn deref(&self) -> &Self::Target {
194 1819258 : self.0
195 1819258 : .0
196 1819258 : .cell
197 1819258 : .get()
198 1819258 : .expect("must be already initialized when using this")
199 1819258 : }
200 : }
|