LCOV - code coverage report
Current view: top level - libs/walproposer/src - walproposer.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 73.9 % 364 269
Test Date: 2025-07-16 12:29:03 Functions: 36.4 % 77 28

            Line data    Source code
       1              : #![allow(clippy::todo)]
       2              : 
       3              : use std::ffi::CString;
       4              : use std::str::FromStr;
       5              : 
       6              : use postgres_ffi::WAL_SEGMENT_SIZE;
       7              : use utils::id::TenantTimelineId;
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use crate::api_bindings::{Level, create_api, take_vec_u8};
      11              : use crate::bindings::{
      12              :     NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
      13              :     WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
      14              : };
      15              : 
      16              : /// Rust high-level wrapper for C walproposer API. Many methods are not required
      17              : /// for simple cases, hence todo!() in default implementations.
      18              : ///
      19              : /// Refer to `pgxn/neon/walproposer.h` for documentation.
      20              : pub trait ApiImpl {
      21            0 :     fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
      22            0 :         todo!()
      23              :     }
      24              : 
      25            0 :     fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
      26            0 :         todo!()
      27              :     }
      28              : 
      29            0 :     fn get_flush_rec_ptr(&self) -> u64 {
      30            0 :         todo!()
      31              :     }
      32              : 
      33            0 :     fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
      34            0 :         todo!()
      35              :     }
      36              : 
      37            0 :     fn get_current_timestamp(&self) -> i64 {
      38            0 :         todo!()
      39              :     }
      40              : 
      41            0 :     fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
      42            0 :         todo!()
      43              :     }
      44              : 
      45            0 :     fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
      46            0 :         todo!()
      47              :     }
      48              : 
      49            0 :     fn conn_connect_start(&self, _sk: &mut Safekeeper) {
      50            0 :         todo!()
      51              :     }
      52              : 
      53            0 :     fn conn_connect_poll(
      54            0 :         &self,
      55            0 :         _sk: &mut Safekeeper,
      56            0 :     ) -> crate::bindings::WalProposerConnectPollStatusType {
      57            0 :         todo!()
      58              :     }
      59              : 
      60            0 :     fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
      61            0 :         todo!()
      62              :     }
      63              : 
      64            0 :     fn conn_get_query_result(
      65            0 :         &self,
      66            0 :         _sk: &mut Safekeeper,
      67            0 :     ) -> crate::bindings::WalProposerExecStatusType {
      68            0 :         todo!()
      69              :     }
      70              : 
      71            0 :     fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
      72            0 :         todo!()
      73              :     }
      74              : 
      75            0 :     fn conn_finish(&self, _sk: &mut Safekeeper) {
      76            0 :         todo!()
      77              :     }
      78              : 
      79            0 :     fn conn_async_read(
      80            0 :         &self,
      81            0 :         _sk: &mut Safekeeper,
      82            0 :         _vec: &mut Vec<u8>,
      83            0 :     ) -> crate::bindings::PGAsyncReadResult {
      84            0 :         todo!()
      85              :     }
      86              : 
      87            0 :     fn conn_async_write(
      88            0 :         &self,
      89            0 :         _sk: &mut Safekeeper,
      90            0 :         _buf: &[u8],
      91            0 :     ) -> crate::bindings::PGAsyncWriteResult {
      92            0 :         todo!()
      93              :     }
      94              : 
      95            0 :     fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
      96            0 :         todo!()
      97              :     }
      98              : 
      99            0 :     fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
     100            0 :         todo!()
     101              :     }
     102              : 
     103            0 :     fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
     104            0 :         todo!()
     105              :     }
     106              : 
     107            0 :     fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
     108            0 :         todo!()
     109              :     }
     110              : 
     111            0 :     fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
     112            0 :         todo!()
     113              :     }
     114              : 
     115            0 :     fn init_event_set(&self, _wp: &mut WalProposer) {
     116            0 :         todo!()
     117              :     }
     118              : 
     119            0 :     fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     120            0 :         todo!()
     121              :     }
     122              : 
     123            0 :     fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
     124            0 :         todo!()
     125              :     }
     126              : 
     127            0 :     fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     128            0 :         todo!()
     129              :     }
     130              : 
     131            0 :     fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
     132            0 :         todo!()
     133              :     }
     134              : 
     135            0 :     fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
     136            0 :         todo!()
     137              :     }
     138              : 
     139            0 :     fn strong_random(&self, _buf: &mut [u8]) -> bool {
     140            0 :         todo!()
     141              :     }
     142              : 
     143            0 :     fn get_redo_start_lsn(&self) -> u64 {
     144            0 :         todo!()
     145              :     }
     146              : 
     147            0 :     fn finish_sync_safekeepers(&self, _lsn: u64) -> ! {
     148            0 :         todo!()
     149              :     }
     150              : 
     151            0 :     fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
     152            0 :         todo!()
     153              :     }
     154              : 
     155            0 :     fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
     156            0 :         todo!()
     157              :     }
     158              : 
     159            0 :     fn after_election(&self, _wp: &mut WalProposer) {
     160            0 :         todo!()
     161              :     }
     162              : }
     163              : 
     164              : #[derive(Debug)]
     165              : pub enum WaitResult {
     166              :     Latch,
     167              :     Timeout,
     168              :     Network(*mut Safekeeper, u32),
     169              : }
     170              : 
     171              : #[derive(Clone)]
     172              : pub struct Config {
     173              :     /// Tenant and timeline id
     174              :     pub ttid: TenantTimelineId,
     175              :     /// List of safekeepers in format `host:port`
     176              :     pub safekeepers_list: Vec<String>,
     177              :     /// libpq connection info options
     178              :     pub safekeeper_conninfo_options: String,
     179              :     /// Safekeeper reconnect timeout in milliseconds
     180              :     pub safekeeper_reconnect_timeout: i32,
     181              :     /// Safekeeper connection timeout in milliseconds
     182              :     pub safekeeper_connection_timeout: i32,
     183              :     /// walproposer mode, finish when all safekeepers are synced or subscribe
     184              :     /// to WAL streaming
     185              :     pub sync_safekeepers: bool,
     186              : }
     187              : 
     188              : /// WalProposer main struct. C methods are reexported as Rust functions.
     189              : pub struct Wrapper {
     190              :     wp: *mut WalProposer,
     191              :     _safekeepers_list_vec: Vec<u8>,
     192              : }
     193              : 
     194              : impl Wrapper {
     195         9427 :     pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
     196         9427 :         let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
     197         9427 :             .unwrap()
     198         9427 :             .into_raw();
     199         9427 :         let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
     200         9427 :             .unwrap()
     201         9427 :             .into_raw();
     202              : 
     203         9427 :         let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
     204         9427 :             .unwrap()
     205         9427 :             .into_bytes_with_nul();
     206         9427 :         assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
     207         9427 :         let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
     208         9427 :         let safekeeper_conninfo_options = CString::from_str(&config.safekeeper_conninfo_options)
     209         9427 :             .unwrap()
     210         9427 :             .into_raw();
     211              : 
     212         9427 :         let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
     213              : 
     214         9427 :         let c_config = WalProposerConfig {
     215         9427 :             neon_tenant,
     216         9427 :             neon_timeline,
     217         9427 :             safekeepers_list,
     218         9427 :             safekeeper_conninfo_options,
     219         9427 :             safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
     220         9427 :             safekeeper_connection_timeout: config.safekeeper_connection_timeout,
     221         9427 :             wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
     222         9427 :             syncSafekeepers: config.sync_safekeepers,
     223         9427 :             systemId: 0,
     224         9427 :             pgTimeline: 1,
     225         9427 :             proto_version: 3,
     226         9427 :             callback_data,
     227         9427 :         };
     228         9427 :         let c_config = Box::into_raw(Box::new(c_config));
     229              : 
     230         9427 :         let api = create_api();
     231         9427 :         let wp = unsafe { WalProposerCreate(c_config, api) };
     232         9427 :         Wrapper {
     233         9427 :             wp,
     234         9427 :             _safekeepers_list_vec: safekeepers_list_vec,
     235         9427 :         }
     236         9427 :     }
     237              : 
     238         9427 :     pub fn start(&self) {
     239         9427 :         unsafe { WalProposerStart(self.wp) }
     240         9427 :     }
     241              : }
     242              : 
     243              : impl Drop for Wrapper {
     244         9425 :     fn drop(&mut self) {
     245              :         unsafe {
     246         9425 :             let config = (*self.wp).config;
     247         9425 :             drop(Box::from_raw(
     248         9425 :                 (*config).callback_data as *mut Box<dyn ApiImpl>,
     249              :             ));
     250         9425 :             drop(CString::from_raw((*config).neon_tenant));
     251         9425 :             drop(CString::from_raw((*config).neon_timeline));
     252         9425 :             drop(Box::from_raw(config));
     253              : 
     254        28273 :             for i in 0..(*self.wp).n_safekeepers {
     255        28273 :                 let sk = &mut (*self.wp).safekeeper[i as usize];
     256        28273 :                 take_vec_u8(&mut sk.inbuf);
     257        28273 :             }
     258              : 
     259         9425 :             WalProposerFree(self.wp);
     260              :         }
     261         9425 :     }
     262              : }
     263              : 
     264              : pub struct StreamingCallback {
     265              :     wp: *mut WalProposer,
     266              : }
     267              : 
     268              : impl StreamingCallback {
     269          128 :     pub fn new(wp: *mut WalProposer) -> StreamingCallback {
     270          128 :         StreamingCallback { wp }
     271          128 :     }
     272              : 
     273          451 :     pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
     274          451 :         unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
     275          451 :     }
     276              : 
     277          451 :     pub fn poll(&self) {
     278          451 :         unsafe { WalProposerPoll(self.wp) }
     279          451 :     }
     280              : }
     281              : 
     282              : #[cfg(test)]
     283              : mod tests {
     284              :     use core::panic;
     285              :     use std::cell::{Cell, UnsafeCell};
     286              :     use std::ffi::CString;
     287              :     use std::sync::atomic::AtomicUsize;
     288              :     use std::sync::mpsc::sync_channel;
     289              : 
     290              :     use utils::id::TenantTimelineId;
     291              : 
     292              :     use super::ApiImpl;
     293              :     use crate::api_bindings::Level;
     294              :     use crate::bindings::{NeonWALReadResult, PG_VERSION_NUM};
     295              :     use crate::walproposer::Wrapper;
     296              : 
     297              :     #[derive(Clone, Copy, Debug)]
     298              :     struct WaitEventsData {
     299              :         sk: *mut crate::bindings::Safekeeper,
     300              :         event_mask: u32,
     301              :     }
     302              : 
     303              :     struct MockImpl {
     304              :         // data to return from wait_event_set
     305              :         wait_events: Cell<WaitEventsData>,
     306              :         // walproposer->safekeeper messages
     307              :         expected_messages: Vec<Vec<u8>>,
     308              :         expected_ptr: AtomicUsize,
     309              :         // safekeeper->walproposer messages
     310              :         safekeeper_replies: Vec<Vec<u8>>,
     311              :         replies_ptr: AtomicUsize,
     312              :         // channel to send LSN to the main thread
     313              :         sync_channel: std::sync::mpsc::SyncSender<u64>,
     314              :         // Shmem state, used for storing donor info
     315              :         shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
     316              :     }
     317              : 
     318              :     impl MockImpl {
     319            2 :         fn check_walproposer_msg(&self, msg: &[u8]) {
     320            2 :             let ptr = self
     321            2 :                 .expected_ptr
     322            2 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     323              : 
     324            2 :             if ptr >= self.expected_messages.len() {
     325            0 :                 panic!("unexpected message from walproposer");
     326            2 :             }
     327              : 
     328            2 :             let expected_msg = &self.expected_messages[ptr];
     329            2 :             assert_eq!(msg, expected_msg.as_slice());
     330            2 :         }
     331              : 
     332            2 :         fn next_safekeeper_reply(&self) -> &[u8] {
     333            2 :             let ptr = self
     334            2 :                 .replies_ptr
     335            2 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     336              : 
     337            2 :             if ptr >= self.safekeeper_replies.len() {
     338            0 :                 panic!("no more safekeeper replies");
     339            2 :             }
     340              : 
     341            2 :             &self.safekeeper_replies[ptr]
     342            2 :         }
     343              :     }
     344              : 
     345              :     impl ApiImpl for MockImpl {
     346            2 :         fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
     347            2 :             self.shmem.get()
     348            2 :         }
     349              : 
     350           14 :         fn get_current_timestamp(&self) -> i64 {
     351           14 :             println!("get_current_timestamp");
     352           14 :             0
     353           14 :         }
     354              : 
     355            0 :         fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
     356            0 :             let mut shmem = unsafe { *self.get_shmem_state() };
     357            0 :             shmem.propEpochStartLsn.value = donor_lsn;
     358            0 :             shmem.donor_conninfo = donor.conninfo;
     359            0 :             shmem.donor_lsn = donor_lsn;
     360            0 :         }
     361              : 
     362            1 :         fn conn_status(
     363            1 :             &self,
     364            1 :             _: &mut crate::bindings::Safekeeper,
     365            1 :         ) -> crate::bindings::WalProposerConnStatusType {
     366            1 :             println!("conn_status");
     367            1 :             crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     368            1 :         }
     369              : 
     370            1 :         fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
     371            1 :             println!("conn_connect_start");
     372            1 :         }
     373              : 
     374            1 :         fn conn_connect_poll(
     375            1 :             &self,
     376            1 :             _: &mut crate::bindings::Safekeeper,
     377            1 :         ) -> crate::bindings::WalProposerConnectPollStatusType {
     378            1 :             println!("conn_connect_poll");
     379            1 :             crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     380            1 :         }
     381              : 
     382            1 :         fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
     383            1 :             println!("conn_send_query: {query}");
     384            1 :             true
     385            1 :         }
     386              : 
     387            1 :         fn conn_get_query_result(
     388            1 :             &self,
     389            1 :             _: &mut crate::bindings::Safekeeper,
     390            1 :         ) -> crate::bindings::WalProposerExecStatusType {
     391            1 :             println!("conn_get_query_result");
     392            1 :             crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     393            1 :         }
     394              : 
     395            2 :         fn conn_async_read(
     396            2 :             &self,
     397            2 :             _: &mut crate::bindings::Safekeeper,
     398            2 :             vec: &mut Vec<u8>,
     399            2 :         ) -> crate::bindings::PGAsyncReadResult {
     400            2 :             println!("conn_async_read");
     401            2 :             let reply = self.next_safekeeper_reply();
     402            2 :             println!("conn_async_read result: {reply:?}");
     403            2 :             vec.extend_from_slice(reply);
     404            2 :             crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
     405            2 :         }
     406              : 
     407            2 :         fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
     408            2 :             println!("conn_blocking_write: {buf:?}");
     409            2 :             self.check_walproposer_msg(buf);
     410            2 :             true
     411            2 :         }
     412              : 
     413            1 :         fn recovery_download(
     414            1 :             &self,
     415            1 :             _wp: &mut crate::bindings::WalProposer,
     416            1 :             _sk: &mut crate::bindings::Safekeeper,
     417            1 :         ) -> bool {
     418            1 :             true
     419            1 :         }
     420              : 
     421            0 :         fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
     422            0 :             println!("wal_reader_allocate");
     423            0 :             crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     424            0 :         }
     425              : 
     426            1 :         fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
     427            1 :             println!("init_event_set")
     428            1 :         }
     429              : 
     430            3 :         fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     431            3 :             println!(
     432            3 :                 "update_event_set, sk={:?}, events_mask={:#b}",
     433            3 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     434              :             );
     435            3 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     436            3 :         }
     437              : 
     438            2 :         fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     439            2 :             println!(
     440            2 :                 "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     441            2 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     442              :             );
     443            2 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     444            2 :         }
     445              : 
     446            1 :         fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
     447            1 :             println!(
     448            1 :                 "rm_safekeeper_event_set, sk={:?}",
     449            1 :                 sk as *mut crate::bindings::Safekeeper
     450              :             );
     451            1 :         }
     452              : 
     453            4 :         fn wait_event_set(
     454            4 :             &self,
     455            4 :             _: &mut crate::bindings::WalProposer,
     456            4 :             timeout_millis: i64,
     457            4 :         ) -> super::WaitResult {
     458            4 :             let data = self.wait_events.get();
     459            4 :             println!("wait_event_set, timeout_millis={timeout_millis}, res={data:?}");
     460            4 :             super::WaitResult::Network(data.sk, data.event_mask)
     461            4 :         }
     462              : 
     463            0 :         fn strong_random(&self, buf: &mut [u8]) -> bool {
     464            0 :             println!("strong_random");
     465            0 :             buf.fill(0);
     466            0 :             true
     467            0 :         }
     468              : 
     469            1 :         fn finish_sync_safekeepers(&self, lsn: u64) -> ! {
     470            1 :             self.sync_channel.send(lsn).unwrap();
     471            1 :             panic!("sync safekeepers finished at lsn={}", lsn);
     472              :         }
     473              : 
     474           12 :         fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
     475           12 :             println!("wp_log[{level}] {msg}");
     476           12 :         }
     477              : 
     478            1 :         fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
     479            1 :             println!("after_election");
     480            1 :         }
     481              :     }
     482              : 
     483              :     /// Test that walproposer can successfully connect to safekeeper and finish
     484              :     /// sync_safekeepers. API is mocked in MockImpl.
     485              :     ///
     486              :     /// Run this test with valgrind to detect leaks:
     487              :     /// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
     488              :     #[test]
     489            1 :     fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
     490            1 :         let ttid = TenantTimelineId::new(
     491            1 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     492            1 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     493              :         );
     494              : 
     495            1 :         let (sender, receiver) = sync_channel(1);
     496              : 
     497              :         // Messages definitions are at walproposer.h
     498              :         // xxx: it would be better to extract them from safekeeper crate and
     499              :         // use serialization/deserialization here.
     500            1 :         let greeting_tag = (b'g').to_be_bytes();
     501            1 :         let tenant_id = CString::new(ttid.tenant_id.to_string())
     502            1 :             .unwrap()
     503            1 :             .into_bytes_with_nul();
     504            1 :         let timeline_id = CString::new(ttid.timeline_id.to_string())
     505            1 :             .unwrap()
     506            1 :             .into_bytes_with_nul();
     507            1 :         let mconf_gen = 0_u32.to_be_bytes();
     508            1 :         let mconf_members_len = 0_u32.to_be_bytes();
     509            1 :         let mconf_members_new_len = 0_u32.to_be_bytes();
     510            1 :         let pg_version: [u8; 4] = PG_VERSION_NUM.to_be_bytes();
     511            1 :         let system_id = 0_u64.to_be_bytes();
     512            1 :         let wal_seg_size = 16777216_u32.to_be_bytes();
     513              : 
     514            1 :         let proposer_greeting = [
     515            1 :             greeting_tag.as_slice(),
     516            1 :             tenant_id.as_slice(),
     517            1 :             timeline_id.as_slice(),
     518            1 :             mconf_gen.as_slice(),
     519            1 :             mconf_members_len.as_slice(),
     520            1 :             mconf_members_new_len.as_slice(),
     521            1 :             pg_version.as_slice(),
     522            1 :             system_id.as_slice(),
     523            1 :             wal_seg_size.as_slice(),
     524            1 :         ]
     525            1 :         .concat();
     526              : 
     527            1 :         let voting_tag = (b'v').to_be_bytes();
     528            1 :         let vote_request_term = 3_u64.to_be_bytes();
     529            1 :         let vote_request = [
     530            1 :             voting_tag.as_slice(),
     531            1 :             mconf_gen.as_slice(),
     532            1 :             vote_request_term.as_slice(),
     533            1 :         ]
     534            1 :         .concat();
     535              : 
     536            1 :         let acceptor_greeting_term = 2_u64.to_be_bytes();
     537            1 :         let acceptor_greeting_node_id = 1_u64.to_be_bytes();
     538            1 :         let acceptor_greeting = [
     539            1 :             greeting_tag.as_slice(),
     540            1 :             acceptor_greeting_node_id.as_slice(),
     541            1 :             mconf_gen.as_slice(),
     542            1 :             mconf_members_len.as_slice(),
     543            1 :             mconf_members_new_len.as_slice(),
     544            1 :             acceptor_greeting_term.as_slice(),
     545            1 :         ]
     546            1 :         .concat();
     547              : 
     548            1 :         let vote_response_term = 3_u64.to_be_bytes();
     549            1 :         let vote_given = 1_u8.to_be_bytes();
     550            1 :         let flush_lsn = 0x539_u64.to_be_bytes();
     551            1 :         let truncate_lsn = 0x539_u64.to_be_bytes();
     552            1 :         let th_len = 1_u32.to_be_bytes();
     553            1 :         let th_term = 2_u64.to_be_bytes();
     554            1 :         let th_lsn = 0x539_u64.to_be_bytes();
     555            1 :         let vote_response = [
     556            1 :             voting_tag.as_slice(),
     557            1 :             mconf_gen.as_slice(),
     558            1 :             vote_response_term.as_slice(),
     559            1 :             vote_given.as_slice(),
     560            1 :             flush_lsn.as_slice(),
     561            1 :             truncate_lsn.as_slice(),
     562            1 :             th_len.as_slice(),
     563            1 :             th_term.as_slice(),
     564            1 :             th_lsn.as_slice(),
     565            1 :         ]
     566            1 :         .concat();
     567              : 
     568            1 :         let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
     569            1 :             wait_events: Cell::new(WaitEventsData {
     570            1 :                 sk: std::ptr::null_mut(),
     571            1 :                 event_mask: 0,
     572            1 :             }),
     573            1 :             expected_messages: vec![proposer_greeting, vote_request],
     574            1 :             expected_ptr: AtomicUsize::new(0),
     575            1 :             safekeeper_replies: vec![acceptor_greeting, vote_response],
     576            1 :             replies_ptr: AtomicUsize::new(0),
     577            1 :             sync_channel: sender,
     578            1 :             shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
     579            1 :         });
     580            1 :         let config = crate::walproposer::Config {
     581            1 :             ttid,
     582            1 :             safekeepers_list: vec!["localhost:5000".to_string()],
     583            1 :             safekeeper_conninfo_options: String::new(),
     584            1 :             safekeeper_reconnect_timeout: 1000,
     585            1 :             safekeeper_connection_timeout: 10000,
     586            1 :             sync_safekeepers: true,
     587            1 :         };
     588              : 
     589            1 :         let wp = Wrapper::new(my_impl, config);
     590              : 
     591              :         // walproposer will panic when it finishes sync_safekeepers
     592            1 :         std::panic::catch_unwind(|| wp.start()).unwrap_err();
     593              :         // validate the resulting LSN
     594            1 :         assert_eq!(receiver.try_recv(), Ok(1337));
     595            1 :         Ok(())
     596              :         // drop() will free up resources here
     597            1 :     }
     598              : }
        

Generated by: LCOV version 2.1-beta