|             Line data    Source code 
       1              : //! Simple test to verify that simulator is working.
       2              : #[cfg(test)]
       3              : mod reliable_copy_test {
       4              :     use anyhow::Result;
       5              :     use desim::executor::{self, PollSome};
       6              :     use desim::options::{Delay, NetworkOptions};
       7              :     use desim::proto::{NetEvent, NodeEvent, ReplCell};
       8              :     use desim::world::{NodeId, World};
       9              :     use desim::{node_os::NodeOs, proto::AnyMessage};
      10              :     use parking_lot::Mutex;
      11              :     use std::sync::Arc;
      12              :     use tracing::info;
      13              : 
      14              :     /// Disk storage trait and implementation.
      15              :     pub trait Storage<T> {
      16              :         fn flush_pos(&self) -> u32;
      17              :         fn flush(&mut self) -> Result<()>;
      18              :         fn write(&mut self, t: T);
      19              :     }
      20              : 
      21              :     #[derive(Clone)]
      22              :     pub struct SharedStorage<T> {
      23              :         pub state: Arc<Mutex<InMemoryStorage<T>>>,
      24              :     }
      25              : 
      26              :     impl<T> SharedStorage<T> {
      27           40 :         pub fn new() -> Self {
      28           40 :             Self {
      29           40 :                 state: Arc::new(Mutex::new(InMemoryStorage::new())),
      30           40 :             }
      31           40 :         }
      32              :     }
      33              : 
      34              :     impl<T> Storage<T> for SharedStorage<T> {
      35         2098 :         fn flush_pos(&self) -> u32 {
      36         2098 :             self.state.lock().flush_pos
      37         2098 :         }
      38              : 
      39          200 :         fn flush(&mut self) -> Result<()> {
      40          200 :             executor::yield_me(0);
      41          200 :             self.state.lock().flush()
      42          200 :         }
      43              : 
      44          200 :         fn write(&mut self, t: T) {
      45          200 :             executor::yield_me(0);
      46          200 :             self.state.lock().write(t);
      47          200 :         }
      48              :     }
      49              : 
      50              :     pub struct InMemoryStorage<T> {
      51              :         pub data: Vec<T>,
      52              :         pub flush_pos: u32,
      53              :     }
      54              : 
      55              :     impl<T> InMemoryStorage<T> {
      56           40 :         pub fn new() -> Self {
      57           40 :             Self {
      58           40 :                 data: Vec::new(),
      59           40 :                 flush_pos: 0,
      60           40 :             }
      61           40 :         }
      62              : 
      63          200 :         pub fn flush(&mut self) -> Result<()> {
      64          200 :             self.flush_pos = self.data.len() as u32;
      65          200 :             Ok(())
      66          200 :         }
      67              : 
      68          200 :         pub fn write(&mut self, t: T) {
      69          200 :             self.data.push(t);
      70          200 :         }
      71              :     }
      72              : 
      73              :     /// Server implementation.
      74           40 :     pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
      75           40 :         info!("started server");
      76              : 
      77           40 :         let node_events = os.node_events();
      78           40 :         let mut epoll_vec: Vec<Box<dyn PollSome>> = vec![Box::new(node_events.clone())];
      79           40 :         let mut sockets = vec![];
      80              : 
      81         3074 :         loop {
      82         3074 :             let index = executor::epoll_chans(&epoll_vec, -1).unwrap();
      83         3074 : 
      84         3074 :             if index == 0 {
      85         1136 :                 let node_event = node_events.must_recv();
      86         1136 :                 info!("got node event: {:?}", node_event);
      87         1136 :                 if let NodeEvent::Accept(tcp) = node_event {
      88         1136 :                     tcp.send(AnyMessage::Just32(storage.flush_pos()));
      89         1136 :                     epoll_vec.push(Box::new(tcp.recv_chan()));
      90         1136 :                     sockets.push(tcp);
      91         1136 :                 }
      92         1136 :                 continue;
      93         1938 :             }
      94         1938 : 
      95         1938 :             let recv_chan = sockets[index - 1].recv_chan();
      96         1938 :             let socket = &sockets[index - 1];
      97         1938 : 
      98         1938 :             let event = recv_chan.must_recv();
      99         1938 :             info!("got event: {:?}", event);
     100         1898 :             if let NetEvent::Message(AnyMessage::ReplCell(cell)) = event {
     101          762 :                 if cell.seqno != storage.flush_pos() {
     102          562 :                     info!("got out of order data: {:?}", cell);
     103          562 :                     continue;
     104          200 :                 }
     105          200 :                 storage.write(cell.value);
     106          200 :                 storage.flush().unwrap();
     107          200 :                 socket.send(AnyMessage::Just32(storage.flush_pos()));
     108         1136 :             }
     109              :         }
     110              :     }
     111              : 
     112              :     /// Client copies all data from array to the remote node.
     113           40 :     pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
     114           40 :         info!("started client");
     115              : 
     116           40 :         let mut delivered = 0;
     117           40 : 
     118           40 :         let mut sock = os.open_tcp(dst);
     119           40 :         let mut recv_chan = sock.recv_chan();
     120              : 
     121         2092 :         while delivered < data.len() {
     122         2052 :             let num = &data[delivered];
     123         2052 :             info!("sending data: {:?}", num.clone());
     124         2052 :             sock.send(AnyMessage::ReplCell(num.clone()));
     125         2052 : 
     126         2052 :             // loop {
     127         2052 :             let event = recv_chan.recv();
     128          256 :             match event {
     129          256 :                 NetEvent::Message(AnyMessage::Just32(flush_pos)) => {
     130          256 :                     if flush_pos == 1 + delivered as u32 {
     131          200 :                         delivered += 1;
     132          200 :                     }
     133              :                 }
     134              :                 NetEvent::Closed => {
     135         1796 :                     info!("connection closed, reestablishing");
     136         1796 :                     sock = os.open_tcp(dst);
     137         1796 :                     recv_chan = sock.recv_chan();
     138              :                 }
     139            0 :                 _ => {}
     140              :             }
     141              : 
     142              :             // }
     143              :         }
     144              : 
     145           40 :         let sock = os.open_tcp(dst);
     146          240 :         for num in data {
     147          200 :             info!("sending data: {:?}", num.clone());
     148          200 :             sock.send(AnyMessage::ReplCell(num.clone()));
     149              :         }
     150              : 
     151           40 :         info!("sent all data and finished client");
     152           40 :     }
     153              : 
     154              :     /// Run test simulations.
     155              :     #[test]
     156            2 :     fn sim_example_reliable_copy() {
     157            2 :         utils::logging::init(
     158            2 :             utils::logging::LogFormat::Test,
     159            2 :             utils::logging::TracingErrorLayerEnablement::Disabled,
     160            2 :             utils::logging::Output::Stdout,
     161            2 :         )
     162            2 :         .expect("logging init failed");
     163            2 : 
     164            2 :         let delay = Delay {
     165            2 :             min: 1,
     166            2 :             max: 60,
     167            2 :             fail_prob: 0.4,
     168            2 :         };
     169            2 : 
     170            2 :         let network = NetworkOptions {
     171            2 :             keepalive_timeout: Some(50),
     172            2 :             connect_delay: delay.clone(),
     173            2 :             send_delay: delay.clone(),
     174            2 :         };
     175              : 
     176           42 :         for seed in 0..20 {
     177           40 :             let u32_data: [u32; 5] = [1, 2, 3, 4, 5];
     178           40 :             let data = u32_to_cells(&u32_data, 1);
     179           40 :             let world = Arc::new(World::new(seed, Arc::new(network.clone())));
     180           40 : 
     181           40 :             start_simulation(Options {
     182           40 :                 world,
     183           40 :                 time_limit: 1_000_000,
     184           40 :                 client_fn: Box::new(move |os, server_id| run_client(os, &data, server_id)),
     185           40 :                 u32_data,
     186           40 :             });
     187           40 :         }
     188            2 :     }
     189              : 
     190              :     pub struct Options {
     191              :         pub world: Arc<World>,
     192              :         pub time_limit: u64,
     193              :         pub u32_data: [u32; 5],
     194              :         pub client_fn: Box<dyn FnOnce(NodeOs, u32) + Send + 'static>,
     195              :     }
     196              : 
     197           40 :     pub fn start_simulation(options: Options) {
     198           40 :         let world = options.world;
     199           40 : 
     200           40 :         let client_node = world.new_node();
     201           40 :         let server_node = world.new_node();
     202           40 :         let server_id = server_node.id;
     203           40 : 
     204           40 :         // start the client thread
     205           40 :         client_node.launch(move |os| {
     206           40 :             let client_fn = options.client_fn;
     207           40 :             client_fn(os, server_id);
     208           40 :         });
     209           40 : 
     210           40 :         // start the server thread
     211           40 :         let shared_storage = SharedStorage::new();
     212           40 :         let server_storage = shared_storage.clone();
     213           40 :         server_node.launch(move |os| run_server(os, Box::new(server_storage)));
     214              : 
     215        15660 :         while world.step() && world.now() < options.time_limit {}
     216              : 
     217           40 :         let disk_data = shared_storage.state.lock().data.clone();
     218           40 :         assert!(verify_data(&disk_data, &options.u32_data[..]));
     219           40 :     }
     220              : 
     221           40 :     pub fn u32_to_cells(data: &[u32], client_id: u32) -> Vec<ReplCell> {
     222           40 :         let mut res = Vec::new();
     223          200 :         for (i, _) in data.iter().enumerate() {
     224          200 :             res.push(ReplCell {
     225          200 :                 client_id,
     226          200 :                 seqno: i as u32,
     227          200 :                 value: data[i],
     228          200 :             });
     229          200 :         }
     230           40 :         res
     231           40 :     }
     232              : 
     233           40 :     fn verify_data(disk_data: &[u32], data: &[u32]) -> bool {
     234           40 :         if disk_data.len() != data.len() {
     235            0 :             return false;
     236           40 :         }
     237          200 :         for i in 0..data.len() {
     238          200 :             if disk_data[i] != data[i] {
     239            0 :                 return false;
     240          200 :             }
     241              :         }
     242           40 :         true
     243           40 :     }
     244              : }
         |