//! Safekeeper communication endpoint to WAL proposer (compute node).
//! Gets messages from the network, passes them down to consensus module and
//! sends replies back.

use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::AcceptorProposerMessage;
use crate::safekeeper::ProposerAcceptorMessage;
use crate::safekeeper::ServerInfo;
use crate::timeline::Timeline;
use crate::wal_service::ConnectionId;
use crate::GlobalTimelines;
use anyhow::{anyhow, Context};
use bytes::BytesMut;
use parking_lot::MappedMutexGuard;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use postgres_backend::CopyStreamHandlerEnd;
use postgres_backend::PostgresBackend;
use postgres_backend::PostgresBackendReader;
use postgres_backend::QueryError;
use pq_proto::BeMessage;
use serde::Deserialize;
use serde::Serialize;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::task;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use tracing::*;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;

/// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
/// in Arc).
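///
/// A minimal usage sketch (illustrative only, not a compiled doctest; `get`
/// is private to this module):
///
/// ```ignore
/// let receivers = WalReceivers::new();
/// let guard = receivers.register(); // slot starts in Voting state
/// *guard.get() = WalReceiverState::Streaming;
/// assert_eq!(receivers.get_num(), 1);
/// drop(guard); // deregisters, freeing the slot for reuse
/// ```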
pub struct WalReceivers {
    mutex: Mutex<WalReceiversShared>,
}

/// Id under which walreceiver is registered in shmem.
type WalReceiverId = usize;

impl WalReceivers {
    pub fn new() -> Arc<WalReceivers> {
        Arc::new(WalReceivers {
            mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
        })
    }

    /// Register new walreceiver. Returned guard provides access to the slot and
    /// automatically deregisters in Drop.
    pub fn register(self: &Arc<WalReceivers>) -> WalReceiverGuard {
        let slots = &mut self.mutex.lock().slots;
        let walreceiver = WalReceiverState::Voting;
        // find empty slot or create new one
        let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
            slots[pos] = Some(walreceiver);
            pos
        } else {
            let pos = slots.len();
            slots.push(Some(walreceiver));
            pos
        };
        WalReceiverGuard {
            id: pos,
            walreceivers: self.clone(),
        }
    }

    /// Get reference to locked slot contents. Slot must exist (registered
    /// earlier).
    fn get_slot<'a>(
        self: &'a Arc<WalReceivers>,
        id: WalReceiverId,
    ) -> MappedMutexGuard<'a, WalReceiverState> {
        MutexGuard::map(self.mutex.lock(), |locked| {
            locked.slots[id]
                .as_mut()
                .expect("walreceiver doesn't exist")
        })
    }

    /// Get number of walreceivers (compute connections).
    pub fn get_num(self: &Arc<WalReceivers>) -> usize {
        self.mutex.lock().slots.iter().flatten().count()
    }

    /// Get state of all walreceivers.
    pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
        self.mutex.lock().slots.iter().flatten().cloned().collect()
    }

    /// Unregister walreceiver.
    fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
        let mut shared = self.mutex.lock();
        shared.slots[id] = None;
    }
}

/// Only a few connections are expected (normally one), so store in Vec.
struct WalReceiversShared {
    slots: Vec<Option<WalReceiverState>>,
}

/// Walreceiver status. Currently only whether it passed voting stage and
/// started receiving the stream, but it is easy to add more if needed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WalReceiverState {
    Voting,
    Streaming,
}

/// Scope guard to access a slot in the WalReceivers registry and unregister
/// from it in Drop.
pub struct WalReceiverGuard {
    id: WalReceiverId,
    walreceivers: Arc<WalReceivers>,
}

impl WalReceiverGuard {
    /// Get reference to locked shared state contents.
    fn get(&self) -> MappedMutexGuard<WalReceiverState> {
        self.walreceivers.get_slot(self.id)
    }
}

impl Drop for WalReceiverGuard {
    fn drop(&mut self) {
        self.walreceivers.unregister(self.id);
    }
}

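// Capacities of the bounded channels between the network tasks and the
// WalAcceptor task: proposer messages flow through a queue of MSG_QUEUE_SIZE,
// replies back through a queue of REPLY_QUEUE_SIZE.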
const MSG_QUEUE_SIZE: usize = 256;
const REPLY_QUEUE_SIZE: usize = 16;

impl SafekeeperPostgresHandler {
    /// Wrapper around handle_start_wal_push_guts that handles the result.
    /// Errors are handled here while we're still in the walreceiver ttid span;
    /// with an API extension, this can probably be moved into postgres_backend.
    pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
    ) -> Result<(), QueryError> {
        if let Err(end) = self.handle_start_wal_push_guts(pgb).await {
            // Log the result and probably send it to the client, closing the stream.
            pgb.handle_copy_stream_end(end).await;
        }
        Ok(())
    }

    pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
        &mut self,
        pgb: &mut PostgresBackend<IO>,
    ) -> Result<(), CopyStreamHandlerEnd> {
        // Notify the libpq client that it's allowed to send `CopyData` messages
        pgb.write_message(&BeMessage::CopyBothResponse).await?;

        // Experiments [1] confirm that doing network IO in one task (this
        // one) and processing with disk IO in another significantly improves
        // performance; we spawn the WalAcceptor task for message processing
        // to this end.
        //
        // [1] https://github.com/neondatabase/neon/pull/1318
        let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
        let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
        let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;

        // Concurrently receive and send data; replies are not synchronized with
        // sends, so this avoids deadlocks.
        let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
        let peer_addr = *pgb.get_peer_addr();
        let network_reader = NetworkReader {
            ttid: self.ttid,
            conn_id: self.conn_id,
            pgb_reader: &mut pgb_reader,
            peer_addr,
            acceptor_handle: &mut acceptor_handle,
        };
        let res = tokio::select! {
            // todo: add read|write .context to these errors
            r = network_reader.run(msg_tx, msg_rx, reply_tx) => r,
            r = network_write(pgb, reply_rx) => r,
        };

        // Join pg backend back.
        pgb.unsplit(pgb_reader)?;

        // Join the spawned WalAcceptor. At this point the channels to/from it
        // that were passed to the network routines are dropped, so it will
        // exit as soon as it touches them.
        match acceptor_handle {
            None => {
                // Failed even before spawning; read_network should have the error.
                Err(res.expect_err("no error with WalAcceptor not spawned"))
            }
            Some(handle) => {
                let wal_acceptor_res = handle.await;

                // If there was any network error, return it.
                res?;

                // Otherwise, the WalAcceptor task must have errored.
                match wal_acceptor_res {
                    Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
                    Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
                    Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
                        "WalAcceptor task panicked",
                    ))),
                }
            }
        }
    }
}

struct NetworkReader<'a, IO> {
    ttid: TenantTimelineId,
    conn_id: ConnectionId,
    pgb_reader: &'a mut PostgresBackendReader<IO>,
    peer_addr: SocketAddr,
    // WalAcceptor is spawned when we learn server info from walproposer and
    // create timeline; handle is put here.
    acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
}

impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
    async fn run(
        self,
        msg_tx: Sender<ProposerAcceptorMessage>,
        msg_rx: Receiver<ProposerAcceptorMessage>,
        reply_tx: Sender<AcceptorProposerMessage>,
    ) -> Result<(), CopyStreamHandlerEnd> {
        // Receive information about the server to create the timeline, if it
        // doesn't exist yet.
        let next_msg = read_message(self.pgb_reader).await?;
        let tli = match next_msg {
            ProposerAcceptorMessage::Greeting(ref greeting) => {
                info!(
                    "start handshake with walproposer {} sysid {} timeline {}",
                    self.peer_addr, greeting.system_id, greeting.tli,
                );
                let server_info = ServerInfo {
                    pg_version: greeting.pg_version,
                    system_id: greeting.system_id,
                    wal_seg_size: greeting.wal_seg_size,
                };
                GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID).await?
            }
            _ => {
                return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
                    "unexpected message {next_msg:?} instead of greeting"
                )))
            }
        };

        *self.acceptor_handle = Some(WalAcceptor::spawn(
            tli.clone(),
            msg_rx,
            reply_tx,
            self.conn_id,
        ));

        // Forward all messages to WalAcceptor
        read_network_loop(self.pgb_reader, msg_tx, next_msg).await
    }
}

/// Read next message from walproposer.
/// TODO: Return Ok(None) on graceful termination.
async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_reader: &mut PostgresBackendReader<IO>,
) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
    let copy_data = pgb_reader.read_copy_message().await?;
    let msg = ProposerAcceptorMessage::parse(copy_data)?;
    Ok(msg)
}

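/// Pump messages from the socket into WalAcceptor's queue. Returns Ok(()) if
/// the channel is closed (WalAcceptor terminated), or propagates the read
/// error.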
async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_reader: &mut PostgresBackendReader<IO>,
    msg_tx: Sender<ProposerAcceptorMessage>,
    mut next_msg: ProposerAcceptorMessage,
) -> Result<(), CopyStreamHandlerEnd> {
    loop {
        if msg_tx.send(next_msg).await.is_err() {
            return Ok(()); // chan closed, WalAcceptor terminated
        }
        next_msg = read_message(pgb_reader).await?;
    }
}

/// Read replies from WalAcceptor and pass them back to the socket. Returns
/// Ok(()) if reply_rx is closed; that must mean WalAcceptor terminated, and
/// joining it should surface the error.
async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
    pgb_writer: &mut PostgresBackend<IO>,
    mut reply_rx: Receiver<AcceptorProposerMessage>,
) -> Result<(), CopyStreamHandlerEnd> {
    let mut buf = BytesMut::with_capacity(128);

    loop {
        match reply_rx.recv().await {
            Some(msg) => {
                buf.clear();
                msg.serialize(&mut buf)?;
                pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
            }
            None => return Ok(()), // chan closed, WalAcceptor terminated
        }
    }
}

// Send keepalive messages to walproposer, to make sure it receives updates
// even when it writes a steady stream of messages.
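// (See `WalAcceptor::run`: the keepalive deadline is pushed forward each time
// a reply is sent, and the AppendRequest batching loop breaks once the
// deadline is reached, forcing a flush and a reply.)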
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);

/// Takes messages from msg_rx, processes and pushes replies to reply_tx.
struct WalAcceptor {
    tli: Arc<Timeline>,
    msg_rx: Receiver<ProposerAcceptorMessage>,
    reply_tx: Sender<AcceptorProposerMessage>,
}

impl WalAcceptor {
    /// Spawn a task running WalAcceptor and return its join handle.
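    ///
    /// A call sketch mirroring its use in handle_start_wal_push_guts
    /// (illustrative only, not a compiled doctest):
    ///
    /// ```ignore
    /// let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
    /// let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
    /// let handle = WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, conn_id);
    /// // ... feed msg_tx and drain reply_rx, then join:
    /// let result = handle.await;
    /// ```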
    fn spawn(
        tli: Arc<Timeline>,
        msg_rx: Receiver<ProposerAcceptorMessage>,
        reply_tx: Sender<AcceptorProposerMessage>,
        conn_id: ConnectionId,
    ) -> JoinHandle<anyhow::Result<()>> {
        task::spawn(async move {
            let mut wa = WalAcceptor {
                tli,
                msg_rx,
                reply_tx,
            };

            let span_ttid = wa.tli.ttid; // satisfy borrow checker
            wa.run()
                .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid))
                .await
        })
    }

    /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
    /// that must mean the network task terminated.
    async fn run(&mut self) -> anyhow::Result<()> {
        // Register the connection and defer unregister.
        // Declaration order of the next two guards matters: drops run in
        // reverse order, so on exit we first remove our entry and only then
        // update the status, which depends on the registered connections.
        let _compute_conn_guard = ComputeConnectionGuard {
            timeline: Arc::clone(&self.tli),
        };
        let walreceiver_guard = self.tli.get_walreceivers().register();
        self.tli.update_status_notify().await?;

        // After this timestamp we will stop processing AppendRequests and send
        // a response to the walproposer. walproposer sends at least one
        // AppendRequest per second, so we end up sending keepalives by
        // replying to these requests once per second.
        let mut next_keepalive = Instant::now();

        loop {
            let opt_msg = self.msg_rx.recv().await;
            if opt_msg.is_none() {
                return Ok(()); // chan closed, streaming terminated
            }
            let mut next_msg = opt_msg.unwrap();

            // Update walreceiver state in shmem for reporting.
            if let ProposerAcceptorMessage::Elected(_) = &next_msg {
                *walreceiver_guard.get() = WalReceiverState::Streaming;
            }

            let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
                // Loop through AppendRequests while they are readily
                // available, writing as much WAL as possible without fsyncing.
                //
                // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
                // Otherwise, we might end up in a situation where we read a message, but don't
                // process it.
                while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
                    let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);

                    if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
                        if self.reply_tx.send(reply).await.is_err() {
                            return Ok(()); // chan closed, streaming terminated
                        }
                    }

                    // get out of this loop if keepalive time is reached
                    if Instant::now() >= next_keepalive {
                        break;
                    }

                    match self.msg_rx.try_recv() {
                        Ok(msg) => next_msg = msg,
                        Err(TryRecvError::Empty) => break,
                        Err(TryRecvError::Disconnected) => return Ok(()), // chan closed, streaming terminated
                    }
                }

                // flush all written WAL to the disk
                self.tli
                    .process_msg(&ProposerAcceptorMessage::FlushWAL)
                    .await?
            } else {
                // process message other than AppendRequest
                self.tli.process_msg(&next_msg).await?
            };

            if let Some(reply) = reply_msg {
                if self.reply_tx.send(reply).await.is_err() {
                    return Ok(()); // chan closed, streaming terminated
                }
                // reset keepalive time
                next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
            }
        }
    }
}

/// Calls update_status_notify in drop to update timeline status.
struct ComputeConnectionGuard {
    timeline: Arc<Timeline>,
}

impl Drop for ComputeConnectionGuard {
    fn drop(&mut self) {
        let tli = self.timeline.clone();
        tokio::spawn(async move {
            if let Err(e) = tli.update_status_notify().await {
                error!("failed to update timeline status: {}", e);
            }
        });
    }
}