Line data Source code
1 : use std::cmp::Ordering;
2 : use std::collections::{BinaryHeap, VecDeque};
3 : use std::fmt::{self, Debug};
4 : use std::ops::DerefMut;
5 : use std::sync::{Arc, mpsc};
6 :
7 : use parking_lot::lock_api::{MappedMutexGuard, MutexGuard};
8 : use parking_lot::{Mutex, RawMutex};
9 : use rand::rngs::StdRng;
10 : use tracing::debug;
11 :
12 : use super::chan::Chan;
13 : use super::proto::AnyMessage;
14 : use crate::executor::{self, ThreadContext};
15 : use crate::options::NetworkOptions;
16 : use crate::proto::{NetEvent, NodeEvent};
17 :
18 : pub struct NetworkTask {
19 : options: Arc<NetworkOptions>,
20 : connections: Mutex<Vec<VirtualConnection>>,
21 : /// min-heap of connections having something to deliver.
22 : events: Mutex<BinaryHeap<Event>>,
23 : task_context: Arc<ThreadContext>,
24 : }
25 :
26 : impl NetworkTask {
27 528 : pub fn start_new(options: Arc<NetworkOptions>, tx: mpsc::Sender<Arc<NetworkTask>>) {
28 528 : let ctx = executor::get_thread_ctx();
29 528 : let task = Arc::new(Self {
30 528 : options,
31 528 : connections: Mutex::new(Vec::new()),
32 528 : events: Mutex::new(BinaryHeap::new()),
33 528 : task_context: ctx,
34 528 : });
35 528 :
36 528 : // send the task upstream
37 528 : tx.send(task.clone()).unwrap();
38 528 :
39 528 : // start the task
40 528 : task.start();
41 528 : }
42 :
43 35335 : pub fn start_new_connection(self: &Arc<Self>, rng: StdRng, dst_accept: Chan<NodeEvent>) -> TCP {
44 35335 : let now = executor::now();
45 35335 : let connection_id = self.connections.lock().len();
46 35335 :
47 35335 : let vc = VirtualConnection {
48 35335 : connection_id,
49 35335 : dst_accept,
50 35335 : dst_sockets: [Chan::new(), Chan::new()],
51 35335 : state: Mutex::new(ConnectionState {
52 35335 : buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))],
53 35335 : rng,
54 35335 : }),
55 35335 : };
56 35335 : vc.schedule_timeout(self);
57 35335 : vc.send_connect(self);
58 35335 :
59 35335 : let recv_chan = vc.dst_sockets[0].clone();
60 35335 : self.connections.lock().push(vc);
61 35335 :
62 35335 : TCP {
63 35335 : net: self.clone(),
64 35335 : conn_id: connection_id,
65 35335 : dir: 0,
66 35335 : recv_chan,
67 35335 : }
68 35335 : }
69 : }
70 :
71 : // private functions
72 : impl NetworkTask {
73 : /// Schedule to wakeup network task (self) `after_ms` later to deliver
74 : /// messages of connection `id`.
75 179596 : fn schedule(&self, id: usize, after_ms: u64) {
76 179596 : self.events.lock().push(Event {
77 179596 : time: executor::now() + after_ms,
78 179596 : conn_id: id,
79 179596 : });
80 179596 : self.task_context.schedule_wakeup(after_ms);
81 179596 : }
82 :
83 : /// Get locked connection `id`.
84 240366 : fn get(&self, id: usize) -> MappedMutexGuard<'_, RawMutex, VirtualConnection> {
85 240366 : MutexGuard::map(self.connections.lock(), |connections| {
86 240366 : connections.get_mut(id).unwrap()
87 240366 : })
88 240366 : }
89 :
90 168965 : fn collect_pending_events(&self, now: u64, vec: &mut Vec<Event>) {
91 168965 : vec.clear();
92 168965 : let mut events = self.events.lock();
93 337403 : while let Some(event) = events.peek() {
94 334601 : if event.time > now {
95 166163 : break;
96 168438 : }
97 168438 : let event = events.pop().unwrap();
98 168438 : vec.push(event);
99 : }
100 168965 : }
101 :
102 528 : fn start(self: &Arc<Self>) {
103 528 : debug!("started network task");
104 :
105 528 : let mut events = Vec::new();
106 : loop {
107 169493 : let now = executor::now();
108 169493 : self.collect_pending_events(now, &mut events);
109 :
110 169493 : for event in events.drain(..) {
111 168438 : let conn = self.get(event.conn_id);
112 168438 : conn.process(self);
113 168438 : }
114 :
115 : // block until wakeup
116 168965 : executor::yield_me(-1);
117 : }
118 : }
119 : }
120 :
// Direction of a message inside a connection:
// 0 - from node(0) to node(1)
// 1 - from node(1) to node(0)
type MessageDirection = u8;

/// Human-readable name of the node that sends in direction `dir`.
///
/// Panics for any value other than 0 or 1.
fn sender_str(dir: MessageDirection) -> &'static str {
    if dir == 0 {
        "client"
    } else if dir == 1 {
        "server"
    } else {
        unreachable!()
    }
}

/// Human-readable name of the node that receives in direction `dir`.
///
/// Panics for any value other than 0 or 1.
fn receiver_str(dir: MessageDirection) -> &'static str {
    match dir {
        // The receiver of one direction is the sender of the opposite one.
        0 | 1 => sender_str(dir ^ 1),
        _ => unreachable!(),
    }
}
140 :
141 : /// Virtual connection between two nodes.
142 : /// Node 0 is the creator of the connection (client),
143 : /// and node 1 is the acceptor (server).
144 : struct VirtualConnection {
145 : connection_id: usize,
146 : /// one-off chan, used to deliver Accept message to dst
147 : dst_accept: Chan<NodeEvent>,
148 : /// message sinks
149 : dst_sockets: [Chan<NetEvent>; 2],
150 : state: Mutex<ConnectionState>,
151 : }
152 :
153 : struct ConnectionState {
154 : buffers: [NetworkBuffer; 2],
155 : rng: StdRng,
156 : }
157 :
158 : impl VirtualConnection {
159 : /// Notify the future about the possible timeout.
160 103315 : fn schedule_timeout(&self, net: &NetworkTask) {
161 103315 : if let Some(timeout) = net.options.keepalive_timeout {
162 103315 : net.schedule(self.connection_id, timeout);
163 103315 : }
164 103315 : }
165 :
166 : /// Send the handshake (Accept) to the server.
167 35335 : fn send_connect(&self, net: &NetworkTask) {
168 35335 : let now = executor::now();
169 35335 : let mut state = self.state.lock();
170 35335 : let delay = net.options.connect_delay.delay(&mut state.rng);
171 35335 : let buffer = &mut state.buffers[0];
172 35335 : assert!(buffer.buf.is_empty());
173 35335 : assert!(!buffer.recv_closed);
174 35335 : assert!(!buffer.send_closed);
175 35335 : assert!(buffer.last_recv.is_none());
176 :
177 35335 : let delay = if let Some(ms) = delay {
178 27178 : ms
179 : } else {
180 8157 : debug!("NET: TCP #{} dropped connect", self.connection_id);
181 8157 : buffer.send_closed = true;
182 8157 : return;
183 : };
184 :
185 : // Send a message into the future.
186 27178 : buffer
187 27178 : .buf
188 27178 : .push_back((now + delay, AnyMessage::InternalConnect));
189 27178 : net.schedule(self.connection_id, delay);
190 35335 : }
191 :
192 : /// Transmit some of the messages from the buffer to the nodes.
193 168438 : fn process(&self, net: &Arc<NetworkTask>) {
194 168438 : let now = executor::now();
195 168438 :
196 168438 : let mut state = self.state.lock();
197 :
198 505314 : for direction in 0..2 {
199 336876 : self.process_direction(
200 336876 : net,
201 336876 : state.deref_mut(),
202 336876 : now,
203 336876 : direction as MessageDirection,
204 336876 : &self.dst_sockets[direction ^ 1],
205 336876 : );
206 336876 : }
207 :
208 : // Close the one side of the connection by timeout if the node
209 : // has not received any messages for a long time.
210 168438 : if let Some(timeout) = net.options.keepalive_timeout {
211 168438 : let mut to_close = [false, false];
212 505314 : for direction in 0..2 {
213 336876 : let buffer = &mut state.buffers[direction];
214 336876 : if buffer.recv_closed {
215 70850 : continue;
216 266026 : }
217 266026 : if let Some(last_recv) = buffer.last_recv {
218 240265 : if now - last_recv >= timeout {
219 53249 : debug!(
220 0 : "NET: connection {} timed out at {}",
221 0 : self.connection_id,
222 0 : receiver_str(direction as MessageDirection)
223 : );
224 53249 : let node_idx = direction ^ 1;
225 53249 : to_close[node_idx] = true;
226 187016 : }
227 25761 : }
228 : }
229 168438 : drop(state);
230 :
231 336876 : for (node_idx, should_close) in to_close.iter().enumerate() {
232 336876 : if *should_close {
233 53249 : self.close(node_idx);
234 283627 : }
235 : }
236 0 : }
237 168438 : }
238 :
239 : /// Process messages in the buffer in the given direction.
240 336876 : fn process_direction(
241 336876 : &self,
242 336876 : net: &Arc<NetworkTask>,
243 336876 : state: &mut ConnectionState,
244 336876 : now: u64,
245 336876 : direction: MessageDirection,
246 336876 : to_socket: &Chan<NetEvent>,
247 336876 : ) {
248 336876 : let buffer = &mut state.buffers[direction as usize];
249 336876 : if buffer.recv_closed {
250 70850 : assert!(buffer.buf.is_empty());
251 266026 : }
252 :
253 404856 : while !buffer.buf.is_empty() && buffer.buf.front().unwrap().0 <= now {
254 67980 : let msg = buffer.buf.pop_front().unwrap().1;
255 67980 :
256 67980 : buffer.last_recv = Some(now);
257 67980 : self.schedule_timeout(net);
258 67980 :
259 67980 : if let AnyMessage::InternalConnect = msg {
260 25874 : // TODO: assert to_socket is the server
261 25874 : let server_to_client = TCP {
262 25874 : net: net.clone(),
263 25874 : conn_id: self.connection_id,
264 25874 : dir: direction ^ 1,
265 25874 : recv_chan: to_socket.clone(),
266 25874 : };
267 25874 : // special case, we need to deliver new connection to a separate channel
268 25874 : self.dst_accept.send(NodeEvent::Accept(server_to_client));
269 42106 : } else {
270 42106 : to_socket.send(NetEvent::Message(msg));
271 42106 : }
272 : }
273 336876 : }
274 :
275 : /// Try to send a message to the buffer, optionally dropping it and
276 : /// determining delivery timestamp.
277 67864 : fn send(&self, net: &NetworkTask, direction: MessageDirection, msg: AnyMessage) {
278 67864 : let now = executor::now();
279 67864 : let mut state = self.state.lock();
280 :
281 67864 : let (delay, close) = if let Some(ms) = net.options.send_delay.delay(&mut state.rng) {
282 61553 : (ms, false)
283 : } else {
284 6311 : (0, true)
285 : };
286 :
287 67864 : let buffer = &mut state.buffers[direction as usize];
288 67864 : if buffer.send_closed {
289 7663 : debug!(
290 0 : "NET: TCP #{} dropped message {:?} (broken pipe)",
291 : self.connection_id, msg
292 : );
293 7663 : return;
294 60201 : }
295 60201 :
296 60201 : if close {
297 4680 : debug!(
298 0 : "NET: TCP #{} dropped message {:?} (pipe just broke)",
299 : self.connection_id, msg
300 : );
301 4680 : buffer.send_closed = true;
302 4680 : return;
303 55521 : }
304 55521 :
305 55521 : if buffer.recv_closed {
306 6418 : debug!(
307 0 : "NET: TCP #{} dropped message {:?} (recv closed)",
308 : self.connection_id, msg
309 : );
310 6418 : return;
311 49103 : }
312 49103 :
313 49103 : // Send a message into the future.
314 49103 : buffer.buf.push_back((now + delay, msg));
315 49103 : net.schedule(self.connection_id, delay);
316 67864 : }
317 :
318 : /// Close the connection. Only one side of the connection will be closed,
319 : /// and no further messages will be delivered. The other side will not be notified.
320 57313 : fn close(&self, node_idx: usize) {
321 57313 : let mut state = self.state.lock();
322 57313 : let recv_buffer = &mut state.buffers[1 ^ node_idx];
323 57313 : if recv_buffer.recv_closed {
324 367 : debug!(
325 0 : "NET: TCP #{} closed twice at {}",
326 0 : self.connection_id,
327 0 : sender_str(node_idx as MessageDirection),
328 : );
329 367 : return;
330 56946 : }
331 56946 :
332 56946 : debug!(
333 0 : "NET: TCP #{} closed at {}",
334 0 : self.connection_id,
335 0 : sender_str(node_idx as MessageDirection),
336 : );
337 56946 : recv_buffer.recv_closed = true;
338 56946 : for msg in recv_buffer.buf.drain(..) {
339 5490 : debug!(
340 0 : "NET: TCP #{} dropped message {:?} (closed)",
341 : self.connection_id, msg
342 : );
343 : }
344 :
345 56946 : let send_buffer = &mut state.buffers[node_idx];
346 56946 : send_buffer.send_closed = true;
347 56946 : drop(state);
348 56946 :
349 56946 : // TODO: notify the other side?
350 56946 :
351 56946 : self.dst_sockets[node_idx].send(NetEvent::Closed);
352 57313 : }
353 : }
354 :
355 : struct NetworkBuffer {
356 : /// Messages paired with time of delivery
357 : buf: VecDeque<(u64, AnyMessage)>,
358 : /// True if the connection is closed on the receiving side,
359 : /// i.e. no more messages from the buffer will be delivered.
360 : recv_closed: bool,
361 : /// True if the connection is closed on the sending side,
362 : /// i.e. no more messages will be added to the buffer.
363 : send_closed: bool,
364 : /// Last time a message was delivered from the buffer.
365 : /// If None, it means that the server is the receiver and
366 : /// it has not yet aware of this connection (i.e. has not
367 : /// received the Accept).
368 : last_recv: Option<u64>,
369 : }
370 :
371 : impl NetworkBuffer {
372 70670 : fn new(last_recv: Option<u64>) -> Self {
373 70670 : Self {
374 70670 : buf: VecDeque::new(),
375 70670 : recv_closed: false,
376 70670 : send_closed: false,
377 70670 : last_recv,
378 70670 : }
379 70670 : }
380 : }
381 :
382 : /// Single end of a bidirectional network stream without reordering (TCP-like).
383 : /// Reads are implemented using channels, writes go to the buffer inside VirtualConnection.
384 : pub struct TCP {
385 : net: Arc<NetworkTask>,
386 : conn_id: usize,
387 : dir: MessageDirection,
388 : recv_chan: Chan<NetEvent>,
389 : }
390 :
391 : impl Debug for TCP {
392 911 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
393 911 : write!(f, "TCP #{} ({})", self.conn_id, sender_str(self.dir),)
394 911 : }
395 : }
396 :
397 : impl TCP {
398 : /// Send a message to the other side. It's guaranteed that it will not arrive
399 : /// before the arrival of all messages sent earlier.
400 67864 : pub fn send(&self, msg: AnyMessage) {
401 67864 : let conn = self.net.get(self.conn_id);
402 67864 : conn.send(&self.net, self.dir, msg);
403 67864 : }
404 :
405 : /// Get a channel to receive incoming messages.
406 433412 : pub fn recv_chan(&self) -> Chan<NetEvent> {
407 433412 : self.recv_chan.clone()
408 433412 : }
409 :
410 291002 : pub fn connection_id(&self) -> usize {
411 291002 : self.conn_id
412 291002 : }
413 :
414 4064 : pub fn close(&self) {
415 4064 : let conn = self.net.get(self.conn_id);
416 4064 : conn.close(self.dir as usize);
417 4064 : }
418 : }
/// A scheduled wakeup: connection `conn_id` should be processed at `time`.
#[derive(PartialEq, Eq)]
struct Event {
    time: u64,
    conn_id: usize,
}

// BinaryHeap is a max-heap, and we want a min-heap. The ordering is therefore
// reversed: the event with the smallest (time, conn_id) compares as greatest.
impl Ord for Event {
    fn cmp(&self, other: &Self) -> Ordering {
        self.time
            .cmp(&other.time)
            .then_with(|| self.conn_id.cmp(&other.conn_id))
            .reverse()
    }
}

impl PartialOrd for Event {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
|