Line data Source code
1 : use parking_lot::Mutex;
2 : use rand::{rngs::StdRng, SeedableRng};
3 : use std::{
4 : ops::DerefMut,
5 : sync::{mpsc, Arc},
6 : };
7 :
8 : use crate::{
9 : executor::{ExternalHandle, Runtime},
10 : network::NetworkTask,
11 : options::NetworkOptions,
12 : proto::{NodeEvent, SimEvent},
13 : time::Timing,
14 : };
15 :
16 : use super::{chan::Chan, network::TCP, node_os::NodeOs};
17 :
/// Identifier of a simulated node.
///
/// `World::new_node` derives it from the node's position in the world's node
/// list, and `World::get_node` uses it as an index into that list.
pub type NodeId = u32;
19 :
20 : /// World contains simulation state.
21 : pub struct World {
22 : nodes: Mutex<Vec<Arc<Node>>>,
23 : /// Random number generator.
24 : rng: Mutex<StdRng>,
25 : /// Internal event log.
26 : events: Mutex<Vec<SimEvent>>,
27 : /// Separate task that processes all network messages.
28 : network_task: Arc<NetworkTask>,
29 : /// Runtime for running threads and moving time.
30 : runtime: Mutex<Runtime>,
31 : /// To get current time.
32 : timing: Arc<Timing>,
33 : }
34 :
35 : impl World {
36 528 : pub fn new(seed: u64, options: Arc<NetworkOptions>) -> World {
37 528 : let timing = Arc::new(Timing::new());
38 528 : let mut runtime = Runtime::new(timing.clone());
39 528 :
40 528 : let (tx, rx) = mpsc::channel();
41 528 :
42 528 : runtime.spawn(move || {
43 528 : // create and start network background thread, and send it back via the channel
44 528 : NetworkTask::start_new(options, tx)
45 528 : });
46 :
47 : // wait for the network task to start
48 1056 : while runtime.step() {}
49 :
50 528 : let network_task = rx.recv().unwrap();
51 528 :
52 528 : World {
53 528 : nodes: Mutex::new(Vec::new()),
54 528 : rng: Mutex::new(StdRng::seed_from_u64(seed)),
55 528 : events: Mutex::new(Vec::new()),
56 528 : network_task,
57 528 : runtime: Mutex::new(runtime),
58 528 : timing,
59 528 : }
60 528 : }
61 :
62 422001 : pub fn step(&self) -> bool {
63 422001 : self.runtime.lock().step()
64 422001 : }
65 :
66 2 : pub fn get_thread_step_count(&self) -> u64 {
67 2 : self.runtime.lock().step_counter
68 2 : }
69 :
70 : /// Create a new random number generator.
71 46677 : pub fn new_rng(&self) -> StdRng {
72 46677 : let mut rng = self.rng.lock();
73 46677 : StdRng::from_rng(rng.deref_mut()).unwrap()
74 46677 : }
75 :
76 : /// Create a new node.
77 11050 : pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
78 11050 : let mut nodes = self.nodes.lock();
79 11050 : let id = nodes.len() as NodeId;
80 11050 : let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
81 11050 : nodes.push(node.clone());
82 11050 : node
83 11050 : }
84 :
85 : /// Get an internal node state by id.
86 35627 : fn get_node(&self, id: NodeId) -> Option<Arc<Node>> {
87 35627 : let nodes = self.nodes.lock();
88 35627 : let num = id as usize;
89 35627 : if num < nodes.len() {
90 35627 : Some(nodes[num].clone())
91 : } else {
92 0 : None
93 : }
94 35627 : }
95 :
96 505 : pub fn stop_all(&self) {
97 505 : self.runtime.lock().crash_all_threads();
98 505 : }
99 :
100 : /// Returns a writable end of a TCP connection, to send src->dst messages.
101 35627 : pub fn open_tcp(self: &Arc<World>, dst: NodeId) -> TCP {
102 35627 : // TODO: replace unwrap() with /dev/null socket.
103 35627 : let dst = self.get_node(dst).unwrap();
104 35627 : let dst_accept = dst.node_events.lock().clone();
105 35627 :
106 35627 : let rng = self.new_rng();
107 35627 : self.network_task.start_new_connection(rng, dst_accept)
108 35627 : }
109 :
110 : /// Get current time.
111 805940 : pub fn now(&self) -> u64 {
112 805940 : self.timing.now()
113 805940 : }
114 :
115 : /// Get a copy of the internal clock.
116 1012 : pub fn clock(&self) -> Arc<Timing> {
117 1012 : self.timing.clone()
118 1012 : }
119 :
120 28221 : pub fn add_event(&self, node: NodeId, data: String) {
121 28221 : let time = self.now();
122 28221 : self.events.lock().push(SimEvent { time, node, data });
123 28221 : }
124 :
125 502 : pub fn take_events(&self) -> Vec<SimEvent> {
126 502 : let mut events = self.events.lock();
127 502 : let mut res = Vec::new();
128 502 : std::mem::swap(&mut res, &mut events);
129 502 : res
130 502 : }
131 :
132 503 : pub fn deallocate(&self) {
133 503 : self.stop_all();
134 503 : self.timing.clear();
135 503 : self.nodes.lock().clear();
136 503 : }
137 : }
138 :
139 : /// Internal node state.
140 : pub struct Node {
141 : pub id: NodeId,
142 : node_events: Mutex<Chan<NodeEvent>>,
143 : world: Arc<World>,
144 : pub(crate) rng: Mutex<StdRng>,
145 : }
146 :
147 : impl Node {
148 11050 : pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
149 11050 : Node {
150 11050 : id,
151 11050 : node_events: Mutex::new(Chan::new()),
152 11050 : world,
153 11050 : rng: Mutex::new(rng),
154 11050 : }
155 11050 : }
156 :
157 : /// Spawn a new thread with this node context.
158 19372 : pub fn launch(self: &Arc<Self>, f: impl FnOnce(NodeOs) + Send + 'static) -> ExternalHandle {
159 19372 : let node = self.clone();
160 19372 : let world = self.world.clone();
161 19372 : self.world.runtime.lock().spawn(move || {
162 19016 : f(NodeOs::new(world, node.clone()));
163 19372 : })
164 19372 : }
165 :
166 : /// Returns a channel to receive Accepts and internal messages.
167 20149 : pub fn node_events(&self) -> Chan<NodeEvent> {
168 20149 : self.node_events.lock().clone()
169 20149 : }
170 :
171 : /// This will drop all in-flight Accept messages.
172 0 : pub fn replug_node_events(&self, chan: Chan<NodeEvent>) {
173 0 : *self.node_events.lock() = chan;
174 0 : }
175 :
176 : /// Append event to the world's log.
177 28221 : pub fn log_event(&self, data: String) {
178 28221 : self.world.add_event(self.id, data)
179 28221 : }
180 : }
|