TLA Line data Source code
1 : //! The actual Postgres connection handler that streams WAL from a safekeeper to the pageserver.
2 :
3 : use std::{
4 : error::Error,
5 : pin::pin,
6 : str::FromStr,
7 : sync::Arc,
8 : time::{Duration, SystemTime},
9 : };
10 :
11 : use anyhow::{anyhow, Context};
12 : use bytes::BytesMut;
13 : use chrono::{NaiveDateTime, Utc};
14 : use fail::fail_point;
15 : use futures::StreamExt;
16 : use postgres::{error::SqlState, SimpleQueryMessage, SimpleQueryRow};
17 : use postgres_ffi::WAL_SEGMENT_SIZE;
18 : use postgres_ffi::{v14::xlog_utils::normalize_lsn, waldecoder::WalDecodeError};
19 : use postgres_protocol::message::backend::ReplicationMessage;
20 : use postgres_types::PgLsn;
21 : use tokio::{select, sync::watch, time};
22 : use tokio_postgres::{replication::ReplicationStream, Client};
23 : use tokio_util::sync::CancellationToken;
24 : use tracing::{debug, error, info, trace, warn, Instrument};
25 :
26 : use super::TaskStateUpdate;
27 : use crate::{
28 : context::RequestContext,
29 : metrics::{LIVE_CONNECTIONS_COUNT, WALRECEIVER_STARTED_CONNECTIONS},
30 : task_mgr,
31 : task_mgr::TaskKind,
32 : task_mgr::WALRECEIVER_RUNTIME,
33 : tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline, WalReceiverInfo},
34 : walingest::WalIngest,
35 : walrecord::DecodedWALRecord,
36 : };
37 : use postgres_backend::is_expected_io_error;
38 : use postgres_connection::PgConnectionConfig;
39 : use postgres_ffi::waldecoder::WalStreamDecoder;
40 : use utils::pageserver_feedback::PageserverFeedback;
41 : use utils::{id::NodeId, lsn::Lsn};
42 :
43 : /// Status of the connection.
44 CBC 751946 : #[derive(Debug, Clone, Copy)]
45 : pub(super) struct WalConnectionStatus {
46 : /// If we were able to initiate a postgres connection, this means that the safekeeper process is at least running.
47 : pub is_connected: bool,
48 : /// Defines a healthy connection as one on which the pageserver has received WAL from the safekeeper
49 : /// and is able to process it in walingest without errors.
50 : pub has_processed_wal: bool,
51 : /// Connection establishment time, or the timestamp of the latest connection message received.
52 : pub latest_connection_update: NaiveDateTime,
53 : /// Time of the latest WAL message received.
54 : pub latest_wal_update: NaiveDateTime,
55 : /// The latest WAL update contained WAL up to this LSN. The next WAL message will start from that LSN.
56 : pub streaming_lsn: Option<Lsn>,
57 : /// Latest commit_lsn received from the safekeeper. Can be zero if no message has been received yet.
58 : pub commit_lsn: Option<Lsn>,
59 : /// The node it is connected to
60 : pub node: NodeId,
61 : }
62 :
63 : pub(super) enum WalReceiverError {
64 : /// An error of a type that does not indicate an issue, e.g. a connection closing
65 : ExpectedSafekeeperError(postgres::Error),
66 : /// An "error" message that carries a SUCCESSFUL_COMPLETION status code. Carries
67 : /// the message part of the original postgres error
68 : SuccessfulCompletion(String),
69 : /// Generic error
70 : Other(anyhow::Error),
71 : }
72 :
73 : impl From<tokio_postgres::Error> for WalReceiverError {
74 : fn from(err: tokio_postgres::Error) -> Self {
75 1266 : if let Some(dberror) = err.as_db_error().filter(|db_error| {
76 57 : db_error.code() == &SqlState::SUCCESSFUL_COMPLETION
77 57 : && db_error.message().contains("ending streaming")
78 1266 : }) {
79 : // Strip the outer DbError, which carries a misleading "error" severity
80 57 : Self::SuccessfulCompletion(dberror.message().to_string())
81 1209 : } else if err.is_closed()
82 527 : || err
83 527 : .source()
84 527 : .and_then(|source| source.downcast_ref::<std::io::Error>())
85 527 : .map(is_expected_io_error)
86 527 : .unwrap_or(false)
87 : {
88 1201 : Self::ExpectedSafekeeperError(err)
89 : } else {
90 8 : Self::Other(anyhow::Error::new(err))
91 : }
92 1266 : }
93 : }
94 :
95 : impl From<anyhow::Error> for WalReceiverError {
96 9 : fn from(err: anyhow::Error) -> Self {
97 9 : Self::Other(err)
98 9 : }
99 : }
100 :
101 : impl From<WalDecodeError> for WalReceiverError {
102 UBC 0 : fn from(err: WalDecodeError) -> Self {
103 0 : Self::Other(anyhow::Error::new(err))
104 0 : }
105 : }
106 :
107 : /// Open a connection to the given safekeeper and receive WAL, sending back progress
108 : /// messages as we go.
109 CBC 1189 : pub(super) async fn handle_walreceiver_connection(
110 1189 : timeline: Arc<Timeline>,
111 1189 : wal_source_connconf: PgConnectionConfig,
112 1189 : events_sender: watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
113 1189 : cancellation: CancellationToken,
114 1189 : connect_timeout: Duration,
115 1189 : ctx: RequestContext,
116 1189 : node: NodeId,
117 1189 : ) -> Result<(), WalReceiverError> {
118 1188 : debug_assert_current_span_has_tenant_and_timeline_id();
119 1188 :
120 1188 : WALRECEIVER_STARTED_CONNECTIONS.inc();
121 :
122 : // Connect to the database in replication mode.
123 1188 : info!("connecting to {wal_source_connconf:?}");
124 :
125 679 : let (mut replication_client, connection) = {
126 1188 : let mut config = wal_source_connconf.to_tokio_postgres_config();
127 1188 : config.application_name("pageserver");
128 1188 : config.replication_mode(tokio_postgres::config::ReplicationMode::Physical);
129 1893 : match time::timeout(connect_timeout, config.connect(postgres::NoTls)).await {
130 1188 : Ok(client_and_conn) => client_and_conn?,
131 UBC 0 : Err(_elapsed) => {
132 : // Timing out while connecting to a safekeeper node can keep happening for a long time,
133 : // for many reasons that the pageserver cannot control.
134 : // Do not produce an error, but make the timeouts visible by logging the event.
135 0 : info!("Timed out while waiting {connect_timeout:?} for walreceiver connection to open");
136 0 : return Ok(());
137 : }
138 : }
139 : };
140 :
141 0 : debug!("connected!");
142 CBC 679 : let mut connection_status = WalConnectionStatus {
143 679 : is_connected: true,
144 679 : has_processed_wal: false,
145 679 : latest_connection_update: Utc::now().naive_utc(),
146 679 : latest_wal_update: Utc::now().naive_utc(),
147 679 : streaming_lsn: None,
148 679 : commit_lsn: None,
149 679 : node,
150 679 : };
151 679 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
152 UBC 0 : warn!("Wal connection event listener dropped right after connection init, aborting the connection: {e}");
153 0 : return Ok(());
154 CBC 679 : }
155 679 :
156 679 : // The connection object performs the actual communication with the database,
157 679 : // so spawn it off to run on its own.
158 679 : let _connection_ctx = ctx.detached_child(
159 679 : TaskKind::WalReceiverConnectionPoller,
160 679 : ctx.download_behavior(),
161 679 : );
162 679 : let connection_cancellation = cancellation.clone();
163 679 : task_mgr::spawn(
164 679 : WALRECEIVER_RUNTIME.handle(),
165 679 : TaskKind::WalReceiverConnectionPoller,
166 679 : Some(timeline.tenant_id),
167 679 : Some(timeline.timeline_id),
168 679 : "walreceiver connection",
169 : false,
170 679 : async move {
171 679 : debug_assert_current_span_has_tenant_and_timeline_id();
172 679 :
173 1419587 : select! {
174 386 : connection_result = connection => match connection_result {
175 UBC 0 : Ok(()) => debug!("Walreceiver db connection closed"),
176 : Err(connection_error) => {
177 : match WalReceiverError::from(connection_error) {
178 : WalReceiverError::ExpectedSafekeeperError(_) => {
179 : // silence, because most likely we've already exited the outer call
180 : // with a similar error.
181 : },
182 : WalReceiverError::SuccessfulCompletion(_) => {}
183 : WalReceiverError::Other(err) => {
184 CBC 8 : warn!("Connection aborted: {err:#}")
185 : }
186 : }
187 : }
188 : },
189 UBC 0 : _ = connection_cancellation.cancelled() => debug!("Connection cancelled"),
190 : }
191 CBC 613 : Ok(())
192 613 : }
193 : // Enrich the log lines emitted by this closure with meaningful context.
194 : // TODO: technically, this task outlives the surrounding function, so the
195 : // spans won't be properly nested.
196 679 : .instrument(tracing::info_span!("poller")),
197 : );
198 :
199 : // Immediately increment the gauge, then create a job to decrement it on task exit.
200 : // One of the pros of `defer!` is that this will *most probably*
201 : // get called, even in the presence of panics.
202 679 : let gauge = LIVE_CONNECTIONS_COUNT.with_label_values(&["wal_receiver"]);
203 679 : gauge.inc();
204 613 : scopeguard::defer! {
205 613 : gauge.dec();
206 613 : }
207 :
208 679 : let identify = identify_system(&mut replication_client).await?;
209 679 : info!("{identify:?}");
210 :
211 679 : let end_of_wal = Lsn::from(u64::from(identify.xlogpos));
212 679 : let mut caught_up = false;
213 679 :
214 679 : connection_status.latest_connection_update = Utc::now().naive_utc();
215 679 : connection_status.latest_wal_update = Utc::now().naive_utc();
216 679 : connection_status.commit_lsn = Some(end_of_wal);
217 679 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
218 UBC 0 : warn!("Wal connection event listener dropped after IDENTIFY_SYSTEM, aborting the connection: {e}");
219 0 : return Ok(());
220 CBC 679 : }
221 679 :
222 679 : //
223 679 : // Start streaming the WAL, from where we left off previously.
224 679 : //
225 679 : // If we had previously received WAL up to some point in the middle of a WAL record, we had
226 679 : // better start from the end of the last full WAL record, not in the middle of one.
227 679 : let mut last_rec_lsn = timeline.get_last_record_lsn();
228 679 : let mut startpoint = last_rec_lsn;
229 679 :
230 679 : if startpoint == Lsn(0) {
231 UBC 0 : return Err(WalReceiverError::Other(anyhow!("No previous WAL position")));
232 CBC 679 : }
233 679 :
234 679 : // There might be some padding after the last full record, skip it.
235 679 : startpoint += startpoint.calc_padding(8u32);
236 679 :
237 679 : // If the starting point is at a WAL page boundary, skip past the page header. We don't need the page headers
238 679 : // for anything, and in some corner cases, the compute node might never have generated the WAL for the page headers.
239 679 : // That happens if you create a branch at a page boundary: the start point of the branch is at the page boundary,
240 679 : // but when the compute node first starts on the branch, we normalize the first REDO position to just after the page
241 679 : // header (see generate_pg_control()), so the WAL for the page header is never streamed from the compute node
242 679 : // to the safekeepers.
243 679 : startpoint = normalize_lsn(startpoint, WAL_SEGMENT_SIZE);
244 :
245 679 : info!("last_record_lsn {last_rec_lsn} starting replication from {startpoint}, safekeeper is at {end_of_wal}...");
246 :
247 679 : let query = format!("START_REPLICATION PHYSICAL {startpoint}");
248 :
249 679 : let copy_stream = replication_client.copy_both_simple(&query).await?;
250 677 : let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
251 677 :
252 677 : let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
253 :
254 677 : let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx).await?;
255 :
256 777307 : while let Some(replication_message) = {
257 777538 : select! {
258 : _ = cancellation.cancelled() => {
259 UBC 0 : debug!("walreceiver interrupted");
260 : None
261 : }
262 CBC 777307 : replication_message = physical_stream.next() => replication_message,
263 : }
264 : } {
265 777307 : let replication_message = replication_message?;
266 :
267 776912 : let now = Utc::now().naive_utc();
268 776912 : let last_rec_lsn_before_msg = last_rec_lsn;
269 776912 :
270 776912 : // Update the connection status before processing the message. If the message processing
271 776912 : // fails (e.g. in walingest), we still want to know the latest LSNs from the safekeeper.
272 776912 : match &replication_message {
273 774979 : ReplicationMessage::XLogData(xlog_data) => {
274 774979 : connection_status.latest_connection_update = now;
275 774979 : connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
276 774979 : connection_status.streaming_lsn = Some(Lsn::from(
277 774979 : xlog_data.wal_start() + xlog_data.data().len() as u64,
278 774979 : ));
279 774979 : if !xlog_data.data().is_empty() {
280 774979 : connection_status.latest_wal_update = now;
281 774979 : }
282 : }
283 1933 : ReplicationMessage::PrimaryKeepAlive(keepalive) => {
284 1933 : connection_status.latest_connection_update = now;
285 1933 : connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
286 1933 : }
287 UBC 0 : &_ => {}
288 : };
289 CBC 776912 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
290 UBC 0 : warn!("Wal connection event listener dropped, aborting the connection: {e}");
291 0 : return Ok(());
292 CBC 776912 : }
293 :
294 776912 : let status_update = match replication_message {
295 774979 : ReplicationMessage::XLogData(xlog_data) => {
296 774979 : // Pass the WAL data to the decoder, and see if we can decode
297 774979 : // more records as a result.
298 774979 : let data = xlog_data.data();
299 774979 : let startlsn = Lsn::from(xlog_data.wal_start());
300 774979 : let endlsn = startlsn + data.len() as u64;
301 :
302 UBC 0 : trace!("received XLogData between {startlsn} and {endlsn}");
303 :
304 CBC 774979 : waldecoder.feed_bytes(data);
305 774979 :
306 774979 : {
307 774979 : let mut decoded = DecodedWALRecord::default();
308 774979 : let mut modification = timeline.begin_modification(endlsn);
309 69363164 : while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
310 : // It is important to deal with aligned records, as the lsn in getPage@LSN is
311 : // aligned and can be several bytes bigger. Without this alignment we are
312 : // at risk of hitting a deadlock.
313 68588194 : if !lsn.is_aligned() {
314 UBC 0 : return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
315 CBC 68588194 : }
316 68588194 :
317 68588194 : walingest
318 68588194 : .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
319 4419908 : .await
320 68588185 : .with_context(|| format!("could not ingest record at {lsn}"))?;
321 :
322 UBC 0 : fail_point!("walreceiver-after-ingest");
323 :
324 CBC 68588185 : last_rec_lsn = lsn;
325 : }
326 : }
327 :
328 774969 : if !caught_up && endlsn >= end_of_wal {
329 539 : info!("caught up at LSN {endlsn}");
330 539 : caught_up = true;
331 774430 : }
332 :
333 774969 : Some(endlsn)
334 : }
335 :
336 1933 : ReplicationMessage::PrimaryKeepAlive(keepalive) => {
337 1933 : let wal_end = keepalive.wal_end();
338 1933 : let timestamp = keepalive.timestamp();
339 1933 : let reply_requested = keepalive.reply() != 0;
340 :
341 UBC 0 : trace!("received PrimaryKeepAlive(wal_end: {wal_end}, timestamp: {timestamp:?} reply: {reply_requested})");
342 :
343 CBC 1933 : if reply_requested {
344 1933 : Some(last_rec_lsn)
345 : } else {
346 UBC 0 : None
347 : }
348 : }
349 :
350 0 : _ => None,
351 : };
352 :
353 CBC 776902 : if !connection_status.has_processed_wal && last_rec_lsn > last_rec_lsn_before_msg {
354 : // We have successfully processed at least one WAL record.
355 550 : connection_status.has_processed_wal = true;
356 550 : if let Err(e) = events_sender.send(TaskStateUpdate::Progress(connection_status)) {
357 UBC 0 : warn!("Wal connection event listener dropped, aborting the connection: {e}");
358 0 : return Ok(());
359 CBC 550 : }
360 776352 : }
361 :
362 776902 : timeline
363 776902 : .check_checkpoint_distance()
364 1975 : .await
365 776902 : .with_context(|| {
366 UBC 0 : format!(
367 0 : "Failed to check checkpoint distance for timeline {}",
368 0 : timeline.timeline_id
369 0 : )
370 CBC 776902 : })?;
371 :
372 776902 : if let Some(last_lsn) = status_update {
373 776902 : let timeline_remote_consistent_lsn = timeline
374 776902 : .get_remote_consistent_lsn_visible()
375 776902 : .unwrap_or(Lsn(0));
376 776902 :
377 776902 : // The last LSN we processed. It is not guaranteed to survive a pageserver crash.
378 776902 : let last_received_lsn = last_lsn;
379 776902 : // `disk_consistent_lsn` is the LSN at which the pageserver guarantees local persistence of all received data.
380 776902 : let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
381 776902 : // The last LSN that is synced to remote storage and is guaranteed to survive a pageserver crash.
382 776902 : // Used by safekeepers to remove WAL preceding `remote_consistent_lsn`.
383 776902 : let remote_consistent_lsn = timeline_remote_consistent_lsn;
384 776902 : let ts = SystemTime::now();
385 776902 :
386 776902 : // Update the status about what we just received. This is shown in the mgmt API.
387 776902 : let last_received_wal = WalReceiverInfo {
388 776902 : wal_source_connconf: wal_source_connconf.clone(),
389 776902 : last_received_msg_lsn: last_lsn,
390 776902 : last_received_msg_ts: ts
391 776902 : .duration_since(SystemTime::UNIX_EPOCH)
392 776902 : .expect("Received message time should be before UNIX EPOCH!")
393 776902 : .as_micros(),
394 776902 : };
395 776902 : *timeline.last_received_wal.lock().unwrap() = Some(last_received_wal);
396 :
397 : // Send the replication feedback message.
398 : // Regular standby_status_update fields are put into this message.
399 776902 : let (timeline_logical_size, _) = timeline
400 776902 : .get_current_logical_size(&ctx)
401 776902 : .context("Status update creation failed to get current logical size")?;
402 776902 : let status_update = PageserverFeedback {
403 776902 : current_timeline_size: timeline_logical_size,
404 776902 : last_received_lsn,
405 776902 : disk_consistent_lsn,
406 776902 : remote_consistent_lsn,
407 776902 : replytime: ts,
408 776902 : };
409 :
410 UBC 0 : debug!("neon_status_update {status_update:?}");
411 :
412 CBC 776902 : let mut data = BytesMut::new();
413 776902 : status_update.serialize(&mut data);
414 776902 : physical_stream
415 776902 : .as_mut()
416 776902 : .zenith_status_update(data.len() as u64, &data)
417 22930 : .await?;
418 UBC 0 : }
419 : }
420 :
421 CBC 175 : Ok(())
422 1122 : }
423 :
424 : /// Data returned from the postgres `IDENTIFY_SYSTEM` command
425 : ///
426 : /// See the [postgres docs] for more details.
427 : ///
428 : /// [postgres docs]: https://www.postgresql.org/docs/current/protocol-replication.html
429 679 : #[derive(Debug)]
430 : // As of nightly 2021-09-11, fields that are only read by the type's `Debug` impl still count as
431 : // unused. Relevant issue: https://github.com/rust-lang/rust/issues/88900
432 : #[allow(dead_code)]
433 : struct IdentifySystem {
434 : systemid: u64,
435 : timeline: u32,
436 : xlogpos: PgLsn,
437 : dbname: Option<String>,
438 : }
439 :
440 : /// There was a problem parsing the response to
441 : /// a postgres IDENTIFY_SYSTEM command.
442 UBC 0 : #[derive(Debug, thiserror::Error)]
443 : #[error("IDENTIFY_SYSTEM parse error")]
444 : struct IdentifyError;
445 :
446 : /// Run the postgres `IDENTIFY_SYSTEM` command
447 CBC 679 : async fn identify_system(client: &mut Client) -> anyhow::Result<IdentifySystem> {
448 679 : let query_str = "IDENTIFY_SYSTEM";
449 679 : let response = client.simple_query(query_str).await?;
450 :
451 : // get(N) from row, then parse it as some destination type.
452 2716 : fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
453 2716 : where
454 2716 : T: FromStr,
455 2716 : {
456 2716 : let val = row.get(idx).ok_or(IdentifyError)?;
457 2037 : val.parse::<T>().or(Err(IdentifyError))
458 2716 : }
459 :
460 : // extract the row contents into an IdentifySystem struct.
461 : // written as a closure so I can use ? for Option here.
462 679 : if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) {
463 : Ok(IdentifySystem {
464 679 : systemid: get_parse(first_row, 0)?,
465 679 : timeline: get_parse(first_row, 1)?,
466 679 : xlogpos: get_parse(first_row, 2)?,
467 679 : dbname: get_parse(first_row, 3).ok(),
468 : })
469 : } else {
470 UBC 0 : Err(IdentifyError.into())
471 : }
472 CBC 679 : }