Line data Source code
1 : //! Safekeeper communication endpoint to WAL proposer (compute node).
2 : //! Gets messages from the network, passes them down to the consensus module, and
3 : //! sends replies back.
4 :
5 : use crate::handler::SafekeeperPostgresHandler;
6 : use crate::metrics::{
7 : WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
8 : WAL_RECEIVER_QUEUE_SIZE_TOTAL,
9 : };
10 : use crate::safekeeper::AcceptorProposerMessage;
11 : use crate::safekeeper::ProposerAcceptorMessage;
12 : use crate::safekeeper::ServerInfo;
13 : use crate::timeline::WalResidentTimeline;
14 : use crate::wal_service::ConnectionId;
15 : use crate::GlobalTimelines;
16 : use anyhow::{anyhow, Context};
17 : use bytes::BytesMut;
18 : use parking_lot::MappedMutexGuard;
19 : use parking_lot::Mutex;
20 : use parking_lot::MutexGuard;
21 : use postgres_backend::CopyStreamHandlerEnd;
22 : use postgres_backend::PostgresBackend;
23 : use postgres_backend::PostgresBackendReader;
24 : use postgres_backend::QueryError;
25 : use pq_proto::BeMessage;
26 : use serde::Deserialize;
27 : use serde::Serialize;
28 : use std::future;
29 : use std::net::SocketAddr;
30 : use std::sync::Arc;
31 : use tokio::io::AsyncRead;
32 : use tokio::io::AsyncWrite;
33 : use tokio::sync::mpsc::error::SendTimeoutError;
34 : use tokio::sync::mpsc::{channel, Receiver, Sender};
35 : use tokio::task;
36 : use tokio::task::JoinHandle;
37 : use tokio::time::{Duration, Instant, MissedTickBehavior};
38 : use tracing::*;
39 : use utils::id::TenantTimelineId;
40 : use utils::lsn::Lsn;
41 : use utils::pageserver_feedback::PageserverFeedback;
42 :
43 : const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
44 :
45 : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
46 : /// in Arc).
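/// Each walproposer connection (and recovery initiated by this safekeeper) registers a
/// slot here; the num_computes watch channel below publishes the current number of
/// registered receivers to interested tasks.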
47 : pub struct WalReceivers {
48 : mutex: Mutex<WalReceiversShared>,
49 : pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
50 :
51 : num_computes_tx: tokio::sync::watch::Sender<usize>,
52 : num_computes_rx: tokio::sync::watch::Receiver<usize>,
53 : }
54 :
55 : /// Id under which walreceiver is registered in shmem.
56 : type WalReceiverId = usize;
57 :
58 : impl WalReceivers {
59 0 : pub fn new() -> Arc<WalReceivers> {
60 0 : let (pageserver_feedback_tx, _) =
61 0 : tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
62 0 :
63 0 : let (num_computes_tx, num_computes_rx) = tokio::sync::watch::channel(0usize);
64 0 :
65 0 : Arc::new(WalReceivers {
66 0 : mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
67 0 : pageserver_feedback_tx,
68 0 : num_computes_tx,
69 0 : num_computes_rx,
70 0 : })
71 0 : }
72 :
73 : /// Register new walreceiver. Returned guard provides access to the slot and
74 : /// automatically deregisters in Drop.
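///
/// A minimal usage sketch (illustrative only; `conn_id` is assumed to be a known
/// ConnectionId):
/// ```ignore
/// let walreceivers = WalReceivers::new();
/// let guard = walreceivers.register(Some(conn_id));
/// guard.get().status = WalReceiverStatus::Streaming;
/// assert_eq!(walreceivers.get_num(), 1);
/// drop(guard); // frees the slot and updates the num_computes watch channel
/// assert_eq!(walreceivers.get_num(), 0);
/// ```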
75 0 : pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
76 0 : let mut shared = self.mutex.lock();
77 0 : let slots = &mut shared.slots;
78 0 : let walreceiver = WalReceiverState {
79 0 : conn_id,
80 0 : status: WalReceiverStatus::Voting,
81 0 : };
82 : // find empty slot or create new one
83 0 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
84 0 : slots[pos] = Some(walreceiver);
85 0 : pos
86 : } else {
87 0 : let pos = slots.len();
88 0 : slots.push(Some(walreceiver));
89 0 : pos
90 : };
91 :
92 0 : self.update_num(&shared);
93 0 : WAL_RECEIVERS.inc();
94 0 :
95 0 : WalReceiverGuard {
96 0 : id: pos,
97 0 : walreceivers: self.clone(),
98 0 : }
99 0 : }
100 :
101 : /// Get reference to locked slot contents. Slot must exist (registered
102 : /// earlier).
103 0 : fn get_slot<'a>(
104 0 : self: &'a Arc<WalReceivers>,
105 0 : id: WalReceiverId,
106 0 : ) -> MappedMutexGuard<'a, WalReceiverState> {
107 0 : MutexGuard::map(self.mutex.lock(), |locked| {
108 0 : locked.slots[id]
109 0 : .as_mut()
110 0 : .expect("walreceiver doesn't exist")
111 0 : })
112 0 : }
113 :
114 : /// Get number of walreceivers (compute connections).
115 0 : pub fn get_num(self: &Arc<WalReceivers>) -> usize {
116 0 : self.mutex.lock().get_num()
117 0 : }
118 :
119 : /// Get channel for number of walreceivers.
120 0 : pub fn get_num_rx(self: &Arc<WalReceivers>) -> tokio::sync::watch::Receiver<usize> {
121 0 : self.num_computes_rx.clone()
122 0 : }
123 :
124 : /// Should be called after every update of the slots.
125 0 : fn update_num(self: &Arc<WalReceivers>, shared: &MutexGuard<WalReceiversShared>) {
126 0 : let num = shared.get_num();
127 0 : self.num_computes_tx.send_replace(num);
128 0 : }
129 :
130 : /// Get state of all walreceivers.
131 0 : pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
132 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
133 0 : }
134 :
135 : /// Get number of streaming walreceivers (normally 0 or 1) from compute.
136 0 : pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
137 0 : self.mutex
138 0 : .lock()
139 0 : .slots
140 0 : .iter()
141 0 : .flatten()
142 0 : // slots with conn_id None belong to recovery, which also registers here; skip them
143 0 : .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
144 0 : .count()
145 0 : }
146 :
147 : /// Unregister walreceiver.
148 0 : fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
149 0 : let mut shared = self.mutex.lock();
150 0 : shared.slots[id] = None;
151 0 : self.update_num(&shared);
152 0 : WAL_RECEIVERS.dec();
153 0 : }
154 :
155 : /// Broadcast pageserver feedback to connected walproposers.
156 0 : pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
157 0 : // Err means there are no subscribers, which is fine.
158 0 : let _ = self.pageserver_feedback_tx.send(feedback);
159 0 : }
160 : }
161 :
162 : /// Only a few connections are expected (normally one), so store in Vec.
163 : struct WalReceiversShared {
164 : slots: Vec<Option<WalReceiverState>>,
165 : }
166 :
167 : impl WalReceiversShared {
168 : /// Get number of walreceivers (compute connections).
169 0 : fn get_num(&self) -> usize {
170 0 : self.slots.iter().flatten().count()
171 0 : }
172 : }
173 :
174 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
175 : pub struct WalReceiverState {
176 : /// None means it is recovery initiated by us (this safekeeper).
177 : pub conn_id: Option<ConnectionId>,
178 : pub status: WalReceiverStatus,
179 : }
180 :
181 : /// Walreceiver status. Currently only tracks whether it has passed the voting stage and
182 : /// started receiving the stream, but it is easy to add more if needed.
183 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
184 : pub enum WalReceiverStatus {
185 : Voting,
186 : Streaming,
187 : }
188 :
189 : /// Scope guard to access slot in WalReceivers registry and unregister from
190 : /// it in Drop.
191 : pub struct WalReceiverGuard {
192 : id: WalReceiverId,
193 : walreceivers: Arc<WalReceivers>,
194 : }
195 :
196 : impl WalReceiverGuard {
197 : /// Get reference to locked shared state contents.
198 0 : fn get(&self) -> MappedMutexGuard<WalReceiverState> {
199 0 : self.walreceivers.get_slot(self.id)
200 0 : }
201 : }
202 :
203 : impl Drop for WalReceiverGuard {
204 0 : fn drop(&mut self) {
205 0 : self.walreceivers.unregister(self.id);
206 0 : }
207 : }
208 :
209 : pub const MSG_QUEUE_SIZE: usize = 256;
210 : pub const REPLY_QUEUE_SIZE: usize = 16;
211 :
212 : impl SafekeeperPostgresHandler {
213 : /// Wrapper around handle_start_wal_push_guts handling the result. The error is
214 : /// handled here while we're still in the walreceiver ttid span; with an API
215 : /// extension, this can probably be moved into postgres_backend.
216 0 : pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
217 0 : &mut self,
218 0 : pgb: &mut PostgresBackend<IO>,
219 0 : ) -> Result<(), QueryError> {
220 0 : let mut tli: Option<WalResidentTimeline> = None;
221 0 : if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await {
222 : // Log the result and probably send it to the client, closing the stream.
223 0 : let handle_end_fut = pgb.handle_copy_stream_end(end);
224 : // If we managed to create the timeline, augment logging with current LSNs etc.
225 0 : if let Some(tli) = tli {
226 0 : let info = tli.get_safekeeper_info(&self.conf).await;
227 0 : handle_end_fut
228 0 : .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
229 0 : .await;
230 : } else {
231 0 : handle_end_fut.await;
232 : }
233 0 : }
234 0 : Ok(())
235 0 : }
236 :
237 0 : pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
238 0 : &mut self,
239 0 : pgb: &mut PostgresBackend<IO>,
240 0 : tli: &mut Option<WalResidentTimeline>,
241 0 : ) -> Result<(), CopyStreamHandlerEnd> {
242 0 : // The `tli` parameter is only used for passing _out_ a timeline; one should
243 0 : // not have been passed in.
244 0 : assert!(tli.is_none());
245 :
246 : // Notify the libpq client that it's allowed to send `CopyData` messages
247 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
248 :
249 : // Experiments [1] confirm that doing network IO in one (this) thread and
250 : // processing with disk IO in another significantly improves
251 : // performance; we spawn off a WalAcceptor task for message processing
252 : // to this end.
253 : //
254 : // [1] https://github.com/neondatabase/neon/pull/1318
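//
// Roughly, the resulting message flow is (informal sketch):
//
//   walproposer --network--> NetworkReader::run --msg_tx-->  WalAcceptor (WAL writes)
//   walproposer <--network-- network_write     <--reply_tx-- WalAcceptor
//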
255 0 : let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
256 0 : let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
257 0 : let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
258 :
259 : // Concurrently receive and send data; replies are not synchronized with
260 : // sends, so this avoids deadlocks.
261 0 : let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
262 0 : let peer_addr = *pgb.get_peer_addr();
263 0 :
264 0 : let mut network_reader = NetworkReader {
265 0 : ttid: self.ttid,
266 0 : conn_id: self.conn_id,
267 0 : pgb_reader: &mut pgb_reader,
268 0 : peer_addr,
269 0 : acceptor_handle: &mut acceptor_handle,
270 0 : global_timelines: self.global_timelines.clone(),
271 0 : };
272 :
273 : // Read first message and create timeline if needed.
274 0 : let res = network_reader.read_first_message().await;
275 :
276 0 : let network_res = if let Ok((timeline, next_msg)) = res {
277 0 : let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
278 0 : timeline
279 0 : .get_walreceivers()
280 0 : .pageserver_feedback_tx
281 0 : .subscribe();
282 0 : *tli = Some(timeline.wal_residence_guard().await?);
283 :
284 0 : let timeline_cancel = timeline.cancel.clone();
285 0 : tokio::select! {
286 : // todo: add read|write .context to these errors
287 0 : r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
288 0 : r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
289 0 : _ = timeline_cancel.cancelled() => {
290 0 : return Err(CopyStreamHandlerEnd::Cancelled);
291 : }
292 : }
293 : } else {
294 0 : res.map(|_| ())
295 : };
296 :
297 : // Join pg backend back.
298 0 : pgb.unsplit(pgb_reader)?;
299 :
300 : // Join the spawned WalAcceptor. At this point the channels to/from it that were
301 : // passed to the network routines have been dropped, so it will exit as soon as it
302 : // touches them.
303 0 : match acceptor_handle {
304 : None => {
305 : // failed even before spawning; read_network should have the error
306 0 : Err(network_res.expect_err("no error with WalAcceptor not spawned"))
307 : }
308 0 : Some(handle) => {
309 0 : let wal_acceptor_res = handle.await;
310 :
311 : // If there was any network error, return it.
312 0 : network_res?;
313 :
314 : // Otherwise, the WalAcceptor task must have errored.
315 0 : match wal_acceptor_res {
316 0 : Ok(Ok(_)) => Ok(()), // Clean shutdown
317 0 : Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
318 0 : Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
319 0 : "WalAcceptor task panicked",
320 0 : ))),
321 : }
322 : }
323 : }
324 0 : }
325 : }
326 :
327 : struct NetworkReader<'a, IO> {
328 : ttid: TenantTimelineId,
329 : conn_id: ConnectionId,
330 : pgb_reader: &'a mut PostgresBackendReader<IO>,
331 : peer_addr: SocketAddr,
332 : // WalAcceptor is spawned when we learn server info from walproposer and
333 : // create timeline; handle is put here.
334 : acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
335 : global_timelines: Arc<GlobalTimelines>,
336 : }
337 :
338 : impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
339 0 : async fn read_first_message(
340 0 : &mut self,
341 0 : ) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
342 : // Receive information about the server needed to create the timeline, if it doesn't exist yet.
343 0 : let next_msg = read_message(self.pgb_reader).await?;
344 0 : let tli = match next_msg {
345 0 : ProposerAcceptorMessage::Greeting(ref greeting) => {
346 0 : info!(
347 0 : "start handshake with walproposer {} sysid {} timeline {}",
348 : self.peer_addr, greeting.system_id, greeting.tli,
349 : );
350 0 : let server_info = ServerInfo {
351 0 : pg_version: greeting.pg_version,
352 0 : system_id: greeting.system_id,
353 0 : wal_seg_size: greeting.wal_seg_size,
354 0 : };
355 0 : let tli = self
356 0 : .global_timelines
357 0 : .create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
358 0 : .await
359 0 : .context("create timeline")?;
360 0 : tli.wal_residence_guard().await?
361 : }
362 : _ => {
363 0 : return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
364 0 : "unexpected message {next_msg:?} instead of greeting"
365 0 : )))
366 : }
367 : };
368 0 : Ok((tli, next_msg))
369 0 : }
370 :
371 : /// This function is cancellation-safe (only does network I/O and channel read/writes).
372 0 : async fn run(
373 0 : self,
374 0 : msg_tx: Sender<ProposerAcceptorMessage>,
375 0 : msg_rx: Receiver<ProposerAcceptorMessage>,
376 0 : reply_tx: Sender<AcceptorProposerMessage>,
377 0 : tli: WalResidentTimeline,
378 0 : next_msg: ProposerAcceptorMessage,
379 0 : ) -> Result<(), CopyStreamHandlerEnd> {
380 0 : *self.acceptor_handle = Some(WalAcceptor::spawn(
381 0 : tli,
382 0 : msg_rx,
383 0 : reply_tx,
384 0 : Some(self.conn_id),
385 0 : ));
386 0 :
387 0 : // Forward all messages to WalAcceptor
388 0 : read_network_loop(self.pgb_reader, msg_tx, next_msg).await
389 0 : }
390 : }
391 :
392 : /// Read next message from walproposer.
393 : /// TODO: Return Ok(None) on graceful termination.
394 0 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
395 0 : pgb_reader: &mut PostgresBackendReader<IO>,
396 0 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
397 0 : let copy_data = pgb_reader.read_copy_message().await?;
398 0 : let msg = ProposerAcceptorMessage::parse(copy_data)?;
399 0 : Ok(msg)
400 0 : }
401 :
402 0 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
403 0 : pgb_reader: &mut PostgresBackendReader<IO>,
404 0 : msg_tx: Sender<ProposerAcceptorMessage>,
405 0 : mut next_msg: ProposerAcceptorMessage,
406 0 : ) -> Result<(), CopyStreamHandlerEnd> {
407 : /// Threshold for logging slow WalAcceptor sends.
408 : const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
409 :
410 : loop {
411 0 : let started = Instant::now();
412 0 : let size = next_msg.size();
413 0 :
414 0 : match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
415 0 : Ok(()) => {}
416 : // Slow send, log a message and keep trying. Log context has timeline ID.
417 0 : Err(SendTimeoutError::Timeout(next_msg)) => {
418 0 : warn!(
419 0 : "slow WalAcceptor send blocked for {:.3}s",
420 0 : Instant::now().duration_since(started).as_secs_f64()
421 : );
422 0 : if msg_tx.send(next_msg).await.is_err() {
423 0 : return Ok(()); // WalAcceptor terminated
424 0 : }
425 0 : warn!(
426 0 : "slow WalAcceptor send completed after {:.3}s",
427 0 : Instant::now().duration_since(started).as_secs_f64()
428 : )
429 : }
430 : // WalAcceptor terminated.
431 0 : Err(SendTimeoutError::Closed(_)) => return Ok(()),
432 : }
433 :
434 : // Update metrics. Will be decremented in WalAcceptor.
435 0 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
436 0 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);
437 :
438 0 : next_msg = read_message(pgb_reader).await?;
439 : }
440 0 : }
441 :
442 : /// Read replies from WalAcceptor and pass them back to the socket. Returns Ok(())
443 : /// if reply_rx is closed; that must mean WalAcceptor terminated, and joining it should
444 : /// surface the error.
445 : ///
446 : /// This function is cancellation-safe (only does network I/O and channel read/writes).
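///
/// Pageserver feedback received on the timeline's broadcast channel is forwarded as well,
/// piggybacked onto a copy of the most recent AppendResponse.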
447 0 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
448 0 : pgb_writer: &mut PostgresBackend<IO>,
449 0 : mut reply_rx: Receiver<AcceptorProposerMessage>,
450 0 : mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
451 0 : ) -> Result<(), CopyStreamHandlerEnd> {
452 0 : let mut buf = BytesMut::with_capacity(128);
453 0 :
454 0 : // Store the last AppendResponse so PageserverFeedback can be injected into it.
455 0 : let mut last_append_response = None;
456 :
457 : loop {
458 : // trying to read either AcceptorProposerMessage or PageserverFeedback
459 0 : let msg = tokio::select! {
460 0 : reply = reply_rx.recv() => {
461 0 : if let Some(msg) = reply {
462 0 : if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
463 0 : last_append_response = Some(append_response.clone());
464 0 : }
465 0 : Some(msg)
466 : } else {
467 0 : return Ok(()); // chan closed, WalAcceptor terminated
468 : }
469 : }
470 :
471 0 : feedback = pageserver_feedback_rx.recv() =>
472 0 : match (feedback, &last_append_response) {
473 0 : (Ok(feedback), Some(append_response)) => {
474 0 : // clone AppendResponse and inject PageserverFeedback into it
475 0 : let mut append_response = append_response.clone();
476 0 : append_response.pageserver_feedback = Some(feedback);
477 0 : Some(AcceptorProposerMessage::AppendResponse(append_response))
478 : }
479 0 : _ => None,
480 : },
481 : };
482 :
483 0 : let Some(msg) = msg else {
484 0 : continue;
485 : };
486 :
487 0 : buf.clear();
488 0 : msg.serialize(&mut buf)?;
489 0 : pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
490 : }
491 0 : }
492 :
493 : /// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to
494 : /// walproposer, even when it's writing a steady stream of messages.
495 : const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
496 :
497 : /// The metrics computation interval.
498 : ///
499 : /// The Prometheus poll interval is 60 seconds at the time of writing. We sample the queue depth
500 : /// every 5 seconds, for 12 samples per poll. This gives a histogram count of up to 12x the number of active timelines.
501 : const METRICS_INTERVAL: Duration = Duration::from_secs(5);
502 :
503 : /// Encapsulates a task which takes messages from msg_rx, processes and pushes
504 : /// replies to reply_tx.
505 : ///
506 : /// Reading from socket and writing to disk in parallel is beneficial for
507 : /// performance, this struct provides the writing to disk part.
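///
/// One WalAcceptor task is spawned per walproposer connection (or per recovery initiated
/// by this safekeeper); it exits once either channel is closed or the timeline is
/// cancelled.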
508 : pub struct WalAcceptor {
509 : tli: WalResidentTimeline,
510 : msg_rx: Receiver<ProposerAcceptorMessage>,
511 : reply_tx: Sender<AcceptorProposerMessage>,
512 : conn_id: Option<ConnectionId>,
513 : }
514 :
515 : impl WalAcceptor {
516 : /// Spawn a task running WalAcceptor and return a handle to it. The task returns
517 : /// Ok(()) if either of the channels has closed, and Err if any error is encountered
518 : /// during message processing.
519 : ///
520 : /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
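///
/// A wiring sketch (mirrors what handle_start_wal_push_guts does; `tli` and `conn_id`
/// are assumed to already exist):
/// ```ignore
/// let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
/// let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
/// let handle = WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(conn_id));
/// // ... feed ProposerAcceptorMessages into msg_tx, read replies from reply_rx ...
/// drop(msg_tx); // closing the msg channel lets the task finish
/// let result = handle.await; // Ok(Ok(())) on clean shutdown
/// ```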
521 0 : pub fn spawn(
522 0 : tli: WalResidentTimeline,
523 0 : msg_rx: Receiver<ProposerAcceptorMessage>,
524 0 : reply_tx: Sender<AcceptorProposerMessage>,
525 0 : conn_id: Option<ConnectionId>,
526 0 : ) -> JoinHandle<anyhow::Result<()>> {
527 0 : task::spawn(async move {
528 0 : let mut wa = WalAcceptor {
529 0 : tli,
530 0 : msg_rx,
531 0 : reply_tx,
532 0 : conn_id,
533 0 : };
534 0 :
535 0 : let span_ttid = wa.tli.ttid; // satisfy borrow checker
536 0 : wa.run()
537 0 : .instrument(
538 0 : info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
539 : )
540 0 : .await
541 0 : })
542 0 : }
543 :
544 : /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
545 : /// it must mean that the network side has terminated.
546 : ///
547 : /// This function is *not* cancellation safe, it does local disk I/O: it should always
548 : /// be allowed to run to completion. It respects Timeline::cancel and shuts down cleanly
549 : /// when that gets triggered.
550 0 : async fn run(&mut self) -> anyhow::Result<()> {
551 0 : let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
552 0 :
553 0 : // Periodically flush the WAL and compute metrics.
554 0 : let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
555 0 : flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
556 0 : flush_ticker.tick().await; // skip the initial, immediate tick
557 :
558 0 : let mut metrics_ticker = tokio::time::interval(METRICS_INTERVAL);
559 0 : metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
560 0 :
561 0 : // Tracks whether we have unflushed appends.
562 0 : let mut dirty = false;
563 :
564 0 : while !self.tli.is_cancelled() {
565 0 : let reply = tokio::select! {
566 : // Process inbound message.
567 0 : msg = self.msg_rx.recv() => {
568 : // If disconnected, break to flush WAL and return.
569 0 : let Some(mut msg) = msg else {
570 0 : break;
571 : };
572 :
573 : // Update gauge metrics.
574 0 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
575 0 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
576 0 :
577 0 : // Update walreceiver state in shmem for reporting.
578 0 : if let ProposerAcceptorMessage::Elected(_) = &msg {
579 0 : walreceiver_guard.get().status = WalReceiverStatus::Streaming;
580 0 : }
581 :
582 : // Don't flush the WAL on every append, only periodically via flush_ticker.
583 : // This batches multiple appends per fsync. If the channel is empty after
584 : // sending the reply, we'll schedule an immediate flush.
585 : //
586 : // Note that a flush can still happen on segment bounds, which will result
587 : // in an AppendResponse.
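// In other words, while appends keep arriving fast enough that the channel stays
// non-empty, the WAL is fsynced roughly once per FLUSH_INTERVAL (plus any
// segment-boundary flushes) instead of once per message.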
588 0 : if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
589 0 : msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
590 0 : dirty = true;
591 0 : }
592 :
593 0 : self.tli.process_msg(&msg).await?
594 : }
595 :
596 : // While receiving AppendRequests, flush the WAL periodically and respond with an
597 : // AppendResponse to let walproposer know we're still alive.
598 0 : _ = flush_ticker.tick(), if dirty => {
599 0 : dirty = false;
600 0 : self.tli
601 0 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
602 0 : .await?
603 : }
604 :
605 : // If there are no pending messages, flush the WAL immediately.
606 : //
607 : // TODO: this should be done via flush_ticker.reset_immediately(), but that's always
608 : // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
609 0 : _ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
610 0 : dirty = false;
611 0 : flush_ticker.reset();
612 0 : self.tli
613 0 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
614 0 : .await?
615 : }
616 :
617 : // Update histogram metrics periodically.
618 0 : _ = metrics_ticker.tick() => {
619 0 : WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64);
620 0 : None // no reply
621 : }
622 :
623 0 : _ = self.tli.cancel.cancelled() => {
624 0 : break;
625 : }
626 : };
627 :
628 : // Send reply, if any.
629 0 : if let Some(reply) = reply {
630 0 : if self.reply_tx.send(reply).await.is_err() {
631 0 : break; // disconnected, break to flush WAL and return
632 0 : }
633 0 : }
634 : }
635 :
636 : // Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
637 0 : if dirty && !self.tli.cancel.is_cancelled() {
638 0 : self.tli
639 0 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
640 0 : .await?;
641 0 : }
642 :
643 0 : Ok(())
644 0 : }
645 : }
646 :
647 : /// On drop, drain msg_rx and update metrics to avoid leaks.
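///
/// (The queue depth/size gauges are incremented in read_network_loop for every message
/// pushed into the channel and normally decremented in run(); if the task exits with
/// messages still queued, draining them here keeps the gauges balanced.)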
648 : impl Drop for WalAcceptor {
649 0 : fn drop(&mut self) {
650 0 : self.msg_rx.close(); // prevent further sends
651 0 : while let Ok(msg) = self.msg_rx.try_recv() {
652 0 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
653 0 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
654 0 : }
655 0 : }
656 : }
|