LCOV - code coverage report
Current view: top level - safekeeper/src - receive_wal.rs (source / functions) Coverage Total Hit
Test: c639aa5f7ab62b43d647b10f40d15a15686ce8a9.info Lines: 94.2 % 241 227
Test Date: 2024-02-12 20:26:03 Functions: 60.3 % 73 44

            Line data    Source code
       1              : //! Safekeeper communication endpoint to WAL proposer (compute node).
       2              : //! Gets messages from the network, passes them down to consensus module and
       3              : //! sends replies back.
       4              : 
       5              : use crate::handler::SafekeeperPostgresHandler;
       6              : use crate::safekeeper::AcceptorProposerMessage;
       7              : use crate::safekeeper::ProposerAcceptorMessage;
       8              : use crate::safekeeper::ServerInfo;
       9              : use crate::timeline::Timeline;
      10              : use crate::wal_service::ConnectionId;
      11              : use crate::GlobalTimelines;
      12              : use anyhow::{anyhow, Context};
      13              : use bytes::BytesMut;
      14              : use parking_lot::MappedMutexGuard;
      15              : use parking_lot::Mutex;
      16              : use parking_lot::MutexGuard;
      17              : use postgres_backend::CopyStreamHandlerEnd;
      18              : use postgres_backend::PostgresBackend;
      19              : use postgres_backend::PostgresBackendReader;
      20              : use postgres_backend::QueryError;
      21              : use pq_proto::BeMessage;
      22              : use serde::Deserialize;
      23              : use serde::Serialize;
      24              : use std::net::SocketAddr;
      25              : use std::sync::Arc;
      26              : use tokio::io::AsyncRead;
      27              : use tokio::io::AsyncWrite;
      28              : use tokio::sync::mpsc::channel;
      29              : use tokio::sync::mpsc::error::TryRecvError;
      30              : use tokio::sync::mpsc::Receiver;
      31              : use tokio::sync::mpsc::Sender;
      32              : use tokio::task;
      33              : use tokio::task::JoinHandle;
      34              : use tokio::time::Duration;
      35              : use tokio::time::Instant;
      36              : use tracing::*;
      37              : use utils::id::TenantTimelineId;
      38              : use utils::lsn::Lsn;
      39              : 
      40              : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
      41              : /// in Arc).
      42              : pub struct WalReceivers {
      43              :     mutex: Mutex<WalReceiversShared>,
      44              : }
      45              : 
      46              : /// Id under which walreceiver is registered in shmem.
      47              : type WalReceiverId = usize;
      48              : 
      49              : impl WalReceivers {
      50          612 :     pub fn new() -> Arc<WalReceivers> {
      51          612 :         Arc::new(WalReceivers {
      52          612 :             mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
      53          612 :         })
      54          612 :     }
      55              : 
      56              :     /// Register new walreceiver. Returned guard provides access to the slot and
      57              :     /// automatically deregisters in Drop.
      58         1849 :     pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
      59         1849 :         let slots = &mut self.mutex.lock().slots;
      60         1849 :         let walreceiver = WalReceiverState {
      61         1849 :             conn_id,
      62         1849 :             status: WalReceiverStatus::Voting,
      63         1849 :         };
      64              :         // find empty slot or create new one
      65         1849 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
      66         1293 :             slots[pos] = Some(walreceiver);
      67         1293 :             pos
      68              :         } else {
      69          556 :             let pos = slots.len();
      70          556 :             slots.push(Some(walreceiver));
      71          556 :             pos
      72              :         };
      73         1849 :         WalReceiverGuard {
      74         1849 :             id: pos,
      75         1849 :             walreceivers: self.clone(),
      76         1849 :         }
      77         1849 :     }
      78              : 
      79              :     /// Get reference to locked slot contents. Slot must exist (registered
      80              :     /// earlier).
      81          864 :     fn get_slot<'a>(
      82          864 :         self: &'a Arc<WalReceivers>,
      83          864 :         id: WalReceiverId,
      84          864 :     ) -> MappedMutexGuard<'a, WalReceiverState> {
      85          864 :         MutexGuard::map(self.mutex.lock(), |locked| {
      86          864 :             locked.slots[id]
      87          864 :                 .as_mut()
      88          864 :                 .expect("walreceiver doesn't exist")
      89          864 :         })
      90          864 :     }
      91              : 
      92              :     /// Get number of walreceivers (compute connections).
      93        18832 :     pub fn get_num(self: &Arc<WalReceivers>) -> usize {
      94        18832 :         self.mutex.lock().slots.iter().flatten().count()
      95        18832 :     }
      96              : 
      97              :     /// Get state of all walreceivers.
      98          255 :     pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
      99          255 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     100          255 :     }
     101              : 
     102              :     /// Get number of streaming walreceivers (normally 0 or 1) from compute.
     103            3 :     pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
     104            3 :         self.mutex
     105            3 :             .lock()
     106            3 :             .slots
     107            3 :             .iter()
     108            3 :             .flatten()
     109            3 :             // conn_id.is_none skips recovery which also registers here
     110            3 :             .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
     111            3 :             .count()
     112            3 :     }
     113              : 
     114              :     /// Unregister walreceiver.
     115         1777 :     fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
     116         1777 :         let mut shared = self.mutex.lock();
     117         1777 :         shared.slots[id] = None;
     118         1777 :     }
     119              : }
     120              : 
     121              : /// Only a few connections are expected (normally one), so store in Vec.
     122              : struct WalReceiversShared {
     123              :     slots: Vec<Option<WalReceiverState>>,
     124              : }
     125              : 
     126          171 : #[derive(Debug, Clone, Serialize, Deserialize)]
     127              : pub struct WalReceiverState {
     128              :     /// None means it is recovery initiated by us (this safekeeper).
     129              :     pub conn_id: Option<ConnectionId>,
     130              :     pub status: WalReceiverStatus,
     131              : }
     132              : 
     133              : /// Walreceiver status. Currently only whether it passed voting stage and
     134              : /// started receiving the stream, but it is easy to add more if needed.
     135          171 : #[derive(Debug, Clone, Serialize, Deserialize)]
     136              : pub enum WalReceiverStatus {
     137              :     Voting,
     138              :     Streaming,
     139              : }
     140              : 
     141              : /// Scope guard to access slot in WalReceivers registry and unregister from
     142              : /// it in Drop.
     143              : pub struct WalReceiverGuard {
     144              :     id: WalReceiverId,
     145              :     walreceivers: Arc<WalReceivers>,
     146              : }
     147              : 
     148              : impl WalReceiverGuard {
     149              :     /// Get reference to locked shared state contents.
     150          864 :     fn get(&self) -> MappedMutexGuard<WalReceiverState> {
     151          864 :         self.walreceivers.get_slot(self.id)
     152          864 :     }
     153              : }
     154              : 
     155              : impl Drop for WalReceiverGuard {
     156         1777 :     fn drop(&mut self) {
     157         1777 :         self.walreceivers.unregister(self.id);
     158         1777 :     }
     159              : }
     160              : 
     161              : pub const MSG_QUEUE_SIZE: usize = 256;
     162              : pub const REPLY_QUEUE_SIZE: usize = 16;
     163              : 
     164              : impl SafekeeperPostgresHandler {
     165              :     /// Wrapper around handle_start_wal_push_guts handling result. Error is
     166              :     /// handled here while we're still in walreceiver ttid span; with API
     167              :     /// extension, this can probably be moved into postgres_backend.
     168         1848 :     pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
     169         1848 :         &mut self,
     170         1848 :         pgb: &mut PostgresBackend<IO>,
     171         1848 :     ) -> Result<(), QueryError> {
     172      3534950 :         if let Err(end) = self.handle_start_wal_push_guts(pgb).await {
     173              :             // Log the result and probably send it to the client, closing the stream.
     174         1776 :             pgb.handle_copy_stream_end(end).await;
     175            0 :         }
     176         1776 :         Ok(())
     177         1776 :     }
     178              : 
     179         1848 :     pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     180         1848 :         &mut self,
     181         1848 :         pgb: &mut PostgresBackend<IO>,
     182         1848 :     ) -> Result<(), CopyStreamHandlerEnd> {
     183         1848 :         // Notify the libpq client that it's allowed to send `CopyData` messages
     184         1848 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     185              : 
     186              :         // Experiments [1] confirm that doing network IO in one (this) thread and
     187              :         // processing with disc IO in another significantly improves
     188              :         // performance; we spawn off WalAcceptor thread for message processing
     189              :         // to this end.
     190              :         //
     191              :         // [1] https://github.com/neondatabase/neon/pull/1318
     192         1848 :         let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
     193         1848 :         let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
     194         1848 :         let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
     195              : 
     196              :         // Concurrently receive and send data; replies are not synchronized with
     197              :         // sends, so this avoids deadlocks.
     198         1848 :         let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
     199         1848 :         let peer_addr = *pgb.get_peer_addr();
     200         1848 :         let network_reader = NetworkReader {
     201         1848 :             ttid: self.ttid,
     202         1848 :             conn_id: self.conn_id,
     203         1848 :             pgb_reader: &mut pgb_reader,
     204         1848 :             peer_addr,
     205         1848 :             acceptor_handle: &mut acceptor_handle,
     206         1848 :         };
     207         1848 :         let res = tokio::select! {
     208              :             // todo: add read|write .context to these errors
     209         1774 :             r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
     210            2 :             r = network_write(pgb, reply_rx) => r,
     211              :         };
     212              : 
     213              :         // Join pg backend back.
     214         1776 :         pgb.unsplit(pgb_reader)?;
     215              : 
     216              :         // Join the spawned WalAcceptor. At this point chans to/from it passed
     217              :         // to network routines are dropped, so it will exit as soon as it
     218              :         // touches them.
     219         1776 :         match acceptor_handle {
     220              :             None => {
     221              :                 // failed even before spawning; read_network should have error
     222            0 :                 Err(res.expect_err("no error with WalAcceptor not spawn"))
     223              :             }
     224         1776 :             Some(handle) => {
     225         1781 :                 let wal_acceptor_res = handle.await;
     226              : 
     227              :                 // If there was any network error, return it.
     228         1776 :                 res?;
     229              : 
     230              :                 // Otherwise, WalAcceptor thread must have errored.
     231            2 :                 match wal_acceptor_res {
     232            0 :                     Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
     233            2 :                     Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
     234            0 :                     Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
     235            0 :                         "WalAcceptor task panicked",
     236            0 :                     ))),
     237              :                 }
     238              :             }
     239              :         }
     240         1776 :     }
     241              : }
     242              : 
     243              : struct NetworkReader<'a, IO> {
     244              :     ttid: TenantTimelineId,
     245              :     conn_id: ConnectionId,
     246              :     pgb_reader: &'a mut PostgresBackendReader<IO>,
     247              :     peer_addr: SocketAddr,
     248              :     // WalAcceptor is spawned when we learn server info from walproposer and
     249              :     // create timeline; handle is put here.
     250              :     acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
     251              : }
     252              : 
     253              : impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
     254         1848 :     async fn run(
     255         1848 :         self,
     256         1848 :         msg_tx: Sender<ProposerAcceptorMessage>,
     257         1848 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     258         1848 :         reply_tx: Sender<AcceptorProposerMessage>,
     259         1848 :     ) -> Result<(), CopyStreamHandlerEnd> {
     260              :         // Receive information about server to create timeline, if not yet.
     261         1848 :         let next_msg = read_message(self.pgb_reader).await?;
     262         1848 :         let tli = match next_msg {
     263         1848 :             ProposerAcceptorMessage::Greeting(ref greeting) => {
     264         1848 :                 info!(
     265         1848 :                     "start handshake with walproposer {} sysid {} timeline {}",
     266         1848 :                     self.peer_addr, greeting.system_id, greeting.tli,
     267         1848 :                 );
     268         1848 :                 let server_info = ServerInfo {
     269         1848 :                     pg_version: greeting.pg_version,
     270         1848 :                     system_id: greeting.system_id,
     271         1848 :                     wal_seg_size: greeting.wal_seg_size,
     272         1848 :                 };
     273         2631 :                 GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
     274              :             }
     275              :             _ => {
     276            0 :                 return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
     277            0 :                     "unexpected message {next_msg:?} instead of greeting"
     278            0 :                 )))
     279              :             }
     280              :         };
     281              : 
     282         1848 :         *self.acceptor_handle = Some(WalAcceptor::spawn(
     283         1848 :             tli.clone(),
     284         1848 :             msg_rx,
     285         1848 :             reply_tx,
     286         1848 :             Some(self.conn_id),
     287         1848 :         ));
     288         1848 : 
     289         1848 :         // Forward all messages to WalAcceptor
     290      3528694 :         read_network_loop(self.pgb_reader, msg_tx, next_msg).await
     291         1774 :     }
     292              : }
     293              : 
     294              : /// Read next message from walproposer.
     295              : /// TODO: Return Ok(None) on graceful termination.
     296      2678525 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
     297      2678525 :     pgb_reader: &mut PostgresBackendReader<IO>,
     298      2678525 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
     299      3529950 :     let copy_data = pgb_reader.read_copy_message().await?;
     300      2676677 :     let msg = ProposerAcceptorMessage::parse(copy_data)?;
     301      2676677 :     Ok(msg)
     302      2678451 : }
     303              : 
     304         1848 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
     305         1848 :     pgb_reader: &mut PostgresBackendReader<IO>,
     306         1848 :     msg_tx: Sender<ProposerAcceptorMessage>,
     307         1848 :     mut next_msg: ProposerAcceptorMessage,
     308         1848 : ) -> Result<(), CopyStreamHandlerEnd> {
     309              :     loop {
     310      2676677 :         if msg_tx.send(next_msg).await.is_err() {
     311            0 :             return Ok(()); // chan closed, WalAcceptor terminated
     312      2676677 :         }
     313      3528108 :         next_msg = read_message(pgb_reader).await?;
     314              :     }
     315         1774 : }
     316              : 
     317              : /// Read replies from WalAcceptor and pass them back to socket. Returns Ok(())
     318              : /// if reply_rx closed; it must mean WalAcceptor terminated, joining it should
     319              : /// tell the error.
     320         1848 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
     321         1848 :     pgb_writer: &mut PostgresBackend<IO>,
     322         1848 :     mut reply_rx: Receiver<AcceptorProposerMessage>,
     323         1848 : ) -> Result<(), CopyStreamHandlerEnd> {
     324         1848 :     let mut buf = BytesMut::with_capacity(128);
     325              : 
     326              :     loop {
     327      3532316 :         match reply_rx.recv().await {
     328      1663975 :             Some(msg) => {
     329      1663975 :                 buf.clear();
     330      1663975 :                 msg.serialize(&mut buf)?;
     331      1663975 :                 pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
     332              :             }
     333            2 :             None => return Ok(()), // chan closed, WalAcceptor terminated
     334              :         }
     335              :     }
     336            2 : }
     337              : 
     338              : // Send keepalive messages to walproposer, to make sure it receives updates
     339              : // even when it writes a steady stream of messages.
     340              : const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
     341              : 
     342              : /// Encapsulates a task which takes messages from msg_rx, processes and pushes
     343              : /// replies to reply_tx; reading from socket and writing to disk in parallel is
     344              : /// beneficial for performance, this struct provides writing to disk part.
     345              : pub struct WalAcceptor {
     346              :     tli: Arc<Timeline>,
     347              :     msg_rx: Receiver<ProposerAcceptorMessage>,
     348              :     reply_tx: Sender<AcceptorProposerMessage>,
     349              :     conn_id: Option<ConnectionId>,
     350              : }
     351              : 
     352              : impl WalAcceptor {
     353              :     /// Spawn task with WalAcceptor running, return handle to it. Task returns
     354              :     /// Ok(()) if either of channels has closed, and Err if any error during
     355              :     /// message processing is encountered.
     356              :     ///
     357              :     /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
     358         1849 :     pub fn spawn(
     359         1849 :         tli: Arc<Timeline>,
     360         1849 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     361         1849 :         reply_tx: Sender<AcceptorProposerMessage>,
     362         1849 :         conn_id: Option<ConnectionId>,
     363         1849 :     ) -> JoinHandle<anyhow::Result<()>> {
     364         1849 :         task::spawn(async move {
     365         1849 :             let mut wa = WalAcceptor {
     366         1849 :                 tli,
     367         1849 :                 msg_rx,
     368         1849 :                 reply_tx,
     369         1849 :                 conn_id,
     370         1849 :             };
     371         1849 : 
     372         1849 :             let span_ttid = wa.tli.ttid; // satisfy borrow checker
     373         1849 :             wa.run()
     374         1849 :                 .instrument(
     375         1849 :                     info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
     376              :                 )
     377      6466674 :                 .await
     378         1849 :         })
     379         1849 :     }
     380              : 
     381              :     /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
     382              :     /// it must mean that network thread terminated.
     383         1849 :     async fn run(&mut self) -> anyhow::Result<()> {
     384         1849 :         // Register the connection and defer unregister.
     385         1849 :         // Order of the next two lines is important: we want first to remove our entry and then
     386         1849 :         // update status which depends on registered connections.
     387         1849 :         let _compute_conn_guard = ComputeConnectionGuard {
     388         1849 :             timeline: Arc::clone(&self.tli),
     389         1849 :         };
     390         1849 :         let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
     391         1849 :         self.tli.update_status_notify().await?;
     392              : 
     393              :         // After this timestamp we will stop processing AppendRequests and send a response
     394              :         // to the walproposer. walproposer sends at least one AppendRequest per second,
     395              :         // we will send keepalives by replying to these requests once per second.
     396         1849 :         let mut next_keepalive = Instant::now();
     397              : 
     398              :         loop {
     399      1666690 :             let opt_msg = self.msg_rx.recv().await;
     400      1666618 :             if opt_msg.is_none() {
     401         1767 :                 return Ok(()); // chan closed, streaming terminated
     402      1664851 :             }
     403      1664851 :             let mut next_msg = opt_msg.unwrap();
     404      1664851 : 
     405      1664851 :             // Update walreceiver state in shmem for reporting.
     406      1664851 :             if let ProposerAcceptorMessage::Elected(_) = &next_msg {
     407          864 :                 walreceiver_guard.get().status = WalReceiverStatus::Streaming;
     408      1663987 :             }
     409              : 
     410      1664851 :             let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
     411              :                 // loop through AppendRequest's while it's readily available to
     412              :                 // write as many WAL as possible without fsyncing
     413              :                 //
     414              :                 // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
     415              :                 // Otherwise, we might end up in a situation where we read a message, but don't
     416              :                 // process it.
     417      2672260 :                 while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
     418      2672260 :                     let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
     419              : 
     420      3210566 :                     if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
     421            0 :                         if self.reply_tx.send(reply).await.is_err() {
     422            0 :                             return Ok(()); // chan closed, streaming terminated
     423            0 :                         }
     424      2672258 :                     }
     425              : 
     426              :                     // get out of this loop if keepalive time is reached
     427      2672258 :                     if Instant::now() >= next_keepalive {
     428         1455 :                         break;
     429      2670803 :                     }
     430      2670803 : 
     431      2670803 :                     match self.msg_rx.try_recv() {
     432      1011965 :                         Ok(msg) => next_msg = msg,
     433      1658831 :                         Err(TryRecvError::Empty) => break,
     434            7 :                         Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
     435              :                     }
     436              :                 }
     437              : 
     438              :                 // flush all written WAL to the disk
     439      1660286 :                 self.tli
     440      1660286 :                     .process_msg(&ProposerAcceptorMessage::FlushWAL)
     441        38493 :                     .await?
     442              :             } else {
     443              :                 // process message other than AppendRequest
     444      1586737 :                 self.tli.process_msg(&next_msg).await?
     445              :             };
     446              : 
     447      1664842 :             if let Some(reply) = reply_msg {
     448      1663978 :                 if self.reply_tx.send(reply).await.is_err() {
     449            1 :                     return Ok(()); // chan closed, streaming terminated
     450      1663977 :                 }
     451      1663977 :                 // reset keepalive time
     452      1663977 :                 next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
     453          864 :             }
     454              :         }
     455         1777 :     }
     456              : }
     457              : 
     458              : /// Calls update_status_notify in drop to update timeline status.
     459              : struct ComputeConnectionGuard {
     460              :     timeline: Arc<Timeline>,
     461              : }
     462              : 
     463              : impl Drop for ComputeConnectionGuard {
     464         1777 :     fn drop(&mut self) {
     465         1777 :         let tli = self.timeline.clone();
     466         1777 :         tokio::spawn(async move {
     467         1777 :             if let Err(e) = tli.update_status_notify().await {
     468            0 :                 error!("failed to update timeline status: {}", e);
     469         1777 :             }
     470         1777 :         });
     471         1777 :     }
     472              : }
        

Generated by: LCOV version 2.1-beta