LCOV - code coverage report
Current view: top level - libs/desim/src - network.rs (source / functions) Coverage Total Hit
Test: 190869232aac3a234374e5bb62582e91cf5f5818.info Lines: 95.7 % 280 268
Test Date: 2024-02-23 13:21:27 Functions: 90.9 % 33 30

            Line data    Source code
       1              : use std::{
       2              :     cmp::Ordering,
       3              :     collections::{BinaryHeap, VecDeque},
       4              :     fmt::{self, Debug},
       5              :     ops::DerefMut,
       6              :     sync::{mpsc, Arc},
       7              : };
       8              : 
       9              : use parking_lot::{
      10              :     lock_api::{MappedMutexGuard, MutexGuard},
      11              :     Mutex, RawMutex,
      12              : };
      13              : use rand::rngs::StdRng;
      14              : use tracing::debug;
      15              : 
      16              : use crate::{
      17              :     executor::{self, ThreadContext},
      18              :     options::NetworkOptions,
      19              :     proto::NetEvent,
      20              :     proto::NodeEvent,
      21              : };
      22              : 
      23              : use super::{chan::Chan, proto::AnyMessage};
      24              : 
      25              : pub struct NetworkTask {
      26              :     options: Arc<NetworkOptions>,
      27              :     connections: Mutex<Vec<VirtualConnection>>,
      28              :     /// min-heap of connections having something to deliver.
      29              :     events: Mutex<BinaryHeap<Event>>,
      30              :     task_context: Arc<ThreadContext>,
      31              : }
      32              : 
      33              : impl NetworkTask {
      34         4056 :     pub fn start_new(options: Arc<NetworkOptions>, tx: mpsc::Sender<Arc<NetworkTask>>) {
      35         4056 :         let ctx = executor::get_thread_ctx();
      36         4056 :         let task = Arc::new(Self {
      37         4056 :             options,
      38         4056 :             connections: Mutex::new(Vec::new()),
      39         4056 :             events: Mutex::new(BinaryHeap::new()),
      40         4056 :             task_context: ctx,
      41         4056 :         });
      42         4056 : 
      43         4056 :         // send the task upstream
      44         4056 :         tx.send(task.clone()).unwrap();
      45         4056 : 
      46         4056 :         // start the task
      47         4056 :         task.start();
      48         4056 :     }
      49              : 
      50       270691 :     pub fn start_new_connection(self: &Arc<Self>, rng: StdRng, dst_accept: Chan<NodeEvent>) -> TCP {
      51       270691 :         let now = executor::now();
      52       270691 :         let connection_id = self.connections.lock().len();
      53       270691 : 
      54       270691 :         let vc = VirtualConnection {
      55       270691 :             connection_id,
      56       270691 :             dst_accept,
      57       270691 :             dst_sockets: [Chan::new(), Chan::new()],
      58       270691 :             state: Mutex::new(ConnectionState {
      59       270691 :                 buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))],
      60       270691 :                 rng,
      61       270691 :             }),
      62       270691 :         };
      63       270691 :         vc.schedule_timeout(self);
      64       270691 :         vc.send_connect(self);
      65       270691 : 
      66       270691 :         let recv_chan = vc.dst_sockets[0].clone();
      67       270691 :         self.connections.lock().push(vc);
      68       270691 : 
      69       270691 :         TCP {
      70       270691 :             net: self.clone(),
      71       270691 :             conn_id: connection_id,
      72       270691 :             dir: 0,
      73       270691 :             recv_chan,
      74       270691 :         }
      75       270691 :     }
      76              : }
      77              : 
      78              : // private functions
      79              : impl NetworkTask {
      80              :     /// Schedule to wakeup network task (self) `after_ms` later to deliver
      81              :     /// messages of connection `id`.
      82      1365213 :     fn schedule(&self, id: usize, after_ms: u64) {
      83      1365213 :         self.events.lock().push(Event {
      84      1365213 :             time: executor::now() + after_ms,
      85      1365213 :             conn_id: id,
      86      1365213 :         });
      87      1365213 :         self.task_context.schedule_wakeup(after_ms);
      88      1365213 :     }
      89              : 
      90              :     /// Get locked connection `id`.
      91      1820362 :     fn get(&self, id: usize) -> MappedMutexGuard<'_, RawMutex, VirtualConnection> {
      92      1820362 :         MutexGuard::map(self.connections.lock(), |connections| {
      93      1820362 :             connections.get_mut(id).unwrap()
      94      1820362 :         })
      95      1820362 :     }
      96              : 
      97      1283394 :     fn collect_pending_events(&self, now: u64, vec: &mut Vec<Event>) {
      98      1283394 :         vec.clear();
      99      1283394 :         let mut events = self.events.lock();
     100      2562734 :         while let Some(event) = events.peek() {
     101      2545187 :             if event.time > now {
     102      1265847 :                 break;
     103      1279340 :             }
     104      1279340 :             let event = events.pop().unwrap();
     105      1279340 :             vec.push(event);
     106              :         }
     107      1283394 :     }
     108              : 
     109         4056 :     fn start(self: &Arc<Self>) {
     110         4056 :         debug!("started network task");
     111              : 
     112         4056 :         let mut events = Vec::new();
     113              :         loop {
     114      1287450 :             let now = executor::now();
     115      1287450 :             self.collect_pending_events(now, &mut events);
     116              : 
     117      1287450 :             for event in events.drain(..) {
     118      1279340 :                 let conn = self.get(event.conn_id);
     119      1279340 :                 conn.process(self);
     120      1279340 :             }
     121              : 
     122              :             // block until wakeup
     123      1283394 :             executor::yield_me(-1);
     124              :         }
     125              :     }
     126              : }
     127              : 
     128              : // 0 - from node(0) to node(1)
     129              : // 1 - from node(1) to node(0)
     130              : type MessageDirection = u8;
     131              : 
     132         2570 : fn sender_str(dir: MessageDirection) -> &'static str {
     133         2570 :     match dir {
     134          422 :         0 => "client",
     135         2148 :         1 => "server",
     136            0 :         _ => unreachable!(),
     137              :     }
     138         2570 : }
     139              : 
     140          694 : fn receiver_str(dir: MessageDirection) -> &'static str {
     141          694 :     match dir {
     142          316 :         0 => "server",
     143          378 :         1 => "client",
     144            0 :         _ => unreachable!(),
     145              :     }
     146          694 : }
     147              : 
     148              : /// Virtual connection between two nodes.
     149              : /// Node 0 is the creator of the connection (client),
     150              : /// and node 1 is the acceptor (server).
     151              : struct VirtualConnection {
     152              :     connection_id: usize,
     153              :     /// one-off chan, used to deliver Accept message to dst
     154              :     dst_accept: Chan<NodeEvent>,
     155              :     /// message sinks
     156              :     dst_sockets: [Chan<NetEvent>; 2],
     157              :     state: Mutex<ConnectionState>,
     158              : }
     159              : 
     160              : struct ConnectionState {
     161              :     buffers: [NetworkBuffer; 2],
     162              :     rng: StdRng,
     163              : }
     164              : 
     165              : impl VirtualConnection {
     166              :     /// Notify the future about the possible timeout.
     167       786970 :     fn schedule_timeout(&self, net: &NetworkTask) {
     168       786970 :         if let Some(timeout) = net.options.keepalive_timeout {
     169       786970 :             net.schedule(self.connection_id, timeout);
     170       786970 :         }
     171       786970 :     }
     172              : 
     173              :     /// Send the handshake (Accept) to the server.
     174       270691 :     fn send_connect(&self, net: &NetworkTask) {
     175       270691 :         let now = executor::now();
     176       270691 :         let mut state = self.state.lock();
     177       270691 :         let delay = net.options.connect_delay.delay(&mut state.rng);
     178       270691 :         let buffer = &mut state.buffers[0];
     179       270691 :         assert!(buffer.buf.is_empty());
     180       270691 :         assert!(!buffer.recv_closed);
     181       270691 :         assert!(!buffer.send_closed);
     182       270691 :         assert!(buffer.last_recv.is_none());
     183              : 
     184       270691 :         let delay = if let Some(ms) = delay {
     185       209555 :             ms
     186              :         } else {
     187        61136 :             debug!("NET: TCP #{} dropped connect", self.connection_id);
     188        61136 :             buffer.send_closed = true;
     189        61136 :             return;
     190              :         };
     191              : 
     192              :         // Send a message into the future.
     193       209555 :         buffer
     194       209555 :             .buf
     195       209555 :             .push_back((now + delay, AnyMessage::InternalConnect));
     196       209555 :         net.schedule(self.connection_id, delay);
     197       270691 :     }
     198              : 
     199              :     /// Transmit some of the messages from the buffer to the nodes.
     200      1279340 :     fn process(&self, net: &Arc<NetworkTask>) {
     201      1279340 :         let now = executor::now();
     202      1279340 : 
     203      1279340 :         let mut state = self.state.lock();
     204              : 
     205      3838020 :         for direction in 0..2 {
     206      2558680 :             self.process_direction(
     207      2558680 :                 net,
     208      2558680 :                 state.deref_mut(),
     209      2558680 :                 now,
     210      2558680 :                 direction as MessageDirection,
     211      2558680 :                 &self.dst_sockets[direction ^ 1],
     212      2558680 :             );
     213      2558680 :         }
     214              : 
     215              :         // Close the one side of the connection by timeout if the node
     216              :         // has not received any messages for a long time.
     217      1279340 :         if let Some(timeout) = net.options.keepalive_timeout {
     218      1279340 :             let mut to_close = [false, false];
     219      3838020 :             for direction in 0..2 {
     220      2558680 :                 let buffer = &mut state.buffers[direction];
     221      2558680 :                 if buffer.recv_closed {
     222       540624 :                     continue;
     223      2018056 :                 }
     224      2018056 :                 if let Some(last_recv) = buffer.last_recv {
     225      1819315 :                     if now - last_recv >= timeout {
     226       403024 :                         debug!(
     227          694 :                             "NET: connection {} timed out at {}",
     228          694 :                             self.connection_id,
     229          694 :                             receiver_str(direction as MessageDirection)
     230          694 :                         );
     231       403024 :                         let node_idx = direction ^ 1;
     232       403024 :                         to_close[node_idx] = true;
     233      1416291 :                     }
     234       198741 :                 }
     235              :             }
     236      1279340 :             drop(state);
     237              : 
     238      2558680 :             for (node_idx, should_close) in to_close.iter().enumerate() {
     239      2558680 :                 if *should_close {
     240       403024 :                     self.close(node_idx);
     241      2155656 :                 }
     242              :             }
     243            0 :         }
     244      1279340 :     }
     245              : 
     246              :     /// Process messages in the buffer in the given direction.
     247      2558680 :     fn process_direction(
     248      2558680 :         &self,
     249      2558680 :         net: &Arc<NetworkTask>,
     250      2558680 :         state: &mut ConnectionState,
     251      2558680 :         now: u64,
     252      2558680 :         direction: MessageDirection,
     253      2558680 :         to_socket: &Chan<NetEvent>,
     254      2558680 :     ) {
     255      2558680 :         let buffer = &mut state.buffers[direction as usize];
     256      2558680 :         if buffer.recv_closed {
     257       540624 :             assert!(buffer.buf.is_empty());
     258      2018056 :         }
     259              : 
     260      3074959 :         while !buffer.buf.is_empty() && buffer.buf.front().unwrap().0 <= now {
     261       516279 :             let msg = buffer.buf.pop_front().unwrap().1;
     262       516279 : 
     263       516279 :             buffer.last_recv = Some(now);
     264       516279 :             self.schedule_timeout(net);
     265       516279 : 
     266       516279 :             if let AnyMessage::InternalConnect = msg {
     267       198567 :                 // TODO: assert to_socket is the server
     268       198567 :                 let server_to_client = TCP {
     269       198567 :                     net: net.clone(),
     270       198567 :                     conn_id: self.connection_id,
     271       198567 :                     dir: direction ^ 1,
     272       198567 :                     recv_chan: to_socket.clone(),
     273       198567 :                 };
     274       198567 :                 // special case, we need to deliver new connection to a separate channel
     275       198567 :                 self.dst_accept.send(NodeEvent::Accept(server_to_client));
     276       317712 :             } else {
     277       317712 :                 to_socket.send(NetEvent::Message(msg));
     278       317712 :             }
     279              :         }
     280      2558680 :     }
     281              : 
     282              :     /// Try to send a message to the buffer, optionally dropping it and
     283              :     /// determining delivery timestamp.
     284       510147 :     fn send(&self, net: &NetworkTask, direction: MessageDirection, msg: AnyMessage) {
     285       510147 :         let now = executor::now();
     286       510147 :         let mut state = self.state.lock();
     287              : 
     288       510147 :         let (delay, close) = if let Some(ms) = net.options.send_delay.delay(&mut state.rng) {
     289       466684 :             (ms, false)
     290              :         } else {
     291        43463 :             (0, true)
     292              :         };
     293              : 
     294       510147 :         let buffer = &mut state.buffers[direction as usize];
     295       510147 :         if buffer.send_closed {
     296        56653 :             debug!(
     297           74 :                 "NET: TCP #{} dropped message {:?} (broken pipe)",
     298           74 :                 self.connection_id, msg
     299           74 :             );
     300        56653 :             return;
     301       453494 :         }
     302       453494 : 
     303       453494 :         if close {
     304        32611 :             debug!(
     305           14 :                 "NET: TCP #{} dropped message {:?} (pipe just broke)",
     306           14 :                 self.connection_id, msg
     307           14 :             );
     308        32611 :             buffer.send_closed = true;
     309        32611 :             return;
     310       420883 :         }
     311       420883 : 
     312       420883 :         if buffer.recv_closed {
     313        52195 :             debug!(
     314            0 :                 "NET: TCP #{} dropped message {:?} (recv closed)",
     315            0 :                 self.connection_id, msg
     316            0 :             );
     317        52195 :             return;
     318       368688 :         }
     319       368688 : 
     320       368688 :         // Send a message into the future.
     321       368688 :         buffer.buf.push_back((now + delay, msg));
     322       368688 :         net.schedule(self.connection_id, delay);
     323       510147 :     }
     324              : 
     325              :     /// Close the connection. Only one side of the connection will be closed,
     326              :     /// and no further messages will be delivered. The other side will not be notified.
     327       433899 :     fn close(&self, node_idx: usize) {
     328       433899 :         let mut state = self.state.lock();
     329       433899 :         let recv_buffer = &mut state.buffers[1 ^ node_idx];
     330       433899 :         if recv_buffer.recv_closed {
     331         2167 :             debug!(
     332           10 :                 "NET: TCP #{} closed twice at {}",
     333           10 :                 self.connection_id,
     334           10 :                 sender_str(node_idx as MessageDirection),
     335           10 :             );
     336         2167 :             return;
     337       431732 :         }
     338       431732 : 
     339       431732 :         debug!(
     340          758 :             "NET: TCP #{} closed at {}",
     341          758 :             self.connection_id,
     342          758 :             sender_str(node_idx as MessageDirection),
     343          758 :         );
     344       431732 :         recv_buffer.recv_closed = true;
     345       431732 :         for msg in recv_buffer.buf.drain(..) {
     346        38643 :             debug!(
     347            0 :                 "NET: TCP #{} dropped message {:?} (closed)",
     348            0 :                 self.connection_id, msg
     349            0 :             );
     350              :         }
     351              : 
     352       431732 :         let send_buffer = &mut state.buffers[node_idx];
     353       431732 :         send_buffer.send_closed = true;
     354       431732 :         drop(state);
     355       431732 : 
     356       431732 :         // TODO: notify the other side?
     357       431732 : 
     358       431732 :         self.dst_sockets[node_idx].send(NetEvent::Closed);
     359       433899 :     }
     360              : }
     361              : 
     362              : struct NetworkBuffer {
     363              :     /// Messages paired with time of delivery
     364              :     buf: VecDeque<(u64, AnyMessage)>,
     365              :     /// True if the connection is closed on the receiving side,
     366              :     /// i.e. no more messages from the buffer will be delivered.
     367              :     recv_closed: bool,
     368              :     /// True if the connection is closed on the sending side,
     369              :     /// i.e. no more messages will be added to the buffer.
     370              :     send_closed: bool,
     371              :     /// Last time a message was delivered from the buffer.
     372              :     /// If None, it means that the server is the receiver and
     373              :     /// it has not yet aware of this connection (i.e. has not
     374              :     /// received the Accept).
     375              :     last_recv: Option<u64>,
     376              : }
     377              : 
     378              : impl NetworkBuffer {
     379       541382 :     fn new(last_recv: Option<u64>) -> Self {
     380       541382 :         Self {
     381       541382 :             buf: VecDeque::new(),
     382       541382 :             recv_closed: false,
     383       541382 :             send_closed: false,
     384       541382 :             last_recv,
     385       541382 :         }
     386       541382 :     }
     387              : }
     388              : 
     389              : /// Single end of a bidirectional network stream without reordering (TCP-like).
     390              : /// Reads are implemented using channels, writes go to the buffer inside VirtualConnection.
     391              : pub struct TCP {
     392              :     net: Arc<NetworkTask>,
     393              :     conn_id: usize,
     394              :     dir: MessageDirection,
     395              :     recv_chan: Chan<NetEvent>,
     396              : }
     397              : 
     398              : impl Debug for TCP {
     399         1802 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     400         1802 :         write!(f, "TCP #{} ({})", self.conn_id, sender_str(self.dir),)
     401         1802 :     }
     402              : }
     403              : 
     404              : impl TCP {
     405              :     /// Send a message to the other side. It's guaranteed that it will not arrive
     406              :     /// before the arrival of all messages sent earlier.
     407       510147 :     pub fn send(&self, msg: AnyMessage) {
     408       510147 :         let conn = self.net.get(self.conn_id);
     409       510147 :         conn.send(&self.net, self.dir, msg);
     410       510147 :     }
     411              : 
     412              :     /// Get a channel to receive incoming messages.
     413      3360700 :     pub fn recv_chan(&self) -> Chan<NetEvent> {
     414      3360700 :         self.recv_chan.clone()
     415      3360700 :     }
     416              : 
     417      2298590 :     pub fn connection_id(&self) -> usize {
     418      2298590 :         self.conn_id
     419      2298590 :     }
     420              : 
     421        30875 :     pub fn close(&self) {
     422        30875 :         let conn = self.net.get(self.conn_id);
     423        30875 :         conn.close(self.dir as usize);
     424        30875 :     }
     425              : }
     426              : struct Event {
     427              :     time: u64,
     428              :     conn_id: usize,
     429              : }
     430              : 
     431              : // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
     432              : // to get that.
     433              : impl PartialOrd for Event {
     434      7824321 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     435      7824321 :         Some(self.cmp(other))
     436      7824321 :     }
     437              : }
     438              : 
     439              : impl Ord for Event {
     440      7824321 :     fn cmp(&self, other: &Self) -> Ordering {
     441      7824321 :         (other.time, other.conn_id).cmp(&(self.time, self.conn_id))
     442      7824321 :     }
     443              : }
     444              : 
     445              : impl PartialEq for Event {
     446            0 :     fn eq(&self, other: &Self) -> bool {
     447            0 :         (other.time, other.conn_id) == (self.time, self.conn_id)
     448            0 :     }
     449              : }
     450              : 
     451              : impl Eq for Event {}
        

Generated by: LCOV version 2.1-beta