LCOV - code coverage report
Current view: top level - libs/walproposer/src - walproposer.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 68.0 % 350 238
Test Date: 2024-02-07 07:37:29 Functions: 42.4 % 59 25

            Line data    Source code
       1              : use std::ffi::CString;
       2              : 
       3              : use postgres_ffi::WAL_SEGMENT_SIZE;
       4              : use utils::id::TenantTimelineId;
       5              : 
       6              : use crate::{
       7              :     api_bindings::{create_api, take_vec_u8, Level},
       8              :     bindings::{
       9              :         NeonWALReadResult, Safekeeper, WalProposer, WalProposerConfig, WalProposerCreate,
      10              :         WalProposerFree, WalProposerStart,
      11              :     },
      12              : };
      13              : 
      14              : /// Rust high-level wrapper for C walproposer API. Many methods are not required
      15              : /// for simple cases, hence todo!() in default implementations.
      16              : ///
      17              : /// Refer to `pgxn/neon/walproposer.h` for documentation.
                        ///
                        /// Every method takes `&self` and every default body is `todo!()`, so invoking
                        /// a callback that an implementor did not override panics at runtime rather
                        /// than failing to compile. Implementors that need to mutate state from these
                        /// callbacks have to use interior mutability, because no method takes
                        /// `&mut self`.
      18              : pub trait ApiImpl {
                            // NOTE(review): returns `&mut` while only borrowing `&self`; whatever
                            // guarantees exclusive access to the shared-memory state is not visible
                            // in this file -- confirm before relying on it.
      19            0 :     fn get_shmem_state(&self) -> &mut crate::bindings::WalproposerShmemState {
      20            0 :         todo!()
      21            0 :     }
      22              : 
      23            0 :     fn start_streaming(&self, _startpos: u64) {
      24            0 :         todo!()
      25            0 :     }
      26              : 
      27            0 :     fn get_flush_rec_ptr(&self) -> u64 {
      28            0 :         todo!()
      29            0 :     }
      30              : 
      31            0 :     fn get_current_timestamp(&self) -> i64 {
      32            0 :         todo!()
      33            0 :     }
      34              : 
                            // The `conn_*` callbacks below each receive the `Safekeeper` they act on.
                            // Their exact contracts are presumably the ones described in the C header
                            // referenced above -- verify there; nothing in this file defines them.
      35            0 :     fn conn_error_message(&self, _sk: &mut Safekeeper) -> String {
      36            0 :         todo!()
      37            0 :     }
      38              : 
      39            0 :     fn conn_status(&self, _sk: &mut Safekeeper) -> crate::bindings::WalProposerConnStatusType {
      40            0 :         todo!()
      41            0 :     }
      42              : 
      43            0 :     fn conn_connect_start(&self, _sk: &mut Safekeeper) {
      44            0 :         todo!()
      45            0 :     }
      46              : 
      47            0 :     fn conn_connect_poll(
      48            0 :         &self,
      49            0 :         _sk: &mut Safekeeper,
      50            0 :     ) -> crate::bindings::WalProposerConnectPollStatusType {
      51            0 :         todo!()
      52            0 :     }
      53              : 
      54            0 :     fn conn_send_query(&self, _sk: &mut Safekeeper, _query: &str) -> bool {
      55            0 :         todo!()
      56            0 :     }
      57              : 
      58            0 :     fn conn_get_query_result(
      59            0 :         &self,
      60            0 :         _sk: &mut Safekeeper,
      61            0 :     ) -> crate::bindings::WalProposerExecStatusType {
      62            0 :         todo!()
      63            0 :     }
      64              : 
      65            0 :     fn conn_flush(&self, _sk: &mut Safekeeper) -> i32 {
      66            0 :         todo!()
      67            0 :     }
      68              : 
      69            0 :     fn conn_finish(&self, _sk: &mut Safekeeper) {
      70            0 :         todo!()
      71            0 :     }
      72              : 
                            // By lifetime elision the returned slice borrows from `self`, not from
                            // `_sk`, so the implementor must own the bytes it hands back.
      73            0 :     fn conn_async_read(&self, _sk: &mut Safekeeper) -> (&[u8], crate::bindings::PGAsyncReadResult) {
      74            0 :         todo!()
      75            0 :     }
      76              : 
      77            0 :     fn conn_async_write(
      78            0 :         &self,
      79            0 :         _sk: &mut Safekeeper,
      80            0 :         _buf: &[u8],
      81            0 :     ) -> crate::bindings::PGAsyncWriteResult {
      82            0 :         todo!()
      83            0 :     }
      84              : 
      85            0 :     fn conn_blocking_write(&self, _sk: &mut Safekeeper, _buf: &[u8]) -> bool {
      86            0 :         todo!()
      87            0 :     }
      88              : 
      89            0 :     fn recovery_download(&self, _wp: &mut WalProposer, _sk: &mut Safekeeper) -> bool {
      90            0 :         todo!()
      91            0 :     }
      92              : 
      93            0 :     fn wal_reader_allocate(&self, _sk: &mut Safekeeper) -> NeonWALReadResult {
      94            0 :         todo!()
      95            0 :     }
      96              : 
      97            0 :     fn wal_read(&self, _sk: &mut Safekeeper, _buf: &mut [u8], _startpos: u64) -> NeonWALReadResult {
      98            0 :         todo!()
      99            0 :     }
     100              : 
     101            0 :     fn wal_reader_events(&self, _sk: &mut Safekeeper) -> u32 {
     102            0 :         todo!()
     103            0 :     }
     104              : 
     105            0 :     fn init_event_set(&self, _wp: &mut WalProposer) {
     106            0 :         todo!()
     107            0 :     }
     108              : 
     109            0 :     fn update_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     110            0 :         todo!()
     111            0 :     }
     112              : 
     113            0 :     fn active_state_update_event_set(&self, _sk: &mut Safekeeper) {
     114            0 :         todo!()
     115            0 :     }
     116              : 
     117            0 :     fn add_safekeeper_event_set(&self, _sk: &mut Safekeeper, _events_mask: u32) {
     118            0 :         todo!()
     119            0 :     }
     120              : 
     121            0 :     fn rm_safekeeper_event_set(&self, _sk: &mut Safekeeper) {
     122            0 :         todo!()
     123            0 :     }
     124              : 
                            // The outcome is reported through the `WaitResult` enum declared in this
                            // file.
     125            0 :     fn wait_event_set(&self, _wp: &mut WalProposer, _timeout_millis: i64) -> WaitResult {
     126            0 :         todo!()
     127            0 :     }
     128              : 
     129            0 :     fn strong_random(&self, _buf: &mut [u8]) -> bool {
     130            0 :         todo!()
     131            0 :     }
     132              : 
     133            0 :     fn get_redo_start_lsn(&self) -> u64 {
     134            0 :         todo!()
     135            0 :     }
     136              : 
     137            0 :     fn finish_sync_safekeepers(&self, _lsn: u64) {
     138            0 :         todo!()
     139            0 :     }
     140              : 
     141            0 :     fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) {
     142            0 :         todo!()
     143            0 :     }
     144              : 
     145            0 :     fn log_internal(&self, _wp: &mut WalProposer, _level: Level, _msg: &str) {
     146            0 :         todo!()
     147            0 :     }
     148              : 
     149            0 :     fn after_election(&self, _wp: &mut WalProposer) {
     150            0 :         todo!()
     151            0 :     }
     152              : }
     153              : 
     154              : pub enum WaitResult {
                            // Return type of `ApiImpl::wait_event_set`.
                            // NOTE(review): the meaning of each variant is not spelled out in this
                            // file; presumably they mirror the wake-up reasons of the C walproposer
                            // event loop -- confirm against `pgxn/neon/walproposer.h`.
     155              :     Latch,
     156              :     Timeout,
                            /// Carries a raw `Safekeeper` pointer and a `u32`. The test mock in this
                            /// file fills the second field with the event mask it last recorded.
     157              :     Network(*mut Safekeeper, u32),
     158              : }
     159              : 
     160              : pub struct Config {
                            // Rust-side settings. `Wrapper::new` copies these into the C
                            // `WalProposerConfig` it allocates.
     161              :     /// Tenant and timeline id
     162              :     pub ttid: TenantTimelineId,
     163              :     /// List of safekeepers in format `host:port`
                            /// (`Wrapper::new` joins the entries with `,` before handing them to C).
     164              :     pub safekeepers_list: Vec<String>,
     165              :     /// Safekeeper reconnect timeout in milliseconds
     166              :     pub safekeeper_reconnect_timeout: i32,
     167              :     /// Safekeeper connection timeout in milliseconds
     168              :     pub safekeeper_connection_timeout: i32,
     169              :     /// walproposer mode, finish when all safekeepers are synced or subscribe
     170              :     /// to WAL streaming
                            /// (presumably `true` selects the sync-and-finish mode; the test in this
                            /// file sets it to `true` -- confirm).
     171              :     pub sync_safekeepers: bool,
     172              : }
     173              : 
     174              : /// WalProposer main struct. C methods are reexported as Rust functions.
     175              : pub struct Wrapper {
                            /// Raw handle returned by `WalProposerCreate`; released with
                            /// `WalProposerFree` in the `Drop` impl.
     176              :     wp: *mut WalProposer,
                            /// Owns the NUL-terminated safekeeper list whose pointer was stored in the
                            /// C config. Keeping the `Vec` here means the buffer stays allocated until
                            /// after `Drop::drop` has freed `wp`.
     177              :     _safekeepers_list_vec: Vec<u8>,
     178              : }
     179              : 
     180              : impl Wrapper {
                            /// Builds the C `WalProposerConfig` from `config`, registers `api` as the
                            /// callback object and calls `WalProposerCreate`.
                            ///
                            /// Everything handed to C as a raw pointer here (the two C strings, the
                            /// boxed `api`, the boxed config) is reclaimed in `Drop for Wrapper`.
                            ///
                            /// # Panics
                            /// The `CString::new(..).unwrap()` calls panic if the tenant id, timeline
                            /// id or joined safekeeper list contains an interior NUL byte.
     181            2 :     pub fn new(api: Box<dyn ApiImpl>, config: Config) -> Wrapper {
     182            2 :         let neon_tenant = CString::new(config.ttid.tenant_id.to_string())
     183            2 :             .unwrap()
     184            2 :             .into_raw();
     185            2 :         let neon_timeline = CString::new(config.ttid.timeline_id.to_string())
     186            2 :             .unwrap()
     187            2 :             .into_raw();
     188            2 : 
                                // Unlike the two strings above this buffer is not leaked: it is kept
                                // as a `Vec<u8>` and moved into the returned `Wrapper`, while C only
                                // receives a pointer into it.
     189            2 :         let mut safekeepers_list_vec = CString::new(config.safekeepers_list.join(","))
     190            2 :             .unwrap()
     191            2 :             .into_bytes_with_nul();
                                // NOTE(review): asserts the Vec has no spare capacity; the reason for
                                // requiring that is not stated in this file -- confirm.
     192            2 :         assert!(safekeepers_list_vec.len() == safekeepers_list_vec.capacity());
     193            2 :         let safekeepers_list = safekeepers_list_vec.as_mut_ptr() as *mut std::ffi::c_char;
     194            2 : 
                                // `api` is a fat `Box<dyn ApiImpl>`; boxing it once more gives a thin
                                // pointer that fits in a `void *`. `Drop` casts it back to
                                // `*mut Box<dyn ApiImpl>`.
     195            2 :         let callback_data = Box::into_raw(Box::new(api)) as *mut ::std::os::raw::c_void;
     196            2 : 
     197            2 :         let c_config = WalProposerConfig {
     198            2 :             neon_tenant,
     199            2 :             neon_timeline,
     200            2 :             safekeepers_list,
     201            2 :             safekeeper_reconnect_timeout: config.safekeeper_reconnect_timeout,
     202            2 :             safekeeper_connection_timeout: config.safekeeper_connection_timeout,
     203            2 :             wal_segment_size: WAL_SEGMENT_SIZE as i32, // default 16MB
     204            2 :             syncSafekeepers: config.sync_safekeepers,
                                    // `systemId` and `pgTimeline` are hard-coded; `Config` has no
                                    // fields for them.
     205            2 :             systemId: 0,
     206            2 :             pgTimeline: 1,
     207            2 :             callback_data,
     208            2 :         };
                                // Heap-allocate the config and leak it to C; `Drop` finds it again
                                // through `(*wp).config`.
     209            2 :         let c_config = Box::into_raw(Box::new(c_config));
     210            2 : 
                                // Shadows the `api` parameter, which was already moved into
                                // `callback_data` above.
     211            2 :         let api = create_api();
                                // SAFETY (review note): `c_config` comes from `Box::into_raw`, so it is
                                // non-null and points to an initialised value. What `WalProposerCreate`
                                // requires of `api` is not visible in this file -- confirm.
     212            2 :         let wp = unsafe { WalProposerCreate(c_config, api) };
     213            2 :         Wrapper {
     214            2 :             wp,
     215            2 :             _safekeepers_list_vec: safekeepers_list_vec,
     216            2 :         }
     217            2 :     }
     218              : 
                            /// Calls the C `WalProposerStart` on the wrapped handle.
                            ///
                            /// NOTE(review): in this file's test the call only ends by unwinding from
                            /// a panic raised in a callback; whether it can return normally is not
                            /// visible here.
     219            2 :     pub fn start(&self) {
     220            2 :         unsafe { WalProposerStart(self.wp) }
     221            2 :     }
     222              : }
     223              : 
     224              : impl Drop for Wrapper {
                            /// Reclaims everything `Wrapper::new` handed to C, in this order: the boxed
                            /// callback object, the tenant and timeline C strings, the config box, each
                            /// safekeeper's `inbuf`, and finally the C walproposer object itself.
     225            2 :     fn drop(&mut self) {
     226            2 :         unsafe {
                                    // The config pointer is read out of `wp` first, while `wp` is
                                    // still alive.
     227            2 :             let config = (*self.wp).config;
                                    // Cast back to the double box created in `new` so the
                                    // `dyn ApiImpl` destructor runs.
     228            2 :             drop(Box::from_raw(
     229            2 :                 (*config).callback_data as *mut Box<dyn ApiImpl>,
     230            2 :             ));
     231            2 :             drop(CString::from_raw((*config).neon_tenant));
     232            2 :             drop(CString::from_raw((*config).neon_timeline));
                                    // NOTE(review): `config` is freed before `WalProposerFree` runs
                                    // below; this assumes that function does not dereference
                                    // `wp->config` -- confirm.
     233            2 :             drop(Box::from_raw(config));
     234              : 
                                    // `take_vec_u8` lives in `api_bindings` (not shown). Its result
                                    // is discarded here, presumably so that the buffer is dropped on
                                    // the Rust side -- confirm.
     235            2 :             for i in 0..(*self.wp).n_safekeepers {
     236            2 :                 let sk = &mut (*self.wp).safekeeper[i as usize];
     237            2 :                 take_vec_u8(&mut sk.inbuf);
     238            2 :             }
     239              : 
     240            2 :             WalProposerFree(self.wp);
     241            2 :         }
     242            2 :     }
     243              : }
     244              : 
     245              : #[cfg(test)]
     246              : mod tests {
     247              :     use core::panic;
     248              :     use std::{
     249              :         cell::Cell,
     250              :         sync::{atomic::AtomicUsize, mpsc::sync_channel},
     251              :     };
     252              : 
     253              :     use utils::id::TenantTimelineId;
     254              : 
     255              :     use crate::{api_bindings::Level, bindings::NeonWALReadResult, walproposer::Wrapper};
     256              : 
     257              :     use super::ApiImpl;
     258              : 
     259            8 :     #[derive(Clone, Copy, Debug)]
                            // Safekeeper pointer plus event mask most recently registered with the
                            // mock. It is `Copy` so that it can live in a `Cell` and be read back with
                            // `Cell::get`.
     260              :     struct WaitEventsData {
     261              :         sk: *mut crate::bindings::Safekeeper,
     262              :         event_mask: u32,
     263              :     }
     264              : 
     265              :     struct MockImpl {
                                // Scripted stand-in for the real API. All `ApiImpl` methods take
                                // `&self`, hence the `Cell` and atomics for the mutable bits.
     266              :         // data to return from wait_event_set
     267              :         wait_events: Cell<WaitEventsData>,
     268              :         // walproposer->safekeeper messages
     269              :         expected_messages: Vec<Vec<u8>>,
                                // index of the next entry of `expected_messages` to compare against
     270              :         expected_ptr: AtomicUsize,
     271              :         // safekeeper->walproposer messages
     272              :         safekeeper_replies: Vec<Vec<u8>>,
                                // index of the next entry of `safekeeper_replies` to hand out
     273              :         replies_ptr: AtomicUsize,
     274              :         // channel to send LSN to the main thread
     275              :         sync_channel: std::sync::mpsc::SyncSender<u64>,
     276              :     }
     277              : 
     278              :     impl MockImpl {
                                /// Compares `msg` with the next scripted walproposer message and
                                /// advances the cursor. Panics if the script is exhausted or the bytes
                                /// differ.
     279            4 :         fn check_walproposer_msg(&self, msg: &[u8]) {
                                    // `fetch_add` returns the value before the increment, i.e. the
                                    // index to use for this call.
     280            4 :             let ptr = self
     281            4 :                 .expected_ptr
     282            4 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     283            4 : 
     284            4 :             if ptr >= self.expected_messages.len() {
     285            0 :                 panic!("unexpected message from walproposer");
     286            4 :             }
     287            4 : 
     288            4 :             let expected_msg = &self.expected_messages[ptr];
     289            4 :             assert_eq!(msg, expected_msg.as_slice());
     290            4 :         }
     291              : 
                                /// Returns the next scripted safekeeper reply and advances the cursor.
                                /// Panics once all replies have been handed out.
     292            4 :         fn next_safekeeper_reply(&self) -> &[u8] {
     293            4 :             let ptr = self
     294            4 :                 .replies_ptr
     295            4 :                 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
     296            4 : 
     297            4 :             if ptr >= self.safekeeper_replies.len() {
     298            0 :                 panic!("no more safekeeper replies");
     299            4 :             }
     300            4 : 
     301            4 :             &self.safekeeper_replies[ptr]
     302            4 :         }
     303              :     }
     304              : 
     305              :     impl ApiImpl for MockImpl {
                                // Only the callbacks listed here are overridden; any other `ApiImpl`
                                // method keeps its `todo!()` default and would panic if invoked.
                                // Most overrides just log their name and report a fixed success value.

                                // Time is frozen at 0, which keeps the run deterministic.
     306           28 :         fn get_current_timestamp(&self) -> i64 {
     307           28 :             println!("get_current_timestamp");
     308           28 :             0
     309           28 :         }
     310              : 
     311            2 :         fn conn_status(
     312            2 :             &self,
     313            2 :             _: &mut crate::bindings::Safekeeper,
     314            2 :         ) -> crate::bindings::WalProposerConnStatusType {
     315            2 :             println!("conn_status");
     316            2 :             crate::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     317            2 :         }
     318              : 
     319            2 :         fn conn_connect_start(&self, _: &mut crate::bindings::Safekeeper) {
     320            2 :             println!("conn_connect_start");
     321            2 :         }
     322              : 
     323            2 :         fn conn_connect_poll(
     324            2 :             &self,
     325            2 :             _: &mut crate::bindings::Safekeeper,
     326            2 :         ) -> crate::bindings::WalProposerConnectPollStatusType {
     327            2 :             println!("conn_connect_poll");
     328            2 :             crate::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     329            2 :         }
     330              : 
     331            2 :         fn conn_send_query(&self, _: &mut crate::bindings::Safekeeper, query: &str) -> bool {
     332            2 :             println!("conn_send_query: {}", query);
     333            2 :             true
     334            2 :         }
     335              : 
     336            2 :         fn conn_get_query_result(
     337            2 :             &self,
     338            2 :             _: &mut crate::bindings::Safekeeper,
     339            2 :         ) -> crate::bindings::WalProposerExecStatusType {
     340            2 :             println!("conn_get_query_result");
     341            2 :             crate::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     342            2 :         }
     343              : 
                                // Hands out the next scripted safekeeper reply, always reported as a
                                // successful read. The slice borrows from `self.safekeeper_replies`.
     344            4 :         fn conn_async_read(
     345            4 :             &self,
     346            4 :             _: &mut crate::bindings::Safekeeper,
     347            4 :         ) -> (&[u8], crate::bindings::PGAsyncReadResult) {
     348            4 :             println!("conn_async_read");
     349            4 :             let reply = self.next_safekeeper_reply();
     350            4 :             println!("conn_async_read result: {:?}", reply);
     351            4 :             (
     352            4 :                 reply,
     353            4 :                 crate::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS,
     354            4 :             )
     355            4 :         }
     356              : 
                                // Every outgoing message is checked byte-for-byte against the script.
     357            4 :         fn conn_blocking_write(&self, _: &mut crate::bindings::Safekeeper, buf: &[u8]) -> bool {
     358            4 :             println!("conn_blocking_write: {:?}", buf);
     359            4 :             self.check_walproposer_msg(buf);
     360            4 :             true
     361            4 :         }
     362              : 
     363            2 :         fn recovery_download(
     364            2 :             &self,
     365            2 :             _wp: &mut crate::bindings::WalProposer,
     366            2 :             _sk: &mut crate::bindings::Safekeeper,
     367            2 :         ) -> bool {
     368            2 :             true
     369            2 :         }
     370              : 
     371            0 :         fn wal_reader_allocate(&self, _: &mut crate::bindings::Safekeeper) -> NeonWALReadResult {
     372            0 :             println!("wal_reader_allocate");
     373            0 :             crate::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     374            0 :         }
     375              : 
     376            2 :         fn init_event_set(&self, _: &mut crate::bindings::WalProposer) {
     377            2 :             println!("init_event_set")
     378            2 :         }
     379              : 
                                // Both `update_event_set` and `add_safekeeper_event_set` overwrite the
                                // single remembered (safekeeper, mask) pair; `sk` is stored as a raw
                                // pointer via the implicit `&mut` -> `*mut` coercion.
     380            8 :         fn update_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     381            8 :             println!(
     382            8 :                 "update_event_set, sk={:?}, events_mask={:#b}",
     383            8 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     384            8 :             );
     385            8 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     386            8 :         }
     387              : 
     388            4 :         fn add_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper, event_mask: u32) {
     389            4 :             println!(
     390            4 :                 "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     391            4 :                 sk as *mut crate::bindings::Safekeeper, event_mask
     392            4 :             );
     393            4 :             self.wait_events.set(WaitEventsData { sk, event_mask });
     394            4 :         }
     395              : 
                                // Logs only; the remembered pair in `wait_events` is left untouched.
     396            2 :         fn rm_safekeeper_event_set(&self, sk: &mut crate::bindings::Safekeeper) {
     397            2 :             println!(
     398            2 :                 "rm_safekeeper_event_set, sk={:?}",
     399            2 :                 sk as *mut crate::bindings::Safekeeper
     400            2 :             );
     401            2 :         }
     402              : 
                                // Never blocks and never yields `Latch` or `Timeout`: it always
                                // reports the last remembered pair as a `Network` event.
     403            8 :         fn wait_event_set(
     404            8 :             &self,
     405            8 :             _: &mut crate::bindings::WalProposer,
     406            8 :             timeout_millis: i64,
     407            8 :         ) -> super::WaitResult {
     408            8 :             let data = self.wait_events.get();
     409            8 :             println!(
     410            8 :                 "wait_event_set, timeout_millis={}, res={:?}",
     411            8 :                 timeout_millis, data
     412            8 :             );
     413            8 :             super::WaitResult::Network(data.sk, data.event_mask)
     414            8 :         }
     415              : 
                                // Zero-fills the buffer instead of producing randomness, so the
                                // expected greeting in the test can use an all-zero `proposer_id`.
     416            2 :         fn strong_random(&self, buf: &mut [u8]) -> bool {
     417            2 :             println!("strong_random");
     418            2 :             buf.fill(0);
     419            2 :             true
     420            2 :         }
     421              : 
                                // Publishes the LSN to the test thread, then panics on purpose; the
                                // test catches the unwind to get out of `Wrapper::start`.
     422            2 :         fn finish_sync_safekeepers(&self, lsn: u64) {
     423            2 :             self.sync_channel.send(lsn).unwrap();
     424            2 :             panic!("sync safekeepers finished at lsn={}", lsn);
     425              :         }
     426              : 
     427           14 :         fn log_internal(&self, _wp: &mut crate::bindings::WalProposer, level: Level, msg: &str) {
     428           14 :             println!("wp_log[{}] {}", level, msg);
     429           14 :         }
     430              : 
     431            0 :         fn after_election(&self, _wp: &mut crate::bindings::WalProposer) {
     432            0 :             println!("after_election");
     433            0 :         }
     434              :     }
     435              : 
     436              :     /// Test that walproposer can successfully connect to safekeeper and finish
     437              :     /// sync_safekeepers. API is mocked in MockImpl.
     438              :     ///
     439              :     /// Run this test with valgrind to detect leaks:
     440              :     /// `valgrind --leak-check=full target/debug/deps/walproposer-<build>`
                            ///
                            /// The exchange is fully scripted: two expected walproposer messages and two
                            /// canned safekeeper replies. The run ends when the mock's
                            /// `finish_sync_safekeepers` sends the LSN over a channel and panics.
     441            2 :     #[test]
     442            2 :     fn test_simple_sync_safekeepers() -> anyhow::Result<()> {
                                // The same hex id is used for both the tenant and the timeline.
     443            2 :         let ttid = TenantTimelineId::new(
     444            2 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     445            2 :             "9e4c8f36063c6c6e93bc20d65a820f3d".parse()?,
     446              :         );
     447              : 
                                // Capacity 1 is enough: the mock sends exactly one LSN before it
                                // panics.
     448            2 :         let (sender, receiver) = sync_channel(1);
     449            2 : 
     450            2 :         let my_impl: Box<dyn ApiImpl> = Box::new(MockImpl {
     451            2 :             wait_events: Cell::new(WaitEventsData {
     452            2 :                 sk: std::ptr::null_mut(),
     453            2 :                 event_mask: 0,
     454            2 :             }),
     455            2 :             expected_messages: vec![
                                        // Leading byte 103 is ASCII 'g'. The id bytes 158, 76, 143, ...
                                        // are 0x9e, 0x4c, 0x8f, ... of the ttid above, and 1, 113, 2, 0
                                        // is 160001 in little-endian order.
     456            2 :                 // Greeting(ProposerGreeting { protocol_version: 2, pg_version: 160001, proposer_id: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], system_id: 0, timeline_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tenant_id: 9e4c8f36063c6c6e93bc20d65a820f3d, tli: 1, wal_seg_size: 16777216 })
     457            2 :                 vec![
     458            2 :                     103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 113, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
     459            2 :                     0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 158, 76, 143, 54, 6, 60, 108, 110,
     460            2 :                     147, 188, 32, 214, 90, 130, 15, 61, 158, 76, 143, 54, 6, 60, 108, 110, 147,
     461            2 :                     188, 32, 214, 90, 130, 15, 61, 1, 0, 0, 0, 0, 0, 0, 1,
     462            2 :                 ],
                                        // Leading byte 118 is ASCII 'v'.
     463            2 :                 // VoteRequest(VoteRequest { term: 3 })
     464            2 :                 vec![
     465            2 :                     118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
     466            2 :                     0, 0, 0, 0, 0, 0,
     467            2 :                 ],
     468            2 :             ],
     469            2 :             expected_ptr: AtomicUsize::new(0),
     470            2 :             safekeeper_replies: vec![
     471            2 :                 // Greeting(AcceptorGreeting { term: 2, node_id: NodeId(1) })
     472            2 :                 vec![
     473            2 :                     103, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0,
     474            2 :                 ],
                                        // The repeated bytes 57, 5 are 0x0539 little-endian, i.e. 1337,
                                        // the LSN asserted at the end of the test.
     475            2 :                 // VoteResponse(VoteResponse { term: 3, vote_given: 1, flush_lsn: 0/539, truncate_lsn: 0/539, term_history: [(2, 0/539)], timeline_start_lsn: 0/539 })
     476            2 :                 vec![
     477            2 :                     118, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 57,
     478            2 :                     5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0,
     479            2 :                     0, 57, 5, 0, 0, 0, 0, 0, 0, 57, 5, 0, 0, 0, 0, 0, 0,
     480            2 :                 ],
     481            2 :             ],
     482            2 :             replies_ptr: AtomicUsize::new(0),
     483            2 :             sync_channel: sender,
     484            2 :         });
                                // The address is never dialled: all `conn_*` callbacks are mocked.
     485            2 :         let config = crate::walproposer::Config {
     486            2 :             ttid,
     487            2 :             safekeepers_list: vec!["localhost:5000".to_string()],
     488            2 :             safekeeper_reconnect_timeout: 1000,
     489            2 :             safekeeper_connection_timeout: 10000,
     490            2 :             sync_safekeepers: true,
     491            2 :         };
     492            2 : 
     493            2 :         let wp = Wrapper::new(my_impl, config);
     494            2 : 
                                // NOTE(review): the panic starts in a Rust callback and is caught here,
                                // outside `WalProposerStart`; whether unwinding through the frames in
                                // between is sound depends on how the bindings are declared, which is
                                // not visible in this file -- confirm.
     495            2 :         // walproposer will panic when it finishes sync_safekeepers
     496            2 :         std::panic::catch_unwind(|| wp.start()).unwrap_err();
     497            2 :         // validate the resulting LSN
     498            2 :         assert_eq!(receiver.try_recv(), Ok(1337));
     499            2 :         Ok(())
     500              :         // drop() will free up resources here
     501            2 :     }
     502              : }
        

Generated by: LCOV version 2.1-beta