LCOV - code coverage report
Current view: top level - safekeeper/tests/walproposer_sim - walproposer_api.rs (source / functions) Coverage Total Hit
Test: 465a86b0c1fda0069b3e0f6c1c126e6b635a1f72.info Lines: 95.5 % 486 464
Test Date: 2024-06-25 15:47:26 Functions: 99.3 % 138 137

            Line data    Source code
       1              : use std::{
       2              :     cell::{RefCell, RefMut, UnsafeCell},
       3              :     ffi::CStr,
       4              :     sync::Arc,
       5              : };
       6              : 
       7              : use bytes::Bytes;
       8              : use desim::{
       9              :     executor::{self, PollSome},
      10              :     network::TCP,
      11              :     node_os::NodeOs,
      12              :     proto::{AnyMessage, NetEvent, NodeEvent},
      13              :     world::NodeId,
      14              : };
      15              : use tracing::debug;
      16              : use utils::lsn::Lsn;
      17              : use walproposer::{
      18              :     api_bindings::Level,
      19              :     bindings::{
      20              :         NeonWALReadResult, SafekeeperStateDesiredEvents, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
      21              :     },
      22              :     walproposer::{ApiImpl, Config},
      23              : };
      24              : 
      25              : use super::walproposer_disk::DiskWalProposer;
      26              : 
      27              : /// Special state for each wp->sk connection.
      28              : struct SafekeeperConn {
      29              :     host: String,
      30              :     port: String,
      31              :     node_id: NodeId,
      32              :     // socket is Some(..) if and only if the connection is established
      33              :     socket: Option<TCP>,
      34              :     // connection is in progress
      35              :     is_connecting: bool,
      36              :     // START_WAL_PUSH is in progress
      37              :     is_start_wal_push: bool,
      38              :     // pointer to Safekeeper in walproposer for callbacks
      39              :     raw_ptr: *mut walproposer::bindings::Safekeeper,
      40              : }
      41              : 
      42              : impl SafekeeperConn {
      43       217359 :     pub fn new(host: String, port: String) -> Self {
      44       217359 :         // port number is the same as NodeId
      45       217359 :         let port_num = port.parse::<u32>().unwrap();
      46       217359 :         Self {
      47       217359 :             host,
      48       217359 :             port,
      49       217359 :             node_id: port_num,
      50       217359 :             socket: None,
      51       217359 :             is_connecting: false,
      52       217359 :             is_start_wal_push: false,
      53       217359 :             raw_ptr: std::ptr::null_mut(),
      54       217359 :         }
      55       217359 :     }
      56              : }
      57              : 
      58              : /// Simulation version of a postgres WaitEventSet. At pos 0 there is always
      59              : /// a special NodeEvents channel, which is used as a latch.
      60              : struct EventSet {
      61              :     os: NodeOs,
      62              :     // all pollable channels, 0 is always NodeEvent channel
      63              :     chans: Vec<Box<dyn PollSome>>,
      64              :     // 0 is always nullptr
      65              :     sk_ptrs: Vec<*mut walproposer::bindings::Safekeeper>,
      66              :     // event mask for each channel
      67              :     masks: Vec<u32>,
      68              : }
      69              : 
      70              : impl EventSet {
      71        72453 :     pub fn new(os: NodeOs) -> Self {
      72        72453 :         let node_events = os.node_events();
      73        72453 :         Self {
      74        72453 :             os,
      75        72453 :             chans: vec![Box::new(node_events)],
      76        72453 :             sk_ptrs: vec![std::ptr::null_mut()],
      77        72453 :             masks: vec![WL_SOCKET_READABLE],
      78        72453 :         }
      79        72453 :     }
      80              : 
      81              :     /// Leaves all readable channels at the beginning of the array.
      82       221682 :     fn sort_readable(&mut self) -> usize {
      83       221682 :         let mut cnt = 1;
      84       531501 :         for i in 1..self.chans.len() {
      85       531501 :             if self.masks[i] & WL_SOCKET_READABLE != 0 {
      86       531501 :                 self.chans.swap(i, cnt);
      87       531501 :                 self.sk_ptrs.swap(i, cnt);
      88       531501 :                 self.masks.swap(i, cnt);
      89       531501 :                 cnt += 1;
      90       531501 :             }
      91              :         }
      92       221682 :         cnt
      93       221682 :     }
      94              : 
      95       534221 :     fn update_event_set(&mut self, conn: &SafekeeperConn, event_mask: u32) {
      96       534221 :         let index = self
      97       534221 :             .sk_ptrs
      98       534221 :             .iter()
      99      2013725 :             .position(|&ptr| ptr == conn.raw_ptr)
     100       534221 :             .expect("safekeeper should exist in event set");
     101       534221 :         self.masks[index] = event_mask;
     102       534221 :     }
     103              : 
     104       480374 :     fn add_safekeeper(&mut self, sk: &SafekeeperConn, event_mask: u32) {
     105      1162219 :         for ptr in self.sk_ptrs.iter() {
     106      1162219 :             assert!(*ptr != sk.raw_ptr);
     107              :         }
     108              : 
     109       480374 :         self.chans.push(Box::new(
     110       480374 :             sk.socket
     111       480374 :                 .as_ref()
     112       480374 :                 .expect("socket should not be closed")
     113       480374 :                 .recv_chan(),
     114       480374 :         ));
     115       480374 :         self.sk_ptrs.push(sk.raw_ptr);
     116       480374 :         self.masks.push(event_mask);
     117       480374 :     }
     118              : 
     119       300420 :     fn remove_safekeeper(&mut self, sk: &SafekeeperConn) {
     120       639739 :         let index = self.sk_ptrs.iter().position(|&ptr| ptr == sk.raw_ptr);
     121       300420 :         if index.is_none() {
     122           51 :             debug!("remove_safekeeper: sk={:?} not found", sk.raw_ptr);
     123           51 :             return;
     124       300369 :         }
     125       300369 :         let index = index.unwrap();
     126       300369 : 
     127       300369 :         self.chans.remove(index);
     128       300369 :         self.sk_ptrs.remove(index);
     129       300369 :         self.masks.remove(index);
     130       300369 : 
     131       300369 :         // to simulate the actual behaviour
     132       300369 :         self.refresh_event_set();
     133       300420 :     }
     134              : 
     135              :     /// Updates all masks to match the result of a SafekeeperStateDesiredEvents.
     136       343718 :     fn refresh_event_set(&mut self) {
     137       975940 :         for (i, mask) in self.masks.iter_mut().enumerate() {
     138       975940 :             if i == 0 {
     139       343718 :                 continue;
     140       632222 :             }
     141       632222 : 
     142       632222 :             let mut mask_sk: u32 = 0;
     143       632222 :             let mut mask_nwr: u32 = 0;
     144       632222 :             unsafe { SafekeeperStateDesiredEvents(self.sk_ptrs[i], &mut mask_sk, &mut mask_nwr) };
     145       632222 : 
     146       632222 :             if mask_sk != *mask {
     147            0 :                 debug!(
     148            0 :                     "refresh_event_set: sk={:?}, old_mask={:#b}, new_mask={:#b}",
     149            0 :                     self.sk_ptrs[i], *mask, mask_sk
     150              :                 );
     151            0 :                 *mask = mask_sk;
     152       632222 :             }
     153              :         }
     154       343718 :     }
     155              : 
     156              :     /// Wait for events on all channels.
     157       221682 :     fn wait(&mut self, timeout_millis: i64) -> walproposer::walproposer::WaitResult {
     158              :         // all channels are always writeable
     159       753183 :         for (i, mask) in self.masks.iter().enumerate() {
     160       753183 :             if *mask & WL_SOCKET_WRITEABLE != 0 {
     161            0 :                 return walproposer::walproposer::WaitResult::Network(
     162            0 :                     self.sk_ptrs[i],
     163            0 :                     WL_SOCKET_WRITEABLE,
     164            0 :                 );
     165       753183 :             }
     166              :         }
     167              : 
     168       221682 :         let cnt = self.sort_readable();
     169       221682 : 
     170       221682 :         let slice = &self.chans[0..cnt];
     171       221682 :         match executor::epoll_chans(slice, timeout_millis) {
     172        90088 :             None => walproposer::walproposer::WaitResult::Timeout,
     173              :             Some(0) => {
     174         3178 :                 let msg = self.os.node_events().must_recv();
     175         3178 :                 match msg {
     176         3178 :                     NodeEvent::Internal(AnyMessage::Just32(0)) => {
     177         3178 :                         // got a notification about new WAL available
     178         3178 :                     }
     179            0 :                     NodeEvent::Internal(_) => unreachable!(),
     180            0 :                     NodeEvent::Accept(_) => unreachable!(),
     181              :                 }
     182         3178 :                 walproposer::walproposer::WaitResult::Latch
     183              :             }
     184       128416 :             Some(index) => walproposer::walproposer::WaitResult::Network(
     185       128416 :                 self.sk_ptrs[index],
     186       128416 :                 WL_SOCKET_READABLE,
     187       128416 :             ),
     188              :         }
     189       221682 :     }
     190              : }
     191              : 
     192              : /// This struct handles all calls from walproposer into walproposer_api.
     193              : pub struct SimulationApi {
     194              :     os: NodeOs,
     195              :     safekeepers: RefCell<Vec<SafekeeperConn>>,
     196              :     disk: Arc<DiskWalProposer>,
     197              :     redo_start_lsn: Option<Lsn>,
     198              :     last_logged_commit_lsn: u64,
     199              :     shmem: UnsafeCell<walproposer::bindings::WalproposerShmemState>,
     200              :     config: Config,
     201              :     event_set: RefCell<Option<EventSet>>,
     202              : }
     203              : 
     204              : pub struct Args {
     205              :     pub os: NodeOs,
     206              :     pub config: Config,
     207              :     pub disk: Arc<DiskWalProposer>,
     208              :     pub redo_start_lsn: Option<Lsn>,
     209              : }
     210              : 
     211              : impl SimulationApi {
     212        72453 :     pub fn new(args: Args) -> Self {
     213        72453 :         // initialize connection state for each safekeeper
     214        72453 :         let sk_conns = args
     215        72453 :             .config
     216        72453 :             .safekeepers_list
     217        72453 :             .iter()
     218       217359 :             .map(|s| {
     219       217359 :                 SafekeeperConn::new(
     220       217359 :                     s.split(':').next().unwrap().to_string(),
     221       217359 :                     s.split(':').nth(1).unwrap().to_string(),
     222       217359 :                 )
     223       217359 :             })
     224        72453 :             .collect::<Vec<_>>();
     225        72453 : 
     226        72453 :         Self {
     227        72453 :             os: args.os,
     228        72453 :             safekeepers: RefCell::new(sk_conns),
     229        72453 :             disk: args.disk,
     230        72453 :             redo_start_lsn: args.redo_start_lsn,
     231        72453 :             last_logged_commit_lsn: 0,
     232        72453 :             shmem: UnsafeCell::new(walproposer::api_bindings::empty_shmem()),
     233        72453 :             config: args.config,
     234        72453 :             event_set: RefCell::new(None),
     235        72453 :         }
     236        72453 :     }
     237              : 
     238              :     /// Get SafekeeperConn for the given Safekeeper.
     239      2361915 :     fn get_conn(&self, sk: &mut walproposer::bindings::Safekeeper) -> RefMut<'_, SafekeeperConn> {
     240      2361915 :         let sk_port = unsafe { CStr::from_ptr(sk.port).to_str().unwrap() };
     241      2361915 :         let state = self.safekeepers.borrow_mut();
     242      2361915 :         RefMut::map(state, |v| {
     243      2361915 :             v.iter_mut()
     244      4719418 :                 .find(|conn| conn.port == sk_port)
     245      2361915 :                 .expect("safekeeper conn not found by port")
     246      2361915 :         })
     247      2361915 :     }
     248              : }
     249              : 
     250              : impl ApiImpl for SimulationApi {
     251      2522542 :     fn get_current_timestamp(&self) -> i64 {
     252      2522542 :         debug!("get_current_timestamp");
     253              :         // PG TimestampTZ is microseconds, but simulation unit is assumed to be
     254              :         // milliseconds, so multiply by 10^3
     255      2522542 :         self.os.now() as i64 * 1000
     256      2522542 :     }
     257              : 
     258         8177 :     fn update_donor(&self, donor: &mut walproposer::bindings::Safekeeper, donor_lsn: u64) {
     259         8177 :         let mut shmem = unsafe { *self.get_shmem_state() };
     260         8177 :         shmem.propEpochStartLsn.value = donor_lsn;
     261         8177 :         shmem.donor_conninfo = donor.conninfo;
     262         8177 :     }
     263              : 
     264       267006 :     fn conn_status(
     265       267006 :         &self,
     266       267006 :         _: &mut walproposer::bindings::Safekeeper,
     267       267006 :     ) -> walproposer::bindings::WalProposerConnStatusType {
     268       267006 :         debug!("conn_status");
     269              :         // break the connection with a 10% chance
     270       267006 :         if self.os.random(100) < 10 {
     271        26819 :             walproposer::bindings::WalProposerConnStatusType_WP_CONNECTION_BAD
     272              :         } else {
     273       240187 :             walproposer::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     274              :         }
     275       267006 :     }
     276              : 
     277       267006 :     fn conn_connect_start(&self, sk: &mut walproposer::bindings::Safekeeper) {
     278       267006 :         debug!("conn_connect_start");
     279       267006 :         let mut conn = self.get_conn(sk);
     280       267006 : 
     281       267006 :         assert!(conn.socket.is_none());
     282       267006 :         let socket = self.os.open_tcp(conn.node_id);
     283       267006 :         conn.socket = Some(socket);
     284       267006 :         conn.raw_ptr = sk;
     285       267006 :         conn.is_connecting = true;
     286       267006 :     }
     287              : 
     288       240187 :     fn conn_connect_poll(
     289       240187 :         &self,
     290       240187 :         _: &mut walproposer::bindings::Safekeeper,
     291       240187 :     ) -> walproposer::bindings::WalProposerConnectPollStatusType {
     292       240187 :         debug!("conn_connect_poll");
     293              :         // TODO: break the connection here
     294       240187 :         walproposer::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     295       240187 :     }
     296              : 
     297       240187 :     fn conn_send_query(&self, sk: &mut walproposer::bindings::Safekeeper, query: &str) -> bool {
     298       240187 :         debug!("conn_send_query: {}", query);
     299       240187 :         self.get_conn(sk).is_start_wal_push = true;
     300       240187 :         true
     301       240187 :     }
     302              : 
     303       240187 :     fn conn_get_query_result(
     304       240187 :         &self,
     305       240187 :         _: &mut walproposer::bindings::Safekeeper,
     306       240187 :     ) -> walproposer::bindings::WalProposerExecStatusType {
     307       240187 :         debug!("conn_get_query_result");
     308              :         // TODO: break the connection here
     309       240187 :         walproposer::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     310       240187 :     }
     311              : 
     312       140918 :     fn conn_async_read(
     313       140918 :         &self,
     314       140918 :         sk: &mut walproposer::bindings::Safekeeper,
     315       140918 :         vec: &mut Vec<u8>,
     316       140918 :     ) -> walproposer::bindings::PGAsyncReadResult {
     317       140918 :         debug!("conn_async_read");
     318       140918 :         let mut conn = self.get_conn(sk);
     319              : 
     320       140918 :         let socket = if let Some(socket) = conn.socket.as_mut() {
     321       140918 :             socket
     322              :         } else {
     323              :             // socket is already closed
     324            0 :             return walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_FAIL;
     325              :         };
     326              : 
     327       140918 :         let msg = socket.recv_chan().try_recv();
     328              : 
     329       126817 :         match msg {
     330              :             None => {
     331              :                 // no message is ready
     332        14101 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_TRY_AGAIN
     333              :             }
     334              :             Some(NetEvent::Closed) => {
     335              :                 // connection is closed
     336        58037 :                 debug!("conn_async_read: connection is closed");
     337        58037 :                 conn.socket = None;
     338        58037 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_FAIL
     339              :             }
     340        68780 :             Some(NetEvent::Message(msg)) => {
     341              :                 // got a message
     342        68780 :                 let b = match msg {
     343        68780 :                     desim::proto::AnyMessage::Bytes(b) => b,
     344            0 :                     _ => unreachable!(),
     345              :                 };
     346        68780 :                 vec.extend_from_slice(&b);
     347        68780 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
     348              :             }
     349              :         }
     350       140918 :     }
     351              : 
     352       271430 :     fn conn_blocking_write(&self, sk: &mut walproposer::bindings::Safekeeper, buf: &[u8]) -> bool {
     353       271430 :         let mut conn = self.get_conn(sk);
     354       271430 :         debug!("conn_blocking_write to {}: {:?}", conn.node_id, buf);
     355       271430 :         let socket = conn.socket.as_mut().unwrap();
     356       271430 :         socket.send(desim::proto::AnyMessage::Bytes(Bytes::copy_from_slice(buf)));
     357       271430 :         true
     358       271430 :     }
     359              : 
     360        37776 :     fn conn_async_write(
     361        37776 :         &self,
     362        37776 :         sk: &mut walproposer::bindings::Safekeeper,
     363        37776 :         buf: &[u8],
     364        37776 :     ) -> walproposer::bindings::PGAsyncWriteResult {
     365        37776 :         let mut conn = self.get_conn(sk);
     366        37776 :         debug!("conn_async_write to {}: {:?}", conn.node_id, buf);
     367        37776 :         if let Some(socket) = conn.socket.as_mut() {
     368        37776 :             socket.send(desim::proto::AnyMessage::Bytes(Bytes::copy_from_slice(buf)));
     369        37776 :         } else {
     370              :             // connection is already closed
     371            0 :             debug!("conn_async_write: writing to a closed socket!");
     372              :             // TODO: maybe we should return error here?
     373              :         }
     374        37776 :         walproposer::bindings::PGAsyncWriteResult_PG_ASYNC_WRITE_SUCCESS
     375        37776 :     }
     376              : 
     377         8528 :     fn wal_reader_allocate(&self, _: &mut walproposer::bindings::Safekeeper) -> NeonWALReadResult {
     378         8528 :         debug!("wal_reader_allocate");
     379         8528 :         walproposer::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     380         8528 :     }
     381              : 
     382        29248 :     fn wal_read(
     383        29248 :         &self,
     384        29248 :         _sk: &mut walproposer::bindings::Safekeeper,
     385        29248 :         buf: &mut [u8],
     386        29248 :         startpos: u64,
     387        29248 :     ) -> NeonWALReadResult {
     388        29248 :         self.disk.lock().read(startpos, buf);
     389        29248 :         walproposer::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     390        29248 :     }
     391              : 
     392        72453 :     fn init_event_set(&self, _: &mut walproposer::bindings::WalProposer) {
     393        72453 :         debug!("init_event_set");
     394        72453 :         let new_event_set = EventSet::new(self.os.clone());
     395        72453 :         let old_event_set = self.event_set.replace(Some(new_event_set));
     396        72453 :         assert!(old_event_set.is_none());
     397        72453 :     }
     398              : 
     399       534221 :     fn update_event_set(&self, sk: &mut walproposer::bindings::Safekeeper, event_mask: u32) {
     400       534221 :         debug!(
     401            0 :             "update_event_set, sk={:?}, events_mask={:#b}",
     402            0 :             sk as *mut walproposer::bindings::Safekeeper, event_mask
     403              :         );
     404       534221 :         let conn = self.get_conn(sk);
     405       534221 : 
     406       534221 :         self.event_set
     407       534221 :             .borrow_mut()
     408       534221 :             .as_mut()
     409       534221 :             .unwrap()
     410       534221 :             .update_event_set(&conn, event_mask);
     411       534221 :     }
     412              : 
     413       480374 :     fn add_safekeeper_event_set(
     414       480374 :         &self,
     415       480374 :         sk: &mut walproposer::bindings::Safekeeper,
     416       480374 :         event_mask: u32,
     417       480374 :     ) {
     418       480374 :         debug!(
     419            0 :             "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     420            0 :             sk as *mut walproposer::bindings::Safekeeper, event_mask
     421              :         );
     422              : 
     423       480374 :         self.event_set
     424       480374 :             .borrow_mut()
     425       480374 :             .as_mut()
     426       480374 :             .unwrap()
     427       480374 :             .add_safekeeper(&self.get_conn(sk), event_mask);
     428       480374 :     }
     429              : 
     430       300420 :     fn rm_safekeeper_event_set(&self, sk: &mut walproposer::bindings::Safekeeper) {
     431       300420 :         debug!(
     432            0 :             "rm_safekeeper_event_set, sk={:?}",
     433            0 :             sk as *mut walproposer::bindings::Safekeeper,
     434              :         );
     435              : 
     436       300420 :         self.event_set
     437       300420 :             .borrow_mut()
     438       300420 :             .as_mut()
     439       300420 :             .unwrap()
     440       300420 :             .remove_safekeeper(&self.get_conn(sk));
     441       300420 :     }
     442              : 
     443        43349 :     fn active_state_update_event_set(&self, sk: &mut walproposer::bindings::Safekeeper) {
     444        43349 :         debug!("active_state_update_event_set");
     445              : 
     446        43349 :         assert!(sk.state == walproposer::bindings::SafekeeperState_SS_ACTIVE);
     447        43349 :         self.event_set
     448        43349 :             .borrow_mut()
     449        43349 :             .as_mut()
     450        43349 :             .unwrap()
     451        43349 :             .refresh_event_set();
     452        43349 :     }
     453              : 
     454       122757 :     fn wal_reader_events(&self, _sk: &mut walproposer::bindings::Safekeeper) -> u32 {
     455       122757 :         0
     456       122757 :     }
     457              : 
     458       702056 :     fn wait_event_set(
     459       702056 :         &self,
     460       702056 :         _: &mut walproposer::bindings::WalProposer,
     461       702056 :         timeout_millis: i64,
     462       702056 :     ) -> walproposer::walproposer::WaitResult {
     463       702056 :         // TODO: handle multiple stages as part of the simulation (e.g. connect, start_wal_push, etc)
     464       702056 :         let mut conns = self.safekeepers.borrow_mut();
     465      1625654 :         for conn in conns.iter_mut() {
     466      1625654 :             if conn.socket.is_some() && conn.is_connecting {
     467       240187 :                 conn.is_connecting = false;
     468       240187 :                 debug!("wait_event_set, connecting to {}:{}", conn.host, conn.port);
     469       240187 :                 return walproposer::walproposer::WaitResult::Network(
     470       240187 :                     conn.raw_ptr,
     471       240187 :                     WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
     472       240187 :                 );
     473      1385467 :             }
     474      1385467 :             if conn.socket.is_some() && conn.is_start_wal_push {
     475       240187 :                 conn.is_start_wal_push = false;
     476       240187 :                 debug!(
     477            0 :                     "wait_event_set, start wal push to {}:{}",
     478              :                     conn.host, conn.port
     479              :                 );
     480       240187 :                 return walproposer::walproposer::WaitResult::Network(
     481       240187 :                     conn.raw_ptr,
     482       240187 :                     WL_SOCKET_READABLE,
     483       240187 :                 );
     484      1145280 :             }
     485              :         }
     486       221682 :         drop(conns);
     487       221682 : 
     488       221682 :         let res = self
     489       221682 :             .event_set
     490       221682 :             .borrow_mut()
     491       221682 :             .as_mut()
     492       221682 :             .unwrap()
     493       221682 :             .wait(timeout_millis);
     494       221682 : 
     495       221682 :         debug!(
     496            0 :             "wait_event_set, timeout_millis={}, res={:?}",
     497              :             timeout_millis, res,
     498              :         );
     499       153352 :         res
     500       633726 :     }
     501              : 
     502        72453 :     fn strong_random(&self, buf: &mut [u8]) -> bool {
     503        72453 :         debug!("strong_random");
     504        72453 :         buf.fill(0);
     505        72453 :         true
     506        72453 :     }
     507              : 
     508         2935 :     fn finish_sync_safekeepers(&self, lsn: u64) {
     509         2935 :         debug!("finish_sync_safekeepers, lsn={}", lsn);
     510         2935 :         executor::exit(0, Lsn(lsn).to_string());
     511         2935 :     }
     512              : 
     513       726102 :     fn log_internal(&self, _wp: &mut walproposer::bindings::WalProposer, level: Level, msg: &str) {
     514       726102 :         debug!("wp_log[{}] {}", level, msg);
     515       726102 :         if level == Level::Fatal || level == Level::Panic {
     516          605 :             if msg.contains("rejects our connection request with term") {
     517          347 :                 // collected quorum with lower term, then got rejected by next connected safekeeper
     518          347 :                 executor::exit(1, msg.to_owned());
     519          347 :             }
     520          605 :             if msg.contains("collected propEpochStartLsn") && msg.contains(", but basebackup LSN ")
     521           18 :             {
     522           18 :                 // sync-safekeepers collected wrong quorum, walproposer collected another quorum
     523           18 :                 executor::exit(1, msg.to_owned());
     524          587 :             }
     525          605 :             if msg.contains("failed to download WAL for logical replicaiton") {
     526          163 :                 // Recovery connection broke and recovery failed
     527          163 :                 executor::exit(1, msg.to_owned());
     528          442 :             }
     529          605 :             if msg.contains("missing majority of votes, collected") {
     530           77 :                 // Voting bug when safekeeper disconnects after voting
     531           77 :                 executor::exit(1, msg.to_owned());
     532          528 :             }
     533          605 :             panic!("unknown FATAL error from walproposer: {}", msg);
     534       725497 :         }
     535       725497 :     }
     536              : 
     537         6182 :     fn after_election(&self, wp: &mut walproposer::bindings::WalProposer) {
     538         6182 :         let prop_lsn = wp.propEpochStartLsn;
     539         6182 :         let prop_term = wp.propTerm;
     540         6182 : 
     541         6182 :         let mut prev_lsn: u64 = 0;
     542         6182 :         let mut prev_term: u64 = 0;
     543         6182 : 
     544         6182 :         unsafe {
     545         6182 :             let history = wp.propTermHistory.entries;
     546         6182 :             let len = wp.propTermHistory.n_entries as usize;
     547         6182 :             if len > 1 {
     548         4221 :                 let entry = *history.wrapping_add(len - 2);
     549         4221 :                 prev_lsn = entry.lsn;
     550         4221 :                 prev_term = entry.term;
     551         4221 :             }
     552              :         }
     553              : 
     554         6182 :         let msg = format!(
     555         6182 :             "prop_elected;{};{};{};{}",
     556         6182 :             prop_lsn, prop_term, prev_lsn, prev_term
     557         6182 :         );
     558         6182 : 
     559         6182 :         debug!(msg);
     560         6182 :         self.os.log_event(msg);
     561         6182 :     }
     562              : 
     563         2306 :     fn get_redo_start_lsn(&self) -> u64 {
     564         2306 :         debug!("get_redo_start_lsn -> {:?}", self.redo_start_lsn);
     565         2306 :         self.redo_start_lsn.expect("redo_start_lsn is not set").0
     566         2306 :     }
     567              : 
     568        20577 :     fn get_shmem_state(&self) -> *mut walproposer::bindings::WalproposerShmemState {
     569        20577 :         self.shmem.get()
     570        20577 :     }
     571              : 
     572         1357 :     fn start_streaming(
     573         1357 :         &self,
     574         1357 :         startpos: u64,
     575         1357 :         callback: &walproposer::walproposer::StreamingCallback,
     576         1357 :     ) {
     577         1357 :         let disk = &self.disk;
     578         1357 :         let disk_lsn = disk.lock().flush_rec_ptr().0;
     579         1357 :         debug!("start_streaming at {} (disk_lsn={})", startpos, disk_lsn);
     580         1357 :         if startpos < disk_lsn {
     581          395 :             debug!("startpos < disk_lsn, it means we wrote some transaction even before streaming started");
     582          962 :         }
     583         1357 :         assert!(startpos <= disk_lsn);
     584         1357 :         let mut broadcasted = Lsn(startpos);
     585              : 
     586              :         loop {
     587         4441 :             let available = disk.lock().flush_rec_ptr();
     588         4441 :             assert!(available >= broadcasted);
     589         3084 :             callback.broadcast(broadcasted, available);
     590         3084 :             broadcasted = available;
     591         3084 :             callback.poll();
     592              :         }
     593              :     }
     594              : 
     595        15220 :     fn process_safekeeper_feedback(
     596        15220 :         &mut self,
     597        15220 :         wp: &mut walproposer::bindings::WalProposer,
     598        15220 :         _sk: &mut walproposer::bindings::Safekeeper,
     599        15220 :     ) {
     600        15220 :         debug!("process_safekeeper_feedback, commit_lsn={}", wp.commitLsn);
     601        15220 :         if wp.commitLsn > self.last_logged_commit_lsn {
     602         3213 :             self.os.log_event(format!("commit_lsn;{}", wp.commitLsn));
     603         3213 :             self.last_logged_commit_lsn = wp.commitLsn;
     604        12007 :         }
     605        15220 :     }
     606              : 
     607          821 :     fn get_flush_rec_ptr(&self) -> u64 {
     608          821 :         let lsn = self.disk.lock().flush_rec_ptr();
     609          821 :         debug!("get_flush_rec_ptr: {}", lsn);
     610          821 :         lsn.0
     611          821 :     }
     612              : 
     613         6182 :     fn recovery_download(
     614         6182 :         &self,
     615         6182 :         wp: &mut walproposer::bindings::WalProposer,
     616         6182 :         sk: &mut walproposer::bindings::Safekeeper,
     617         6182 :     ) -> bool {
     618         6182 :         let mut startpos = wp.truncateLsn;
     619         6182 :         let endpos = wp.propEpochStartLsn;
     620         6182 : 
     621         6182 :         if startpos == endpos {
     622         3651 :             debug!("recovery_download: nothing to download");
     623         3651 :             return true;
     624         2531 :         }
     625         2531 : 
     626         2531 :         debug!("recovery_download from {} to {}", startpos, endpos,);
     627              : 
     628         2531 :         let replication_prompt = format!(
     629         2531 :             "START_REPLICATION {} {} {} {}",
     630         2531 :             self.config.ttid.tenant_id, self.config.ttid.timeline_id, startpos, endpos,
     631         2531 :         );
     632         2531 :         let async_conn = self.get_conn(sk);
     633         2531 : 
     634         2531 :         let conn = self.os.open_tcp(async_conn.node_id);
     635         2531 :         conn.send(desim::proto::AnyMessage::Bytes(replication_prompt.into()));
     636         2531 : 
     637         2531 :         let chan = conn.recv_chan();
     638         4316 :         while startpos < endpos {
     639         2531 :             let event = chan.recv();
     640         2368 :             match event {
     641              :                 NetEvent::Closed => {
     642          163 :                     debug!("connection closed in recovery");
     643          163 :                     break;
     644              :                 }
     645         2368 :                 NetEvent::Message(AnyMessage::Bytes(b)) => {
     646         2368 :                     debug!("got recovery bytes from safekeeper");
     647         1785 :                     self.disk.lock().write(startpos, &b);
     648         1785 :                     startpos += b.len() as u64;
     649              :                 }
     650            0 :                 NetEvent::Message(_) => unreachable!(),
     651              :             }
     652              :         }
     653              : 
     654         1948 :         debug!("recovery finished at {}", startpos);
     655              : 
     656         1948 :         startpos == endpos
     657         5599 :     }
     658              : 
     659        87052 :     fn conn_finish(&self, sk: &mut walproposer::bindings::Safekeeper) {
     660        87052 :         let mut conn = self.get_conn(sk);
     661        87052 :         debug!("conn_finish to {}", conn.node_id);
     662        87052 :         if let Some(socket) = conn.socket.as_mut() {
     663        28964 :             socket.close();
     664        58088 :         } else {
     665        58088 :             // connection is already closed
     666        58088 :         }
     667        87052 :         conn.socket = None;
     668        87052 :     }
     669              : 
     670        84856 :     fn conn_error_message(&self, _sk: &mut walproposer::bindings::Safekeeper) -> String {
     671        84856 :         "connection is closed, probably".into()
     672        84856 :     }
     673              : }
        

Generated by: LCOV version 2.1-beta