LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/io_engine - tokio_epoll_uring_ext.rs (source / functions) Coverage Total Hit
Test: fc67f8dc6087a0b4f4f0bcd74f6e1dc25fab8cf3.info Lines: 46.8 % 111 52
Test Date: 2024-09-24 13:57:57 Functions: 75.0 % 12 9

            Line data    Source code
       1              : //! Like [`::tokio_epoll_uring::thread_local_system()`], but with pageserver-specific
       2              : //! handling in case the instance can't be launched.
       3              : //!
       4              : //! This is primarily necessary due to ENOMEM aka OutOfMemory errors during io_uring creation
       5              : //! on older kernels, such as some (but not all) older kernels in the Linux 5.10 series.
       6              : //! See <https://github.com/neondatabase/neon/issues/6373#issuecomment-1905814391> for more details.
       7              : 
       8              : use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
       9              : use std::sync::Arc;
      10              : 
      11              : use tokio_util::sync::CancellationToken;
      12              : use tracing::{error, info, info_span, warn, Instrument};
      13              : use utils::backoff::{DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS};
      14              : 
      15              : use tokio_epoll_uring::{System, SystemHandle};
      16              : 
      17              : use crate::virtual_file::on_fatal_io_error;
      18              : 
      19              : use crate::metrics::tokio_epoll_uring as metrics;
      20              : 
      21              : #[derive(Clone)]
      22              : struct ThreadLocalState(Arc<ThreadLocalStateInner>);
      23              : 
      24              : struct ThreadLocalStateInner {
      25              :     cell: tokio::sync::OnceCell<SystemHandle>,
      26              :     launch_attempts: AtomicU32,
      27              :     /// populated through fetch_add from [`THREAD_LOCAL_STATE_ID`]
      28              :     thread_local_state_id: u64,
      29              : }
      30              : 
      31              : impl ThreadLocalState {
      32         1156 :     pub fn new() -> Self {
      33         1156 :         Self(Arc::new(ThreadLocalStateInner {
      34         1156 :             cell: tokio::sync::OnceCell::default(),
      35         1156 :             launch_attempts: AtomicU32::new(0),
      36         1156 :             thread_local_state_id: THREAD_LOCAL_STATE_ID.fetch_add(1, Ordering::Relaxed),
      37         1156 :         }))
      38         1156 :     }
      39              : 
      40         1156 :     pub fn make_id_string(&self) -> String {
      41         1156 :         format!("{}", self.0.thread_local_state_id)
      42         1156 :     }
      43              : }
      44              : 
      45              : static THREAD_LOCAL_STATE_ID: AtomicU64 = AtomicU64::new(0);
      46              : 
      47              : thread_local! {
      48              :     static THREAD_LOCAL: ThreadLocalState = ThreadLocalState::new();
      49              : }
      50              : 
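      51              : /// Retries with backoff while [`System::launch`] fails with out-of-memory; any other launch error is passed to [`on_fatal_io_error`] (process abort rather than a panic).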
      52      3142343 : pub async fn thread_local_system() -> Handle {
      53      3142343 :     let fake_cancel = CancellationToken::new();
      54      3142343 :     loop {
      55      3142343 :         let thread_local_state = THREAD_LOCAL.with(|arc| arc.clone());
      56      3142343 :         let inner = &thread_local_state.0;
      57      3142343 :         let get_or_init_res = inner
      58      3142343 :             .cell
      59      3142343 :             .get_or_try_init(|| async {
      60         1156 :                 let attempt_no = inner
      61         1156 :                     .launch_attempts
      62         1156 :                     .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
      63         1156 :                 let span = info_span!("tokio_epoll_uring_ext::thread_local_system", thread_local=%thread_local_state.make_id_string(), %attempt_no);
      64         1156 :                 async {
      65         1156 :                     // Rate-limit retries per thread-local.
      66         1156 :                     // NB: doesn't yield to executor at attempt_no=0.
      67         1156 :                     utils::backoff::exponential_backoff(
      68         1156 :                         attempt_no,
      69         1156 :                         DEFAULT_BASE_BACKOFF_SECONDS,
      70         1156 :                         DEFAULT_MAX_BACKOFF_SECONDS,
      71         1156 :                         &fake_cancel,
      72         1156 :                     )
      73            0 :                     .await;
      74         1156 :                     let res = System::launch()
      75              :                     // this might move us to another executor thread => loop outside the get_or_try_init, not inside it
      76         1156 :                     .await;
      77            0 :                     match res {
      78         1156 :                         Ok(system) => {
      79         1156 :                             info!("successfully launched system");
      80         1156 :                             metrics::THREAD_LOCAL_LAUNCH_SUCCESSES.inc();
      81         1156 :                             Ok(system)
      82              :                         }
      83            0 :                         Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) if e.kind() == std::io::ErrorKind::OutOfMemory => {
      84            0 :                             warn!("not enough locked memory to tokio-epoll-uring, will retry");
      85            0 :                             info_span!("stats").in_scope(|| {
      86            0 :                                 emit_launch_failure_process_stats();
      87            0 :                             });
      88            0 :                             metrics::THREAD_LOCAL_LAUNCH_FAILURES.inc();
      89            0 :                             Err(())
      90              :                         }
      91              :                         // abort the process instead of panicking because pageserver usually becomes half-broken if we panic somewhere.
      92              :                         // This is equivalent to a fatal IO error.
      93            0 :                         Err(ref e @ tokio_epoll_uring::LaunchResult::IoUringBuild(ref inner)) => {
      94            0 :                             error!(error=%e, "failed to launch thread-local tokio-epoll-uring, this should not happen, aborting process");
      95            0 :                             info_span!("stats").in_scope(|| {
      96            0 :                                 emit_launch_failure_process_stats();
      97            0 :                             });
      98            0 :                             on_fatal_io_error(inner, "launch thread-local tokio-epoll-uring");
      99              :                         },
     100              :                     }
     101         1156 :                 }
     102         1156 :                 .instrument(span)
     103         1156 :                 .await
     104      3142343 :             })
     105         1156 :             .await;
     106      3142343 :         if get_or_init_res.is_ok() {
     107      3142343 :             return Handle(thread_local_state);
     108            0 :         }
     109              :     }
     110      3142343 : }
     111              : 
     112            0 : fn emit_launch_failure_process_stats() {
     113            0 :     // tokio-epoll-uring stats
     114            0 :     // vmlck + rlimit
     115            0 :     // number of threads
     116            0 :     // rss / system memory usage generally
     117            0 : 
     118            0 :     let tokio_epoll_uring::metrics::Metrics {
     119            0 :         systems_created,
     120            0 :         systems_destroyed,
     121            0 :     } = tokio_epoll_uring::metrics::global();
     122            0 :     info!(systems_created, systems_destroyed, "tokio-epoll-uring");
     123              : 
     124            0 :     match procfs::process::Process::myself() {
     125            0 :         Ok(myself) => {
     126            0 :             match myself.limits() {
     127            0 :                 Ok(limits) => {
     128            0 :                     info!(?limits.max_locked_memory, "/proc/self/limits");
     129              :                 }
     130            0 :                 Err(error) => {
     131            0 :                     info!(%error, "no limit stats due to error");
     132              :                 }
     133              :             }
     134              : 
     135            0 :             match myself.status() {
     136            0 :                 Ok(status) => {
     137            0 :                     let procfs::process::Status {
     138            0 :                         vmsize,
     139            0 :                         vmlck,
     140            0 :                         vmpin,
     141            0 :                         vmrss,
     142            0 :                         rssanon,
     143            0 :                         rssfile,
     144            0 :                         rssshmem,
     145            0 :                         vmdata,
     146            0 :                         vmstk,
     147            0 :                         vmexe,
     148            0 :                         vmlib,
     149            0 :                         vmpte,
     150            0 :                         threads,
     151            0 :                         ..
     152            0 :                     } = status;
     153            0 :                     info!(
     154              :                         vmsize,
     155              :                         vmlck,
     156              :                         vmpin,
     157              :                         vmrss,
     158              :                         rssanon,
     159              :                         rssfile,
     160              :                         rssshmem,
     161              :                         vmdata,
     162              :                         vmstk,
     163              :                         vmexe,
     164              :                         vmlib,
     165              :                         vmpte,
     166              :                         threads,
     167            0 :                         "/proc/self/status"
     168              :                     );
     169              :                 }
     170            0 :                 Err(error) => {
     171            0 :                     info!(%error, "no status status due to error");
     172              :                 }
     173              :             }
     174              :         }
     175            0 :         Err(error) => {
     176            0 :             info!(%error, "no process stats due to error");
     177              :         }
     178              :     };
     179            0 : }
     180              : 
     181              : #[derive(Clone)]
     182              : pub struct Handle(ThreadLocalState);
     183              : 
     184              : impl std::ops::Deref for Handle {
     185              :     type Target = SystemHandle;
     186              : 
     187      3142343 :     fn deref(&self) -> &Self::Target {
     188      3142343 :         self.0
     189      3142343 :              .0
     190      3142343 :             .cell
     191      3142343 :             .get()
     192      3142343 :             .expect("must be already initialized when using this")
     193      3142343 :     }
     194              : }
        

Generated by: LCOV version 2.1-beta