LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/io_engine - tokio_epoll_uring_ext.rs (source / functions) Coverage Total Hit
Test: 2e3a7638747e564a4f6d1af1cc0c3b3438fbb740.info Lines: 48.3 % 116 56
Test Date: 2024-11-20 01:36:58 Functions: 76.9 % 13 10

            Line data    Source code
       1              : //! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
       2              : //! handling in case the instance can't be launched.
       3              : //!
       4              : //! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
       5              : //! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
       6              : //! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
       7              : 
       8              : use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
       9              : use std::sync::Arc;
      10              : 
      11              : use tokio_util::sync::CancellationToken;
      12              : use tracing::{error, info, info_span, warn, Instrument};
      13              : use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
      14              : 
      15              : use tokio_epoll_uring::{System, SystemHandle};
      16              : 
      17              : use crate::virtual_file::on_fatal_io_error;
      18              : 
      19              : use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE};
      20              : 
      21              : #[derive(Clone)]
      22              : struct ThreadLocalState(Arc<ThreadLocalStateInner>);
      23              : 
      24              : struct ThreadLocalStateInner {
      25              :     cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
      26              :     launch_attempts: AtomicU32,
      27              :     /// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
      28              :     thread_local_state_id: u64,
      29              : }
      30              : 
      31              : impl Drop for ThreadLocalStateInner {
      32          102 :     fn drop(&mut self) {
      33          102 :         THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
      34          102 :     }
      35              : }
      36              : 
      37              : impl ThreadLocalState {
      38          362 :     pub fn new() -> Self {
      39          362 :         Self(Arc::new(ThreadLocalStateInner {
      40          362 :             cell: tokio::sync::OnceCell::default(),
      41          362 :             launch_attempts: AtomicU32::new(0),
      42          362 :             thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
      43          362 :         }))
      44          362 :     }
      45              : 
      46          362 :     pub fn make_id_string(&self) -> String {
      47          362 :         format!("{}", self.0.thread_local_state_id)
      48          362 :     }
      49              : }
      50              : 
      51              : static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
      52              : 
      53              : thread_local! {
      54              :     static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
      55              : }
      56              : 
      57              : /// Retries with backoff on out-of-memory launch errors; any other [`System::launch`] failure is passed to `on_fatal_io_error`.
      58      1047313 : pub async fn thread_local_system() -> Handle {
      59      1047313 :     let fake_cancel = CancellationToken::new();
      60      1047313 :     loop {
      61      1047313 :         let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
      62      1047313 :         let inner = &thread_local_state.0;
      63      1047313 :         let get_or_init_res = inner
      64      1047313 :             .cell
      65      1047313 :             .get_or_try_init(|| async {
      66          362 :                 let attempt_no = inner
      67          362 :                     .launch_attempts
      68          362 :                     .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      69          362 :                 let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
      70          362 :                 async {
      71          362 :                     // Rate-limit retries per thread-local.
      72          362 :                     // NB: doesn't yield to executor at attempt_no=0.
      73          362 :                     utils::backoff::exponential_backoff(
      74          362 :                         attempt_no,
      75          362 :                         DEFAULT_BASE_BACKOFF_SECONDS,
      76          362 :                         DEFAULT_MAX_BACKOFF_SECONDS,
      77          362 :                         &fake_cancel,
      78          362 :                     )
      79            0 :                     .await;
      80          362 :                     let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
      81          362 :                     let res = System::launch_with_metrics(per_system_metrics)
      82              :                     // this might move us to another executor thread => loop outside the get_or_try_init, not inside it
      83          362 :                     .await;
      84            0 :                     match res {
      85          362 :                         Ok(system) => {
      86          362 :                             info!("successfully launched system");
      87          362 :                             metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
      88          362 :                             Ok(system)
      89              :                         }
      90            0 :                         Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
      91            0 :                             warn!("not enough locked memory to tokio-epoll-uring, will retry");
      92            0 :                             info_span!("stats").in_scope(|| {
      93            0 :                                 emit_launch_failure_process_stats();
      94            0 :                             });
      95            0 :                             metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
      96            0 :                             metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
      97            0 :                             Err(())
      98              :                         }
      99              :                         // abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
     100              :                         // This is equivalent to a fatal IO error.
     101            0 :                         Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
     102            0 :                             error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
     103            0 :                             info_span!("stats").in_scope(|| {
     104            0 :                                 emit_launch_failure_process_stats();
     105            0 :                             });
     106            0 :                             on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
     107              :                         },
     108              :                     }
     109          362 :                 }
     110          362 :                 .instrument(span)
     111          362 :                 .await
     112      1047313 :             })
     113          362 :             .await;
     114      1047313 :         if get_or_init_res.is_ok() {
     115      1047313 :             return Handle(thread_local_state);
     116            0 :         }
     117              :     }
     118      1047313 : }
     119              : 
     120            0 : fn emit_launch_failure_process_stats() {
     121            0 :     // tokio-epoll-uring stats
     122            0 :     // vmlck + rlimit
     123            0 :     // number of threads
     124            0 :     // rss / system memory usage generally
     125            0 : 
     126            0 :     let tokio_epoll_uring::metrics::GlobalMetrics {
     127            0 :         systems_created,
     128            0 :         systems_destroyed,
     129            0 :     } = tokio_epoll_uring::metrics::global();
     130            0 :     info!(systems_created, systems_destroyed, "tokio-epoll-uring");
     131              : 
     132            0 :     match procfs::process::Process::myself() {
     133            0 :         Ok(myself) => {
     134            0 :             match myself.limits() {
     135            0 :                 Ok(limits) => {
     136            0 :                     info!(?limits.max_locked_memory, "/proc/self/limits");
     137              :                 }
     138            0 :                 Err(error) => {
     139            0 :                     info!(%error, "no limit stats due to error");
     140              :                 }
     141              :             }
     142              : 
     143            0 :             match myself.status() {
     144            0 :                 Ok(status) => {
     145            0 :                     let procfs::process::Status {
     146            0 :                         vmsize,
     147            0 :                         vmlck,
     148            0 :                         vmpin,
     149            0 :                         vmrss,
     150            0 :                         rssanon,
     151            0 :                         rssfile,
     152            0 :                         rssshmem,
     153            0 :                         vmdata,
     154            0 :                         vmstk,
     155            0 :                         vmexe,
     156            0 :                         vmlib,
     157            0 :                         vmpte,
     158            0 :                         threads,
     159            0 :                         ..
     160            0 :                     } = status;
     161            0 :                     info!(
     162              :                         vmsize,
     163              :                         vmlck,
     164              :                         vmpin,
     165              :                         vmrss,
     166              :                         rssanon,
     167              :                         rssfile,
     168              :                         rssshmem,
     169              :                         vmdata,
     170              :                         vmstk,
     171              :                         vmexe,
     172              :                         vmlib,
     173              :                         vmpte,
     174              :                         threads,
     175            0 :                         "/proc/self/status"
     176              :                     );
     177              :                 }
     178            0 :                 Err(error) => {
     179            0 :                     info!(%error, "no status status due to error");
     180              :                 }
     181              :             }
     182              :         }
     183            0 :         Err(error) => {
     184            0 :             info!(%error, "no process stats due to error");
     185              :         }
     186              :     };
     187            0 : }
     188              : 
     189              : #[derive(Clone)]
     190              : pub struct Handle(ThreadLocalState);
     191              : 
     192              : impl std::ops::Deref for Handle {
     193              :     type Target = SystemHandle<metrics::ThreadLocalMetrics>;
     194              : 
     195      1047313 :     fn deref(&self) -> &Self::Target {
     196      1047313 :         self.0
     197      1047313 :              .0
     198      1047313 :             .cell
     199      1047313 :             .get()
     200      1047313 :             .expect("must be already initialized when using this")
     201      1047313 :     }
     202              : }
        

Generated by: LCOV version 2.1-beta