LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/io_engine - tokio_epoll_uring_ext.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 49.6 % 117 58
Test Date: 2025-03-12 00:01:28 Functions: 76.9 % 13 10

            Line data    Source code
       1              : //! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
       2              : //! handling in case the instance can't be launched.
       3              : //!
       4              : //! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
       5              : //! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
       6              : //! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
       7              : 
       8              : use std::sync::Arc;
       9              : use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
      10              : 
      11              : use tokio_epoll_uring::{System, SystemHandle};
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{Instrument, error, info, info_span, warn};
      14              : use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
      15              : 
      16              : use crate::metrics::tokio_epoll_uring::{self as metrics, THREAD_LOCAL_METRICS_STORAGE};
      17              : use crate::virtual_file::on_fatal_io_error;
      18              : 
      19              : #[derive(Clone)]
      20              : struct ThreadLocalState(Arc<ThreadLocalStateInner>);
      21              : 
      22              : struct ThreadLocalStateInner {
      23              :     cell: tokio::sync::OnceCell<SystemHandle<metrics::ThreadLocalMetrics>>,
      24              :     launch_attempts: AtomicU32,
      25              :     /// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
      26              :     thread_local_state_id: u64,
      27              : }
      28              : 
      29              : impl Drop for ThreadLocalStateInner {
      30          238 :     fn drop(&mut self) {
      31          238 :         THREAD_LOCAL_METRICS_STORAGE.remove_system(self.thread_local_state_id);
      32          238 :     }
      33              : }
      34              : 
      35              : impl ThreadLocalState {
      36         1100 :     pub fn new() -> Self {
      37         1100 :         Self(Arc::new(ThreadLocalStateInner {
      38         1100 :             cell: tokio::sync::OnceCell::default(),
      39         1100 :             launch_attempts: AtomicU32::new(0),
      40         1100 :             thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
      41         1100 :         }))
      42         1100 :     }
      43              : 
      44         1100 :     pub fn make_id_string(&self) -> String {
      45         1100 :         format!("{}", self.0.thread_local_state_id)
      46         1100 :     }
      47              : }
      48              : 
      49              : static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
      50              : 
      51              : thread_local! {
      52              :     static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
      53              : }
      54              : 
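      55              : /// Retries with backoff while [`System::launch`] fails with out-of-memory; any other launch failure aborts the process via `on_fatal_io_error` instead of panicking.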
      56      1819258 : pub async fn thread_local_system() -> Handle {
      57      1819258 :     let fake_cancel = CancellationToken::new();
      58      1819258 :     loop {
      59      1819258 :         let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
      60      1819258 :         let inner = &thread_local_state.0;
      61      1819258 :         let get_or_init_res = inner
      62      1819258 :             .cell
      63      1819258 :             .get_or_try_init(|| async {
      64         1100 :                 let attempt_no = inner
      65         1100 :                     .launch_attempts
      66         1100 :                     .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      67         1100 :                 let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
      68         1100 :                 async {
      69         1100 :                     // Rate-limit retries per thread-local.
      70         1100 :                     // NB: doesn't yield to executor at attempt_no=0.
      71         1100 :                     utils::backoff::exponential_backoff(
      72         1100 :                         attempt_no,
      73         1100 :                         DEFAULT_BASE_BACKOFF_SECONDS,
      74         1100 :                         DEFAULT_MAX_BACKOFF_SECONDS,
      75         1100 :                         &fake_cancel,
      76         1100 :                     )
      77         1100 :                     .await;
      78         1100 :                     let per_system_metrics = metrics::THREAD_LOCAL_METRICS_STORAGE.register_system(inner.thread_local_state_id);
      79         1100 :                     let res = System::launch_with_metrics(per_system_metrics)
      80         1100 :                     // this might move us to another executor thread => loop outside the get_or_try_init, not inside it
      81         1100 :                     .await;
      82            0 :                     match res {
      83         1100 :                         Ok(system) => {
      84         1100 :                             info!("successfully launched system");
      85         1100 :                             metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
      86         1100 :                             Ok(system)
      87              :                         }
      88            0 :                         Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
      89            0 :                             warn!("not enough locked memory to tokio-epoll-uring, will retry");
      90            0 :                             info_span!("stats").in_scope(|| {
      91            0 :                                 emit_launch_failure_process_stats();
      92            0 :                             });
      93            0 :                             metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
      94            0 :                             metrics::THREAD_LOCAL_METRICS_STORAGE.remove_system(inner.thread_local_state_id);
      95            0 :                             Err(())
      96              :                         }
      97              :                         // abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
      98              :                         // This is equivalent to a fatal IO error.
      99            0 :                         Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
     100            0 :                             error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
     101            0 :                             info_span!("stats").in_scope(|| {
     102            0 :                                 emit_launch_failure_process_stats();
     103            0 :                             });
     104            0 :                             on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
     105              :                         },
     106              :                     }
     107         1100 :                 }
     108         1100 :                 .instrument(span)
     109         1100 :                 .await
     110      1819258 :             })
     111      1819258 :             .await;
     112      1819258 :         if get_or_init_res.is_ok() {
     113      1819258 :             return Handle(thread_local_state);
     114            0 :         }
     115              :     }
     116      1819258 : }
     117              : 
     118            0 : fn emit_launch_failure_process_stats() {
     119            0 :     // tokio-epoll-uring stats
     120            0 :     // vmlck + rlimit
     121            0 :     // number of threads
     122            0 :     // rss / system memory usage generally
     123            0 : 
     124            0 :     let tokio_epoll_uring::metrics::GlobalMetrics {
     125            0 :         systems_created,
     126            0 :         systems_destroyed,
     127            0 :     } = tokio_epoll_uring::metrics::global();
     128            0 :     info!(systems_created, systems_destroyed, "tokio-epoll-uring");
     129              : 
     130            0 :     match procfs::process::Process::myself() {
     131            0 :         Ok(myself) => {
     132            0 :             match myself.limits() {
     133            0 :                 Ok(limits) => {
     134            0 :                     info!(?limits.max_locked_memory, "/proc/self/limits");
     135              :                 }
     136            0 :                 Err(error) => {
     137            0 :                     info!(%error, "no limit stats due to error");
     138              :                 }
     139              :             }
     140              : 
     141            0 :             match myself.status() {
     142            0 :                 Ok(status) => {
     143            0 :                     let procfs::process::Status {
     144            0 :                         vmsize,
     145            0 :                         vmlck,
     146            0 :                         vmpin,
     147            0 :                         vmrss,
     148            0 :                         rssanon,
     149            0 :                         rssfile,
     150            0 :                         rssshmem,
     151            0 :                         vmdata,
     152            0 :                         vmstk,
     153            0 :                         vmexe,
     154            0 :                         vmlib,
     155            0 :                         vmpte,
     156            0 :                         threads,
     157            0 :                         ..
     158            0 :                     } = status;
     159            0 :                     info!(
     160              :                         vmsize,
     161              :                         vmlck,
     162              :                         vmpin,
     163              :                         vmrss,
     164              :                         rssanon,
     165              :                         rssfile,
     166              :                         rssshmem,
     167              :                         vmdata,
     168              :                         vmstk,
     169              :                         vmexe,
     170              :                         vmlib,
     171              :                         vmpte,
     172              :                         threads,
     173            0 :                         "/proc/self/status"
     174              :                     );
     175              :                 }
     176            0 :                 Err(error) => {
     177            0 :                     info!(%error, "no status status due to error");
     178              :                 }
     179              :             }
     180              :         }
     181            0 :         Err(error) => {
     182            0 :             info!(%error, "no process stats due to error");
     183              :         }
     184              :     };
     185            0 : }
     186              : 
     187              : #[derive(Clone)]
     188              : pub struct Handle(ThreadLocalState);
     189              : 
     190              : impl std::ops::Deref for Handle {
     191              :     type Target = SystemHandle<metrics::ThreadLocalMetrics>;
     192              : 
     193      1819258 :     fn deref(&self) -> &Self::Target {
     194      1819258 :         self.0
     195      1819258 :             .0
     196      1819258 :             .cell
     197      1819258 :             .get()
     198      1819258 :             .expect("must be already initialized when using this")
     199      1819258 :     }
     200              : }
        

Generated by: LCOV version 2.1-beta