LCOV - code coverage report
Current view: top level - safekeeper/src - receive_wal.rs (source / functions)
Test: 5445d246133daeceb0507e6cc0797ab7c1c70cb8.info
Test Date: 2025-03-12 18:05:02

              Coverage    Total    Hit
Lines:          36.4 %      365    133
Functions:      25.5 %       51     13

            Line data    Source code
       1              : //! Safekeeper communication endpoint to WAL proposer (compute node).
       2              : //! Gets messages from the network, passes them down to consensus module and
       3              : //! sends replies back.
       4              : 
       5              : use std::future;
       6              : use std::net::SocketAddr;
       7              : use std::sync::Arc;
       8              : 
       9              : use anyhow::{Context, anyhow};
      10              : use bytes::BytesMut;
      11              : use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
      12              : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend, PostgresBackendReader, QueryError};
      13              : use pq_proto::BeMessage;
      14              : use safekeeper_api::ServerInfo;
      15              : use safekeeper_api::membership::Configuration;
      16              : use safekeeper_api::models::{ConnectionId, WalReceiverState, WalReceiverStatus};
      17              : use tokio::io::{AsyncRead, AsyncWrite};
      18              : use tokio::sync::mpsc::error::SendTimeoutError;
      19              : use tokio::sync::mpsc::{Receiver, Sender, channel};
      20              : use tokio::task;
      21              : use tokio::task::JoinHandle;
      22              : use tokio::time::{Duration, Instant, MissedTickBehavior};
      23              : use tracing::*;
      24              : use utils::id::TenantTimelineId;
      25              : use utils::lsn::Lsn;
      26              : use utils::pageserver_feedback::PageserverFeedback;
      27              : 
      28              : use crate::GlobalTimelines;
      29              : use crate::handler::SafekeeperPostgresHandler;
      30              : use crate::metrics::{
      31              :     WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL, WAL_RECEIVER_QUEUE_SIZE_TOTAL,
      32              :     WAL_RECEIVERS,
      33              : };
      34              : use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
      35              : use crate::timeline::WalResidentTimeline;
      36              : 
      37              : const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
      38              : 
      39              : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
      40              : /// in Arc).
      41              : pub struct WalReceivers {
      42              :     mutex: Mutex<WalReceiversShared>,
      43              :     pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
      44              : 
      45              :     num_computes_tx: tokio::sync::watch::Sender<usize>,
      46              :     num_computes_rx: tokio::sync::watch::Receiver<usize>,
      47              : }
      48              : 
      49              : /// Id under which walreceiver is registered in shmem.
      50              : type WalReceiverId = usize;
      51              : 
      52              : impl WalReceivers {
      53            5 :     pub fn new() -> Arc<WalReceivers> {
      54            5 :         let (pageserver_feedback_tx, _) =
      55            5 :             tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
      56            5 : 
      57            5 :         let (num_computes_tx, num_computes_rx) = tokio::sync::watch::channel(0usize);
      58            5 : 
      59            5 :         Arc::new(WalReceivers {
      60            5 :             mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
      61            5 :             pageserver_feedback_tx,
      62            5 :             num_computes_tx,
      63            5 :             num_computes_rx,
      64            5 :         })
      65            5 :     }
      66              : 
      67              :     /// Register a new walreceiver. The returned guard provides access to the slot
      68              :     /// and automatically deregisters it on drop.
      69            5 :     pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
      70            5 :         let mut shared = self.mutex.lock();
      71            5 :         let slots = &mut shared.slots;
      72            5 :         let walreceiver = WalReceiverState {
      73            5 :             conn_id,
      74            5 :             status: WalReceiverStatus::Voting,
      75            5 :         };
      76              :         // find empty slot or create new one
      77            5 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
      78            0 :             slots[pos] = Some(walreceiver);
      79            0 :             pos
      80              :         } else {
      81            5 :             let pos = slots.len();
      82            5 :             slots.push(Some(walreceiver));
      83            5 :             pos
      84              :         };
      85              : 
      86            5 :         self.update_num(&shared);
      87            5 :         WAL_RECEIVERS.inc();
      88            5 : 
      89            5 :         WalReceiverGuard {
      90            5 :             id: pos,
      91            5 :             walreceivers: self.clone(),
      92            5 :         }
      93            5 :     }
      94              : 
      95              :     /// Get reference to locked slot contents. Slot must exist (registered
      96              :     /// earlier).
      97            0 :     fn get_slot<'a>(
      98            0 :         self: &'a Arc<WalReceivers>,
      99            0 :         id: WalReceiverId,
     100            0 :     ) -> MappedMutexGuard<'a, WalReceiverState> {
     101            0 :         MutexGuard::map(self.mutex.lock(), |locked| {
     102            0 :             locked.slots[id]
     103            0 :                 .as_mut()
     104            0 :                 .expect("walreceiver doesn't exist")
     105            0 :         })
     106            0 :     }
     107              : 
     108              :     /// Get number of walreceivers (compute connections).
     109            0 :     pub fn get_num(self: &Arc<WalReceivers>) -> usize {
     110            0 :         self.mutex.lock().get_num()
     111            0 :     }
     112              : 
     113              :     /// Get channel for number of walreceivers.
     114            5 :     pub fn get_num_rx(self: &Arc<WalReceivers>) -> tokio::sync::watch::Receiver<usize> {
     115            5 :         self.num_computes_rx.clone()
     116            5 :     }
     117              : 
     118              :     /// Should be called after every update of the slots.
     119           10 :     fn update_num(self: &Arc<WalReceivers>, shared: &MutexGuard<WalReceiversShared>) {
     120           10 :         let num = shared.get_num();
     121           10 :         self.num_computes_tx.send_replace(num);
     122           10 :     }
     123              : 
     124              :     /// Get state of all walreceivers.
     125            0 :     pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
     126            0 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     127            0 :     }
     128              : 
     129              :     /// Get number of streaming walreceivers (normally 0 or 1) from compute.
     130            5 :     pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
     131            5 :         self.mutex
     132            5 :             .lock()
     133            5 :             .slots
     134            5 :             .iter()
     135            5 :             .flatten()
     136            5 :             // conn_id.is_some() skips recovery connections, which also register here
     137            5 :             .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
     138            5 :             .count()
     139            5 :     }
     140              : 
     141              :     /// Unregister walreceiver.
     142            5 :     fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
     143            5 :         let mut shared = self.mutex.lock();
     144            5 :         shared.slots[id] = None;
     145            5 :         self.update_num(&shared);
     146            5 :         WAL_RECEIVERS.dec();
     147            5 :     }
     148              : 
     149              :     /// Broadcast pageserver feedback to connected walproposers.
     150            0 :     pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
     151            0 :         // Err means there is no subscribers, it is fine.
     152            0 :         let _ = self.pageserver_feedback_tx.send(feedback);
     153            0 :     }
     154              : }
     155              : 
     156              : /// Only a few connections are expected (normally one), so store in Vec.
     157              : struct WalReceiversShared {
     158              :     slots: Vec<Option<WalReceiverState>>,
     159              : }
     160              : 
     161              : impl WalReceiversShared {
     162              :     /// Get number of walreceivers (compute connections).
     163           10 :     fn get_num(&self) -> usize {
     164           10 :         self.slots.iter().flatten().count()
     165           10 :     }
     166              : }
     167              : 
     168              : /// Scope guard to access slot in WalReceivers registry and unregister from
     169              : /// it in Drop.
     170              : pub struct WalReceiverGuard {
     171              :     id: WalReceiverId,
     172              :     walreceivers: Arc<WalReceivers>,
     173              : }
     174              : 
     175              : impl WalReceiverGuard {
     176              :     /// Get reference to locked shared state contents.
     177            0 :     fn get(&self) -> MappedMutexGuard<WalReceiverState> {
     178            0 :         self.walreceivers.get_slot(self.id)
     179            0 :     }
     180              : }
     181              : 
     182              : impl Drop for WalReceiverGuard {
     183            5 :     fn drop(&mut self) {
     184            5 :         self.walreceivers.unregister(self.id);
     185            5 :     }
     186              : }
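
A minimal usage sketch of the registry/guard pattern above (not part of the
instrumented file; it assumes ConnectionId is a plain integer id): registering
takes a slot and bumps the watch counter, and dropping the guard releases both.

    fn walreceivers_usage_sketch() {
        let walreceivers = WalReceivers::new();
        let mut num_rx = walreceivers.get_num_rx();
        assert_eq!(*num_rx.borrow_and_update(), 0);

        {
            // Register a (hypothetical) compute connection with id 42.
            let _guard = walreceivers.register(Some(42));
            assert_eq!(walreceivers.get_num(), 1);
            assert_eq!(*num_rx.borrow_and_update(), 1);
        } // guard dropped here -> slot freed, watch counter drops back to 0

        assert_eq!(walreceivers.get_num(), 0);
    }
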
     187              : 
     188              : pub const MSG_QUEUE_SIZE: usize = 256;
     189              : pub const REPLY_QUEUE_SIZE: usize = 16;
     190              : 
     191              : impl SafekeeperPostgresHandler {
     192              :     /// Wrapper around handle_start_wal_push_guts that handles its result. The
     193              :     /// error is handled here while we're still in the walreceiver ttid span; with
     194              :     /// an API extension, this can probably be moved into postgres_backend.
     195            0 :     pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
     196            0 :         &mut self,
     197            0 :         pgb: &mut PostgresBackend<IO>,
     198            0 :         proto_version: u32,
     199            0 :         allow_timeline_creation: bool,
     200            0 :     ) -> Result<(), QueryError> {
     201            0 :         let mut tli: Option<WalResidentTimeline> = None;
     202            0 :         if let Err(end) = self
     203            0 :             .handle_start_wal_push_guts(pgb, &mut tli, proto_version, allow_timeline_creation)
     204            0 :             .await
     205              :         {
     206              :             // Log the result and probably send it to the client, closing the stream.
     207            0 :             let handle_end_fut = pgb.handle_copy_stream_end(end);
     208              :             // If we managed to create the timeline, augment logging with current LSNs etc.
     209            0 :             if let Some(tli) = tli {
     210            0 :                 let info = tli.get_safekeeper_info(&self.conf).await;
     211            0 :                 handle_end_fut
     212            0 :                     .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
     213            0 :                     .await;
     214              :             } else {
     215            0 :                 handle_end_fut.await;
     216              :             }
     217            0 :         }
     218            0 :         Ok(())
     219            0 :     }
     220              : 
     221            0 :     pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     222            0 :         &mut self,
     223            0 :         pgb: &mut PostgresBackend<IO>,
     224            0 :         tli: &mut Option<WalResidentTimeline>,
     225            0 :         proto_version: u32,
     226            0 :         allow_timeline_creation: bool,
     227            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     228            0 :         // The `tli` parameter is only used for passing _out_ a timeline, one should
     229            0 :         // not have been passed in.
     230            0 :         assert!(tli.is_none());
     231              : 
     232              :         // Notify the libpq client that it's allowed to send `CopyData` messages
     233            0 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     234              : 
     235              :         // Experiments [1] confirm that doing network IO in one (this) thread and
     236              :         // processing with disk IO in another significantly improves
     237              :         // performance; we spawn off WalAcceptor thread for message processing
     238              :         // to this end.
     239              :         //
     240              :         // [1] https://github.com/neondatabase/neon/pull/1318
     241            0 :         let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
     242            0 :         let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
     243            0 :         let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
     244              : 
     245              :         // Concurrently receive and send data; replies are not synchronized with
     246              :         // sends, so this avoids deadlocks.
     247            0 :         let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
     248            0 :         let peer_addr = *pgb.get_peer_addr();
     249            0 : 
     250            0 :         let mut network_reader = NetworkReader {
     251            0 :             ttid: self.ttid,
     252            0 :             conn_id: self.conn_id,
     253            0 :             pgb_reader: &mut pgb_reader,
     254            0 :             peer_addr,
     255            0 :             proto_version,
     256            0 :             acceptor_handle: &mut acceptor_handle,
     257            0 :             global_timelines: self.global_timelines.clone(),
     258            0 :         };
     259              : 
     260              :         // Read the first message and create the timeline if needed and allowed.
     261              :         // This won't be necessary once timelines are always created by storcon
     262              :         // and allow_timeline_creation becomes false.
     263            0 :         let res = network_reader
     264            0 :             .read_first_message(allow_timeline_creation)
     265            0 :             .await;
     266              : 
     267            0 :         let network_res = if let Ok((timeline, next_msg)) = res {
     268            0 :             let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
     269            0 :                 timeline
     270            0 :                     .get_walreceivers()
     271            0 :                     .pageserver_feedback_tx
     272            0 :                     .subscribe();
     273            0 :             *tli = Some(timeline.wal_residence_guard().await?);
     274              : 
     275            0 :             let timeline_cancel = timeline.cancel.clone();
     276            0 :             tokio::select! {
     277              :                 // todo: add read|write .context to these errors
     278            0 :                 r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
     279            0 :                 r = network_write(pgb, reply_rx, pageserver_feedback_rx, proto_version) => r,
     280            0 :                 _ = timeline_cancel.cancelled() => {
     281            0 :                     return Err(CopyStreamHandlerEnd::Cancelled);
     282              :                 }
     283              :             }
     284              :         } else {
     285            0 :             res.map(|_| ())
     286              :         };
     287              : 
     288              :         // Join pg backend back.
     289            0 :         pgb.unsplit(pgb_reader)?;
     290              : 
     291              :         // Join the spawned WalAcceptor. At this point chans to/from it passed
     292              :         // to network routines are dropped, so it will exit as soon as it
     293              :         // touches them.
     294            0 :         match acceptor_handle {
     295              :             None => {
     296              :                 // Failed even before spawning; read_network should have the error.
     297            0 :                 Err(network_res.expect_err("no error with WalAcceptor not spawned"))
     298              :             }
     299            0 :             Some(handle) => {
     300            0 :                 let wal_acceptor_res = handle.await;
     301              : 
     302              :                 // If there was any network error, return it.
     303            0 :                 network_res?;
     304              : 
     305              :                 // Otherwise, WalAcceptor thread must have errored.
     306            0 :                 match wal_acceptor_res {
     307            0 :                     Ok(Ok(_)) => Ok(()), // Clean shutdown
     308            0 :                     Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
     309            0 :                     Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
     310            0 :                         "WalAcceptor task panicked",
     311            0 :                     ))),
     312              :                 }
     313              :             }
     314              :         }
     315            0 :     }
     316              : }
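
handle_start_wal_push_guts above splits the socket and wires three parties
together with bounded channels: a reader feeding parsed messages into msg_tx,
the spawned WalAcceptor draining msg_rx and replying on reply_tx, and a writer
loop forwarding replies back to the socket, so network I/O and disk-bound
processing overlap. A stripped-down sketch of that topology, using placeholder
String messages instead of the real protocol types:

    use tokio::sync::mpsc::channel;

    #[tokio::main]
    async fn main() {
        // Bounded queues, mirroring MSG_QUEUE_SIZE / REPLY_QUEUE_SIZE above.
        let (msg_tx, mut msg_rx) = channel::<String>(256);
        let (reply_tx, mut reply_rx) = channel::<String>(16);

        // "WalAcceptor": drains inbound messages, does the (simulated) disk-bound
        // work, and pushes replies.
        let acceptor = tokio::spawn(async move {
            while let Some(msg) = msg_rx.recv().await {
                let _ = reply_tx.send(format!("ack {msg}")).await;
            }
            // reply_tx is dropped here, which ends the writer loop below.
        });

        // "Network reader": reads from the socket (stubbed as a short loop) and
        // forwards messages; dropping msg_tx lets the acceptor exit.
        let reader = tokio::spawn(async move {
            for i in 0..3 {
                msg_tx.send(format!("append {i}")).await.expect("acceptor alive");
            }
        });

        // "Network writer": sends replies back to the (stubbed) socket.
        while let Some(reply) = reply_rx.recv().await {
            println!("-> {reply}");
        }

        let _ = tokio::join!(acceptor, reader);
    }
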
     317              : 
     318              : struct NetworkReader<'a, IO> {
     319              :     ttid: TenantTimelineId,
     320              :     conn_id: ConnectionId,
     321              :     pgb_reader: &'a mut PostgresBackendReader<IO>,
     322              :     peer_addr: SocketAddr,
     323              :     proto_version: u32,
     324              :     // WalAcceptor is spawned when we learn server info from walproposer and
     325              :     // create the timeline; its handle is stored here.
     326              :     acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
     327              :     global_timelines: Arc<GlobalTimelines>,
     328              : }
     329              : 
     330              : impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
     331            0 :     async fn read_first_message(
     332            0 :         &mut self,
     333            0 :         allow_timeline_creation: bool,
     334            0 :     ) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
     335              :         // Receive information about server to create timeline, if not yet.
     336            0 :         let next_msg = read_message(self.pgb_reader, self.proto_version).await?;
     337            0 :         let tli = match next_msg {
     338            0 :             ProposerAcceptorMessage::Greeting(ref greeting) => {
     339            0 :                 info!(
     340            0 :                     "start handshake with walproposer {} sysid {}",
     341              :                     self.peer_addr, greeting.system_id,
     342              :                 );
     343            0 :                 let server_info = ServerInfo {
     344            0 :                     pg_version: greeting.pg_version,
     345            0 :                     system_id: greeting.system_id,
     346            0 :                     wal_seg_size: greeting.wal_seg_size,
     347            0 :                 };
     348            0 :                 let tli = if allow_timeline_creation {
     349            0 :                     self.global_timelines
     350            0 :                         .create(
     351            0 :                             self.ttid,
     352            0 :                             Configuration::empty(),
     353            0 :                             server_info,
     354            0 :                             Lsn::INVALID,
     355            0 :                             Lsn::INVALID,
     356            0 :                         )
     357            0 :                         .await
     358            0 :                         .context("create timeline")?
     359              :                 } else {
     360            0 :                     self.global_timelines
     361            0 :                         .get(self.ttid)
     362            0 :                         .context("get timeline")?
     363              :                 };
     364            0 :                 tli.wal_residence_guard().await?
     365              :             }
     366              :             _ => {
     367            0 :                 return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
     368            0 :                     "unexpected message {next_msg:?} instead of greeting"
     369            0 :                 )));
     370              :             }
     371              :         };
     372            0 :         Ok((tli, next_msg))
     373            0 :     }
     374              : 
     375              :     /// This function is cancellation-safe (only does network I/O and channel read/writes).
     376            0 :     async fn run(
     377            0 :         self,
     378            0 :         msg_tx: Sender<ProposerAcceptorMessage>,
     379            0 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     380            0 :         reply_tx: Sender<AcceptorProposerMessage>,
     381            0 :         tli: WalResidentTimeline,
     382            0 :         next_msg: ProposerAcceptorMessage,
     383            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     384            0 :         *self.acceptor_handle = Some(WalAcceptor::spawn(
     385            0 :             tli,
     386            0 :             msg_rx,
     387            0 :             reply_tx,
     388            0 :             Some(self.conn_id),
     389            0 :         ));
     390            0 : 
     391            0 :         // Forward all messages to WalAcceptor
     392            0 :         read_network_loop(self.pgb_reader, msg_tx, next_msg, self.proto_version).await
     393            0 :     }
     394              : }
     395              : 
     396              : /// Read next message from walproposer.
     397              : /// TODO: Return Ok(None) on graceful termination.
     398            0 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
     399            0 :     pgb_reader: &mut PostgresBackendReader<IO>,
     400            0 :     proto_version: u32,
     401            0 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
     402            0 :     let copy_data = pgb_reader.read_copy_message().await?;
     403            0 :     let msg = ProposerAcceptorMessage::parse(copy_data, proto_version)?;
     404            0 :     Ok(msg)
     405            0 : }
     406              : 
     407            0 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
     408            0 :     pgb_reader: &mut PostgresBackendReader<IO>,
     409            0 :     msg_tx: Sender<ProposerAcceptorMessage>,
     410            0 :     mut next_msg: ProposerAcceptorMessage,
     411            0 :     proto_version: u32,
     412            0 : ) -> Result<(), CopyStreamHandlerEnd> {
     413              :     /// Threshold for logging slow WalAcceptor sends.
     414              :     const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
     415              : 
     416              :     loop {
     417            0 :         let started = Instant::now();
     418            0 :         let size = next_msg.size();
     419            0 : 
     420            0 :         match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
     421            0 :             Ok(()) => {}
     422              :             // Slow send, log a message and keep trying. Log context has timeline ID.
     423            0 :             Err(SendTimeoutError::Timeout(next_msg)) => {
     424            0 :                 warn!(
     425            0 :                     "slow WalAcceptor send blocked for {:.3}s",
     426            0 :                     Instant::now().duration_since(started).as_secs_f64()
     427              :                 );
     428            0 :                 if msg_tx.send(next_msg).await.is_err() {
     429            0 :                     return Ok(()); // WalAcceptor terminated
     430            0 :                 }
     431            0 :                 warn!(
     432            0 :                     "slow WalAcceptor send completed after {:.3}s",
     433            0 :                     Instant::now().duration_since(started).as_secs_f64()
     434              :                 )
     435              :             }
     436              :             // WalAcceptor terminated.
     437            0 :             Err(SendTimeoutError::Closed(_)) => return Ok(()),
     438              :         }
     439              : 
     440              :         // Update metrics. Will be decremented in WalAcceptor.
     441            0 :         WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
     442            0 :         WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);
     443              : 
     444            0 :         next_msg = read_message(pgb_reader, proto_version).await?;
     445              :     }
     446            0 : }
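
read_network_loop above applies backpressure with a bounded send while keeping a
stalled consumer visible: it first tries send_timeout, logs a warning if the
WalAcceptor has blocked the queue past SLOW_THRESHOLD, then falls back to an
ordinary send. A self-contained sketch of that pattern (forward and its Vec<u8>
payload are hypothetical placeholders, not the real message type):

    use std::time::Duration;
    use tokio::sync::mpsc::{Sender, error::SendTimeoutError};

    /// Forward one message, warning (via eprintln here) if the consumer is slow.
    /// Returns false once the consumer has gone away.
    async fn forward(msg_tx: &Sender<Vec<u8>>, msg: Vec<u8>) -> bool {
        const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
        match msg_tx.send_timeout(msg, SLOW_THRESHOLD).await {
            Ok(()) => true,
            // Queue stayed full past the threshold: log, then wait without a timeout.
            Err(SendTimeoutError::Timeout(msg)) => {
                eprintln!("slow send: consumer blocked for over {SLOW_THRESHOLD:?}");
                msg_tx.send(msg).await.is_ok()
            }
            // Consumer terminated; stop forwarding.
            Err(SendTimeoutError::Closed(_)) => false,
        }
    }
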
     447              : 
     448              : /// Read replies from WalAcceptor and pass them back to the socket. Returns Ok(())
     449              : /// if reply_rx is closed; that must mean WalAcceptor terminated, and joining it
     450              : /// should surface the error.
     451              : ///
     452              : /// This function is cancellation-safe (only does network I/O and channel read/writes).
     453            0 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
     454            0 :     pgb_writer: &mut PostgresBackend<IO>,
     455            0 :     mut reply_rx: Receiver<AcceptorProposerMessage>,
     456            0 :     mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
     457            0 :     proto_version: u32,
     458            0 : ) -> Result<(), CopyStreamHandlerEnd> {
     459            0 :     let mut buf = BytesMut::with_capacity(128);
     460            0 : 
     461            0 :     // storing append_response to inject PageserverFeedback into it
     462            0 :     let mut last_append_response = None;
     463              : 
     464              :     loop {
     465              :         // trying to read either AcceptorProposerMessage or PageserverFeedback
     466            0 :         let msg = tokio::select! {
     467            0 :             reply = reply_rx.recv() => {
     468            0 :                 if let Some(msg) = reply {
     469            0 :                     if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
     470            0 :                         last_append_response = Some(append_response.clone());
     471            0 :                     }
     472            0 :                     Some(msg)
     473              :                 } else {
     474            0 :                     return Ok(()); // chan closed, WalAcceptor terminated
     475              :                 }
     476              :             }
     477              : 
     478            0 :             feedback = pageserver_feedback_rx.recv() =>
     479            0 :                 match (feedback, &last_append_response) {
     480            0 :                     (Ok(feedback), Some(append_response)) => {
     481            0 :                         // clone AppendResponse and inject PageserverFeedback into it
     482            0 :                         let mut append_response = append_response.clone();
     483            0 :                         append_response.pageserver_feedback = Some(feedback);
     484            0 :                         Some(AcceptorProposerMessage::AppendResponse(append_response))
     485              :                     }
     486            0 :                     _ => None,
     487              :                 },
     488              :         };
     489              : 
     490            0 :         let Some(msg) = msg else {
     491            0 :             continue;
     492              :         };
     493              : 
     494            0 :         buf.clear();
     495            0 :         msg.serialize(&mut buf, proto_version)?;
     496            0 :         pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
     497              :     }
     498            0 : }
     499              : 
     500              : /// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to
     501              : /// walproposer, even when it's writing a steady stream of messages.
     502              : const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
     503              : 
     504              : /// The metrics computation interval.
     505              : ///
     506              : /// The Prometheus poll interval is 60 seconds at the time of writing. We sample the queue depth
     507              : /// every 5 seconds, for 12 samples per poll. This will give a count of up to 12x active timelines.
     508              : const METRICS_INTERVAL: Duration = Duration::from_secs(5);
     509              : 
     510              : /// Encapsulates a task which takes messages from msg_rx, processes and pushes
     511              : /// replies to reply_tx.
     512              : ///
     513              : /// Reading from socket and writing to disk in parallel is beneficial for
     514              : /// performance, this struct provides the writing to disk part.
     515              : pub struct WalAcceptor {
     516              :     tli: WalResidentTimeline,
     517              :     msg_rx: Receiver<ProposerAcceptorMessage>,
     518              :     reply_tx: Sender<AcceptorProposerMessage>,
     519              :     conn_id: Option<ConnectionId>,
     520              : }
     521              : 
     522              : impl WalAcceptor {
     523              :     /// Spawn a task running WalAcceptor and return a handle to it. The task
     524              :     /// returns Ok(()) if either of the channels is closed, and Err if any error
     525              :     /// is encountered during message processing.
     526              :     ///
     527              :     /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
     528            5 :     pub fn spawn(
     529            5 :         tli: WalResidentTimeline,
     530            5 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     531            5 :         reply_tx: Sender<AcceptorProposerMessage>,
     532            5 :         conn_id: Option<ConnectionId>,
     533            5 :     ) -> JoinHandle<anyhow::Result<()>> {
     534            5 :         task::spawn(async move {
     535            5 :             let mut wa = WalAcceptor {
     536            5 :                 tli,
     537            5 :                 msg_rx,
     538            5 :                 reply_tx,
     539            5 :                 conn_id,
     540            5 :             };
     541            5 : 
     542            5 :             let span_ttid = wa.tli.ttid; // satisfy borrow checker
     543            5 :             wa.run()
     544            5 :                 .instrument(
     545            5 :                     info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
     546              :                 )
     547            5 :                 .await
     548            5 :         })
     549            5 :     }
     550              : 
     551              :     /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
     552              :     /// it must mean that network thread terminated.
     553              :     ///
     554              :     /// This function is *not* cancellation safe, it does local disk I/O: it should always
     555              :     /// be allowed to run to completion. It respects Timeline::cancel and shuts down cleanly
     556              :     /// when that gets triggered.
     557            5 :     async fn run(&mut self) -> anyhow::Result<()> {
     558            5 :         let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
     559            5 : 
     560            5 :         // Periodically flush the WAL and compute metrics.
     561            5 :         let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
     562            5 :         flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
     563            5 :         flush_ticker.tick().await; // skip the initial, immediate tick
     564              : 
     565            5 :         let mut metrics_ticker = tokio::time::interval(METRICS_INTERVAL);
     566            5 :         metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
     567            5 : 
     568            5 :         // Tracks whether we have unflushed appends.
     569            5 :         let mut dirty = false;
     570              : 
     571         1250 :         while !self.tli.is_cancelled() {
     572         1250 :             let reply = tokio::select! {
     573              :                 // Process inbound message.
     574         1250 :                 msg = self.msg_rx.recv() => {
     575              :                     // If disconnected, break to flush WAL and return.
     576          624 :                     let Some(mut msg) = msg else {
     577            4 :                         break;
     578              :                     };
     579              : 
     580              :                     // Update gauge metrics.
     581          620 :                     WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
     582          620 :                     WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
     583          620 : 
     584          620 :                     // Update walreceiver state in shmem for reporting.
     585          620 :                     if let ProposerAcceptorMessage::Elected(_) = &msg {
     586            0 :                         walreceiver_guard.get().status = WalReceiverStatus::Streaming;
     587          620 :                     }
     588              : 
     589              :                     // Don't flush the WAL on every append, only periodically via flush_ticker.
     590              :                     // This batches multiple appends per fsync. If the channel is empty after
     591              :                     // sending the reply, we'll schedule an immediate flush.
     592              :                     //
     593              :                     // Note that a flush can still happen on segment bounds, which will result
     594              :                     // in an AppendResponse.
     595          620 :                     if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
     596          620 :                         msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
     597          620 :                         dirty = true;
     598          620 :                     }
     599              : 
     600          620 :                     self.tli.process_msg(&msg).await?
     601              :                 }
     602              : 
     603              :                 // While receiving AppendRequests, flush the WAL periodically and respond with an
     604              :                 // AppendResponse to let walproposer know we're still alive.
     605         1250 :                 _ = flush_ticker.tick(), if dirty => {
     606            0 :                     dirty = false;
     607            0 :                     self.tli
     608            0 :                         .process_msg(&ProposerAcceptorMessage::FlushWAL)
     609            0 :                         .await?
     610              :                 }
     611              : 
     612              :                 // If there are no pending messages, flush the WAL immediately.
     613              :                 //
     614              :                 // TODO: this should be done via flush_ticker.reset_immediately(), but that's always
     615              :                 // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
     616         1250 :                 _ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
     617          620 :                     dirty = false;
     618          620 :                     flush_ticker.reset();
     619          620 :                     self.tli
     620          620 :                         .process_msg(&ProposerAcceptorMessage::FlushWAL)
     621          620 :                         .await?
     622              :                 }
     623              : 
     624              :                 // Update histogram metrics periodically.
     625         1250 :                 _ = metrics_ticker.tick() => {
     626            5 :                     WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64);
     627            5 :                     None // no reply
     628              :                 }
     629              : 
     630         1250 :                 _ = self.tli.cancel.cancelled() => {
     631            0 :                     break;
     632              :                 }
     633              :             };
     634              : 
     635              :             // Send reply, if any.
     636         1245 :             if let Some(reply) = reply {
     637          620 :                 if self.reply_tx.send(reply).await.is_err() {
     638            0 :                     break; // disconnected, break to flush WAL and return
     639          620 :                 }
     640          625 :             }
     641              :         }
     642              : 
     643              :         // Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
     644            4 :         if dirty && !self.tli.cancel.is_cancelled() {
     645            0 :             self.tli
     646            0 :                 .process_msg(&ProposerAcceptorMessage::FlushWAL)
     647            0 :                 .await?;
     648            4 :         }
     649              : 
     650            4 :         Ok(())
     651            4 :     }
     652              : }
     653              : 
     654              : /// On drop, drain msg_rx and update metrics to avoid leaks.
     655              : impl Drop for WalAcceptor {
     656            5 :     fn drop(&mut self) {
     657            5 :         self.msg_rx.close(); // prevent further sends
     658            5 :         while let Ok(msg) = self.msg_rx.try_recv() {
     659            0 :             WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
     660            0 :             WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
     661            0 :         }
     662            5 :     }
     663              : }
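
The loop in WalAcceptor::run batches fsyncs: appends only mark the state dirty,
and the actual flush happens either on the one-second ticker or as soon as the
inbound queue drains, plus once more on disconnect. A minimal sketch of that
pattern outside the instrumented file, where flush() stands in for processing
ProposerAcceptorMessage::FlushWAL:

    use std::{future, time::Duration};
    use tokio::{sync::mpsc, time::MissedTickBehavior};

    async fn flush() { /* stand-in for the real WAL flush */ }

    async fn run(mut msg_rx: mpsc::Receiver<Vec<u8>>) {
        let mut ticker = tokio::time::interval(Duration::from_secs(1));
        ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
        ticker.tick().await; // skip the initial, immediate tick

        let mut dirty = false;
        loop {
            tokio::select! {
                msg = msg_rx.recv() => {
                    let Some(_bytes) = msg else { break }; // sender gone
                    dirty = true; // append accepted, not yet fsynced
                }
                // Periodic flush while appends keep streaming in.
                _ = ticker.tick(), if dirty => {
                    dirty = false;
                    flush().await;
                }
                // Queue drained: flush right away instead of waiting for the tick.
                _ = future::ready(()), if dirty && msg_rx.is_empty() => {
                    dirty = false;
                    ticker.reset();
                    flush().await;
                }
            }
        }
        // Flush once more on disconnect so accepted WAL is durable.
        if dirty {
            flush().await;
        }
    }
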
        

Generated by: LCOV version 2.1-beta