LCOV - code coverage report
Current view: top level - libs/desim/tests - reliable_copy_test.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 98.2 % 163 160
Test Date: 2025-03-12 00:01:28 Functions: 100.0 % 16 16

            Line data    Source code
       1              : //! Simple test to verify that simulator is working.
       2              : #[cfg(test)]
       3              : mod reliable_copy_test {
       4              :     use std::sync::Arc;
       5              : 
       6              :     use anyhow::Result;
       7              :     use desim::executor::{self, PollSome};
       8              :     use desim::node_os::NodeOs;
       9              :     use desim::options::{Delay, NetworkOptions};
      10              :     use desim::proto::{AnyMessage, NetEvent, NodeEvent, ReplCell};
      11              :     use desim::world::{NodeId, World};
      12              :     use parking_lot::Mutex;
      13              :     use tracing::info;
      14              : 
      15              :     /// Disk storage trait and implementation.
      16              :     pub trait Storage<T> {
      17              :         fn flush_pos(&self) -> u32;
      18              :         fn flush(&mut self) -> Result<()>;
      19              :         fn write(&mut self, t: T);
      20              :     }
      21              : 
      22              :     #[derive(Clone)]
      23              :     pub struct SharedStorage<T> {
      24              :         pub state: Arc<Mutex<InMemoryStorage<T>>>,
      25              :     }
      26              : 
      27              :     impl<T> SharedStorage<T> {
      28           20 :         pub fn new() -> Self {
      29           20 :             Self {
      30           20 :                 state: Arc::new(Mutex::new(InMemoryStorage::new())),
      31           20 :             }
      32           20 :         }
      33              :     }
      34              : 
      35              :     impl<T> Storage<T> for SharedStorage<T> {
      36         1049 :         fn flush_pos(&self) -> u32 {
      37         1049 :             self.state.lock().flush_pos
      38         1049 :         }
      39              : 
      40          100 :         fn flush(&mut self) -> Result<()> {
      41          100 :             executor::yield_me(0);
      42          100 :             self.state.lock().flush()
      43          100 :         }
      44              : 
      45          100 :         fn write(&mut self, t: T) {
      46          100 :             executor::yield_me(0);
      47          100 :             self.state.lock().write(t);
      48          100 :         }
      49              :     }
      50              : 
      51              :     pub struct InMemoryStorage<T> {
      52              :         pub data: Vec<T>,
      53              :         pub flush_pos: u32,
      54              :     }
      55              : 
      56              :     impl<T> InMemoryStorage<T> {
      57           20 :         pub fn new() -> Self {
      58           20 :             Self {
      59           20 :                 data: Vec::new(),
      60           20 :                 flush_pos: 0,
      61           20 :             }
      62           20 :         }
      63              : 
      64          100 :         pub fn flush(&mut self) -> Result<()> {
      65          100 :             self.flush_pos = self.data.len() as u32;
      66          100 :             Ok(())
      67          100 :         }
      68              : 
      69          100 :         pub fn write(&mut self, t: T) {
      70          100 :             self.data.push(t);
      71          100 :         }
      72              :     }
      73              : 
      74              :     /// Server implementation.
      75           20 :     pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
      76           20 :         info!("started server");
      77              : 
      78           20 :         let node_events = os.node_events();
      79           20 :         let mut epoll_vec: Vec<Box<dyn PollSome>> = vec![Box::new(node_events.clone())];
      80           20 :         let mut sockets = vec![];
      81              : 
      82              :         loop {
      83         1537 :             let index = executor::epoll_chans(&epoll_vec, -1).unwrap();
      84         1537 : 
      85         1537 :             if index == 0 {
      86          568 :                 let node_event = node_events.must_recv();
      87          568 :                 info!("got node event: {:?}", node_event);
      88          568 :                 if let NodeEvent::Accept(tcp) = node_event {
      89          568 :                     tcp.send(AnyMessage::Just32(storage.flush_pos()));
      90          568 :                     epoll_vec.push(Box::new(tcp.recv_chan()));
      91          568 :                     sockets.push(tcp);
      92          568 :                 }
      93          568 :                 continue;
      94          969 :             }
      95          969 : 
      96          969 :             let recv_chan = sockets[index - 1].recv_chan();
      97          969 :             let socket = &sockets[index - 1];
      98          969 : 
      99          969 :             let event = recv_chan.must_recv();
     100          969 :             info!("got event: {:?}", event);
     101          381 :             if let NetEvent::Message(AnyMessage::ReplCell(cell)) = event {
     102          381 :                 if cell.seqno != storage.flush_pos() {
     103          281 :                     info!("got out of order data: {:?}", cell);
     104          281 :                     continue;
     105          100 :                 }
     106          100 :                 storage.write(cell.value);
     107          100 :                 storage.flush().unwrap();
     108          100 :                 socket.send(AnyMessage::Just32(storage.flush_pos()));
     109          568 :             }
     110              :         }
     111              :     }
     112              : 
     113              :     /// Client copies all data from array to the remote node.
     114           20 :     pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
     115           20 :         info!("started client");
     116              : 
     117           20 :         let mut delivered = 0;
     118           20 : 
     119           20 :         let mut sock = os.open_tcp(dst);
     120           20 :         let mut recv_chan = sock.recv_chan();
     121              : 
     122         1046 :         while delivered < data.len() {
     123         1026 :             let num = &data[delivered];
     124         1026 :             info!("sending data: {:?}", num.clone());
     125         1026 :             sock.send(AnyMessage::ReplCell(num.clone()));
     126         1026 : 
     127         1026 :             // loop {
     128         1026 :             let event = recv_chan.recv();
     129          128 :             match event {
     130          128 :                 NetEvent::Message(AnyMessage::Just32(flush_pos)) => {
     131          128 :                     if flush_pos == 1 + delivered as u32 {
     132          100 :                         delivered += 1;
     133          100 :                     }
     134              :                 }
     135              :                 NetEvent::Closed => {
     136          898 :                     info!("connection closed, reestablishing");
     137          898 :                     sock = os.open_tcp(dst);
     138          898 :                     recv_chan = sock.recv_chan();
     139              :                 }
     140            0 :                 _ => {}
     141              :             }
     142              : 
     143              :             // }
     144              :         }
     145              : 
     146           20 :         let sock = os.open_tcp(dst);
     147          120 :         for num in data {
     148          100 :             info!("sending data: {:?}", num.clone());
     149          100 :             sock.send(AnyMessage::ReplCell(num.clone()));
     150              :         }
     151              : 
     152           20 :         info!("sent all data and finished client");
     153           20 :     }
     154              : 
     155              :     /// Run test simulations.
     156              :     #[test]
     157            1 :     fn sim_example_reliable_copy() {
     158            1 :         utils::logging::init(
     159            1 :             utils::logging::LogFormat::Test,
     160            1 :             utils::logging::TracingErrorLayerEnablement::Disabled,
     161            1 :             utils::logging::Output::Stdout,
     162            1 :         )
     163            1 :         .expect("logging init failed");
     164            1 : 
     165            1 :         let delay = Delay {
     166            1 :             min: 1,
     167            1 :             max: 60,
     168            1 :             fail_prob: 0.4,
     169            1 :         };
     170            1 : 
     171            1 :         let network = NetworkOptions {
     172            1 :             keepalive_timeout: Some(50),
     173            1 :             connect_delay: delay.clone(),
     174            1 :             send_delay: delay.clone(),
     175            1 :         };
     176              : 
     177           21 :         for seed in 0..20 {
     178           20 :             let u32_data: [u32; 5] = [1, 2, 3, 4, 5];
     179           20 :             let data = u32_to_cells(&u32_data, 1);
     180           20 :             let world = Arc::new(World::new(seed, Arc::new(network.clone())));
     181           20 : 
     182           20 :             start_simulation(Options {
     183           20 :                 world,
     184           20 :                 time_limit: 1_000_000,
     185           20 :                 client_fn: Box::new(move |os, server_id| run_client(os, &data, server_id)),
     186           20 :                 u32_data,
     187           20 :             });
     188           20 :         }
     189            1 :     }
     190              : 
     191              :     pub struct Options {
     192              :         pub world: Arc<World>,
     193              :         pub time_limit: u64,
     194              :         pub u32_data: [u32; 5],
     195              :         pub client_fn: Box<dyn FnOnce(NodeOs, u32) + Send + 'static>,
     196              :     }
     197              : 
     198           20 :     pub fn start_simulation(options: Options) {
     199           20 :         let world = options.world;
     200           20 : 
     201           20 :         let client_node = world.new_node();
     202           20 :         let server_node = world.new_node();
     203           20 :         let server_id = server_node.id;
     204           20 : 
     205           20 :         // start the client thread
     206           20 :         client_node.launch(move |os| {
     207           20 :             let client_fn = options.client_fn;
     208           20 :             client_fn(os, server_id);
     209           20 :         });
     210           20 : 
     211           20 :         // start the server thread
     212           20 :         let shared_storage = SharedStorage::new();
     213           20 :         let server_storage = shared_storage.clone();
     214           20 :         server_node.launch(move |os| run_server(os, Box::new(server_storage)));
     215              : 
     216         7830 :         while world.step() && world.now() < options.time_limit {}
     217              : 
     218           20 :         let disk_data = shared_storage.state.lock().data.clone();
     219           20 :         assert!(verify_data(&disk_data, &options.u32_data[..]));
     220           20 :     }
     221              : 
     222           20 :     pub fn u32_to_cells(data: &[u32], client_id: u32) -> Vec<ReplCell> {
     223           20 :         let mut res = Vec::new();
     224          100 :         for (i, _) in data.iter().enumerate() {
     225          100 :             res.push(ReplCell {
     226          100 :                 client_id,
     227          100 :                 seqno: i as u32,
     228          100 :                 value: data[i],
     229          100 :             });
     230          100 :         }
     231           20 :         res
     232           20 :     }
     233              : 
     234           20 :     fn verify_data(disk_data: &[u32], data: &[u32]) -> bool {
     235           20 :         if disk_data.len() != data.len() {
     236            0 :             return false;
     237           20 :         }
     238          100 :         for i in 0..data.len() {
     239          100 :             if disk_data[i] != data[i] {
     240            0 :                 return false;
     241          100 :             }
     242              :         }
     243           20 :         true
     244           20 :     }
     245              : }
        

Generated by: LCOV version 2.1-beta