Line data Source code
1 : //! Actual Postgres connection handler to stream WAL to the server.
2 :
3 : use std::{
4 : error::Error,
5 : pin::pin,
6 : str::FromStr,
7 : sync::Arc,
8 : time::{Duration, SystemTime},
9 : };
10 :
11 : use anyhow::{anyhow, Context};
12 : use bytes::BytesMut;
13 : use chrono::{NaiveDateTime, Utc};
14 : use fail::fail_point;
15 : use futures::StreamExt;
16 : use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
17 : use postgres_ffi::WAL_SEGMENT_SIZE;
18 : use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError};
19 : use postgres_protocol::message::backend::ReplicationMessage;
20 : use postgres_types::PgLsn;
21 : use tokio::{select, sync::watch, time};
22 : use tokio_postgres::{replication::ReplicationStream, Client};
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::{debug, error, info, trace, warn, Instrument};
25 :
26 : use super::TaskStateUpdate;
27 : use crate::{
28 : context::RequestContext,
29 : metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS, WAL_INGEST},
30 : task_mgr,
31 : task_mgr::TaskKind,
32 : task_mgr::WALRECEIVER_RUNTIME,
33 : tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
34 : walingest::WalIngest,
35 : walrecord::DecodedWALRecord,
36 : };
37 : use postgres_backend::is_expected_io_error;
38 : use postgres_connection::PgConnectionConfig;
39 : use postgres_ffi::waldecoder::WalStreamDecoder;
40 : use utils::pageserver_feedback::PageserverFeedback;
41 : use utils::{id::NodeId, lsn::Lsn};
42 :
43 : /// Status of the connection.
44 684001 : #[derive(Debug, Clone, Copy)]
45 : pub(super) struct WalConnectionStatus {
46 : /// If we were able to initiate a postgres connection, this means that the safekeeper process is at least running.
47 : pub is_connected: bool,
48 : /// Defines a healthy connection as one on which the pageserver received WAL from the safekeeper
49 : /// and is able to process it in walingest without errors.
50 : pub has_processed_wal: bool,
51 : /// Connection establishment time, or the timestamp of the latest connection message received.
52 : pub latest_connection_update: NaiveDateTime,
53 : /// Time of the latest WAL message received.
54 : pub latest_wal_update: NaiveDateTime,
55 : /// Latest WAL update contained WAL up to this LSN. The next WAL message will start from that LSN.
56 : pub streaming_lsn: Option<Lsn>,
57 : /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
58 : pub commit_lsn: Option<Lsn>,
59 : /// The node it is connected to
60 : pub node: NodeId,
61 : }
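// This status is broadcast to the connection manager through the `events_sender` watch channel
// (as `TaskStateUpdate::Progress`) and is refreshed as new safekeeper messages arrive; see
// `handle_walreceiver_connection` below.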
62 :
63 : pub(super) enum WalReceiverError {
64 : /// An error of a type that does not indicate an issue, e.g. a connection closing
65 : ExpectedSafekeeperError(postgres::Error),
66 : /// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries
67 : /// the message part of the original postgres error
68 : SuccessfulCompletion(String),
69 : /// Generic error
70 : Other(anyhow::Error),
71 : }
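// Roughly how the `From` impls below classify failures (the messages here are illustrative):
// - a stream that ends with SQLSTATE 00000 (SUCCESSFUL_COMPLETION) and a message containing
//   "ending streaming" becomes `SuccessfulCompletion`;
// - a closed connection, or an io::Error that `is_expected_io_error` accepts (e.g. a connection
//   reset), becomes `ExpectedSafekeeperError`;
// - anything else is wrapped as `Other(anyhow::Error)`.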
72 :
73 : impl From<tokio_postgres::Error> for WalReceiverError {
74 1856 : fn from(err: tokio_postgres::Error) -> Self {
75 1856 : if let Some(dberror) = err.as_db_error().filter(|db_error| {
76 43 : db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
77 43 : && db_error.message().contains("ending streaming")
78 1856 : }) {
79 : // Strip the outer DbError, which carries a misleading "error" severity
80 43 : Self::SuccessfulCompletion(dberror.message().to_string())
81 1813 : } else if err.is_closed()
82 1006 : || err
83 1006 : .source()
84 1006 : .and_then(|source| source.downcast_ref::<std::io::Error>())
85 1006 : .map(is_expected_io_error)
86 1006 : .unwrap_or(false)
87 : {
88 1802 : Self::ExpectedSafekeeperError(err)
89 : } else {
90 11 : Self::Other(anyhow::Error::new(err))
91 : }
92 1856 : }
93 : }
94 :
95 : impl From<anyhow::Error> for WalReceiverError {
96 19 : fn from(err: anyhow::Error) -> Self {
97 19 : Self::Other(err)
98 19 : }
99 : }
100 :
101 : impl From<WalDecodeError> for WalReceiverError {
102 0 : fn from(err: WalDecodeError) -> Self {
103 0 : Self::Other(anyhow::Error::new(err))
104 0 : }
105 : }
106 :
107 : /// Open a connection to the given safekeeper and receive WAL, sending back progress
108 : /// messages as we go.
109 : #[allow(clippy::too_many_arguments)]
110 1751 : pub(super) async fn handle_walreceiver_connection(
111 1751 : timeline: Arc<Timeline>,
112 1751 : wal_source_connconf: PgConnectionConfig,
113 1751 : events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
114 1751 : cancellation: CancellationToken,
115 1751 : connect_timeout: Duration,
116 1751 : ctx: RequestContext,
117 1751 : node: NodeId,
118 1751 : ingest_batch_size: u64,
119 1751 : ) -> Result<(), WalReceiverError> {
120 1751 : debug_assert_current_span_has_tenant_and_timeline_id();
121 1751 :
122 1751 : WALRECEIVER_STARTED_CONNECTIONS.inc();
123 :
124 : // Connect to the database in replication mode.
125 1751 : info!("connecting to {wal_source_connconf:?}");
126 :
127 763 : let (replication_client, connection) = {
128 1751 : let mut config = wal_source_connconf.to_tokio_postgres_config();
129 1751 : config.application_name("pageserver");
130 1751 : config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
131 2478 : match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await {
132 1751 : Ok(client_and_conn) => client_and_conn?,
133 0 : Err(_elapsed) => {
134 : // Timing out while connecting to a safekeeper node can happen for many
135 : // reasons that the pageserver cannot control.
136 : // Do not produce an error, but make timeouts visible by logging the event.
137 0 : info!("Timed out while waiting {connect_timeout:?} for walreceiver connection to open");
138 0 : return Ok(());
139 : }
140 : }
141 : };
142 :
143 0 : debug!("connected!");
144 763 : let mut connection_status = WalConnectionStatus {
145 763 : is_connected: true,
146 763 : has_processed_wal: false,
147 763 : latest_connection_update: Utc::now().naive_utc(),
148 763 : latest_wal_update: Utc::now().naive_utc(),
149 763 : streaming_lsn: None,
150 763 : commit_lsn: None,
151 763 : node,
152 763 : };
153 763 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
154 0 : warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
155 0 : return Ok(());
156 763 : }
157 763 :
158 763 : // The connection object performs the actual communication with the database,
159 763 : // so spawn it off to run on its own.
160 763 : let _connection_ctx = ctx.detached_child(
161 763 : TaskKind::WalReceiverConnectionPoller,
162 763 : ctx.download_behavior(),
163 763 : );
164 763 : let connection_cancellation = cancellation.clone();
165 763 : task_mgr::spawn(
166 763 : WALRECEIVER_RUNTIME.handle(),
167 763 : TaskKind::WalReceiverConnectionPoller,
168 763 : Some(timeline.tenant_shard_id),
169 763 : Some(timeline.timeline_id),
170 763 : "walreceiver connection",
171 : false,
172 763 : async move {
173 763 : debug_assert_current_span_has_tenant_and_timeline_id();
174 763 :
175 1239040 : select! {
176 423 : connection_result = connection => match connection_result {
177 0 : Ok(()) => debug!("Walreceiver db connection closed"),
178 : Err(connection_error) => {
179 : match WalReceiverError::from(connection_error) {
180 : WalReceiverError::ExpectedSafekeeperError(_) => {
181 : // silence, because most likely we've already exited the outer call
182 : // with a similar error.
183 : },
184 : WalReceiverError::SuccessfulCompletion(_) => {}
185 : WalReceiverError::Other(err) => {
186 11 : warn!("Connection aborted: {err:#}")
187 : }
188 : }
189 : }
190 : },
191 0 : _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
192 : }
193 719 : Ok(())
194 719 : }
195 : // Enrich the log lines emitted by this closure with meaningful context.
196 : // TODO: technically, this task outlives the surrounding function, so the
197 : // spans won't be properly nested.
198 763 : .instrument(tracing::info_span!("poller")),
199 : );
200 :
201 : // Immediately increment the gauge, then create a job to decrement it on task exit.
202 : // One of the pros of `defer!` is that this will *most probably*
203 : // get called, even in the presence of panics.
204 763 : let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
205 763 : gauge.inc();
206 719 : scopeguard::defer! {
207 719 : gauge.dec();
208 719 : }
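// (For readers unfamiliar with `scopeguard`: `defer!` creates a guard value whose `Drop` impl
// runs the block above, so the decrement also runs during unwinding on a panic, unless the
// panic aborts the process.)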
209 :
210 763 : let identify = identify_system(&replication_client).await?;
211 761 : info!("{identify:?}");
212 :
213 761 : let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
214 761 : let mut caught_up = false;
215 761 :
216 761 : connection_status.latest_connection_update = Utc::now().naive_utc();
217 761 : connection_status.latest_wal_update = Utc::now().naive_utc();
218 761 : connection_status.commit_lsn = Some(end_of_wal);
219 761 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
220 0 : warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
221 0 : return Ok(());
222 761 : }
223 761 :
224 761 : //
225 761 : // Start streaming the WAL, from where we left off previously.
226 761 : //
227 761 : // If we had previously received WAL up to some point in the middle of a WAL record, we
228 761 : // had better start from the end of the last full WAL record, not in the middle of one.
229 761 : let mut last_rec_lsn = timeline.get_last_record_lsn();
230 761 : let mut startpoint = last_rec_lsn;
231 761 :
232 761 : if startpoint == Lsn(0) {
233 0 : return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
234 761 : }
235 761 :
236 761 : // There might be some padding after the last full record, skip it.
237 761 : startpoint += startpoint.calc_padding(8u32);
238 761 :
239 761 : // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
240 761 : // for anything, and in some corner cases, the compute node might never have generated the WAL for page headers.
241 761 : // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
242 761 : // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
243 761 : // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
244 761 : // to the safekeepers.
245 761 : startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
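// A rough worked example of the two adjustments above, assuming the default 16 MiB WAL segments
// and standard Postgres page-header sizes (24 bytes for an ordinary page header, 40 bytes for the
// long header at the start of a segment):
// - padding: a last_record_lsn of 0/16B3745 is MAXALIGN'ed up to 0/16B3748;
// - normalize_lsn: an LSN exactly at 0/2000000 (a segment boundary) is advanced to 0/2000028,
//   and one at 0/2002000 (a page boundary within a segment) to 0/2002018.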
246 :
247 761 : info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
248 :
249 761 : let query = format!("START_REPLICATION PHYSICAL {startpoint}");
250 :
251 761 : let copy_stream = replication_client.copy_both_simple(&query).await?;
252 760 : let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
253 760 :
254 760 : let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
255 :
256 760 : let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
257 :
258 752578 : while let Some(replication_message) = {
259 752832 : select! {
260 : _ = cancellation.cancelled() => {
261 0 : debug!("walreceiver interrupted");
262 : None
263 : }
264 752578 : replication_message = physical_stream.next() => replication_message,
265 : }
266 : } {
267 752578 : let replication_message = replication_message?;
268 :
269 752124 : let now = Utc::now().naive_utc();
270 752124 : let last_rec_lsn_before_msg = last_rec_lsn;
271 752124 :
272 752124 : // Update the connection status before processing the message. If the message processing
273 752124 : // fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
274 752124 : match &replication_message {
275 748621 : ReplicationMessage::XLogData(xlog_data) => {
276 748621 : connection_status.latest_connection_update = now;
277 748621 : connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
278 748621 : connection_status.streaming_lsn = Some(Lsn::from(
279 748621 : xlog_data.wal_start() + xlog_data.data().len() as u64,
280 748621 : ));
281 748621 : if !xlog_data.data().is_empty() {
282 748621 : connection_status.latest_wal_update = now;
283 748621 : }
284 : }
285 3503 : ReplicationMessage::PrimaryKeepAlive(keepalive) => {
286 3503 : connection_status.latest_connection_update = now;
287 3503 : connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
288 3503 : }
289 0 : &_ => {}
290 : };
291 752124 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
292 0 : warn!("Wal connection event listener dropped, aborting the connection: {e}");
293 0 : return Ok(());
294 752124 : }
295 :
296 752124 : let status_update = match replication_message {
297 748621 : ReplicationMessage::XLogData(xlog_data) => {
298 748621 : // Pass the WAL data to the decoder, and see if we can decode
299 748621 : // more records as a result.
300 748621 : let data = xlog_data.data();
301 748621 : let startlsn = Lsn::from(xlog_data.wal_start());
302 748621 : let endlsn = startlsn + data.len() as u64;
303 :
304 0 : trace!("received XLogData between {startlsn} and {endlsn}");
305 :
306 748621 : waldecoder.feed_bytes(data);
307 748621 :
308 748621 : {
309 748621 : let mut decoded = DecodedWALRecord::default();
310 748621 : let mut modification = timeline.begin_modification(startlsn);
311 748621 : let mut uncommitted_records = 0;
312 748621 : let mut filtered_records = 0;
313 73296003 : while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
314 : // It is important to deal with the aligned records as lsn in getPage@LSN is
315 : // aligned and can be several bytes bigger. Without this alignment we are
316 : // at risk of hitting a deadlock.
317 72547387 : if !lsn.is_aligned() {
318 0 : return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
319 72547387 : }
320 :
321 : // Ingest the records without immediately committing them.
322 72547387 : let ingested = walingest
323 72547387 : .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
324 12591 : .await
325 72547387 : .with_context(|| format!("could not ingest record at {lsn}"))?;
326 72547383 : if !ingested {
327 0 : tracing::debug!("ingest: filtered out record @ LSN {lsn}");
328 66317453 : WAL_INGEST.records_filtered.inc();
329 66317453 : filtered_records += 1;
330 6229930 : }
331 :
332 0 : fail_point!("walreceiver-after-ingest");
333 :
334 72547383 : last_rec_lsn = lsn;
335 72547383 :
336 72547383 : // Commit every ingest_batch_size records. Even if we filtered out
337 72547383 : // all records, we still need to call commit to advance the LSN.
338 72547383 : uncommitted_records += 1;
339 72547383 : if uncommitted_records >= ingest_batch_size {
340 604825 : WAL_INGEST
341 604825 : .records_committed
342 604825 : .inc_by(uncommitted_records - filtered_records);
343 604825 : modification.commit(&ctx).await?;
344 604824 : uncommitted_records = 0;
345 604824 : filtered_records = 0;
346 604824 :
347 604824 : //
348 604824 : // We should check the checkpoint distance after each committed batch of ingest_batch_size records,
349 604824 : // because otherwise the layer size can become much larger than `checkpoint_distance`.
350 604824 : // This can happen because the WAL sender sends WAL in 125 kB chunks, and some WAL records can cause a
351 604824 : // large amount of data to be written to key-value storage. Performing this check only after processing
352 604824 : // all WAL records in the chunk could therefore produce huge L0 layer files.
353 604824 : //
354 604824 : timeline
355 604824 : .check_checkpoint_distance()
356 7286 : .await
357 604824 : .with_context(|| {
358 0 : format!(
359 0 : "Failed to check checkpoint distance for timeline {}",
360 0 : timeline.timeline_id
361 0 : )
362 604824 : })?;
363 71942558 : }
364 : }
365 :
366 : // Commit the remaining records.
367 748616 : if uncommitted_records > 0 {
368 747521 : WAL_INGEST
369 747521 : .records_committed
370 747521 : .inc_by(uncommitted_records - filtered_records);
371 747521 : modification.commit(&ctx).await?;
372 1095 : }
373 : }
374 :
375 748616 : if !caught_up && endlsn >= end_of_wal {
376 648 : info!("caught up at LSN {endlsn}");
377 648 : caught_up = true;
378 747968 : }
379 :
380 748616 : Some(endlsn)
381 : }
382 :
383 3503 : ReplicationMessage::PrimaryKeepAlive(keepalive) => {
384 3503 : let wal_end = keepalive.wal_end();
385 3503 : let timestamp = keepalive.timestamp();
386 3503 : let reply_requested = keepalive.reply() != 0;
387 :
388 0 : trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
389 :
390 3503 : if reply_requested {
391 3503 : Some(last_rec_lsn)
392 : } else {
393 0 : None
394 : }
395 : }
396 :
397 0 : _ => None,
398 : };
399 :
400 752119 : if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
401 : // We have successfully processed at least one WAL record.
402 655 : connection_status.has_processed_wal = true;
403 655 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
404 0 : warn!("Wal connection event listener dropped, aborting the connection: {e}");
405 0 : return Ok(());
406 655 : }
407 751464 : }
408 :
409 752119 : timeline
410 752119 : .check_checkpoint_distance()
411 2546 : .await
412 752119 : .with_context(|| {
413 0 : format!(
414 0 : "Failed to check checkpoint distance for timeline {}",
415 0 : timeline.timeline_id
416 0 : )
417 752119 : })?;
418 :
419 752119 : if let Some(last_lsn) = status_update {
420 752119 : let timeline_remote_consistent_lsn = timeline
421 752119 : .get_remote_consistent_lsn_visible()
422 752119 : .unwrap_or(Lsn(0));
423 752119 :
424 752119 : // The last LSN we processed. It is not guaranteed to survive a pageserver crash.
425 752119 : let last_received_lsn = last_lsn;
426 752119 : // `disk_consistent_lsn` is the LSN at which the pageserver guarantees local persistence of all received data.
427 752119 : let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
428 752119 : // The last LSN that is synced to remote storage and is guaranteed to survive a pageserver crash.
429 752119 : // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
430 752119 : let remote_consistent_lsn = timeline_remote_consistent_lsn;
431 752119 : let ts = SystemTime::now();
432 752119 :
433 752119 : // Update the status about what we just received. This is shown in the mgmt API.
434 752119 : let last_received_wal = WalReceiverInfo {
435 752119 : wal_source_connconf: wal_source_connconf.clone(),
436 752119 : last_received_msg_lsn: last_lsn,
437 752119 : last_received_msg_ts: ts
438 752119 : .duration_since(SystemTime::UNIX_EPOCH)
439 752119 : .expect("Received message time should be after UNIX EPOCH!")
440 752119 : .as_micros(),
441 752119 : };
442 752119 : *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
443 :
444 : // Send the replication feedback message.
445 : // Regular standby_status_update fields are put into this message.
446 752119 : let current_timeline_size = if timeline.tenant_shard_id.is_zero() {
447 662127 : timeline
448 662127 : .get_current_logical_size(
449 662127 : crate::tenant::timeline::GetLogicalSizePriority::User,
450 662127 : &ctx,
451 662127 : )
452 662127 : // FIXME: https://github.com/neondatabase/neon/issues/5963
453 662127 : .size_dont_care_about_accuracy()
454 : } else {
455 : // Non-zero shards send zero for logical size. The safekeeper will ignore
456 : // this number. This is because in a sharded tenant, only shard zero maintains
457 : // accurate logical size.
458 89992 : 0
459 : };
460 :
461 752119 : let status_update = PageserverFeedback {
462 752119 : current_timeline_size,
463 752119 : last_received_lsn,
464 752119 : disk_consistent_lsn,
465 752119 : remote_consistent_lsn,
466 752119 : replytime: ts,
467 752119 : };
468 :
469 0 : debug!("neon_status_update {status_update:?}");
470 :
471 752119 : let mut data = BytesMut::new();
472 752119 : status_update.serialize(&mut data);
473 752119 : physical_stream
474 752119 : .as_mut()
475 752119 : .zenith_status_update(data.len() as u64, &data)
476 52500 : .await?;
477 0 : }
478 : }
479 :
480 211 : Ok(())
481 1707 : }
482 :
483 : /// Data returned from the postgres `IDENTIFY_SYSTEM` command
484 : ///
485 : /// See the [postgres docs] for more details.
486 : ///
487 : /// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
488 761 : #[derive(Debug)]
489 : // As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
490 : // unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
491 : #[allow(dead_code)]
492 : struct IdentifySystem {
493 : systemid: u64,
494 : timeline: u32,
495 : xlogpos: PgLsn,
496 : dbname: Option<String>,
497 : }
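// `IDENTIFY_SYSTEM` returns a single row with these four columns; a hypothetical response could
// look like `systemid = 7134812734567890123`, `timeline = 1`, `xlogpos = 0/16B3748`,
// `dbname = NULL` (dbname is NULL on a physical replication connection, hence the `Option`).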
498 :
499 : /// There was a problem parsing the response to
500 : /// a postgres IDENTIFY_SYSTEM command.
501 0 : #[derive(Debug, thiserror::Error)]
502 : #[error("IDENTIFY_SYSTEM parse error")]
503 : struct IdentifyError;
504 :
505 : /// Run the postgres `IDENTIFY_SYSTEM` command
506 763 : async fn identify_system(client: &Client) -> anyhow::Result<IdentifySystem> {
507 763 : let query_str = "IDENTIFY_SYSTEM";
508 763 : let response = client.simple_query(query_str).await?;
509 :
510 : // get(N) from row, then parse it as some destination type.
511 3044 : fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
512 3044 : where
513 3044 : T: FromStr,
514 3044 : {
515 3044 : let val = row.get(idx).ok_or(IdentifyError)?;
516 2283 : val.parse::<T>().or(Err(IdentifyError))
517 3044 : }
518 :
519 : // extract the row contents into an IdentifySystem struct.
520 : // written as a closure so I can use ? for Option here.
521 761 : if let Some(SimpleQueryMessage::Row(first_row)) = response.first() {
522 : Ok(IdentifySystem {
523 761 : systemid: get_parse(first_row, 0)?,
524 761 : timeline: get_parse(first_row, 1)?,
525 761 : xlogpos: get_parse(first_row, 2)?,
526 761 : dbname: get_parse(first_row, 3).ok(),
527 : })
528 : } else {
529 0 : Err(IdentifyError.into())
530 : }
531 763 : }
|