LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/timeline/walreceiver - connection_manager.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 93.5 % 1020 954 66 954
Current Date: 2024-01-09 02:06:09 Functions: 75.8 % 99 75 24 75
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //! WAL receiver logic that ensures the pageserver gets connected to safekeeper,
       2                 : //! that contains the latest WAL to stream and this connection does not go stale.
       3                 : //!
       4                 : //! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
       5                 : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
       6                 : //! Current connection state is tracked too, to ensure it's not getting stale.
       7                 : //!
       8                 : //! After every connection or storage broker update is fetched, the state gets updated correspondingly and rechecked for the new connection leader,
       9                 : //! then a (re)connection happens, if necessary.
      10                 : //! Only the WAL streaming task is expected to finish; the other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
      11                 : 
      12                 : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
      13                 : 
      14                 : use super::{TaskStateUpdate, WalReceiverConf};
      15                 : use crate::context::{DownloadBehavior, RequestContext};
      16                 : use crate::metrics::{
      17                 :     WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
      18                 :     WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
      19                 : };
      20                 : use crate::task_mgr::{shutdown_token, TaskKind};
      21                 : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
      22                 : use anyhow::Context;
      23                 : use chrono::{NaiveDateTime, Utc};
      24                 : use pageserver_api::models::TimelineState;
      25                 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
      26                 : use storage_broker::proto::SafekeeperTimelineInfo;
      27                 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
      28                 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      29                 : use storage_broker::{BrokerClientChannel, Code, Streaming};
      30                 : use tokio::select;
      31                 : use tracing::*;
      32                 : 
      33                 : use postgres_connection::PgConnectionConfig;
      34                 : use utils::backoff::{
      35                 :     exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
      36                 : };
      37                 : use utils::postgres_client::wal_stream_connection_config;
      38                 : use utils::{
      39                 :     id::{NodeId, TenantTimelineId},
      40                 :     lsn::Lsn,
      41                 : };
      42                 : 
      43                 : use super::{
      44                 :     walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
      45                 :     TaskEvent, TaskHandle,
      46                 : };
      47                 : 
      48                 : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
      49                 : /// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
      50                 : /// If storage broker subscription is cancelled, exits.
      51 CBC        1102 : pub(super) async fn connection_manager_loop_step(
      52            1102 :     broker_client: &mut BrokerClientChannel,
      53            1102 :     connection_manager_state: &mut ConnectionManagerState,
      54            1102 :     ctx: &RequestContext,
      55            1102 :     manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
      56            1102 : ) -> ControlFlow<(), ()> {
      57            1102 :     match connection_manager_state
      58            1102 :         .timeline
      59            1102 :         .wait_to_become_active(ctx)
      60              26 :         .await
      61                 :     {
      62            1102 :         Ok(()) => {}
      63 UBC           0 :         Err(new_state) => {
      64               0 :             debug!(
      65               0 :                 ?new_state,
      66               0 :                 "state changed, stopping wal connection manager loop"
      67               0 :             );
      68               0 :             return ControlFlow::Break(());
      69                 :         }
      70                 :     }
      71                 : 
      72 CBC        1102 :     WALRECEIVER_ACTIVE_MANAGERS.inc();
      73             471 :     scopeguard::defer! {
      74             471 :         WALRECEIVER_ACTIVE_MANAGERS.dec();
      75             471 :     }
      76                 : 
      77            1102 :     let id = TenantTimelineId {
      78            1102 :         tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
      79            1102 :         timeline_id: connection_manager_state.timeline.timeline_id,
      80            1102 :     };
      81            1102 : 
      82            1102 :     let mut timeline_state_updates = connection_manager_state
      83            1102 :         .timeline
      84            1102 :         .subscribe_for_state_updates();
      85                 : 
      86                 :     // Subscribe to the broker updates. Stream shares underlying TCP connection
      87                 :     // with other streams on this client (other connection managers). When
      88                 :     // object goes out of scope, stream finishes in drop() automatically.
      89            2202 :     let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
      90 UBC           0 :     debug!("Subscribed for broker timeline updates");
      91                 : 
      92                 :     loop {
      93 CBC      525937 :         let time_until_next_retry = connection_manager_state.time_until_next_retry();
      94          525937 : 
      95          525937 :         // These things are happening concurrently:
      96          525937 :         //
      97          525937 :         //  - keep receiving WAL on the current connection
      98          525937 :         //      - if the shared state says we need to change connection, disconnect and return
      99          525937 :         //      - this runs in a separate task and we receive updates via a watch channel
     100          525937 :         //  - change connection if the rules decide so, or if the current connection dies
     101          525937 :         //  - receive updates from broker
     102          525937 :         //      - this might change the current desired connection
     103          525937 :         //  - timeline state changes to something that does not allow walreceiver to run concurrently
     104         1040419 :         select! {
     105          525789 :             Some(wal_connection_update) = async {
     106          525789 :                 match connection_manager_state.wal_connection.as_mut() {
     107          523916 :                     Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
     108            1873 :                     None => None,
     109                 :                 }
     110          518108 :             } => {
     111                 :                 let wal_connection = connection_manager_state.wal_connection.as_mut()
     112                 :                     .expect("Should have a connection, as checked by the corresponding select! guard");
     113                 :                 match wal_connection_update {
     114                 :                     TaskEvent::Update(TaskStateUpdate::Started) => {},
     115                 :                     TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
     116                 :                         if new_status.has_processed_wal {
     117                 :                             // We have advanced last_record_lsn by processing the WAL received
     118                 :                             // from this safekeeper. This is good enough to clean unsuccessful
     119                 :                             // retries history and allow reconnecting to this safekeeper without
     120                 :                             // sleeping for a long time.
     121                 :                             connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
     122                 :                         }
     123                 :                         wal_connection.status = new_status;
     124                 :                     }
     125                 :                     TaskEvent::End(walreceiver_task_result) => {
     126                 :                         match walreceiver_task_result {
     127 UBC           0 :                             Ok(()) => debug!("WAL receiving task finished"),
     128 CBC           8 :                             Err(e) => error!("wal receiver task finished with an error: {e:?}"),
     129                 :                         }
     130                 :                         connection_manager_state.drop_old_connection(false).await;
     131                 :                     },
     132                 :                 }
     133                 :             },
     134                 : 
     135                 :             // Got a new update from the broker
     136            8081 :             broker_update = broker_subscription.message() => {
     137                 :                 match broker_update {
     138                 :                     Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
     139                 :                     Err(status) => {
     140                 :                         match status.code() {
     141                 :                             Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
     142                 :                                 // tonic's error handling doesn't provide a clear code for disconnections: we get
     143                 :                                 // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
     144 UBC           0 :                                 info!("broker disconnected: {status}");
     145                 :                             },
     146                 :                             _ => {
     147               0 :                                 warn!("broker subscription failed: {status}");
     148                 :                             }
     149                 :                         }
     150                 :                         return ControlFlow::Continue(());
     151                 :                     }
     152                 :                     Ok(None) => {
     153               0 :                         error!("broker subscription stream ended"); // can't happen
     154                 :                         return ControlFlow::Continue(());
     155                 :                     }
     156                 :                 }
     157                 :             },
     158                 : 
     159 CBC      520989 :             new_event = async {
     160          520989 :                 loop {
     161          520989 :                     if connection_manager_state.timeline.current_state() == TimelineState::Loading {
     162 UBC           0 :                         warn!("wal connection manager should only be launched after timeline has become active");
     163 CBC      520989 :                     }
     164          520989 :                     match timeline_state_updates.changed().await {
     165                 :                         Ok(()) => {
     166             430 :                             let new_state = connection_manager_state.timeline.current_state();
     167             430 :                             match new_state {
     168                 :                                 // we're already active as walreceiver, no need to reactivate
     169 UBC           0 :                                 TimelineState::Active => continue,
     170                 :                                 TimelineState::Broken { .. } | TimelineState::Stopping => {
     171 CBC         430 :                                     debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
     172             430 :                                     return ControlFlow::Break(());
     173                 :                                 }
     174                 :                                 TimelineState::Loading => {
     175 UBC           0 :                                     warn!("timeline transitioned back to Loading state, that should not happen");
     176               0 :                                     return ControlFlow::Continue(());
     177                 :                                 }
     178                 :                             }
     179                 :                         }
     180               0 :                         Err(_sender_dropped_error) => return ControlFlow::Break(()),
     181                 :                     }
     182                 :                 }
     183 CBC         430 :             } => match new_event {
     184                 :                 ControlFlow::Continue(()) => {
     185                 :                     return ControlFlow::Continue(());
     186                 :                 }
     187                 :                 ControlFlow::Break(()) => {
     188 UBC           0 :                     debug!("Timeline is no longer active, stopping wal connection manager loop");
     189                 :                     return ControlFlow::Break(());
     190                 :                 }
     191                 :             },
     192                 : 
     193 CBC      523400 :             Some(()) = async {
     194          523400 :                 match time_until_next_retry {
     195            1271 :                     Some(sleep_time) => {
     196            1271 :                         tokio::time::sleep(sleep_time).await;
     197             523 :                         Some(())
     198                 :                     },
     199                 :                     None => {
     200          522129 :                         debug!("No candidates to retry, waiting indefinitely for the broker events");
     201          522129 :                         None
     202                 :                     }
     203                 :                 }
     204          522652 :             } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
     205                 :         }
     206                 : 
     207          524838 :         if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
     208            1529 :             info!("Switching to new connection candidate: {new_candidate:?}");
     209            1529 :             connection_manager_state
     210            1529 :                 .change_connection(new_candidate, ctx)
     211              35 :                 .await
     212          523309 :         }
     213          524838 :         *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
     214                 :     }
     215             430 : }
     216                 : 
     217                 : /// Endlessly try to subscribe for broker updates for a given timeline.
     218            1102 : async fn subscribe_for_timeline_updates(
     219            1102 :     broker_client: &mut BrokerClientChannel,
     220            1102 :     id: TenantTimelineId,
     221            1102 : ) -> Streaming<SafekeeperTimelineInfo> {
     222            1102 :     let mut attempt = 0;
     223            1102 :     let cancel = shutdown_token();
     224                 : 
     225                 :     loop {
     226            1102 :         exponential_backoff(
     227            1102 :             attempt,
     228            1102 :             DEFAULT_BASE_BACKOFF_SECONDS,
     229            1102 :             DEFAULT_MAX_BACKOFF_SECONDS,
     230            1102 :             &cancel,
     231            1102 :         )
     232 UBC           0 :         .await;
     233 CBC        1102 :         attempt += 1;
     234            1102 : 
     235            1102 :         // subscribe to the specific timeline
     236            1102 :         let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
     237            1102 :             tenant_id: id.tenant_id.as_ref().to_owned(),
     238            1102 :             timeline_id: id.timeline_id.as_ref().to_owned(),
     239            1102 :         });
     240            1102 :         let request = SubscribeSafekeeperInfoRequest {
     241            1102 :             subscription_key: Some(key),
     242            1102 :         };
     243            1102 : 
     244            2202 :         match broker_client.subscribe_safekeeper_info(request).await {
     245            1099 :             Ok(resp) => {
     246            1099 :                 return resp.into_inner();
     247                 :             }
     248 UBC           0 :             Err(e) => {
     249                 :                 // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
     250                 :                 // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
     251               0 :                 info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
     252               0 :                 continue;
     253                 :             }
     254                 :         }
     255                 :     }
     256 CBC        1099 : }
     257                 : 
     258                 : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
     259                 : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
     260                 : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
     261                 : 
     262                 : /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
     263                 : pub(super) struct ConnectionManagerState {
     264                 :     id: TenantTimelineId,
     265                 :     /// Use pageserver data about the timeline to filter out some of the safekeepers.
     266                 :     timeline: Arc<Timeline>,
     267                 :     conf: WalReceiverConf,
     268                 :     /// Current connection to safekeeper for WAL streaming.
     269                 :     wal_connection: Option<WalConnection>,
     270                 :     /// Info about retries and unsuccessful attempts to connect to safekeepers.
     271                 :     wal_connection_retries: HashMap<NodeId, RetryInfo>,
     272                 :     /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
     273                 :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     274                 : }
     275                 : 
     276                 : /// Information about connection manager's current connection and connection candidates.
     277            1812 : #[derive(Debug, Clone)]
     278                 : pub struct ConnectionManagerStatus {
     279                 :     existing_connection: Option<WalConnectionStatus>,
     280                 :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     281                 : }
     282                 : 
     283                 : impl ConnectionManagerStatus {
     284                 :     /// Generates a string, describing current connection status in a form, suitable for logging.
     285            1812 :     pub fn to_human_readable_string(&self) -> String {
     286            1812 :         let mut resulting_string = String::new();
     287            1812 :         match &self.existing_connection {
     288            1770 :             Some(connection) => {
     289            1770 :                 if connection.has_processed_wal {
     290            1720 :                     resulting_string.push_str(&format!(
     291            1720 :                         " (update {}): streaming WAL from node {}, ",
     292            1720 :                         connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
     293            1720 :                         connection.node,
     294            1720 :                     ));
     295            1720 : 
     296            1720 :                     match (connection.streaming_lsn, connection.commit_lsn) {
     297 UBC           0 :                         (None, None) => resulting_string.push_str("no streaming data"),
     298               0 :                         (None, Some(commit_lsn)) => {
     299               0 :                             resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
     300                 :                         }
     301               0 :                         (Some(streaming_lsn), None) => {
     302               0 :                             resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
     303                 :                         }
     304 CBC        1720 :                         (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
     305            1720 :                             &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
     306            1720 :                         ),
     307                 :                     }
     308              50 :                 } else if connection.is_connected {
     309              50 :                     resulting_string.push_str(&format!(
     310              50 :                         " (update {}): connecting to node {}",
     311              50 :                         connection
     312              50 :                             .latest_connection_update
     313              50 :                             .format("%Y-%m-%d %H:%M:%S"),
     314              50 :                         connection.node,
     315              50 :                     ));
     316              50 :                 } else {
     317 UBC           0 :                     resulting_string.push_str(&format!(
     318               0 :                         " (update {}): initializing node {} connection",
     319               0 :                         connection
     320               0 :                             .latest_connection_update
     321               0 :                             .format("%Y-%m-%d %H:%M:%S"),
     322               0 :                         connection.node,
     323               0 :                     ));
     324               0 :                 }
     325                 :             }
     326 CBC          42 :             None => resulting_string.push_str(": disconnected"),
     327                 :         }
     328                 : 
     329            1812 :         resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
     330            1812 :         let mut candidates = self.wal_stream_candidates.iter().peekable();
     331            3764 :         while let Some((node_id, candidate_info)) = candidates.next() {
     332            1952 :             resulting_string.push_str(&format!(
     333            1952 :                 "({}|{}|{})",
     334            1952 :                 node_id,
     335            1952 :                 candidate_info.latest_update.format("%H:%M:%S"),
     336            1952 :                 Lsn(candidate_info.timeline.commit_lsn)
     337            1952 :             ));
     338            1952 :             if candidates.peek().is_some() {
     339             141 :                 resulting_string.push_str(", ");
     340            1811 :             }
     341                 :         }
     342            1812 :         resulting_string.push(']');
     343            1812 : 
     344            1812 :         resulting_string
     345            1812 :     }
     346                 : }
     347                 : 
     348                 : /// Current connection data.
     349 UBC           0 : #[derive(Debug)]
     350                 : struct WalConnection {
     351                 :     /// Time when the connection was initiated.
     352                 :     started_at: NaiveDateTime,
     353                 :     /// Current safekeeper pageserver is connected to for WAL streaming.
     354                 :     sk_id: NodeId,
     355                 :     /// Availability zone of the safekeeper.
     356                 :     availability_zone: Option<String>,
     357                 :     /// Status of the connection.
     358                 :     status: WalConnectionStatus,
     359                 :     /// WAL streaming task handle.
     360                 :     connection_task: TaskHandle<WalConnectionStatus>,
     361                 :     /// Have we discovered that another safekeeper has more recent WAL than we do?
     362                 :     discovered_new_wal: Option<NewCommittedWAL>,
     363                 : }
     364                 : 
     365                 : /// Notion of a new committed WAL, which exists on another safekeeper.
     366               0 : #[derive(Debug, Clone, Copy)]
     367                 : struct NewCommittedWAL {
     368                 :     /// LSN of the new committed WAL.
     369                 :     lsn: Lsn,
     370                 :     /// When we discovered that the new committed WAL exists on other safekeeper.
     371                 :     discovered_at: NaiveDateTime,
     372                 : }
     373                 : 
     374               0 : #[derive(Debug, Clone, Copy)]
     375                 : struct RetryInfo {
     376                 :     next_retry_at: Option<NaiveDateTime>,
     377                 :     retry_duration_seconds: f64,
     378                 : }
     379                 : 
     380                 : /// Data about the timeline to connect to, received from the broker.
     381 CBC      849212 : #[derive(Debug, Clone)]
     382                 : struct BrokerSkTimeline {
     383                 :     timeline: SafekeeperTimelineInfo,
     384                 :     /// Time at which the data was fetched from the broker last time, to track the stale data.
     385                 :     latest_update: NaiveDateTime,
     386                 : }
     387                 : 
     388                 : impl ConnectionManagerState {
     389            1102 :     pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
     390            1102 :         let id = TenantTimelineId {
     391            1102 :             tenant_id: timeline.tenant_shard_id.tenant_id,
     392            1102 :             timeline_id: timeline.timeline_id,
     393            1102 :         };
     394            1102 :         Self {
     395            1102 :             id,
     396            1102 :             timeline,
     397            1102 :             conf,
     398            1102 :             wal_connection: None,
     399            1102 :             wal_stream_candidates: HashMap::new(),
     400            1102 :             wal_connection_retries: HashMap::new(),
     401            1102 :         }
     402            1102 :     }
     403                 : 
     404                 :     /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
     405            1529 :     async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
     406            1529 :         WALRECEIVER_SWITCHES
     407            1529 :             .with_label_values(&[new_sk.reason.name()])
     408            1529 :             .inc();
     409            1529 : 
     410            1529 :         self.drop_old_connection(true).await;
     411                 : 
     412            1529 :         let node_id = new_sk.safekeeper_id;
     413            1529 :         let connect_timeout = self.conf.wal_connect_timeout;
     414            1529 :         let ingest_batch_size = self.conf.ingest_batch_size;
     415            1529 :         let timeline = Arc::clone(&self.timeline);
     416            1529 :         let ctx = ctx.detached_child(
     417            1529 :             TaskKind::WalReceiverConnectionHandler,
     418            1529 :             DownloadBehavior::Download,
     419            1529 :         );
     420                 : 
     421            1529 :         let span = info_span!("connection", %node_id);
     422            1529 :         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
     423            1529 :             async move {
     424            1529 :                 debug_assert_current_span_has_tenant_and_timeline_id();
     425                 : 
     426            1529 :                 let res = super::walreceiver_connection::handle_walreceiver_connection(
     427            1529 :                     timeline,
     428            1529 :                     new_sk.wal_source_connconf,
     429            1529 :                     events_sender,
     430            1529 :                     cancellation.clone(),
     431            1529 :                     connect_timeout,
     432            1529 :                     ctx,
     433            1529 :                     node_id,
     434            1529 :                     ingest_batch_size,
     435            1529 :                 )
     436          629712 :                 .await;
     437                 : 
     438            1476 :                 match res {
     439             208 :                     Ok(()) => Ok(()),
     440            1268 :                     Err(e) => {
     441            1268 :                         match e {
     442              48 :                             WalReceiverError::SuccessfulCompletion(msg) => {
     443              48 :                                 info!("walreceiver connection handling ended with success: {msg}");
     444              48 :                                 Ok(())
     445                 :                             }
     446            1205 :                             WalReceiverError::ExpectedSafekeeperError(e) => {
     447            1205 :                                 info!("walreceiver connection handling ended: {e}");
     448            1205 :                                 Ok(())
     449                 :                             }
     450              15 :                             WalReceiverError::Other(e) => {
     451              15 :                                 // give out an error to have task_mgr give it a really verbose logging
     452              15 :                                 if cancellation.is_cancelled() {
     453                 :                                     // Ideally we would learn about this via some path other than Other, but
     454                 :                                     // that requires refactoring all the intermediate layers of ingest code
     455                 :                                     // that only emit anyhow::Error
     456               7 :                                     Ok(())
     457                 :                                 } else {
     458               8 :                                     Err(e).context("walreceiver connection handling failure")
     459                 :                                 }
     460                 :                             }
     461                 :                         }
     462                 :                     }
     463                 :                 }
     464            1476 :             }
     465            1529 :             .instrument(span)
     466            1529 :         });
     467            1529 : 
     468            1529 :         let now = Utc::now().naive_utc();
     469            1529 :         self.wal_connection = Some(WalConnection {
     470            1529 :             started_at: now,
     471            1529 :             sk_id: new_sk.safekeeper_id,
     472            1529 :             availability_zone: new_sk.availability_zone,
     473            1529 :             status: WalConnectionStatus {
     474            1529 :                 is_connected: false,
     475            1529 :                 has_processed_wal: false,
     476            1529 :                 latest_connection_update: now,
     477            1529 :                 latest_wal_update: now,
     478            1529 :                 streaming_lsn: None,
     479            1529 :                 commit_lsn: None,
     480            1529 :                 node: node_id,
     481            1529 :             },
     482            1529 :             connection_task: connection_handle,
     483            1529 :             discovered_new_wal: None,
     484            1529 :         });
     485            1529 :     }
     486                 : 
     487                 :     /// Drops the current connection (if any), awaiting its task shutdown when `needs_shutdown` is set,
     488                 :     /// and schedules the next allowed connection attempt to the same safekeeper using exponential backoff.
     489            2711 :     async fn drop_old_connection(&mut self, needs_shutdown: bool) {
     490            2711 :         let wal_connection = match self.wal_connection.take() {
     491            1217 :             Some(wal_connection) => wal_connection,
     492            1494 :             None => return, // nothing to drop: retry state stays untouched
     493                 :         };
     494                 : 
     495            1217 :         if needs_shutdown {
     496              35 :             wal_connection.connection_task.shutdown().await;
     497            1182 :         }
     498                 : 
     499            1217 :         let retry = self
     500            1217 :             .wal_connection_retries
     501            1217 :             .entry(wal_connection.sk_id)
     502            1217 :             .or_insert(RetryInfo {
     503            1217 :                 next_retry_at: None,
     504            1217 :                 retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
     505            1217 :             });
     506            1217 : 
     507            1217 :         let now = Utc::now().naive_utc();
     508            1217 : 
     509            1217 :         // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
     510            1217 :         // and we add backoff to the time when we started the connection attempt. If the connection
     511            1217 :         // was active for a long time, then next_retry_at will be in the past and imposes no cooldown.
     512            1217 :         retry.next_retry_at =
     513            1217 :             wal_connection
     514            1217 :                 .started_at
     515            1217 :                 .checked_add_signed(chrono::Duration::milliseconds(
     516            1217 :                     (retry.retry_duration_seconds * 1000.0) as i64, // seconds -> whole milliseconds
     517            1217 :                 ));
     518                 : 
     519            1217 :         if let Some(next) = &retry.next_retry_at {
     520            1217 :             if next > &now {
     521             853 :                 info!(
     522             853 :                     "Next connection retry to {:?} is at {}",
     523             853 :                     wal_connection.sk_id, next
     524             853 :                 );
     525             364 :             }
     526 UBC           0 :         }
     527                 : 
     528 CBC        1217 :         let next_retry_duration =
     529            1217 :             retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
     530            1217 :         // Clamp the next retry duration to the maximum allowed.
     531            1217 :         let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
     532            1217 :         // Clamp the next retry duration to the minimum allowed.
     533            1217 :         let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
     534            1217 : 
     535            1217 :         retry.retry_duration_seconds = next_retry_duration;
     536            2711 :     }
     537                 : 
     538                 :     /// Returns how long to wait until the earliest retry cooldown that is still in the future expires, or `None` if there is no such cooldown.
     539          525937 :     fn time_until_next_retry(&self) -> Option<Duration> {
     540          525937 :         let now = Utc::now().naive_utc();
     541                 : 
     542          525937 :         let next_retry_at = self
     543          525937 :             .wal_connection_retries
     544          525937 :             .values()
     545          525937 :             .filter_map(|retry| retry.next_retry_at)
     546          525937 :             .filter(|next_retry_at| next_retry_at > &now)
     547          525937 :             .min()?;
     548                 : 
     549            1271 :         (next_retry_at - now).to_std().ok()
     550          525937 :     }
     551                 : 
     552                 :     /// Records a broker update for its safekeeper, stamped with the current time; any previous entry for the same node is replaced unconditionally.
     553            8081 :     fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
     554            8081 :         WALRECEIVER_BROKER_UPDATES.inc();
     555            8081 : 
     556            8081 :         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
     557            8081 :         let old_entry = self.wal_stream_candidates.insert(
     558            8081 :             new_safekeeper_id,
     559            8081 :             BrokerSkTimeline {
     560            8081 :                 timeline: timeline_update,
     561            8081 :                 latest_update: Utc::now().naive_utc(),
     562            8081 :             },
     563            8081 :         );
     564            8081 : 
     565            8081 :         if old_entry.is_none() { // no entry was present for this safekeeper before the insert
     566             632 :             info!("New SK node was added: {new_safekeeper_id}");
     567             632 :             WALRECEIVER_CANDIDATES_ADDED.inc();
     568            7449 :         }
     569            8081 :     }
     570                 : 
     571                 :     /// Cleans up stale broker records and checks the rest for the new connection candidate.
     572                 :     /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
     573                 :     /// The current rules for approving new candidates:
     574                 :     /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` among those without a pending retry cooldown
     575                 :     /// * if there's no such entry, no new candidate found, abort
     576                 :     /// * otherwise check if the candidate is much better than the current one
     577                 :     ///
     578                 :     /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
     579                 :     /// General rules are the following:
     580                 :     /// * if connected safekeeper is not present, pick the candidate
     581                 :     /// * if we haven't received any connection updates for longer than `wal_connect_timeout`, pick the candidate
     582                 :     /// * if the candidate commit_lsn is much higher than the current one (by at least `max_lsn_wal_lag`), pick the candidate
     583                 :     /// * if the candidate commit_lsn is not behind the current one, and the candidate is located in the same AZ as the pageserver while the connected safekeeper is not, pick the candidate
     584                 :     /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper for longer than `lagging_wal_timeout`, pick the candidate
     585                 :     ///
     586                 :     /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
     587                 :     /// Both thresholds are configured per tenant.
     588          524847 :     fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
     589          524847 :         self.cleanup_old_candidates();
     590          524847 : 
     591          524847 :         match &self.wal_connection {
     592          522571 :             Some(existing_wal_connection) => {
     593          522571 :                 let connected_sk_node = existing_wal_connection.sk_id;
     594                 : 
     595          182705 :                 let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
     596          522571 :                     self.select_connection_candidate(Some(connected_sk_node))?;
     597          182705 :                 let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
     598          182705 : 
     599          182705 :                 let now = Utc::now().naive_utc();
     600          182705 :                 if let Ok(latest_interaciton) =
     601          182705 :                     (now - existing_wal_connection.status.latest_connection_update).to_std()
     602                 :                 {
     603                 :                     // Drop connection if we haven't received keepalive message for a while.
     604          182705 :                     if latest_interaciton > self.conf.wal_connect_timeout {
     605               1 :                         return Some(NewWalConnectionCandidate {
     606               1 :                             safekeeper_id: new_sk_id,
     607               1 :                             wal_source_connconf: new_wal_source_connconf,
     608               1 :                             availability_zone: new_availability_zone,
     609               1 :                             reason: ReconnectReason::NoKeepAlives {
     610               1 :                                 last_keep_alive: Some(
     611               1 :                                     existing_wal_connection.status.latest_connection_update,
     612               1 :                                 ),
     613               1 :                                 check_time: now,
     614               1 :                                 threshold: self.conf.wal_connect_timeout,
     615               1 :                             },
     616               1 :                         });
     617          182704 :                     }
     618 UBC           0 :                 }
     619                 : 
     620 CBC      182704 :                 if !existing_wal_connection.status.is_connected {
     621                 :                     // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
     622             208 :                     return None;
     623          182496 :                 }
     624                 : 
     625          182496 :                 if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
     626          182409 :                     let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
     627          182409 :                     // Check if the new candidate has much more WAL than the current one.
     628          182409 :                     match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
     629            9377 :                         Some(new_sk_lsn_advantage) => {
     630            9377 :                             if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
     631              36 :                                 return Some(NewWalConnectionCandidate {
     632              36 :                                     safekeeper_id: new_sk_id,
     633              36 :                                     wal_source_connconf: new_wal_source_connconf,
     634              36 :                                     availability_zone: new_availability_zone,
     635              36 :                                     reason: ReconnectReason::LaggingWal {
     636              36 :                                         current_commit_lsn,
     637              36 :                                         new_commit_lsn,
     638              36 :                                         threshold: self.conf.max_lsn_wal_lag,
     639              36 :                                     },
     640              36 :                                 });
     641            9341 :                             }
     642            9341 :                             // If we have a candidate whose commit_lsn is not behind the current one (and below the lag threshold),
     643            9341 :                             // which is in the same AZ as pageserver, and the current one is not, switch to the new one.
     644            9341 :                             if self.conf.availability_zone.is_some()
     645             246 :                                 && existing_wal_connection.availability_zone
     646             246 :                                     != self.conf.availability_zone
     647             246 :                                 && self.conf.availability_zone == new_availability_zone
     648                 :                             {
     649               1 :                                 return Some(NewWalConnectionCandidate {
     650               1 :                                     safekeeper_id: new_sk_id,
     651               1 :                                     availability_zone: new_availability_zone,
     652               1 :                                     wal_source_connconf: new_wal_source_connconf,
     653               1 :                                     reason: ReconnectReason::SwitchAvailabilityZone,
     654               1 :                                 });
     655            9340 :                             }
     656                 :                         }
     657          173032 :                         None => debug!(
     658 UBC           0 :                             "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
     659               0 :                         ),
     660                 :                     }
     661 CBC          87 :                 }
     662                 : 
     663          182459 :                 let current_lsn = match existing_wal_connection.status.streaming_lsn {
     664          182207 :                     Some(lsn) => lsn,
     665             252 :                     None => self.timeline.get_last_record_lsn(),
     666                 :                 };
     667          182459 :                 let current_commit_lsn = existing_wal_connection
     668          182459 :                     .status
     669          182459 :                     .commit_lsn
     670          182459 :                     .unwrap_or(current_lsn);
     671          182459 :                 let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
     672          182459 : 
     673          182459 :                 // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
     674          182459 :                 let mut discovered_new_wal = existing_wal_connection
     675          182459 :                     .discovered_new_wal
     676          182459 :                     .filter(|new_wal| new_wal.lsn > current_commit_lsn);
     677          182459 : 
     678          182459 :                 if discovered_new_wal.is_none() {
     679                 :                     // Check if the new candidate has more WAL than the current one.
     680                 :                     // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
     681          177964 :                     discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
     682             160 :                         trace!(
     683 UBC           0 :                             "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
     684               0 :                             candidate_commit_lsn,
     685               0 :                             current_commit_lsn
     686               0 :                         );
     687 CBC         160 :                         Some(NewCommittedWAL {
     688             160 :                             lsn: candidate_commit_lsn,
     689             160 :                             discovered_at: Utc::now().naive_utc(),
     690             160 :                         })
     691                 :                     } else {
     692          177804 :                         None
     693                 :                     };
     694            4495 :                 }
     695                 : 
     696          182459 :                 let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
     697                 :                     // Connected safekeeper has more WAL, but we haven't received updates for some time.
     698            8412 :                     trace!(
     699 UBC           0 :                         "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
     700               0 :                         (now - existing_wal_connection.status.latest_wal_update).to_std(),
     701               0 :                         current_lsn,
     702               0 :                         current_commit_lsn
     703               0 :                     );
     704 CBC        8412 :                     Some(existing_wal_connection.status.latest_wal_update)
     705                 :                 } else {
     706          174047 :                     discovered_new_wal.as_ref().map(|new_wal| {
     707            1261 :                         // We know that new WAL is available on another safekeeper, but the connected safekeeper doesn't have it.
     708            1261 :                         new_wal
     709            1261 :                             .discovered_at
     710            1261 :                             .max(existing_wal_connection.status.latest_wal_update)
     711          174047 :                     })
     712                 :                 };
     713                 : 
     714                 :                 // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
     715          182459 :                 if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
     716            9673 :                     if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
     717            9612 :                         if candidate_commit_lsn > current_commit_lsn
     718            4594 :                             && waiting_for_new_wal > self.conf.lagging_wal_timeout
     719                 :                         {
     720               1 :                             return Some(NewWalConnectionCandidate {
     721               1 :                                 safekeeper_id: new_sk_id,
     722               1 :                                 wal_source_connconf: new_wal_source_connconf,
     723               1 :                                 availability_zone: new_availability_zone,
     724               1 :                                 reason: ReconnectReason::NoWalTimeout {
     725               1 :                                     current_lsn,
     726               1 :                                     current_commit_lsn,
     727               1 :                                     candidate_commit_lsn,
     728               1 :                                     last_wal_interaction: Some(
     729               1 :                                         existing_wal_connection.status.latest_wal_update,
     730               1 :                                     ),
     731               1 :                                     check_time: now,
     732               1 :                                     threshold: self.conf.lagging_wal_timeout,
     733               1 :                                 },
     734               1 :                             });
     735            9611 :                         }
     736              61 :                     }
     737          172786 :                 }
     738                 : 
     739          182458 :                 self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal; // unwrap cannot fail: we are in the `Some` arm of the match on `self.wal_connection`
     740                 :             }
     741                 :             None => {
     742            1497 :                 let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
     743            2276 :                     self.select_connection_candidate(None)?;
     744            1497 :                 return Some(NewWalConnectionCandidate {
     745            1497 :                     safekeeper_id: new_sk_id,
     746            1497 :                     availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
     747            1497 :                     wal_source_connconf: new_wal_source_connconf,
     748            1497 :                     reason: ReconnectReason::NoExistingConnection,
     749            1497 :                 });
     750                 :             }
     751                 :         }
     752                 : 
     753          182458 :         None // none of the switch rules fired: stay on the current connection
     754          524847 :     }
     755                 : 
     756                 :     /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
     757                 :     /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
     758                 :     ///
     759                 :     /// The candidate that is chosen:
     760                 :     /// * passes the filters of `applicable_connection_candidates` (`commit_lsn` is not `Lsn::INVALID`, connection string is usable, no pending retry cooldown)
     761                 :     /// * has greatest commit_lsn among the ones that are left
     762          524847 :     fn select_connection_candidate(
     763          524847 :         &self,
     764          524847 :         node_to_omit: Option<NodeId>,
     765          524847 :     ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
     766          524847 :         self.applicable_connection_candidates()
     767          845549 :             .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
     768          524847 :             .max_by_key(|(_, info, _)| info.commit_lsn) // on ties `max_by_key` returns the last maximal element yielded
     769          524847 :     }
     770                 : 
     771                 :     /// Returns an iterator over the safekeepers that have valid info and are ready for connection.
     772                 :     /// Safekeepers whose retry cooldown has not expired yet are filtered out.
     773          524847 :     fn applicable_connection_candidates(
     774          524847 :         &self,
     775          524847 :     ) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
     776          524847 :         let now = Utc::now().naive_utc();
     777          524847 : 
     778          524847 :         self.wal_stream_candidates
     779          524847 :             .iter()
     780          847278 :             .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
     781          847256 :             .filter(move |(sk_id, _)| {
     782          847256 :                 let next_retry_at = self
     783          847256 :                     .wal_connection_retries
     784          847256 :                     .get(sk_id)
     785          847256 :                     .and_then(|retry_info| {
     786          177006 :                         retry_info.next_retry_at
     787          847256 :                     });
     788          847256 : 
     789          847256 :                 next_retry_at.is_none() || next_retry_at.unwrap() <= now // no cooldown recorded, or it has already expired
     790          847256 :             }).filter_map(|(sk_id, broker_info)| {
     791          845551 :                 let info = &broker_info.timeline;
     792          845551 :                 if info.safekeeper_connstr.is_empty() {
     793               2 :                     return None; // no connection string, ignore sk
     794          845549 :                 }
     795          845549 :                 match wal_stream_connection_config(
     796          845549 :                     self.id,
     797          845549 :                     info.safekeeper_connstr.as_ref(),
     798          845549 :                     match &self.conf.auth_token {
     799          844413 :                         None => None,
     800            1136 :                         Some(x) => Some(x),
     801                 :                     },
     802          845549 :                     self.conf.availability_zone.as_deref(),
     803                 :                 ) {
     804          845549 :                     Ok(connstr) => Some((*sk_id, info, connstr)),
     805 UBC           0 :                     Err(e) => {
     806               0 :                         error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
     807               0 :                         None
     808                 :                     }
     809                 :                 }
     810 CBC      845551 :             })
     811          524847 :     }
     812                 : 
     813                 :     /// Removes candidates whose latest broker update is at least `lagging_wal_timeout` old, together with their retry state.
     814          524847 :     fn cleanup_old_candidates(&mut self) {
     815          524847 :         let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
     816          524847 :         let lagging_wal_timeout = self.conf.lagging_wal_timeout;
     817          524847 : 
     818          847289 :         self.wal_stream_candidates.retain(|node_id, broker_info| {
     819          847289 :             if let Ok(time_since_latest_broker_update) =
     820          847289 :                 (Utc::now().naive_utc() - broker_info.latest_update).to_std()
     821                 :             {
     822          847289 :                 let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
     823          847289 :                 if !should_retain {
     824              11 :                     node_ids_to_remove.push(*node_id);
     825          847278 :                 }
     826          847289 :                 should_retain
     827                 :             } else {
     828 UBC           0 :                 true // conversion failed (elapsed time is negative, i.e. `latest_update` is in the future): keep the entry
     829                 :             }
     830 CBC      847289 :         });
     831          524847 : 
     832          524847 :         if !node_ids_to_remove.is_empty() {
     833              22 :             for node_id in node_ids_to_remove {
     834              11 :                 info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
     835              11 :                 self.wal_connection_retries.remove(&node_id);
     836              11 :                 WALRECEIVER_CANDIDATES_REMOVED.inc();
     837                 :             }
     838          524836 :         }
     839          524847 :     }
     840                 : 
     841             471 :     pub(super) async fn shutdown(mut self) { // consumes the manager; shuts down and awaits the active connection task, if any
     842             471 :         if let Some(wal_connection) = self.wal_connection.take() {
     843             291 :             wal_connection.connection_task.shutdown().await;
     844             213 :         }
     845             471 :     }
     846                 : 
     847          524838 :     fn manager_status(&self) -> ConnectionManagerStatus { // snapshot: status of the current connection (if any) plus a clone of all known candidates
     848          524838 :         ConnectionManagerStatus {
     849          524838 :             existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
     850          524838 :             wal_stream_candidates: self.wal_stream_candidates.clone(),
     851          524838 :         }
     852          524838 :     }
     853                 : }
     854                 : 
     855            1529 : #[derive(Debug)]
     856                 : struct NewWalConnectionCandidate {
     857                 :     safekeeper_id: NodeId, // id of the safekeeper selected as the connection target
     858                 :     wal_source_connconf: PgConnectionConfig, // connection config built from that safekeeper's broker data
     859                 :     availability_zone: Option<String>, // availability zone reported for that safekeeper, if any
     860                 :     reason: ReconnectReason, // which rule caused this candidate to be proposed
     861                 : }
     862                 : 
     863                 : /// Stores the reason why WAL connection was switched, for further debugging purposes.
     864            1529 : #[derive(Debug, PartialEq, Eq)]
     865                 : enum ReconnectReason {
     866                 :     NoExistingConnection, // there was no current connection at all
     867                 :     LaggingWal { // candidate's commit_lsn is ahead of the current one by at least `threshold`
     868                 :         current_commit_lsn: Lsn,
     869                 :         new_commit_lsn: Lsn,
     870                 :         threshold: NonZeroU64,
     871                 :     },
     872                 :     SwitchAvailabilityZone, // candidate is in the pageserver's availability zone while the connected safekeeper is not
     873                 :     NoWalTimeout { // candidate has more WAL and we have been waiting for new WAL for longer than `threshold`
     874                 :         current_lsn: Lsn,
     875                 :         current_commit_lsn: Lsn,
     876                 :         candidate_commit_lsn: Lsn,
     877                 :         last_wal_interaction: Option<NaiveDateTime>,
     878                 :         check_time: NaiveDateTime,
     879                 :         threshold: Duration,
     880                 :     },
     881                 :     NoKeepAlives { // no connection update from the connected safekeeper for longer than `threshold`
     882                 :         last_keep_alive: Option<NaiveDateTime>,
     883                 :         check_time: NaiveDateTime,
     884                 :         threshold: Duration,
     885                 :     },
     886                 : }
     887                 : 
     888                 : impl ReconnectReason {
     889            1529 :     fn name(&self) -> &str {
     890            1529 :         match self {
     891            1494 :             ReconnectReason::NoExistingConnection => "NoExistingConnection",
     892              35 :             ReconnectReason::LaggingWal { .. } => "LaggingWal",
     893 UBC           0 :             ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
     894               0 :             ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
     895               0 :             ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
     896                 :         }
     897 CBC        1529 :     }
     898                 : }
     899                 : 
     900                 : #[cfg(test)]
     901                 : mod tests {
     902                 :     use super::*;
     903                 :     use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
     904                 :     use url::Host;
     905                 : 
     906              19 :     fn dummy_broker_sk_timeline(
     907              19 :         commit_lsn: u64,
     908              19 :         safekeeper_connstr: &str,
     909              19 :         latest_update: NaiveDateTime,
     910              19 :     ) -> BrokerSkTimeline {
     911              19 :         BrokerSkTimeline {
     912              19 :             timeline: SafekeeperTimelineInfo {
     913              19 :                 safekeeper_id: 0,
     914              19 :                 tenant_timeline_id: None,
     915              19 :                 term: 0,
     916              19 :                 last_log_term: 0,
     917              19 :                 flush_lsn: 0,
     918              19 :                 commit_lsn,
     919              19 :                 backup_lsn: 0,
     920              19 :                 remote_consistent_lsn: 0,
     921              19 :                 peer_horizon_lsn: 0,
     922              19 :                 local_start_lsn: 0,
     923              19 :                 safekeeper_connstr: safekeeper_connstr.to_owned(),
     924              19 :                 http_connstr: safekeeper_connstr.to_owned(),
     925              19 :                 availability_zone: None,
     926              19 :             },
     927              19 :             latest_update,
     928              19 :         }
     929              19 :     }
     930                 : 
     931               1 :     #[tokio::test]
     932               1 :     async fn no_connection_no_candidate() -> anyhow::Result<()> {
     933               1 :         let harness = TenantHarness::create("no_connection_no_candidate")?;
     934               2 :         let mut state = dummy_state(&harness).await;
     935               1 :         let now = Utc::now().naive_utc();
     936                 : 
     937               1 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
     938               1 :         let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
     939               1 : 
     940               1 :         state.wal_connection = None;
     941               1 :         state.wal_stream_candidates = HashMap::from([
     942               1 :             (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
     943               1 :             (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
     944               1 :             (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
     945               1 :             (
     946               1 :                 NodeId(3),
     947               1 :                 dummy_broker_sk_timeline(
     948               1 :                     1 + state.conf.max_lsn_wal_lag.get(),
     949               1 :                     "delay_over_threshold",
     950               1 :                     delay_over_threshold,
     951               1 :                 ),
     952               1 :             ),
     953               1 :         ]);
     954               1 : 
     955               1 :         let no_candidate = state.next_connection_candidate();
     956               1 :         assert!(
     957               1 :             no_candidate.is_none(),
     958 UBC           0 :             "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
     959                 :         );
     960                 : 
     961 CBC           1 :         Ok(())
     962                 :     }
     963                 : 
     964               1 :     #[tokio::test]
     965               1 :     async fn connection_no_candidate() -> anyhow::Result<()> {
     966               1 :         let harness = TenantHarness::create("connection_no_candidate")?;
     967               2 :         let mut state = dummy_state(&harness).await;
     968               1 :         let now = Utc::now().naive_utc();
     969               1 : 
     970               1 :         let connected_sk_id = NodeId(0);
     971               1 :         let current_lsn = 100_000;
     972               1 : 
     973               1 :         let connection_status = WalConnectionStatus {
     974               1 :             is_connected: true,
     975               1 :             has_processed_wal: true,
     976               1 :             latest_connection_update: now,
     977               1 :             latest_wal_update: now,
     978               1 :             commit_lsn: Some(Lsn(current_lsn)),
     979               1 :             streaming_lsn: Some(Lsn(current_lsn)),
     980               1 :             node: NodeId(1),
     981               1 :         };
     982               1 : 
     983               1 :         state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
     984               1 :         state.wal_connection = Some(WalConnection {
     985               1 :             started_at: now,
     986               1 :             sk_id: connected_sk_id,
     987               1 :             availability_zone: None,
     988               1 :             status: connection_status,
     989               1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
     990               1 :                 sender
     991               1 :                     .send(TaskStateUpdate::Progress(connection_status))
     992               1 :                     .ok();
     993               1 :                 Ok(())
     994               1 :             }),
     995               1 :             discovered_new_wal: None,
     996               1 :         });
     997               1 :         state.wal_stream_candidates = HashMap::from([
     998               1 :             (
     999               1 :                 connected_sk_id,
    1000               1 :                 dummy_broker_sk_timeline(
    1001               1 :                     current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
    1002               1 :                     DUMMY_SAFEKEEPER_HOST,
    1003               1 :                     now,
    1004               1 :                 ),
    1005               1 :             ),
    1006               1 :             (
    1007               1 :                 NodeId(1),
    1008               1 :                 dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
    1009               1 :             ),
    1010               1 :             (
    1011               1 :                 NodeId(2),
    1012               1 :                 dummy_broker_sk_timeline(
    1013               1 :                     current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
    1014               1 :                     "not_enough_advanced_lsn",
    1015               1 :                     now,
    1016               1 :                 ),
    1017               1 :             ),
    1018               1 :         ]);
    1019               1 : 
    1020               1 :         let no_candidate = state.next_connection_candidate();
    1021               1 :         assert!(
    1022               1 :             no_candidate.is_none(),
    1023 UBC           0 :             "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
    1024                 :         );
    1025                 : 
    1026 CBC           1 :         Ok(())
    1027                 :     }
    1028                 : 
    1029               1 :     #[tokio::test]
    1030               1 :     async fn no_connection_candidate() -> anyhow::Result<()> {
    1031               1 :         let harness = TenantHarness::create("no_connection_candidate")?;
    1032               2 :         let mut state = dummy_state(&harness).await;
    1033               1 :         let now = Utc::now().naive_utc();
    1034               1 : 
    1035               1 :         state.wal_connection = None;
    1036               1 :         state.wal_stream_candidates = HashMap::from([(
    1037               1 :             NodeId(0),
    1038               1 :             dummy_broker_sk_timeline(
    1039               1 :                 1 + state.conf.max_lsn_wal_lag.get(),
    1040               1 :                 DUMMY_SAFEKEEPER_HOST,
    1041               1 :                 now,
    1042               1 :             ),
    1043               1 :         )]);
    1044               1 : 
    1045               1 :         let only_candidate = state
    1046               1 :             .next_connection_candidate()
    1047               1 :             .expect("Expected one candidate selected out of the only data option, but got none");
    1048               1 :         assert_eq!(only_candidate.safekeeper_id, NodeId(0));
    1049               1 :         assert_eq!(
    1050                 :             only_candidate.reason,
    1051                 :             ReconnectReason::NoExistingConnection,
    1052 UBC           0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1053                 :         );
    1054 CBC           1 :         assert_eq!(
    1055               1 :             only_candidate.wal_source_connconf.host(),
    1056               1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1057               1 :         );
    1058                 : 
    1059               1 :         let selected_lsn = 100_000;
    1060               1 :         state.wal_stream_candidates = HashMap::from([
    1061               1 :             (
    1062               1 :                 NodeId(0),
    1063               1 :                 dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
    1064               1 :             ),
    1065               1 :             (
    1066               1 :                 NodeId(1),
    1067               1 :                 dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
    1068               1 :             ),
    1069               1 :             (
    1070               1 :                 NodeId(2),
    1071               1 :                 dummy_broker_sk_timeline(selected_lsn + 100, "", now),
    1072               1 :             ),
    1073               1 :         ]);
    1074               1 :         let biggest_wal_candidate = state.next_connection_candidate().expect(
    1075               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1076               1 :         );
    1077               1 : 
    1078               1 :         assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
    1079               1 :         assert_eq!(
    1080                 :             biggest_wal_candidate.reason,
    1081                 :             ReconnectReason::NoExistingConnection,
    1082 UBC           0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1083                 :         );
    1084 CBC           1 :         assert_eq!(
    1085               1 :             biggest_wal_candidate.wal_source_connconf.host(),
    1086               1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1087               1 :         );
    1088                 : 
    1089               1 :         Ok(())
    1090                 :     }
    1091                 : 
    1092               1 :     #[tokio::test]
    1093               1 :     async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
    1094               1 :         let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
    1095               2 :         let mut state = dummy_state(&harness).await;
    1096               1 :         let now = Utc::now().naive_utc();
    1097               1 : 
    1098               1 :         let current_lsn = Lsn(100_000).align();
    1099               1 :         let bigger_lsn = Lsn(current_lsn.0 + 100).align();
    1100               1 : 
    1101               1 :         state.wal_connection = None;
    1102               1 :         state.wal_stream_candidates = HashMap::from([
    1103               1 :             (
    1104               1 :                 NodeId(0),
    1105               1 :                 dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1106               1 :             ),
    1107               1 :             (
    1108               1 :                 NodeId(1),
    1109               1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1110               1 :             ),
    1111               1 :         ]);
    1112               1 :         state.wal_connection_retries = HashMap::from([(
    1113               1 :             NodeId(0),
    1114               1 :             RetryInfo {
    1115               1 :                 next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
    1116               1 :                 retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
    1117               1 :             },
    1118               1 :         )]);
    1119               1 : 
    1120               1 :         let candidate_with_less_errors = state
    1121               1 :             .next_connection_candidate()
    1122               1 :             .expect("Expected one candidate selected, but got none");
    1123               1 :         assert_eq!(
    1124                 :             candidate_with_less_errors.safekeeper_id,
    1125                 :             NodeId(1),
    1126 UBC           0 :             "Should select the node with no pending retry cooldown"
    1127                 :         );
    1128                 : 
    1129 CBC           1 :         Ok(())
    1130                 :     }
    1131                 : 
    1132               1 :     #[tokio::test]
    1133               1 :     async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1134               1 :         let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
    1135               2 :         let mut state = dummy_state(&harness).await;
    1136               1 :         let current_lsn = Lsn(100_000).align();
    1137               1 :         let now = Utc::now().naive_utc();
    1138               1 : 
    1139               1 :         let connected_sk_id = NodeId(0);
    1140               1 :         let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
    1141               1 : 
    1142               1 :         let connection_status = WalConnectionStatus {
    1143               1 :             is_connected: true,
    1144               1 :             has_processed_wal: true,
    1145               1 :             latest_connection_update: now,
    1146               1 :             latest_wal_update: now,
    1147               1 :             commit_lsn: Some(current_lsn),
    1148               1 :             streaming_lsn: Some(current_lsn),
    1149               1 :             node: connected_sk_id,
    1150               1 :         };
    1151               1 : 
    1152               1 :         state.wal_connection = Some(WalConnection {
    1153               1 :             started_at: now,
    1154               1 :             sk_id: connected_sk_id,
    1155               1 :             availability_zone: None,
    1156               1 :             status: connection_status,
    1157               1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1158               1 :                 sender
    1159               1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1160               1 :                     .ok();
    1161               1 :                 Ok(())
    1162               1 :             }),
    1163               1 :             discovered_new_wal: None,
    1164               1 :         });
    1165               1 :         state.wal_stream_candidates = HashMap::from([
    1166               1 :             (
    1167               1 :                 connected_sk_id,
    1168               1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1169               1 :             ),
    1170               1 :             (
    1171               1 :                 NodeId(1),
    1172               1 :                 dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
    1173               1 :             ),
    1174               1 :         ]);
    1175               1 : 
    1176               1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1177               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1178               1 :         );
    1179               1 : 
    1180               1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
    1181               1 :         assert_eq!(
    1182               1 :             over_threshcurrent_candidate.reason,
    1183               1 :             ReconnectReason::LaggingWal {
    1184               1 :                 current_commit_lsn: current_lsn,
    1185               1 :                 new_commit_lsn: new_lsn,
    1186               1 :                 threshold: state.conf.max_lsn_wal_lag
    1187               1 :             },
    1188 UBC           0 :             "Should select bigger WAL safekeeper if it starts to lag enough"
    1189                 :         );
    1190 CBC           1 :         assert_eq!(
    1191               1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1192               1 :             &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
    1193               1 :         );
    1194                 : 
    1195               1 :         Ok(())
    1196                 :     }
    1197                 : 
    1198               1 :     #[tokio::test]
    1199               1 :     async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
    1200               1 :         let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
    1201               2 :         let mut state = dummy_state(&harness).await;
    1202               1 :         let current_lsn = Lsn(100_000).align();
    1203               1 :         let now = Utc::now().naive_utc();
    1204                 : 
    1205               1 :         let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
    1206               1 :         let time_over_threshold =
    1207               1 :             Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
    1208               1 : 
    1209               1 :         let connection_status = WalConnectionStatus {
    1210               1 :             is_connected: true,
    1211               1 :             has_processed_wal: true,
    1212               1 :             latest_connection_update: time_over_threshold,
    1213               1 :             latest_wal_update: time_over_threshold,
    1214               1 :             commit_lsn: Some(current_lsn),
    1215               1 :             streaming_lsn: Some(current_lsn),
    1216               1 :             node: NodeId(1),
    1217               1 :         };
    1218               1 : 
    1219               1 :         state.wal_connection = Some(WalConnection {
    1220               1 :             started_at: now,
    1221               1 :             sk_id: NodeId(1),
    1222               1 :             availability_zone: None,
    1223               1 :             status: connection_status,
    1224               1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1225               1 :                 sender
    1226               1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1227               1 :                     .ok();
    1228               1 :                 Ok(())
    1229               1 :             }),
    1230               1 :             discovered_new_wal: None,
    1231               1 :         });
    1232               1 :         state.wal_stream_candidates = HashMap::from([(
    1233               1 :             NodeId(0),
    1234               1 :             dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1235               1 :         )]);
    1236               1 : 
    1237               1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1238               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1239               1 :         );
    1240               1 : 
    1241               1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
    1242               1 :         match over_threshcurrent_candidate.reason {
    1243                 :             ReconnectReason::NoKeepAlives {
    1244               1 :                 last_keep_alive,
    1245               1 :                 threshold,
    1246               1 :                 ..
    1247               1 :             } => {
    1248               1 :                 assert_eq!(last_keep_alive, Some(time_over_threshold));
    1249               1 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1250                 :             }
    1251 UBC           0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1252                 :         }
    1253 CBC           1 :         assert_eq!(
    1254               1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1255               1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1256               1 :         );
    1257                 : 
    1258               1 :         Ok(())
    1259                 :     }
    1260                 : 
    1261               1 :     #[tokio::test]
    1262               1 :     async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1263               1 :         let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
    1264               3 :         let mut state = dummy_state(&harness).await;
    1265               1 :         let current_lsn = Lsn(100_000).align();
    1266               1 :         let new_lsn = Lsn(100_100).align();
    1267               1 :         let now = Utc::now().naive_utc();
    1268                 : 
    1269               1 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
    1270               1 :         let time_over_threshold =
    1271               1 :             Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
    1272               1 : 
    1273               1 :         let connection_status = WalConnectionStatus {
    1274               1 :             is_connected: true,
    1275               1 :             has_processed_wal: true,
    1276               1 :             latest_connection_update: now,
    1277               1 :             latest_wal_update: time_over_threshold,
    1278               1 :             commit_lsn: Some(current_lsn),
    1279               1 :             streaming_lsn: Some(current_lsn),
    1280               1 :             node: NodeId(1),
    1281               1 :         };
    1282               1 : 
    1283               1 :         state.wal_connection = Some(WalConnection {
    1284               1 :             started_at: now,
    1285               1 :             sk_id: NodeId(1),
    1286               1 :             availability_zone: None,
    1287               1 :             status: connection_status,
    1288               1 :             connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
    1289               1 :             discovered_new_wal: Some(NewCommittedWAL {
    1290               1 :                 discovered_at: time_over_threshold,
    1291               1 :                 lsn: new_lsn,
    1292               1 :             }),
    1293               1 :         });
    1294               1 :         state.wal_stream_candidates = HashMap::from([(
    1295               1 :             NodeId(0),
    1296               1 :             dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1297               1 :         )]);
    1298               1 : 
    1299               1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1300               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1301               1 :         );
    1302               1 : 
    1303               1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
    1304               1 :         match over_threshcurrent_candidate.reason {
    1305                 :             ReconnectReason::NoWalTimeout {
    1306               1 :                 current_lsn,
    1307               1 :                 current_commit_lsn,
    1308               1 :                 candidate_commit_lsn,
    1309               1 :                 last_wal_interaction,
    1310               1 :                 threshold,
    1311               1 :                 ..
    1312               1 :             } => {
    1313               1 :                 assert_eq!(current_lsn, current_lsn);
    1314               1 :                 assert_eq!(current_commit_lsn, current_lsn);
    1315               1 :                 assert_eq!(candidate_commit_lsn, new_lsn);
    1316               1 :                 assert_eq!(last_wal_interaction, Some(time_over_threshold));
    1317               1 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1318                 :             }
    1319 UBC           0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1320                 :         }
    1321 CBC           1 :         assert_eq!(
    1322               1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1323               1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1324               1 :         );
    1325                 : 
    1326               1 :         Ok(())
    1327                 :     }
    1328                 : 
    1329                 :     const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
    1330                 : 
    1331               8 :     async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
    1332               8 :         let (tenant, ctx) = harness.load().await;
    1333               8 :         let timeline = tenant
    1334               8 :             .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
    1335               9 :             .await
    1336               8 :             .expect("Failed to create an empty timeline for dummy wal connection manager");
    1337               8 : 
    1338               8 :         ConnectionManagerState {
    1339               8 :             id: TenantTimelineId {
    1340               8 :                 tenant_id: harness.tenant_id,
    1341               8 :                 timeline_id: TIMELINE_ID,
    1342               8 :             },
    1343               8 :             timeline,
    1344               8 :             conf: WalReceiverConf {
    1345               8 :                 wal_connect_timeout: Duration::from_secs(1),
    1346               8 :                 lagging_wal_timeout: Duration::from_secs(1),
    1347               8 :                 max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
    1348               8 :                 auth_token: None,
    1349               8 :                 availability_zone: None,
    1350               8 :                 ingest_batch_size: 1,
    1351               8 :             },
    1352               8 :             wal_connection: None,
    1353               8 :             wal_stream_candidates: HashMap::new(),
    1354               8 :             wal_connection_retries: HashMap::new(),
    1355               8 :         }
    1356               8 :     }
    1357                 : 
    1358               1 :     #[tokio::test]
    1359               1 :     async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
    1360               1 :         // Pageserver and one of safekeepers will be in the same availability zone
    1361               1 :         // and pageserver should prefer to connect to it.
    1362               1 :         let test_az = Some("test_az".to_owned());
    1363                 : 
    1364               1 :         let harness = TenantHarness::create("switch_to_same_availability_zone")?;
    1365               2 :         let mut state = dummy_state(&harness).await;
    1366               1 :         state.conf.availability_zone = test_az.clone();
    1367               1 :         let current_lsn = Lsn(100_000).align();
    1368               1 :         let now = Utc::now().naive_utc();
    1369               1 : 
    1370               1 :         let connected_sk_id = NodeId(0);
    1371               1 : 
    1372               1 :         let connection_status = WalConnectionStatus {
    1373               1 :             is_connected: true,
    1374               1 :             has_processed_wal: true,
    1375               1 :             latest_connection_update: now,
    1376               1 :             latest_wal_update: now,
    1377               1 :             commit_lsn: Some(current_lsn),
    1378               1 :             streaming_lsn: Some(current_lsn),
    1379               1 :             node: connected_sk_id,
    1380               1 :         };
    1381               1 : 
    1382               1 :         state.wal_connection = Some(WalConnection {
    1383               1 :             started_at: now,
    1384               1 :             sk_id: connected_sk_id,
    1385               1 :             availability_zone: None,
    1386               1 :             status: connection_status,
    1387               1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1388               1 :                 sender
    1389               1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1390               1 :                     .ok();
    1391               1 :                 Ok(())
    1392               1 :             }),
    1393               1 :             discovered_new_wal: None,
    1394               1 :         });
    1395               1 : 
    1396               1 :         // We have another safekeeper with the same commit_lsn, and it has the same availability zone as
    1397               1 :         // the current pageserver.
    1398               1 :         let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
    1399               1 :         same_az_sk.timeline.availability_zone = test_az.clone();
    1400               1 : 
    1401               1 :         state.wal_stream_candidates = HashMap::from([
    1402               1 :             (
    1403               1 :                 connected_sk_id,
    1404               1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1405               1 :             ),
    1406               1 :             (NodeId(1), same_az_sk),
    1407               1 :         ]);
    1408               1 : 
    1409               1 :         // We expect that pageserver will switch to the safekeeper in the same availability zone,
    1410               1 :         // even if it has the same commit_lsn.
    1411               1 :         let next_candidate = state.next_connection_candidate().expect(
    1412               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1413               1 :         );
    1414               1 : 
    1415               1 :         assert_eq!(next_candidate.safekeeper_id, NodeId(1));
    1416               1 :         assert_eq!(
    1417                 :             next_candidate.reason,
    1418                 :             ReconnectReason::SwitchAvailabilityZone,
    1419 UBC           0 :             "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
    1420                 :         );
    1421 CBC           1 :         assert_eq!(
    1422               1 :             next_candidate.wal_source_connconf.host(),
    1423               1 :             &Host::Domain("same_az".to_owned())
    1424               1 :         );
    1425                 : 
    1426               1 :         Ok(())
    1427                 :     }
    1428                 : }
        

Generated by: LCOV version 2.1-beta