LCOV - differential code coverage report
Current view:  top level - safekeeper/src - receive_wal.rs (source / functions)
Current:       cd44433dd675caa99df17a61b18949c8387e2242.info
Current Date:  2024-01-09 02:06:09
Baseline:      66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

            Coverage   Total   Hit   UBC   CBC
Lines:        92.9 %     241   224    17   224
Functions:    60.3 %      73    44    29    44

           TLA  Line data    Source code
       1                 : //! Safekeeper communication endpoint to WAL proposer (compute node).
       2                 : //! Gets messages from the network, passes them down to consensus module and
       3                 : //! sends replies back.
       4                 : 
       5                 : use crate::handler::SafekeeperPostgresHandler;
       6                 : use crate::safekeeper::AcceptorProposerMessage;
       7                 : use crate::safekeeper::ProposerAcceptorMessage;
       8                 : use crate::safekeeper::ServerInfo;
       9                 : use crate::timeline::Timeline;
      10                 : use crate::wal_service::ConnectionId;
      11                 : use crate::GlobalTimelines;
      12                 : use anyhow::{anyhow, Context};
      13                 : use bytes::BytesMut;
      14                 : use parking_lot::MappedMutexGuard;
      15                 : use parking_lot::Mutex;
      16                 : use parking_lot::MutexGuard;
      17                 : use postgres_backend::CopyStreamHandlerEnd;
      18                 : use postgres_backend::PostgresBackend;
      19                 : use postgres_backend::PostgresBackendReader;
      20                 : use postgres_backend::QueryError;
      21                 : use pq_proto::BeMessage;
      22                 : use serde::Deserialize;
      23                 : use serde::Serialize;
      24                 : use std::net::SocketAddr;
      25                 : use std::sync::Arc;
      26                 : use tokio::io::AsyncRead;
      27                 : use tokio::io::AsyncWrite;
      28                 : use tokio::sync::mpsc::channel;
      29                 : use tokio::sync::mpsc::error::TryRecvError;
      30                 : use tokio::sync::mpsc::Receiver;
      31                 : use tokio::sync::mpsc::Sender;
      32                 : use tokio::task;
      33                 : use tokio::task::JoinHandle;
      34                 : use tokio::time::Duration;
      35                 : use tokio::time::Instant;
      36                 : use tracing::*;
      37                 : use utils::id::TenantTimelineId;
      38                 : use utils::lsn::Lsn;
      39                 : 
      40                 : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
      41                 : /// in Arc).
      42                 : pub struct WalReceivers {
      43                 :     mutex: Mutex<WalReceiversShared>,
      44                 : }
      45                 : 
      46                 : /// Id under which walreceiver is registered in shmem.
      47                 : type WalReceiverId = usize;
      48                 : 
      49                 : impl WalReceivers {
      50 CBC         582 :     pub fn new() -> Arc<WalReceivers> {
      51             582 :         Arc::new(WalReceivers {
      52             582 :             mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
      53             582 :         })
      54             582 :     }
      55                 : 
      56                 :     /// Register new walreceiver. Returned guard provides access to the slot and
      57                 :     /// automatically deregisters in Drop.
      58            1746 :     pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
      59            1746 :         let slots = &mut self.mutex.lock().slots;
      60            1746 :         let walreceiver = WalReceiverState {
      61            1746 :             conn_id,
      62            1746 :             status: WalReceiverStatus::Voting,
      63            1746 :         };
      64                 :         // find empty slot or create new one
      65            1746 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
      66            1220 :             slots[pos] = Some(walreceiver);
      67            1220 :             pos
      68                 :         } else {
      69             526 :             let pos = slots.len();
      70             526 :             slots.push(Some(walreceiver));
      71             526 :             pos
      72                 :         };
      73            1746 :         WalReceiverGuard {
      74            1746 :             id: pos,
      75            1746 :             walreceivers: self.clone(),
      76            1746 :         }
      77            1746 :     }
      78                 : 
      79                 :     /// Get reference to locked slot contents. Slot must exist (registered
      80                 :     /// earlier).
      81             830 :     fn get_slot<'a>(
      82             830 :         self: &'a Arc<WalReceivers>,
      83             830 :         id: WalReceiverId,
      84             830 :     ) -> MappedMutexGuard<'a, WalReceiverState> {
      85             830 :         MutexGuard::map(self.mutex.lock(), |locked| {
      86             830 :             locked.slots[id]
      87             830 :                 .as_mut()
      88             830 :                 .expect("walreceiver doesn't exist")
      89             830 :         })
      90             830 :     }
      91                 : 
      92                 :     /// Get number of walreceivers (compute connections).
      93           19573 :     pub fn get_num(self: &Arc<WalReceivers>) -> usize {
      94           19573 :         self.mutex.lock().slots.iter().flatten().count()
      95           19573 :     }
      96                 : 
      97                 :     /// Get state of all walreceivers.
      98             247 :     pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
      99             247 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     100             247 :     }
     101                 : 
     102                 :     /// Get number of streaming walreceivers (normally 0 or 1) from compute.
     103               3 :     pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
     104               3 :         self.mutex
     105               3 :             .lock()
     106               3 :             .slots
     107               3 :             .iter()
     108               3 :             .flatten()
      109               3 :             // conn_id is None for recovery, which also registers here; skip it
     110               3 :             .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
     111               3 :             .count()
     112               3 :     }
     113                 : 
     114                 :     /// Unregister walreceiver.
     115            1672 :     fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
     116            1672 :         let mut shared = self.mutex.lock();
     117            1672 :         shared.slots[id] = None;
     118            1672 :     }
     119                 : }
     120                 : 
     121                 : /// Only a few connections are expected (normally one), so store in Vec.
     122                 : struct WalReceiversShared {
     123                 :     slots: Vec<Option<WalReceiverState>>,
     124                 : }
     125                 : 
     126             165 : #[derive(Debug, Clone, Serialize, Deserialize)]
     127                 : pub struct WalReceiverState {
     128                 :     /// None means it is recovery initiated by us (this safekeeper).
     129                 :     pub conn_id: Option<ConnectionId>,
     130                 :     pub status: WalReceiverStatus,
     131                 : }
     132                 : 
      133                 : /// Walreceiver status. Currently only tracks whether it has passed the voting
      134                 : /// stage and started receiving the stream; more states are easy to add if needed.
     135             165 : #[derive(Debug, Clone, Serialize, Deserialize)]
     136                 : pub enum WalReceiverStatus {
     137                 :     Voting,
     138                 :     Streaming,
     139                 : }
     140                 : 
     141                 : /// Scope guard to access slot in WalReceivers registry and unregister from
     142                 : /// it in Drop.
     143                 : pub struct WalReceiverGuard {
     144                 :     id: WalReceiverId,
     145                 :     walreceivers: Arc<WalReceivers>,
     146                 : }
     147                 : 
     148                 : impl WalReceiverGuard {
     149                 :     /// Get reference to locked shared state contents.
     150             830 :     fn get(&self) -> MappedMutexGuard<WalReceiverState> {
     151             830 :         self.walreceivers.get_slot(self.id)
     152             830 :     }
     153                 : }
     154                 : 
     155                 : impl Drop for WalReceiverGuard {
     156            1672 :     fn drop(&mut self) {
     157            1672 :         self.walreceivers.unregister(self.id);
     158            1672 :     }
     159                 : }
     160                 : 
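The registry above keeps receivers in a Vec of Option slots, reuses freed slots on
registration, and relies on WalReceiverGuard's Drop impl for deregistration. A minimal
usage sketch of that lifecycle, assuming ConnectionId is a plain integer id (the asserts
only illustrate the expected registry state; they are not from the source):

    let receivers = WalReceivers::new();

    // Register a compute connection; the guard frees the slot on drop.
    let guard = receivers.register(Some(42));
    assert_eq!(receivers.get_num(), 1);

    // After the voting stage, mark the connection as streaming.
    guard.get().status = WalReceiverStatus::Streaming;
    assert_eq!(receivers.get_num_streaming(), 1);

    // Dropping the guard sets the slot back to None for reuse.
    drop(guard);
    assert_eq!(receivers.get_num(), 0);
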
     161                 : pub const MSG_QUEUE_SIZE: usize = 256;
     162                 : pub const REPLY_QUEUE_SIZE: usize = 16;
     163                 : 
     164                 : impl SafekeeperPostgresHandler {
      165                 :     /// Wrapper around handle_start_wal_push_guts that handles its result. The
      166                 :     /// error is handled here, while we're still in the walreceiver ttid span;
      167                 :     /// with an API extension this could probably be moved into postgres_backend.
     168            1745 :     pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
     169            1745 :         &mut self,
     170            1745 :         pgb: &mut PostgresBackend<IO>,
     171            1745 :     ) -> Result<(), QueryError> {
     172         2711470 :         if let Err(end) = self.handle_start_wal_push_guts(pgb).await {
     173                 :             // Log the result and probably send it to the client, closing the stream.
     174            1671 :             pgb.handle_copy_stream_end(end).await;
     175 UBC           0 :         }
     176 CBC        1671 :         Ok(())
     177            1671 :     }
     178                 : 
     179            1745 :     pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     180            1745 :         &mut self,
     181            1745 :         pgb: &mut PostgresBackend<IO>,
     182            1745 :     ) -> Result<(), CopyStreamHandlerEnd> {
     183            1745 :         // Notify the libpq client that it's allowed to send `CopyData` messages
     184            1745 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     185                 : 
      186                 :         // Experiments [1] confirm that doing network IO in one task (this one)
      187                 :         // and processing with disk IO in another significantly improves
      188                 :         // performance; to this end we spawn off the WalAcceptor task for
      189                 :         // message processing.
     190                 :         //
     191                 :         // [1] https://github.com/neondatabase/neon/pull/1318
     192            1745 :         let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
     193            1745 :         let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
     194            1745 :         let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
     195                 : 
     196                 :         // Concurrently receive and send data; replies are not synchronized with
     197                 :         // sends, so this avoids deadlocks.
     198            1745 :         let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
     199            1745 :         let peer_addr = *pgb.get_peer_addr();
     200            1745 :         let network_reader = NetworkReader {
     201            1745 :             ttid: self.ttid,
     202            1745 :             conn_id: self.conn_id,
     203            1745 :             pgb_reader: &mut pgb_reader,
     204            1745 :             peer_addr,
     205            1745 :             acceptor_handle: &mut acceptor_handle,
     206            1745 :         };
     207            1745 :         let res = tokio::select! {
     208                 :             // todo: add read|write .context to these errors
     209            1671 :             r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
     210 UBC           0 :             r = network_write(pgb, reply_rx) => r,
     211                 :         };
     212                 : 
     213                 :         // Join pg backend back.
     214 CBC        1671 :         pgb.unsplit(pgb_reader)?;
     215                 : 
      216                 :         // Join the spawned WalAcceptor. At this point the channels to/from it
      217                 :         // that were passed to the network routines are dropped, so it will exit
      218                 :         // as soon as it touches them.
     219            1671 :         match acceptor_handle {
     220                 :             None => {
      221                 :                 // failed even before spawning; the read path must have returned an error
      222 UBC           0 :                 Err(res.expect_err("result must be an error if WalAcceptor was not spawned"))
     223                 :             }
     224 CBC        1671 :             Some(handle) => {
     225            1677 :                 let wal_acceptor_res = handle.await;
     226                 : 
     227                 :                 // If there was any network error, return it.
     228            1671 :                 res?;
     229                 : 
      230                 :                 // Otherwise, the WalAcceptor task must have errored.
     231 UBC           0 :                 match wal_acceptor_res {
      232               0 :                     Ok(Ok(_)) => Ok(()), // can't happen currently; it will become possible if we add graceful termination
     233               0 :                     Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
     234               0 :                     Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
     235               0 :                         "WalAcceptor task panicked",
     236               0 :                     ))),
     237                 :                 }
     238                 :             }
     239                 :         }
     240 CBC        1671 :     }
     241                 : }
     242                 : 
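handle_start_wal_push_guts above splits the connection so reads and writes proceed
concurrently, wires both halves to the processing task through two bounded channels
(MSG_QUEUE_SIZE inbound, REPLY_QUEUE_SIZE outbound), and races the two halves with
tokio::select!. A self-contained sketch of that shape, with plain integers standing in
for the messages (all names below are hypothetical, not the crate's API):

    use tokio::sync::mpsc;

    async fn duplex_shape() {
        let (msg_tx, mut msg_rx) = mpsc::channel::<u64>(256); // cf. MSG_QUEUE_SIZE
        let (reply_tx, mut reply_rx) = mpsc::channel::<u64>(16); // cf. REPLY_QUEUE_SIZE

        // Stand-in for WalAcceptor: consume messages, emit replies.
        let acceptor = tokio::spawn(async move {
            while let Some(m) = msg_rx.recv().await {
                if reply_tx.send(m + 1).await.is_err() {
                    break; // reply channel closed: network side is gone
                }
            }
        });

        // Stand-in for the network read loop; it owns msg_tx, so finishing
        // (or being dropped by select!) closes the channel to the acceptor.
        let read_side = async move {
            for m in 0..3u64 {
                if msg_tx.send(m).await.is_err() {
                    break; // acceptor gone
                }
            }
        };
        // Stand-in for the network write loop.
        let write_side = async {
            while let Some(r) = reply_rx.recv().await {
                println!("reply {r}");
            }
        };

        // Whichever side finishes first wins; the loser is dropped, its
        // channel end closes, and the acceptor exits on its next touch.
        tokio::select! {
            _ = read_side => {}
            _ = write_side => {}
        }
        let _ = acceptor.await; // cf. joining acceptor_handle above
    }
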
     243                 : struct NetworkReader<'a, IO> {
     244                 :     ttid: TenantTimelineId,
     245                 :     conn_id: ConnectionId,
     246                 :     pgb_reader: &'a mut PostgresBackendReader<IO>,
     247                 :     peer_addr: SocketAddr,
      248                 :     // WalAcceptor is spawned when we learn the server info from walproposer
      249                 :     // and create the timeline; its handle is stored here.
     250                 :     acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
     251                 : }
     252                 : 
     253                 : impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
     254            1745 :     async fn run(
     255            1745 :         self,
     256            1745 :         msg_tx: Sender<ProposerAcceptorMessage>,
     257            1745 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     258            1745 :         reply_tx: Sender<AcceptorProposerMessage>,
     259            1745 :     ) -> Result<(), CopyStreamHandlerEnd> {
     260                 :         // Receive information about server to create timeline, if not yet.
     261            1745 :         let next_msg = read_message(self.pgb_reader).await?;
     262            1745 :         let tli = match next_msg {
     263            1745 :             ProposerAcceptorMessage::Greeting(ref greeting) => {
     264            1745 :                 info!(
     265            1745 :                     "start handshake with walproposer {} sysid {} timeline {}",
     266            1745 :                     self.peer_addr, greeting.system_id, greeting.tli,
     267            1745 :                 );
     268            1745 :                 let server_info = ServerInfo {
     269            1745 :                     pg_version: greeting.pg_version,
     270            1745 :                     system_id: greeting.system_id,
     271            1745 :                     wal_seg_size: greeting.wal_seg_size,
     272            1745 :                 };
     273            2464 :                 GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
     274                 :             }
     275                 :             _ => {
     276 UBC           0 :                 return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
     277               0 :                     "unexpected message {next_msg:?} instead of greeting"
     278               0 :                 )))
     279                 :             }
     280                 :         };
     281                 : 
     282 CBC        1745 :         *self.acceptor_handle = Some(WalAcceptor::spawn(
     283            1745 :             tli.clone(),
     284            1745 :             msg_rx,
     285            1745 :             reply_tx,
     286            1745 :             Some(self.conn_id),
     287            1745 :         ));
     288            1745 : 
     289            1745 :         // Forward all messages to WalAcceptor
     290         2705592 :         read_network_loop(self.pgb_reader, msg_tx, next_msg).await
     291            1671 :     }
     292                 : }
     293                 : 
     294                 : /// Read next message from walproposer.
     295                 : /// TODO: Return Ok(None) on graceful termination.
     296         1981903 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
     297         1981903 :     pgb_reader: &mut PostgresBackendReader<IO>,
     298         1981903 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
     299         2706718 :     let copy_data = pgb_reader.read_copy_message().await?;
     300         1980158 :     let msg = ProposerAcceptorMessage::parse(copy_data)?;
     301         1980158 :     Ok(msg)
     302         1981829 : }
     303                 : 
     304            1745 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
     305            1745 :     pgb_reader: &mut PostgresBackendReader<IO>,
     306            1745 :     msg_tx: Sender<ProposerAcceptorMessage>,
     307            1745 :     mut next_msg: ProposerAcceptorMessage,
     308            1745 : ) -> Result<(), CopyStreamHandlerEnd> {
     309                 :     loop {
     310         1980158 :         if msg_tx.send(next_msg).await.is_err() {
     311 UBC           0 :             return Ok(()); // chan closed, WalAcceptor terminated
     312 CBC     1980158 :         }
     313         2704981 :         next_msg = read_message(pgb_reader).await?;
     314                 :     }
     315            1671 : }
     316                 : 
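read_network_loop takes the already-parsed first message as a parameter and uses a
send-then-read order, so the greeting consumed during the handshake flows through
exactly the same path as every later message. The pump shape in isolation, with a
synchronous closure standing in for the async read_message (pump and fetch are
hypothetical names):

    use tokio::sync::mpsc::Sender;

    async fn pump<T>(
        mut next: T,                          // first message, already read
        mut fetch: impl FnMut() -> Option<T>, // stand-in for read_message()
        tx: Sender<T>,
    ) {
        loop {
            if tx.send(next).await.is_err() {
                return; // consumer dropped the receiver: stop reading
            }
            match fetch() {
                Some(m) => next = m,
                None => return, // connection closed
            }
        }
    }
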
      317                 : /// Read replies from WalAcceptor and pass them back to the socket. Returns
      318                 : /// Ok(()) if reply_rx is closed; that means WalAcceptor terminated, and
      319                 : /// joining it will surface the error.
     320            1745 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
     321            1745 :     pgb_writer: &mut PostgresBackend<IO>,
     322            1745 :     mut reply_rx: Receiver<AcceptorProposerMessage>,
     323            1745 : ) -> Result<(), CopyStreamHandlerEnd> {
     324            1745 :     let mut buf = BytesMut::with_capacity(128);
     325                 : 
     326                 :     loop {
     327         2708951 :         match reply_rx.recv().await {
     328         1297710 :             Some(msg) => {
     329         1297710 :                 buf.clear();
     330         1297710 :                 msg.serialize(&mut buf)?;
     331         1297710 :                 pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
     332                 :             }
     333 UBC           0 :             None => return Ok(()), // chan closed, WalAcceptor terminated
     334                 :         }
     335                 :     }
     336               0 : }
     337                 : 
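Note how network_write allocates one BytesMut and reuses it for every reply: clear()
resets the length but keeps the capacity, so steady-state writes do not allocate. The
same pattern in isolation (extend_from_slice stands in for the crate's
AcceptorProposerMessage::serialize call; write_replies is a hypothetical name):

    use bytes::BytesMut;

    fn write_replies(replies: &[&str]) {
        let mut buf = BytesMut::with_capacity(128);
        for r in replies {
            buf.clear(); // len = 0, capacity retained: no per-message allocation
            buf.extend_from_slice(r.as_bytes()); // stand-in for serialize(&mut buf)
            // ... buf would be sent as one CopyData frame here ...
        }
    }
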
     338                 : // Send keepalive messages to walproposer, to make sure it receives updates
     339                 : // even when it writes a steady stream of messages.
     340                 : const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
     341                 : 
      342                 : /// Encapsulates a task which takes messages from msg_rx, processes them, and
      343                 : /// pushes replies to reply_tx. Reading from the socket and writing to disk in
      344                 : /// parallel is beneficial for performance; this struct provides the disk-writing part.
     345                 : pub struct WalAcceptor {
     346                 :     tli: Arc<Timeline>,
     347                 :     msg_rx: Receiver<ProposerAcceptorMessage>,
     348                 :     reply_tx: Sender<AcceptorProposerMessage>,
     349                 :     conn_id: Option<ConnectionId>,
     350                 : }
     351                 : 
     352                 : impl WalAcceptor {
      353                 :     /// Spawn a task running WalAcceptor and return a handle to it. The task
      354                 :     /// returns Ok(()) if either of the channels is closed, and Err if any error
      355                 :     /// is encountered during message processing.
     356                 :     ///
     357                 :     /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
     358 CBC        1746 :     pub fn spawn(
     359            1746 :         tli: Arc<Timeline>,
     360            1746 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     361            1746 :         reply_tx: Sender<AcceptorProposerMessage>,
     362            1746 :         conn_id: Option<ConnectionId>,
     363            1746 :     ) -> JoinHandle<anyhow::Result<()>> {
     364            1746 :         task::spawn(async move {
     365            1746 :             let mut wa = WalAcceptor {
     366            1746 :                 tli,
     367            1746 :                 msg_rx,
     368            1746 :                 reply_tx,
     369            1746 :                 conn_id,
     370            1746 :             };
     371            1746 : 
     372            1746 :             let span_ttid = wa.tli.ttid; // satisfy borrow checker
     373            1746 :             wa.run()
     374            1746 :                 .instrument(
     375            1746 :                     info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
     376                 :                 )
     377         5253477 :                 .await
     378            1746 :         })
     379            1746 :     }
     380                 : 
      381                 :     /// The main loop. Returns Ok(()) if either msg_rx or reply_tx is closed;
      382                 :     /// that must mean the network side has terminated.
     383            1746 :     async fn run(&mut self) -> anyhow::Result<()> {
      384            1746 :         // Register the connection and defer unregister. The declaration order
      385            1746 :         // of the next two lines matters: guards drop in reverse order, so on exit
      386            1746 :         // we first remove our entry and only then update the status, which depends on it.
     387            1746 :         let _compute_conn_guard = ComputeConnectionGuard {
     388            1746 :             timeline: Arc::clone(&self.tli),
     389            1746 :         };
     390            1746 :         let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
     391            1746 :         self.tli.update_status_notify().await?;
     392                 : 
      393                 :         // After this timestamp we will stop processing AppendRequests and send a response
      394                 :         // to the walproposer. Since walproposer sends at least one AppendRequest per second,
      395                 :         // replying to those requests amounts to sending a keepalive once per second.
     396            1746 :         let mut next_keepalive = Instant::now();
     397                 : 
     398                 :         loop {
     399         1300288 :             let opt_msg = self.msg_rx.recv().await;
     400         1300215 :             if opt_msg.is_none() {
     401            1664 :                 return Ok(()); // chan closed, streaming terminated
     402         1298551 :             }
     403         1298551 :             let mut next_msg = opt_msg.unwrap();
     404         1298551 : 
     405         1298551 :             // Update walreceiver state in shmem for reporting.
     406         1298551 :             if let ProposerAcceptorMessage::Elected(_) = &next_msg {
     407             830 :                 walreceiver_guard.get().status = WalReceiverStatus::Streaming;
     408         1297721 :             }
     409                 : 
     410         1298551 :             let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
      411                 :                 // loop through AppendRequests while they are readily available, to
      412                 :                 // write as much WAL as possible without fsyncing
     413                 :                 //
     414                 :                 // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
     415                 :                 // Otherwise, we might end up in a situation where we read a message, but don't
     416                 :                 // process it.
     417         1975841 :                 while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
     418         1975841 :                     let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
     419                 : 
     420         2489374 :                     if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
     421               1 :                         if self.reply_tx.send(reply).await.is_err() {
     422               1 :                             return Ok(()); // chan closed, streaming terminated
     423 UBC           0 :                         }
     424 CBC     1975839 :                     }
     425                 : 
     426                 :                     // get out of this loop if keepalive time is reached
     427         1975839 :                     if Instant::now() >= next_keepalive {
     428            1897 :                         break;
     429         1973942 :                     }
     430         1973942 : 
     431         1973942 :                     match self.msg_rx.try_recv() {
     432          681607 :                         Ok(msg) => next_msg = msg,
     433         1292331 :                         Err(TryRecvError::Empty) => break,
     434               4 :                         Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
     435                 :                     }
     436                 :                 }
     437                 : 
     438                 :                 // flush all written WAL to the disk
     439         1294228 :                 self.tli
     440         1294228 :                     .process_msg(&ProposerAcceptorMessage::FlushWAL)
     441             382 :                     .await?
     442                 :             } else {
     443                 :                 // process message other than AppendRequest
     444         1486765 :                 self.tli.process_msg(&next_msg).await?
     445                 :             };
     446                 : 
     447         1298545 :             if let Some(reply) = reply_msg {
     448         1297715 :                 if self.reply_tx.send(reply).await.is_err() {
     449               3 :                     return Ok(()); // chan closed, streaming terminated
     450         1297712 :                 }
     451         1297712 :                 // reset keepalive time
     452         1297712 :                 next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
     453             830 :             }
     454                 :         }
     455            1672 :     }
     456                 : }
     457                 : 
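The core of the run loop above is its batching step: consecutive AppendRequests are
drained with try_recv and written without fsync, a single FlushWAL then syncs the whole
batch, and the keepalive deadline bounds how long replies can be deferred. A condensed,
self-contained sketch of that control flow (write_noflush and flush are hypothetical
stand-ins for the two process_msg calls above):

    use tokio::sync::mpsc::{error::TryRecvError, Receiver};
    use tokio::time::{Duration, Instant};

    fn write_noflush(_wal: &[u8]) {} // stand-in: append WAL, no fsync
    fn flush() {}                    // stand-in: one fsync for the whole batch

    async fn batch_loop(mut rx: Receiver<Vec<u8>>) {
        let keepalive = Duration::from_secs(1);
        let mut next_keepalive = Instant::now();
        while let Some(mut wal) = rx.recv().await {
            loop {
                write_noflush(&wal);
                if Instant::now() >= next_keepalive {
                    break; // force a flush and a reply at least once per second
                }
                match rx.try_recv() {
                    Ok(more) => wal = more,            // more input ready: keep batching
                    Err(TryRecvError::Empty) => break, // input drained: flush now
                    Err(TryRecvError::Disconnected) => return,
                }
            }
            flush();
            // A reply would be sent here; reset the keepalive deadline.
            next_keepalive = Instant::now() + keepalive;
        }
    }
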
     458                 : /// Calls update_status_notify in drop to update timeline status.
     459                 : struct ComputeConnectionGuard {
     460                 :     timeline: Arc<Timeline>,
     461                 : }
     462                 : 
     463                 : impl Drop for ComputeConnectionGuard {
     464            1672 :     fn drop(&mut self) {
     465            1672 :         let tli = self.timeline.clone();
     466            1672 :         tokio::spawn(async move {
     467            1672 :             if let Err(e) = tli.update_status_notify().await {
     468 UBC           0 :                 error!("failed to update timeline status: {}", e);
     469 CBC        1672 :             }
     470            1672 :         });
     471            1672 :     }
     472                 : }
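
Since Drop is synchronous, ComputeConnectionGuard cannot await update_status_notify
directly; it clones the Arc and hands the async call to the runtime with tokio::spawn.
The same pattern in minimal form, assuming a Tokio runtime is still alive when the
guard is dropped (NotifyOnDrop and the channel are hypothetical):

    struct NotifyOnDrop {
        tx: tokio::sync::mpsc::Sender<()>,
    }

    impl Drop for NotifyOnDrop {
        fn drop(&mut self) {
            // Drop can't be async: clone what the task needs and spawn it.
            let tx = self.tx.clone();
            tokio::spawn(async move {
                let _ = tx.send(()).await; // stand-in for update_status_notify()
            });
        }
    }
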
        

Generated by: LCOV version 2.1-beta