LCOV - code coverage report
Current view: top level - safekeeper/tests/walproposer_sim - walproposer_api.rs (source / functions) Coverage Total Hit
Test: 24ee6d80f31ab4e4f310a156af8323b901a6f144.info Lines: 93.8 % 486 456
Test Date: 2025-03-17 16:48:14 Functions: 97.1 % 138 134

            Line data    Source code
       1              : use std::cell::{RefCell, RefMut, UnsafeCell};
       2              : use std::ffi::CStr;
       3              : use std::sync::Arc;
       4              : 
       5              : use bytes::Bytes;
       6              : use desim::executor::{self, PollSome};
       7              : use desim::network::TCP;
       8              : use desim::node_os::NodeOs;
       9              : use desim::proto::{AnyMessage, NetEvent, NodeEvent};
      10              : use desim::world::NodeId;
      11              : use tracing::debug;
      12              : use utils::lsn::Lsn;
      13              : use walproposer::api_bindings::Level;
      14              : use walproposer::bindings::{
      15              :     NeonWALReadResult, SafekeeperStateDesiredEvents, WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
      16              : };
      17              : use walproposer::walproposer::{ApiImpl, Config};
      18              : 
      19              : use super::walproposer_disk::DiskWalProposer;
      20              : 
      21              : /// Special state for each wp->sk connection.
      22              : struct SafekeeperConn {
      23              :     host: String,
      24              :     port: String,
      25              :     node_id: NodeId,
      26              :     // socket is Some(..) if and only if the connection is established
      27              :     socket: Option<TCP>,
      28              :     // connection is in progress
      29              :     is_connecting: bool,
      30              :     // START_WAL_PUSH is in progress
      31              :     is_start_wal_push: bool,
      32              :     // pointer to Safekeeper in walproposer for callbacks
      33              :     raw_ptr: *mut walproposer::bindings::Safekeeper,
      34              : }
      35              : 
      36              : impl SafekeeperConn {
      37        27585 :     pub fn new(host: String, port: String) -> Self {
      38        27585 :         // port number is the same as NodeId
      39        27585 :         let port_num = port.parse::<u32>().unwrap();
      40        27585 :         Self {
      41        27585 :             host,
      42        27585 :             port,
      43        27585 :             node_id: port_num,
      44        27585 :             socket: None,
      45        27585 :             is_connecting: false,
      46        27585 :             is_start_wal_push: false,
      47        27585 :             raw_ptr: std::ptr::null_mut(),
      48        27585 :         }
      49        27585 :     }
      50              : }
      51              : 
      52              : /// Simulation version of a postgres WaitEventSet. At pos 0 there is always
      53              : /// a special NodeEvents channel, which is used as a latch.
      54              : struct EventSet {
      55              :     os: NodeOs,
      56              :     // all pollable channels, 0 is always NodeEvent channel
      57              :     chans: Vec<Box<dyn PollSome>>,
      58              :     // 0 is always nullptr
      59              :     sk_ptrs: Vec<*mut walproposer::bindings::Safekeeper>,
      60              :     // event mask for each channel
      61              :     masks: Vec<u32>,
      62              : }
      63              : 
      64              : impl EventSet {
      65         9195 :     pub fn new(os: NodeOs) -> Self {
      66         9195 :         let node_events = os.node_events();
      67         9195 :         Self {
      68         9195 :             os,
      69         9195 :             chans: vec![Box::new(node_events)],
      70         9195 :             sk_ptrs: vec![std::ptr::null_mut()],
      71         9195 :             masks: vec![WL_SOCKET_READABLE],
      72         9195 :         }
      73         9195 :     }
      74              : 
      75              :     /// Moves all readable channels to the front of the array (after the latch at 0) and returns how many leading channels to poll.
      76        31049 :     fn sort_readable(&mut self) -> usize {
      77        31049 :         let mut cnt = 1;
      78        74386 :         for i in 1..self.chans.len() {
      79        74386 :             if self.masks[i] & WL_SOCKET_READABLE != 0 {
      80        74386 :                 self.chans.swap(i, cnt);
      81        74386 :                 self.sk_ptrs.swap(i, cnt);
      82        74386 :                 self.masks.swap(i, cnt);
      83        74386 :                 cnt += 1;
      84        74386 :             }
      85              :         }
      86        31049 :         cnt
      87        31049 :     }
      88              : 
      89        68036 :     fn update_event_set(&mut self, conn: &SafekeeperConn, event_mask: u32) {
      90        68036 :         let index = self
      91        68036 :             .sk_ptrs
      92        68036 :             .iter()
      93       256640 :             .position(|&ptr| ptr == conn.raw_ptr)
      94        68036 :             .expect("safekeeper should exist in event set");
      95        68036 :         self.masks[index] = event_mask;
      96        68036 :     }
      97              : 
      98        61416 :     fn add_safekeeper(&mut self, sk: &SafekeeperConn, event_mask: u32) {
      99       148575 :         for ptr in self.sk_ptrs.iter() {
     100       148575 :             assert!(*ptr != sk.raw_ptr);
     101              :         }
     102              : 
     103        61416 :         self.chans.push(Box::new(
     104        61416 :             sk.socket
     105        61416 :                 .as_ref()
     106        61416 :                 .expect("socket should not be closed")
     107        61416 :                 .recv_chan(),
     108        61416 :         ));
     109        61416 :         self.sk_ptrs.push(sk.raw_ptr);
     110        61416 :         self.masks.push(event_mask);
     111        61416 :     }
     112              : 
     113        38904 :     fn remove_safekeeper(&mut self, sk: &SafekeeperConn) {
     114        82854 :         let index = self.sk_ptrs.iter().position(|&ptr| ptr == sk.raw_ptr);
     115        38904 :         if index.is_none() {
     116            3 :             debug!("remove_safekeeper: sk={:?} not found", sk.raw_ptr);
     117            3 :             return;
     118        38901 :         }
     119        38901 :         let index = index.unwrap();
     120        38901 : 
     121        38901 :         self.chans.remove(index);
     122        38901 :         self.sk_ptrs.remove(index);
     123        38901 :         self.masks.remove(index);
     124        38901 : 
     125        38901 :         // to simulate the actual behaviour
     126        38901 :         self.refresh_event_set();
     127        38904 :     }
     128              : 
     129              :     /// Updates all masks to match the result of a SafekeeperStateDesiredEvents.
     130        47782 :     fn refresh_event_set(&mut self) {
     131       137935 :         for (i, mask) in self.masks.iter_mut().enumerate() {
     132       137935 :             if i == 0 {
     133        47782 :                 continue;
     134        90153 :             }
     135        90153 : 
     136        90153 :             let mut mask_sk: u32 = 0;
     137        90153 :             let mut mask_nwr: u32 = 0;
     138        90153 :             unsafe { SafekeeperStateDesiredEvents(self.sk_ptrs[i], &mut mask_sk, &mut mask_nwr) };
     139        90153 : 
     140        90153 :             if mask_sk != *mask {
     141            0 :                 debug!(
     142            0 :                     "refresh_event_set: sk={:?}, old_mask={:#b}, new_mask={:#b}",
     143            0 :                     self.sk_ptrs[i], *mask, mask_sk
     144              :                 );
     145            0 :                 *mask = mask_sk;
     146        90153 :             }
     147              :         }
     148        47782 :     }
     149              : 
     150              :     /// Wait for events on all channels.
     151        31049 :     fn wait(&mut self, timeout_millis: i64) -> walproposer::walproposer::WaitResult {
     152              :         // all channels are always writeable
     153       105435 :         for (i, mask) in self.masks.iter().enumerate() {
     154       105435 :             if *mask & WL_SOCKET_WRITEABLE != 0 {
     155            0 :                 return walproposer::walproposer::WaitResult::Network(
     156            0 :                     self.sk_ptrs[i],
     157            0 :                     WL_SOCKET_WRITEABLE,
     158            0 :                 );
     159       105435 :             }
     160              :         }
     161              : 
     162        31049 :         let cnt = self.sort_readable();
     163        31049 : 
     164        31049 :         let slice = &self.chans[0..cnt];
     165        31049 :         match executor::epoll_chans(slice, timeout_millis) {
     166        11304 :             None => walproposer::walproposer::WaitResult::Timeout,
     167              :             Some(0) => {
     168          638 :                 let msg = self.os.node_events().must_recv();
     169          638 :                 match msg {
     170          638 :                     NodeEvent::Internal(AnyMessage::Just32(0)) => {
     171          638 :                         // got a notification about new WAL available
     172          638 :                     }
     173            0 :                     NodeEvent::Internal(_) => unreachable!(),
     174            0 :                     NodeEvent::Accept(_) => unreachable!(),
     175              :                 }
     176          638 :                 walproposer::walproposer::WaitResult::Latch
     177              :             }
     178        19107 :             Some(index) => walproposer::walproposer::WaitResult::Network(
     179        19107 :                 self.sk_ptrs[index],
     180        19107 :                 WL_SOCKET_READABLE,
     181        19107 :             ),
     182              :         }
     183        31049 :     }
     184              : }
     185              : 
     186              : /// This struct handles all calls from walproposer into walproposer_api.
     187              : pub struct SimulationApi {
     188              :     os: NodeOs,
     189              :     safekeepers: RefCell<Vec<SafekeeperConn>>,
     190              :     disk: Arc<DiskWalProposer>,
     191              :     redo_start_lsn: Option<Lsn>,
     192              :     last_logged_commit_lsn: u64,
     193              :     shmem: UnsafeCell<walproposer::bindings::WalproposerShmemState>,
     194              :     config: Config,
     195              :     event_set: RefCell<Option<EventSet>>,
     196              : }
     197              : 
     198              : pub struct Args {
     199              :     pub os: NodeOs,
     200              :     pub config: Config,
     201              :     pub disk: Arc<DiskWalProposer>,
     202              :     pub redo_start_lsn: Option<Lsn>,
     203              : }
     204              : 
     205              : impl SimulationApi {
     206         9195 :     pub fn new(args: Args) -> Self {
     207         9195 :         // initialize connection state for each safekeeper
     208         9195 :         let sk_conns = args
     209         9195 :             .config
     210         9195 :             .safekeepers_list
     211         9195 :             .iter()
     212        27585 :             .map(|s| {
     213        27585 :                 SafekeeperConn::new(
     214        27585 :                     s.split(':').next().unwrap().to_string(),
     215        27585 :                     s.split(':').nth(1).unwrap().to_string(),
     216        27585 :                 )
     217        27585 :             })
     218         9195 :             .collect::<Vec<_>>();
     219         9195 : 
     220         9195 :         Self {
     221         9195 :             os: args.os,
     222         9195 :             safekeepers: RefCell::new(sk_conns),
     223         9195 :             disk: args.disk,
     224         9195 :             redo_start_lsn: args.redo_start_lsn,
     225         9195 :             last_logged_commit_lsn: 0,
     226         9195 :             shmem: UnsafeCell::new(walproposer::api_bindings::empty_shmem()),
     227         9195 :             config: args.config,
     228         9195 :             event_set: RefCell::new(None),
     229         9195 :         }
     230         9195 :     }
     231              : 
     232              :     /// Get SafekeeperConn for the given Safekeeper.
     233       309603 :     fn get_conn(&self, sk: &mut walproposer::bindings::Safekeeper) -> RefMut<'_, SafekeeperConn> {
     234       309603 :         let sk_port = unsafe { CStr::from_ptr(sk.port).to_str().unwrap() };
     235       309603 :         let state = self.safekeepers.borrow_mut();
     236       309603 :         RefMut::map(state, |v| {
     237       309603 :             v.iter_mut()
     238       618043 :                 .find(|conn| conn.port == sk_port)
     239       309603 :                 .expect("safekeeper conn not found by port")
     240       309603 :         })
     241       309603 :     }
     242              : }
     243              : 
     244              : impl ApiImpl for SimulationApi {
     245       332368 :     fn get_current_timestamp(&self) -> i64 {
     246       332368 :         debug!("get_current_timestamp");
     247              :         // PG TimestampTZ is microseconds, but simulation unit is assumed to be
     248              :         // milliseconds, so multiply by 10^3
     249       332368 :         self.os.now() as i64 * 1000
     250       332368 :     }
     251              : 
     252         1427 :     fn update_donor(&self, donor: &mut walproposer::bindings::Safekeeper, donor_lsn: u64) {
     253         1427 :         let shmem = unsafe { &mut *self.get_shmem_state() }; // write through the pointer: a plain `*ptr` deref would copy the struct and drop these updates
     254         1427 :         shmem.propEpochStartLsn.value = donor_lsn;
     255         1427 :         shmem.donor_conninfo = donor.conninfo;
     256         1427 :     }
     257              : 
     258        34107 :     fn conn_status(
     259        34107 :         &self,
     260        34107 :         _: &mut walproposer::bindings::Safekeeper,
     261        34107 :     ) -> walproposer::bindings::WalProposerConnStatusType {
     262        34107 :         debug!("conn_status");
     263              :         // break the connection with a 10% chance
     264        34107 :         if self.os.random(100) < 10 {
     265         3399 :             walproposer::bindings::WalProposerConnStatusType_WP_CONNECTION_BAD
     266              :         } else {
     267        30708 :             walproposer::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     268              :         }
     269        34107 :     }
     270              : 
     271        34107 :     fn conn_connect_start(&self, sk: &mut walproposer::bindings::Safekeeper) {
     272        34107 :         debug!("conn_connect_start");
     273        34107 :         let mut conn = self.get_conn(sk);
     274        34107 : 
     275        34107 :         assert!(conn.socket.is_none());
     276        34107 :         let socket = self.os.open_tcp(conn.node_id);
     277        34107 :         conn.socket = Some(socket);
     278        34107 :         conn.raw_ptr = sk;
     279        34107 :         conn.is_connecting = true;
     280        34107 :     }
     281              : 
     282        30708 :     fn conn_connect_poll(
     283        30708 :         &self,
     284        30708 :         _: &mut walproposer::bindings::Safekeeper,
     285        30708 :     ) -> walproposer::bindings::WalProposerConnectPollStatusType {
     286        30708 :         debug!("conn_connect_poll");
     287              :         // TODO: break the connection here
     288        30708 :         walproposer::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     289        30708 :     }
     290              : 
     291        30708 :     fn conn_send_query(&self, sk: &mut walproposer::bindings::Safekeeper, query: &str) -> bool {
     292        30708 :         debug!("conn_send_query: {}", query);
     293        30708 :         self.get_conn(sk).is_start_wal_push = true;
     294        30708 :         true
     295        30708 :     }
     296              : 
     297        30708 :     fn conn_get_query_result(
     298        30708 :         &self,
     299        30708 :         _: &mut walproposer::bindings::Safekeeper,
     300        30708 :     ) -> walproposer::bindings::WalProposerExecStatusType {
     301        30708 :         debug!("conn_get_query_result");
     302              :         // TODO: break the connection here
     303        30708 :         walproposer::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     304        30708 :     }
     305              : 
     306        21986 :     fn conn_async_read(
     307        21986 :         &self,
     308        21986 :         sk: &mut walproposer::bindings::Safekeeper,
     309        21986 :         vec: &mut Vec<u8>,
     310        21986 :     ) -> walproposer::bindings::PGAsyncReadResult {
     311        21986 :         debug!("conn_async_read");
     312        21986 :         let mut conn = self.get_conn(sk);
     313              : 
     314        21986 :         let socket = if let Some(socket) = conn.socket.as_mut() {
     315        21986 :             socket
     316              :         } else {
     317              :             // socket is already closed
     318            0 :             return walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_FAIL;
     319              :         };
     320              : 
     321        21986 :         let msg = socket.recv_chan().try_recv();
     322              : 
     323        18944 :         match msg {
     324              :             None => {
     325              :                 // no message is ready
     326         3042 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_TRY_AGAIN
     327              :             }
     328              :             Some(NetEvent::Closed) => {
     329              :                 // connection is closed
     330         7880 :                 debug!("conn_async_read: connection is closed");
     331         7880 :                 conn.socket = None;
     332         7880 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_FAIL
     333              :             }
     334        11064 :             Some(NetEvent::Message(msg)) => {
     335              :                 // got a message
     336        11064 :                 let b = match msg {
     337        11064 :                     desim::proto::AnyMessage::Bytes(b) => b,
     338            0 :                     _ => unreachable!(),
     339              :                 };
     340        11064 :                 vec.extend_from_slice(&b);
     341        11064 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
     342              :             }
     343              :         }
     344        21986 :     }
     345              : 
     346        35104 :     fn conn_blocking_write(&self, sk: &mut walproposer::bindings::Safekeeper, buf: &[u8]) -> bool {
     347        35104 :         let mut conn = self.get_conn(sk);
     348        35104 :         debug!("conn_blocking_write to {}: {:?}", conn.node_id, buf);
     349        35104 :         let socket = conn.socket.as_mut().unwrap();
     350        35104 :         socket.send(desim::proto::AnyMessage::Bytes(Bytes::copy_from_slice(buf)));
     351        35104 :         true
     352        35104 :     }
     353              : 
     354         7328 :     fn conn_async_write(
     355         7328 :         &self,
     356         7328 :         sk: &mut walproposer::bindings::Safekeeper,
     357         7328 :         buf: &[u8],
     358         7328 :     ) -> walproposer::bindings::PGAsyncWriteResult {
     359         7328 :         let mut conn = self.get_conn(sk);
     360         7328 :         debug!("conn_async_write to {}: {:?}", conn.node_id, buf);
     361         7328 :         if let Some(socket) = conn.socket.as_mut() {
     362         7328 :             socket.send(desim::proto::AnyMessage::Bytes(Bytes::copy_from_slice(buf)));
     363         7328 :         } else {
     364              :             // connection is already closed
     365            0 :             debug!("conn_async_write: writing to a closed socket!");
     366              :             // TODO: maybe we should return error here?
     367              :         }
     368         7328 :         walproposer::bindings::PGAsyncWriteResult_PG_ASYNC_WRITE_SUCCESS
     369         7328 :     }
     370              : 
     371         1489 :     fn wal_reader_allocate(&self, _: &mut walproposer::bindings::Safekeeper) -> NeonWALReadResult {
     372         1489 :         debug!("wal_reader_allocate");
     373         1489 :         walproposer::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     374         1489 :     }
     375              : 
     376         1274 :     fn wal_read(
     377         1274 :         &self,
     378         1274 :         _sk: &mut walproposer::bindings::Safekeeper,
     379         1274 :         buf: &mut [u8],
     380         1274 :         startpos: u64,
     381         1274 :     ) -> NeonWALReadResult {
     382         1274 :         self.disk.lock().read(startpos, buf);
     383         1274 :         walproposer::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     384         1274 :     }
     385              : 
     386         9195 :     fn init_event_set(&self, _: &mut walproposer::bindings::WalProposer) {
     387         9195 :         debug!("init_event_set");
     388         9195 :         let new_event_set = EventSet::new(self.os.clone());
     389         9195 :         let old_event_set = self.event_set.replace(Some(new_event_set));
     390         9195 :         assert!(old_event_set.is_none());
     391         9195 :     }
     392              : 
     393        68036 :     fn update_event_set(&self, sk: &mut walproposer::bindings::Safekeeper, event_mask: u32) {
     394        68036 :         debug!(
     395            0 :             "update_event_set, sk={:?}, events_mask={:#b}",
     396            0 :             sk as *mut walproposer::bindings::Safekeeper, event_mask
     397              :         );
     398        68036 :         let conn = self.get_conn(sk);
     399        68036 : 
     400        68036 :         self.event_set
     401        68036 :             .borrow_mut()
     402        68036 :             .as_mut()
     403        68036 :             .unwrap()
     404        68036 :             .update_event_set(&conn, event_mask);
     405        68036 :     }
     406              : 
     407        61416 :     fn add_safekeeper_event_set(
     408        61416 :         &self,
     409        61416 :         sk: &mut walproposer::bindings::Safekeeper,
     410        61416 :         event_mask: u32,
     411        61416 :     ) {
     412        61416 :         debug!(
     413            0 :             "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     414            0 :             sk as *mut walproposer::bindings::Safekeeper, event_mask
     415              :         );
     416              : 
     417        61416 :         self.event_set
     418        61416 :             .borrow_mut()
     419        61416 :             .as_mut()
     420        61416 :             .unwrap()
     421        61416 :             .add_safekeeper(&self.get_conn(sk), event_mask);
     422        61416 :     }
     423              : 
     424        38904 :     fn rm_safekeeper_event_set(&self, sk: &mut walproposer::bindings::Safekeeper) {
     425        38904 :         debug!(
     426            0 :             "rm_safekeeper_event_set, sk={:?}",
     427            0 :             sk as *mut walproposer::bindings::Safekeeper,
     428              :         );
     429              : 
     430        38904 :         self.event_set
     431        38904 :             .borrow_mut()
     432        38904 :             .as_mut()
     433        38904 :             .unwrap()
     434        38904 :             .remove_safekeeper(&self.get_conn(sk));
     435        38904 :     }
     436              : 
     437         8881 :     fn active_state_update_event_set(&self, sk: &mut walproposer::bindings::Safekeeper) {
     438         8881 :         debug!("active_state_update_event_set");
     439              : 
     440         8881 :         assert!(sk.state == walproposer::bindings::SafekeeperState_SS_ACTIVE);
     441         8881 :         self.event_set
     442         8881 :             .borrow_mut()
     443         8881 :             .as_mut()
     444         8881 :             .unwrap()
     445         8881 :             .refresh_event_set();
     446         8881 :     }
     447              : 
     448        24912 :     fn wal_reader_events(&self, _sk: &mut walproposer::bindings::Safekeeper) -> u32 {
     449        24912 :         0
     450        24912 :     }
     451              : 
     452        92465 :     fn wait_event_set(
     453        92465 :         &self,
     454        92465 :         _: &mut walproposer::bindings::WalProposer,
     455        92465 :         timeout_millis: i64,
     456        92465 :     ) -> walproposer::walproposer::WaitResult {
     457        92465 :         // TODO: handle multiple stages as part of the simulation (e.g. connect, start_wal_push, etc)
     458        92465 :         let mut conns = self.safekeepers.borrow_mut();
     459       215977 :         for conn in conns.iter_mut() {
     460       215977 :             if conn.socket.is_some() && conn.is_connecting {
     461        30708 :                 conn.is_connecting = false;
     462        30708 :                 debug!("wait_event_set, connecting to {}:{}", conn.host, conn.port);
     463        30708 :                 return walproposer::walproposer::WaitResult::Network(
     464        30708 :                     conn.raw_ptr,
     465        30708 :                     WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
     466        30708 :                 );
     467       185269 :             }
     468       185269 :             if conn.socket.is_some() && conn.is_start_wal_push {
     469        30708 :                 conn.is_start_wal_push = false;
     470        30708 :                 debug!(
     471            0 :                     "wait_event_set, start wal push to {}:{}",
     472              :                     conn.host, conn.port
     473              :                 );
     474        30708 :                 return walproposer::walproposer::WaitResult::Network(
     475        30708 :                     conn.raw_ptr,
     476        30708 :                     WL_SOCKET_READABLE,
     477        30708 :                 );
     478       154561 :             }
     479              :         }
     480        31049 :         drop(conns);
     481        31049 : 
     482        31049 :         let res = self
     483        31049 :             .event_set
     484        31049 :             .borrow_mut()
     485        31049 :             .as_mut()
     486        31049 :             .unwrap()
     487        31049 :             .wait(timeout_millis);
     488        31049 : 
     489        31049 :         debug!(
     490            0 :             "wait_event_set, timeout_millis={}, res={:?}",
     491              :             timeout_millis, res,
     492              :         );
     493        22454 :         res
     494        83870 :     }
     495              : 
     496            0 :     fn strong_random(&self, buf: &mut [u8]) -> bool {
     497            0 :         debug!("strong_random");
     498            0 :         buf.fill(0);
     499            0 :         true
     500            0 :     }
     501              : 
     502          452 :     fn finish_sync_safekeepers(&self, lsn: u64) {
     503          452 :         debug!("finish_sync_safekeepers, lsn={}", lsn);
     504          452 :         executor::exit(0, Lsn(lsn).to_string());
     505          452 :     }
     506              : 
     507       157137 :     fn log_internal(&self, _wp: &mut walproposer::bindings::WalProposer, level: Level, msg: &str) {
     508       157137 :         debug!("wp_log[{}] {}", level, msg);
     509       157137 :         if level == Level::Fatal || level == Level::Panic {
     510           73 :             if msg.contains("rejects our connection request with term") {
     511           37 :                 // collected quorum with lower term, then got rejected by next connected safekeeper
     512           37 :                 executor::exit(1, msg.to_owned());
     513           37 :             }
     514           73 :             if msg.contains("collected propTermStartLsn") && msg.contains(", but basebackup LSN ") {
     515            1 :                 // sync-safekeepers collected wrong quorum, walproposer collected another quorum
     516            1 :                 executor::exit(1, msg.to_owned());
     517           72 :             }
     518           73 :             if msg.contains("failed to download WAL for logical replicaiton") {
     519           35 :                 // Recovery connection broke and recovery failed
     520           35 :                 executor::exit(1, msg.to_owned());
     521           38 :             }
     522           73 :             if msg.contains("missing majority of votes, collected") {
     523            0 :                 // Voting bug when safekeeper disconnects after voting
     524            0 :                 executor::exit(1, msg.to_owned());
     525           73 :             }
     526           73 :             panic!("unknown FATAL error from walproposer: {}", msg);
     527       157064 :         }
     528       157064 :     }
     529              : 
     530          789 :     fn after_election(&self, wp: &mut walproposer::bindings::WalProposer) {
     531          789 :         let prop_lsn = wp.propTermStartLsn;
     532          789 :         let prop_term = wp.propTerm;
     533          789 : 
     534          789 :         let mut prev_lsn: u64 = 0;
     535          789 :         let mut prev_term: u64 = 0;
     536          789 : 
     537          789 :         unsafe {
     538          789 :             let history = wp.propTermHistory.entries;
     539          789 :             let len = wp.propTermHistory.n_entries as usize;
     540          789 :             if len > 1 {
     541          726 :                 let entry = *history.wrapping_add(len - 2);
     542          726 :                 prev_lsn = entry.lsn;
     543          726 :                 prev_term = entry.term;
     544          726 :             }
     545              :         }
     546              : 
     547          789 :         let msg = format!(
     548          789 :             "prop_elected;{};{};{};{}",
     549          789 :             prop_lsn, prop_term, prev_lsn, prev_term
     550          789 :         );
     551          789 : 
     552          789 :         debug!(msg);
     553          789 :         self.os.log_event(msg);
     554          789 :     }
     555              : 
     556          313 :     fn get_redo_start_lsn(&self) -> u64 {
     557          313 :         debug!("get_redo_start_lsn -> {:?}", self.redo_start_lsn);
     558          313 :         self.redo_start_lsn.expect("redo_start_lsn is not set").0
     559          313 :     }
     560              : 
     561         3007 :     fn get_shmem_state(&self) -> *mut walproposer::bindings::WalproposerShmemState {
     562         3007 :         self.shmem.get()
     563         3007 :     }
     564              : 
     565          242 :     fn start_streaming(
     566          242 :         &self,
     567          242 :         startpos: u64,
     568          242 :         callback: &walproposer::walproposer::StreamingCallback,
     569          242 :     ) {
     570          242 :         let disk = &self.disk;
     571          242 :         let disk_lsn = disk.lock().flush_rec_ptr().0;
     572          242 :         debug!("start_streaming at {} (disk_lsn={})", startpos, disk_lsn);
     573          242 :         if startpos < disk_lsn {
     574           65 :             debug!(
     575            0 :                 "startpos < disk_lsn, it means we wrote some transaction even before streaming started"
     576              :             );
     577          177 :         }
     578          242 :         assert!(startpos <= disk_lsn);
     579          242 :         let mut broadcasted = Lsn(startpos);
     580              : 
     581              :         loop {
     582          947 :             let available = disk.lock().flush_rec_ptr();
     583          947 :             assert!(available >= broadcasted);
     584          705 :             callback.broadcast(broadcasted, available);
     585          705 :             broadcasted = available;
     586          705 :             callback.poll();
     587              :         }
     588              :     }
     589              : 
     590         3304 :     fn process_safekeeper_feedback(
     591         3304 :         &mut self,
     592         3304 :         wp: &mut walproposer::bindings::WalProposer,
     593         3304 :         _sk: &mut walproposer::bindings::Safekeeper,
     594         3304 :     ) {
     595         3304 :         debug!("process_safekeeper_feedback, commit_lsn={}", wp.commitLsn);
     596         3304 :         if wp.commitLsn > self.last_logged_commit_lsn {
     597          718 :             self.os.log_event(format!("commit_lsn;{}", wp.commitLsn));
     598          718 :             self.last_logged_commit_lsn = wp.commitLsn;
     599         2586 :         }
     600         3304 :     }
     601              : 
     602          124 :     fn get_flush_rec_ptr(&self) -> u64 {
     603          124 :         let lsn = self.disk.lock().flush_rec_ptr();
     604          124 :         debug!("get_flush_rec_ptr: {}", lsn);
     605          124 :         lsn.0
     606          124 :     }
     607              : 
     608          789 :     fn recovery_download(
     609          789 :         &self,
     610          789 :         wp: &mut walproposer::bindings::WalProposer,
     611          789 :         sk: &mut walproposer::bindings::Safekeeper,
     612          789 :     ) -> bool {
     613          789 :         let mut startpos = wp.truncateLsn;
     614          789 :         let endpos = wp.propTermStartLsn;
     615          789 : 
     616          789 :         if startpos == endpos {
     617          370 :             debug!("recovery_download: nothing to download");
     618          370 :             return true;
     619          419 :         }
     620          419 : 
     621          419 :         debug!("recovery_download from {} to {}", startpos, endpos,);
     622              : 
     623          419 :         let replication_prompt = format!(
     624          419 :             "START_REPLICATION {} {} {} {}",
     625          419 :             self.config.ttid.tenant_id, self.config.ttid.timeline_id, startpos, endpos,
     626          419 :         );
     627          419 :         let async_conn = self.get_conn(sk);
     628          419 : 
     629          419 :         let conn = self.os.open_tcp(async_conn.node_id);
     630          419 :         conn.send(desim::proto::AnyMessage::Bytes(replication_prompt.into()));
     631          419 : 
     632          419 :         let chan = conn.recv_chan();
     633          728 :         while startpos < endpos {
     634          419 :             let event = chan.recv();
     635          384 :             match event {
     636              :                 NetEvent::Closed => {
     637           35 :                     debug!("connection closed in recovery");
     638           35 :                     break;
     639              :                 }
     640          384 :                 NetEvent::Message(AnyMessage::Bytes(b)) => {
     641          384 :                     debug!("got recovery bytes from safekeeper");
     642          309 :                     self.disk.lock().write(startpos, &b);
     643          309 :                     startpos += b.len() as u64;
     644              :                 }
     645            0 :                 NetEvent::Message(_) => unreachable!(),
     646              :             }
     647              :         }
     648              : 
     649          344 :         debug!("recovery finished at {}", startpos);
     650              : 
     651          344 :         startpos == endpos
     652          714 :     }
     653              : 
     654        11595 :     fn conn_finish(&self, sk: &mut walproposer::bindings::Safekeeper) {
     655        11595 :         let mut conn = self.get_conn(sk);
     656        11595 :         debug!("conn_finish to {}", conn.node_id);
     657        11595 :         if let Some(socket) = conn.socket.as_mut() {
     658         3712 :             socket.close();
     659         7883 :         } else {
     660         7883 :             // connection is already closed
     661         7883 :         }
     662        11595 :         conn.socket = None;
     663        11595 :     }
     664              : 
     665        11279 :     fn conn_error_message(&self, _sk: &mut walproposer::bindings::Safekeeper) -> String {
     666        11279 :         "connection is closed, probably".into()
     667        11279 :     }
     668              : }
        

Generated by: LCOV version 2.1-beta