LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline/walreceiver - connection_manager.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 55.6 % 1060 589
Test Date: 2025-07-16 12:29:03 Functions: 63.9 % 72 46

            Line data    Source code
       1              : //! WAL receiver logic that ensures the pageserver gets connectected to safekeeper,
       2              : //! that contains the latest WAL to stream and this connection does not go stale.
       3              : //!
       4              : //! To achieve that, a storage broker is used: safekepers propagate their timelines' state in it,
       5              : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
       6              : //! Current connection state is tracked too, to ensure it's not getting stale.
       7              : //!
       8              : //! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new conneciton leader,
       9              : //! then a (re)connection happens, if necessary.
      10              : //! Only WAL streaming task expects to be finished, other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
      11              : 
      12              : use std::collections::HashMap;
      13              : use std::num::NonZeroU64;
      14              : use std::ops::ControlFlow;
      15              : use std::sync::Arc;
      16              : use std::time::Duration;
      17              : 
      18              : use anyhow::Context;
      19              : use chrono::{NaiveDateTime, Utc};
      20              : use pageserver_api::models::TimelineState;
      21              : use postgres_connection::PgConnectionConfig;
      22              : use storage_broker::proto::{
      23              :     FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
      24              :     SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription,
      25              :     TypedMessage,
      26              : };
      27              : use storage_broker::{BrokerClientChannel, Code, Streaming};
      28              : use tokio_util::sync::CancellationToken;
      29              : use tracing::*;
      30              : use utils::backoff::{
      31              :     DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
      32              : };
      33              : use utils::id::{NodeId, TenantTimelineId};
      34              : use utils::lsn::Lsn;
      35              : use utils::postgres_client::{ConnectionConfigArgs, wal_stream_connection_config};
      36              : 
      37              : use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
      38              : use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
      39              : use crate::context::{DownloadBehavior, RequestContext};
      40              : use crate::metrics::{
      41              :     WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
      42              :     WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
      43              : };
      44              : use crate::task_mgr::TaskKind;
      45              : use crate::tenant::{Timeline, debug_assert_current_span_has_tenant_and_timeline_id};
      46              : 
      47              : pub(crate) struct Cancelled;
      48              : 
      49              : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
      50              : /// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
      51              : /// If storage broker subscription is cancelled, exits.
      52              : ///
      53              : /// # Cancel-Safety
      54              : ///
      55              : /// Not cancellation-safe. Use `cancel` token to request cancellation.
      56            0 : pub(super) async fn connection_manager_loop_step(
      57            0 :     broker_client: &mut BrokerClientChannel,
      58            0 :     connection_manager_state: &mut ConnectionManagerState,
      59            0 :     ctx: &RequestContext,
      60            0 :     cancel: &CancellationToken,
      61            0 :     manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
      62            0 : ) -> Result<(), Cancelled> {
      63            0 :     match tokio::select! {
      64            0 :         _ = cancel.cancelled() => { return Err(Cancelled); },
      65            0 :         st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
      66              :     } {
      67            0 :         Ok(()) => {}
      68            0 :         Err(new_state) => {
      69            0 :             debug!(
      70              :                 ?new_state,
      71            0 :                 "state changed, stopping wal connection manager loop"
      72              :             );
      73            0 :             return Err(Cancelled);
      74              :         }
      75              :     }
      76              : 
      77            0 :     WALRECEIVER_ACTIVE_MANAGERS.inc();
      78            0 :     scopeguard::defer! {
      79              :         WALRECEIVER_ACTIVE_MANAGERS.dec();
      80              :     }
      81              : 
      82            0 :     let id = TenantTimelineId {
      83            0 :         tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
      84            0 :         timeline_id: connection_manager_state.timeline.timeline_id,
      85            0 :     };
      86              : 
      87            0 :     let mut timeline_state_updates = connection_manager_state
      88            0 :         .timeline
      89            0 :         .subscribe_for_state_updates();
      90              : 
      91            0 :     let mut wait_lsn_status = connection_manager_state
      92            0 :         .timeline
      93            0 :         .subscribe_for_wait_lsn_updates();
      94              : 
      95              :     // TODO: create a separate config option for discovery request interval
      96            0 :     let discovery_request_interval = connection_manager_state.conf.lagging_wal_timeout;
      97            0 :     let mut last_discovery_ts: Option<std::time::Instant> = None;
      98              : 
      99              :     // Subscribe to the broker updates. Stream shares underlying TCP connection
     100              :     // with other streams on this client (other connection managers). When
     101              :     // object goes out of scope, stream finishes in drop() automatically.
     102            0 :     let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
     103            0 :     let mut broker_reset_interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
     104            0 :     debug!("Subscribed for broker timeline updates");
     105              : 
     106              :     loop {
     107            0 :         let time_until_next_retry = connection_manager_state.time_until_next_retry();
     108            0 :         let any_activity = connection_manager_state.wal_connection.is_some()
     109            0 :             || !connection_manager_state.wal_stream_candidates.is_empty();
     110              : 
     111              :         // These things are happening concurrently:
     112              :         //
     113              :         //  - cancellation request
     114              :         //  - keep receiving WAL on the current connection
     115              :         //      - if the shared state says we need to change connection, disconnect and return
     116              :         //      - this runs in a separate task and we receive updates via a watch channel
     117              :         //  - change connection if the rules decide so, or if the current connection dies
     118              :         //  - receive updates from broker
     119              :         //      - this might change the current desired connection
     120              :         //  - timeline state changes to something that does not allow walreceiver to run concurrently
     121              :         //  - if there's no connection and no candidates, try to send a discovery request
     122              : 
     123              :         // NB: make sure each of the select expressions are cancellation-safe
     124              :         // (no need for arms to be cancellation-safe).
     125            0 :         tokio::select! {
     126            0 :             _ = cancel.cancelled() => { return Err(Cancelled); }
     127            0 :             Some(wal_connection_update) = async {
     128            0 :                 match connection_manager_state.wal_connection.as_mut() {
     129            0 :                     Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
     130            0 :                     None => None,
     131              :                 }
     132            0 :             } => {
     133            0 :                 let wal_connection = connection_manager_state.wal_connection.as_mut()
     134            0 :                     .expect("Should have a connection, as checked by the corresponding select! guard");
     135            0 :                 match wal_connection_update {
     136            0 :                     TaskEvent::Update(TaskStateUpdate::Started) => {},
     137            0 :                     TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
     138            0 :                         if new_status.has_processed_wal {
     139            0 :                             // We have advanced last_record_lsn by processing the WAL received
     140            0 :                             // from this safekeeper. This is good enough to clean unsuccessful
     141            0 :                             // retries history and allow reconnecting to this safekeeper without
     142            0 :                             // sleeping for a long time.
     143            0 :                             connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
     144            0 :                         }
     145            0 :                         wal_connection.status = new_status;
     146              :                     }
     147            0 :                     TaskEvent::End(walreceiver_task_result) => {
     148            0 :                         match walreceiver_task_result {
     149            0 :                             Ok(()) => debug!("WAL receiving task finished"),
     150            0 :                             Err(e) => error!("wal receiver task finished with an error: {e:?}"),
     151              :                         }
     152            0 :                         connection_manager_state.drop_old_connection(false).await;
     153              :                     },
     154              :                 }
     155              :             },
     156              : 
     157              :             // Got a new update from the broker
     158            0 :             broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
     159            0 :                 match broker_update {
     160            0 :                     Ok(Some(broker_update)) => {
     161            0 :                         broker_reset_interval.reset();
     162            0 :                         connection_manager_state.register_timeline_update(broker_update);
     163            0 :                     },
     164            0 :                     Err(status) => {
     165            0 :                         match status.code() {
     166            0 :                             Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
     167              :                                 // tonic's error handling doesn't provide a clear code for disconnections: we get
     168              :                                 // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
     169              :                                 // => https://github.com/neondatabase/neon/issues/9562
     170            0 :                                 info!("broker disconnected: {status}");
     171              :                             },
     172              :                             _ => {
     173            0 :                                 warn!("broker subscription failed: {status}");
     174              :                             }
     175              :                         }
     176            0 :                         return Ok(());
     177              :                     }
     178              :                     Ok(None) => {
     179            0 :                         error!("broker subscription stream ended"); // can't happen
     180            0 :                         return Ok(());
     181              :                     }
     182              :                 }
     183              :             },
     184              : 
     185              :             // If we've not received any updates from the broker from a while, are waiting for WAL
     186              :             // and have no safekeeper connection or connection candidates, then it might be that
     187              :             // the broker subscription is wedged. Drop the currrent subscription and re-subscribe
     188              :             // with the goal of unblocking it.
     189            0 :             _ = broker_reset_interval.tick() => {
     190            0 :                 let awaiting_lsn = wait_lsn_status.borrow().is_some();
     191            0 :                 let no_candidates = connection_manager_state.wal_stream_candidates.is_empty();
     192            0 :                 let no_connection = connection_manager_state.wal_connection.is_none();
     193              : 
     194            0 :                 if awaiting_lsn && no_candidates && no_connection {
     195            0 :                     tracing::warn!("No broker updates received for a while, but waiting for WAL. Re-setting stream ...");
     196            0 :                     broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
     197            0 :                 }
     198              :             },
     199              : 
     200            0 :             new_event = async {
     201              :                 // Reminder: this match arm needs to be cancellation-safe.
     202              :                 loop {
     203            0 :                     if connection_manager_state.timeline.current_state() == TimelineState::Loading {
     204            0 :                         warn!("wal connection manager should only be launched after timeline has become active");
     205            0 :                     }
     206            0 :                     match timeline_state_updates.changed().await {
     207              :                         Ok(()) => {
     208            0 :                             let new_state = connection_manager_state.timeline.current_state();
     209            0 :                             match new_state {
     210              :                                 // we're already active as walreceiver, no need to reactivate
     211            0 :                                 TimelineState::Active => continue,
     212              :                                 TimelineState::Broken { .. } | TimelineState::Stopping => {
     213            0 :                                     debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
     214            0 :                                     return ControlFlow::Break(());
     215              :                                 }
     216              :                                 TimelineState::Loading => {
     217            0 :                                     warn!("timeline transitioned back to Loading state, that should not happen");
     218            0 :                                     return ControlFlow::Continue(());
     219              :                                 }
     220              :                             }
     221              :                         }
     222            0 :                         Err(_sender_dropped_error) => return ControlFlow::Break(()),
     223              :                     }
     224              :                 }
     225            0 :             } => match new_event {
     226              :                 ControlFlow::Continue(()) => {
     227            0 :                     return Ok(());
     228              :                 }
     229              :                 ControlFlow::Break(()) => {
     230            0 :                     debug!("Timeline is no longer active, stopping wal connection manager loop");
     231            0 :                     return Err(Cancelled);
     232              :                 }
     233              :             },
     234              : 
     235            0 :             Some(()) = async {
     236            0 :                 match time_until_next_retry {
     237            0 :                     Some(sleep_time) => {
     238            0 :                         tokio::time::sleep(sleep_time).await;
     239            0 :                         Some(())
     240              :                     },
     241              :                     None => {
     242            0 :                         debug!("No candidates to retry, waiting indefinitely for the broker events");
     243            0 :                         None
     244              :                     }
     245              :                 }
     246            0 :             } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
     247              : 
     248            0 :             Some(()) = async {
     249              :                 // Reminder: this match arm needs to be cancellation-safe.
     250              :                 // Calculating time needed to wait until sending the next discovery request.
     251              :                 // Current implementation is conservative and sends discovery requests only when there are no candidates.
     252              : 
     253            0 :                 if any_activity {
     254              :                     // No need to send discovery requests if there is an active connection or candidates.
     255            0 :                     return None;
     256            0 :                 }
     257              : 
     258              :                 // Waiting for an active wait_lsn request.
     259            0 :                 while wait_lsn_status.borrow().is_none() {
     260            0 :                     if wait_lsn_status.changed().await.is_err() {
     261              :                         // wait_lsn_status channel was closed, exiting
     262            0 :                         warn!("wait_lsn_status channel was closed in connection_manager_loop_step");
     263            0 :                         return None;
     264            0 :                     }
     265              :                 }
     266              : 
     267              :                 // All preconditions met, preparing to send a discovery request.
     268            0 :                 let now = std::time::Instant::now();
     269            0 :                 let next_discovery_ts = last_discovery_ts
     270            0 :                     .map(|ts| ts + discovery_request_interval)
     271            0 :                     .unwrap_or_else(|| now);
     272              : 
     273            0 :                 if next_discovery_ts > now {
     274              :                     // Prevent sending discovery requests too frequently.
     275            0 :                     tokio::time::sleep(next_discovery_ts - now).await;
     276            0 :                 }
     277              : 
     278            0 :                 let tenant_timeline_id = Some(ProtoTenantTimelineId {
     279            0 :                     tenant_id: id.tenant_id.as_ref().to_owned(),
     280            0 :                     timeline_id: id.timeline_id.as_ref().to_owned(),
     281            0 :                 });
     282            0 :                 let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
     283            0 :                 let msg = TypedMessage {
     284            0 :                     r#type: MessageType::SafekeeperDiscoveryRequest as i32,
     285            0 :                     safekeeper_timeline_info: None,
     286            0 :                     safekeeper_discovery_request: Some(request),
     287            0 :                     safekeeper_discovery_response: None,
     288            0 :                     };
     289              : 
     290            0 :                 last_discovery_ts = Some(std::time::Instant::now());
     291            0 :                 info!("No active connection and no candidates, sending discovery request to the broker");
     292              : 
     293              :                 // Cancellation safety: we want to send a message to the broker, but publish_one()
     294              :                 // function can get cancelled by the other select! arm. This is absolutely fine, because
     295              :                 // we just want to receive broker updates and discovery is not important if we already
     296              :                 // receive updates.
     297              :                 //
     298              :                 // It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
     299              :                 // This is totally fine because of the reason above.
     300              : 
     301              :                 // This is a fire-and-forget request, we don't care about the response
     302            0 :                 let _ = broker_client.publish_one(msg).await;
     303            0 :                 debug!("Discovery request sent to the broker");
     304            0 :                 None
     305            0 :             } => {}
     306              :         }
     307              : 
     308            0 :         if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
     309            0 :             info!("Switching to new connection candidate: {new_candidate:?}");
     310            0 :             connection_manager_state
     311            0 :                 .change_connection(new_candidate, ctx)
     312            0 :                 .await
     313            0 :         }
     314            0 :         *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
     315              :     }
     316            0 : }
     317              : 
     318              : /// Endlessly try to subscribe for broker updates for a given timeline.
     319            0 : async fn subscribe_for_timeline_updates(
     320            0 :     broker_client: &mut BrokerClientChannel,
     321            0 :     id: TenantTimelineId,
     322            0 :     cancel: &CancellationToken,
     323            0 : ) -> Result<Streaming<TypedMessage>, Cancelled> {
     324            0 :     let mut attempt = 0;
     325              :     loop {
     326            0 :         exponential_backoff(
     327            0 :             attempt,
     328            0 :             DEFAULT_BASE_BACKOFF_SECONDS,
     329            0 :             DEFAULT_MAX_BACKOFF_SECONDS,
     330            0 :             cancel,
     331            0 :         )
     332            0 :         .await;
     333            0 :         attempt += 1;
     334              : 
     335              :         // subscribe to the specific timeline
     336            0 :         let request = SubscribeByFilterRequest {
     337            0 :             types: vec![
     338            0 :                 TypeSubscription {
     339            0 :                     r#type: MessageType::SafekeeperTimelineInfo as i32,
     340            0 :                 },
     341            0 :                 TypeSubscription {
     342            0 :                     r#type: MessageType::SafekeeperDiscoveryResponse as i32,
     343            0 :                 },
     344            0 :             ],
     345            0 :             tenant_timeline_id: Some(FilterTenantTimelineId {
     346            0 :                 enabled: true,
     347            0 :                 tenant_timeline_id: Some(ProtoTenantTimelineId {
     348            0 :                     tenant_id: id.tenant_id.as_ref().to_owned(),
     349            0 :                     timeline_id: id.timeline_id.as_ref().to_owned(),
     350            0 :                 }),
     351            0 :             }),
     352            0 :         };
     353              : 
     354              :         match {
     355            0 :             tokio::select! {
     356            0 :                 r = broker_client.subscribe_by_filter(request) => { r }
     357            0 :                 _ = cancel.cancelled() => { return Err(Cancelled); }
     358              :             }
     359              :         } {
     360            0 :             Ok(resp) => {
     361            0 :                 return Ok(resp.into_inner());
     362              :             }
     363            0 :             Err(e) => {
     364              :                 // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
     365              :                 // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
     366            0 :                 info!(
     367            0 :                     "Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}"
     368              :                 );
     369            0 :                 continue;
     370              :             }
     371              :         }
     372              :     }
     373            0 : }
     374              : 
     375              : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
     376              : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
     377              : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
     378              : 
     379              : /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
     380              : pub(super) struct ConnectionManagerState {
     381              :     id: TenantTimelineId,
     382              :     /// Use pageserver data about the timeline to filter out some of the safekeepers.
     383              :     timeline: Arc<Timeline>,
     384              :     /// Child token of [`super::WalReceiver::cancel`], inherited to all tasks we spawn.
     385              :     cancel: CancellationToken,
     386              :     conf: WalReceiverConf,
     387              :     /// Current connection to safekeeper for WAL streaming.
     388              :     wal_connection: Option<WalConnection>,
     389              :     /// Info about retries and unsuccessful attempts to connect to safekeepers.
     390              :     wal_connection_retries: HashMap<NodeId, RetryInfo>,
     391              :     /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
     392              :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     393              : }
     394              : 
     395              : /// An information about connection manager's current connection and connection candidates.
     396              : #[derive(Debug, Clone)]
     397              : pub struct ConnectionManagerStatus {
     398              :     existing_connection: Option<WalConnectionStatus>,
     399              :     wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
     400              : }
     401              : 
     402              : impl ConnectionManagerStatus {
     403              :     /// Generates a string, describing current connection status in a form, suitable for logging.
     404            0 :     pub fn to_human_readable_string(&self) -> String {
     405            0 :         let mut resulting_string = String::new();
     406            0 :         match &self.existing_connection {
     407            0 :             Some(connection) => {
     408            0 :                 if connection.has_processed_wal {
     409            0 :                     resulting_string.push_str(&format!(
     410            0 :                         " (update {}): streaming WAL from node {}, ",
     411            0 :                         connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
     412            0 :                         connection.node,
     413            0 :                     ));
     414              : 
     415            0 :                     match (connection.streaming_lsn, connection.commit_lsn) {
     416            0 :                         (None, None) => resulting_string.push_str("no streaming data"),
     417            0 :                         (None, Some(commit_lsn)) => {
     418            0 :                             resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
     419              :                         }
     420            0 :                         (Some(streaming_lsn), None) => {
     421            0 :                             resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
     422              :                         }
     423            0 :                         (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
     424            0 :                             &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
     425              :                         ),
     426              :                     }
     427            0 :                 } else if connection.is_connected {
     428            0 :                     resulting_string.push_str(&format!(
     429            0 :                         " (update {}): connecting to node {}",
     430            0 :                         connection
     431            0 :                             .latest_connection_update
     432            0 :                             .format("%Y-%m-%d %H:%M:%S"),
     433            0 :                         connection.node,
     434            0 :                     ));
     435            0 :                 } else {
     436            0 :                     resulting_string.push_str(&format!(
     437            0 :                         " (update {}): initializing node {} connection",
     438            0 :                         connection
     439            0 :                             .latest_connection_update
     440            0 :                             .format("%Y-%m-%d %H:%M:%S"),
     441            0 :                         connection.node,
     442            0 :                     ));
     443            0 :                 }
     444              :             }
     445            0 :             None => resulting_string.push_str(": disconnected"),
     446              :         }
     447              : 
     448            0 :         resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
     449            0 :         let mut candidates = self.wal_stream_candidates.iter().peekable();
     450            0 :         while let Some((node_id, candidate_info)) = candidates.next() {
     451            0 :             resulting_string.push_str(&format!(
     452            0 :                 "({}|{}|{})",
     453            0 :                 node_id,
     454            0 :                 candidate_info.latest_update.format("%H:%M:%S"),
     455            0 :                 Lsn(candidate_info.timeline.commit_lsn)
     456            0 :             ));
     457            0 :             if candidates.peek().is_some() {
     458            0 :                 resulting_string.push_str(", ");
     459            0 :             }
     460              :         }
     461            0 :         resulting_string.push(']');
     462              : 
     463            0 :         resulting_string
     464            0 :     }
     465              : }
     466              : 
     467              : /// Current connection data.
     468              : #[derive(Debug)]
     469              : struct WalConnection {
     470              :     /// Time when the connection was initiated.
     471              :     started_at: NaiveDateTime,
     472              :     /// Current safekeeper pageserver is connected to for WAL streaming.
     473              :     sk_id: NodeId,
     474              :     /// Availability zone of the safekeeper.
     475              :     availability_zone: Option<String>,
     476              :     /// Status of the connection.
     477              :     status: WalConnectionStatus,
     478              :     /// WAL streaming task handle.
     479              :     connection_task: TaskHandle<WalConnectionStatus>,
     480              :     /// Have we discovered that other safekeeper has more recent WAL than we do?
     481              :     discovered_new_wal: Option<NewCommittedWAL>,
     482              : }
     483              : 
     484              : /// Notion of a new committed WAL, which exists on other safekeeper.
     485              : #[derive(Debug, Clone, Copy)]
     486              : struct NewCommittedWAL {
     487              :     /// LSN of the new committed WAL.
     488              :     lsn: Lsn,
     489              :     /// When we discovered that the new committed WAL exists on other safekeeper.
     490              :     discovered_at: NaiveDateTime,
     491              : }
     492              : 
     493              : #[derive(Debug, Clone, Copy)]
     494              : struct RetryInfo {
     495              :     next_retry_at: Option<NaiveDateTime>,
     496              :     retry_duration_seconds: f64,
     497              : }
     498              : 
     499              : /// Data about the timeline to connect to, received from the broker.
     500              : #[derive(Debug, Clone)]
     501              : struct BrokerSkTimeline {
     502              :     timeline: SafekeeperDiscoveryResponse,
     503              :     /// Time at which the data was fetched from the broker last time, to track the stale data.
     504              :     latest_update: NaiveDateTime,
     505              : }
     506              : 
     507              : impl ConnectionManagerState {
     508            0 :     pub(super) fn new(
     509            0 :         timeline: Arc<Timeline>,
     510            0 :         conf: WalReceiverConf,
     511            0 :         cancel: CancellationToken,
     512            0 :     ) -> Self {
     513            0 :         let id = TenantTimelineId {
     514            0 :             tenant_id: timeline.tenant_shard_id.tenant_id,
     515            0 :             timeline_id: timeline.timeline_id,
     516            0 :         };
     517            0 :         Self {
     518            0 :             id,
     519            0 :             timeline,
     520            0 :             cancel,
     521            0 :             conf,
     522            0 :             wal_connection: None,
     523            0 :             wal_stream_candidates: HashMap::new(),
     524            0 :             wal_connection_retries: HashMap::new(),
     525            0 :         }
     526            0 :     }
     527              : 
     528            5 :     fn spawn<Fut>(
     529            5 :         &self,
     530            5 :         task: impl FnOnce(
     531            5 :             tokio::sync::watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
     532            5 :             CancellationToken,
     533            5 :         ) -> Fut
     534            5 :         + Send
     535            5 :         + 'static,
     536            5 :     ) -> TaskHandle<WalConnectionStatus>
     537            5 :     where
     538            5 :         Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
     539              :     {
     540              :         // TODO: get rid of TaskHandle
     541            5 :         super::TaskHandle::spawn(&self.cancel, task)
     542            5 :     }
     543              : 
     544              :     /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
     545            0 :     async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
     546            0 :         WALRECEIVER_SWITCHES
     547            0 :             .with_label_values(&[new_sk.reason.name()])
     548            0 :             .inc();
     549              : 
     550            0 :         self.drop_old_connection(true).await;
     551              : 
     552            0 :         let node_id = new_sk.safekeeper_id;
     553            0 :         let connect_timeout = self.conf.wal_connect_timeout;
     554            0 :         let ingest_batch_size = self.conf.ingest_batch_size;
     555            0 :         let protocol = self.conf.protocol;
     556            0 :         let validate_wal_contiguity = self.conf.validate_wal_contiguity;
     557            0 :         let timeline = Arc::clone(&self.timeline);
     558            0 :         let ctx = ctx.detached_child(
     559            0 :             TaskKind::WalReceiverConnectionHandler,
     560            0 :             DownloadBehavior::Download,
     561              :         );
     562              : 
     563            0 :         let span = info_span!("connection", %node_id);
     564            0 :         let connection_handle = self.spawn(move |events_sender, cancellation| {
     565            0 :             async move {
     566            0 :                 debug_assert_current_span_has_tenant_and_timeline_id();
     567              : 
     568            0 :                 let res = super::walreceiver_connection::handle_walreceiver_connection(
     569            0 :                     timeline,
     570            0 :                     protocol,
     571            0 :                     new_sk.wal_source_connconf,
     572            0 :                     events_sender,
     573            0 :                     cancellation.clone(),
     574            0 :                     connect_timeout,
     575            0 :                     ctx,
     576            0 :                     node_id,
     577            0 :                     ingest_batch_size,
     578            0 :                     validate_wal_contiguity,
     579            0 :                 )
     580            0 :                 .await;
     581              : 
     582            0 :                 match res {
     583            0 :                     Ok(()) => Ok(()),
     584            0 :                     Err(e) => {
     585            0 :                         match e {
     586            0 :                             WalReceiverError::SuccessfulCompletion(msg) => {
     587            0 :                                 info!("walreceiver connection handling ended with success: {msg}");
     588            0 :                                 Ok(())
     589              :                             }
     590            0 :                             WalReceiverError::ExpectedSafekeeperError(e) => {
     591            0 :                                 info!("walreceiver connection handling ended: {e}");
     592            0 :                                 Ok(())
     593              :                             }
     594              :                             WalReceiverError::ClosedGate => {
     595            0 :                                 info!(
     596            0 :                                     "walreceiver connection handling ended because of closed gate"
     597              :                                 );
     598            0 :                                 Ok(())
     599              :                             }
     600            0 :                             WalReceiverError::Cancelled => Ok(()),
     601            0 :                             WalReceiverError::Other(e) => {
     602              :                                 // give out an error to have task_mgr give it a really verbose logging
     603            0 :                                 if cancellation.is_cancelled() {
     604              :                                     // Ideally we would learn about this via some path other than Other, but
     605              :                                     // that requires refactoring all the intermediate layers of ingest code
     606              :                                     // that only emit anyhow::Error
     607            0 :                                     Ok(())
     608              :                                 } else {
     609            0 :                                     Err(e).context("walreceiver connection handling failure")
     610              :                                 }
     611              :                             }
     612              :                         }
     613              :                     }
     614              :                 }
     615            0 :             }
     616            0 :             .instrument(span)
     617            0 :         });
     618              : 
     619            0 :         let now = Utc::now().naive_utc();
     620            0 :         self.wal_connection = Some(WalConnection {
     621            0 :             started_at: now,
     622            0 :             sk_id: new_sk.safekeeper_id,
     623            0 :             availability_zone: new_sk.availability_zone,
     624            0 :             status: WalConnectionStatus {
     625            0 :                 is_connected: false,
     626            0 :                 has_processed_wal: false,
     627            0 :                 latest_connection_update: now,
     628            0 :                 latest_wal_update: now,
     629            0 :                 streaming_lsn: None,
     630            0 :                 commit_lsn: None,
     631            0 :                 node: node_id,
     632            0 :             },
     633            0 :             connection_task: connection_handle,
     634            0 :             discovered_new_wal: None,
     635            0 :         });
     636            0 :     }
     637              : 
     638              :     /// Drops the current connection (if any) and updates retry timeout for the next
     639              :     /// connection attempt to the same safekeeper.
     640              :     ///
     641              :     /// # Cancel-Safety
     642              :     ///
     643              :     /// Not cancellation-safe.
     644            0 :     async fn drop_old_connection(&mut self, needs_shutdown: bool) {
     645            0 :         let wal_connection = match self.wal_connection.take() {
     646            0 :             Some(wal_connection) => wal_connection,
     647            0 :             None => return,
     648              :         };
     649              : 
     650            0 :         if needs_shutdown {
     651            0 :             wal_connection
     652            0 :                 .connection_task
     653            0 :                 .shutdown()
     654            0 :                 // This here is why this function isn't cancellation-safe.
     655            0 :                 // If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
     656            0 :                 // Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
     657            0 :                 // and thus be ineffective.
     658            0 :                 .await;
     659            0 :         }
     660              : 
     661            0 :         let retry = self
     662            0 :             .wal_connection_retries
     663            0 :             .entry(wal_connection.sk_id)
     664            0 :             .or_insert(RetryInfo {
     665            0 :                 next_retry_at: None,
     666            0 :                 retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
     667            0 :             });
     668              : 
     669            0 :         let now = Utc::now().naive_utc();
     670              : 
     671              :         // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
     672              :         // and we add backoff to the time when we started the connection attempt. If the connection
     673              :         // was active for a long time, then next_retry_at will be in the past.
     674            0 :         retry.next_retry_at =
     675            0 :             wal_connection
     676            0 :                 .started_at
     677            0 :                 .checked_add_signed(chrono::Duration::milliseconds(
     678            0 :                     (retry.retry_duration_seconds * 1000.0) as i64,
     679            0 :                 ));
     680              : 
     681            0 :         if let Some(next) = &retry.next_retry_at {
     682            0 :             if next > &now {
     683            0 :                 info!(
     684            0 :                     "Next connection retry to {:?} is at {}",
     685              :                     wal_connection.sk_id, next
     686              :                 );
     687            0 :             }
     688            0 :         }
     689              : 
     690            0 :         let next_retry_duration =
     691            0 :             retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
     692              :         // Clamp the next retry duration to the maximum allowed.
     693            0 :         let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
     694              :         // Clamp the next retry duration to the minimum allowed.
     695            0 :         let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
     696              : 
     697            0 :         retry.retry_duration_seconds = next_retry_duration;
     698            0 :     }
     699              : 
     700              :     /// Returns time needed to wait to have a new candidate for WAL streaming.
     701            0 :     fn time_until_next_retry(&self) -> Option<Duration> {
     702            0 :         let now = Utc::now().naive_utc();
     703              : 
     704            0 :         let next_retry_at = self
     705            0 :             .wal_connection_retries
     706            0 :             .values()
     707            0 :             .filter_map(|retry| retry.next_retry_at)
     708            0 :             .filter(|next_retry_at| next_retry_at > &now)
     709            0 :             .min()?;
     710              : 
     711            0 :         (next_retry_at - now).to_std().ok()
     712            0 :     }
     713              : 
     714              :     /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
     715            0 :     fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
     716            0 :         let mut is_discovery = false;
     717            0 :         let timeline_update = match typed_msg.r#type() {
     718              :             MessageType::SafekeeperTimelineInfo => {
     719            0 :                 let info = match typed_msg.safekeeper_timeline_info {
     720            0 :                     Some(info) => info,
     721              :                     None => {
     722            0 :                         warn!("bad proto message from broker: no safekeeper_timeline_info");
     723            0 :                         return;
     724              :                     }
     725              :                 };
     726            0 :                 SafekeeperDiscoveryResponse {
     727            0 :                     safekeeper_id: info.safekeeper_id,
     728            0 :                     tenant_timeline_id: info.tenant_timeline_id,
     729            0 :                     commit_lsn: info.commit_lsn,
     730            0 :                     safekeeper_connstr: info.safekeeper_connstr,
     731            0 :                     availability_zone: info.availability_zone,
     732            0 :                     standby_horizon: info.standby_horizon,
     733            0 :                 }
     734              :             }
     735              :             MessageType::SafekeeperDiscoveryResponse => {
     736            0 :                 is_discovery = true;
     737            0 :                 match typed_msg.safekeeper_discovery_response {
     738            0 :                     Some(response) => response,
     739              :                     None => {
     740            0 :                         warn!("bad proto message from broker: no safekeeper_discovery_response");
     741            0 :                         return;
     742              :                     }
     743              :                 }
     744              :             }
     745              :             _ => {
     746              :                 // unexpected message
     747            0 :                 return;
     748              :             }
     749              :         };
     750              : 
     751            0 :         WALRECEIVER_BROKER_UPDATES.inc();
     752              : 
     753            0 :         trace!(
     754            0 :             "safekeeper info update: standby_horizon(cutoff)={}",
     755              :             timeline_update.standby_horizon
     756              :         );
     757            0 :         if timeline_update.standby_horizon != 0 {
     758            0 :             // ignore reports from safekeepers not connected to replicas
     759            0 :             self.timeline
     760            0 :                 .standby_horizon
     761            0 :                 .store(Lsn(timeline_update.standby_horizon));
     762            0 :             self.timeline
     763            0 :                 .metrics
     764            0 :                 .standby_horizon_gauge
     765            0 :                 .set(timeline_update.standby_horizon as i64);
     766            0 :         }
     767              : 
     768            0 :         let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
     769            0 :         let old_entry = self.wal_stream_candidates.insert(
     770            0 :             new_safekeeper_id,
     771            0 :             BrokerSkTimeline {
     772            0 :                 timeline: timeline_update,
     773            0 :                 latest_update: Utc::now().naive_utc(),
     774            0 :             },
     775              :         );
     776              : 
     777            0 :         if old_entry.is_none() {
     778            0 :             info!(
     779              :                 ?is_discovery,
     780              :                 %new_safekeeper_id,
     781            0 :                 "New SK node was added",
     782              :             );
     783            0 :             WALRECEIVER_CANDIDATES_ADDED.inc();
     784            0 :         }
     785            0 :     }
     786              : 
     787              :     /// Cleans up stale broker records and checks the rest for the new connection candidate.
     788              :     /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
     789              :     /// The current rules for approving new candidates:
     790              :     /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attemps
     791              :     /// * if there's no such entry, no new candidate found, abort
     792              :     /// * otherwise check if the candidate is much better than the current one
     793              :     ///
     794              :     /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
     795              :     /// General rules are following:
     796              :     /// * if connected safekeeper is not present, pick the candidate
     797              :     /// * if we haven't received any updates for some time, pick the candidate
     798              :     /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
     799              :     /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
     800              :     /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
     801              :     ///
     802              :     /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
     803              :     /// Both thresholds are configured per tenant.
     804            9 :     fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
     805            9 :         self.cleanup_old_candidates();
     806              : 
     807            9 :         match &self.wal_connection {
     808            5 :             Some(existing_wal_connection) => {
     809            5 :                 let connected_sk_node = existing_wal_connection.sk_id;
     810              : 
     811            5 :                 let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
     812            5 :                     self.select_connection_candidate(Some(connected_sk_node))?;
     813            5 :                 let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
     814              : 
     815            5 :                 let now = Utc::now().naive_utc();
     816            5 :                 if let Ok(latest_interaciton) =
     817            5 :                     (now - existing_wal_connection.status.latest_connection_update).to_std()
     818              :                 {
     819              :                     // Drop connection if we haven't received keepalive message for a while.
     820            5 :                     if latest_interaciton > self.conf.wal_connect_timeout {
     821            1 :                         return Some(NewWalConnectionCandidate {
     822            1 :                             safekeeper_id: new_sk_id,
     823            1 :                             wal_source_connconf: new_wal_source_connconf,
     824            1 :                             availability_zone: new_availability_zone,
     825            1 :                             reason: ReconnectReason::NoKeepAlives {
     826            1 :                                 last_keep_alive: Some(
     827            1 :                                     existing_wal_connection.status.latest_connection_update,
     828            1 :                                 ),
     829            1 :                                 check_time: now,
     830            1 :                                 threshold: self.conf.wal_connect_timeout,
     831            1 :                             },
     832            1 :                         });
     833            4 :                     }
     834            0 :                 }
     835              : 
     836            4 :                 if !existing_wal_connection.status.is_connected {
     837              :                     // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
     838            0 :                     return None;
     839            4 :                 }
     840              : 
     841            4 :                 if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
     842            4 :                     let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
     843              :                     // Check if the new candidate has much more WAL than the current one.
     844            4 :                     match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
     845            4 :                         Some(new_sk_lsn_advantage) => {
     846            4 :                             if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
     847            1 :                                 return Some(NewWalConnectionCandidate {
     848            1 :                                     safekeeper_id: new_sk_id,
     849            1 :                                     wal_source_connconf: new_wal_source_connconf,
     850            1 :                                     availability_zone: new_availability_zone,
     851            1 :                                     reason: ReconnectReason::LaggingWal {
     852            1 :                                         current_commit_lsn,
     853            1 :                                         new_commit_lsn,
     854            1 :                                         threshold: self.conf.max_lsn_wal_lag,
     855            1 :                                     },
     856            1 :                                 });
     857            3 :                             }
     858              :                             // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
     859              :                             // and the current one is not, switch to the new one.
     860            3 :                             if self.conf.availability_zone.is_some()
     861            1 :                                 && existing_wal_connection.availability_zone
     862            1 :                                     != self.conf.availability_zone
     863            1 :                                 && self.conf.availability_zone == new_availability_zone
     864              :                             {
     865            1 :                                 return Some(NewWalConnectionCandidate {
     866            1 :                                     safekeeper_id: new_sk_id,
     867            1 :                                     availability_zone: new_availability_zone,
     868            1 :                                     wal_source_connconf: new_wal_source_connconf,
     869            1 :                                     reason: ReconnectReason::SwitchAvailabilityZone,
     870            1 :                                 });
     871            2 :                             }
     872              :                         }
     873            0 :                         None => debug!(
     874            0 :                             "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
     875              :                         ),
     876              :                     }
     877            0 :                 }
     878              : 
     879            2 :                 let current_lsn = match existing_wal_connection.status.streaming_lsn {
     880            2 :                     Some(lsn) => lsn,
     881            0 :                     None => self.timeline.get_last_record_lsn(),
     882              :                 };
     883            2 :                 let current_commit_lsn = existing_wal_connection
     884            2 :                     .status
     885            2 :                     .commit_lsn
     886            2 :                     .unwrap_or(current_lsn);
     887            2 :                 let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
     888              : 
     889              :                 // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
     890            2 :                 let mut discovered_new_wal = existing_wal_connection
     891            2 :                     .discovered_new_wal
     892            2 :                     .filter(|new_wal| new_wal.lsn > current_commit_lsn);
     893              : 
     894            2 :                 if discovered_new_wal.is_none() {
     895              :                     // Check if the new candidate has more WAL than the current one.
     896              :                     // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
     897            1 :                     discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
     898            1 :                         trace!(
     899            0 :                             "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
     900              :                             candidate_commit_lsn, current_commit_lsn
     901              :                         );
     902            1 :                         Some(NewCommittedWAL {
     903            1 :                             lsn: candidate_commit_lsn,
     904            1 :                             discovered_at: Utc::now().naive_utc(),
     905            1 :                         })
     906              :                     } else {
     907            0 :                         None
     908              :                     };
     909            1 :                 }
     910              : 
     911            2 :                 let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
     912              :                     // Connected safekeeper has more WAL, but we haven't received updates for some time.
     913            0 :                     trace!(
     914            0 :                         "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
     915            0 :                         (now - existing_wal_connection.status.latest_wal_update).to_std(),
     916              :                         current_lsn,
     917              :                         current_commit_lsn
     918              :                     );
     919            0 :                     Some(existing_wal_connection.status.latest_wal_update)
     920              :                 } else {
     921            2 :                     discovered_new_wal.as_ref().map(|new_wal| {
     922              :                         // We know that new WAL is available on other safekeeper, but connected safekeeper don't have it.
     923            2 :                         new_wal
     924            2 :                             .discovered_at
     925            2 :                             .max(existing_wal_connection.status.latest_wal_update)
     926            2 :                     })
     927              :                 };
     928              : 
     929              :                 // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
     930            2 :                 if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
     931            2 :                     if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
     932            1 :                         if candidate_commit_lsn > current_commit_lsn
     933            1 :                             && waiting_for_new_wal > self.conf.lagging_wal_timeout
     934              :                         {
     935            1 :                             return Some(NewWalConnectionCandidate {
     936            1 :                                 safekeeper_id: new_sk_id,
     937            1 :                                 wal_source_connconf: new_wal_source_connconf,
     938            1 :                                 availability_zone: new_availability_zone,
     939            1 :                                 reason: ReconnectReason::NoWalTimeout {
     940            1 :                                     current_lsn,
     941            1 :                                     current_commit_lsn,
     942            1 :                                     candidate_commit_lsn,
     943            1 :                                     last_wal_interaction: Some(
     944            1 :                                         existing_wal_connection.status.latest_wal_update,
     945            1 :                                     ),
     946            1 :                                     check_time: now,
     947            1 :                                     threshold: self.conf.lagging_wal_timeout,
     948            1 :                                 },
     949            1 :                             });
     950            0 :                         }
     951            1 :                     }
     952            0 :                 }
     953              : 
     954            1 :                 self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
     955              :             }
     956              :             None => {
     957            3 :                 let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
     958            4 :                     self.select_connection_candidate(None)?;
     959            3 :                 return Some(NewWalConnectionCandidate {
     960            3 :                     safekeeper_id: new_sk_id,
     961            3 :                     availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
     962            3 :                     wal_source_connconf: new_wal_source_connconf,
     963            3 :                     reason: ReconnectReason::NoExistingConnection,
     964            3 :                 });
     965              :             }
     966              :         }
     967              : 
     968            1 :         None
     969            9 :     }
     970              : 
     971              :     /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
     972              :     /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
     973              :     ///
     974              :     /// The candidate that is chosen:
     975              :     /// * has no pending retry cooldown
     976              :     /// * has greatest commit_lsn among the ones that are left
     977            9 :     fn select_connection_candidate(
     978            9 :         &self,
     979            9 :         node_to_omit: Option<NodeId>,
     980            9 :     ) -> Option<(NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
     981            9 :         self.applicable_connection_candidates()
     982           13 :             .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
     983            9 :             .max_by_key(|(_, info, _)| info.commit_lsn)
     984            9 :     }
     985              : 
     986              :     /// Returns a list of safekeepers that have valid info and ready for connection.
     987              :     /// Some safekeepers are filtered by the retry cooldown.
     988            9 :     fn applicable_connection_candidates(
     989            9 :         &self,
     990            9 :     ) -> impl Iterator<Item = (NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
     991            9 :         let now = Utc::now().naive_utc();
     992              : 
     993            9 :         self.wal_stream_candidates
     994            9 :             .iter()
     995           18 :             .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
     996           16 :             .filter(move |(sk_id, _)| {
     997           16 :                 let next_retry_at = self
     998           16 :                     .wal_connection_retries
     999           16 :                     .get(sk_id)
    1000           16 :                     .and_then(|retry_info| {
    1001            1 :                         retry_info.next_retry_at
    1002            1 :                     });
    1003              : 
    1004           16 :                 next_retry_at.is_none() || next_retry_at.unwrap() <= now
    1005           16 :             }).filter_map(|(sk_id, broker_info)| {
    1006           15 :                 let info = &broker_info.timeline;
    1007           15 :                 if info.safekeeper_connstr.is_empty() {
    1008            2 :                     return None; // no connection string, ignore sk
    1009           13 :                 }
    1010              : 
    1011           13 :                 let shard_identity = self.timeline.get_shard_identity();
    1012           13 :                 let (shard_number, shard_count, shard_stripe_size) = (
    1013           13 :                     Some(shard_identity.number.0),
    1014           13 :                     Some(shard_identity.count.0),
    1015           13 :                     Some(shard_identity.stripe_size.0),
    1016           13 :                 );
    1017              : 
    1018           13 :                 let connection_conf_args = ConnectionConfigArgs {
    1019           13 :                     protocol: self.conf.protocol,
    1020           13 :                     ttid: self.id,
    1021           13 :                     shard_number,
    1022           13 :                     shard_count,
    1023           13 :                     shard_stripe_size,
    1024           13 :                     listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
    1025           13 :                     auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
    1026           13 :                     availability_zone: self.conf.availability_zone.as_deref()
    1027              :                 };
    1028              : 
    1029           13 :                 match wal_stream_connection_config(connection_conf_args) {
    1030           13 :                     Ok(connstr) => Some((*sk_id, info, connstr)),
    1031            0 :                     Err(e) => {
    1032            0 :                         error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
    1033            0 :                         None
    1034              :                     }
    1035              :                 }
    1036           15 :             })
    1037            9 :     }
    1038              : 
    1039              :     /// Remove candidates which haven't sent broker updates for a while.
    1040            9 :     fn cleanup_old_candidates(&mut self) {
    1041            9 :         let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
    1042            9 :         let lagging_wal_timeout = self.conf.lagging_wal_timeout;
    1043              : 
    1044           19 :         self.wal_stream_candidates.retain(|node_id, broker_info| {
    1045           19 :             if let Ok(time_since_latest_broker_update) =
    1046           19 :                 (Utc::now().naive_utc() - broker_info.latest_update).to_std()
    1047              :             {
    1048           19 :                 let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
    1049           19 :                 if !should_retain {
    1050            1 :                     node_ids_to_remove.push(*node_id);
    1051           18 :                 }
    1052           19 :                 should_retain
    1053              :             } else {
    1054            0 :                 true
    1055              :             }
    1056           19 :         });
    1057              : 
    1058            9 :         if !node_ids_to_remove.is_empty() {
    1059            2 :             for node_id in node_ids_to_remove {
    1060            1 :                 info!(
    1061            0 :                     "Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections"
    1062              :                 );
    1063            1 :                 self.wal_connection_retries.remove(&node_id);
    1064            1 :                 WALRECEIVER_CANDIDATES_REMOVED.inc();
    1065              :             }
    1066            8 :         }
    1067            9 :     }
    1068              : 
    1069              :     /// # Cancel-Safety
    1070              :     ///
    1071              :     /// Not cancellation-safe.
    1072            0 :     pub(super) async fn shutdown(mut self) {
    1073            0 :         if let Some(wal_connection) = self.wal_connection.take() {
    1074            0 :             wal_connection.connection_task.shutdown().await;
    1075            0 :         }
    1076            0 :     }
    1077              : 
    1078            0 :     fn manager_status(&self) -> ConnectionManagerStatus {
    1079              :         ConnectionManagerStatus {
    1080            0 :             existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
    1081            0 :             wal_stream_candidates: self.wal_stream_candidates.clone(),
    1082              :         }
    1083            0 :     }
    1084              : }
    1085              : 
    1086              : #[derive(Debug)]
    1087              : struct NewWalConnectionCandidate {
    1088              :     safekeeper_id: NodeId,
    1089              :     wal_source_connconf: PgConnectionConfig,
    1090              :     availability_zone: Option<String>,
    1091              :     reason: ReconnectReason,
    1092              : }
    1093              : 
    1094              : /// Stores the reason why WAL connection was switched, for furter debugging purposes.
    1095              : #[derive(Debug, PartialEq, Eq)]
    1096              : enum ReconnectReason {
    1097              :     NoExistingConnection,
    1098              :     LaggingWal {
    1099              :         current_commit_lsn: Lsn,
    1100              :         new_commit_lsn: Lsn,
    1101              :         threshold: NonZeroU64,
    1102              :     },
    1103              :     SwitchAvailabilityZone,
    1104              :     NoWalTimeout {
    1105              :         current_lsn: Lsn,
    1106              :         current_commit_lsn: Lsn,
    1107              :         candidate_commit_lsn: Lsn,
    1108              :         last_wal_interaction: Option<NaiveDateTime>,
    1109              :         check_time: NaiveDateTime,
    1110              :         threshold: Duration,
    1111              :     },
    1112              :     NoKeepAlives {
    1113              :         last_keep_alive: Option<NaiveDateTime>,
    1114              :         check_time: NaiveDateTime,
    1115              :         threshold: Duration,
    1116              :     },
    1117              : }
    1118              : 
    1119              : impl ReconnectReason {
    1120            0 :     fn name(&self) -> &str {
    1121            0 :         match self {
    1122            0 :             ReconnectReason::NoExistingConnection => "NoExistingConnection",
    1123            0 :             ReconnectReason::LaggingWal { .. } => "LaggingWal",
    1124            0 :             ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
    1125            0 :             ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
    1126            0 :             ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
    1127              :         }
    1128            0 :     }
    1129              : }
    1130              : 
    1131              : #[cfg(test)]
    1132              : mod tests {
    1133              :     use url::Host;
    1134              :     use utils::postgres_client::PostgresClientProtocol;
    1135              : 
    1136              :     use super::*;
    1137              :     use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
    1138              : 
    1139           19 :     fn dummy_broker_sk_timeline(
    1140           19 :         commit_lsn: u64,
    1141           19 :         safekeeper_connstr: &str,
    1142           19 :         latest_update: NaiveDateTime,
    1143           19 :     ) -> BrokerSkTimeline {
    1144           19 :         BrokerSkTimeline {
    1145           19 :             timeline: SafekeeperDiscoveryResponse {
    1146           19 :                 safekeeper_id: 0,
    1147           19 :                 tenant_timeline_id: None,
    1148           19 :                 commit_lsn,
    1149           19 :                 safekeeper_connstr: safekeeper_connstr.to_owned(),
    1150           19 :                 availability_zone: None,
    1151           19 :                 standby_horizon: 0,
    1152           19 :             },
    1153           19 :             latest_update,
    1154           19 :         }
    1155           19 :     }
    1156              : 
    1157              :     #[tokio::test]
    1158            1 :     async fn no_connection_no_candidate() -> anyhow::Result<()> {
    1159            1 :         let harness = TenantHarness::create("no_connection_no_candidate").await?;
    1160            1 :         let mut state = dummy_state(&harness).await;
    1161            1 :         let now = Utc::now().naive_utc();
    1162              : 
    1163            1 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
    1164            1 :         let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
    1165              : 
    1166            1 :         state.wal_connection = None;
    1167            1 :         state.wal_stream_candidates = HashMap::from([
    1168            1 :             (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
    1169            1 :             (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
    1170            1 :             (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
    1171            1 :             (
    1172            1 :                 NodeId(3),
    1173            1 :                 dummy_broker_sk_timeline(
    1174            1 :                     1 + state.conf.max_lsn_wal_lag.get(),
    1175            1 :                     "delay_over_threshold",
    1176            1 :                     delay_over_threshold,
    1177            1 :                 ),
    1178            1 :             ),
    1179            1 :         ]);
    1180              : 
    1181            1 :         let no_candidate = state.next_connection_candidate();
    1182            1 :         assert!(
    1183            1 :             no_candidate.is_none(),
    1184            0 :             "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
    1185              :         );
    1186              : 
    1187            2 :         Ok(())
    1188            1 :     }
    1189              : 
    1190              :     #[tokio::test]
    1191            1 :     async fn connection_no_candidate() -> anyhow::Result<()> {
    1192            1 :         let harness = TenantHarness::create("connection_no_candidate").await?;
    1193            1 :         let mut state = dummy_state(&harness).await;
    1194            1 :         let now = Utc::now().naive_utc();
    1195              : 
    1196            1 :         let connected_sk_id = NodeId(0);
    1197            1 :         let current_lsn = 100_000;
    1198              : 
    1199            1 :         let connection_status = WalConnectionStatus {
    1200            1 :             is_connected: true,
    1201            1 :             has_processed_wal: true,
    1202            1 :             latest_connection_update: now,
    1203            1 :             latest_wal_update: now,
    1204            1 :             commit_lsn: Some(Lsn(current_lsn)),
    1205            1 :             streaming_lsn: Some(Lsn(current_lsn)),
    1206            1 :             node: NodeId(1),
    1207            1 :         };
    1208              : 
    1209            1 :         state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
    1210            1 :         state.wal_connection = Some(WalConnection {
    1211            1 :             started_at: now,
    1212            1 :             sk_id: connected_sk_id,
    1213            1 :             availability_zone: None,
    1214            1 :             status: connection_status,
    1215            1 :             connection_task: state.spawn(move |sender, _| async move {
    1216            1 :                 sender
    1217            1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1218            1 :                     .ok();
    1219            1 :                 Ok(())
    1220            2 :             }),
    1221            1 :             discovered_new_wal: None,
    1222              :         });
    1223            1 :         state.wal_stream_candidates = HashMap::from([
    1224            1 :             (
    1225            1 :                 connected_sk_id,
    1226            1 :                 dummy_broker_sk_timeline(
    1227            1 :                     current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
    1228            1 :                     DUMMY_SAFEKEEPER_HOST,
    1229            1 :                     now,
    1230            1 :                 ),
    1231            1 :             ),
    1232            1 :             (
    1233            1 :                 NodeId(1),
    1234            1 :                 dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
    1235            1 :             ),
    1236            1 :             (
    1237            1 :                 NodeId(2),
    1238            1 :                 dummy_broker_sk_timeline(
    1239            1 :                     current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
    1240            1 :                     "not_enough_advanced_lsn",
    1241            1 :                     now,
    1242            1 :                 ),
    1243            1 :             ),
    1244            1 :         ]);
    1245              : 
    1246            1 :         let no_candidate = state.next_connection_candidate();
    1247            1 :         assert!(
    1248            1 :             no_candidate.is_none(),
    1249            0 :             "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
    1250              :         );
    1251              : 
    1252            2 :         Ok(())
    1253            1 :     }
    1254              : 
    1255              :     #[tokio::test]
    1256            1 :     async fn no_connection_candidate() -> anyhow::Result<()> {
    1257            1 :         let harness = TenantHarness::create("no_connection_candidate").await?;
    1258            1 :         let mut state = dummy_state(&harness).await;
    1259            1 :         let now = Utc::now().naive_utc();
    1260              : 
    1261            1 :         state.wal_connection = None;
    1262            1 :         state.wal_stream_candidates = HashMap::from([(
    1263            1 :             NodeId(0),
    1264            1 :             dummy_broker_sk_timeline(
    1265            1 :                 1 + state.conf.max_lsn_wal_lag.get(),
    1266            1 :                 DUMMY_SAFEKEEPER_HOST,
    1267            1 :                 now,
    1268            1 :             ),
    1269            1 :         )]);
    1270              : 
    1271            1 :         let only_candidate = state
    1272            1 :             .next_connection_candidate()
    1273            1 :             .expect("Expected one candidate selected out of the only data option, but got none");
    1274            1 :         assert_eq!(only_candidate.safekeeper_id, NodeId(0));
    1275            1 :         assert_eq!(
    1276              :             only_candidate.reason,
    1277              :             ReconnectReason::NoExistingConnection,
    1278            0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1279              :         );
    1280            1 :         assert_eq!(
    1281            1 :             only_candidate.wal_source_connconf.host(),
    1282            1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1283              :         );
    1284              : 
    1285            1 :         let selected_lsn = 100_000;
    1286            1 :         state.wal_stream_candidates = HashMap::from([
    1287            1 :             (
    1288            1 :                 NodeId(0),
    1289            1 :                 dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
    1290            1 :             ),
    1291            1 :             (
    1292            1 :                 NodeId(1),
    1293            1 :                 dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
    1294            1 :             ),
    1295            1 :             (
    1296            1 :                 NodeId(2),
    1297            1 :                 dummy_broker_sk_timeline(selected_lsn + 100, "", now),
    1298            1 :             ),
    1299            1 :         ]);
    1300            1 :         let biggest_wal_candidate = state.next_connection_candidate().expect(
    1301            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1302              :         );
    1303              : 
    1304            1 :         assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
    1305            1 :         assert_eq!(
    1306              :             biggest_wal_candidate.reason,
    1307              :             ReconnectReason::NoExistingConnection,
    1308            0 :             "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
    1309              :         );
    1310            1 :         assert_eq!(
    1311            1 :             biggest_wal_candidate.wal_source_connconf.host(),
    1312            1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1313              :         );
    1314              : 
    1315            2 :         Ok(())
    1316            1 :     }
    1317              : 
    1318              :     #[tokio::test]
    1319            1 :     async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
    1320            1 :         let harness = TenantHarness::create("candidate_with_many_connection_failures").await?;
    1321            1 :         let mut state = dummy_state(&harness).await;
    1322            1 :         let now = Utc::now().naive_utc();
    1323              : 
    1324            1 :         let current_lsn = Lsn(100_000).align();
    1325            1 :         let bigger_lsn = Lsn(current_lsn.0 + 100).align();
    1326              : 
    1327            1 :         state.wal_connection = None;
    1328            1 :         state.wal_stream_candidates = HashMap::from([
    1329            1 :             (
    1330            1 :                 NodeId(0),
    1331            1 :                 dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1332            1 :             ),
    1333            1 :             (
    1334            1 :                 NodeId(1),
    1335            1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1336            1 :             ),
    1337            1 :         ]);
    1338            1 :         state.wal_connection_retries = HashMap::from([(
    1339            1 :             NodeId(0),
    1340            1 :             RetryInfo {
    1341            1 :                 next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
    1342            1 :                 retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
    1343            1 :             },
    1344            1 :         )]);
    1345              : 
    1346            1 :         let candidate_with_less_errors = state
    1347            1 :             .next_connection_candidate()
    1348            1 :             .expect("Expected one candidate selected, but got none");
    1349            1 :         assert_eq!(
    1350              :             candidate_with_less_errors.safekeeper_id,
    1351              :             NodeId(1),
    1352            0 :             "Should select the node with no pending retry cooldown"
    1353              :         );
    1354              : 
    1355            2 :         Ok(())
    1356            1 :     }
    1357              : 
    1358              :     #[tokio::test]
    1359            1 :     async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1360            1 :         let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate").await?;
    1361            1 :         let mut state = dummy_state(&harness).await;
    1362            1 :         let current_lsn = Lsn(100_000).align();
    1363            1 :         let now = Utc::now().naive_utc();
    1364              : 
    1365            1 :         let connected_sk_id = NodeId(0);
    1366            1 :         let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
    1367              : 
    1368            1 :         let connection_status = WalConnectionStatus {
    1369            1 :             is_connected: true,
    1370            1 :             has_processed_wal: true,
    1371            1 :             latest_connection_update: now,
    1372            1 :             latest_wal_update: now,
    1373            1 :             commit_lsn: Some(current_lsn),
    1374            1 :             streaming_lsn: Some(current_lsn),
    1375            1 :             node: connected_sk_id,
    1376            1 :         };
    1377              : 
    1378            1 :         state.wal_connection = Some(WalConnection {
    1379            1 :             started_at: now,
    1380            1 :             sk_id: connected_sk_id,
    1381            1 :             availability_zone: None,
    1382            1 :             status: connection_status,
    1383            1 :             connection_task: state.spawn(move |sender, _| async move {
    1384            1 :                 sender
    1385            1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1386            1 :                     .ok();
    1387            1 :                 Ok(())
    1388            2 :             }),
    1389            1 :             discovered_new_wal: None,
    1390              :         });
    1391            1 :         state.wal_stream_candidates = HashMap::from([
    1392            1 :             (
    1393            1 :                 connected_sk_id,
    1394            1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1395            1 :             ),
    1396            1 :             (
    1397            1 :                 NodeId(1),
    1398            1 :                 dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
    1399            1 :             ),
    1400            1 :         ]);
    1401              : 
    1402            1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1403            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1404              :         );
    1405              : 
    1406            1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
    1407            1 :         assert_eq!(
    1408              :             over_threshcurrent_candidate.reason,
    1409            1 :             ReconnectReason::LaggingWal {
    1410            1 :                 current_commit_lsn: current_lsn,
    1411            1 :                 new_commit_lsn: new_lsn,
    1412            1 :                 threshold: state.conf.max_lsn_wal_lag
    1413            1 :             },
    1414            0 :             "Should select bigger WAL safekeeper if it starts to lag enough"
    1415              :         );
    1416            1 :         assert_eq!(
    1417            1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1418            1 :             &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
    1419              :         );
    1420              : 
    1421            2 :         Ok(())
    1422            1 :     }
    1423              : 
    1424              :     #[tokio::test]
    1425            1 :     async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
    1426            1 :         let harness =
    1427            1 :             TenantHarness::create("timeout_connection_threshold_current_candidate").await?;
    1428            1 :         let mut state = dummy_state(&harness).await;
    1429            1 :         let current_lsn = Lsn(100_000).align();
    1430            1 :         let now = Utc::now().naive_utc();
    1431              : 
    1432            1 :         let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
    1433            1 :         let time_over_threshold =
    1434            1 :             Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
    1435              : 
    1436            1 :         let connection_status = WalConnectionStatus {
    1437            1 :             is_connected: true,
    1438            1 :             has_processed_wal: true,
    1439            1 :             latest_connection_update: time_over_threshold,
    1440            1 :             latest_wal_update: time_over_threshold,
    1441            1 :             commit_lsn: Some(current_lsn),
    1442            1 :             streaming_lsn: Some(current_lsn),
    1443            1 :             node: NodeId(1),
    1444            1 :         };
    1445              : 
    1446            1 :         state.wal_connection = Some(WalConnection {
    1447            1 :             started_at: now,
    1448            1 :             sk_id: NodeId(1),
    1449            1 :             availability_zone: None,
    1450            1 :             status: connection_status,
    1451            1 :             connection_task: state.spawn(move |sender, _| async move {
    1452            1 :                 sender
    1453            1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1454            1 :                     .ok();
    1455            1 :                 Ok(())
    1456            2 :             }),
    1457            1 :             discovered_new_wal: None,
    1458              :         });
    1459            1 :         state.wal_stream_candidates = HashMap::from([(
    1460            1 :             NodeId(0),
    1461            1 :             dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1462            1 :         )]);
    1463              : 
    1464            1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1465            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1466              :         );
    1467              : 
    1468            1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
    1469            1 :         match over_threshcurrent_candidate.reason {
    1470              :             ReconnectReason::NoKeepAlives {
    1471            1 :                 last_keep_alive,
    1472            1 :                 threshold,
    1473              :                 ..
    1474              :             } => {
    1475            1 :                 assert_eq!(last_keep_alive, Some(time_over_threshold));
    1476            1 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1477              :             }
    1478            0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1479              :         }
    1480            1 :         assert_eq!(
    1481            1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1482            1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1483              :         );
    1484              : 
    1485            2 :         Ok(())
    1486            1 :     }
    1487              : 
    1488              :     #[tokio::test]
    1489            1 :     async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
    1490            1 :         let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?;
    1491            1 :         let mut state = dummy_state(&harness).await;
    1492            1 :         let current_lsn = Lsn(100_000).align();
    1493            1 :         let new_lsn = Lsn(100_100).align();
    1494            1 :         let now = Utc::now().naive_utc();
    1495              : 
    1496            1 :         let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
    1497            1 :         let time_over_threshold =
    1498            1 :             Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
    1499              : 
    1500            1 :         let connection_status = WalConnectionStatus {
    1501            1 :             is_connected: true,
    1502            1 :             has_processed_wal: true,
    1503            1 :             latest_connection_update: now,
    1504            1 :             latest_wal_update: time_over_threshold,
    1505            1 :             commit_lsn: Some(current_lsn),
    1506            1 :             streaming_lsn: Some(current_lsn),
    1507            1 :             node: NodeId(1),
    1508            1 :         };
    1509              : 
    1510            1 :         state.wal_connection = Some(WalConnection {
    1511            1 :             started_at: now,
    1512            1 :             sk_id: NodeId(1),
    1513            1 :             availability_zone: None,
    1514            1 :             status: connection_status,
    1515            2 :             connection_task: state.spawn(move |_, _| async move { Ok(()) }),
    1516            1 :             discovered_new_wal: Some(NewCommittedWAL {
    1517            1 :                 discovered_at: time_over_threshold,
    1518            1 :                 lsn: new_lsn,
    1519            1 :             }),
    1520              :         });
    1521            1 :         state.wal_stream_candidates = HashMap::from([(
    1522            1 :             NodeId(0),
    1523            1 :             dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1524            1 :         )]);
    1525              : 
    1526            1 :         let over_threshcurrent_candidate = state.next_connection_candidate().expect(
    1527            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1528              :         );
    1529              : 
    1530            1 :         assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
    1531            1 :         match over_threshcurrent_candidate.reason {
    1532              :             ReconnectReason::NoWalTimeout {
    1533            1 :                 current_lsn,
    1534            1 :                 current_commit_lsn,
    1535            1 :                 candidate_commit_lsn,
    1536            1 :                 last_wal_interaction,
    1537            1 :                 threshold,
    1538              :                 ..
    1539              :             } => {
    1540            1 :                 assert_eq!(current_lsn, current_lsn);
    1541            1 :                 assert_eq!(current_commit_lsn, current_lsn);
    1542            1 :                 assert_eq!(candidate_commit_lsn, new_lsn);
    1543            1 :                 assert_eq!(last_wal_interaction, Some(time_over_threshold));
    1544            1 :                 assert_eq!(threshold, state.conf.lagging_wal_timeout);
    1545              :             }
    1546            0 :             unexpected => panic!("Unexpected reason: {unexpected:?}"),
    1547              :         }
    1548            1 :         assert_eq!(
    1549            1 :             over_threshcurrent_candidate.wal_source_connconf.host(),
    1550            1 :             &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
    1551              :         );
    1552              : 
    1553            2 :         Ok(())
    1554            1 :     }
    1555              : 
    1556              :     const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
    1557              : 
    1558            8 :     async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
    1559            8 :         let (tenant, ctx) = harness.load().await;
    1560            8 :         let timeline = tenant
    1561            8 :             .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
    1562            8 :             .await
    1563            8 :             .expect("Failed to create an empty timeline for dummy wal connection manager");
    1564              : 
    1565            8 :         let protocol = PostgresClientProtocol::Interpreted {
    1566            8 :             format: utils::postgres_client::InterpretedFormat::Protobuf,
    1567            8 :             compression: Some(utils::postgres_client::Compression::Zstd { level: 1 }),
    1568            8 :         };
    1569              : 
    1570            8 :         ConnectionManagerState {
    1571            8 :             id: TenantTimelineId {
    1572            8 :                 tenant_id: harness.tenant_shard_id.tenant_id,
    1573            8 :                 timeline_id: TIMELINE_ID,
    1574            8 :             },
    1575            8 :             timeline,
    1576            8 :             cancel: CancellationToken::new(),
    1577            8 :             conf: WalReceiverConf {
    1578            8 :                 protocol,
    1579            8 :                 wal_connect_timeout: Duration::from_secs(1),
    1580            8 :                 lagging_wal_timeout: Duration::from_secs(1),
    1581            8 :                 max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
    1582            8 :                 auth_token: None,
    1583            8 :                 availability_zone: None,
    1584            8 :                 ingest_batch_size: 1,
    1585            8 :                 validate_wal_contiguity: false,
    1586            8 :             },
    1587            8 :             wal_connection: None,
    1588            8 :             wal_stream_candidates: HashMap::new(),
    1589            8 :             wal_connection_retries: HashMap::new(),
    1590            8 :         }
    1591            8 :     }
    1592              : 
    1593              :     #[tokio::test]
    1594            1 :     async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
    1595              :         // Pageserver and one of safekeepers will be in the same availability zone
    1596              :         // and pageserver should prefer to connect to it.
    1597            1 :         let test_az = Some("test_az".to_owned());
    1598              : 
    1599            1 :         let harness = TenantHarness::create("switch_to_same_availability_zone").await?;
    1600            1 :         let mut state = dummy_state(&harness).await;
    1601            1 :         state.conf.availability_zone.clone_from(&test_az);
    1602            1 :         let current_lsn = Lsn(100_000).align();
    1603            1 :         let now = Utc::now().naive_utc();
    1604              : 
    1605            1 :         let connected_sk_id = NodeId(0);
    1606              : 
    1607            1 :         let connection_status = WalConnectionStatus {
    1608            1 :             is_connected: true,
    1609            1 :             has_processed_wal: true,
    1610            1 :             latest_connection_update: now,
    1611            1 :             latest_wal_update: now,
    1612            1 :             commit_lsn: Some(current_lsn),
    1613            1 :             streaming_lsn: Some(current_lsn),
    1614            1 :             node: connected_sk_id,
    1615            1 :         };
    1616              : 
    1617            1 :         state.wal_connection = Some(WalConnection {
    1618            1 :             started_at: now,
    1619            1 :             sk_id: connected_sk_id,
    1620            1 :             availability_zone: None,
    1621            1 :             status: connection_status,
    1622            1 :             connection_task: state.spawn(move |sender, _| async move {
    1623            1 :                 sender
    1624            1 :                     .send(TaskStateUpdate::Progress(connection_status))
    1625            1 :                     .ok();
    1626            1 :                 Ok(())
    1627            2 :             }),
    1628            1 :             discovered_new_wal: None,
    1629              :         });
    1630              : 
    1631              :         // We have another safekeeper with the same commit_lsn, and it have the same availability zone as
    1632              :         // the current pageserver.
    1633            1 :         let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
    1634            1 :         same_az_sk.timeline.availability_zone.clone_from(&test_az);
    1635              : 
    1636            1 :         state.wal_stream_candidates = HashMap::from([
    1637            1 :             (
    1638            1 :                 connected_sk_id,
    1639            1 :                 dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
    1640            1 :             ),
    1641            1 :             (NodeId(1), same_az_sk),
    1642            1 :         ]);
    1643              : 
    1644              :         // We expect that pageserver will switch to the safekeeper in the same availability zone,
    1645              :         // even if it has the same commit_lsn.
    1646            1 :         let next_candidate = state.next_connection_candidate().expect(
    1647            1 :             "Expected one candidate selected out of multiple valid data options, but got none",
    1648              :         );
    1649              : 
    1650            1 :         assert_eq!(next_candidate.safekeeper_id, NodeId(1));
    1651            1 :         assert_eq!(
    1652              :             next_candidate.reason,
    1653              :             ReconnectReason::SwitchAvailabilityZone,
    1654            0 :             "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
    1655              :         );
    1656            1 :         assert_eq!(
    1657            1 :             next_candidate.wal_source_connconf.host(),
    1658            1 :             &Host::Domain("same_az".to_owned())
    1659              :         );
    1660              : 
    1661            2 :         Ok(())
    1662            1 :     }
    1663              : }
        

Generated by: LCOV version 2.1-beta