Line data Source code
1 : //! Actual Postgres connection handler that streams WAL from a safekeeper to the pageserver.
2 :
3 : use std::error::Error;
4 : use std::pin::pin;
5 : use std::str::FromStr;
6 : use std::sync::Arc;
7 : use std::time::{Duration, SystemTime};
8 :
9 : use anyhow::{Context, anyhow};
10 : use bytes::BytesMut;
11 : use chrono::{NaiveDateTime, Utc};
12 : use fail::fail_point;
13 : use futures::StreamExt;
14 : use postgres_backend::is_expected_io_error;
15 : use postgres_connection::PgConnectionConfig;
16 : use postgres_ffi::WAL_SEGMENT_SIZE;
17 : use postgres_ffi::v14::xlog_utils::normalize_lsn;
18 : use postgres_ffi::waldecoder::WalDecodeError;
19 : use postgres_protocol::message::backend::ReplicationMessage;
20 : use postgres_types::PgLsn;
21 : use tokio::sync::watch;
22 : use tokio::{select, time};
23 : use tokio_postgres::error::SqlState;
24 : use tokio_postgres::replication::ReplicationStream;
25 : use tokio_postgres::{Client, SimpleQueryMessage, SimpleQueryRow};
26 : use tokio_util::sync::CancellationToken;
27 : use tracing::{Instrument, debug, error, info, trace, warn};
28 : use utils::critical_timeline;
29 : use utils::id::NodeId;
30 : use utils::lsn::Lsn;
31 : use utils::pageserver_feedback::PageserverFeedback;
32 : use utils::postgres_client::PostgresClientProtocol;
33 : use utils::sync::gate::GateError;
34 : use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecords};
35 : use wal_decoder::wire_format::FromWireFormat;
36 :
37 : use super::TaskStateUpdate;
38 : use crate::context::RequestContext;
39 : use crate::metrics::{LIVE_CONNECTIONS, WAL_INGEST, WALRECEIVER_STARTED_CONNECTIONS};
40 : use crate::pgdatadir_mapping::DatadirModification;
41 : use crate::task_mgr::{TaskKind, WALRECEIVER_RUNTIME};
42 : use crate::tenant::{
43 : Timeline, WalReceiverInfo, debug_assert_current_span_has_tenant_and_timeline_id,
44 : };
45 : use crate::walingest::WalIngest;
46 :
47 : /// Status of the connection.
48 : #[derive(Debug, Clone, Copy)]
49 : pub(super) struct WalConnectionStatus {
50 : /// If we were able to initiate a postgres connection, this means that the safekeeper process is at least running.
51 : pub is_connected: bool,
52 : /// Defines a healthy connection as one on which the pageserver received WAL from the safekeeper
53 : /// and is able to process it in walingest without errors.
54 : pub has_processed_wal: bool,
55 : /// Connection establishment time, or the timestamp of the latest connection message received.
56 : pub latest_connection_update: NaiveDateTime,
57 : /// Time of the latest WAL message received.
58 : pub latest_wal_update: NaiveDateTime,
59 : /// Latest WAL update contained WAL up to this LSN. The next WAL message will start from that LSN.
60 : pub streaming_lsn: Option<Lsn>,
61 : /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
62 : pub commit_lsn: Option<Lsn>,
63 : /// The node it is connected to
64 : pub node: NodeId,
65 : }
66 :
67 : pub(super) enum WalReceiverError {
68 : /// An error of a type that does not indicate an issue, e.g. a connection closing
69 : ExpectedSafekeeperError(tokio_postgres::Error),
70 : /// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries
71 : /// the message part of the original postgres error
72 : SuccessfulCompletion(String),
73 : /// Generic error
74 : Other(anyhow::Error),
75 : ClosedGate,
76 : Cancelled,
77 : }
78 :
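// Classify raw `tokio_postgres` errors: an "error" carrying a SUCCESSFUL_COMPLETION
// code about ending streaming becomes `SuccessfulCompletion`, closed connections and
// other expected IO errors become `ExpectedSafekeeperError`, and anything else is `Other`.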
79 : impl From<tokio_postgres::Error> for WalReceiverError {
80 0 : fn from(err: tokio_postgres::Error) -> Self {
81 0 : if let Some(dberror) = err.as_db_error().filter(|db_error| {
82 0 : db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
83 0 : && db_error.message().contains("ending streaming")
84 0 : }) {
85 : // Strip the outer DbError, which carries a misleading "error" severity
86 0 : Self::SuccessfulCompletion(dberror.message().to_string())
87 0 : } else if err.is_closed()
88 0 : || err
89 0 : .source()
90 0 : .and_then(|source| source.downcast_ref::<std::io::Error>())
91 0 : .map(is_expected_io_error)
92 0 : .unwrap_or(false)
93 : {
94 0 : Self::ExpectedSafekeeperError(err)
95 : } else {
96 0 : Self::Other(anyhow::Error::new(err))
97 : }
98 0 : }
99 : }
100 :
101 : impl From<anyhow::Error> for WalReceiverError {
102 0 : fn from(err: anyhow::Error) -> Self {
103 0 : Self::Other(err)
104 0 : }
105 : }
106 :
107 : impl From<WalDecodeError> for WalReceiverError {
108 0 : fn from(err: WalDecodeError) -> Self {
109 0 : Self::Other(anyhow::Error::new(err))
110 0 : }
111 : }
112 :
113 : /// Open a connection to the given safekeeper and receive WAL, sending back progress
114 : /// messages as we go.
115 : #[allow(clippy::too_many_arguments)]
116 0 : pub(super) async fn handle_walreceiver_connection(
117 0 : timeline: Arc<Timeline>,
118 0 : protocol: PostgresClientProtocol,
119 0 : wal_source_connconf: PgConnectionConfig,
120 0 : events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
121 0 : cancellation: CancellationToken,
122 0 : connect_timeout: Duration,
123 0 : ctx: RequestContext,
124 0 : safekeeper_node: NodeId,
125 0 : ingest_batch_size: u64,
126 0 : validate_wal_contiguity: bool,
127 0 : ) -> Result<(), WalReceiverError> {
128 0 : debug_assert_current_span_has_tenant_and_timeline_id();
129 :
130 : // prevent timeline shutdown from finishing until we have exited
131 0 : let _guard = timeline.gate.enter().map_err(|e| match e {
132 0 : GateError::GateClosed => WalReceiverError::ClosedGate,
133 0 : })?;
134 : // This function spawns a side-car task (WalReceiverConnectionPoller).
135 : // Get its gate guard now as well.
136 0 : let poller_guard = timeline.gate.enter().map_err(|e| match e {
137 0 : GateError::GateClosed => WalReceiverError::ClosedGate,
138 0 : })?;
139 :
140 0 : WALRECEIVER_STARTED_CONNECTIONS.inc();
141 :
142 : // Connect to the database in replication mode.
143 0 : info!("connecting to {wal_source_connconf:?}");
144 :
145 0 : let (replication_client, connection) = {
146 0 : let mut config = wal_source_connconf.to_tokio_postgres_config();
147 0 : config.application_name(format!("pageserver-{}", timeline.conf.id.0).as_str());
148 0 : config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
149 0 : match time::timeout(connect_timeout, config.connect(tokio_postgres::NoTls)).await {
150 0 : Ok(client_and_conn) => client_and_conn?,
151 0 : Err(_elapsed) => {
152 : // Timing out while connecting to a safekeeper node can happen for many
153 : // reasons that the pageserver cannot control.
154 : // Do not produce an error, but make it visible that timeouts happen by logging the event.
155 0 : info!(
156 0 : "Timed out while waiting {connect_timeout:?} for walreceiver connection to open"
157 : );
158 0 : return Ok(());
159 : }
160 : }
161 : };
162 :
163 0 : debug!("connected!");
164 0 : let mut connection_status = WalConnectionStatus {
165 0 : is_connected: true,
166 0 : has_processed_wal: false,
167 0 : latest_connection_update: Utc::now().naive_utc(),
168 0 : latest_wal_update: Utc::now().naive_utc(),
169 0 : streaming_lsn: None,
170 0 : commit_lsn: None,
171 0 : node: safekeeper_node,
172 0 : };
173 0 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
174 0 : warn!(
175 0 : "Wal connection event listener dropped right after connection init, aborting the connection: {e}"
176 : );
177 0 : return Ok(());
178 0 : }
179 :
180 : // The connection object performs the actual communication with the database,
181 : // so spawn it off to run on its own. It shouldn't outlive this function, but,
182 : // due to lack of async drop, we can't enforce that. However, we ensure that
183 : // 1. it is sensitive to `cancellation` and
184 : // 2. holds the Timeline gate open so that after timeline shutdown,
185 : // we know this task is gone.
186 0 : let _connection_ctx = ctx.detached_child(
187 0 : TaskKind::WalReceiverConnectionPoller,
188 0 : ctx.download_behavior(),
189 : );
190 0 : let connection_cancellation = cancellation.clone();
191 0 : WALRECEIVER_RUNTIME.spawn(
192 0 : async move {
193 0 : debug_assert_current_span_has_tenant_and_timeline_id();
194 0 : select! {
195 0 : connection_result = connection => match connection_result {
196 0 : Ok(()) => debug!("Walreceiver db connection closed"),
197 0 : Err(connection_error) => {
198 0 : match WalReceiverError::from(connection_error) {
199 0 : WalReceiverError::ExpectedSafekeeperError(_) => {
200 0 : // silence, because most likely we've already exited the outer call
201 0 : // with a similar error.
202 0 : },
203 0 : WalReceiverError::SuccessfulCompletion(_) => {}
204 : WalReceiverError::Cancelled => {
205 0 : debug!("Connection cancelled")
206 : }
207 0 : WalReceiverError::ClosedGate => {
208 0 : // doesn't happen at runtime
209 0 : }
210 0 : WalReceiverError::Other(err) => {
211 0 : warn!("Connection aborted: {err:#}")
212 : }
213 : }
214 : }
215 : },
216 0 : _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
217 : }
218 0 : drop(poller_guard);
219 0 : }
220 : // Enrich the log lines emitted by this closure with meaningful context.
221 : // TODO: technically, this task outlives the surrounding function, so the
222 : // spans won't be properly nested.
223 0 : .instrument(tracing::info_span!("poller")),
224 : );
225 :
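// Track this stream in the wal_receiver live connections metric for as long as
// the guard below is held.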
226 0 : let _guard = LIVE_CONNECTIONS
227 0 : .with_label_values(&["wal_receiver"])
228 0 : .guard();
229 :
230 0 : let identify = identify_system(&replication_client).await?;
231 0 : info!("{identify:?}");
232 :
233 0 : let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
234 0 : let mut caught_up = false;
235 :
236 0 : connection_status.latest_connection_update = Utc::now().naive_utc();
237 0 : connection_status.latest_wal_update = Utc::now().naive_utc();
238 0 : connection_status.commit_lsn = Some(end_of_wal);
239 0 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
240 0 : warn!(
241 0 : "Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}"
242 : );
243 0 : return Ok(());
244 0 : }
245 :
246 : //
247 : // Start streaming the WAL, from where we left off previously.
248 : //
249 : // If we had previously received WAL up to some point in the middle of a WAL record, we'd
250 : // better start from the end of the last full WAL record, not in the middle of one.
251 0 : let mut last_rec_lsn = timeline.get_last_record_lsn();
252 0 : let mut startpoint = last_rec_lsn;
253 :
254 0 : if startpoint == Lsn(0) {
255 0 : return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
256 0 : }
257 :
258 : // There might be some padding after the last full record, skip it.
259 0 : startpoint += startpoint.calc_padding(8u32);
260 :
261 : // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
262 : // for anything, and in some corner cases, the compute node might never have generated the WAL for page headers.
263 : // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
264 : // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
265 : // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
266 : // to the safekeepers.
267 0 : startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
268 :
269 0 : info!(
270 0 : "last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}..."
271 : );
272 :
273 0 : let query = format!("START_REPLICATION PHYSICAL {startpoint}");
274 :
275 0 : let copy_stream = replication_client.copy_both_simple(&query).await?;
276 0 : let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
277 :
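// Set up the WAL ingest state for this timeline at the starting LSN; a
// cancellation during setup is surfaced as `WalReceiverError::Cancelled`.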
278 0 : let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
279 0 : .await
280 0 : .map_err(|e| match e.kind {
281 0 : crate::walingest::WalIngestErrorKind::Cancelled => WalReceiverError::Cancelled,
282 0 : _ => WalReceiverError::Other(e.into()),
283 0 : })?;
284 :
285 0 : let (format, compression) = match protocol {
286 : PostgresClientProtocol::Interpreted {
287 0 : format,
288 0 : compression,
289 0 : } => (format, compression),
290 : PostgresClientProtocol::Vanilla => {
291 0 : return Err(WalReceiverError::Other(anyhow!(
292 0 : "Vanilla WAL receiver protocol is no longer supported for ingest"
293 0 : )));
294 : }
295 : };
296 :
297 0 : let mut expected_wal_start = startpoint;
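// Main streaming loop: poll the replication stream for the next message.
// `biased` makes the select! poll the cancellation token first on every
// iteration, so shutdown requests are observed promptly.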
298 0 : while let Some(replication_message) = {
299 0 : select! {
300 : biased;
301 0 : _ = cancellation.cancelled() => {
302 0 : debug!("walreceiver interrupted");
303 0 : None
304 : }
305 0 : replication_message = physical_stream.next() => replication_message,
306 : }
307 : } {
308 0 : let replication_message = replication_message?;
309 :
310 0 : let now = Utc::now().naive_utc();
311 0 : let last_rec_lsn_before_msg = last_rec_lsn;
312 :
313 : // Update the connection status before processing the message. If the message processing
314 : // fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
315 0 : match &replication_message {
316 0 : ReplicationMessage::PrimaryKeepAlive(keepalive) => {
317 0 : connection_status.latest_connection_update = now;
318 0 : connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
319 0 : }
320 0 : ReplicationMessage::RawInterpretedWalRecords(raw) => {
321 0 : connection_status.latest_connection_update = now;
322 0 : if !raw.data().is_empty() {
323 0 : connection_status.latest_wal_update = now;
324 0 : }
325 :
326 0 : connection_status.commit_lsn = Some(Lsn::from(raw.commit_lsn()));
327 0 : connection_status.streaming_lsn = Some(Lsn::from(raw.streaming_lsn()));
328 : }
329 0 : &_ => {}
330 : };
331 0 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
332 0 : warn!("Wal connection event listener dropped, aborting the connection: {e}");
333 0 : return Ok(());
334 0 : }
335 :
336 0 : let status_update = match replication_message {
337 0 : ReplicationMessage::RawInterpretedWalRecords(raw) => {
338 0 : WAL_INGEST.bytes_received.inc_by(raw.data().len() as u64);
339 :
340 0 : let mut uncommitted_records = 0;
341 :
342 : // This is the end LSN of the raw WAL from which the records
343 : // were interpreted.
344 0 : let streaming_lsn = Lsn::from(raw.streaming_lsn());
345 :
346 0 : let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
347 0 : .await
348 0 : .with_context(|| {
349 0 : anyhow::anyhow!(
350 0 : "Failed to deserialize interpreted records ending at LSN {streaming_lsn}"
351 : )
352 0 : })?;
353 :
354 : // Guard against WAL gaps. If the start LSN of the PG WAL section
355 : // from which the interpreted records were extracted, doesn't match
356 : // the end of the previous batch (or the starting point for the first batch),
357 : // then kill this WAL receiver connection and start a new one.
358 0 : if validate_wal_contiguity {
359 0 : if let Some(raw_wal_start_lsn) = batch.raw_wal_start_lsn {
360 0 : match raw_wal_start_lsn.cmp(&expected_wal_start) {
361 : std::cmp::Ordering::Greater => {
362 0 : let msg = format!(
363 0 : "Gap in streamed WAL: [{expected_wal_start}, {raw_wal_start_lsn}"
364 : );
365 0 : critical_timeline!(
366 0 : timeline.tenant_shard_id,
367 0 : timeline.timeline_id,
368 0 : "{msg}"
369 : );
370 0 : return Err(WalReceiverError::Other(anyhow!(msg)));
371 : }
372 : std::cmp::Ordering::Less => {
373 : // Other shards are reading WAL behind us.
374 : // This is valid, but check that we received records
375 : // that we haven't seen before.
376 0 : if let Some(first_rec) = batch.records.first() {
377 0 : if first_rec.next_record_lsn < last_rec_lsn {
378 0 : let msg = format!(
379 0 : "Received record with next_record_lsn multiple times ({} < {})",
380 : first_rec.next_record_lsn, last_rec_lsn
381 : );
382 0 : critical_timeline!(
383 0 : timeline.tenant_shard_id,
384 0 : timeline.timeline_id,
385 0 : "{msg}"
386 : );
387 0 : return Err(WalReceiverError::Other(anyhow!(msg)));
388 0 : }
389 0 : }
390 : }
391 0 : std::cmp::Ordering::Equal => {}
392 : }
393 0 : }
394 0 : }
395 :
396 : let InterpretedWalRecords {
397 0 : records,
398 0 : next_record_lsn,
399 : raw_wal_start_lsn: _,
400 0 : } = batch;
401 :
402 0 : tracing::debug!(
403 0 : "Received WAL up to {} with next_record_lsn={}",
404 : streaming_lsn,
405 : next_record_lsn
406 : );
407 :
408 : // We start the modification at 0 because each interpreted record
409 : // advances it to its end LSN. 0 is just an initialization placeholder.
410 0 : let mut modification = timeline.begin_modification(Lsn(0));
411 :
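// Helper that flushes the pending modification to the timeline, updates the
// ingest metrics, and resets the uncommitted-record counter.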
412 0 : async fn commit(
413 0 : modification: &mut DatadirModification<'_>,
414 0 : ctx: &RequestContext,
415 0 : uncommitted: &mut u64,
416 0 : ) -> anyhow::Result<()> {
417 0 : let stats = modification.stats();
418 0 : modification.commit(ctx).await?;
419 0 : WAL_INGEST.records_committed.inc_by(*uncommitted);
420 0 : WAL_INGEST.inc_values_committed(&stats);
421 0 : *uncommitted = 0;
422 0 : Ok(())
423 0 : }
424 :
425 0 : if !records.is_empty() {
426 0 : timeline
427 0 : .metrics
428 0 : .wal_records_received
429 0 : .inc_by(records.len() as u64);
430 0 : }
431 :
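// Ingest the interpreted records in order, committing in batches so that the
// amount of buffered, uncommitted data stays bounded.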
432 0 : for interpreted in records {
433 0 : if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
434 0 : && uncommitted_records > 0
435 : {
436 0 : commit(&mut modification, &ctx, &mut uncommitted_records).await?;
437 0 : }
438 :
439 0 : let local_next_record_lsn = interpreted.next_record_lsn;
440 :
441 0 : if interpreted.is_observed() {
442 0 : WAL_INGEST.records_observed.inc();
443 0 : }
444 :
445 0 : walingest
446 0 : .ingest_record(interpreted, &mut modification, &ctx)
447 0 : .await
448 0 : .with_context(|| {
449 0 : format!("could not ingest record at {local_next_record_lsn}")
450 0 : })
451 0 : .inspect_err(|err| {
452 : // TODO: we can't differentiate cancellation errors with
453 : // anyhow::Error, so just ignore it if we're cancelled.
454 0 : if !cancellation.is_cancelled() && !timeline.is_stopping() {
455 0 : critical_timeline!(
456 0 : timeline.tenant_shard_id,
457 0 : timeline.timeline_id,
458 0 : "{err:?}"
459 : );
460 0 : }
461 0 : })?;
462 :
463 0 : uncommitted_records += 1;
464 :
465 : // FIXME: this cannot be made pausable_failpoint without fixing the
466 : // failpoint library; in tests, the added amount of debugging will cause us
467 : // to timeout the tests.
468 0 : fail_point!("walreceiver-after-ingest");
469 :
470 : // Commit every ingest_batch_size records. Even if we filtered out
471 : // all records, we still need to call commit to advance the LSN.
472 0 : if uncommitted_records >= ingest_batch_size
473 0 : || modification.approx_pending_bytes()
474 0 : > DatadirModification::MAX_PENDING_BYTES
475 : {
476 0 : commit(&mut modification, &ctx, &mut uncommitted_records).await?;
477 0 : }
478 : }
479 :
480 : // Records might have been filtered out on the safekeeper side, but we still
481 : // need to advance last record LSN on all shards. If we've not ingested the latest
482 : // record, then set the LSN of the modification past it. This way all shards
483 : // advance their last record LSN at the same time.
484 0 : let needs_last_record_lsn_advance = if next_record_lsn > modification.get_lsn() {
485 0 : modification.set_lsn(next_record_lsn).unwrap();
486 0 : true
487 : } else {
488 0 : false
489 : };
490 :
491 0 : if uncommitted_records > 0 || needs_last_record_lsn_advance {
492 : // Commit any uncommitted records
493 0 : commit(&mut modification, &ctx, &mut uncommitted_records).await?;
494 0 : }
495 :
496 0 : if !caught_up && streaming_lsn >= end_of_wal {
497 0 : info!("caught up at LSN {streaming_lsn}");
498 0 : caught_up = true;
499 0 : }
500 :
501 0 : tracing::debug!(
502 0 : "Ingested WAL up to {streaming_lsn}. Last record LSN is {}",
503 0 : timeline.get_last_record_lsn()
504 : );
505 :
506 0 : last_rec_lsn = next_record_lsn;
507 0 : expected_wal_start = streaming_lsn;
508 :
509 0 : Some(streaming_lsn)
510 : }
511 :
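// Keepalive messages report the safekeeper's current WAL end; send a status
// update back only if the safekeeper explicitly asked for a reply.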
512 0 : ReplicationMessage::PrimaryKeepAlive(keepalive) => {
513 0 : let wal_end = keepalive.wal_end();
514 0 : let timestamp = keepalive.timestamp();
515 0 : let reply_requested = keepalive.reply() != 0;
516 :
517 0 : trace!(
518 0 : "received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})"
519 : );
520 :
521 0 : if reply_requested {
522 0 : Some(last_rec_lsn)
523 : } else {
524 0 : None
525 : }
526 : }
527 :
528 0 : _ => None,
529 : };
530 :
531 0 : if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
532 : // We have successfully processed at least one WAL record.
533 0 : connection_status.has_processed_wal = true;
534 0 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
535 0 : warn!("Wal connection event listener dropped, aborting the connection: {e}");
536 0 : return Ok(());
537 0 : }
538 0 : }
539 :
540 0 : if let Some(last_lsn) = status_update {
541 0 : let timeline_remote_consistent_lsn = timeline
542 0 : .get_remote_consistent_lsn_visible()
543 0 : .unwrap_or(Lsn(0));
544 :
545 : // The last LSN we processed. It is not guaranteed to survive pageserver crash.
546 0 : let last_received_lsn = last_lsn;
547 : // `disk_consistent_lsn` is the LSN at which page server guarantees local persistence of all received data
548 0 : let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
549 : // The last LSN that is synced to remote storage and is guaranteed to survive pageserver crash
550 : // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
551 0 : let remote_consistent_lsn = timeline_remote_consistent_lsn;
552 0 : let ts = SystemTime::now();
553 :
554 : // Update the status about what we just received. This is shown in the mgmt API.
555 0 : let last_received_wal = WalReceiverInfo {
556 0 : wal_source_connconf: wal_source_connconf.clone(),
557 0 : last_received_msg_lsn: last_lsn,
558 0 : last_received_msg_ts: ts
559 0 : .duration_since(SystemTime::UNIX_EPOCH)
560 0 : .expect("Received message time should be before UNIX EPOCH!")
561 0 : .as_micros(),
562 0 : };
563 0 : *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
564 :
565 : // Send the replication feedback message.
566 : // Regular standby_status_update fields are put into this message.
567 0 : let current_timeline_size = if timeline.tenant_shard_id.is_shard_zero() {
568 0 : timeline
569 0 : .get_current_logical_size(
570 0 : crate::tenant::timeline::GetLogicalSizePriority::User,
571 0 : &ctx,
572 0 : )
573 : // FIXME: https://github.com/neondatabase/neon/issues/5963
574 0 : .size_dont_care_about_accuracy()
575 : } else {
576 : // Non-zero shards send zero for logical size. The safekeeper will ignore
577 : // this number. This is because in a sharded tenant, only shard zero maintains
578 : // accurate logical size.
579 0 : 0
580 : };
581 :
582 0 : let status_update = PageserverFeedback {
583 0 : current_timeline_size,
584 0 : last_received_lsn,
585 0 : disk_consistent_lsn,
586 0 : remote_consistent_lsn,
587 0 : replytime: ts,
588 0 : shard_number: timeline.tenant_shard_id.shard_number.0 as u32,
589 0 : };
590 :
591 0 : debug!("neon_status_update {status_update:?}");
592 :
593 0 : let mut data = BytesMut::new();
594 0 : status_update.serialize(&mut data);
595 0 : physical_stream
596 0 : .as_mut()
597 0 : .zenith_status_update(data.len() as u64, &data)
598 0 : .await?;
599 0 : }
600 : }
601 :
602 0 : Ok(())
603 0 : }
604 :
605 : /// Data returned from the postgres `IDENTIFY_SYSTEM` command
606 : ///
607 : /// See the [postgres docs] for more details.
608 : ///
609 : /// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
610 : #[derive(Debug)]
611 : // As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
612 : // unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
613 : #[allow(dead_code)]
614 : struct IdentifySystem {
615 : systemid: u64,
616 : timeline: u32,
617 : xlogpos: PgLsn,
618 : dbname: Option<String>,
619 : }
620 :
621 : /// There was a problem parsing the response to
622 : /// a postgres IDENTIFY_SYSTEM command.
623 : #[derive(Debug, thiserror::Error)]
624 : #[error("IDENTIFY_SYSTEM parse error")]
625 : struct IdentifyError;
626 :
627 : /// Run the postgres `IDENTIFY_SYSTEM` command
628 0 : async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
629 0 : let query_str = "IDENTIFY_SYSTEM";
630 0 : let response = client.simple_query(query_str).await?;
631 :
632 : // get(N) from row, then parse it as some destination type.
633 0 : fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
634 0 : where
635 0 : T: FromStr,
636 : {
637 0 : let val = row.get(idx).ok_or(IdentifyError)?;
638 0 : val.parse::<T>().or(Err(IdentifyError))
639 0 : }
640 :
641 : // extract the row contents into an IdentifySystem struct.
642 : // written with the helper above so `?` can be used when extracting each field.
643 0 : if let Some(SimpleQueryMessage::Row(first_row)) = response.first() {
644 : Ok(IdentifySystem {
645 0 : systemid: get_parse(first_row, 0)?,
646 0 : timeline: get_parse(first_row, 1)?,
647 0 : xlogpos: get_parse(first_row, 2)?,
648 0 : dbname: get_parse(first_row, 3).ok(),
649 : })
650 : } else {
651 0 : Err(IdentifyError.into())
652 : }
653 0 : }