Line data Source code
1 : //! Safekeeper communication endpoint to WAL proposer (compute node).
2 : //! Gets messages from the network, passes them down to consensus module and
3 : //! sends replies back.
4 :
5 : use std::future;
6 : use std::net::SocketAddr;
7 : use std::sync::Arc;
8 :
9 : use anyhow::{Context, anyhow};
10 : use bytes::BytesMut;
11 : use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
12 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend, PostgresBackendReader, QueryError};
13 : use pq_proto::BeMessage;
14 : use safekeeper_api::ServerInfo;
15 : use safekeeper_api::membership::Configuration;
16 : use safekeeper_api::models::{ConnectionId, WalReceiverState, WalReceiverStatus};
17 : use tokio::io::{AsyncRead, AsyncWrite};
18 : use tokio::sync::mpsc::error::SendTimeoutError;
19 : use tokio::sync::mpsc::{Receiver, Sender, channel};
20 : use tokio::task;
21 : use tokio::task::JoinHandle;
22 : use tokio::time::{Duration, Instant, MissedTickBehavior};
23 : use tracing::*;
24 : use utils::id::TenantTimelineId;
25 : use utils::lsn::Lsn;
26 : use utils::pageserver_feedback::PageserverFeedback;
27 : use utils::pausable_failpoint;
28 :
29 : use crate::GlobalTimelines;
30 : use crate::handler::SafekeeperPostgresHandler;
31 : use crate::metrics::{
32 : WAL_RECEIVER_QUEUE_DEPTH, WAL_RECEIVER_QUEUE_DEPTH_TOTAL, WAL_RECEIVER_QUEUE_SIZE_TOTAL,
33 : WAL_RECEIVERS,
34 : };
35 : use crate::safekeeper::{AcceptorProposerMessage, ProposerAcceptorMessage};
36 : use crate::timeline::{TimelineError, WalResidentTimeline};
37 :
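/// Capacity of the per-timeline broadcast channel used to fan out PageserverFeedback
/// to connected walproposers (see `broadcast_pageserver_feedback` below).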
38 : const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
39 :
40 : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
41 : /// in Arc).
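///
/// A minimal usage sketch (`conn_id` here stands for a hypothetical `ConnectionId` value):
/// ```ignore
/// let receivers = WalReceivers::new();
/// let guard = receivers.register(Some(conn_id));
/// assert_eq!(receivers.get_num(), 1);
/// drop(guard); // WalReceiverGuard::drop unregisters the slot
/// assert_eq!(receivers.get_num(), 0);
/// ```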
42 : pub struct WalReceivers {
43 : mutex: Mutex<WalReceiversShared>,
44 : pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
45 :
46 : num_computes_tx: tokio::sync::watch::Sender<usize>,
47 : num_computes_rx: tokio::sync::watch::Receiver<usize>,
48 : }
49 :
50 : /// Id under which a walreceiver is registered in the shared registry.
51 : type WalReceiverId = usize;
52 :
53 : impl WalReceivers {
54 5 : pub fn new() -> Arc<WalReceivers> {
55 5 : let (pageserver_feedback_tx, _) =
56 5 : tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
57 :
58 5 : let (num_computes_tx, num_computes_rx) = tokio::sync::watch::channel(0usize);
59 :
60 5 : Arc::new(WalReceivers {
61 5 : mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
62 5 : pageserver_feedback_tx,
63 5 : num_computes_tx,
64 5 : num_computes_rx,
65 5 : })
66 5 : }
67 :
68 : /// Register new walreceiver. Returned guard provides access to the slot and
69 : /// automatically deregisters in Drop.
70 5 : pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
71 5 : let mut shared = self.mutex.lock();
72 5 : let slots = &mut shared.slots;
73 5 : let walreceiver = WalReceiverState {
74 5 : conn_id,
75 5 : status: WalReceiverStatus::Voting,
76 5 : };
77 : // find empty slot or create new one
78 5 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
79 0 : slots[pos] = Some(walreceiver);
80 0 : pos
81 : } else {
82 5 : let pos = slots.len();
83 5 : slots.push(Some(walreceiver));
84 5 : pos
85 : };
86 :
87 5 : self.update_num(&shared);
88 5 : WAL_RECEIVERS.inc();
89 :
90 5 : WalReceiverGuard {
91 5 : id: pos,
92 5 : walreceivers: self.clone(),
93 5 : }
94 5 : }
95 :
96 : /// Get reference to locked slot contents. Slot must exist (registered
97 : /// earlier).
98 0 : fn get_slot(
99 0 : self: &Arc<WalReceivers>,
100 0 : id: WalReceiverId,
101 0 : ) -> MappedMutexGuard<'_, WalReceiverState> {
102 0 : MutexGuard::map(self.mutex.lock(), |locked| {
103 0 : locked.slots[id]
104 0 : .as_mut()
105 0 : .expect("walreceiver doesn't exist")
106 0 : })
107 0 : }
108 :
109 : /// Get number of walreceivers (compute connections).
110 0 : pub fn get_num(self: &Arc<WalReceivers>) -> usize {
111 0 : self.mutex.lock().get_num()
112 0 : }
113 :
114 : /// Get channel for number of walreceivers.
115 5 : pub fn get_num_rx(self: &Arc<WalReceivers>) -> tokio::sync::watch::Receiver<usize> {
116 5 : self.num_computes_rx.clone()
117 5 : }
118 :
119 : /// Should get called after every update of slots.
120 10 : fn update_num(self: &Arc<WalReceivers>, shared: &MutexGuard<WalReceiversShared>) {
121 10 : let num = shared.get_num();
122 10 : self.num_computes_tx.send_replace(num);
123 10 : }
124 :
125 : /// Get state of all walreceivers.
126 0 : pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
127 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
128 0 : }
129 :
130 : /// Get number of streaming walreceivers (normally 0 or 1) from compute.
131 5 : pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
132 5 : self.mutex
133 5 : .lock()
134 5 : .slots
135 5 : .iter()
136 5 : .flatten()
137 : // checking conn_id.is_some() skips recovery, which also registers here (with conn_id None)
138 5 : .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
139 5 : .count()
140 5 : }
141 :
142 : /// Unregister walreceiver.
143 5 : fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
144 5 : let mut shared = self.mutex.lock();
145 5 : shared.slots[id] = None;
146 5 : self.update_num(&shared);
147 5 : WAL_RECEIVERS.dec();
148 5 : }
149 :
150 : /// Broadcast pageserver feedback to connected walproposers.
151 0 : pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
152 : // Err means there are no subscribers, which is fine.
153 0 : let _ = self.pageserver_feedback_tx.send(feedback);
154 0 : }
155 : }
156 :
157 : /// Only a few connections are expected (normally one), so store in Vec.
158 : struct WalReceiversShared {
159 : slots: Vec<Option<WalReceiverState>>,
160 : }
161 :
162 : impl WalReceiversShared {
163 : /// Get number of walreceivers (compute connections).
164 10 : fn get_num(&self) -> usize {
165 10 : self.slots.iter().flatten().count()
166 10 : }
167 : }
168 :
169 : /// Scope guard to access slot in WalReceivers registry and unregister from
170 : /// it in Drop.
171 : pub struct WalReceiverGuard {
172 : id: WalReceiverId,
173 : walreceivers: Arc<WalReceivers>,
174 : }
175 :
176 : impl WalReceiverGuard {
177 : /// Get reference to locked shared state contents.
178 0 : fn get(&self) -> MappedMutexGuard<WalReceiverState> {
179 0 : self.walreceivers.get_slot(self.id)
180 0 : }
181 : }
182 :
183 : impl Drop for WalReceiverGuard {
184 5 : fn drop(&mut self) {
185 5 : self.walreceivers.unregister(self.id);
186 5 : }
187 : }
188 :
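// Data flow per compute connection (a sketch of the design described in
// handle_start_wal_push_guts below):
//
//   walproposer --CopyData--> read_network_loop --msg channel (MSG_QUEUE_SIZE)--> WalAcceptor
//   walproposer <--CopyData-- network_write   <--reply channel (REPLY_QUEUE_SIZE)-- WalAcceptor
//
// The bounded channels decouple network IO from disk IO and apply backpressure
// when the disk (WalAcceptor) side falls behind.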
189 : pub const MSG_QUEUE_SIZE: usize = 256;
190 : pub const REPLY_QUEUE_SIZE: usize = 16;
191 :
192 : impl SafekeeperPostgresHandler {
193 : /// Wrapper around handle_start_wal_push_guts handling result. Error is
194 : /// handled here while we're still in walreceiver ttid span; with API
195 : /// extension, this can probably be moved into postgres_backend.
196 0 : pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
197 0 : &mut self,
198 0 : pgb: &mut PostgresBackend<IO>,
199 0 : proto_version: u32,
200 0 : allow_timeline_creation: bool,
201 0 : ) -> Result<(), QueryError> {
202 0 : let mut tli: Option<WalResidentTimeline> = None;
203 0 : if let Err(end) = self
204 0 : .handle_start_wal_push_guts(pgb, &mut tli, proto_version, allow_timeline_creation)
205 0 : .await
206 : {
207 : // Log the result and probably send it to the client, closing the stream.
208 0 : let handle_end_fut = pgb.handle_copy_stream_end(end);
209 : // If we managed to create the timeline, augment logging with current LSNs etc.
210 0 : if let Some(tli) = tli {
211 0 : let info = tli.get_safekeeper_info(&self.conf).await;
212 0 : handle_end_fut
213 0 : .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
214 0 : .await;
215 : } else {
216 0 : handle_end_fut.await;
217 : }
218 0 : }
219 0 : Ok(())
220 0 : }
221 :
222 0 : pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
223 0 : &mut self,
224 0 : pgb: &mut PostgresBackend<IO>,
225 0 : tli: &mut Option<WalResidentTimeline>,
226 0 : proto_version: u32,
227 0 : allow_timeline_creation: bool,
228 0 : ) -> Result<(), CopyStreamHandlerEnd> {
229 : // The `tli` parameter is only used for passing _out_ a timeline; one should
230 : // not have been passed in.
231 0 : assert!(tli.is_none());
232 :
233 : // Notify the libpq client that it's allowed to send `CopyData` messages
234 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
235 :
236 : // Experiments [1] confirm that doing network IO in one (this) thread and
237 : // processing with disk IO in another significantly improves
238 : // performance; we spawn off a WalAcceptor thread for message processing
239 : // to this end.
240 : //
241 : // [1] https://github.com/neondatabase/neon/pull/1318
242 0 : let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
243 0 : let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
244 0 : let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
245 :
246 : // Concurrently receive and send data; replies are not synchronized with
247 : // sends, so this avoids deadlocks.
248 0 : let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
249 0 : let peer_addr = *pgb.get_peer_addr();
250 :
251 0 : let mut network_reader = NetworkReader {
252 0 : ttid: self.ttid,
253 0 : conn_id: self.conn_id,
254 0 : pgb_reader: &mut pgb_reader,
255 0 : peer_addr,
256 0 : proto_version,
257 0 : acceptor_handle: &mut acceptor_handle,
258 0 : global_timelines: self.global_timelines.clone(),
259 0 : };
260 :
261 : // Read the first message and create the timeline if needed and allowed. This
262 : // won't be necessary once timelines are always created by storcon and
263 : // allow_timeline_creation becomes false.
264 0 : let res = network_reader
265 0 : .read_first_message(allow_timeline_creation)
266 0 : .await;
267 :
268 0 : let network_res = if let Ok((timeline, next_msg)) = res {
269 0 : let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
270 0 : timeline
271 0 : .get_walreceivers()
272 0 : .pageserver_feedback_tx
273 0 : .subscribe();
274 0 : *tli = Some(timeline.wal_residence_guard().await?);
275 :
276 0 : let timeline_cancel = timeline.cancel.clone();
277 0 : tokio::select! {
278 : // todo: add read|write .context to these errors
279 0 : r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
280 0 : r = network_write(pgb, reply_rx, pageserver_feedback_rx, proto_version) => r,
281 0 : _ = timeline_cancel.cancelled() => {
282 0 : return Err(CopyStreamHandlerEnd::Cancelled);
283 : }
284 : }
285 : } else {
286 0 : res.map(|_| ())
287 : };
288 :
289 : // Join pg backend back.
290 0 : pgb.unsplit(pgb_reader)?;
291 :
292 : // Join the spawned WalAcceptor. At this point the channels to/from it that were
293 : // passed to the network routines are dropped, so it will exit as soon as it
294 : // touches them.
295 0 : match acceptor_handle {
296 : None => {
297 : // failed even before spawning; read_network should have the error
298 0 : Err(network_res.expect_err("no error with WalAcceptor not spawned"))
299 : }
300 0 : Some(handle) => {
301 0 : let wal_acceptor_res = handle.await;
302 :
303 : // If there was any network error, return it.
304 0 : network_res?;
305 :
306 : // Otherwise, WalAcceptor thread must have errored.
307 0 : match wal_acceptor_res {
308 0 : Ok(Ok(_)) => Ok(()), // Clean shutdown
309 0 : Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
310 0 : Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
311 0 : "WalAcceptor task panicked",
312 0 : ))),
313 : }
314 : }
315 : }
316 0 : }
317 : }
318 :
319 : struct NetworkReader<'a, IO> {
320 : ttid: TenantTimelineId,
321 : conn_id: ConnectionId,
322 : pgb_reader: &'a mut PostgresBackendReader<IO>,
323 : peer_addr: SocketAddr,
324 : proto_version: u32,
325 : // WalAcceptor is spawned when we learn server info from walproposer and
326 : // create timeline; handle is put here.
327 : acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
328 : global_timelines: Arc<GlobalTimelines>,
329 : }
330 :
331 : impl<IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'_, IO> {
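/// Handle the walproposer handshake: the first message must be a Greeting carrying
/// ServerInfo, which is used to create the timeline (when creation is allowed) or to
/// look up an existing one; any other first message is rejected as a protocol error.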
332 0 : async fn read_first_message(
333 0 : &mut self,
334 0 : allow_timeline_creation: bool,
335 0 : ) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
336 : // Receive server information needed to create the timeline, if it doesn't exist yet.
337 0 : let next_msg = read_message(self.pgb_reader, self.proto_version).await?;
338 0 : let tli = match next_msg {
339 0 : ProposerAcceptorMessage::Greeting(ref greeting) => {
340 0 : info!(
341 0 : "start handshake with walproposer {} sysid {}",
342 : self.peer_addr, greeting.system_id,
343 : );
344 0 : let server_info = ServerInfo {
345 0 : pg_version: greeting.pg_version,
346 0 : system_id: greeting.system_id,
347 0 : wal_seg_size: greeting.wal_seg_size,
348 0 : };
349 0 : let tli = if allow_timeline_creation {
350 0 : self.global_timelines
351 0 : .create(
352 0 : self.ttid,
353 0 : Configuration::empty(),
354 0 : server_info,
355 0 : Lsn::INVALID,
356 0 : Lsn::INVALID,
357 0 : )
358 0 : .await
359 0 : .context("create timeline")?
360 : } else {
361 0 : let timeline_res = self.global_timelines.get(self.ttid);
362 0 : match timeline_res {
363 0 : Ok(tl) => tl,
364 : Err(TimelineError::NotFound(_)) => {
365 0 : return Err(CopyStreamHandlerEnd::TimelineNoCreate);
366 : }
367 0 : other => other.context("get_timeline")?,
368 : }
369 : };
370 0 : tli.wal_residence_guard().await?
371 : }
372 : _ => {
373 0 : return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
374 0 : "unexpected message {next_msg:?} instead of greeting"
375 0 : )));
376 : }
377 : };
378 0 : Ok((tli, next_msg))
379 0 : }
380 :
381 : /// This function is cancellation-safe (only does network I/O and channel read/writes).
382 0 : async fn run(
383 0 : self,
384 0 : msg_tx: Sender<ProposerAcceptorMessage>,
385 0 : msg_rx: Receiver<ProposerAcceptorMessage>,
386 0 : reply_tx: Sender<AcceptorProposerMessage>,
387 0 : tli: WalResidentTimeline,
388 0 : next_msg: ProposerAcceptorMessage,
389 0 : ) -> Result<(), CopyStreamHandlerEnd> {
390 0 : *self.acceptor_handle = Some(WalAcceptor::spawn(
391 0 : tli,
392 0 : msg_rx,
393 0 : reply_tx,
394 0 : Some(self.conn_id),
395 0 : ));
396 :
397 : // Forward all messages to WalAcceptor
398 0 : read_network_loop(self.pgb_reader, msg_tx, next_msg, self.proto_version).await
399 0 : }
400 : }
401 :
402 : /// Read next message from walproposer.
403 : /// TODO: Return Ok(None) on graceful termination.
404 0 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
405 0 : pgb_reader: &mut PostgresBackendReader<IO>,
406 0 : proto_version: u32,
407 0 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
408 0 : let copy_data = pgb_reader.read_copy_message().await?;
409 0 : let msg = ProposerAcceptorMessage::parse(copy_data, proto_version)?;
410 0 : Ok(msg)
411 0 : }
412 :
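/// Forward messages from walproposer to WalAcceptor through the bounded msg channel.
/// Returns Ok(()) once WalAcceptor goes away (its receiver is closed); a send that
/// stays blocked for longer than SLOW_THRESHOLD is logged as a warning and retried
/// without a timeout.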
413 0 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
414 0 : pgb_reader: &mut PostgresBackendReader<IO>,
415 0 : msg_tx: Sender<ProposerAcceptorMessage>,
416 0 : mut next_msg: ProposerAcceptorMessage,
417 0 : proto_version: u32,
418 0 : ) -> Result<(), CopyStreamHandlerEnd> {
419 : /// Threshold for logging slow WalAcceptor sends.
420 : const SLOW_THRESHOLD: Duration = Duration::from_secs(5);
421 :
422 : loop {
423 0 : let started = Instant::now();
424 0 : let size = next_msg.size();
425 :
426 0 : match msg_tx.send_timeout(next_msg, SLOW_THRESHOLD).await {
427 0 : Ok(()) => {}
428 : // Slow send, log a message and keep trying. Log context has timeline ID.
429 0 : Err(SendTimeoutError::Timeout(next_msg)) => {
430 0 : warn!(
431 0 : "slow WalAcceptor send blocked for {:.3}s",
432 0 : Instant::now().duration_since(started).as_secs_f64()
433 : );
434 0 : if msg_tx.send(next_msg).await.is_err() {
435 0 : return Ok(()); // WalAcceptor terminated
436 0 : }
437 0 : warn!(
438 0 : "slow WalAcceptor send completed after {:.3}s",
439 0 : Instant::now().duration_since(started).as_secs_f64()
440 : )
441 : }
442 : // WalAcceptor terminated.
443 0 : Err(SendTimeoutError::Closed(_)) => return Ok(()),
444 : }
445 :
446 : // Update metrics. Will be decremented in WalAcceptor.
447 0 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.inc();
448 0 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.add(size as i64);
449 :
450 0 : next_msg = read_message(pgb_reader, proto_version).await?;
451 : }
452 0 : }
453 :
454 : /// Read replies from WalAcceptor and pass them back to the socket. Returns Ok(())
455 : /// if reply_rx is closed; that must mean WalAcceptor terminated, and joining it
456 : /// should reveal the error.
457 : ///
458 : /// This function is cancellation-safe (only does network I/O and channel read/writes).
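///
/// PageserverFeedback received from the broadcast channel is only forwarded after at
/// least one AppendResponse has been seen: the latest AppendResponse is cloned and the
/// feedback injected into it, so walproposer always gets feedback inside a well-formed reply.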
459 0 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
460 0 : pgb_writer: &mut PostgresBackend<IO>,
461 0 : mut reply_rx: Receiver<AcceptorProposerMessage>,
462 0 : mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
463 0 : proto_version: u32,
464 0 : ) -> Result<(), CopyStreamHandlerEnd> {
465 0 : let mut buf = BytesMut::with_capacity(128);
466 :
467 : // Store the latest AppendResponse so PageserverFeedback can be injected into it.
468 0 : let mut last_append_response = None;
469 :
470 : loop {
471 : // trying to read either AcceptorProposerMessage or PageserverFeedback
472 0 : let msg = tokio::select! {
473 0 : reply = reply_rx.recv() => {
474 0 : if let Some(msg) = reply {
475 0 : if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
476 0 : last_append_response = Some(append_response.clone());
477 0 : }
478 0 : Some(msg)
479 : } else {
480 0 : return Ok(()); // chan closed, WalAcceptor terminated
481 : }
482 : }
483 :
484 0 : feedback = pageserver_feedback_rx.recv() =>
485 0 : match (feedback, &last_append_response) {
486 0 : (Ok(feedback), Some(append_response)) => {
487 : // clone AppendResponse and inject PageserverFeedback into it
488 0 : let mut append_response = append_response.clone();
489 0 : append_response.pageserver_feedback = Some(feedback);
490 0 : Some(AcceptorProposerMessage::AppendResponse(append_response))
491 : }
492 0 : _ => None,
493 : },
494 : };
495 :
496 0 : let Some(msg) = msg else {
497 0 : continue;
498 : };
499 :
500 0 : buf.clear();
501 0 : msg.serialize(&mut buf, proto_version)?;
502 0 : pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
503 : }
504 0 : }
505 :
506 : /// The WAL flush interval. This ensures we periodically flush the WAL and send AppendResponses to
507 : /// walproposer, even when it's writing a steady stream of messages.
508 : const FLUSH_INTERVAL: Duration = Duration::from_secs(1);
509 :
510 : /// The metrics computation interval.
511 : ///
512 : /// The Prometheus poll interval is 60 seconds at the time of writing. We sample the queue depth
513 : /// every 5 seconds, for 12 samples per poll. This gives a histogram count of up to 12x the number of active timelines per poll.
514 : const METRICS_INTERVAL: Duration = Duration::from_secs(5);
515 :
516 : /// Encapsulates a task which takes messages from msg_rx, processes and pushes
517 : /// replies to reply_tx.
518 : ///
519 : /// Reading from socket and writing to disk in parallel is beneficial for
520 : /// performance, this struct provides the writing to disk part.
521 : pub struct WalAcceptor {
522 : tli: WalResidentTimeline,
523 : msg_rx: Receiver<ProposerAcceptorMessage>,
524 : reply_tx: Sender<AcceptorProposerMessage>,
525 : conn_id: Option<ConnectionId>,
526 : }
527 :
528 : impl WalAcceptor {
529 : /// Spawn a task running WalAcceptor and return a handle to it. The task returns
530 : /// Ok(()) if either of the channels is closed, and Err if any error is
531 : /// encountered during message processing.
532 : ///
533 : /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
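///
/// A minimal wiring sketch (assuming a WalResidentTimeline `tli` and a `conn_id` are at hand):
/// ```ignore
/// let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(MSG_QUEUE_SIZE);
/// let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(REPLY_QUEUE_SIZE);
/// let handle = WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(conn_id));
/// // feed ProposerAcceptorMessage values into msg_tx, read replies from reply_rx;
/// // dropping msg_tx lets the task finish, after which `handle.await` joins it.
/// ```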
534 5 : pub fn spawn(
535 5 : tli: WalResidentTimeline,
536 5 : msg_rx: Receiver<ProposerAcceptorMessage>,
537 5 : reply_tx: Sender<AcceptorProposerMessage>,
538 5 : conn_id: Option<ConnectionId>,
539 5 : ) -> JoinHandle<anyhow::Result<()>> {
540 5 : task::spawn(async move {
541 5 : let mut wa = WalAcceptor {
542 5 : tli,
543 5 : msg_rx,
544 5 : reply_tx,
545 5 : conn_id,
546 5 : };
547 :
548 5 : let span_ttid = wa.tli.ttid; // satisfy borrow checker
549 5 : wa.run()
550 5 : .instrument(
551 5 : info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
552 : )
553 5 : .await
554 4 : })
555 5 : }
556 :
557 : /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
558 : /// it must mean that the network thread terminated.
559 : ///
560 : /// This function is *not* cancellation safe, it does local disk I/O: it should always
561 : /// be allowed to run to completion. It respects Timeline::cancel and shuts down cleanly
562 : /// when that gets triggered.
563 5 : async fn run(&mut self) -> anyhow::Result<()> {
564 5 : let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
565 :
566 : // Periodically flush the WAL and compute metrics.
567 5 : let mut flush_ticker = tokio::time::interval(FLUSH_INTERVAL);
568 5 : flush_ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);
569 5 : flush_ticker.tick().await; // skip the initial, immediate tick
570 :
571 5 : let mut metrics_ticker = tokio::time::interval(METRICS_INTERVAL);
572 5 : metrics_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
573 :
574 : // Tracks whether we have unflushed appends.
575 5 : let mut dirty = false;
576 :
577 1250 : while !self.tli.is_cancelled() {
578 1250 : let reply = tokio::select! {
579 : // Process inbound message.
580 1250 : msg = self.msg_rx.recv() => {
581 : // If disconnected, break to flush WAL and return.
582 624 : let Some(mut msg) = msg else {
583 4 : break;
584 : };
585 :
586 : // Update gauge metrics.
587 620 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
588 620 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
589 :
590 : // Update walreceiver state in the shared registry for reporting.
591 620 : if let ProposerAcceptorMessage::Elected(_) = &msg {
592 0 : walreceiver_guard.get().status = WalReceiverStatus::Streaming;
593 620 : }
594 :
595 : // Don't flush the WAL on every append, only periodically via flush_ticker.
596 : // This batches multiple appends per fsync. If the channel is empty after
597 : // sending the reply, we'll schedule an immediate flush.
598 : //
599 : // Note that a flush can still happen on segment bounds, which will result
600 : // in an AppendResponse.
601 620 : if let ProposerAcceptorMessage::AppendRequest(append_request) = msg {
602 : // allow tests to pause AppendRequest processing to simulate lag
603 620 : pausable_failpoint!("sk-acceptor-pausable");
604 620 : msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
605 620 : dirty = true;
606 0 : }
607 :
608 620 : self.tli.process_msg(&msg).await?
609 : }
610 :
611 : // While receiving AppendRequests, flush the WAL periodically and respond with an
612 : // AppendResponse to let walproposer know we're still alive.
613 1250 : _ = flush_ticker.tick(), if dirty => {
614 0 : dirty = false;
615 0 : self.tli
616 0 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
617 0 : .await?
618 : }
619 :
620 : // If there are no pending messages, flush the WAL immediately.
621 : //
622 : // TODO: this should be done via flush_ticker.reset_immediately(), but that's always
623 : // delayed by 1ms due to this bug: https://github.com/tokio-rs/tokio/issues/6866.
624 1250 : _ = future::ready(()), if dirty && self.msg_rx.is_empty() => {
625 620 : dirty = false;
626 620 : flush_ticker.reset();
627 620 : self.tli
628 620 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
629 620 : .await?
630 : }
631 :
632 : // Update histogram metrics periodically.
633 1250 : _ = metrics_ticker.tick() => {
634 5 : WAL_RECEIVER_QUEUE_DEPTH.observe(self.msg_rx.len() as f64);
635 5 : None // no reply
636 : }
637 :
638 1250 : _ = self.tli.cancel.cancelled() => {
639 0 : break;
640 : }
641 : };
642 :
643 : // Send reply, if any.
644 1245 : if let Some(reply) = reply {
645 620 : if self.reply_tx.send(reply).await.is_err() {
646 0 : break; // disconnected, break to flush WAL and return
647 620 : }
648 625 : }
649 : }
650 :
651 : // Flush WAL on disconnect, see https://github.com/neondatabase/neon/issues/9259.
652 4 : if dirty && !self.tli.cancel.is_cancelled() {
653 0 : self.tli
654 0 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
655 0 : .await?;
656 4 : }
657 :
658 4 : Ok(())
659 4 : }
660 : }
661 :
662 : /// On drop, drain msg_rx and update metrics to avoid leaks.
663 : impl Drop for WalAcceptor {
664 5 : fn drop(&mut self) {
665 5 : self.msg_rx.close(); // prevent further sends
666 5 : while let Ok(msg) = self.msg_rx.try_recv() {
667 0 : WAL_RECEIVER_QUEUE_DEPTH_TOTAL.dec();
668 0 : WAL_RECEIVER_QUEUE_SIZE_TOTAL.sub(msg.size() as i64);
669 0 : }
670 5 : }
671 : }