Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{BinaryHeap, VecDeque},
4 : fmt::{self, Debug},
5 : ops::DerefMut,
6 : sync::{mpsc, Arc},
7 : };
8 :
9 : use parking_lot::{
10 : lock_api::{MappedMutexGuard, MutexGuard},
11 : Mutex, RawMutex,
12 : };
13 : use rand::rngs::StdRng;
14 : use tracing::debug;
15 :
16 : use crate::{
17 : executor::{self, ThreadContext},
18 : options::NetworkOptions,
19 : proto::NetEvent,
20 : proto::NodeEvent,
21 : };
22 :
23 : use super::{chan::Chan, proto::AnyMessage};
24 :
25 : pub struct NetworkTask {
26 : options: Arc<NetworkOptions>,
27 : connections: Mutex<Vec<VirtualConnection>>,
28 : /// min-heap of connections having something to deliver.
29 : events: Mutex<BinaryHeap<Event>>,
30 : task_context: Arc<ThreadContext>,
31 : }
32 :
33 : impl NetworkTask {
34 12168 : pub fn start_new(options: Arc<NetworkOptions>, tx: mpsc::Sender<Arc<NetworkTask>>) {
35 12168 : let ctx = executor::get_thread_ctx();
36 12168 : let task = Arc::new(Self {
37 12168 : options,
38 12168 : connections: Mutex::new(Vec::new()),
39 12168 : events: Mutex::new(BinaryHeap::new()),
40 12168 : task_context: ctx,
41 12168 : });
42 12168 :
43 12168 : // send the task upstream
44 12168 : tx.send(task.clone()).unwrap();
45 12168 :
46 12168 : // start the task
47 12168 : task.start();
48 12168 : }
49 :
50 812767 : pub fn start_new_connection(self: &Arc<Self>, rng: StdRng, dst_accept: Chan<NodeEvent>) -> TCP {
51 812767 : let now = executor::now();
52 812767 : let connection_id = self.connections.lock().len();
53 812767 :
54 812767 : let vc = VirtualConnection {
55 812767 : connection_id,
56 812767 : dst_accept,
57 812767 : dst_sockets: [Chan::new(), Chan::new()],
58 812767 : state: Mutex::new(ConnectionState {
59 812767 : buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))],
60 812767 : rng,
61 812767 : }),
62 812767 : };
63 812767 : vc.schedule_timeout(self);
64 812767 : vc.send_connect(self);
65 812767 :
66 812767 : let recv_chan = vc.dst_sockets[0].clone();
67 812767 : self.connections.lock().push(vc);
68 812767 :
69 812767 : TCP {
70 812767 : net: self.clone(),
71 812767 : conn_id: connection_id,
72 812767 : dir: 0,
73 812767 : recv_chan,
74 812767 : }
75 812767 : }
76 : }
77 :
78 : // private functions
79 : impl NetworkTask {
80 : /// Schedule to wakeup network task (self) `after_ms` later to deliver
81 : /// messages of connection `id`.
82 4098408 : fn schedule(&self, id: usize, after_ms: u64) {
83 4098408 : self.events.lock().push(Event {
84 4098408 : time: executor::now() + after_ms,
85 4098408 : conn_id: id,
86 4098408 : });
87 4098408 : self.task_context.schedule_wakeup(after_ms);
88 4098408 : }
89 :
90 : /// Get locked connection `id`.
91 5468920 : fn get(&self, id: usize) -> MappedMutexGuard<'_, RawMutex, VirtualConnection> {
92 5468920 : MutexGuard::map(self.connections.lock(), |connections| {
93 5468920 : connections.get_mut(id).unwrap()
94 5468920 : })
95 5468920 : }
96 :
97 3859187 : fn collect_pending_events(&self, now: u64, vec: &mut Vec<Event>) {
98 3859187 : vec.clear();
99 3859187 : let mut events = self.events.lock();
100 7706212 : while let Some(event) = events.peek() {
101 7652499 : if event.time > now {
102 3805474 : break;
103 3847025 : }
104 3847025 : let event = events.pop().unwrap();
105 3847025 : vec.push(event);
106 : }
107 3859187 : }
108 :
109 12168 : fn start(self: &Arc<Self>) {
110 12168 : debug!("started network task");
111 :
112 12168 : let mut events = Vec::new();
113 : loop {
114 3871355 : let now = executor::now();
115 3871355 : self.collect_pending_events(now, &mut events);
116 :
117 3871355 : for event in events.drain(..) {
118 3847025 : let conn = self.get(event.conn_id);
119 3847025 : conn.process(self);
120 3847025 : }
121 :
122 : // block until wakeup
123 3859187 : executor::yield_me(-1);
124 : }
125 : }
126 : }
127 :
/// Direction of traffic inside a connection:
/// `0` — from node(0) (the client) to node(1) (the server);
/// `1` — from node(1) (the server) to node(0) (the client).
type MessageDirection = u8;

/// Role name of node `node` (0 is the client, 1 is the server).
///
/// # Panics
/// Panics for any other node number.
fn node_role(node: MessageDirection) -> &'static str {
    match node {
        0 => "client",
        1 => "server",
        _ => unreachable!(),
    }
}

/// Role name of the node that sends messages in direction `dir`.
///
/// # Panics
/// Panics if `dir` is neither 0 nor 1.
fn sender_str(dir: MessageDirection) -> &'static str {
    node_role(dir)
}

/// Role name of the node that receives messages in direction `dir`.
///
/// # Panics
/// Panics if `dir` is neither 0 nor 1.
fn receiver_str(dir: MessageDirection) -> &'static str {
    // The receiver is the opposite endpoint; any invalid `dir` (>= 2) stays
    // invalid after flipping the low bit, so it still panics.
    node_role(dir ^ 1)
}
147 :
148 : /// Virtual connection between two nodes.
149 : /// Node 0 is the creator of the connection (client),
150 : /// and node 1 is the acceptor (server).
151 : struct VirtualConnection {
152 : connection_id: usize,
153 : /// one-off chan, used to deliver Accept message to dst
154 : dst_accept: Chan<NodeEvent>,
155 : /// message sinks
156 : dst_sockets: [Chan<NetEvent>; 2],
157 : state: Mutex<ConnectionState>,
158 : }
159 :
160 : struct ConnectionState {
161 : buffers: [NetworkBuffer; 2],
162 : rng: StdRng,
163 : }
164 :
165 : impl VirtualConnection {
166 : /// Notify the future about the possible timeout.
167 2360529 : fn schedule_timeout(&self, net: &NetworkTask) {
168 2360529 : if let Some(timeout) = net.options.keepalive_timeout {
169 2360529 : net.schedule(self.connection_id, timeout);
170 2360529 : }
171 2360529 : }
172 :
173 : /// Send the handshake (Accept) to the server.
174 812767 : fn send_connect(&self, net: &NetworkTask) {
175 812767 : let now = executor::now();
176 812767 : let mut state = self.state.lock();
177 812767 : let delay = net.options.connect_delay.delay(&mut state.rng);
178 812767 : let buffer = &mut state.buffers[0];
179 812767 : assert!(buffer.buf.is_empty());
180 812767 : assert!(!buffer.recv_closed);
181 812767 : assert!(!buffer.send_closed);
182 812767 : assert!(buffer.last_recv.is_none());
183 :
184 812767 : let delay = if let Some(ms) = delay {
185 628750 : ms
186 : } else {
187 184017 : debug!("NET: TCP #{} dropped connect", self.connection_id);
188 184017 : buffer.send_closed = true;
189 184017 : return;
190 : };
191 :
192 : // Send a message into the future.
193 628750 : buffer
194 628750 : .buf
195 628750 : .push_back((now + delay, AnyMessage::InternalConnect));
196 628750 : net.schedule(self.connection_id, delay);
197 812767 : }
198 :
199 : /// Transmit some of the messages from the buffer to the nodes.
200 3847025 : fn process(&self, net: &Arc<NetworkTask>) {
201 3847025 : let now = executor::now();
202 3847025 :
203 3847025 : let mut state = self.state.lock();
204 :
205 11541075 : for direction in 0..2 {
206 7694050 : self.process_direction(
207 7694050 : net,
208 7694050 : state.deref_mut(),
209 7694050 : now,
210 7694050 : direction as MessageDirection,
211 7694050 : &self.dst_sockets[direction ^ 1],
212 7694050 : );
213 7694050 : }
214 :
215 : // Close the one side of the connection by timeout if the node
216 : // has not received any messages for a long time.
217 3847025 : if let Some(timeout) = net.options.keepalive_timeout {
218 3847025 : let mut to_close = [false, false];
219 11541075 : for direction in 0..2 {
220 7694050 : let buffer = &mut state.buffers[direction];
221 7694050 : if buffer.recv_closed {
222 1617766 : continue;
223 6076284 : }
224 6076284 : if let Some(last_recv) = buffer.last_recv {
225 5487243 : if now - last_recv >= timeout {
226 1213193 : debug!(
227 0 : "NET: connection {} timed out at {}",
228 0 : self.connection_id,
229 0 : receiver_str(direction as MessageDirection)
230 : );
231 1213193 : let node_idx = direction ^ 1;
232 1213193 : to_close[node_idx] = true;
233 4274050 : }
234 589041 : }
235 : }
236 3847025 : drop(state);
237 :
238 7694050 : for (node_idx, should_close) in to_close.iter().enumerate() {
239 7694050 : if *should_close {
240 1213193 : self.close(node_idx);
241 6480857 : }
242 : }
243 0 : }
244 3847025 : }
245 :
246 : /// Process messages in the buffer in the given direction.
247 7694050 : fn process_direction(
248 7694050 : &self,
249 7694050 : net: &Arc<NetworkTask>,
250 7694050 : state: &mut ConnectionState,
251 7694050 : now: u64,
252 7694050 : direction: MessageDirection,
253 7694050 : to_socket: &Chan<NetEvent>,
254 7694050 : ) {
255 7694050 : let buffer = &mut state.buffers[direction as usize];
256 7694050 : if buffer.recv_closed {
257 1617766 : assert!(buffer.buf.is_empty());
258 6076284 : }
259 :
260 9241812 : while !buffer.buf.is_empty() && buffer.buf.front().unwrap().0 <= now {
261 1547762 : let msg = buffer.buf.pop_front().unwrap().1;
262 1547762 :
263 1547762 : buffer.last_recv = Some(now);
264 1547762 : self.schedule_timeout(net);
265 1547762 :
266 1547762 : if let AnyMessage::InternalConnect = msg {
267 596632 : // TODO: assert to_socket is the server
268 596632 : let server_to_client = TCP {
269 596632 : net: net.clone(),
270 596632 : conn_id: self.connection_id,
271 596632 : dir: direction ^ 1,
272 596632 : recv_chan: to_socket.clone(),
273 596632 : };
274 596632 : // special case, we need to deliver new connection to a separate channel
275 596632 : self.dst_accept.send(NodeEvent::Accept(server_to_client));
276 951130 : } else {
277 951130 : to_socket.send(NetEvent::Message(msg));
278 951130 : }
279 : }
280 7694050 : }
281 :
282 : /// Try to send a message to the buffer, optionally dropping it and
283 : /// determining delivery timestamp.
284 1529280 : fn send(&self, net: &NetworkTask, direction: MessageDirection, msg: AnyMessage) {
285 1529280 : let now = executor::now();
286 1529280 : let mut state = self.state.lock();
287 :
288 1529280 : let (delay, close) = if let Some(ms) = net.options.send_delay.delay(&mut state.rng) {
289 1396372 : (ms, false)
290 : } else {
291 132908 : (0, true)
292 : };
293 :
294 1529280 : let buffer = &mut state.buffers[direction as usize];
295 1529280 : if buffer.send_closed {
296 171030 : debug!(
297 0 : "NET: TCP #{} dropped message {:?} (broken pipe)",
298 : self.connection_id, msg
299 : );
300 171030 : return;
301 1358250 : }
302 1358250 :
303 1358250 : if close {
304 98112 : debug!(
305 0 : "NET: TCP #{} dropped message {:?} (pipe just broke)",
306 : self.connection_id, msg
307 : );
308 98112 : buffer.send_closed = true;
309 98112 : return;
310 1260138 : }
311 1260138 :
312 1260138 : if buffer.recv_closed {
313 151009 : debug!(
314 0 : "NET: TCP #{} dropped message {:?} (recv closed)",
315 : self.connection_id, msg
316 : );
317 151009 : return;
318 1109129 : }
319 1109129 :
320 1109129 : // Send a message into the future.
321 1109129 : buffer.buf.push_back((now + delay, msg));
322 1109129 : net.schedule(self.connection_id, delay);
323 1529280 : }
324 :
325 : /// Close the connection. Only one side of the connection will be closed,
326 : /// and no further messages will be delivered. The other side will not be notified.
327 1305808 : fn close(&self, node_idx: usize) {
328 1305808 : let mut state = self.state.lock();
329 1305808 : let recv_buffer = &mut state.buffers[1 ^ node_idx];
330 1305808 : if recv_buffer.recv_closed {
331 6881 : debug!(
332 0 : "NET: TCP #{} closed twice at {}",
333 0 : self.connection_id,
334 0 : sender_str(node_idx as MessageDirection),
335 : );
336 6881 : return;
337 1298927 : }
338 1298927 :
339 1298927 : debug!(
340 0 : "NET: TCP #{} closed at {}",
341 0 : self.connection_id,
342 0 : sender_str(node_idx as MessageDirection),
343 : );
344 1298927 : recv_buffer.recv_closed = true;
345 1298927 : for msg in recv_buffer.buf.drain(..) {
346 121649 : debug!(
347 0 : "NET: TCP #{} dropped message {:?} (closed)",
348 : self.connection_id, msg
349 : );
350 : }
351 :
352 1298927 : let send_buffer = &mut state.buffers[node_idx];
353 1298927 : send_buffer.send_closed = true;
354 1298927 : drop(state);
355 1298927 :
356 1298927 : // TODO: notify the other side?
357 1298927 :
358 1298927 : self.dst_sockets[node_idx].send(NetEvent::Closed);
359 1305808 : }
360 : }
361 :
362 : struct NetworkBuffer {
363 : /// Messages paired with time of delivery
364 : buf: VecDeque<(u64, AnyMessage)>,
365 : /// True if the connection is closed on the receiving side,
366 : /// i.e. no more messages from the buffer will be delivered.
367 : recv_closed: bool,
368 : /// True if the connection is closed on the sending side,
369 : /// i.e. no more messages will be added to the buffer.
370 : send_closed: bool,
371 : /// Last time a message was delivered from the buffer.
372 : /// If None, it means that the server is the receiver and
373 : /// it has not yet aware of this connection (i.e. has not
374 : /// received the Accept).
375 : last_recv: Option<u64>,
376 : }
377 :
378 : impl NetworkBuffer {
379 1625534 : fn new(last_recv: Option<u64>) -> Self {
380 1625534 : Self {
381 1625534 : buf: VecDeque::new(),
382 1625534 : recv_closed: false,
383 1625534 : send_closed: false,
384 1625534 : last_recv,
385 1625534 : }
386 1625534 : }
387 : }
388 :
389 : /// Single end of a bidirectional network stream without reordering (TCP-like).
390 : /// Reads are implemented using channels, writes go to the buffer inside VirtualConnection.
391 : pub struct TCP {
392 : net: Arc<NetworkTask>,
393 : conn_id: usize,
394 : dir: MessageDirection,
395 : recv_chan: Chan<NetEvent>,
396 : }
397 :
398 : impl Debug for TCP {
399 5406 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400 5406 : write!(f, "TCP #{} ({})", self.conn_id, sender_str(self.dir),)
401 5406 : }
402 : }
403 :
404 : impl TCP {
405 : /// Send a message to the other side. It's guaranteed that it will not arrive
406 : /// before the arrival of all messages sent earlier.
407 1529280 : pub fn send(&self, msg: AnyMessage) {
408 1529280 : let conn = self.net.get(self.conn_id);
409 1529280 : conn.send(&self.net, self.dir, msg);
410 1529280 : }
411 :
412 : /// Get a channel to receive incoming messages.
413 10158291 : pub fn recv_chan(&self) -> Chan<NetEvent> {
414 10158291 : self.recv_chan.clone()
415 10158291 : }
416 :
417 6961122 : pub fn connection_id(&self) -> usize {
418 6961122 : self.conn_id
419 6961122 : }
420 :
421 92615 : pub fn close(&self) {
422 92615 : let conn = self.net.get(self.conn_id);
423 92615 : conn.close(self.dir as usize);
424 92615 : }
425 : }
/// A request for the network task to revisit connection `conn_id` at `time`.
#[derive(PartialEq, Eq)]
struct Event {
    time: u64,
    conn_id: usize,
}

// `BinaryHeap` pops the greatest element, but the network loop wants the
// earliest event first, so events compare in *reverse* `(time, conn_id)`
// order.
impl PartialOrd for Event {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Event {
    fn cmp(&self, other: &Self) -> Ordering {
        self.time
            .cmp(&other.time)
            .then(self.conn_id.cmp(&other.conn_id))
            .reverse()
    }
}
|