LCOV - code coverage report
Current view: top level - libs/desim/src - network.rs (source / functions) Coverage Total Hit
Test: ce3d23e223f9597001df6ba9f83269b728828ed3.info Lines: 92.9 % 269 250
Test Date: 2025-03-14 20:25:30 Functions: 95.8 % 24 23

            Line data    Source code
       1              : use std::cmp::Ordering;
       2              : use std::collections::{BinaryHeap, VecDeque};
       3              : use std::fmt::{self, Debug};
       4              : use std::ops::DerefMut;
       5              : use std::sync::{Arc, mpsc};
       6              : 
       7              : use parking_lot::lock_api::{MappedMutexGuard, MutexGuard};
       8              : use parking_lot::{Mutex, RawMutex};
       9              : use rand::rngs::StdRng;
      10              : use tracing::debug;
      11              : 
      12              : use super::chan::Chan;
      13              : use super::proto::AnyMessage;
      14              : use crate::executor::{self, ThreadContext};
      15              : use crate::options::NetworkOptions;
      16              : use crate::proto::{NetEvent, NodeEvent};
      17              : 
      18              : pub struct NetworkTask { // Simulated network: owns every virtual connection and the queue of pending wakeup events.
      19              :     options: Arc<NetworkOptions>,
      20              :     connections: Mutex<Vec<VirtualConnection>>, // a connection's id is its index in this Vec
      21              :     /// Min-heap (earliest wakeup time first) of events naming the connections that need processing.
      22              :     events: Mutex<BinaryHeap<Event>>,
      23              :     task_context: Arc<ThreadContext>, // thread context captured in `start_new`; used to schedule wakeups
      24              : }
      25              : 
      26              : impl NetworkTask {
      27          528 :     pub fn start_new(options: Arc<NetworkOptions>, tx: mpsc::Sender<Arc<NetworkTask>>) { // Create the task, publish it through `tx`, then run its event loop in the calling context.
      28          528 :         let ctx = executor::get_thread_ctx();
      29          528 :         let task = Arc::new(Self {
      30          528 :             options,
      31          528 :             connections: Mutex::new(Vec::new()),
      32          528 :             events: Mutex::new(BinaryHeap::new()),
      33          528 :             task_context: ctx,
      34          528 :         });
      35          528 : 
      36          528 :         // hand the task to the creator before entering the loop; panics if the receiver of `tx` is gone
      37          528 :         tx.send(task.clone()).unwrap();
      38          528 : 
      39          528 :         // run the event loop; `start` loops forever, so this call does not return
      40          528 :         task.start();
      41          528 :     }
      42              : 
      43        34315 :     pub fn start_new_connection(self: &Arc<Self>, rng: StdRng, dst_accept: Chan<NodeEvent>) -> TCP { // Open a new virtual connection whose Accept goes to `dst_accept`; returns the creator's (client, dir 0) end.
      44        34315 :         let now = executor::now();
      45        34315 :         let connection_id = self.connections.lock().len(); // NOTE(review): id is read here and the connection pushed below under separate lock acquisitions — assumes no concurrent creation; confirm
      46        34315 : 
      47        34315 :         let vc = VirtualConnection {
      48        34315 :             connection_id,
      49        34315 :             dst_accept,
      50        34315 :             dst_sockets: [Chan::new(), Chan::new()],
      51        34315 :             state: Mutex::new(ConnectionState {
      52        34315 :                 buffers: [NetworkBuffer::new(None), NetworkBuffer::new(Some(now))], // only the direction-1 buffer starts with a last_recv timestamp
      53        34315 :                 rng,
      54        34315 :             }),
      55        34315 :         };
      56        34315 :         vc.schedule_timeout(self); // arm the keepalive check (no-op when no timeout is configured)
      57        34315 :         vc.send_connect(self); // queue the delayed handshake in direction 0
      58        34315 : 
      59        34315 :         let recv_chan = vc.dst_sockets[0].clone(); // dst_sockets[0] is where node 0 receives its events
      60        34315 :         self.connections.lock().push(vc);
      61        34315 : 
      62        34315 :         TCP {
      63        34315 :             net: self.clone(),
      64        34315 :             conn_id: connection_id,
      65        34315 :             dir: 0,
      66        34315 :             recv_chan,
      67        34315 :         }
      68        34315 :     }
      69              : }
      70              : 
      71              : // private functions
      72              : impl NetworkTask {
      73              :     /// Schedule a wakeup of the network task (self) `after_ms` from now to process
      74              :     /// connection `id`: pushes an event onto the heap and requests a wakeup from the task context.
      75       176914 :     fn schedule(&self, id: usize, after_ms: u64) {
      76       176914 :         self.events.lock().push(Event {
      77       176914 :             time: executor::now() + after_ms,
      78       176914 :             conn_id: id,
      79       176914 :         });
      80       176914 :         self.task_context.schedule_wakeup(after_ms);
      81       176914 :     }
      82              : 
      83              :     /// Get connection `id`. The returned guard keeps the whole connection list locked. Panics if `id` is unknown.
      84       234646 :     fn get(&self, id: usize) -> MappedMutexGuard<'_, RawMutex, VirtualConnection> {
      85       234646 :         MutexGuard::map(self.connections.lock(), |connections| {
      86       234646 :             connections.get_mut(id).unwrap()
      87       234646 :         })
      88       234646 :     }
      89              : 
      90       165474 :     fn collect_pending_events(&self, now: u64, vec: &mut Vec<Event>) { // Move every event due at or before `now` from the heap into `vec` (which is cleared first).
      91       165474 :         vec.clear();
      92       165474 :         let mut events = self.events.lock();
      93       330421 :         while let Some(event) = events.peek() {
      94       328171 :             if event.time > now {
      95       163224 :                 break; // the heap yields the earliest event first, so nothing else is due either
      96       164947 :             }
      97       164947 :             let event = events.pop().unwrap();
      98       164947 :             vec.push(event);
      99              :         }
     100       165474 :     }
     101              : 
     102          528 :     fn start(self: &Arc<Self>) { // Event loop of the network task: process due events, then yield; the loop has no exit.
     103          528 :         debug!("started network task");
     104              : 
     105          528 :         let mut events = Vec::new();
     106              :         loop {
     107       166002 :             let now = executor::now();
     108       166002 :             self.collect_pending_events(now, &mut events);
     109              : 
     110       166002 :             for event in events.drain(..) {
     111       164947 :                 let conn = self.get(event.conn_id); // NOTE(review): the connection-list lock stays held while this connection is processed
     112       164947 :                 conn.process(self);
     113       164947 :             }
     114              : 
     115              :             // block until wakeup (presumably -1 means "no timeout" — confirm against executor::yield_me)
     116       165474 :             executor::yield_me(-1);
     117              :         }
     118              :     }
     119              : }
     120              : 
     121              : // Direction of a message: 0 - from node(0), the client, to node(1), the server;
     122              : // 1 - from node(1), the server, to node(0), the client. Also used to index per-direction buffers.
     123              : type MessageDirection = u8;
     124              : 
     125         1311 : fn sender_str(dir: MessageDirection) -> &'static str { // Name of the node that sends in direction `dir` (used in log/debug output).
     126         1311 :     match dir {
     127          222 :         0 => "client",
     128         1089 :         1 => "server",
     129            0 :         _ => unreachable!(), // only 0 and 1 are valid directions; anything else panics
     130              :     }
     131         1311 : }
     132              : 
     133          355 : fn receiver_str(dir: MessageDirection) -> &'static str { // Name of the node that receives in direction `dir` (the opposite of `sender_str`).
     134          355 :     match dir {
     135          163 :         0 => "server",
     136          192 :         1 => "client",
     137            0 :         _ => unreachable!(), // only 0 and 1 are valid directions; anything else panics
     138              :     }
     139          355 : }
     140              : 
     141              : /// Virtual connection between two nodes.
     142              : /// Node 0 is the creator of the connection (client),
     143              : /// and node 1 is the acceptor (server).
     144              : struct VirtualConnection {
     145              :     connection_id: usize,
     146              :     /// one-off chan, used to deliver the Accept event (carrying the server's end of the connection) to dst
     147              :     dst_accept: Chan<NodeEvent>,
     148              :     /// message sinks: dst_sockets[i] is the channel on which node i receives its events
     149              :     dst_sockets: [Chan<NetEvent>; 2],
     150              :     state: Mutex<ConnectionState>, // per-direction buffers plus the rng used to draw delays
     151              : }
     152              : 
     153              : struct ConnectionState { // Mutable part of a VirtualConnection, guarded by its `state` mutex.
     154              :     buffers: [NetworkBuffer; 2], // indexed by MessageDirection
     155              :     rng: StdRng, // drives the delay-or-drop decision for connects and sends
     156              : }
     157              : 
     158              : impl VirtualConnection {
     159              :     /// Schedule a wakeup for this connection one keepalive timeout from now, so a possible timeout gets checked. No-op if no keepalive timeout is configured.
     160       101649 :     fn schedule_timeout(&self, net: &NetworkTask) {
     161       101649 :         if let Some(timeout) = net.options.keepalive_timeout {
     162       101649 :             net.schedule(self.connection_id, timeout);
     163       101649 :         }
     164       101649 :     }
     165              : 
     166              :     /// Queue the handshake (InternalConnect, later delivered to the server as Accept) in the direction-0 buffer, or drop it if no delay is drawn.
     167        34315 :     fn send_connect(&self, net: &NetworkTask) {
     168        34315 :         let now = executor::now();
     169        34315 :         let mut state = self.state.lock();
     170        34315 :         let delay = net.options.connect_delay.delay(&mut state.rng);
     171        34315 :         let buffer = &mut state.buffers[0];
     172        34315 :         assert!(buffer.buf.is_empty()); // the connection must be brand new: nothing queued, nothing closed, nothing received
     173        34315 :         assert!(!buffer.recv_closed);
     174        34315 :         assert!(!buffer.send_closed);
     175        34315 :         assert!(buffer.last_recv.is_none());
     176              : 
     177        34315 :         let delay = if let Some(ms) = delay {
     178        27213 :             ms
     179              :         } else {
     180         7102 :             debug!("NET: TCP #{} dropped connect", self.connection_id);
     181         7102 :             buffer.send_closed = true; // later sends in direction 0 will be rejected as "broken pipe"
     182         7102 :             return;
     183              :         };
     184              : 
     185              :         // Queue the handshake for delivery at `now + delay` and wake the network task at that time.
     186        27213 :         buffer
     187        27213 :             .buf
     188        27213 :             .push_back((now + delay, AnyMessage::InternalConnect));
     189        27213 :         net.schedule(self.connection_id, delay);
     190        34315 :     }
     191              : 
     192              :     /// Deliver the messages that are due in both directions, then close every side that hit the keepalive timeout.
     193       164947 :     fn process(&self, net: &Arc<NetworkTask>) {
     194       164947 :         let now = executor::now();
     195       164947 : 
     196       164947 :         let mut state = self.state.lock();
     197              : 
     198       494841 :         for direction in 0..2 {
     199       329894 :             self.process_direction(
     200       329894 :                 net,
     201       329894 :                 state.deref_mut(),
     202       329894 :                 now,
     203       329894 :                 direction as MessageDirection,
     204       329894 :                 &self.dst_sockets[direction ^ 1], // messages sent in `direction` are received by the opposite node
     205       329894 :             );
     206       329894 :         }
     207              : 
     208              :         // Close a side of the connection by timeout if that node has not
     209              :         // received any message for at least `keepalive_timeout`.
     210       164947 :         if let Some(timeout) = net.options.keepalive_timeout {
     211       164947 :             let mut to_close = [false, false];
     212       494841 :             for direction in 0..2 {
     213       329894 :                 let buffer = &mut state.buffers[direction];
     214       329894 :                 if buffer.recv_closed {
     215        70459 :                     continue;
     216       259435 :                 }
     217       259435 :                 if let Some(last_recv) = buffer.last_recv { // None: the receiver is not yet aware of the connection, so it is not timed out
     218       234280 :                     if now - last_recv >= timeout {
     219        51958 :                         debug!(
     220            0 :                             "NET: connection {} timed out at {}",
     221            0 :                             self.connection_id,
     222            0 :                             receiver_str(direction as MessageDirection)
     223              :                         );
     224        51958 :                         let node_idx = direction ^ 1; // index of the node that receives in `direction`
     225        51958 :                         to_close[node_idx] = true;
     226       182322 :                     }
     227        25155 :                 }
     228              :             }
     229       164947 :             drop(state); // `close` takes the state lock itself
     230              : 
     231       329894 :             for (node_idx, should_close) in to_close.iter().enumerate() {
     232       329894 :                 if *should_close {
     233        51958 :                     self.close(node_idx);
     234       277936 :                 }
     235              :             }
     236            0 :         }
     237       164947 :     }
     238              : 
     239              :     /// Deliver, in queue order, every message of `direction` whose delivery time is `<= now`; regular messages go to `to_socket`, InternalConnect becomes an Accept on `dst_accept`.
     240       329894 :     fn process_direction(
     241       329894 :         &self,
     242       329894 :         net: &Arc<NetworkTask>,
     243       329894 :         state: &mut ConnectionState,
     244       329894 :         now: u64,
     245       329894 :         direction: MessageDirection,
     246       329894 :         to_socket: &Chan<NetEvent>,
     247       329894 :     ) {
     248       329894 :         let buffer = &mut state.buffers[direction as usize];
     249       329894 :         if buffer.recv_closed {
     250        70459 :             assert!(buffer.buf.is_empty()); // `close` drains the buffer and `send` refuses to refill it
     251       259435 :         }
     252              : 
     253       397228 :         while !buffer.buf.is_empty() && buffer.buf.front().unwrap().0 <= now {
     254        67334 :             let msg = buffer.buf.pop_front().unwrap().1;
     255        67334 : 
     256        67334 :             buffer.last_recv = Some(now);
     257        67334 :             self.schedule_timeout(net); // re-arm the keepalive check after each delivery
     258        67334 : 
     259        67334 :             if let AnyMessage::InternalConnect = msg {
     260        25862 :                 // TODO: assert to_socket is the server
     261        25862 :                 let server_to_client = TCP {
     262        25862 :                     net: net.clone(),
     263        25862 :                     conn_id: self.connection_id,
     264        25862 :                     dir: direction ^ 1,
     265        25862 :                     recv_chan: to_socket.clone(),
     266        25862 :                 };
     267        25862 :                 // special case: the new connection's receiving end is delivered on the separate accept channel
     268        25862 :                 self.dst_accept.send(NodeEvent::Accept(server_to_client));
     269        41472 :             } else {
     270        41472 :                 to_socket.send(NetEvent::Message(msg));
     271        41472 :             }
     272              :         }
     273       329894 :     }
     274              : 
     275              :     /// Queue `msg` for delivery in `direction` at `now + delay`. The message is dropped instead if the
     276              :     /// sending side is already closed, if no delay is drawn (which closes the sending side), or if the receiver is closed.
     277        65916 :     fn send(&self, net: &NetworkTask, direction: MessageDirection, msg: AnyMessage) {
     278        65916 :         let now = executor::now();
     279        65916 :         let mut state = self.state.lock();
     280              : 
     281        65916 :         let (delay, close) = if let Some(ms) = net.options.send_delay.delay(&mut state.rng) { // drawn before the closed checks, so every call advances the rng
     282        60313 :             (ms, false)
     283              :         } else {
     284         5603 :             (0, true) // no delay drawn: this send breaks the pipe
     285              :         };
     286              : 
     287        65916 :         let buffer = &mut state.buffers[direction as usize];
     288        65916 :         if buffer.send_closed {
     289         6752 :             debug!(
     290            0 :                 "NET: TCP #{} dropped message {:?} (broken pipe)",
     291              :                 self.connection_id, msg
     292              :             );
     293         6752 :             return;
     294        59164 :         }
     295        59164 : 
     296        59164 :         if close {
     297         4336 :             debug!(
     298            0 :                 "NET: TCP #{} dropped message {:?} (pipe just broke)",
     299              :                 self.connection_id, msg
     300              :             );
     301         4336 :             buffer.send_closed = true;
     302         4336 :             return;
     303        54828 :         }
     304        54828 : 
     305        54828 :         if buffer.recv_closed {
     306         6776 :             debug!(
     307            0 :                 "NET: TCP #{} dropped message {:?} (recv closed)",
     308              :                 self.connection_id, msg
     309              :             );
     310         6776 :             return;
     311        48052 :         }
     312        48052 : 
     313        48052 :         // Queue the message for delivery at `now + delay` and wake the network task at that time.
     314        48052 :         buffer.buf.push_back((now + delay, msg));
     315        48052 :         net.schedule(self.connection_id, delay);
     316        65916 :     }
     317              : 
     318              :     /// Close the connection on the side of node `node_idx`: messages still queued for it are dropped, nothing more is delivered to it
     319              :     /// or accepted from it, and it receives NetEvent::Closed. The other side will not be notified. Closing twice is a logged no-op.
     320        55741 :     fn close(&self, node_idx: usize) {
     321        55741 :         let mut state = self.state.lock();
     322        55741 :         let recv_buffer = &mut state.buffers[1 ^ node_idx]; // the direction in which `node_idx` is the receiver
     323        55741 :         if recv_buffer.recv_closed {
     324          256 :             debug!(
     325            0 :                 "NET: TCP #{} closed twice at {}",
     326            0 :                 self.connection_id,
     327            0 :                 sender_str(node_idx as MessageDirection),
     328              :             );
     329          256 :             return;
     330        55485 :         }
     331        55485 : 
     332        55485 :         debug!(
     333            0 :             "NET: TCP #{} closed at {}",
     334            0 :             self.connection_id,
     335            0 :             sender_str(node_idx as MessageDirection),
     336              :         );
     337        55485 :         recv_buffer.recv_closed = true;
     338        55485 :         for msg in recv_buffer.buf.drain(..) {
     339         4998 :             debug!(
     340            0 :                 "NET: TCP #{} dropped message {:?} (closed)",
     341              :                 self.connection_id, msg
     342              :             );
     343              :         }
     344              : 
     345        55485 :         let send_buffer = &mut state.buffers[node_idx]; // the direction in which `node_idx` is the sender
     346        55485 :         send_buffer.send_closed = true;
     347        55485 :         drop(state);
     348        55485 : 
     349        55485 :         // TODO: notify the other side?
     350        55485 : 
     351        55485 :         self.dst_sockets[node_idx].send(NetEvent::Closed);
     352        55741 :     }
     353              : }
     354              : 
     355              : struct NetworkBuffer { // One direction of a virtual connection: messages in flight plus the close flags.
     356              :     /// Messages paired with their time of delivery, in sending order
     357              :     buf: VecDeque<(u64, AnyMessage)>,
     358              :     /// True if the connection is closed on the receiving side,
     359              :     /// i.e. no more messages from the buffer will be delivered.
     360              :     recv_closed: bool,
     361              :     /// True if the connection is closed on the sending side,
     362              :     /// i.e. no more messages will be added to the buffer.
     363              :     send_closed: bool,
     364              :     /// Last time a message was delivered from the buffer.
     365              :     /// If None, it means that the server is the receiver and
     366              :     /// it is not yet aware of this connection (i.e. has not
     367              :     /// received the Accept).
     368              :     last_recv: Option<u64>,
     369              : }
     370              : 
     371              : impl NetworkBuffer {
     372        68630 :     fn new(last_recv: Option<u64>) -> Self { // Empty buffer, open on both sides, with the given initial `last_recv`.
     373        68630 :         Self {
     374        68630 :             buf: VecDeque::new(),
     375        68630 :             recv_closed: false,
     376        68630 :             send_closed: false,
     377        68630 :             last_recv,
     378        68630 :         }
     379        68630 :     }
     380              : }
     381              : 
     382              : /// Single end of a bidirectional network stream without reordering (TCP-like).
     383              : /// Reads are implemented using channels, writes go to the buffer inside VirtualConnection.
     384              : pub struct TCP {
     385              :     net: Arc<NetworkTask>,
     386              :     conn_id: usize, // id of the underlying VirtualConnection inside `net`
     387              :     dir: MessageDirection, // direction in which this end sends (0 for the client end, 1 for the server end)
     388              :     recv_chan: Chan<NetEvent>, // channel on which this end receives its events
     389              : }
     390              : 
     391              : impl Debug for TCP {
     392          911 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // Renders as `TCP #<conn_id> (<client|server>)`, naming the role of this end.
     393          911 :         write!(f, "TCP #{} ({})", self.conn_id, sender_str(self.dir),)
     394          911 :     }
     395              : }
     396              : 
     397              : impl TCP {
     398              :     /// Send a message to the other side. It may be dropped by the simulated network, but it is guaranteed
     399              :     /// not to arrive before the arrival of all messages sent earlier from this end.
     400        65916 :     pub fn send(&self, msg: AnyMessage) {
     401        65916 :         let conn = self.net.get(self.conn_id);
     402        65916 :         conn.send(&self.net, self.dir, msg);
     403        65916 :     }
     404              : 
     405              :     /// Get a clone of the channel on which this end receives incoming events.
     406       423860 :     pub fn recv_chan(&self) -> Chan<NetEvent> {
     407       423860 :         self.recv_chan.clone()
     408       423860 :     }
     409              : 
     410       288072 :     pub fn connection_id(&self) -> usize { // Id of the underlying virtual connection (shared by both ends).
     411       288072 :         self.conn_id
     412       288072 :     }
     413              : 
     414         3783 :     pub fn close(&self) { // Close this end of the connection; the other end is not notified (see VirtualConnection::close).
     415         3783 :         let conn = self.net.get(self.conn_id);
     416         3783 :         conn.close(self.dir as usize);
     417         3783 :     }
     418              : }
     419              : struct Event { // Request to process connection `conn_id` once the simulated time reaches `time`.
     420              :     time: u64,
     421              :     conn_id: usize,
     422              : }
     423              : 
     424              : // BinaryHeap is a max-heap, and we want a min-heap. The ordering is reversed in
     425              : // `Ord::cmp` below (which `partial_cmp` delegates to) to get that.
     426              : impl PartialOrd for Event {
     427      1029611 :     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
     428      1029611 :         Some(self.cmp(other))
     429      1029611 :     }
     430              : }
     431              : 
     432              : impl Ord for Event {
     433      1029611 :     fn cmp(&self, other: &Self) -> Ordering {
     434      1029611 :         (other.time, other.conn_id).cmp(&(self.time, self.conn_id)) // `other` vs `self`: the earliest time (then smallest conn_id) compares greatest
     435      1029611 :     }
     436              : }
     437              : 
     438              : impl PartialEq for Event {
     439            0 :     fn eq(&self, other: &Self) -> bool {
     440            0 :         (other.time, other.conn_id) == (self.time, self.conn_id)
     441            0 :     }
     442              : }
     443              : 
     444              : impl Eq for Event {}
        

Generated by: LCOV version 2.1-beta