LCOV - differential code coverage report
Current view:  top level - safekeeper/src - receive_wal.rs (source / functions)
Current:       f6946e90941b557c917ac98cd5a7e9506d180f3e.info
Current Date:  2023-10-19 02:04:12
Baseline:      c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

            Coverage    Total    Hit    UBC    GBC    CBC
Lines:        92.9 %      225    209     16      1    208
Functions:    63.2 %       57     36     21            36

           TLA  Line data    Source code
       1                 : //! Safekeeper communication endpoint to WAL proposer (compute node).
       2                 : //! Gets messages from the network, passes them down to consensus module and
       3                 : //! sends replies back.
       4                 : 
       5                 : use crate::handler::SafekeeperPostgresHandler;
       6                 : use crate::safekeeper::AcceptorProposerMessage;
       7                 : use crate::safekeeper::ProposerAcceptorMessage;
       8                 : use crate::safekeeper::ServerInfo;
       9                 : use crate::timeline::Timeline;
      10                 : use crate::wal_service::ConnectionId;
      11                 : use crate::GlobalTimelines;
      12                 : use anyhow::{anyhow, Context};
      13                 : use bytes::BytesMut;
      14                 : use parking_lot::MappedMutexGuard;
      15                 : use parking_lot::Mutex;
      16                 : use parking_lot::MutexGuard;
      17                 : use postgres_backend::CopyStreamHandlerEnd;
      18                 : use postgres_backend::PostgresBackend;
      19                 : use postgres_backend::PostgresBackendReader;
      20                 : use postgres_backend::QueryError;
      21                 : use pq_proto::BeMessage;
      22                 : use serde::Deserialize;
      23                 : use serde::Serialize;
      24                 : use std::net::SocketAddr;
      25                 : use std::sync::Arc;
      26                 : use tokio::io::AsyncRead;
      27                 : use tokio::io::AsyncWrite;
      28                 : use tokio::sync::mpsc::channel;
      29                 : use tokio::sync::mpsc::error::TryRecvError;
      30                 : use tokio::sync::mpsc::Receiver;
      31                 : use tokio::sync::mpsc::Sender;
      32                 : use tokio::task;
      33                 : use tokio::task::JoinHandle;
      34                 : use tokio::time::Duration;
      35                 : use tokio::time::Instant;
      36                 : use tracing::*;
      37                 : use utils::id::TenantTimelineId;
      38                 : use utils::lsn::Lsn;
      39                 : 
      40                 : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
      41                 : /// in Arc).
      42                 : pub struct WalReceivers {
      43                 :     mutex: Mutex<WalReceiversShared>,
      44                 : }
      45                 : 
       46                 : /// Id under which walreceiver is registered in the shared WalReceivers state.
      47                 : type WalReceiverId = usize;
      48                 : 
      49                 : impl WalReceivers {
      50 CBC         565 :     pub fn new() -> Arc<WalReceivers> {
      51             565 :         Arc::new(WalReceivers {
      52             565 :             mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
      53             565 :         })
      54             565 :     }
      55                 : 
      56                 :     /// Register new walreceiver. Returned guard provides access to the slot and
      57                 :     /// automatically deregisters in Drop.
      58            2012 :     pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
      59            2012 :         let slots = &mut self.mutex.lock().slots;
      60            2012 :         let walreceiver = WalReceiverState::Voting;
      61                 :         // find empty slot or create new one
      62            2012 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
      63            1447 :             slots[pos] = Some(walreceiver);
      64            1447 :             pos
      65                 :         } else {
      66             565 :             let pos = slots.len();
      67             565 :             slots.push(Some(walreceiver));
      68             565 :             pos
      69                 :         };
      70            2012 :         WalReceiverGuard {
      71            2012 :             id: pos,
      72            2012 :             walreceivers: self.clone(),
      73            2012 :         }
      74            2012 :     }
      75                 : 
      76                 :     /// Get reference to locked slot contents. Slot must exist (registered
      77                 :     /// earlier).
      78            1016 :     fn get_slot<'a>(
      79            1016 :         self: &'a Arc<WalReceivers>,
      80            1016 :         id: WalReceiverId,
      81            1016 :     ) -> MappedMutexGuard<'a, WalReceiverState> {
      82            1016 :         MutexGuard::map(self.mutex.lock(), |locked| {
      83            1016 :             locked.slots[id]
      84            1016 :                 .as_mut()
      85            1016 :                 .expect("walreceiver doesn't exist")
      86            1016 :         })
      87            1016 :     }
      88                 : 
      89                 :     /// Get number of walreceivers (compute connections).
      90           16500 :     pub fn get_num(self: &Arc<WalReceivers>) -> usize {
      91           16500 :         self.mutex.lock().slots.iter().flatten().count()
      92           16500 :     }
      93                 : 
      94                 :     /// Get state of all walreceivers.
      95             212 :     pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
      96             212 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
      97             212 :     }
      98                 : 
       99                 :     /// Unregister walreceiver.
     100            1935 :     fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
     101            1935 :         let mut shared = self.mutex.lock();
     102            1935 :         shared.slots[id] = None;
     103            1935 :     }
     104                 : }
     105                 : 
     106                 : /// Only a few connections are expected (normally one), so store in Vec.
     107                 : struct WalReceiversShared {
     108                 :     slots: Vec<Option<WalReceiverState>>,
     109                 : }
     110                 : 
      111                 : /// Walreceiver status. Currently this only tracks whether the receiver has passed the
      112                 : /// voting stage and started receiving the stream, but more states are easy to add if needed.
     113             198 : #[derive(Debug, Clone, Serialize, Deserialize)]
     114                 : pub enum WalReceiverState {
     115                 :     Voting,
     116                 :     Streaming,
     117                 : }
     118                 : 
      119                 : /// Scope guard to access slot in WalReceivers registry and unregister from it in
     120                 : /// Drop.
     121                 : pub struct WalReceiverGuard {
     122                 :     id: WalReceiverId,
     123                 :     walreceivers: Arc<WalReceivers>,
     124                 : }
     125                 : 
     126                 : impl WalReceiverGuard {
     127                 :     /// Get reference to locked shared state contents.
     128            1016 :     fn get(&self) -> MappedMutexGuard<WalReceiverState> {
     129            1016 :         self.walreceivers.get_slot(self.id)
     130            1016 :     }
     131                 : }
     132                 : 
     133                 : impl Drop for WalReceiverGuard {
     134            1935 :     fn drop(&mut self) {
     135            1935 :         self.walreceivers.unregister(self.id);
     136            1935 :     }
     137                 : }
     138                 : 
     139                 : const MSG_QUEUE_SIZE: usize = 256;
     140                 : const REPLY_QUEUE_SIZE: usize = 16;
     141                 : 
     142                 : impl SafekeeperPostgresHandler {
      143                 :     /// Wrapper around handle_start_wal_push_guts that handles its result. Errors are
      144                 :     /// handled here while we're still in the walreceiver ttid span; with an API
      145                 :     /// extension, this can probably be moved into postgres_backend.
     146            2013 :     pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
     147            2013 :         &mut self,
     148            2013 :         pgb: &mut PostgresBackend<IO>,
     149            2013 :     ) -> Result<(), QueryError> {
     150         5068421 :         if let Err(end) = self.handle_start_wal_push_guts(pgb).await {
     151                 :             // Log the result and probably send it to the client, closing the stream.
     152            1936 :             pgb.handle_copy_stream_end(end).await;
     153 UBC           0 :         }
     154 CBC        1936 :         Ok(())
     155            1936 :     }
     156                 : 
     157            2013 :     pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     158            2013 :         &mut self,
     159            2013 :         pgb: &mut PostgresBackend<IO>,
     160            2013 :     ) -> Result<(), CopyStreamHandlerEnd> {
     161            2013 :         // Notify the libpq client that it's allowed to send `CopyData` messages
     162            2013 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     163                 : 
      164                 :         // Experiments [1] confirm that doing network IO in one task (this one) and
      165                 :         // processing with disk IO in another significantly improves
      166                 :         // performance; to this end we spawn off a WalAcceptor task for message
      167                 :         // processing.
      168                 :         //
      169                 :         // [1] https://github.com/neondatabase/neon/pull/1318
     170            2013 :         let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
     171            2013 :         let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
     172            2013 :         let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
     173                 : 
      174                 :         // Concurrently receive and send data; replies are not synchronized with
      175                 :         // incoming messages, so this avoids deadlocks.
     176            2013 :         let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
     177            2013 :         let peer_addr = *pgb.get_peer_addr();
     178            2013 :         let network_reader = NetworkReader {
     179            2013 :             ttid: self.ttid,
     180            2013 :             conn_id: self.conn_id,
     181            2013 :             pgb_reader: &mut pgb_reader,
     182            2013 :             peer_addr,
     183            2013 :             acceptor_handle: &mut acceptor_handle,
     184            2013 :         };
     185            2013 :         let res = tokio::select! {
     186                 :             // todo: add read|write .context to these errors
     187            1936 :             r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
     188 UBC           0 :             r = network_write(pgb, reply_rx) => r,
     189                 :         };
     190                 : 
     191                 :         // Join pg backend back.
     192 CBC        1936 :         pgb.unsplit(pgb_reader)?;
     193                 : 
     194                 :         // Join the spawned WalAcceptor. At this point chans to/from it passed
     195                 :         // to network routines are dropped, so it will exit as soon as it
      196                 :                 // failed even before spawning; the network reader must have returned an error
     197            1936 :         match acceptor_handle {
     198                 :             None => {
     199                 :                 // failed even before spawning; read_network should have error
     200 GBC           1 :                 Err(res.expect_err("no error with WalAcceptor not spawn"))
     201                 :             }
     202 CBC        1935 :             Some(handle) => {
     203            1937 :                 let wal_acceptor_res = handle.await;
     204                 : 
     205                 :                 // If there was any network error, return it.
     206            1935 :                 res?;
     207                 : 
     208                 :                 // Otherwise, WalAcceptor thread must have errored.
     209 UBC           0 :                 match wal_acceptor_res {
     210               0 :                     Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
     211               0 :                     Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
     212               0 :                     Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
     213               0 :                         "WalAcceptor task panicked",
     214               0 :                     ))),
     215                 :                 }
     216                 :             }
     217                 :         }
     218 CBC        1936 :     }
     219                 : }
     220                 : 
     221                 : struct NetworkReader<'a, IO> {
     222                 :     ttid: TenantTimelineId,
     223                 :     conn_id: ConnectionId,
     224                 :     pgb_reader: &'a mut PostgresBackendReader<IO>,
     225                 :     peer_addr: SocketAddr,
     226                 :     // WalAcceptor is spawned when we learn server info from walproposer and
     227                 :     // create timeline; handle is put here.
     228                 :     acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
     229                 : }
     230                 : 
     231                 : impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
     232            2013 :     async fn run(
     233            2013 :         self,
     234            2013 :         msg_tx: Sender<ProposerAcceptorMessage>,
     235            2013 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     236            2013 :         reply_tx: Sender<AcceptorProposerMessage>,
     237            2013 :     ) -> Result<(), CopyStreamHandlerEnd> {
     238                 :         // Receive information about server to create timeline, if not yet.
     239            2013 :         let next_msg = read_message(self.pgb_reader).await?;
     240            2012 :         let tli = match next_msg {
     241            2012 :             ProposerAcceptorMessage::Greeting(ref greeting) => {
     242            2012 :                 info!(
     243            2012 :                     "start handshake with walproposer {} sysid {} timeline {}",
     244            2012 :                     self.peer_addr, greeting.system_id, greeting.tli,
     245            2012 :                 );
     246            2012 :                 let server_info = ServerInfo {
     247            2012 :                     pg_version: greeting.pg_version,
     248            2012 :                     system_id: greeting.system_id,
     249            2012 :                     wal_seg_size: greeting.wal_seg_size,
     250            2012 :                 };
     251            2540 :                 GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
     252                 :             }
     253                 :             _ => {
     254 UBC           0 :                 return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
     255               0 :                     "unexpected message {next_msg:?} instead of greeting"
     256               0 :                 )))
     257                 :             }
     258                 :         };
     259                 : 
     260 CBC        2012 :         *self.acceptor_handle = Some(WalAcceptor::spawn(
     261            2012 :             tli.clone(),
     262            2012 :             msg_rx,
     263            2012 :             reply_tx,
     264            2012 :             self.conn_id,
     265            2012 :         ));
     266            2012 : 
     267            2012 :         // Forward all messages to WalAcceptor
     268         5061931 :         read_network_loop(self.pgb_reader, msg_tx, next_msg).await
     269            1936 :     }
     270                 : }
     271                 : 
     272                 : /// Read next message from walproposer.
     273                 : /// TODO: Return Ok(None) on graceful termination.
     274         2919773 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
     275         2919773 :     pgb_reader: &mut PostgresBackendReader<IO>,
     276         2919773 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
     277         5063908 :     let copy_data = pgb_reader.read_copy_message().await?;
     278         2917760 :     let msg = ProposerAcceptorMessage::parse(copy_data)?;
     279         2917760 :     Ok(msg)
     280         2919696 : }
     281                 : 
     282            2012 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
     283            2012 :     pgb_reader: &mut PostgresBackendReader<IO>,
     284            2012 :     msg_tx: Sender<ProposerAcceptorMessage>,
     285            2012 :     mut next_msg: ProposerAcceptorMessage,
     286            2012 : ) -> Result<(), CopyStreamHandlerEnd> {
     287                 :     loop {
     288         2917760 :         if msg_tx.send(next_msg).await.is_err() {
     289 UBC           0 :             return Ok(()); // chan closed, WalAcceptor terminated
     290 CBC     2917760 :         }
     291         5061895 :         next_msg = read_message(pgb_reader).await?;
     292                 :     }
     293            1935 : }
     294                 : 
      295                 : /// Read replies from WalAcceptor and pass them back to the socket. Returns Ok(())
      296                 : /// if reply_rx is closed; that must mean WalAcceptor terminated, and joining it
      297                 : /// should reveal the error.
     298            2013 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
     299            2013 :     pgb_writer: &mut PostgresBackend<IO>,
     300            2013 :     mut reply_rx: Receiver<AcceptorProposerMessage>,
     301            2013 : ) -> Result<(), CopyStreamHandlerEnd> {
     302            2013 :     let mut buf = BytesMut::with_capacity(128);
     303                 : 
     304                 :     loop {
     305         5065514 :         match reply_rx.recv().await {
     306         2463777 :             Some(msg) => {
     307         2463777 :                 buf.clear();
     308         2463777 :                 msg.serialize(&mut buf)?;
     309         2463777 :                 pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
     310                 :             }
     311 UBC           0 :             None => return Ok(()), // chan closed, WalAcceptor terminated
     312                 :         }
     313                 :     }
     314               0 : }
     315                 : 
     316                 : // Send keepalive messages to walproposer, to make sure it receives updates
     317                 : // even when it writes a steady stream of messages.
     318                 : const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
     319                 : 
     320                 : /// Takes messages from msg_rx, processes and pushes replies to reply_tx.
     321                 : struct WalAcceptor {
     322                 :     tli: Arc<Timeline>,
     323                 :     msg_rx: Receiver<ProposerAcceptorMessage>,
     324                 :     reply_tx: Sender<AcceptorProposerMessage>,
     325                 : }
     326                 : 
     327                 : impl WalAcceptor {
      328                 :     /// Spawn a task running WalAcceptor and return a handle to it.
     329 CBC        2012 :     fn spawn(
     330            2012 :         tli: Arc<Timeline>,
     331            2012 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     332            2012 :         reply_tx: Sender<AcceptorProposerMessage>,
     333            2012 :         conn_id: ConnectionId,
     334            2012 :     ) -> JoinHandle<anyhow::Result<()>> {
     335            2012 :         task::spawn(async move {
     336            2012 :             let mut wa = WalAcceptor {
     337            2012 :                 tli,
     338            2012 :                 msg_rx,
     339            2012 :                 reply_tx,
     340            2012 :             };
     341            2012 : 
     342            2012 :             let span_ttid = wa.tli.ttid; // satisfy borrow checker
     343            2012 :             wa.run()
     344            2012 :                 .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
     345         7207096 :                 .await
     346            2012 :         })
     347            2012 :     }
     348                 : 
     349                 :     /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
      350                 :     /// that must mean the network side terminated.
     351            2012 :     async fn run(&mut self) -> anyhow::Result<()> {
     352            2012 :         // Register the connection and defer unregister.
     353            2012 :         // Order of the next two lines is important: we want first to remove our entry and then
     354            2012 :         // update status which depends on registered connections.
     355            2012 :         let _compute_conn_guard = ComputeConnectionGuard {
     356            2012 :             timeline: Arc::clone(&self.tli),
     357            2012 :         };
     358            2012 :         let walreceiver_guard = self.tli.get_walreceivers().register();
     359            2012 :         self.tli.update_status_notify().await?;
     360                 : 
     361                 :         // After this timestamp we will stop processing AppendRequests and send a response
     362                 :         // to the walproposer. walproposer sends at least one AppendRequest per second,
      363                 :         // so we will send keepalives by replying to these requests once per second.
     364            2012 :         let mut next_keepalive = Instant::now();
     365                 : 
     366                 :         loop {
     367         2466748 :             let opt_msg = self.msg_rx.recv().await;
     368         2466671 :             if opt_msg.is_none() {
     369            1932 :                 return Ok(()); // chan closed, streaming terminated
     370         2464739 :             }
     371         2464739 :             let mut next_msg = opt_msg.unwrap();
     372         2464739 : 
     373         2464739 :             // Update walreceiver state in shmem for reporting.
     374         2464739 :             if let ProposerAcceptorMessage::Elected(_) = &next_msg {
     375            1016 :                 *walreceiver_guard.get() = WalReceiverState::Streaming;
     376         2463723 :             }
     377                 : 
     378         2464739 :             let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
      379                 :                 // loop through AppendRequests while they are readily available, to
      380                 :                 // write as much WAL as possible without fsyncing
     381                 :                 //
     382                 :                 // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
     383                 :                 // Otherwise, we might end up in a situation where we read a message, but don't
     384                 :                 // process it.
     385         2912723 :                 while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
     386         2912723 :                     let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
     387                 : 
     388         3135031 :                     if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
     389              57 :                         if self.reply_tx.send(reply).await.is_err() {
     390 UBC           0 :                             return Ok(()); // chan closed, streaming terminated
     391 CBC          57 :                         }
     392         2912666 :                     }
     393                 : 
     394                 :                     // get out of this loop if keepalive time is reached
     395         2912723 :                     if Instant::now() >= next_keepalive {
     396            1339 :                         break;
     397         2911384 :                     }
     398         2911384 : 
     399         2911384 :                     match self.msg_rx.try_recv() {
     400          453021 :                         Ok(msg) => next_msg = msg,
     401         2458361 :                         Err(TryRecvError::Empty) => break,
     402               2 :                         Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
     403                 :                     }
     404                 :                 }
     405                 : 
     406                 :                 // flush all written WAL to the disk
     407         2459700 :                 self.tli
     408         2459700 :                     .process_msg(&ProposerAcceptorMessage::FlushWAL)
     409             338 :                     .await?
     410                 :             } else {
     411                 :                 // process message other than AppendRequest
     412         1622170 :                 self.tli.process_msg(&next_msg).await?
     413                 :             };
     414                 : 
     415         2464737 :             if let Some(reply) = reply_msg {
     416         2463721 :                 if self.reply_tx.send(reply).await.is_err() {
     417               1 :                     return Ok(()); // chan closed, streaming terminated
     418         2463720 :                 }
     419         2463720 :                 // reset keepalive time
     420         2463720 :                 next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
     421            1016 :             }
     422                 :         }
     423            1935 :     }
     424                 : }
     425                 : 
     426                 : /// Calls update_status_notify in drop to update timeline status.
     427                 : struct ComputeConnectionGuard {
     428                 :     timeline: Arc<Timeline>,
     429                 : }
     430                 : 
     431                 : impl Drop for ComputeConnectionGuard {
     432            1935 :     fn drop(&mut self) {
     433            1935 :         let tli = self.timeline.clone();
     434            1935 :         tokio::spawn(async move {
     435            1935 :             if let Err(e) = tli.update_status_notify().await {
     436 UBC           0 :                 error!("failed to update timeline status: {}", e);
     437 CBC        1935 :             }
     438            1935 :         });
     439            1935 :     }
     440                 : }
        

Generated by: LCOV version 2.1-beta