Line data Source code
1 : //! WAL receiver logic that ensures the pageserver gets connected to the safekeeper
2 : //! that has the latest WAL to stream, and that this connection does not go stale.
3 : //!
4 : //! To achieve that, a storage broker is used: safekeepers publish their timelines' state to it,
5 : //! and the manager subscribes to changes and accumulates them to pick the safekeeper with the biggest Lsn to connect to.
6 : //! The current connection state is tracked too, to ensure it is not getting stale.
7 : //!
8 : //! After every connection event or fetched storage broker update, the state is updated accordingly and rechecked for a new connection candidate,
9 : //! then a (re)connection happens, if necessary.
10 : //! Only the WAL streaming task is expected to finish; the other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
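 : //!
 : //! A rough sketch (for illustration only, not code from this module) of how a caller might
 : //! drive this step function in a loop; the variable names are placeholders and the actual
 : //! walreceiver task wiring lives outside this file:
 : //!
 : //! ```ignore
 : //! loop {
 : //!     match connection_manager_loop_step(&mut broker_client, &mut state, &ctx, &cancel, &manager_status).await {
 : //!         Ok(()) => continue,      // broker subscription ended; try again on the next step
 : //!         Err(Cancelled) => break, // explicit cancellation or terminal timeline state
 : //!     }
 : //! }
 : //! ```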
11 :
12 : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
13 :
14 : use super::{TaskStateUpdate, WalReceiverConf};
15 : use crate::context::{DownloadBehavior, RequestContext};
16 : use crate::metrics::{
17 : WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
18 : WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
19 : };
20 : use crate::task_mgr::TaskKind;
21 : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
22 : use anyhow::Context;
23 : use chrono::{NaiveDateTime, Utc};
24 : use pageserver_api::models::TimelineState;
25 : use storage_broker::proto::subscribe_safekeeper_info_request::SubscriptionKey;
26 : use storage_broker::proto::SafekeeperTimelineInfo;
27 : use storage_broker::proto::SubscribeSafekeeperInfoRequest;
28 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
29 : use storage_broker::{BrokerClientChannel, Code, Streaming};
30 : use tokio_util::sync::CancellationToken;
31 : use tracing::*;
32 :
33 : use postgres_connection::PgConnectionConfig;
34 : use utils::backoff::{
35 : exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
36 : };
37 : use utils::postgres_client::wal_stream_connection_config;
38 : use utils::{
39 : id::{NodeId, TenantTimelineId},
40 : lsn::Lsn,
41 : };
42 :
43 : use super::{
44 : walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
45 : TaskEvent, TaskHandle,
46 : };
47 :
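 : /// Marker type returned when the connection manager loop should stop: either the
 : /// cancellation token fired or the timeline reached a terminal (Broken/Stopping) state.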
48 : pub(crate) struct Cancelled;
49 :
50 : /// Attempts to subscribe to timeline updates, pushed by safekeepers into the broker.
51 : /// Based on the updates, decides whether to start, keep or stop a WAL receiver task.
52 : /// Exits if the storage broker subscription is cancelled.
53 : ///
54 : /// # Cancel-Safety
55 : ///
56 : /// Not cancellation-safe. Use `cancel` token to request cancellation.
57 0 : pub(super) async fn connection_manager_loop_step(
58 0 : broker_client: &mut BrokerClientChannel,
59 0 : connection_manager_state: &mut ConnectionManagerState,
60 0 : ctx: &RequestContext,
61 0 : cancel: &CancellationToken,
62 0 : manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
63 0 : ) -> Result<(), Cancelled> {
64 0 : match tokio::select! {
65 : _ = cancel.cancelled() => { return Err(Cancelled); },
66 0 : st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
67 : } {
68 0 : Ok(()) => {}
69 0 : Err(new_state) => {
70 0 : debug!(
71 0 : ?new_state,
72 0 : "state changed, stopping wal connection manager loop"
73 0 : );
74 0 : return Err(Cancelled);
75 : }
76 : }
77 :
78 0 : WALRECEIVER_ACTIVE_MANAGERS.inc();
79 0 : scopeguard::defer! {
80 0 : WALRECEIVER_ACTIVE_MANAGERS.dec();
81 0 : }
82 :
83 0 : let id = TenantTimelineId {
84 0 : tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
85 0 : timeline_id: connection_manager_state.timeline.timeline_id,
86 0 : };
87 0 :
88 0 : let mut timeline_state_updates = connection_manager_state
89 0 : .timeline
90 0 : .subscribe_for_state_updates();
91 :
92 : // Subscribe to the broker updates. The stream shares the underlying TCP connection
93 : // with other streams on this client (other connection managers). When the
94 : // object goes out of scope, the stream is finished in drop() automatically.
95 0 : let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
96 0 : debug!("Subscribed for broker timeline updates");
97 :
98 : loop {
99 0 : let time_until_next_retry = connection_manager_state.time_until_next_retry();
100 :
101 : // These things are happening concurrently:
102 : //
103 : // - cancellation request
104 : // - keep receiving WAL on the current connection
105 : // - if the shared state says we need to change connection, disconnect and return
106 : // - this runs in a separate task and we receive updates via a watch channel
107 : // - change connection if the rules decide so, or if the current connection dies
108 : // - receive updates from broker
109 : // - this might change the current desired connection
110 : // - timeline state changes to something that does not allow walreceiver to run concurrently
111 :
112 : // NB: make sure each of the select expressions are cancellation-safe
113 : // (no need for arms to be cancellation-safe).
114 0 : tokio::select! {
115 : _ = cancel.cancelled() => { return Err(Cancelled); }
116 0 : Some(wal_connection_update) = async {
117 0 : match connection_manager_state.wal_connection.as_mut() {
118 0 : Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
119 0 : None => None,
120 : }
121 0 : } => {
122 : let wal_connection = connection_manager_state.wal_connection.as_mut()
123 : .expect("Should have a connection, as checked by the corresponding select! guard");
124 : match wal_connection_update {
125 : TaskEvent::Update(TaskStateUpdate::Started) => {},
126 : TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
127 : if new_status.has_processed_wal {
128 : // We have advanced last_record_lsn by processing the WAL received
129 : // from this safekeeper. This is good enough to clear the history of
130 : // unsuccessful retries and allow reconnecting to this safekeeper without
131 : // sleeping for a long time.
132 : connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
133 : }
134 : wal_connection.status = new_status;
135 : }
136 : TaskEvent::End(walreceiver_task_result) => {
137 : match walreceiver_task_result {
138 0 : Ok(()) => debug!("WAL receiving task finished"),
139 0 : Err(e) => error!("wal receiver task finished with an error: {e:?}"),
140 : }
141 : connection_manager_state.drop_old_connection(false).await;
142 : },
143 : }
144 : },
145 :
146 : // Got a new update from the broker
147 0 : broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
148 : match broker_update {
149 : Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
150 : Err(status) => {
151 : match status.code() {
152 : Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
153 : // tonic's error handling doesn't provide a clear code for disconnections: we get
154 : // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
155 0 : info!("broker disconnected: {status}");
156 : },
157 : _ => {
158 0 : warn!("broker subscription failed: {status}");
159 : }
160 : }
161 : return Ok(());
162 : }
163 : Ok(None) => {
164 0 : error!("broker subscription stream ended"); // can't happen
165 : return Ok(());
166 : }
167 : }
168 : },
169 :
170 0 : new_event = async {
171 : // Reminder: this match arm needs to be cancellation-safe.
172 0 : loop {
173 0 : if connection_manager_state.timeline.current_state() == TimelineState::Loading {
174 0 : warn!("wal connection manager should only be launched after timeline has become active");
175 0 : }
176 0 : match timeline_state_updates.changed().await {
177 : Ok(()) => {
178 0 : let new_state = connection_manager_state.timeline.current_state();
179 0 : match new_state {
180 : // we're already active as walreceiver, no need to reactivate
181 0 : TimelineState::Active => continue,
182 : TimelineState::Broken { .. } | TimelineState::Stopping => {
183 0 : debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
184 0 : return ControlFlow::Break(());
185 : }
186 : TimelineState::Loading => {
187 0 : warn!("timeline transitioned back to Loading state, that should not happen");
188 0 : return ControlFlow::Continue(());
189 : }
190 : }
191 : }
192 0 : Err(_sender_dropped_error) => return ControlFlow::Break(()),
193 : }
194 : }
195 0 : } => match new_event {
196 : ControlFlow::Continue(()) => {
197 : return Ok(());
198 : }
199 : ControlFlow::Break(()) => {
200 0 : debug!("Timeline is no longer active, stopping wal connection manager loop");
201 : return Err(Cancelled);
202 : }
203 : },
204 :
205 0 : Some(()) = async {
206 0 : match time_until_next_retry {
207 0 : Some(sleep_time) => {
208 0 : tokio::time::sleep(sleep_time).await;
209 0 : Some(())
210 : },
211 : None => {
212 0 : debug!("No candidates to retry, waiting indefinitely for the broker events");
213 0 : None
214 : }
215 : }
216 0 : } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
217 : }
218 :
219 0 : if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
220 0 : info!("Switching to new connection candidate: {new_candidate:?}");
221 0 : connection_manager_state
222 0 : .change_connection(new_candidate, ctx)
223 0 : .await
224 0 : }
225 0 : *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
226 : }
227 0 : }
228 :
229 : /// Endlessly tries to subscribe to broker updates for the given timeline.
230 0 : async fn subscribe_for_timeline_updates(
231 0 : broker_client: &mut BrokerClientChannel,
232 0 : id: TenantTimelineId,
233 0 : cancel: &CancellationToken,
234 0 : ) -> Result<Streaming<SafekeeperTimelineInfo>, Cancelled> {
235 0 : let mut attempt = 0;
236 : loop {
237 0 : exponential_backoff(
238 0 : attempt,
239 0 : DEFAULT_BASE_BACKOFF_SECONDS,
240 0 : DEFAULT_MAX_BACKOFF_SECONDS,
241 0 : cancel,
242 0 : )
243 0 : .await;
244 0 : attempt += 1;
245 0 :
246 0 : // subscribe to the specific timeline
247 0 : let key = SubscriptionKey::TenantTimelineId(ProtoTenantTimelineId {
248 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
249 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
250 0 : });
251 0 : let request = SubscribeSafekeeperInfoRequest {
252 0 : subscription_key: Some(key),
253 0 : };
254 :
255 0 : match {
256 0 : tokio::select! {
257 0 : r = broker_client.subscribe_safekeeper_info(request) => { r }
258 : _ = cancel.cancelled() => { return Err(Cancelled); }
259 0 : }
260 0 : } {
261 0 : Ok(resp) => {
262 0 : return Ok(resp.into_inner());
263 : }
264 0 : Err(e) => {
265 0 : // Safekeeper nodes can stop pushing timeline updates to the broker when no new writes happen and
266 0 : // the entire WAL has been streamed. Keep this noticeable with logging, but do not warn/error.
267 0 : info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
268 0 : continue;
269 : }
270 : }
271 : }
272 0 : }
273 :
274 : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
275 : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
276 : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
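 : // With these values, the per-safekeeper retry delay grows roughly as
 : // 0.1s, 0.15s, 0.225s, ... and is capped at 15s; see `drop_old_connection`
 : // for where this backoff is applied.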
277 :
278 : /// All data needed to run the endless broker loop and keep the WAL streaming connection alive, if possible.
279 : pub(super) struct ConnectionManagerState {
280 : id: TenantTimelineId,
281 : /// Use pageserver data about the timeline to filter out some of the safekeepers.
282 : timeline: Arc<Timeline>,
283 : /// Child token of [`super::WalReceiver::cancel`], inherited to all tasks we spawn.
284 : cancel: CancellationToken,
285 : conf: WalReceiverConf,
286 : /// Current connection to safekeeper for WAL streaming.
287 : wal_connection: Option<WalConnection>,
288 : /// Info about retries and unsuccessful attempts to connect to safekeepers.
289 : wal_connection_retries: HashMap<NodeId, RetryInfo>,
290 : /// Data about all timelines available for connection, fetched from the storage broker and grouped by the corresponding safekeeper node id.
291 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
292 : }
293 :
294 : /// Information about the connection manager's current connection and connection candidates.
295 : #[derive(Debug, Clone)]
296 : pub struct ConnectionManagerStatus {
297 : existing_connection: Option<WalConnectionStatus>,
298 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
299 : }
300 :
301 : impl ConnectionManagerStatus {
302 : /// Generates a string describing the current connection status in a form suitable for logging.
303 0 : pub fn to_human_readable_string(&self) -> String {
304 0 : let mut resulting_string = String::new();
305 0 : match &self.existing_connection {
306 0 : Some(connection) => {
307 0 : if connection.has_processed_wal {
308 0 : resulting_string.push_str(&format!(
309 0 : " (update {}): streaming WAL from node {}, ",
310 0 : connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
311 0 : connection.node,
312 0 : ));
313 0 :
314 0 : match (connection.streaming_lsn, connection.commit_lsn) {
315 0 : (None, None) => resulting_string.push_str("no streaming data"),
316 0 : (None, Some(commit_lsn)) => {
317 0 : resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
318 : }
319 0 : (Some(streaming_lsn), None) => {
320 0 : resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
321 : }
322 0 : (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
323 0 : &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
324 0 : ),
325 : }
326 0 : } else if connection.is_connected {
327 0 : resulting_string.push_str(&format!(
328 0 : " (update {}): connecting to node {}",
329 0 : connection
330 0 : .latest_connection_update
331 0 : .format("%Y-%m-%d %H:%M:%S"),
332 0 : connection.node,
333 0 : ));
334 0 : } else {
335 0 : resulting_string.push_str(&format!(
336 0 : " (update {}): initializing node {} connection",
337 0 : connection
338 0 : .latest_connection_update
339 0 : .format("%Y-%m-%d %H:%M:%S"),
340 0 : connection.node,
341 0 : ));
342 0 : }
343 : }
344 0 : None => resulting_string.push_str(": disconnected"),
345 : }
346 :
347 0 : resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
348 0 : let mut candidates = self.wal_stream_candidates.iter().peekable();
349 0 : while let Some((node_id, candidate_info)) = candidates.next() {
350 0 : resulting_string.push_str(&format!(
351 0 : "({}|{}|{})",
352 0 : node_id,
353 0 : candidate_info.latest_update.format("%H:%M:%S"),
354 0 : Lsn(candidate_info.timeline.commit_lsn)
355 0 : ));
356 0 : if candidates.peek().is_some() {
357 0 : resulting_string.push_str(", ");
358 0 : }
359 : }
360 0 : resulting_string.push(']');
361 0 :
362 0 : resulting_string
363 0 : }
364 : }
365 :
366 : /// Current connection data.
367 : #[derive(Debug)]
368 : struct WalConnection {
369 : /// Time when the connection was initiated.
370 : started_at: NaiveDateTime,
371 : /// Current safekeeper pageserver is connected to for WAL streaming.
372 : sk_id: NodeId,
373 : /// Availability zone of the safekeeper.
374 : availability_zone: Option<String>,
375 : /// Status of the connection.
376 : status: WalConnectionStatus,
377 : /// WAL streaming task handle.
378 : connection_task: TaskHandle<WalConnectionStatus>,
379 : /// Have we discovered that another safekeeper has more recent WAL than we do?
380 : discovered_new_wal: Option<NewCommittedWAL>,
381 : }
382 :
383 : /// Notion of new committed WAL that exists on another safekeeper.
384 : #[derive(Debug, Clone, Copy)]
385 : struct NewCommittedWAL {
386 : /// LSN of the new committed WAL.
387 : lsn: Lsn,
388 : /// When we discovered that the new committed WAL exists on another safekeeper.
389 : discovered_at: NaiveDateTime,
390 : }
391 :
392 : #[derive(Debug, Clone, Copy)]
393 : struct RetryInfo {
394 : next_retry_at: Option<NaiveDateTime>,
395 : retry_duration_seconds: f64,
396 : }
397 :
398 : /// Data about the timeline to connect to, received from the broker.
399 : #[derive(Debug, Clone)]
400 : struct BrokerSkTimeline {
401 : timeline: SafekeeperTimelineInfo,
402 : /// Time at which the data was last fetched from the broker, to track stale data.
403 : latest_update: NaiveDateTime,
404 : }
405 :
406 : impl ConnectionManagerState {
407 0 : pub(super) fn new(
408 0 : timeline: Arc<Timeline>,
409 0 : conf: WalReceiverConf,
410 0 : cancel: CancellationToken,
411 0 : ) -> Self {
412 0 : let id = TenantTimelineId {
413 0 : tenant_id: timeline.tenant_shard_id.tenant_id,
414 0 : timeline_id: timeline.timeline_id,
415 0 : };
416 0 : Self {
417 0 : id,
418 0 : timeline,
419 0 : cancel,
420 0 : conf,
421 0 : wal_connection: None,
422 0 : wal_stream_candidates: HashMap::new(),
423 0 : wal_connection_retries: HashMap::new(),
424 0 : }
425 0 : }
426 :
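 : /// Spawns a task tied to this manager's cancellation token and returns a handle for tracking it.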
427 10 : fn spawn<Fut>(
428 10 : &self,
429 10 : task: impl FnOnce(
430 10 : tokio::sync::watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
431 10 : CancellationToken,
432 10 : ) -> Fut
433 10 : + Send
434 10 : + 'static,
435 10 : ) -> TaskHandle<WalConnectionStatus>
436 10 : where
437 10 : Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
438 10 : {
439 10 : // TODO: get rid of TaskHandle
440 10 : super::TaskHandle::spawn(&self.cancel, task)
441 10 : }
442 :
443 : /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
444 0 : async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
445 0 : WALRECEIVER_SWITCHES
446 0 : .with_label_values(&[new_sk.reason.name()])
447 0 : .inc();
448 0 :
449 0 : self.drop_old_connection(true).await;
450 :
451 0 : let node_id = new_sk.safekeeper_id;
452 0 : let connect_timeout = self.conf.wal_connect_timeout;
453 0 : let ingest_batch_size = self.conf.ingest_batch_size;
454 0 : let timeline = Arc::clone(&self.timeline);
455 0 : let ctx = ctx.detached_child(
456 0 : TaskKind::WalReceiverConnectionHandler,
457 0 : DownloadBehavior::Download,
458 0 : );
459 :
460 0 : let span = info_span!("connection", %node_id);
461 0 : let connection_handle = self.spawn(move |events_sender, cancellation| {
462 0 : async move {
463 0 : debug_assert_current_span_has_tenant_and_timeline_id();
464 :
465 0 : let res = super::walreceiver_connection::handle_walreceiver_connection(
466 0 : timeline,
467 0 : new_sk.wal_source_connconf,
468 0 : events_sender,
469 0 : cancellation.clone(),
470 0 : connect_timeout,
471 0 : ctx,
472 0 : node_id,
473 0 : ingest_batch_size,
474 0 : )
475 0 : .await;
476 :
477 0 : match res {
478 0 : Ok(()) => Ok(()),
479 0 : Err(e) => {
480 0 : match e {
481 0 : WalReceiverError::SuccessfulCompletion(msg) => {
482 0 : info!("walreceiver connection handling ended with success: {msg}");
483 0 : Ok(())
484 : }
485 0 : WalReceiverError::ExpectedSafekeeperError(e) => {
486 0 : info!("walreceiver connection handling ended: {e}");
487 0 : Ok(())
488 : }
489 : WalReceiverError::ClosedGate => {
490 0 : info!(
491 0 : "walreceiver connection handling ended because of closed gate"
492 0 : );
493 0 : Ok(())
494 : }
495 0 : WalReceiverError::Other(e) => {
496 0 : // return an error to have task_mgr give it really verbose logging
497 0 : if cancellation.is_cancelled() {
498 : // Ideally we would learn about this via some path other than Other, but
499 : // that requires refactoring all the intermediate layers of ingest code
500 : // that only emit anyhow::Error
501 0 : Ok(())
502 : } else {
503 0 : Err(e).context("walreceiver connection handling failure")
504 : }
505 : }
506 : }
507 : }
508 : }
509 0 : }
510 0 : .instrument(span)
511 0 : });
512 0 :
513 0 : let now = Utc::now().naive_utc();
514 0 : self.wal_connection = Some(WalConnection {
515 0 : started_at: now,
516 0 : sk_id: new_sk.safekeeper_id,
517 0 : availability_zone: new_sk.availability_zone,
518 0 : status: WalConnectionStatus {
519 0 : is_connected: false,
520 0 : has_processed_wal: false,
521 0 : latest_connection_update: now,
522 0 : latest_wal_update: now,
523 0 : streaming_lsn: None,
524 0 : commit_lsn: None,
525 0 : node: node_id,
526 0 : },
527 0 : connection_task: connection_handle,
528 0 : discovered_new_wal: None,
529 0 : });
530 0 : }
531 :
532 : /// Drops the current connection (if any) and updates retry timeout for the next
533 : /// connection attempt to the same safekeeper.
534 : ///
535 : /// # Cancel-Safety
536 : ///
537 : /// Not cancellation-safe.
538 0 : async fn drop_old_connection(&mut self, needs_shutdown: bool) {
539 0 : let wal_connection = match self.wal_connection.take() {
540 0 : Some(wal_connection) => wal_connection,
541 0 : None => return,
542 : };
543 :
544 0 : if needs_shutdown {
545 0 : wal_connection
546 0 : .connection_task
547 0 : .shutdown()
548 : // This here is why this function isn't cancellation-safe.
549 : // If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
550 : // Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
551 : // and thus be ineffective.
552 0 : .await;
553 0 : }
554 :
555 0 : let retry = self
556 0 : .wal_connection_retries
557 0 : .entry(wal_connection.sk_id)
558 0 : .or_insert(RetryInfo {
559 0 : next_retry_at: None,
560 0 : retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
561 0 : });
562 0 :
563 0 : let now = Utc::now().naive_utc();
564 0 :
565 0 : // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
566 0 : // and we add backoff to the time when we started the connection attempt. If the connection
567 0 : // was active for a long time, then next_retry_at will be in the past.
568 0 : retry.next_retry_at =
569 0 : wal_connection
570 0 : .started_at
571 0 : .checked_add_signed(chrono::Duration::milliseconds(
572 0 : (retry.retry_duration_seconds * 1000.0) as i64,
573 0 : ));
574 :
575 0 : if let Some(next) = &retry.next_retry_at {
576 0 : if next > &now {
577 0 : info!(
578 0 : "Next connection retry to {:?} is at {}",
579 0 : wal_connection.sk_id, next
580 0 : );
581 0 : }
582 0 : }
583 :
584 0 : let next_retry_duration =
585 0 : retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
586 0 : // Clamp the next retry duration to the maximum allowed.
587 0 : let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
588 0 : // Clamp the next retry duration to the minimum allowed.
589 0 : let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
590 0 :
591 0 : retry.retry_duration_seconds = next_retry_duration;
592 0 : }
593 :
594 : /// Returns the time to wait until the next connection retry attempt is due, if any.
595 0 : fn time_until_next_retry(&self) -> Option<Duration> {
596 0 : let now = Utc::now().naive_utc();
597 :
598 0 : let next_retry_at = self
599 0 : .wal_connection_retries
600 0 : .values()
601 0 : .filter_map(|retry| retry.next_retry_at)
602 0 : .filter(|next_retry_at| next_retry_at > &now)
603 0 : .min()?;
604 :
605 0 : (next_retry_at - now).to_std().ok()
606 0 : }
607 :
608 : /// Adds another broker timeline into the state, if it's more recent than the one already added there for the same key.
609 0 : fn register_timeline_update(&mut self, timeline_update: SafekeeperTimelineInfo) {
610 0 : WALRECEIVER_BROKER_UPDATES.inc();
611 0 :
612 0 : let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
613 0 : let old_entry = self.wal_stream_candidates.insert(
614 0 : new_safekeeper_id,
615 0 : BrokerSkTimeline {
616 0 : timeline: timeline_update,
617 0 : latest_update: Utc::now().naive_utc(),
618 0 : },
619 0 : );
620 0 :
621 0 : if old_entry.is_none() {
622 0 : info!("New SK node was added: {new_safekeeper_id}");
623 0 : WALRECEIVER_CANDIDATES_ADDED.inc();
624 0 : }
625 0 : }
626 :
627 : /// Cleans up stale broker records and checks the rest for a new connection candidate.
628 : /// Returns a new candidate if the current connection is absent or somewhat lagging, `None` otherwise.
629 : /// The current rules for approving new candidates:
630 : /// * pick a candidate different from the connected safekeeper with the biggest `commit_lsn` and the lowest number of failed connection attempts
631 : /// * if there's no such entry, no new candidate is found, abort
632 : /// * otherwise check if the candidate is much better than the current one
633 : ///
634 : /// For the exact rules determining whether the candidate is better than the current one, refer to this function's implementation.
635 : /// The general rules are as follows:
636 : /// * if there is no connected safekeeper, pick the candidate
637 : /// * if we haven't received any updates for some time, pick the candidate
638 : /// * if the candidate's commit_lsn is much higher than the current one, pick the candidate
639 : /// * if the candidate's commit_lsn is the same, but the candidate is located in the same AZ as the pageserver and the current one is not, pick the candidate
640 : /// * if the connected safekeeper stopped sending us new WAL that is available on another safekeeper, pick the candidate
641 : ///
642 : /// This way we keep up with the most up-to-date safekeeper and don't jump from one safekeeper to another too frequently.
643 : /// Both thresholds are configured per tenant.
644 18 : fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
645 18 : self.cleanup_old_candidates();
646 18 :
647 18 : match &self.wal_connection {
648 10 : Some(existing_wal_connection) => {
649 10 : let connected_sk_node = existing_wal_connection.sk_id;
650 :
651 10 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
652 10 : self.select_connection_candidate(Some(connected_sk_node))?;
653 10 : let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
654 10 :
655 10 : let now = Utc::now().naive_utc();
656 10 : if let Ok(latest_interaction) =
657 10 : (now - existing_wal_connection.status.latest_connection_update).to_std()
658 : {
659 : // Drop the connection if we haven't received a keepalive message for a while.
660 10 : if latest_interaction > self.conf.wal_connect_timeout {
661 2 : return Some(NewWalConnectionCandidate {
662 2 : safekeeper_id: new_sk_id,
663 2 : wal_source_connconf: new_wal_source_connconf,
664 2 : availability_zone: new_availability_zone,
665 2 : reason: ReconnectReason::NoKeepAlives {
666 2 : last_keep_alive: Some(
667 2 : existing_wal_connection.status.latest_connection_update,
668 2 : ),
669 2 : check_time: now,
670 2 : threshold: self.conf.wal_connect_timeout,
671 2 : },
672 2 : });
673 8 : }
674 0 : }
675 :
676 8 : if !existing_wal_connection.status.is_connected {
677 : // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
678 0 : return None;
679 8 : }
680 :
681 8 : if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
682 8 : let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
683 8 : // Check if the new candidate has much more WAL than the current one.
684 8 : match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
685 8 : Some(new_sk_lsn_advantage) => {
686 8 : if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
687 2 : return Some(NewWalConnectionCandidate {
688 2 : safekeeper_id: new_sk_id,
689 2 : wal_source_connconf: new_wal_source_connconf,
690 2 : availability_zone: new_availability_zone,
691 2 : reason: ReconnectReason::LaggingWal {
692 2 : current_commit_lsn,
693 2 : new_commit_lsn,
694 2 : threshold: self.conf.max_lsn_wal_lag,
695 2 : },
696 2 : });
697 6 : }
698 6 : // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
699 6 : // and the current one is not, switch to the new one.
700 6 : if self.conf.availability_zone.is_some()
701 2 : && existing_wal_connection.availability_zone
702 2 : != self.conf.availability_zone
703 2 : && self.conf.availability_zone == new_availability_zone
704 : {
705 2 : return Some(NewWalConnectionCandidate {
706 2 : safekeeper_id: new_sk_id,
707 2 : availability_zone: new_availability_zone,
708 2 : wal_source_connconf: new_wal_source_connconf,
709 2 : reason: ReconnectReason::SwitchAvailabilityZone,
710 2 : });
711 4 : }
712 : }
713 0 : None => debug!(
714 0 : "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
715 0 : ),
716 : }
717 0 : }
718 :
719 4 : let current_lsn = match existing_wal_connection.status.streaming_lsn {
720 4 : Some(lsn) => lsn,
721 0 : None => self.timeline.get_last_record_lsn(),
722 : };
723 4 : let current_commit_lsn = existing_wal_connection
724 4 : .status
725 4 : .commit_lsn
726 4 : .unwrap_or(current_lsn);
727 4 : let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
728 4 :
729 4 : // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
730 4 : let mut discovered_new_wal = existing_wal_connection
731 4 : .discovered_new_wal
732 4 : .filter(|new_wal| new_wal.lsn > current_commit_lsn);
733 4 :
734 4 : if discovered_new_wal.is_none() {
735 : // Check if the new candidate has more WAL than the current one.
736 : // If it does, we consider switching to the new candidate.
737 2 : discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
738 2 : trace!(
739 0 : "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
740 0 : candidate_commit_lsn,
741 0 : current_commit_lsn
742 0 : );
743 2 : Some(NewCommittedWAL {
744 2 : lsn: candidate_commit_lsn,
745 2 : discovered_at: Utc::now().naive_utc(),
746 2 : })
747 : } else {
748 0 : None
749 : };
750 2 : }
751 :
752 4 : let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
753 : // Connected safekeeper has more WAL, but we haven't received updates for some time.
754 0 : trace!(
755 0 : "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
756 0 : (now - existing_wal_connection.status.latest_wal_update).to_std(),
757 0 : current_lsn,
758 0 : current_commit_lsn
759 0 : );
760 0 : Some(existing_wal_connection.status.latest_wal_update)
761 : } else {
762 4 : discovered_new_wal.as_ref().map(|new_wal| {
763 4 : // We know that new WAL is available on another safekeeper, but the connected safekeeper doesn't have it.
764 4 : new_wal
765 4 : .discovered_at
766 4 : .max(existing_wal_connection.status.latest_wal_update)
767 4 : })
768 : };
769 :
770 : // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
771 4 : if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
772 4 : if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
773 2 : if candidate_commit_lsn > current_commit_lsn
774 2 : && waiting_for_new_wal > self.conf.lagging_wal_timeout
775 : {
776 2 : return Some(NewWalConnectionCandidate {
777 2 : safekeeper_id: new_sk_id,
778 2 : wal_source_connconf: new_wal_source_connconf,
779 2 : availability_zone: new_availability_zone,
780 2 : reason: ReconnectReason::NoWalTimeout {
781 2 : current_lsn,
782 2 : current_commit_lsn,
783 2 : candidate_commit_lsn,
784 2 : last_wal_interaction: Some(
785 2 : existing_wal_connection.status.latest_wal_update,
786 2 : ),
787 2 : check_time: now,
788 2 : threshold: self.conf.lagging_wal_timeout,
789 2 : },
790 2 : });
791 0 : }
792 2 : }
793 0 : }
794 :
795 2 : self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
796 : }
797 : None => {
798 6 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
799 8 : self.select_connection_candidate(None)?;
800 6 : return Some(NewWalConnectionCandidate {
801 6 : safekeeper_id: new_sk_id,
802 6 : availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
803 6 : wal_source_connconf: new_wal_source_connconf,
804 6 : reason: ReconnectReason::NoExistingConnection,
805 6 : });
806 : }
807 : }
808 :
809 2 : None
810 18 : }
811 :
812 : /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
813 : /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
814 : ///
815 : /// The candidate that is chosen:
816 : /// * has no pending retry cooldown
817 : /// * has greatest commit_lsn among the ones that are left
818 18 : fn select_connection_candidate(
819 18 : &self,
820 18 : node_to_omit: Option<NodeId>,
821 18 : ) -> Option<(NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
822 18 : self.applicable_connection_candidates()
823 26 : .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
824 20 : .max_by_key(|(_, info, _)| info.commit_lsn)
825 18 : }
826 :
827 : /// Returns a list of safekeepers that have valid info and ready for connection.
828 : /// Some safekeepers are filtered by the retry cooldown.
829 18 : fn applicable_connection_candidates(
830 18 : &self,
831 18 : ) -> impl Iterator<Item = (NodeId, &SafekeeperTimelineInfo, PgConnectionConfig)> {
832 18 : let now = Utc::now().naive_utc();
833 18 :
834 18 : self.wal_stream_candidates
835 18 : .iter()
836 36 : .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
837 32 : .filter(move |(sk_id, _)| {
838 32 : let next_retry_at = self
839 32 : .wal_connection_retries
840 32 : .get(sk_id)
841 32 : .and_then(|retry_info| {
842 2 : retry_info.next_retry_at
843 32 : });
844 32 :
845 32 : next_retry_at.is_none() || next_retry_at.unwrap() <= now
846 32 : }).filter_map(|(sk_id, broker_info)| {
847 30 : let info = &broker_info.timeline;
848 30 : if info.safekeeper_connstr.is_empty() {
849 4 : return None; // no connection string, ignore sk
850 26 : }
851 26 : match wal_stream_connection_config(
852 26 : self.id,
853 26 : info.safekeeper_connstr.as_ref(),
854 26 : match &self.conf.auth_token {
855 26 : None => None,
856 0 : Some(x) => Some(x),
857 : },
858 26 : self.conf.availability_zone.as_deref(),
859 : ) {
860 26 : Ok(connstr) => Some((*sk_id, info, connstr)),
861 0 : Err(e) => {
862 0 : error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
863 0 : None
864 : }
865 : }
866 30 : })
867 18 : }
868 :
869 : /// Remove candidates which haven't sent broker updates for a while.
870 18 : fn cleanup_old_candidates(&mut self) {
871 18 : let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
872 18 : let lagging_wal_timeout = self.conf.lagging_wal_timeout;
873 18 :
874 38 : self.wal_stream_candidates.retain(|node_id, broker_info| {
875 38 : if let Ok(time_since_latest_broker_update) =
876 38 : (Utc::now().naive_utc() - broker_info.latest_update).to_std()
877 : {
878 38 : let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
879 38 : if !should_retain {
880 2 : node_ids_to_remove.push(*node_id);
881 36 : }
882 38 : should_retain
883 : } else {
884 0 : true
885 : }
886 38 : });
887 18 :
888 18 : if !node_ids_to_remove.is_empty() {
889 4 : for node_id in node_ids_to_remove {
890 2 : info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
891 2 : self.wal_connection_retries.remove(&node_id);
892 2 : WALRECEIVER_CANDIDATES_REMOVED.inc();
893 : }
894 16 : }
895 18 : }
896 :
897 : /// # Cancel-Safety
898 : ///
899 : /// Not cancellation-safe.
900 0 : pub(super) async fn shutdown(mut self) {
901 0 : if let Some(wal_connection) = self.wal_connection.take() {
902 0 : wal_connection.connection_task.shutdown().await;
903 0 : }
904 0 : }
905 :
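 : /// Produces a snapshot of the current connection and candidate state, published for status reporting.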
906 0 : fn manager_status(&self) -> ConnectionManagerStatus {
907 0 : ConnectionManagerStatus {
908 0 : existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
909 0 : wal_stream_candidates: self.wal_stream_candidates.clone(),
910 0 : }
911 0 : }
912 : }
913 :
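 : /// A safekeeper candidate to (re)connect to, together with the reason for switching.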
914 : #[derive(Debug)]
915 : struct NewWalConnectionCandidate {
916 : safekeeper_id: NodeId,
917 : wal_source_connconf: PgConnectionConfig,
918 : availability_zone: Option<String>,
919 : reason: ReconnectReason,
920 : }
921 :
922 : /// Stores the reason why the WAL connection was switched, for further debugging purposes.
923 : #[derive(Debug, PartialEq, Eq)]
924 : enum ReconnectReason {
925 : NoExistingConnection,
926 : LaggingWal {
927 : current_commit_lsn: Lsn,
928 : new_commit_lsn: Lsn,
929 : threshold: NonZeroU64,
930 : },
931 : SwitchAvailabilityZone,
932 : NoWalTimeout {
933 : current_lsn: Lsn,
934 : current_commit_lsn: Lsn,
935 : candidate_commit_lsn: Lsn,
936 : last_wal_interaction: Option<NaiveDateTime>,
937 : check_time: NaiveDateTime,
938 : threshold: Duration,
939 : },
940 : NoKeepAlives {
941 : last_keep_alive: Option<NaiveDateTime>,
942 : check_time: NaiveDateTime,
943 : threshold: Duration,
944 : },
945 : }
946 :
947 : impl ReconnectReason {
948 0 : fn name(&self) -> &str {
949 0 : match self {
950 0 : ReconnectReason::NoExistingConnection => "NoExistingConnection",
951 0 : ReconnectReason::LaggingWal { .. } => "LaggingWal",
952 0 : ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
953 0 : ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
954 0 : ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
955 : }
956 0 : }
957 : }
958 :
959 : #[cfg(test)]
960 : mod tests {
961 : use super::*;
962 : use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
963 : use url::Host;
964 :
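 : /// Builds a minimal `BrokerSkTimeline` for tests: only `commit_lsn`, the connection string and the update time are set, everything else is zeroed.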
965 38 : fn dummy_broker_sk_timeline(
966 38 : commit_lsn: u64,
967 38 : safekeeper_connstr: &str,
968 38 : latest_update: NaiveDateTime,
969 38 : ) -> BrokerSkTimeline {
970 38 : BrokerSkTimeline {
971 38 : timeline: SafekeeperTimelineInfo {
972 38 : safekeeper_id: 0,
973 38 : tenant_timeline_id: None,
974 38 : term: 0,
975 38 : last_log_term: 0,
976 38 : flush_lsn: 0,
977 38 : commit_lsn,
978 38 : backup_lsn: 0,
979 38 : remote_consistent_lsn: 0,
980 38 : peer_horizon_lsn: 0,
981 38 : local_start_lsn: 0,
982 38 : safekeeper_connstr: safekeeper_connstr.to_owned(),
983 38 : http_connstr: safekeeper_connstr.to_owned(),
984 38 : availability_zone: None,
985 38 : },
986 38 : latest_update,
987 38 : }
988 38 : }
989 :
990 : #[tokio::test]
991 2 : async fn no_connection_no_candidate() -> anyhow::Result<()> {
992 2 : let harness = TenantHarness::create("no_connection_no_candidate")?;
993 5 : let mut state = dummy_state(&harness).await;
994 2 : let now = Utc::now().naive_utc();
995 2 :
996 2 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
997 2 : let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
998 2 :
999 2 : state.wal_connection = None;
1000 2 : state.wal_stream_candidates = HashMap::from([
1001 2 : (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
1002 2 : (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1003 2 : (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1004 2 : (
1005 2 : NodeId(3),
1006 2 : dummy_broker_sk_timeline(
1007 2 : 1 + state.conf.max_lsn_wal_lag.get(),
1008 2 : "delay_over_threshold",
1009 2 : delay_over_threshold,
1010 2 : ),
1011 2 : ),
1012 2 : ]);
1013 2 :
1014 2 : let no_candidate = state.next_connection_candidate();
1015 2 : assert!(
1016 2 : no_candidate.is_none(),
1017 2 : "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
1018 2 : );
1019 2 :
1020 2 : Ok(())
1021 2 : }
1022 :
1023 : #[tokio::test]
1024 2 : async fn connection_no_candidate() -> anyhow::Result<()> {
1025 2 : let harness = TenantHarness::create("connection_no_candidate")?;
1026 6 : let mut state = dummy_state(&harness).await;
1027 2 : let now = Utc::now().naive_utc();
1028 2 :
1029 2 : let connected_sk_id = NodeId(0);
1030 2 : let current_lsn = 100_000;
1031 2 :
1032 2 : let connection_status = WalConnectionStatus {
1033 2 : is_connected: true,
1034 2 : has_processed_wal: true,
1035 2 : latest_connection_update: now,
1036 2 : latest_wal_update: now,
1037 2 : commit_lsn: Some(Lsn(current_lsn)),
1038 2 : streaming_lsn: Some(Lsn(current_lsn)),
1039 2 : node: NodeId(1),
1040 2 : };
1041 2 :
1042 2 : state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
1043 2 : state.wal_connection = Some(WalConnection {
1044 2 : started_at: now,
1045 2 : sk_id: connected_sk_id,
1046 2 : availability_zone: None,
1047 2 : status: connection_status,
1048 2 : connection_task: state.spawn(move |sender, _| async move {
1049 2 : sender
1050 2 : .send(TaskStateUpdate::Progress(connection_status))
1051 2 : .ok();
1052 2 : Ok(())
1053 2 : }),
1054 2 : discovered_new_wal: None,
1055 2 : });
1056 2 : state.wal_stream_candidates = HashMap::from([
1057 2 : (
1058 2 : connected_sk_id,
1059 2 : dummy_broker_sk_timeline(
1060 2 : current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
1061 2 : DUMMY_SAFEKEEPER_HOST,
1062 2 : now,
1063 2 : ),
1064 2 : ),
1065 2 : (
1066 2 : NodeId(1),
1067 2 : dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
1068 2 : ),
1069 2 : (
1070 2 : NodeId(2),
1071 2 : dummy_broker_sk_timeline(
1072 2 : current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
1073 2 : "not_enough_advanced_lsn",
1074 2 : now,
1075 2 : ),
1076 2 : ),
1077 2 : ]);
1078 2 :
1079 2 : let no_candidate = state.next_connection_candidate();
1080 2 : assert!(
1081 2 : no_candidate.is_none(),
1082 2 : "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
1083 2 : );
1084 2 :
1085 2 : Ok(())
1086 2 : }
1087 :
1088 : #[tokio::test]
1089 2 : async fn no_connection_candidate() -> anyhow::Result<()> {
1090 2 : let harness = TenantHarness::create("no_connection_candidate")?;
1091 6 : let mut state = dummy_state(&harness).await;
1092 2 : let now = Utc::now().naive_utc();
1093 2 :
1094 2 : state.wal_connection = None;
1095 2 : state.wal_stream_candidates = HashMap::from([(
1096 2 : NodeId(0),
1097 2 : dummy_broker_sk_timeline(
1098 2 : 1 + state.conf.max_lsn_wal_lag.get(),
1099 2 : DUMMY_SAFEKEEPER_HOST,
1100 2 : now,
1101 2 : ),
1102 2 : )]);
1103 2 :
1104 2 : let only_candidate = state
1105 2 : .next_connection_candidate()
1106 2 : .expect("Expected one candidate selected out of the only data option, but got none");
1107 2 : assert_eq!(only_candidate.safekeeper_id, NodeId(0));
1108 2 : assert_eq!(
1109 2 : only_candidate.reason,
1110 2 : ReconnectReason::NoExistingConnection,
1111 2 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1112 2 : );
1113 2 : assert_eq!(
1114 2 : only_candidate.wal_source_connconf.host(),
1115 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1116 2 : );
1117 2 :
1118 2 : let selected_lsn = 100_000;
1119 2 : state.wal_stream_candidates = HashMap::from([
1120 2 : (
1121 2 : NodeId(0),
1122 2 : dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
1123 2 : ),
1124 2 : (
1125 2 : NodeId(1),
1126 2 : dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
1127 2 : ),
1128 2 : (
1129 2 : NodeId(2),
1130 2 : dummy_broker_sk_timeline(selected_lsn + 100, "", now),
1131 2 : ),
1132 2 : ]);
1133 2 : let biggest_wal_candidate = state.next_connection_candidate().expect(
1134 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1135 2 : );
1136 2 :
1137 2 : assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
1138 2 : assert_eq!(
1139 2 : biggest_wal_candidate.reason,
1140 2 : ReconnectReason::NoExistingConnection,
1141 2 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1142 2 : );
1143 2 : assert_eq!(
1144 2 : biggest_wal_candidate.wal_source_connconf.host(),
1145 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1146 2 : );
1147 2 :
1148 2 : Ok(())
1149 2 : }
1150 :
1151 : #[tokio::test]
1152 2 : async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
1153 2 : let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
1154 5 : let mut state = dummy_state(&harness).await;
1155 2 : let now = Utc::now().naive_utc();
1156 2 :
1157 2 : let current_lsn = Lsn(100_000).align();
1158 2 : let bigger_lsn = Lsn(current_lsn.0 + 100).align();
1159 2 :
1160 2 : state.wal_connection = None;
1161 2 : state.wal_stream_candidates = HashMap::from([
1162 2 : (
1163 2 : NodeId(0),
1164 2 : dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1165 2 : ),
1166 2 : (
1167 2 : NodeId(1),
1168 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1169 2 : ),
1170 2 : ]);
1171 2 : state.wal_connection_retries = HashMap::from([(
1172 2 : NodeId(0),
1173 2 : RetryInfo {
1174 2 : next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
1175 2 : retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
1176 2 : },
1177 2 : )]);
1178 2 :
1179 2 : let candidate_with_less_errors = state
1180 2 : .next_connection_candidate()
1181 2 : .expect("Expected one candidate selected, but got none");
1182 2 : assert_eq!(
1183 2 : candidate_with_less_errors.safekeeper_id,
1184 2 : NodeId(1),
1185 2 : "Should select the node with no pending retry cooldown"
1186 2 : );
1187 2 :
1188 2 : Ok(())
1189 2 : }
1190 :
1191 : #[tokio::test]
1192 2 : async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1193 2 : let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
1194 6 : let mut state = dummy_state(&harness).await;
1195 2 : let current_lsn = Lsn(100_000).align();
1196 2 : let now = Utc::now().naive_utc();
1197 2 :
1198 2 : let connected_sk_id = NodeId(0);
1199 2 : let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
1200 2 :
1201 2 : let connection_status = WalConnectionStatus {
1202 2 : is_connected: true,
1203 2 : has_processed_wal: true,
1204 2 : latest_connection_update: now,
1205 2 : latest_wal_update: now,
1206 2 : commit_lsn: Some(current_lsn),
1207 2 : streaming_lsn: Some(current_lsn),
1208 2 : node: connected_sk_id,
1209 2 : };
1210 2 :
1211 2 : state.wal_connection = Some(WalConnection {
1212 2 : started_at: now,
1213 2 : sk_id: connected_sk_id,
1214 2 : availability_zone: None,
1215 2 : status: connection_status,
1216 2 : connection_task: state.spawn(move |sender, _| async move {
1217 2 : sender
1218 2 : .send(TaskStateUpdate::Progress(connection_status))
1219 2 : .ok();
1220 2 : Ok(())
1221 2 : }),
1222 2 : discovered_new_wal: None,
1223 2 : });
1224 2 : state.wal_stream_candidates = HashMap::from([
1225 2 : (
1226 2 : connected_sk_id,
1227 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1228 2 : ),
1229 2 : (
1230 2 : NodeId(1),
1231 2 : dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
1232 2 : ),
1233 2 : ]);
1234 2 :
1235 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1236 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1237 2 : );
1238 2 :
1239 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
1240 2 : assert_eq!(
1241 2 : over_threshcurrent_candidate.reason,
1242 2 : ReconnectReason::LaggingWal {
1243 2 : current_commit_lsn: current_lsn,
1244 2 : new_commit_lsn: new_lsn,
1245 2 : threshold: state.conf.max_lsn_wal_lag
1246 2 : },
1247 2 : "Should select bigger WAL safekeeper if it starts to lag enough"
1248 2 : );
1249 2 : assert_eq!(
1250 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1251 2 : &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
1252 2 : );
1253 2 :
1254 2 : Ok(())
1255 2 : }
1256 :
1257 : #[tokio::test]
1258 2 : async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
1259 2 : let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
1260 6 : let mut state = dummy_state(&harness).await;
1261 2 : let current_lsn = Lsn(100_000).align();
1262 2 : let now = Utc::now().naive_utc();
1263 2 :
1264 2 : let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
1265 2 : let time_over_threshold =
1266 2 : Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
1267 2 :
1268 2 : let connection_status = WalConnectionStatus {
1269 2 : is_connected: true,
1270 2 : has_processed_wal: true,
1271 2 : latest_connection_update: time_over_threshold,
1272 2 : latest_wal_update: time_over_threshold,
1273 2 : commit_lsn: Some(current_lsn),
1274 2 : streaming_lsn: Some(current_lsn),
1275 2 : node: NodeId(1),
1276 2 : };
1277 2 :
1278 2 : state.wal_connection = Some(WalConnection {
1279 2 : started_at: now,
1280 2 : sk_id: NodeId(1),
1281 2 : availability_zone: None,
1282 2 : status: connection_status,
1283 2 : connection_task: state.spawn(move |sender, _| async move {
1284 2 : sender
1285 2 : .send(TaskStateUpdate::Progress(connection_status))
1286 2 : .ok();
1287 2 : Ok(())
1288 2 : }),
1289 2 : discovered_new_wal: None,
1290 2 : });
1291 2 : state.wal_stream_candidates = HashMap::from([(
1292 2 : NodeId(0),
1293 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1294 2 : )]);
1295 2 :
1296 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1297 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1298 2 : );
1299 2 :
1300 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1301 2 : match over_threshcurrent_candidate.reason {
1302 2 : ReconnectReason::NoKeepAlives {
1303 2 : last_keep_alive,
1304 2 : threshold,
1305 2 : ..
1306 2 : } => {
1307 2 : assert_eq!(last_keep_alive, Some(time_over_threshold));
1308 2 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1309 2 : }
1310 2 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1311 2 : }
1312 2 : assert_eq!(
1313 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1314 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1315 2 : );
1316 2 :
1317 2 : Ok(())
1318 2 : }
1319 :
1320 : #[tokio::test]
1321 2 : async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1322 2 : let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
1323 6 : let mut state = dummy_state(&harness).await;
1324 2 : let current_lsn = Lsn(100_000).align();
1325 2 : let new_lsn = Lsn(100_100).align();
1326 2 : let now = Utc::now().naive_utc();
1327 2 :
1328 2 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1329 2 : let time_over_threshold =
1330 2 : Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
1331 2 :
1332 2 : let connection_status = WalConnectionStatus {
1333 2 : is_connected: true,
1334 2 : has_processed_wal: true,
1335 2 : latest_connection_update: now,
1336 2 : latest_wal_update: time_over_threshold,
1337 2 : commit_lsn: Some(current_lsn),
1338 2 : streaming_lsn: Some(current_lsn),
1339 2 : node: NodeId(1),
1340 2 : };
1341 2 :
1342 2 : state.wal_connection = Some(WalConnection {
1343 2 : started_at: now,
1344 2 : sk_id: NodeId(1),
1345 2 : availability_zone: None,
1346 2 : status: connection_status,
1347 2 : connection_task: state.spawn(move |_, _| async move { Ok(()) }),
1348 2 : discovered_new_wal: Some(NewCommittedWAL {
1349 2 : discovered_at: time_over_threshold,
1350 2 : lsn: new_lsn,
1351 2 : }),
1352 2 : });
1353 2 : state.wal_stream_candidates = HashMap::from([(
1354 2 : NodeId(0),
1355 2 : dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1356 2 : )]);
1357 2 :
1358 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1359 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1360 2 : );
1361 2 :
1362 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1363 2 : match over_threshcurrent_candidate.reason {
1364 2 : ReconnectReason::NoWalTimeout {
1365 2 : current_lsn,
1366 2 : current_commit_lsn,
1367 2 : candidate_commit_lsn,
1368 2 : last_wal_interaction,
1369 2 : threshold,
1370 2 : ..
1371 2 : } => {
1372 2 : assert_eq!(current_lsn, current_lsn);
1373 2 : assert_eq!(current_commit_lsn, current_lsn);
1374 2 : assert_eq!(candidate_commit_lsn, new_lsn);
1375 2 : assert_eq!(last_wal_interaction, Some(time_over_threshold));
1376 2 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1377 2 : }
1378 2 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1379 2 : }
1380 2 : assert_eq!(
1381 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1382 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1383 2 : );
1384 2 :
1385 2 : Ok(())
1386 2 : }
1387 :
1388 : const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
1389 :
1390 16 : async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
1391 16 : let (tenant, ctx) = harness.load().await;
1392 16 : let timeline = tenant
1393 16 : .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
1394 46 : .await
1395 16 : .expect("Failed to create an empty timeline for dummy wal connection manager");
1396 16 :
1397 16 : ConnectionManagerState {
1398 16 : id: TenantTimelineId {
1399 16 : tenant_id: harness.tenant_shard_id.tenant_id,
1400 16 : timeline_id: TIMELINE_ID,
1401 16 : },
1402 16 : timeline,
1403 16 : cancel: CancellationToken::new(),
1404 16 : conf: WalReceiverConf {
1405 16 : wal_connect_timeout: Duration::from_secs(1),
1406 16 : lagging_wal_timeout: Duration::from_secs(1),
1407 16 : max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
1408 16 : auth_token: None,
1409 16 : availability_zone: None,
1410 16 : ingest_batch_size: 1,
1411 16 : },
1412 16 : wal_connection: None,
1413 16 : wal_stream_candidates: HashMap::new(),
1414 16 : wal_connection_retries: HashMap::new(),
1415 16 : }
1416 16 : }
1417 :
1418 : #[tokio::test]
1419 2 : async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
1420 2 : // The pageserver and one of the safekeepers will be in the same availability zone,
1421 2 : // and the pageserver should prefer to connect to it.
1422 2 : let test_az = Some("test_az".to_owned());
1423 2 :
1424 2 : let harness = TenantHarness::create("switch_to_same_availability_zone")?;
1425 6 : let mut state = dummy_state(&harness).await;
1426 2 : state.conf.availability_zone = test_az.clone();
1427 2 : let current_lsn = Lsn(100_000).align();
1428 2 : let now = Utc::now().naive_utc();
1429 2 :
1430 2 : let connected_sk_id = NodeId(0);
1431 2 :
1432 2 : let connection_status = WalConnectionStatus {
1433 2 : is_connected: true,
1434 2 : has_processed_wal: true,
1435 2 : latest_connection_update: now,
1436 2 : latest_wal_update: now,
1437 2 : commit_lsn: Some(current_lsn),
1438 2 : streaming_lsn: Some(current_lsn),
1439 2 : node: connected_sk_id,
1440 2 : };
1441 2 :
1442 2 : state.wal_connection = Some(WalConnection {
1443 2 : started_at: now,
1444 2 : sk_id: connected_sk_id,
1445 2 : availability_zone: None,
1446 2 : status: connection_status,
1447 2 : connection_task: state.spawn(move |sender, _| async move {
1448 2 : sender
1449 2 : .send(TaskStateUpdate::Progress(connection_status))
1450 2 : .ok();
1451 2 : Ok(())
1452 2 : }),
1453 2 : discovered_new_wal: None,
1454 2 : });
1455 2 :
1456 2 : // We have another safekeeper with the same commit_lsn, and it has the same availability zone as
1457 2 : // the current pageserver.
1458 2 : let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
1459 2 : same_az_sk.timeline.availability_zone = test_az.clone();
1460 2 :
1461 2 : state.wal_stream_candidates = HashMap::from([
1462 2 : (
1463 2 : connected_sk_id,
1464 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1465 2 : ),
1466 2 : (NodeId(1), same_az_sk),
1467 2 : ]);
1468 2 :
1469 2 : // We expect that pageserver will switch to the safekeeper in the same availability zone,
1470 2 : // even if it has the same commit_lsn.
1471 2 : let next_candidate = state.next_connection_candidate().expect(
1472 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1473 2 : );
1474 2 :
1475 2 : assert_eq!(next_candidate.safekeeper_id, NodeId(1));
1476 2 : assert_eq!(
1477 2 : next_candidate.reason,
1478 2 : ReconnectReason::SwitchAvailabilityZone,
1479 2 : "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
1480 2 : );
1481 2 : assert_eq!(
1482 2 : next_candidate.wal_source_connconf.host(),
1483 2 : &Host::Domain("same_az".to_owned())
1484 2 : );
1485 2 :
1486 2 : Ok(())
1487 2 : }
1488 : }