LCOV - differential code coverage report
Current view: top level - libs/walproposer/src - api_bindings.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 65.3 % 363 237 126 237
Current Date: 2024-01-09 02:06:09 Functions: 63.9 % 36 23 13 23
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! A C-Rust shim: defines implementation of C walproposer API, assuming wp
       2                 : //! callback_data stores Box to some Rust implementation.
       3                 : 
       4                 : #![allow(dead_code)]
       5                 : 
       6                 : use std::ffi::CStr;
       7                 : use std::ffi::CString;
       8                 : 
       9                 : use crate::bindings::uint32;
      10                 : use crate::bindings::walproposer_api;
      11                 : use crate::bindings::NeonWALReadResult;
      12                 : use crate::bindings::PGAsyncReadResult;
      13                 : use crate::bindings::PGAsyncWriteResult;
      14                 : use crate::bindings::Safekeeper;
      15                 : use crate::bindings::Size;
      16                 : use crate::bindings::StringInfoData;
      17                 : use crate::bindings::TimestampTz;
      18                 : use crate::bindings::WalProposer;
      19                 : use crate::bindings::WalProposerConnStatusType;
      20                 : use crate::bindings::WalProposerConnectPollStatusType;
      21                 : use crate::bindings::WalProposerExecStatusType;
      22                 : use crate::bindings::WalproposerShmemState;
      23                 : use crate::bindings::XLogRecPtr;
      24                 : use crate::walproposer::ApiImpl;
      25                 : use crate::walproposer::WaitResult;
      26                 : 
      27 UBC           0 : extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState {
      28               0 :     unsafe {
      29               0 :         let callback_data = (*(*wp).config).callback_data;
      30               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      31               0 :         (*api).get_shmem_state()
      32               0 :     }
      33               0 : }
      34                 : 
      35               0 : extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
      36               0 :     unsafe {
      37               0 :         let callback_data = (*(*wp).config).callback_data;
      38               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      39               0 :         (*api).start_streaming(startpos)
      40               0 :     }
      41               0 : }
      42                 : 
      43               0 : extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr {
      44               0 :     unsafe {
      45               0 :         let callback_data = (*(*wp).config).callback_data;
      46               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      47               0 :         (*api).get_flush_rec_ptr()
      48               0 :     }
      49               0 : }
      50                 : 
      51 CBC          14 : extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
      52              14 :     unsafe {
      53              14 :         let callback_data = (*(*wp).config).callback_data;
      54              14 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      55              14 :         (*api).get_current_timestamp()
      56              14 :     }
      57              14 : }
      58                 : 
      59 UBC           0 : extern "C" fn conn_error_message(sk: *mut Safekeeper) -> *mut ::std::os::raw::c_char {
      60               0 :     unsafe {
      61               0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      62               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      63               0 :         let msg = (*api).conn_error_message(&mut (*sk));
      64               0 :         let msg = CString::new(msg).unwrap();
      65               0 :         // TODO: fix leaking error message
      66               0 :         msg.into_raw()
      67               0 :     }
      68               0 : }
      69                 : 
      70 CBC           1 : extern "C" fn conn_status(sk: *mut Safekeeper) -> WalProposerConnStatusType {
      71               1 :     unsafe {
      72               1 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      73               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      74               1 :         (*api).conn_status(&mut (*sk))
      75               1 :     }
      76               1 : }
      77                 : 
      78               1 : extern "C" fn conn_connect_start(sk: *mut Safekeeper) {
      79               1 :     unsafe {
      80               1 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      81               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      82               1 :         (*api).conn_connect_start(&mut (*sk))
      83               1 :     }
      84               1 : }
      85                 : 
      86               1 : extern "C" fn conn_connect_poll(sk: *mut Safekeeper) -> WalProposerConnectPollStatusType {
      87               1 :     unsafe {
      88               1 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      89               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      90               1 :         (*api).conn_connect_poll(&mut (*sk))
      91               1 :     }
      92               1 : }
      93                 : 
      94               1 : extern "C" fn conn_send_query(sk: *mut Safekeeper, query: *mut ::std::os::raw::c_char) -> bool {
      95               1 :     let query = unsafe { CStr::from_ptr(query) };
      96               1 :     let query = query.to_str().unwrap();
      97               1 : 
      98               1 :     unsafe {
      99               1 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     100               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     101               1 :         (*api).conn_send_query(&mut (*sk), query)
     102               1 :     }
     103               1 : }
     104                 : 
     105               1 : extern "C" fn conn_get_query_result(sk: *mut Safekeeper) -> WalProposerExecStatusType {
     106               1 :     unsafe {
     107               1 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     108               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     109               1 :         (*api).conn_get_query_result(&mut (*sk))
     110               1 :     }
     111               1 : }
     112                 : 
     113 UBC           0 : extern "C" fn conn_flush(sk: *mut Safekeeper) -> ::std::os::raw::c_int {
     114               0 :     unsafe {
     115               0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     116               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     117               0 :         (*api).conn_flush(&mut (*sk))
     118               0 :     }
     119               0 : }
     120                 : 
     121               0 : extern "C" fn conn_finish(sk: *mut Safekeeper) {
     122               0 :     unsafe {
     123               0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     124               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     125               0 :         (*api).conn_finish(&mut (*sk))
     126               0 :     }
     127               0 : }
     128                 : 
     129 CBC           2 : extern "C" fn conn_async_read(
     130               2 :     sk: *mut Safekeeper,
     131               2 :     buf: *mut *mut ::std::os::raw::c_char,
     132               2 :     amount: *mut ::std::os::raw::c_int,
     133               2 : ) -> PGAsyncReadResult {
     134               2 :     unsafe {
     135               2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     136               2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     137               2 :         let (res, result) = (*api).conn_async_read(&mut (*sk));
     138               2 : 
     139               2 :         // This function has guarantee that returned buf will be valid until
     140               2 :         // the next call. So we can store a Vec in each Safekeeper and reuse
     141               2 :         // it on the next call.
     142               2 :         let mut inbuf = take_vec_u8(&mut (*sk).inbuf).unwrap_or_default();
     143               2 : 
     144               2 :         inbuf.clear();
     145               2 :         inbuf.extend_from_slice(res);
     146               2 : 
     147               2 :         // Put a Vec back to sk->inbuf and return data ptr.
     148               2 :         *buf = store_vec_u8(&mut (*sk).inbuf, inbuf);
     149               2 :         *amount = res.len() as i32;
     150               2 : 
     151               2 :         result
     152               2 :     }
     153               2 : }
     154                 : 
     155 UBC           0 : extern "C" fn conn_async_write(
     156               0 :     sk: *mut Safekeeper,
     157               0 :     buf: *const ::std::os::raw::c_void,
     158               0 :     size: usize,
     159               0 : ) -> PGAsyncWriteResult {
     160               0 :     unsafe {
     161               0 :         let buf = std::slice::from_raw_parts(buf as *const u8, size);
     162               0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     163               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     164               0 :         (*api).conn_async_write(&mut (*sk), buf)
     165               0 :     }
     166               0 : }
     167                 : 
     168 CBC           2 : extern "C" fn conn_blocking_write(
     169               2 :     sk: *mut Safekeeper,
     170               2 :     buf: *const ::std::os::raw::c_void,
     171               2 :     size: usize,
     172               2 : ) -> bool {
     173               2 :     unsafe {
     174               2 :         let buf = std::slice::from_raw_parts(buf as *const u8, size);
     175               2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     176               2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     177               2 :         (*api).conn_blocking_write(&mut (*sk), buf)
     178               2 :     }
     179               2 : }
     180                 : 
     181               1 : extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
     182               1 :     unsafe {
     183               1 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     184               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     185               1 :         (*api).recovery_download(&mut (*wp), &mut (*sk))
     186               1 :     }
     187               1 : }
     188                 : 
     189 UBC           0 : extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) {
     190               0 :     unsafe {
     191               0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     192               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     193               0 :         (*api).wal_reader_allocate(&mut (*sk));
     194               0 :     }
     195               0 : }
     196                 : 
     197                 : #[allow(clippy::unnecessary_cast)]
     198               0 : extern "C" fn wal_read(
     199               0 :     sk: *mut Safekeeper,
     200               0 :     buf: *mut ::std::os::raw::c_char,
     201               0 :     startptr: XLogRecPtr,
     202               0 :     count: Size,
     203               0 :     _errmsg: *mut *mut ::std::os::raw::c_char,
     204               0 : ) -> NeonWALReadResult {
     205               0 :     unsafe {
     206               0 :         let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count);
     207               0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     208               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     209               0 :         // TODO: errmsg is not forwarded
     210               0 :         (*api).wal_read(&mut (*sk), buf, startptr)
     211               0 :     }
     212               0 : }
     213                 : 
     214               0 : extern "C" fn wal_reader_events(sk: *mut Safekeeper) -> uint32 {
     215               0 :     unsafe {
     216               0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     217               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     218               0 :         (*api).wal_reader_events(&mut (*sk))
     219               0 :     }
     220               0 : }
     221                 : 
     222 CBC           1 : extern "C" fn init_event_set(wp: *mut WalProposer) {
     223               1 :     unsafe {
     224               1 :         let callback_data = (*(*wp).config).callback_data;
     225               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     226               1 :         (*api).init_event_set(&mut (*wp));
     227               1 :     }
     228               1 : }
     229                 : 
     230               4 : extern "C" fn update_event_set(sk: *mut Safekeeper, events: uint32) {
     231               4 :     unsafe {
     232               4 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     233               4 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     234               4 :         (*api).update_event_set(&mut (*sk), events);
     235               4 :     }
     236               4 : }
     237                 : 
     238 UBC           0 : extern "C" fn active_state_update_event_set(sk: *mut Safekeeper) {
     239               0 :     unsafe {
     240               0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     241               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     242               0 :         (*api).active_state_update_event_set(&mut (*sk));
     243               0 :     }
     244               0 : }
     245                 : 
     246 CBC           2 : extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) {
     247               2 :     unsafe {
     248               2 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     249               2 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     250               2 :         (*api).add_safekeeper_event_set(&mut (*sk), events);
     251               2 :     }
     252               2 : }
     253                 : 
     254               1 : extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
     255               1 :     unsafe {
     256               1 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     257               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     258               1 :         (*api).rm_safekeeper_event_set(&mut (*sk));
     259               1 :     }
     260               1 : }
     261                 : 
     262               4 : extern "C" fn wait_event_set(
     263               4 :     wp: *mut WalProposer,
     264               4 :     timeout: ::std::os::raw::c_long,
     265               4 :     event_sk: *mut *mut Safekeeper,
     266               4 :     events: *mut uint32,
     267               4 : ) -> ::std::os::raw::c_int {
     268               4 :     unsafe {
     269               4 :         let callback_data = (*(*wp).config).callback_data;
     270               4 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     271               4 :         let result = (*api).wait_event_set(&mut (*wp), timeout);
     272               4 :         match result {
     273                 :             WaitResult::Latch => {
     274 UBC           0 :                 *event_sk = std::ptr::null_mut();
     275               0 :                 *events = crate::bindings::WL_LATCH_SET;
     276               0 :                 1
     277                 :             }
     278                 :             WaitResult::Timeout => {
     279               0 :                 *event_sk = std::ptr::null_mut();
     280               0 :                 *events = crate::bindings::WL_TIMEOUT;
     281               0 :                 0
     282                 :             }
     283 CBC           4 :             WaitResult::Network(sk, event_mask) => {
     284               4 :                 *event_sk = sk;
     285               4 :                 *events = event_mask;
     286               4 :                 1
     287                 :             }
     288                 :         }
     289                 :     }
     290               4 : }
     291                 : 
     292               1 : extern "C" fn strong_random(
     293               1 :     wp: *mut WalProposer,
     294               1 :     buf: *mut ::std::os::raw::c_void,
     295               1 :     len: usize,
     296               1 : ) -> bool {
     297               1 :     unsafe {
     298               1 :         let buf = std::slice::from_raw_parts_mut(buf as *mut u8, len);
     299               1 :         let callback_data = (*(*wp).config).callback_data;
     300               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     301               1 :         (*api).strong_random(buf)
     302               1 :     }
     303               1 : }
     304                 : 
     305 UBC           0 : extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
     306               0 :     unsafe {
     307               0 :         let callback_data = (*(*wp).config).callback_data;
     308               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     309               0 :         (*api).get_redo_start_lsn()
     310               0 :     }
     311               0 : }
     312                 : 
     313 CBC           1 : extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
     314               1 :     unsafe {
     315               1 :         let callback_data = (*(*wp).config).callback_data;
     316               1 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     317               1 :         (*api).finish_sync_safekeepers(lsn)
     318               1 :     }
     319               1 : }
     320                 : 
     321 UBC           0 : extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
     322               0 :     unsafe {
     323               0 :         let callback_data = (*(*wp).config).callback_data;
     324               0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     325               0 :         (*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
     326               0 :     }
     327               0 : }
     328                 : 
     329 CBC           7 : extern "C" fn log_internal(
     330               7 :     wp: *mut WalProposer,
     331               7 :     level: ::std::os::raw::c_int,
     332               7 :     line: *const ::std::os::raw::c_char,
     333               7 : ) {
     334               7 :     unsafe {
     335               7 :         let callback_data = (*(*wp).config).callback_data;
     336               7 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     337               7 :         let line = CStr::from_ptr(line);
     338               7 :         let line = line.to_str().unwrap();
     339               7 :         (*api).log_internal(&mut (*wp), Level::from(level as u32), line)
     340               7 :     }
     341               7 : }
     342                 : 
     343               7 : #[derive(Debug)]
     344                 : pub enum Level {
     345                 :     Debug5,
     346                 :     Debug4,
     347                 :     Debug3,
     348                 :     Debug2,
     349                 :     Debug1,
     350                 :     Log,
     351                 :     Info,
     352                 :     Notice,
     353                 :     Warning,
     354                 :     Error,
     355                 :     Fatal,
     356                 :     Panic,
     357                 :     WPEvent,
     358                 : }
     359                 : 
     360                 : impl Level {
     361               7 :     pub fn from(elevel: u32) -> Level {
     362               7 :         use crate::bindings::*;
     363               7 : 
     364               7 :         match elevel {
     365 UBC           0 :             DEBUG5 => Level::Debug5,
     366               0 :             DEBUG4 => Level::Debug4,
     367               0 :             DEBUG3 => Level::Debug3,
     368               0 :             DEBUG2 => Level::Debug2,
     369               0 :             DEBUG1 => Level::Debug1,
     370 CBC           7 :             LOG => Level::Log,
     371 UBC           0 :             INFO => Level::Info,
     372               0 :             NOTICE => Level::Notice,
     373               0 :             WARNING => Level::Warning,
     374               0 :             ERROR => Level::Error,
     375               0 :             FATAL => Level::Fatal,
     376               0 :             PANIC => Level::Panic,
     377               0 :             WPEVENT => Level::WPEvent,
     378               0 :             _ => panic!("unknown log level {}", elevel),
     379                 :         }
     380 CBC           7 :     }
     381                 : }
     382                 : 
     383               1 : pub(crate) fn create_api() -> walproposer_api {
     384               1 :     walproposer_api {
     385               1 :         get_shmem_state: Some(get_shmem_state),
     386               1 :         start_streaming: Some(start_streaming),
     387               1 :         get_flush_rec_ptr: Some(get_flush_rec_ptr),
     388               1 :         get_current_timestamp: Some(get_current_timestamp),
     389               1 :         conn_error_message: Some(conn_error_message),
     390               1 :         conn_status: Some(conn_status),
     391               1 :         conn_connect_start: Some(conn_connect_start),
     392               1 :         conn_connect_poll: Some(conn_connect_poll),
     393               1 :         conn_send_query: Some(conn_send_query),
     394               1 :         conn_get_query_result: Some(conn_get_query_result),
     395               1 :         conn_flush: Some(conn_flush),
     396               1 :         conn_finish: Some(conn_finish),
     397               1 :         conn_async_read: Some(conn_async_read),
     398               1 :         conn_async_write: Some(conn_async_write),
     399               1 :         conn_blocking_write: Some(conn_blocking_write),
     400               1 :         recovery_download: Some(recovery_download),
     401               1 :         wal_reader_allocate: Some(wal_reader_allocate),
     402               1 :         wal_read: Some(wal_read),
     403               1 :         wal_reader_events: Some(wal_reader_events),
     404               1 :         init_event_set: Some(init_event_set),
     405               1 :         update_event_set: Some(update_event_set),
     406               1 :         active_state_update_event_set: Some(active_state_update_event_set),
     407               1 :         add_safekeeper_event_set: Some(add_safekeeper_event_set),
     408               1 :         rm_safekeeper_event_set: Some(rm_safekeeper_event_set),
     409               1 :         wait_event_set: Some(wait_event_set),
     410               1 :         strong_random: Some(strong_random),
     411               1 :         get_redo_start_lsn: Some(get_redo_start_lsn),
     412               1 :         finish_sync_safekeepers: Some(finish_sync_safekeepers),
     413               1 :         process_safekeeper_feedback: Some(process_safekeeper_feedback),
     414               1 :         log_internal: Some(log_internal),
     415               1 :     }
     416               1 : }
     417                 : 
     418                 : impl std::fmt::Display for Level {
     419               7 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     420               7 :         write!(f, "{:?}", self)
     421               7 :     }
     422                 : }
     423                 : 
     424                 : /// Take ownership of `Vec<u8>` from StringInfoData.
     425                 : #[allow(clippy::unnecessary_cast)]
     426               3 : pub(crate) fn take_vec_u8(pg: &mut StringInfoData) -> Option<Vec<u8>> {
     427               3 :     if pg.data.is_null() {
     428               1 :         return None;
     429               2 :     }
     430               2 : 
     431               2 :     let ptr = pg.data as *mut u8;
     432               2 :     let length = pg.len as usize;
     433               2 :     let capacity = pg.maxlen as usize;
     434               2 : 
     435               2 :     pg.data = std::ptr::null_mut();
     436               2 :     pg.len = 0;
     437               2 :     pg.maxlen = 0;
     438               2 : 
     439               2 :     unsafe { Some(Vec::from_raw_parts(ptr, length, capacity)) }
     440               3 : }
     441                 : 
     442                 : /// Store `Vec<u8>` in StringInfoData.
     443               2 : fn store_vec_u8(pg: &mut StringInfoData, vec: Vec<u8>) -> *mut ::std::os::raw::c_char {
     444               2 :     let ptr = vec.as_ptr() as *mut ::std::os::raw::c_char;
     445               2 :     let length = vec.len();
     446               2 :     let capacity = vec.capacity();
     447                 : 
     448               2 :     assert!(pg.data.is_null());
     449                 : 
     450               2 :     pg.data = ptr;
     451               2 :     pg.len = length as i32;
     452               2 :     pg.maxlen = capacity as i32;
     453               2 : 
     454               2 :     std::mem::forget(vec);
     455               2 : 
     456               2 :     ptr
     457               2 : }
        

Generated by: LCOV version 2.1-beta