LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline/walreceiver - connection_manager.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 92.5 % 1004 929
Test Date: 2023-09-06 10:18:01 Functions: 76.5 % 98 75

            Line data    Source code
       1              : //! WAL receiver logic that ensures the pageserver gets connected to a safekeeper
       2              : //! that contains the latest WAL to stream, and that this connection does not go stale.
       3              : //!
       4              : //! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
       5              : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
       6              : //! Current connection state is tracked too, to ensure it's not getting stale.
       7              : //!
       8              : //! After every connection or storage broker update is fetched, the state gets updated correspondingly and rechecked for the new connection leader,
       9              : //! then a (re)connection happens, if necessary.
      10              : //! Only the WAL streaming task is expected to finish; the other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
      11              : 
      12              : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
      13              : 
      14              : use super::{TaskStateUpdate, WalReceiverConf};
      15              : use crate::context::{DownloadBehavior, RequestContext};
      16              : use crate::metrics::{
      17              :     WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
      18              :     WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
      19              : };
      20              : use crate::task_mgr::{shutdown_token, TaskKind};
      21              : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
      22              : use anyhow::Context;
      23              : use chrono::{NaiveDateTime, Utc};
      24              : use pageserver_api::models::TimelineState;
      25              : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
      26              : use storage_broker::proto::SafekeeperTimelineInfo;
      27              : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
      28              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      29              : use storage_broker::BrokerClientChannel;
      30              : use storage_broker::Streaming;
      31              : use tokio::select;
      32              : use tracing::*;
      33              : 
      34              : use postgres_connection::PgConnectionConfig;
      35              : use utils::backoff::{
      36              :     exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
      37              : };
      38              : use utils::postgres_client::wal_stream_connection_config;
      39              : use utils::{
      40              :     id::{NodeId, TenantTimelineId},
      41              :     lsn::Lsn,
      42              : };
      43              : 
      44              : use super::{
      45              :     walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
      46              :     TaskEvent, TaskHandle,
      47              : };
      48              : 
      49              : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
      50              : /// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
      51              : /// If storage broker subscription is cancelled, exits.
      52         1190 : pub(super) async fn connection_manager_loop_step(
      53         1190 :     broker_client: &mut BrokerClientChannel,
      54         1190 :     connection_manager_state: &mut ConnectionManagerState,
      55         1190 :     ctx: &RequestContext,
      56         1190 :     manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
      57         1190 : ) -> ControlFlow<(), ()> {
      58         1190 :     match connection_manager_state
      59         1190 :         .timeline
      60         1190 :         .wait_to_become_active(ctx)
      61           16 :         .await
      62              :     {
      63         1190 :         Ok(()) => {}
      64            0 :         Err(new_state) => {
      65            0 :             debug!(
      66            0 :                 ?new_state,
      67            0 :                 "state changed, stopping wal connection manager loop"
      68            0 :             );
      69            0 :             return ControlFlow::Break(());
      70              :         }
      71              :     }
      72              : 
      73         1190 :     WALRECEIVER_ACTIVE_MANAGERS.inc();
      74         1190 :     scopeguard::defer! {
      75          493 :         WALRECEIVER_ACTIVE_MANAGERS.dec();
      76          493 :     }
      77              : 
      78         1190 :     let id = TenantTimelineId {
      79         1190 :         tenant_id: connection_manager_state.timeline.tenant_id,
      80         1190 :         timeline_id: connection_manager_state.timeline.timeline_id,
      81         1190 :     };
      82         1190 : 
      83         1190 :     let mut timeline_state_updates = connection_manager_state
      84         1190 :         .timeline
      85         1190 :         .subscribe_for_state_updates();
      86              : 
      87              :     // Subscribe to the broker updates. Stream shares underlying TCP connection
      88              :     // with other streams on this client (other connection managers). When
      89              :     // object goes out of scope, stream finishes in drop() automatically.
      90         2373 :     let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
      91            0 :     debug!("Subscribed for broker timeline updates");
      92              : 
      93              :     loop {
      94       716369 :         let time_until_next_retry = connection_manager_state.time_until_next_retry();
      95              : 
      96              :         // These things are happening concurrently:
      97              :         //
      98              :         //  - keep receiving WAL on the current connection
      99              :         //      - if the shared state says we need to change connection, disconnect and return
     100              :         //      - this runs in a separate task and we receive updates via a watch channel
     101              :         //  - change connection if the rules decide so, or if the current connection dies
     102              :         //  - receive updates from broker
     103              :         //      - this might change the current desired connection
     104              :         //  - timeline state changes to something that does not allow walreceiver to run concurrently
     105       716369 :         select! {
     106       716320 :             Some(wal_connection_update) = async {
     107       716320 :                 match connection_manager_state.wal_connection.as_mut() {
     108       714720 :                     Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
     109         1600 :                     None => None,
     110              :                 }
     111       710867 :             } => {
     112              :                 let wal_connection = connection_manager_state.wal_connection.as_mut()
     113              :                     .expect("Should have a connection, as checked by the corresponding select! guard");
     114              :                 match wal_connection_update {
     115              :                     TaskEvent::Update(TaskStateUpdate::Started) => {},
     116              :                     TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
     117              :                         if new_status.has_processed_wal {
     118              :                             // We have advanced last_record_lsn by processing the WAL received
     119              :                             // from this safekeeper. This is good enough to clean unsuccessful
     120              :                             // retries history and allow reconnecting to this safekeeper without
     121              :                             // sleeping for a long time.
     122              :                             connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
     123              :                         }
     124              :                         wal_connection.status = new_status;
     125              :                     }
     126              :                     TaskEvent::End(walreceiver_task_result) => {
     127              :                         match walreceiver_task_result {
     128            0 :                             Ok(()) => debug!("WAL receiving task finished"),
     129           10 :                             Err(e) => error!("wal receiver task finished with an error: {e:?}"),
     130              :                         }
     131              :                         connection_manager_state.drop_old_connection(false).await;
     132              :                     },
     133              :                 }
     134              :             },
     135              : 
     136              :             // Got a new update from the broker
     137         5714 :             broker_update = broker_subscription.message() => {
     138              :                 match broker_update {
     139              :                     Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
     140              :                     Err(e) => {
     141            0 :                         error!("broker subscription failed: {e}");
     142              :                         return ControlFlow::Continue(());
     143              :                     }
     144              :                     Ok(None) => {
     145            0 :                         error!("broker subscription stream ended"); // can't happen
     146              :                         return ControlFlow::Continue(());
     147              :                     }
     148              :                 }
     149              :             },
     150              : 
     151       705461 :             new_event = async {
     152       705461 :                 loop {
     153       705461 :                     if connection_manager_state.timeline.current_state() == TimelineState::Loading {
     154            0 :                         warn!("wal connection manager should only be launched after timeline has become active");
     155       705461 :                     }
     156       705461 :                     match timeline_state_updates.changed().await {
     157              :                         Ok(()) => {
     158          396 :                             let new_state = connection_manager_state.timeline.current_state();
     159          396 :                             match new_state {
     160              :                                 // we're already active as walreceiver, no need to reactivate
     161            0 :                                 TimelineState::Active => continue,
     162              :                                 TimelineState::Broken { .. } | TimelineState::Stopping => {
     163          396 :                                     debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
     164          396 :                                     return ControlFlow::Break(());
     165              :                                 }
     166              :                                 TimelineState::Loading => {
     167            0 :                                     warn!("timeline transitioned back to Loading state, that should not happen");
     168            0 :                                     return ControlFlow::Continue(());
     169              :                                 }
     170              :                             }
     171              :                         }
     172            0 :                         Err(_sender_dropped_error) => return ControlFlow::Break(()),
     173              :                     }
     174              :                 }
     175          396 :             } => match new_event {
     176              :                 ControlFlow::Continue(()) => {
     177              :                     return ControlFlow::Continue(());
     178              :                 }
     179              :                 ControlFlow::Break(()) => {
     180            0 :                     debug!("Timeline is no longer active, stopping wal connection manager loop");
     181              :                     return ControlFlow::Break(());
     182              :                 }
     183              :             },
     184              : 
     185       710876 :             Some(()) = async {
     186       710876 :                 match time_until_next_retry {
     187          879 :                     Some(sleep_time) => {
     188          879 :                         tokio::time::sleep(sleep_time).await;
     189          206 :                         Some(())
     190              :                     },
     191              :                     None => {
     192       709997 :                         debug!("No candidates to retry, waiting indefinitely for the broker events");
     193       709997 :                         None
     194              :                     }
     195              :                 }
     196       710203 :             } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
     197              :         }
     198              : 
     199       715187 :         if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
     200         1222 :             info!("Switching to new connection candidate: {new_candidate:?}");
     201         1222 :             connection_manager_state
     202         1222 :                 .change_connection(new_candidate, ctx)
     203           15 :                 .await
     204       713965 :         }
     205       715187 :         *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
     206              :     }
     207          396 : }
     208              : 
     209              : /// Endlessly try to subscribe for broker updates for a given timeline.
     210         1190 : async fn subscribe_for_timeline_updates(
     211         1190 :     broker_client: &mut BrokerClientChannel,
     212         1190 :     id: TenantTimelineId,
     213         1190 : ) -> Streaming<SafekeeperTimelineInfo> {
     214         1190 :     let mut attempt = 0;
     215         1190 :     let cancel = shutdown_token();
     216              : 
     217              :     loop {
     218         1190 :         exponential_backoff(
     219         1190 :             attempt,
     220         1190 :             DEFAULT_BASE_BACKOFF_SECONDS,
     221         1190 :             DEFAULT_MAX_BACKOFF_SECONDS,
     222         1190 :             &cancel,
     223         1190 :         )
     224            0 :         .await;
     225         1190 :         attempt += 1;
     226         1190 : 
     227         1190 :         // subscribe to the specific timeline
     228         1190 :         let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
     229         1190 :             tenant_id: id.tenant_id.as_ref().to_owned(),
     230         1190 :             timeline_id: id.timeline_id.as_ref().to_owned(),
     231         1190 :         });
     232         1190 :         let request = SubscribeSafekeeperInfoRequest {
     233         1190 :             subscription_key: Some(key),
     234         1190 :         };
     235         1190 : 
     236         2373 :         match broker_client.subscribe_safekeeper_info(request).await {
     237         1182 :             Ok(resp) => {
     238         1182 :                 return resp.into_inner();
     239              :             }
     240            0 :             Err(e) => {
     241              :                 // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
     242              :                 // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
     243            0 :                 info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
     244            0 :                 continue;
     245              :             }
     246              :         }
     247              :     }
     248         1182 : }
     249              : 
     250              : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
     251              : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
     252              : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
     253              : 
     254              : /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
     255              : pub(super) struct ConnectionManagerState {
     256              :     id: TenantTimelineId,
     257              :     /// Use pageserver data about the timeline to filter out some of the safekeepers.
     258              :     timeline: Arc<Timeline>,
     259              :     conf: WalReceiverConf,
     260              :     /// Current connection to safekeeper for WAL streaming.
     261              :     wal_connection: Option<WalConnection>,
     262              :     /// Info about retries and unsuccessful attempts to connect to safekeepers.
     263              :     wal_connection_retries: HashMap<NodeId, RetryInfo>,
     264              :     /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
     265              :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     266              : }
     267              : 
     268              : /// Information about the connection manager's current connection and connection candidates.
     269            3 : #[derive(Debug, Clone)]
     270              : pub struct ConnectionManagerStatus {
     271              :     existing_connection: Option<WalConnectionStatus>,
     272              :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     273              : }
     274              : 
     275              : impl ConnectionManagerStatus {
     276              :     /// Generates a string describing the current connection status in a form suitable for logging.
     277            3 :     pub fn to_human_readable_string(&self) -> String {
     278            3 :         let mut resulting_string = String::new();
     279            3 :         match &self.existing_connection {
     280            3 :             Some(connection) => {
     281            3 :                 if connection.has_processed_wal {
     282            3 :                     resulting_string.push_str(&format!(
     283            3 :                         " (update {}): streaming WAL from node {}, ",
     284            3 :                         connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
     285            3 :                         connection.node,
     286            3 :                     ));
     287            3 : 
     288            3 :                     match (connection.streaming_lsn, connection.commit_lsn) {
     289            0 :                         (None, None) => resulting_string.push_str("no streaming data"),
     290            0 :                         (None, Some(commit_lsn)) => {
     291            0 :                             resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
     292              :                         }
     293            0 :                         (Some(streaming_lsn), None) => {
     294            0 :                             resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
     295              :                         }
     296            3 :                         (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
     297            3 :                             &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
     298            3 :                         ),
     299              :                     }
     300            0 :                 } else if connection.is_connected {
     301            0 :                     resulting_string.push_str(&format!(
     302            0 :                         " (update {}): connecting to node {}",
     303            0 :                         connection
     304            0 :                             .latest_connection_update
     305            0 :                             .format("%Y-%m-%d %H:%M:%S"),
     306            0 :                         connection.node,
     307            0 :                     ));
     308            0 :                 } else {
     309            0 :                     resulting_string.push_str(&format!(
     310            0 :                         " (update {}): initializing node {} connection",
     311            0 :                         connection
     312            0 :                             .latest_connection_update
     313            0 :                             .format("%Y-%m-%d %H:%M:%S"),
     314            0 :                         connection.node,
     315            0 :                     ));
     316            0 :                 }
     317              :             }
     318            0 :             None => resulting_string.push_str(": disconnected"),
     319              :         }
     320              : 
     321            3 :         resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
     322            3 :         let mut candidates = self.wal_stream_candidates.iter().peekable();
     323            9 :         while let Some((node_id, candidate_info)) = candidates.next() {
     324            6 :             resulting_string.push_str(&format!(
     325            6 :                 "({}|{}|{})",
     326            6 :                 node_id,
     327            6 :                 candidate_info.latest_update.format("%H:%M:%S"),
     328            6 :                 Lsn(candidate_info.timeline.commit_lsn)
     329            6 :             ));
     330            6 :             if candidates.peek().is_some() {
     331            3 :                 resulting_string.push_str(", ");
     332            3 :             }
     333              :         }
     334            3 :         resulting_string.push(']');
     335            3 : 
     336            3 :         resulting_string
     337            3 :     }
     338              : }
     339              : 
     340              : /// Current connection data.
     341            0 : #[derive(Debug)]
     342              : struct WalConnection {
     343              :     /// Time when the connection was initiated.
     344              :     started_at: NaiveDateTime,
     345              :     /// Current safekeeper pageserver is connected to for WAL streaming.
     346              :     sk_id: NodeId,
     347              :     /// Availability zone of the safekeeper.
     348              :     availability_zone: Option<String>,
     349              :     /// Status of the connection.
     350              :     status: WalConnectionStatus,
     351              :     /// WAL streaming task handle.
     352              :     connection_task: TaskHandle<WalConnectionStatus>,
     353              :     /// Have we discovered that other safekeeper has more recent WAL than we do?
     354              :     discovered_new_wal: Option<NewCommittedWAL>,
     355              : }
     356              : 
     357              : /// Notion of a new committed WAL, which exists on other safekeeper.
     358            0 : #[derive(Debug, Clone, Copy)]
     359              : struct NewCommittedWAL {
     360              :     /// LSN of the new committed WAL.
     361              :     lsn: Lsn,
     362              :     /// When we discovered that the new committed WAL exists on other safekeeper.
     363              :     discovered_at: NaiveDateTime,
     364              : }
     365              : 
     366            0 : #[derive(Debug, Clone, Copy)]
     367              : struct RetryInfo {
     368              :     next_retry_at: Option<NaiveDateTime>,
     369              :     retry_duration_seconds: f64,
     370              : }
     371              : 
     372              : /// Data about the timeline to connect to, received from the broker.
     373      1367192 : #[derive(Debug, Clone)]
     374              : struct BrokerSkTimeline {
     375              :     timeline: SafekeeperTimelineInfo,
     376              :     /// Time at which the data was fetched from the broker last time, to track the stale data.
     377              :     latest_update: NaiveDateTime,
     378              : }
     379              : 
     380              : impl ConnectionManagerState {
     381         1190 :     pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
     382         1190 :         let id = TenantTimelineId {
     383         1190 :             tenant_id: timeline.tenant_id,
     384         1190 :             timeline_id: timeline.timeline_id,
     385         1190 :         };
     386         1190 :         Self {
     387         1190 :             id,
     388         1190 :             timeline,
     389         1190 :             conf,
     390         1190 :             wal_connection: None,
     391         1190 :             wal_stream_candidates: HashMap::new(),
     392         1190 :             wal_connection_retries: HashMap::new(),
     393         1190 :         }
     394         1190 :     }
     395              : 
     396              :     /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
     397         1222 :     async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
     398         1222 :         WALRECEIVER_SWITCHES
     399         1222 :             .with_label_values(&[new_sk.reason.name()])
     400         1222 :             .inc();
     401         1222 : 
     402         1222 :         self.drop_old_connection(true).await;
     403              : 
     404         1222 :         let node_id = new_sk.safekeeper_id;
     405         1222 :         let connect_timeout = self.conf.wal_connect_timeout;
     406         1222 :         let timeline = Arc::clone(&self.timeline);
     407         1222 :         let ctx = ctx.detached_child(
     408         1222 :             TaskKind::WalReceiverConnectionHandler,
     409         1222 :             DownloadBehavior::Download,
     410         1222 :         );
     411              : 
     412         1222 :         let span = info_span!("connection", %node_id);
     413         1222 :         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
     414         1222 :             async move {
     415         1222 :                 debug_assert_current_span_has_tenant_and_timeline_id();
     416              : 
     417         1222 :                 let res = super::walreceiver_connection::handle_walreceiver_connection(
     418         1222 :                     timeline,
     419         1222 :                     new_sk.wal_source_connconf,
     420         1222 :                     events_sender,
     421         1222 :                     cancellation,
     422         1222 :                     connect_timeout,
     423         1222 :                     ctx,
     424         1222 :                     node_id,
     425         1222 :                 )
     426      6020556 :                 .await;
     427              : 
     428         1162 :                 match res {
     429          206 :                     Ok(()) => Ok(()),
     430          956 :                     Err(e) => {
     431          956 :                         match e {
     432          129 :                             WalReceiverError::SuccessfulCompletion(msg) => {
     433          129 :                                 info!("walreceiver connection handling ended with success: {msg}");
     434          129 :                                 Ok(())
     435              :                             }
     436          817 :                             WalReceiverError::ExpectedSafekeeperError(e) => {
     437          817 :                                 info!("walreceiver connection handling ended: {e}");
     438          817 :                                 Ok(())
     439              :                             }
     440           10 :                             WalReceiverError::Other(e) => {
     441           10 :                                 // give out an error to have task_mgr give it a really verbose logging
     442           10 :                                 Err(e).context("walreceiver connection handling failure")
     443              :                             }
     444              :                         }
     445              :                     }
     446              :                 }
     447         1162 :             }
     448         1222 :             .instrument(span)
     449         1222 :         });
     450         1222 : 
     451         1222 :         let now = Utc::now().naive_utc();
     452         1222 :         self.wal_connection = Some(WalConnection {
     453         1222 :             started_at: now,
     454         1222 :             sk_id: new_sk.safekeeper_id,
     455         1222 :             availability_zone: new_sk.availability_zone,
     456         1222 :             status: WalConnectionStatus {
     457         1222 :                 is_connected: false,
     458         1222 :                 has_processed_wal: false,
     459         1222 :                 latest_connection_update: now,
     460         1222 :                 latest_wal_update: now,
     461         1222 :                 streaming_lsn: None,
     462         1222 :                 commit_lsn: None,
     463         1222 :                 node: node_id,
     464         1222 :             },
     465         1222 :             connection_task: connection_handle,
     466         1222 :             discovered_new_wal: None,
     467         1222 :         });
     468         1222 :     }
     469              : 
     470              :     /// Drops the current connection (if any) and updates retry timeout for the next
     471              :     /// connection attempt to the same safekeeper.
     472         2128 :     async fn drop_old_connection(&mut self, needs_shutdown: bool) {
     473         2128 :         let wal_connection = match self.wal_connection.take() {
     474          921 :             Some(wal_connection) => wal_connection,
     475         1207 :             None => return,
     476              :         };
     477              : 
     478          921 :         if needs_shutdown {
     479           15 :             wal_connection.connection_task.shutdown().await;
     480          906 :         }
     481              : 
     482          921 :         let retry = self
     483          921 :             .wal_connection_retries
     484          921 :             .entry(wal_connection.sk_id)
     485          921 :             .or_insert(RetryInfo {
     486          921 :                 next_retry_at: None,
     487          921 :                 retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
     488          921 :             });
     489          921 : 
     490          921 :         let now = Utc::now().naive_utc();
     491          921 : 
     492          921 :         // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
     493          921 :         // and we add backoff to the time when we started the connection attempt. If the connection
     494          921 :         // was active for a long time, then next_retry_at will be in the past.
     495          921 :         retry.next_retry_at =
     496          921 :             wal_connection
     497          921 :                 .started_at
     498          921 :                 .checked_add_signed(chrono::Duration::milliseconds(
     499          921 :                     (retry.retry_duration_seconds * 1000.0) as i64,
     500          921 :                 ));
     501              : 
     502          921 :         if let Some(next) = &retry.next_retry_at {
     503          921 :             if next > &now {
     504          500 :                 info!(
     505          500 :                     "Next connection retry to {:?} is at {}",
     506          500 :                     wal_connection.sk_id, next
     507          500 :                 );
     508          421 :             }
     509            0 :         }
     510              : 
     511          921 :         let next_retry_duration =
     512          921 :             retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
     513          921 :         // Clamp the next retry duration to the maximum allowed.
     514          921 :         let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
     515          921 :         // Clamp the next retry duration to the minimum allowed.
     516          921 :         let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
     517          921 : 
     518          921 :         retry.retry_duration_seconds = next_retry_duration;
     519         2128 :     }
     520              : 
     521              :     /// Returns time needed to wait to have a new candidate for WAL streaming.
     522       716370 :     fn time_until_next_retry(&self) -> Option<Duration> {
     523       716370 :         let now = Utc::now().naive_utc();
     524              : 
     525       716370 :         let next_retry_at = self
     526       716370 :             .wal_connection_retries
     527       716370 :             .values()
     528       716370 :             .filter_map(|retry| retry.next_retry_at)
     529       716370 :             .filter(|next_retry_at| next_retry_at > &now)
     530       716370 :             .min()?;
     531              : 
     532          879 :         (next_retry_at - now).to_std().ok()
     533       716370 :     }
     534              : 
     535              :     /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
     536         5714 :     fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
     537         5714 :         WALRECEIVER_BROKER_UPDATES.inc();
     538         5714 : 
     539         5714 :         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
     540         5714 :         let old_entry = self.wal_stream_candidates.insert(
     541         5714 :             new_safekeeper_id,
     542         5714 :             BrokerSkTimeline {
     543         5714 :                 timeline: timeline_update,
     544         5714 :                 latest_update: Utc::now().naive_utc(),
     545         5714 :             },
     546         5714 :         );
     547         5714 : 
     548         5714 :         if old_entry.is_none() {
     549          611 :             info!("New SK node was added: {new_safekeeper_id}");
     550          611 :             WALRECEIVER_CANDIDATES_ADDED.inc();
     551         5103 :         }
     552         5714 :     }
     553              : 
    /// Cleans up stale broker records and checks the rest for a new connection candidate.
    /// Returns a new candidate if there is no connection or the current one is lagging or stale, `None` otherwise.
    ///
    /// The candidate considered is the one returned by `select_connection_candidate`, with the
    /// currently connected safekeeper (if any) excluded. If there is no such candidate, `None` is returned
    /// right away (via `?`), before any of the checks below run.
    ///
    /// With an existing connection the checks are applied in this order, first match wins:
    /// * no connection update for longer than `conf.wal_connect_timeout`: switch (`NoKeepAlives`)
    /// * the connection has not been established yet: do not switch
    /// * the candidate's commit_lsn is ahead of the connected one by at least `conf.max_lsn_wal_lag`: switch (`LaggingWal`)
    /// * the candidate's commit_lsn is not behind, the candidate is in the pageserver's AZ and the connected
    ///   safekeeper is not: switch (`SwitchAvailabilityZone`)
    /// * the candidate has a higher commit_lsn and we have been waiting for new WAL for longer than
    ///   `conf.lagging_wal_timeout`: switch (`NoWalTimeout`)
    ///
    /// Without an existing connection, any candidate is accepted (`NoExistingConnection`).
    ///
    /// Side effects: stale candidates are removed first, and when no switch is decided the
    /// `discovered_new_wal` marker of the current connection is updated.
    fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
        self.cleanup_old_candidates();

        match &self.wal_connection {
            Some(existing_wal_connection) => {
                let connected_sk_node = existing_wal_connection.sk_id;

                // No alternative safekeeper available: nothing to switch to, bail out with `None`.
                let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
                    self.select_connection_candidate(Some(connected_sk_node))?;
                let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();

                let now = Utc::now().naive_utc();
                // `to_std` fails for a negative duration (last update is later than `now`);
                // the keepalive check is skipped in that case.
                if let Ok(latest_interaciton) =
                    (now - existing_wal_connection.status.latest_connection_update).to_std()
                {
                    // Drop connection if we haven't received keepalive message for a while.
                    if latest_interaciton > self.conf.wal_connect_timeout {
                        return Some(NewWalConnectionCandidate {
                            safekeeper_id: new_sk_id,
                            wal_source_connconf: new_wal_source_connconf,
                            availability_zone: new_availability_zone,
                            reason: ReconnectReason::NoKeepAlives {
                                last_keep_alive: Some(
                                    existing_wal_connection.status.latest_connection_update,
                                ),
                                check_time: now,
                                threshold: self.conf.wal_connect_timeout,
                            },
                        });
                    }
                }

                if !existing_wal_connection.status.is_connected {
                    // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
                    return None;
                }

                if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
                    let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
                    // Check if the new candidate has much more WAL than the current one.
                    // `checked_sub` yields `None` when the candidate is behind the connected safekeeper.
                    match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
                        Some(new_sk_lsn_advantage) => {
                            if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
                                return Some(NewWalConnectionCandidate {
                                    safekeeper_id: new_sk_id,
                                    wal_source_connconf: new_wal_source_connconf,
                                    availability_zone: new_availability_zone,
                                    reason: ReconnectReason::LaggingWal {
                                        current_commit_lsn,
                                        new_commit_lsn,
                                        threshold: self.conf.max_lsn_wal_lag,
                                    },
                                });
                            }
                            // If we have a candidate whose commit_lsn is not behind the current one (and not far enough
                            // ahead to trigger the switch above), which is in the same AZ as pageserver,
                            // and the current one is not, switch to the new one.
                            if self.conf.availability_zone.is_some()
                                && existing_wal_connection.availability_zone
                                    != self.conf.availability_zone
                                && self.conf.availability_zone == new_availability_zone
                            {
                                return Some(NewWalConnectionCandidate {
                                    safekeeper_id: new_sk_id,
                                    availability_zone: new_availability_zone,
                                    wal_source_connconf: new_wal_source_connconf,
                                    reason: ReconnectReason::SwitchAvailabilityZone,
                                });
                            }
                        }
                        None => debug!(
                            "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
                        ),
                    }
                }

                // Position we have streamed up to; falls back to the timeline's last record LSN
                // when the connection has not reported a streaming LSN yet.
                let current_lsn = match existing_wal_connection.status.streaming_lsn {
                    Some(lsn) => lsn,
                    None => self.timeline.get_last_record_lsn(),
                };
                let current_commit_lsn = existing_wal_connection
                    .status
                    .commit_lsn
                    .unwrap_or(current_lsn);
                let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);

                // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
                let mut discovered_new_wal = existing_wal_connection
                    .discovered_new_wal
                    .filter(|new_wal| new_wal.lsn > current_commit_lsn);

                if discovered_new_wal.is_none() {
                    // Check if the new candidate has more WAL than the current one.
                    // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
                    discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
                        trace!(
                            "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
                            candidate_commit_lsn,
                            current_commit_lsn
                        );
                        Some(NewCommittedWAL {
                            lsn: candidate_commit_lsn,
                            discovered_at: Utc::now().naive_utc(),
                        })
                    } else {
                        None
                    };
                }

                // Moment since which we have been waiting for WAL that is known to exist, if any.
                let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
                    // Connected safekeeper has more WAL, but we haven't received updates for some time.
                    trace!(
                        "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
                        (now - existing_wal_connection.status.latest_wal_update).to_std(),
                        current_lsn,
                        current_commit_lsn
                    );
                    Some(existing_wal_connection.status.latest_wal_update)
                } else {
                    discovered_new_wal.as_ref().map(|new_wal| {
                        // We know that new WAL is available on another safekeeper, but the connected safekeeper doesn't have it.
                        // Wait from the later of: the discovery time, the last WAL update on the current connection.
                        new_wal
                            .discovered_at
                            .max(existing_wal_connection.status.latest_wal_update)
                    })
                };

                // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
                if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
                    if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
                        if candidate_commit_lsn > current_commit_lsn
                            && waiting_for_new_wal > self.conf.lagging_wal_timeout
                        {
                            return Some(NewWalConnectionCandidate {
                                safekeeper_id: new_sk_id,
                                wal_source_connconf: new_wal_source_connconf,
                                availability_zone: new_availability_zone,
                                reason: ReconnectReason::NoWalTimeout {
                                    current_lsn,
                                    current_commit_lsn,
                                    candidate_commit_lsn,
                                    last_wal_interaction: Some(
                                        existing_wal_connection.status.latest_wal_update,
                                    ),
                                    check_time: now,
                                    threshold: self.conf.lagging_wal_timeout,
                                },
                            });
                        }
                    }
                }

                // No switch this time: remember what we learned so the wait timer carries over to the next check.
                self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
            }
            None => {
                // Not connected to anything: any applicable candidate will do.
                let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
                    self.select_connection_candidate(None)?;
                return Some(NewWalConnectionCandidate {
                    safekeeper_id: new_sk_id,
                    availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
                    wal_source_connconf: new_wal_source_connconf,
                    reason: ReconnectReason::NoExistingConnection,
                });
            }
        }

        None
    }
     738              : 
     739              :     /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
     740              :     /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
     741              :     ///
     742              :     /// The candidate that is chosen:
     743              :     /// * has no pending retry cooldown
     744              :     /// * has greatest commit_lsn among the ones that are left
     745       715197 :     fn select_connection_candidate(
     746       715197 :         &self,
     747       715197 :         node_to_omit: Option<NodeId>,
     748       715197 :     ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
     749       715197 :         self.applicable_connection_candidates()
     750      1366038 :             .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
     751       715197 :             .max_by_key(|(_, info, _)| info.commit_lsn)
     752       715197 :     }
     753              : 
     754              :     /// Returns a list of safekeepers that have valid info and ready for connection.
     755              :     /// Some safekeepers are filtered by the retry cooldown.
     756       715197 :     fn applicable_connection_candidates(
     757       715197 :         &self,
     758       715197 :     ) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
     759       715197 :         let now = Utc::now().naive_utc();
     760       715197 : 
     761       715197 :         self.wal_stream_candidates
     762       715197 :             .iter()
     763      1367203 :             .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
     764      1367198 :             .filter(move |(sk_id, _)| {
     765      1367198 :                 let next_retry_at = self
     766      1367198 :                     .wal_connection_retries
     767      1367198 :                     .get(sk_id)
     768      1367198 :                     .and_then(|retry_info| {
     769       244214 :                         retry_info.next_retry_at
     770      1367198 :                     });
     771      1367198 : 
     772      1367198 :                 next_retry_at.is_none() || next_retry_at.unwrap() <= now
     773      1367198 :             }).filter_map(|(sk_id, broker_info)| {
     774      1366040 :                 let info = &broker_info.timeline;
     775      1366040 :                 if info.safekeeper_connstr.is_empty() {
     776            2 :                     return None; // no connection string, ignore sk
     777      1366038 :                 }
     778      1366038 :                 match wal_stream_connection_config(
     779      1366038 :                     self.id,
     780      1366038 :                     info.safekeeper_connstr.as_ref(),
     781      1366038 :                     match &self.conf.auth_token {
     782      1365081 :                         None => None,
     783          957 :                         Some(x) => Some(x),
     784              :                     },
     785      1366038 :                     self.conf.availability_zone.as_deref(),
     786              :                 ) {
     787      1366038 :                     Ok(connstr) => Some((*sk_id, info, connstr)),
     788            0 :                     Err(e) => {
     789            0 :                         error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
     790            0 :                         None
     791              :                     }
     792              :                 }
     793      1366040 :             })
     794       715197 :     }
     795              : 
     796              :     /// Remove candidates which haven't sent broker updates for a while.
     797       715197 :     fn cleanup_old_candidates(&mut self) {
     798       715197 :         let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
     799       715197 :         let lagging_wal_timeout = self.conf.lagging_wal_timeout;
     800       715197 : 
     801       715197 :         self.wal_stream_candidates.retain(|node_id, broker_info| {
     802      1367211 :             if let Ok(time_since_latest_broker_update) =
     803      1367211 :                 (Utc::now().naive_utc() - broker_info.latest_update).to_std()
     804              :             {
     805      1367211 :                 let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
     806      1367211 :                 if !should_retain {
     807            8 :                     node_ids_to_remove.push(*node_id);
     808      1367203 :                 }
     809      1367211 :                 should_retain
     810              :             } else {
     811            0 :                 true
     812              :             }
     813      1367211 :         });
     814       715197 : 
     815       715197 :         if !node_ids_to_remove.is_empty() {
     816           16 :             for node_id in node_ids_to_remove {
     817            8 :                 info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
     818            8 :                 self.wal_connection_retries.remove(&node_id);
     819            8 :                 WALRECEIVER_CANDIDATES_REMOVED.inc();
     820              :             }
     821       715189 :         }
     822       715197 :     }
     823              : 
     824          493 :     pub(super) async fn shutdown(mut self) {
     825          493 :         if let Some(wal_connection) = self.wal_connection.take() {
     826          290 :             wal_connection.connection_task.shutdown().await;
     827          252 :         }
     828          493 :     }
     829              : 
     830       715188 :     fn manager_status(&self) -> ConnectionManagerStatus {
     831       715188 :         ConnectionManagerStatus {
     832       715188 :             existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
     833       715188 :             wal_stream_candidates: self.wal_stream_candidates.clone(),
     834       715188 :         }
     835       715188 :     }
     836              : }
     837              : 
/// A safekeeper selected as the target of the next WAL connection,
/// together with the reason it was selected.
#[derive(Debug)]
struct NewWalConnectionCandidate {
    /// Id of the safekeeper to connect to.
    safekeeper_id: NodeId,
    /// Connection config to use for streaming WAL from that safekeeper.
    wal_source_connconf: PgConnectionConfig,
    /// Availability zone reported for that safekeeper in its broker data, if any.
    availability_zone: Option<String>,
    /// Why this candidate was picked.
    reason: ReconnectReason,
}
     845              : 
/// Stores the reason why WAL connection was switched, for further debugging purposes.
#[derive(Debug, PartialEq, Eq)]
enum ReconnectReason {
    /// There was no WAL connection at all.
    NoExistingConnection,
    /// The candidate's commit_lsn is ahead of the connected safekeeper's one by at least `threshold`.
    LaggingWal {
        /// commit_lsn of the connected safekeeper.
        current_commit_lsn: Lsn,
        /// commit_lsn of the candidate.
        new_commit_lsn: Lsn,
        /// Lag that triggers the switch.
        threshold: NonZeroU64,
    },
    /// The candidate is not behind and is in the pageserver's availability zone,
    /// while the connected safekeeper is not.
    SwitchAvailabilityZone,
    /// The candidate has more WAL and we have been waiting for new WAL for longer than `threshold`.
    NoWalTimeout {
        /// LSN streamed so far.
        current_lsn: Lsn,
        /// commit_lsn of the connected safekeeper.
        current_commit_lsn: Lsn,
        /// commit_lsn of the candidate.
        candidate_commit_lsn: Lsn,
        /// Time of the latest WAL update on the dropped connection.
        last_wal_interaction: Option<NaiveDateTime>,
        /// Moment the decision was made at.
        check_time: NaiveDateTime,
        /// Waiting time that triggers the switch.
        threshold: Duration,
    },
    /// The existing connection had no updates for longer than `threshold`.
    NoKeepAlives {
        /// Time of the latest update on the dropped connection.
        last_keep_alive: Option<NaiveDateTime>,
        /// Moment the decision was made at.
        check_time: NaiveDateTime,
        /// Silence period that triggers the switch.
        threshold: Duration,
    },
}
     870              : 
     871              : impl ReconnectReason {
     872         1222 :     fn name(&self) -> &str {
     873         1222 :         match self {
     874         1207 :             ReconnectReason::NoExistingConnection => "NoExistingConnection",
     875           15 :             ReconnectReason::LaggingWal { .. } => "LaggingWal",
     876            0 :             ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
     877            0 :             ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
     878            0 :             ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
     879              :         }
     880         1222 :     }
     881              : }
     882              : 
     883              : #[cfg(test)]
     884              : mod tests {
     885              :     use super::*;
     886              :     use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
     887              :     use url::Host;
     888              : 
     889           19 :     fn dummy_broker_sk_timeline(
     890           19 :         commit_lsn: u64,
     891           19 :         safekeeper_connstr: &str,
     892           19 :         latest_update: NaiveDateTime,
     893           19 :     ) -> BrokerSkTimeline {
     894           19 :         BrokerSkTimeline {
     895           19 :             timeline: SafekeeperTimelineInfo {
     896           19 :                 safekeeper_id: 0,
     897           19 :                 tenant_timeline_id: None,
     898           19 :                 term: 0,
     899           19 :                 last_log_term: 0,
     900           19 :                 flush_lsn: 0,
     901           19 :                 commit_lsn,
     902           19 :                 backup_lsn: 0,
     903           19 :                 remote_consistent_lsn: 0,
     904           19 :                 peer_horizon_lsn: 0,
     905           19 :                 local_start_lsn: 0,
     906           19 :                 safekeeper_connstr: safekeeper_connstr.to_owned(),
     907           19 :                 http_connstr: safekeeper_connstr.to_owned(),
     908           19 :                 availability_zone: None,
     909           19 :             },
     910           19 :             latest_update,
     911           19 :         }
     912           19 :     }
     913              : 
     914            1 :     #[tokio::test]
                            // With no WAL connection and only unusable safekeeper entries advertised,
                            // `next_connection_candidate` is expected to return `None`.
     915            1 :     async fn no_connection_no_candidate() -> anyhow::Result<()> {
     916            1 :         let harness = TenantHarness::create("no_connection_no_candidate")?;
     917            3 :         let mut state = dummy_state(&harness).await;
     918            1 :         let now = Utc::now().naive_utc();
     919              : 
                                // A timestamp two `lagging_wal_timeout` periods in the past, used below as
                                // the broker update time of the last entry.
     920            1 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
     921            1 :         let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
     922            1 : 
     923            1 :         state.wal_connection = None;
                                // The entries cover: an empty connection string, a zero first argument
                                // (hosts named "no_commit_lsn"), and an old broker update time.
                                // NOTE(review): the rules that reject them live in
                                // `next_connection_candidate`, outside this excerpt -- confirm there.
     924            1 :         state.wal_stream_candidates = HashMap::from([
     925            1 :             (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
     926            1 :             (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
     927            1 :             (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
     928            1 :             (
     929            1 :                 NodeId(3),
     930            1 :                 dummy_broker_sk_timeline(
     931            1 :                     1 + state.conf.max_lsn_wal_lag.get(),
     932            1 :                     "delay_over_threshold",
     933            1 :                     delay_over_threshold,
     934            1 :                 ),
     935            1 :             ),
     936            1 :         ]);
     937            1 : 
     938            1 :         let no_candidate = state.next_connection_candidate();
     939            1 :         assert!(
     940            1 :             no_candidate.is_none(),
     941            0 :             "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
     942              :         );
     943              : 
     944            1 :         Ok(())
     945              :     }
     946              : 
     947            1 :     #[tokio::test]
                            // With a live connection to `NodeId(0)` and no other safekeeper far enough
                            // ahead, `next_connection_candidate` is expected to return `None`.
     948            1 :     async fn connection_no_candidate() -> anyhow::Result<()> {
     949            1 :         let harness = TenantHarness::create("connection_no_candidate")?;
     950            3 :         let mut state = dummy_state(&harness).await;
     951            1 :         let now = Utc::now().naive_utc();
     952            1 : 
     953            1 :         let connected_sk_id = NodeId(0);
     954            1 :         let current_lsn = 100_000;
     955            1 : 
                                // NOTE(review): `node` is `NodeId(1)` here while the connection below uses
                                // `connected_sk_id` (`NodeId(0)`); some other tests in this module pass
                                // `connected_sk_id` for `node`. Confirm whether the mismatch is intended.
     956            1 :         let connection_status = WalConnectionStatus {
     957            1 :             is_connected: true,
     958            1 :             has_processed_wal: true,
     959            1 :             latest_connection_update: now,
     960            1 :             latest_wal_update: now,
     961            1 :             commit_lsn: Some(Lsn(current_lsn)),
     962            1 :             streaming_lsn: Some(Lsn(current_lsn)),
     963            1 :             node: NodeId(1),
     964            1 :         };
     965            1 : 
                                // Shrink the allowed lag to 100 so the offsets below are easy to read.
     966            1 :         state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
     967            1 :         state.wal_connection = Some(WalConnection {
     968            1 :             started_at: now,
     969            1 :             sk_id: connected_sk_id,
     970            1 :             availability_zone: None,
     971            1 :             status: connection_status,
                                    // The task only sends the status once as a progress update and finishes.
     972            1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
     973            1 :                 sender
     974            1 :                     .send(TaskStateUpdate::Progress(connection_status))
     975            1 :                     .ok();
     976            1 :                 Ok(())
     977            1 :             }),
     978            1 :             discovered_new_wal: None,
     979            1 :         });
                                // Candidates: the connected node itself at twice the allowed lag ahead,
                                // one node at the current LSN, and one node only half the allowed lag ahead.
     980            1 :         state.wal_stream_candidates = HashMap::from([
     981            1 :             (
     982            1 :                 connected_sk_id,
     983            1 :                 dummy_broker_sk_timeline(
     984            1 :                     current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
     985            1 :                     DUMMY_SAFEKEEPER_HOST,
     986            1 :                     now,
     987            1 :                 ),
     988            1 :             ),
     989            1 :             (
     990            1 :                 NodeId(1),
     991            1 :                 dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
     992            1 :             ),
     993            1 :             (
     994            1 :                 NodeId(2),
     995            1 :                 dummy_broker_sk_timeline(
     996            1 :                     current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
     997            1 :                     "not_enough_advanced_lsn",
     998            1 :                     now,
     999            1 :                 ),
    1000            1 :             ),
    1001            1 :         ]);
    1002            1 : 
    1003            1 :         let no_candidate = state.next_connection_candidate();
    1004            1 :         assert!(
    1005            1 :             no_candidate.is_none(),
    1006            0 :             "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
    1007              :         );
    1008              : 
    1009            1 :         Ok(())
    1010              :     }
    1011              : 
    1012            1 :     #[tokio::test]
                            // Without an existing connection a candidate must be chosen, and the reported
                            // reason must be `NoExistingConnection`. Two scenarios are checked in turn.
    1013            1 :     async fn no_connection_candidate() -> anyhow::Result<()> {
    1014            1 :         let harness = TenantHarness::create("no_connection_candidate")?;
    1015            3 :         let mut state = dummy_state(&harness).await;
    1016            1 :         let now = Utc::now().naive_utc();
    1017            1 : 
                                // Scenario 1: a single advertised safekeeper is the only possible choice.
    1018            1 :         state.wal_connection = None;
    1019            1 :         state.wal_stream_candidates = HashMap::from([(
    1020            1 :             NodeId(0),
    1021            1 :             dummy_broker_sk_timeline(
    1022            1 :                 1 + state.conf.max_lsn_wal_lag.get(),
    1023            1 :                 DUMMY_SAFEKEEPER_HOST,
    1024            1 :                 now,
    1025            1 :             ),
    1026            1 :         )]);
    1027            1 : 
    1028            1 :         let only_candidate = state
    1029            1 :             .next_connection_candidate()
    1030            1 :             .expect("Expected one candidate selected out of the only data option, but got none");
    1031            1 :         assert_eq!(only_candidate.safekeeper_id, NodeId(0));
    1032            1 :         assert_eq!(
    1033              :             only_candidate.reason,
    1034              :             ReconnectReason::NoExistingConnection,
    1035            0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1036              :         );
    1037            1 :         assert_eq!(
    1038            1 :             only_candidate.wal_source_connconf.host(),
    1039            1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1040            1 :         );
    1041              : 
                                // Scenario 2: three safekeepers. `NodeId(2)` is given the largest value but an
                                // empty connection string, so `NodeId(1)` is the one expected to be selected.
    1042            1 :         let selected_lsn = 100_000;
    1043            1 :         state.wal_stream_candidates = HashMap::from([
    1044            1 :             (
    1045            1 :                 NodeId(0),
    1046            1 :                 dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
    1047            1 :             ),
    1048            1 :             (
    1049            1 :                 NodeId(1),
    1050            1 :                 dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
    1051            1 :             ),
    1052            1 :             (
    1053            1 :                 NodeId(2),
    1054            1 :                 dummy_broker_sk_timeline(selected_lsn + 100, "", now),
    1055            1 :             ),
    1056            1 :         ]);
    1057            1 :         let biggest_wal_candidate = state.next_connection_candidate().expect(
    1058            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1059            1 :         );
    1060            1 : 
    1061            1 :         assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
    1062            1 :         assert_eq!(
    1063              :             biggest_wal_candidate.reason,
    1064              :             ReconnectReason::NoExistingConnection,
    1065            0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1066              :         );
    1067            1 :         assert_eq!(
    1068            1 :             biggest_wal_candidate.wal_source_connconf.host(),
    1069            1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1070            1 :         );
    1071              : 
    1072            1 :         Ok(())
    1073              :     }
    1074              : 
    1075            1 :     #[tokio::test]
                            // A safekeeper with a pending retry entry must lose to one without it, even
                            // though the former is given the larger LSN.
    1076            1 :     async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
    1077            1 :         let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
    1078            3 :         let mut state = dummy_state(&harness).await;
    1079            1 :         let now = Utc::now().naive_utc();
    1080            1 : 
    1081            1 :         let current_lsn = Lsn(100_000).align();
    1082            1 :         let bigger_lsn = Lsn(current_lsn.0 + 100).align();
    1083            1 : 
    1084            1 :         state.wal_connection = None;
    1085            1 :         state.wal_stream_candidates = HashMap::from([
    1086            1 :             (
    1087            1 :                 NodeId(0),
    1088            1 :                 dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1089            1 :             ),
    1090            1 :             (
    1091            1 :                 NodeId(1),
    1092            1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1093            1 :             ),
    1094            1 :         ]);
                                // Only `NodeId(0)` gets a retry record: its next retry is scheduled one hour
                                // after `now`, with the retry duration set to the maximum backoff constant.
    1095            1 :         state.wal_connection_retries = HashMap::from([(
    1096            1 :             NodeId(0),
    1097            1 :             RetryInfo {
    1098            1 :                 next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
    1099            1 :                 retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
    1100            1 :             },
    1101            1 :         )]);
    1102            1 : 
    1103            1 :         let candidate_with_less_errors = state
    1104            1 :             .next_connection_candidate()
    1105            1 :             .expect("Expected one candidate selected, but got none");
    1106            1 :         assert_eq!(
    1107              :             candidate_with_less_errors.safekeeper_id,
    1108              :             NodeId(1),
    1109            0 :             "Should select the node with no pending retry cooldown"
    1110              :         );
    1111              : 
    1112            1 :         Ok(())
    1113              :     }
    1114              : 
    1115            1 :     #[tokio::test]
                            // While connected to `NodeId(0)`, another safekeeper that is more than
                            // `max_lsn_wal_lag` ahead must be selected with reason `LaggingWal`.
    1116            1 :     async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
                                // The harness name matches the test name, as in the other tests here.
    1117            1 :         let harness = TenantHarness::create("lsn_wal_over_threshold_current_candidate")?;
    1118            3 :         let mut state = dummy_state(&harness).await;
    1119            1 :         let current_lsn = Lsn(100_000).align();
    1120            1 :         let now = Utc::now().naive_utc();
    1121            1 : 
    1122            1 :         let connected_sk_id = NodeId(0);
                                // One byte past the allowed lag relative to the current connection.
    1123            1 :         let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
    1124            1 : 
    1125            1 :         let connection_status = WalConnectionStatus {
    1126            1 :             is_connected: true,
    1127            1 :             has_processed_wal: true,
    1128            1 :             latest_connection_update: now,
    1129            1 :             latest_wal_update: now,
    1130            1 :             commit_lsn: Some(current_lsn),
    1131            1 :             streaming_lsn: Some(current_lsn),
    1132            1 :             node: connected_sk_id,
    1133            1 :         };
    1134            1 : 
    1135            1 :         state.wal_connection = Some(WalConnection {
    1136            1 :             started_at: now,
    1137            1 :             sk_id: connected_sk_id,
    1138            1 :             availability_zone: None,
    1139            1 :             status: connection_status,
                                    // The task only sends the status once as a progress update and finishes.
    1140            1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1141            1 :                 sender
    1142            1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1143            1 :                     .ok();
    1144            1 :                 Ok(())
    1145            1 :             }),
    1146            1 :             discovered_new_wal: None,
    1147            1 :         });
    1148            1 :         state.wal_stream_candidates = HashMap::from([
    1149            1 :             (
    1150            1 :                 connected_sk_id,
    1151            1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1152            1 :             ),
    1153            1 :             (
    1154            1 :                 NodeId(1),
    1155            1 :                 dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
    1156            1 :             ),
    1157            1 :         ]);
    1158            1 : 
    1159            1 :         let over_threshold_candidate = state.next_connection_candidate().expect(
    1160            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1161            1 :         );
    1162            1 : 
    1163            1 :         assert_eq!(over_threshold_candidate.safekeeper_id, NodeId(1));
    1164            1 :         assert_eq!(
    1165            1 :             over_threshold_candidate.reason,
    1166            1 :             ReconnectReason::LaggingWal {
    1167            1 :                 current_commit_lsn: current_lsn,
    1168            1 :                 new_commit_lsn: new_lsn,
    1169            1 :                 threshold: state.conf.max_lsn_wal_lag
    1170            1 :             },
    1171            0 :             "Should select bigger WAL safekeeper if it starts to lag enough"
    1172              :         );
    1173            1 :         assert_eq!(
    1174            1 :             over_threshold_candidate.wal_source_connconf.host(),
    1175            1 :             &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
    1176            1 :         );
    1177              : 
    1178            1 :         Ok(())
    1179              :     }
    1180              : 
    1181            1 :     #[tokio::test]
                            // When the current connection has had no update for longer than
                            // `wal_connect_timeout`, another safekeeper must be selected with reason
                            // `NoKeepAlives`.
    1182            1 :     async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
    1183            1 :         let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
    1184            3 :         let mut state = dummy_state(&harness).await;
    1185            1 :         let current_lsn = Lsn(100_000).align();
    1186            1 :         let now = Utc::now().naive_utc();
    1187              : 
                                // A timestamp two `wal_connect_timeout` periods in the past.
    1188            1 :         let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
    1189            1 :         let time_over_threshold =
    1190            1 :             Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
    1191            1 : 
    1192            1 :         let connection_status = WalConnectionStatus {
    1193            1 :             is_connected: true,
    1194            1 :             has_processed_wal: true,
    1195            1 :             latest_connection_update: time_over_threshold,
    1196            1 :             latest_wal_update: time_over_threshold,
    1197            1 :             commit_lsn: Some(current_lsn),
    1198            1 :             streaming_lsn: Some(current_lsn),
    1199            1 :             node: NodeId(1),
    1200            1 :         };
    1201            1 : 
    1202            1 :         state.wal_connection = Some(WalConnection {
    1203            1 :             started_at: now,
    1204            1 :             sk_id: NodeId(1),
    1205            1 :             availability_zone: None,
    1206            1 :             status: connection_status,
                                    // The task only sends the status once as a progress update and finishes.
    1207            1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1208            1 :                 sender
    1209            1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1210            1 :                     .ok();
    1211            1 :                 Ok(())
    1212            1 :             }),
    1213            1 :             discovered_new_wal: None,
    1214            1 :         });
    1215            1 :         state.wal_stream_candidates = HashMap::from([(
    1216            1 :             NodeId(0),
    1217            1 :             dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1218            1 :         )]);
    1219            1 : 
    1220            1 :         let over_threshold_candidate = state.next_connection_candidate().expect(
    1221            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1222            1 :         );
    1223            1 : 
    1224            1 :         assert_eq!(over_threshold_candidate.safekeeper_id, NodeId(0));
    1225            1 :         match over_threshold_candidate.reason {
    1226              :             ReconnectReason::NoKeepAlives {
    1227            1 :                 last_keep_alive,
    1228            1 :                 threshold,
    1229            1 :                 ..
    1230            1 :             } => {
    1231            1 :                 assert_eq!(last_keep_alive, Some(time_over_threshold));
                                        // Compare with the connect timeout, which is what this test exercises.
                                        // `dummy_state` sets both timeouts to 1 second, so a comparison with
                                        // `lagging_wal_timeout` could not tell the two apart.
                                        // NOTE(review): confirm the production code reports
                                        // `wal_connect_timeout` for `NoKeepAlives`.
    1232            1 :                 assert_eq!(threshold, state.conf.wal_connect_timeout);
    1233              :             }
    1234            0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1235              :         }
    1236            1 :         assert_eq!(
    1237            1 :             over_threshold_candidate.wal_source_connconf.host(),
    1238            1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1239            1 :         );
    1240              : 
    1241            1 :         Ok(())
    1242              :     }
    1243              : 
    1244            1 :     #[tokio::test]
                            // When new WAL was discovered long ago but the current connection has not
                            // delivered any WAL since, another safekeeper must be selected with reason
                            // `NoWalTimeout`.
    1245            1 :     async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1246            1 :         let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
    1247            3 :         let mut state = dummy_state(&harness).await;
    1248            1 :         let current_lsn = Lsn(100_000).align();
    1249            1 :         let new_lsn = Lsn(100_100).align();
    1250            1 :         let now = Utc::now().naive_utc();
    1251              : 
                                // A timestamp two `lagging_wal_timeout` periods in the past.
    1252            1 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
    1253            1 :         let time_over_threshold =
    1254            1 :             Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
    1255            1 : 
                                // The connection itself is fresh; only the last WAL update is stale.
    1256            1 :         let connection_status = WalConnectionStatus {
    1257            1 :             is_connected: true,
    1258            1 :             has_processed_wal: true,
    1259            1 :             latest_connection_update: now,
    1260            1 :             latest_wal_update: time_over_threshold,
    1261            1 :             commit_lsn: Some(current_lsn),
    1262            1 :             streaming_lsn: Some(current_lsn),
    1263            1 :             node: NodeId(1),
    1264            1 :         };
    1265            1 : 
    1266            1 :         state.wal_connection = Some(WalConnection {
    1267            1 :             started_at: now,
    1268            1 :             sk_id: NodeId(1),
    1269            1 :             availability_zone: None,
    1270            1 :             status: connection_status,
    1271            1 :             connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
    1272            1 :             discovered_new_wal: Some(NewCommittedWAL {
    1273            1 :                 discovered_at: time_over_threshold,
    1274            1 :                 lsn: new_lsn,
    1275            1 :             }),
    1276            1 :         });
    1277            1 :         state.wal_stream_candidates = HashMap::from([(
    1278            1 :             NodeId(0),
    1279            1 :             dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1280            1 :         )]);
    1281            1 : 
    1282            1 :         let over_threshold_candidate = state.next_connection_candidate().expect(
    1283            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1284            1 :         );
    1285            1 : 
    1286            1 :         assert_eq!(over_threshold_candidate.safekeeper_id, NodeId(0));
    1287            1 :         match over_threshold_candidate.reason {
    1288              :             ReconnectReason::NoWalTimeout {
                                        // Bind under a different name: binding it as `current_lsn` would shadow
                                        // the test's own `current_lsn` and make the checks below compare the
                                        // reported value with itself.
    1289            1 :                 current_lsn: reported_lsn,
    1290            1 :                 current_commit_lsn,
    1291            1 :                 candidate_commit_lsn,
    1292            1 :                 last_wal_interaction,
    1293            1 :                 threshold,
    1294            1 :                 ..
    1295            1 :             } => {
    1296            1 :                 assert_eq!(reported_lsn, current_lsn);
    1297            1 :                 assert_eq!(current_commit_lsn, current_lsn);
    1298            1 :                 assert_eq!(candidate_commit_lsn, new_lsn);
    1299            1 :                 assert_eq!(last_wal_interaction, Some(time_over_threshold));
    1300            1 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1301              :             }
    1302            0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1303              :         }
    1304            1 :         assert_eq!(
    1305            1 :             over_threshold_candidate.wal_source_connconf.host(),
    1306            1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1307            1 :         );
    1308              : 
    1309            1 :         Ok(())
    1310              :     }
    1311              : 
    1312              :     const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr"; // connection string the tests give to safekeepers whose host is later asserted on
    1313              : 
    1314            8 :     async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
                                // Builds the `ConnectionManagerState` shared by the tests in this module:
                                // loads the harness tenant, creates a test timeline at `Lsn(0x8)`, and
                                // starts with no connection, no candidates and no retry records.
    1315            8 :         let (tenant, ctx) = harness.load().await;
    1316            8 :         let timeline = tenant
    1317            8 :             .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
    1318           16 :             .await
    1319            8 :             .expect("Failed to create an empty timeline for dummy wal connection manager");
    1320            8 : 
    1321            8 :         ConnectionManagerState {
    1322            8 :             id: TenantTimelineId {
    1323            8 :                 tenant_id: harness.tenant_id,
    1324            8 :                 timeline_id: TIMELINE_ID,
    1325            8 :             },
    1326            8 :             timeline,
                                    // Both timeouts are 1 second and the allowed lag is 1 MiB; tests that
                                    // need other values overwrite `state.conf` after calling this helper.
    1327            8 :             conf: WalReceiverConf {
    1328            8 :                 wal_connect_timeout: Duration::from_secs(1),
    1329            8 :                 lagging_wal_timeout: Duration::from_secs(1),
    1330            8 :                 max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
    1331            8 :                 auth_token: None,
    1332            8 :                 availability_zone: None,
    1333            8 :             },
    1334            8 :             wal_connection: None,
    1335            8 :             wal_stream_candidates: HashMap::new(),
    1336            8 :             wal_connection_retries: HashMap::new(),
    1337            8 :         }
    1338            8 :     }
    1339              : 
    1340            1 :     #[tokio::test]
    1341            1 :     async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
    1342            1 :         // Pageserver and one of safekeepers will be in the same availability zone
    1343            1 :         // and pageserver should prefer to connect to it.
    1344            1 :         let test_az = Some("test_az".to_owned());
    1345              : 
    1346            1 :         let harness = TenantHarness::create("switch_to_same_availability_zone")?;
    1347            3 :         let mut state = dummy_state(&harness).await;
                                // Put the pageserver side into the test availability zone.
    1348            1 :         state.conf.availability_zone = test_az.clone();
    1349            1 :         let current_lsn = Lsn(100_000).align();
    1350            1 :         let now = Utc::now().naive_utc();
    1351            1 : 
    1352            1 :         let connected_sk_id = NodeId(0);
    1353            1 : 
    1354            1 :         let connection_status = WalConnectionStatus {
    1355            1 :             is_connected: true,
    1356            1 :             has_processed_wal: true,
    1357            1 :             latest_connection_update: now,
    1358            1 :             latest_wal_update: now,
    1359            1 :             commit_lsn: Some(current_lsn),
    1360            1 :             streaming_lsn: Some(current_lsn),
    1361            1 :             node: connected_sk_id,
    1362            1 :         };
    1363            1 : 
                                // The existing connection is healthy but has no availability zone set.
    1364            1 :         state.wal_connection = Some(WalConnection {
    1365            1 :             started_at: now,
    1366            1 :             sk_id: connected_sk_id,
    1367            1 :             availability_zone: None,
    1368            1 :             status: connection_status,
                                    // The task only sends the status once as a progress update and finishes.
    1369            1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1370            1 :                 sender
    1371            1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1372            1 :                     .ok();
    1373            1 :                 Ok(())
    1374            1 :             }),
    1375            1 :             discovered_new_wal: None,
    1376            1 :         });
    1377            1 : 
    1378            1 :         // We have another safekeeper with the same commit_lsn, and it has the same availability zone as
    1379            1 :         // the current pageserver.
    1380            1 :         let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
    1381            1 :         same_az_sk.timeline.availability_zone = test_az.clone();
    1382            1 : 
    1383            1 :         state.wal_stream_candidates = HashMap::from([
    1384            1 :             (
    1385            1 :                 connected_sk_id,
    1386            1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1387            1 :             ),
    1388            1 :             (NodeId(1), same_az_sk),
    1389            1 :         ]);
    1390            1 : 
    1391            1 :         // We expect that pageserver will switch to the safekeeper in the same availability zone,
    1392            1 :         // even if it has the same commit_lsn.
    1393            1 :         let next_candidate = state.next_connection_candidate().expect(
    1394            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1395            1 :         );
    1396            1 : 
    1397            1 :         assert_eq!(next_candidate.safekeeper_id, NodeId(1));
    1398            1 :         assert_eq!(
    1399              :             next_candidate.reason,
    1400              :             ReconnectReason::SwitchAvailabilityZone,
    1401            0 :             "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
    1402              :         );
    1403            1 :         assert_eq!(
    1404            1 :             next_candidate.wal_source_connconf.host(),
    1405            1 :             &Host::Domain("same_az".to_owned())
    1406            1 :         );
    1407              : 
    1408            1 :         Ok(())
    1409              :     }
    1410              : }
        

Generated by: LCOV version 2.1-beta