LCOV - code coverage report
Current view: top level - libs/walproposer/src - api_bindings.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 95.9 % 369 354
Test Date: 2024-02-14 18:05:35 Functions: 97.3 % 37 36

            Line data    Source code
       1              : //! A C-Rust shim: defines implementation of C walproposer API, assuming wp
       2              : //! callback_data stores Box to some Rust implementation.
       3              : 
       4              : #![allow(dead_code)]
       5              : 
       6              : use std::ffi::CStr;
       7              : use std::ffi::CString;
       8              : 
       9              : use crate::bindings::uint32;
      10              : use crate::bindings::walproposer_api;
      11              : use crate::bindings::NeonWALReadResult;
      12              : use crate::bindings::PGAsyncReadResult;
      13              : use crate::bindings::PGAsyncWriteResult;
      14              : use crate::bindings::Safekeeper;
      15              : use crate::bindings::Size;
      16              : use crate::bindings::StringInfoData;
      17              : use crate::bindings::TimestampTz;
      18              : use crate::bindings::WalProposer;
      19              : use crate::bindings::WalProposerConnStatusType;
      20              : use crate::bindings::WalProposerConnectPollStatusType;
      21              : use crate::bindings::WalProposerExecStatusType;
      22              : use crate::bindings::WalproposerShmemState;
      23              : use crate::bindings::XLogRecPtr;
      24              : use crate::walproposer::ApiImpl;
      25              : use crate::walproposer::StreamingCallback;
      26              : use crate::walproposer::WaitResult;
      27              : 
      28         1264 : extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState {
      29         1264 :     unsafe {
      30         1264 :         let callback_data = (*(*wp).config).callback_data;
      31         1264 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      32         1264 :         (*api).get_shmem_state()
      33         1264 :     }
      34         1264 : }
      35              : 
      36         1214 : extern "C" fn start_streaming(wp: *mut WalProposer, startpos: XLogRecPtr) {
      37         1214 :     unsafe {
      38         1214 :         let callback_data = (*(*wp).config).callback_data;
      39         1214 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      40         1214 :         let callback = StreamingCallback::new(wp);
      41         1214 :         (*api).start_streaming(startpos, &callback);
      42         1214 :     }
      43         1214 : }
      44              : 
      45          817 : extern "C" fn get_flush_rec_ptr(wp: *mut WalProposer) -> XLogRecPtr {
      46          817 :     unsafe {
      47          817 :         let callback_data = (*(*wp).config).callback_data;
      48          817 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      49          817 :         (*api).get_flush_rec_ptr()
      50          817 :     }
      51          817 : }
      52              : 
      53      2534101 : extern "C" fn get_current_timestamp(wp: *mut WalProposer) -> TimestampTz {
      54      2534101 :     unsafe {
      55      2534101 :         let callback_data = (*(*wp).config).callback_data;
      56      2534101 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      57      2534101 :         (*api).get_current_timestamp()
      58      2534101 :     }
      59      2534101 : }
      60              : 
      61        87390 : extern "C" fn conn_error_message(sk: *mut Safekeeper) -> *mut ::std::os::raw::c_char {
      62        87390 :     unsafe {
      63        87390 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      64        87390 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      65        87390 :         let msg = (*api).conn_error_message(&mut (*sk));
      66        87390 :         let msg = CString::new(msg).unwrap();
      67        87390 :         // TODO: fix leaking error message
      68        87390 :         msg.into_raw()
      69        87390 :     }
      70        87390 : }
      71              : 
      72       270015 : extern "C" fn conn_status(sk: *mut Safekeeper) -> WalProposerConnStatusType {
      73       270015 :     unsafe {
      74       270015 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      75       270015 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      76       270015 :         (*api).conn_status(&mut (*sk))
      77       270015 :     }
      78       270015 : }
      79              : 
      80       270015 : extern "C" fn conn_connect_start(sk: *mut Safekeeper) {
      81       270015 :     unsafe {
      82       270015 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      83       270015 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      84       270015 :         (*api).conn_connect_start(&mut (*sk))
      85       270015 :     }
      86       270015 : }
      87              : 
      88       243151 : extern "C" fn conn_connect_poll(sk: *mut Safekeeper) -> WalProposerConnectPollStatusType {
      89       243151 :     unsafe {
      90       243151 :         let callback_data = (*(*(*sk).wp).config).callback_data;
      91       243151 :         let api = callback_data as *mut Box<dyn ApiImpl>;
      92       243151 :         (*api).conn_connect_poll(&mut (*sk))
      93       243151 :     }
      94       243151 : }
      95              : 
      96       243151 : extern "C" fn conn_send_query(sk: *mut Safekeeper, query: *mut ::std::os::raw::c_char) -> bool {
      97       243151 :     let query = unsafe { CStr::from_ptr(query) };
      98       243151 :     let query = query.to_str().unwrap();
      99       243151 : 
     100       243151 :     unsafe {
     101       243151 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     102       243151 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     103       243151 :         (*api).conn_send_query(&mut (*sk), query)
     104       243151 :     }
     105       243151 : }
     106              : 
     107       243151 : extern "C" fn conn_get_query_result(sk: *mut Safekeeper) -> WalProposerExecStatusType {
     108       243151 :     unsafe {
     109       243151 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     110       243151 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     111       243151 :         (*api).conn_get_query_result(&mut (*sk))
     112       243151 :     }
     113       243151 : }
     114              : 
     115            0 : extern "C" fn conn_flush(sk: *mut Safekeeper) -> ::std::os::raw::c_int {
     116            0 :     unsafe {
     117            0 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     118            0 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     119            0 :         (*api).conn_flush(&mut (*sk))
     120            0 :     }
     121            0 : }
     122              : 
     123        89522 : extern "C" fn conn_finish(sk: *mut Safekeeper) {
     124        89522 :     unsafe {
     125        89522 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     126        89522 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     127        89522 :         (*api).conn_finish(&mut (*sk))
     128        89522 :     }
     129        89522 : }
     130              : 
     131       136617 : extern "C" fn conn_async_read(
     132       136617 :     sk: *mut Safekeeper,
     133       136617 :     buf: *mut *mut ::std::os::raw::c_char,
     134       136617 :     amount: *mut ::std::os::raw::c_int,
     135       136617 : ) -> PGAsyncReadResult {
     136       136617 :     unsafe {
     137       136617 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     138       136617 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     139       136617 : 
     140       136617 :         // This function has guarantee that returned buf will be valid until
     141       136617 :         // the next call. So we can store a Vec in each Safekeeper and reuse
     142       136617 :         // it on the next call.
     143       136617 :         let mut inbuf = take_vec_u8(&mut (*sk).inbuf).unwrap_or_default();
     144       136617 :         inbuf.clear();
     145       136617 : 
     146       136617 :         let result = (*api).conn_async_read(&mut (*sk), &mut inbuf);
     147       136617 : 
     148       136617 :         // Put a Vec back to sk->inbuf and return data ptr.
     149       136617 :         *amount = inbuf.len() as i32;
     150       136617 :         *buf = store_vec_u8(&mut (*sk).inbuf, inbuf);
     151       136617 : 
     152       136617 :         result
     153       136617 :     }
     154       136617 : }
     155              : 
     156        33923 : extern "C" fn conn_async_write(
     157        33923 :     sk: *mut Safekeeper,
     158        33923 :     buf: *const ::std::os::raw::c_void,
     159        33923 :     size: usize,
     160        33923 : ) -> PGAsyncWriteResult {
     161        33923 :     unsafe {
     162        33923 :         let buf = std::slice::from_raw_parts(buf as *const u8, size);
     163        33923 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     164        33923 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     165        33923 :         (*api).conn_async_write(&mut (*sk), buf)
     166        33923 :     }
     167        33923 : }
     168              : 
     169       272090 : extern "C" fn conn_blocking_write(
     170       272090 :     sk: *mut Safekeeper,
     171       272090 :     buf: *const ::std::os::raw::c_void,
     172       272090 :     size: usize,
     173       272090 : ) -> bool {
     174       272090 :     unsafe {
     175       272090 :         let buf = std::slice::from_raw_parts(buf as *const u8, size);
     176       272090 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     177       272090 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     178       272090 :         (*api).conn_blocking_write(&mut (*sk), buf)
     179       272090 :     }
     180       272090 : }
     181              : 
     182         5635 : extern "C" fn recovery_download(wp: *mut WalProposer, sk: *mut Safekeeper) -> bool {
     183         5635 :     unsafe {
     184         5635 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     185         5635 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     186         5635 : 
     187         5635 :         // currently `recovery_download` is always called right after election
     188         5635 :         (*api).after_election(&mut (*wp));
     189         5635 : 
     190         5635 :         (*api).recovery_download(&mut (*wp), &mut (*sk))
     191         5635 :     }
     192         5635 : }
     193              : 
     194         7643 : extern "C" fn wal_reader_allocate(sk: *mut Safekeeper) {
     195         7643 :     unsafe {
     196         7643 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     197         7643 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     198         7643 :         (*api).wal_reader_allocate(&mut (*sk));
     199         7643 :     }
     200         7643 : }
     201              : 
     202              : #[allow(clippy::unnecessary_cast)]
     203        26280 : extern "C" fn wal_read(
     204        26280 :     sk: *mut Safekeeper,
     205        26280 :     buf: *mut ::std::os::raw::c_char,
     206        26280 :     startptr: XLogRecPtr,
     207        26280 :     count: Size,
     208        26280 :     _errmsg: *mut *mut ::std::os::raw::c_char,
     209        26280 : ) -> NeonWALReadResult {
     210        26280 :     unsafe {
     211        26280 :         let buf = std::slice::from_raw_parts_mut(buf as *mut u8, count);
     212        26280 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     213        26280 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     214        26280 :         // TODO: errmsg is not forwarded
     215        26280 :         (*api).wal_read(&mut (*sk), buf, startptr)
     216        26280 :     }
     217        26280 : }
     218              : 
     219       110114 : extern "C" fn wal_reader_events(sk: *mut Safekeeper) -> uint32 {
     220       110114 :     unsafe {
     221       110114 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     222       110114 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     223       110114 :         (*api).wal_reader_events(&mut (*sk))
     224       110114 :     }
     225       110114 : }
     226              : 
     227        73356 : extern "C" fn init_event_set(wp: *mut WalProposer) {
     228        73356 :     unsafe {
     229        73356 :         let callback_data = (*(*wp).config).callback_data;
     230        73356 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     231        73356 :         (*api).init_event_set(&mut (*wp));
     232        73356 :     }
     233        73356 : }
     234              : 
     235       536298 : extern "C" fn update_event_set(sk: *mut Safekeeper, events: uint32) {
     236       536298 :     unsafe {
     237       536298 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     238       536298 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     239       536298 :         (*api).update_event_set(&mut (*sk), events);
     240       536298 :     }
     241       536298 : }
     242              : 
     243        38816 : extern "C" fn active_state_update_event_set(sk: *mut Safekeeper) {
     244        38816 :     unsafe {
     245        38816 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     246        38816 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     247        38816 :         (*api).active_state_update_event_set(&mut (*sk));
     248        38816 :     }
     249        38816 : }
     250              : 
     251       486302 : extern "C" fn add_safekeeper_event_set(sk: *mut Safekeeper, events: uint32) {
     252       486302 :     unsafe {
     253       486302 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     254       486302 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     255       486302 :         (*api).add_safekeeper_event_set(&mut (*sk), events);
     256       486302 :     }
     257       486302 : }
     258              : 
     259       305809 : extern "C" fn rm_safekeeper_event_set(sk: *mut Safekeeper) {
     260       305809 :     unsafe {
     261       305809 :         let callback_data = (*(*(*sk).wp).config).callback_data;
     262       305809 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     263       305809 :         (*api).rm_safekeeper_event_set(&mut (*sk));
     264       305809 :     }
     265       305809 : }
     266              : 
     267       706115 : extern "C" fn wait_event_set(
     268       706115 :     wp: *mut WalProposer,
     269       706115 :     timeout: ::std::os::raw::c_long,
     270       706115 :     event_sk: *mut *mut Safekeeper,
     271       706115 :     events: *mut uint32,
     272       706115 : ) -> ::std::os::raw::c_int {
     273       706115 :     unsafe {
     274       706115 :         let callback_data = (*(*wp).config).callback_data;
     275       706115 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     276       706115 :         let result = (*api).wait_event_set(&mut (*wp), timeout);
     277       706115 :         match result {
     278              :             WaitResult::Latch => {
     279        72549 :                 *event_sk = std::ptr::null_mut();
     280        72549 :                 *events = crate::bindings::WL_LATCH_SET;
     281        72549 :                 1
     282              :             }
     283              :             WaitResult::Timeout => {
     284        22113 :                 *event_sk = std::ptr::null_mut();
     285        22113 :                 // WaitEventSetWait returns 0 for timeout.
     286        22113 :                 *events = 0;
     287        22113 :                 0
     288              :             }
     289       611453 :             WaitResult::Network(sk, event_mask) => {
     290       611453 :                 *event_sk = sk;
     291       611453 :                 *events = event_mask;
     292       611453 :                 1
     293              :             }
     294              :         }
     295              :     }
     296       706115 : }
     297              : 
     298        73356 : extern "C" fn strong_random(
     299        73356 :     wp: *mut WalProposer,
     300        73356 :     buf: *mut ::std::os::raw::c_void,
     301        73356 :     len: usize,
     302        73356 : ) -> bool {
     303        73356 :     unsafe {
     304        73356 :         let buf = std::slice::from_raw_parts_mut(buf as *mut u8, len);
     305        73356 :         let callback_data = (*(*wp).config).callback_data;
     306        73356 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     307        73356 :         (*api).strong_random(buf)
     308        73356 :     }
     309        73356 : }
     310              : 
     311         2086 : extern "C" fn get_redo_start_lsn(wp: *mut WalProposer) -> XLogRecPtr {
     312         2086 :     unsafe {
     313         2086 :         let callback_data = (*(*wp).config).callback_data;
     314         2086 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     315         2086 :         (*api).get_redo_start_lsn()
     316         2086 :     }
     317         2086 : }
     318              : 
     319         2759 : extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
     320         2759 :     unsafe {
     321         2759 :         let callback_data = (*(*wp).config).callback_data;
     322         2759 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     323         2759 :         (*api).finish_sync_safekeepers(lsn)
     324         2759 :     }
     325         2759 : }
     326              : 
     327        13032 : extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
     328        13032 :     unsafe {
     329        13032 :         let callback_data = (*(*wp).config).callback_data;
     330        13032 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     331        13032 :         (*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
     332        13032 :     }
     333        13032 : }
     334              : 
     335       788304 : extern "C" fn log_internal(
     336       788304 :     wp: *mut WalProposer,
     337       788304 :     level: ::std::os::raw::c_int,
     338       788304 :     line: *const ::std::os::raw::c_char,
     339       788304 : ) {
     340       788304 :     unsafe {
     341       788304 :         let callback_data = (*(*wp).config).callback_data;
     342       788304 :         let api = callback_data as *mut Box<dyn ApiImpl>;
     343       788304 :         let line = CStr::from_ptr(line);
     344       788304 :         let line = line.to_str().unwrap();
     345       788304 :         (*api).log_internal(&mut (*wp), Level::from(level as u32), line)
     346       788304 :     }
     347       788304 : }
     348              : 
     /// Log severity handed to `ApiImpl::log_internal`.
     ///
     /// Each variant corresponds to one of the `DEBUG5` … `WPEVENT` constants
     /// from `crate::bindings`; see [`Level::from`] for the mapping.
     #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     pub enum Level {
         Debug5,
         Debug4,
         Debug3,
         Debug2,
         Debug1,
         Log,
         Info,
         Notice,
         Warning,
         Error,
         Fatal,
         Panic,
         WPEvent,
     }
     365              : 
     366              : impl Level {
     367       788304 :     pub fn from(elevel: u32) -> Level {
     368       788304 :         use crate::bindings::*;
     369       788304 : 
     370       788304 :         match elevel {
     371        26280 :             DEBUG5 => Level::Debug5,
     372            0 :             DEBUG4 => Level::Debug4,
     373            0 :             DEBUG3 => Level::Debug3,
     374        81306 :             DEBUG2 => Level::Debug2,
     375            0 :             DEBUG1 => Level::Debug1,
     376       590699 :             LOG => Level::Log,
     377            0 :             INFO => Level::Info,
     378            0 :             NOTICE => Level::Notice,
     379        89522 :             WARNING => Level::Warning,
     380            0 :             ERROR => Level::Error,
     381          475 :             FATAL => Level::Fatal,
     382           22 :             PANIC => Level::Panic,
     383            0 :             WPEVENT => Level::WPEvent,
     384            0 :             _ => panic!("unknown log level {}", elevel),
     385              :         }
     386       788304 :     }
     387              : }
     388              : 
     389        73356 : pub(crate) fn create_api() -> walproposer_api {
     390        73356 :     walproposer_api {
     391        73356 :         get_shmem_state: Some(get_shmem_state),
     392        73356 :         start_streaming: Some(start_streaming),
     393        73356 :         get_flush_rec_ptr: Some(get_flush_rec_ptr),
     394        73356 :         get_current_timestamp: Some(get_current_timestamp),
     395        73356 :         conn_error_message: Some(conn_error_message),
     396        73356 :         conn_status: Some(conn_status),
     397        73356 :         conn_connect_start: Some(conn_connect_start),
     398        73356 :         conn_connect_poll: Some(conn_connect_poll),
     399        73356 :         conn_send_query: Some(conn_send_query),
     400        73356 :         conn_get_query_result: Some(conn_get_query_result),
     401        73356 :         conn_flush: Some(conn_flush),
     402        73356 :         conn_finish: Some(conn_finish),
     403        73356 :         conn_async_read: Some(conn_async_read),
     404        73356 :         conn_async_write: Some(conn_async_write),
     405        73356 :         conn_blocking_write: Some(conn_blocking_write),
     406        73356 :         recovery_download: Some(recovery_download),
     407        73356 :         wal_reader_allocate: Some(wal_reader_allocate),
     408        73356 :         wal_read: Some(wal_read),
     409        73356 :         wal_reader_events: Some(wal_reader_events),
     410        73356 :         init_event_set: Some(init_event_set),
     411        73356 :         update_event_set: Some(update_event_set),
     412        73356 :         active_state_update_event_set: Some(active_state_update_event_set),
     413        73356 :         add_safekeeper_event_set: Some(add_safekeeper_event_set),
     414        73356 :         rm_safekeeper_event_set: Some(rm_safekeeper_event_set),
     415        73356 :         wait_event_set: Some(wait_event_set),
     416        73356 :         strong_random: Some(strong_random),
     417        73356 :         get_redo_start_lsn: Some(get_redo_start_lsn),
     418        73356 :         finish_sync_safekeepers: Some(finish_sync_safekeepers),
     419        73356 :         process_safekeeper_feedback: Some(process_safekeeper_feedback),
     420        73356 :         log_internal: Some(log_internal),
     421        73356 :     }
     422        73356 : }
     423              : 
     424              : impl std::fmt::Display for Level {
     425         3142 :     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
     426         3142 :         write!(f, "{:?}", self)
     427         3142 :     }
     428              : }
     429              : 
     430              : /// Take ownership of `Vec<u8>` from StringInfoData.
     431              : #[allow(clippy::unnecessary_cast)]
     432       356669 : pub(crate) fn take_vec_u8(pg: &mut StringInfoData) -> Option<Vec<u8>> {
     433       356669 :     if pg.data.is_null() {
     434       220062 :         return None;
     435       136607 :     }
     436       136607 : 
     437       136607 :     let ptr = pg.data as *mut u8;
     438       136607 :     let length = pg.len as usize;
     439       136607 :     let capacity = pg.maxlen as usize;
     440       136607 : 
     441       136607 :     pg.data = std::ptr::null_mut();
     442       136607 :     pg.len = 0;
     443       136607 :     pg.maxlen = 0;
     444       136607 : 
     445       136607 :     unsafe { Some(Vec::from_raw_parts(ptr, length, capacity)) }
     446       356669 : }
     447              : 
     448              : /// Store `Vec<u8>` in StringInfoData.
     449       136617 : fn store_vec_u8(pg: &mut StringInfoData, vec: Vec<u8>) -> *mut ::std::os::raw::c_char {
     450       136617 :     let ptr = vec.as_ptr() as *mut ::std::os::raw::c_char;
     451       136617 :     let length = vec.len();
     452       136617 :     let capacity = vec.capacity();
     453       136617 : 
     454       136617 :     assert!(pg.data.is_null());
     455              : 
     456       136617 :     pg.data = ptr;
     457       136617 :     pg.len = length as i32;
     458       136617 :     pg.maxlen = capacity as i32;
     459       136617 : 
     460       136617 :     std::mem::forget(vec);
     461       136617 : 
     462       136617 :     ptr
     463       136617 : }
        

Generated by: LCOV version 2.1-beta