LCOV - code coverage report
Current view: top level - libs/walproposer/src - walproposer.rs (source / functions) Coverage Total Hit
Test: 75747cdbffeb0b6d2a2a311584368de68cd9aadc.info Lines: 73.8 % 344 254
Test Date: 2024-06-24 06:52:57 Functions: 37.7 % 77 29

            Line data    Source code
       1              : use std::ffi::CString;
       2              : 
       3              : use crate::{
       4              :     api_bindings::{create_api, take_vec_u8, Level},
       5              :     bindings::{
       6              :         NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
       7              :         WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
       8              :     },
       9              : };
      10              : use postgres_ffi::WAL_SEGMENT_SIZE;
      11              : use utils::{id::TenantTimelineId, lsn::Lsn};
      12              : 
      13              : /// Rust high-level wrapper for C walproposer API. Many methods are not required
      14              : /// for simple cases, hence todo!() in default implementations.
      15              : ///
      16              : /// Refer to `pgxn/neon/walproposer.h` for documentation.
      17              : pub trait ApiImpl {
      18            0 :     fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
      19            0 :         todo!()
      20              :     }
      21              : 
      22            0 :     fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
      23            0 :         todo!()
      24              :     }
      25              : 
      26            0 :     fn get_flush_rec_ptr(&self) -> u64 {
      27            0 :         todo!()
      28              :     }
      29              : 
      30            0 :     fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
      31            0 :         todo!()
      32              :     }
      33              : 
      34            0 :     fn get_current_timestamp(&self) -> i64 {
      35            0 :         todo!()
      36              :     }
      37              : 
      38            0 :     fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
      39            0 :         todo!()
      40              :     }
      41              : 
      42            0 :     fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
      43            0 :         todo!()
      44              :     }
      45              : 
      46            0 :     fn conn_connect_start(&self, _sk: &mut Safekeeper) {
      47            0 :         todo!()
      48              :     }
      49              : 
      50            0 :     fn conn_connect_poll(
      51            0 :         &self,
      52            0 :         _sk: &mut Safekeeper,
      53            0 :     ) -> crate::bindings::WalProposerConnectPollStatusType {
      54            0 :         todo!()
      55              :     }
      56              : 
      57            0 :     fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
      58            0 :         todo!()
      59              :     }
      60              : 
      61            0 :     fn conn_get_query_result(
      62            0 :         &self,
      63            0 :         _sk: &mut Safekeeper,
      64            0 :     ) -> crate::bindings::WalProposerExecStatusType {
      65            0 :         todo!()
      66              :     }
      67              : 
      68            0 :     fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
      69            0 :         todo!()
      70              :     }
      71              : 
      72            0 :     fn conn_finish(&self, _sk: &mut Safekeeper) {
      73            0 :         todo!()
      74              :     }
      75              : 
      76            0 :     fn conn_async_read(
      77            0 :         &self,
      78            0 :         _sk: &mut Safekeeper,
      79            0 :         _vec: &mut Vec<u8>,
      80            0 :     ) -> crate::bindings::PGAsyncReadResult {
      81            0 :         todo!()
      82              :     }
      83              : 
      84            0 :     fn conn_async_write(
      85            0 :         &self,
      86            0 :         _sk: &mut Safekeeper,
      87            0 :         _buf: &[u8],
      88            0 :     ) -> crate::bindings::PGAsyncWriteResult {
      89            0 :         todo!()
      90              :     }
      91              : 
      92            0 :     fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
      93            0 :         todo!()
      94              :     }
      95              : 
      96            0 :     fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
      97            0 :         todo!()
      98              :     }
      99              : 
     100            0 :     fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
     101            0 :         todo!()
     102              :     }
     103              : 
     104            0 :     fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
     105            0 :         todo!()
     106              :     }
     107              : 
     108            0 :     fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
     109            0 :         todo!()
     110              :     }
     111              : 
     112            0 :     fn init_event_set(&self, _wp: &mut WalProposer) {
     113            0 :         todo!()
     114              :     }
     115              : 
     116            0 :     fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     117            0 :         todo!()
     118              :     }
     119              : 
     120            0 :     fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
     121            0 :         todo!()
     122              :     }
     123              : 
     124            0 :     fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     125            0 :         todo!()
     126              :     }
     127              : 
     128            0 :     fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
     129            0 :         todo!()
     130              :     }
     131              : 
     132            0 :     fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
     133            0 :         todo!()
     134              :     }
     135              : 
     136            0 :     fn strong_random(&self, _buf: &mut [u8]) -> bool {
     137            0 :         todo!()
     138              :     }
     139              : 
     140            0 :     fn get_redo_start_lsn(&self) -> u64 {
     141            0 :         todo!()
     142              :     }
     143              : 
     144            0 :     fn finish_sync_safekeepers(&self, _lsn: u64) {
     145            0 :         todo!()
     146              :     }
     147              : 
     148            0 :     fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
     149            0 :         todo!()
     150              :     }
     151              : 
     152            0 :     fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
     153            0 :         todo!()
     154              :     }
     155              : 
     156            0 :     fn after_election(&self, _wp: &mut WalProposer) {
     157            0 :         todo!()
     158              :     }
     159              : }
     160              : 
     161              : #[derive(Debug)]
     162              : pub enum WaitResult {
     163              :     Latch,
     164              :     Timeout,
     165              :     Network(*mut Safekeeper, u32),
     166              : }
     167              : 
     168              : #[derive(Clone)]
     169              : pub struct Config {
     170              :     /// Tenant and timeline id
     171              :     pub ttid: TenantTimelineId,
     172              :     /// List of safekeepers in format `host:port`
     173              :     pub safekeepers_list: Vec<String>,
     174              :     /// Safekeeper reconnect timeout in milliseconds
     175              :     pub safekeeper_reconnect_timeout: i32,
     176              :     /// Safekeeper connection timeout in milliseconds
     177              :     pub safekeeper_connection_timeout: i32,
     178              :     /// walproposer mode, finish when all safekeepers are synced or subscribe
     179              :     /// to WAL streaming
     180              :     pub sync_safekeepers: bool,
     181              : }
     182              : 
     183              : /// WalProposer main struct. C methods are reexported as Rust functions.
     184              : pub struct Wrapper {
     185              :     wp: *mut WalProposer,
     186              :     _safekeepers_list_vec: Vec<u8>,
     187              : }
     188              : 
     189              : impl Wrapper {
     190        74424 :     pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
     191        74424 :         let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
     192        74424 :             .unwrap()
     193        74424 :             .into_raw();
     194        74424 :         let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
     195        74424 :             .unwrap()
     196        74424 :             .into_raw();
     197        74424 : 
     198        74424 :         let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
     199        74424 :             .unwrap()
     200        74424 :             .into_bytes_with_nul();
     201        74424 :         assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
     202        74424 :         let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
     203        74424 : 
     204        74424 :         let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
     205        74424 : 
     206        74424 :         let c_config = WalProposerConfig {
     207        74424 :             neon_tenant,
     208        74424 :             neon_timeline,
     209        74424 :             safekeepers_list,
     210        74424 :             safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
     211        74424 :             safekeeper_connection_timeout: config.safekeeper_connection_timeout,
     212        74424 :             wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
     213        74424 :             syncSafekeepers: config.sync_safekeepers,
     214        74424 :             systemId: 0,
     215        74424 :             pgTimeline: 1,
     216        74424 :             callback_data,
     217        74424 :         };
     218        74424 :         let c_config = Box::into_raw(Box::new(c_config));
     219        74424 : 
     220        74424 :         let api = create_api();
     221        74424 :         let wp = unsafe { WalProposerCreate(c_config, api) };
     222        74424 :         Wrapper {
     223        74424 :             wp,
     224        74424 :             _safekeepers_list_vec: safekeepers_list_vec,
     225        74424 :         }
     226        74424 :     }
     227              : 
     228        74424 :     pub fn start(&self) {
     229        74424 :         unsafe { WalProposerStart(self.wp) }
     230        74424 :     }
     231              : }
     232              : 
     233              : impl Drop for Wrapper {
     234        74420 :     fn drop(&mut self) {
     235        74420 :         unsafe {
     236        74420 :             let config = (*self.wp).config;
     237        74420 :             drop(Box::from_raw(
     238        74420 :                 (*config).callback_data as *mut Box<dyn ApiImpl>,
     239        74420 :             ));
     240        74420 :             drop(CString::from_raw((*config).neon_tenant));
     241        74420 :             drop(CString::from_raw((*config).neon_timeline));
     242        74420 :             drop(Box::from_raw(config));
     243              : 
     244       223256 :             for i in 0..(*self.wp).n_safekeepers {
     245       223256 :                 let sk = &mut (*self.wp).safekeeper[i as usize];
     246       223256 :                 take_vec_u8(&mut sk.inbuf);
     247       223256 :             }
     248              : 
     249        74420 :             WalProposerFree(self.wp);
     250        74420 :         }
     251        74420 :     }
     252              : }
     253              : 
     254              : pub struct StreamingCallback {
     255              :     wp: *mut WalProposer,
     256              : }
     257              : 
     258              : impl StreamingCallback {
     259         1473 :     pub fn new(wp: *mut WalProposer) -> StreamingCallback {
     260         1473 :         StreamingCallback { wp }
     261         1473 :     }
     262              : 
     263         3279 :     pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
     264         3279 :         unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
     265         3279 :     }
     266              : 
     267         3279 :     pub fn poll(&self) {
     268         3279 :         unsafe { WalProposerPoll(self.wp) }
     269         3279 :     }
     270              : }
     271              : 
     272              : #[cfg(test)]
     273              : mod tests {
     274              :     use core::panic;
     275              :     use std::{
     276              :         cell::Cell,
     277              :         sync::{atomic::AtomicUsize, mpsc::sync_channel},
     278              :     };
     279              : 
     280              :     use std::cell::UnsafeCell;
     281              :     use utils::id::TenantTimelineId;
     282              : 
     283              :     use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
     284              : 
     285              :     use super::ApiImpl;
     286              : 
     287              :     #[derive(Clone, Copy, Debug)]
     288              :     struct WaitEventsData {
     289              :         sk: *mut crate::bindings::Safekeeper,
     290              :         event_mask: u32,
     291              :     }
     292              : 
     293              :     struct MockImpl {
     294              :         // data to return from wait_event_set
     295              :         wait_events: Cell<WaitEventsData>,
     296              :         // walproposer->safekeeper messages
     297              :         expected_messages: Vec<Vec<u8>>,
     298              :         expected_ptr: AtomicUsize,
     299              :         // safekeeper->walproposer messages
     300              :         safekeeper_replies: Vec<Vec<u8>>,
     301              :         replies_ptr: AtomicUsize,
     302              :         // channel to send LSN to the main thread
     303              :         sync_channel: std::sync::mpsc::SyncSender<u64>,
     304              :         // Shmem state, used for storing donor info
     305              :         shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
     306              :     }
     307              : 
     308              :     impl MockImpl {
     309            4 :         fn check_walproposer_msg(&self, msg: &[u8]) {
     310            4 :             let ptr = self
     311            4 :                 .expected_ptr
     312            4 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     313            4 : 
     314            4 :             if ptr >= self.expected_messages.len() {
     315            0 :                 panic!("unexpected message from walproposer");
     316            4 :             }
     317            4 : 
     318            4 :             let expected_msg = &self.expected_messages[ptr];
     319            4 :             assert_eq!(msg, expected_msg.as_slice());
     320            4 :         }
     321              : 
     322            4 :         fn next_safekeeper_reply(&self) -> &[u8] {
     323            4 :             let ptr = self
     324            4 :                 .replies_ptr
     325            4 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     326            4 : 
     327            4 :             if ptr >= self.safekeeper_replies.len() {
     328            0 :                 panic!("no more safekeeper replies");
     329            4 :             }
     330            4 : 
     331            4 :             &self.safekeeper_replies[ptr]
     332            4 :         }
     333              :     }
     334              : 
     335              :     impl ApiImpl for MockImpl {
     336            4 :         fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
     337            4 :             self.shmem.get()
     338            4 :         }
     339              : 
     340           28 :         fn get_current_timestamp(&self) -> i64 {
     341           28 :             println!("get_current_timestamp");
     342           28 :             0
     343           28 :         }
     344              : 
     345            0 :         fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
     346            0 :             let mut shmem = unsafe { *self.get_shmem_state() };
     347            0 :             shmem.propEpochStartLsn.value = donor_lsn;
     348            0 :             shmem.donor_conninfo = donor.conninfo;
     349            0 :             shmem.donor_lsn = donor_lsn;
     350            0 :         }
     351              : 
     352            2 :         fn conn_status(
     353            2 :             &self,
     354            2 :             _: &mut crate::bindings::Safekeeper,
     355            2 :         ) -> crate::bindings::WalProposerConnStatusType {
     356            2 :             println!("conn_status");
     357            2 :             crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     358            2 :         }
     359              : 
     360            2 :         fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
     361            2 :             println!("conn_connect_start");
     362            2 :         }
     363              : 
     364            2 :         fn conn_connect_poll(
     365            2 :             &self,
     366            2 :             _: &mut crate::bindings::Safekeeper,
     367            2 :         ) -> crate::bindings::WalProposerConnectPollStatusType {
     368            2 :             println!("conn_connect_poll");
     369            2 :             crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     370            2 :         }
     371              : 
     372            2 :         fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
     373            2 :             println!("conn_send_query: {}", query);
     374            2 :             true
     375            2 :         }
     376              : 
     377            2 :         fn conn_get_query_result(
     378            2 :             &self,
     379            2 :             _: &mut crate::bindings::Safekeeper,
     380            2 :         ) -> crate::bindings::WalProposerExecStatusType {
     381            2 :             println!("conn_get_query_result");
     382            2 :             crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     383            2 :         }
     384              : 
     385            4 :         fn conn_async_read(
     386            4 :             &self,
     387            4 :             _: &mut crate::bindings::Safekeeper,
     388            4 :             vec: &mut Vec<u8>,
     389            4 :         ) -> crate::bindings::PGAsyncReadResult {
     390            4 :             println!("conn_async_read");
     391            4 :             let reply = self.next_safekeeper_reply();
     392            4 :             println!("conn_async_read result: {:?}", reply);
     393            4 :             vec.extend_from_slice(reply);
     394            4 :             crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
     395            4 :         }
     396              : 
     397            4 :         fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
     398            4 :             println!("conn_blocking_write: {:?}", buf);
     399            4 :             self.check_walproposer_msg(buf);
     400            4 :             true
     401            4 :         }
     402              : 
     403            2 :         fn recovery_download(
     404            2 :             &self,
     405            2 :             _wp: &mut crate::bindings::WalProposer,
     406            2 :             _sk: &mut crate::bindings::Safekeeper,
     407            2 :         ) -> bool {
     408            2 :             true
     409            2 :         }
     410              : 
     411            0 :         fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
     412            0 :             println!("wal_reader_allocate");
     413            0 :             crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     414            0 :         }
     415              : 
     416            2 :         fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
     417            2 :             println!("init_event_set")
     418            2 :         }
     419              : 
     420            8 :         fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     421            8 :             println!(
     422            8 :                 "update_event_set, sk={:?}, events_mask={:#b}",
     423            8 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     424            8 :             );
     425            8 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     426            8 :         }
     427              : 
     428            4 :         fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     429            4 :             println!(
     430            4 :                 "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     431            4 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     432            4 :             );
     433            4 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     434            4 :         }
     435              : 
     436            2 :         fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
     437            2 :             println!(
     438            2 :                 "rm_safekeeper_event_set, sk={:?}",
     439            2 :                 sk as *mut crate::bindings::Safekeeper
     440            2 :             );
     441            2 :         }
     442              : 
     443            8 :         fn wait_event_set(
     444            8 :             &self,
     445            8 :             _: &mut crate::bindings::WalProposer,
     446            8 :             timeout_millis: i64,
     447            8 :         ) -> super::WaitResult {
     448            8 :             let data = self.wait_events.get();
     449            8 :             println!(
     450            8 :                 "wait_event_set, timeout_millis={}, res={:?}",
     451            8 :                 timeout_millis, data
     452            8 :             );
     453            8 :             super::WaitResult::Network(data.sk, data.event_mask)
     454            8 :         }
     455              : 
     456            2 :         fn strong_random(&self, buf: &mut [u8]) -> bool {
     457            2 :             println!("strong_random");
     458            2 :             buf.fill(0);
     459            2 :             true
     460            2 :         }
     461              : 
     462            2 :         fn finish_sync_safekeepers(&self, lsn: u64) {
     463            2 :             self.sync_channel.send(lsn).unwrap();
     464            2 :             panic!("sync safekeepers finished at lsn={}", lsn);
     465              :         }
     466              : 
     467           14 :         fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
     468           14 :             println!("wp_log[{}] {}", level, msg);
     469           14 :         }
     470              : 
     471            2 :         fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
     472            2 :             println!("after_election");
     473            2 :         }
     474              :     }
     475              : 
     476              :     /// Test that walproposer can successfully connect to safekeeper and finish
     477              :     /// sync_safekeepers. API is mocked in MockImpl.
     478              :     ///
     479              :     /// Run this test with valgrind to detect leaks:
     480              :     /// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
     481              :     #[test]
     482            2 :     fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
     483            2 :         let ttid = TenantTimelineId::new(
     484            2 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     485            2 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     486              :         );
     487              : 
     488            2 :         let (sender, receiver) = sync_channel(1);
     489            2 : 
     490            2 :         let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
     491            2 :             wait_events: Cell::new(WaitEventsData {
     492            2 :                 sk: std::ptr::null_mut(),
     493            2 :                 event_mask: 0,
     494            2 :             }),
     495            2 :             expected_messages: vec![
     496            2 :                 // TODO: When updating Postgres versions, this test will cause
     497            2 :                 // problems. Postgres version in message needs updating.
     498            2 :                 //
     499            2 :                 // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160003, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
     500            2 :                 vec![
     501            2 :                     103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
     502            2 :                     0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
     503            2 :                     147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
     504            2 :                     188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
     505            2 :                 ],
     506            2 :                 // VoteRequest(VoteRequest { term: 3 })
     507            2 :                 vec![
     508            2 :                     118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
     509            2 :                     0, 0, 0, 0, 0, 0,
     510            2 :                 ],
     511            2 :             ],
     512            2 :             expected_ptr: AtomicUsize::new(0),
     513            2 :             safekeeper_replies: vec![
     514            2 :                 // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) })
     515            2 :                 vec![
     516            2 :                     103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
     517            2 :                 ],
     518            2 :                 // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 })
     519            2 :                 vec![
     520            2 :                     118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57,
     521            2 :                     5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
     522            2 :                     0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0,
     523            2 :                 ],
     524            2 :             ],
     525            2 :             replies_ptr: AtomicUsize::new(0),
     526            2 :             sync_channel: sender,
     527            2 :             shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
     528            2 :         });
     529            2 :         let config = crate::walproposer::Config {
     530            2 :             ttid,
     531            2 :             safekeepers_list: vec!["localhost:5000".to_string()],
     532            2 :             safekeeper_reconnect_timeout: 1000,
     533            2 :             safekeeper_connection_timeout: 10000,
     534            2 :             sync_safekeepers: true,
     535            2 :         };
     536            2 : 
     537            2 :         let wp = Wrapper::new(my_impl, config);
     538            2 : 
     539            2 :         // walproposer will panic when it finishes sync_safekeepers
     540            2 :         std::panic::catch_unwind(|| wp.start()).unwrap_err();
     541            2 :         // validate the resulting LSN
     542            2 :         assert_eq!(receiver.try_recv(), Ok(1337));
     543            2 :         Ok(())
     544              :         // drop() will free up resources here
     545            2 :     }
     546              : }
        

Generated by: LCOV version 2.1-beta