//! Safekeeper communication endpoint to WAL proposer (compute node).
//! Gets messages from the network, passes them down to consensus module and
//! sends replies back.
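//!
//! Per connection the flow is: the walproposer opens a COPY-both stream and
//! sends a greeting (which creates the timeline if needed), then streams
//! consensus messages. Network reads and writes run concurrently in the
//! handler task, while message processing runs in a separately spawned
//! WalAcceptor task connected to it via channels.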

use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
use crate::timeline::Timeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
use bytes::BytesMut;
use parking_lot::MappedMutexGuard;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use postgres_backend::CopyStreamHandlerEnd;
use postgres_backend::PostgresBackend;
use postgres_backend::PostgresBackendReader;
use postgres_backend::QueryError;
use pq_proto::BeMessage;
use serde::Deserialize;
use serde::Serialize;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;

/// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
/// in Arc).
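///
/// A minimal sketch of the slot lifecycle (all names exist in this module;
/// illustrative only, not a doctest):
/// ```ignore
/// let receivers = WalReceivers::new();
/// let guard = receivers.register(); // occupies a free slot in Voting state
/// *guard.get() = WalReceiverState::Streaming; // update the reported state
/// assert_eq!(receivers.get_num(), 1);
/// drop(guard); // the Drop impl frees the slot
/// ```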
pub struct WalReceivers {
    mutex: Mutex<WalReceiversShared>,
}

/// Id under which a walreceiver is registered in the shared state.
type WalReceiverId = usize;

impl WalReceivers {
    pub fn new() -> Arc<WalReceivers> {
        Arc::new(WalReceivers {
            mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
        })
    }

    /// Register a new walreceiver. The returned guard provides access to the
    /// slot and automatically deregisters it in Drop.
    pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
        let slots = &mut self.mutex.lock().slots;
        let walreceiver = WalReceiverState::Voting;
        // find an empty slot or create a new one
        let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
            slots[pos] = Some(walreceiver);
            pos
        } else {
            let pos = slots.len();
            slots.push(Some(walreceiver));
            pos
        };
        WalReceiverGuard {
            id: pos,
            walreceivers: self.clone(),
        }
    }

    /// Get a reference to the locked slot contents. The slot must exist
    /// (registered earlier).
    fn get_slot<'a>(
        self: &'a Arc<WalReceivers>,
        id: WalReceiverId,
    ) -> MappedMutexGuard<'a, WalReceiverState> {
        MutexGuard::map(self.mutex.lock(), |locked| {
            locked.slots[id]
                .as_mut()
                .expect("walreceiver doesn't exist")
        })
    }

    /// Get the number of walreceivers (compute connections).
    pub fn get_num(self: &Arc<WalReceivers>) -> usize {
        self.mutex.lock().slots.iter().flatten().count()
    }

    /// Get the state of all walreceivers.
    pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
        self.mutex.lock().slots.iter().flatten().cloned().collect()
    }

    /// Unregister a walreceiver.
    fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
        let mut shared = self.mutex.lock();
        shared.slots[id] = None;
    }
}

/// Only a few connections are expected (normally one), so store them in a Vec.
struct WalReceiversShared {
    slots: Vec<Option<WalReceiverState>>,
}

/// Walreceiver status. Currently only tracks whether it has passed the voting
/// stage and started receiving the stream, but it is easy to add more if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverState {
    Voting,
    Streaming,
}

/// Scope guard to access a slot in the WalReceivers registry and unregister
/// from it in Drop.
pub struct WalReceiverGuard {
    id: WalReceiverId,
    walreceivers: Arc<WalReceivers>,
}

impl WalReceiverGuard {
    /// Get a reference to the locked shared state contents.
    fn get(&self) -> MappedMutexGuard<WalReceiverState> {
        self.walreceivers.get_slot(self.id)
    }
}

impl Drop for WalReceiverGuard {
    fn drop(&mut self) {
        self.walreceivers.unregister(self.id);
    }
}

const MSG_QUEUE_SIZE: usize = 256;
const REPLY_QUEUE_SIZE: usize = 16;

impl SafekeeperPostgresHandler {
    /// Wrapper around handle_start_wal_push_guts handling the result. The
    /// error is handled here while we're still in the walreceiver ttid span;
    /// with an API extension, this could probably be moved into postgres_backend.
    pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
    ) -> Result<(), QueryError> {
        if let Err(end) = self.handle_start_wal_push_guts(pgb).await {
            // Log the result and probably send it to the client, closing the stream.
            pgb.handle_copy_stream_end(end).await;
        }
        Ok(())
    }

    pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
    ) -> Result<(), CopyStreamHandlerEnd> {
        // Notify the libpq client that it's allowed to send `CopyData` messages
        pgb.write_message(&BeMessage::CopyBothResponse).await?;

        // Experiments [1] confirm that doing network IO in one (this) task and
        // processing with disk IO in another significantly improves
        // performance; to this end we spawn off a WalAcceptor task for message
        // processing.
        //
        // [1] https://github.com/neondatabase/neon/pull/1318
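        //
        // The channels below tie the halves together: msg_tx/msg_rx carry
        // proposer messages from the socket reader to WalAcceptor, while
        // reply_tx/reply_rx carry its replies back to the socket writer.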
        let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
        let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
        let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;

        // Concurrently receive and send data; replies are not synchronized with
        // sends, which avoids deadlocks.
        let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
        let peer_addr = *pgb.get_peer_addr();
        let network_reader = NetworkReader {
            ttid: self.ttid,
            conn_id: self.conn_id,
            pgb_reader: &mut pgb_reader,
            peer_addr,
            acceptor_handle: &mut acceptor_handle,
        };
        let res = tokio::select! {
            // todo: add read|write .context to these errors
            r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
            r = network_write(pgb, reply_rx) => r,
        };

        // Join the pg backend back together.
        pgb.unsplit(pgb_reader)?;

        // Join the spawned WalAcceptor. At this point the channels to/from it
        // passed to the network routines are dropped, so it will exit as soon
        // as it touches them.
        match acceptor_handle {
            None => {
                // Failed even before spawning; network_reader.run should have
                // returned the error.
                Err(res.expect_err("no error, yet WalAcceptor was never spawned"))
            }
            Some(handle) => {
                let wal_acceptor_res = handle.await;

                // If there was any network error, return it.
                res?;

                // Otherwise, the WalAcceptor task must have errored.
                match wal_acceptor_res {
                    Ok(Ok(_)) => Ok(()), // can't happen currently; would become possible with graceful termination
                    Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
                    Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
                        "WalAcceptor task panicked",
                    ))),
                }
            }
        }
    }
}

struct NetworkReader<'a, IO> {
    ttid: TenantTimelineId,
    conn_id: ConnectionId,
    pgb_reader: &'a mut PostgresBackendReader<IO>,
    peer_addr: SocketAddr,
    // WalAcceptor is spawned when we learn the server info from the
    // walproposer and create the timeline; its handle is stored here.
    acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
}

impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
    async fn run(
        self,
        msg_tx: Sender<ProposerAcceptorMessage>,
        msg_rx: Receiver<ProposerAcceptorMessage>,
        reply_tx: Sender<AcceptorProposerMessage>,
    ) -> Result<(), CopyStreamHandlerEnd> {
        // Receive the server information needed to create the timeline, if it
        // doesn't exist yet.
        let next_msg = read_message(self.pgb_reader).await?;
        let tli = match next_msg {
            ProposerAcceptorMessage::Greeting(ref greeting) => {
                info!(
                    "start handshake with walproposer {} sysid {} timeline {}",
                    self.peer_addr, greeting.system_id, greeting.tli,
                );
                let server_info = ServerInfo {
                    pg_version: greeting.pg_version,
                    system_id: greeting.system_id,
                    wal_seg_size: greeting.wal_seg_size,
                };
                GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
            }
            _ => {
                return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
                    "unexpected message {next_msg:?} instead of greeting"
                )))
            }
        };

        *self.acceptor_handle = Some(WalAcceptor::spawn(
            tli.clone(),
            msg_rx,
            reply_tx,
            self.conn_id,
        ));

        // Forward all subsequent messages to WalAcceptor
        read_network_loop(self.pgb_reader, msg_tx, next_msg).await
    }
}

/// Read next message from walproposer.
/// TODO: Return Ok(None) on graceful termination.
async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_reader: &mut PostgresBackendReader<IO>,
) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
    let copy_data = pgb_reader.read_copy_message().await?;
    let msg = ProposerAcceptorMessage::parse(copy_data)?;
    Ok(msg)
}

async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_reader: &mut PostgresBackendReader<IO>,
    msg_tx: Sender<ProposerAcceptorMessage>,
    mut next_msg: ProposerAcceptorMessage,
) -> Result<(), CopyStreamHandlerEnd> {
    loop {
        if msg_tx.send(next_msg).await.is_err() {
            return Ok(()); // chan closed, WalAcceptor terminated
        }
        next_msg = read_message(pgb_reader).await?;
    }
}

/// Read replies from WalAcceptor and pass them back to the socket. Returns
/// Ok(()) if reply_rx is closed; that must mean WalAcceptor terminated, and
/// joining it will surface the error.
async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_writer: &mut PostgresBackend<IO>,
    mut reply_rx: Receiver<AcceptorProposerMessage>,
) -> Result<(), CopyStreamHandlerEnd> {
    let mut buf = BytesMut::with_capacity(128);

    loop {
        match reply_rx.recv().await {
            Some(msg) => {
                buf.clear();
                msg.serialize(&mut buf)?;
                pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
            }
            None => return Ok(()), // chan closed, WalAcceptor terminated
        }
    }
}

// Send keepalive messages to the walproposer, to make sure it receives updates
// even while it writes a steady stream of messages.
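//
// Keepalives are implemented in WalAcceptor::run below: the AppendRequest
// batching loop breaks out once `next_keepalive` is reached, so at least one
// reply goes out per interval while the stream is busy.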
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);

/// Takes messages from msg_rx, processes them, and pushes replies to reply_tx.
struct WalAcceptor {
    tli: Arc<Timeline>,
    msg_rx: Receiver<ProposerAcceptorMessage>,
    reply_tx: Sender<AcceptorProposerMessage>,
}

impl WalAcceptor {
    /// Spawn a task running WalAcceptor; return its handle.
    fn spawn(
        tli: Arc<Timeline>,
        msg_rx: Receiver<ProposerAcceptorMessage>,
        reply_tx: Sender<AcceptorProposerMessage>,
        conn_id: ConnectionId,
    ) -> JoinHandle<anyhow::Result<()>> {
        task::spawn(async move {
            let mut wa = WalAcceptor {
                tli,
                msg_rx,
                reply_tx,
            };

            let span_ttid = wa.tli.ttid; // satisfy borrow checker
            wa.run()
                .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
                .await
        })
    }

    /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
    /// that must mean the network task terminated.
    async fn run(&mut self) -> anyhow::Result<()> {
        // Register the connection and defer unregistering it.
        // The order of the next two declarations is important: guards drop in
        // reverse order, so on exit we first remove our walreceiver entry and
        // only then update the status, which depends on registered connections.
        let _compute_conn_guard = ComputeConnectionGuard {
            timeline: Arc::clone(&self.tli),
        };
        let walreceiver_guard = self.tli.get_walreceivers().register();
        self.tli.update_status_notify().await?;

        // After this timestamp we will stop processing AppendRequests and send
        // a response to the walproposer. The walproposer sends at least one
        // AppendRequest per second, so we effectively send keepalives by
        // replying to these requests once per second.
        let mut next_keepalive = Instant::now();

        loop {
            let opt_msg = self.msg_rx.recv().await;
            if opt_msg.is_none() {
                return Ok(()); // chan closed, streaming terminated
            }
            let mut next_msg = opt_msg.unwrap();

            // Update walreceiver state in the shared registry for reporting.
            if let ProposerAcceptorMessage::Elected(_) = &next_msg {
                *walreceiver_guard.get() = WalReceiverState::Streaming;
            }

            let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
                // Loop through AppendRequests while they are readily available,
                // writing as much WAL as possible without fsyncing.
                //
                // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
                // Otherwise, we might end up in a situation where we read a message, but don't
                // process it.
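                //
                // Each request is fed through as NoFlushAppendRequest, so WAL
                // is written but not fsynced; the single FlushWAL after the
                // loop persists the whole batch with one fsync.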
                while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
                    let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);

                    if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
                        if self.reply_tx.send(reply).await.is_err() {
                            return Ok(()); // chan closed, streaming terminated
                        }
                    }

                    // get out of this loop if keepalive time is reached
                    if Instant::now() >= next_keepalive {
                        break;
                    }

                    match self.msg_rx.try_recv() {
                        Ok(msg) => next_msg = msg,
                        Err(TryRecvError::Empty) => break,
                        Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
                    }
                }

                // flush all written WAL to the disk
                self.tli
                    .process_msg(&ProposerAcceptorMessage::FlushWAL)
                    .await?
            } else {
                // process message other than AppendRequest
                self.tli.process_msg(&next_msg).await?
            };

            if let Some(reply) = reply_msg {
                if self.reply_tx.send(reply).await.is_err() {
                    return Ok(()); // chan closed, streaming terminated
                }
                // reset keepalive time
                next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
            }
        }
    }
}

/// Calls update_status_notify in Drop to update the timeline status.
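/// Drop is synchronous, so the async notification is spawned onto the runtime
/// rather than awaited.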
struct ComputeConnectionGuard {
    timeline: Arc<Timeline>,
}

impl Drop for ComputeConnectionGuard {
    fn drop(&mut self) {
        let tli = self.timeline.clone();
        tokio::spawn(async move {
            if let Err(e) = tli.update_status_notify().await {
                error!("failed to update timeline status: {}", e);
            }
        });
    }
}