Line data Source code
1 : //! Actual Postgres connection handler to stream WAL to the server.
2 :
3 : use std::{
4 : error::Error,
5 : pin::pin,
6 : str::FromStr,
7 : sync::Arc,
8 : time::{Duration, SystemTime},
9 : };
10 :
11 : use anyhow::{anyhow, Context};
12 : use bytes::BytesMut;
13 : use chrono::{NaiveDateTime, Utc};
14 : use fail::fail_point;
15 : use futures::StreamExt;
16 : use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
17 : use postgres_ffi::WAL_SEGMENT_SIZE;
18 : use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError};
19 : use postgres_protocol::message::backend::ReplicationMessage;
20 : use postgres_types::PgLsn;
21 : use tokio::{select, sync::watch, time};
22 : use tokio_postgres::{replication::ReplicationStream, Client};
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::{debug, error, info, trace, warn, Instrument};
25 : use wal_decoder::{
26 : models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords},
27 : wire_format::FromWireFormat,
28 : };
29 :
30 : use super::TaskStateUpdate;
31 : use crate::{
32 : context::RequestContext,
33 : metrics::{LIVE_CONNECTIONS, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
34 : pgdatadir_mapping::DatadirModification,
35 : task_mgr::{TaskKind, WALRECEIVER_RUNTIME},
36 : tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
37 : walingest::WalIngest,
38 : };
39 : use postgres_backend::is_expected_io_error;
40 : use postgres_connection::PgConnectionConfig;
41 : use postgres_ffi::waldecoder::WalStreamDecoder;
42 : use utils::{id::NodeId, lsn::Lsn, postgres_client::PostgresClientProtocol};
43 : use utils::{pageserver_feedback::PageserverFeedback, sync::gate::GateError};
44 :
45 : /// Status of the connection.
46 : #[derive(Debug, Clone, Copy)]
47 : pub(super) struct WalConnectionStatus {
48 : /// If we were able to initiate a postgres connection, this means that safekeeper process is at least running.
49 : pub is_connected: bool,
50 : /// Defines a healthy connection as one on which pageserver received WAL from safekeeper
51 : /// and is able to process it in walingest without errors.
52 : pub has_processed_wal: bool,
53 : /// Connection establishment time or the timestamp of a latest connection message received.
54 : pub latest_connection_update: NaiveDateTime,
55 : /// Time of the latest WAL message received.
56 : pub latest_wal_update: NaiveDateTime,
57 : /// Latest WAL update contained WAL up to this LSN. Next WAL message with start from that LSN.
58 : pub streaming_lsn: Option<Lsn>,
59 : /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
60 : pub commit_lsn: Option<Lsn>,
61 : /// The node it is connected to
62 : pub node: NodeId,
63 : }
64 :
65 : pub(super) enum WalReceiverError {
66 : /// An error of a type that does not indicate an issue, e.g. a connection closing
67 : ExpectedSafekeeperError(postgres::Error),
68 : /// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries
69 : /// the message part of the original postgres error
70 : SuccessfulCompletion(String),
71 : /// Generic error
72 : Other(anyhow::Error),
73 : ClosedGate,
74 : }
75 :
76 : impl From<tokio_postgres::Error> for WalReceiverError {
77 0 : fn from(err: tokio_postgres::Error) -> Self {
78 0 : if let Some(dberror) = err.as_db_error().filter(|db_error| {
79 0 : db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
80 0 : && db_error.message().contains("ending streaming")
81 0 : }) {
82 : // Strip the outer DbError, which carries a misleading "error" severity
83 0 : Self::SuccessfulCompletion(dberror.message().to_string())
84 0 : } else if err.is_closed()
85 0 : || err
86 0 : .source()
87 0 : .and_then(|source| source.downcast_ref::<std::io::Error>())
88 0 : .map(is_expected_io_error)
89 0 : .unwrap_or(false)
90 : {
91 0 : Self::ExpectedSafekeeperError(err)
92 : } else {
93 0 : Self::Other(anyhow::Error::new(err))
94 : }
95 0 : }
96 : }
97 :
98 : impl From<anyhow::Error> for WalReceiverError {
99 0 : fn from(err: anyhow::Error) -> Self {
100 0 : Self::Other(err)
101 0 : }
102 : }
103 :
104 : impl From<WalDecodeError> for WalReceiverError {
105 0 : fn from(err: WalDecodeError) -> Self {
106 0 : Self::Other(anyhow::Error::new(err))
107 0 : }
108 : }
109 :
110 : /// Open a connection to the given safekeeper and receive WAL, sending back progress
111 : /// messages as we go.
112 : #[allow(clippy::too_many_arguments)]
113 0 : pub(super) async fn handle_walreceiver_connection(
114 0 : timeline: Arc<Timeline>,
115 0 : protocol: PostgresClientProtocol,
116 0 : wal_source_connconf: PgConnectionConfig,
117 0 : events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
118 0 : cancellation: CancellationToken,
119 0 : connect_timeout: Duration,
120 0 : ctx: RequestContext,
121 0 : node: NodeId,
122 0 : ingest_batch_size: u64,
123 0 : ) -> Result<(), WalReceiverError> {
124 0 : debug_assert_current_span_has_tenant_and_timeline_id();
125 :
126 : // prevent timeline shutdown from finishing until we have exited
127 0 : let _guard = timeline.gate.enter().map_err(|e| match e {
128 0 : GateError::GateClosed => WalReceiverError::ClosedGate,
129 0 : })?;
130 : // This function spawns a side-car task (WalReceiverConnectionPoller).
131 : // Get its gate guard now as well.
132 0 : let poller_guard = timeline.gate.enter().map_err(|e| match e {
133 0 : GateError::GateClosed => WalReceiverError::ClosedGate,
134 0 : })?;
135 :
136 0 : WALRECEIVER_STARTED_CONNECTIONS.inc();
137 0 :
138 0 : // Connect to the database in replication mode.
139 0 : info!("connecting to {wal_source_connconf:?}");
140 :
141 0 : let (replication_client, connection) = {
142 0 : let mut config = wal_source_connconf.to_tokio_postgres_config();
143 0 : config.application_name("pageserver");
144 0 : config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
145 0 : match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await {
146 0 : Ok(client_and_conn) => client_and_conn?,
147 0 : Err(_elapsed) => {
148 0 : // Timing out to connect to a safekeeper node could happen long time, due to
149 0 : // many reasons that pageserver cannot control.
150 0 : // Do not produce an error, but make it visible, that timeouts happen by logging the `event.
151 0 : info!("Timed out while waiting {connect_timeout:?} for walreceiver connection to open");
152 0 : return Ok(());
153 : }
154 : }
155 : };
156 :
157 0 : debug!("connected!");
158 0 : let mut connection_status = WalConnectionStatus {
159 0 : is_connected: true,
160 0 : has_processed_wal: false,
161 0 : latest_connection_update: Utc::now().naive_utc(),
162 0 : latest_wal_update: Utc::now().naive_utc(),
163 0 : streaming_lsn: None,
164 0 : commit_lsn: None,
165 0 : node,
166 0 : };
167 0 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
168 0 : warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
169 0 : return Ok(());
170 0 : }
171 0 :
172 0 : // The connection object performs the actual communication with the database,
173 0 : // so spawn it off to run on its own. It shouldn't outlive this function, but,
174 0 : // due to lack of async drop, we can't enforce that. However, we ensure that
175 0 : // 1. it is sensitive to `cancellation` and
176 0 : // 2. holds the Timeline gate open so that after timeline shutdown,
177 0 : // we know this task is gone.
178 0 : let _connection_ctx = ctx.detached_child(
179 0 : TaskKind::WalReceiverConnectionPoller,
180 0 : ctx.download_behavior(),
181 0 : );
182 0 : let connection_cancellation = cancellation.clone();
183 0 : WALRECEIVER_RUNTIME.spawn(
184 0 : async move {
185 0 : debug_assert_current_span_has_tenant_and_timeline_id();
186 0 : select! {
187 0 : connection_result = connection => match connection_result {
188 0 : Ok(()) => debug!("Walreceiver db connection closed"),
189 0 : Err(connection_error) => {
190 0 : match WalReceiverError::from(connection_error) {
191 0 : WalReceiverError::ExpectedSafekeeperError(_) => {
192 0 : // silence, because most likely we've already exited the outer call
193 0 : // with a similar error.
194 0 : },
195 0 : WalReceiverError::SuccessfulCompletion(_) => {}
196 0 : WalReceiverError::ClosedGate => {
197 0 : // doesn't happen at runtime
198 0 : }
199 0 : WalReceiverError::Other(err) => {
200 0 : warn!("Connection aborted: {err:#}")
201 : }
202 : }
203 : }
204 : },
205 0 : _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
206 : }
207 0 : drop(poller_guard);
208 0 : }
209 : // Enrich the log lines emitted by this closure with meaningful context.
210 : // TODO: technically, this task outlives the surrounding function, so, the
211 : // spans won't be properly nested.
212 0 : .instrument(tracing::info_span!("poller")),
213 : );
214 :
215 0 : let _guard = LIVE_CONNECTIONS
216 0 : .with_label_values(&["wal_receiver"])
217 0 : .guard();
218 :
219 0 : let identify = identify_system(&replication_client).await?;
220 0 : info!("{identify:?}");
221 :
222 0 : let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
223 0 : let mut caught_up = false;
224 0 :
225 0 : connection_status.latest_connection_update = Utc::now().naive_utc();
226 0 : connection_status.latest_wal_update = Utc::now().naive_utc();
227 0 : connection_status.commit_lsn = Some(end_of_wal);
228 0 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
229 0 : warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
230 0 : return Ok(());
231 0 : }
232 0 :
233 0 : //
234 0 : // Start streaming the WAL, from where we left off previously.
235 0 : //
236 0 : // If we had previously received WAL up to some point in the middle of a WAL record, we
237 0 : // better start from the end of last full WAL record, not in the middle of one.
238 0 : let mut last_rec_lsn = timeline.get_last_record_lsn();
239 0 : let mut startpoint = last_rec_lsn;
240 0 :
241 0 : if startpoint == Lsn(0) {
242 0 : return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
243 0 : }
244 0 :
245 0 : // There might be some padding after the last full record, skip it.
246 0 : startpoint += startpoint.calc_padding(8u32);
247 0 :
248 0 : // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
249 0 : // for anything, and in some corner cases, the compute node might have never generated the WAL for page headers
250 0 : //. That happens if you create a branch at page boundary: the start point of the branch is at the page boundary,
251 0 : // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
252 0 : // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
253 0 : // to the safekeepers.
254 0 : startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
255 0 :
256 0 : info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
257 :
258 0 : let query = format!("START_REPLICATION PHYSICAL {startpoint}");
259 :
260 0 : let copy_stream = replication_client.copy_both_simple(&query).await?;
261 0 : let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
262 0 :
263 0 : let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
264 :
265 0 : let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
266 :
267 0 : let interpreted_proto_config = match protocol {
268 0 : PostgresClientProtocol::Vanilla => None,
269 : PostgresClientProtocol::Interpreted {
270 0 : format,
271 0 : compression,
272 0 : } => Some((format, compression)),
273 : };
274 :
275 0 : while let Some(replication_message) = {
276 0 : select! {
277 0 : _ = cancellation.cancelled() => {
278 0 : debug!("walreceiver interrupted");
279 0 : None
280 : }
281 0 : replication_message = physical_stream.next() => replication_message,
282 : }
283 : } {
284 0 : let replication_message = replication_message?;
285 :
286 0 : let now = Utc::now().naive_utc();
287 0 : let last_rec_lsn_before_msg = last_rec_lsn;
288 0 :
289 0 : // Update the connection status before processing the message. If the message processing
290 0 : // fails (e.g. in walingest), we still want to know latests LSNs from the safekeeper.
291 0 : match &replication_message {
292 0 : ReplicationMessage::XLogData(xlog_data) => {
293 0 : connection_status.latest_connection_update = now;
294 0 : connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
295 0 : connection_status.streaming_lsn = Some(Lsn::from(
296 0 : xlog_data.wal_start() + xlog_data.data().len() as u64,
297 0 : ));
298 0 : if !xlog_data.data().is_empty() {
299 0 : connection_status.latest_wal_update = now;
300 0 : }
301 : }
302 0 : ReplicationMessage::PrimaryKeepAlive(keepalive) => {
303 0 : connection_status.latest_connection_update = now;
304 0 : connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
305 0 : }
306 0 : ReplicationMessage::RawInterpretedWalRecords(raw) => {
307 0 : connection_status.latest_connection_update = now;
308 0 : if !raw.data().is_empty() {
309 0 : connection_status.latest_wal_update = now;
310 0 : }
311 :
312 0 : connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn()));
313 0 : connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
314 : }
315 0 : &_ => {}
316 : };
317 0 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
318 0 : warn!("Wal connection event listener dropped, aborting the connection: {e}");
319 0 : return Ok(());
320 0 : }
321 :
322 0 : async fn commit(
323 0 : modification: &mut DatadirModification<'_>,
324 0 : uncommitted: &mut u64,
325 0 : filtered: &mut u64,
326 0 : ctx: &RequestContext,
327 0 : ) -> anyhow::Result<()> {
328 0 : WAL_INGEST
329 0 : .records_committed
330 0 : .inc_by(*uncommitted - *filtered);
331 0 : modification.commit(ctx).await?;
332 0 : *uncommitted = 0;
333 0 : *filtered = 0;
334 0 : Ok(())
335 0 : }
336 :
337 0 : let status_update = match replication_message {
338 0 : ReplicationMessage::RawInterpretedWalRecords(raw) => {
339 0 : WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
340 0 :
341 0 : let mut uncommitted_records = 0;
342 0 : let mut filtered_records = 0;
343 0 :
344 0 : // This is the end LSN of the raw WAL from which the records
345 0 : // were interpreted.
346 0 : let streaming_lsn = Lsn::from(raw.streaming_lsn());
347 0 :
348 0 : let (format, compression) = interpreted_proto_config.unwrap();
349 0 : let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
350 0 : .await
351 0 : .with_context(|| {
352 0 : anyhow::anyhow!(
353 0 : "Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
354 0 : )
355 0 : })?;
356 :
357 : let InterpretedWalRecords {
358 0 : records,
359 0 : next_record_lsn,
360 0 : } = batch;
361 0 :
362 0 : tracing::debug!(
363 0 : "Received WAL up to {} with next_record_lsn={:?}",
364 : streaming_lsn,
365 : next_record_lsn
366 : );
367 :
368 : // We start the modification at 0 because each interpreted record
369 : // advances it to its end LSN. 0 is just an initialization placeholder.
370 0 : let mut modification = timeline.begin_modification(Lsn(0));
371 :
372 0 : for interpreted in records {
373 0 : if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
374 0 : && uncommitted_records > 0
375 : {
376 0 : commit(
377 0 : &mut modification,
378 0 : &mut uncommitted_records,
379 0 : &mut filtered_records,
380 0 : &ctx,
381 0 : )
382 0 : .await?;
383 0 : }
384 :
385 0 : let local_next_record_lsn = interpreted.next_record_lsn;
386 0 : let ingested = walingest
387 0 : .ingest_record(interpreted, &mut modification, &ctx)
388 0 : .await
389 0 : .with_context(|| {
390 0 : format!("could not ingest record at {local_next_record_lsn}")
391 0 : })?;
392 :
393 0 : if !ingested {
394 0 : tracing::debug!(
395 0 : "ingest: filtered out record @ LSN {local_next_record_lsn}"
396 : );
397 0 : WAL_INGEST.records_filtered.inc();
398 0 : filtered_records += 1;
399 0 : }
400 :
401 0 : uncommitted_records += 1;
402 0 :
403 0 : // FIXME: this cannot be made pausable_failpoint without fixing the
404 0 : // failpoint library; in tests, the added amount of debugging will cause us
405 0 : // to timeout the tests.
406 0 : fail_point!("walreceiver-after-ingest");
407 0 :
408 0 : // Commit every ingest_batch_size records. Even if we filtered out
409 0 : // all records, we still need to call commit to advance the LSN.
410 0 : if uncommitted_records >= ingest_batch_size
411 0 : || modification.approx_pending_bytes()
412 0 : > DatadirModification::MAX_PENDING_BYTES
413 : {
414 0 : commit(
415 0 : &mut modification,
416 0 : &mut uncommitted_records,
417 0 : &mut filtered_records,
418 0 : &ctx,
419 0 : )
420 0 : .await?;
421 0 : }
422 : }
423 :
424 : // Records might have been filtered out on the safekeeper side, but we still
425 : // need to advance last record LSN on all shards. If we've not ingested the latest
426 : // record, then set the LSN of the modification past it. This way all shards
427 : // advance their last record LSN at the same time.
428 0 : let needs_last_record_lsn_advance = match next_record_lsn.map(Lsn::from) {
429 0 : Some(lsn) if lsn > modification.get_lsn() => {
430 0 : modification.set_lsn(lsn).unwrap();
431 0 : true
432 : }
433 0 : _ => false,
434 : };
435 :
436 0 : if uncommitted_records > 0 || needs_last_record_lsn_advance {
437 : // Commit any uncommitted records
438 0 : commit(
439 0 : &mut modification,
440 0 : &mut uncommitted_records,
441 0 : &mut filtered_records,
442 0 : &ctx,
443 0 : )
444 0 : .await?;
445 0 : }
446 :
447 0 : if !caught_up && streaming_lsn >= end_of_wal {
448 0 : info!("caught up at LSN {streaming_lsn}");
449 0 : caught_up = true;
450 0 : }
451 :
452 0 : tracing::debug!(
453 0 : "Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
454 0 : timeline.get_last_record_lsn()
455 : );
456 :
457 0 : Some(streaming_lsn)
458 : }
459 :
460 0 : ReplicationMessage::XLogData(xlog_data) => {
461 0 : // Pass the WAL data to the decoder, and see if we can decode
462 0 : // more records as a result.
463 0 : let data = xlog_data.data();
464 0 : let startlsn = Lsn::from(xlog_data.wal_start());
465 0 : let endlsn = startlsn + data.len() as u64;
466 0 :
467 0 : trace!("received XLogData between {startlsn} and {endlsn}");
468 :
469 0 : WAL_INGEST.bytes_received.inc_by(data.len() as u64);
470 0 : waldecoder.feed_bytes(data);
471 0 :
472 0 : {
473 0 : let mut modification = timeline.begin_modification(startlsn);
474 0 : let mut uncommitted_records = 0;
475 0 : let mut filtered_records = 0;
476 :
477 0 : while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
478 : // It is important to deal with the aligned records as lsn in getPage@LSN is
479 : // aligned and can be several bytes bigger. Without this alignment we are
480 : // at risk of hitting a deadlock.
481 0 : if !next_record_lsn.is_aligned() {
482 0 : return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
483 0 : }
484 :
485 : // Deserialize and interpret WAL record
486 0 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
487 0 : recdata,
488 0 : modification.tline.get_shard_identity(),
489 0 : next_record_lsn,
490 0 : modification.tline.pg_version,
491 0 : )?;
492 :
493 0 : if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
494 0 : && uncommitted_records > 0
495 : {
496 : // Special case: legacy PG database creations operate by reading pages from a 'template' database:
497 : // these are the only kinds of WAL record that require reading data blocks while ingesting. Ensure
498 : // all earlier writes of data blocks are visible by committing any modification in flight.
499 0 : commit(
500 0 : &mut modification,
501 0 : &mut uncommitted_records,
502 0 : &mut filtered_records,
503 0 : &ctx,
504 0 : )
505 0 : .await?;
506 0 : }
507 :
508 : // Ingest the records without immediately committing them.
509 0 : let ingested = walingest
510 0 : .ingest_record(interpreted, &mut modification, &ctx)
511 0 : .await
512 0 : .with_context(|| {
513 0 : format!("could not ingest record at {next_record_lsn}")
514 0 : })?;
515 0 : if !ingested {
516 0 : tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
517 0 : WAL_INGEST.records_filtered.inc();
518 0 : filtered_records += 1;
519 0 : }
520 :
521 : // FIXME: this cannot be made pausable_failpoint without fixing the
522 : // failpoint library; in tests, the added amount of debugging will cause us
523 : // to timeout the tests.
524 0 : fail_point!("walreceiver-after-ingest");
525 0 :
526 0 : last_rec_lsn = next_record_lsn;
527 0 :
528 0 : // Commit every ingest_batch_size records. Even if we filtered out
529 0 : // all records, we still need to call commit to advance the LSN.
530 0 : uncommitted_records += 1;
531 0 : if uncommitted_records >= ingest_batch_size
532 0 : || modification.approx_pending_bytes()
533 0 : > DatadirModification::MAX_PENDING_BYTES
534 : {
535 0 : commit(
536 0 : &mut modification,
537 0 : &mut uncommitted_records,
538 0 : &mut filtered_records,
539 0 : &ctx,
540 0 : )
541 0 : .await?;
542 0 : }
543 : }
544 :
545 : // Commit the remaining records.
546 0 : if uncommitted_records > 0 {
547 0 : commit(
548 0 : &mut modification,
549 0 : &mut uncommitted_records,
550 0 : &mut filtered_records,
551 0 : &ctx,
552 0 : )
553 0 : .await?;
554 0 : }
555 : }
556 :
557 0 : if !caught_up && endlsn >= end_of_wal {
558 0 : info!("caught up at LSN {endlsn}");
559 0 : caught_up = true;
560 0 : }
561 :
562 0 : Some(endlsn)
563 : }
564 :
565 0 : ReplicationMessage::PrimaryKeepAlive(keepalive) => {
566 0 : let wal_end = keepalive.wal_end();
567 0 : let timestamp = keepalive.timestamp();
568 0 : let reply_requested = keepalive.reply() != 0;
569 0 :
570 0 : trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
571 :
572 0 : if reply_requested {
573 0 : Some(last_rec_lsn)
574 : } else {
575 0 : None
576 : }
577 : }
578 :
579 0 : _ => None,
580 : };
581 :
582 0 : if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
583 : // We have successfully processed at least one WAL record.
584 0 : connection_status.has_processed_wal = true;
585 0 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
586 0 : warn!("Wal connection event listener dropped, aborting the connection: {e}");
587 0 : return Ok(());
588 0 : }
589 0 : }
590 :
591 0 : if let Some(last_lsn) = status_update {
592 0 : let timeline_remote_consistent_lsn = timeline
593 0 : .get_remote_consistent_lsn_visible()
594 0 : .unwrap_or(Lsn(0));
595 0 :
596 0 : // The last LSN we processed. It is not guaranteed to survive pageserver crash.
597 0 : let last_received_lsn = last_lsn;
598 0 : // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
599 0 : let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
600 0 : // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
601 0 : // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
602 0 : let remote_consistent_lsn = timeline_remote_consistent_lsn;
603 0 : let ts = SystemTime::now();
604 0 :
605 0 : // Update the status about what we just received. This is shown in the mgmt API.
606 0 : let last_received_wal = WalReceiverInfo {
607 0 : wal_source_connconf: wal_source_connconf.clone(),
608 0 : last_received_msg_lsn: last_lsn,
609 0 : last_received_msg_ts: ts
610 0 : .duration_since(SystemTime::UNIX_EPOCH)
611 0 : .expect("Received message time should be before UNIX EPOCH!")
612 0 : .as_micros(),
613 0 : };
614 0 : *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
615 :
616 : // Send the replication feedback message.
617 : // Regular standby_status_update fields are put into this message.
618 0 : let current_timeline_size = if timeline.tenant_shard_id.is_shard_zero() {
619 0 : timeline
620 0 : .get_current_logical_size(
621 0 : crate::tenant::timeline::GetLogicalSizePriority::User,
622 0 : &ctx,
623 0 : )
624 0 : // FIXME: https://github.com/neondatabase/neon/issues/5963
625 0 : .size_dont_care_about_accuracy()
626 : } else {
627 : // Non-zero shards send zero for logical size. The safekeeper will ignore
628 : // this number. This is because in a sharded tenant, only shard zero maintains
629 : // accurate logical size.
630 0 : 0
631 : };
632 :
633 0 : let status_update = PageserverFeedback {
634 0 : current_timeline_size,
635 0 : last_received_lsn,
636 0 : disk_consistent_lsn,
637 0 : remote_consistent_lsn,
638 0 : replytime: ts,
639 0 : shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
640 0 : };
641 0 :
642 0 : debug!("neon_status_update {status_update:?}");
643 :
644 0 : let mut data = BytesMut::new();
645 0 : status_update.serialize(&mut data);
646 0 : physical_stream
647 0 : .as_mut()
648 0 : .zenith_status_update(data.len() as u64, &data)
649 0 : .await?;
650 0 : }
651 : }
652 :
653 0 : Ok(())
654 0 : }
655 :
656 : /// Data returned from the postgres `IDENTIFY_SYSTEM` command
657 : ///
658 : /// See the [postgres docs] for more details.
659 : ///
660 : /// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
661 : #[derive(Debug)]
662 : // As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
663 : // unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
664 : #[allow(dead_code)]
665 : struct IdentifySystem {
666 : systemid: u64,
667 : timeline: u32,
668 : xlogpos: PgLsn,
669 : dbname: Option<String>,
670 : }
671 :
672 : /// There was a problem parsing the response to
673 : /// a postgres IDENTIFY_SYSTEM command.
674 0 : #[derive(Debug, thiserror::Error)]
675 : #[error("IDENTIFY_SYSTEM parse error")]
676 : struct IdentifyError;
677 :
678 : /// Run the postgres `IDENTIFY_SYSTEM` command
679 0 : async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
680 0 : let query_str = "IDENTIFY_SYSTEM";
681 0 : let response = client.simple_query(query_str).await?;
682 :
683 : // get(N) from row, then parse it as some destination type.
684 0 : fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
685 0 : where
686 0 : T: FromStr,
687 0 : {
688 0 : let val = row.get(idx).ok_or(IdentifyError)?;
689 0 : val.parse::<T>().or(Err(IdentifyError))
690 0 : }
691 :
692 : // extract the row contents into an IdentifySystem struct.
693 : // written as a closure so I can use ? for Option here.
694 0 : if let Some(SimpleQueryMessage::Row(first_row)) = response.first() {
695 : Ok(IdentifySystem {
696 0 : systemid: get_parse(first_row, 0)?,
697 0 : timeline: get_parse(first_row, 1)?,
698 0 : xlogpos: get_parse(first_row, 2)?,
699 0 : dbname: get_parse(first_row, 3).ok(),
700 : })
701 : } else {
702 0 : Err(IdentifyError.into())
703 : }
704 0 : }
|