LCOV - code coverage report
Current view: top level - libs/walproposer/src - walproposer.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 74.3 % 373 277
Test Date: 2025-07-31 15:59:03 Functions: 40.2 % 87 35

            Line data    Source code
       1              : #![allow(clippy::todo)]
       2              : 
       3              : use std::ffi::CString;
       4              : use std::str::FromStr;
       5              : 
       6              : use postgres_ffi::WAL_SEGMENT_SIZE;
       7              : use utils::id::TenantTimelineId;
       8              : use utils::lsn::Lsn;
       9              : 
      10              : use crate::api_bindings::{Level, create_api, take_vec_u8};
      11              : use crate::bindings::{
      12              :     NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
      13              :     WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
      14              : };
      15              : 
      16              : /// Rust high-level wrapper for C walproposer API. Many methods are not required
      17              : /// for simple cases, hence todo!() in default implementations.
      18              : ///
      19              : /// Refer to `pgxn/neon/walproposer.h` for documentation.
      20              : pub trait ApiImpl {
      21            0 :     fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
      22            0 :         todo!()
      23              :     }
      24              : 
      25            0 :     fn start_streaming(&self, _startpos: u64, _callback: &StreamingCallback) {
      26            0 :         todo!()
      27              :     }
      28              : 
      29            0 :     fn get_flush_rec_ptr(&self) -> u64 {
      30            0 :         todo!()
      31              :     }
      32              : 
      33            0 :     fn update_donor(&self, _donor: &mut Safekeeper, _donor_lsn: u64) {
      34            0 :         todo!()
      35              :     }
      36              : 
      37            0 :     fn get_current_timestamp(&self) -> i64 {
      38            0 :         todo!()
      39              :     }
      40              : 
      41            0 :     fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
      42            0 :         todo!()
      43              :     }
      44              : 
      45            0 :     fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
      46            0 :         todo!()
      47              :     }
      48              : 
      49            0 :     fn conn_connect_start(&self, _sk: &mut Safekeeper) {
      50            0 :         todo!()
      51              :     }
      52              : 
      53            0 :     fn conn_connect_poll(
      54            0 :         &self,
      55            0 :         _sk: &mut Safekeeper,
      56            0 :     ) -> crate::bindings::WalProposerConnectPollStatusType {
      57            0 :         todo!()
      58              :     }
      59              : 
      60            0 :     fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
      61            0 :         todo!()
      62              :     }
      63              : 
      64            0 :     fn conn_get_query_result(
      65            0 :         &self,
      66            0 :         _sk: &mut Safekeeper,
      67            0 :     ) -> crate::bindings::WalProposerExecStatusType {
      68            0 :         todo!()
      69              :     }
      70              : 
      71            0 :     fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
      72            0 :         todo!()
      73              :     }
      74              : 
      75            0 :     fn conn_finish(&self, _sk: &mut Safekeeper) {
      76            0 :         todo!()
      77              :     }
      78              : 
      79            0 :     fn conn_async_read(
      80            0 :         &self,
      81            0 :         _sk: &mut Safekeeper,
      82            0 :         _vec: &mut Vec<u8>,
      83            0 :     ) -> crate::bindings::PGAsyncReadResult {
      84            0 :         todo!()
      85              :     }
      86              : 
      87            0 :     fn conn_async_write(
      88            0 :         &self,
      89            0 :         _sk: &mut Safekeeper,
      90            0 :         _buf: &[u8],
      91            0 :     ) -> crate::bindings::PGAsyncWriteResult {
      92            0 :         todo!()
      93              :     }
      94              : 
      95            0 :     fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
      96            0 :         todo!()
      97              :     }
      98              : 
      99            0 :     fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
     100            0 :         todo!()
     101              :     }
     102              : 
     103            0 :     fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
     104            0 :         todo!()
     105              :     }
     106              : 
     107            0 :     fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
     108            0 :         todo!()
     109              :     }
     110              : 
     111            0 :     fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
     112            0 :         todo!()
     113              :     }
     114              : 
     115            0 :     fn init_event_set(&self, _wp: &mut WalProposer) {
     116            0 :         todo!()
     117              :     }
     118              : 
     119            0 :     fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     120            0 :         todo!()
     121              :     }
     122              : 
     123            0 :     fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
     124            0 :         todo!()
     125              :     }
     126              : 
     127            0 :     fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     128            0 :         todo!()
     129              :     }
     130              : 
     131            0 :     fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
     132            0 :         todo!()
     133              :     }
     134              : 
     135            0 :     fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
     136            0 :         todo!()
     137              :     }
     138              : 
     139            0 :     fn strong_random(&self, _buf: &mut [u8]) -> bool {
     140            0 :         todo!()
     141              :     }
     142              : 
     143            0 :     fn get_redo_start_lsn(&self) -> u64 {
     144            0 :         todo!()
     145              :     }
     146              : 
     147            0 :     fn finish_sync_safekeepers(&self, _lsn: u64) -> ! {
     148            0 :         todo!()
     149              :     }
     150              : 
     151            0 :     fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer, _sk: &mut Safekeeper) {
     152            0 :         todo!()
     153              :     }
     154              : 
     155            0 :     fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
     156            0 :         todo!()
     157              :     }
     158              : 
     159            0 :     fn after_election(&self, _wp: &mut WalProposer) {
     160            0 :         todo!()
     161              :     }
     162              : 
     163              :     /* BEGIN_HADRON */
     164         9028 :     fn reset_safekeeper_statuses_for_metrics(&self, _wp: &mut WalProposer, _num_safekeepers: u32) {
     165              :         // Do nothing for testing purposes.
     166            1 :     }
     167              : 
     168         9500 :     fn update_safekeeper_status_for_metrics(
     169         9500 :         &self,
     170         9500 :         _wp: &mut WalProposer,
     171         9500 :         _sk_index: u32,
     172         9500 :         _status: u8,
     173         9500 :     ) {
     174              :         // Do nothing for testing purposes.
     175            0 :     }
     176              :     /* END_HADRON */
     177              : }
     178              : 
     179              : #[derive(Debug)]
     180              : pub enum WaitResult {
     181              :     Latch,
     182              :     Timeout,
     183              :     Network(*mut Safekeeper, u32),
     184              : }
     185              : 
     186              : #[derive(Clone)]
     187              : pub struct Config {
     188              :     /// Tenant and timeline id
     189              :     pub ttid: TenantTimelineId,
     190              :     /// List of safekeepers in format `host:port`
     191              :     pub safekeepers_list: Vec<String>,
     192              :     /// libpq connection info options
     193              :     pub safekeeper_conninfo_options: String,
     194              :     /// Safekeeper reconnect timeout in milliseconds
     195              :     pub safekeeper_reconnect_timeout: i32,
     196              :     /// Safekeeper connection timeout in milliseconds
     197              :     pub safekeeper_connection_timeout: i32,
     198              :     /// walproposer mode, finish when all safekeepers are synced or subscribe
     199              :     /// to WAL streaming
     200              :     pub sync_safekeepers: bool,
     201              : }
     202              : 
     203              : /// WalProposer main struct. C methods are reexported as Rust functions.
     204              : pub struct Wrapper {
     205              :     wp: *mut WalProposer,
     206              :     _safekeepers_list_vec: Vec<u8>,
     207              : }
     208              : 
     209              : impl Wrapper {
     210         9028 :     pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
     211         9028 :         let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
     212         9028 :             .unwrap()
     213         9028 :             .into_raw();
     214         9028 :         let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
     215         9028 :             .unwrap()
     216         9028 :             .into_raw();
     217              : 
     218         9028 :         let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
     219         9028 :             .unwrap()
     220         9028 :             .into_bytes_with_nul();
     221         9028 :         assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
     222         9028 :         let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
     223         9028 :         let safekeeper_conninfo_options = CString::from_str(&config.safekeeper_conninfo_options)
     224         9028 :             .unwrap()
     225         9028 :             .into_raw();
     226              : 
     227         9028 :         let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
     228              : 
     229         9028 :         let c_config = WalProposerConfig {
     230         9028 :             neon_tenant,
     231         9028 :             neon_timeline,
     232         9028 :             safekeepers_list,
     233         9028 :             safekeeper_conninfo_options,
     234         9028 :             safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
     235         9028 :             safekeeper_connection_timeout: config.safekeeper_connection_timeout,
     236         9028 :             wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
     237         9028 :             syncSafekeepers: config.sync_safekeepers,
     238         9028 :             systemId: 0,
     239         9028 :             pgTimeline: 1,
     240         9028 :             proto_version: 3,
     241         9028 :             callback_data,
     242         9028 :         };
     243         9028 :         let c_config = Box::into_raw(Box::new(c_config));
     244              : 
     245         9028 :         let api = create_api();
     246         9028 :         let wp = unsafe { WalProposerCreate(c_config, api) };
     247         9028 :         Wrapper {
     248         9028 :             wp,
     249         9028 :             _safekeepers_list_vec: safekeepers_list_vec,
     250         9028 :         }
     251         9028 :     }
     252              : 
     253         9028 :     pub fn start(&self) {
     254         9028 :         unsafe { WalProposerStart(self.wp) }
     255         9028 :     }
     256              : }
     257              : 
     258              : impl Drop for Wrapper {
     259         9026 :     fn drop(&mut self) {
     260              :         unsafe {
     261         9026 :             let config = (*self.wp).config;
     262         9026 :             drop(Box::from_raw(
     263         9026 :                 (*config).callback_data as *mut Box<dyn ApiImpl>,
     264              :             ));
     265         9026 :             drop(CString::from_raw((*config).neon_tenant));
     266         9026 :             drop(CString::from_raw((*config).neon_timeline));
     267         9026 :             drop(Box::from_raw(config));
     268              : 
     269        27076 :             for i in 0..(*self.wp).n_safekeepers {
     270        27076 :                 let sk = &mut (*self.wp).safekeeper[i as usize];
     271        27076 :                 take_vec_u8(&mut sk.inbuf);
     272        27076 :             }
     273              : 
     274         9026 :             WalProposerFree(self.wp);
     275              :         }
     276         9026 :     }
     277              : }
     278              : 
     279              : pub struct StreamingCallback {
     280              :     wp: *mut WalProposer,
     281              : }
     282              : 
     283              : impl StreamingCallback {
     284          152 :     pub fn new(wp: *mut WalProposer) -> StreamingCallback {
     285          152 :         StreamingCallback { wp }
     286          152 :     }
     287              : 
     288          498 :     pub fn broadcast(&self, startpos: Lsn, endpos: Lsn) {
     289          498 :         unsafe { WalProposerBroadcast(self.wp, startpos.0, endpos.0) }
     290          498 :     }
     291              : 
     292          498 :     pub fn poll(&self) {
     293          498 :         unsafe { WalProposerPoll(self.wp) }
     294          498 :     }
     295              : }
     296              : 
     297              : #[cfg(test)]
     298              : mod tests {
     299              :     use core::panic;
     300              :     use std::cell::{Cell, UnsafeCell};
     301              :     use std::ffi::CString;
     302              :     use std::sync::atomic::AtomicUsize;
     303              :     use std::sync::mpsc::sync_channel;
     304              : 
     305              :     use utils::id::TenantTimelineId;
     306              : 
     307              :     use super::ApiImpl;
     308              :     use crate::api_bindings::Level;
     309              :     use crate::bindings::{NeonWALReadResult, PG_VERSION_NUM};
     310              :     use crate::walproposer::Wrapper;
     311              : 
     312              :     #[derive(Clone, Copy, Debug)]
     313              :     struct WaitEventsData {
     314              :         sk: *mut crate::bindings::Safekeeper,
     315              :         event_mask: u32,
     316              :     }
     317              : 
     318              :     struct MockImpl {
     319              :         // data to return from wait_event_set
     320              :         wait_events: Cell<WaitEventsData>,
     321              :         // walproposer->safekeeper messages
     322              :         expected_messages: Vec<Vec<u8>>,
     323              :         expected_ptr: AtomicUsize,
     324              :         // safekeeper->walproposer messages
     325              :         safekeeper_replies: Vec<Vec<u8>>,
     326              :         replies_ptr: AtomicUsize,
     327              :         // channel to send LSN to the main thread
     328              :         sync_channel: std::sync::mpsc::SyncSender<u64>,
     329              :         // Shmem state, used for storing donor info
     330              :         shmem: UnsafeCell<crate::bindings::WalproposerShmemState>,
     331              :     }
     332              : 
     333              :     impl MockImpl {
     334            2 :         fn check_walproposer_msg(&self, msg: &[u8]) {
     335            2 :             let ptr = self
     336            2 :                 .expected_ptr
     337            2 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     338              : 
     339            2 :             if ptr >= self.expected_messages.len() {
     340            0 :                 panic!("unexpected message from walproposer");
     341            2 :             }
     342              : 
     343            2 :             let expected_msg = &self.expected_messages[ptr];
     344            2 :             assert_eq!(msg, expected_msg.as_slice());
     345            2 :         }
     346              : 
     347            2 :         fn next_safekeeper_reply(&self) -> &[u8] {
     348            2 :             let ptr = self
     349            2 :                 .replies_ptr
     350            2 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     351              : 
     352            2 :             if ptr >= self.safekeeper_replies.len() {
     353            0 :                 panic!("no more safekeeper replies");
     354            2 :             }
     355              : 
     356            2 :             &self.safekeeper_replies[ptr]
     357            2 :         }
     358              :     }
     359              : 
     360              :     impl ApiImpl for MockImpl {
     361            2 :         fn get_shmem_state(&self) -> *mut crate::bindings::WalproposerShmemState {
     362            2 :             self.shmem.get()
     363            2 :         }
     364              : 
     365           14 :         fn get_current_timestamp(&self) -> i64 {
     366           14 :             println!("get_current_timestamp");
     367           14 :             0
     368           14 :         }
     369              : 
     370            0 :         fn update_donor(&self, donor: &mut crate::bindings::Safekeeper, donor_lsn: u64) {
     371            0 :             let mut shmem = unsafe { *self.get_shmem_state() };
     372            0 :             shmem.propEpochStartLsn.value = donor_lsn;
     373            0 :             shmem.donor_conninfo = donor.conninfo;
     374            0 :             shmem.donor_lsn = donor_lsn;
     375            0 :         }
     376              : 
     377            1 :         fn conn_status(
     378            1 :             &self,
     379            1 :             _: &mut crate::bindings::Safekeeper,
     380            1 :         ) -> crate::bindings::WalProposerConnStatusType {
     381            1 :             println!("conn_status");
     382            1 :             crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     383            1 :         }
     384              : 
     385            1 :         fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
     386            1 :             println!("conn_connect_start");
     387            1 :         }
     388              : 
     389            1 :         fn conn_connect_poll(
     390            1 :             &self,
     391            1 :             _: &mut crate::bindings::Safekeeper,
     392            1 :         ) -> crate::bindings::WalProposerConnectPollStatusType {
     393            1 :             println!("conn_connect_poll");
     394            1 :             crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     395            1 :         }
     396              : 
     397            1 :         fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
     398            1 :             println!("conn_send_query: {query}");
     399            1 :             true
     400            1 :         }
     401              : 
     402            1 :         fn conn_get_query_result(
     403            1 :             &self,
     404            1 :             _: &mut crate::bindings::Safekeeper,
     405            1 :         ) -> crate::bindings::WalProposerExecStatusType {
     406            1 :             println!("conn_get_query_result");
     407            1 :             crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     408            1 :         }
     409              : 
     410            2 :         fn conn_async_read(
     411            2 :             &self,
     412            2 :             _: &mut crate::bindings::Safekeeper,
     413            2 :             vec: &mut Vec<u8>,
     414            2 :         ) -> crate::bindings::PGAsyncReadResult {
     415            2 :             println!("conn_async_read");
     416            2 :             let reply = self.next_safekeeper_reply();
     417            2 :             println!("conn_async_read result: {reply:?}");
     418            2 :             vec.extend_from_slice(reply);
     419            2 :             crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
     420            2 :         }
     421              : 
     422            2 :         fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
     423            2 :             println!("conn_blocking_write: {buf:?}");
     424            2 :             self.check_walproposer_msg(buf);
     425            2 :             true
     426            2 :         }
     427              : 
     428            1 :         fn recovery_download(
     429            1 :             &self,
     430            1 :             _wp: &mut crate::bindings::WalProposer,
     431            1 :             _sk: &mut crate::bindings::Safekeeper,
     432            1 :         ) -> bool {
     433            1 :             true
     434            1 :         }
     435              : 
     436            0 :         fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
     437            0 :             println!("wal_reader_allocate");
     438            0 :             crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     439            0 :         }
     440              : 
     441            1 :         fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
     442            1 :             println!("init_event_set")
     443            1 :         }
     444              : 
     445            3 :         fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     446            3 :             println!(
     447            3 :                 "update_event_set, sk={:?}, events_mask={:#b}",
     448            3 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     449              :             );
     450            3 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     451            3 :         }
     452              : 
     453            2 :         fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     454            2 :             println!(
     455            2 :                 "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     456            2 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     457              :             );
     458            2 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     459            2 :         }
     460              : 
     461            1 :         fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
     462            1 :             println!(
     463            1 :                 "rm_safekeeper_event_set, sk={:?}",
     464            1 :                 sk as *mut crate::bindings::Safekeeper
     465              :             );
     466            1 :         }
     467              : 
     468            4 :         fn wait_event_set(
     469            4 :             &self,
     470            4 :             _: &mut crate::bindings::WalProposer,
     471            4 :             timeout_millis: i64,
     472            4 :         ) -> super::WaitResult {
     473            4 :             let data = self.wait_events.get();
     474            4 :             println!("wait_event_set, timeout_millis={timeout_millis}, res={data:?}");
     475            4 :             super::WaitResult::Network(data.sk, data.event_mask)
     476            4 :         }
     477              : 
     478            0 :         fn strong_random(&self, buf: &mut [u8]) -> bool {
     479            0 :             println!("strong_random");
     480            0 :             buf.fill(0);
     481            0 :             true
     482            0 :         }
     483              : 
     484            1 :         fn finish_sync_safekeepers(&self, lsn: u64) -> ! {
     485            1 :             self.sync_channel.send(lsn).unwrap();
     486            1 :             panic!("sync safekeepers finished at lsn={}", lsn);
     487              :         }
     488              : 
     489           12 :         fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
     490           12 :             println!("wp_log[{level}] {msg}");
     491           12 :         }
     492              : 
     493            1 :         fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
     494            1 :             println!("after_election");
     495            1 :         }
     496              :     }
     497              : 
     498              :     /// Test that walproposer can successfully connect to safekeeper and finish
     499              :     /// sync_safekeepers. API is mocked in MockImpl.
     500              :     ///
     501              :     /// Run this test with valgrind to detect leaks:
     502              :     /// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
     503              :     #[test]
     504            1 :     fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
     505            1 :         let ttid = TenantTimelineId::new(
     506            1 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     507            1 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     508              :         );
     509              : 
     510            1 :         let (sender, receiver) = sync_channel(1);
     511              : 
     512              :         // Messages definitions are at walproposer.h
     513              :         // xxx: it would be better to extract them from safekeeper crate and
     514              :         // use serialization/deserialization here.
     515            1 :         let greeting_tag = (b'g').to_be_bytes();
     516            1 :         let tenant_id = CString::new(ttid.tenant_id.to_string())
     517            1 :             .unwrap()
     518            1 :             .into_bytes_with_nul();
     519            1 :         let timeline_id = CString::new(ttid.timeline_id.to_string())
     520            1 :             .unwrap()
     521            1 :             .into_bytes_with_nul();
     522            1 :         let mconf_gen = 0_u32.to_be_bytes();
     523            1 :         let mconf_members_len = 0_u32.to_be_bytes();
     524            1 :         let mconf_members_new_len = 0_u32.to_be_bytes();
     525            1 :         let pg_version: [u8; 4] = PG_VERSION_NUM.to_be_bytes();
     526            1 :         let system_id = 0_u64.to_be_bytes();
     527            1 :         let wal_seg_size = 16777216_u32.to_be_bytes();
     528              : 
     529            1 :         let proposer_greeting = [
     530            1 :             greeting_tag.as_slice(),
     531            1 :             tenant_id.as_slice(),
     532            1 :             timeline_id.as_slice(),
     533            1 :             mconf_gen.as_slice(),
     534            1 :             mconf_members_len.as_slice(),
     535            1 :             mconf_members_new_len.as_slice(),
     536            1 :             pg_version.as_slice(),
     537            1 :             system_id.as_slice(),
     538            1 :             wal_seg_size.as_slice(),
     539            1 :         ]
     540            1 :         .concat();
     541              : 
     542            1 :         let voting_tag = (b'v').to_be_bytes();
     543            1 :         let vote_request_term = 3_u64.to_be_bytes();
     544            1 :         let vote_request = [
     545            1 :             voting_tag.as_slice(),
     546            1 :             mconf_gen.as_slice(),
     547            1 :             vote_request_term.as_slice(),
     548            1 :         ]
     549            1 :         .concat();
     550              : 
     551            1 :         let acceptor_greeting_term = 2_u64.to_be_bytes();
     552            1 :         let acceptor_greeting_node_id = 1_u64.to_be_bytes();
     553            1 :         let acceptor_greeting = [
     554            1 :             greeting_tag.as_slice(),
     555            1 :             acceptor_greeting_node_id.as_slice(),
     556            1 :             mconf_gen.as_slice(),
     557            1 :             mconf_members_len.as_slice(),
     558            1 :             mconf_members_new_len.as_slice(),
     559            1 :             acceptor_greeting_term.as_slice(),
     560            1 :         ]
     561            1 :         .concat();
     562              : 
     563            1 :         let vote_response_term = 3_u64.to_be_bytes();
     564            1 :         let vote_given = 1_u8.to_be_bytes();
     565            1 :         let flush_lsn = 0x539_u64.to_be_bytes();
     566            1 :         let truncate_lsn = 0x539_u64.to_be_bytes();
     567            1 :         let th_len = 1_u32.to_be_bytes();
     568            1 :         let th_term = 2_u64.to_be_bytes();
     569            1 :         let th_lsn = 0x539_u64.to_be_bytes();
     570            1 :         let vote_response = [
     571            1 :             voting_tag.as_slice(),
     572            1 :             mconf_gen.as_slice(),
     573            1 :             vote_response_term.as_slice(),
     574            1 :             vote_given.as_slice(),
     575            1 :             flush_lsn.as_slice(),
     576            1 :             truncate_lsn.as_slice(),
     577            1 :             th_len.as_slice(),
     578            1 :             th_term.as_slice(),
     579            1 :             th_lsn.as_slice(),
     580            1 :         ]
     581            1 :         .concat();
     582              : 
     583            1 :         let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
     584            1 :             wait_events: Cell::new(WaitEventsData {
     585            1 :                 sk: std::ptr::null_mut(),
     586            1 :                 event_mask: 0,
     587            1 :             }),
     588            1 :             expected_messages: vec![proposer_greeting, vote_request],
     589            1 :             expected_ptr: AtomicUsize::new(0),
     590            1 :             safekeeper_replies: vec![acceptor_greeting, vote_response],
     591            1 :             replies_ptr: AtomicUsize::new(0),
     592            1 :             sync_channel: sender,
     593            1 :             shmem: UnsafeCell::new(crate::api_bindings::empty_shmem()),
     594            1 :         });
     595            1 :         let config = crate::walproposer::Config {
     596            1 :             ttid,
     597            1 :             safekeepers_list: vec!["localhost:5000".to_string()],
     598            1 :             safekeeper_conninfo_options: String::new(),
     599            1 :             safekeeper_reconnect_timeout: 1000,
     600            1 :             safekeeper_connection_timeout: 10000,
     601            1 :             sync_safekeepers: true,
     602            1 :         };
     603              : 
     604            1 :         let wp = Wrapper::new(my_impl, config);
     605              : 
     606              :         // walproposer will panic when it finishes sync_safekeepers
     607            1 :         std::panic::catch_unwind(|| wp.start()).unwrap_err();
     608              :         // validate the resulting LSN
     609            1 :         assert_eq!(receiver.try_recv(), Ok(1337));
     610            1 :         Ok(())
     611              :         // drop() will free up resources here
     612            1 :     }
     613              : }
        

Generated by: LCOV version 2.1-beta