Line data Source code
1 : use parking_lot::Mutex;
2 : use rand::{rngs::StdRng, SeedableRng};
3 : use std::{
4 : ops::DerefMut,
5 : sync::{mpsc, Arc},
6 : };
7 :
8 : use crate::{
9 : executor::{ExternalHandle, Runtime},
10 : network::NetworkTask,
11 : options::NetworkOptions,
12 : proto::{NodeEvent, SimEvent},
13 : time::Timing,
14 : };
15 :
16 : use super::{chan::Chan, network::TCP, node_os::NodeOs};
17 :
/// Identifier of a simulated node.
///
/// `World::new_node` assigns ids sequentially, starting at zero, and the id
/// doubles as the node's index in the world's node list.
pub type NodeId = u32;
19 :
/// World contains simulation state.
///
/// Every field is either immutable after construction or wrapped in a mutex,
/// so all methods take `&self`.
pub struct World {
    /// All nodes created so far; a node's `NodeId` is its index in this list.
    nodes: Mutex<Vec<Arc<Node>>>,
    /// Random number generator, seeded in `World::new`; used to derive the
    /// generators handed out by `new_rng`.
    rng: Mutex<StdRng>,
    /// Internal event log, appended to by `add_event` and drained by
    /// `take_events`.
    events: Mutex<Vec<SimEvent>>,
    /// Separate task that processes all network messages.
    network_task: Arc<NetworkTask>,
    /// Runtime for running threads and moving time.
    runtime: Mutex<Runtime>,
    /// To get current time.
    timing: Arc<Timing>,
}
34 :
35 : impl World {
36 2028 : pub fn new(seed: u64, options: Arc<NetworkOptions>) -> World {
37 2028 : let timing = Arc::new(Timing::new());
38 2028 : let mut runtime = Runtime::new(timing.clone());
39 2028 :
40 2028 : let (tx, rx) = mpsc::channel();
41 2028 :
42 2028 : runtime.spawn(move || {
43 2028 : // create and start network background thread, and send it back via the channel
44 2028 : NetworkTask::start_new(options, tx)
45 2028 : });
46 :
47 : // wait for the network task to start
48 4056 : while runtime.step() {}
49 :
50 2028 : let network_task = rx.recv().unwrap();
51 2028 :
52 2028 : World {
53 2028 : nodes: Mutex::new(Vec::new()),
54 2028 : rng: Mutex::new(StdRng::seed_from_u64(seed)),
55 2028 : events: Mutex::new(Vec::new()),
56 2028 : network_task,
57 2028 : runtime: Mutex::new(runtime),
58 2028 : timing,
59 2028 : }
60 2028 : }
61 :
62 1606528 : pub fn step(&self) -> bool {
63 1606528 : self.runtime.lock().step()
64 1606528 : }
65 :
66 2 : pub fn get_thread_step_count(&self) -> u64 {
67 2 : self.runtime.lock().step_counter
68 2 : }
69 :
70 : /// Create a new random number generator.
71 179373 : pub fn new_rng(&self) -> StdRng {
72 179373 : let mut rng = self.rng.lock();
73 179373 : StdRng::from_rng(rng.deref_mut()).unwrap()
74 179373 : }
75 :
76 : /// Create a new node.
77 43039 : pub fn new_node(self: &Arc<Self>) -> Arc<Node> {
78 43039 : let mut nodes = self.nodes.lock();
79 43039 : let id = nodes.len() as NodeId;
80 43039 : let node = Arc::new(Node::new(id, self.clone(), self.new_rng()));
81 43039 : nodes.push(node.clone());
82 43039 : node
83 43039 : }
84 :
85 : /// Get an internal node state by id.
86 136334 : fn get_node(&self, id: NodeId) -> Option<Arc<Node>> {
87 136334 : let nodes = self.nodes.lock();
88 136334 : let num = id as usize;
89 136334 : if num < nodes.len() {
90 136334 : Some(nodes[num].clone())
91 : } else {
92 0 : None
93 : }
94 136334 : }
95 :
96 2005 : pub fn stop_all(&self) {
97 2005 : self.runtime.lock().crash_all_threads();
98 2005 : }
99 :
100 : /// Returns a writable end of a TCP connection, to send src->dst messages.
101 136334 : pub fn open_tcp(self: &Arc<World>, dst: NodeId) -> TCP {
102 136334 : // TODO: replace unwrap() with /dev/null socket.
103 136334 : let dst = self.get_node(dst).unwrap();
104 136334 : let dst_accept = dst.node_events.lock().clone();
105 136334 :
106 136334 : let rng = self.new_rng();
107 136334 : self.network_task.start_new_connection(rng, dst_accept)
108 136334 : }
109 :
110 : /// Get current time.
111 3092583 : pub fn now(&self) -> u64 {
112 3092583 : self.timing.now()
113 3092583 : }
114 :
115 : /// Get a copy of the internal clock.
116 4012 : pub fn clock(&self) -> Arc<Timing> {
117 4012 : self.timing.clone()
118 4012 : }
119 :
120 110806 : pub fn add_event(&self, node: NodeId, data: String) {
121 110806 : let time = self.now();
122 110806 : self.events.lock().push(SimEvent { time, node, data });
123 110806 : }
124 :
125 2002 : pub fn take_events(&self) -> Vec<SimEvent> {
126 2002 : let mut events = self.events.lock();
127 2002 : let mut res = Vec::new();
128 2002 : std::mem::swap(&mut res, &mut events);
129 2002 : res
130 2002 : }
131 :
132 2003 : pub fn deallocate(&self) {
133 2003 : self.stop_all();
134 2003 : self.timing.clear();
135 2003 : self.nodes.lock().clear();
136 2003 : }
137 : }
138 :
/// Internal node state.
pub struct Node {
    /// Id of this node, assigned by `World::new_node`.
    pub id: NodeId,
    /// Channel on which this node receives its events. `World::open_tcp`
    /// hands a clone of it to the network task; it can be swapped out with
    /// `replug_node_events`.
    node_events: Mutex<Chan<NodeEvent>>,
    /// The world this node belongs to.
    world: Arc<World>,
    /// Random number generator owned by this node.
    pub(crate) rng: Mutex<StdRng>,
}
146 :
147 : impl Node {
148 43039 : pub fn new(id: NodeId, world: Arc<World>, rng: StdRng) -> Node {
149 43039 : Node {
150 43039 : id,
151 43039 : node_events: Mutex::new(Chan::new()),
152 43039 : world,
153 43039 : rng: Mutex::new(rng),
154 43039 : }
155 43039 : }
156 :
157 : /// Spawn a new thread with this node context.
158 76704 : pub fn launch(self: &Arc<Self>, f: impl FnOnce(NodeOs) + Send + 'static) -> ExternalHandle {
159 76704 : let node = self.clone();
160 76704 : let world = self.world.clone();
161 76704 : self.world.runtime.lock().spawn(move || {
162 75318 : f(NodeOs::new(world, node.clone()));
163 76704 : })
164 76704 : }
165 :
166 : /// Returns a channel to receive Accepts and internal messages.
167 78411 : pub fn node_events(&self) -> Chan<NodeEvent> {
168 78411 : self.node_events.lock().clone()
169 78411 : }
170 :
171 : /// This will drop all in-flight Accept messages.
172 0 : pub fn replug_node_events(&self, chan: Chan<NodeEvent>) {
173 0 : *self.node_events.lock() = chan;
174 0 : }
175 :
176 : /// Append event to the world's log.
177 110806 : pub fn log_event(&self, data: String) {
178 110806 : self.world.add_event(self.id, data)
179 110806 : }
180 : }
|