Line data Source code
1 : //! WAL receiver logic that ensures the pageserver gets connected to the safekeeper
2 : //! that has the latest WAL to stream, and that this connection does not go stale.
3 : //!
4 : //! To achieve that, a storage broker is used: safekeepers publish their timelines' state to it,
5 : //! the manager subscribes to those updates and accumulates them to pick the safekeeper with the biggest Lsn to connect to.
6 : //! The current connection state is tracked too, to ensure it does not go stale.
7 : //!
8 : //! After every connection event or storage broker update, the state is updated accordingly and rechecked for a new connection candidate,
9 : //! then a (re)connection happens, if necessary.
10 : //! Only the WAL streaming task is expected to finish; the other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
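//!
//! A minimal sketch (not the actual walreceiver task code) of how this module's entry point is
//! expected to be driven; the setup of `broker_client`, `state`, `ctx` and `status` is assumed:
//!
//! ```text
//! loop {
//!     match connection_manager_loop_step(&mut broker_client, &mut state, &ctx, &status).await {
//!         // e.g. the broker subscription ended or failed: loop around and resubscribe
//!         ControlFlow::Continue(()) => continue,
//!         // the timeline became inactive or is shutting down: stop the manager
//!         ControlFlow::Break(()) => break,
//!     }
//! }
//! ```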
11 :
12 : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
13 :
14 : use super::{TaskStateUpdate, WalReceiverConf};
15 : use crate::context::{DownloadBehavior, RequestContext};
16 : use crate::metrics::{
17 : WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
18 : WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
19 : };
20 : use crate::task_mgr::{shutdown_token, TaskKind};
21 : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
22 : use anyhow::Context;
23 : use chrono::{NaiveDateTime, Utc};
24 : use pageserver_api::models::TimelineState;
25 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
26 : use storage_broker::proto::SafekeeperTimelineInfo;
27 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
28 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
29 : use storage_broker::{BrokerClientChannel, Code, Streaming};
30 : use tokio::select;
31 : use tracing::*;
32 :
33 : use postgres_connection::PgConnectionConfig;
34 : use utils::backoff::{
35 : exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
36 : };
37 : use utils::postgres_client::wal_stream_connection_config;
38 : use utils::{
39 : id::{NodeId, TenantTimelineId},
40 : lsn::Lsn,
41 : };
42 :
43 : use super::{
44 : walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
45 : TaskEvent, TaskHandle,
46 : };
47 :
48 : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
49 : /// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
50 : /// Exits if the storage broker subscription is cancelled.
51 1235 : pub(super) async fn connection_manager_loop_step(
52 1235 : broker_client: &mut BrokerClientChannel,
53 1235 : connection_manager_state: &mut ConnectionManagerState,
54 1235 : ctx: &RequestContext,
55 1235 : manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
56 1235 : ) -> ControlFlow<(), ()> {
57 1235 : match connection_manager_state
58 1235 : .timeline
59 1235 : .wait_to_become_active(ctx)
60 25 : .await
61 : {
62 1233 : Ok(()) => {}
63 2 : Err(new_state) => {
64 0 : debug!(
65 0 : ?new_state,
66 0 : "state changed, stopping wal connection manager loop"
67 0 : );
68 2 : return ControlFlow::Break(());
69 : }
70 : }
71 :
72 1233 : WALRECEIVER_ACTIVE_MANAGERS.inc();
73 543 : scopeguard::defer! {
74 543 : WALRECEIVER_ACTIVE_MANAGERS.dec();
75 543 : }
76 :
77 1233 : let id = TenantTimelineId {
78 1233 : tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
79 1233 : timeline_id: connection_manager_state.timeline.timeline_id,
80 1233 : };
81 1233 :
82 1233 : let mut timeline_state_updates = connection_manager_state
83 1233 : .timeline
84 1233 : .subscribe_for_state_updates();
85 :
86 : // Subscribe to the broker updates. The stream shares the underlying TCP connection
87 : // with other streams on this client (other connection managers). When the
88 : // object goes out of scope, the stream finishes in drop() automatically.
89 2458 : let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
90 0 : debug!("Subscribed for broker timeline updates");
91 :
92 : loop {
93 737728 : let time_until_next_retry = connection_manager_state.time_until_next_retry();
94 737728 :
95 737728 : // These things are happening concurrently:
96 737728 : //
97 737728 : // - keep receiving WAL on the current connection
98 737728 : // - if the shared state says we need to change connection, disconnect and return
99 737728 : // - this runs in a separate task and we receive updates via a watch channel
100 737728 : // - change connection if the rules decide so, or if the current connection dies
101 737728 : // - receive updates from broker
102 737728 : // - this might change the current desired connection
103 737728 : // - timeline state changes to something that does not allow walreceiver to run concurrently
104 1451442 : select! {
105 737577 : Some(wal_connection_update) = async {
106 737577 : match connection_manager_state.wal_connection.as_mut() {
107 735449 : Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
108 2128 : None => None,
109 : }
110 729793 : } => {
111 : let wal_connection = connection_manager_state.wal_connection.as_mut()
112 : .expect("Should have a connection, as checked by the corresponding select! guard");
113 : match wal_connection_update {
114 : TaskEvent::Update(TaskStateUpdate::Started) => {},
115 : TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
116 : if new_status.has_processed_wal {
117 : // We have advanced last_record_lsn by processing the WAL received
118 : // from this safekeeper. This is good enough to clean unsuccessful
119 : // retries history and allow reconnecting to this safekeeper without
120 : // sleeping for a long time.
121 : connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
122 : }
123 : wal_connection.status = new_status;
124 : }
125 : TaskEvent::End(walreceiver_task_result) => {
126 : match walreceiver_task_result {
127 0 : Ok(()) => debug!("WAL receiving task finished"),
128 14 : Err(e) => error!("wal receiver task finished with an error: {e:?}"),
129 : }
130 : connection_manager_state.drop_old_connection(false).await;
131 : },
132 : }
133 : },
134 :
135 : // Got a new update from the broker
136 8230 : broker_update = broker_subscription.message() => {
137 : match broker_update {
138 : Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
139 : Err(status) => {
140 : match status.code() {
141 : Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
142 : // tonic's error handling doesn't provide a clear code for disconnections: we get
143 : // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
144 0 : info!("broker disconnected: {status}");
145 : },
146 : _ => {
147 0 : warn!("broker subscription failed: {status}");
148 : }
149 : }
150 : return ControlFlow::Continue(());
151 : }
152 : Ok(None) => {
153 0 : error!("broker subscription stream ended"); // can't happen
154 : return ControlFlow::Continue(());
155 : }
156 : }
157 : },
158 :
159 727119 : new_event = async {
160 727119 : loop {
161 727119 : if connection_manager_state.timeline.current_state() == TimelineState::Loading {
162 0 : warn!("wal connection manager should only be launched after timeline has become active");
163 727119 : }
164 727119 : match timeline_state_updates.changed().await {
165 : Ok(()) => {
166 481 : let new_state = connection_manager_state.timeline.current_state();
167 481 : match new_state {
168 : // we're already active as walreceiver, no need to reactivate
169 0 : TimelineState::Active => continue,
170 : TimelineState::Broken { .. } | TimelineState::Stopping => {
171 481 : debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
172 481 : return ControlFlow::Break(());
173 : }
174 : TimelineState::Loading => {
175 0 : warn!("timeline transitioned back to Loading state, that should not happen");
176 0 : return ControlFlow::Continue(());
177 : }
178 : }
179 : }
180 0 : Err(_sender_dropped_error) => return ControlFlow::Break(()),
181 : }
182 : }
183 481 : } => match new_event {
184 : ControlFlow::Continue(()) => {
185 : return ControlFlow::Continue(());
186 : }
187 : ControlFlow::Break(()) => {
188 0 : debug!("Timeline is no longer active, stopping wal connection manager loop");
189 : return ControlFlow::Break(());
190 : }
191 : },
192 :
193 732421 : Some(()) = async {
194 732421 : match time_until_next_retry {
195 1780 : Some(sleep_time) => {
196 1780 : tokio::time::sleep(sleep_time).await;
197 621 : Some(())
198 : },
199 : None => {
200 730641 : debug!("No candidates to retry, waiting indefinitely for the broker events");
201 730641 : None
202 : }
203 : }
204 731262 : } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
205 : }
206 :
207 736514 : if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
208 1702 : info!("Switching to new connection candidate: {new_candidate:?}");
209 1702 : connection_manager_state
210 1702 : .change_connection(new_candidate, ctx)
211 16 : .await
212 734812 : }
213 736514 : *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
214 : }
215 483 : }
216 :
217 : /// Endlessly try to subscribe for broker updates for a given timeline.
218 1233 : async fn subscribe_for_timeline_updates(
219 1233 : broker_client: &mut BrokerClientChannel,
220 1233 : id: TenantTimelineId,
221 1233 : ) -> Streaming<SafekeeperTimelineInfo> {
222 1233 : let mut attempt = 0;
223 1233 : let cancel = shutdown_token();
224 :
225 : loop {
226 1233 : exponential_backoff(
227 1233 : attempt,
228 1233 : DEFAULT_BASE_BACKOFF_SECONDS,
229 1233 : DEFAULT_MAX_BACKOFF_SECONDS,
230 1233 : &cancel,
231 1233 : )
232 0 : .await;
233 1233 : attempt += 1;
234 1233 :
235 1233 : // subscribe to the specific timeline
236 1233 : let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
237 1233 : tenant_id: id.tenant_id.as_ref().to_owned(),
238 1233 : timeline_id: id.timeline_id.as_ref().to_owned(),
239 1233 : });
240 1233 : let request = SubscribeSafekeeperInfoRequest {
241 1233 : subscription_key: Some(key),
242 1233 : };
243 1233 :
244 2458 : match broker_client.subscribe_safekeeper_info(request).await {
245 1214 : Ok(resp) => {
246 1214 : return resp.into_inner();
247 : }
248 0 : Err(e) => {
249 : // Safekeeper nodes can stop pushing timeline updates to the broker when no new writes happen and
250 : // the entire WAL has been streamed. Keep this noticeable with logging, but do not warn/error.
251 0 : info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
252 0 : continue;
253 : }
254 : }
255 : }
256 1214 : }
257 :
258 : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
259 : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
260 : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
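// Illustrative only (hypothetical helper, not used by this module): `drop_old_connection` below
// derives the retry schedule from these constants with a clamped multiplication,
//
//     fn next_retry_backoff_seconds(current: f64) -> f64 {
//         (current * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER)
//             .min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS)
//             .max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS)
//     }
//
// which, starting from the minimum, yields roughly 0.1, 0.15, 0.225, ... seconds, capped at 15.0.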
261 :
262 : /// All data needed to run the endless broker loop and keep the WAL streaming connection alive, if possible.
263 : pub(super) struct ConnectionManagerState {
264 : id: TenantTimelineId,
265 : /// Use pageserver data about the timeline to filter out some of the safekeepers.
266 : timeline: Arc<Timeline>,
267 : conf: WalReceiverConf,
268 : /// Current connection to safekeeper for WAL streaming.
269 : wal_connection: Option<WalConnection>,
270 : /// Info about retries and unsuccessful attempts to connect to safekeepers.
271 : wal_connection_retries: HashMap<NodeId, RetryInfo>,
272 : /// Data about all timelines available for connection, fetched from the storage broker and grouped by their corresponding safekeeper node id.
273 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
274 : }
275 :
276 : /// Information about the connection manager's current connection and connection candidates.
277 1794 : #[derive(Debug, Clone)]
278 : pub struct ConnectionManagerStatus {
279 : existing_connection: Option<WalConnectionStatus>,
280 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
281 : }
282 :
283 : impl ConnectionManagerStatus {
284 1794 : /// Generates a string describing the current connection status, in a form suitable for logging.
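/// A typical result (illustrative values only) looks like:
/// ` (update 2024-01-01 12:00:00): streaming WAL from node 1, commit|streaming Lsn: 0/2000060|0/2000060, safekeeper candidates (id|update_time|commit_lsn): [(1|12:00:00|0/2000060), (2|12:00:00|0/2000000)]`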
285 1794 : pub fn to_human_readable_string(&self) -> String {
286 1794 : let mut resulting_string = String::new();
287 1794 : match &self.existing_connection {
288 1751 : Some(connection) => {
289 1751 : if connection.has_processed_wal {
290 1694 : resulting_string.push_str(&format!(
291 1694 : " (update {}): streaming WAL from node {}, ",
292 1694 : connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
293 1694 : connection.node,
294 1694 : ));
295 1694 :
296 1694 : match (connection.streaming_lsn, connection.commit_lsn) {
297 0 : (None, None) => resulting_string.push_str("no streaming data"),
298 0 : (None, Some(commit_lsn)) => {
299 0 : resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
300 : }
301 0 : (Some(streaming_lsn), None) => {
302 0 : resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
303 : }
304 1694 : (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
305 1694 : &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
306 1694 : ),
307 : }
308 57 : } else if connection.is_connected {
309 57 : resulting_string.push_str(&format!(
310 57 : " (update {}): connecting to node {}",
311 57 : connection
312 57 : .latest_connection_update
313 57 : .format("%Y-%m-%d %H:%M:%S"),
314 57 : connection.node,
315 57 : ));
316 57 : } else {
317 0 : resulting_string.push_str(&format!(
318 0 : " (update {}): initializing node {} connection",
319 0 : connection
320 0 : .latest_connection_update
321 0 : .format("%Y-%m-%d %H:%M:%S"),
322 0 : connection.node,
323 0 : ));
324 0 : }
325 : }
326 43 : None => resulting_string.push_str(": disconnected"),
327 : }
328 :
329 1794 : resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
330 1794 : let mut candidates = self.wal_stream_candidates.iter().peekable();
331 3731 : while let Some((node_id, candidate_info)) = candidates.next() {
332 1937 : resulting_string.push_str(&format!(
333 1937 : "({}|{}|{})",
334 1937 : node_id,
335 1937 : candidate_info.latest_update.format("%H:%M:%S"),
336 1937 : Lsn(candidate_info.timeline.commit_lsn)
337 1937 : ));
338 1937 : if candidates.peek().is_some() {
339 144 : resulting_string.push_str(", ");
340 1793 : }
341 : }
342 1794 : resulting_string.push(']');
343 1794 :
344 1794 : resulting_string
345 1794 : }
346 : }
347 :
348 : /// Current connection data.
349 0 : #[derive(Debug)]
350 : struct WalConnection {
351 : /// Time when the connection was initiated.
352 : started_at: NaiveDateTime,
353 : /// Current safekeeper pageserver is connected to for WAL streaming.
354 : sk_id: NodeId,
355 : /// Availability zone of the safekeeper.
356 : availability_zone: Option<String>,
357 : /// Status of the connection.
358 : status: WalConnectionStatus,
359 : /// WAL streaming task handle.
360 : connection_task: TaskHandle<WalConnectionStatus>,
361 : /// Have we discovered that another safekeeper has more recent WAL than we do?
362 : discovered_new_wal: Option<NewCommittedWAL>,
363 : }
364 :
365 : /// Notion of a new committed WAL, which exists on another safekeeper.
366 0 : #[derive(Debug, Clone, Copy)]
367 : struct NewCommittedWAL {
368 : /// LSN of the new committed WAL.
369 : lsn: Lsn,
370 : /// When we discovered that the new committed WAL exists on other safekeeper.
371 : discovered_at: NaiveDateTime,
372 : }
373 :
374 0 : #[derive(Debug, Clone, Copy)]
375 : struct RetryInfo {
376 : next_retry_at: Option<NaiveDateTime>,
377 : retry_duration_seconds: f64,
378 : }
379 :
380 : /// Data about the timeline to connect to, received from the broker.
381 1159378 : #[derive(Debug, Clone)]
382 : struct BrokerSkTimeline {
383 : timeline: SafekeeperTimelineInfo,
384 : /// Time at which the data was fetched from the broker last time, to track the stale data.
385 : latest_update: NaiveDateTime,
386 : }
387 :
388 : impl ConnectionManagerState {
389 1235 : pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
390 1235 : let id = TenantTimelineId {
391 1235 : tenant_id: timeline.tenant_shard_id.tenant_id,
392 1235 : timeline_id: timeline.timeline_id,
393 1235 : };
394 1235 : Self {
395 1235 : id,
396 1235 : timeline,
397 1235 : conf,
398 1235 : wal_connection: None,
399 1235 : wal_stream_candidates: HashMap::new(),
400 1235 : wal_connection_retries: HashMap::new(),
401 1235 : }
402 1235 : }
403 :
404 : /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
405 1702 : async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
406 1702 : WALRECEIVER_SWITCHES
407 1702 : .with_label_values(&[new_sk.reason.name()])
408 1702 : .inc();
409 1702 :
410 1702 : self.drop_old_connection(true).await;
411 :
412 1702 : let node_id = new_sk.safekeeper_id;
413 1702 : let connect_timeout = self.conf.wal_connect_timeout;
414 1702 : let ingest_batch_size = self.conf.ingest_batch_size;
415 1702 : let timeline = Arc::clone(&self.timeline);
416 1702 : let ctx = ctx.detached_child(
417 1702 : TaskKind::WalReceiverConnectionHandler,
418 1702 : DownloadBehavior::Download,
419 1702 : );
420 :
421 1702 : let span = info_span!("connection", %node_id);
422 1702 : let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
423 1702 : async move {
424 1702 : debug_assert_current_span_has_tenant_and_timeline_id();
425 :
426 1702 : let res = super::walreceiver_connection::handle_walreceiver_connection(
427 1702 : timeline,
428 1702 : new_sk.wal_source_connconf,
429 1702 : events_sender,
430 1702 : cancellation.clone(),
431 1702 : connect_timeout,
432 1702 : ctx,
433 1702 : node_id,
434 1702 : ingest_batch_size,
435 1702 : )
436 849884 : .await;
437 :
438 1660 : match res {
439 205 : Ok(()) => Ok(()),
440 1455 : Err(e) => {
441 1455 : match e {
442 54 : WalReceiverError::SuccessfulCompletion(msg) => {
443 54 : info!("walreceiver connection handling ended with success: {msg}");
444 54 : Ok(())
445 : }
446 1383 : WalReceiverError::ExpectedSafekeeperError(e) => {
447 1383 : info!("walreceiver connection handling ended: {e}");
448 1383 : Ok(())
449 : }
450 18 : WalReceiverError::Other(e) => {
451 18 : // give out an error to have task_mgr log it verbosely
452 18 : if cancellation.is_cancelled() {
453 : // Ideally we would learn about this via some path other than Other, but
454 : // that requires refactoring all the intermediate layers of ingest code
455 : // that only emit anyhow::Error
456 3 : Ok(())
457 : } else {
458 15 : Err(e).context("walreceiver connection handling failure")
459 : }
460 : }
461 : }
462 : }
463 : }
464 1660 : }
465 1702 : .instrument(span)
466 1702 : });
467 1702 :
468 1702 : let now = Utc::now().naive_utc();
469 1702 : self.wal_connection = Some(WalConnection {
470 1702 : started_at: now,
471 1702 : sk_id: new_sk.safekeeper_id,
472 1702 : availability_zone: new_sk.availability_zone,
473 1702 : status: WalConnectionStatus {
474 1702 : is_connected: false,
475 1702 : has_processed_wal: false,
476 1702 : latest_connection_update: now,
477 1702 : latest_wal_update: now,
478 1702 : streaming_lsn: None,
479 1702 : commit_lsn: None,
480 1702 : node: node_id,
481 1702 : },
482 1702 : connection_task: connection_handle,
483 1702 : discovered_new_wal: None,
484 1702 : });
485 1702 : }
486 :
487 : /// Drops the current connection (if any) and updates retry timeout for the next
488 : /// connection attempt to the same safekeeper.
489 3081 : async fn drop_old_connection(&mut self, needs_shutdown: bool) {
490 3081 : let wal_connection = match self.wal_connection.take() {
491 1395 : Some(wal_connection) => wal_connection,
492 1686 : None => return,
493 : };
494 :
495 1395 : if needs_shutdown {
496 16 : wal_connection.connection_task.shutdown().await;
497 1379 : }
498 :
499 1395 : let retry = self
500 1395 : .wal_connection_retries
501 1395 : .entry(wal_connection.sk_id)
502 1395 : .or_insert(RetryInfo {
503 1395 : next_retry_at: None,
504 1395 : retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
505 1395 : });
506 1395 :
507 1395 : let now = Utc::now().naive_utc();
508 1395 :
509 1395 : // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
510 1395 : // and we add backoff to the time when we started the connection attempt. If the connection
511 1395 : // was active for a long time, then next_retry_at will be in the past.
512 1395 : retry.next_retry_at =
513 1395 : wal_connection
514 1395 : .started_at
515 1395 : .checked_add_signed(chrono::Duration::milliseconds(
516 1395 : (retry.retry_duration_seconds * 1000.0) as i64,
517 1395 : ));
518 :
519 1395 : if let Some(next) = &retry.next_retry_at {
520 1395 : if next > &now {
521 999 : info!(
522 999 : "Next connection retry to {:?} is at {}",
523 999 : wal_connection.sk_id, next
524 999 : );
525 396 : }
526 0 : }
527 :
528 1395 : let next_retry_duration =
529 1395 : retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
530 1395 : // Clamp the next retry duration to the maximum allowed.
531 1395 : let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
532 1395 : // Clamp the next retry duration to the minimum allowed.
533 1395 : let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
534 1395 :
535 1395 : retry.retry_duration_seconds = next_retry_duration;
536 3081 : }
537 :
538 : /// Returns the time to wait before the next connection retry attempt, if any is scheduled.
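/// For example (illustrative values): if one failed safekeeper's `next_retry_at` is 300ms in the
/// future and another's is 2s in the future, roughly 300ms is returned; if no retries are scheduled
/// in the future, `None` is returned and the caller waits for broker events instead.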
539 737728 : fn time_until_next_retry(&self) -> Option<Duration> {
540 737728 : let now = Utc::now().naive_utc();
541 :
542 737728 : let next_retry_at = self
543 737728 : .wal_connection_retries
544 737728 : .values()
545 737728 : .filter_map(|retry| retry.next_retry_at)
546 737728 : .filter(|next_retry_at| next_retry_at > &now)
547 737728 : .min()?;
548 :
549 1783 : (next_retry_at - now).to_std().ok()
550 737728 : }
551 :
552 : /// Registers a broker timeline update in the state, overwriting the entry previously stored for the same safekeeper node.
553 8230 : fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
554 8230 : WALRECEIVER_BROKER_UPDATES.inc();
555 8230 :
556 8230 : info!(
557 8230 : "Safekeeper info update: standby_horizon(cutoff)={}",
558 8230 : timeline_update.standby_horizon
559 8230 : );
560 8230 : if timeline_update.standby_horizon != 0 {
561 204 : // ignore reports from safekeepers not connected to replicas
562 204 : self.timeline
563 204 : .standby_horizon
564 204 : .store(Lsn(timeline_update.standby_horizon));
565 8026 : }
566 :
567 8230 : let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
568 8230 : let old_entry = self.wal_stream_candidates.insert(
569 8230 : new_safekeeper_id,
570 8230 : BrokerSkTimeline {
571 8230 : timeline: timeline_update,
572 8230 : latest_update: Utc::now().naive_utc(),
573 8230 : },
574 8230 : );
575 8230 : if old_entry.is_none() {
576 670 : info!("New SK node was added: {new_safekeeper_id}");
577 670 : WALRECEIVER_CANDIDATES_ADDED.inc();
578 7560 : }
579 8230 : }
580 :
581 : /// Cleans up stale broker records and checks the rest for a new connection candidate.
582 : /// Returns a new candidate if the current connection is absent or lagging, `None` otherwise.
583 : /// The current rules for approving new candidates:
584 : /// * pick a candidate different from the connected safekeeper with the biggest `commit_lsn` and the fewest failed connection attempts
585 : /// * if there's no such entry, no new candidate is found, abort
586 : /// * otherwise check if the candidate is much better than the current one
587 : ///
588 : /// To understand the exact rules for determining whether the candidate is better than the current one, refer to this function's implementation.
589 : /// The general rules are as follows:
590 : /// * if there is no connected safekeeper, pick the candidate
591 : /// * if we haven't received any updates for some time, pick the candidate
592 : /// * if the candidate's commit_lsn is much higher than the current one, pick the candidate
593 : /// * if the candidate's commit_lsn is the same, but the candidate is located in the same AZ as the pageserver, pick the candidate
594 : /// * if the connected safekeeper stopped sending us new WAL which is available on another safekeeper, pick the candidate
595 : ///
596 : /// This way we keep up with the most up-to-date safekeeper and don't jump from one safekeeper to another too frequently.
597 : /// Both thresholds are configured per tenant.
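///
/// For example (illustrative numbers), with `max_lsn_wal_lag = 1024`: if the connected safekeeper
/// reports `commit_lsn = 0/5000` while another candidate reports `commit_lsn = 0/5C00`, the
/// candidate's advantage is 0xC00 = 3072 bytes >= the threshold, so a `LaggingWal` candidate for
/// that safekeeper is returned.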
598 736532 : fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
599 736532 : self.cleanup_old_candidates();
600 736532 :
601 736532 : match &self.wal_connection {
602 733924 : Some(existing_wal_connection) => {
603 733924 : let connected_sk_node = existing_wal_connection.sk_id;
604 :
605 232806 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
606 733924 : self.select_connection_candidate(Some(connected_sk_node))?;
607 232806 : let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
608 232806 :
609 232806 : let now = Utc::now().naive_utc();
610 232806 : if let Ok(latest_interaciton) =
611 232806 : (now - existing_wal_connection.status.latest_connection_update).to_std()
612 : {
613 : // Drop connection if we haven't received keepalive message for a while.
614 232806 : if latest_interaciton > self.conf.wal_connect_timeout {
615 2 : return Some(NewWalConnectionCandidate {
616 2 : safekeeper_id: new_sk_id,
617 2 : wal_source_connconf: new_wal_source_connconf,
618 2 : availability_zone: new_availability_zone,
619 2 : reason: ReconnectReason::NoKeepAlives {
620 2 : last_keep_alive: Some(
621 2 : existing_wal_connection.status.latest_connection_update,
622 2 : ),
623 2 : check_time: now,
624 2 : threshold: self.conf.wal_connect_timeout,
625 2 : },
626 2 : });
627 232804 : }
628 0 : }
629 :
630 232804 : if !existing_wal_connection.status.is_connected {
631 : // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
632 211 : return None;
633 232593 : }
634 :
635 232593 : if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
636 232502 : let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
637 232502 : // Check if the new candidate has much more WAL than the current one.
638 232502 : match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
639 9170 : Some(new_sk_lsn_advantage) => {
640 9170 : if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
641 18 : return Some(NewWalConnectionCandidate {
642 18 : safekeeper_id: new_sk_id,
643 18 : wal_source_connconf: new_wal_source_connconf,
644 18 : availability_zone: new_availability_zone,
645 18 : reason: ReconnectReason::LaggingWal {
646 18 : current_commit_lsn,
647 18 : new_commit_lsn,
648 18 : threshold: self.conf.max_lsn_wal_lag,
649 18 : },
650 18 : });
651 9152 : }
652 9152 : // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
653 9152 : // and the current one is not, switch to the new one.
654 9152 : if self.conf.availability_zone.is_some()
655 184 : && existing_wal_connection.availability_zone
656 184 : != self.conf.availability_zone
657 184 : && self.conf.availability_zone == new_availability_zone
658 : {
659 2 : return Some(NewWalConnectionCandidate {
660 2 : safekeeper_id: new_sk_id,
661 2 : availability_zone: new_availability_zone,
662 2 : wal_source_connconf: new_wal_source_connconf,
663 2 : reason: ReconnectReason::SwitchAvailabilityZone,
664 2 : });
665 9150 : }
666 : }
667 223332 : None => debug!(
668 0 : "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
669 0 : ),
670 : }
671 91 : }
672 :
673 232573 : let current_lsn = match existing_wal_connection.status.streaming_lsn {
674 232315 : Some(lsn) => lsn,
675 258 : None => self.timeline.get_last_record_lsn(),
676 : };
677 232573 : let current_commit_lsn = existing_wal_connection
678 232573 : .status
679 232573 : .commit_lsn
680 232573 : .unwrap_or(current_lsn);
681 232573 : let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
682 232573 :
683 232573 : // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
684 232573 : let mut discovered_new_wal = existing_wal_connection
685 232573 : .discovered_new_wal
686 232573 : .filter(|new_wal| new_wal.lsn > current_commit_lsn);
687 232573 :
688 232573 : if discovered_new_wal.is_none() {
689 : // Check if the new candidate has more WAL than the current one.
690 : // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
691 228409 : discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
692 144 : trace!(
693 0 : "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
694 0 : candidate_commit_lsn,
695 0 : current_commit_lsn
696 0 : );
697 144 : Some(NewCommittedWAL {
698 144 : lsn: candidate_commit_lsn,
699 144 : discovered_at: Utc::now().naive_utc(),
700 144 : })
701 : } else {
702 228265 : None
703 : };
704 4164 : }
705 :
706 232573 : let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
707 : // Connected safekeeper has more WAL, but we haven't received updates for some time.
708 8876 : trace!(
709 0 : "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
710 0 : (now - existing_wal_connection.status.latest_wal_update).to_std(),
711 0 : current_lsn,
712 0 : current_commit_lsn
713 0 : );
714 8876 : Some(existing_wal_connection.status.latest_wal_update)
715 : } else {
716 223697 : discovered_new_wal.as_ref().map(|new_wal| {
717 1025 : // We know that new WAL is available on another safekeeper, but the connected safekeeper doesn't have it.
718 1025 : new_wal
719 1025 : .discovered_at
720 1025 : .max(existing_wal_connection.status.latest_wal_update)
721 223697 : })
722 : };
723 :
724 : // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
725 232573 : if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
726 9901 : if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
727 9841 : if candidate_commit_lsn > current_commit_lsn
728 4248 : && waiting_for_new_wal > self.conf.lagging_wal_timeout
729 : {
730 2 : return Some(NewWalConnectionCandidate {
731 2 : safekeeper_id: new_sk_id,
732 2 : wal_source_connconf: new_wal_source_connconf,
733 2 : availability_zone: new_availability_zone,
734 2 : reason: ReconnectReason::NoWalTimeout {
735 2 : current_lsn,
736 2 : current_commit_lsn,
737 2 : candidate_commit_lsn,
738 2 : last_wal_interaction: Some(
739 2 : existing_wal_connection.status.latest_wal_update,
740 2 : ),
741 2 : check_time: now,
742 2 : threshold: self.conf.lagging_wal_timeout,
743 2 : },
744 2 : });
745 9839 : }
746 60 : }
747 222672 : }
748 :
749 232571 : self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
750 : }
751 : None => {
752 1692 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
753 2608 : self.select_connection_candidate(None)?;
754 1692 : return Some(NewWalConnectionCandidate {
755 1692 : safekeeper_id: new_sk_id,
756 1692 : availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
757 1692 : wal_source_connconf: new_wal_source_connconf,
758 1692 : reason: ReconnectReason::NoExistingConnection,
759 1692 : });
760 : }
761 : }
762 :
763 232571 : None
764 736532 : }
765 :
766 : /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
767 : /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
768 : ///
769 : /// The candidate that is chosen:
770 : /// * has no pending retry cooldown
771 : /// * has greatest commit_lsn among the ones that are left
772 736532 : fn select_connection_candidate(
773 736532 : &self,
774 736532 : node_to_omit: Option<NodeId>,
775 736532 : ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
776 736532 : self.applicable_connection_candidates()
777 1155118 : .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
778 736532 : .max_by_key(|(_, info, _)| info.commit_lsn)
779 736532 : }
780 :
781 : /// Returns a list of safekeepers that have valid info and are ready for connection.
782 : /// Safekeepers that are still within their retry cooldown are filtered out.
783 736532 : fn applicable_connection_candidates(
784 736532 : &self,
785 736532 : ) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
786 736532 : let now = Utc::now().naive_utc();
787 736532 :
788 736532 : self.wal_stream_candidates
789 736532 : .iter()
790 1157477 : .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
791 1157462 : .filter(move |(sk_id, _)| {
792 1157462 : let next_retry_at = self
793 1157462 : .wal_connection_retries
794 1157462 : .get(sk_id)
795 1157462 : .and_then(|retry_info| {
796 207854 : retry_info.next_retry_at
797 1157462 : });
798 1157462 :
799 1157462 : next_retry_at.is_none() || next_retry_at.unwrap() <= now
800 1157462 : }).filter_map(|(sk_id, broker_info)| {
801 1155122 : let info = &broker_info.timeline;
802 1155122 : if info.safekeeper_connstr.is_empty() {
803 4 : return None; // no connection string, ignore sk
804 1155118 : }
805 1155118 : match wal_stream_connection_config(
806 1155118 : self.id,
807 1155118 : info.safekeeper_connstr.as_ref(),
808 1155118 : match &self.conf.auth_token {
809 1154012 : None => None,
810 1106 : Some(x) => Some(x),
811 : },
812 1155118 : self.conf.availability_zone.as_deref(),
813 : ) {
814 1155118 : Ok(connstr) => Some((*sk_id, info, connstr)),
815 0 : Err(e) => {
816 0 : error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
817 0 : None
818 : }
819 : }
820 1155122 : })
821 736532 : }
822 :
823 : /// Remove candidates which haven't sent broker updates for a while.
824 736532 : fn cleanup_old_candidates(&mut self) {
825 736532 : let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
826 736532 : let lagging_wal_timeout = self.conf.lagging_wal_timeout;
827 736532 :
828 1157488 : self.wal_stream_candidates.retain(|node_id, broker_info| {
829 1157488 : if let Ok(time_since_latest_broker_update) =
830 1157488 : (Utc::now().naive_utc() - broker_info.latest_update).to_std()
831 : {
832 1157488 : let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
833 1157488 : if !should_retain {
834 11 : node_ids_to_remove.push(*node_id);
835 1157477 : }
836 1157488 : should_retain
837 : } else {
838 0 : true
839 : }
840 1157488 : });
841 736532 :
842 736532 : if !node_ids_to_remove.is_empty() {
843 22 : for node_id in node_ids_to_remove {
844 11 : info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
845 11 : self.wal_connection_retries.remove(&node_id);
846 11 : WALRECEIVER_CANDIDATES_REMOVED.inc();
847 : }
848 736521 : }
849 736532 : }
850 :
851 545 : pub(super) async fn shutdown(mut self) {
852 545 : if let Some(wal_connection) = self.wal_connection.take() {
853 290 : wal_connection.connection_task.shutdown().await;
854 282 : }
855 545 : }
856 :
857 736514 : fn manager_status(&self) -> ConnectionManagerStatus {
858 736514 : ConnectionManagerStatus {
859 736514 : existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
860 736514 : wal_stream_candidates: self.wal_stream_candidates.clone(),
861 736514 : }
862 736514 : }
863 : }
864 :
865 1702 : #[derive(Debug)]
866 : struct NewWalConnectionCandidate {
867 : safekeeper_id: NodeId,
868 : wal_source_connconf: PgConnectionConfig,
869 : availability_zone: Option<String>,
870 : reason: ReconnectReason,
871 : }
872 :
873 : /// Stores the reason why the WAL connection was switched, for further debugging purposes.
874 1702 : #[derive(Debug, PartialEq, Eq)]
875 : enum ReconnectReason {
876 : NoExistingConnection,
877 : LaggingWal {
878 : current_commit_lsn: Lsn,
879 : new_commit_lsn: Lsn,
880 : threshold: NonZeroU64,
881 : },
882 : SwitchAvailabilityZone,
883 : NoWalTimeout {
884 : current_lsn: Lsn,
885 : current_commit_lsn: Lsn,
886 : candidate_commit_lsn: Lsn,
887 : last_wal_interaction: Option<NaiveDateTime>,
888 : check_time: NaiveDateTime,
889 : threshold: Duration,
890 : },
891 : NoKeepAlives {
892 : last_keep_alive: Option<NaiveDateTime>,
893 : check_time: NaiveDateTime,
894 : threshold: Duration,
895 : },
896 : }
897 :
898 : impl ReconnectReason {
899 1702 : fn name(&self) -> &str {
900 1702 : match self {
901 1686 : ReconnectReason::NoExistingConnection => "NoExistingConnection",
902 16 : ReconnectReason::LaggingWal { .. } => "LaggingWal",
903 0 : ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
904 0 : ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
905 0 : ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
906 : }
907 1702 : }
908 : }
909 :
910 : #[cfg(test)]
911 : mod tests {
912 : use super::*;
913 : use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
914 : use url::Host;
915 :
916 38 : fn dummy_broker_sk_timeline(
917 38 : commit_lsn: u64,
918 38 : safekeeper_connstr: &str,
919 38 : latest_update: NaiveDateTime,
920 38 : ) -> BrokerSkTimeline {
921 38 : BrokerSkTimeline {
922 38 : timeline: SafekeeperTimelineInfo {
923 38 : safekeeper_id: 0,
924 38 : tenant_timeline_id: None,
925 38 : term: 0,
926 38 : last_log_term: 0,
927 38 : flush_lsn: 0,
928 38 : commit_lsn,
929 38 : backup_lsn: 0,
930 38 : remote_consistent_lsn: 0,
931 38 : peer_horizon_lsn: 0,
932 38 : local_start_lsn: 0,
933 38 : standby_horizon: 0,
934 38 : safekeeper_connstr: safekeeper_connstr.to_owned(),
935 38 : http_connstr: safekeeper_connstr.to_owned(),
936 38 : availability_zone: None,
937 38 : },
938 38 : latest_update,
939 38 : }
940 38 : }
941 :
942 2 : #[tokio::test]
943 2 : async fn no_connection_no_candidate() -> anyhow::Result<()> {
944 2 : let harness = TenantHarness::create("no_connection_no_candidate")?;
945 8 : let mut state = dummy_state(&harness).await;
946 2 : let now = Utc::now().naive_utc();
947 :
948 2 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
949 2 : let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
950 2 :
951 2 : state.wal_connection = None;
952 2 : state.wal_stream_candidates = HashMap::from([
953 2 : (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
954 2 : (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
955 2 : (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
956 2 : (
957 2 : NodeId(3),
958 2 : dummy_broker_sk_timeline(
959 2 : 1 + state.conf.max_lsn_wal_lag.get(),
960 2 : "delay_over_threshold",
961 2 : delay_over_threshold,
962 2 : ),
963 2 : ),
964 2 : ]);
965 2 :
966 2 : let no_candidate = state.next_connection_candidate();
967 2 : assert!(
968 2 : no_candidate.is_none(),
969 0 : "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
970 : );
971 :
972 2 : Ok(())
973 : }
974 :
975 2 : #[tokio::test]
976 2 : async fn connection_no_candidate() -> anyhow::Result<()> {
977 2 : let harness = TenantHarness::create("connection_no_candidate")?;
978 10 : let mut state = dummy_state(&harness).await;
979 2 : let now = Utc::now().naive_utc();
980 2 :
981 2 : let connected_sk_id = NodeId(0);
982 2 : let current_lsn = 100_000;
983 2 :
984 2 : let connection_status = WalConnectionStatus {
985 2 : is_connected: true,
986 2 : has_processed_wal: true,
987 2 : latest_connection_update: now,
988 2 : latest_wal_update: now,
989 2 : commit_lsn: Some(Lsn(current_lsn)),
990 2 : streaming_lsn: Some(Lsn(current_lsn)),
991 2 : node: NodeId(1),
992 2 : };
993 2 :
994 2 : state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
995 2 : state.wal_connection = Some(WalConnection {
996 2 : started_at: now,
997 2 : sk_id: connected_sk_id,
998 2 : availability_zone: None,
999 2 : status: connection_status,
1000 2 : connection_task: TaskHandle::spawn(move |sender, _| async move {
1001 2 : sender
1002 2 : .send(TaskStateUpdate::Progress(connection_status))
1003 2 : .ok();
1004 2 : Ok(())
1005 2 : }),
1006 2 : discovered_new_wal: None,
1007 2 : });
1008 2 : state.wal_stream_candidates = HashMap::from([
1009 2 : (
1010 2 : connected_sk_id,
1011 2 : dummy_broker_sk_timeline(
1012 2 : current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
1013 2 : DUMMY_SAFEKEEPER_HOST,
1014 2 : now,
1015 2 : ),
1016 2 : ),
1017 2 : (
1018 2 : NodeId(1),
1019 2 : dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
1020 2 : ),
1021 2 : (
1022 2 : NodeId(2),
1023 2 : dummy_broker_sk_timeline(
1024 2 : current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
1025 2 : "not_enough_advanced_lsn",
1026 2 : now,
1027 2 : ),
1028 2 : ),
1029 2 : ]);
1030 2 :
1031 2 : let no_candidate = state.next_connection_candidate();
1032 2 : assert!(
1033 2 : no_candidate.is_none(),
1034 0 : "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
1035 : );
1036 :
1037 2 : Ok(())
1038 : }
1039 :
1040 2 : #[tokio::test]
1041 2 : async fn no_connection_candidate() -> anyhow::Result<()> {
1042 2 : let harness = TenantHarness::create("no_connection_candidate")?;
1043 9 : let mut state = dummy_state(&harness).await;
1044 2 : let now = Utc::now().naive_utc();
1045 2 :
1046 2 : state.wal_connection = None;
1047 2 : state.wal_stream_candidates = HashMap::from([(
1048 2 : NodeId(0),
1049 2 : dummy_broker_sk_timeline(
1050 2 : 1 + state.conf.max_lsn_wal_lag.get(),
1051 2 : DUMMY_SAFEKEEPER_HOST,
1052 2 : now,
1053 2 : ),
1054 2 : )]);
1055 2 :
1056 2 : let only_candidate = state
1057 2 : .next_connection_candidate()
1058 2 : .expect("Expected one candidate selected out of the only data option, but got none");
1059 2 : assert_eq!(only_candidate.safekeeper_id, NodeId(0));
1060 2 : assert_eq!(
1061 : only_candidate.reason,
1062 : ReconnectReason::NoExistingConnection,
1063 0 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1064 : );
1065 2 : assert_eq!(
1066 2 : only_candidate.wal_source_connconf.host(),
1067 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1068 2 : );
1069 :
1070 2 : let selected_lsn = 100_000;
1071 2 : state.wal_stream_candidates = HashMap::from([
1072 2 : (
1073 2 : NodeId(0),
1074 2 : dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
1075 2 : ),
1076 2 : (
1077 2 : NodeId(1),
1078 2 : dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
1079 2 : ),
1080 2 : (
1081 2 : NodeId(2),
1082 2 : dummy_broker_sk_timeline(selected_lsn + 100, "", now),
1083 2 : ),
1084 2 : ]);
1085 2 : let biggest_wal_candidate = state.next_connection_candidate().expect(
1086 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1087 2 : );
1088 2 :
1089 2 : assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
1090 2 : assert_eq!(
1091 : biggest_wal_candidate.reason,
1092 : ReconnectReason::NoExistingConnection,
1093 0 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1094 : );
1095 2 : assert_eq!(
1096 2 : biggest_wal_candidate.wal_source_connconf.host(),
1097 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1098 2 : );
1099 :
1100 2 : Ok(())
1101 : }
1102 :
1103 2 : #[tokio::test]
1104 2 : async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
1105 2 : let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
1106 9 : let mut state = dummy_state(&harness).await;
1107 2 : let now = Utc::now().naive_utc();
1108 2 :
1109 2 : let current_lsn = Lsn(100_000).align();
1110 2 : let bigger_lsn = Lsn(current_lsn.0 + 100).align();
1111 2 :
1112 2 : state.wal_connection = None;
1113 2 : state.wal_stream_candidates = HashMap::from([
1114 2 : (
1115 2 : NodeId(0),
1116 2 : dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1117 2 : ),
1118 2 : (
1119 2 : NodeId(1),
1120 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1121 2 : ),
1122 2 : ]);
1123 2 : state.wal_connection_retries = HashMap::from([(
1124 2 : NodeId(0),
1125 2 : RetryInfo {
1126 2 : next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
1127 2 : retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
1128 2 : },
1129 2 : )]);
1130 2 :
1131 2 : let candidate_with_less_errors = state
1132 2 : .next_connection_candidate()
1133 2 : .expect("Expected one candidate selected, but got none");
1134 2 : assert_eq!(
1135 : candidate_with_less_errors.safekeeper_id,
1136 : NodeId(1),
1137 0 : "Should select the node with no pending retry cooldown"
1138 : );
1139 :
1140 2 : Ok(())
1141 : }
1142 :
1143 2 : #[tokio::test]
1144 2 : async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1145 2 : let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
1146 9 : let mut state = dummy_state(&harness).await;
1147 2 : let current_lsn = Lsn(100_000).align();
1148 2 : let now = Utc::now().naive_utc();
1149 2 :
1150 2 : let connected_sk_id = NodeId(0);
1151 2 : let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
1152 2 :
1153 2 : let connection_status = WalConnectionStatus {
1154 2 : is_connected: true,
1155 2 : has_processed_wal: true,
1156 2 : latest_connection_update: now,
1157 2 : latest_wal_update: now,
1158 2 : commit_lsn: Some(current_lsn),
1159 2 : streaming_lsn: Some(current_lsn),
1160 2 : node: connected_sk_id,
1161 2 : };
1162 2 :
1163 2 : state.wal_connection = Some(WalConnection {
1164 2 : started_at: now,
1165 2 : sk_id: connected_sk_id,
1166 2 : availability_zone: None,
1167 2 : status: connection_status,
1168 2 : connection_task: TaskHandle::spawn(move |sender, _| async move {
1169 2 : sender
1170 2 : .send(TaskStateUpdate::Progress(connection_status))
1171 2 : .ok();
1172 2 : Ok(())
1173 2 : }),
1174 2 : discovered_new_wal: None,
1175 2 : });
1176 2 : state.wal_stream_candidates = HashMap::from([
1177 2 : (
1178 2 : connected_sk_id,
1179 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1180 2 : ),
1181 2 : (
1182 2 : NodeId(1),
1183 2 : dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
1184 2 : ),
1185 2 : ]);
1186 2 :
1187 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1188 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1189 2 : );
1190 2 :
1191 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
1192 2 : assert_eq!(
1193 2 : over_threshcurrent_candidate.reason,
1194 2 : ReconnectReason::LaggingWal {
1195 2 : current_commit_lsn: current_lsn,
1196 2 : new_commit_lsn: new_lsn,
1197 2 : threshold: state.conf.max_lsn_wal_lag
1198 2 : },
1199 0 : "Should select bigger WAL safekeeper if it starts to lag enough"
1200 : );
1201 2 : assert_eq!(
1202 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1203 2 : &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
1204 2 : );
1205 :
1206 2 : Ok(())
1207 : }
1208 :
1209 2 : #[tokio::test]
1210 2 : async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
1211 2 : let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
1212 9 : let mut state = dummy_state(&harness).await;
1213 2 : let current_lsn = Lsn(100_000).align();
1214 2 : let now = Utc::now().naive_utc();
1215 :
1216 2 : let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
1217 2 : let time_over_threshold =
1218 2 : Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
1219 2 :
1220 2 : let connection_status = WalConnectionStatus {
1221 2 : is_connected: true,
1222 2 : has_processed_wal: true,
1223 2 : latest_connection_update: time_over_threshold,
1224 2 : latest_wal_update: time_over_threshold,
1225 2 : commit_lsn: Some(current_lsn),
1226 2 : streaming_lsn: Some(current_lsn),
1227 2 : node: NodeId(1),
1228 2 : };
1229 2 :
1230 2 : state.wal_connection = Some(WalConnection {
1231 2 : started_at: now,
1232 2 : sk_id: NodeId(1),
1233 2 : availability_zone: None,
1234 2 : status: connection_status,
1235 2 : connection_task: TaskHandle::spawn(move |sender, _| async move {
1236 2 : sender
1237 2 : .send(TaskStateUpdate::Progress(connection_status))
1238 2 : .ok();
1239 2 : Ok(())
1240 2 : }),
1241 2 : discovered_new_wal: None,
1242 2 : });
1243 2 : state.wal_stream_candidates = HashMap::from([(
1244 2 : NodeId(0),
1245 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1246 2 : )]);
1247 2 :
1248 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1249 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1250 2 : );
1251 2 :
1252 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1253 2 : match over_threshcurrent_candidate.reason {
1254 : ReconnectReason::NoKeepAlives {
1255 2 : last_keep_alive,
1256 2 : threshold,
1257 2 : ..
1258 2 : } => {
1259 2 : assert_eq!(last_keep_alive, Some(time_over_threshold));
1260 2 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1261 : }
1262 0 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1263 : }
1264 2 : assert_eq!(
1265 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1266 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1267 2 : );
1268 :
1269 2 : Ok(())
1270 : }
1271 :
1272 2 : #[tokio::test]
1273 2 : async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1274 2 : let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
1275 9 : let mut state = dummy_state(&harness).await;
1276 2 : let current_lsn = Lsn(100_000).align();
1277 2 : let new_lsn = Lsn(100_100).align();
1278 2 : let now = Utc::now().naive_utc();
1279 :
1280 2 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1281 2 : let time_over_threshold =
1282 2 : Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
1283 2 :
1284 2 : let connection_status = WalConnectionStatus {
1285 2 : is_connected: true,
1286 2 : has_processed_wal: true,
1287 2 : latest_connection_update: now,
1288 2 : latest_wal_update: time_over_threshold,
1289 2 : commit_lsn: Some(current_lsn),
1290 2 : streaming_lsn: Some(current_lsn),
1291 2 : node: NodeId(1),
1292 2 : };
1293 2 :
1294 2 : state.wal_connection = Some(WalConnection {
1295 2 : started_at: now,
1296 2 : sk_id: NodeId(1),
1297 2 : availability_zone: None,
1298 2 : status: connection_status,
1299 2 : connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
1300 2 : discovered_new_wal: Some(NewCommittedWAL {
1301 2 : discovered_at: time_over_threshold,
1302 2 : lsn: new_lsn,
1303 2 : }),
1304 2 : });
1305 2 : state.wal_stream_candidates = HashMap::from([(
1306 2 : NodeId(0),
1307 2 : dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1308 2 : )]);
1309 2 :
1310 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1311 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1312 2 : );
1313 2 :
1314 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1315 2 : match over_threshcurrent_candidate.reason {
1316 : ReconnectReason::NoWalTimeout {
1317 2 : current_lsn,
1318 2 : current_commit_lsn,
1319 2 : candidate_commit_lsn,
1320 2 : last_wal_interaction,
1321 2 : threshold,
1322 2 : ..
1323 2 : } => {
1324 2 : assert_eq!(current_lsn, current_lsn);
1325 2 : assert_eq!(current_commit_lsn, current_lsn);
1326 2 : assert_eq!(candidate_commit_lsn, new_lsn);
1327 2 : assert_eq!(last_wal_interaction, Some(time_over_threshold));
1328 2 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1329 : }
1330 0 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1331 : }
1332 2 : assert_eq!(
1333 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1334 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1335 2 : );
1336 :
1337 2 : Ok(())
1338 : }
1339 :
1340 : const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
1341 :
1342 16 : async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
1343 16 : let (tenant, ctx) = harness.load().await;
1344 16 : let timeline = tenant
1345 16 : .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
1346 55 : .await
1347 16 : .expect("Failed to create an empty timeline for dummy wal connection manager");
1348 16 :
1349 16 : ConnectionManagerState {
1350 16 : id: TenantTimelineId {
1351 16 : tenant_id: harness.tenant_shard_id.tenant_id,
1352 16 : timeline_id: TIMELINE_ID,
1353 16 : },
1354 16 : timeline,
1355 16 : conf: WalReceiverConf {
1356 16 : wal_connect_timeout: Duration::from_secs(1),
1357 16 : lagging_wal_timeout: Duration::from_secs(1),
1358 16 : max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
1359 16 : auth_token: None,
1360 16 : availability_zone: None,
1361 16 : ingest_batch_size: 1,
1362 16 : },
1363 16 : wal_connection: None,
1364 16 : wal_stream_candidates: HashMap::new(),
1365 16 : wal_connection_retries: HashMap::new(),
1366 16 : }
1367 16 : }
1368 :
1369 2 : #[tokio::test]
1370 2 : async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
1371 2 : // Pageserver and one of safekeepers will be in the same availability zone
1372 2 : // and pageserver should prefer to connect to it.
1373 2 : let test_az = Some("test_az".to_owned());
1374 :
1375 2 : let harness = TenantHarness::create("switch_to_same_availability_zone")?;
1376 8 : let mut state = dummy_state(&harness).await;
1377 2 : state.conf.availability_zone = test_az.clone();
1378 2 : let current_lsn = Lsn(100_000).align();
1379 2 : let now = Utc::now().naive_utc();
1380 2 :
1381 2 : let connected_sk_id = NodeId(0);
1382 2 :
1383 2 : let connection_status = WalConnectionStatus {
1384 2 : is_connected: true,
1385 2 : has_processed_wal: true,
1386 2 : latest_connection_update: now,
1387 2 : latest_wal_update: now,
1388 2 : commit_lsn: Some(current_lsn),
1389 2 : streaming_lsn: Some(current_lsn),
1390 2 : node: connected_sk_id,
1391 2 : };
1392 2 :
1393 2 : state.wal_connection = Some(WalConnection {
1394 2 : started_at: now,
1395 2 : sk_id: connected_sk_id,
1396 2 : availability_zone: None,
1397 2 : status: connection_status,
1398 2 : connection_task: TaskHandle::spawn(move |sender, _| async move {
1399 2 : sender
1400 2 : .send(TaskStateUpdate::Progress(connection_status))
1401 2 : .ok();
1402 2 : Ok(())
1403 2 : }),
1404 2 : discovered_new_wal: None,
1405 2 : });
1406 2 :
1407 2 : // We have another safekeeper with the same commit_lsn, and it has the same availability zone as
1408 2 : // the current pageserver.
1409 2 : let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
1410 2 : same_az_sk.timeline.availability_zone = test_az.clone();
1411 2 :
1412 2 : state.wal_stream_candidates = HashMap::from([
1413 2 : (
1414 2 : connected_sk_id,
1415 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1416 2 : ),
1417 2 : (NodeId(1), same_az_sk),
1418 2 : ]);
1419 2 :
1420 2 : // We expect that pageserver will switch to the safekeeper in the same availability zone,
1421 2 : // even if it has the same commit_lsn.
1422 2 : let next_candidate = state.next_connection_candidate().expect(
1423 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1424 2 : );
1425 2 :
1426 2 : assert_eq!(next_candidate.safekeeper_id, NodeId(1));
1427 2 : assert_eq!(
1428 : next_candidate.reason,
1429 : ReconnectReason::SwitchAvailabilityZone,
1430 0 : "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
1431 : );
1432 2 : assert_eq!(
1433 2 : next_candidate.wal_source_connconf.host(),
1434 2 : &Host::Domain("same_az".to_owned())
1435 2 : );
1436 :
1437 2 : Ok(())
1438 : }
1439 : }