TLA Line data Source code
1 : //! WAL receiver logic that ensures the pageserver gets connected to the safekeeper
2 : //! that holds the latest WAL to stream, and that this connection does not go stale.
3 : //!
4 : //! To achieve that, a storage broker is used: safekeepers publish their timelines' state to it,
5 : //! and the manager subscribes to those updates, accumulating them to pick the safekeeper with the biggest Lsn to connect to.
6 : //! The current connection state is tracked too, to ensure it does not go stale.
7 : //!
8 : //! After every connection event or storage broker update, the state is updated accordingly and rechecked for a new connection candidate;
9 : //! then a (re)connection happens, if necessary.
10 : //! Only the WAL streaming task is expected to finish; the other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
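//!
//! A minimal sketch of how the enclosing walreceiver task might drive one loop step
//! (illustrative only; the real wiring lives in the parent module, and the variable
//! names here are assumptions):
//!
//! ```ignore
//! loop {
//!     match connection_manager_loop_step(&mut broker_client, &mut state, &ctx, &status).await {
//!         // Broker subscription ended; loop around to resubscribe.
//!         ControlFlow::Continue(()) => continue,
//!         // Timeline stopped or broke; shut the manager down.
//!         ControlFlow::Break(()) => break,
//!     }
//! }
//! ```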
11 :
12 : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
13 :
14 : use super::{TaskStateUpdate, WalReceiverConf};
15 : use crate::context::{DownloadBehavior, RequestContext};
16 : use crate::metrics::{
17 : WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
18 : WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
19 : };
20 : use crate::task_mgr::{shutdown_token, TaskKind};
21 : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
22 : use anyhow::Context;
23 : use chrono::{NaiveDateTime, Utc};
24 : use pageserver_api::models::TimelineState;
25 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
26 : use storage_broker::proto::SafekeeperTimelineInfo;
27 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
28 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
29 : use storage_broker::BrokerClientChannel;
30 : use storage_broker::Streaming;
31 : use tokio::select;
32 : use tracing::*;
33 :
34 : use postgres_connection::PgConnectionConfig;
35 : use utils::backoff::{
36 : exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
37 : };
38 : use utils::postgres_client::wal_stream_connection_config;
39 : use utils::{
40 : id::{NodeId, TenantTimelineId},
41 : lsn::Lsn,
42 : };
43 :
44 : use super::{
45 : walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
46 : TaskEvent, TaskHandle,
47 : };
48 :
49 : /// Attempts to subscribe to timeline updates pushed by safekeepers to the broker.
50 : /// Based on the updates, decides whether to start, keep, or stop a WAL receiver task.
51 : /// If the storage broker subscription is cancelled, exits.
52 CBC 1103 : pub(super) async fn connection_manager_loop_step(
53 1103 : broker_client: &mut BrokerClientChannel,
54 1103 : connection_manager_state: &mut ConnectionManagerState,
55 1103 : ctx: &RequestContext,
56 1103 : manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
57 1103 : ) -> ControlFlow<(), ()> {
58 1103 : match connection_manager_state
59 1103 : .timeline
60 1103 : .wait_to_become_active(ctx)
61 6 : .await
62 : {
63 1102 : Ok(()) => {}
64 GBC 1 : Err(new_state) => {
65 UBC 0 : debug!(
66 0 : ?new_state,
67 0 : "state changed, stopping wal connection manager loop"
68 0 : );
69 GBC 1 : return ControlFlow::Break(());
70 : }
71 : }
72 :
73 CBC 1102 : WALRECEIVER_ACTIVE_MANAGERS.inc();
74 1102 : scopeguard::defer! {
75 414 : WALRECEIVER_ACTIVE_MANAGERS.dec();
76 414 : }
77 :
78 1102 : let id = TenantTimelineId {
79 1102 : tenant_id: connection_manager_state.timeline.tenant_id,
80 1102 : timeline_id: connection_manager_state.timeline.timeline_id,
81 1102 : };
82 1102 :
83 1102 : let mut timeline_state_updates = connection_manager_state
84 1102 : .timeline
85 1102 : .subscribe_for_state_updates();
86 :
87 : // Subscribe to the broker updates. The stream shares the underlying TCP connection
88 : // with other streams on this client (other connection managers). When the
89 : // object goes out of scope, the stream finishes in drop() automatically.
90 2196 : let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id).await;
91 UBC 0 : debug!("Subscribed for broker timeline updates");
92 :
93 : loop {
94 CBC 759873 : let time_until_next_retry = connection_manager_state.time_until_next_retry();
95 :
96 : // These things are happening concurrently:
97 : //
98 : // - keep receiving WAL on the current connection
99 : // - if the shared state says we need to change connection, disconnect and return
100 : // - this runs in a separate task and we receive updates via a watch channel
101 : // - change connection if the rules decide so, or if the current connection dies
102 : // - receive updates from broker
103 : // - this might change the current desired connection
104 : // - timeline state changes to something that does not allow walreceiver to run concurrently
105 759873 : select! {
106 759811 : Some(wal_connection_update) = async {
107 759811 : match connection_manager_state.wal_connection.as_mut() {
108 758252 : Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
109 1559 : None => None,
110 : }
111 754294 : } => {
112 : let wal_connection = connection_manager_state.wal_connection.as_mut()
113 : .expect("Should have a connection, as checked by the corresponding select! guard");
114 : match wal_connection_update {
115 : TaskEvent::Update(TaskStateUpdate::Started) => {},
116 : TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
117 : if new_status.has_processed_wal {
118 : // We have advanced last_record_lsn by processing the WAL received
119 : // from this safekeeper. This is good enough to clear the history of
120 : // unsuccessful retries and allow reconnecting to this safekeeper without
121 : // sleeping for a long time.
122 : connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
123 : }
124 : wal_connection.status = new_status;
125 : }
126 : TaskEvent::End(walreceiver_task_result) => {
127 : match walreceiver_task_result {
128 UBC 0 : Ok(()) => debug!("WAL receiving task finished"),
129 CBC 9 : Err(e) => error!("wal receiver task finished with an error: {e:?}"),
130 : }
131 : connection_manager_state.drop_old_connection(false).await;
132 : },
133 : }
134 : },
135 :
136 : // Got a new update from the broker
137 5804 : broker_update = broker_subscription.message() => {
138 : match broker_update {
139 : Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
140 : Err(e) => {
141 UBC 0 : error!("broker subscription failed: {e}");
142 : return ControlFlow::Continue(());
143 : }
144 : Ok(None) => {
145 0 : error!("broker subscription stream ended"); // can't happen
146 : return ControlFlow::Continue(());
147 : }
148 : }
149 : },
150 :
151 CBC 748189 : new_event = async {
152 748189 : loop {
153 748189 : if connection_manager_state.timeline.current_state() == TimelineState::Loading {
154 UBC 0 : warn!("wal connection manager should only be launched after timeline has become active");
155 CBC 748189 : }
156 748189 : match timeline_state_updates.changed().await {
157 : Ok(()) => {
158 342 : let new_state = connection_manager_state.timeline.current_state();
159 342 : match new_state {
160 : // we're already active as walreceiver, no need to reactivate
161 UBC 0 : TimelineState::Active => continue,
162 : TimelineState::Broken { .. } | TimelineState::Stopping => {
163 CBC 342 : debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
164 342 : return ControlFlow::Break(());
165 : }
166 : TimelineState::Loading => {
167 UBC 0 : warn!("timeline transitioned back to Loading state, that should not happen");
168 0 : return ControlFlow::Continue(());
169 : }
170 : }
171 : }
172 0 : Err(_sender_dropped_error) => return ControlFlow::Break(()),
173 : }
174 : }
175 CBC 342 : } => match new_event {
176 : ControlFlow::Continue(()) => {
177 : return ControlFlow::Continue(());
178 : }
179 : ControlFlow::Break(()) => {
180 UBC 0 : debug!("Timeline is no longer active, stopping wal connection manager loop");
181 : return ControlFlow::Break(());
182 : }
183 : },
184 :
185 CBC 753966 : Some(()) = async {
186 753966 : match time_until_next_retry {
187 1132 : Some(sleep_time) => {
188 1132 : tokio::time::sleep(sleep_time).await;
189 244 : Some(())
190 : },
191 : None => {
192 752834 : debug!("No candidates to retry, waiting indefinitely for the broker events");
193 752834 : None
194 : }
195 : }
196 753078 : } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
197 : }
198 :
199 758783 : if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
200 1189 : info!("Switching to new connection candidate: {new_candidate:?}");
201 1189 : connection_manager_state
202 1189 : .change_connection(new_candidate, ctx)
203 20 : .await
204 757592 : }
205 758780 : *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
206 : }
207 343 : }
208 :
209 : /// Endlessly tries to subscribe to broker updates for a given timeline.
210 1102 : async fn subscribe_for_timeline_updates(
211 1102 : broker_client: &mut BrokerClientChannel,
212 1102 : id: TenantTimelineId,
213 1102 : ) -> Streaming<SafekeeperTimelineInfo> {
214 1102 : let mut attempt = 0;
215 1102 : let cancel = shutdown_token();
216 :
217 : loop {
218 1102 : exponential_backoff(
219 1102 : attempt,
220 1102 : DEFAULT_BASE_BACKOFF_SECONDS,
221 1102 : DEFAULT_MAX_BACKOFF_SECONDS,
222 1102 : &cancel,
223 1102 : )
224 UBC 0 : .await;
225 CBC 1102 : attempt += 1;
226 1102 :
227 1102 : // subscribe to the specific timeline
228 1102 : let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
229 1102 : tenant_id: id.tenant_id.as_ref().to_owned(),
230 1102 : timeline_id: id.timeline_id.as_ref().to_owned(),
231 1102 : });
232 1102 : let request = SubscribeSafekeeperInfoRequest {
233 1102 : subscription_key: Some(key),
234 1102 : };
235 1102 :
236 2196 : match broker_client.subscribe_safekeeper_info(request).await {
237 1093 : Ok(resp) => {
238 1093 : return resp.into_inner();
239 : }
240 UBC 0 : Err(e) => {
241 : // Safekeeper nodes can stop pushing timeline updates to the broker when no new writes happen and
242 : // the entire WAL has been streamed. Keep this noticeable with logging, but do not warn/error.
243 0 : info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
244 0 : continue;
245 : }
246 : }
247 : }
248 CBC 1093 : }
249 :
250 : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
251 : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
252 : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
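
// Taken together, these constants define a clamped geometric backoff for reconnection
// attempts: 0.1s, 0.15s, 0.225s, ..., capped at 15s. An illustrative sketch of the
// schedule (mirroring the arithmetic in `drop_old_connection` below):
//
//     let mut retry_seconds = WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS;
//     for attempt in 0..10 {
//         println!("attempt {attempt}: next retry in {retry_seconds:.3}s");
//         retry_seconds = (retry_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER)
//             .min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS)
//             .max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
//     }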
253 :
254 : /// All data needed to run the endless broker loop and keep the WAL streaming connection alive, if possible.
255 : pub(super) struct ConnectionManagerState {
256 : id: TenantTimelineId,
257 : /// Pageserver data about the timeline, used to filter out some of the safekeepers.
258 : timeline: Arc<Timeline>,
259 : conf: WalReceiverConf,
260 : /// Current connection to safekeeper for WAL streaming.
261 : wal_connection: Option<WalConnection>,
262 : /// Info about retries and unsuccessful attempts to connect to safekeepers.
263 : wal_connection_retries: HashMap<NodeId, RetryInfo>,
264 : /// Data about all timelines available for connection, fetched from the storage broker and grouped by their corresponding safekeeper node id.
265 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
266 : }
267 :
268 : /// Information about the connection manager's current connection and connection candidates.
269 1316 : #[derive(Debug, Clone)]
270 : pub struct ConnectionManagerStatus {
271 : existing_connection: Option<WalConnectionStatus>,
272 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
273 : }
274 :
275 : impl ConnectionManagerStatus {
276 : /// Generates a string describing the current connection status, in a form suitable for logging.
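///
/// For example, a rendered status might look like this (illustrative values):
///
/// ```text
///  (update 2024-01-01 00:00:02): streaming WAL from node 1, commit|streaming Lsn: 0/2000|0/1FF0, safekeeper candidates (id|update_time|commit_lsn): [(1|00:00:02|0/2000), (2|00:00:01|0/1000)]
/// ```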
277 1316 : pub fn to_human_readable_string(&self) -> String {
278 1316 : let mut resulting_string = String::new();
279 1316 : match &self.existing_connection {
280 1294 : Some(connection) => {
281 1294 : if connection.has_processed_wal {
282 1226 : resulting_string.push_str(&format!(
283 1226 : " (update {}): streaming WAL from node {}, ",
284 1226 : connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
285 1226 : connection.node,
286 1226 : ));
287 1226 :
288 1226 : match (connection.streaming_lsn, connection.commit_lsn) {
289 UBC 0 : (None, None) => resulting_string.push_str("no streaming data"),
290 0 : (None, Some(commit_lsn)) => {
291 0 : resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
292 : }
293 0 : (Some(streaming_lsn), None) => {
294 0 : resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
295 : }
296 CBC 1226 : (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
297 1226 : &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
298 1226 : ),
299 : }
300 68 : } else if connection.is_connected {
301 67 : resulting_string.push_str(&format!(
302 67 : " (update {}): connecting to node {}",
303 67 : connection
304 67 : .latest_connection_update
305 67 : .format("%Y-%m-%d %H:%M:%S"),
306 67 : connection.node,
307 67 : ));
308 67 : } else {
309 GBC 1 : resulting_string.push_str(&format!(
310 1 : " (update {}): initializing node {} connection",
311 1 : connection
312 1 : .latest_connection_update
313 1 : .format("%Y-%m-%d %H:%M:%S"),
314 1 : connection.node,
315 1 : ));
316 1 : }
317 : }
318 CBC 22 : None => resulting_string.push_str(": disconnected"),
319 : }
320 :
321 1316 : resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
322 1316 : let mut candidates = self.wal_stream_candidates.iter().peekable();
323 2751 : while let Some((node_id, candidate_info)) = candidates.next() {
324 1435 : resulting_string.push_str(&format!(
325 1435 : "({}|{}|{})",
326 1435 : node_id,
327 1435 : candidate_info.latest_update.format("%H:%M:%S"),
328 1435 : Lsn(candidate_info.timeline.commit_lsn)
329 1435 : ));
330 1435 : if candidates.peek().is_some() {
331 119 : resulting_string.push_str(", ");
332 1316 : }
333 : }
334 1316 : resulting_string.push(']');
335 1316 :
336 1316 : resulting_string
337 1316 : }
338 : }
339 :
340 : /// Current connection data.
341 UBC 0 : #[derive(Debug)]
342 : struct WalConnection {
343 : /// Time when the connection was initiated.
344 : started_at: NaiveDateTime,
345 : /// Current safekeeper pageserver is connected to for WAL streaming.
346 : sk_id: NodeId,
347 : /// Availability zone of the safekeeper.
348 : availability_zone: Option<String>,
349 : /// Status of the connection.
350 : status: WalConnectionStatus,
351 : /// WAL streaming task handle.
352 : connection_task: TaskHandle<WalConnectionStatus>,
353 : /// Have we discovered that other safekeeper has more recent WAL than we do?
354 : discovered_new_wal: Option<NewCommittedWAL>,
355 : }
356 :
357 : /// Notion of new committed WAL that exists on another safekeeper.
358 0 : #[derive(Debug, Clone, Copy)]
359 : struct NewCommittedWAL {
360 : /// LSN of the new committed WAL.
361 : lsn: Lsn,
362 : /// When we discovered that the new committed WAL exists on other safekeeper.
363 : discovered_at: NaiveDateTime,
364 : }
365 :
366 0 : #[derive(Debug, Clone, Copy)]
367 : struct RetryInfo {
368 : next_retry_at: Option<NaiveDateTime>,
369 : retry_duration_seconds: f64,
370 : }
371 :
372 : /// Data about the timeline to connect to, received from the broker.
373 CBC 1536862 : #[derive(Debug, Clone)]
374 : struct BrokerSkTimeline {
375 : timeline: SafekeeperTimelineInfo,
376 : /// Time at which the data was fetched from the broker last time, to track the stale data.
377 : latest_update: NaiveDateTime,
378 : }
379 :
380 : impl ConnectionManagerState {
381 1103 : pub(super) fn new(timeline: Arc<Timeline>, conf: WalReceiverConf) -> Self {
382 1103 : let id = TenantTimelineId {
383 1103 : tenant_id: timeline.tenant_id,
384 1103 : timeline_id: timeline.timeline_id,
385 1103 : };
386 1103 : Self {
387 1103 : id,
388 1103 : timeline,
389 1103 : conf,
390 1103 : wal_connection: None,
391 1103 : wal_stream_candidates: HashMap::new(),
392 1103 : wal_connection_retries: HashMap::new(),
393 1103 : }
394 1103 : }
395 :
396 : /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
397 1188 : async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
398 1188 : WALRECEIVER_SWITCHES
399 1188 : .with_label_values(&[new_sk.reason.name()])
400 1188 : .inc();
401 1188 :
402 1188 : self.drop_old_connection(true).await;
403 :
404 1188 : let node_id = new_sk.safekeeper_id;
405 1188 : let connect_timeout = self.conf.wal_connect_timeout;
406 1188 : let timeline = Arc::clone(&self.timeline);
407 1188 : let ctx = ctx.detached_child(
408 1188 : TaskKind::WalReceiverConnectionHandler,
409 1188 : DownloadBehavior::Download,
410 1188 : );
411 :
412 1188 : let span = info_span!("connection", %node_id);
413 1188 : let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
414 1188 : async move {
415 1188 : debug_assert_current_span_has_tenant_and_timeline_id();
416 :
417 1188 : let res = super::walreceiver_connection::handle_walreceiver_connection(
418 1188 : timeline,
419 1188 : new_sk.wal_source_connconf,
420 1188 : events_sender,
421 1188 : cancellation,
422 1188 : connect_timeout,
423 1188 : ctx,
424 1188 : node_id,
425 1188 : )
426 5085285 : .await;
427 :
428 1122 : match res {
429 175 : Ok(()) => Ok(()),
430 947 : Err(e) => {
431 947 : match e {
432 57 : WalReceiverError::SuccessfulCompletion(msg) => {
433 57 : info!("walreceiver connection handling ended with success: {msg}");
434 57 : Ok(())
435 : }
436 881 : WalReceiverError::ExpectedSafekeeperError(e) => {
437 881 : info!("walreceiver connection handling ended: {e}");
438 881 : Ok(())
439 : }
440 9 : WalReceiverError::Other(e) => {
441 9 : // return an error to have task_mgr log it really verbosely
442 9 : Err(e).context("walreceiver connection handling failure")
443 : }
444 : }
445 : }
446 : }
447 1122 : }
448 1188 : .instrument(span)
449 1188 : });
450 1188 :
451 1188 : let now = Utc::now().naive_utc();
452 1188 : self.wal_connection = Some(WalConnection {
453 1188 : started_at: now,
454 1188 : sk_id: new_sk.safekeeper_id,
455 1188 : availability_zone: new_sk.availability_zone,
456 1188 : status: WalConnectionStatus {
457 1188 : is_connected: false,
458 1188 : has_processed_wal: false,
459 1188 : latest_connection_update: now,
460 1188 : latest_wal_update: now,
461 1188 : streaming_lsn: None,
462 1188 : commit_lsn: None,
463 1188 : node: node_id,
464 1188 : },
465 1188 : connection_task: connection_handle,
466 1188 : discovered_new_wal: None,
467 1188 : });
468 1188 : }
469 :
470 : /// Drops the current connection (if any) and updates the retry timeout for the next
471 : /// connection attempt to the same safekeeper.
472 2083 : async fn drop_old_connection(&mut self, needs_shutdown: bool) {
473 2083 : let wal_connection = match self.wal_connection.take() {
474 914 : Some(wal_connection) => wal_connection,
475 1169 : None => return,
476 : };
477 :
478 914 : if needs_shutdown {
479 20 : wal_connection.connection_task.shutdown().await;
480 895 : }
481 :
482 914 : let retry = self
483 914 : .wal_connection_retries
484 914 : .entry(wal_connection.sk_id)
485 914 : .or_insert(RetryInfo {
486 914 : next_retry_at: None,
487 914 : retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
488 914 : });
489 914 :
490 914 : let now = Utc::now().naive_utc();
491 914 :
492 914 : // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
493 914 : // and we add backoff to the time when we started the connection attempt. If the connection
494 914 : // was active for a long time, then next_retry_at will be in the past.
495 914 : retry.next_retry_at =
496 914 : wal_connection
497 914 : .started_at
498 914 : .checked_add_signed(chrono::Duration::milliseconds(
499 914 : (retry.retry_duration_seconds * 1000.0) as i64,
500 914 : ));
501 :
502 914 : if let Some(next) = &retry.next_retry_at {
503 914 : if next > &now {
504 546 : info!(
505 546 : "Next connection retry to {:?} is at {}",
506 546 : wal_connection.sk_id, next
507 546 : );
508 368 : }
509 UBC 0 : }
510 :
511 CBC 914 : let next_retry_duration =
512 914 : retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
513 914 : // Clamp the next retry duration to the maximum allowed.
514 914 : let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
515 914 : // Clamp the next retry duration to the minimum allowed.
516 914 : let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
517 914 :
518 914 : retry.retry_duration_seconds = next_retry_duration;
519 2083 : }
520 :
521 : /// Returns the time to wait until the earliest scheduled retry, after which a previously failed safekeeper may become a connection candidate again.
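/// E.g. (illustrative): with retries scheduled at now+2s and now+5s, this returns
/// `Some(2s)`; with no retry scheduled in the future, it returns `None` and the
/// caller waits indefinitely for broker events.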
522 759873 : fn time_until_next_retry(&self) -> Option<Duration> {
523 759873 : let now = Utc::now().naive_utc();
524 :
525 759873 : let next_retry_at = self
526 759873 : .wal_connection_retries
527 759873 : .values()
528 759873 : .filter_map(|retry| retry.next_retry_at)
529 759873 : .filter(|next_retry_at| next_retry_at > &now)
530 759873 : .min()?;
531 :
532 1135 : (next_retry_at - now).to_std().ok()
533 759873 : }
534 :
535 : /// Adds another broker timeline into the state, if it's more recent than the one already stored for the same key.
536 5804 : fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
537 5804 : WALRECEIVER_BROKER_UPDATES.inc();
538 5804 :
539 5804 : let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
540 5804 : let old_entry = self.wal_stream_candidates.insert(
541 5804 : new_safekeeper_id,
542 5804 : BrokerSkTimeline {
543 5804 : timeline: timeline_update,
544 5804 : latest_update: Utc::now().naive_utc(),
545 5804 : },
546 5804 : );
547 5804 :
548 5804 : if old_entry.is_none() {
549 588 : info!("New SK node was added: {new_safekeeper_id}");
550 588 : WALRECEIVER_CANDIDATES_ADDED.inc();
551 5216 : }
552 5804 : }
553 :
554 : /// Cleans up stale broker records and checks the rest for the new connection candidate.
555 : /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
556 : /// The current rules for approving new candidates:
557 : /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attemps
558 : /// * if there's no such entry, no new candidate found, abort
559 : /// * otherwise check if the candidate is much better than the current one
560 : ///
561 : /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
562 : /// General rules are following:
563 : /// * if connected safekeeper is not present, pick the candidate
564 : /// * if we haven't received any updates for some time, pick the candidate
565 : /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
566 : /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
567 : /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
568 : ///
569 : /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
570 : /// Both thresholds are configured per tenant.
571 758790 : fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
572 758790 : self.cleanup_old_candidates();
573 758790 :
574 758790 : match &self.wal_connection {
575 757149 : Some(existing_wal_connection) => {
576 757149 : let connected_sk_node = existing_wal_connection.sk_id;
577 :
578 416181 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
579 757149 : self.select_connection_candidate(Some(connected_sk_node))?;
580 416181 : let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
581 416181 :
582 416181 : let now = Utc::now().naive_utc();
583 416181 : if let Ok(latest_interaction) =
584 416181 : (now - existing_wal_connection.status.latest_connection_update).to_std()
585 : {
586 : // Drop connection if we haven't received keepalive message for a while.
587 416181 : if latest_interaction > self.conf.wal_connect_timeout {
588 1 : return Some(NewWalConnectionCandidate {
589 1 : safekeeper_id: new_sk_id,
590 1 : wal_source_connconf: new_wal_source_connconf,
591 1 : availability_zone: new_availability_zone,
592 1 : reason: ReconnectReason::NoKeepAlives {
593 1 : last_keep_alive: Some(
594 1 : existing_wal_connection.status.latest_connection_update,
595 1 : ),
596 1 : check_time: now,
597 1 : threshold: self.conf.wal_connect_timeout,
598 1 : },
599 1 : });
600 416180 : }
601 UBC 0 : }
602 :
603 CBC 416180 : if !existing_wal_connection.status.is_connected {
604 : // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
605 176 : return None;
606 416004 : }
607 :
608 416004 : if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
609 415917 : let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
610 415917 : // Check if the new candidate has much more WAL than the current one.
611 415917 : match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
612 9388 : Some(new_sk_lsn_advantage) => {
613 9388 : if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
614 20 : return Some(NewWalConnectionCandidate {
615 20 : safekeeper_id: new_sk_id,
616 20 : wal_source_connconf: new_wal_source_connconf,
617 20 : availability_zone: new_availability_zone,
618 20 : reason: ReconnectReason::LaggingWal {
619 20 : current_commit_lsn,
620 20 : new_commit_lsn,
621 20 : threshold: self.conf.max_lsn_wal_lag,
622 20 : },
623 20 : });
624 9368 : }
625 9368 : // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as the pageserver,
626 9368 : // and the current one is not, switch to the new one.
627 9368 : if self.conf.availability_zone.is_some()
628 172 : && existing_wal_connection.availability_zone
629 172 : != self.conf.availability_zone
630 172 : && self.conf.availability_zone == new_availability_zone
631 : {
632 1 : return Some(NewWalConnectionCandidate {
633 1 : safekeeper_id: new_sk_id,
634 1 : availability_zone: new_availability_zone,
635 1 : wal_source_connconf: new_wal_source_connconf,
636 1 : reason: ReconnectReason::SwitchAvailabilityZone,
637 1 : });
638 9367 : }
639 : }
640 406529 : None => debug!(
641 UBC 0 : "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
642 0 : ),
643 : }
644 CBC 87 : }
645 :
646 415983 : let current_lsn = match existing_wal_connection.status.streaming_lsn {
647 415739 : Some(lsn) => lsn,
648 244 : None => self.timeline.get_last_record_lsn(),
649 : };
650 415983 : let current_commit_lsn = existing_wal_connection
651 415983 : .status
652 415983 : .commit_lsn
653 415983 : .unwrap_or(current_lsn);
654 415983 : let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
655 415983 :
656 415983 : // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
657 415983 : let mut discovered_new_wal = existing_wal_connection
658 415983 : .discovered_new_wal
659 415983 : .filter(|new_wal| new_wal.lsn > current_commit_lsn);
660 415983 :
661 415983 : if discovered_new_wal.is_none() {
662 : // Check if the new candidate has more WAL than the current one;
663 : // if so, remember it so we can consider switching to it.
664 413155 : discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
665 121 : trace!(
666 UBC 0 : "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
667 0 : candidate_commit_lsn,
668 0 : current_commit_lsn
669 0 : );
670 CBC 121 : Some(NewCommittedWAL {
671 121 : lsn: candidate_commit_lsn,
672 121 : discovered_at: Utc::now().naive_utc(),
673 121 : })
674 : } else {
675 413034 : None
676 : };
677 2828 : }
678 :
679 415983 : let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
680 : // Connected safekeeper has more WAL, but we haven't received updates for some time.
681 11001 : trace!(
682 UBC 0 : "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
683 0 : (now - existing_wal_connection.status.latest_wal_update).to_std(),
684 0 : current_lsn,
685 0 : current_commit_lsn
686 0 : );
687 CBC 11001 : Some(existing_wal_connection.status.latest_wal_update)
688 : } else {
689 404982 : discovered_new_wal.as_ref().map(|new_wal| {
690 125 : // We know that new WAL is available on other safekeeper, but connected safekeeper don't have it.
691 125 : new_wal
692 125 : .discovered_at
693 125 : .max(existing_wal_connection.status.latest_wal_update)
694 404982 : })
695 : };
696 :
697 : // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
698 415983 : if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
699 11126 : if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
700 11083 : if candidate_commit_lsn > current_commit_lsn
701 2906 : && waiting_for_new_wal > self.conf.lagging_wal_timeout
702 : {
703 1 : return Some(NewWalConnectionCandidate {
704 1 : safekeeper_id: new_sk_id,
705 1 : wal_source_connconf: new_wal_source_connconf,
706 1 : availability_zone: new_availability_zone,
707 1 : reason: ReconnectReason::NoWalTimeout {
708 1 : current_lsn,
709 1 : current_commit_lsn,
710 1 : candidate_commit_lsn,
711 1 : last_wal_interaction: Some(
712 1 : existing_wal_connection.status.latest_wal_update,
713 1 : ),
714 1 : check_time: now,
715 1 : threshold: self.conf.lagging_wal_timeout,
716 1 : },
717 1 : });
718 11082 : }
719 43 : }
720 404857 : }
721 :
722 415982 : self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
723 : }
724 : None => {
725 1173 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
726 1641 : self.select_connection_candidate(None)?;
727 1173 : return Some(NewWalConnectionCandidate {
728 1173 : safekeeper_id: new_sk_id,
729 1173 : availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
730 1173 : wal_source_connconf: new_wal_source_connconf,
731 1173 : reason: ReconnectReason::NoExistingConnection,
732 1173 : });
733 : }
734 : }
735 :
736 415982 : None
737 758790 : }
738 :
739 : /// Selects the best possible candidate, based on the data collected from broker updates about the safekeepers.
740 : /// Optionally omits the given node, to support gracefully switching from a healthy safekeeper to another.
741 : ///
742 : /// The candidate that is chosen:
743 : /// * has no pending retry cooldown
744 : /// * has the greatest commit_lsn among the ones that are left
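///
/// A sketch of the selection (illustrative, mirroring the iterator chain in the body):
///
/// ```ignore
/// applicable_candidates
///     .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
///     .max_by_key(|(_, info, _)| info.commit_lsn)
/// ```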
745 758790 : fn select_connection_candidate(
746 758790 : &self,
747 758790 : node_to_omit: Option<NodeId>,
748 758790 : ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
749 758790 : self.applicable_connection_candidates()
750 1533965 : .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
751 776818 : .max_by_key(|(_, info, _)| info.commit_lsn)
752 758790 : }
753 :
754 : /// Returns a list of safekeepers that have valid info and ready for connection.
755 : /// Some safekeepers are filtered by the retry cooldown.
756 758790 : fn applicable_connection_candidates(
757 758790 : &self,
758 758790 : ) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
759 758790 : let now = Utc::now().naive_utc();
760 758790 :
761 758790 : self.wal_stream_candidates
762 758790 : .iter()
763 1535448 : .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
764 1535443 : .filter(move |(sk_id, _)| {
765 1535443 : let next_retry_at = self
766 1535443 : .wal_connection_retries
767 1535443 : .get(sk_id)
768 1535443 : .and_then(|retry_info| {
769 310948 : retry_info.next_retry_at
770 1535443 : });
771 1535443 :
772 1535443 : next_retry_at.is_none() || next_retry_at.unwrap() <= now
773 1535443 : }).filter_map(|(sk_id, broker_info)| {
774 1533969 : let info = &broker_info.timeline;
775 1533969 : if info.safekeeper_connstr.is_empty() {
776 2 : return None; // no connection string, ignore sk
777 1533967 : }
778 1533967 : match wal_stream_connection_config(
779 1533967 : self.id,
780 1533967 : info.safekeeper_connstr.as_ref(),
781 1533967 : match &self.conf.auth_token {
782 1532816 : None => None,
783 1151 : Some(x) => Some(x),
784 : },
785 1533967 : self.conf.availability_zone.as_deref(),
786 : ) {
787 1533965 : Ok(connstr) => Some((*sk_id, info, connstr)),
788 GBC 2 : Err(e) => {
789 2 : error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
790 UBC 0 : None
791 : }
792 : }
793 CBC 1533967 : })
794 758790 : }
795 :
796 : /// Remove candidates which haven't sent broker updates for a while.
797 758790 : fn cleanup_old_candidates(&mut self) {
798 758790 : let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
799 758790 : let lagging_wal_timeout = self.conf.lagging_wal_timeout;
800 758790 :
801 758790 : self.wal_stream_candidates.retain(|node_id, broker_info| {
802 1535455 : if let Ok(time_since_latest_broker_update) =
803 1535455 : (Utc::now().naive_utc() - broker_info.latest_update).to_std()
804 : {
805 1535455 : let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
806 1535455 : if !should_retain {
807 7 : node_ids_to_remove.push(*node_id);
808 1535448 : }
809 1535455 : should_retain
810 : } else {
811 UBC 0 : true
812 : }
813 CBC 1535455 : });
814 758790 :
815 758790 : if !node_ids_to_remove.is_empty() {
816 14 : for node_id in node_ids_to_remove {
817 7 : info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
818 7 : self.wal_connection_retries.remove(&node_id);
819 7 : WALRECEIVER_CANDIDATES_REMOVED.inc();
820 : }
821 758783 : }
822 758790 : }
823 :
824 415 : pub(super) async fn shutdown(mut self) {
825 415 : if let Some(wal_connection) = self.wal_connection.take() {
826 255 : wal_connection.connection_task.shutdown().await;
827 207 : }
828 415 : }
829 :
830 758780 : fn manager_status(&self) -> ConnectionManagerStatus {
831 758780 : ConnectionManagerStatus {
832 758780 : existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
833 758780 : wal_stream_candidates: self.wal_stream_candidates.clone(),
834 758780 : }
835 758780 : }
836 : }
837 :
838 1189 : #[derive(Debug)]
839 : struct NewWalConnectionCandidate {
840 : safekeeper_id: NodeId,
841 : wal_source_connconf: PgConnectionConfig,
842 : availability_zone: Option<String>,
843 : reason: ReconnectReason,
844 : }
845 :
846 : /// Stores the reason why the WAL connection was switched, for further debugging purposes.
847 1189 : #[derive(Debug, PartialEq, Eq)]
848 : enum ReconnectReason {
849 : NoExistingConnection,
850 : LaggingWal {
851 : current_commit_lsn: Lsn,
852 : new_commit_lsn: Lsn,
853 : threshold: NonZeroU64,
854 : },
855 : SwitchAvailabilityZone,
856 : NoWalTimeout {
857 : current_lsn: Lsn,
858 : current_commit_lsn: Lsn,
859 : candidate_commit_lsn: Lsn,
860 : last_wal_interaction: Option<NaiveDateTime>,
861 : check_time: NaiveDateTime,
862 : threshold: Duration,
863 : },
864 : NoKeepAlives {
865 : last_keep_alive: Option<NaiveDateTime>,
866 : check_time: NaiveDateTime,
867 : threshold: Duration,
868 : },
869 : }
870 :
871 : impl ReconnectReason {
872 1188 : fn name(&self) -> &str {
873 1188 : match self {
874 1169 : ReconnectReason::NoExistingConnection => "NoExistingConnection",
875 19 : ReconnectReason::LaggingWal { .. } => "LaggingWal",
876 UBC 0 : ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
877 0 : ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
878 0 : ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
879 : }
880 CBC 1188 : }
881 : }
882 :
883 : #[cfg(test)]
884 : mod tests {
885 : use super::*;
886 : use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
887 : use url::Host;
888 :
889 19 : fn dummy_broker_sk_timeline(
890 19 : commit_lsn: u64,
891 19 : safekeeper_connstr: &str,
892 19 : latest_update: NaiveDateTime,
893 19 : ) -> BrokerSkTimeline {
894 19 : BrokerSkTimeline {
895 19 : timeline: SafekeeperTimelineInfo {
896 19 : safekeeper_id: 0,
897 19 : tenant_timeline_id: None,
898 19 : term: 0,
899 19 : last_log_term: 0,
900 19 : flush_lsn: 0,
901 19 : commit_lsn,
902 19 : backup_lsn: 0,
903 19 : remote_consistent_lsn: 0,
904 19 : peer_horizon_lsn: 0,
905 19 : local_start_lsn: 0,
906 19 : safekeeper_connstr: safekeeper_connstr.to_owned(),
907 19 : http_connstr: safekeeper_connstr.to_owned(),
908 19 : availability_zone: None,
909 19 : },
910 19 : latest_update,
911 19 : }
912 19 : }
913 :
914 1 : #[tokio::test]
915 1 : async fn no_connection_no_candidate() -> anyhow::Result<()> {
916 1 : let harness = TenantHarness::create("no_connection_no_candidate")?;
917 3 : let mut state = dummy_state(&harness).await;
918 1 : let now = Utc::now().naive_utc();
919 :
920 1 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
921 1 : let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
922 1 :
923 1 : state.wal_connection = None;
924 1 : state.wal_stream_candidates = HashMap::from([
925 1 : (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
926 1 : (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
927 1 : (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
928 1 : (
929 1 : NodeId(3),
930 1 : dummy_broker_sk_timeline(
931 1 : 1 + state.conf.max_lsn_wal_lag.get(),
932 1 : "delay_over_threshold",
933 1 : delay_over_threshold,
934 1 : ),
935 1 : ),
936 1 : ]);
937 1 :
938 1 : let no_candidate = state.next_connection_candidate();
939 1 : assert!(
940 1 : no_candidate.is_none(),
941 UBC 0 : "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
942 : );
943 :
944 CBC 1 : Ok(())
945 : }
946 :
947 1 : #[tokio::test]
948 1 : async fn connection_no_candidate() -> anyhow::Result<()> {
949 1 : let harness = TenantHarness::create("connection_no_candidate")?;
950 3 : let mut state = dummy_state(&harness).await;
951 1 : let now = Utc::now().naive_utc();
952 1 :
953 1 : let connected_sk_id = NodeId(0);
954 1 : let current_lsn = 100_000;
955 1 :
956 1 : let connection_status = WalConnectionStatus {
957 1 : is_connected: true,
958 1 : has_processed_wal: true,
959 1 : latest_connection_update: now,
960 1 : latest_wal_update: now,
961 1 : commit_lsn: Some(Lsn(current_lsn)),
962 1 : streaming_lsn: Some(Lsn(current_lsn)),
963 1 : node: NodeId(1),
964 1 : };
965 1 :
966 1 : state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
967 1 : state.wal_connection = Some(WalConnection {
968 1 : started_at: now,
969 1 : sk_id: connected_sk_id,
970 1 : availability_zone: None,
971 1 : status: connection_status,
972 1 : connection_task: TaskHandle::spawn(move |sender, _| async move {
973 1 : sender
974 1 : .send(TaskStateUpdate::Progress(connection_status))
975 1 : .ok();
976 1 : Ok(())
977 1 : }),
978 1 : discovered_new_wal: None,
979 1 : });
980 1 : state.wal_stream_candidates = HashMap::from([
981 1 : (
982 1 : connected_sk_id,
983 1 : dummy_broker_sk_timeline(
984 1 : current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
985 1 : DUMMY_SAFEKEEPER_HOST,
986 1 : now,
987 1 : ),
988 1 : ),
989 1 : (
990 1 : NodeId(1),
991 1 : dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
992 1 : ),
993 1 : (
994 1 : NodeId(2),
995 1 : dummy_broker_sk_timeline(
996 1 : current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
997 1 : "not_enough_advanced_lsn",
998 1 : now,
999 1 : ),
1000 1 : ),
1001 1 : ]);
1002 1 :
1003 1 : let no_candidate = state.next_connection_candidate();
1004 1 : assert!(
1005 1 : no_candidate.is_none(),
1006 UBC 0 : "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
1007 : );
1008 :
1009 CBC 1 : Ok(())
1010 : }
1011 :
1012 1 : #[tokio::test]
1013 1 : async fn no_connection_candidate() -> anyhow::Result<()> {
1014 1 : let harness = TenantHarness::create("no_connection_candidate")?;
1015 3 : let mut state = dummy_state(&harness).await;
1016 1 : let now = Utc::now().naive_utc();
1017 1 :
1018 1 : state.wal_connection = None;
1019 1 : state.wal_stream_candidates = HashMap::from([(
1020 1 : NodeId(0),
1021 1 : dummy_broker_sk_timeline(
1022 1 : 1 + state.conf.max_lsn_wal_lag.get(),
1023 1 : DUMMY_SAFEKEEPER_HOST,
1024 1 : now,
1025 1 : ),
1026 1 : )]);
1027 1 :
1028 1 : let only_candidate = state
1029 1 : .next_connection_candidate()
1030 1 : .expect("Expected one candidate selected out of the only data option, but got none");
1031 1 : assert_eq!(only_candidate.safekeeper_id, NodeId(0));
1032 1 : assert_eq!(
1033 : only_candidate.reason,
1034 : ReconnectReason::NoExistingConnection,
1035 UBC 0 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1036 : );
1037 CBC 1 : assert_eq!(
1038 1 : only_candidate.wal_source_connconf.host(),
1039 1 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1040 1 : );
1041 :
1042 1 : let selected_lsn = 100_000;
1043 1 : state.wal_stream_candidates = HashMap::from([
1044 1 : (
1045 1 : NodeId(0),
1046 1 : dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
1047 1 : ),
1048 1 : (
1049 1 : NodeId(1),
1050 1 : dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
1051 1 : ),
1052 1 : (
1053 1 : NodeId(2),
1054 1 : dummy_broker_sk_timeline(selected_lsn + 100, "", now),
1055 1 : ),
1056 1 : ]);
1057 1 : let biggest_wal_candidate = state.next_connection_candidate().expect(
1058 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1059 1 : );
1060 1 :
1061 1 : assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
1062 1 : assert_eq!(
1063 : biggest_wal_candidate.reason,
1064 : ReconnectReason::NoExistingConnection,
1065 UBC 0 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1066 : );
1067 CBC 1 : assert_eq!(
1068 1 : biggest_wal_candidate.wal_source_connconf.host(),
1069 1 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1070 1 : );
1071 :
1072 1 : Ok(())
1073 : }
1074 :
1075 1 : #[tokio::test]
1076 1 : async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
1077 1 : let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
1078 3 : let mut state = dummy_state(&harness).await;
1079 1 : let now = Utc::now().naive_utc();
1080 1 :
1081 1 : let current_lsn = Lsn(100_000).align();
1082 1 : let bigger_lsn = Lsn(current_lsn.0 + 100).align();
1083 1 :
1084 1 : state.wal_connection = None;
1085 1 : state.wal_stream_candidates = HashMap::from([
1086 1 : (
1087 1 : NodeId(0),
1088 1 : dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1089 1 : ),
1090 1 : (
1091 1 : NodeId(1),
1092 1 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1093 1 : ),
1094 1 : ]);
1095 1 : state.wal_connection_retries = HashMap::from([(
1096 1 : NodeId(0),
1097 1 : RetryInfo {
1098 1 : next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
1099 1 : retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
1100 1 : },
1101 1 : )]);
1102 1 :
1103 1 : let candidate_with_less_errors = state
1104 1 : .next_connection_candidate()
1105 1 : .expect("Expected one candidate selected, but got none");
1106 1 : assert_eq!(
1107 : candidate_with_less_errors.safekeeper_id,
1108 : NodeId(1),
1109 UBC 0 : "Should select the node with no pending retry cooldown"
1110 : );
1111 :
1112 CBC 1 : Ok(())
1113 : }
1114 :
1115 1 : #[tokio::test]
1116 1 : async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1117 1 : let harness = TenantHarness::create("lsn_wal_over_threshold_current_candidate")?;
1118 3 : let mut state = dummy_state(&harness).await;
1119 1 : let current_lsn = Lsn(100_000).align();
1120 1 : let now = Utc::now().naive_utc();
1121 1 :
1122 1 : let connected_sk_id = NodeId(0);
1123 1 : let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
1124 1 :
1125 1 : let connection_status = WalConnectionStatus {
1126 1 : is_connected: true,
1127 1 : has_processed_wal: true,
1128 1 : latest_connection_update: now,
1129 1 : latest_wal_update: now,
1130 1 : commit_lsn: Some(current_lsn),
1131 1 : streaming_lsn: Some(current_lsn),
1132 1 : node: connected_sk_id,
1133 1 : };
1134 1 :
1135 1 : state.wal_connection = Some(WalConnection {
1136 1 : started_at: now,
1137 1 : sk_id: connected_sk_id,
1138 1 : availability_zone: None,
1139 1 : status: connection_status,
1140 1 : connection_task: TaskHandle::spawn(move |sender, _| async move {
1141 1 : sender
1142 1 : .send(TaskStateUpdate::Progress(connection_status))
1143 1 : .ok();
1144 1 : Ok(())
1145 1 : }),
1146 1 : discovered_new_wal: None,
1147 1 : });
1148 1 : state.wal_stream_candidates = HashMap::from([
1149 1 : (
1150 1 : connected_sk_id,
1151 1 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1152 1 : ),
1153 1 : (
1154 1 : NodeId(1),
1155 1 : dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
1156 1 : ),
1157 1 : ]);
1158 1 :
1159 1 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1160 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1161 1 : );
1162 1 :
1163 1 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
1164 1 : assert_eq!(
1165 1 : over_threshcurrent_candidate.reason,
1166 1 : ReconnectReason::LaggingWal {
1167 1 : current_commit_lsn: current_lsn,
1168 1 : new_commit_lsn: new_lsn,
1169 1 : threshold: state.conf.max_lsn_wal_lag
1170 1 : },
1171 UBC 0 : "Should select bigger WAL safekeeper if it starts to lag enough"
1172 : );
1173 CBC 1 : assert_eq!(
1174 1 : over_threshcurrent_candidate.wal_source_connconf.host(),
1175 1 : &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
1176 1 : );
1177 :
1178 1 : Ok(())
1179 : }
1180 :
1181 1 : #[tokio::test]
1182 1 : async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
1183 1 : let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
1184 3 : let mut state = dummy_state(&harness).await;
1185 1 : let current_lsn = Lsn(100_000).align();
1186 1 : let now = Utc::now().naive_utc();
1187 :
1188 1 : let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
1189 1 : let time_over_threshold =
1190 1 : Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
1191 1 :
1192 1 : let connection_status = WalConnectionStatus {
1193 1 : is_connected: true,
1194 1 : has_processed_wal: true,
1195 1 : latest_connection_update: time_over_threshold,
1196 1 : latest_wal_update: time_over_threshold,
1197 1 : commit_lsn: Some(current_lsn),
1198 1 : streaming_lsn: Some(current_lsn),
1199 1 : node: NodeId(1),
1200 1 : };
1201 1 :
1202 1 : state.wal_connection = Some(WalConnection {
1203 1 : started_at: now,
1204 1 : sk_id: NodeId(1),
1205 1 : availability_zone: None,
1206 1 : status: connection_status,
1207 1 : connection_task: TaskHandle::spawn(move |sender, _| async move {
1208 1 : sender
1209 1 : .send(TaskStateUpdate::Progress(connection_status))
1210 1 : .ok();
1211 1 : Ok(())
1212 1 : }),
1213 1 : discovered_new_wal: None,
1214 1 : });
1215 1 : state.wal_stream_candidates = HashMap::from([(
1216 1 : NodeId(0),
1217 1 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1218 1 : )]);
1219 1 :
1220 1 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1221 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1222 1 : );
1223 1 :
1224 1 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1225 1 : match over_threshcurrent_candidate.reason {
1226 : ReconnectReason::NoKeepAlives {
1227 1 : last_keep_alive,
1228 1 : threshold,
1229 1 : ..
1230 1 : } => {
1231 1 : assert_eq!(last_keep_alive, Some(time_over_threshold));
1232 1 : assert_eq!(threshold, state.conf.wal_connect_timeout);
1233 : }
1234 UBC 0 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1235 : }
1236 CBC 1 : assert_eq!(
1237 1 : over_threshcurrent_candidate.wal_source_connconf.host(),
1238 1 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1239 1 : );
1240 :
1241 1 : Ok(())
1242 : }
1243 :
1244 1 : #[tokio::test]
1245 1 : async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1246 1 : let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
1247 3 : let mut state = dummy_state(&harness).await;
1248 1 : let current_lsn = Lsn(100_000).align();
1249 1 : let new_lsn = Lsn(100_100).align();
1250 1 : let now = Utc::now().naive_utc();
1251 :
1252 1 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1253 1 : let time_over_threshold =
1254 1 : Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
1255 1 :
1256 1 : let connection_status = WalConnectionStatus {
1257 1 : is_connected: true,
1258 1 : has_processed_wal: true,
1259 1 : latest_connection_update: now,
1260 1 : latest_wal_update: time_over_threshold,
1261 1 : commit_lsn: Some(current_lsn),
1262 1 : streaming_lsn: Some(current_lsn),
1263 1 : node: NodeId(1),
1264 1 : };
1265 1 :
1266 1 : state.wal_connection = Some(WalConnection {
1267 1 : started_at: now,
1268 1 : sk_id: NodeId(1),
1269 1 : availability_zone: None,
1270 1 : status: connection_status,
1271 1 : connection_task: TaskHandle::spawn(move |_, _| async move { Ok(()) }),
1272 1 : discovered_new_wal: Some(NewCommittedWAL {
1273 1 : discovered_at: time_over_threshold,
1274 1 : lsn: new_lsn,
1275 1 : }),
1276 1 : });
1277 1 : state.wal_stream_candidates = HashMap::from([(
1278 1 : NodeId(0),
1279 1 : dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1280 1 : )]);
1281 1 :
1282 1 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1283 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1284 1 : );
1285 1 :
1286 1 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1287 1 : match over_threshcurrent_candidate.reason {
1288 : ReconnectReason::NoWalTimeout {
1289 1 : current_lsn: reason_current_lsn,
1290 1 : current_commit_lsn,
1291 1 : candidate_commit_lsn,
1292 1 : last_wal_interaction,
1293 1 : threshold,
1294 1 : ..
1295 1 : } => {
1296 1 : assert_eq!(reason_current_lsn, current_lsn);
1297 1 : assert_eq!(current_commit_lsn, current_lsn);
1298 1 : assert_eq!(candidate_commit_lsn, new_lsn);
1299 1 : assert_eq!(last_wal_interaction, Some(time_over_threshold));
1300 1 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1301 : }
1302 UBC 0 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1303 : }
1304 CBC 1 : assert_eq!(
1305 1 : over_threshcurrent_candidate.wal_source_connconf.host(),
1306 1 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1307 1 : );
1308 :
1309 1 : Ok(())
1310 : }
1311 :
1312 : const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
1313 :
1314 8 : async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
1315 8 : let (tenant, ctx) = harness.load().await;
1316 8 : let timeline = tenant
1317 8 : .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
1318 16 : .await
1319 8 : .expect("Failed to create an empty timeline for dummy wal connection manager");
1320 8 :
1321 8 : ConnectionManagerState {
1322 8 : id: TenantTimelineId {
1323 8 : tenant_id: harness.tenant_id,
1324 8 : timeline_id: TIMELINE_ID,
1325 8 : },
1326 8 : timeline,
1327 8 : conf: WalReceiverConf {
1328 8 : wal_connect_timeout: Duration::from_secs(1),
1329 8 : lagging_wal_timeout: Duration::from_secs(1),
1330 8 : max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
1331 8 : auth_token: None,
1332 8 : availability_zone: None,
1333 8 : },
1334 8 : wal_connection: None,
1335 8 : wal_stream_candidates: HashMap::new(),
1336 8 : wal_connection_retries: HashMap::new(),
1337 8 : }
1338 8 : }
1339 :
1340 1 : #[tokio::test]
1341 1 : async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
1342 1 : // The pageserver and one of the safekeepers will be in the same availability zone,
1343 1 : // and the pageserver should prefer to connect to it.
1344 1 : let test_az = Some("test_az".to_owned());
1345 :
1346 1 : let harness = TenantHarness::create("switch_to_same_availability_zone")?;
1347 3 : let mut state = dummy_state(&harness).await;
1348 1 : state.conf.availability_zone = test_az.clone();
1349 1 : let current_lsn = Lsn(100_000).align();
1350 1 : let now = Utc::now().naive_utc();
1351 1 :
1352 1 : let connected_sk_id = NodeId(0);
1353 1 :
1354 1 : let connection_status = WalConnectionStatus {
1355 1 : is_connected: true,
1356 1 : has_processed_wal: true,
1357 1 : latest_connection_update: now,
1358 1 : latest_wal_update: now,
1359 1 : commit_lsn: Some(current_lsn),
1360 1 : streaming_lsn: Some(current_lsn),
1361 1 : node: connected_sk_id,
1362 1 : };
1363 1 :
1364 1 : state.wal_connection = Some(WalConnection {
1365 1 : started_at: now,
1366 1 : sk_id: connected_sk_id,
1367 1 : availability_zone: None,
1368 1 : status: connection_status,
1369 1 : connection_task: TaskHandle::spawn(move |sender, _| async move {
1370 1 : sender
1371 1 : .send(TaskStateUpdate::Progress(connection_status))
1372 1 : .ok();
1373 1 : Ok(())
1374 1 : }),
1375 1 : discovered_new_wal: None,
1376 1 : });
1377 1 :
1378 1 : // We have another safekeeper with the same commit_lsn, and it has the same availability zone as
1379 1 : // the current pageserver.
1380 1 : let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
1381 1 : same_az_sk.timeline.availability_zone = test_az.clone();
1382 1 :
1383 1 : state.wal_stream_candidates = HashMap::from([
1384 1 : (
1385 1 : connected_sk_id,
1386 1 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1387 1 : ),
1388 1 : (NodeId(1), same_az_sk),
1389 1 : ]);
1390 1 :
1391 1 : // We expect that the pageserver will switch to the safekeeper in the same availability zone,
1392 1 : // even though it has the same commit_lsn.
1393 1 : let next_candidate = state.next_connection_candidate().expect(
1394 1 : "Expected one candidate selected out of multiple valid data options, but got none",
1395 1 : );
1396 1 :
1397 1 : assert_eq!(next_candidate.safekeeper_id, NodeId(1));
1398 1 : assert_eq!(
1399 : next_candidate.reason,
1400 : ReconnectReason::SwitchAvailabilityZone,
1401 UBC 0 : "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
1402 : );
1403 CBC 1 : assert_eq!(
1404 1 : next_candidate.wal_source_connconf.host(),
1405 1 : &Host::Domain("same_az".to_owned())
1406 1 : );
1407 :
1408 1 : Ok(())
1409 : }
1410 : }