Line data Source code
//! WAL receiver logic that ensures the pageserver gets connected to a safekeeper
//! that contains the latest WAL to stream, and that this connection does not go stale.
//!
//! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
//! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
//! Current connection state is tracked too, to ensure it's not getting stale.
//!
//! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new connection leader,
//! then a (re)connection happens, if necessary.
//! Only the WAL streaming task is expected to finish; other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
11 :
12 : use std::collections::HashMap;
13 : use std::num::NonZeroU64;
14 : use std::ops::ControlFlow;
15 : use std::sync::Arc;
16 : use std::time::Duration;
17 :
18 : use anyhow::Context;
19 : use chrono::{NaiveDateTime, Utc};
20 : use pageserver_api::models::TimelineState;
21 : use postgres_connection::PgConnectionConfig;
22 : use storage_broker::proto::{
23 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
24 : SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription,
25 : TypedMessage,
26 : };
27 : use storage_broker::{BrokerClientChannel, Code, Streaming};
28 : use tokio_util::sync::CancellationToken;
29 : use tracing::*;
30 : use utils::backoff::{
31 : DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
32 : };
33 : use utils::id::{NodeId, TenantTimelineId};
34 : use utils::lsn::Lsn;
35 : use utils::postgres_client::{ConnectionConfigArgs, wal_stream_connection_config};
36 :
37 : use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
38 : use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
39 : use crate::context::{DownloadBehavior, RequestContext};
40 : use crate::metrics::{
41 : WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
42 : WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
43 : };
44 : use crate::task_mgr::TaskKind;
45 : use crate::tenant::{Timeline, debug_assert_current_span_has_tenant_and_timeline_id};
46 :
/// Marker error signalling that the connection manager should stop: either its
/// cancellation token fired, or the timeline is no longer (and will not become) active.
pub(crate) struct Cancelled;
48 :
49 : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
50 : /// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
51 : /// If storage broker subscription is cancelled, exits.
52 : ///
53 : /// # Cancel-Safety
54 : ///
55 : /// Not cancellation-safe. Use `cancel` token to request cancellation.
56 0 : pub(super) async fn connection_manager_loop_step(
57 0 : broker_client: &mut BrokerClientChannel,
58 0 : connection_manager_state: &mut ConnectionManagerState,
59 0 : ctx: &RequestContext,
60 0 : cancel: &CancellationToken,
61 0 : manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
62 0 : ) -> Result<(), Cancelled> {
63 0 : match tokio::select! {
64 0 : _ = cancel.cancelled() => { return Err(Cancelled); },
65 0 : st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
66 : } {
67 0 : Ok(()) => {}
68 0 : Err(new_state) => {
69 0 : debug!(
70 : ?new_state,
71 0 : "state changed, stopping wal connection manager loop"
72 : );
73 0 : return Err(Cancelled);
74 : }
75 : }
76 :
77 0 : WALRECEIVER_ACTIVE_MANAGERS.inc();
78 0 : scopeguard::defer! {
79 : WALRECEIVER_ACTIVE_MANAGERS.dec();
80 : }
81 :
82 0 : let id = TenantTimelineId {
83 0 : tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
84 0 : timeline_id: connection_manager_state.timeline.timeline_id,
85 0 : };
86 :
87 0 : let mut timeline_state_updates = connection_manager_state
88 0 : .timeline
89 0 : .subscribe_for_state_updates();
90 :
91 0 : let mut wait_lsn_status = connection_manager_state
92 0 : .timeline
93 0 : .subscribe_for_wait_lsn_updates();
94 :
95 : // TODO: create a separate config option for discovery request interval
96 0 : let discovery_request_interval = connection_manager_state.conf.lagging_wal_timeout;
97 0 : let mut last_discovery_ts: Option<std::time::Instant> = None;
98 :
99 : // Subscribe to the broker updates. Stream shares underlying TCP connection
100 : // with other streams on this client (other connection managers). When
101 : // object goes out of scope, stream finishes in drop() automatically.
102 0 : let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
103 0 : let mut broker_reset_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
104 0 : debug!("Subscribed for broker timeline updates");
105 :
106 : loop {
107 0 : let time_until_next_retry = connection_manager_state.time_until_next_retry();
108 0 : let any_activity = connection_manager_state.wal_connection.is_some()
109 0 : || !connection_manager_state.wal_stream_candidates.is_empty();
110 :
111 : // These things are happening concurrently:
112 : //
113 : // - cancellation request
114 : // - keep receiving WAL on the current connection
115 : // - if the shared state says we need to change connection, disconnect and return
116 : // - this runs in a separate task and we receive updates via a watch channel
117 : // - change connection if the rules decide so, or if the current connection dies
118 : // - receive updates from broker
119 : // - this might change the current desired connection
120 : // - timeline state changes to something that does not allow walreceiver to run concurrently
121 : // - if there's no connection and no candidates, try to send a discovery request
122 :
123 : // NB: make sure each of the select expressions are cancellation-safe
124 : // (no need for arms to be cancellation-safe).
125 0 : tokio::select! {
126 0 : _ = cancel.cancelled() => { return Err(Cancelled); }
127 0 : Some(wal_connection_update) = async {
128 0 : match connection_manager_state.wal_connection.as_mut() {
129 0 : Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
130 0 : None => None,
131 : }
132 0 : } => {
133 0 : let wal_connection = connection_manager_state.wal_connection.as_mut()
134 0 : .expect("Should have a connection, as checked by the corresponding select! guard");
135 0 : match wal_connection_update {
136 0 : TaskEvent::Update(TaskStateUpdate::Started) => {},
137 0 : TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
138 0 : if new_status.has_processed_wal {
139 0 : // We have advanced last_record_lsn by processing the WAL received
140 0 : // from this safekeeper. This is good enough to clean unsuccessful
141 0 : // retries history and allow reconnecting to this safekeeper without
142 0 : // sleeping for a long time.
143 0 : connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
144 0 : }
145 0 : wal_connection.status = new_status;
146 : }
147 0 : TaskEvent::End(walreceiver_task_result) => {
148 0 : match walreceiver_task_result {
149 0 : Ok(()) => debug!("WAL receiving task finished"),
150 0 : Err(e) => error!("wal receiver task finished with an error: {e:?}"),
151 : }
152 0 : connection_manager_state.drop_old_connection(false).await;
153 : },
154 : }
155 : },
156 :
157 : // Got a new update from the broker
158 0 : broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
159 0 : match broker_update {
160 0 : Ok(Some(broker_update)) => {
161 0 : broker_reset_interval.reset();
162 0 : connection_manager_state.register_timeline_update(broker_update);
163 0 : },
164 0 : Err(status) => {
165 0 : match status.code() {
166 0 : Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
167 : // tonic's error handling doesn't provide a clear code for disconnections: we get
168 : // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
169 : // => https://github.com/neondatabase/neon/issues/9562
170 0 : info!("broker disconnected: {status}");
171 : },
172 : _ => {
173 0 : warn!("broker subscription failed: {status}");
174 : }
175 : }
176 0 : return Ok(());
177 : }
178 : Ok(None) => {
179 0 : error!("broker subscription stream ended"); // can't happen
180 0 : return Ok(());
181 : }
182 : }
183 : },
184 :
185 : // If we've not received any updates from the broker from a while, are waiting for WAL
186 : // and have no safekeeper connection or connection candidates, then it might be that
187 : // the broker subscription is wedged. Drop the currrent subscription and re-subscribe
188 : // with the goal of unblocking it.
189 0 : _ = broker_reset_interval.tick() => {
190 0 : let awaiting_lsn = wait_lsn_status.borrow().is_some();
191 0 : let no_candidates = connection_manager_state.wal_stream_candidates.is_empty();
192 0 : let no_connection = connection_manager_state.wal_connection.is_none();
193 :
194 0 : if awaiting_lsn && no_candidates && no_connection {
195 0 : tracing::warn!("No broker updates received for a while, but waiting for WAL. Re-setting stream ...");
196 0 : broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
197 0 : }
198 : },
199 :
200 0 : new_event = async {
201 : // Reminder: this match arm needs to be cancellation-safe.
202 : loop {
203 0 : if connection_manager_state.timeline.current_state() == TimelineState::Loading {
204 0 : warn!("wal connection manager should only be launched after timeline has become active");
205 0 : }
206 0 : match timeline_state_updates.changed().await {
207 : Ok(()) => {
208 0 : let new_state = connection_manager_state.timeline.current_state();
209 0 : match new_state {
210 : // we're already active as walreceiver, no need to reactivate
211 0 : TimelineState::Active => continue,
212 : TimelineState::Broken { .. } | TimelineState::Stopping => {
213 0 : debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
214 0 : return ControlFlow::Break(());
215 : }
216 : TimelineState::Loading => {
217 0 : warn!("timeline transitioned back to Loading state, that should not happen");
218 0 : return ControlFlow::Continue(());
219 : }
220 : }
221 : }
222 0 : Err(_sender_dropped_error) => return ControlFlow::Break(()),
223 : }
224 : }
225 0 : } => match new_event {
226 : ControlFlow::Continue(()) => {
227 0 : return Ok(());
228 : }
229 : ControlFlow::Break(()) => {
230 0 : debug!("Timeline is no longer active, stopping wal connection manager loop");
231 0 : return Err(Cancelled);
232 : }
233 : },
234 :
235 0 : Some(()) = async {
236 0 : match time_until_next_retry {
237 0 : Some(sleep_time) => {
238 0 : tokio::time::sleep(sleep_time).await;
239 0 : Some(())
240 : },
241 : None => {
242 0 : debug!("No candidates to retry, waiting indefinitely for the broker events");
243 0 : None
244 : }
245 : }
246 0 : } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
247 :
248 0 : Some(()) = async {
249 : // Reminder: this match arm needs to be cancellation-safe.
250 : // Calculating time needed to wait until sending the next discovery request.
251 : // Current implementation is conservative and sends discovery requests only when there are no candidates.
252 :
253 0 : if any_activity {
254 : // No need to send discovery requests if there is an active connection or candidates.
255 0 : return None;
256 0 : }
257 :
258 : // Waiting for an active wait_lsn request.
259 0 : while wait_lsn_status.borrow().is_none() {
260 0 : if wait_lsn_status.changed().await.is_err() {
261 : // wait_lsn_status channel was closed, exiting
262 0 : warn!("wait_lsn_status channel was closed in connection_manager_loop_step");
263 0 : return None;
264 0 : }
265 : }
266 :
267 : // All preconditions met, preparing to send a discovery request.
268 0 : let now = std::time::Instant::now();
269 0 : let next_discovery_ts = last_discovery_ts
270 0 : .map(|ts| ts + discovery_request_interval)
271 0 : .unwrap_or_else(|| now);
272 :
273 0 : if next_discovery_ts > now {
274 : // Prevent sending discovery requests too frequently.
275 0 : tokio::time::sleep(next_discovery_ts - now).await;
276 0 : }
277 :
278 0 : let tenant_timeline_id = Some(ProtoTenantTimelineId {
279 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
280 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
281 0 : });
282 0 : let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
283 0 : let msg = TypedMessage {
284 0 : r#type: MessageType::SafekeeperDiscoveryRequest as i32,
285 0 : safekeeper_timeline_info: None,
286 0 : safekeeper_discovery_request: Some(request),
287 0 : safekeeper_discovery_response: None,
288 0 : };
289 :
290 0 : last_discovery_ts = Some(std::time::Instant::now());
291 0 : info!("No active connection and no candidates, sending discovery request to the broker");
292 :
293 : // Cancellation safety: we want to send a message to the broker, but publish_one()
294 : // function can get cancelled by the other select! arm. This is absolutely fine, because
295 : // we just want to receive broker updates and discovery is not important if we already
296 : // receive updates.
297 : //
298 : // It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
299 : // This is totally fine because of the reason above.
300 :
301 : // This is a fire-and-forget request, we don't care about the response
302 0 : let _ = broker_client.publish_one(msg).await;
303 0 : debug!("Discovery request sent to the broker");
304 0 : None
305 0 : } => {}
306 : }
307 :
308 0 : if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
309 0 : info!("Switching to new connection candidate: {new_candidate:?}");
310 0 : connection_manager_state
311 0 : .change_connection(new_candidate, ctx)
312 0 : .await
313 0 : }
314 0 : *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
315 : }
316 0 : }
317 :
318 : /// Endlessly try to subscribe for broker updates for a given timeline.
319 0 : async fn subscribe_for_timeline_updates(
320 0 : broker_client: &mut BrokerClientChannel,
321 0 : id: TenantTimelineId,
322 0 : cancel: &CancellationToken,
323 0 : ) -> Result<Streaming<TypedMessage>, Cancelled> {
324 0 : let mut attempt = 0;
325 : loop {
326 0 : exponential_backoff(
327 0 : attempt,
328 0 : DEFAULT_BASE_BACKOFF_SECONDS,
329 0 : DEFAULT_MAX_BACKOFF_SECONDS,
330 0 : cancel,
331 0 : )
332 0 : .await;
333 0 : attempt += 1;
334 :
335 : // subscribe to the specific timeline
336 0 : let request = SubscribeByFilterRequest {
337 0 : types: vec![
338 0 : TypeSubscription {
339 0 : r#type: MessageType::SafekeeperTimelineInfo as i32,
340 0 : },
341 0 : TypeSubscription {
342 0 : r#type: MessageType::SafekeeperDiscoveryResponse as i32,
343 0 : },
344 0 : ],
345 0 : tenant_timeline_id: Some(FilterTenantTimelineId {
346 0 : enabled: true,
347 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
348 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
349 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
350 0 : }),
351 0 : }),
352 0 : };
353 :
354 : match {
355 0 : tokio::select! {
356 0 : r = broker_client.subscribe_by_filter(request) => { r }
357 0 : _ = cancel.cancelled() => { return Err(Cancelled); }
358 : }
359 : } {
360 0 : Ok(resp) => {
361 0 : return Ok(resp.into_inner());
362 : }
363 0 : Err(e) => {
364 : // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
365 : // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
366 0 : info!(
367 0 : "Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}"
368 : );
369 0 : continue;
370 : }
371 : }
372 : }
373 0 : }
374 :
/// Initial (and minimum) per-safekeeper reconnection backoff, in seconds.
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
/// Upper bound for the per-safekeeper reconnection backoff, in seconds.
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
/// Factor the reconnection backoff is multiplied by each time a connection is dropped.
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
378 :
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
pub(super) struct ConnectionManagerState {
    /// Tenant and timeline ids of `timeline`, computed once at construction.
    id: TenantTimelineId,
    /// Use pageserver data about the timeline to filter out some of the safekeepers.
    timeline: Arc<Timeline>,
    /// Child token of [`super::WalReceiver::cancel`], inherited to all tasks we spawn.
    cancel: CancellationToken,
    /// Connection management settings (connect timeout, lag thresholds, ingest options).
    conf: WalReceiverConf,
    /// Current connection to safekeeper for WAL streaming; `None` while disconnected.
    wal_connection: Option<WalConnection>,
    /// Info about retries and unsuccessful attempts to connect to safekeepers, keyed by
    /// safekeeper id. An entry is cleared once a connection to that safekeeper processes WAL.
    wal_connection_retries: HashMap<NodeId, RetryInfo>,
    /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
    wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
394 :
/// Information about the connection manager's current connection and its connection
/// candidates, for reporting (see `to_human_readable_string`).
#[derive(Debug, Clone)]
pub struct ConnectionManagerStatus {
    /// Status of the current WAL streaming connection; `None` when disconnected.
    existing_connection: Option<WalConnectionStatus>,
    /// Known safekeeper candidates keyed by node id (presumably a snapshot of the manager
    /// state's `wal_stream_candidates` — confirm in `manager_status()`).
    wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
401 :
402 : impl ConnectionManagerStatus {
403 : /// Generates a string, describing current connection status in a form, suitable for logging.
404 0 : pub fn to_human_readable_string(&self) -> String {
405 0 : let mut resulting_string = String::new();
406 0 : match &self.existing_connection {
407 0 : Some(connection) => {
408 0 : if connection.has_processed_wal {
409 0 : resulting_string.push_str(&format!(
410 0 : " (update {}): streaming WAL from node {}, ",
411 0 : connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
412 0 : connection.node,
413 0 : ));
414 :
415 0 : match (connection.streaming_lsn, connection.commit_lsn) {
416 0 : (None, None) => resulting_string.push_str("no streaming data"),
417 0 : (None, Some(commit_lsn)) => {
418 0 : resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
419 : }
420 0 : (Some(streaming_lsn), None) => {
421 0 : resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
422 : }
423 0 : (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
424 0 : &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
425 : ),
426 : }
427 0 : } else if connection.is_connected {
428 0 : resulting_string.push_str(&format!(
429 0 : " (update {}): connecting to node {}",
430 0 : connection
431 0 : .latest_connection_update
432 0 : .format("%Y-%m-%d %H:%M:%S"),
433 0 : connection.node,
434 0 : ));
435 0 : } else {
436 0 : resulting_string.push_str(&format!(
437 0 : " (update {}): initializing node {} connection",
438 0 : connection
439 0 : .latest_connection_update
440 0 : .format("%Y-%m-%d %H:%M:%S"),
441 0 : connection.node,
442 0 : ));
443 0 : }
444 : }
445 0 : None => resulting_string.push_str(": disconnected"),
446 : }
447 :
448 0 : resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
449 0 : let mut candidates = self.wal_stream_candidates.iter().peekable();
450 0 : while let Some((node_id, candidate_info)) = candidates.next() {
451 0 : resulting_string.push_str(&format!(
452 0 : "({}|{}|{})",
453 0 : node_id,
454 0 : candidate_info.latest_update.format("%H:%M:%S"),
455 0 : Lsn(candidate_info.timeline.commit_lsn)
456 0 : ));
457 0 : if candidates.peek().is_some() {
458 0 : resulting_string.push_str(", ");
459 0 : }
460 : }
461 0 : resulting_string.push(']');
462 :
463 0 : resulting_string
464 0 : }
465 : }
466 :
/// Current connection data.
#[derive(Debug)]
struct WalConnection {
    /// Time when the connection was initiated. Also the base time from which the next
    /// reconnection attempt is scheduled when this connection is dropped.
    started_at: NaiveDateTime,
    /// Current safekeeper pageserver is connected to for WAL streaming.
    sk_id: NodeId,
    /// Availability zone of the safekeeper.
    availability_zone: Option<String>,
    /// Status of the connection, updated from the streaming task's progress events.
    status: WalConnectionStatus,
    /// WAL streaming task handle.
    connection_task: TaskHandle<WalConnectionStatus>,
    /// Have we discovered that other safekeeper has more recent WAL than we do?
    discovered_new_wal: Option<NewCommittedWAL>,
}
483 :
/// Notion of a new committed WAL, which exists on other safekeeper.
#[derive(Debug, Clone, Copy)]
struct NewCommittedWAL {
    /// LSN of the new committed WAL.
    lsn: Lsn,
    /// When we discovered that the new committed WAL exists on other safekeeper.
    discovered_at: NaiveDateTime,
}
492 :
/// Reconnection backoff bookkeeping for a single safekeeper.
#[derive(Debug, Clone, Copy)]
struct RetryInfo {
    /// Time of the next scheduled reconnection attempt: the dropped connection's start time
    /// plus the backoff in effect at that moment. `None` if nothing has been scheduled yet, or
    /// if that addition overflowed.
    next_retry_at: Option<NaiveDateTime>,
    /// Current backoff in seconds; grows by `WALCONNECTION_RETRY_BACKOFF_MULTIPLIER` on each
    /// dropped connection and stays within the MIN/MAX backoff constants.
    retry_duration_seconds: f64,
}
498 :
/// Data about the timeline to connect to, received from the broker.
#[derive(Debug, Clone)]
struct BrokerSkTimeline {
    /// Latest safekeeper-reported timeline state. Both timeline-info broadcasts and discovery
    /// responses are normalized into this shape.
    timeline: SafekeeperDiscoveryResponse,
    /// Time at which the data was fetched from the broker last time, to track the stale data.
    latest_update: NaiveDateTime,
}
506 :
507 : impl ConnectionManagerState {
508 0 : pub(super) fn new(
509 0 : timeline: Arc<Timeline>,
510 0 : conf: WalReceiverConf,
511 0 : cancel: CancellationToken,
512 0 : ) -> Self {
513 0 : let id = TenantTimelineId {
514 0 : tenant_id: timeline.tenant_shard_id.tenant_id,
515 0 : timeline_id: timeline.timeline_id,
516 0 : };
517 0 : Self {
518 0 : id,
519 0 : timeline,
520 0 : cancel,
521 0 : conf,
522 0 : wal_connection: None,
523 0 : wal_stream_candidates: HashMap::new(),
524 0 : wal_connection_retries: HashMap::new(),
525 0 : }
526 0 : }
527 :
/// Spawns `task` as a WAL connection task through `super::TaskHandle::spawn`, handing it
/// this manager's cancellation token (`self.cancel`).
///
/// `task` receives a watch sender for publishing `TaskStateUpdate`s of the connection status
/// and a `CancellationToken` provided by `TaskHandle::spawn` (presumably derived from
/// `self.cancel` — confirm in `super::TaskHandle`).
fn spawn<Fut>(
    &self,
    task: impl FnOnce(
        tokio::sync::watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
        CancellationToken,
    ) -> Fut
    + Send
    + 'static,
) -> TaskHandle<WalConnectionStatus>
where
    Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
{
    // TODO: get rid of TaskHandle
    super::TaskHandle::spawn(&self.cancel, task)
}
543 :
544 : /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
545 0 : async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
546 0 : WALRECEIVER_SWITCHES
547 0 : .with_label_values(&[new_sk.reason.name()])
548 0 : .inc();
549 :
550 0 : self.drop_old_connection(true).await;
551 :
552 0 : let node_id = new_sk.safekeeper_id;
553 0 : let connect_timeout = self.conf.wal_connect_timeout;
554 0 : let ingest_batch_size = self.conf.ingest_batch_size;
555 0 : let protocol = self.conf.protocol;
556 0 : let validate_wal_contiguity = self.conf.validate_wal_contiguity;
557 0 : let timeline = Arc::clone(&self.timeline);
558 0 : let ctx = ctx.detached_child(
559 0 : TaskKind::WalReceiverConnectionHandler,
560 0 : DownloadBehavior::Download,
561 : );
562 :
563 0 : let span = info_span!("connection", %node_id);
564 0 : let connection_handle = self.spawn(move |events_sender, cancellation| {
565 0 : async move {
566 0 : debug_assert_current_span_has_tenant_and_timeline_id();
567 :
568 0 : let res = super::walreceiver_connection::handle_walreceiver_connection(
569 0 : timeline,
570 0 : protocol,
571 0 : new_sk.wal_source_connconf,
572 0 : events_sender,
573 0 : cancellation.clone(),
574 0 : connect_timeout,
575 0 : ctx,
576 0 : node_id,
577 0 : ingest_batch_size,
578 0 : validate_wal_contiguity,
579 0 : )
580 0 : .await;
581 :
582 0 : match res {
583 0 : Ok(()) => Ok(()),
584 0 : Err(e) => {
585 0 : match e {
586 0 : WalReceiverError::SuccessfulCompletion(msg) => {
587 0 : info!("walreceiver connection handling ended with success: {msg}");
588 0 : Ok(())
589 : }
590 0 : WalReceiverError::ExpectedSafekeeperError(e) => {
591 0 : info!("walreceiver connection handling ended: {e}");
592 0 : Ok(())
593 : }
594 : WalReceiverError::ClosedGate => {
595 0 : info!(
596 0 : "walreceiver connection handling ended because of closed gate"
597 : );
598 0 : Ok(())
599 : }
600 0 : WalReceiverError::Cancelled => Ok(()),
601 0 : WalReceiverError::Other(e) => {
602 : // give out an error to have task_mgr give it a really verbose logging
603 0 : if cancellation.is_cancelled() {
604 : // Ideally we would learn about this via some path other than Other, but
605 : // that requires refactoring all the intermediate layers of ingest code
606 : // that only emit anyhow::Error
607 0 : Ok(())
608 : } else {
609 0 : Err(e).context("walreceiver connection handling failure")
610 : }
611 : }
612 : }
613 : }
614 : }
615 0 : }
616 0 : .instrument(span)
617 0 : });
618 :
619 0 : let now = Utc::now().naive_utc();
620 0 : self.wal_connection = Some(WalConnection {
621 0 : started_at: now,
622 0 : sk_id: new_sk.safekeeper_id,
623 0 : availability_zone: new_sk.availability_zone,
624 0 : status: WalConnectionStatus {
625 0 : is_connected: false,
626 0 : has_processed_wal: false,
627 0 : latest_connection_update: now,
628 0 : latest_wal_update: now,
629 0 : streaming_lsn: None,
630 0 : commit_lsn: None,
631 0 : node: node_id,
632 0 : },
633 0 : connection_task: connection_handle,
634 0 : discovered_new_wal: None,
635 0 : });
636 0 : }
637 :
638 : /// Drops the current connection (if any) and updates retry timeout for the next
639 : /// connection attempt to the same safekeeper.
640 : ///
641 : /// # Cancel-Safety
642 : ///
643 : /// Not cancellation-safe.
644 0 : async fn drop_old_connection(&mut self, needs_shutdown: bool) {
645 0 : let wal_connection = match self.wal_connection.take() {
646 0 : Some(wal_connection) => wal_connection,
647 0 : None => return,
648 : };
649 :
650 0 : if needs_shutdown {
651 0 : wal_connection
652 0 : .connection_task
653 0 : .shutdown()
654 0 : // This here is why this function isn't cancellation-safe.
655 0 : // If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
656 0 : // Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
657 0 : // and thus be ineffective.
658 0 : .await;
659 0 : }
660 :
661 0 : let retry = self
662 0 : .wal_connection_retries
663 0 : .entry(wal_connection.sk_id)
664 0 : .or_insert(RetryInfo {
665 0 : next_retry_at: None,
666 0 : retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
667 0 : });
668 :
669 0 : let now = Utc::now().naive_utc();
670 :
671 : // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
672 : // and we add backoff to the time when we started the connection attempt. If the connection
673 : // was active for a long time, then next_retry_at will be in the past.
674 0 : retry.next_retry_at =
675 0 : wal_connection
676 0 : .started_at
677 0 : .checked_add_signed(chrono::Duration::milliseconds(
678 0 : (retry.retry_duration_seconds * 1000.0) as i64,
679 0 : ));
680 :
681 0 : if let Some(next) = &retry.next_retry_at {
682 0 : if next > &now {
683 0 : info!(
684 0 : "Next connection retry to {:?} is at {}",
685 : wal_connection.sk_id, next
686 : );
687 0 : }
688 0 : }
689 :
690 0 : let next_retry_duration =
691 0 : retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
692 : // Clamp the next retry duration to the maximum allowed.
693 0 : let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
694 : // Clamp the next retry duration to the minimum allowed.
695 0 : let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
696 :
697 0 : retry.retry_duration_seconds = next_retry_duration;
698 0 : }
699 :
700 : /// Returns time needed to wait to have a new candidate for WAL streaming.
701 0 : fn time_until_next_retry(&self) -> Option<Duration> {
702 0 : let now = Utc::now().naive_utc();
703 :
704 0 : let next_retry_at = self
705 0 : .wal_connection_retries
706 0 : .values()
707 0 : .filter_map(|retry| retry.next_retry_at)
708 0 : .filter(|next_retry_at| next_retry_at > &now)
709 0 : .min()?;
710 :
711 0 : (next_retry_at - now).to_std().ok()
712 0 : }
713 :
714 : /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
715 0 : fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
716 0 : let mut is_discovery = false;
717 0 : let timeline_update = match typed_msg.r#type() {
718 : MessageType::SafekeeperTimelineInfo => {
719 0 : let info = match typed_msg.safekeeper_timeline_info {
720 0 : Some(info) => info,
721 : None => {
722 0 : warn!("bad proto message from broker: no safekeeper_timeline_info");
723 0 : return;
724 : }
725 : };
726 0 : SafekeeperDiscoveryResponse {
727 0 : safekeeper_id: info.safekeeper_id,
728 0 : tenant_timeline_id: info.tenant_timeline_id,
729 0 : commit_lsn: info.commit_lsn,
730 0 : safekeeper_connstr: info.safekeeper_connstr,
731 0 : availability_zone: info.availability_zone,
732 0 : standby_horizon: info.standby_horizon,
733 0 : }
734 : }
735 : MessageType::SafekeeperDiscoveryResponse => {
736 0 : is_discovery = true;
737 0 : match typed_msg.safekeeper_discovery_response {
738 0 : Some(response) => response,
739 : None => {
740 0 : warn!("bad proto message from broker: no safekeeper_discovery_response");
741 0 : return;
742 : }
743 : }
744 : }
745 : _ => {
746 : // unexpected message
747 0 : return;
748 : }
749 : };
750 :
751 0 : WALRECEIVER_BROKER_UPDATES.inc();
752 :
753 0 : trace!(
754 0 : "safekeeper info update: standby_horizon(cutoff)={}",
755 : timeline_update.standby_horizon
756 : );
757 0 : if timeline_update.standby_horizon != 0 {
758 0 : // ignore reports from safekeepers not connected to replicas
759 0 : self.timeline
760 0 : .standby_horizon
761 0 : .store(Lsn(timeline_update.standby_horizon));
762 0 : self.timeline
763 0 : .metrics
764 0 : .standby_horizon_gauge
765 0 : .set(timeline_update.standby_horizon as i64);
766 0 : }
767 :
768 0 : let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
769 0 : let old_entry = self.wal_stream_candidates.insert(
770 0 : new_safekeeper_id,
771 0 : BrokerSkTimeline {
772 0 : timeline: timeline_update,
773 0 : latest_update: Utc::now().naive_utc(),
774 0 : },
775 : );
776 :
777 0 : if old_entry.is_none() {
778 0 : info!(
779 : ?is_discovery,
780 : %new_safekeeper_id,
781 0 : "New SK node was added",
782 : );
783 0 : WALRECEIVER_CANDIDATES_ADDED.inc();
784 0 : }
785 0 : }
786 :
787 : /// Cleans up stale broker records and checks the rest for the new connection candidate.
788 : /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
789 : /// The current rules for approving new candidates:
790 : /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attemps
791 : /// * if there's no such entry, no new candidate found, abort
792 : /// * otherwise check if the candidate is much better than the current one
793 : ///
794 : /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
795 : /// General rules are following:
796 : /// * if connected safekeeper is not present, pick the candidate
797 : /// * if we haven't received any updates for some time, pick the candidate
798 : /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
799 : /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
800 : /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
801 : ///
802 : /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
803 : /// Both thresholds are configured per tenant.
804 9 : fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
805 9 : self.cleanup_old_candidates();
806 :
807 9 : match &self.wal_connection {
808 5 : Some(existing_wal_connection) => {
809 5 : let connected_sk_node = existing_wal_connection.sk_id;
810 :
811 5 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
812 5 : self.select_connection_candidate(Some(connected_sk_node))?;
813 5 : let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
814 :
815 5 : let now = Utc::now().naive_utc();
816 5 : if let Ok(latest_interaciton) =
817 5 : (now - existing_wal_connection.status.latest_connection_update).to_std()
818 : {
819 : // Drop connection if we haven't received keepalive message for a while.
820 5 : if latest_interaciton > self.conf.wal_connect_timeout {
821 1 : return Some(NewWalConnectionCandidate {
822 1 : safekeeper_id: new_sk_id,
823 1 : wal_source_connconf: new_wal_source_connconf,
824 1 : availability_zone: new_availability_zone,
825 1 : reason: ReconnectReason::NoKeepAlives {
826 1 : last_keep_alive: Some(
827 1 : existing_wal_connection.status.latest_connection_update,
828 1 : ),
829 1 : check_time: now,
830 1 : threshold: self.conf.wal_connect_timeout,
831 1 : },
832 1 : });
833 4 : }
834 0 : }
835 :
836 4 : if !existing_wal_connection.status.is_connected {
837 : // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
838 0 : return None;
839 4 : }
840 :
841 4 : if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
842 4 : let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
843 : // Check if the new candidate has much more WAL than the current one.
844 4 : match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
845 4 : Some(new_sk_lsn_advantage) => {
846 4 : if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
847 1 : return Some(NewWalConnectionCandidate {
848 1 : safekeeper_id: new_sk_id,
849 1 : wal_source_connconf: new_wal_source_connconf,
850 1 : availability_zone: new_availability_zone,
851 1 : reason: ReconnectReason::LaggingWal {
852 1 : current_commit_lsn,
853 1 : new_commit_lsn,
854 1 : threshold: self.conf.max_lsn_wal_lag,
855 1 : },
856 1 : });
857 3 : }
858 : // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
859 : // and the current one is not, switch to the new one.
860 3 : if self.conf.availability_zone.is_some()
861 1 : && existing_wal_connection.availability_zone
862 1 : != self.conf.availability_zone
863 1 : && self.conf.availability_zone == new_availability_zone
864 : {
865 1 : return Some(NewWalConnectionCandidate {
866 1 : safekeeper_id: new_sk_id,
867 1 : availability_zone: new_availability_zone,
868 1 : wal_source_connconf: new_wal_source_connconf,
869 1 : reason: ReconnectReason::SwitchAvailabilityZone,
870 1 : });
871 2 : }
872 : }
873 0 : None => debug!(
874 0 : "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
875 : ),
876 : }
877 0 : }
878 :
879 2 : let current_lsn = match existing_wal_connection.status.streaming_lsn {
880 2 : Some(lsn) => lsn,
881 0 : None => self.timeline.get_last_record_lsn(),
882 : };
883 2 : let current_commit_lsn = existing_wal_connection
884 2 : .status
885 2 : .commit_lsn
886 2 : .unwrap_or(current_lsn);
887 2 : let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
888 :
889 : // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
890 2 : let mut discovered_new_wal = existing_wal_connection
891 2 : .discovered_new_wal
892 2 : .filter(|new_wal| new_wal.lsn > current_commit_lsn);
893 :
894 2 : if discovered_new_wal.is_none() {
895 : // Check if the new candidate has more WAL than the current one.
896 : // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
897 1 : discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
898 1 : trace!(
899 0 : "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
900 : candidate_commit_lsn, current_commit_lsn
901 : );
902 1 : Some(NewCommittedWAL {
903 1 : lsn: candidate_commit_lsn,
904 1 : discovered_at: Utc::now().naive_utc(),
905 1 : })
906 : } else {
907 0 : None
908 : };
909 1 : }
910 :
911 2 : let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
912 : // Connected safekeeper has more WAL, but we haven't received updates for some time.
913 0 : trace!(
914 0 : "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
915 0 : (now - existing_wal_connection.status.latest_wal_update).to_std(),
916 : current_lsn,
917 : current_commit_lsn
918 : );
919 0 : Some(existing_wal_connection.status.latest_wal_update)
920 : } else {
921 2 : discovered_new_wal.as_ref().map(|new_wal| {
922 : // We know that new WAL is available on other safekeeper, but connected safekeeper don't have it.
923 2 : new_wal
924 2 : .discovered_at
925 2 : .max(existing_wal_connection.status.latest_wal_update)
926 2 : })
927 : };
928 :
929 : // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
930 2 : if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
931 2 : if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
932 1 : if candidate_commit_lsn > current_commit_lsn
933 1 : && waiting_for_new_wal > self.conf.lagging_wal_timeout
934 : {
935 1 : return Some(NewWalConnectionCandidate {
936 1 : safekeeper_id: new_sk_id,
937 1 : wal_source_connconf: new_wal_source_connconf,
938 1 : availability_zone: new_availability_zone,
939 1 : reason: ReconnectReason::NoWalTimeout {
940 1 : current_lsn,
941 1 : current_commit_lsn,
942 1 : candidate_commit_lsn,
943 1 : last_wal_interaction: Some(
944 1 : existing_wal_connection.status.latest_wal_update,
945 1 : ),
946 1 : check_time: now,
947 1 : threshold: self.conf.lagging_wal_timeout,
948 1 : },
949 1 : });
950 0 : }
951 1 : }
952 0 : }
953 :
954 1 : self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
955 : }
956 : None => {
957 3 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
958 4 : self.select_connection_candidate(None)?;
959 3 : return Some(NewWalConnectionCandidate {
960 3 : safekeeper_id: new_sk_id,
961 3 : availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
962 3 : wal_source_connconf: new_wal_source_connconf,
963 3 : reason: ReconnectReason::NoExistingConnection,
964 3 : });
965 : }
966 : }
967 :
968 1 : None
969 9 : }
970 :
971 : /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
972 : /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
973 : ///
974 : /// The candidate that is chosen:
975 : /// * has no pending retry cooldown
976 : /// * has greatest commit_lsn among the ones that are left
977 9 : fn select_connection_candidate(
978 9 : &self,
979 9 : node_to_omit: Option<NodeId>,
980 9 : ) -> Option<(NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
981 9 : self.applicable_connection_candidates()
982 13 : .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
983 9 : .max_by_key(|(_, info, _)| info.commit_lsn)
984 9 : }
985 :
986 : /// Returns a list of safekeepers that have valid info and ready for connection.
987 : /// Some safekeepers are filtered by the retry cooldown.
988 9 : fn applicable_connection_candidates(
989 9 : &self,
990 9 : ) -> impl Iterator<Item = (NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
991 9 : let now = Utc::now().naive_utc();
992 :
993 9 : self.wal_stream_candidates
994 9 : .iter()
995 18 : .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
996 16 : .filter(move |(sk_id, _)| {
997 16 : let next_retry_at = self
998 16 : .wal_connection_retries
999 16 : .get(sk_id)
1000 16 : .and_then(|retry_info| {
1001 1 : retry_info.next_retry_at
1002 1 : });
1003 :
1004 16 : next_retry_at.is_none() || next_retry_at.unwrap() <= now
1005 16 : }).filter_map(|(sk_id, broker_info)| {
1006 15 : let info = &broker_info.timeline;
1007 15 : if info.safekeeper_connstr.is_empty() {
1008 2 : return None; // no connection string, ignore sk
1009 13 : }
1010 :
1011 13 : let shard_identity = self.timeline.get_shard_identity();
1012 13 : let (shard_number, shard_count, shard_stripe_size) = (
1013 13 : Some(shard_identity.number.0),
1014 13 : Some(shard_identity.count.0),
1015 13 : Some(shard_identity.stripe_size.0),
1016 13 : );
1017 :
1018 13 : let connection_conf_args = ConnectionConfigArgs {
1019 13 : protocol: self.conf.protocol,
1020 13 : ttid: self.id,
1021 13 : shard_number,
1022 13 : shard_count,
1023 13 : shard_stripe_size,
1024 13 : listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
1025 13 : auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
1026 13 : availability_zone: self.conf.availability_zone.as_deref()
1027 : };
1028 :
1029 13 : match wal_stream_connection_config(connection_conf_args) {
1030 13 : Ok(connstr) => Some((*sk_id, info, connstr)),
1031 0 : Err(e) => {
1032 0 : error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
1033 0 : None
1034 : }
1035 : }
1036 15 : })
1037 9 : }
1038 :
1039 : /// Remove candidates which haven't sent broker updates for a while.
1040 9 : fn cleanup_old_candidates(&mut self) {
1041 9 : let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
1042 9 : let lagging_wal_timeout = self.conf.lagging_wal_timeout;
1043 :
1044 19 : self.wal_stream_candidates.retain(|node_id, broker_info| {
1045 19 : if let Ok(time_since_latest_broker_update) =
1046 19 : (Utc::now().naive_utc() - broker_info.latest_update).to_std()
1047 : {
1048 19 : let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
1049 19 : if !should_retain {
1050 1 : node_ids_to_remove.push(*node_id);
1051 18 : }
1052 19 : should_retain
1053 : } else {
1054 0 : true
1055 : }
1056 19 : });
1057 :
1058 9 : if !node_ids_to_remove.is_empty() {
1059 2 : for node_id in node_ids_to_remove {
1060 1 : info!(
1061 0 : "Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections"
1062 : );
1063 1 : self.wal_connection_retries.remove(&node_id);
1064 1 : WALRECEIVER_CANDIDATES_REMOVED.inc();
1065 : }
1066 8 : }
1067 9 : }
1068 :
1069 : /// # Cancel-Safety
1070 : ///
1071 : /// Not cancellation-safe.
1072 0 : pub(super) async fn shutdown(mut self) {
1073 0 : if let Some(wal_connection) = self.wal_connection.take() {
1074 0 : wal_connection.connection_task.shutdown().await;
1075 0 : }
1076 0 : }
1077 :
1078 0 : fn manager_status(&self) -> ConnectionManagerStatus {
1079 : ConnectionManagerStatus {
1080 0 : existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
1081 0 : wal_stream_candidates: self.wal_stream_candidates.clone(),
1082 : }
1083 0 : }
1084 : }
1085 :
1086 : #[derive(Debug)]
1087 : struct NewWalConnectionCandidate {
1088 : safekeeper_id: NodeId,
1089 : wal_source_connconf: PgConnectionConfig,
1090 : availability_zone: Option<String>,
1091 : reason: ReconnectReason,
1092 : }
1093 :
1094 : /// Stores the reason why WAL connection was switched, for furter debugging purposes.
1095 : #[derive(Debug, PartialEq, Eq)]
1096 : enum ReconnectReason {
1097 : NoExistingConnection,
1098 : LaggingWal {
1099 : current_commit_lsn: Lsn,
1100 : new_commit_lsn: Lsn,
1101 : threshold: NonZeroU64,
1102 : },
1103 : SwitchAvailabilityZone,
1104 : NoWalTimeout {
1105 : current_lsn: Lsn,
1106 : current_commit_lsn: Lsn,
1107 : candidate_commit_lsn: Lsn,
1108 : last_wal_interaction: Option<NaiveDateTime>,
1109 : check_time: NaiveDateTime,
1110 : threshold: Duration,
1111 : },
1112 : NoKeepAlives {
1113 : last_keep_alive: Option<NaiveDateTime>,
1114 : check_time: NaiveDateTime,
1115 : threshold: Duration,
1116 : },
1117 : }
1118 :
1119 : impl ReconnectReason {
1120 0 : fn name(&self) -> &str {
1121 0 : match self {
1122 0 : ReconnectReason::NoExistingConnection => "NoExistingConnection",
1123 0 : ReconnectReason::LaggingWal { .. } => "LaggingWal",
1124 0 : ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
1125 0 : ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
1126 0 : ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
1127 : }
1128 0 : }
1129 : }
1130 :
1131 : #[cfg(test)]
1132 : mod tests {
1133 : use url::Host;
1134 : use utils::postgres_client::PostgresClientProtocol;
1135 :
1136 : use super::*;
1137 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
1138 :
1139 19 : fn dummy_broker_sk_timeline(
1140 19 : commit_lsn: u64,
1141 19 : safekeeper_connstr: &str,
1142 19 : latest_update: NaiveDateTime,
1143 19 : ) -> BrokerSkTimeline {
1144 19 : BrokerSkTimeline {
1145 19 : timeline: SafekeeperDiscoveryResponse {
1146 19 : safekeeper_id: 0,
1147 19 : tenant_timeline_id: None,
1148 19 : commit_lsn,
1149 19 : safekeeper_connstr: safekeeper_connstr.to_owned(),
1150 19 : availability_zone: None,
1151 19 : standby_horizon: 0,
1152 19 : },
1153 19 : latest_update,
1154 19 : }
1155 19 : }
1156 :
1157 : #[tokio::test]
1158 1 : async fn no_connection_no_candidate() -> anyhow::Result<()> {
1159 1 : let harness = TenantHarness::create("no_connection_no_candidate").await?;
1160 1 : let mut state = dummy_state(&harness).await;
1161 1 : let now = Utc::now().naive_utc();
1162 :
1163 1 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1164 1 : let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
1165 :
1166 1 : state.wal_connection = None;
1167 1 : state.wal_stream_candidates = HashMap::from([
1168 1 : (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
1169 1 : (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1170 1 : (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1171 1 : (
1172 1 : NodeId(3),
1173 1 : dummy_broker_sk_timeline(
1174 1 : 1 + state.conf.max_lsn_wal_lag.get(),
1175 1 : "delay_over_threshold",
1176 1 : delay_over_threshold,
1177 1 : ),
1178 1 : ),
1179 1 : ]);
1180 :
1181 1 : let no_candidate = state.next_connection_candidate();
1182 1 : assert!(
1183 1 : no_candidate.is_none(),
1184 0 : "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
1185 : );
1186 :
1187 2 : Ok(())
1188 1 : }
1189 :
1190 : #[tokio::test]
1191 1 : async fn connection_no_candidate() -> anyhow::Result<()> {
1192 1 : let harness = TenantHarness::create("connection_no_candidate").await?;
1193 1 : let mut state = dummy_state(&harness).await;
1194 1 : let now = Utc::now().naive_utc();
1195 :
1196 1 : let connected_sk_id = NodeId(0);
1197 1 : let current_lsn = 100_000;
1198 :
1199 1 : let connection_status = WalConnectionStatus {
1200 1 : is_connected: true,
1201 1 : has_processed_wal: true,
1202 1 : latest_connection_update: now,
1203 1 : latest_wal_update: now,
1204 1 : commit_lsn: Some(Lsn(current_lsn)),
1205 1 : streaming_lsn: Some(Lsn(current_lsn)),
1206 1 : node: NodeId(1),
1207 1 : };
1208 :
1209 1 : state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
1210 1 : state.wal_connection = Some(WalConnection {
1211 1 : started_at: now,
1212 1 : sk_id: connected_sk_id,
1213 1 : availability_zone: None,
1214 1 : status: connection_status,
1215 1 : connection_task: state.spawn(move |sender, _| async move {
1216 1 : sender
1217 1 : .send(TaskStateUpdate::Progress(connection_status))
1218 1 : .ok();
1219 1 : Ok(())
1220 2 : }),
1221 1 : discovered_new_wal: None,
1222 : });
1223 1 : state.wal_stream_candidates = HashMap::from([
1224 1 : (
1225 1 : connected_sk_id,
1226 1 : dummy_broker_sk_timeline(
1227 1 : current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
1228 1 : DUMMY_SAFEKEEPER_HOST,
1229 1 : now,
1230 1 : ),
1231 1 : ),
1232 1 : (
1233 1 : NodeId(1),
1234 1 : dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
1235 1 : ),
1236 1 : (
1237 1 : NodeId(2),
1238 1 : dummy_broker_sk_timeline(
1239 1 : current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
1240 1 : "not_enough_advanced_lsn",
1241 1 : now,
1242 1 : ),
1243 1 : ),
1244 1 : ]);
1245 :
1246 1 : let no_candidate = state.next_connection_candidate();
1247 1 : assert!(
1248 1 : no_candidate.is_none(),
1249 0 : "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
1250 : );
1251 :
1252 2 : Ok(())
1253 1 : }
1254 :
1255 : #[tokio::test]
1256 1 : async fn no_connection_candidate() -> anyhow::Result<()> {
1257 1 : let harness = TenantHarness::create("no_connection_candidate").await?;
1258 1 : let mut state = dummy_state(&harness).await;
1259 1 : let now = Utc::now().naive_utc();
1260 :
1261 1 : state.wal_connection = None;
1262 1 : state.wal_stream_candidates = HashMap::from([(
1263 1 : NodeId(0),
1264 1 : dummy_broker_sk_timeline(
1265 1 : 1 + state.conf.max_lsn_wal_lag.get(),
1266 1 : DUMMY_SAFEKEEPER_HOST,
1267 1 : now,
1268 1 : ),
1269 1 : )]);
1270 :
1271 1 : let only_candidate = state
1272 1 : .next_connection_candidate()
1273 1 : .expect("Expected one candidate selected out of the only data option, but got none");
1274 1 : assert_eq!(only_candidate.safekeeper_id, NodeId(0));
1275 1 : assert_eq!(
1276 : only_candidate.reason,
1277 : ReconnectReason::NoExistingConnection,
1278 0 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1279 : );
1280 1 : assert_eq!(
1281 1 : only_candidate.wal_source_connconf.host(),
1282 1 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1283 : );
1284 :
1285 1 : let selected_lsn = 100_000;
1286 1 : state.wal_stream_candidates = HashMap::from([
1287 1 : (
1288 1 : NodeId(0),
1289 1 : dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
1290 1 : ),
1291 1 : (
1292 1 : NodeId(1),
1293 1 : dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
1294 1 : ),
1295 1 : (
1296 1 : NodeId(2),
1297 1 : dummy_broker_sk_timeline(selected_lsn + 100, "", now),
1298 1 : ),
1299 1 : ]);
1300 1 : let biggest_wal_candidate = state.next_connection_candidate().expect(
1301 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1302 : );
1303 :
1304 1 : assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
1305 1 : assert_eq!(
1306 : biggest_wal_candidate.reason,
1307 : ReconnectReason::NoExistingConnection,
1308 0 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1309 : );
1310 1 : assert_eq!(
1311 1 : biggest_wal_candidate.wal_source_connconf.host(),
1312 1 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1313 : );
1314 :
1315 2 : Ok(())
1316 1 : }
1317 :
1318 : #[tokio::test]
1319 1 : async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
1320 1 : let harness = TenantHarness::create("candidate_with_many_connection_failures").await?;
1321 1 : let mut state = dummy_state(&harness).await;
1322 1 : let now = Utc::now().naive_utc();
1323 :
1324 1 : let current_lsn = Lsn(100_000).align();
1325 1 : let bigger_lsn = Lsn(current_lsn.0 + 100).align();
1326 :
1327 1 : state.wal_connection = None;
1328 1 : state.wal_stream_candidates = HashMap::from([
1329 1 : (
1330 1 : NodeId(0),
1331 1 : dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1332 1 : ),
1333 1 : (
1334 1 : NodeId(1),
1335 1 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1336 1 : ),
1337 1 : ]);
1338 1 : state.wal_connection_retries = HashMap::from([(
1339 1 : NodeId(0),
1340 1 : RetryInfo {
1341 1 : next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
1342 1 : retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
1343 1 : },
1344 1 : )]);
1345 :
1346 1 : let candidate_with_less_errors = state
1347 1 : .next_connection_candidate()
1348 1 : .expect("Expected one candidate selected, but got none");
1349 1 : assert_eq!(
1350 : candidate_with_less_errors.safekeeper_id,
1351 : NodeId(1),
1352 0 : "Should select the node with no pending retry cooldown"
1353 : );
1354 :
1355 2 : Ok(())
1356 1 : }
1357 :
1358 : #[tokio::test]
1359 1 : async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1360 1 : let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate").await?;
1361 1 : let mut state = dummy_state(&harness).await;
1362 1 : let current_lsn = Lsn(100_000).align();
1363 1 : let now = Utc::now().naive_utc();
1364 :
1365 1 : let connected_sk_id = NodeId(0);
1366 1 : let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
1367 :
1368 1 : let connection_status = WalConnectionStatus {
1369 1 : is_connected: true,
1370 1 : has_processed_wal: true,
1371 1 : latest_connection_update: now,
1372 1 : latest_wal_update: now,
1373 1 : commit_lsn: Some(current_lsn),
1374 1 : streaming_lsn: Some(current_lsn),
1375 1 : node: connected_sk_id,
1376 1 : };
1377 :
1378 1 : state.wal_connection = Some(WalConnection {
1379 1 : started_at: now,
1380 1 : sk_id: connected_sk_id,
1381 1 : availability_zone: None,
1382 1 : status: connection_status,
1383 1 : connection_task: state.spawn(move |sender, _| async move {
1384 1 : sender
1385 1 : .send(TaskStateUpdate::Progress(connection_status))
1386 1 : .ok();
1387 1 : Ok(())
1388 2 : }),
1389 1 : discovered_new_wal: None,
1390 : });
1391 1 : state.wal_stream_candidates = HashMap::from([
1392 1 : (
1393 1 : connected_sk_id,
1394 1 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1395 1 : ),
1396 1 : (
1397 1 : NodeId(1),
1398 1 : dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
1399 1 : ),
1400 1 : ]);
1401 :
1402 1 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1403 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1404 : );
1405 :
1406 1 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
1407 1 : assert_eq!(
1408 : over_threshcurrent_candidate.reason,
1409 1 : ReconnectReason::LaggingWal {
1410 1 : current_commit_lsn: current_lsn,
1411 1 : new_commit_lsn: new_lsn,
1412 1 : threshold: state.conf.max_lsn_wal_lag
1413 1 : },
1414 0 : "Should select bigger WAL safekeeper if it starts to lag enough"
1415 : );
1416 1 : assert_eq!(
1417 1 : over_threshcurrent_candidate.wal_source_connconf.host(),
1418 1 : &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
1419 : );
1420 :
1421 2 : Ok(())
1422 1 : }
1423 :
1424 : #[tokio::test]
1425 1 : async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
1426 1 : let harness =
1427 1 : TenantHarness::create("timeout_connection_threshold_current_candidate").await?;
1428 1 : let mut state = dummy_state(&harness).await;
1429 1 : let current_lsn = Lsn(100_000).align();
1430 1 : let now = Utc::now().naive_utc();
1431 :
1432 1 : let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
1433 1 : let time_over_threshold =
1434 1 : Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
1435 :
1436 1 : let connection_status = WalConnectionStatus {
1437 1 : is_connected: true,
1438 1 : has_processed_wal: true,
1439 1 : latest_connection_update: time_over_threshold,
1440 1 : latest_wal_update: time_over_threshold,
1441 1 : commit_lsn: Some(current_lsn),
1442 1 : streaming_lsn: Some(current_lsn),
1443 1 : node: NodeId(1),
1444 1 : };
1445 :
1446 1 : state.wal_connection = Some(WalConnection {
1447 1 : started_at: now,
1448 1 : sk_id: NodeId(1),
1449 1 : availability_zone: None,
1450 1 : status: connection_status,
1451 1 : connection_task: state.spawn(move |sender, _| async move {
1452 1 : sender
1453 1 : .send(TaskStateUpdate::Progress(connection_status))
1454 1 : .ok();
1455 1 : Ok(())
1456 2 : }),
1457 1 : discovered_new_wal: None,
1458 : });
1459 1 : state.wal_stream_candidates = HashMap::from([(
1460 1 : NodeId(0),
1461 1 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1462 1 : )]);
1463 :
1464 1 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1465 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1466 : );
1467 :
1468 1 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1469 1 : match over_threshcurrent_candidate.reason {
1470 : ReconnectReason::NoKeepAlives {
1471 1 : last_keep_alive,
1472 1 : threshold,
1473 : ..
1474 : } => {
1475 1 : assert_eq!(last_keep_alive, Some(time_over_threshold));
1476 1 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1477 : }
1478 0 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1479 : }
1480 1 : assert_eq!(
1481 1 : over_threshcurrent_candidate.wal_source_connconf.host(),
1482 1 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1483 : );
1484 :
1485 2 : Ok(())
1486 1 : }
1487 :
1488 : #[tokio::test]
1489 1 : async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1490 1 : let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?;
1491 1 : let mut state = dummy_state(&harness).await;
1492 1 : let current_lsn = Lsn(100_000).align();
1493 1 : let new_lsn = Lsn(100_100).align();
1494 1 : let now = Utc::now().naive_utc();
1495 :
1496 1 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1497 1 : let time_over_threshold =
1498 1 : Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
1499 :
1500 1 : let connection_status = WalConnectionStatus {
1501 1 : is_connected: true,
1502 1 : has_processed_wal: true,
1503 1 : latest_connection_update: now,
1504 1 : latest_wal_update: time_over_threshold,
1505 1 : commit_lsn: Some(current_lsn),
1506 1 : streaming_lsn: Some(current_lsn),
1507 1 : node: NodeId(1),
1508 1 : };
1509 :
1510 1 : state.wal_connection = Some(WalConnection {
1511 1 : started_at: now,
1512 1 : sk_id: NodeId(1),
1513 1 : availability_zone: None,
1514 1 : status: connection_status,
1515 2 : connection_task: state.spawn(move |_, _| async move { Ok(()) }),
1516 1 : discovered_new_wal: Some(NewCommittedWAL {
1517 1 : discovered_at: time_over_threshold,
1518 1 : lsn: new_lsn,
1519 1 : }),
1520 : });
1521 1 : state.wal_stream_candidates = HashMap::from([(
1522 1 : NodeId(0),
1523 1 : dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1524 1 : )]);
1525 :
1526 1 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1527 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1528 : );
1529 :
1530 1 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1531 1 : match over_threshcurrent_candidate.reason {
1532 : ReconnectReason::NoWalTimeout {
1533 1 : current_lsn,
1534 1 : current_commit_lsn,
1535 1 : candidate_commit_lsn,
1536 1 : last_wal_interaction,
1537 1 : threshold,
1538 : ..
1539 : } => {
1540 1 : assert_eq!(current_lsn, current_lsn);
1541 1 : assert_eq!(current_commit_lsn, current_lsn);
1542 1 : assert_eq!(candidate_commit_lsn, new_lsn);
1543 1 : assert_eq!(last_wal_interaction, Some(time_over_threshold));
1544 1 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1545 : }
1546 0 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1547 : }
1548 1 : assert_eq!(
1549 1 : over_threshcurrent_candidate.wal_source_connconf.host(),
1550 1 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1551 : );
1552 :
1553 2 : Ok(())
1554 1 : }
1555 :
1556 : const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
1557 :
1558 8 : async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
1559 8 : let (tenant, ctx) = harness.load().await;
1560 8 : let timeline = tenant
1561 8 : .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
1562 8 : .await
1563 8 : .expect("Failed to create an empty timeline for dummy wal connection manager");
1564 :
1565 8 : let protocol = PostgresClientProtocol::Interpreted {
1566 8 : format: utils::postgres_client::InterpretedFormat::Protobuf,
1567 8 : compression: Some(utils::postgres_client::Compression::Zstd { level: 1 }),
1568 8 : };
1569 :
1570 8 : ConnectionManagerState {
1571 8 : id: TenantTimelineId {
1572 8 : tenant_id: harness.tenant_shard_id.tenant_id,
1573 8 : timeline_id: TIMELINE_ID,
1574 8 : },
1575 8 : timeline,
1576 8 : cancel: CancellationToken::new(),
1577 8 : conf: WalReceiverConf {
1578 8 : protocol,
1579 8 : wal_connect_timeout: Duration::from_secs(1),
1580 8 : lagging_wal_timeout: Duration::from_secs(1),
1581 8 : max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
1582 8 : auth_token: None,
1583 8 : availability_zone: None,
1584 8 : ingest_batch_size: 1,
1585 8 : validate_wal_contiguity: false,
1586 8 : },
1587 8 : wal_connection: None,
1588 8 : wal_stream_candidates: HashMap::new(),
1589 8 : wal_connection_retries: HashMap::new(),
1590 8 : }
1591 8 : }
1592 :
1593 : #[tokio::test]
1594 1 : async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
1595 : // Pageserver and one of safekeepers will be in the same availability zone
1596 : // and pageserver should prefer to connect to it.
1597 1 : let test_az = Some("test_az".to_owned());
1598 :
1599 1 : let harness = TenantHarness::create("switch_to_same_availability_zone").await?;
1600 1 : let mut state = dummy_state(&harness).await;
1601 1 : state.conf.availability_zone.clone_from(&test_az);
1602 1 : let current_lsn = Lsn(100_000).align();
1603 1 : let now = Utc::now().naive_utc();
1604 :
1605 1 : let connected_sk_id = NodeId(0);
1606 :
1607 1 : let connection_status = WalConnectionStatus {
1608 1 : is_connected: true,
1609 1 : has_processed_wal: true,
1610 1 : latest_connection_update: now,
1611 1 : latest_wal_update: now,
1612 1 : commit_lsn: Some(current_lsn),
1613 1 : streaming_lsn: Some(current_lsn),
1614 1 : node: connected_sk_id,
1615 1 : };
1616 :
1617 1 : state.wal_connection = Some(WalConnection {
1618 1 : started_at: now,
1619 1 : sk_id: connected_sk_id,
1620 1 : availability_zone: None,
1621 1 : status: connection_status,
1622 1 : connection_task: state.spawn(move |sender, _| async move {
1623 1 : sender
1624 1 : .send(TaskStateUpdate::Progress(connection_status))
1625 1 : .ok();
1626 1 : Ok(())
1627 2 : }),
1628 1 : discovered_new_wal: None,
1629 : });
1630 :
1631 : // We have another safekeeper with the same commit_lsn, and it have the same availability zone as
1632 : // the current pageserver.
1633 1 : let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
1634 1 : same_az_sk.timeline.availability_zone.clone_from(&test_az);
1635 :
1636 1 : state.wal_stream_candidates = HashMap::from([
1637 1 : (
1638 1 : connected_sk_id,
1639 1 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1640 1 : ),
1641 1 : (NodeId(1), same_az_sk),
1642 1 : ]);
1643 :
1644 : // We expect that pageserver will switch to the safekeeper in the same availability zone,
1645 : // even if it has the same commit_lsn.
1646 1 : let next_candidate = state.next_connection_candidate().expect(
1647 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1648 : );
1649 :
1650 1 : assert_eq!(next_candidate.safekeeper_id, NodeId(1));
1651 1 : assert_eq!(
1652 : next_candidate.reason,
1653 : ReconnectReason::SwitchAvailabilityZone,
1654 0 : "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
1655 : );
1656 1 : assert_eq!(
1657 1 : next_candidate.wal_source_connconf.host(),
1658 1 : &Host::Domain("same_az".to_owned())
1659 : );
1660 :
1661 2 : Ok(())
1662 1 : }
1663 : }
|