Line data Source code
1 : //! Safekeeper communication endpoint to WAL proposer (compute node).
2 : //! Gets messages from the network, passes them down to consensus module and
3 : //! sends replies back.
4 :
5 : use crate::handler::SafekeeperPostgresHandler;
6 : use crate::safekeeper::AcceptorProposerMessage;
7 : use crate::safekeeper::ProposerAcceptorMessage;
8 : use crate::safekeeper::ServerInfo;
9 : use crate::timeline::WalResidentTimeline;
10 : use crate::wal_service::ConnectionId;
11 : use crate::GlobalTimelines;
12 : use anyhow::{anyhow, Context};
13 : use bytes::BytesMut;
14 : use parking_lot::MappedMutexGuard;
15 : use parking_lot::Mutex;
16 : use parking_lot::MutexGuard;
17 : use postgres_backend::CopyStreamHandlerEnd;
18 : use postgres_backend::PostgresBackend;
19 : use postgres_backend::PostgresBackendReader;
20 : use postgres_backend::QueryError;
21 : use pq_proto::BeMessage;
22 : use serde::Deserialize;
23 : use serde::Serialize;
24 : use std::net::SocketAddr;
25 : use std::sync::Arc;
26 : use tokio::io::AsyncRead;
27 : use tokio::io::AsyncWrite;
28 : use tokio::sync::mpsc::channel;
29 : use tokio::sync::mpsc::error::TryRecvError;
30 : use tokio::sync::mpsc::Receiver;
31 : use tokio::sync::mpsc::Sender;
32 : use tokio::task;
33 : use tokio::task::JoinHandle;
34 : use tokio::time::Duration;
35 : use tokio::time::Instant;
36 : use tracing::*;
37 : use utils::id::TenantTimelineId;
38 : use utils::lsn::Lsn;
39 : use utils::pageserver_feedback::PageserverFeedback;
40 :
/// Capacity of the broadcast channel used to fan out pageserver feedback to
/// the subscribed walproposer connections.
const DEFAULT_FEEDBACK_CAPACITY: usize = 8;
42 :
43 : /// Registry of WalReceivers (compute connections). Timeline holds it (wrapped
44 : /// in Arc).
45 : pub struct WalReceivers {
46 : mutex: Mutex<WalReceiversShared>,
47 : pageserver_feedback_tx: tokio::sync::broadcast::Sender<PageserverFeedback>,
48 :
49 : num_computes_tx: tokio::sync::watch::Sender<usize>,
50 : num_computes_rx: tokio::sync::watch::Receiver<usize>,
51 : }
52 :
/// Id under which a walreceiver is registered: the index of its slot in the
/// registry's slot vector.
type WalReceiverId = usize;
55 :
56 : impl WalReceivers {
57 0 : pub fn new() -> Arc<WalReceivers> {
58 0 : let (pageserver_feedback_tx, _) =
59 0 : tokio::sync::broadcast::channel(DEFAULT_FEEDBACK_CAPACITY);
60 0 :
61 0 : let (num_computes_tx, num_computes_rx) = tokio::sync::watch::channel(0usize);
62 0 :
63 0 : Arc::new(WalReceivers {
64 0 : mutex: Mutex::new(WalReceiversShared { slots: Vec::new() }),
65 0 : pageserver_feedback_tx,
66 0 : num_computes_tx,
67 0 : num_computes_rx,
68 0 : })
69 0 : }
70 :
71 : /// Register new walreceiver. Returned guard provides access to the slot and
72 : /// automatically deregisters in Drop.
73 0 : pub fn register(self: &Arc<WalReceivers>, conn_id: Option<ConnectionId>) -> WalReceiverGuard {
74 0 : let mut shared = self.mutex.lock();
75 0 : let slots = &mut shared.slots;
76 0 : let walreceiver = WalReceiverState {
77 0 : conn_id,
78 0 : status: WalReceiverStatus::Voting,
79 0 : };
80 : // find empty slot or create new one
81 0 : let pos = if let Some(pos) = slots.iter().position(|s| s.is_none()) {
82 0 : slots[pos] = Some(walreceiver);
83 0 : pos
84 : } else {
85 0 : let pos = slots.len();
86 0 : slots.push(Some(walreceiver));
87 0 : pos
88 : };
89 :
90 0 : self.update_num(&shared);
91 0 :
92 0 : WalReceiverGuard {
93 0 : id: pos,
94 0 : walreceivers: self.clone(),
95 0 : }
96 0 : }
97 :
98 : /// Get reference to locked slot contents. Slot must exist (registered
99 : /// earlier).
100 0 : fn get_slot<'a>(
101 0 : self: &'a Arc<WalReceivers>,
102 0 : id: WalReceiverId,
103 0 : ) -> MappedMutexGuard<'a, WalReceiverState> {
104 0 : MutexGuard::map(self.mutex.lock(), |locked| {
105 0 : locked.slots[id]
106 0 : .as_mut()
107 0 : .expect("walreceiver doesn't exist")
108 0 : })
109 0 : }
110 :
111 : /// Get number of walreceivers (compute connections).
112 0 : pub fn get_num(self: &Arc<WalReceivers>) -> usize {
113 0 : self.mutex.lock().get_num()
114 0 : }
115 :
116 : /// Get channel for number of walreceivers.
117 0 : pub fn get_num_rx(self: &Arc<WalReceivers>) -> tokio::sync::watch::Receiver<usize> {
118 0 : self.num_computes_rx.clone()
119 0 : }
120 :
121 : /// Should get called after every update of slots.
122 0 : fn update_num(self: &Arc<WalReceivers>, shared: &MutexGuard<WalReceiversShared>) {
123 0 : let num = shared.get_num();
124 0 : self.num_computes_tx.send_replace(num);
125 0 : }
126 :
127 : /// Get state of all walreceivers.
128 0 : pub fn get_all(self: &Arc<WalReceivers>) -> Vec<WalReceiverState> {
129 0 : self.mutex.lock().slots.iter().flatten().cloned().collect()
130 0 : }
131 :
132 : /// Get number of streaming walreceivers (normally 0 or 1) from compute.
133 0 : pub fn get_num_streaming(self: &Arc<WalReceivers>) -> usize {
134 0 : self.mutex
135 0 : .lock()
136 0 : .slots
137 0 : .iter()
138 0 : .flatten()
139 0 : // conn_id.is_none skips recovery which also registers here
140 0 : .filter(|s| s.conn_id.is_some() && matches!(s.status, WalReceiverStatus::Streaming))
141 0 : .count()
142 0 : }
143 :
144 : /// Unregister walreceiver.
145 0 : fn unregister(self: &Arc<WalReceivers>, id: WalReceiverId) {
146 0 : let mut shared = self.mutex.lock();
147 0 : shared.slots[id] = None;
148 0 : self.update_num(&shared);
149 0 : }
150 :
151 : /// Broadcast pageserver feedback to connected walproposers.
152 0 : pub fn broadcast_pageserver_feedback(&self, feedback: PageserverFeedback) {
153 0 : // Err means there is no subscribers, it is fine.
154 0 : let _ = self.pageserver_feedback_tx.send(feedback);
155 0 : }
156 : }
157 :
158 : /// Only a few connections are expected (normally one), so store in Vec.
159 : struct WalReceiversShared {
160 : slots: Vec<Option<WalReceiverState>>,
161 : }
162 :
163 : impl WalReceiversShared {
164 : /// Get number of walreceivers (compute connections).
165 0 : fn get_num(&self) -> usize {
166 0 : self.slots.iter().flatten().count()
167 0 : }
168 : }
169 :
170 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
171 : pub struct WalReceiverState {
172 : /// None means it is recovery initiated by us (this safekeeper).
173 : pub conn_id: Option<ConnectionId>,
174 : pub status: WalReceiverStatus,
175 : }
176 :
177 : /// Walreceiver status. Currently only whether it passed voting stage and
178 : /// started receiving the stream, but it is easy to add more if needed.
179 0 : #[derive(Debug, Clone, Serialize, Deserialize)]
180 : pub enum WalReceiverStatus {
181 : Voting,
182 : Streaming,
183 : }
184 :
185 : /// Scope guard to access slot in WalReceivers registry and unregister from
186 : /// it in Drop.
187 : pub struct WalReceiverGuard {
188 : id: WalReceiverId,
189 : walreceivers: Arc<WalReceivers>,
190 : }
191 :
192 : impl WalReceiverGuard {
193 : /// Get reference to locked shared state contents.
194 0 : fn get(&self) -> MappedMutexGuard<WalReceiverState> {
195 0 : self.walreceivers.get_slot(self.id)
196 0 : }
197 : }
198 :
199 : impl Drop for WalReceiverGuard {
200 0 : fn drop(&mut self) {
201 0 : self.walreceivers.unregister(self.id);
202 0 : }
203 : }
204 :
/// Capacity of the queue carrying proposer messages from the network reader
/// to the WalAcceptor task.
pub const MSG_QUEUE_SIZE: usize = 256;
/// Capacity of the queue carrying replies from the WalAcceptor task back to
/// the network writer.
pub const REPLY_QUEUE_SIZE: usize = 16;
207 :
208 : impl SafekeeperPostgresHandler {
209 : /// Wrapper around handle_start_wal_push_guts handling result. Error is
210 : /// handled here while we're still in walreceiver ttid span; with API
211 : /// extension, this can probably be moved into postgres_backend.
212 0 : pub async fn handle_start_wal_push<IO: AsyncRead + AsyncWrite + Unpin>(
213 0 : &mut self,
214 0 : pgb: &mut PostgresBackend<IO>,
215 0 : ) -> Result<(), QueryError> {
216 0 : let mut tli: Option<WalResidentTimeline> = None;
217 0 : if let Err(end) = self.handle_start_wal_push_guts(pgb, &mut tli).await {
218 : // Log the result and probably send it to the client, closing the stream.
219 0 : let handle_end_fut = pgb.handle_copy_stream_end(end);
220 : // If we managed to create the timeline, augment logging with current LSNs etc.
221 0 : if let Some(tli) = tli {
222 0 : let info = tli.get_safekeeper_info(&self.conf).await;
223 0 : handle_end_fut
224 0 : .instrument(info_span!("", term=%info.term, last_log_term=%info.last_log_term, flush_lsn=%Lsn(info.flush_lsn), commit_lsn=%Lsn(info.commit_lsn)))
225 0 : .await;
226 : } else {
227 0 : handle_end_fut.await;
228 : }
229 0 : }
230 0 : Ok(())
231 0 : }
232 :
233 0 : pub async fn handle_start_wal_push_guts<IO: AsyncRead + AsyncWrite + Unpin>(
234 0 : &mut self,
235 0 : pgb: &mut PostgresBackend<IO>,
236 0 : tli: &mut Option<WalResidentTimeline>,
237 0 : ) -> Result<(), CopyStreamHandlerEnd> {
238 0 : // Notify the libpq client that it's allowed to send `CopyData` messages
239 0 : pgb.write_message(&BeMessage::CopyBothResponse).await?;
240 :
241 : // Experiments [1] confirm that doing network IO in one (this) thread and
242 : // processing with disc IO in another significantly improves
243 : // performance; we spawn off WalAcceptor thread for message processing
244 : // to this end.
245 : //
246 : // [1] https://github.com/neondatabase/neon/pull/1318
247 0 : let (msg_tx, msg_rx) = channel(MSG_QUEUE_SIZE);
248 0 : let (reply_tx, reply_rx) = channel(REPLY_QUEUE_SIZE);
249 0 : let mut acceptor_handle: Option<JoinHandle<anyhow::Result<()>>> = None;
250 :
251 : // Concurrently receive and send data; replies are not synchronized with
252 : // sends, so this avoids deadlocks.
253 0 : let mut pgb_reader = pgb.split().context("START_WAL_PUSH split")?;
254 0 : let peer_addr = *pgb.get_peer_addr();
255 0 : let mut network_reader = NetworkReader {
256 0 : ttid: self.ttid,
257 0 : conn_id: self.conn_id,
258 0 : pgb_reader: &mut pgb_reader,
259 0 : peer_addr,
260 0 : acceptor_handle: &mut acceptor_handle,
261 0 : };
262 :
263 : // Read first message and create timeline if needed.
264 0 : let res = network_reader.read_first_message().await;
265 :
266 0 : let network_res = if let Ok((timeline, next_msg)) = res {
267 0 : let pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback> =
268 0 : timeline
269 0 : .get_walreceivers()
270 0 : .pageserver_feedback_tx
271 0 : .subscribe();
272 0 : *tli = Some(timeline.wal_residence_guard().await?);
273 :
274 0 : tokio::select! {
275 : // todo: add read|write .context to these errors
276 0 : r = network_reader.run(msg_tx, msg_rx, reply_tx, timeline, next_msg) => r,
277 0 : r = network_write(pgb, reply_rx, pageserver_feedback_rx) => r,
278 : }
279 : } else {
280 0 : res.map(|_| ())
281 : };
282 :
283 : // Join pg backend back.
284 0 : pgb.unsplit(pgb_reader)?;
285 :
286 : // Join the spawned WalAcceptor. At this point chans to/from it passed
287 : // to network routines are dropped, so it will exit as soon as it
288 : // touches them.
289 0 : match acceptor_handle {
290 : None => {
291 : // failed even before spawning; read_network should have error
292 0 : Err(network_res.expect_err("no error with WalAcceptor not spawn"))
293 : }
294 0 : Some(handle) => {
295 0 : let wal_acceptor_res = handle.await;
296 :
297 : // If there was any network error, return it.
298 0 : network_res?;
299 :
300 : // Otherwise, WalAcceptor thread must have errored.
301 0 : match wal_acceptor_res {
302 0 : Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination
303 0 : Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))),
304 0 : Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!(
305 0 : "WalAcceptor task panicked",
306 0 : ))),
307 : }
308 : }
309 : }
310 0 : }
311 : }
312 :
313 : struct NetworkReader<'a, IO> {
314 : ttid: TenantTimelineId,
315 : conn_id: ConnectionId,
316 : pgb_reader: &'a mut PostgresBackendReader<IO>,
317 : peer_addr: SocketAddr,
318 : // WalAcceptor is spawned when we learn server info from walproposer and
319 : // create timeline; handle is put here.
320 : acceptor_handle: &'a mut Option<JoinHandle<anyhow::Result<()>>>,
321 : }
322 :
323 : impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> {
324 0 : async fn read_first_message(
325 0 : &mut self,
326 0 : ) -> Result<(WalResidentTimeline, ProposerAcceptorMessage), CopyStreamHandlerEnd> {
327 : // Receive information about server to create timeline, if not yet.
328 0 : let next_msg = read_message(self.pgb_reader).await?;
329 0 : let tli = match next_msg {
330 0 : ProposerAcceptorMessage::Greeting(ref greeting) => {
331 0 : info!(
332 0 : "start handshake with walproposer {} sysid {} timeline {}",
333 : self.peer_addr, greeting.system_id, greeting.tli,
334 : );
335 0 : let server_info = ServerInfo {
336 0 : pg_version: greeting.pg_version,
337 0 : system_id: greeting.system_id,
338 0 : wal_seg_size: greeting.wal_seg_size,
339 0 : };
340 0 : let tli =
341 0 : GlobalTimelines::create(self.ttid, server_info, Lsn::INVALID, Lsn::INVALID)
342 0 : .await?;
343 0 : tli.wal_residence_guard().await?
344 : }
345 : _ => {
346 0 : return Err(CopyStreamHandlerEnd::Other(anyhow::anyhow!(
347 0 : "unexpected message {next_msg:?} instead of greeting"
348 0 : )))
349 : }
350 : };
351 0 : Ok((tli, next_msg))
352 0 : }
353 :
354 0 : async fn run(
355 0 : self,
356 0 : msg_tx: Sender<ProposerAcceptorMessage>,
357 0 : msg_rx: Receiver<ProposerAcceptorMessage>,
358 0 : reply_tx: Sender<AcceptorProposerMessage>,
359 0 : tli: WalResidentTimeline,
360 0 : next_msg: ProposerAcceptorMessage,
361 0 : ) -> Result<(), CopyStreamHandlerEnd> {
362 0 : *self.acceptor_handle = Some(WalAcceptor::spawn(
363 0 : tli,
364 0 : msg_rx,
365 0 : reply_tx,
366 0 : Some(self.conn_id),
367 0 : ));
368 0 :
369 0 : // Forward all messages to WalAcceptor
370 0 : read_network_loop(self.pgb_reader, msg_tx, next_msg).await
371 0 : }
372 : }
373 :
374 : /// Read next message from walproposer.
375 : /// TODO: Return Ok(None) on graceful termination.
376 0 : async fn read_message<IO: AsyncRead + AsyncWrite + Unpin>(
377 0 : pgb_reader: &mut PostgresBackendReader<IO>,
378 0 : ) -> Result<ProposerAcceptorMessage, CopyStreamHandlerEnd> {
379 0 : let copy_data = pgb_reader.read_copy_message().await?;
380 0 : let msg = ProposerAcceptorMessage::parse(copy_data)?;
381 0 : Ok(msg)
382 0 : }
383 :
384 0 : async fn read_network_loop<IO: AsyncRead + AsyncWrite + Unpin>(
385 0 : pgb_reader: &mut PostgresBackendReader<IO>,
386 0 : msg_tx: Sender<ProposerAcceptorMessage>,
387 0 : mut next_msg: ProposerAcceptorMessage,
388 0 : ) -> Result<(), CopyStreamHandlerEnd> {
389 : loop {
390 0 : if msg_tx.send(next_msg).await.is_err() {
391 0 : return Ok(()); // chan closed, WalAcceptor terminated
392 0 : }
393 0 : next_msg = read_message(pgb_reader).await?;
394 : }
395 0 : }
396 :
397 : /// Read replies from WalAcceptor and pass them back to socket. Returns Ok(())
398 : /// if reply_rx closed; it must mean WalAcceptor terminated, joining it should
399 : /// tell the error.
400 0 : async fn network_write<IO: AsyncRead + AsyncWrite + Unpin>(
401 0 : pgb_writer: &mut PostgresBackend<IO>,
402 0 : mut reply_rx: Receiver<AcceptorProposerMessage>,
403 0 : mut pageserver_feedback_rx: tokio::sync::broadcast::Receiver<PageserverFeedback>,
404 0 : ) -> Result<(), CopyStreamHandlerEnd> {
405 0 : let mut buf = BytesMut::with_capacity(128);
406 0 :
407 0 : // storing append_response to inject PageserverFeedback into it
408 0 : let mut last_append_response = None;
409 :
410 : loop {
411 : // trying to read either AcceptorProposerMessage or PageserverFeedback
412 0 : let msg = tokio::select! {
413 0 : reply = reply_rx.recv() => {
414 0 : if let Some(msg) = reply {
415 0 : if let AcceptorProposerMessage::AppendResponse(append_response) = &msg {
416 0 : last_append_response = Some(append_response.clone());
417 0 : }
418 0 : Some(msg)
419 : } else {
420 0 : return Ok(()); // chan closed, WalAcceptor terminated
421 : }
422 : }
423 :
424 0 : feedback = pageserver_feedback_rx.recv() =>
425 0 : match (feedback, &last_append_response) {
426 0 : (Ok(feedback), Some(append_response)) => {
427 0 : // clone AppendResponse and inject PageserverFeedback into it
428 0 : let mut append_response = append_response.clone();
429 0 : append_response.pageserver_feedback = Some(feedback);
430 0 : Some(AcceptorProposerMessage::AppendResponse(append_response))
431 : }
432 0 : _ => None,
433 : }
434 : };
435 :
436 0 : let Some(msg) = msg else {
437 0 : continue;
438 : };
439 :
440 0 : buf.clear();
441 0 : msg.serialize(&mut buf)?;
442 0 : pgb_writer.write_message(&BeMessage::CopyData(&buf)).await?;
443 : }
444 0 : }
445 :
/// Interval of keepalive messages sent to walproposer; they make sure it
/// receives updates even when it writes a steady stream of messages.
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
449 :
450 : /// Encapsulates a task which takes messages from msg_rx, processes and pushes
451 : /// replies to reply_tx.
452 : ///
453 : /// Reading from socket and writing to disk in parallel is beneficial for
454 : /// performance, this struct provides the writing to disk part.
455 : pub struct WalAcceptor {
456 : tli: WalResidentTimeline,
457 : msg_rx: Receiver<ProposerAcceptorMessage>,
458 : reply_tx: Sender<AcceptorProposerMessage>,
459 : conn_id: Option<ConnectionId>,
460 : }
461 :
462 : impl WalAcceptor {
463 : /// Spawn task with WalAcceptor running, return handle to it. Task returns
464 : /// Ok(()) if either of channels has closed, and Err if any error during
465 : /// message processing is encountered.
466 : ///
467 : /// conn_id None means WalAcceptor is used by recovery initiated at this safekeeper.
468 0 : pub fn spawn(
469 0 : tli: WalResidentTimeline,
470 0 : msg_rx: Receiver<ProposerAcceptorMessage>,
471 0 : reply_tx: Sender<AcceptorProposerMessage>,
472 0 : conn_id: Option<ConnectionId>,
473 0 : ) -> JoinHandle<anyhow::Result<()>> {
474 0 : task::spawn(async move {
475 0 : let mut wa = WalAcceptor {
476 0 : tli,
477 0 : msg_rx,
478 0 : reply_tx,
479 0 : conn_id,
480 0 : };
481 0 :
482 0 : let span_ttid = wa.tli.ttid; // satisfy borrow checker
483 0 : wa.run()
484 0 : .instrument(
485 0 : info_span!("WAL acceptor", cid = %conn_id.unwrap_or(0), ttid = %span_ttid),
486 : )
487 0 : .await
488 0 : })
489 0 : }
490 :
491 : /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed;
492 : /// it must mean that network thread terminated.
493 0 : async fn run(&mut self) -> anyhow::Result<()> {
494 0 : let walreceiver_guard = self.tli.get_walreceivers().register(self.conn_id);
495 0 :
496 0 : // After this timestamp we will stop processing AppendRequests and send a response
497 0 : // to the walproposer. walproposer sends at least one AppendRequest per second,
498 0 : // we will send keepalives by replying to these requests once per second.
499 0 : let mut next_keepalive = Instant::now();
500 :
501 0 : while let Some(mut next_msg) = self.msg_rx.recv().await {
502 : // Update walreceiver state in shmem for reporting.
503 0 : if let ProposerAcceptorMessage::Elected(_) = &next_msg {
504 0 : walreceiver_guard.get().status = WalReceiverStatus::Streaming;
505 0 : }
506 :
507 0 : let reply_msg = if matches!(next_msg, ProposerAcceptorMessage::AppendRequest(_)) {
508 : // Loop through AppendRequests while available to write as many WAL records as
509 : // possible without fsyncing.
510 : //
511 : // Make sure the WAL is flushed before returning, see:
512 : // https://github.com/neondatabase/neon/issues/9259
513 : //
514 : // Note: this will need to be rewritten if we want to read non-AppendRequest messages here.
515 : // Otherwise, we might end up in a situation where we read a message, but don't
516 : // process it.
517 0 : while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg {
518 0 : let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request);
519 :
520 0 : if let Some(reply) = self.tli.process_msg(&noflush_msg).await? {
521 0 : if self.reply_tx.send(reply).await.is_err() {
522 0 : break; // disconnected, flush WAL and return on next send/recv
523 0 : }
524 0 : }
525 :
526 : // get out of this loop if keepalive time is reached
527 0 : if Instant::now() >= next_keepalive {
528 0 : break;
529 0 : }
530 0 :
531 0 : // continue pulling AppendRequests if available
532 0 : match self.msg_rx.try_recv() {
533 0 : Ok(msg) => next_msg = msg,
534 0 : Err(TryRecvError::Empty) => break,
535 : // on disconnect, flush WAL and return on next send/recv
536 0 : Err(TryRecvError::Disconnected) => break,
537 : };
538 : }
539 :
540 : // flush all written WAL to the disk
541 0 : self.tli
542 0 : .process_msg(&ProposerAcceptorMessage::FlushWAL)
543 0 : .await?
544 : } else {
545 : // process message other than AppendRequest
546 0 : self.tli.process_msg(&next_msg).await?
547 : };
548 :
549 0 : if let Some(reply) = reply_msg {
550 0 : if self.reply_tx.send(reply).await.is_err() {
551 0 : return Ok(()); // chan closed, streaming terminated
552 0 : }
553 0 : // reset keepalive time
554 0 : next_keepalive = Instant::now() + KEEPALIVE_INTERVAL;
555 0 : }
556 : }
557 0 : Ok(())
558 0 : }
559 : }
|