LCOV - code coverage report
Current view: top level - safekeeper/tests/walproposer_sim - walproposer_api.rs (source / functions) Coverage Total Hit
Test: 36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info Lines: 97.0 % 507 492
Test Date: 2024-04-08 10:22:05 Functions: 80.5 % 246 198

            Line data    Source code
       1              : use std::{
       2              :     cell::{RefCell, RefMut, UnsafeCell},
       3              :     ffi::CStr,
       4              :     sync::Arc,
       5              : };
       6              : 
       7              : use bytes::Bytes;
       8              : use desim::{
       9              :     executor::{self, PollSome},
      10              :     network::TCP,
      11              :     node_os::NodeOs,
      12              :     proto::{AnyMessage, NetEvent, NodeEvent},
      13              :     world::NodeId,
      14              : };
      15              : use tracing::debug;
      16              : use utils::lsn::Lsn;
      17              : use walproposer::{
      18              :     api_bindings::Level,
      19              :     bindings::{
      20              :         pg_atomic_uint64, NeonWALReadResult, PageserverFeedback, SafekeeperStateDesiredEvents,
      21              :         WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE,
      22              :     },
      23              :     walproposer::{ApiImpl, Config},
      24              : };
      25              : 
      26              : use super::walproposer_disk::DiskWalProposer;
      27              : 
      28              : /// Special state for each wp->sk connection.
      29              : struct SafekeeperConn {
      30              :     host: String,
      31              :     port: String,
      32              :     node_id: NodeId,
      33              :     // socket is Some(..) equals to connection is established
      34              :     socket: Option<TCP>,
      35              :     // connection is in progress
      36              :     is_connecting: bool,
      37              :     // START_WAL_PUSH is in progress
      38              :     is_start_wal_push: bool,
      39              :     // pointer to Safekeeper in walproposer for callbacks
      40              :     raw_ptr: *mut walproposer::bindings::Safekeeper,
      41              : }
      42              : 
      43              : impl SafekeeperConn {
      44       219426 :     pub fn new(host: String, port: String) -> Self {
      45       219426 :         // port number is the same as NodeId
      46       219426 :         let port_num = port.parse::<u32>().unwrap();
      47       219426 :         Self {
      48       219426 :             host,
      49       219426 :             port,
      50       219426 :             node_id: port_num,
      51       219426 :             socket: None,
      52       219426 :             is_connecting: false,
      53       219426 :             is_start_wal_push: false,
      54       219426 :             raw_ptr: std::ptr::null_mut(),
      55       219426 :         }
      56       219426 :     }
      57              : }
      58              : 
      59              : /// Simulation version of a postgres WaitEventSet. At pos 0 there is always
      60              : /// a special NodeEvents channel, which is used as a latch.
      61              : struct EventSet {
      62              :     os: NodeOs,
      63              :     // all pollable channels, 0 is always NodeEvent channel
      64              :     chans: Vec<Box<dyn PollSome>>,
      65              :     // 0 is always nullptr
      66              :     sk_ptrs: Vec<*mut walproposer::bindings::Safekeeper>,
      67              :     // event mask for each channel
      68              :     masks: Vec<u32>,
      69              : }
      70              : 
      71              : impl EventSet {
      72        73142 :     pub fn new(os: NodeOs) -> Self {
      73        73142 :         let node_events = os.node_events();
      74        73142 :         Self {
      75        73142 :             os,
      76        73142 :             chans: vec![Box::new(node_events)],
      77        73142 :             sk_ptrs: vec![std::ptr::null_mut()],
      78        73142 :             masks: vec![WL_SOCKET_READABLE],
      79        73142 :         }
      80        73142 :     }
      81              : 
      82              :     /// Leaves all readable channels at the beginning of the array.
      83       218030 :     fn sort_readable(&mut self) -> usize {
      84       218030 :         let mut cnt = 1;
      85       513080 :         for i in 1..self.chans.len() {
      86       513080 :             if self.masks[i] & WL_SOCKET_READABLE != 0 {
      87       513080 :                 self.chans.swap(i, cnt);
      88       513080 :                 self.sk_ptrs.swap(i, cnt);
      89       513080 :                 self.masks.swap(i, cnt);
      90       513080 :                 cnt += 1;
      91       513080 :             }
      92              :         }
      93       218030 :         cnt
      94       218030 :     }
      95              : 
      96       533911 :     fn update_event_set(&mut self, conn: &SafekeeperConn, event_mask: u32) {
      97       533911 :         let index = self
      98       533911 :             .sk_ptrs
      99       533911 :             .iter()
     100      2017446 :             .position(|&ptr| ptr == conn.raw_ptr)
     101       533911 :             .expect("safekeeper should exist in event set");
     102       533911 :         self.masks[index] = event_mask;
     103       533911 :     }
     104              : 
     105       485600 :     fn add_safekeeper(&mut self, sk: &SafekeeperConn, event_mask: u32) {
     106      1174788 :         for ptr in self.sk_ptrs.iter() {
     107      1174788 :             assert!(*ptr != sk.raw_ptr);
     108              :         }
     109              : 
     110       485600 :         self.chans.push(Box::new(
     111       485600 :             sk.socket
     112       485600 :                 .as_ref()
     113       485600 :                 .expect("socket should not be closed")
     114       485600 :                 .recv_chan(),
     115       485600 :         ));
     116       485600 :         self.sk_ptrs.push(sk.raw_ptr);
     117       485600 :         self.masks.push(event_mask);
     118       485600 :     }
     119              : 
     120       306963 :     fn remove_safekeeper(&mut self, sk: &SafekeeperConn) {
     121       652592 :         let index = self.sk_ptrs.iter().position(|&ptr| ptr == sk.raw_ptr);
     122       306963 :         if index.is_none() {
     123           41 :             debug!("remove_safekeeper: sk={:?} not found", sk.raw_ptr);
     124           41 :             return;
     125       306922 :         }
     126       306922 :         let index = index.unwrap();
     127       306922 : 
     128       306922 :         self.chans.remove(index);
     129       306922 :         self.sk_ptrs.remove(index);
     130       306922 :         self.masks.remove(index);
     131       306922 : 
     132       306922 :         // to simulate the actual behaviour
     133       306922 :         self.refresh_event_set();
     134       306963 :     }
     135              : 
     136              :     /// Updates all masks to match the result of a SafekeeperStateDesiredEvents.
     137       342053 :     fn refresh_event_set(&mut self) {
     138       958933 :         for (i, mask) in self.masks.iter_mut().enumerate() {
     139       958933 :             if i == 0 {
     140       342053 :                 continue;
     141       616880 :             }
     142       616880 : 
     143       616880 :             let mut mask_sk: u32 = 0;
     144       616880 :             let mut mask_nwr: u32 = 0;
     145       616880 :             unsafe { SafekeeperStateDesiredEvents(self.sk_ptrs[i], &mut mask_sk, &mut mask_nwr) };
     146       616880 : 
     147       616880 :             if mask_sk != *mask {
     148            0 :                 debug!(
     149            0 :                     "refresh_event_set: sk={:?}, old_mask={:#b}, new_mask={:#b}",
     150            0 :                     self.sk_ptrs[i], *mask, mask_sk
     151            0 :                 );
     152            0 :                 *mask = mask_sk;
     153       616880 :             }
     154              :         }
     155       342053 :     }
     156              : 
     157              :     /// Wait for events on all channels.
     158       218030 :     fn wait(&mut self, timeout_millis: i64) -> walproposer::walproposer::WaitResult {
     159              :         // all channels are always writeable
     160       731110 :         for (i, mask) in self.masks.iter().enumerate() {
     161       731110 :             if *mask & WL_SOCKET_WRITEABLE != 0 {
     162            0 :                 return walproposer::walproposer::WaitResult::Network(
     163            0 :                     self.sk_ptrs[i],
     164            0 :                     WL_SOCKET_WRITEABLE,
     165            0 :                 );
     166       731110 :             }
     167              :         }
     168              : 
     169       218030 :         let cnt = self.sort_readable();
     170       218030 : 
     171       218030 :         let slice = &self.chans[0..cnt];
     172       218030 :         match executor::epoll_chans(slice, timeout_millis) {
     173        91480 :             None => walproposer::walproposer::WaitResult::Timeout,
     174              :             Some(0) => {
     175         2852 :                 let msg = self.os.node_events().must_recv();
     176         2852 :                 match msg {
     177         2852 :                     NodeEvent::Internal(AnyMessage::Just32(0)) => {
     178         2852 :                         // got a notification about new WAL available
     179         2852 :                     }
     180            0 :                     NodeEvent::Internal(_) => unreachable!(),
     181            0 :                     NodeEvent::Accept(_) => unreachable!(),
     182              :                 }
     183         2852 :                 walproposer::walproposer::WaitResult::Latch
     184              :             }
     185       123698 :             Some(index) => walproposer::walproposer::WaitResult::Network(
     186       123698 :                 self.sk_ptrs[index],
     187       123698 :                 WL_SOCKET_READABLE,
     188       123698 :             ),
     189              :         }
     190       218030 :     }
     191              : }
     192              : 
     193              : /// This struct handles all calls from walproposer into walproposer_api.
     194              : pub struct SimulationApi {
     195              :     os: NodeOs,
     196              :     safekeepers: RefCell<Vec<SafekeeperConn>>,
     197              :     disk: Arc<DiskWalProposer>,
     198              :     redo_start_lsn: Option<Lsn>,
     199              :     last_logged_commit_lsn: u64,
     200              :     shmem: UnsafeCell<walproposer::bindings::WalproposerShmemState>,
     201              :     config: Config,
     202              :     event_set: RefCell<Option<EventSet>>,
     203              : }
     204              : 
     205              : pub struct Args {
     206              :     pub os: NodeOs,
     207              :     pub config: Config,
     208              :     pub disk: Arc<DiskWalProposer>,
     209              :     pub redo_start_lsn: Option<Lsn>,
     210              : }
     211              : 
     212              : impl SimulationApi {
     213        73142 :     pub fn new(args: Args) -> Self {
     214        73142 :         // initialize connection state for each safekeeper
     215        73142 :         let sk_conns = args
     216        73142 :             .config
     217        73142 :             .safekeepers_list
     218        73142 :             .iter()
     219       219426 :             .map(|s| {
     220       219426 :                 SafekeeperConn::new(
     221       219426 :                     s.split(':').next().unwrap().to_string(),
     222       219426 :                     s.split(':').nth(1).unwrap().to_string(),
     223       219426 :                 )
     224       219426 :             })
     225        73142 :             .collect::<Vec<_>>();
     226        73142 : 
     227        73142 :         let empty_feedback = PageserverFeedback {
     228        73142 :             present: false,
     229        73142 :             currentClusterSize: 0,
     230        73142 :             last_received_lsn: 0,
     231        73142 :             disk_consistent_lsn: 0,
     232        73142 :             remote_consistent_lsn: 0,
     233        73142 :             replytime: 0,
     234        73142 :             shard_number: 0,
     235        73142 :         };
     236        73142 : 
     237        73142 :         Self {
     238        73142 :             os: args.os,
     239        73142 :             safekeepers: RefCell::new(sk_conns),
     240        73142 :             disk: args.disk,
     241        73142 :             redo_start_lsn: args.redo_start_lsn,
     242        73142 :             last_logged_commit_lsn: 0,
     243        73142 :             shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
     244        73142 :                 mutex: 0,
     245        73142 :                 mineLastElectedTerm: 0,
     246        73142 :                 backpressureThrottlingTime: pg_atomic_uint64 { value: 0 },
     247        73142 :                 currentClusterSize: pg_atomic_uint64 { value: 0 },
     248        73142 :                 shard_ps_feedback: [empty_feedback; 128],
     249        73142 :                 num_shards: 0,
     250        73142 :                 min_ps_feedback: empty_feedback,
     251        73142 :             }),
     252        73142 :             config: args.config,
     253        73142 :             event_set: RefCell::new(None),
     254        73142 :         }
     255        73142 :     }
     256              : 
     257              :     /// Get SafekeeperConn for the given Safekeeper.
     258      2367248 :     fn get_conn(&self, sk: &mut walproposer::bindings::Safekeeper) -> RefMut<'_, SafekeeperConn> {
     259      2367248 :         let sk_port = unsafe { CStr::from_ptr(sk.port).to_str().unwrap() };
     260      2367248 :         let state = self.safekeepers.borrow_mut();
     261      2367248 :         RefMut::map(state, |v| {
     262      2367248 :             v.iter_mut()
     263      4730747 :                 .find(|conn| conn.port == sk_port)
     264      2367248 :                 .expect("safekeeper conn not found by port")
     265      2367248 :         })
     266      2367248 :     }
     267              : }
     268              : 
     269              : impl ApiImpl for SimulationApi {
     270      2523782 :     fn get_current_timestamp(&self) -> i64 {
     271      2523782 :         debug!("get_current_timestamp");
     272              :         // PG TimestampTZ is microseconds, but simulation unit is assumed to be
     273              :         // milliseconds, so add 10^3
     274      2523782 :         self.os.now() as i64 * 1000
     275      2523782 :     }
     276              : 
     277       269854 :     fn conn_status(
     278       269854 :         &self,
     279       269854 :         _: &mut walproposer::bindings::Safekeeper,
     280       269854 :     ) -> walproposer::bindings::WalProposerConnStatusType {
     281       269854 :         debug!("conn_status");
     282              :         // break the connection with a 10% chance
     283       269854 :         if self.os.random(100) < 10 {
     284        27054 :             walproposer::bindings::WalProposerConnStatusType_WP_CONNECTION_BAD
     285              :         } else {
     286       242800 :             walproposer::bindings::WalProposerConnStatusType_WP_CONNECTION_OK
     287              :         }
     288       269854 :     }
     289              : 
     290       269854 :     fn conn_connect_start(&self, sk: &mut walproposer::bindings::Safekeeper) {
     291       269854 :         debug!("conn_connect_start");
     292       269854 :         let mut conn = self.get_conn(sk);
     293       269854 : 
     294       269854 :         assert!(conn.socket.is_none());
     295       269854 :         let socket = self.os.open_tcp(conn.node_id);
     296       269854 :         conn.socket = Some(socket);
     297       269854 :         conn.raw_ptr = sk;
     298       269854 :         conn.is_connecting = true;
     299       269854 :     }
     300              : 
     301       242800 :     fn conn_connect_poll(
     302       242800 :         &self,
     303       242800 :         _: &mut walproposer::bindings::Safekeeper,
     304       242800 :     ) -> walproposer::bindings::WalProposerConnectPollStatusType {
     305       242800 :         debug!("conn_connect_poll");
     306              :         // TODO: break the connection here
     307       242800 :         walproposer::bindings::WalProposerConnectPollStatusType_WP_CONN_POLLING_OK
     308       242800 :     }
     309              : 
     310       242800 :     fn conn_send_query(&self, sk: &mut walproposer::bindings::Safekeeper, query: &str) -> bool {
     311       242800 :         debug!("conn_send_query: {}", query);
     312       242800 :         self.get_conn(sk).is_start_wal_push = true;
     313       242800 :         true
     314       242800 :     }
     315              : 
     316       242800 :     fn conn_get_query_result(
     317       242800 :         &self,
     318       242800 :         _: &mut walproposer::bindings::Safekeeper,
     319       242800 :     ) -> walproposer::bindings::WalProposerExecStatusType {
     320       242800 :         debug!("conn_get_query_result");
     321              :         // TODO: break the connection here
     322       242800 :         walproposer::bindings::WalProposerExecStatusType_WP_EXEC_SUCCESS_COPYBOTH
     323       242800 :     }
     324              : 
     325       133267 :     fn conn_async_read(
     326       133267 :         &self,
     327       133267 :         sk: &mut walproposer::bindings::Safekeeper,
     328       133267 :         vec: &mut Vec<u8>,
     329       133267 :     ) -> walproposer::bindings::PGAsyncReadResult {
     330       133267 :         debug!("conn_async_read");
     331       133267 :         let mut conn = self.get_conn(sk);
     332              : 
     333       133267 :         let socket = if let Some(socket) = conn.socket.as_mut() {
     334       133267 :             socket
     335              :         } else {
     336              :             // socket is already closed
     337            0 :             return walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_FAIL;
     338              :         };
     339              : 
     340       133267 :         let msg = socket.recv_chan().try_recv();
     341              : 
     342       121987 :         match msg {
     343              :             None => {
     344              :                 // no message is ready
     345        11280 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_TRY_AGAIN
     346              :             }
     347              :             Some(NetEvent::Closed) => {
     348              :                 // connection is closed
     349        61960 :                 debug!("conn_async_read: connection is closed");
     350        61960 :                 conn.socket = None;
     351        61960 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_FAIL
     352              :             }
     353        60027 :             Some(NetEvent::Message(msg)) => {
     354              :                 // got a message
     355        60027 :                 let b = match msg {
     356        60027 :                     desim::proto::AnyMessage::Bytes(b) => b,
     357            0 :                     _ => unreachable!(),
     358              :                 };
     359        60027 :                 vec.extend_from_slice(&b);
     360        60027 :                 walproposer::bindings::PGAsyncReadResult_PG_ASYNC_READ_SUCCESS
     361              :             }
     362              :         }
     363       133267 :     }
     364              : 
     365       270676 :     fn conn_blocking_write(&self, sk: &mut walproposer::bindings::Safekeeper, buf: &[u8]) -> bool {
     366       270676 :         let mut conn = self.get_conn(sk);
     367       270676 :         debug!("conn_blocking_write to {}: {:?}", conn.node_id, buf);
     368       270676 :         let socket = conn.socket.as_mut().unwrap();
     369       270676 :         socket.send(desim::proto::AnyMessage::Bytes(Bytes::copy_from_slice(buf)));
     370       270676 :         true
     371       270676 :     }
     372              : 
     373        30872 :     fn conn_async_write(
     374        30872 :         &self,
     375        30872 :         sk: &mut walproposer::bindings::Safekeeper,
     376        30872 :         buf: &[u8],
     377        30872 :     ) -> walproposer::bindings::PGAsyncWriteResult {
     378        30872 :         let mut conn = self.get_conn(sk);
     379        30872 :         debug!("conn_async_write to {}: {:?}", conn.node_id, buf);
     380        30872 :         if let Some(socket) = conn.socket.as_mut() {
     381        30872 :             socket.send(desim::proto::AnyMessage::Bytes(Bytes::copy_from_slice(buf)));
     382        30872 :         } else {
     383              :             // connection is already closed
     384            0 :             debug!("conn_async_write: writing to a closed socket!");
     385              :             // TODO: maybe we should return error here?
     386              :         }
     387        30872 :         walproposer::bindings::PGAsyncWriteResult_PG_ASYNC_WRITE_SUCCESS
     388        30872 :     }
     389              : 
     390         7021 :     fn wal_reader_allocate(&self, _: &mut walproposer::bindings::Safekeeper) -> NeonWALReadResult {
     391         7021 :         debug!("wal_reader_allocate");
     392         7021 :         walproposer::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     393         7021 :     }
     394              : 
     395        23851 :     fn wal_read(
     396        23851 :         &self,
     397        23851 :         _sk: &mut walproposer::bindings::Safekeeper,
     398        23851 :         buf: &mut [u8],
     399        23851 :         startpos: u64,
     400        23851 :     ) -> NeonWALReadResult {
     401        23851 :         self.disk.lock().read(startpos, buf);
     402        23851 :         walproposer::bindings::NeonWALReadResult_NEON_WALREAD_SUCCESS
     403        23851 :     }
     404              : 
     405        73142 :     fn init_event_set(&self, _: &mut walproposer::bindings::WalProposer) {
     406        73142 :         debug!("init_event_set");
     407        73142 :         let new_event_set = EventSet::new(self.os.clone());
     408        73142 :         let old_event_set = self.event_set.replace(Some(new_event_set));
     409        73142 :         assert!(old_event_set.is_none());
     410        73142 :     }
     411              : 
     412       533911 :     fn update_event_set(&self, sk: &mut walproposer::bindings::Safekeeper, event_mask: u32) {
     413       533911 :         debug!(
     414         1166 :             "update_event_set, sk={:?}, events_mask={:#b}",
     415         1166 :             sk as *mut walproposer::bindings::Safekeeper, event_mask
     416         1166 :         );
     417       533911 :         let conn = self.get_conn(sk);
     418       533911 : 
     419       533911 :         self.event_set
     420       533911 :             .borrow_mut()
     421       533911 :             .as_mut()
     422       533911 :             .unwrap()
     423       533911 :             .update_event_set(&conn, event_mask);
     424       533911 :     }
     425              : 
     426       485600 :     fn add_safekeeper_event_set(
     427       485600 :         &self,
     428       485600 :         sk: &mut walproposer::bindings::Safekeeper,
     429       485600 :         event_mask: u32,
     430       485600 :     ) {
     431       485600 :         debug!(
     432          728 :             "add_safekeeper_event_set, sk={:?}, events_mask={:#b}",
     433          728 :             sk as *mut walproposer::bindings::Safekeeper, event_mask
     434          728 :         );
     435              : 
     436       485600 :         self.event_set
     437       485600 :             .borrow_mut()
     438       485600 :             .as_mut()
     439       485600 :             .unwrap()
     440       485600 :             .add_safekeeper(&self.get_conn(sk), event_mask);
     441       485600 :     }
     442              : 
     443       306963 :     fn rm_safekeeper_event_set(&self, sk: &mut walproposer::bindings::Safekeeper) {
     444       306963 :         debug!(
     445          400 :             "rm_safekeeper_event_set, sk={:?}",
     446          400 :             sk as *mut walproposer::bindings::Safekeeper,
     447          400 :         );
     448              : 
     449       306963 :         self.event_set
     450       306963 :             .borrow_mut()
     451       306963 :             .as_mut()
     452       306963 :             .unwrap()
     453       306963 :             .remove_safekeeper(&self.get_conn(sk));
     454       306963 :     }
     455              : 
     456        35131 :     fn active_state_update_event_set(&self, sk: &mut walproposer::bindings::Safekeeper) {
     457        35131 :         debug!("active_state_update_event_set");
     458              : 
     459        35131 :         assert!(sk.state == walproposer::bindings::SafekeeperState_SS_ACTIVE);
     460        35131 :         self.event_set
     461        35131 :             .borrow_mut()
     462        35131 :             .as_mut()
     463        35131 :             .unwrap()
     464        35131 :             .refresh_event_set();
     465        35131 :     }
     466              : 
     467        99168 :     fn wal_reader_events(&self, _sk: &mut walproposer::bindings::Safekeeper) -> u32 {
     468        99168 :         0
     469        99168 :     }
     470              : 
     471       703630 :     fn wait_event_set(
     472       703630 :         &self,
     473       703630 :         _: &mut walproposer::bindings::WalProposer,
     474       703630 :         timeout_millis: i64,
     475       703630 :     ) -> walproposer::walproposer::WaitResult {
     476       703630 :         // TODO: handle multiple stages as part of the simulation (e.g. connect, start_wal_push, etc)
     477       703630 :         let mut conns = self.safekeepers.borrow_mut();
     478      1625158 :         for conn in conns.iter_mut() {
     479      1625158 :             if conn.socket.is_some() && conn.is_connecting {
     480       242800 :                 conn.is_connecting = false;
     481       242800 :                 debug!("wait_event_set, connecting to {}:{}", conn.host, conn.port);
     482       242800 :                 return walproposer::walproposer::WaitResult::Network(
     483       242800 :                     conn.raw_ptr,
     484       242800 :                     WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE,
     485       242800 :                 );
     486      1382358 :             }
     487      1382358 :             if conn.socket.is_some() && conn.is_start_wal_push {
     488       242800 :                 conn.is_start_wal_push = false;
     489       242800 :                 debug!(
     490          364 :                     "wait_event_set, start wal push to {}:{}",
     491          364 :                     conn.host, conn.port
     492          364 :                 );
     493       242800 :                 return walproposer::walproposer::WaitResult::Network(
     494       242800 :                     conn.raw_ptr,
     495       242800 :                     WL_SOCKET_READABLE,
     496       242800 :                 );
     497      1139558 :             }
     498              :         }
     499       218030 :         drop(conns);
     500       218030 : 
     501       218030 :         let res = self
     502       218030 :             .event_set
     503       218030 :             .borrow_mut()
     504       218030 :             .as_mut()
     505       218030 :             .unwrap()
     506       218030 :             .wait(timeout_millis);
     507       218030 : 
     508       218030 :         debug!(
     509          690 :             "wait_event_set, timeout_millis={}, res={:?}",
     510          690 :             timeout_millis, res,
     511          690 :         );
     512       148625 :         res
     513       634225 :     }
     514              : 
     515        73142 :     fn strong_random(&self, buf: &mut [u8]) -> bool {
     516        73142 :         debug!("strong_random");
     517        73142 :         buf.fill(0);
     518        73142 :         true
     519        73142 :     }
     520              : 
     521         2663 :     fn finish_sync_safekeepers(&self, lsn: u64) {
     522         2663 :         debug!("finish_sync_safekeepers, lsn={}", lsn);
     523         2663 :         executor::exit(0, Lsn(lsn).to_string());
     524         2663 :     }
     525              : 
     526       713869 :     fn log_internal(&self, _wp: &mut walproposer::bindings::WalProposer, level: Level, msg: &str) {
     527       713869 :         debug!("wp_log[{}] {}", level, msg);
     528       713869 :         if level == Level::Fatal || level == Level::Panic {
     529          546 :             if msg.contains("rejects our connection request with term") {
     530          310 :                 // collected quorum with lower term, then got rejected by next connected safekeeper
     531          310 :                 executor::exit(1, msg.to_owned());
     532          310 :             }
     533          546 :             if msg.contains("collected propEpochStartLsn") && msg.contains(", but basebackup LSN ")
     534           19 :             {
     535           19 :                 // sync-safekeepers collected wrong quorum, walproposer collected another quorum
     536           19 :                 executor::exit(1, msg.to_owned());
     537          527 :             }
     538          546 :             if msg.contains("failed to download WAL for logical replicaiton") {
     539          142 :                 // Recovery connection broken and recovery was failed
     540          142 :                 executor::exit(1, msg.to_owned());
     541          404 :             }
     542          546 :             if msg.contains("missing majority of votes, collected") {
     543           75 :                 // Voting bug when safekeeper disconnects after voting
     544           75 :                 executor::exit(1, msg.to_owned());
     545          471 :             }
     546          546 :             panic!("unknown FATAL error from walproposer: {}", msg);
     547       713323 :         }
     548       713323 :     }
     549              : 
     550         5410 :     fn after_election(&self, wp: &mut walproposer::bindings::WalProposer) {
     551         5410 :         let prop_lsn = wp.propEpochStartLsn;
     552         5410 :         let prop_term = wp.propTerm;
     553         5410 : 
     554         5410 :         let mut prev_lsn: u64 = 0;
     555         5410 :         let mut prev_term: u64 = 0;
     556         5410 : 
     557         5410 :         unsafe {
     558         5410 :             let history = wp.propTermHistory.entries;
     559         5410 :             let len = wp.propTermHistory.n_entries as usize;
     560         5410 :             if len > 1 {
     561         3511 :                 let entry = *history.wrapping_add(len - 2);
     562         3511 :                 prev_lsn = entry.lsn;
     563         3511 :                 prev_term = entry.term;
     564         3511 :             }
     565              :         }
     566              : 
     567         5410 :         let msg = format!(
     568         5410 :             "prop_elected;{};{};{};{}",
     569         5410 :             prop_lsn, prop_term, prev_lsn, prev_term
     570         5410 :         );
     571         5410 : 
     572         5410 :         debug!(msg);
     573         5410 :         self.os.log_event(msg);
     574         5410 :     }
     575              : 
     576         1994 :     fn get_redo_start_lsn(&self) -> u64 {
     577         1994 :         debug!("get_redo_start_lsn -> {:?}", self.redo_start_lsn);
     578         1994 :         self.redo_start_lsn.expect("redo_start_lsn is not set").0
     579         1994 :     }
     580              : 
     581         1162 :     fn get_shmem_state(&self) -> *mut walproposer::bindings::WalproposerShmemState {
     582         1162 :         self.shmem.get()
     583         1162 :     }
     584              : 
     585         1121 :     fn start_streaming(
     586         1121 :         &self,
     587         1121 :         startpos: u64,
     588         1121 :         callback: &walproposer::walproposer::StreamingCallback,
     589         1121 :     ) {
     590         1121 :         let disk = &self.disk;
     591         1121 :         let disk_lsn = disk.lock().flush_rec_ptr().0;
     592         1121 :         debug!("start_streaming at {} (disk_lsn={})", startpos, disk_lsn);
     593         1121 :         if startpos < disk_lsn {
     594          322 :             debug!("startpos < disk_lsn, it means we wrote some transaction even before streaming started");
     595          799 :         }
     596         1121 :         assert!(startpos <= disk_lsn);
     597         1121 :         let mut broadcasted = Lsn(startpos);
     598              : 
     599              :         loop {
     600         3774 :             let available = disk.lock().flush_rec_ptr();
     601         3774 :             assert!(available >= broadcasted);
     602         2653 :             callback.broadcast(broadcasted, available);
     603         2653 :             broadcasted = available;
     604         2653 :             callback.poll();
     605              :         }
     606              :     }
     607              : 
     608        12151 :     fn process_safekeeper_feedback(
     609        12151 :         &mut self,
     610        12151 :         wp: &mut walproposer::bindings::WalProposer,
     611        12151 :         _sk: &mut walproposer::bindings::Safekeeper,
     612        12151 :     ) {
     613        12151 :         debug!("process_safekeeper_feedback, commit_lsn={}", wp.commitLsn);
     614        12151 :         if wp.commitLsn > self.last_logged_commit_lsn {
     615         2593 :             self.os.log_event(format!("commit_lsn;{}", wp.commitLsn));
     616         2593 :             self.last_logged_commit_lsn = wp.commitLsn;
     617         9558 :         }
     618        12151 :     }
     619              : 
     620          809 :     fn get_flush_rec_ptr(&self) -> u64 {
     621          809 :         let lsn = self.disk.lock().flush_rec_ptr();
     622          809 :         debug!("get_flush_rec_ptr: {}", lsn);
     623          809 :         lsn.0
     624          809 :     }
     625              : 
     626         5410 :     fn recovery_download(
     627         5410 :         &self,
     628         5410 :         wp: &mut walproposer::bindings::WalProposer,
     629         5410 :         sk: &mut walproposer::bindings::Safekeeper,
     630         5410 :     ) -> bool {
     631         5410 :         let mut startpos = wp.truncateLsn;
     632         5410 :         let endpos = wp.propEpochStartLsn;
     633         5410 : 
     634         5410 :         if startpos == endpos {
     635         3322 :             debug!("recovery_download: nothing to download");
     636         3322 :             return true;
     637         2088 :         }
     638         2088 : 
     639         2088 :         debug!("recovery_download from {} to {}", startpos, endpos,);
     640              : 
     641         2088 :         let replication_prompt = format!(
     642         2088 :             "START_REPLICATION {} {} {} {}",
     643         2088 :             self.config.ttid.tenant_id, self.config.ttid.timeline_id, startpos, endpos,
     644         2088 :         );
     645         2088 :         let async_conn = self.get_conn(sk);
     646         2088 : 
     647         2088 :         let conn = self.os.open_tcp(async_conn.node_id);
     648         2088 :         conn.send(desim::proto::AnyMessage::Bytes(replication_prompt.into()));
     649         2088 : 
     650         2088 :         let chan = conn.recv_chan();
     651         3506 :         while startpos < endpos {
     652         2088 :             let event = chan.recv();
     653         1946 :             match event {
     654              :                 NetEvent::Closed => {
     655          142 :                     debug!("connection closed in recovery");
     656          142 :                     break;
     657              :                 }
     658         1946 :                 NetEvent::Message(AnyMessage::Bytes(b)) => {
     659         1946 :                     debug!("got recovery bytes from safekeeper");
     660         1418 :                     self.disk.lock().write(startpos, &b);
     661         1418 :                     startpos += b.len() as u64;
     662              :                 }
     663            0 :                 NetEvent::Message(_) => unreachable!(),
     664              :             }
     665              :         }
     666              : 
     667         1560 :         debug!("recovery finished at {}", startpos);
     668              : 
     669         1560 :         startpos == endpos
     670         4882 :     }
     671              : 
     672        91217 :     fn conn_finish(&self, sk: &mut walproposer::bindings::Safekeeper) {
     673        91217 :         let mut conn = self.get_conn(sk);
     674        91217 :         debug!("conn_finish to {}", conn.node_id);
     675        91217 :         if let Some(socket) = conn.socket.as_mut() {
     676        29216 :             socket.close();
     677        62001 :         } else {
     678        62001 :             // connection is already closed
     679        62001 :         }
     680        91217 :         conn.socket = None;
     681        91217 :     }
     682              : 
     683        89014 :     fn conn_error_message(&self, _sk: &mut walproposer::bindings::Safekeeper) -> String {
     684        89014 :         "connection is closed, probably".into()
     685        89014 :     }
     686              : }
        

Generated by: LCOV version 2.1-beta