LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline/walreceiver - connection_manager.rs (source / functions) Coverage Total Hit
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info Lines: 93.8 % 1030 966
Test Date: 2024-02-07 07:37:29 Functions: 76.0 % 100 76

            Line data    Source code
       1              : //! WAL receiver logic that ensures the pageserver gets connected to safekeeper,
       2              : //! that contains the latest WAL to stream and this connection does not go stale.
       3              : //!
       4              : //! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
       5              : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
       6              : //! Current connection state is tracked too, to ensure it's not getting stale.
       7              : //!
       8              : //! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new connection leader,
       9              : //! then a (re)connection happens, if necessary.
      10              : //! Only the WAL streaming task is expected to finish; the other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
      11              : 
      12              : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
      13              : 
      14              : use super::{TaskStateUpdate, WalReceiverConf};
      15              : use crate::context::{DownloadBehavior, RequestContext};
      16              : use crate::metrics::{
      17              :     WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
      18              :     WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
      19              : };
      20              : use crate::task_mgr::{shutdown_token, TaskKind};
      21              : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
      22              : use anyhow::Context;
      23              : use chrono::{NaiveDateTime, Utc};
      24              : use pageserver_api::models::TimelineState;
      25              : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
      26              : use storage_broker::proto::SafekeeperTimelineInfo;
      27              : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
      28              : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
      29              : use storage_broker::{BrokerClientChannel, Code, Streaming};
      30              : use tokio::select;
      31              : use tracing::*;
      32              : 
      33              : use postgres_connection::PgConnectionConfig;
      34              : use utils::backoff::{
      35              :     exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
      36              : };
      37              : use utils::postgres_client::wal_stream_connection_config;
      38              : use utils::{
      39              :     id::{NodeId, TenantTimelineId},
      40              :     lsn::Lsn,
      41              : };
      42              : 
      43              : use super::{
      44              :     walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
      45              :     TaskEvent, TaskHandle,
      46              : };
      47              : 
      48              : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
      49              : /// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
      50              : /// If storage broker subscription is cancelled, exits.
      51         1235 : pub(super) async fn connection_manager_loop_step(
      52         1235 :     broker_client: &mut BrokerClientChannel,
      53         1235 :     connection_manager_state: &mut ConnectionManagerState,
      54         1235 :     ctx: &RequestContext,
      55         1235 :     manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
      56         1235 : ) -> ControlFlow<(), ()> {
      57         1235 :     match connection_manager_state
      58         1235 :         .timeline
      59         1235 :         .wait_to_become_active(ctx)
      60           25 :         .await
      61              :     {
      62         1233 :         Ok(()) => {}
      63            2 :         Err(new_state) => {
      64            0 :             debug!(
      65            0 :                 ?new_state,
      66            0 :                 "state changed, stopping wal connection manager loop"
      67            0 :             );
      68            2 :             return ControlFlow::Break(());
      69              :         }
      70              :     }
      71              : 
      72         1233 :     WALRECEIVER_ACTIVE_MANAGERS.inc();
      73          543 :     scopeguard::defer! {
      74          543 :         WALRECEIVER_ACTIVE_MANAGERS.dec();
      75          543 :     }
      76              : 
      77         1233 :     let id = TenantTimelineId {
      78         1233 :         tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
      79         1233 :         timeline_id: connection_manager_state.timeline.timeline_id,
      80         1233 :     };
      81         1233 : 
      82         1233 :     let mut timeline_state_updates = connection_manager_state
      83         1233 :         .timeline
      84         1233 :         .subscribe_for_state_updates();
      85              : 
      86              :     // Subscribe to the broker updates. Stream shares underlying TCP connection
      87              :     // with other streams on this client (other connection managers). When
      88              :     // object goes out of scope, stream finishes in drop() automatically.
      89         2458 :     let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
      90            0 :     debug!("Subscribed for broker timeline updates");
      91              : 
      92              :     loop {
      93       737728 :         let time_until_next_retry = connection_manager_state.time_until_next_retry();
      94       737728 : 
      95       737728 :         // These things are happening concurrently:
      96       737728 :         //
      97       737728 :         //  - keep receiving WAL on the current connection
      98       737728 :         //      - if the shared state says we need to change connection, disconnect and return
      99       737728 :         //      - this runs in a separate task and we receive updates via a watch channel
     100       737728 :         //  - change connection if the rules decide so, or if the current connection dies
     101       737728 :         //  - receive updates from broker
     102       737728 :         //      - this might change the current desired connection
     103       737728 :         //  - timeline state changes to something that does not allow walreceiver to run concurrently
     104      1451442 :         select! {
     105       737577 :             Some(wal_connection_update) = async {
     106       737577 :                 match connection_manager_state.wal_connection.as_mut() {
     107       735449 :                     Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
     108         2128 :                     None => None,
     109              :                 }
     110       729793 :             } => {
     111              :                 let wal_connection = connection_manager_state.wal_connection.as_mut()
     112              :                     .expect("Should have a connection, as checked by the corresponding select! guard");
     113              :                 match wal_connection_update {
     114              :                     TaskEvent::Update(TaskStateUpdate::Started) => {},
     115              :                     TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
     116              :                         if new_status.has_processed_wal {
     117              :                             // We have advanced last_record_lsn by processing the WAL received
     118              :                             // from this safekeeper. This is good enough to clean unsuccessful
     119              :                             // retries history and allow reconnecting to this safekeeper without
     120              :                             // sleeping for a long time.
     121              :                             connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
     122              :                         }
     123              :                         wal_connection.status = new_status;
     124              :                     }
     125              :                     TaskEvent::End(walreceiver_task_result) => {
     126              :                         match walreceiver_task_result {
     127            0 :                             Ok(()) => debug!("WAL receiving task finished"),
     128           14 :                             Err(e) => error!("wal receiver task finished with an error: {e:?}"),
     129              :                         }
     130              :                         connection_manager_state.drop_old_connection(false).await;
     131              :                     },
     132              :                 }
     133              :             },
     134              : 
     135              :             // Got a new update from the broker
     136         8230 :             broker_update = broker_subscription.message() => {
     137              :                 match broker_update {
     138              :                     Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
     139              :                     Err(status) => {
     140              :                         match status.code() {
     141              :                             Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
     142              :                                 // tonic's error handling doesn't provide a clear code for disconnections: we get
     143              :                                 // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
     144            0 :                                 info!("broker disconnected: {status}");
     145              :                             },
     146              :                             _ => {
     147            0 :                                 warn!("broker subscription failed: {status}");
     148              :                             }
     149              :                         }
     150              :                         return ControlFlow::Continue(());
     151              :                     }
     152              :                     Ok(None) => {
     153            0 :                         error!("broker subscription stream ended"); // can't happen
     154              :                         return ControlFlow::Continue(());
     155              :                     }
     156              :                 }
     157              :             },
     158              : 
     159       727119 :             new_event = async {
     160       727119 :                 loop {
     161       727119 :                     if connection_manager_state.timeline.current_state() == TimelineState::Loading {
     162            0 :                         warn!("wal connection manager should only be launched after timeline has become active");
     163       727119 :                     }
     164       727119 :                     match timeline_state_updates.changed().await {
     165              :                         Ok(()) => {
     166          481 :                             let new_state = connection_manager_state.timeline.current_state();
     167          481 :                             match new_state {
     168              :                                 // we're already active as walreceiver, no need to reactivate
     169            0 :                                 TimelineState::Active => continue,
     170              :                                 TimelineState::Broken { .. } | TimelineState::Stopping => {
     171          481 :                                     debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
     172          481 :                                     return ControlFlow::Break(());
     173              :                                 }
     174              :                                 TimelineState::Loading => {
     175            0 :                                     warn!("timeline transitioned back to Loading state, that should not happen");
     176            0 :                                     return ControlFlow::Continue(());
     177              :                                 }
     178              :                             }
     179              :                         }
     180            0 :                         Err(_sender_dropped_error) => return ControlFlow::Break(()),
     181              :                     }
     182              :                 }
     183          481 :             } => match new_event {
     184              :                 ControlFlow::Continue(()) => {
     185              :                     return ControlFlow::Continue(());
     186              :                 }
     187              :                 ControlFlow::Break(()) => {
     188            0 :                     debug!("Timeline is no longer active, stopping wal connection manager loop");
     189              :                     return ControlFlow::Break(());
     190              :                 }
     191              :             },
     192              : 
     193       732421 :             Some(()) = async {
     194       732421 :                 match time_until_next_retry {
     195         1780 :                     Some(sleep_time) => {
     196         1780 :                         tokio::time::sleep(sleep_time).await;
     197          621 :                         Some(())
     198              :                     },
     199              :                     None => {
     200       730641 :                         debug!("No candidates to retry, waiting indefinitely for the broker events");
     201       730641 :                         None
     202              :                     }
     203              :                 }
     204       731262 :             } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
     205              :         }
     206              : 
     207       736514 :         if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
     208         1702 :             info!("Switching to new connection candidate: {new_candidate:?}");
     209         1702 :             connection_manager_state
     210         1702 :                 .change_connection(new_candidate, ctx)
     211           16 :                 .await
     212       734812 :         }
     213       736514 :         *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
     214              :     }
     215          483 : }
     216              : 
     217              : /// Endlessly try to subscribe for broker updates for a given timeline.
     218         1233 : async fn subscribe_for_timeline_updates(
     219         1233 :     broker_client: &mut BrokerClientChannel,
     220         1233 :     id: TenantTimelineId,
     221         1233 : ) -> Streaming<SafekeeperTimelineInfo> {
     222         1233 :     let mut attempt = 0;
     223         1233 :     let cancel = shutdown_token();
     224              : 
     225              :     loop {
     226         1233 :         exponential_backoff(
     227         1233 :             attempt,
     228         1233 :             DEFAULT_BASE_BACKOFF_SECONDS,
     229         1233 :             DEFAULT_MAX_BACKOFF_SECONDS,
     230         1233 :             &cancel,
     231         1233 :         )
     232            0 :         .await;
     233         1233 :         attempt += 1;
     234         1233 : 
     235         1233 :         // subscribe to the specific timeline
     236         1233 :         let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
     237         1233 :             tenant_id: id.tenant_id.as_ref().to_owned(),
     238         1233 :             timeline_id: id.timeline_id.as_ref().to_owned(),
     239         1233 :         });
     240         1233 :         let request = SubscribeSafekeeperInfoRequest {
     241         1233 :             subscription_key: Some(key),
     242         1233 :         };
     243         1233 : 
     244         2458 :         match broker_client.subscribe_safekeeper_info(request).await {
     245         1214 :             Ok(resp) => {
     246         1214 :                 return resp.into_inner();
     247              :             }
     248            0 :             Err(e) => {
     249              :                 // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
     250              :                 // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
     251            0 :                 info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
     252            0 :                 continue;
     253              :             }
     254              :         }
     255              :     }
     256         1214 : }
     257              : 
     258              : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
     259              : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
     260              : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
     261              : 
     262              : /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
     263              : pub(super) struct ConnectionManagerState {
     264              :     id: TenantTimelineId,
     265              :     /// Use pageserver data about the timeline to filter out some of the safekeepers.
     266              :     timeline: Arc<Timeline>,
     267              :     conf: WalReceiverConf,
     268              :     /// Current connection to safekeeper for WAL streaming.
     269              :     wal_connection: Option<WalConnection>,
     270              :     /// Info about retries and unsuccessful attempts to connect to safekeepers.
     271              :     wal_connection_retries: HashMap<NodeId, RetryInfo>,
     272              :     /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
     273              :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     274              : }
     275              : 
     276              : /// Information about connection manager's current connection and connection candidates.
     277         1794 : #[derive(Debug, Clone)]
     278              : pub struct ConnectionManagerStatus {
     279              :     existing_connection: Option<WalConnectionStatus>,
     280              :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     281              : }
     282              : 
     283              : impl ConnectionManagerStatus {
     284              :     /// Generates a string, describing current connection status in a form, suitable for logging.
     285         1794 :     pub fn to_human_readable_string(&self) -> String {
     286         1794 :         let mut resulting_string = String::new();
     287         1794 :         match &self.existing_connection {
     288         1751 :             Some(connection) => {
     289         1751 :                 if connection.has_processed_wal {
     290         1694 :                     resulting_string.push_str(&format!(
     291         1694 :                         " (update {}): streaming WAL from node {}, ",
     292         1694 :                         connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
     293         1694 :                         connection.node,
     294         1694 :                     ));
     295         1694 : 
     296         1694 :                     match (connection.streaming_lsn, connection.commit_lsn) {
     297            0 :                         (None, None) => resulting_string.push_str("no streaming data"),
     298            0 :                         (None, Some(commit_lsn)) => {
     299            0 :                             resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
     300              :                         }
     301            0 :                         (Some(streaming_lsn), None) => {
     302            0 :                             resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
     303              :                         }
     304         1694 :                         (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
     305         1694 :                             &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
     306         1694 :                         ),
     307              :                     }
     308           57 :                 } else if connection.is_connected {
     309           57 :                     resulting_string.push_str(&format!(
     310           57 :                         " (update {}): connecting to node {}",
     311           57 :                         connection
     312           57 :                             .latest_connection_update
     313           57 :                             .format("%Y-%m-%d %H:%M:%S"),
     314           57 :                         connection.node,
     315           57 :                     ));
     316           57 :                 } else {
     317            0 :                     resulting_string.push_str(&format!(
     318            0 :                         " (update {}): initializing node {} connection",
     319            0 :                         connection
     320            0 :                             .latest_connection_update
     321            0 :                             .format("%Y-%m-%d %H:%M:%S"),
     322            0 :                         connection.node,
     323            0 :                     ));
     324            0 :                 }
     325              :             }
     326           43 :             None => resulting_string.push_str(": disconnected"),
     327              :         }
     328              : 
     329         1794 :         resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
     330         1794 :         let mut candidates = self.wal_stream_candidates.iter().peekable();
     331         3731 :         while let Some((node_id, candidate_info)) = candidates.next() {
     332         1937 :             resulting_string.push_str(&format!(
     333         1937 :                 "({}|{}|{})",
     334         1937 :                 node_id,
     335         1937 :                 candidate_info.latest_update.format("%H:%M:%S"),
     336         1937 :                 Lsn(candidate_info.timeline.commit_lsn)
     337         1937 :             ));
     338         1937 :             if candidates.peek().is_some() {
     339          144 :                 resulting_string.push_str(", ");
     340         1793 :             }
     341              :         }
     342         1794 :         resulting_string.push(']');
     343         1794 : 
     344         1794 :         resulting_string
     345         1794 :     }
     346              : }
     347              : 
     348              : /// Current connection data.
     349            0 : #[derive(Debug)]
     350              : struct WalConnection {
     351              :     /// Time when the connection was initiated.
     352              :     started_at: NaiveDateTime,
     353              :     /// Current safekeeper pageserver is connected to for WAL streaming.
     354              :     sk_id: NodeId,
     355              :     /// Availability zone of the safekeeper.
     356              :     availability_zone: Option<String>,
     357              :     /// Status of the connection.
     358              :     status: WalConnectionStatus,
     359              :     /// WAL streaming task handle.
     360              :     connection_task: TaskHandle<WalConnectionStatus>,
     361              :     /// Have we discovered that another safekeeper has more recent WAL than we do?
     362              :     discovered_new_wal: Option<NewCommittedWAL>,
     363              : }
     364              : 
     365              : /// Notion of a new committed WAL, which exists on another safekeeper.
     366            0 : #[derive(Debug, Clone, Copy)]
     367              : struct NewCommittedWAL {
     368              :     /// LSN of the new committed WAL.
     369              :     lsn: Lsn,
     370              :     /// When we discovered that the new committed WAL exists on other safekeeper.
     371              :     discovered_at: NaiveDateTime,
     372              : }
     373              : 
     374            0 : #[derive(Debug, Clone, Copy)]
     375              : struct RetryInfo {
     376              :     next_retry_at: Option<NaiveDateTime>,
     377              :     retry_duration_seconds: f64,
     378              : }
     379              : 
     380              : /// Data about the timeline to connect to, received from the broker.
     381      1159378 : #[derive(Debug, Clone)]
     382              : struct BrokerSkTimeline {
     383              :     timeline: SafekeeperTimelineInfo,
     384              :     /// Time at which the data was fetched from the broker last time, to track the stale data.
     385              :     latest_update: NaiveDateTime,
     386              : }
     387              : 
     388              : impl ConnectionManagerState {
     389         1235 :     pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
     390         1235 :         let id = TenantTimelineId {
     391         1235 :             tenant_id: timeline.tenant_shard_id.tenant_id,
     392         1235 :             timeline_id: timeline.timeline_id,
     393         1235 :         };
     394         1235 :         Self {
     395         1235 :             id,
     396         1235 :             timeline,
     397         1235 :             conf,
     398         1235 :             wal_connection: None,
     399         1235 :             wal_stream_candidates: HashMap::new(),
     400         1235 :             wal_connection_retries: HashMap::new(),
     401         1235 :         }
     402         1235 :     }
     403              : 
     404              :     /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
     405         1702 :     async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
     406         1702 :         WALRECEIVER_SWITCHES
     407         1702 :             .with_label_values(&[new_sk.reason.name()])
     408         1702 :             .inc();
     409         1702 : 
     410         1702 :         self.drop_old_connection(true).await;
     411              : 
     412         1702 :         let node_id = new_sk.safekeeper_id;
     413         1702 :         let connect_timeout = self.conf.wal_connect_timeout;
     414         1702 :         let ingest_batch_size = self.conf.ingest_batch_size;
     415         1702 :         let timeline = Arc::clone(&self.timeline);
     416         1702 :         let ctx = ctx.detached_child(
     417         1702 :             TaskKind::WalReceiverConnectionHandler,
     418         1702 :             DownloadBehavior::Download,
     419         1702 :         );
     420              : 
     421         1702 :         let span = info_span!("connection", %node_id);
     422         1702 :         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
     423         1702 :             async move {
     424         1702 :                 debug_assert_current_span_has_tenant_and_timeline_id();
     425              : 
     426         1702 :                 let res = super::walreceiver_connection::handle_walreceiver_connection(
     427         1702 :                     timeline,
     428         1702 :                     new_sk.wal_source_connconf,
     429         1702 :                     events_sender,
     430         1702 :                     cancellation.clone(),
     431         1702 :                     connect_timeout,
     432         1702 :                     ctx,
     433         1702 :                     node_id,
     434         1702 :                     ingest_batch_size,
     435         1702 :                 )
     436       849884 :                 .await;
     437              : 
     438         1660 :                 match res {
     439          205 :                     Ok(()) => Ok(()),
     440         1455 :                     Err(e) => {
     441         1455 :                         match e {
     442           54 :                             WalReceiverError::SuccessfulCompletion(msg) => {
     443           54 :                                 info!("walreceiver connection handling ended with success: {msg}");
     444           54 :                                 Ok(())
     445              :                             }
     446         1383 :                             WalReceiverError::ExpectedSafekeeperError(e) => {
     447         1383 :                                 info!("walreceiver connection handling ended: {e}");
     448         1383 :                                 Ok(())
     449              :                             }
     450           18 :                             WalReceiverError::Other(e) => {
     451           18 :                                 // give out an error to have task_mgr give it a really verbose logging
     452           18 :                                 if cancellation.is_cancelled() {
     453              :                                     // Ideally we would learn about this via some path other than Other, but
     454              :                                     // that requires refactoring all the intermediate layers of ingest code
     455              :                                     // that only emit anyhow::Error
     456            3 :                                     Ok(())
     457              :                                 } else {
     458           15 :                                     Err(e).context("walreceiver connection handling failure")
     459              :                                 }
     460              :                             }
     461              :                         }
     462              :                     }
     463              :                 }
     464         1660 :             }
     465         1702 :             .instrument(span)
     466         1702 :         });
     467         1702 : 
     468         1702 :         let now = Utc::now().naive_utc();
     469         1702 :         self.wal_connection = Some(WalConnection {
     470         1702 :             started_at: now,
     471         1702 :             sk_id: new_sk.safekeeper_id,
     472         1702 :             availability_zone: new_sk.availability_zone,
     473         1702 :             status: WalConnectionStatus {
     474         1702 :                 is_connected: false,
     475         1702 :                 has_processed_wal: false,
     476         1702 :                 latest_connection_update: now,
     477         1702 :                 latest_wal_update: now,
     478         1702 :                 streaming_lsn: None,
     479         1702 :                 commit_lsn: None,
     480         1702 :                 node: node_id,
     481         1702 :             },
     482         1702 :             connection_task: connection_handle,
     483         1702 :             discovered_new_wal: None,
     484         1702 :         });
     485         1702 :     }
     486              : 
     487              :     /// Drops the current connection (if any) and updates retry timeout for the next
     488              :     /// connection attempt to the same safekeeper.
     489         3081 :     async fn drop_old_connection(&mut self, needs_shutdown: bool) {
     490         3081 :         let wal_connection = match self.wal_connection.take() {
     491         1395 :             Some(wal_connection) => wal_connection,
     492         1686 :             None => return,
     493              :         };
     494              : 
     495         1395 :         if needs_shutdown {
     496           16 :             wal_connection.connection_task.shutdown().await;
     497         1379 :         }
     498              : 
     499         1395 :         let retry = self
     500         1395 :             .wal_connection_retries
     501         1395 :             .entry(wal_connection.sk_id)
     502         1395 :             .or_insert(RetryInfo {
     503         1395 :                 next_retry_at: None,
     504         1395 :                 retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
     505         1395 :             });
     506         1395 : 
     507         1395 :         let now = Utc::now().naive_utc();
     508         1395 : 
     509         1395 :         // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
     510         1395 :         // and we add backoff to the time when we started the connection attempt. If the connection
     511         1395 :         // was active for a long time, then next_retry_at will be in the past.
     512         1395 :         retry.next_retry_at =
     513         1395 :             wal_connection
     514         1395 :                 .started_at
     515         1395 :                 .checked_add_signed(chrono::Duration::milliseconds(
     516         1395 :                     (retry.retry_duration_seconds * 1000.0) as i64,
     517         1395 :                 ));
     518              : 
     519         1395 :         if let Some(next) = &retry.next_retry_at {
     520         1395 :             if next > &now {
     521          999 :                 info!(
     522          999 :                     "Next connection retry to {:?} is at {}",
     523          999 :                     wal_connection.sk_id, next
     524          999 :                 );
     525          396 :             }
     526            0 :         }
     527              : 
     528         1395 :         let next_retry_duration =
     529         1395 :             retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
     530         1395 :         // Clamp the next retry duration to the maximum allowed.
     531         1395 :         let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
     532         1395 :         // Clamp the next retry duration to the minimum allowed.
     533         1395 :         let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
     534         1395 : 
     535         1395 :         retry.retry_duration_seconds = next_retry_duration;
     536         3081 :     }
     537              : 
     538              :     /// Returns time needed to wait to have a new candidate for WAL streaming.
     539       737728 :     fn time_until_next_retry(&self) -> Option<Duration> {
     540       737728 :         let now = Utc::now().naive_utc();
     541              : 
     542       737728 :         let next_retry_at = self
     543       737728 :             .wal_connection_retries
     544       737728 :             .values()
     545       737728 :             .filter_map(|retry| retry.next_retry_at)
     546       737728 :             .filter(|next_retry_at| next_retry_at > &now)
     547       737728 :             .min()?;
     548              : 
     549         1783 :         (next_retry_at - now).to_std().ok()
     550       737728 :     }
     551              : 
     552              :     /// Adds another broker timeline into the state, if it's more recent than the one already added there for the same key.
     553         8230 :     fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
     554         8230 :         WALRECEIVER_BROKER_UPDATES.inc();
     555         8230 : 
     556         8230 :         info!(
     557         8230 :             "Safekeeper info update: standby_horizon(cutoff)={}",
     558         8230 :             timeline_update.standby_horizon
     559         8230 :         );
     560         8230 :         if timeline_update.standby_horizon != 0 {
     561          204 :             // ignore reports from safekeepers mnot connected to replicas
     562          204 :             self.timeline
     563          204 :                 .standby_horizon
     564          204 :                 .store(Lsn(timeline_update.standby_horizon));
     565         8026 :         }
     566              : 
     567         8230 :         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
     568         8230 :         let old_entry = self.wal_stream_candidates.insert(
     569         8230 :             new_safekeeper_id,
     570         8230 :             BrokerSkTimeline {
     571         8230 :                 timeline: timeline_update,
     572         8230 :                 latest_update: Utc::now().naive_utc(),
     573         8230 :             },
     574         8230 :         );
     575         8230 :         if old_entry.is_none() {
     576          670 :             info!("New SK node was added: {new_safekeeper_id}");
     577          670 :             WALRECEIVER_CANDIDATES_ADDED.inc();
     578         7560 :         }
     579         8230 :     }
     580              : 
     581              :     /// Cleans up stale broker records and checks the rest for the new connection candidate.
     582              :     /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
     583              :     /// The current rules for approving new candidates:
     584              :     /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attempts
     585              :     /// * if there's no such entry, no new candidate found, abort
     586              :     /// * otherwise check if the candidate is much better than the current one
     587              :     ///
     588              :     /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
     589              :     /// General rules are as follows:
     590              :     /// * if connected safekeeper is not present, pick the candidate
     591              :     /// * if we haven't received any updates for some time, pick the candidate
     592              :     /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
     593              :     /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
     594              :     /// * if connected safekeeper stopped sending us new WAL which is available on another safekeeper, pick the candidate
     595              :     ///
     596              :     /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
     597              :     /// Both thresholds are configured per tenant.
     598       736532 :     fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
     599       736532 :         self.cleanup_old_candidates();
     600       736532 : 
     601       736532 :         match &self.wal_connection {
     602       733924 :             Some(existing_wal_connection) => {
     603       733924 :                 let connected_sk_node = existing_wal_connection.sk_id;
     604              : 
     605       232806 :                 let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
     606       733924 :                     self.select_connection_candidate(Some(connected_sk_node))?;
     607       232806 :                 let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
     608       232806 : 
     609       232806 :                 let now = Utc::now().naive_utc();
     610       232806 :                 if let Ok(latest_interaciton) =
     611       232806 :                     (now - existing_wal_connection.status.latest_connection_update).to_std()
     612              :                 {
     613              :                     // Drop connection if we haven't received keepalive message for a while.
     614       232806 :                     if latest_interaciton > self.conf.wal_connect_timeout {
     615            2 :                         return Some(NewWalConnectionCandidate {
     616            2 :                             safekeeper_id: new_sk_id,
     617            2 :                             wal_source_connconf: new_wal_source_connconf,
     618            2 :                             availability_zone: new_availability_zone,
     619            2 :                             reason: ReconnectReason::NoKeepAlives {
     620            2 :                                 last_keep_alive: Some(
     621            2 :                                     existing_wal_connection.status.latest_connection_update,
     622            2 :                                 ),
     623            2 :                                 check_time: now,
     624            2 :                                 threshold: self.conf.wal_connect_timeout,
     625            2 :                             },
     626            2 :                         });
     627       232804 :                     }
     628            0 :                 }
     629              : 
     630       232804 :                 if !existing_wal_connection.status.is_connected {
     631              :                     // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
     632          211 :                     return None;
     633       232593 :                 }
     634              : 
     635       232593 :                 if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
     636       232502 :                     let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
     637       232502 :                     // Check if the new candidate has much more WAL than the current one.
     638       232502 :                     match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
     639         9170 :                         Some(new_sk_lsn_advantage) => {
     640         9170 :                             if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
     641           18 :                                 return Some(NewWalConnectionCandidate {
     642           18 :                                     safekeeper_id: new_sk_id,
     643           18 :                                     wal_source_connconf: new_wal_source_connconf,
     644           18 :                                     availability_zone: new_availability_zone,
     645           18 :                                     reason: ReconnectReason::LaggingWal {
     646           18 :                                         current_commit_lsn,
     647           18 :                                         new_commit_lsn,
     648           18 :                                         threshold: self.conf.max_lsn_wal_lag,
     649           18 :                                     },
     650           18 :                                 });
     651         9152 :                             }
     652         9152 :                             // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
     653         9152 :                             // and the current one is not, switch to the new one.
     654         9152 :                             if self.conf.availability_zone.is_some()
     655          184 :                                 && existing_wal_connection.availability_zone
     656          184 :                                     != self.conf.availability_zone
     657          184 :                                 && self.conf.availability_zone == new_availability_zone
     658              :                             {
     659            2 :                                 return Some(NewWalConnectionCandidate {
     660            2 :                                     safekeeper_id: new_sk_id,
     661            2 :                                     availability_zone: new_availability_zone,
     662            2 :                                     wal_source_connconf: new_wal_source_connconf,
     663            2 :                                     reason: ReconnectReason::SwitchAvailabilityZone,
     664            2 :                                 });
     665         9150 :                             }
     666              :                         }
     667       223332 :                         None => debug!(
     668            0 :                             "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
     669            0 :                         ),
     670              :                     }
     671           91 :                 }
     672              : 
     673       232573 :                 let current_lsn = match existing_wal_connection.status.streaming_lsn {
     674       232315 :                     Some(lsn) => lsn,
     675          258 :                     None => self.timeline.get_last_record_lsn(),
     676              :                 };
     677       232573 :                 let current_commit_lsn = existing_wal_connection
     678       232573 :                     .status
     679       232573 :                     .commit_lsn
     680       232573 :                     .unwrap_or(current_lsn);
     681       232573 :                 let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
     682       232573 : 
     683       232573 :                 // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
     684       232573 :                 let mut discovered_new_wal = existing_wal_connection
     685       232573 :                     .discovered_new_wal
     686       232573 :                     .filter(|new_wal| new_wal.lsn > current_commit_lsn);
     687       232573 : 
     688       232573 :                 if discovered_new_wal.is_none() {
     689              :                     // Check if the new candidate has more WAL than the current one.
     690              :                     // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
     691       228409 :                     discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
     692          144 :                         trace!(
     693            0 :                             "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
     694            0 :                             candidate_commit_lsn,
     695            0 :                             current_commit_lsn
     696            0 :                         );
     697          144 :                         Some(NewCommittedWAL {
     698          144 :                             lsn: candidate_commit_lsn,
     699          144 :                             discovered_at: Utc::now().naive_utc(),
     700          144 :                         })
     701              :                     } else {
     702       228265 :                         None
     703              :                     };
     704         4164 :                 }
     705              : 
     706       232573 :                 let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
     707              :                     // Connected safekeeper has more WAL, but we haven't received updates for some time.
     708         8876 :                     trace!(
     709            0 :                         "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
     710            0 :                         (now - existing_wal_connection.status.latest_wal_update).to_std(),
     711            0 :                         current_lsn,
     712            0 :                         current_commit_lsn
     713            0 :                     );
     714         8876 :                     Some(existing_wal_connection.status.latest_wal_update)
     715              :                 } else {
     716       223697 :                     discovered_new_wal.as_ref().map(|new_wal| {
     717         1025 :                         // We know that new WAL is available on another safekeeper, but the connected safekeeper doesn't have it.
     718         1025 :                         new_wal
     719         1025 :                             .discovered_at
     720         1025 :                             .max(existing_wal_connection.status.latest_wal_update)
     721       223697 :                     })
     722              :                 };
     723              : 
     724              :                 // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
     725       232573 :                 if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
     726         9901 :                     if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
     727         9841 :                         if candidate_commit_lsn > current_commit_lsn
     728         4248 :                             && waiting_for_new_wal > self.conf.lagging_wal_timeout
     729              :                         {
     730            2 :                             return Some(NewWalConnectionCandidate {
     731            2 :                                 safekeeper_id: new_sk_id,
     732            2 :                                 wal_source_connconf: new_wal_source_connconf,
     733            2 :                                 availability_zone: new_availability_zone,
     734            2 :                                 reason: ReconnectReason::NoWalTimeout {
     735            2 :                                     current_lsn,
     736            2 :                                     current_commit_lsn,
     737            2 :                                     candidate_commit_lsn,
     738            2 :                                     last_wal_interaction: Some(
     739            2 :                                         existing_wal_connection.status.latest_wal_update,
     740            2 :                                     ),
     741            2 :                                     check_time: now,
     742            2 :                                     threshold: self.conf.lagging_wal_timeout,
     743            2 :                                 },
     744            2 :                             });
     745         9839 :                         }
     746           60 :                     }
     747       222672 :                 }
     748              : 
     749       232571 :                 self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
     750              :             }
     751              :             None => {
     752         1692 :                 let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
     753         2608 :                     self.select_connection_candidate(None)?;
     754         1692 :                 return Some(NewWalConnectionCandidate {
     755         1692 :                     safekeeper_id: new_sk_id,
     756         1692 :                     availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
     757         1692 :                     wal_source_connconf: new_wal_source_connconf,
     758         1692 :                     reason: ReconnectReason::NoExistingConnection,
     759         1692 :                 });
     760              :             }
     761              :         }
     762              : 
     763       232571 :         None
     764       736532 :     }
     765              : 
     766              :     /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
     767              :     /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
     768              :     ///
     769              :     /// The candidate that is chosen:
     770              :     /// * has no pending retry cooldown
     771              :     /// * has greatest commit_lsn among the ones that are left
     772       736532 :     fn select_connection_candidate(
     773       736532 :         &self,
     774       736532 :         node_to_omit: Option<NodeId>,
     775       736532 :     ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
     776       736532 :         self.applicable_connection_candidates()
     777      1155118 :             .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
     778       736532 :             .max_by_key(|(_, info, _)| info.commit_lsn)
     779       736532 :     }
     780              : 
     781              :     /// Returns a list of safekeepers that have valid info and are ready for connection.
     782              :     /// Some safekeepers are filtered by the retry cooldown.
     783       736532 :     fn applicable_connection_candidates(
     784       736532 :         &self,
     785       736532 :     ) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
     786       736532 :         let now = Utc::now().naive_utc();
     787       736532 : 
     788       736532 :         self.wal_stream_candidates
     789       736532 :             .iter()
     790      1157477 :             .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
     791      1157462 :             .filter(move |(sk_id, _)| {
     792      1157462 :                 let next_retry_at = self
     793      1157462 :                     .wal_connection_retries
     794      1157462 :                     .get(sk_id)
     795      1157462 :                     .and_then(|retry_info| {
     796       207854 :                         retry_info.next_retry_at
     797      1157462 :                     });
     798      1157462 : 
     799      1157462 :                 next_retry_at.is_none() || next_retry_at.unwrap() <= now
     800      1157462 :             }).filter_map(|(sk_id, broker_info)| {
     801      1155122 :                 let info = &broker_info.timeline;
     802      1155122 :                 if info.safekeeper_connstr.is_empty() {
     803            4 :                     return None; // no connection string, ignore sk
     804      1155118 :                 }
     805      1155118 :                 match wal_stream_connection_config(
     806      1155118 :                     self.id,
     807      1155118 :                     info.safekeeper_connstr.as_ref(),
     808      1155118 :                     match &self.conf.auth_token {
     809      1154012 :                         None => None,
     810         1106 :                         Some(x) => Some(x),
     811              :                     },
     812      1155118 :                     self.conf.availability_zone.as_deref(),
     813              :                 ) {
     814      1155118 :                     Ok(connstr) => Some((*sk_id, info, connstr)),
     815            0 :                     Err(e) => {
     816            0 :                         error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
     817            0 :                         None
     818              :                     }
     819              :                 }
     820      1155122 :             })
     821       736532 :     }
     822              : 
     823              :     /// Remove candidates which haven't sent broker updates for a while.
     824       736532 :     fn cleanup_old_candidates(&mut self) {
     825       736532 :         let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
     826       736532 :         let lagging_wal_timeout = self.conf.lagging_wal_timeout;
     827       736532 : 
     828      1157488 :         self.wal_stream_candidates.retain(|node_id, broker_info| {
     829      1157488 :             if let Ok(time_since_latest_broker_update) =
     830      1157488 :                 (Utc::now().naive_utc() - broker_info.latest_update).to_std()
     831              :             {
     832      1157488 :                 let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
     833      1157488 :                 if !should_retain {
     834           11 :                     node_ids_to_remove.push(*node_id);
     835      1157477 :                 }
     836      1157488 :                 should_retain
     837              :             } else {
     838            0 :                 true
     839              :             }
     840      1157488 :         });
     841       736532 : 
     842       736532 :         if !node_ids_to_remove.is_empty() {
     843           22 :             for node_id in node_ids_to_remove {
     844           11 :                 info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
     845           11 :                 self.wal_connection_retries.remove(&node_id);
     846           11 :                 WALRECEIVER_CANDIDATES_REMOVED.inc();
     847              :             }
     848       736521 :         }
     849       736532 :     }
     850              : 
     851          545 :     pub(super) async fn shutdown(mut self) {
     852          545 :         if let Some(wal_connection) = self.wal_connection.take() {
     853          290 :             wal_connection.connection_task.shutdown().await;
     854          282 :         }
     855          545 :     }
     856              : 
     857       736514 :     fn manager_status(&self) -> ConnectionManagerStatus {
     858       736514 :         ConnectionManagerStatus {
     859       736514 :             existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
     860       736514 :             wal_stream_candidates: self.wal_stream_candidates.clone(),
     861       736514 :         }
     862       736514 :     }
     863              : }
     864              : 
     865         1702 : #[derive(Debug)]
     866              : struct NewWalConnectionCandidate {
     867              :     safekeeper_id: NodeId,
     868              :     wal_source_connconf: PgConnectionConfig,
     869              :     availability_zone: Option<String>,
     870              :     reason: ReconnectReason,
     871              : }
     872              : 
     873              : /// Stores the reason why WAL connection was switched, for further debugging purposes.
     874         1702 : #[derive(Debug, PartialEq, Eq)]
     875              : enum ReconnectReason {
     876              :     NoExistingConnection,
     877              :     LaggingWal {
     878              :         current_commit_lsn: Lsn,
     879              :         new_commit_lsn: Lsn,
     880              :         threshold: NonZeroU64,
     881              :     },
     882              :     SwitchAvailabilityZone,
     883              :     NoWalTimeout {
     884              :         current_lsn: Lsn,
     885              :         current_commit_lsn: Lsn,
     886              :         candidate_commit_lsn: Lsn,
     887              :         last_wal_interaction: Option<NaiveDateTime>,
     888              :         check_time: NaiveDateTime,
     889              :         threshold: Duration,
     890              :     },
     891              :     NoKeepAlives {
     892              :         last_keep_alive: Option<NaiveDateTime>,
     893              :         check_time: NaiveDateTime,
     894              :         threshold: Duration,
     895              :     },
     896              : }
     897              : 
     898              : impl ReconnectReason {
     899         1702 :     fn name(&self) -> &str {
     900         1702 :         match self {
     901         1686 :             ReconnectReason::NoExistingConnection => "NoExistingConnection",
     902           16 :             ReconnectReason::LaggingWal { .. } => "LaggingWal",
     903            0 :             ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
     904            0 :             ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
     905            0 :             ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
     906              :         }
     907         1702 :     }
     908              : }
     909              : 
     910              : #[cfg(test)]
     911              : mod tests {
     912              :     use super::*;
     913              :     use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
     914              :     use url::Host;
     915              : 
     916           38 :     fn dummy_broker_sk_timeline(
     917           38 :         commit_lsn: u64,
     918           38 :         safekeeper_connstr: &str,
     919           38 :         latest_update: NaiveDateTime,
     920           38 :     ) -> BrokerSkTimeline {
     921           38 :         BrokerSkTimeline {
     922           38 :             timeline: SafekeeperTimelineInfo {
     923           38 :                 safekeeper_id: 0,
     924           38 :                 tenant_timeline_id: None,
     925           38 :                 term: 0,
     926           38 :                 last_log_term: 0,
     927           38 :                 flush_lsn: 0,
     928           38 :                 commit_lsn,
     929           38 :                 backup_lsn: 0,
     930           38 :                 remote_consistent_lsn: 0,
     931           38 :                 peer_horizon_lsn: 0,
     932           38 :                 local_start_lsn: 0,
     933           38 :                 standby_horizon: 0,
     934           38 :                 safekeeper_connstr: safekeeper_connstr.to_owned(),
     935           38 :                 http_connstr: safekeeper_connstr.to_owned(),
     936           38 :                 availability_zone: None,
     937           38 :             },
     938           38 :             latest_update,
     939           38 :         }
     940           38 :     }
     941              : 
     942            2 :     #[tokio::test]
     943            2 :     async fn no_connection_no_candidate() -> anyhow::Result<()> {
     944            2 :         let harness = TenantHarness::create("no_connection_no_candidate")?;
     945            8 :         let mut state = dummy_state(&harness).await;
     946            2 :         let now = Utc::now().naive_utc();
     947              : 
     948            2 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
     949            2 :         let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
     950            2 : 
     951            2 :         state.wal_connection = None;
     952            2 :         state.wal_stream_candidates = HashMap::from([
     953            2 :             (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
     954            2 :             (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
     955            2 :             (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
     956            2 :             (
     957            2 :                 NodeId(3),
     958            2 :                 dummy_broker_sk_timeline(
     959            2 :                     1 + state.conf.max_lsn_wal_lag.get(),
     960            2 :                     "delay_over_threshold",
     961            2 :                     delay_over_threshold,
     962            2 :                 ),
     963            2 :             ),
     964            2 :         ]);
     965            2 : 
     966            2 :         let no_candidate = state.next_connection_candidate();
     967            2 :         assert!(
     968            2 :             no_candidate.is_none(),
     969            0 :             "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
     970              :         );
     971              : 
     972            2 :         Ok(())
     973              :     }
     974              : 
     975            2 :     #[tokio::test]
     976            2 :     async fn connection_no_candidate() -> anyhow::Result<()> {
     977            2 :         let harness = TenantHarness::create("connection_no_candidate")?;
     978           10 :         let mut state = dummy_state(&harness).await;
     979            2 :         let now = Utc::now().naive_utc();
     980            2 : 
     981            2 :         let connected_sk_id = NodeId(0);
     982            2 :         let current_lsn = 100_000;
     983            2 : 
     984            2 :         let connection_status = WalConnectionStatus {
     985            2 :             is_connected: true,
     986            2 :             has_processed_wal: true,
     987            2 :             latest_connection_update: now,
     988            2 :             latest_wal_update: now,
     989            2 :             commit_lsn: Some(Lsn(current_lsn)),
     990            2 :             streaming_lsn: Some(Lsn(current_lsn)),
     991            2 :             node: NodeId(1),
     992            2 :         };
     993            2 : 
     994            2 :         state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
     995            2 :         state.wal_connection = Some(WalConnection {
     996            2 :             started_at: now,
     997            2 :             sk_id: connected_sk_id,
     998            2 :             availability_zone: None,
     999            2 :             status: connection_status,
    1000            2 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1001            2 :                 sender
    1002            2 :                     .send(TaskStateUpdate::Progress(connection_status))
    1003            2 :                     .ok();
    1004            2 :                 Ok(())
    1005            2 :             }),
    1006            2 :             discovered_new_wal: None,
    1007            2 :         });
    1008            2 :         state.wal_stream_candidates = HashMap::from([
    1009            2 :             (
    1010            2 :                 connected_sk_id,
    1011            2 :                 dummy_broker_sk_timeline(
    1012            2 :                     current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
    1013            2 :                     DUMMY_SAFEKEEPER_HOST,
    1014            2 :                     now,
    1015            2 :                 ),
    1016            2 :             ),
    1017            2 :             (
    1018            2 :                 NodeId(1),
    1019            2 :                 dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
    1020            2 :             ),
    1021            2 :             (
    1022            2 :                 NodeId(2),
    1023            2 :                 dummy_broker_sk_timeline(
    1024            2 :                     current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
    1025            2 :                     "not_enough_advanced_lsn",
    1026            2 :                     now,
    1027            2 :                 ),
    1028            2 :             ),
    1029            2 :         ]);
    1030            2 : 
    1031            2 :         let no_candidate = state.next_connection_candidate();
    1032            2 :         assert!(
    1033            2 :             no_candidate.is_none(),
    1034            0 :             "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
    1035              :         );
    1036              : 
    1037            2 :         Ok(())
    1038              :     }
    1039              : 
    1040            2 :     #[tokio::test]
    1041            2 :     async fn no_connection_candidate() -> anyhow::Result<()> {
    1042            2 :         let harness = TenantHarness::create("no_connection_candidate")?;
    1043            9 :         let mut state = dummy_state(&harness).await;
    1044            2 :         let now = Utc::now().naive_utc();
    1045            2 : 
    1046            2 :         state.wal_connection = None;
    1047            2 :         state.wal_stream_candidates = HashMap::from([(
    1048            2 :             NodeId(0),
    1049            2 :             dummy_broker_sk_timeline(
    1050            2 :                 1 + state.conf.max_lsn_wal_lag.get(),
    1051            2 :                 DUMMY_SAFEKEEPER_HOST,
    1052            2 :                 now,
    1053            2 :             ),
    1054            2 :         )]);
    1055            2 : 
    1056            2 :         let only_candidate = state
    1057            2 :             .next_connection_candidate()
    1058            2 :             .expect("Expected one candidate selected out of the only data option, but got none");
    1059            2 :         assert_eq!(only_candidate.safekeeper_id, NodeId(0));
    1060            2 :         assert_eq!(
    1061              :             only_candidate.reason,
    1062              :             ReconnectReason::NoExistingConnection,
    1063            0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1064              :         );
    1065            2 :         assert_eq!(
    1066            2 :             only_candidate.wal_source_connconf.host(),
    1067            2 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1068            2 :         );
    1069              : 
    1070            2 :         let selected_lsn = 100_000;
    1071            2 :         state.wal_stream_candidates = HashMap::from([
    1072            2 :             (
    1073            2 :                 NodeId(0),
    1074            2 :                 dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
    1075            2 :             ),
    1076            2 :             (
    1077            2 :                 NodeId(1),
    1078            2 :                 dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
    1079            2 :             ),
    1080            2 :             (
    1081            2 :                 NodeId(2),
    1082            2 :                 dummy_broker_sk_timeline(selected_lsn + 100, "", now),
    1083            2 :             ),
    1084            2 :         ]);
    1085            2 :         let biggest_wal_candidate = state.next_connection_candidate().expect(
    1086            2 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1087            2 :         );
    1088            2 : 
    1089            2 :         assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
    1090            2 :         assert_eq!(
    1091              :             biggest_wal_candidate.reason,
    1092              :             ReconnectReason::NoExistingConnection,
    1093            0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1094              :         );
    1095            2 :         assert_eq!(
    1096            2 :             biggest_wal_candidate.wal_source_connconf.host(),
    1097            2 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1098            2 :         );
    1099              : 
    1100            2 :         Ok(())
    1101              :     }
    1102              : 
    1103            2 :     #[tokio::test]
    1104            2 :     async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
    1105            2 :         let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
    1106            9 :         let mut state = dummy_state(&harness).await;
    1107            2 :         let now = Utc::now().naive_utc();
    1108            2 : 
    1109            2 :         let current_lsn = Lsn(100_000).align();
    1110            2 :         let bigger_lsn = Lsn(current_lsn.0 + 100).align();
    1111            2 : 
    1112            2 :         state.wal_connection = None;
    1113            2 :         state.wal_stream_candidates = HashMap::from([
    1114            2 :             (
    1115            2 :                 NodeId(0),
    1116            2 :                 dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1117            2 :             ),
    1118            2 :             (
    1119            2 :                 NodeId(1),
    1120            2 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1121            2 :             ),
    1122            2 :         ]);
    1123            2 :         state.wal_connection_retries = HashMap::from([(
    1124            2 :             NodeId(0),
    1125            2 :             RetryInfo {
    1126            2 :                 next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
    1127            2 :                 retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
    1128            2 :             },
    1129            2 :         )]);
    1130            2 : 
    1131            2 :         let candidate_with_less_errors = state
    1132            2 :             .next_connection_candidate()
    1133            2 :             .expect("Expected one candidate selected, but got none");
    1134            2 :         assert_eq!(
    1135              :             candidate_with_less_errors.safekeeper_id,
    1136              :             NodeId(1),
    1137            0 :             "Should select the node with no pending retry cooldown"
    1138              :         );
    1139              : 
    1140            2 :         Ok(())
    1141              :     }
    1142              : 
    1143            2 :     #[tokio::test]
    1144            2 :     async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1145            2 :         let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
    1146            9 :         let mut state = dummy_state(&harness).await;
    1147            2 :         let current_lsn = Lsn(100_000).align();
    1148            2 :         let now = Utc::now().naive_utc();
    1149            2 : 
    1150            2 :         let connected_sk_id = NodeId(0);
    1151            2 :         let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
    1152            2 : 
    1153            2 :         let connection_status = WalConnectionStatus {
    1154            2 :             is_connected: true,
    1155            2 :             has_processed_wal: true,
    1156            2 :             latest_connection_update: now,
    1157            2 :             latest_wal_update: now,
    1158            2 :             commit_lsn: Some(current_lsn),
    1159            2 :             streaming_lsn: Some(current_lsn),
    1160            2 :             node: connected_sk_id,
    1161            2 :         };
    1162            2 : 
    1163            2 :         state.wal_connection = Some(WalConnection {
    1164            2 :             started_at: now,
    1165            2 :             sk_id: connected_sk_id,
    1166            2 :             availability_zone: None,
    1167            2 :             status: connection_status,
    1168            2 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1169            2 :                 sender
    1170            2 :                     .send(TaskStateUpdate::Progress(connection_status))
    1171            2 :                     .ok();
    1172            2 :                 Ok(())
    1173            2 :             }),
    1174            2 :             discovered_new_wal: None,
    1175            2 :         });
    1176            2 :         state.wal_stream_candidates = HashMap::from([
    1177            2 :             (
    1178            2 :                 connected_sk_id,
    1179            2 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1180            2 :             ),
    1181            2 :             (
    1182            2 :                 NodeId(1),
    1183            2 :                 dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
    1184            2 :             ),
    1185            2 :         ]);
    1186            2 : 
    1187            2 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1188            2 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1189            2 :         );
    1190            2 : 
    1191            2 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
    1192            2 :         assert_eq!(
    1193            2 :             over_threshcurrent_candidate.reason,
    1194            2 :             ReconnectReason::LaggingWal {
    1195            2 :                 current_commit_lsn: current_lsn,
    1196            2 :                 new_commit_lsn: new_lsn,
    1197            2 :                 threshold: state.conf.max_lsn_wal_lag
    1198            2 :             },
    1199            0 :             "Should select bigger WAL safekeeper if it starts to lag enough"
    1200              :         );
    1201            2 :         assert_eq!(
    1202            2 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1203            2 :             &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
    1204            2 :         );
    1205              : 
    1206            2 :         Ok(())
    1207              :     }
    1208              : 
    1209            2 :     #[tokio::test]
    1210            2 :     async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
    1211            2 :         let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
    1212            9 :         let mut state = dummy_state(&harness).await;
    1213            2 :         let current_lsn = Lsn(100_000).align();
    1214            2 :         let now = Utc::now().naive_utc();
    1215              : 
    1216            2 :         let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
    1217            2 :         let time_over_threshold =
    1218            2 :             Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
    1219            2 : 
    1220            2 :         let connection_status = WalConnectionStatus {
    1221            2 :             is_connected: true,
    1222            2 :             has_processed_wal: true,
    1223            2 :             latest_connection_update: time_over_threshold,
    1224            2 :             latest_wal_update: time_over_threshold,
    1225            2 :             commit_lsn: Some(current_lsn),
    1226            2 :             streaming_lsn: Some(current_lsn),
    1227            2 :             node: NodeId(1),
    1228            2 :         };
    1229            2 : 
    1230            2 :         state.wal_connection = Some(WalConnection {
    1231            2 :             started_at: now,
    1232            2 :             sk_id: NodeId(1),
    1233            2 :             availability_zone: None,
    1234            2 :             status: connection_status,
    1235            2 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1236            2 :                 sender
    1237            2 :                     .send(TaskStateUpdate::Progress(connection_status))
    1238            2 :                     .ok();
    1239            2 :                 Ok(())
    1240            2 :             }),
    1241            2 :             discovered_new_wal: None,
    1242            2 :         });
    1243            2 :         state.wal_stream_candidates = HashMap::from([(
    1244            2 :             NodeId(0),
    1245            2 :             dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1246            2 :         )]);
    1247            2 : 
    1248            2 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1249            2 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1250            2 :         );
    1251            2 : 
    1252            2 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
    1253            2 :         match over_threshcurrent_candidate.reason {
    1254              :             ReconnectReason::NoKeepAlives {
    1255            2 :                 last_keep_alive,
    1256            2 :                 threshold,
    1257            2 :                 ..
    1258            2 :             } => {
    1259            2 :                 assert_eq!(last_keep_alive, Some(time_over_threshold));
    1260            2 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1261              :             }
    1262            0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1263              :         }
    1264            2 :         assert_eq!(
    1265            2 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1266            2 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1267            2 :         );
    1268              : 
    1269            2 :         Ok(())
    1270              :     }
    1271              : 
    1272            2 :     #[tokio::test]
    1273            2 :     async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1274            2 :         let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
    1275            9 :         let mut state = dummy_state(&harness).await;
    1276            2 :         let current_lsn = Lsn(100_000).align();
    1277            2 :         let new_lsn = Lsn(100_100).align();
    1278            2 :         let now = Utc::now().naive_utc();
    1279              : 
    1280            2 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
    1281            2 :         let time_over_threshold =
    1282            2 :             Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
    1283            2 : 
    1284            2 :         let connection_status = WalConnectionStatus {
    1285            2 :             is_connected: true,
    1286            2 :             has_processed_wal: true,
    1287            2 :             latest_connection_update: now,
    1288            2 :             latest_wal_update: time_over_threshold,
    1289            2 :             commit_lsn: Some(current_lsn),
    1290            2 :             streaming_lsn: Some(current_lsn),
    1291            2 :             node: NodeId(1),
    1292            2 :         };
    1293            2 : 
    1294            2 :         state.wal_connection = Some(WalConnection {
    1295            2 :             started_at: now,
    1296            2 :             sk_id: NodeId(1),
    1297            2 :             availability_zone: None,
    1298            2 :             status: connection_status,
    1299            2 :             connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
    1300            2 :             discovered_new_wal: Some(NewCommittedWAL {
    1301            2 :                 discovered_at: time_over_threshold,
    1302            2 :                 lsn: new_lsn,
    1303            2 :             }),
    1304            2 :         });
    1305            2 :         state.wal_stream_candidates = HashMap::from([(
    1306            2 :             NodeId(0),
    1307            2 :             dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1308            2 :         )]);
    1309            2 : 
    1310            2 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1311            2 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1312            2 :         );
    1313            2 : 
    1314            2 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
    1315            2 :         match over_threshcurrent_candidate.reason {
    1316              :             ReconnectReason::NoWalTimeout {
    1317            2 :                 current_lsn,
    1318            2 :                 current_commit_lsn,
    1319            2 :                 candidate_commit_lsn,
    1320            2 :                 last_wal_interaction,
    1321            2 :                 threshold,
    1322            2 :                 ..
    1323            2 :             } => {
    1324            2 :                 assert_eq!(current_lsn, current_lsn);
    1325            2 :                 assert_eq!(current_commit_lsn, current_lsn);
    1326            2 :                 assert_eq!(candidate_commit_lsn, new_lsn);
    1327            2 :                 assert_eq!(last_wal_interaction, Some(time_over_threshold));
    1328            2 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1329              :             }
    1330            0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1331              :         }
    1332            2 :         assert_eq!(
    1333            2 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1334            2 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1335            2 :         );
    1336              : 
    1337            2 :         Ok(())
    1338              :     }
    1339              : 
    1340              :     const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
    1341              : 
    1342           16 :     async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
    1343           16 :         let (tenant, ctx) = harness.load().await;
    1344           16 :         let timeline = tenant
    1345           16 :             .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
    1346           55 :             .await
    1347           16 :             .expect("Failed to create an empty timeline for dummy wal connection manager");
    1348           16 : 
    1349           16 :         ConnectionManagerState {
    1350           16 :             id: TenantTimelineId {
    1351           16 :                 tenant_id: harness.tenant_shard_id.tenant_id,
    1352           16 :                 timeline_id: TIMELINE_ID,
    1353           16 :             },
    1354           16 :             timeline,
    1355           16 :             conf: WalReceiverConf {
    1356           16 :                 wal_connect_timeout: Duration::from_secs(1),
    1357           16 :                 lagging_wal_timeout: Duration::from_secs(1),
    1358           16 :                 max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
    1359           16 :                 auth_token: None,
    1360           16 :                 availability_zone: None,
    1361           16 :                 ingest_batch_size: 1,
    1362           16 :             },
    1363           16 :             wal_connection: None,
    1364           16 :             wal_stream_candidates: HashMap::new(),
    1365           16 :             wal_connection_retries: HashMap::new(),
    1366           16 :         }
    1367           16 :     }
    1368              : 
    1369            2 :     #[tokio::test]
    1370            2 :     async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
    1371            2 :         // Pageserver and one of safekeepers will be in the same availability zone
    1372            2 :         // and pageserver should prefer to connect to it.
    1373            2 :         let test_az = Some("test_az".to_owned());
    1374              : 
    1375            2 :         let harness = TenantHarness::create("switch_to_same_availability_zone")?;
    1376            8 :         let mut state = dummy_state(&harness).await;
    1377            2 :         state.conf.availability_zone = test_az.clone();
    1378            2 :         let current_lsn = Lsn(100_000).align();
    1379            2 :         let now = Utc::now().naive_utc();
    1380            2 : 
    1381            2 :         let connected_sk_id = NodeId(0);
    1382            2 : 
    1383            2 :         let connection_status = WalConnectionStatus {
    1384            2 :             is_connected: true,
    1385            2 :             has_processed_wal: true,
    1386            2 :             latest_connection_update: now,
    1387            2 :             latest_wal_update: now,
    1388            2 :             commit_lsn: Some(current_lsn),
    1389            2 :             streaming_lsn: Some(current_lsn),
    1390            2 :             node: connected_sk_id,
    1391            2 :         };
    1392            2 : 
    1393            2 :         state.wal_connection = Some(WalConnection {
    1394            2 :             started_at: now,
    1395            2 :             sk_id: connected_sk_id,
    1396            2 :             availability_zone: None,
    1397            2 :             status: connection_status,
    1398            2 :             connection_task: TaskHandle::spawn(move |sender, _| async move {
    1399            2 :                 sender
    1400            2 :                     .send(TaskStateUpdate::Progress(connection_status))
    1401            2 :                     .ok();
    1402            2 :                 Ok(())
    1403            2 :             }),
    1404            2 :             discovered_new_wal: None,
    1405            2 :         });
    1406            2 : 
    1407            2 :         // We have another safekeeper with the same commit_lsn, and it have the same availability zone as
    1408            2 :         // the current pageserver.
    1409            2 :         let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
    1410            2 :         same_az_sk.timeline.availability_zone = test_az.clone();
    1411            2 : 
    1412            2 :         state.wal_stream_candidates = HashMap::from([
    1413            2 :             (
    1414            2 :                 connected_sk_id,
    1415            2 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1416            2 :             ),
    1417            2 :             (NodeId(1), same_az_sk),
    1418            2 :         ]);
    1419            2 : 
    1420            2 :         // We expect that pageserver will switch to the safekeeper in the same availability zone,
    1421            2 :         // even if it has the same commit_lsn.
    1422            2 :         let next_candidate = state.next_connection_candidate().expect(
    1423            2 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1424            2 :         );
    1425            2 : 
    1426            2 :         assert_eq!(next_candidate.safekeeper_id, NodeId(1));
    1427            2 :         assert_eq!(
    1428              :             next_candidate.reason,
    1429              :             ReconnectReason::SwitchAvailabilityZone,
    1430            0 :             "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
    1431              :         );
    1432            2 :         assert_eq!(
    1433            2 :             next_candidate.wal_source_connconf.host(),
    1434            2 :             &Host::Domain("same_az".to_owned())
    1435            2 :         );
    1436              : 
    1437            2 :         Ok(())
    1438              :     }
    1439              : }
        

Generated by: LCOV version 2.1-beta