LCOV - code coverage report
Current view: top level - safekeeper/src - receive_wal.rs (source / functions)
Test:      496e96cdfff2df79370229591d6427cda12fde29.info
Test Date: 2024-05-21 18:28:29

             Coverage   Total   Hit
Lines:          0.0 %     272     0
Functions:      0.0 %      65     0

            Line data    Source code
       1              : //! Safekeeper communication endpoint to WAL proposer (compute node).
       2              : //! Gets messages from the network, passes them down to consensus module and
       3              : //! sends replies back.
       4              : 
       5              : use crate::handler::SafekeeperPostgresHandler;
       6              : use crate::safekeeper::AcceptorProposerMessage;
       7              : use crate::safekeeper::ProposerAcceptorMessage;
       8              : use crate::safekeeper::ServerInfo;
       9              : use crate::timeline::Timeline;
      10              : use crate::wal_service::ConnectionId;
      11              : use crate::GlobalTimelines;
      12              : use anyhow::{anyhow, Context};
      13              : use bytes::BytesMut;
      14              : use parking_lot::MappedMutexGuard;
      15              : use parking_lot::Mutex;
      16              : use parking_lot::MutexGuard;
      17              : use postgres_backend::CopyStreamHandlerEnd;
      18              : use postgres_backend::PostgresBackend;
      19              : use postgres_backend::PostgresBackendReader;
      20              : use postgres_backend::QueryError;
      21              : use pq_proto::BeMessage;
      22              : use serde::Deserialize;
      23              : use serde::Serialize;
      24              : use std::net::SocketAddr;
      25              : use std::sync::Arc;
      26              : use tokio::io::AsyncRead;
      27              : use tokio::io::AsyncWrite;
      28              : use tokio::sync::mpsc::channel;
      29              : use tokio::sync::mpsc::error::TryRecvError;
      30              : use tokio::sync::mpsc::Receiver;
      31              : use tokio::sync::mpsc::Sender;
      32              : use tokio::task;
      33              : use tokio::task::JoinHandle;
      34              : use tokio::time::Duration;
      35              : use tokio::time::Instant;
      36              : use tracing::*;
      37              : use utils::id::TenantTimelineId;
      38              : use utils::lsn::Lsn;
      39              : use utils::pageserver_feedback::PageserverFeedback;
      40              : 
      41              : const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
      42              : 
      43              : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
      44              : /// in Arc).
      45              : pub struct WalReceivers {
      46              :     mutex: Mutex<WalReceiversShared>,
      47              :     pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
      48              : }
      49              : 
       50              : /// Id under which a walreceiver is registered in the shared state.
      51              : type WalReceiverId = usize;
      52              : 
      53              : impl WalReceivers {
      54            0 :     pub fn new() -> Arc<WalReceivers> {
      55            0 :         let (pageserver_feedback_tx, _) =
      56            0 :             tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
      57            0 : 
      58            0 :         Arc::new(WalReceivers {
      59            0 :             mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
      60            0 :             pageserver_feedback_tx,
      61            0 :         })
      62            0 :     }
      63              : 
      64              :     /// Register new walreceiver. Returned guard provides access to the slot and
      65              :     /// automatically deregisters in Drop.
      66            0 :     pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
      67            0 :         let slots = &mut self.mutex.lock().slots;
      68            0 :         let walreceiver = WalReceiverState {
      69            0 :             conn_id,
      70            0 :             status: WalReceiverStatus::Voting,
      71            0 :         };
      72              :         // find empty slot or create new one
      73            0 :         let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
      74            0 :             slots[pos] = Some(walreceiver);
      75            0 :             pos
      76              :         } else {
      77            0 :             let pos = slots.len();
      78            0 :             slots.push(Some(walreceiver));
      79            0 :             pos
      80              :         };
      81            0 :         WalReceiverGuard {
      82            0 :             id: pos,
      83            0 :             walreceivers: self.clone(),
      84            0 :         }
      85            0 :     }
      86              : 
      87              :     /// Get reference to locked slot contents. Slot must exist (registered
      88              :     /// earlier).
      89            0 :     fn get_slot<'a>(
      90            0 :         self: &'a Arc<WalReceivers>,
      91            0 :         id: WalReceiverId,
      92            0 :     ) -> MappedMutexGuard<'a, WalReceiverState> {
      93            0 :         MutexGuard::map(self.mutex.lock(), |locked| {
      94            0 :             locked.slots[id]
      95            0 :                 .as_mut()
      96            0 :                 .expect("walreceiver doesn't exist")
      97            0 :         })
      98            0 :     }
      99              : 
     100              :     /// Get number of walreceivers (compute connections).
     101            0 :     pub fn get_num(self: &Arc<WalReceivers>) -> usize {
     102            0 :         self.mutex.lock().slots.iter().flatten().count()
     103            0 :     }
     104              : 
     105              :     /// Get state of all walreceivers.
     106            0 :     pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
     107            0 :         self.mutex.lock().slots.iter().flatten().cloned().collect()
     108            0 :     }
     109              : 
     110              :     /// Get number of streaming walreceivers (normally 0 or 1) from compute.
     111            0 :     pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
     112            0 :         self.mutex
     113            0 :             .lock()
     114            0 :             .slots
     115            0 :             .iter()
     116            0 :             .flatten()
      117            0 :             // a None conn_id means recovery initiated by us, which also registers here; skip it
     118            0 :             .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
     119            0 :             .count()
     120            0 :     }
     121              : 
     122              :     /// Unregister walreceiver.
     123            0 :     fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
     124            0 :         let mut shared = self.mutex.lock();
     125            0 :         shared.slots[id] = None;
     126            0 :     }
     127              : 
     128              :     /// Broadcast pageserver feedback to connected walproposers.
     129            0 :     pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
      130            0 :         // Err just means there are no subscribers, which is fine.
     131            0 :         let _ = self.pageserver_feedback_tx.send(feedback);
     132            0 :     }
     133              : }
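
The registration scheme above is a small generic pattern: reuse the first free
slot in the Vec (or grow it), and let a guard's Drop free the slot again. A
minimal self-contained sketch of the same idea, using std::sync::Mutex instead
of parking_lot and a hypothetical ConnState in place of WalReceiverState:

    use std::sync::{Arc, Mutex};

    #[derive(Debug, Clone)]
    struct ConnState {
        conn_id: u32, // stand-in for WalReceiverState
    }

    struct Registry {
        slots: Mutex<Vec<Option<ConnState>>>,
    }

    // Deregisters its slot on Drop, like WalReceiverGuard.
    struct Guard {
        id: usize,
        registry: Arc<Registry>,
    }

    impl Registry {
        fn register(self: &Arc<Self>, state: ConnState) -> Guard {
            let mut slots = self.slots.lock().unwrap();
            // Reuse the first free slot, or grow the Vec.
            let id = match slots.iter().position(|s| s.is_none()) {
                Some(pos) => {
                    slots[pos] = Some(state);
                    pos
                }
                None => {
                    slots.push(Some(state));
                    slots.len() - 1
                }
            };
            Guard { id, registry: self.clone() }
        }

        fn count(&self) -> usize {
            self.slots.lock().unwrap().iter().flatten().count()
        }
    }

    impl Drop for Guard {
        fn drop(&mut self) {
            // Deregistration is automatic: free the slot for reuse.
            self.registry.slots.lock().unwrap()[self.id] = None;
        }
    }

    fn main() {
        let reg = Arc::new(Registry { slots: Mutex::new(Vec::new()) });
        let g1 = reg.register(ConnState { conn_id: 1 });
        let g2 = reg.register(ConnState { conn_id: 2 });
        assert_eq!(reg.count(), 2);
        drop(g1); // slot 0 becomes reusable
        let _g3 = reg.register(ConnState { conn_id: 3 });
        assert_eq!(reg.count(), 2); // slot 0 was reused; the Vec did not grow
        drop(g2);
    }
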
     134              : 
     135              : /// Only a few connections are expected (normally one), so store in Vec.
     136              : struct WalReceiversShared {
     137              :     slots: Vec<Option<WalReceiverState>>,
     138              : }
     139              : 
     140            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     141              : pub struct WalReceiverState {
     142              :     /// None means it is recovery initiated by us (this safekeeper).
     143              :     pub conn_id: Option<ConnectionId>,
     144              :     pub status: WalReceiverStatus,
     145              : }
     146              : 
     147              : /// Walreceiver status. Currently only whether it passed voting stage and
     148              : /// started receiving the stream, but it is easy to add more if needed.
     149            0 : #[derive(Debug, Clone, Serialize, Deserialize)]
     150              : pub enum WalReceiverStatus {
     151              :     Voting,
     152              :     Streaming,
     153              : }
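
Both state types derive Serialize/Deserialize so that get_all() results can be
reported externally, e.g. over a status API. A sketch of the serialized shape,
assuming serde with the derive feature, serde_json, and a numeric ConnectionId
(all assumptions of this example):

    use serde::{Deserialize, Serialize};

    // Mirrors WalReceiverState above; ConnectionId is assumed numeric here.
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct WalReceiverState {
        conn_id: Option<u64>,
        status: WalReceiverStatus,
    }

    #[derive(Debug, Clone, Serialize, Deserialize)]
    enum WalReceiverStatus {
        Voting,
        Streaming,
    }

    fn main() {
        let state = WalReceiverState {
            conn_id: Some(42),
            status: WalReceiverStatus::Streaming,
        };
        // Unit enum variants serialize as plain strings:
        // {"conn_id":42,"status":"Streaming"}
        println!("{}", serde_json::to_string(&state).unwrap());
    }
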
     154              : 
     155              : /// Scope guard to access slot in WalReceivers registry and unregister from
     156              : /// it in Drop.
     157              : pub struct WalReceiverGuard {
     158              :     id: WalReceiverId,
     159              :     walreceivers: Arc<WalReceivers>,
     160              : }
     161              : 
     162              : impl WalReceiverGuard {
     163              :     /// Get reference to locked shared state contents.
     164            0 :     fn get(&self) -> MappedMutexGuard<WalReceiverState> {
     165            0 :         self.walreceivers.get_slot(self.id)
     166            0 :     }
     167              : }
     168              : 
     169              : impl Drop for WalReceiverGuard {
     170            0 :     fn drop(&mut self) {
     171            0 :         self.walreceivers.unregister(self.id);
     172            0 :     }
     173              : }
     174              : 
     175              : pub const MSG_QUEUE_SIZE: usize = 256;
     176              : pub const REPLY_QUEUE_SIZE: usize = 16;
     177              : 
     178              : impl SafekeeperPostgresHandler {
      179              :     /// Wrapper around handle_start_wal_push_guts that handles its result. The
      180              :     /// error is handled here while we're still in the walreceiver ttid span;
      181              :     /// with an API extension, this could probably be moved into postgres_backend.
     182            0 :     pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
     183            0 :         &mut self,
     184            0 :         pgb: &mut PostgresBackend<IO>,
     185            0 :     ) -> Result<(), QueryError> {
     186            0 :         let mut tli: Option<Arc<Timeline>> = None;
     187            0 :         if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await {
     188              :             // Log the result and probably send it to the client, closing the stream.
     189            0 :             let handle_end_fut = pgb.handle_copy_stream_end(end);
     190              :             // If we managed to create the timeline, augment logging with current LSNs etc.
     191            0 :             if let Some(tli) = tli {
     192            0 :                 let info = tli.get_safekeeper_info(&self.conf).await;
     193            0 :                 handle_end_fut
     194            0 :                     .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
     195            0 :                     .await;
     196              :             } else {
     197            0 :                 handle_end_fut.await;
     198              :             }
     199            0 :         }
     200            0 :         Ok(())
     201            0 :     }
     202              : 
     203            0 :     pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
     204            0 :         &mut self,
     205            0 :         pgb: &mut PostgresBackend<IO>,
     206            0 :         tli: &mut Option<Arc<Timeline>>,
     207            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     208            0 :         // Notify the libpq client that it's allowed to send `CopyData` messages
     209            0 :         pgb.write_message(&BeMessage::CopyBothResponse).await?;
     210              : 
      211              :         // Experiments [1] confirm that doing network IO in one (this) task and
      212              :         // processing with disk IO in another significantly improves
      213              :         // performance; to this end we spawn off a WalAcceptor task for message
      214              :         // processing.
     215              :         //
     216              :         // [1] https://github.com/neondatabase/neon/pull/1318
     217            0 :         let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
     218            0 :         let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
     219            0 :         let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
     220              : 
     221              :         // Concurrently receive and send data; replies are not synchronized with
     222              :         // sends, so this avoids deadlocks.
     223            0 :         let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
     224            0 :         let peer_addr = *pgb.get_peer_addr();
     225            0 :         let mut network_reader = NetworkReader {
     226            0 :             ttid: self.ttid,
     227            0 :             conn_id: self.conn_id,
     228            0 :             pgb_reader: &mut pgb_reader,
     229            0 :             peer_addr,
     230            0 :             acceptor_handle: &mut acceptor_handle,
     231            0 :         };
     232              : 
     233              :         // Read first message and create timeline if needed.
     234            0 :         let res = network_reader.read_first_message().await;
     235              : 
     236            0 :         let network_res = if let Ok((timeline, next_msg)) = res {
     237            0 :             let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
     238            0 :                 timeline
     239            0 :                     .get_walreceivers()
     240            0 :                     .pageserver_feedback_tx
     241            0 :                     .subscribe();
     242            0 :             *tli = Some(timeline.clone());
     243              : 
     244              :             tokio::select! {
     245              :                 // todo: add read|write .context to these errors
     246              :                 r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline.clone(), next_msg) => r,
     247              :                 r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
     248              :             }
     249              :         } else {
     250            0 :             res.map(|_| ())
     251              :         };
     252              : 
     253              :         // Join pg backend back.
     254            0 :         pgb.unsplit(pgb_reader)?;
     255              : 
      256              :         // Join the spawned WalAcceptor. At this point the channels to/from it
      257              :         // passed to the network routines are dropped, so it will exit as soon
      258              :         // as it touches them.
     259            0 :         match acceptor_handle {
     260              :             None => {
      261              :                 // failed even before spawning; the network read must carry the error
      262            0 :                 Err(network_res.expect_err("no error even though WalAcceptor wasn't spawned"))
     263              :             }
     264            0 :             Some(handle) => {
     265            0 :                 let wal_acceptor_res = handle.await;
     266              : 
     267              :                 // If there was any network error, return it.
     268            0 :                 network_res?;
     269              : 
      270              :                 // Otherwise, the WalAcceptor task must have errored.
     271            0 :                 match wal_acceptor_res {
     272            0 :                     Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
     273            0 :                     Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
     274            0 :                     Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
     275            0 :                         "WalAcceptor task panicked",
     276            0 :                     ))),
     277              :                 }
     278              :             }
     279              :         }
     280            0 :     }
     281              : }
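
The control flow of handle_start_wal_push_guts reduces to: a socket reader
feeding a bounded channel, a spawned acceptor task, a reply writer running
concurrently with the reader, and a final join once either side drops its
channel end. A self-contained sketch of that shape, where spawn_acceptor and
the Request/Reply aliases are hypothetical stand-ins for WalAcceptor and the
proposer/acceptor message types:

    use tokio::sync::mpsc::{channel, Receiver, Sender};
    use tokio::task::JoinHandle;

    // Stand-ins for ProposerAcceptorMessage / AcceptorProposerMessage.
    type Request = u64;
    type Reply = u64;

    // The spawned "WalAcceptor" analogue: drain requests, push replies.
    fn spawn_acceptor(mut rx: Receiver<Request>, tx: Sender<Reply>) -> JoinHandle<anyhow::Result<()>> {
        tokio::spawn(async move {
            while let Some(req) = rx.recv().await {
                if tx.send(req + 1).await.is_err() {
                    return Ok(()); // reply channel closed: the network side is gone
                }
            }
            Ok(()) // request channel closed: the network side is gone
        })
    }

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let (msg_tx, msg_rx) = channel::<Request>(256); // cf. MSG_QUEUE_SIZE
        let (reply_tx, mut reply_rx) = channel::<Reply>(16); // cf. REPLY_QUEUE_SIZE
        let acceptor = spawn_acceptor(msg_rx, reply_tx);

        // The "network" halves run concurrently, so a full reply queue can
        // never deadlock against a full request queue.
        let reader = async {
            for req in 0..3u64 {
                if msg_tx.send(req).await.is_err() {
                    break;
                }
            }
            drop(msg_tx); // EOF: lets the acceptor finish
        };
        let writer = async {
            while let Some(reply) = reply_rx.recv().await {
                println!("reply: {reply}");
            }
        };
        tokio::join!(reader, writer);

        // Join the acceptor once both channel ends are gone, as the real
        // handler does after its select! returns.
        acceptor.await??;
        Ok(())
    }
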
     282              : 
     283              : struct NetworkReader<'a, IO> {
     284              :     ttid: TenantTimelineId,
     285              :     conn_id: ConnectionId,
     286              :     pgb_reader: &'a mut PostgresBackendReader<IO>,
     287              :     peer_addr: SocketAddr,
     288              :     // WalAcceptor is spawned when we learn server info from walproposer and
     289              :     // create timeline; handle is put here.
     290              :     acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
     291              : }
     292              : 
     293              : impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
     294            0 :     async fn read_first_message(
     295            0 :         &mut self,
     296            0 :     ) -> Result<(Arc<Timeline>, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
      297              :         // Receive information about the server to create the timeline, if it doesn't exist yet.
     298            0 :         let next_msg = read_message(self.pgb_reader).await?;
     299            0 :         let tli = match next_msg {
     300            0 :             ProposerAcceptorMessage::Greeting(ref greeting) => {
     301            0 :                 info!(
     302            0 :                     "start handshake with walproposer {} sysid {} timeline {}",
     303              :                     self.peer_addr, greeting.system_id, greeting.tli,
     304              :                 );
     305            0 :                 let server_info = ServerInfo {
     306            0 :                     pg_version: greeting.pg_version,
     307            0 :                     system_id: greeting.system_id,
     308            0 :                     wal_seg_size: greeting.wal_seg_size,
     309            0 :                 };
     310            0 :                 GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
     311              :             }
     312              :             _ => {
     313            0 :                 return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
     314            0 :                     "unexpected message {next_msg:?} instead of greeting"
     315            0 :                 )))
     316              :             }
     317              :         };
     318            0 :         Ok((tli, next_msg))
     319            0 :     }
     320              : 
     321            0 :     async fn run(
     322            0 :         self,
     323            0 :         msg_tx: Sender<ProposerAcceptorMessage>,
     324            0 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     325            0 :         reply_tx: Sender<AcceptorProposerMessage>,
     326            0 :         tli: Arc<Timeline>,
     327            0 :         next_msg: ProposerAcceptorMessage,
     328            0 :     ) -> Result<(), CopyStreamHandlerEnd> {
     329            0 :         *self.acceptor_handle = Some(WalAcceptor::spawn(
     330            0 :             tli,
     331            0 :             msg_rx,
     332            0 :             reply_tx,
     333            0 :             Some(self.conn_id),
     334            0 :         ));
     335            0 : 
     336            0 :         // Forward all messages to WalAcceptor
     337            0 :         read_network_loop(self.pgb_reader, msg_tx, next_msg).await
     338            0 :     }
     339              : }
     340              : 
     341              : /// Read next message from walproposer.
     342              : /// TODO: Return Ok(None) on graceful termination.
     343            0 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
     344            0 :     pgb_reader: &mut PostgresBackendReader<IO>,
     345            0 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
     346            0 :     let copy_data = pgb_reader.read_copy_message().await?;
     347            0 :     let msg = ProposerAcceptorMessage::parse(copy_data)?;
     348            0 :     Ok(msg)
     349            0 : }
     350              : 
     351            0 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
     352            0 :     pgb_reader: &mut PostgresBackendReader<IO>,
     353            0 :     msg_tx: Sender<ProposerAcceptorMessage>,
     354            0 :     mut next_msg: ProposerAcceptorMessage,
     355            0 : ) -> Result<(), CopyStreamHandlerEnd> {
     356              :     loop {
     357            0 :         if msg_tx.send(next_msg).await.is_err() {
     358            0 :             return Ok(()); // chan closed, WalAcceptor terminated
     359            0 :         }
     360            0 :         next_msg = read_message(pgb_reader).await?;
     361              :     }
     362            0 : }
     363              : 
      364              : /// Read replies from WalAcceptor and pass them back to the socket. Returns
      365              : /// Ok(()) if reply_rx is closed; that must mean WalAcceptor terminated, and
      366              : /// joining it should surface the error.
     367            0 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
     368            0 :     pgb_writer: &mut PostgresBackend<IO>,
     369            0 :     mut reply_rx: Receiver<AcceptorProposerMessage>,
     370            0 :     mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
     371            0 : ) -> Result<(), CopyStreamHandlerEnd> {
     372            0 :     let mut buf = BytesMut::with_capacity(128);
     373            0 : 
      374            0 :     // store the last AppendResponse so PageserverFeedback can be injected into it
     375            0 :     let mut last_append_response = None;
     376              : 
     377              :     loop {
     378              :         // trying to read either AcceptorProposerMessage or PageserverFeedback
     379            0 :         let msg = tokio::select! {
     380              :             reply = reply_rx.recv() => {
     381              :                 if let Some(msg) = reply {
     382              :                     if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
     383              :                         last_append_response = Some(append_response.clone());
     384              :                     }
     385              :                     Some(msg)
     386              :                 } else {
     387              :                     return Ok(()); // chan closed, WalAcceptor terminated
     388              :                 }
     389              :             }
     390              : 
     391              :             feedback = pageserver_feedback_rx.recv() =>
     392              :                 match (feedback, &last_append_response) {
     393              :                     (Ok(feedback), Some(append_response)) => {
     394              :                         // clone AppendResponse and inject PageserverFeedback into it
     395              :                         let mut append_response = append_response.clone();
     396              :                         append_response.pageserver_feedback = Some(feedback);
     397              :                         Some(AcceptorProposerMessage::AppendResponse(append_response))
     398              :                     }
     399              :                     _ => None,
     400              :                 }
     401              :         };
     402              : 
     403            0 :         let Some(msg) = msg else {
     404            0 :             continue;
     405              :         };
     406              : 
     407            0 :         buf.clear();
     408            0 :         msg.serialize(&mut buf)?;
     409            0 :         pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
     410              :     }
     411            0 : }
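
network_write merges two sources with select!: replies from WalAcceptor, and
the PageserverFeedback broadcast, which is forwarded only by piggybacking on a
clone of the last AppendResponse. Two properties of tokio's broadcast channel
matter here: send() fails only when there are no subscribers (which
broadcast_pageserver_feedback deliberately ignores), and a slow subscriber
observes Lagged once the capacity (DEFAULT_FEEDBACK_CAPACITY) overflows,
rather than blocking the sender. A small self-contained sketch of both
behaviors:

    use tokio::sync::broadcast::{self, error::RecvError};

    #[tokio::main]
    async fn main() {
        let (tx, _) = broadcast::channel::<u32>(8); // cf. DEFAULT_FEEDBACK_CAPACITY

        // No subscribers yet: send() returns Err, which is safe to ignore,
        // exactly as broadcast_pageserver_feedback does.
        assert!(tx.send(0).is_err());

        let mut rx = tx.subscribe();
        for v in 1..=20 {
            let _ = tx.send(v); // deliberately overflow the 8-slot buffer
        }
        drop(tx); // close the channel so the drain loop below terminates

        loop {
            match rx.recv().await {
                Ok(v) => println!("got {v}"),
                // A slow subscriber skips ahead rather than blocking the sender.
                Err(RecvError::Lagged(n)) => println!("lagged, missed {n} messages"),
                Err(RecvError::Closed) => break,
            }
        }
    }
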
     412              : 
     413              : // Send keepalive messages to walproposer, to make sure it receives updates
     414              : // even when it writes a steady stream of messages.
     415              : const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
     416              : 
      417              : /// Encapsulates a task which takes messages from msg_rx, processes them, and
      418              : /// pushes replies to reply_tx; reading from the socket and writing to disk in
      419              : /// parallel is beneficial for performance, and this struct provides the writing-to-disk part.
     420              : pub struct WalAcceptor {
     421              :     tli: Arc<Timeline>,
     422              :     msg_rx: Receiver<ProposerAcceptorMessage>,
     423              :     reply_tx: Sender<AcceptorProposerMessage>,
     424              :     conn_id: Option<ConnectionId>,
     425              : }
     426              : 
     427              : impl WalAcceptor {
      428              :     /// Spawn a task running WalAcceptor and return a handle to it. The task
      429              :     /// returns Ok(()) if either of the channels has closed, and Err if any
      430              :     /// error is encountered during message processing.
     431              :     ///
     432              :     /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
     433            0 :     pub fn spawn(
     434            0 :         tli: Arc<Timeline>,
     435            0 :         msg_rx: Receiver<ProposerAcceptorMessage>,
     436            0 :         reply_tx: Sender<AcceptorProposerMessage>,
     437            0 :         conn_id: Option<ConnectionId>,
     438            0 :     ) -> JoinHandle<anyhow::Result<()>> {
     439            0 :         task::spawn(async move {
     440            0 :             let mut wa = WalAcceptor {
     441            0 :                 tli,
     442            0 :                 msg_rx,
     443            0 :                 reply_tx,
     444            0 :                 conn_id,
     445            0 :             };
     446            0 : 
     447            0 :             let span_ttid = wa.tli.ttid; // satisfy borrow checker
     448            0 :             wa.run()
     449            0 :                 .instrument(
     450            0 :                     info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
     451              :                 )
     452            0 :                 .await
     453            0 :         })
     454            0 :     }
     455              : 
      456              :     /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
      457              :     /// that must mean the network task terminated.
     458            0 :     async fn run(&mut self) -> anyhow::Result<()> {
      459            0 :         // Register the connection and defer unregistering it. The order of the
      460            0 :         // next two statements matters: drops run in reverse, so on exit we first
      461            0 :         // remove our entry and only then update the status, which depends on the registered connections.
     462            0 :         let _compute_conn_guard = ComputeConnectionGuard {
     463            0 :             timeline: Arc::clone(&self.tli),
     464            0 :         };
     465            0 :         let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
     466            0 :         self.tli.update_status_notify().await?;
     467              : 
      468              :         // After this timestamp we will stop processing AppendRequests and send a response
      469              :         // to the walproposer. The walproposer sends at least one AppendRequest per second,
      470              :         // so we send keepalives by replying to these requests once per second.
     471            0 :         let mut next_keepalive = Instant::now();
     472              : 
     473              :         loop {
     474            0 :             let opt_msg = self.msg_rx.recv().await;
     475            0 :             if opt_msg.is_none() {
     476            0 :                 return Ok(()); // chan closed, streaming terminated
     477            0 :             }
     478            0 :             let mut next_msg = opt_msg.unwrap();
     479            0 : 
      480            0 :             // Update walreceiver state in the shared registry for reporting.
     481            0 :             if let ProposerAcceptorMessage::Elected(_) = &next_msg {
     482            0 :                 walreceiver_guard.get().status = WalReceiverStatus::Streaming;
     483            0 :             }
     484              : 
     485            0 :             let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
      486              :                 // loop through AppendRequests while they're readily available to
      487              :                 // write as much WAL as possible without fsyncing
     488              :                 //
     489              :                 // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
     490              :                 // Otherwise, we might end up in a situation where we read a message, but don't
     491              :                 // process it.
     492            0 :                 while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
     493            0 :                     let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
     494              : 
     495            0 :                     if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
     496            0 :                         if self.reply_tx.send(reply).await.is_err() {
     497            0 :                             return Ok(()); // chan closed, streaming terminated
     498            0 :                         }
     499            0 :                     }
     500              : 
     501              :                     // get out of this loop if keepalive time is reached
     502            0 :                     if Instant::now() >= next_keepalive {
     503            0 :                         break;
     504            0 :                     }
     505            0 : 
     506            0 :                     match self.msg_rx.try_recv() {
     507            0 :                         Ok(msg) => next_msg = msg,
     508            0 :                         Err(TryRecvError::Empty) => break,
     509            0 :                         Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
     510              :                     }
     511              :                 }
     512              : 
     513              :                 // flush all written WAL to the disk
     514            0 :                 self.tli
     515            0 :                     .process_msg(&ProposerAcceptorMessage::FlushWAL)
     516            0 :                     .await?
     517              :             } else {
     518              :                 // process message other than AppendRequest
     519            0 :                 self.tli.process_msg(&next_msg).await?
     520              :             };
     521              : 
     522            0 :             if let Some(reply) = reply_msg {
     523            0 :                 if self.reply_tx.send(reply).await.is_err() {
     524            0 :                     return Ok(()); // chan closed, streaming terminated
     525            0 :                 }
     526            0 :                 // reset keepalive time
     527            0 :                 next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
     528            0 :             }
     529              :         }
     530            0 :     }
     531              : }
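
The heart of run() is a batching loop: AppendRequests are applied without
fsync for as long as more are instantly available via try_recv, and a flush
plus reply is forced at least once per KEEPALIVE_INTERVAL so the walproposer
keeps receiving updates. A stripped-down sketch of that control flow, with
apply_noflush and flush as hypothetical stand-ins for process_msg:

    use tokio::sync::mpsc::{channel, error::TryRecvError, Receiver};
    use tokio::time::{Duration, Instant};

    const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);

    fn apply_noflush(msg: u64) {
        println!("apply {msg} without fsync");
    }

    fn flush() {
        println!("flush: one fsync covers the whole batch");
    }

    async fn batching_loop(mut rx: Receiver<u64>) {
        let mut next_keepalive = Instant::now();
        while let Some(mut msg) = rx.recv().await {
            // Apply requests without flushing while more are instantly available.
            loop {
                apply_noflush(msg);
                // Get out of the batch if the keepalive deadline is reached.
                if Instant::now() >= next_keepalive {
                    break;
                }
                match rx.try_recv() {
                    Ok(m) => msg = m,
                    Err(TryRecvError::Empty) => break, // queue drained: flush now
                    Err(TryRecvError::Disconnected) => return, // peer gone
                }
            }
            flush();
            // Replying here (elided) resets the keepalive timer, as run() does.
            next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = channel(256);
        for i in 0..5u64 {
            tx.send(i).await.unwrap();
        }
        drop(tx); // close the channel so the loop terminates
        batching_loop(rx).await;
    }
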
     532              : 
     533              : /// Calls update_status_notify in drop to update timeline status.
     534              : struct ComputeConnectionGuard {
     535              :     timeline: Arc<Timeline>,
     536              : }
     537              : 
     538              : impl Drop for ComputeConnectionGuard {
     539            0 :     fn drop(&mut self) {
     540            0 :         let tli = self.timeline.clone();
     541            0 :         tokio::spawn(async move {
     542            0 :             if let Err(e) = tli.update_status_notify().await {
     543            0 :                 error!("failed to update timeline status: {}", e);
     544            0 :             }
     545            0 :         });
     546            0 :     }
     547              : }
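
Drop cannot be async, so ComputeConnectionGuard hands the status update to the
runtime via tokio::spawn instead of blocking. A self-contained sketch of the
pattern, with a stand-in Timeline type (note that tokio::spawn panics if the
guard is dropped outside a live runtime):

    use std::sync::Arc;

    struct Timeline;

    impl Timeline {
        // Hypothetical stand-in for update_status_notify.
        async fn update_status_notify(&self) -> anyhow::Result<()> {
            println!("status updated");
            Ok(())
        }
    }

    struct ConnectionGuard {
        timeline: Arc<Timeline>,
    }

    impl Drop for ConnectionGuard {
        fn drop(&mut self) {
            // Drop is synchronous: hand the async work to the runtime rather
            // than blocking here. Errors are only logged, not propagated.
            let tli = self.timeline.clone();
            tokio::spawn(async move {
                if let Err(e) = tli.update_status_notify().await {
                    eprintln!("failed to update timeline status: {e}");
                }
            });
        }
    }

    #[tokio::main]
    async fn main() {
        let guard = ConnectionGuard { timeline: Arc::new(Timeline) };
        drop(guard);
        // Give the spawned cleanup a chance to run before the runtime exits.
        tokio::task::yield_now().await;
    }
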
        

Generated by: LCOV version 2.1-beta