LCOV - code coverage report
Current view: top level - libs/walproposer/src - api_bindings.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 65.3 % 363 237
Test Date: 2024-02-07 07:37:29 Functions: 63.9 % 36 23

            Line data    Source code
       1              : //! A C-Rust shim: defines implementation of C walproposer API, assuming wp
       2              : //! callback_data stores Box to some Rust implementation.
       3              : 
       4              : #![allow(dead_code)]
       5              : 
       6              : use std::ffi::CStr;
       7              : use std::ffi::CString;
       8              : 
       9              : use crate::bindings::uint32;
      10              : use crate::bindings::walproposer_api;
      11              : use crate::bindings::NeonWALReadResult;
      12              : use crate::bindings::PGAsyncReadResult;
      13              : use crate::bindings::PGAsyncWriteResult;
      14              : use crate::bindings::Safekeeper;
      15              : use crate::bindings::Size;
      16              : use crate::bindings::StringInfoData;
      17              : use crate::bindings::TimestampTz;
      18              : use crate::bindings::WalProposer;
      19              : use crate::bindings::WalProposerConnStatusType;
      20              : use crate::bindings::WalProposerConnectPollStatusType;
      21              : use crate::bindings::WalProposerExecStatusType;
      22              : use crate::bindings::WalproposerShmemState;
      23              : use crate::bindings::XLogRecPtr;
      24              : use crate::walproposer::ApiImpl;
      25              : use crate::walproposer::WaitResult;
      26              : 
      27            0 : extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState {
      28            0 :     unsafe {
      29            0 :         let callback_data = (*(*wp).config).callback_data;
      30            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      31            0 :         (*api).get_shmem_state()
      32            0 :     }
      33            0 : }
      34              : 
      35            0 : extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
      36            0 :     unsafe {
      37            0 :         let callback_data = (*(*wp).config).callback_data;
      38            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      39            0 :         (*api).start_streaming(startpos)
      40            0 :     }
      41            0 : }
      42              : 
      43            0 : extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr {
      44            0 :     unsafe {
      45            0 :         let callback_data = (*(*wp).config).callback_data;
      46            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      47            0 :         (*api).get_flush_rec_ptr()
      48            0 :     }
      49            0 : }
      50              : 
      51           28 : extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
      52           28 :     unsafe {
      53           28 :         let callback_data = (*(*wp).config).callback_data;
      54           28 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      55           28 :         (*api).get_current_timestamp()
      56           28 :     }
      57           28 : }
      58              : 
      59            0 : extern "C" fn conn_error_message(sk: *mut Safekeeper) -> *mut ::std::os::raw::c_char {
      60            0 :     unsafe {
      61            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      62            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      63            0 :         let msg = (*api).conn_error_message(&mut (*sk));
      64            0 :         let msg = CString::new(msg).unwrap();
      65            0 :         // TODO: fix leaking error message
      66            0 :         msg.into_raw()
      67            0 :     }
      68            0 : }
      69              : 
      70            2 : extern "C" fn conn_status(sk: *mut Safekeeper) -> WalProposerConnStatusType {
      71            2 :     unsafe {
      72            2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      73            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      74            2 :         (*api).conn_status(&mut (*sk))
      75            2 :     }
      76            2 : }
      77              : 
      78            2 : extern "C" fn conn_connect_start(sk: *mut Safekeeper) {
      79            2 :     unsafe {
      80            2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      81            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      82            2 :         (*api).conn_connect_start(&mut (*sk))
      83            2 :     }
      84            2 : }
      85              : 
      86            2 : extern "C" fn conn_connect_poll(sk: *mut Safekeeper) -> WalProposerConnectPollStatusType {
      87            2 :     unsafe {
      88            2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      89            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      90            2 :         (*api).conn_connect_poll(&mut (*sk))
      91            2 :     }
      92            2 : }
      93              : 
      94            2 : extern "C" fn conn_send_query(sk: *mut Safekeeper, query: *mut ::std::os::raw::c_char) -> bool {
      95            2 :     let query = unsafe { CStr::from_ptr(query) };
      96            2 :     let query = query.to_str().unwrap();
      97            2 : 
      98            2 :     unsafe {
      99            2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     100            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     101            2 :         (*api).conn_send_query(&mut (*sk), query)
     102            2 :     }
     103            2 : }
     104              : 
     105            2 : extern "C" fn conn_get_query_result(sk: *mut Safekeeper) -> WalProposerExecStatusType {
     106            2 :     unsafe {
     107            2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     108            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     109            2 :         (*api).conn_get_query_result(&mut (*sk))
     110            2 :     }
     111            2 : }
     112              : 
     113            0 : extern "C" fn conn_flush(sk: *mut Safekeeper) -> ::std::os::raw::c_int {
     114            0 :     unsafe {
     115            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     116            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     117            0 :         (*api).conn_flush(&mut (*sk))
     118            0 :     }
     119            0 : }
     120              : 
     121            0 : extern "C" fn conn_finish(sk: *mut Safekeeper) {
     122            0 :     unsafe {
     123            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     124            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     125            0 :         (*api).conn_finish(&mut (*sk))
     126            0 :     }
     127            0 : }
     128              : 
     129            4 : extern "C" fn conn_async_read(
     130            4 :     sk: *mut Safekeeper,
     131            4 :     buf: *mut *mut ::std::os::raw::c_char,
     132            4 :     amount: *mut ::std::os::raw::c_int,
     133            4 : ) -> PGAsyncReadResult {
     134            4 :     unsafe {
     135            4 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     136            4 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     137            4 :         let (res, result) = (*api).conn_async_read(&mut (*sk));
     138            4 : 
     139            4 :         // This function has guarantee that returned buf will be valid until
     140            4 :         // the next call. So we can store a Vec in each Safekeeper and reuse
     141            4 :         // it on the next call.
     142            4 :         let mut inbuf = take_vec_u8(&mut (*sk).inbuf).unwrap_or_default();
     143            4 : 
     144            4 :         inbuf.clear();
     145            4 :         inbuf.extend_from_slice(res);
     146            4 : 
     147            4 :         // Put a Vec back to sk->inbuf and return data ptr.
     148            4 :         *buf = store_vec_u8(&mut (*sk).inbuf, inbuf);
     149            4 :         *amount = res.len() as i32;
     150            4 : 
     151            4 :         result
     152            4 :     }
     153            4 : }
     154              : 
     155            0 : extern "C" fn conn_async_write(
     156            0 :     sk: *mut Safekeeper,
     157            0 :     buf: *const ::std::os::raw::c_void,
     158            0 :     size: usize,
     159            0 : ) -> PGAsyncWriteResult {
     160            0 :     unsafe {
     161            0 :         let buf = std::slice::from_raw_parts(buf as *const u8, size);
     162            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     163            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     164            0 :         (*api).conn_async_write(&mut (*sk), buf)
     165            0 :     }
     166            0 : }
     167              : 
     168            4 : extern "C" fn conn_blocking_write(
     169            4 :     sk: *mut Safekeeper,
     170            4 :     buf: *const ::std::os::raw::c_void,
     171            4 :     size: usize,
     172            4 : ) -> bool {
     173            4 :     unsafe {
     174            4 :         let buf = std::slice::from_raw_parts(buf as *const u8, size);
     175            4 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     176            4 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     177            4 :         (*api).conn_blocking_write(&mut (*sk), buf)
     178            4 :     }
     179            4 : }
     180              : 
     181            2 : extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
     182            2 :     unsafe {
     183            2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     184            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     185            2 :         (*api).recovery_download(&mut (*wp), &mut (*sk))
     186            2 :     }
     187            2 : }
     188              : 
     189            0 : extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) {
     190            0 :     unsafe {
     191            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     192            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     193            0 :         (*api).wal_reader_allocate(&mut (*sk));
     194            0 :     }
     195            0 : }
     196              : 
     197              : #[allow(clippy::unnecessary_cast)]
     198            0 : extern "C" fn wal_read(
     199            0 :     sk: *mut Safekeeper,
     200            0 :     buf: *mut ::std::os::raw::c_char,
     201            0 :     startptr: XLogRecPtr,
     202            0 :     count: Size,
     203            0 :     _errmsg: *mut *mut ::std::os::raw::c_char,
     204            0 : ) -> NeonWALReadResult {
     205            0 :     unsafe {
     206            0 :         let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count);
     207            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     208            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     209            0 :         // TODO: errmsg is not forwarded
     210            0 :         (*api).wal_read(&mut (*sk), buf, startptr)
     211            0 :     }
     212            0 : }
     213              : 
     214            0 : extern "C" fn wal_reader_events(sk: *mut Safekeeper) -> uint32 {
     215            0 :     unsafe {
     216            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     217            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     218            0 :         (*api).wal_reader_events(&mut (*sk))
     219            0 :     }
     220            0 : }
     221              : 
     222            2 : extern "C" fn init_event_set(wp: *mut WalProposer) {
     223            2 :     unsafe {
     224            2 :         let callback_data = (*(*wp).config).callback_data;
     225            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     226            2 :         (*api).init_event_set(&mut (*wp));
     227            2 :     }
     228            2 : }
     229              : 
     230            8 : extern "C" fn update_event_set(sk: *mut Safekeeper, events: uint32) {
     231            8 :     unsafe {
     232            8 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     233            8 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     234            8 :         (*api).update_event_set(&mut (*sk), events);
     235            8 :     }
     236            8 : }
     237              : 
     238            0 : extern "C" fn active_state_update_event_set(sk: *mut Safekeeper) {
     239            0 :     unsafe {
     240            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     241            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     242            0 :         (*api).active_state_update_event_set(&mut (*sk));
     243            0 :     }
     244            0 : }
     245              : 
     246            4 : extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) {
     247            4 :     unsafe {
     248            4 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     249            4 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     250            4 :         (*api).add_safekeeper_event_set(&mut (*sk), events);
     251            4 :     }
     252            4 : }
     253              : 
     254            2 : extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
     255            2 :     unsafe {
     256            2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     257            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     258            2 :         (*api).rm_safekeeper_event_set(&mut (*sk));
     259            2 :     }
     260            2 : }
     261              : 
     262            8 : extern "C" fn wait_event_set(
     263            8 :     wp: *mut WalProposer,
     264            8 :     timeout: ::std::os::raw::c_long,
     265            8 :     event_sk: *mut *mut Safekeeper,
     266            8 :     events: *mut uint32,
     267            8 : ) -> ::std::os::raw::c_int {
     268            8 :     unsafe {
     269            8 :         let callback_data = (*(*wp).config).callback_data;
     270            8 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     271            8 :         let result = (*api).wait_event_set(&mut (*wp), timeout);
     272            8 :         match result {
     273              :             WaitResult::Latch => {
     274            0 :                 *event_sk = std::ptr::null_mut();
     275            0 :                 *events = crate::bindings::WL_LATCH_SET;
     276            0 :                 1
     277              :             }
     278              :             WaitResult::Timeout => {
     279            0 :                 *event_sk = std::ptr::null_mut();
     280            0 :                 *events = crate::bindings::WL_TIMEOUT;
     281            0 :                 0
     282              :             }
     283            8 :             WaitResult::Network(sk, event_mask) => {
     284            8 :                 *event_sk = sk;
     285            8 :                 *events = event_mask;
     286            8 :                 1
     287              :             }
     288              :         }
     289              :     }
     290            8 : }
     291              : 
     292            2 : extern "C" fn strong_random(
     293            2 :     wp: *mut WalProposer,
     294            2 :     buf: *mut ::std::os::raw::c_void,
     295            2 :     len: usize,
     296            2 : ) -> bool {
     297            2 :     unsafe {
     298            2 :         let buf = std::slice::from_raw_parts_mut(buf as *mut u8, len);
     299            2 :         let callback_data = (*(*wp).config).callback_data;
     300            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     301            2 :         (*api).strong_random(buf)
     302            2 :     }
     303            2 : }
     304              : 
     305            0 : extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
     306            0 :     unsafe {
     307            0 :         let callback_data = (*(*wp).config).callback_data;
     308            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     309            0 :         (*api).get_redo_start_lsn()
     310            0 :     }
     311            0 : }
     312              : 
     313            2 : extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
     314            2 :     unsafe {
     315            2 :         let callback_data = (*(*wp).config).callback_data;
     316            2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     317            2 :         (*api).finish_sync_safekeepers(lsn)
     318            2 :     }
     319            2 : }
     320              : 
     321            0 : extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
     322            0 :     unsafe {
     323            0 :         let callback_data = (*(*wp).config).callback_data;
     324            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     325            0 :         (*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
     326            0 :     }
     327            0 : }
     328              : 
     329           14 : extern "C" fn log_internal(
     330           14 :     wp: *mut WalProposer,
     331           14 :     level: ::std::os::raw::c_int,
     332           14 :     line: *const ::std::os::raw::c_char,
     333           14 : ) {
     334           14 :     unsafe {
     335           14 :         let callback_data = (*(*wp).config).callback_data;
     336           14 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     337           14 :         let line = CStr::from_ptr(line);
     338           14 :         let line = line.to_str().unwrap();
     339           14 :         (*api).log_internal(&mut (*wp), Level::from(level as u32), line)
     340           14 :     }
     341           14 : }
     342              : 
     343           14 : #[derive(Debug)]
     344              : pub enum Level {
     345              :     Debug5,
     346              :     Debug4,
     347              :     Debug3,
     348              :     Debug2,
     349              :     Debug1,
     350              :     Log,
     351              :     Info,
     352              :     Notice,
     353              :     Warning,
     354              :     Error,
     355              :     Fatal,
     356              :     Panic,
     357              :     WPEvent,
     358              : }
     359              : 
     360              : impl Level {
     361           14 :     pub fn from(elevel: u32) -> Level {
     362           14 :         use crate::bindings::*;
     363           14 : 
     364           14 :         match elevel {
     365            0 :             DEBUG5 => Level::Debug5,
     366            0 :             DEBUG4 => Level::Debug4,
     367            0 :             DEBUG3 => Level::Debug3,
     368            0 :             DEBUG2 => Level::Debug2,
     369            0 :             DEBUG1 => Level::Debug1,
     370           14 :             LOG => Level::Log,
     371            0 :             INFO => Level::Info,
     372            0 :             NOTICE => Level::Notice,
     373            0 :             WARNING => Level::Warning,
     374            0 :             ERROR => Level::Error,
     375            0 :             FATAL => Level::Fatal,
     376            0 :             PANIC => Level::Panic,
     377            0 :             WPEVENT => Level::WPEvent,
     378            0 :             _ => panic!("unknown log level {}", elevel),
     379              :         }
     380           14 :     }
     381              : }
     382              : 
     383            2 : pub(crate) fn create_api() -> walproposer_api {
     384            2 :     walproposer_api {
     385            2 :         get_shmem_state: Some(get_shmem_state),
     386            2 :         start_streaming: Some(start_streaming),
     387            2 :         get_flush_rec_ptr: Some(get_flush_rec_ptr),
     388            2 :         get_current_timestamp: Some(get_current_timestamp),
     389            2 :         conn_error_message: Some(conn_error_message),
     390            2 :         conn_status: Some(conn_status),
     391            2 :         conn_connect_start: Some(conn_connect_start),
     392            2 :         conn_connect_poll: Some(conn_connect_poll),
     393            2 :         conn_send_query: Some(conn_send_query),
     394            2 :         conn_get_query_result: Some(conn_get_query_result),
     395            2 :         conn_flush: Some(conn_flush),
     396            2 :         conn_finish: Some(conn_finish),
     397            2 :         conn_async_read: Some(conn_async_read),
     398            2 :         conn_async_write: Some(conn_async_write),
     399            2 :         conn_blocking_write: Some(conn_blocking_write),
     400            2 :         recovery_download: Some(recovery_download),
     401            2 :         wal_reader_allocate: Some(wal_reader_allocate),
     402            2 :         wal_read: Some(wal_read),
     403            2 :         wal_reader_events: Some(wal_reader_events),
     404            2 :         init_event_set: Some(init_event_set),
     405            2 :         update_event_set: Some(update_event_set),
     406            2 :         active_state_update_event_set: Some(active_state_update_event_set),
     407            2 :         add_safekeeper_event_set: Some(add_safekeeper_event_set),
     408            2 :         rm_safekeeper_event_set: Some(rm_safekeeper_event_set),
     409            2 :         wait_event_set: Some(wait_event_set),
     410            2 :         strong_random: Some(strong_random),
     411            2 :         get_redo_start_lsn: Some(get_redo_start_lsn),
     412            2 :         finish_sync_safekeepers: Some(finish_sync_safekeepers),
     413            2 :         process_safekeeper_feedback: Some(process_safekeeper_feedback),
     414            2 :         log_internal: Some(log_internal),
     415            2 :     }
     416            2 : }
     417              : 
     418              : impl std::fmt::Display for Level {
     419           14 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     420           14 :         write!(f, "{:?}", self)
     421           14 :     }
     422              : }
     423              : 
     424              : /// Take ownership of `Vec<u8>` from StringInfoData.
     425              : #[allow(clippy::unnecessary_cast)]
     426            6 : pub(crate) fn take_vec_u8(pg: &mut StringInfoData) -> Option<Vec<u8>> {
     427            6 :     if pg.data.is_null() {
     428            2 :         return None;
     429            4 :     }
     430            4 : 
     431            4 :     let ptr = pg.data as *mut u8;
     432            4 :     let length = pg.len as usize;
     433            4 :     let capacity = pg.maxlen as usize;
     434            4 : 
     435            4 :     pg.data = std::ptr::null_mut();
     436            4 :     pg.len = 0;
     437            4 :     pg.maxlen = 0;
     438            4 : 
     439            4 :     unsafe { Some(Vec::from_raw_parts(ptr, length, capacity)) }
     440            6 : }
     441              : 
     442              : /// Store `Vec<u8>` in StringInfoData.
     443            4 : fn store_vec_u8(pg: &mut StringInfoData, vec: Vec<u8>) -> *mut ::std::os::raw::c_char {
     444            4 :     let ptr = vec.as_ptr() as *mut ::std::os::raw::c_char;
     445            4 :     let length = vec.len();
     446            4 :     let capacity = vec.capacity();
     447              : 
     448            4 :     assert!(pg.data.is_null());
     449              : 
     450            4 :     pg.data = ptr;
     451            4 :     pg.len = length as i32;
     452            4 :     pg.maxlen = capacity as i32;
     453            4 : 
     454            4 :     std::mem::forget(vec);
     455            4 : 
     456            4 :     ptr
     457            4 : }
        

Generated by: LCOV version 2.1-beta