LCOV - code coverage report
Current view: top level - libs/desim/src - network.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 92.9 % 269 250
Test Date: 2025-03-12 00:01:28 Functions: 95.8 % 24 23

            Line data    Source code
       1              : use std::cmp::Ordering;
       2              : use std::collections::{BinaryHeap, VecDeque};
       3              : use std::fmt::{self, Debug};
       4              : use std::ops::DerefMut;
       5              : use std::sync::{Arc, mpsc};
       6              : 
       7              : use parking_lot::lock_api::{MappedMutexGuard, MutexGuard};
       8              : use parking_lot::{Mutex, RawMutex};
       9              : use rand::rngs::StdRng;
      10              : use tracing::debug;
      11              : 
      12              : use super::chan::Chan;
      13              : use super::proto::AnyMessage;
      14              : use crate::executor::{self, ThreadContext};
      15              : use crate::options::NetworkOptions;
      16              : use crate::proto::{NetEvent, NodeEvent};
      17              : 
      18              : pub struct NetworkTask {
      19              :     options: Arc<NetworkOptions>,
      20              :     connections: Mutex<Vec<VirtualConnection>>,
      21              :     /// min-heap of connections having something to deliver.
      22              :     events: Mutex<BinaryHeap<Event>>,
      23              :     task_context: Arc<ThreadContext>,
      24              : }
      25              : 
      26              : impl NetworkTask {
      27          528 :     pub fn start_new(options: Arc<NetworkOptions>, tx: mpsc::Sender<Arc<NetworkTask>>) {
      28          528 :         let ctx = executor::get_thread_ctx();
      29          528 :         let task = Arc::new(Self {
      30          528 :             options,
      31          528 :             connections: Mutex::new(Vec::new()),
      32          528 :             events: Mutex::new(BinaryHeap::new()),
      33          528 :             task_context: ctx,
      34          528 :         });
      35          528 : 
      36          528 :         // send the task upstream
      37          528 :         tx.send(task.clone()).unwrap();
      38          528 : 
      39          528 :         // start the task
      40          528 :         task.start();
      41          528 :     }
      42              : 
      43        33810 :     pub fn start_new_connection(self: &Arc<Self>, rng: StdRng, dst_accept: Chan<NodeEvent>) -> TCP {
      44        33810 :         let now = executor::now();
      45        33810 :         let connection_id = self.connections.lock().len();
      46        33810 : 
      47        33810 :         let vc = VirtualConnection {
      48        33810 :             connection_id,
      49        33810 :             dst_accept,
      50        33810 :             dst_sockets: [Chan::new(), Chan::new()],
      51        33810 :             state: Mutex::new(ConnectionState {
      52        33810 :                 buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))],
      53        33810 :                 rng,
      54        33810 :             }),
      55        33810 :         };
      56        33810 :         vc.schedule_timeout(self);
      57        33810 :         vc.send_connect(self);
      58        33810 : 
      59        33810 :         let recv_chan = vc.dst_sockets[0].clone();
      60        33810 :         self.connections.lock().push(vc);
      61        33810 : 
      62        33810 :         TCP {
      63        33810 :             net: self.clone(),
      64        33810 :             conn_id: connection_id,
      65        33810 :             dir: 0,
      66        33810 :             recv_chan,
      67        33810 :         }
      68        33810 :     }
      69              : }
      70              : 
      71              : // private functions
      72              : impl NetworkTask {
      73              :     /// Schedule to wakeup network task (self) `after_ms` later to deliver
      74              :     /// messages of connection `id`.
      75       171966 :     fn schedule(&self, id: usize, after_ms: u64) {
      76       171966 :         self.events.lock().push(Event {
      77       171966 :             time: executor::now() + after_ms,
      78       171966 :             conn_id: id,
      79       171966 :         });
      80       171966 :         self.task_context.schedule_wakeup(after_ms);
      81       171966 :     }
      82              : 
      83              :     /// Get locked connection `id`.
      84       227840 :     fn get(&self, id: usize) -> MappedMutexGuard<'_, RawMutex, VirtualConnection> {
      85       227840 :         MutexGuard::map(self.connections.lock(), |connections| {
      86       227840 :             connections.get_mut(id).unwrap()
      87       227840 :         })
      88       227840 :     }
      89              : 
      90       160305 :     fn collect_pending_events(&self, now: u64, vec: &mut Vec<Event>) {
      91       160305 :         vec.clear();
      92       160305 :         let mut events = self.events.lock();
      93       320083 :         while let Some(event) = events.peek() {
      94       317538 :             if event.time > now {
      95       157760 :                 break;
      96       159778 :             }
      97       159778 :             let event = events.pop().unwrap();
      98       159778 :             vec.push(event);
      99              :         }
     100       160305 :     }
     101              : 
     102          528 :     fn start(self: &Arc<Self>) {
     103          528 :         debug!("started network task");
     104              : 
     105          528 :         let mut events = Vec::new();
     106              :         loop {
     107       160833 :             let now = executor::now();
     108       160833 :             self.collect_pending_events(now, &mut events);
     109              : 
     110       160833 :             for event in events.drain(..) {
     111       159778 :                 let conn = self.get(event.conn_id);
     112       159778 :                 conn.process(self);
     113       159778 :             }
     114              : 
     115              :             // block until wakeup
     116       160305 :             executor::yield_me(-1);
     117              :         }
     118              :     }
     119              : }
     120              : 
     121              : // 0 - from node(0) to node(1)
     122              : // 1 - from node(1) to node(0)
     123              : type MessageDirection = u8;
     124              : 
     125         1311 : fn sender_str(dir: MessageDirection) -> &'static str {
     126         1311 :     match dir {
     127          222 :         0 => "client",
     128         1089 :         1 => "server",
     129            0 :         _ => unreachable!(),
     130              :     }
     131         1311 : }
     132              : 
     133          355 : fn receiver_str(dir: MessageDirection) -> &'static str {
     134          355 :     match dir {
     135          163 :         0 => "server",
     136          192 :         1 => "client",
     137            0 :         _ => unreachable!(),
     138              :     }
     139          355 : }
     140              : 
     141              : /// Virtual connection between two nodes.
     142              : /// Node 0 is the creator of the connection (client),
     143              : /// and node 1 is the acceptor (server).
     144              : struct VirtualConnection {
     145              :     connection_id: usize,
     146              :     /// one-off chan, used to deliver Accept message to dst
     147              :     dst_accept: Chan<NodeEvent>,
     148              :     /// message sinks
     149              :     dst_sockets: [Chan<NetEvent>; 2],
     150              :     state: Mutex<ConnectionState>,
     151              : }
     152              : 
     153              : struct ConnectionState {
     154              :     buffers: [NetworkBuffer; 2],
     155              :     rng: StdRng,
     156              : }
     157              : 
     158              : impl VirtualConnection {
     159              :     /// Notify the future about the possible timeout.
     160        98423 :     fn schedule_timeout(&self, net: &NetworkTask) {
     161        98423 :         if let Some(timeout) = net.options.keepalive_timeout {
     162        98423 :             net.schedule(self.connection_id, timeout);
     163        98423 :         }
     164        98423 :     }
     165              : 
     166              :     /// Send the handshake (Accept) to the server.
     167        33810 :     fn send_connect(&self, net: &NetworkTask) {
     168        33810 :         let now = executor::now();
     169        33810 :         let mut state = self.state.lock();
     170        33810 :         let delay = net.options.connect_delay.delay(&mut state.rng);
     171        33810 :         let buffer = &mut state.buffers[0];
     172        33810 :         assert!(buffer.buf.is_empty());
     173        33810 :         assert!(!buffer.recv_closed);
     174        33810 :         assert!(!buffer.send_closed);
     175        33810 :         assert!(buffer.last_recv.is_none());
     176              : 
     177        33810 :         let delay = if let Some(ms) = delay {
     178        26492 :             ms
     179              :         } else {
     180         7318 :             debug!("NET: TCP #{} dropped connect", self.connection_id);
     181         7318 :             buffer.send_closed = true;
     182         7318 :             return;
     183              :         };
     184              : 
     185              :         // Send a message into the future.
     186        26492 :         buffer
     187        26492 :             .buf
     188        26492 :             .push_back((now + delay, AnyMessage::InternalConnect));
     189        26492 :         net.schedule(self.connection_id, delay);
     190        33810 :     }
     191              : 
     192              :     /// Transmit some of the messages from the buffer to the nodes.
     193       159778 :     fn process(&self, net: &Arc<NetworkTask>) {
     194       159778 :         let now = executor::now();
     195       159778 : 
     196       159778 :         let mut state = self.state.lock();
     197              : 
     198       479334 :         for direction in 0..2 {
     199       319556 :             self.process_direction(
     200       319556 :                 net,
     201       319556 :                 state.deref_mut(),
     202       319556 :                 now,
     203       319556 :                 direction as MessageDirection,
     204       319556 :                 &self.dst_sockets[direction ^ 1],
     205       319556 :             );
     206       319556 :         }
     207              : 
     208              :         // Close the one side of the connection by timeout if the node
     209              :         // has not received any messages for a long time.
     210       159778 :         if let Some(timeout) = net.options.keepalive_timeout {
     211       159778 :             let mut to_close = [false, false];
     212       479334 :             for direction in 0..2 {
     213       319556 :                 let buffer = &mut state.buffers[direction];
     214       319556 :                 if buffer.recv_closed {
     215        68657 :                     continue;
     216       250899 :                 }
     217       250899 :                 if let Some(last_recv) = buffer.last_recv {
     218       226947 :                     if now - last_recv >= timeout {
     219        50711 :                         debug!(
     220            0 :                             "NET: connection {} timed out at {}",
     221            0 :                             self.connection_id,
     222            0 :                             receiver_str(direction as MessageDirection)
     223              :                         );
     224        50711 :                         let node_idx = direction ^ 1;
     225        50711 :                         to_close[node_idx] = true;
     226       176236 :                     }
     227        23952 :                 }
     228              :             }
     229       159778 :             drop(state);
     230              : 
     231       319556 :             for (node_idx, should_close) in to_close.iter().enumerate() {
     232       319556 :                 if *should_close {
     233        50711 :                     self.close(node_idx);
     234       268845 :                 }
     235              :             }
     236            0 :         }
     237       159778 :     }
     238              : 
     239              :     /// Process messages in the buffer in the given direction.
     240       319556 :     fn process_direction(
     241       319556 :         &self,
     242       319556 :         net: &Arc<NetworkTask>,
     243       319556 :         state: &mut ConnectionState,
     244       319556 :         now: u64,
     245       319556 :         direction: MessageDirection,
     246       319556 :         to_socket: &Chan<NetEvent>,
     247       319556 :     ) {
     248       319556 :         let buffer = &mut state.buffers[direction as usize];
     249       319556 :         if buffer.recv_closed {
     250        68657 :             assert!(buffer.buf.is_empty());
     251       250899 :         }
     252              : 
     253       384169 :         while !buffer.buf.is_empty() && buffer.buf.front().unwrap().0 <= now {
     254        64613 :             let msg = buffer.buf.pop_front().unwrap().1;
     255        64613 : 
     256        64613 :             buffer.last_recv = Some(now);
     257        64613 :             self.schedule_timeout(net);
     258        64613 : 
     259        64613 :             if let AnyMessage::InternalConnect = msg {
     260        25086 :                 // TODO: assert to_socket is the server
     261        25086 :                 let server_to_client = TCP {
     262        25086 :                     net: net.clone(),
     263        25086 :                     conn_id: self.connection_id,
     264        25086 :                     dir: direction ^ 1,
     265        25086 :                     recv_chan: to_socket.clone(),
     266        25086 :                 };
     267        25086 :                 // special case, we need to deliver new connection to a separate channel
     268        25086 :                 self.dst_accept.send(NodeEvent::Accept(server_to_client));
     269        39527 :             } else {
     270        39527 :                 to_socket.send(NetEvent::Message(msg));
     271        39527 :             }
     272              :         }
     273       319556 :     }
     274              : 
     275              :     /// Try to send a message to the buffer, optionally dropping it and
     276              :     /// determining delivery timestamp.
     277        64378 :     fn send(&self, net: &NetworkTask, direction: MessageDirection, msg: AnyMessage) {
     278        64378 :         let now = executor::now();
     279        64378 :         let mut state = self.state.lock();
     280              : 
     281        64378 :         let (delay, close) = if let Some(ms) = net.options.send_delay.delay(&mut state.rng) {
     282        58708 :             (ms, false)
     283              :         } else {
     284         5670 :             (0, true)
     285              :         };
     286              : 
     287        64378 :         let buffer = &mut state.buffers[direction as usize];
     288        64378 :         if buffer.send_closed {
     289         6895 :             debug!(
     290            0 :                 "NET: TCP #{} dropped message {:?} (broken pipe)",
     291              :                 self.connection_id, msg
     292              :             );
     293         6895 :             return;
     294        57483 :         }
     295        57483 : 
     296        57483 :         if close {
     297         4294 :             debug!(
     298            0 :                 "NET: TCP #{} dropped message {:?} (pipe just broke)",
     299              :                 self.connection_id, msg
     300              :             );
     301         4294 :             buffer.send_closed = true;
     302         4294 :             return;
     303        53189 :         }
     304        53189 : 
     305        53189 :         if buffer.recv_closed {
     306         6138 :             debug!(
     307            0 :                 "NET: TCP #{} dropped message {:?} (recv closed)",
     308              :                 self.connection_id, msg
     309              :             );
     310         6138 :             return;
     311        47051 :         }
     312        47051 : 
     313        47051 :         // Send a message into the future.
     314        47051 :         buffer.buf.push_back((now + delay, msg));
     315        47051 :         net.schedule(self.connection_id, delay);
     316        64378 :     }
     317              : 
     318              :     /// Close the connection. Only one side of the connection will be closed,
     319              :     /// and no further messages will be delivered. The other side will not be notified.
     320        54395 :     fn close(&self, node_idx: usize) {
     321        54395 :         let mut state = self.state.lock();
     322        54395 :         let recv_buffer = &mut state.buffers[1 ^ node_idx];
     323        54395 :         if recv_buffer.recv_closed {
     324          254 :             debug!(
     325            0 :                 "NET: TCP #{} closed twice at {}",
     326            0 :                 self.connection_id,
     327            0 :                 sender_str(node_idx as MessageDirection),
     328              :             );
     329          254 :             return;
     330        54141 :         }
     331        54141 : 
     332        54141 :         debug!(
     333            0 :             "NET: TCP #{} closed at {}",
     334            0 :             self.connection_id,
     335            0 :             sender_str(node_idx as MessageDirection),
     336              :         );
     337        54141 :         recv_buffer.recv_closed = true;
     338        54141 :         for msg in recv_buffer.buf.drain(..) {
     339         5900 :             debug!(
     340            0 :                 "NET: TCP #{} dropped message {:?} (closed)",
     341              :                 self.connection_id, msg
     342              :             );
     343              :         }
     344              : 
     345        54141 :         let send_buffer = &mut state.buffers[node_idx];
     346        54141 :         send_buffer.send_closed = true;
     347        54141 :         drop(state);
     348        54141 : 
     349        54141 :         // TODO: notify the other side?
     350        54141 : 
     351        54141 :         self.dst_sockets[node_idx].send(NetEvent::Closed);
     352        54395 :     }
     353              : }
     354              : 
     355              : struct NetworkBuffer {
     356              :     /// Messages paired with time of delivery
     357              :     buf: VecDeque<(u64, AnyMessage)>,
     358              :     /// True if the connection is closed on the receiving side,
     359              :     /// i.e. no more messages from the buffer will be delivered.
     360              :     recv_closed: bool,
     361              :     /// True if the connection is closed on the sending side,
     362              :     /// i.e. no more messages will be added to the buffer.
     363              :     send_closed: bool,
     364              :     /// Last time a message was delivered from the buffer.
     365              :     /// If None, it means that the server is the receiver and
     366              :     /// it has not yet aware of this connection (i.e. has not
     367              :     /// received the Accept).
     368              :     last_recv: Option<u64>,
     369              : }
     370              : 
     371              : impl NetworkBuffer {
     372        67620 :     fn new(last_recv: Option<u64>) -> Self {
     373        67620 :         Self {
     374        67620 :             buf: VecDeque::new(),
     375        67620 :             recv_closed: false,
     376        67620 :             send_closed: false,
     377        67620 :             last_recv,
     378        67620 :         }
     379        67620 :     }
     380              : }
     381              : 
     382              : /// Single end of a bidirectional network stream without reordering (TCP-like).
     383              : /// Reads are implemented using channels, writes go to the buffer inside VirtualConnection.
     384              : pub struct TCP {
     385              :     net: Arc<NetworkTask>,
     386              :     conn_id: usize,
     387              :     dir: MessageDirection,
     388              :     recv_chan: Chan<NetEvent>,
     389              : }
     390              : 
     391              : impl Debug for TCP {
     392          911 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     393          911 :         write!(f, "TCP #{} ({})", self.conn_id, sender_str(self.dir),)
     394          911 :     }
     395              : }
     396              : 
     397              : impl TCP {
     398              :     /// Send a message to the other side. It's guaranteed that it will not arrive
     399              :     /// before the arrival of all messages sent earlier.
     400        64378 :     pub fn send(&self, msg: AnyMessage) {
     401        64378 :         let conn = self.net.get(self.conn_id);
     402        64378 :         conn.send(&self.net, self.dir, msg);
     403        64378 :     }
     404              : 
     405              :     /// Get a channel to receive incoming messages.
     406       412872 :     pub fn recv_chan(&self) -> Chan<NetEvent> {
     407       412872 :         self.recv_chan.clone()
     408       412872 :     }
     409              : 
     410       279408 :     pub fn connection_id(&self) -> usize {
     411       279408 :         self.conn_id
     412       279408 :     }
     413              : 
     414         3684 :     pub fn close(&self) {
     415         3684 :         let conn = self.net.get(self.conn_id);
     416         3684 :         conn.close(self.dir as usize);
     417         3684 :     }
     418              : }
     419              : struct Event {
     420              :     time: u64,
     421              :     conn_id: usize,
     422              : }
     423              : 
     424              : // BinaryHeap is a max-heap, and we want a min-heap. Reverse the ordering here
     425              : // to get that.
     426              : impl PartialOrd for Event {
     427       988923 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     428       988923 :         Some(self.cmp(other))
     429       988923 :     }
     430              : }
     431              : 
     432              : impl Ord for Event {
     433       988923 :     fn cmp(&self, other: &Self) -> Ordering {
     434       988923 :         (other.time, other.conn_id).cmp(&(self.time, self.conn_id))
     435       988923 :     }
     436              : }
     437              : 
     438              : impl PartialEq for Event {
     439            0 :     fn eq(&self, other: &Self) -> bool {
     440            0 :         (other.time, other.conn_id) == (self.time, self.conn_id)
     441            0 :     }
     442              : }
     443              : 
     444              : impl Eq for Event {}
        

Generated by: LCOV version 2.1-beta