Line data Source code
1 : //! Simple test to verify that the simulator is working.
2 : #[cfg(test)]
3 : mod reliable_copy_test {
4 : use anyhow::Result;
5 : use desim::executor::{self, PollSome};
6 : use desim::options::{Delay, NetworkOptions};
7 : use desim::proto::{NetEvent, NodeEvent, ReplCell};
8 : use desim::world::{NodeId, World};
9 : use desim::{node_os::NodeOs, proto::AnyMessage};
10 : use parking_lot::Mutex;
11 : use std::sync::Arc;
12 : use tracing::info;
13 :
14 : /// Disk storage trait and implementation.
15 : pub trait Storage<T> {
16 : fn flush_pos(&self) -> u32;
17 : fn flush(&mut self) -> Result<()>;
18 : fn write(&mut self, t: T);
19 : }
20 :
21 40 : #[derive(Clone)]
22 : pub struct SharedStorage<T> {
23 : pub state: Arc<Mutex<InMemoryStorage<T>>>,
24 : }
25 :
26 : impl<T> SharedStorage<T> {
27 40 : pub fn new() -> Self {
28 40 : Self {
29 40 : state: Arc::new(Mutex::new(InMemoryStorage::new())),
30 40 : }
31 40 : }
32 : }
33 :
34 : impl<T> Storage<T> for SharedStorage<T> {
35 2098 : fn flush_pos(&self) -> u32 {
36 2098 : self.state.lock().flush_pos
37 2098 : }
38 :
39 200 : fn flush(&mut self) -> Result<()> {
40 200 : executor::yield_me(0);
41 200 : self.state.lock().flush()
42 200 : }
43 :
44 200 : fn write(&mut self, t: T) {
45 200 : executor::yield_me(0);
46 200 : self.state.lock().write(t);
47 200 : }
48 : }
49 :
50 : pub struct InMemoryStorage<T> {
51 : pub data: Vec<T>,
52 : pub flush_pos: u32,
53 : }
54 :
55 : impl<T> InMemoryStorage<T> {
56 40 : pub fn new() -> Self {
57 40 : Self {
58 40 : data: Vec::new(),
59 40 : flush_pos: 0,
60 40 : }
61 40 : }
62 :
63 200 : pub fn flush(&mut self) -> Result<()> {
64 200 : self.flush_pos = self.data.len() as u32;
65 200 : Ok(())
66 200 : }
67 :
68 200 : pub fn write(&mut self, t: T) {
69 200 : self.data.push(t);
70 200 : }
71 : }
72 :
73 : /// Server implementation.
74 40 : pub fn run_server(os: NodeOs, mut storage: Box<dyn Storage<u32>>) {
75 40 : info!("started server");
76 :
77 40 : let node_events = os.node_events();
78 40 : let mut epoll_vec: Vec<Box<dyn PollSome>> = vec![Box::new(node_events.clone())];
79 40 : let mut sockets = vec![];
80 :
81 3074 : loop {
82 3074 : let index = executor::epoll_chans(&epoll_vec, -1).unwrap();
83 3074 :
84 3074 : if index == 0 {
85 1136 : let node_event = node_events.must_recv();
86 1136 : info!("got node event: {:?}", node_event);
87 1136 : if let NodeEvent::Accept(tcp) = node_event {
88 1136 : tcp.send(AnyMessage::Just32(storage.flush_pos()));
89 1136 : epoll_vec.push(Box::new(tcp.recv_chan()));
90 1136 : sockets.push(tcp);
91 1136 : }
92 1136 : continue;
93 1938 : }
94 1938 :
95 1938 : let recv_chan = sockets[index - 1].recv_chan();
96 1938 : let socket = &sockets[index - 1];
97 1938 :
98 1938 : let event = recv_chan.must_recv();
99 1938 : info!("got event: {:?}", event);
100 1898 : if let NetEvent::Message(AnyMessage::ReplCell(cell)) = event {
101 762 : if cell.seqno != storage.flush_pos() {
102 562 : info!("got out of order data: {:?}", cell);
103 562 : continue;
104 200 : }
105 200 : storage.write(cell.value);
106 200 : storage.flush().unwrap();
107 200 : socket.send(AnyMessage::Just32(storage.flush_pos()));
108 1136 : }
109 : }
110 : }
111 :
112 : /// Client copies all data from array to the remote node.
113 40 : pub fn run_client(os: NodeOs, data: &[ReplCell], dst: NodeId) {
114 40 : info!("started client");
115 :
116 40 : let mut delivered = 0;
117 40 :
118 40 : let mut sock = os.open_tcp(dst);
119 40 : let mut recv_chan = sock.recv_chan();
120 :
121 2092 : while delivered < data.len() {
122 2052 : let num = &data[delivered];
123 2052 : info!("sending data: {:?}", num.clone());
124 2052 : sock.send(AnyMessage::ReplCell(num.clone()));
125 2052 :
126 2052 : // loop {
127 2052 : let event = recv_chan.recv();
128 256 : match event {
129 256 : NetEvent::Message(AnyMessage::Just32(flush_pos)) => {
130 256 : if flush_pos == 1 + delivered as u32 {
131 200 : delivered += 1;
132 200 : }
133 : }
134 : NetEvent::Closed => {
135 1796 : info!("connection closed, reestablishing");
136 1796 : sock = os.open_tcp(dst);
137 1796 : recv_chan = sock.recv_chan();
138 : }
139 0 : _ => {}
140 : }
141 :
142 : // }
143 : }
144 :
145 40 : let sock = os.open_tcp(dst);
146 240 : for num in data {
147 200 : info!("sending data: {:?}", num.clone());
148 200 : sock.send(AnyMessage::ReplCell(num.clone()));
149 : }
150 :
151 40 : info!("sent all data and finished client");
152 40 : }
153 :
/// Run test simulations.
///
/// Copies `[1, 2, 3, 4, 5]` from a client to a server for seeds `0..20`.
/// Connect and send both use a delay of 1..=60 with a failure probability
/// of 0.4, and the keepalive timeout is 50. Each run is capped at a
/// simulated time of 1_000_000.
#[test]
fn sim_example_reliable_copy() {
    utils::logging::init(
        utils::logging::LogFormat::Test,
        utils::logging::TracingErrorLayerEnablement::Disabled,
        utils::logging::Output::Stdout,
    )
    .expect("logging init failed");

    // The same delay profile is used for connecting and for sending.
    let link_delay = Delay {
        min: 1,
        max: 60,
        fail_prob: 0.4,
    };

    let net_options = NetworkOptions {
        keepalive_timeout: Some(50),
        connect_delay: link_delay.clone(),
        send_delay: link_delay.clone(),
    };

    for seed in 0..20 {
        let u32_data: [u32; 5] = [1, 2, 3, 4, 5];
        let cells = u32_to_cells(&u32_data, 1);
        // Every seed gets its own world with its own copy of the options.
        let world = Arc::new(World::new(seed, Arc::new(net_options.clone())));

        start_simulation(Options {
            world,
            time_limit: 1_000_000,
            client_fn: Box::new(move |os, server_id| run_client(os, &cells, server_id)),
            u32_data,
        });
    }
}
189 :
190 : pub struct Options {
191 : pub world: Arc<World>,
192 : pub time_limit: u64,
193 : pub u32_data: [u32; 5],
194 : pub client_fn: Box<dyn FnOnce(NodeOs, u32) + Send + 'static>,
195 : }
196 :
197 40 : pub fn start_simulation(options: Options) {
198 40 : let world = options.world;
199 40 :
200 40 : let client_node = world.new_node();
201 40 : let server_node = world.new_node();
202 40 : let server_id = server_node.id;
203 40 :
204 40 : // start the client thread
205 40 : client_node.launch(move |os| {
206 40 : let client_fn = options.client_fn;
207 40 : client_fn(os, server_id);
208 40 : });
209 40 :
210 40 : // start the server thread
211 40 : let shared_storage = SharedStorage::new();
212 40 : let server_storage = shared_storage.clone();
213 40 : server_node.launch(move |os| run_server(os, Box::new(server_storage)));
214 :
215 15660 : while world.step() && world.now() < options.time_limit {}
216 :
217 40 : let disk_data = shared_storage.state.lock().data.clone();
218 40 : assert!(verify_data(&disk_data, &options.u32_data[..]));
219 40 : }
220 :
221 40 : pub fn u32_to_cells(data: &[u32], client_id: u32) -> Vec<ReplCell> {
222 40 : let mut res = Vec::new();
223 200 : for (i, _) in data.iter().enumerate() {
224 200 : res.push(ReplCell {
225 200 : client_id,
226 200 : seqno: i as u32,
227 200 : value: data[i],
228 200 : });
229 200 : }
230 40 : res
231 40 : }
232 :
/// Returns `true` when `disk_data` holds exactly the values of `data`, in the
/// same order and with the same length.
fn verify_data(disk_data: &[u32], data: &[u32]) -> bool {
    // Slice equality already compares lengths first and then element by element.
    disk_data == data
}
244 : }
|