Line data Source code
1 : //! Simple test to verify that simulator is working.
2 : #[cfg(test)]
3 : mod reliable_copy_test {
4 : use std::sync::Arc;
5 :
6 : use anyhow::Result;
7 : use desim::executor::{self, PollSome};
8 : use desim::node_os::NodeOs;
9 : use desim::options::{Delay, NetworkOptions};
10 : use desim::proto::{AnyMessage, NetEvent, NodeEvent, ReplCell};
11 : use desim::world::{NodeId, World};
12 : use parking_lot::Mutex;
13 : use tracing::info;
14 :
15 : /// Disk storage trait and implementation.
16 : pub trait Storage<T> {
17 : fn flush_pos(&self) -> u32;
18 : fn flush(&mut self) -> Result<()>;
19 : fn write(&mut self, t: T);
20 : }
21 :
22 : #[derive(Clone)]
23 : pub struct SharedStorage<T> {
24 : pub state: Arc<Mutex<InMemoryStorage<T>>>,
25 : }
26 :
27 : impl<T> SharedStorage<T> {
28 20 : pub fn new() -> Self {
29 20 : Self {
30 20 : state: Arc::new(Mutex::new(InMemoryStorage::new())),
31 20 : }
32 20 : }
33 : }
34 :
35 : impl<T> Storage<T> for SharedStorage<T> {
36 1049 : fn flush_pos(&self) -> u32 {
37 1049 : self.state.lock().flush_pos
38 1049 : }
39 :
40 100 : fn flush(&mut self) -> Result<()> {
41 100 : executor::yield_me(0);
42 100 : self.state.lock().flush()
43 100 : }
44 :
45 100 : fn write(&mut self, t: T) {
46 100 : executor::yield_me(0);
47 100 : self.state.lock().write(t);
48 100 : }
49 : }
50 :
51 : pub struct InMemoryStorage<T> {
52 : pub data: Vec<T>,
53 : pub flush_pos: u32,
54 : }
55 :
56 : impl<T> InMemoryStorage<T> {
57 20 : pub fn new() -> Self {
58 20 : Self {
59 20 : data: Vec::new(),
60 20 : flush_pos: 0,
61 20 : }
62 20 : }
63 :
64 100 : pub fn flush(&mut self) -> Result<()> {
65 100 : self.flush_pos = self.data.len() as u32;
66 100 : Ok(())
67 100 : }
68 :
69 100 : pub fn write(&mut self, t: T) {
70 100 : self.data.push(t);
71 100 : }
72 : }
73 :
74 : /// Server implementation.
75 20 : pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
76 20 : info!("started server");
77 :
78 20 : let node_events = os.node_events();
79 20 : let mut epoll_vec: Vec<Box<dyn PollSome>> = vec![Box::new(node_events.clone())];
80 20 : let mut sockets = vec![];
81 :
82 : loop {
83 1537 : let index = executor::epoll_chans(&epoll_vec, -1).unwrap();
84 1537 :
85 1537 : if index == 0 {
86 568 : let node_event = node_events.must_recv();
87 568 : info!("got node event: {:?}", node_event);
88 568 : if let NodeEvent::Accept(tcp) = node_event {
89 568 : tcp.send(AnyMessage::Just32(storage.flush_pos()));
90 568 : epoll_vec.push(Box::new(tcp.recv_chan()));
91 568 : sockets.push(tcp);
92 568 : }
93 568 : continue;
94 969 : }
95 969 :
96 969 : let recv_chan = sockets[index - 1].recv_chan();
97 969 : let socket = &sockets[index - 1];
98 969 :
99 969 : let event = recv_chan.must_recv();
100 969 : info!("got event: {:?}", event);
101 381 : if let NetEvent::Message(AnyMessage::ReplCell(cell)) = event {
102 381 : if cell.seqno != storage.flush_pos() {
103 281 : info!("got out of order data: {:?}", cell);
104 281 : continue;
105 100 : }
106 100 : storage.write(cell.value);
107 100 : storage.flush().unwrap();
108 100 : socket.send(AnyMessage::Just32(storage.flush_pos()));
109 568 : }
110 : }
111 : }
112 :
113 : /// Client copies all data from array to the remote node.
114 20 : pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
115 20 : info!("started client");
116 :
117 20 : let mut delivered = 0;
118 20 :
119 20 : let mut sock = os.open_tcp(dst);
120 20 : let mut recv_chan = sock.recv_chan();
121 :
122 1046 : while delivered < data.len() {
123 1026 : let num = &data[delivered];
124 1026 : info!("sending data: {:?}", num.clone());
125 1026 : sock.send(AnyMessage::ReplCell(num.clone()));
126 1026 :
127 1026 : // loop {
128 1026 : let event = recv_chan.recv();
129 128 : match event {
130 128 : NetEvent::Message(AnyMessage::Just32(flush_pos)) => {
131 128 : if flush_pos == 1 + delivered as u32 {
132 100 : delivered += 1;
133 100 : }
134 : }
135 : NetEvent::Closed => {
136 898 : info!("connection closed, reestablishing");
137 898 : sock = os.open_tcp(dst);
138 898 : recv_chan = sock.recv_chan();
139 : }
140 0 : _ => {}
141 : }
142 :
143 : // }
144 : }
145 :
146 20 : let sock = os.open_tcp(dst);
147 120 : for num in data {
148 100 : info!("sending data: {:?}", num.clone());
149 100 : sock.send(AnyMessage::ReplCell(num.clone()));
150 : }
151 :
152 20 : info!("sent all data and finished client");
153 20 : }
154 :
155 : /// Run test simulations.
156 : #[test]
157 1 : fn sim_example_reliable_copy() {
158 1 : utils::logging::init(
159 1 : utils::logging::LogFormat::Test,
160 1 : utils::logging::TracingErrorLayerEnablement::Disabled,
161 1 : utils::logging::Output::Stdout,
162 1 : )
163 1 : .expect("logging init failed");
164 1 :
165 1 : let delay = Delay {
166 1 : min: 1,
167 1 : max: 60,
168 1 : fail_prob: 0.4,
169 1 : };
170 1 :
171 1 : let network = NetworkOptions {
172 1 : keepalive_timeout: Some(50),
173 1 : connect_delay: delay.clone(),
174 1 : send_delay: delay.clone(),
175 1 : };
176 :
177 21 : for seed in 0..20 {
178 20 : let u32_data: [u32; 5] = [1, 2, 3, 4, 5];
179 20 : let data = u32_to_cells(&u32_data, 1);
180 20 : let world = Arc::new(World::new(seed, Arc::new(network.clone())));
181 20 :
182 20 : start_simulation(Options {
183 20 : world,
184 20 : time_limit: 1_000_000,
185 20 : client_fn: Box::new(move |os, server_id| run_client(os, &data, server_id)),
186 20 : u32_data,
187 20 : });
188 20 : }
189 1 : }
190 :
191 : pub struct Options {
192 : pub world: Arc<World>,
193 : pub time_limit: u64,
194 : pub u32_data: [u32; 5],
195 : pub client_fn: Box<dyn FnOnce(NodeOs, u32) + Send + 'static>,
196 : }
197 :
198 20 : pub fn start_simulation(options: Options) {
199 20 : let world = options.world;
200 20 :
201 20 : let client_node = world.new_node();
202 20 : let server_node = world.new_node();
203 20 : let server_id = server_node.id;
204 20 :
205 20 : // start the client thread
206 20 : client_node.launch(move |os| {
207 20 : let client_fn = options.client_fn;
208 20 : client_fn(os, server_id);
209 20 : });
210 20 :
211 20 : // start the server thread
212 20 : let shared_storage = SharedStorage::new();
213 20 : let server_storage = shared_storage.clone();
214 20 : server_node.launch(move |os| run_server(os, Box::new(server_storage)));
215 :
216 7830 : while world.step() && world.now() < options.time_limit {}
217 :
218 20 : let disk_data = shared_storage.state.lock().data.clone();
219 20 : assert!(verify_data(&disk_data, &options.u32_data[..]));
220 20 : }
221 :
222 20 : pub fn u32_to_cells(data: &[u32], client_id: u32) -> Vec<ReplCell> {
223 20 : let mut res = Vec::new();
224 100 : for (i, _) in data.iter().enumerate() {
225 100 : res.push(ReplCell {
226 100 : client_id,
227 100 : seqno: i as u32,
228 100 : value: data[i],
229 100 : });
230 100 : }
231 20 : res
232 20 : }
233 :
234 20 : fn verify_data(disk_data: &[u32], data: &[u32]) -> bool {
235 20 : if disk_data.len() != data.len() {
236 0 : return false;
237 20 : }
238 100 : for i in 0..data.len() {
239 100 : if disk_data[i] != data[i] {
240 0 : return false;
241 100 : }
242 : }
243 20 : true
244 20 : }
245 : }
|