LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant/timeline/walreceiver - connection_manager.rs (source / functions) Coverage Total Hit UBC GBC GIC CBC ECB
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 94.7 % 1004 951 53 12 939
Current Date: 2023-10-19 02:04:12 Functions: 76.5 % 98 75 23 3 72 3
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! WAL receiver logic that ensures the pageserver gets connected to safekeeper,
       2                 : //! that contains the latest WAL to stream and this connection does not go stale.
       3                 : //!
       4                 : //! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
       5                 : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
       6                 : //! Current connection state is tracked too, to ensure it's not getting stale.
       7                 : //!
       8                 : //! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new connection leader,
       9                 : //! then a (re)connection happens, if necessary.
      10                 : //! Only the WAL streaming task is expected to finish; other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
      11                 : 
      12                 : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
      13                 : 
      14                 : use super::{TaskStateUpdate, WalReceiverConf};
      15                 : use crate::context::{DownloadBehavior, RequestContext};
      16                 : use crate::metrics::{
      17                 :     WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
      18                 :     WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
      19                 : };
      20                 : use crate::task_mgr::{shutdown_token, TaskKind};
      21                 : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
      22                 : use anyhow::Context;
      23                 : use chrono::{NaiveDateTime, Utc};
      24                 : use pageserver_api::models::TimelineState;
      25                 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
      26                 : use storage_broker::proto::SafekeeperTimelineInfo;
      27                 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
      28                 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      29                 : use storage_broker::BrokerClientChannel;
      30                 : use storage_broker::Streaming;
      31                 : use tokio::select;
      32                 : use tracing::*;
      33                 : 
      34                 : use postgres_connection::PgConnectionConfig;
      35                 : use utils::backoff::{
      36                 :     exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
      37                 : };
      38                 : use utils::postgres_client::wal_stream_connection_config;
      39                 : use utils::{
      40                 :     id::{NodeId, TenantTimelineId},
      41                 :     lsn::Lsn,
      42                 : };
      43                 : 
      44                 : use super::{
      45                 :     walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
      46                 :     TaskEvent, TaskHandle,
      47                 : };
      48                 : 
      49                 : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
      50                 : /// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
      51                 : /// If storage broker subscription is cancelled, exits.
      52 CBC        1103 : pub(super) async fn connection_manager_loop_step(
      53            1103 :     broker_client: &mut BrokerClientChannel,
      54            1103 :     connection_manager_state: &mut ConnectionManagerState,
      55            1103 :     ctx: &RequestContext,
      56            1103 :     manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
      57            1103 : ) -> ControlFlow<(), ()> {
      58            1103 :     match connection_manager_state
      59            1103 :         .timeline
      60            1103 :         .wait_to_become_active(ctx)
      61               6 :         .await
      62                 :     {
      63            1102 :         Ok(()) => {}
      64 GBC           1 :         Err(new_state) => {
      65 UBC           0 :             debug!(
      66               0 :                 ?new_state,
      67               0 :                 "state changed, stopping wal connection manager loop"
      68               0 :             );
      69 GBC           1 :             return ControlFlow::Break(());
      70                 :         }
      71                 :     }
      72                 : 
      73 CBC        1102 :     WALRECEIVER_ACTIVE_MANAGERS.inc();
      74            1102 :     scopeguard::defer! {
      75             414 :         WALRECEIVER_ACTIVE_MANAGERS.dec();
      76             414 :     }
      77                 : 
      78            1102 :     let id = TenantTimelineId {
      79            1102 :         tenant_id: connection_manager_state.timeline.tenant_id,
      80            1102 :         timeline_id: connection_manager_state.timeline.timeline_id,
      81            1102 :     };
      82            1102 : 
      83            1102 :     let mut timeline_state_updates = connection_manager_state
      84            1102 :         .timeline
      85            1102 :         .subscribe_for_state_updates();
      86                 : 
      87                 :     // Subscribe to the broker updates. Stream shares underlying TCP connection
      88                 :     // with other streams on this client (other connection managers). When
      89                 :     // object goes out of scope, stream finishes in drop() automatically.
      90            2196 :     let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
      91 UBC           0 :     debug!("Subscribed for broker timeline updates");
      92                 : 
      93                 :     loop {
      94 CBC      759873 :         let time_until_next_retry = connection_manager_state.time_until_next_retry();
      95                 : 
      96                 :         // These things are happening concurrently:
      97                 :         //
      98                 :         //  - keep receiving WAL on the current connection
      99                 :         //      - if the shared state says we need to change connection, disconnect and return
     100                 :         //      - this runs in a separate task and we receive updates via a watch channel
     101                 :         //  - change connection if the rules decide so, or if the current connection dies
     102                 :         //  - receive updates from broker
     103                 :         //      - this might change the current desired connection
     104                 :         //  - timeline state changes to something that does not allow walreceiver to run concurrently
     105          759873 :         select! {
     106          759811 :             Some(wal_connection_update) = async {
     107          759811 :                 match connection_manager_state.wal_connection.as_mut() {
     108          758252 :                     Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
     109            1559 :                     None => None,
     110                 :                 }
     111          754294 :             } => {
     112                 :                 let wal_connection = connection_manager_state.wal_connection.as_mut()
     113                 :                     .expect("Should have a connection, as checked by the corresponding select! guard");
     114                 :                 match wal_connection_update {
     115                 :                     TaskEvent::Update(TaskStateUpdate::Started) => {},
     116                 :                     TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
     117                 :                         if new_status.has_processed_wal {
     118                 :                             // We have advanced last_record_lsn by processing the WAL received
     119                 :                             // from this safekeeper. This is good enough to clean unsuccessful
     120                 :                             // retries history and allow reconnecting to this safekeeper without
     121                 :                             // sleeping for a long time.
     122                 :                             connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
     123                 :                         }
     124                 :                         wal_connection.status = new_status;
     125                 :                     }
     126                 :                     TaskEvent::End(walreceiver_task_result) => {
     127                 :                         match walreceiver_task_result {
     128 UBC           0 :                             Ok(()) => debug!("WAL receiving task finished"),
     129 CBC           9 :                             Err(e) => error!("wal receiver task finished with an error: {e:?}"),
     130                 :                         }
     131                 :                         connection_manager_state.drop_old_connection(false).await;
     132                 :                     },
     133                 :                 }
     134                 :             },
     135                 : 
     136                 :             // Got a new update from the broker
     137            5804 :             broker_update = broker_subscription.message() => {
     138                 :                 match broker_update {
     139                 :                     Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
     140                 :                     Err(e) => {
     141 UBC           0 :                         error!("broker subscription failed: {e}");
     142                 :                         return ControlFlow::Continue(());
     143                 :                     }
     144                 :                     Ok(None) => {
     145               0 :                         error!("broker subscription stream ended"); // can't happen
     146                 :                         return ControlFlow::Continue(());
     147                 :                     }
     148                 :                 }
     149                 :             },
     150                 : 
     151 CBC      748189 :             new_event = async {
     152          748189 :                 loop {
     153          748189 :                     if connection_manager_state.timeline.current_state() == TimelineState::Loading {
     154 UBC           0 :                         warn!("wal connection manager should only be launched after timeline has become active");
     155 CBC      748189 :                     }
     156          748189 :                     match timeline_state_updates.changed().await {
     157                 :                         Ok(()) => {
     158             342 :                             let new_state = connection_manager_state.timeline.current_state();
     159             342 :                             match new_state {
     160                 :                                 // we're already active as walreceiver, no need to reactivate
     161 UBC           0 :                                 TimelineState::Active => continue,
     162                 :                                 TimelineState::Broken { .. } | TimelineState::Stopping => {
     163 CBC         342 :                                     debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
     164             342 :                                     return ControlFlow::Break(());
     165                 :                                 }
     166                 :                                 TimelineState::Loading => {
     167 UBC           0 :                                     warn!("timeline transitioned back to Loading state, that should not happen");
     168               0 :                                     return ControlFlow::Continue(());
     169                 :                                 }
     170                 :                             }
     171                 :                         }
     172               0 :                         Err(_sender_dropped_error) => return ControlFlow::Break(()),
     173                 :                     }
     174                 :                 }
     175 CBC         342 :             } => match new_event {
     176                 :                 ControlFlow::Continue(()) => {
     177                 :                     return ControlFlow::Continue(());
     178                 :                 }
     179                 :                 ControlFlow::Break(()) => {
     180 UBC           0 :                     debug!("Timeline is no longer active, stopping wal connection manager loop");
     181                 :                     return ControlFlow::Break(());
     182                 :                 }
     183                 :             },
     184                 : 
     185 CBC      753966 :             Some(()) = async {
     186          753966 :                 match time_until_next_retry {
     187            1132 :                     Some(sleep_time) => {
     188            1132 :                         tokio::time::sleep(sleep_time).await;
     189             244 :                         Some(())
     190                 :                     },
     191                 :                     None => {
     192          752834 :                         debug!("No candidates to retry, waiting indefinitely for the broker events");
     193          752834 :                         None
     194                 :                     }
     195                 :                 }
     196          753078 :             } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
     197                 :         }
     198                 : 
     199          758783 :         if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
     200            1189 :             info!("Switching to new connection candidate: {new_candidate:?}");
     201            1189 :             connection_manager_state
     202            1189 :                 .change_connection(new_candidate, ctx)
     203              20 :                 .await
     204          757592 :         }
     205          758780 :         *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
     206                 :     }
     207             343 : }
     208                 : 
     209                 : /// Endlessly try to subscribe for broker updates for a given timeline.
     210            1102 : async fn subscribe_for_timeline_updates(
     211            1102 :     broker_client: &mut BrokerClientChannel,
     212            1102 :     id: TenantTimelineId,
     213            1102 : ) -> Streaming<SafekeeperTimelineInfo> {
     214            1102 :     let mut attempt = 0;
     215            1102 :     let cancel = shutdown_token();
     216                 : 
     217                 :     loop {
     218            1102 :         exponential_backoff(
     219            1102 :             attempt,
     220            1102 :             DEFAULT_BASE_BACKOFF_SECONDS,
     221            1102 :             DEFAULT_MAX_BACKOFF_SECONDS,
     222            1102 :             &cancel,
     223            1102 :         )
     224 UBC           0 :         .await;
     225 CBC        1102 :         attempt += 1;
     226            1102 : 
     227            1102 :         // subscribe to the specific timeline
     228            1102 :         let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
     229            1102 :             tenant_id: id.tenant_id.as_ref().to_owned(),
     230            1102 :             timeline_id: id.timeline_id.as_ref().to_owned(),
     231            1102 :         });
     232            1102 :         let request = SubscribeSafekeeperInfoRequest {
     233            1102 :             subscription_key: Some(key),
     234            1102 :         };
     235            1102 : 
     236            2196 :         match broker_client.subscribe_safekeeper_info(request).await {
     237            1093 :             Ok(resp) => {
     238            1093 :                 return resp.into_inner();
     239                 :             }
     240 UBC           0 :             Err(e) => {
     241                 :                 // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
     242                 :                 // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
     243               0 :                 info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
     244               0 :                 continue;
     245                 :             }
     246                 :         }
     247                 :     }
     248 CBC        1093 : }
     249                 : 
     250                 : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
     251                 : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
     252                 : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
     253                 : 
     254                 : /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
     255                 : pub(super) struct ConnectionManagerState {
     256                 :     id: TenantTimelineId,
     257                 :     /// Use pageserver data about the timeline to filter out some of the safekeepers.
     258                 :     timeline: Arc<Timeline>,
     259                 :     conf: WalReceiverConf,
     260                 :     /// Current connection to safekeeper for WAL streaming.
     261                 :     wal_connection: Option<WalConnection>,
     262                 :     /// Info about retries and unsuccessful attempts to connect to safekeepers.
     263                 :     wal_connection_retries: HashMap<NodeId, RetryInfo>,
     264                 :     /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
     265                 :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     266                 : }
     267                 : 
     268                 : /// Information about the connection manager's current connection and connection candidates.
     269            1316 : #[derive(Debug, Clone)]
     270                 : pub struct ConnectionManagerStatus {
     271                 :     existing_connection: Option<WalConnectionStatus>,
     272                 :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     273                 : }
     274                 : 
     275                 : impl ConnectionManagerStatus {
     276                 :     /// Generates a string, describing current connection status in a form, suitable for logging.
     277            1316 :     pub fn to_human_readable_string(&self) -> String {
     278            1316 :         let mut resulting_string = String::new();
     279            1316 :         match &self.existing_connection {
     280            1294 :             Some(connection) => {
     281            1294 :                 if connection.has_processed_wal {
     282            1226 :                     resulting_string.push_str(&format!(
     283            1226 :                         " (update {}): streaming WAL from node {}, ",
     284            1226 :                         connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
     285            1226 :                         connection.node,
     286            1226 :                     ));
     287            1226 : 
     288            1226 :                     match (connection.streaming_lsn, connection.commit_lsn) {
     289 UBC           0 :                         (None, None) => resulting_string.push_str("no streaming data"),
     290               0 :                         (None, Some(commit_lsn)) => {
     291               0 :                             resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
     292                 :                         }
     293               0 :                         (Some(streaming_lsn), None) => {
     294               0 :                             resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
     295                 :                         }
     296 CBC        1226 :                         (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
     297            1226 :                             &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
     298            1226 :                         ),
     299                 :                     }
     300              68 :                 } else if connection.is_connected {
     301              67 :                     resulting_string.push_str(&format!(
     302              67 :                         " (update {}): connecting to node {}",
     303              67 :                         connection
     304              67 :                             .latest_connection_update
     305              67 :                             .format("%Y-%m-%d %H:%M:%S"),
     306              67 :                         connection.node,
     307              67 :                     ));
     308              67 :                 } else {
     309 GBC           1 :                     resulting_string.push_str(&format!(
     310               1 :                         " (update {}): initializing node {} connection",
     311               1 :                         connection
     312               1 :                             .latest_connection_update
     313               1 :                             .format("%Y-%m-%d %H:%M:%S"),
     314               1 :                         connection.node,
     315               1 :                     ));
     316               1 :                 }
     317                 :             }
     318 CBC          22 :             None => resulting_string.push_str(": disconnected"),
     319                 :         }
     320                 : 
     321            1316 :         resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
     322            1316 :         let mut candidates = self.wal_stream_candidates.iter().peekable();
     323            2751 :         while let Some((node_id, candidate_info)) = candidates.next() {
     324            1435 :             resulting_string.push_str(&format!(
     325            1435 :                 "({}|{}|{})",
     326            1435 :                 node_id,
     327            1435 :                 candidate_info.latest_update.format("%H:%M:%S"),
     328            1435 :                 Lsn(candidate_info.timeline.commit_lsn)
     329            1435 :             ));
     330            1435 :             if candidates.peek().is_some() {
     331             119 :                 resulting_string.push_str(", ");
     332            1316 :             }
     333                 :         }
     334            1316 :         resulting_string.push(']');
     335            1316 : 
     336            1316 :         resulting_string
     337            1316 :     }
     338                 : }
     339                 : 
     340                 : /// Current connection data.
     341 UBC           0 : #[derive(Debug)]
     342                 : struct WalConnection {
     343                 :     /// Time when the connection was initiated.
     344                 :     started_at: NaiveDateTime,
     345                 :     /// Current safekeeper pageserver is connected to for WAL streaming.
     346                 :     sk_id: NodeId,
     347                 :     /// Availability zone of the safekeeper.
     348                 :     availability_zone: Option<String>,
     349                 :     /// Status of the connection.
     350                 :     status: WalConnectionStatus,
     351                 :     /// WAL streaming task handle.
     352                 :     connection_task: TaskHandle<WalConnectionStatus>,
     353                 :     /// Have we discovered that other safekeeper has more recent WAL than we do?
     354                 :     discovered_new_wal: Option<NewCommittedWAL>,
     355                 : }
     356                 : 
     357                 : /// Notion of a new committed WAL, which exists on other safekeeper.
     358               0 : #[derive(Debug, Clone, Copy)]
     359                 : struct NewCommittedWAL {
     360                 :     /// LSN of the new committed WAL.
     361                 :     lsn: Lsn,
     362                 :     /// When we discovered that the new committed WAL exists on other safekeeper.
     363                 :     discovered_at: NaiveDateTime,
     364                 : }
     365                 : 
     366               0 : #[derive(Debug, Clone, Copy)]
     367                 : struct RetryInfo {
     368                 :     next_retry_at: Option<NaiveDateTime>,
     369                 :     retry_duration_seconds: f64,
     370                 : }
     371                 : 
     372                 : /// Data about the timeline to connect to, received from the broker.
     373 CBC     1536862 : #[derive(Debug, Clone)]
     374                 : struct BrokerSkTimeline {
     375                 :     timeline: SafekeeperTimelineInfo,
     376                 :     /// Time at which the data was fetched from the broker last time, to track the stale data.
     377                 :     latest_update: NaiveDateTime,
     378                 : }
     379                 : 
     380                 : impl ConnectionManagerState {
     381            1103 :     pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
     382            1103 :         let id = TenantTimelineId {
     383            1103 :             tenant_id: timeline.tenant_id,
     384            1103 :             timeline_id: timeline.timeline_id,
     385            1103 :         };
     386            1103 :         Self {
     387            1103 :             id,
     388            1103 :             timeline,
     389            1103 :             conf,
     390            1103 :             wal_connection: None,
     391            1103 :             wal_stream_candidates: HashMap::new(),
     392            1103 :             wal_connection_retries: HashMap::new(),
     393            1103 :         }
     394            1103 :     }
     395                 : 
     396                 :     /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
     397            1188 :     async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
     398            1188 :         WALRECEIVER_SWITCHES
     399            1188 :             .with_label_values(&[new_sk.reason.name()])
     400            1188 :             .inc();
     401            1188 : 
     402            1188 :         self.drop_old_connection(true).await;
     403                 : 
     404            1188 :         let node_id = new_sk.safekeeper_id;
     405            1188 :         let connect_timeout = self.conf.wal_connect_timeout;
     406            1188 :         let timeline = Arc::clone(&self.timeline);
     407            1188 :         let ctx = ctx.detached_child(
     408            1188 :             TaskKind::WalReceiverConnectionHandler,
     409            1188 :             DownloadBehavior::Download,
     410            1188 :         );
     411                 : 
     412            1188 :         let span = info_span!("connection", %node_id);
     413            1188 :         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
     414            1188 :             async move {
     415            1188 :                 debug_assert_current_span_has_tenant_and_timeline_id();
     416                 : 
     417            1188 :                 let res = super::walreceiver_connection::handle_walreceiver_connection(
     418            1188 :                     timeline,
     419            1188 :                     new_sk.wal_source_connconf,
     420            1188 :                     events_sender,
     421            1188 :                     cancellation,
     422            1188 :                     connect_timeout,
     423            1188 :                     ctx,
     424            1188 :                     node_id,
     425            1188 :                 )
     426         5085285 :                 .await;
     427                 : 
     428            1122 :                 match res {
     429             175 :                     Ok(()) => Ok(()),
     430             947 :                     Err(e) => {
     431             947 :                         match e {
     432              57 :                             WalReceiverError::SuccessfulCompletion(msg) => {
     433              57 :                                 info!("walreceiver connection handling ended with success: {msg}");
     434              57 :                                 Ok(())
     435                 :                             }
     436             881 :                             WalReceiverError::ExpectedSafekeeperError(e) => {
     437             881 :                                 info!("walreceiver connection handling ended: {e}");
     438             881 :                                 Ok(())
     439                 :                             }
     440               9 :                             WalReceiverError::Other(e) => {
     441               9 :                                 // give out an error to have task_mgr give it a really verbose logging
     442               9 :                                 Err(e).context("walreceiver connection handling failure")
     443                 :                             }
     444                 :                         }
     445                 :                     }
     446                 :                 }
     447            1122 :             }
     448            1188 :             .instrument(span)
     449            1188 :         });
     450            1188 : 
     451            1188 :         let now = Utc::now().naive_utc();
     452            1188 :         self.wal_connection = Some(WalConnection {
     453            1188 :             started_at: now,
     454            1188 :             sk_id: new_sk.safekeeper_id,
     455            1188 :             availability_zone: new_sk.availability_zone,
     456            1188 :             status: WalConnectionStatus {
     457            1188 :                 is_connected: false,
     458            1188 :                 has_processed_wal: false,
     459            1188 :                 latest_connection_update: now,
     460            1188 :                 latest_wal_update: now,
     461            1188 :                 streaming_lsn: None,
     462            1188 :                 commit_lsn: None,
     463            1188 :                 node: node_id,
     464            1188 :             },
     465            1188 :             connection_task: connection_handle,
     466            1188 :             discovered_new_wal: None,
     467            1188 :         });
     468            1188 :     }
     469                 : 
     470                 :     /// Drops the current connection (if any) and updates retry timeout for the next
     471                 :     /// connection attempt to the same safekeeper.
     472            2083 :     async fn drop_old_connection(&mut self, needs_shutdown: bool) {
     473            2083 :         let wal_connection = match self.wal_connection.take() {
     474             914 :             Some(wal_connection) => wal_connection,
     475            1169 :             None => return,
     476                 :         };
     477                 : 
     478             914 :         if needs_shutdown {
     479              20 :             wal_connection.connection_task.shutdown().await;
     480             895 :         }
     481                 : 
     482             914 :         let retry = self
     483             914 :             .wal_connection_retries
     484             914 :             .entry(wal_connection.sk_id)
     485             914 :             .or_insert(RetryInfo {
     486             914 :                 next_retry_at: None,
     487             914 :                 retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
     488             914 :             });
     489             914 : 
     490             914 :         let now = Utc::now().naive_utc();
     491             914 : 
     492             914 :         // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
     493             914 :         // and we add backoff to the time when we started the connection attempt. If the connection
     494             914 :         // was active for a long time, then next_retry_at will be in the past.
     495             914 :         retry.next_retry_at =
     496             914 :             wal_connection
     497             914 :                 .started_at
     498             914 :                 .checked_add_signed(chrono::Duration::milliseconds(
     499             914 :                     (retry.retry_duration_seconds * 1000.0) as i64,
     500             914 :                 ));
     501                 : 
     502             914 :         if let Some(next) = &retry.next_retry_at {
     503             914 :             if next > &now {
     504             546 :                 info!(
     505             546 :                     "Next connection retry to {:?} is at {}",
     506             546 :                     wal_connection.sk_id, next
     507             546 :                 );
     508             368 :             }
     509 UBC           0 :         }
     510                 : 
     511 CBC         914 :         let next_retry_duration =
     512             914 :             retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
     513             914 :         // Clamp the next retry duration to the maximum allowed.
     514             914 :         let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
     515             914 :         // Clamp the next retry duration to the minimum allowed.
     516             914 :         let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
     517             914 : 
     518             914 :         retry.retry_duration_seconds = next_retry_duration;
     519            2083 :     }
     520                 : 
     521                 :     /// Returns time needed to wait to have a new candidate for WAL streaming.
     522          759873 :     fn time_until_next_retry(&self) -> Option<Duration> {
     523          759873 :         let now = Utc::now().naive_utc();
     524                 : 
     525          759873 :         let next_retry_at = self
     526          759873 :             .wal_connection_retries
     527          759873 :             .values()
     528          759873 :             .filter_map(|retry| retry.next_retry_at)
     529          759873 :             .filter(|next_retry_at| next_retry_at > &now)
     530          759873 :             .min()?;
     531                 : 
     532            1135 :         (next_retry_at - now).to_std().ok()
     533          759873 :     }
     534                 : 
     535                 :     /// Adds another broker timeline into the state, if it's more recent than the one already added there for the same key.
     536            5804 :     fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
     537            5804 :         WALRECEIVER_BROKER_UPDATES.inc();
     538            5804 : 
     539            5804 :         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
     540            5804 :         let old_entry = self.wal_stream_candidates.insert(
     541            5804 :             new_safekeeper_id,
     542            5804 :             BrokerSkTimeline {
     543            5804 :                 timeline: timeline_update,
     544            5804 :                 latest_update: Utc::now().naive_utc(),
     545            5804 :             },
     546            5804 :         );
     547            5804 : 
     548            5804 :         if old_entry.is_none() {
     549             588 :             info!("New SK node was added: {new_safekeeper_id}");
     550             588 :             WALRECEIVER_CANDIDATES_ADDED.inc();
     551            5216 :         }
     552            5804 :     }
     553                 : 
     554                 :     /// Cleans up stale broker records and checks the rest for the new connection candidate.
     555                 :     /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
     556                 :     /// The current rules for approving new candidates:
     557                 :     /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attempts
     558                 :     /// * if there's no such entry, no new candidate found, abort
     559                 :     /// * otherwise check if the candidate is much better than the current one
     560                 :     ///
     561                 :     /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
     562                 :     /// General rules are following:
     563                 :     /// * if connected safekeeper is not present, pick the candidate
     564                 :     /// * if we haven't received any updates for some time, pick the candidate
     565                 :     /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
     566                 :     /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
     567                 :     /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
     568                 :     ///
     569                 :     /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
     570                 :     /// Both thresholds are configured per tenant.
     571          758790 :     fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
     572          758790 :         self.cleanup_old_candidates();
     573          758790 : 
     574          758790 :         match &self.wal_connection {
     575          757149 :             Some(existing_wal_connection) => {
     576          757149 :                 let connected_sk_node = existing_wal_connection.sk_id;
     577                 : 
     578          416181 :                 let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
     579          757149 :                     self.select_connection_candidate(Some(connected_sk_node))?;
     580          416181 :                 let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
     581          416181 : 
     582          416181 :                 let now = Utc::now().naive_utc();
     583          416181 :                 if let Ok(latest_interaciton) =
     584          416181 :                     (now - existing_wal_connection.status.latest_connection_update).to_std()
     585                 :                 {
     586                 :                     // Drop connection if we haven't received keepalive message for a while.
     587          416181 :                     if latest_interaciton > self.conf.wal_connect_timeout {
     588               1 :                         return Some(NewWalConnectionCandidate {
     589               1 :                             safekeeper_id: new_sk_id,
     590               1 :                             wal_source_connconf: new_wal_source_connconf,
     591               1 :                             availability_zone: new_availability_zone,
     592               1 :                             reason: ReconnectReason::NoKeepAlives {
     593               1 :                                 last_keep_alive: Some(
     594               1 :                                     existing_wal_connection.status.latest_connection_update,
     595               1 :                                 ),
     596               1 :                                 check_time: now,
     597               1 :                                 threshold: self.conf.wal_connect_timeout,
     598               1 :                             },
     599               1 :                         });
     600          416180 :                     }
     601 UBC           0 :                 }
     602                 : 
     603 CBC      416180 :                 if !existing_wal_connection.status.is_connected {
     604                 :                     // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
     605             176 :                     return None;
     606          416004 :                 }
     607                 : 
     608          416004 :                 if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
     609          415917 :                     let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
     610          415917 :                     // Check if the new candidate has much more WAL than the current one.
     611          415917 :                     match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
     612            9388 :                         Some(new_sk_lsn_advantage) => {
     613            9388 :                             if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
     614              20 :                                 return Some(NewWalConnectionCandidate {
     615              20 :                                     safekeeper_id: new_sk_id,
     616              20 :                                     wal_source_connconf: new_wal_source_connconf,
     617              20 :                                     availability_zone: new_availability_zone,
     618              20 :                                     reason: ReconnectReason::LaggingWal {
     619              20 :                                         current_commit_lsn,
     620              20 :                                         new_commit_lsn,
     621              20 :                                         threshold: self.conf.max_lsn_wal_lag,
     622              20 :                                     },
     623              20 :                                 });
     624            9368 :                             }
     625            9368 :                             // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
     626            9368 :                             // and the current one is not, switch to the new one.
     627            9368 :                             if self.conf.availability_zone.is_some()
     628             172 :                                 && existing_wal_connection.availability_zone
     629             172 :                                     != self.conf.availability_zone
     630             172 :                                 && self.conf.availability_zone == new_availability_zone
     631                 :                             {
     632               1 :                                 return Some(NewWalConnectionCandidate {
     633               1 :                                     safekeeper_id: new_sk_id,
     634               1 :                                     availability_zone: new_availability_zone,
     635               1 :                                     wal_source_connconf: new_wal_source_connconf,
     636               1 :                                     reason: ReconnectReason::SwitchAvailabilityZone,
     637               1 :                                 });
     638            9367 :                             }
     639                 :                         }
     640          406529 :                         None => debug!(
     641 UBC           0 :                             "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
     642               0 :                         ),
     643                 :                     }
     644 CBC          87 :                 }
     645                 : 
     646          415983 :                 let current_lsn = match existing_wal_connection.status.streaming_lsn {
     647          415739 :                     Some(lsn) => lsn,
     648             244 :                     None => self.timeline.get_last_record_lsn(),
     649                 :                 };
     650          415983 :                 let current_commit_lsn = existing_wal_connection
     651          415983 :                     .status
     652          415983 :                     .commit_lsn
     653          415983 :                     .unwrap_or(current_lsn);
     654          415983 :                 let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
     655          415983 : 
     656          415983 :                 // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
     657          415983 :                 let mut discovered_new_wal = existing_wal_connection
     658          415983 :                     .discovered_new_wal
     659          415983 :                     .filter(|new_wal| new_wal.lsn > current_commit_lsn);
     660          415983 : 
     661          415983 :                 if discovered_new_wal.is_none() {
     662                 :                     // Check if the new candidate has more WAL than the current one.
     663                 :                     // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
     664          413155 :                     discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
     665             121 :                         trace!(
     666 UBC           0 :                             "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
     667               0 :                             candidate_commit_lsn,
     668               0 :                             current_commit_lsn
     669               0 :                         );
     670 CBC         121 :                         Some(NewCommittedWAL {
     671             121 :                             lsn: candidate_commit_lsn,
     672             121 :                             discovered_at: Utc::now().naive_utc(),
     673             121 :                         })
     674                 :                     } else {
     675          413034 :                         None
     676                 :                     };
     677            2828 :                 }
     678                 : 
     679          415983 :                 let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
     680                 :                     // Connected safekeeper has more WAL, but we haven't received updates for some time.
     681           11001 :                     trace!(
     682 UBC           0 :                         "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
     683               0 :                         (now - existing_wal_connection.status.latest_wal_update).to_std(),
     684               0 :                         current_lsn,
     685               0 :                         current_commit_lsn
     686               0 :                     );
     687 CBC       11001 :                     Some(existing_wal_connection.status.latest_wal_update)
     688                 :                 } else {
     689          404982 :                     discovered_new_wal.as_ref().map(|new_wal| {
     690             125 :                         // We know that new WAL is available on other safekeeper, but connected safekeeper doesn't have it.
     691             125 :                         new_wal
     692             125 :                             .discovered_at
     693             125 :                             .max(existing_wal_connection.status.latest_wal_update)
     694          404982 :                     })
     695                 :                 };
     696                 : 
     697                 :                 // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
     698          415983 :                 if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
     699           11126 :                     if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
     700           11083 :                         if candidate_commit_lsn > current_commit_lsn
     701            2906 :                             && waiting_for_new_wal > self.conf.lagging_wal_timeout
     702                 :                         {
     703               1 :                             return Some(NewWalConnectionCandidate {
     704               1 :                                 safekeeper_id: new_sk_id,
     705               1 :                                 wal_source_connconf: new_wal_source_connconf,
     706               1 :                                 availability_zone: new_availability_zone,
     707               1 :                                 reason: ReconnectReason::NoWalTimeout {
     708               1 :                                     current_lsn,
     709               1 :                                     current_commit_lsn,
     710               1 :                                     candidate_commit_lsn,
     711               1 :                                     last_wal_interaction: Some(
     712               1 :                                         existing_wal_connection.status.latest_wal_update,
     713               1 :                                     ),
     714               1 :                                     check_time: now,
     715               1 :                                     threshold: self.conf.lagging_wal_timeout,
     716               1 :                                 },
     717               1 :                             });
     718           11082 :                         }
     719              43 :                     }
     720          404857 :                 }
     721                 : 
     722          415982 :                 self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
     723                 :             }
     724                 :             None => {
     725            1173 :                 let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
     726            1641 :                     self.select_connection_candidate(None)?;
     727            1173 :                 return Some(NewWalConnectionCandidate {
     728            1173 :                     safekeeper_id: new_sk_id,
     729            1173 :                     availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
     730            1173 :                     wal_source_connconf: new_wal_source_connconf,
     731            1173 :                     reason: ReconnectReason::NoExistingConnection,
     732            1173 :                 });
     733                 :             }
     734                 :         }
     735                 : 
     736          415982 :         None
     737          758790 :     }
     738                 : 
     739                 :     /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
     740                 :     /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
     741                 :     ///
     742                 :     /// The candidate that is chosen:
     743                 :     /// * has no pending retry cooldown
     744                 :     /// * has greatest commit_lsn among the ones that are left
     745          758790 :     fn select_connection_candidate(
     746          758790 :         &self,
     747          758790 :         node_to_omit: Option<NodeId>,
     748          758790 :     ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
     749          758790 :         self.applicable_connection_candidates()
     750         1533965 :             .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
     751          776818 :             .max_by_key(|(_, info, _)| info.commit_lsn)
     752          758790 :     }
     753                 : 
     754                 :     /// Returns a list of safekeepers that have valid info and are ready for connection.
     755                 :     /// Some safekeepers are filtered by the retry cooldown.
     756          758790 :     fn applicable_connection_candidates(
     757          758790 :         &self,
     758          758790 :     ) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
     759          758790 :         let now = Utc::now().naive_utc();
     760          758790 : 
     761          758790 :         self.wal_stream_candidates
     762          758790 :             .iter()
     763         1535448 :             .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
     764         1535443 :             .filter(move |(sk_id, _)| {
     765         1535443 :                 let next_retry_at = self
     766         1535443 :                     .wal_connection_retries
     767         1535443 :                     .get(sk_id)
     768         1535443 :                     .and_then(|retry_info| {
     769          310948 :                         retry_info.next_retry_at
     770         1535443 :                     });
     771         1535443 : 
     772         1535443 :                 next_retry_at.is_none() || next_retry_at.unwrap() <= now
     773         1535443 :             }).filter_map(|(sk_id, broker_info)| {
     774         1533969 :                 let info = &broker_info.timeline;
     775         1533969 :                 if info.safekeeper_connstr.is_empty() {
     776               2 :                     return None; // no connection string, ignore sk
     777         1533967 :                 }
     778         1533967 :                 match wal_stream_connection_config(
     779         1533967 :                     self.id,
     780         1533967 :                     info.safekeeper_connstr.as_ref(),
     781         1533967 :                     match &self.conf.auth_token {
     782         1532816 :                         None => None,
     783            1151 :                         Some(x) => Some(x),
     784                 :                     },
     785         1533967 :                     self.conf.availability_zone.as_deref(),
     786                 :                 ) {
     787         1533965 :                     Ok(connstr) => Some((*sk_id, info, connstr)),
     788 GBC           2 :                     Err(e) => {
     789               2 :                         error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
     790 UBC           0 :                         None
     791                 :                     }
     792                 :                 }
     793 CBC     1533967 :             })
     794          758790 :     }
     795                 : 
     796                 :     /// Remove candidates which haven't sent broker updates for a while.
     797          758790 :     fn cleanup_old_candidates(&mut self) {
     798          758790 :         let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
     799          758790 :         let lagging_wal_timeout = self.conf.lagging_wal_timeout;
     800          758790 : 
     801          758790 :         self.wal_stream_candidates.retain(|node_id, broker_info| {
     802         1535455 :             if let Ok(time_since_latest_broker_update) =
     803         1535455 :                 (Utc::now().naive_utc() - broker_info.latest_update).to_std()
     804                 :             {
     805         1535455 :                 let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
     806         1535455 :                 if !should_retain {
     807               7 :                     node_ids_to_remove.push(*node_id);
     808         1535448 :                 }
     809         1535455 :                 should_retain
     810                 :             } else {
     811 UBC           0 :                 true
     812                 :             }
     813 CBC     1535455 :         });
     814          758790 : 
     815          758790 :         if !node_ids_to_remove.is_empty() {
     816              14 :             for node_id in node_ids_to_remove {
     817               7 :                 info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
     818               7 :                 self.wal_connection_retries.remove(&node_id);
     819               7 :                 WALRECEIVER_CANDIDATES_REMOVED.inc();
     820                 :             }
     821          758783 :         }
     822          758790 :     }
     823                 : 
     824             415 :     pub(super) async fn shutdown(mut self) {
     825             415 :         if let Some(wal_connection) = self.wal_connection.take() {
     826             255 :             wal_connection.connection_task.shutdown().await;
     827             207 :         }
     828             415 :     }
     829                 : 
     830          758780 :     fn manager_status(&self) -> ConnectionManagerStatus {
     831          758780 :         ConnectionManagerStatus {
     832          758780 :             existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
     833          758780 :             wal_stream_candidates: self.wal_stream_candidates.clone(),
     834          758780 :         }
     835          758780 :     }
     836                 : }
     837                 : 
     838            1189 : #[derive(Debug)]
     839                 : struct NewWalConnectionCandidate {
     840                 :     safekeeper_id: NodeId,
     841                 :     wal_source_connconf: PgConnectionConfig,
     842                 :     availability_zone: Option<String>,
     843                 :     reason: ReconnectReason,
     844                 : }
     845                 : 
     846                 : /// Stores the reason why WAL connection was switched, for further debugging purposes.
     847            1189 : #[derive(Debug, PartialEq, Eq)]
     848                 : enum ReconnectReason {
     849                 :     NoExistingConnection,
     850                 :     LaggingWal {
     851                 :         current_commit_lsn: Lsn,
     852                 :         new_commit_lsn: Lsn,
     853                 :         threshold: NonZeroU64,
     854                 :     },
     855                 :     SwitchAvailabilityZone,
     856                 :     NoWalTimeout {
     857                 :         current_lsn: Lsn,
     858                 :         current_commit_lsn: Lsn,
     859                 :         candidate_commit_lsn: Lsn,
     860                 :         last_wal_interaction: Option<NaiveDateTime>,
     861                 :         check_time: NaiveDateTime,
     862                 :         threshold: Duration,
     863                 :     },
     864                 :     NoKeepAlives {
     865                 :         last_keep_alive: Option<NaiveDateTime>,
     866                 :         check_time: NaiveDateTime,
     867                 :         threshold: Duration,
     868                 :     },
     869                 : }
     870                 : 
     871                 : impl ReconnectReason {
     872            1188 :     fn name(&self) -> &str {
     873            1188 :         match self {
     874            1169 :             ReconnectReason::NoExistingConnection => "NoExistingConnection",
     875              19 :             ReconnectReason::LaggingWal { .. } => "LaggingWal",
     876 UBC           0 :             ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
     877               0 :             ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
     878               0 :             ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
     879                 :         }
     880 CBC        1188 :     }
     881                 : }
     882                 : 
     883                 : #[cfg(test)]
     884                 : mod tests {
     885                 :     use super::*;
     886                 :     use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
     887                 :     use url::Host;
     888                 : 
     889              19 :     fn dummy_broker_sk_timeline(
     890              19 :         commit_lsn: u64,
     891              19 :         safekeeper_connstr: &str,
     892              19 :         latest_update: NaiveDateTime,
     893              19 :     ) -> BrokerSkTimeline {
     894              19 :         BrokerSkTimeline {
     895              19 :             timeline: SafekeeperTimelineInfo {
     896              19 :                 safekeeper_id: 0,
     897              19 :                 tenant_timeline_id: None,
     898              19 :                 term: 0,
     899              19 :                 last_log_term: 0,
     900              19 :                 flush_lsn: 0,
     901              19 :                 commit_lsn,
     902              19 :                 backup_lsn: 0,
     903              19 :                 remote_consistent_lsn: 0,
     904              19 :                 peer_horizon_lsn: 0,
     905              19 :                 local_start_lsn: 0,
     906              19 :                 safekeeper_connstr: safekeeper_connstr.to_owned(),
     907              19 :                 http_connstr: safekeeper_connstr.to_owned(),
     908              19 :                 availability_zone: None,
     909              19 :             },
     910              19 :             latest_update,
     911              19 :         }
     912              19 :     }
     913                 : 
     914               1 :     #[tokio::test]
     915               1 :     async fn no_connection_no_candidate() -> anyhow::Result<()> {
     916               1 :         let harness = TenantHarness::create("no_connection_no_candidate")?;
     917               3 :         let mut state = dummy_state(&harness).await;
     918               1 :         let now = Utc::now().naive_utc();
     919                 : 
     920               1 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
     921               1 :         let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
     922               1 : 
     923               1 :         state.wal_connection = None;
     924               1 :         state.wal_stream_candidates = HashMap::from([
     925               1 :             (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
     926               1 :             (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
     927               1 :             (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
     928               1 :             (
     929               1 :                 NodeId(3),
     930               1 :                 dummy_broker_sk_timeline(
     931               1 :                     1 + state.conf.max_lsn_wal_lag.get(),
     932               1 :                     "delay_over_threshold",
     933               1 :                     delay_over_threshold,
     934               1 :                 ),
     935               1 :             ),
     936               1 :         ]);
     937               1 : 
     938               1 :         let no_candidate = state.next_connection_candidate();
     939               1 :         assert!(
     940               1 :             no_candidate.is_none(),
     941 UBC           0 :             "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
     942                 :         );
     943                 : 
     944 CBC           1 :         Ok(())
     945                 :     }
     946                 : 
     947               1 :     #[tokio::test]
     948               1 :     async fn connection_no_candidate() -> anyhow::Result<()> {
     949               1 :         let harness = TenantHarness::create("connection_no_candidate")?;
     950               3 :         let mut state = dummy_state(&harness).await;
     951               1 :         let now = Utc::now().naive_utc();
     952               1 : 
     953               1 :         let connected_sk_id = NodeId(0);
     954               1 :         let current_lsn = 100_000;
     955               1 : 
     956               1 :         let connection_status = WalConnectionStatus {
     957               1 :             is_connected: true,
     958               1 :             has_processed_wal: true,
     959               1 :             latest_connection_update: now,
     960               1 :             latest_wal_update: now,
     961               1 :             commit_lsn: Some(Lsn(current_lsn)),
     962               1 :             streaming_lsn: Some(Lsn(current_lsn)),
     963               1 :             node: NodeId(1),
     964               1 :         };
     965               1 : 
     966               1 :         state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
     967               1 :         state.wal_connection = Some(WalConnection {
     968               1 :             started_at: now,
     969               1 :             sk_id: connected_sk_id,
     970               1 :             availability_zone: None,
     971               1 :             status: connection_status,
     972               1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
     973               1 :                 sender
     974               1 :                     .send(TaskStateUpdate::Progress(connection_status))
     975               1 :                     .ok();
     976               1 :                 Ok(())
     977               1 :             }),
     978               1 :             discovered_new_wal: None,
     979               1 :         });
     980               1 :         state.wal_stream_candidates = HashMap::from([
     981               1 :             (
     982               1 :                 connected_sk_id,
     983               1 :                 dummy_broker_sk_timeline(
     984               1 :                     current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
     985               1 :                     DUMMY_SAFEKEEPER_HOST,
     986               1 :                     now,
     987               1 :                 ),
     988               1 :             ),
     989               1 :             (
     990               1 :                 NodeId(1),
     991               1 :                 dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
     992               1 :             ),
     993               1 :             (
     994               1 :                 NodeId(2),
     995               1 :                 dummy_broker_sk_timeline(
     996               1 :                     current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
     997               1 :                     "not_enough_advanced_lsn",
     998               1 :                     now,
     999               1 :                 ),
    1000               1 :             ),
    1001               1 :         ]);
    1002               1 : 
    1003               1 :         let no_candidate = state.next_connection_candidate();
    1004               1 :         assert!(
    1005               1 :             no_candidate.is_none(),
    1006 UBC           0 :             "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
    1007                 :         );
    1008                 : 
    1009 CBC           1 :         Ok(())
    1010                 :     }
    1011                 : 
    1012               1 :     #[tokio::test]
    1013               1 :     async fn no_connection_candidate() -> anyhow::Result<()> {
    1014               1 :         let harness = TenantHarness::create("no_connection_candidate")?;
    1015               3 :         let mut state = dummy_state(&harness).await;
    1016               1 :         let now = Utc::now().naive_utc();
    1017               1 : 
    1018               1 :         state.wal_connection = None;
    1019               1 :         state.wal_stream_candidates = HashMap::from([(
    1020               1 :             NodeId(0),
    1021               1 :             dummy_broker_sk_timeline(
    1022               1 :                 1 + state.conf.max_lsn_wal_lag.get(),
    1023               1 :                 DUMMY_SAFEKEEPER_HOST,
    1024               1 :                 now,
    1025               1 :             ),
    1026               1 :         )]);
    1027               1 : 
    1028               1 :         let only_candidate = state
    1029               1 :             .next_connection_candidate()
    1030               1 :             .expect("Expected one candidate selected out of the only data option, but got none");
    1031               1 :         assert_eq!(only_candidate.safekeeper_id, NodeId(0));
    1032               1 :         assert_eq!(
    1033                 :             only_candidate.reason,
    1034                 :             ReconnectReason::NoExistingConnection,
    1035 UBC           0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1036                 :         );
    1037 CBC           1 :         assert_eq!(
    1038               1 :             only_candidate.wal_source_connconf.host(),
    1039               1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1040               1 :         );
    1041                 : 
    1042               1 :         let selected_lsn = 100_000;
    1043               1 :         state.wal_stream_candidates = HashMap::from([
    1044               1 :             (
    1045               1 :                 NodeId(0),
    1046               1 :                 dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
    1047               1 :             ),
    1048               1 :             (
    1049               1 :                 NodeId(1),
    1050               1 :                 dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
    1051               1 :             ),
    1052               1 :             (
    1053               1 :                 NodeId(2),
    1054               1 :                 dummy_broker_sk_timeline(selected_lsn + 100, "", now),
    1055               1 :             ),
    1056               1 :         ]);
    1057               1 :         let biggest_wal_candidate = state.next_connection_candidate().expect(
    1058               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1059               1 :         );
    1060               1 : 
    1061               1 :         assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
    1062               1 :         assert_eq!(
    1063                 :             biggest_wal_candidate.reason,
    1064                 :             ReconnectReason::NoExistingConnection,
    1065 UBC           0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1066                 :         );
    1067 CBC           1 :         assert_eq!(
    1068               1 :             biggest_wal_candidate.wal_source_connconf.host(),
    1069               1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1070               1 :         );
    1071                 : 
    1072               1 :         Ok(())
    1073                 :     }
    1074                 : 
    1075               1 :     #[tokio::test]
    1076               1 :     async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
    1077               1 :         let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
    1078               3 :         let mut state = dummy_state(&harness).await;
    1079               1 :         let now = Utc::now().naive_utc();
    1080               1 : 
    1081               1 :         let current_lsn = Lsn(100_000).align();
    1082               1 :         let bigger_lsn = Lsn(current_lsn.0 + 100).align();
    1083               1 : 
    1084               1 :         state.wal_connection = None;
    1085               1 :         state.wal_stream_candidates = HashMap::from([
    1086               1 :             (
    1087               1 :                 NodeId(0),
    1088               1 :                 dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1089               1 :             ),
    1090               1 :             (
    1091               1 :                 NodeId(1),
    1092               1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1093               1 :             ),
    1094               1 :         ]);
    1095               1 :         state.wal_connection_retries = HashMap::from([(
    1096               1 :             NodeId(0),
    1097               1 :             RetryInfo {
    1098               1 :                 next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
    1099               1 :                 retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
    1100               1 :             },
    1101               1 :         )]);
    1102               1 : 
    1103               1 :         let candidate_with_less_errors = state
    1104               1 :             .next_connection_candidate()
    1105               1 :             .expect("Expected one candidate selected, but got none");
    1106               1 :         assert_eq!(
    1107                 :             candidate_with_less_errors.safekeeper_id,
    1108                 :             NodeId(1),
    1109 UBC           0 :             "Should select the node with no pending retry cooldown"
    1110                 :         );
    1111                 : 
    1112 CBC           1 :         Ok(())
    1113                 :     }
    1114                 : 
    1115               1 :     #[tokio::test]
    1116               1 :     async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1117               1 :         let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
    1118               3 :         let mut state = dummy_state(&harness).await;
    1119               1 :         let current_lsn = Lsn(100_000).align();
    1120               1 :         let now = Utc::now().naive_utc();
    1121               1 : 
    1122               1 :         let connected_sk_id = NodeId(0);
    1123               1 :         let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
    1124               1 : 
    1125               1 :         let connection_status = WalConnectionStatus {
    1126               1 :             is_connected: true,
    1127               1 :             has_processed_wal: true,
    1128               1 :             latest_connection_update: now,
    1129               1 :             latest_wal_update: now,
    1130               1 :             commit_lsn: Some(current_lsn),
    1131               1 :             streaming_lsn: Some(current_lsn),
    1132               1 :             node: connected_sk_id,
    1133               1 :         };
    1134               1 : 
    1135               1 :         state.wal_connection = Some(WalConnection {
    1136               1 :             started_at: now,
    1137               1 :             sk_id: connected_sk_id,
    1138               1 :             availability_zone: None,
    1139               1 :             status: connection_status,
    1140               1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1141               1 :                 sender
    1142               1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1143               1 :                     .ok();
    1144               1 :                 Ok(())
    1145               1 :             }),
    1146               1 :             discovered_new_wal: None,
    1147               1 :         });
    1148               1 :         state.wal_stream_candidates = HashMap::from([
    1149               1 :             (
    1150               1 :                 connected_sk_id,
    1151               1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1152               1 :             ),
    1153               1 :             (
    1154               1 :                 NodeId(1),
    1155               1 :                 dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
    1156               1 :             ),
    1157               1 :         ]);
    1158               1 : 
    1159               1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1160               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1161               1 :         );
    1162               1 : 
    1163               1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
    1164               1 :         assert_eq!(
    1165               1 :             over_threshcurrent_candidate.reason,
    1166               1 :             ReconnectReason::LaggingWal {
    1167               1 :                 current_commit_lsn: current_lsn,
    1168               1 :                 new_commit_lsn: new_lsn,
    1169               1 :                 threshold: state.conf.max_lsn_wal_lag
    1170               1 :             },
    1171 UBC           0 :             "Should select bigger WAL safekeeper if it starts to lag enough"
    1172                 :         );
    1173 CBC           1 :         assert_eq!(
    1174               1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1175               1 :             &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
    1176               1 :         );
    1177                 : 
    1178               1 :         Ok(())
    1179                 :     }
    1180                 : 
    1181               1 :     #[tokio::test]
    1182               1 :     async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
    1183               1 :         let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
    1184               3 :         let mut state = dummy_state(&harness).await;
    1185               1 :         let current_lsn = Lsn(100_000).align();
    1186               1 :         let now = Utc::now().naive_utc();
    1187                 : 
    1188               1 :         let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
    1189               1 :         let time_over_threshold =
    1190               1 :             Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
    1191               1 : 
    1192               1 :         let connection_status = WalConnectionStatus {
    1193               1 :             is_connected: true,
    1194               1 :             has_processed_wal: true,
    1195               1 :             latest_connection_update: time_over_threshold,
    1196               1 :             latest_wal_update: time_over_threshold,
    1197               1 :             commit_lsn: Some(current_lsn),
    1198               1 :             streaming_lsn: Some(current_lsn),
    1199               1 :             node: NodeId(1),
    1200               1 :         };
    1201               1 : 
    1202               1 :         state.wal_connection = Some(WalConnection {
    1203               1 :             started_at: now,
    1204               1 :             sk_id: NodeId(1),
    1205               1 :             availability_zone: None,
    1206               1 :             status: connection_status,
    1207               1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1208               1 :                 sender
    1209               1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1210               1 :                     .ok();
    1211               1 :                 Ok(())
    1212               1 :             }),
    1213               1 :             discovered_new_wal: None,
    1214               1 :         });
    1215               1 :         state.wal_stream_candidates = HashMap::from([(
    1216               1 :             NodeId(0),
    1217               1 :             dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1218               1 :         )]);
    1219               1 : 
    1220               1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1221               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1222               1 :         );
    1223               1 : 
    1224               1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
    1225               1 :         match over_threshcurrent_candidate.reason {
    1226                 :             ReconnectReason::NoKeepAlives {
    1227               1 :                 last_keep_alive,
    1228               1 :                 threshold,
    1229               1 :                 ..
    1230               1 :             } => {
    1231               1 :                 assert_eq!(last_keep_alive, Some(time_over_threshold));
    1232               1 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1233                 :             }
    1234 UBC           0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1235                 :         }
    1236 CBC           1 :         assert_eq!(
    1237               1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1238               1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1239               1 :         );
    1240                 : 
    1241               1 :         Ok(())
    1242                 :     }
    1243                 : 
    1244               1 :     #[tokio::test]
    1245               1 :     async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1246               1 :         let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
    1247               3 :         let mut state = dummy_state(&harness).await;
    1248               1 :         let current_lsn = Lsn(100_000).align();
    1249               1 :         let new_lsn = Lsn(100_100).align();
    1250               1 :         let now = Utc::now().naive_utc();
    1251                 : 
    1252               1 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
    1253               1 :         let time_over_threshold =
    1254               1 :             Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
    1255               1 : 
    1256               1 :         let connection_status = WalConnectionStatus {
    1257               1 :             is_connected: true,
    1258               1 :             has_processed_wal: true,
    1259               1 :             latest_connection_update: now,
    1260               1 :             latest_wal_update: time_over_threshold,
    1261               1 :             commit_lsn: Some(current_lsn),
    1262               1 :             streaming_lsn: Some(current_lsn),
    1263               1 :             node: NodeId(1),
    1264               1 :         };
    1265               1 : 
    1266               1 :         state.wal_connection = Some(WalConnection {
    1267               1 :             started_at: now,
    1268               1 :             sk_id: NodeId(1),
    1269               1 :             availability_zone: None,
    1270               1 :             status: connection_status,
    1271               1 :             connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
    1272               1 :             discovered_new_wal: Some(NewCommittedWAL {
    1273               1 :                 discovered_at: time_over_threshold,
    1274               1 :                 lsn: new_lsn,
    1275               1 :             }),
    1276               1 :         });
    1277               1 :         state.wal_stream_candidates = HashMap::from([(
    1278               1 :             NodeId(0),
    1279               1 :             dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1280               1 :         )]);
    1281               1 : 
    1282               1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1283               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1284               1 :         );
    1285               1 : 
    1286               1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
    1287               1 :         match over_threshcurrent_candidate.reason {
    1288                 :             ReconnectReason::NoWalTimeout {
    1289               1 :                 current_lsn,
    1290               1 :                 current_commit_lsn,
    1291               1 :                 candidate_commit_lsn,
    1292               1 :                 last_wal_interaction,
    1293               1 :                 threshold,
    1294               1 :                 ..
    1295               1 :             } => {
    1296               1 :                 assert_eq!(current_lsn, current_lsn);
    1297               1 :                 assert_eq!(current_commit_lsn, current_lsn);
    1298               1 :                 assert_eq!(candidate_commit_lsn, new_lsn);
    1299               1 :                 assert_eq!(last_wal_interaction, Some(time_over_threshold));
    1300               1 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1301                 :             }
    1302 UBC           0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1303                 :         }
    1304 CBC           1 :         assert_eq!(
    1305               1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1306               1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1307               1 :         );
    1308                 : 
    1309               1 :         Ok(())
    1310                 :     }
    1311                 : 
    1312                 :     const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
    1313                 : 
    1314               8 :     async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
    1315               8 :         let (tenant, ctx) = harness.load().await;
    1316               8 :         let timeline = tenant
    1317               8 :             .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
    1318              16 :             .await
    1319               8 :             .expect("Failed to create an empty timeline for dummy wal connection manager");
    1320               8 : 
    1321               8 :         ConnectionManagerState {
    1322               8 :             id: TenantTimelineId {
    1323               8 :                 tenant_id: harness.tenant_id,
    1324               8 :                 timeline_id: TIMELINE_ID,
    1325               8 :             },
    1326               8 :             timeline,
    1327               8 :             conf: WalReceiverConf {
    1328               8 :                 wal_connect_timeout: Duration::from_secs(1),
    1329               8 :                 lagging_wal_timeout: Duration::from_secs(1),
    1330               8 :                 max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
    1331               8 :                 auth_token: None,
    1332               8 :                 availability_zone: None,
    1333               8 :             },
    1334               8 :             wal_connection: None,
    1335               8 :             wal_stream_candidates: HashMap::new(),
    1336               8 :             wal_connection_retries: HashMap::new(),
    1337               8 :         }
    1338               8 :     }
    1339                 : 
    1340               1 :     #[tokio::test]
    1341               1 :     async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
    1342               1 :         // Pageserver and one of safekeepers will be in the same availability zone
    1343               1 :         // and pageserver should prefer to connect to it.
    1344               1 :         let test_az = Some("test_az".to_owned());
    1345                 : 
    1346               1 :         let harness = TenantHarness::create("switch_to_same_availability_zone")?;
    1347               3 :         let mut state = dummy_state(&harness).await;
    1348               1 :         state.conf.availability_zone = test_az.clone();
    1349               1 :         let current_lsn = Lsn(100_000).align();
    1350               1 :         let now = Utc::now().naive_utc();
    1351               1 : 
    1352               1 :         let connected_sk_id = NodeId(0);
    1353               1 : 
    1354               1 :         let connection_status = WalConnectionStatus {
    1355               1 :             is_connected: true,
    1356               1 :             has_processed_wal: true,
    1357               1 :             latest_connection_update: now,
    1358               1 :             latest_wal_update: now,
    1359               1 :             commit_lsn: Some(current_lsn),
    1360               1 :             streaming_lsn: Some(current_lsn),
    1361               1 :             node: connected_sk_id,
    1362               1 :         };
    1363               1 : 
    1364               1 :         state.wal_connection = Some(WalConnection {
    1365               1 :             started_at: now,
    1366               1 :             sk_id: connected_sk_id,
    1367               1 :             availability_zone: None,
    1368               1 :             status: connection_status,
    1369               1 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1370               1 :                 sender
    1371               1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1372               1 :                     .ok();
    1373               1 :                 Ok(())
    1374               1 :             }),
    1375               1 :             discovered_new_wal: None,
    1376               1 :         });
    1377               1 : 
    1378               1 :         // We have another safekeeper with the same commit_lsn, and it have the same availability zone as
    1379               1 :         // the current pageserver.
    1380               1 :         let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
    1381               1 :         same_az_sk.timeline.availability_zone = test_az.clone();
    1382               1 : 
    1383               1 :         state.wal_stream_candidates = HashMap::from([
    1384               1 :             (
    1385               1 :                 connected_sk_id,
    1386               1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1387               1 :             ),
    1388               1 :             (NodeId(1), same_az_sk),
    1389               1 :         ]);
    1390               1 : 
    1391               1 :         // We expect that pageserver will switch to the safekeeper in the same availability zone,
    1392               1 :         // even if it has the same commit_lsn.
    1393               1 :         let next_candidate = state.next_connection_candidate().expect(
    1394               1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1395               1 :         );
    1396               1 : 
    1397               1 :         assert_eq!(next_candidate.safekeeper_id, NodeId(1));
    1398               1 :         assert_eq!(
    1399                 :             next_candidate.reason,
    1400                 :             ReconnectReason::SwitchAvailabilityZone,
    1401 UBC           0 :             "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
    1402                 :         );
    1403 CBC           1 :         assert_eq!(
    1404               1 :             next_candidate.wal_source_connconf.host(),
    1405               1 :             &Host::Domain("same_az".to_owned())
    1406               1 :         );
    1407                 : 
    1408               1 :         Ok(())
    1409                 :     }
    1410                 : }
        

Generated by: LCOV version 2.1-beta