LCOV - differential code coverage report
Current view: top level - libs/walproposer/src - walproposer.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 68.0 % 350 238 112 238
Current Date: 2024-01-09 02:06:09 Functions: 42.4 % 59 25 34 25
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : use std::ffi::CString;
       2                 : 
       3                 : use postgres_ffi::WAL_SEGMENT_SIZE;
       4                 : use utils::id::TenantTimelineId;
       5                 : 
       6                 : use crate::{
       7                 :     api_bindings::{create_api, take_vec_u8, Level},
       8                 :     bindings::{
       9                 :         NeonWALReadResult, Safekeeper, WalProposer, WalProposerConfig, WalProposerCreate,
      10                 :         WalProposerFree, WalProposerStart,
      11                 :     },
      12                 : };
      13                 : 
      14                 : /// Rust high-level wrapper for C walproposer API. Many methods are not required
      15                 : /// for simple cases, hence todo!() in default implementations.
      16                 : ///
      17                 : /// Refer to `pgxn/neon/walproposer.h` for documentation.
      18                 : pub trait ApiImpl {
      19 UBC           0 :     fn get_shmem_state(&self) -> &mut crate::bindings::WalproposerShmemState {
      20               0 :         todo!()
      21               0 :     }
      22                 : 
      23               0 :     fn start_streaming(&self, _startpos: u64) {
      24               0 :         todo!()
      25               0 :     }
      26                 : 
      27               0 :     fn get_flush_rec_ptr(&self) -> u64 {
      28               0 :         todo!()
      29               0 :     }
      30                 : 
      31               0 :     fn get_current_timestamp(&self) -> i64 {
      32               0 :         todo!()
      33               0 :     }
      34                 : 
      35               0 :     fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
      36               0 :         todo!()
      37               0 :     }
      38                 : 
      39               0 :     fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
      40               0 :         todo!()
      41               0 :     }
      42                 : 
      43               0 :     fn conn_connect_start(&self, _sk: &mut Safekeeper) {
      44               0 :         todo!()
      45               0 :     }
      46                 : 
      47               0 :     fn conn_connect_poll(
      48               0 :         &self,
      49               0 :         _sk: &mut Safekeeper,
      50               0 :     ) -> crate::bindings::WalProposerConnectPollStatusType {
      51               0 :         todo!()
      52               0 :     }
      53                 : 
      54               0 :     fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
      55               0 :         todo!()
      56               0 :     }
      57                 : 
      58               0 :     fn conn_get_query_result(
      59               0 :         &self,
      60               0 :         _sk: &mut Safekeeper,
      61               0 :     ) -> crate::bindings::WalProposerExecStatusType {
      62               0 :         todo!()
      63               0 :     }
      64                 : 
      65               0 :     fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
      66               0 :         todo!()
      67               0 :     }
      68                 : 
      69               0 :     fn conn_finish(&self, _sk: &mut Safekeeper) {
      70               0 :         todo!()
      71               0 :     }
      72                 : 
      73               0 :     fn conn_async_read(&self, _sk: &mut Safekeeper) -> (&[u8], crate::bindings::PGAsyncReadResult) {
      74               0 :         todo!()
      75               0 :     }
      76                 : 
      77               0 :     fn conn_async_write(
      78               0 :         &self,
      79               0 :         _sk: &mut Safekeeper,
      80               0 :         _buf: &[u8],
      81               0 :     ) -> crate::bindings::PGAsyncWriteResult {
      82               0 :         todo!()
      83               0 :     }
      84                 : 
      85               0 :     fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
      86               0 :         todo!()
      87               0 :     }
      88                 : 
      89               0 :     fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
      90               0 :         todo!()
      91               0 :     }
      92                 : 
      93               0 :     fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
      94               0 :         todo!()
      95               0 :     }
      96                 : 
      97               0 :     fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
      98               0 :         todo!()
      99               0 :     }
     100                 : 
     101               0 :     fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
     102               0 :         todo!()
     103               0 :     }
     104                 : 
     105               0 :     fn init_event_set(&self, _wp: &mut WalProposer) {
     106               0 :         todo!()
     107               0 :     }
     108                 : 
     109               0 :     fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     110               0 :         todo!()
     111               0 :     }
     112                 : 
     113               0 :     fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
     114               0 :         todo!()
     115               0 :     }
     116                 : 
     117               0 :     fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     118               0 :         todo!()
     119               0 :     }
     120                 : 
     121               0 :     fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
     122               0 :         todo!()
     123               0 :     }
     124                 : 
     125               0 :     fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
     126               0 :         todo!()
     127               0 :     }
     128                 : 
     129               0 :     fn strong_random(&self, _buf: &mut [u8]) -> bool {
     130               0 :         todo!()
     131               0 :     }
     132                 : 
     133               0 :     fn get_redo_start_lsn(&self) -> u64 {
     134               0 :         todo!()
     135               0 :     }
     136                 : 
     137               0 :     fn finish_sync_safekeepers(&self, _lsn: u64) {
     138               0 :         todo!()
     139               0 :     }
     140                 : 
     141               0 :     fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) {
     142               0 :         todo!()
     143               0 :     }
     144                 : 
     145               0 :     fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
     146               0 :         todo!()
     147               0 :     }
     148                 : 
     149               0 :     fn after_election(&self, _wp: &mut WalProposer) {
     150               0 :         todo!()
     151               0 :     }
     152                 : }
     153                 : 
     154                 : pub enum WaitResult {
     155                 :     Latch,
     156                 :     Timeout,
     157                 :     Network(*mut Safekeeper, u32),
     158                 : }
     159                 : 
     160                 : pub struct Config {
     161                 :     /// Tenant and timeline id
     162                 :     pub ttid: TenantTimelineId,
     163                 :     /// List of safekeepers in format `host:port`
     164                 :     pub safekeepers_list: Vec<String>,
     165                 :     /// Safekeeper reconnect timeout in milliseconds
     166                 :     pub safekeeper_reconnect_timeout: i32,
     167                 :     /// Safekeeper connection timeout in milliseconds
     168                 :     pub safekeeper_connection_timeout: i32,
     169                 :     /// walproposer mode, finish when all safekeepers are synced or subscribe
     170                 :     /// to WAL streaming
     171                 :     pub sync_safekeepers: bool,
     172                 : }
     173                 : 
     174                 : /// WalProposer main struct. C methods are reexported as Rust functions.
     175                 : pub struct Wrapper {
     176                 :     wp: *mut WalProposer,
     177                 :     _safekeepers_list_vec: Vec<u8>,
     178                 : }
     179                 : 
     180                 : impl Wrapper {
     181 CBC           1 :     pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
     182               1 :         let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
     183               1 :             .unwrap()
     184               1 :             .into_raw();
     185               1 :         let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
     186               1 :             .unwrap()
     187               1 :             .into_raw();
     188               1 : 
     189               1 :         let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
     190               1 :             .unwrap()
     191               1 :             .into_bytes_with_nul();
     192               1 :         assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
     193               1 :         let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
     194               1 : 
     195               1 :         let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
     196               1 : 
     197               1 :         let c_config = WalProposerConfig {
     198               1 :             neon_tenant,
     199               1 :             neon_timeline,
     200               1 :             safekeepers_list,
     201               1 :             safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
     202               1 :             safekeeper_connection_timeout: config.safekeeper_connection_timeout,
     203               1 :             wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
     204               1 :             syncSafekeepers: config.sync_safekeepers,
     205               1 :             systemId: 0,
     206               1 :             pgTimeline: 1,
     207               1 :             callback_data,
     208               1 :         };
     209               1 :         let c_config = Box::into_raw(Box::new(c_config));
     210               1 : 
     211               1 :         let api = create_api();
     212               1 :         let wp = unsafe { WalProposerCreate(c_config, api) };
     213               1 :         Wrapper {
     214               1 :             wp,
     215               1 :             _safekeepers_list_vec: safekeepers_list_vec,
     216               1 :         }
     217               1 :     }
     218                 : 
     219               1 :     pub fn start(&self) {
     220               1 :         unsafe { WalProposerStart(self.wp) }
     221               1 :     }
     222                 : }
     223                 : 
     224                 : impl Drop for Wrapper {
     225               1 :     fn drop(&mut self) {
     226               1 :         unsafe {
     227               1 :             let config = (*self.wp).config;
     228               1 :             drop(Box::from_raw(
     229               1 :                 (*config).callback_data as *mut Box<dyn ApiImpl>,
     230               1 :             ));
     231               1 :             drop(CString::from_raw((*config).neon_tenant));
     232               1 :             drop(CString::from_raw((*config).neon_timeline));
     233               1 :             drop(Box::from_raw(config));
     234                 : 
     235               1 :             for i in 0..(*self.wp).n_safekeepers {
     236               1 :                 let sk = &mut (*self.wp).safekeeper[i as usize];
     237               1 :                 take_vec_u8(&mut sk.inbuf);
     238               1 :             }
     239                 : 
     240               1 :             WalProposerFree(self.wp);
     241               1 :         }
     242               1 :     }
     243                 : }
     244                 : 
     245                 : #[cfg(test)]
     246                 : mod tests {
     247                 :     use core::panic;
     248                 :     use std::{
     249                 :         cell::Cell,
     250                 :         sync::{atomic::AtomicUsize, mpsc::sync_channel},
     251                 :     };
     252                 : 
     253                 :     use utils::id::TenantTimelineId;
     254                 : 
     255                 :     use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
     256                 : 
     257                 :     use super::ApiImpl;
     258                 : 
     259               4 :     #[derive(Clone, Copy, Debug)]
     260                 :     struct WaitEventsData {
     261                 :         sk: *mut crate::bindings::Safekeeper,
     262                 :         event_mask: u32,
     263                 :     }
     264                 : 
     265                 :     struct MockImpl {
     266                 :         // data to return from wait_event_set
     267                 :         wait_events: Cell<WaitEventsData>,
     268                 :         // walproposer->safekeeper messages
     269                 :         expected_messages: Vec<Vec<u8>>,
     270                 :         expected_ptr: AtomicUsize,
     271                 :         // safekeeper->walproposer messages
     272                 :         safekeeper_replies: Vec<Vec<u8>>,
     273                 :         replies_ptr: AtomicUsize,
     274                 :         // channel to send LSN to the main thread
     275                 :         sync_channel: std::sync::mpsc::SyncSender<u64>,
     276                 :     }
     277                 : 
     278                 :     impl MockImpl {
     279               2 :         fn check_walproposer_msg(&self, msg: &[u8]) {
     280               2 :             let ptr = self
     281               2 :                 .expected_ptr
     282               2 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     283               2 : 
     284               2 :             if ptr >= self.expected_messages.len() {
     285 UBC           0 :                 panic!("unexpected message from walproposer");
     286 CBC           2 :             }
     287               2 : 
     288               2 :             let expected_msg = &self.expected_messages[ptr];
     289               2 :             assert_eq!(msg, expected_msg.as_slice());
     290               2 :         }
     291                 : 
     292               2 :         fn next_safekeeper_reply(&self) -> &[u8] {
     293               2 :             let ptr = self
     294               2 :                 .replies_ptr
     295               2 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     296               2 : 
     297               2 :             if ptr >= self.safekeeper_replies.len() {
     298 UBC           0 :                 panic!("no more safekeeper replies");
     299 CBC           2 :             }
     300               2 : 
     301               2 :             &self.safekeeper_replies[ptr]
     302               2 :         }
     303                 :     }
     304                 : 
     305                 :     impl ApiImpl for MockImpl {
     306              14 :         fn get_current_timestamp(&self) -> i64 {
     307              14 :             println!("get_current_timestamp");
     308              14 :             0
     309              14 :         }
     310                 : 
     311               1 :         fn conn_status(
     312               1 :             &self,
     313               1 :             _: &mut crate::bindings::Safekeeper,
     314               1 :         ) -> crate::bindings::WalProposerConnStatusType {
     315               1 :             println!("conn_status");
     316               1 :             crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     317               1 :         }
     318                 : 
     319               1 :         fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
     320               1 :             println!("conn_connect_start");
     321               1 :         }
     322                 : 
     323               1 :         fn conn_connect_poll(
     324               1 :             &self,
     325               1 :             _: &mut crate::bindings::Safekeeper,
     326               1 :         ) -> crate::bindings::WalProposerConnectPollStatusType {
     327               1 :             println!("conn_connect_poll");
     328               1 :             crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     329               1 :         }
     330                 : 
     331               1 :         fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
     332               1 :             println!("conn_send_query: {}", query);
     333               1 :             true
     334               1 :         }
     335                 : 
     336               1 :         fn conn_get_query_result(
     337               1 :             &self,
     338               1 :             _: &mut crate::bindings::Safekeeper,
     339               1 :         ) -> crate::bindings::WalProposerExecStatusType {
     340               1 :             println!("conn_get_query_result");
     341               1 :             crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     342               1 :         }
     343                 : 
     344               2 :         fn conn_async_read(
     345               2 :             &self,
     346               2 :             _: &mut crate::bindings::Safekeeper,
     347               2 :         ) -> (&[u8], crate::bindings::PGAsyncReadResult) {
     348               2 :             println!("conn_async_read");
     349               2 :             let reply = self.next_safekeeper_reply();
     350               2 :             println!("conn_async_read result: {:?}", reply);
     351               2 :             (
     352               2 :                 reply,
     353               2 :                 crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS,
     354               2 :             )
     355               2 :         }
     356                 : 
     357               2 :         fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
     358               2 :             println!("conn_blocking_write: {:?}", buf);
     359               2 :             self.check_walproposer_msg(buf);
     360               2 :             true
     361               2 :         }
     362                 : 
     363               1 :         fn recovery_download(
     364               1 :             &self,
     365               1 :             _wp: &mut crate::bindings::WalProposer,
     366               1 :             _sk: &mut crate::bindings::Safekeeper,
     367               1 :         ) -> bool {
     368               1 :             true
     369               1 :         }
     370                 : 
     371 UBC           0 :         fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
     372               0 :             println!("wal_reader_allocate");
     373               0 :             crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     374               0 :         }
     375                 : 
     376 CBC           1 :         fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
     377               1 :             println!("init_event_set")
     378               1 :         }
     379                 : 
     380               4 :         fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     381               4 :             println!(
     382               4 :                 "update_event_set, sk={:?}, events_mask={:#b}",
     383               4 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     384               4 :             );
     385               4 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     386               4 :         }
     387                 : 
     388               2 :         fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     389               2 :             println!(
     390               2 :                 "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     391               2 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     392               2 :             );
     393               2 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     394               2 :         }
     395                 : 
     396               1 :         fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
     397               1 :             println!(
     398               1 :                 "rm_safekeeper_event_set, sk={:?}",
     399               1 :                 sk as *mut crate::bindings::Safekeeper
     400               1 :             );
     401               1 :         }
     402                 : 
     403               4 :         fn wait_event_set(
     404               4 :             &self,
     405               4 :             _: &mut crate::bindings::WalProposer,
     406               4 :             timeout_millis: i64,
     407               4 :         ) -> super::WaitResult {
     408               4 :             let data = self.wait_events.get();
     409               4 :             println!(
     410               4 :                 "wait_event_set, timeout_millis={}, res={:?}",
     411               4 :                 timeout_millis, data
     412               4 :             );
     413               4 :             super::WaitResult::Network(data.sk, data.event_mask)
     414               4 :         }
     415                 : 
     416               1 :         fn strong_random(&self, buf: &mut [u8]) -> bool {
     417               1 :             println!("strong_random");
     418               1 :             buf.fill(0);
     419               1 :             true
     420               1 :         }
     421                 : 
     422               1 :         fn finish_sync_safekeepers(&self, lsn: u64) {
     423               1 :             self.sync_channel.send(lsn).unwrap();
     424               1 :             panic!("sync safekeepers finished at lsn={}", lsn);
     425                 :         }
     426                 : 
     427               7 :         fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
     428               7 :             println!("wp_log[{}] {}", level, msg);
     429               7 :         }
     430                 : 
     431 UBC           0 :         fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
     432               0 :             println!("after_election");
     433               0 :         }
     434                 :     }
     435                 : 
     436                 :     /// Test that walproposer can successfully connect to safekeeper and finish
     437                 :     /// sync_safekeepers. API is mocked in MockImpl.
     438                 :     ///
     439                 :     /// Run this test with valgrind to detect leaks:
     440                 :     /// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
     441 CBC           1 :     #[test]
     442               1 :     fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
     443               1 :         let ttid = TenantTimelineId::new(
     444               1 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     445               1 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     446                 :         );
     447                 : 
     448               1 :         let (sender, receiver) = sync_channel(1);
     449               1 : 
     450               1 :         let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
     451               1 :             wait_events: Cell::new(WaitEventsData {
     452               1 :                 sk: std::ptr::null_mut(),
     453               1 :                 event_mask: 0,
     454               1 :             }),
     455               1 :             expected_messages: vec![
     456               1 :                 // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160001, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
     457               1 :                 vec![
     458               1 :                     103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
     459               1 :                     0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
     460               1 :                     147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
     461               1 :                     188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
     462               1 :                 ],
     463               1 :                 // VoteRequest(VoteRequest { term: 3 })
     464               1 :                 vec![
     465               1 :                     118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
     466               1 :                     0, 0, 0, 0, 0, 0,
     467               1 :                 ],
     468               1 :             ],
     469               1 :             expected_ptr: AtomicUsize::new(0),
     470               1 :             safekeeper_replies: vec![
     471               1 :                 // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) })
     472               1 :                 vec![
     473               1 :                     103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
     474               1 :                 ],
     475               1 :                 // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 })
     476               1 :                 vec![
     477               1 :                     118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57,
     478               1 :                     5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
     479               1 :                     0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0,
     480               1 :                 ],
     481               1 :             ],
     482               1 :             replies_ptr: AtomicUsize::new(0),
     483               1 :             sync_channel: sender,
     484               1 :         });
     485               1 :         let config = crate::walproposer::Config {
     486               1 :             ttid,
     487               1 :             safekeepers_list: vec!["localhost:5000".to_string()],
     488               1 :             safekeeper_reconnect_timeout: 1000,
     489               1 :             safekeeper_connection_timeout: 10000,
     490               1 :             sync_safekeepers: true,
     491               1 :         };
     492               1 : 
     493               1 :         let wp = Wrapper::new(my_impl, config);
     494               1 : 
     495               1 :         // walproposer will panic when it finishes sync_safekeepers
     496               1 :         std::panic::catch_unwind(|| wp.start()).unwrap_err();
     497               1 :         // validate the resulting LSN
     498               1 :         assert_eq!(receiver.try_recv(), Ok(1337));
     499               1 :         Ok(())
     500                 :         // drop() will free up resources here
     501               1 :     }
     502                 : }
        

Generated by: LCOV version 2.1-beta