Line data Source code
1 : //! Safekeeper communication endpoint to WAL proposer (compute node).
2 : //! Gets messages from the network, passes them down to the consensus module, and
3 : //! sends replies back.
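//!
//! Each connection is split into three cooperating parts: a network reader that
//! parses ProposerAcceptorMessages, a WalAcceptor task that feeds them to the
//! consensus code and performs the disk I/O, and a network writer that streams
//! replies (with pageserver feedback injected) back to the walproposer.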
4 :
5 : use crate::handler::SafekeeperPostgresHandler;
6 : use crate::metrics::{
7 : WAL_RECEIVERS, WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL,
8 : WAL_RECEIVER_QUEUE_SIZE_TOTAL,
9 : };
10 : use crate::safekeeper::AcceptorProposerMessage;
11 : use crate::safekeeper::ProposerAcceptorMessage;
12 : use crate::timeline::WalResidentTimeline;
13 : use crate::GlobalTimelines;
14 : use anyhow::{anyhow, Context};
15 : use bytes::BytesMut;
16 : use parking_lot::MappedMutexGuard;
17 : use parking_lot::Mutex;
18 : use parking_lot::MutexGuard;
19 : use postgres_backend::CopyStreamHandlerEnd;
20 : use postgres_backend::PostgresBackend;
21 : use postgres_backend::PostgresBackendReader;
22 : use postgres_backend::QueryError;
23 : use pq_proto::BeMessage;
24 : use safekeeper_api::membership::Configuration;
25 : use safekeeper_api::models::{ConnectionId, WalReceiverState, WalReceiverStatus};
26 : use safekeeper_api::ServerInfo;
27 : use std::future;
28 : use std::net::SocketAddr;
29 : use std::sync::Arc;
30 : use tokio::io::AsyncRead;
31 : use tokio::io::AsyncWrite;
32 : use tokio::sync::mpsc::error::SendTimeoutError;
33 : use tokio::sync::mpsc::{channel, Receiver, Sender};
34 : use tokio::task;
35 : use tokio::task::JoinHandle;
36 : use tokio::time::{Duration, Instant, MissedTickBehavior};
37 : use tracing::*;
38 : use utils::id::TenantTimelineId;
39 : use utils::lsn::Lsn;
40 : use utils::pageserver_feedback::PageserverFeedback;
41 :
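/// Capacity of the per-timeline broadcast channel that fans out PageserverFeedback
/// to connected walproposers; lagging subscribers simply miss older entries, which
/// the reply loop in network_write tolerates.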
42 : const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
43 :
44 : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
45 : /// in Arc).
46 : pub struct WalReceivers {
47 : mutex: Mutex<WalReceiversShared>,
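/// Broadcast channel used to fan out pageserver feedback to all walproposer
/// connections on this timeline (see broadcast_pageserver_feedback).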
48 : pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
49 :
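/// Tracks the number of registered walreceivers (compute connections); the
/// receiver side is handed out via get_num_rx() so other tasks can watch it.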
50 : num_computes_tx: tokio::sync::watch::Sender<usize>,
51 : num_computes_rx: tokio::sync::watch::Receiver<usize>,
52 : }
53 :
54 : /// Id under which walreceiver is registered in shmem.
55 : type WalReceiverId = usize;
56 :
57 : impl WalReceivers {
58 3 : pub fn new() -> Arc<WalReceivers> {
59 3 : let (pageserver_feedback_tx, _) =
60 3 : tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
61 3 :
62 3 : let (num_computes_tx, num_computes_rx) = tokio::sync::watch::channel(0usize);
63 3 :
64 3 : Arc::new(WalReceivers {
65 3 : mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
66 3 : pageserver_feedback_tx,
67 3 : num_computes_tx,
68 3 : num_computes_rx,
69 3 : })
70 3 : }
71 :
72 : /// Register new walreceiver. Returned guard provides access to the slot and
73 : /// automatically deregisters in Drop.
74 3 : pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
75 3 : let mut shared = self.mutex.lock();
76 3 : let slots = &mut shared.slots;
77 3 : let walreceiver = WalReceiverState {
78 3 : conn_id,
79 3 : status: WalReceiverStatus::Voting,
80 3 : };
81 : // find empty slot or create new one
82 3 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
83 0 : slots[pos] = Some(walreceiver);
84 0 : pos
85 : } else {
86 3 : let pos = slots.len();
87 3 : slots.push(Some(walreceiver));
88 3 : pos
89 : };
90 :
91 3 : self.update_num(&shared);
92 3 : WAL_RECEIVERS.inc();
93 3 :
94 3 : WalReceiverGuard {
95 3 : id: pos,
96 3 : walreceivers: self.clone(),
97 3 : }
98 3 : }
99 :
100 : /// Get reference to locked slot contents. Slot must exist (registered
101 : /// earlier).
102 0 : fn get_slot<'a>(
103 0 : self: &'a Arc<WalReceivers>,
104 0 : id: WalReceiverId,
105 0 : ) -> MappedMutexGuard<'a, WalReceiverState> {
106 0 : MutexGuard::map(self.mutex.lock(), |locked| {
107 0 : locked.slots[id]
108 0 : .as_mut()
109 0 : .expect("walreceiver doesn't exist")
110 0 : })
111 0 : }
112 :
113 : /// Get number of walreceivers (compute connections).
114 0 : pub fn get_num(self: &Arc<WalReceivers>) -> usize {
115 0 : self.mutex.lock().get_num()
116 0 : }
117 :
118 : /// Get channel for number of walreceivers.
119 3 : pub fn get_num_rx(self: &Arc<WalReceivers>) -> tokio::sync::watch::Receiver<usize> {
120 3 : self.num_computes_rx.clone()
121 3 : }
122 :
123 : /// Should get called after every update of slots.
124 6 : fn update_num(self: &Arc<WalReceivers>, shared: &MutexGuard<WalReceiversShared>) {
125 6 : let num = shared.get_num();
126 6 : self.num_computes_tx.send_replace(num);
127 6 : }
128 :
129 : /// Get state of all walreceivers.
130 0 : pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
131 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
132 0 : }
133 :
134 : /// Get number of streaming walreceivers (normally 0 or 1) from compute.
135 3 : pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
136 3 : self.mutex
137 3 : .lock()
138 3 : .slots
139 3 : .iter()
140 3 : .flatten()
141 3 : // conn_id.is_some() filters out recovery connections, which also register here
142 3 : .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
143 3 : .count()
144 3 : }
145 :
146 : /// Unregister walreceiver.
147 3 : fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
148 3 : let mut shared = self.mutex.lock();
149 3 : shared.slots[id] = None;
150 3 : self.update_num(&shared);
151 3 : WAL_RECEIVERS.dec();
152 3 : }
153 :
154 : /// Broadcast pageserver feedback to connected walproposers.
155 0 : pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
156 0 : // Err means there is no subscribers, it is fine.
157 0 : let _ = self.pageserver_feedback_tx.send(feedback);
158 0 : }
159 : }
160 :
161 : /// Only a few connections are expected (normally one), so store in Vec.
162 : struct WalReceiversShared {
163 : slots: Vec<Option<WalReceiverState>>,
164 : }
165 :
166 : impl WalReceiversShared {
167 : /// Get number of walreceivers (compute connections).
168 6 : fn get_num(&self) -> usize {
169 6 : self.slots.iter().flatten().count()
170 6 : }
171 : }
172 :
173 : /// Scope guard to access slot in WalReceivers registry and unregister from
174 : /// it in Drop.
175 : pub struct WalReceiverGuard {
176 : id: WalReceiverId,
177 : walreceivers: Arc<WalReceivers>,
178 : }
179 :
180 : impl WalReceiverGuard {
181 : /// Get reference to locked shared state contents.
182 0 : fn get(&self) -> MappedMutexGuard<WalReceiverState> {
183 0 : self.walreceivers.get_slot(self.id)
184 0 : }
185 : }
186 :
187 : impl Drop for WalReceiverGuard {
188 3 : fn drop(&mut self) {
189 3 : self.walreceivers.unregister(self.id);
190 3 : }
191 : }
192 :
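/// Capacity of the bounded channel carrying messages from the network reader to
/// WalAcceptor; it provides backpressure towards walproposer when disk writes lag.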
193 : pub const MSG_QUEUE_SIZE: usize = 256;
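/// Capacity of the bounded channel carrying replies from WalAcceptor back to the
/// network writer.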
194 : pub const REPLY_QUEUE_SIZE: usize = 16;
195 :
196 : impl SafekeeperPostgresHandler {
197 : /// Wrapper around handle_start_wal_push_guts that handles its result. Errors
198 : /// are handled here while we're still in the walreceiver ttid span; with an
199 : /// API extension, this can probably be moved into postgres_backend.
200 0 : pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
201 0 : &mut self,
202 0 : pgb: &mut PostgresBackend<IO>,
203 0 : proto_version: u32,
204 0 : allow_timeline_creation: bool,
205 0 : ) -> Result<(), QueryError> {
206 0 : let mut tli: Option<WalResidentTimeline> = None;
207 0 : if let Err(end) = self
208 0 : .handle_start_wal_push_guts(pgb, &mut tli, proto_version, allow_timeline_creation)
209 0 : .await
210 : {
211 : // Log the result and probably send it to the client, closing the stream.
212 0 : let handle_end_fut = pgb.handle_copy_stream_end(end);
213 : // If we managed to create the timeline, augment logging with current LSNs etc.
214 0 : if let Some(tli) = tli {
215 0 : let info = tli.get_safekeeper_info(&self.conf).await;
216 0 : handle_end_fut
217 0 : .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
218 0 : .await;
219 : } else {
220 0 : handle_end_fut.await;
221 : }
222 0 : }
223 0 : Ok(())
224 0 : }
225 :
226 0 : pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
227 0 : &mut self,
228 0 : pgb: &mut PostgresBackend<IO>,
229 0 : tli: &mut Option<WalResidentTimeline>,
230 0 : proto_version: u32,
231 0 : allow_timeline_creation: bool,
232 0 : ) -> Result<(), CopyStreamHandlerEnd> {
233 0 : // The `tli` parameter is only used for passing _out_ a timeline, one should
234 0 : // not have been passed in.
235 0 : assert!(tli.is_none());
236 :
237 : // Notify the libpq client that it's allowed to send `CopyData` messages
238 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
239 :
240 : // Experiments [1] confirm that doing network IO in one (this) thread and
241 : // processing with disk IO in another significantly improves
242 : // performance; we spawn off WalAcceptor thread for message processing
243 : // to this end.
244 : //
245 : // [1] https://github.com/neondatabase/neon/pull/1318
246 0 : let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
247 0 : let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
248 0 : let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
249 :
250 : // Concurrently receive and send data; replies are not synchronized with
251 : // sends, so this avoids deadlocks.
252 0 : let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
253 0 : let peer_addr = *pgb.get_peer_addr();
254 0 :
255 0 : let mut network_reader = NetworkReader {
256 0 : ttid: self.ttid,
257 0 : conn_id: self.conn_id,
258 0 : pgb_reader: &mut pgb_reader,
259 0 : peer_addr,
260 0 : proto_version,
261 0 : acceptor_handle: &mut acceptor_handle,
262 0 : global_timelines: self.global_timelines.clone(),
263 0 : };
264 :
265 : // Read the first message and create the timeline if needed and allowed.
266 : // This won't be necessary once timelines are always created by storcon
267 : // and allow_timeline_creation becomes false.
268 0 : let res = network_reader
269 0 : .read_first_message(allow_timeline_creation)
270 0 : .await;
271 :
272 0 : let network_res = if let Ok((timeline, next_msg)) = res {
273 0 : let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
274 0 : timeline
275 0 : .get_walreceivers()
276 0 : .pageserver_feedback_tx
277 0 : .subscribe();
278 0 : *tli = Some(timeline.wal_residence_guard().await?);
279 :
280 0 : let timeline_cancel = timeline.cancel.clone();
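// Run the network read and write halves concurrently; the COPY stream ends when
// either half finishes (or fails), or when the timeline is cancelled.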
281 0 : tokio::select! {
282 : // todo: add read|write .context to these errors
283 0 : r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
284 0 : r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
285 0 : _ = timeline_cancel.cancelled() => {
286 0 : return Err(CopyStreamHandlerEnd::Cancelled);
287 : }
288 : }
289 : } else {
290 0 : res.map(|_| ())
291 : };
292 :
293 : // Join pg backend back.
294 0 : pgb.unsplit(pgb_reader)?;
295 :
296 : // Join the spawned WalAcceptor. At this point the channels to/from it
297 : // passed to the network routines are dropped, so it will exit as soon
298 : // as it touches them.
299 0 : match acceptor_handle {
300 : None => {
301 : // Failed even before spawning; the network reader should have the error.
302 0 : Err(network_res.expect_err("no error with WalAcceptor not spawned"))
303 : }
304 0 : Some(handle) => {
305 0 : let wal_acceptor_res = handle.await;
306 :
307 : // If there was any network error, return it.
308 0 : network_res?;
309 :
310 : // Otherwise, WalAcceptor thread must have errored.
311 0 : match wal_acceptor_res {
312 0 : Ok(Ok(_)) => Ok(()), // Clean shutdown
313 0 : Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
314 0 : Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
315 0 : "WalAcceptor task panicked",
316 0 : ))),
317 : }
318 : }
319 : }
320 0 : }
321 : }
322 :
323 : struct NetworkReader<'a, IO> {
324 : ttid: TenantTimelineId,
325 : conn_id: ConnectionId,
326 : pgb_reader: &'a mut PostgresBackendReader<IO>,
327 : peer_addr: SocketAddr,
328 : proto_version: u32,
329 : // WalAcceptor is spawned when we learn server info from walproposer and
330 : // create the timeline; its handle is stored here.
331 : acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
332 : global_timelines: Arc<GlobalTimelines>,
333 : }
334 :
335 : impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
336 0 : async fn read_first_message(
337 0 : &mut self,
338 0 : allow_timeline_creation: bool,
339 0 : ) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
340 : // Receive server information to create the timeline, if not created yet.
341 0 : let next_msg = read_message(self.pgb_reader, self.proto_version).await?;
342 0 : let tli = match next_msg {
343 0 : ProposerAcceptorMessage::Greeting(ref greeting) => {
344 0 : info!(
345 0 : "start handshake with walproposer {} sysid {} timeline {}",
346 : self.peer_addr, greeting.system_id, greeting.tli,
347 : );
348 0 : let server_info = ServerInfo {
349 0 : pg_version: greeting.pg_version,
350 0 : system_id: greeting.system_id,
351 0 : wal_seg_size: greeting.wal_seg_size,
352 0 : };
353 0 : let tli = if allow_timeline_creation {
354 0 : self.global_timelines
355 0 : .create(
356 0 : self.ttid,
357 0 : Configuration::empty(),
358 0 : server_info,
359 0 : Lsn::INVALID,
360 0 : Lsn::INVALID,
361 0 : )
362 0 : .await
363 0 : .context("create timeline")?
364 : } else {
365 0 : self.global_timelines
366 0 : .get(self.ttid)
367 0 : .context("get timeline")?
368 : };
369 0 : tli.wal_residence_guard().await?
370 : }
371 : _ => {
372 0 : return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
373 0 : "unexpected message {next_msg:?} instead of greeting"
374 0 : )))
375 : }
376 : };
377 0 : Ok((tli, next_msg))
378 0 : }
379 :
380 : /// This function is cancellation-safe (only does network I/O and channel read/writes).
381 0 : async fn run(
382 0 : self,
383 0 : msg_tx: Sender<ProposerAcceptorMessage>,
384 0 : msg_rx: Receiver<ProposerAcceptorMessage>,
385 0 : reply_tx: Sender<AcceptorProposerMessage>,
386 0 : tli: WalResidentTimeline,
387 0 : next_msg: ProposerAcceptorMessage,
388 0 : ) -> Result<(), CopyStreamHandlerEnd> {
389 0 : *self.acceptor_handle = Some(WalAcceptor::spawn(
390 0 : tli,
391 0 : msg_rx,
392 0 : reply_tx,
393 0 : Some(self.conn_id),
394 0 : ));
395 0 :
396 0 : // Forward all messages to WalAcceptor
397 0 : read_network_loop(self.pgb_reader, msg_tx, next_msg, self.proto_version).await
398 0 : }
399 : }
400 :
401 : /// Read next message from walproposer.
402 : /// TODO: Return Ok(None) on graceful termination.
403 0 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
404 0 : pgb_reader: &mut PostgresBackendReader<IO>,
405 0 : proto_version: u32,
406 0 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
407 0 : let copy_data = pgb_reader.read_copy_message().await?;
408 0 : let msg = ProposerAcceptorMessage::parse(copy_data, proto_version)?;
409 0 : Ok(msg)
410 0 : }
411 :
412 0 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
413 0 : pgb_reader: &mut PostgresBackendReader<IO>,
414 0 : msg_tx: Sender<ProposerAcceptorMessage>,
415 0 : mut next_msg: ProposerAcceptorMessage,
416 0 : proto_version: u32,
417 0 : ) -> Result<(), CopyStreamHandlerEnd> {
418 : /// Threshold for logging slow WalAcceptor sends.
419 : const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
420 :
421 : loop {
422 0 : let started = Instant::now();
423 0 : let size = next_msg.size();
424 0 :
425 0 : match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
426 0 : Ok(()) => {}
427 : // Slow send, log a message and keep trying. Log context has timeline ID.
428 0 : Err(SendTimeoutError::Timeout(next_msg)) => {
429 0 : warn!(
430 0 : "slow WalAcceptor send blocked for {:.3}s",
431 0 : Instant::now().duration_since(started).as_secs_f64()
432 : );
433 0 : if msg_tx.send(next_msg).await.is_err() {
434 0 : return Ok(()); // WalAcceptor terminated
435 0 : }
436 0 : warn!(
437 0 : "slow WalAcceptor send completed after {:.3}s",
438 0 : Instant::now().duration_since(started).as_secs_f64()
439 : )
440 : }
441 : // WalAcceptor terminated.
442 0 : Err(SendTimeoutError::Closed(_)) => return Ok(()),
443 : }
444 :
445 : // Update metrics. Will be decremented in WalAcceptor.
446 0 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
447 0 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);
448 :
449 0 : next_msg = read_message(pgb_reader, proto_version).await?;
450 : }
451 0 : }
452 :
453 : /// Read replies from WalAcceptor and pass them back to the socket. Returns
454 : /// Ok(()) if reply_rx is closed; that means WalAcceptor terminated, and
455 : /// joining it will surface the error.
456 : ///
457 : /// This function is cancellation-safe (only does network I/O and channel read/writes).
458 0 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
459 0 : pgb_writer: &mut PostgresBackend<IO>,
460 0 : mut reply_rx: Receiver<AcceptorProposerMessage>,
461 0 : mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
462 0 : ) -> Result<(), CopyStreamHandlerEnd> {
463 0 : let mut buf = BytesMut::with_capacity(128);
464 0 :
465 0 : // Store the last AppendResponse so PageserverFeedback can be injected into it.
466 0 : let mut last_append_response = None;
467 :
468 : loop {
469 : // Try to read either an AcceptorProposerMessage or a PageserverFeedback.
470 0 : let msg = tokio::select! {
471 0 : reply = reply_rx.recv() => {
472 0 : if let Some(msg) = reply {
473 0 : if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
474 0 : last_append_response = Some(append_response.clone());
475 0 : }
476 0 : Some(msg)
477 : } else {
478 0 : return Ok(()); // chan closed, WalAcceptor terminated
479 : }
480 : }
481 :
482 0 : feedback = pageserver_feedback_rx.recv() =>
483 0 : match (feedback, &last_append_response) {
484 0 : (Ok(feedback), Some(append_response)) => {
485 0 : // clone AppendResponse and inject PageserverFeedback into it
486 0 : let mut append_response = append_response.clone();
487 0 : append_response.pageserver_feedback = Some(feedback);
488 0 : Some(AcceptorProposerMessage::AppendResponse(append_response))
489 : }
490 0 : _ => None,
491 : },
492 : };
493 :
494 0 : let Some(msg) = msg else {
495 0 : continue;
496 : };
497 :
498 0 : buf.clear();
499 0 : msg.serialize(&mut buf)?;
500 0 : pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
501 : }
502 0 : }
503 :
504 : /// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to
505 : /// walproposer, even when it's writing a steady stream of messages.
506 : const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
507 :
508 : /// The metrics computation interval.
509 : ///
510 : /// The Prometheus poll interval is 60 seconds at the time of writing. We sample the queue depth
511 : /// every 5 seconds, for 12 samples per poll, giving a count of up to 12x the number of active timelines.
512 : const METRICS_INTERVAL: Duration = Duration::from_secs(5);
513 :
514 : /// Encapsulates a task which takes messages from msg_rx, processes and pushes
515 : /// replies to reply_tx.
516 : ///
517 : /// Reading from socket and writing to disk in parallel is beneficial for
518 : /// performance, this struct provides the writing to disk part.
519 : pub struct WalAcceptor {
520 : tli: WalResidentTimeline,
521 : msg_rx: Receiver<ProposerAcceptorMessage>,
522 : reply_tx: Sender<AcceptorProposerMessage>,
523 : conn_id: Option<ConnectionId>,
524 : }
525 :
526 : impl WalAcceptor {
527 : /// Spawn a task running WalAcceptor and return its handle. The task returns
528 : /// Ok(()) if either of the channels has closed, and Err if any error is
529 : /// encountered during message processing.
530 : ///
531 : /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
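///
/// A minimal usage sketch (not compiled here; `timeline` and `conn_id` are assumed
/// to be provided by the surrounding connection handler, as in
/// handle_start_wal_push_guts above):
///
/// ```ignore
/// let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(MSG_QUEUE_SIZE);
/// let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(REPLY_QUEUE_SIZE);
/// let handle = WalAcceptor::spawn(timeline, msg_rx, reply_tx, Some(conn_id));
/// // Feed ProposerAcceptorMessages in via msg_tx and read replies from reply_rx;
/// // dropping both channel ends lets the task exit, then await `handle`.
/// ```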
532 3 : pub fn spawn(
533 3 : tli: WalResidentTimeline,
534 3 : msg_rx: Receiver<ProposerAcceptorMessage>,
535 3 : reply_tx: Sender<AcceptorProposerMessage>,
536 3 : conn_id: Option<ConnectionId>,
537 3 : ) -> JoinHandle<anyhow::Result<()>> {
538 3 : task::spawn(async move {
539 3 : let mut wa = WalAcceptor {
540 3 : tli,
541 3 : msg_rx,
542 3 : reply_tx,
543 3 : conn_id,
544 3 : };
545 3 :
546 3 : let span_ttid = wa.tli.ttid; // satisfy borrow checker
547 3 : wa.run()
548 3 : .instrument(
549 3 : info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
550 : )
551 3 : .await
552 3 : })
553 3 : }
554 :
555 : /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed,
556 : /// which means the network side terminated.
557 : ///
558 : /// This function is *not* cancellation safe, it does local disk I/O: it should always
559 : /// be allowed to run to completion. It respects Timeline::cancel and shuts down cleanly
560 : /// when that gets triggered.
561 3 : async fn run(&mut self) -> anyhow::Result<()> {
562 3 : let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
563 3 :
564 3 : // Periodically flush the WAL and compute metrics.
565 3 : let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
566 3 : flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
567 3 : flush_ticker.tick().await; // skip the initial, immediate tick
568 :
569 3 : let mut metrics_ticker = tokio::time::interval(METRICS_INTERVAL);
570 3 : metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
571 3 :
572 3 : // Tracks whether we have unflushed appends.
573 3 : let mut dirty = false;
574 :
575 1206 : while !self.tli.is_cancelled() {
576 1206 : let reply = tokio::select! {
577 : // Process inbound message.
578 1206 : msg = self.msg_rx.recv() => {
579 : // If disconnected, break to flush WAL and return.
580 603 : let Some(mut msg) = msg else {
581 3 : break;
582 : };
583 :
584 : // Update gauge metrics.
585 600 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
586 600 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
587 600 :
588 600 : // Update walreceiver state in shmem for reporting.
589 600 : if let ProposerAcceptorMessage::Elected(_) = &msg {
590 0 : walreceiver_guard.get().status = WalReceiverStatus::Streaming;
591 600 : }
592 :
593 : // Don't flush the WAL on every append, only periodically via flush_ticker.
594 : // This batches multiple appends per fsync. If the channel is empty after
595 : // sending the reply, we'll schedule an immediate flush.
596 : //
597 : // Note that a flush can still happen on segment bounds, which will result
598 : // in an AppendResponse.
599 600 : if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
600 600 : msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
601 600 : dirty = true;
602 600 : }
603 :
604 600 : self.tli.process_msg(&msg).await?
605 : }
606 :
607 : // While receiving AppendRequests, flush the WAL periodically and respond with an
608 : // AppendResponse to let walproposer know we're still alive.
609 1206 : _ = flush_ticker.tick(), if dirty => {
610 0 : dirty = false;
611 0 : self.tli
612 0 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
613 0 : .await?
614 : }
615 :
616 : // If there are no pending messages, flush the WAL immediately.
617 : //
618 : // TODO: this should be done via flush_ticker.reset_immediately(), but that's always
619 : // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
620 1206 : _ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
621 600 : dirty = false;
622 600 : flush_ticker.reset();
623 600 : self.tli
624 600 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
625 600 : .await?
626 : }
627 :
628 : // Update histogram metrics periodically.
629 1206 : _ = metrics_ticker.tick() => {
630 3 : WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64);
631 3 : None // no reply
632 : }
633 :
634 1206 : _ = self.tli.cancel.cancelled() => {
635 0 : break;
636 : }
637 : };
638 :
639 : // Send reply, if any.
640 1203 : if let Some(reply) = reply {
641 600 : if self.reply_tx.send(reply).await.is_err() {
642 0 : break; // disconnected, break to flush WAL and return
643 600 : }
644 603 : }
645 : }
646 :
647 : // Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
648 3 : if dirty && !self.tli.cancel.is_cancelled() {
649 0 : self.tli
650 0 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
651 0 : .await?;
652 3 : }
653 :
654 3 : Ok(())
655 3 : }
656 : }
657 :
658 : /// On drop, drain msg_rx and update metrics to avoid leaks.
659 : impl Drop for WalAcceptor {
660 3 : fn drop(&mut self) {
661 3 : self.msg_rx.close(); // prevent further sends
662 3 : while let Ok(msg) = self.msg_rx.try_recv() {
663 0 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
664 0 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
665 0 : }
666 3 : }
667 : }