Line data Source code
1 : //! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
2 : //! handling in case the instance can't launched.
3 : //!
4 : //! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
5 : //! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
6 : //! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
7 :
8 : use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
9 : use std::sync::Arc;
10 :
11 : use tokio_util::sync::CancellationToken;
12 : use tracing::{error, info, info_span, warn, Instrument};
13 : use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
14 :
15 : use tokio_epoll_uring::{System, SystemHandle};
16 :
17 : use crate::virtual_file::on_fatal_io_error;
18 :
19 : use crate::metrics::tokio_epoll_uring as metrics;
20 :
21 : #[derive(Clone)]
22 : struct ThreadLocalState(Arc<ThreadLocalStateInner>);
23 :
24 : struct ThreadLocalStateInner {
25 : cell: tokio::sync::OnceCell<SystemHandle>,
26 : launch_attempts: AtomicU32,
27 : /// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
28 : thread_local_state_id: u64,
29 : }
30 :
31 : impl ThreadLocalState {
32 1156 : pub fn new() -> Self {
33 1156 : Self(Arc::new(ThreadLocalStateInner {
34 1156 : cell: tokio::sync::OnceCell::default(),
35 1156 : launch_attempts: AtomicU32::new(0),
36 1156 : thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
37 1156 : }))
38 1156 : }
39 :
40 1156 : pub fn make_id_string(&self) -> String {
41 1156 : format!("{}", self.0.thread_local_state_id)
42 1156 : }
43 : }
44 :
45 : static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
46 :
47 : thread_local! {
48 : static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
49 : }
50 :
51 : /// Panics if we cannot [`System::launch`].
52 3142343 : pub async fn thread_local_system() -> Handle {
53 3142343 : let fake_cancel = CancellationToken::new();
54 3142343 : loop {
55 3142343 : let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
56 3142343 : let inner = &thread_local_state.0;
57 3142343 : let get_or_init_res = inner
58 3142343 : .cell
59 3142343 : .get_or_try_init(|| async {
60 1156 : let attempt_no = inner
61 1156 : .launch_attempts
62 1156 : .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
63 1156 : let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
64 1156 : async {
65 1156 : // Rate-limit retries per thread-local.
66 1156 : // NB: doesn't yield to executor at attempt_no=0.
67 1156 : utils::backoff::exponential_backoff(
68 1156 : attempt_no,
69 1156 : DEFAULT_BASE_BACKOFF_SECONDS,
70 1156 : DEFAULT_MAX_BACKOFF_SECONDS,
71 1156 : &fake_cancel,
72 1156 : )
73 0 : .await;
74 1156 : let res = System::launch()
75 : // this might move us to another executor thread => loop outside the get_or_try_init, not inside it
76 1156 : .await;
77 0 : match res {
78 1156 : Ok(system) => {
79 1156 : info!("successfully launched system");
80 1156 : metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
81 1156 : Ok(system)
82 : }
83 0 : Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
84 0 : warn!("not enough locked memory to tokio-epoll-uring, will retry");
85 0 : info_span!("stats").in_scope(|| {
86 0 : emit_launch_failure_process_stats();
87 0 : });
88 0 : metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
89 0 : Err(())
90 : }
91 : // abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
92 : // This is equivalent to a fatal IO error.
93 0 : Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
94 0 : error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
95 0 : info_span!("stats").in_scope(|| {
96 0 : emit_launch_failure_process_stats();
97 0 : });
98 0 : on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
99 : },
100 : }
101 1156 : }
102 1156 : .instrument(span)
103 1156 : .await
104 3142343 : })
105 1156 : .await;
106 3142343 : if get_or_init_res.is_ok() {
107 3142343 : return Handle(thread_local_state);
108 0 : }
109 : }
110 3142343 : }
111 :
112 0 : fn emit_launch_failure_process_stats() {
113 0 : // tokio-epoll-uring stats
114 0 : // vmlck + rlimit
115 0 : // number of threads
116 0 : // rss / system memory usage generally
117 0 :
118 0 : let tokio_epoll_uring::metrics::Metrics {
119 0 : systems_created,
120 0 : systems_destroyed,
121 0 : } = tokio_epoll_uring::metrics::global();
122 0 : info!(systems_created, systems_destroyed, "tokio-epoll-uring");
123 :
124 0 : match procfs::process::Process::myself() {
125 0 : Ok(myself) => {
126 0 : match myself.limits() {
127 0 : Ok(limits) => {
128 0 : info!(?limits.max_locked_memory, "/proc/self/limits");
129 : }
130 0 : Err(error) => {
131 0 : info!(%error, "no limit stats due to error");
132 : }
133 : }
134 :
135 0 : match myself.status() {
136 0 : Ok(status) => {
137 0 : let procfs::process::Status {
138 0 : vmsize,
139 0 : vmlck,
140 0 : vmpin,
141 0 : vmrss,
142 0 : rssanon,
143 0 : rssfile,
144 0 : rssshmem,
145 0 : vmdata,
146 0 : vmstk,
147 0 : vmexe,
148 0 : vmlib,
149 0 : vmpte,
150 0 : threads,
151 0 : ..
152 0 : } = status;
153 0 : info!(
154 : vmsize,
155 : vmlck,
156 : vmpin,
157 : vmrss,
158 : rssanon,
159 : rssfile,
160 : rssshmem,
161 : vmdata,
162 : vmstk,
163 : vmexe,
164 : vmlib,
165 : vmpte,
166 : threads,
167 0 : "/proc/self/status"
168 : );
169 : }
170 0 : Err(error) => {
171 0 : info!(%error, "no status status due to error");
172 : }
173 : }
174 : }
175 0 : Err(error) => {
176 0 : info!(%error, "no process stats due to error");
177 : }
178 : };
179 0 : }
180 :
181 : #[derive(Clone)]
182 : pub struct Handle(ThreadLocalState);
183 :
184 : impl std::ops::Deref for Handle {
185 : type Target = SystemHandle;
186 :
187 3142343 : fn deref(&self) -> &Self::Target {
188 3142343 : self.0
189 3142343 : .0
190 3142343 : .cell
191 3142343 : .get()
192 3142343 : .expect("must be already initialized when using this")
193 3142343 : }
194 : }
|