//! Safekeeper communication endpoint to WAL proposer (compute node).
//! Gets messages from the network, passes them down to the consensus module,
//! and sends replies back.
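//!
//! Data flow, as implemented below:
//!
//!   walproposer --> NetworkReader::run --> msg channel --> WalAcceptor
//!   walproposer <-- network_write <-- reply channel <-- WalAcceptor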

use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
use crate::timeline::Timeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
use bytes::BytesMut;
use parking_lot::MappedMutexGuard;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use postgres_backend::CopyStreamHandlerEnd;
use postgres_backend::PostgresBackend;
use postgres_backend::PostgresBackendReader;
use postgres_backend::QueryError;
use pq_proto::BeMessage;
use serde::Deserialize;
use serde::Serialize;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;

/// Registry of WalReceivers (compute connections). The Timeline holds it
/// (wrapped in an Arc).
pub struct WalReceivers {
    mutex: Mutex<WalReceiversShared>,
}

/// Id under which a walreceiver is registered in the shared registry.
type WalReceiverId = usize;

impl WalReceivers {
    pub fn new() -> Arc<WalReceivers> {
        Arc::new(WalReceivers {
            mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
        })
    }

    /// Register a new walreceiver. The returned guard provides access to the
    /// slot and automatically deregisters it on Drop.
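    ///
    /// Illustrative usage (a sketch; the connection id is an arbitrary
    /// placeholder here, in real code it comes from the accepted connection):
    ///
    /// ```ignore
    /// let walreceivers = WalReceivers::new();
    /// let guard = walreceivers.register(Some(1));
    /// assert_eq!(walreceivers.get_num(), 1);
    /// drop(guard); // deregisters the slot
    /// assert_eq!(walreceivers.get_num(), 0);
    /// ```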
    pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
        let slots = &mut self.mutex.lock().slots;
        let walreceiver = WalReceiverState {
            conn_id,
            status: WalReceiverStatus::Voting,
        };
        // Find an empty slot or create a new one.
        let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
            slots[pos] = Some(walreceiver);
            pos
        } else {
            let pos = slots.len();
            slots.push(Some(walreceiver));
            pos
        };
        WalReceiverGuard {
            id: pos,
            walreceivers: self.clone(),
        }
    }

    /// Get a reference to the locked slot contents. The slot must exist
    /// (have been registered earlier).
    fn get_slot<'a>(
        self: &'a Arc<WalReceivers>,
        id: WalReceiverId,
    ) -> MappedMutexGuard<'a, WalReceiverState> {
        MutexGuard::map(self.mutex.lock(), |locked| {
            locked.slots[id]
                .as_mut()
                .expect("walreceiver doesn't exist")
        })
    }

    /// Get number of walreceivers (compute connections).
    pub fn get_num(self: &Arc<WalReceivers>) -> usize {
        self.mutex.lock().slots.iter().flatten().count()
    }

    /// Get state of all walreceivers.
    pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
        self.mutex.lock().slots.iter().flatten().cloned().collect()
    }

    /// Get the number of walreceivers (normally 0 or 1) streaming from compute.
    pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
        self.mutex
            .lock()
            .slots
            .iter()
            .flatten()
            // `conn_id.is_some()` filters out recovery tasks, which also register here.
            .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
            .count()
    }

    /// Unregister a walreceiver.
    fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
        let mut shared = self.mutex.lock();
        shared.slots[id] = None;
    }
}

/// Only a few connections are expected (normally one), so we store them in a Vec.
struct WalReceiversShared {
    slots: Vec<Option<WalReceiverState>>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalReceiverState {
    /// None means it is recovery initiated by us (this safekeeper).
    pub conn_id: Option<ConnectionId>,
    pub status: WalReceiverStatus,
}

/// Walreceiver status. Currently only tracks whether the connection has passed
/// the voting stage and started receiving the stream, but it is easy to add
/// more states if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverStatus {
    Voting,
    Streaming,
}

/// Scope guard that provides access to a slot in the WalReceivers registry and
/// unregisters from it on Drop.
pub struct WalReceiverGuard {
    id: WalReceiverId,
    walreceivers: Arc<WalReceivers>,
}

impl WalReceiverGuard {
    /// Get reference to locked shared state contents.
    fn get(&self) -> MappedMutexGuard<WalReceiverState> {
        self.walreceivers.get_slot(self.id)
    }
}

impl Drop for WalReceiverGuard {
    fn drop(&mut self) {
        self.walreceivers.unregister(self.id);
    }
}
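
/// Capacity of the queue of messages flowing from the network reader to WalAcceptor.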
pub const MSG_QUEUE_SIZE: usize = 256;
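
/// Capacity of the queue of replies flowing from WalAcceptor back to the network writer.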
pub const REPLY_QUEUE_SIZE: usize = 16;

impl SafekeeperPostgresHandler {
    /// Wrapper around handle_start_wal_push_guts that handles the result.
    /// Errors are handled here, while we're still in the walreceiver ttid
    /// span; with an API extension this could probably be moved into
    /// postgres_backend.
    pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
    ) -> Result<(), QueryError> {
        if let Err(end) = self.handle_start_wal_push_guts(pgb).await {
            // Log the result and probably send it to the client, closing the stream.
            pgb.handle_copy_stream_end(end).await;
        }
        Ok(())
    }

    pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
    ) -> Result<(), CopyStreamHandlerEnd> {
        // Notify the libpq client that it's allowed to send `CopyData` messages.
        pgb.write_message(&BeMessage::CopyBothResponse).await?;

        // Experiments [1] confirm that doing network IO in one (this) task and
        // processing with disk IO in another significantly improves
        // performance; to this end we spawn a separate WalAcceptor task for
        // message processing.
        //
        // [1] https://github.com/neondatabase/neon/pull/1318
        let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
        let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
        let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;

        // Concurrently receive and send data; replies are not synchronized with
        // sends, so this avoids deadlocks.
        let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
        let peer_addr = *pgb.get_peer_addr();
        let network_reader = NetworkReader {
            ttid: self.ttid,
            conn_id: self.conn_id,
            pgb_reader: &mut pgb_reader,
            peer_addr,
            acceptor_handle: &mut acceptor_handle,
        };
        let res = tokio::select! {
            // todo: add read|write .context to these errors
            r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
            r = network_write(pgb, reply_rx) => r,
        };

        // Reunite the split pg backend halves.
        pgb.unsplit(pgb_reader)?;

        // Join the spawned WalAcceptor. At this point the channels to/from it
        // that were passed to the network routines have been dropped, so it
        // will exit as soon as it touches them.
        match acceptor_handle {
            None => {
                // Failed even before spawning; the network read must hold the error.
                Err(res.expect_err("no error reported though WalAcceptor was not spawned"))
            }
            Some(handle) => {
                let wal_acceptor_res = handle.await;

                // If there was any network error, return it.
                res?;

                // Otherwise, WalAcceptor must have errored.
                match wal_acceptor_res {
                    Ok(Ok(_)) => Ok(()), // can't happen currently; it could with graceful termination
                    Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
                    Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
                        "WalAcceptor task panicked",
                    ))),
                }
            }
        }
    }
}

struct NetworkReader<'a, IO> {
    ttid: TenantTimelineId,
    conn_id: ConnectionId,
    pgb_reader: &'a mut PostgresBackendReader<IO>,
    peer_addr: SocketAddr,
    // WalAcceptor is spawned when we learn server info from the walproposer
    // and create the timeline; its handle is stored here.
    acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
}

impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
    async fn run(
        self,
        msg_tx: Sender<ProposerAcceptorMessage>,
        msg_rx: Receiver<ProposerAcceptorMessage>,
        reply_tx: Sender<AcceptorProposerMessage>,
    ) -> Result<(), CopyStreamHandlerEnd> {
        // Receive the server information needed to create the timeline, if it
        // doesn't exist yet.
        let next_msg = read_message(self.pgb_reader).await?;
        let tli = match next_msg {
            ProposerAcceptorMessage::Greeting(ref greeting) => {
                info!(
                    "start handshake with walproposer {} sysid {} timeline {}",
                    self.peer_addr, greeting.system_id, greeting.tli,
                );
                let server_info = ServerInfo {
                    pg_version: greeting.pg_version,
                    system_id: greeting.system_id,
                    wal_seg_size: greeting.wal_seg_size,
                };
                GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
            }
            _ => {
                return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
                    "unexpected message {next_msg:?} instead of greeting"
                )))
            }
        };

        *self.acceptor_handle = Some(WalAcceptor::spawn(
            tli.clone(),
            msg_rx,
            reply_tx,
            Some(self.conn_id),
        ));

        // Forward all subsequent messages to WalAcceptor.
        read_network_loop(self.pgb_reader, msg_tx, next_msg).await
    }
}

/// Read next message from walproposer.
/// TODO: Return Ok(None) on graceful termination.
async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_reader: &mut PostgresBackendReader<IO>,
) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
    let copy_data = pgb_reader.read_copy_message().await?;
    let msg = ProposerAcceptorMessage::parse(copy_data)?;
    Ok(msg)
}

async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_reader: &mut PostgresBackendReader<IO>,
    msg_tx: Sender<ProposerAcceptorMessage>,
    mut next_msg: ProposerAcceptorMessage,
) -> Result<(), CopyStreamHandlerEnd> {
    loop {
        if msg_tx.send(next_msg).await.is_err() {
            return Ok(()); // chan closed, WalAcceptor terminated
        }
        next_msg = read_message(pgb_reader).await?;
    }
}

/// Read replies from WalAcceptor and pass them back to the socket. Returns
/// Ok(()) if reply_rx is closed; that must mean WalAcceptor terminated, and
/// joining it should surface the error.
async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_writer: &mut PostgresBackend<IO>,
    mut reply_rx: Receiver<AcceptorProposerMessage>,
) -> Result<(), CopyStreamHandlerEnd> {
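    // Reuse one buffer across replies to avoid per-message allocation.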
    let mut buf = BytesMut::with_capacity(128);

    loop {
        match reply_rx.recv().await {
            Some(msg) => {
                buf.clear();
                msg.serialize(&mut buf)?;
                pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
            }
            None => return Ok(()), // chan closed, WalAcceptor terminated
        }
    }
}

// Send keepalive messages to the walproposer to make sure it receives updates
// even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);

/// Encapsulates a task that takes messages from msg_rx, processes them, and
/// pushes replies to reply_tx. Reading from the socket and writing to disk in
/// parallel is beneficial for performance; this struct provides the
/// writing-to-disk part.
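///
/// Illustrative wiring (a sketch, not taken from real call sites; `tli` and
/// `conn_id` would come from the connection handler):
///
/// ```ignore
/// let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
/// let (reply_tx, mut reply_rx) = channel(REPLY_QUEUE_SIZE);
/// let handle = WalAcceptor::spawn(tli, msg_rx, reply_tx, Some(conn_id));
/// // Feed ProposerAcceptorMessages into msg_tx and drain replies from
/// // reply_rx; dropping both channel ends makes the task return Ok(()).
/// ```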
pub struct WalAcceptor {
    tli: Arc<Timeline>,
    msg_rx: Receiver<ProposerAcceptorMessage>,
    reply_tx: Sender<AcceptorProposerMessage>,
    conn_id: Option<ConnectionId>,
}

impl WalAcceptor {
    /// Spawn a task running WalAcceptor and return a handle to it. The task
    /// returns Ok(()) if either of the channels is closed, and Err if any
    /// error is encountered during message processing.
    ///
    /// A conn_id of None means the WalAcceptor is used by recovery initiated
    /// at this safekeeper.
    pub fn spawn(
        tli: Arc<Timeline>,
        msg_rx: Receiver<ProposerAcceptorMessage>,
        reply_tx: Sender<AcceptorProposerMessage>,
        conn_id: Option<ConnectionId>,
    ) -> JoinHandle<anyhow::Result<()>> {
        task::spawn(async move {
            let mut wa = WalAcceptor {
                tli,
                msg_rx,
                reply_tx,
                conn_id,
            };

            let span_ttid = wa.tli.ttid; // satisfy borrow checker
            wa.run()
                .instrument(
                    info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
                )
                .await
        })
    }

    /// The main loop. Returns Ok(()) if either msg_rx or reply_tx is closed;
    /// that must mean the network task terminated.
    async fn run(&mut self) -> anyhow::Result<()> {
        // Register the connection and defer unregistration.
        // The order of the next two declarations matters: locals are dropped
        // in reverse order, so on exit we first remove our entry and only
        // then update the status, which depends on registered connections.
        let _compute_conn_guard = ComputeConnectionGuard {
            timeline: Arc::clone(&self.tli),
        };
        let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
        self.tli.update_status_notify().await?;

        // After this timestamp we will stop processing AppendRequests and send
        // a response to the walproposer. The walproposer sends at least one
        // AppendRequest per second, so replying to these requests effectively
        // sends keepalives once per second.
        let mut next_keepalive = Instant::now();

        loop {
            let opt_msg = self.msg_rx.recv().await;
            if opt_msg.is_none() {
                return Ok(()); // chan closed, streaming terminated
            }
            let mut next_msg = opt_msg.unwrap();

            // Update walreceiver state in the shared registry for reporting.
            if let ProposerAcceptorMessage::Elected(_) = &next_msg {
                walreceiver_guard.get().status = WalReceiverStatus::Streaming;
            }

            let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
                // Loop through AppendRequests while they are readily available,
                // writing as much WAL as possible without fsyncing; the single
                // FlushWAL below then persists everything written.
                //
                // Note: this will need to be rewritten if we want to read
                // non-AppendRequest messages here. Otherwise, we might end up
                // in a situation where we read a message but don't process it.
                while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
                    let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);

                    if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
                        if self.reply_tx.send(reply).await.is_err() {
                            return Ok(()); // chan closed, streaming terminated
                        }
                    }

                    // Get out of this loop if the keepalive deadline is reached.
                    if Instant::now() >= next_keepalive {
                        break;
                    }

                    match self.msg_rx.try_recv() {
                        Ok(msg) => next_msg = msg,
                        Err(TryRecvError::Empty) => break,
                        Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
                    }
                }

                // Flush all written WAL to disk.
                self.tli
                    .process_msg(&ProposerAcceptorMessage::FlushWAL)
                    .await?
            } else {
                // Process a message other than AppendRequest.
                self.tli.process_msg(&next_msg).await?
            };

            if let Some(reply) = reply_msg {
                if self.reply_tx.send(reply).await.is_err() {
                    return Ok(()); // chan closed, streaming terminated
                }
                // Reset the keepalive deadline.
                next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
            }
        }
    }
}

/// Calls update_status_notify on drop to update the timeline status.
struct ComputeConnectionGuard {
    timeline: Arc<Timeline>,
}

impl Drop for ComputeConnectionGuard {
    fn drop(&mut self) {
        let tli = self.timeline.clone();
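        // Drop cannot be async, so run the async status update in a spawned task.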
        tokio::spawn(async move {
            if let Err(e) = tli.update_status_notify().await {
                error!("failed to update timeline status: {}", e);
            }
        });
    }
}