LCOV - code coverage report
Current view: top level - libs/walproposer/src - walproposer.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 75.6 % 389 294
Test Date: 2025-03-12 00:01:28 Functions: 36.4 % 77 28

            Line data    Source code
       1              : #![allow(clippy::todo)]
       2              : 
       3              : use std::ffi::CString;
       4              : 
       5              : use postgres_ffi::WAL_SEGMENT_SIZE;
       6              : use utils::id::TenantTimelineId;
       7              : use utils::lsn::Lsn;
       8              : 
       9              : use crate::api_bindings::{Level, create_api, take_vec_u8};
      10              : use crate::bindings::{
      11              :     NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
      12              :     WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
      13              : };
      14              : 
      15              : /// Rust high-level wrapper for C walproposer API. Many methods are not required
      16              : /// for simple cases, hence todo!() in default implementations.
      17              : ///
      18              : /// Refer to `pgxn/neon/walproposer.h` for documentation.
      19              : pub trait ApiImpl {
      20            0 :     fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
      21            0 :         todo!()
      22              :     }
      23              : 
      24            0 :     fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
      25            0 :         todo!()
      26              :     }
      27              : 
      28            0 :     fn get_flush_rec_ptr(&self) -> u64 {
      29            0 :         todo!()
      30              :     }
      31              : 
      32            0 :     fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
      33            0 :         todo!()
      34              :     }
      35              : 
      36            0 :     fn get_current_timestamp(&self) -> i64 {
      37            0 :         todo!()
      38              :     }
      39              : 
      40            0 :     fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
      41            0 :         todo!()
      42              :     }
      43              : 
      44            0 :     fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
      45            0 :         todo!()
      46              :     }
      47              : 
      48            0 :     fn conn_connect_start(&self, _sk: &mut Safekeeper) {
      49            0 :         todo!()
      50              :     }
      51              : 
      52            0 :     fn conn_connect_poll(
      53            0 :         &self,
      54            0 :         _sk: &mut Safekeeper,
      55            0 :     ) -> crate::bindings::WalProposerConnectPollStatusType {
      56            0 :         todo!()
      57              :     }
      58              : 
      59            0 :     fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
      60            0 :         todo!()
      61              :     }
      62              : 
      63            0 :     fn conn_get_query_result(
      64            0 :         &self,
      65            0 :         _sk: &mut Safekeeper,
      66            0 :     ) -> crate::bindings::WalProposerExecStatusType {
      67            0 :         todo!()
      68              :     }
      69              : 
      70            0 :     fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
      71            0 :         todo!()
      72              :     }
      73              : 
      74            0 :     fn conn_finish(&self, _sk: &mut Safekeeper) {
      75            0 :         todo!()
      76              :     }
      77              : 
      78            0 :     fn conn_async_read(
      79            0 :         &self,
      80            0 :         _sk: &mut Safekeeper,
      81            0 :         _vec: &mut Vec<u8>,
      82            0 :     ) -> crate::bindings::PGAsyncReadResult {
      83            0 :         todo!()
      84              :     }
      85              : 
      86            0 :     fn conn_async_write(
      87            0 :         &self,
      88            0 :         _sk: &mut Safekeeper,
      89            0 :         _buf: &[u8],
      90            0 :     ) -> crate::bindings::PGAsyncWriteResult {
      91            0 :         todo!()
      92              :     }
      93              : 
      94            0 :     fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
      95            0 :         todo!()
      96              :     }
      97              : 
      98            0 :     fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
      99            0 :         todo!()
     100              :     }
     101              : 
     102            0 :     fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
     103            0 :         todo!()
     104              :     }
     105              : 
     106            0 :     fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
     107            0 :         todo!()
     108              :     }
     109              : 
     110            0 :     fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
     111            0 :         todo!()
     112              :     }
     113              : 
     114            0 :     fn init_event_set(&self, _wp: &mut WalProposer) {
     115            0 :         todo!()
     116              :     }
     117              : 
     118            0 :     fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     119            0 :         todo!()
     120              :     }
     121              : 
     122            0 :     fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
     123            0 :         todo!()
     124              :     }
     125              : 
     126            0 :     fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     127            0 :         todo!()
     128              :     }
     129              : 
     130            0 :     fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
     131            0 :         todo!()
     132              :     }
     133              : 
     134            0 :     fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
     135            0 :         todo!()
     136              :     }
     137              : 
     138            0 :     fn strong_random(&self, _buf: &mut [u8]) -> bool {
     139            0 :         todo!()
     140              :     }
     141              : 
     142            0 :     fn get_redo_start_lsn(&self) -> u64 {
     143            0 :         todo!()
     144              :     }
     145              : 
     146            0 :     fn finish_sync_safekeepers(&self, _lsn: u64) {
     147            0 :         todo!()
     148              :     }
     149              : 
     150            0 :     fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
     151            0 :         todo!()
     152              :     }
     153              : 
     154            0 :     fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
     155            0 :         todo!()
     156              :     }
     157              : 
     158            0 :     fn after_election(&self, _wp: &mut WalProposer) {
     159            0 :         todo!()
     160              :     }
     161              : }
     162              : 
     163              : #[derive(Debug)]
     164              : pub enum WaitResult {
     165              :     Latch,
     166              :     Timeout,
     167              :     Network(*mut Safekeeper, u32),
     168              : }
     169              : 
     170              : #[derive(Clone)]
     171              : pub struct Config {
     172              :     /// Tenant and timeline id
     173              :     pub ttid: TenantTimelineId,
     174              :     /// List of safekeepers in format `host:port`
     175              :     pub safekeepers_list: Vec<String>,
     176              :     /// Safekeeper reconnect timeout in milliseconds
     177              :     pub safekeeper_reconnect_timeout: i32,
     178              :     /// Safekeeper connection timeout in milliseconds
     179              :     pub safekeeper_connection_timeout: i32,
     180              :     /// walproposer mode, finish when all safekeepers are synced or subscribe
     181              :     /// to WAL streaming
     182              :     pub sync_safekeepers: bool,
     183              : }
     184              : 
     185              : /// WalProposer main struct. C methods are reexported as Rust functions.
     186              : pub struct Wrapper {
     187              :     wp: *mut WalProposer,
     188              :     _safekeepers_list_vec: Vec<u8>,
     189              : }
     190              : 
     191              : impl Wrapper {
     192         8793 :     pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
     193         8793 :         let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
     194         8793 :             .unwrap()
     195         8793 :             .into_raw();
     196         8793 :         let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
     197         8793 :             .unwrap()
     198         8793 :             .into_raw();
     199         8793 : 
     200         8793 :         let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
     201         8793 :             .unwrap()
     202         8793 :             .into_bytes_with_nul();
     203         8793 :         assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
     204         8793 :         let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
     205         8793 : 
     206         8793 :         let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
     207         8793 : 
     208         8793 :         let c_config = WalProposerConfig {
     209         8793 :             neon_tenant,
     210         8793 :             neon_timeline,
     211         8793 :             safekeepers_list,
     212         8793 :             safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
     213         8793 :             safekeeper_connection_timeout: config.safekeeper_connection_timeout,
     214         8793 :             wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
     215         8793 :             syncSafekeepers: config.sync_safekeepers,
     216         8793 :             systemId: 0,
     217         8793 :             pgTimeline: 1,
     218         8793 :             proto_version: 3,
     219         8793 :             callback_data,
     220         8793 :         };
     221         8793 :         let c_config = Box::into_raw(Box::new(c_config));
     222         8793 : 
     223         8793 :         let api = create_api();
     224         8793 :         let wp = unsafe { WalProposerCreate(c_config, api) };
     225         8793 :         Wrapper {
     226         8793 :             wp,
     227         8793 :             _safekeepers_list_vec: safekeepers_list_vec,
     228         8793 :         }
     229         8793 :     }
     230              : 
     231         8793 :     pub fn start(&self) {
     232         8793 :         unsafe { WalProposerStart(self.wp) }
     233         8793 :     }
     234              : }
     235              : 
     236              : impl Drop for Wrapper {
     237         8791 :     fn drop(&mut self) {
     238         8791 :         unsafe {
     239         8791 :             let config = (*self.wp).config;
     240         8791 :             drop(Box::from_raw(
     241         8791 :                 (*config).callback_data as *mut Box<dyn ApiImpl>,
     242         8791 :             ));
     243         8791 :             drop(CString::from_raw((*config).neon_tenant));
     244         8791 :             drop(CString::from_raw((*config).neon_timeline));
     245         8791 :             drop(Box::from_raw(config));
     246              : 
     247        26371 :             for i in 0..(*self.wp).n_safekeepers {
     248        26371 :                 let sk = &mut (*self.wp).safekeeper[i as usize];
     249        26371 :                 take_vec_u8(&mut sk.inbuf);
     250        26371 :             }
     251              : 
     252         8791 :             WalProposerFree(self.wp);
     253         8791 :         }
     254         8791 :     }
     255              : }
     256              : 
     257              : pub struct StreamingCallback {
     258              :     wp: *mut WalProposer,
     259              : }
     260              : 
     261              : impl StreamingCallback {
     262          160 :     pub fn new(wp: *mut WalProposer) -> StreamingCallback {
     263          160 :         StreamingCallback { wp }
     264          160 :     }
     265              : 
     266          506 :     pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
     267          506 :         unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
     268          506 :     }
     269              : 
     270          506 :     pub fn poll(&self) {
     271          506 :         unsafe { WalProposerPoll(self.wp) }
     272          506 :     }
     273              : }
     274              : 
     275              : #[cfg(test)]
     276              : mod tests {
     277              :     use core::panic;
     278              :     use std::cell::{Cell, UnsafeCell};
     279              :     use std::ffi::CString;
     280              :     use std::sync::atomic::AtomicUsize;
     281              :     use std::sync::mpsc::sync_channel;
     282              : 
     283              :     use utils::id::TenantTimelineId;
     284              : 
     285              :     use super::ApiImpl;
     286              :     use crate::api_bindings::Level;
     287              :     use crate::bindings::{NeonWALReadResult, PG_VERSION_NUM};
     288              :     use crate::walproposer::Wrapper;
     289              : 
     290              :     #[derive(Clone, Copy, Debug)]
     291              :     struct WaitEventsData {
     292              :         sk: *mut crate::bindings::Safekeeper,
     293              :         event_mask: u32,
     294              :     }
     295              : 
     296              :     struct MockImpl {
     297              :         // data to return from wait_event_set
     298              :         wait_events: Cell<WaitEventsData>,
     299              :         // walproposer->safekeeper messages
     300              :         expected_messages: Vec<Vec<u8>>,
     301              :         expected_ptr: AtomicUsize,
     302              :         // safekeeper->walproposer messages
     303              :         safekeeper_replies: Vec<Vec<u8>>,
     304              :         replies_ptr: AtomicUsize,
     305              :         // channel to send LSN to the main thread
     306              :         sync_channel: std::sync::mpsc::SyncSender<u64>,
     307              :         // Shmem state, used for storing donor info
     308              :         shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
     309              :     }
     310              : 
     311              :     impl MockImpl {
     312            2 :         fn check_walproposer_msg(&self, msg: &[u8]) {
     313            2 :             let ptr = self
     314            2 :                 .expected_ptr
     315            2 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     316            2 : 
     317            2 :             if ptr >= self.expected_messages.len() {
     318            0 :                 panic!("unexpected message from walproposer");
     319            2 :             }
     320            2 : 
     321            2 :             let expected_msg = &self.expected_messages[ptr];
     322            2 :             assert_eq!(msg, expected_msg.as_slice());
     323            2 :         }
     324              : 
     325            2 :         fn next_safekeeper_reply(&self) -> &[u8] {
     326            2 :             let ptr = self
     327            2 :                 .replies_ptr
     328            2 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     329            2 : 
     330            2 :             if ptr >= self.safekeeper_replies.len() {
     331            0 :                 panic!("no more safekeeper replies");
     332            2 :             }
     333            2 : 
     334            2 :             &self.safekeeper_replies[ptr]
     335            2 :         }
     336              :     }
     337              : 
     338              :     impl ApiImpl for MockImpl {
     339            2 :         fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
     340            2 :             self.shmem.get()
     341            2 :         }
     342              : 
     343           14 :         fn get_current_timestamp(&self) -> i64 {
     344           14 :             println!("get_current_timestamp");
     345           14 :             0
     346           14 :         }
     347              : 
     348            0 :         fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
     349            0 :             let mut shmem = unsafe { *self.get_shmem_state() };
     350            0 :             shmem.propEpochStartLsn.value = donor_lsn;
     351            0 :             shmem.donor_conninfo = donor.conninfo;
     352            0 :             shmem.donor_lsn = donor_lsn;
     353            0 :         }
     354              : 
     355            1 :         fn conn_status(
     356            1 :             &self,
     357            1 :             _: &mut crate::bindings::Safekeeper,
     358            1 :         ) -> crate::bindings::WalProposerConnStatusType {
     359            1 :             println!("conn_status");
     360            1 :             crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     361            1 :         }
     362              : 
     363            1 :         fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
     364            1 :             println!("conn_connect_start");
     365            1 :         }
     366              : 
     367            1 :         fn conn_connect_poll(
     368            1 :             &self,
     369            1 :             _: &mut crate::bindings::Safekeeper,
     370            1 :         ) -> crate::bindings::WalProposerConnectPollStatusType {
     371            1 :             println!("conn_connect_poll");
     372            1 :             crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     373            1 :         }
     374              : 
     375            1 :         fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
     376            1 :             println!("conn_send_query: {}", query);
     377            1 :             true
     378            1 :         }
     379              : 
     380            1 :         fn conn_get_query_result(
     381            1 :             &self,
     382            1 :             _: &mut crate::bindings::Safekeeper,
     383            1 :         ) -> crate::bindings::WalProposerExecStatusType {
     384            1 :             println!("conn_get_query_result");
     385            1 :             crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     386            1 :         }
     387              : 
     388            2 :         fn conn_async_read(
     389            2 :             &self,
     390            2 :             _: &mut crate::bindings::Safekeeper,
     391            2 :             vec: &mut Vec<u8>,
     392            2 :         ) -> crate::bindings::PGAsyncReadResult {
     393            2 :             println!("conn_async_read");
     394            2 :             let reply = self.next_safekeeper_reply();
     395            2 :             println!("conn_async_read result: {:?}", reply);
     396            2 :             vec.extend_from_slice(reply);
     397            2 :             crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
     398            2 :         }
     399              : 
     400            2 :         fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
     401            2 :             println!("conn_blocking_write: {:?}", buf);
     402            2 :             self.check_walproposer_msg(buf);
     403            2 :             true
     404            2 :         }
     405              : 
     406            1 :         fn recovery_download(
     407            1 :             &self,
     408            1 :             _wp: &mut crate::bindings::WalProposer,
     409            1 :             _sk: &mut crate::bindings::Safekeeper,
     410            1 :         ) -> bool {
     411            1 :             true
     412            1 :         }
     413              : 
     414            0 :         fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
     415            0 :             println!("wal_reader_allocate");
     416            0 :             crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     417            0 :         }
     418              : 
     419            1 :         fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
     420            1 :             println!("init_event_set")
     421            1 :         }
     422              : 
     423            3 :         fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     424            3 :             println!(
     425            3 :                 "update_event_set, sk={:?}, events_mask={:#b}",
     426            3 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     427            3 :             );
     428            3 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     429            3 :         }
     430              : 
     431            2 :         fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     432            2 :             println!(
     433            2 :                 "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     434            2 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     435            2 :             );
     436            2 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     437            2 :         }
     438              : 
     439            1 :         fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
     440            1 :             println!(
     441            1 :                 "rm_safekeeper_event_set, sk={:?}",
     442            1 :                 sk as *mut crate::bindings::Safekeeper
     443            1 :             );
     444            1 :         }
     445              : 
     446            4 :         fn wait_event_set(
     447            4 :             &self,
     448            4 :             _: &mut crate::bindings::WalProposer,
     449            4 :             timeout_millis: i64,
     450            4 :         ) -> super::WaitResult {
     451            4 :             let data = self.wait_events.get();
     452            4 :             println!(
     453            4 :                 "wait_event_set, timeout_millis={}, res={:?}",
     454            4 :                 timeout_millis, data
     455            4 :             );
     456            4 :             super::WaitResult::Network(data.sk, data.event_mask)
     457            4 :         }
     458              : 
     459            0 :         fn strong_random(&self, buf: &mut [u8]) -> bool {
     460            0 :             println!("strong_random");
     461            0 :             buf.fill(0);
     462            0 :             true
     463            0 :         }
     464              : 
     465            1 :         fn finish_sync_safekeepers(&self, lsn: u64) {
     466            1 :             self.sync_channel.send(lsn).unwrap();
     467            1 :             panic!("sync safekeepers finished at lsn={}", lsn);
     468              :         }
     469              : 
     470           11 :         fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
     471           11 :             println!("wp_log[{}] {}", level, msg);
     472           11 :         }
     473              : 
     474            1 :         fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
     475            1 :             println!("after_election");
     476            1 :         }
     477              :     }
     478              : 
     479              :     /// Test that walproposer can successfully connect to safekeeper and finish
     480              :     /// sync_safekeepers. API is mocked in MockImpl.
     481              :     ///
     482              :     /// Run this test with valgrind to detect leaks:
     483              :     /// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
     484              :     #[test]
     485            1 :     fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
     486            1 :         let ttid = TenantTimelineId::new(
     487            1 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     488            1 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     489              :         );
     490              : 
     491            1 :         let (sender, receiver) = sync_channel(1);
     492            1 : 
     493            1 :         // Messages definitions are at walproposer.h
     494            1 :         // xxx: it would be better to extract them from safekeeper crate and
     495            1 :         // use serialization/deserialization here.
     496            1 :         let greeting_tag = (b'g').to_be_bytes();
     497            1 :         let tenant_id = CString::new(ttid.tenant_id.to_string())
     498            1 :             .unwrap()
     499            1 :             .into_bytes_with_nul();
     500            1 :         let timeline_id = CString::new(ttid.timeline_id.to_string())
     501            1 :             .unwrap()
     502            1 :             .into_bytes_with_nul();
     503            1 :         let mconf_gen = 0_u32.to_be_bytes();
     504            1 :         let mconf_members_len = 0_u32.to_be_bytes();
     505            1 :         let mconf_members_new_len = 0_u32.to_be_bytes();
     506            1 :         let pg_version: [u8; 4] = PG_VERSION_NUM.to_be_bytes();
     507            1 :         let system_id = 0_u64.to_be_bytes();
     508            1 :         let wal_seg_size = 16777216_u32.to_be_bytes();
     509            1 : 
     510            1 :         let proposer_greeting = [
     511            1 :             greeting_tag.as_slice(),
     512            1 :             tenant_id.as_slice(),
     513            1 :             timeline_id.as_slice(),
     514            1 :             mconf_gen.as_slice(),
     515            1 :             mconf_members_len.as_slice(),
     516            1 :             mconf_members_new_len.as_slice(),
     517            1 :             pg_version.as_slice(),
     518            1 :             system_id.as_slice(),
     519            1 :             wal_seg_size.as_slice(),
     520            1 :         ]
     521            1 :         .concat();
     522            1 : 
     523            1 :         let voting_tag = (b'v').to_be_bytes();
     524            1 :         let vote_request_term = 3_u64.to_be_bytes();
     525            1 :         let vote_request = [
     526            1 :             voting_tag.as_slice(),
     527            1 :             mconf_gen.as_slice(),
     528            1 :             vote_request_term.as_slice(),
     529            1 :         ]
     530            1 :         .concat();
     531            1 : 
     532            1 :         let acceptor_greeting_term = 2_u64.to_be_bytes();
     533            1 :         let acceptor_greeting_node_id = 1_u64.to_be_bytes();
     534            1 :         let acceptor_greeting = [
     535            1 :             greeting_tag.as_slice(),
     536            1 :             acceptor_greeting_node_id.as_slice(),
     537            1 :             mconf_gen.as_slice(),
     538            1 :             mconf_members_len.as_slice(),
     539            1 :             mconf_members_new_len.as_slice(),
     540            1 :             acceptor_greeting_term.as_slice(),
     541            1 :         ]
     542            1 :         .concat();
     543            1 : 
     544            1 :         let vote_response_term = 3_u64.to_be_bytes();
     545            1 :         let vote_given = 1_u8.to_be_bytes();
     546            1 :         let flush_lsn = 0x539_u64.to_be_bytes();
     547            1 :         let truncate_lsn = 0x539_u64.to_be_bytes();
     548            1 :         let th_len = 1_u32.to_be_bytes();
     549            1 :         let th_term = 2_u64.to_be_bytes();
     550            1 :         let th_lsn = 0x539_u64.to_be_bytes();
     551            1 :         let vote_response = [
     552            1 :             voting_tag.as_slice(),
     553            1 :             mconf_gen.as_slice(),
     554            1 :             vote_response_term.as_slice(),
     555            1 :             vote_given.as_slice(),
     556            1 :             flush_lsn.as_slice(),
     557            1 :             truncate_lsn.as_slice(),
     558            1 :             th_len.as_slice(),
     559            1 :             th_term.as_slice(),
     560            1 :             th_lsn.as_slice(),
     561            1 :         ]
     562            1 :         .concat();
     563            1 : 
     564            1 :         let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
     565            1 :             wait_events: Cell::new(WaitEventsData {
     566            1 :                 sk: std::ptr::null_mut(),
     567            1 :                 event_mask: 0,
     568            1 :             }),
     569            1 :             expected_messages: vec![proposer_greeting, vote_request],
     570            1 :             expected_ptr: AtomicUsize::new(0),
     571            1 :             safekeeper_replies: vec![acceptor_greeting, vote_response],
     572            1 :             replies_ptr: AtomicUsize::new(0),
     573            1 :             sync_channel: sender,
     574            1 :             shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
     575            1 :         });
     576            1 :         let config = crate::walproposer::Config {
     577            1 :             ttid,
     578            1 :             safekeepers_list: vec!["localhost:5000".to_string()],
     579            1 :             safekeeper_reconnect_timeout: 1000,
     580            1 :             safekeeper_connection_timeout: 10000,
     581            1 :             sync_safekeepers: true,
     582            1 :         };
     583            1 : 
     584            1 :         let wp = Wrapper::new(my_impl, config);
     585            1 : 
     586            1 :         // walproposer will panic when it finishes sync_safekeepers
     587            1 :         std::panic::catch_unwind(|| wp.start()).unwrap_err();
     588            1 :         // validate the resulting LSN
     589            1 :         assert_eq!(receiver.try_recv(), Ok(1337));
     590            1 :         Ok(())
     591              :         // drop() will free up resources here
     592            1 :     }
     593              : }
        

Generated by: LCOV version 2.1-beta