LCOV - code coverage report
Current view: top level - safekeeper/src - receive_wal.rs (source / functions)
Test:      8ac049b474321fdc72ddcb56d7165153a1a900e8.info
Test Date: 2023-09-06 10:18:01

             Coverage   Total     Hit
  Lines:       92.4 %     225     208
  Functions:   63.2 %      57      36

            Line data    Source code
       1              : //! Safekeeper communication endpoint to WAL proposer (compute node).
       2              : //! Gets messages from the network, passes them down to consensus module and
       3              : //! sends replies back.
       4              : 
       5              : use crate::handler::SafekeeperPostgresHandler;
       6              : use crate::safekeeper::AcceptorProposerMessage;
       7              : use crate::safekeeper::ProposerAcceptorMessage;
       8              : use crate::safekeeper::ServerInfo;
       9              : use crate::timeline::Timeline;
      10              : use crate::wal_service::ConnectionId;
      11              : use crate::GlobalTimelines;
      12              : use anyhow::{anyhow, Context};
      13              : use bytes::BytesMut;
      14              : use parking_lot::MappedMutexGuard;
      15              : use parking_lot::Mutex;
      16              : use parking_lot::MutexGuard;
      17              : use postgres_backend::CopyStreamHandlerEnd;
      18              : use postgres_backend::PostgresBackend;
      19              : use postgres_backend::PostgresBackendReader;
      20              : use postgres_backend::QueryError;
      21              : use pq_proto::BeMessage;
      22              : use serde::Deserialize;
      23              : use serde::Serialize;
      24              : use std::net::SocketAddr;
      25              : use std::sync::Arc;
      26              : use tokio::io::AsyncRead;
      27              : use tokio::io::AsyncWrite;
      28              : use tokio::sync::mpsc::channel;
      29              : use tokio::sync::mpsc::error::TryRecvError;
      30              : use tokio::sync::mpsc::Receiver;
      31              : use tokio::sync::mpsc::Sender;
      32              : use tokio::task;
      33              : use tokio::task::JoinHandle;
      34              : use tokio::time::Duration;
      35              : use tokio::time::Instant;
      36              : use tracing::*;
      37              : use utils::id::TenantTimelineId;
      38              : use utils::lsn::Lsn;
      39              : 
      40              : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
      41              : /// in Arc).
      42              : pub struct WalReceivers {
      43              :     mutex: Mutex<WalReceiversShared>,
      44              : }
      45              : 
      46              : /// Id under which walreceiver is registered in shmem.
      47              : type WalReceiverId = usize;
      48              : 
      49              : impl WalReceivers {
      50          604 :     pub fn new() -> Arc<WalReceivers> {
      51          604 :         Arc::new(WalReceivers {
      52          604 :             mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
      53          604 :         })
      54          604 :     }
      55              : 
      56              :     /// Register new walreceiver. Returned guard provides access to the slot and
      57              :     /// automatically deregisters in Drop.
      58         2105 :     pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
      59         2105 :         let slots = &mut self.mutex.lock().slots;
      60         2105 :         let walreceiver = WalReceiverState::Voting;
      61              :         // find empty slot or create new one
      62         2105 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
      63         1493 :             slots[pos] = Some(walreceiver);
      64         1493 :             pos
      65              :         } else {
      66          612 :             let pos = slots.len();
      67          612 :             slots.push(Some(walreceiver));
      68          612 :             pos
      69              :         };
      70         2105 :         WalReceiverGuard {
      71         2105 :             id: pos,
      72         2105 :             walreceivers: self.clone(),
      73         2105 :         }
      74         2105 :     }
      75              : 
      76              :     /// Get reference to locked slot contents. Slot must exist (registered
      77              :     /// earlier).
      78          968 :     fn get_slot<'a>(
      79          968 :         self: &'a Arc<WalReceivers>,
      80          968 :         id: WalReceiverId,
      81          968 :     ) -> MappedMutexGuard<'a, WalReceiverState> {
      82          968 :         MutexGuard::map(self.mutex.lock(), |locked| {
      83          968 :             locked.slots[id]
      84          968 :                 .as_mut()
      85          968 :                 .expect("walreceiver doesn't exist")
      86          968 :         })
      87          968 :     }
      88              : 
      89              :     /// Get number of walreceivers (compute connections).
      90        14005 :     pub fn get_num(self: &Arc<WalReceivers>) -> usize {
      91        14005 :         self.mutex.lock().slots.iter().flatten().count()
      92        14005 :     }
      93              : 
      94              :     /// Get state of all walreceivers.
      95          199 :     pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
      96          199 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
      97          199 :     }
      98              : 
      99              :     /// Unregister walreceiver.
     100         2032 :     fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
     101         2032 :         let mut shared = self.mutex.lock();
     102         2032 :         shared.slots[id] = None;
     103         2032 :     }
     104              : }
     105              : 
     106              : /// Only a few connections are expected (normally one), so store in Vec.
     107              : struct WalReceiversShared {
     108              :     slots: Vec<Option<WalReceiverState>>,
     109              : }
     110              : 
     111              : /// Walreceiver status. Currently it only records whether the connection has
     112              : /// passed the voting stage and started streaming; more states can be added if needed.
     113          186 : #[derive(Debug, Clone, Serialize, Deserialize)]
     114              : pub enum WalReceiverState {
     115              :     Voting,
     116              :     Streaming,
     117              : }
     118              : 
     119              : /// Scope guard to access a slot in the WalReceivers registry and unregister
     120              : /// from it in Drop.
     121              : pub struct WalReceiverGuard {
     122              :     id: WalReceiverId,
     123              :     walreceivers: Arc<WalReceivers>,
     124              : }
     125              : 
     126              : impl WalReceiverGuard {
     127              :     /// Get reference to locked shared state contents.
     128          968 :     fn get(&self) -> MappedMutexGuard<WalReceiverState> {
     129          968 :         self.walreceivers.get_slot(self.id)
     130          968 :     }
     131              : }
     132              : 
     133              : impl Drop for WalReceiverGuard {
     134         2032 :     fn drop(&mut self) {
     135         2032 :         self.walreceivers.unregister(self.id);
     136         2032 :     }
     137              : }
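
Aside: WalReceivers/WalReceiverGuard above follow a slot-registry-with-scope-guard pattern. Below is a minimal standalone sketch of the same idea (editorial, not part of receive_wal.rs; Registry, Slot and Guard are made-up names, and std::sync::Mutex stands in for parking_lot):

use std::sync::{Arc, Mutex};

#[allow(dead_code)]
#[derive(Debug, Clone)]
enum Slot {
    Voting,
    Streaming,
}

struct Registry {
    slots: Mutex<Vec<Option<Slot>>>,
}

impl Registry {
    /// Register a new entry; the returned guard frees the slot on drop.
    fn register(self: &Arc<Self>) -> Guard {
        let mut slots = self.slots.lock().unwrap();
        // Reuse a free slot if there is one, otherwise grow the Vec.
        let id = match slots.iter().position(|s| s.is_none()) {
            Some(pos) => {
                slots[pos] = Some(Slot::Voting);
                pos
            }
            None => {
                slots.push(Some(Slot::Voting));
                slots.len() - 1
            }
        };
        Guard { id, registry: self.clone() }
    }

    /// Number of live entries, analogous to WalReceivers::get_num.
    fn count(&self) -> usize {
        self.slots.lock().unwrap().iter().flatten().count()
    }
}

struct Guard {
    id: usize,
    registry: Arc<Registry>,
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Deregistration is automatic: dropping the guard frees the slot.
        self.registry.slots.lock().unwrap()[self.id] = None;
    }
}

fn main() {
    let registry = Arc::new(Registry { slots: Mutex::new(Vec::new()) });
    let guard = registry.register();
    assert_eq!(registry.count(), 1);
    drop(guard); // the slot is now reusable by the next connection
    assert_eq!(registry.count(), 0);
}
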
     138              : 
     139              : const MSG_QUEUE_SIZE: usize = 256;
     140              : const REPLY_QUEUE_SIZE: usize = 16;
     141              : 
     142              : impl SafekeeperPostgresHandler {
     143              :     /// Wrapper around handle_start_wal_push_guts that handles the result. The error
     144              :     /// is handled here while we're still in the walreceiver ttid span; with an API
     145              :     /// extension this could probably be moved into postgres_backend.
     146         2105 :     pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
     147         2105 :         &mut self,
     148         2105 :         pgb: &mut PostgresBackend<IO>,
     149         2105 :     ) -> Result<(), QueryError> {
     150      4548274 :         if let Err(end) = self.handle_start_wal_push_guts(pgb).await {
     151              :             // Log the result and probably send it to the client, closing the stream.
     152         2032 :             pgb.handle_copy_stream_end(end).await;
     153            0 :         }
     154         2032 :         Ok(())
     155         2032 :     }
     156              : 
     157         2105 :     pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     158         2105 :         &mut self,
     159         2105 :         pgb: &mut PostgresBackend<IO>,
     160         2105 :     ) -> Result<(), CopyStreamHandlerEnd> {
     161         2105 :         // Notify the libpq client that it's allowed to send `CopyData` messages
     162         2105 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     163              : 
     164              :         // Experiments [1] confirm that doing network IO in one (this) task and
     165              :         // processing with disk IO in another significantly improves
     166              :         // performance; we spawn off a WalAcceptor task for message processing
     167              :         // to this end.
     168              :         //
     169              :         // [1] https://github.com/neondatabase/neon/pull/1318
     170         2105 :         let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
     171         2105 :         let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
     172         2105 :         let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
     173              : 
     174              :         // Concurrently receive and send data; replies are not synchronized with
     175              :         // sends, so this avoids deadlocks.
     176         2105 :         let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
     177         2105 :         let peer_addr = *pgb.get_peer_addr();
     178         2105 :         let network_reader = NetworkReader {
     179         2105 :             ttid: self.ttid,
     180         2105 :             conn_id: self.conn_id,
     181         2105 :             pgb_reader: &mut pgb_reader,
     182         2105 :             peer_addr,
     183         2105 :             acceptor_handle: &mut acceptor_handle,
     184         2105 :         };
     185         2105 :         let res = tokio::select! {
     186              :             // todo: add read|write .context to these errors
     187         2032 :             r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
     188            0 :             r = network_write(pgb, reply_rx) => r,
     189              :         };
     190              : 
     191              :         // Join pg backend back.
     192         2032 :         pgb.unsplit(pgb_reader)?;
     193              : 
     194              :         // Join the spawned WalAcceptor. At this point the channels to/from it that were
     195              :         // passed to the network routines are dropped, so it will exit as soon as it
     196              :         // touches them.
     197         2032 :         match acceptor_handle {
     198              :             None => {
     199              :                 // failed even before spawning; read_network should have error
     200            0 :                 Err(res.expect_err("no error with WalAcceptor not spawned"))
     201              :             }
     202         2032 :             Some(handle) => {
     203         2037 :                 let wal_acceptor_res = handle.await;
     204              : 
     205              :                 // If there was any network error, return it.
     206         2032 :                 res?;
     207              : 
     208              :                 // Otherwise, the WalAcceptor task must have errored.
     209            0 :                 match wal_acceptor_res {
     210            0 :                     Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
     211            0 :                     Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
     212            0 :                     Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
     213            0 :                         "WalAcceptor task panicked",
     214            0 :                     ))),
     215              :                 }
     216              :             }
     217              :         }
     218         2032 :     }
     219              : }
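
Aside: handle_start_wal_push_guts above wires the socket to a processing task through two bounded channels and drives the read and write halves concurrently with tokio::select!. Below is a minimal standalone sketch of that shape (editorial, not part of receive_wal.rs; plain strings stand in for protocol messages, all names are hypothetical, and tokio plus anyhow are assumed as dependencies):

use anyhow::Result;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::task::JoinHandle;

/// Stand-in for the network read loop: forward incoming messages to the processor.
async fn read_side(msg_tx: Sender<String>) -> Result<()> {
    for i in 0..3 {
        if msg_tx.send(format!("append-{i}")).await.is_err() {
            return Ok(()); // channel closed, processor terminated
        }
    }
    Ok(()) // "connection closed"
}

/// Stand-in for the network write loop: send replies back to the socket.
async fn write_side(mut reply_rx: Receiver<String>) -> Result<()> {
    while let Some(reply) = reply_rx.recv().await {
        println!("reply: {reply}");
    }
    Ok(()) // channel closed, processor terminated
}

/// Stand-in for WalAcceptor: consume messages, produce replies.
fn spawn_processor(
    mut msg_rx: Receiver<String>,
    reply_tx: Sender<String>,
) -> JoinHandle<Result<()>> {
    tokio::spawn(async move {
        while let Some(msg) = msg_rx.recv().await {
            if reply_tx.send(format!("ack {msg}")).await.is_err() {
                break;
            }
        }
        Ok(())
    })
}

#[tokio::main]
async fn main() -> Result<()> {
    let (msg_tx, msg_rx) = channel(256); // cf. MSG_QUEUE_SIZE
    let (reply_tx, reply_rx) = channel(16); // cf. REPLY_QUEUE_SIZE
    let processor = spawn_processor(msg_rx, reply_tx);

    // Reading and writing run concurrently; whichever half finishes or fails
    // first ends the "connection".
    tokio::select! {
        r = read_side(msg_tx) => r?,
        r = write_side(reply_rx) => r?,
    }

    // The channels to the processor are now dropped, so it exits; join it to
    // surface any error it hit.
    processor.await??;
    Ok(())
}

Keeping the reply path separate from the request path means a slow or blocked reply can never stall reading, which is the deadlock the comment in the function above refers to.
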
     220              : 
     221              : struct NetworkReader<'a, IO> {
     222              :     ttid: TenantTimelineId,
     223              :     conn_id: ConnectionId,
     224              :     pgb_reader: &'a mut PostgresBackendReader<IO>,
     225              :     peer_addr: SocketAddr,
     226              :     // WalAcceptor is spawned when we learn server info from walproposer and
     227              :     // create timeline; handle is put here.
     228              :     acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
     229              : }
     230              : 
     231              : impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
     232         2105 :     async fn run(
     233         2105 :         self,
     234         2105 :         msg_tx: Sender<ProposerAcceptorMessage>,
     235         2105 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     236         2105 :         reply_tx: Sender<AcceptorProposerMessage>,
     237         2105 :     ) -> Result<(), CopyStreamHandlerEnd> {
     238              :         // Receive server info from the walproposer and create the timeline if it doesn't exist yet.
     239         2105 :         let next_msg = read_message(self.pgb_reader).await?;
     240         2105 :         let tli = match next_msg {
     241         2105 :             ProposerAcceptorMessage::Greeting(ref greeting) => {
     242         2105 :                 info!(
     243         2105 :                     "start handshake with walproposer {} sysid {} timeline {}",
     244         2105 :                     self.peer_addr, greeting.system_id, greeting.tli,
     245         2105 :                 );
     246         2105 :                 let server_info = ServerInfo {
     247         2105 :                     pg_version: greeting.pg_version,
     248         2105 :                     system_id: greeting.system_id,
     249         2105 :                     wal_seg_size: greeting.wal_seg_size,
     250         2105 :                 };
     251         2756 :                 GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
     252              :             }
     253              :             _ => {
     254            0 :                 return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
     255            0 :                     "unexpected message {next_msg:?} instead of greeting"
     256            0 :                 )))
     257              :             }
     258              :         };
     259              : 
     260         2105 :         *self.acceptor_handle = Some(WalAcceptor::spawn(
     261         2105 :             tli.clone(),
     262         2105 :             msg_rx,
     263         2105 :             reply_tx,
     264         2105 :             self.conn_id,
     265         2105 :         ));
     266         2105 : 
     267         2105 :         // Forward all messages to WalAcceptor
     268      4541376 :         read_network_loop(self.pgb_reader, msg_tx, next_msg).await
     269         2032 :     }
     270              : }
     271              : 
     272              : /// Read next message from walproposer.
     273              : /// TODO: Return Ok(None) on graceful termination.
     274      2650523 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
     275      2650523 :     pgb_reader: &mut PostgresBackendReader<IO>,
     276      2650523 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
     277      4543441 :     let copy_data = pgb_reader.read_copy_message().await?;
     278      2648418 :     let msg = ProposerAcceptorMessage::parse(copy_data)?;
     279      2648418 :     Ok(msg)
     280      2650450 : }
     281              : 
     282         2105 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
     283         2105 :     pgb_reader: &mut PostgresBackendReader<IO>,
     284         2105 :     msg_tx: Sender<ProposerAcceptorMessage>,
     285         2105 :     mut next_msg: ProposerAcceptorMessage,
     286         2105 : ) -> Result<(), CopyStreamHandlerEnd> {
     287              :     loop {
     288      2648418 :         if msg_tx.send(next_msg).await.is_err() {
     289            0 :             return Ok(()); // chan closed, WalAcceptor terminated
     290      2648418 :         }
     291      4541336 :         next_msg = read_message(pgb_reader).await?;
     292              :     }
     293         2032 : }
     294              : 
     295              : /// Read replies from WalAcceptor and pass them back to the socket. Returns Ok(())
     296              : /// if reply_rx is closed, which means WalAcceptor terminated; joining it will
     297              : /// reveal the error.
     298         2105 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
     299         2105 :     pgb_writer: &mut PostgresBackend<IO>,
     300         2105 :     mut reply_rx: Receiver<AcceptorProposerMessage>,
     301         2105 : ) -> Result<(), CopyStreamHandlerEnd> {
     302         2105 :     let mut buf = BytesMut::with_capacity(128);
     303              : 
     304              :     loop {
     305      4545217 :         match reply_rx.recv().await {
     306      2204482 :             Some(msg) => {
     307      2204482 :                 buf.clear();
     308      2204482 :                 msg.serialize(&mut buf)?;
     309      2204482 :                 pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
     310              :             }
     311            0 :             None => return Ok(()), // chan closed, WalAcceptor terminated
     312              :         }
     313              :     }
     314            0 : }
     315              : 
     316              : // Send keepalive messages to walproposer, to make sure it receives updates
     317              : // even when it writes a steady stream of messages.
     318              : const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
     319              : 
     320              : /// Takes messages from msg_rx, processes and pushes replies to reply_tx.
     321              : struct WalAcceptor {
     322              :     tli: Arc<Timeline>,
     323              :     msg_rx: Receiver<ProposerAcceptorMessage>,
     324              :     reply_tx: Sender<AcceptorProposerMessage>,
     325              : }
     326              : 
     327              : impl WalAcceptor {
     328              :     /// Spawn a task running WalAcceptor and return a handle to it.
     329         2105 :     fn spawn(
     330         2105 :         tli: Arc<Timeline>,
     331         2105 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     332         2105 :         reply_tx: Sender<AcceptorProposerMessage>,
     333         2105 :         conn_id: ConnectionId,
     334         2105 :     ) -> JoinHandle<anyhow::Result<()>> {
     335         2105 :         task::spawn(async move {
     336         2105 :             let mut wa = WalAcceptor {
     337         2105 :                 tli,
     338         2105 :                 msg_rx,
     339         2105 :                 reply_tx,
     340         2105 :             };
     341         2105 : 
     342         2105 :             let span_ttid = wa.tli.ttid; // satisfy borrow checker
     343         2105 :             wa.run()
     344         2105 :                 .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
     345      6971489 :                 .await
     346         2105 :         })
     347         2105 :     }
     348              : 
     349              :     /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
     350              :     /// that means the network task has terminated.
     351         2105 :     async fn run(&mut self) -> anyhow::Result<()> {
     352         2105 :         // Register the connection and defer unregister.
     353         2105 :         // The order of the next two lines matters: on drop we want to first remove
     354         2105 :         // our entry and only then update the status, which depends on registered connections.
     355         2105 :         let _compute_conn_guard = ComputeConnectionGuard {
     356         2105 :             timeline: Arc::clone(&self.tli),
     357         2105 :         };
     358         2105 :         let walreceiver_guard = self.tli.get_walreceivers().register();
     359         2105 :         self.tli.update_status_notify().await?;
     360              : 
     361              :         // After this timestamp we will stop processing AppendRequests and send a response
     362              :         // to the walproposer. walproposer sends at least one AppendRequest per second,
     363              :         // so we effectively send keepalives by replying to these requests once per second.
     364         2105 :         let mut next_keepalive = Instant::now();
     365              : 
     366              :         loop {
     367      2207483 :             let opt_msg = self.msg_rx.recv().await;
     368      2207410 :             if opt_msg.is_none() {
     369         2024 :                 return Ok(()); // chan closed, streaming terminated
     370      2205386 :             }
     371      2205386 :             let mut next_msg = opt_msg.unwrap();
     372      2205386 : 
     373      2205386 :             // Update walreceiver state in shmem for reporting.
     374      2205386 :             if let ProposerAcceptorMessage::Elected(_) = &next_msg {
     375          968 :                 *walreceiver_guard.get() = WalReceiverState::Streaming;
     376      2204418 :             }
     377              : 
     378      2205386 :             let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
     379              :                 // loop through AppendRequests while they are readily available, to
     380              :                 // write as much WAL as possible without fsyncing
     381              :                 //
     382              :                 // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
     383              :                 // Otherwise, we might end up in a situation where we read a message, but don't
     384              :                 // process it.
     385      2643242 :                 while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
     386      2643242 :                     let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
     387              : 
     388      3045750 :                     if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
     389           72 :                         if self.reply_tx.send(reply).await.is_err() {
     390            0 :                             return Ok(()); // chan closed, streaming terminated
     391           72 :                         }
     392      2643170 :                     }
     393              : 
     394              :                     // get out of this loop if keepalive time is reached
     395      2643242 :                     if Instant::now() >= next_keepalive {
     396          998 :                         break;
     397      2642244 :                     }
     398      2642244 : 
     399      2642244 :                     match self.msg_rx.try_recv() {
     400       443032 :                         Ok(msg) => next_msg = msg,
     401      2199210 :                         Err(TryRecvError::Empty) => break,
     402            2 :                         Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
     403              :                     }
     404              :                 }
     405              : 
     406              :                 // flush all written WAL to the disk
     407      2200208 :                 self.tli
     408      2200208 :                     .process_msg(&ProposerAcceptorMessage::FlushWAL)
     409          325 :                     .await?
     410              :             } else {
     411              :                 // process message other than AppendRequest
     412      1734650 :                 self.tli.process_msg(&next_msg).await?
     413              :             };
     414              : 
     415      2205384 :             if let Some(reply) = reply_msg {
     416      2204416 :                 if self.reply_tx.send(reply).await.is_err() {
     417            6 :                     return Ok(()); // chan closed, streaming terminated
     418      2204410 :                 }
     419      2204410 :                 // reset keepalive time
     420      2204410 :                 next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
     421          968 :             }
     422              :         }
     423         2032 :     }
     424              : }
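
Aside: the core of WalAcceptor::run above is a batching loop: block for the first message, drain whatever else is readily available with try_recv(), and flush either when the queue runs empty or when the keepalive deadline passes. Below is a minimal standalone sketch of just that pattern (editorial, not part of receive_wal.rs; integers stand in for AppendRequests and println! stands in for FlushWAL):

use tokio::sync::mpsc::{channel, error::TryRecvError};
use tokio::time::{Duration, Instant};

#[tokio::main]
async fn main() {
    let (tx, mut rx) = channel::<u64>(256);
    tokio::spawn(async move {
        for n in 0..10u64 {
            let _ = tx.send(n).await;
        }
    });

    let keepalive = Duration::from_secs(1);
    let mut next_keepalive = Instant::now() + keepalive;
    let mut batch = Vec::new();

    // Block only for the first message of each batch.
    while let Some(first) = rx.recv().await {
        batch.push(first);
        loop {
            // Force a flush so the peer gets a timely reply (keepalive).
            if Instant::now() >= next_keepalive {
                break;
            }
            match rx.try_recv() {
                Ok(n) => batch.push(n),            // more work readily available
                Err(TryRecvError::Empty) => break, // nothing queued: flush now
                Err(TryRecvError::Disconnected) => break,
            }
        }
        println!("flush {} items", batch.len()); // stand-in for FlushWAL
        batch.clear();
        next_keepalive = Instant::now() + keepalive;
    }
}
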
     425              : 
     426              : /// Calls update_status_notify in drop to update timeline status.
     427              : struct ComputeConnectionGuard {
     428              :     timeline: Arc<Timeline>,
     429              : }
     430              : 
     431              : impl Drop for ComputeConnectionGuard {
     432         2032 :     fn drop(&mut self) {
     433         2032 :         let tli = self.timeline.clone();
     434         2032 :         tokio::spawn(async move {
     435         2032 :             if let Err(e) = tli.update_status_notify().await {
     436            0 :                 error!("failed to update timeline status: {}", e);
     437         2032 :             }
     438         2032 :         });
     439         2032 :     }
     440              : }
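
Aside: Drop cannot be async, so the guard above clones its Arc<Timeline> and spawns a task to run the async status update after the fact. Below is a minimal standalone sketch of the same trick (editorial, not part of receive_wal.rs; StatusGuard and the Notify-based cleanup are made up; note that tokio::spawn in Drop requires the guard to be dropped inside a running runtime):

use std::sync::Arc;
use tokio::sync::Notify;

struct StatusGuard {
    notify: Arc<Notify>,
}

impl Drop for StatusGuard {
    fn drop(&mut self) {
        let notify = self.notify.clone();
        // Fire-and-forget: the async cleanup runs after drop() has returned, so a
        // failure could only be logged here, not propagated to the caller.
        tokio::spawn(async move {
            notify.notify_one(); // stand-in for the async status update
        });
    }
}

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let waiter = {
        let notify = notify.clone();
        tokio::spawn(async move { notify.notified().await })
    };

    drop(StatusGuard { notify });
    waiter.await.unwrap();
    println!("status update observed after the guard was dropped");
}
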
        

Generated by: LCOV version 2.1-beta