Line data Source code
1 : //! WAL receiver logic that ensures the pageserver gets connected to a safekeeper
2 : //! that contains the latest WAL to stream and this connection does not go stale.
3 : //!
4 : //! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
5 : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
6 : //! Current connection state is tracked too, to ensure it's not getting stale.
7 : //!
8 : //! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new connection leader,
9 : //! then a (re)connection happens, if necessary.
10 : //! Only WAL streaming task expects to be finished, other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
11 :
12 : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
13 :
14 : use super::{TaskStateUpdate, WalReceiverConf};
15 : use crate::context::{DownloadBehavior, RequestContext};
16 : use crate::metrics::{
17 : WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
18 : WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
19 : };
20 : use crate::task_mgr::TaskKind;
21 : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
22 : use anyhow::Context;
23 : use chrono::{NaiveDateTime, Utc};
24 : use pageserver_api::models::TimelineState;
25 :
26 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
27 : use storage_broker::proto::{
28 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
29 : SubscribeByFilterRequest, TypeSubscription, TypedMessage,
30 : };
31 : use storage_broker::{BrokerClientChannel, Code, Streaming};
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::*;
34 :
35 : use postgres_connection::PgConnectionConfig;
36 : use utils::backoff::{
37 : exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
38 : };
39 : use utils::postgres_client::wal_stream_connection_config;
40 : use utils::{
41 : id::{NodeId, TenantTimelineId},
42 : lsn::Lsn,
43 : };
44 :
45 : use super::{
46 : walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
47 : TaskEvent, TaskHandle,
48 : };
49 :
/// Marker error returned by the connection manager when it stops because cancellation was
/// requested or the timeline is no longer in a state that allows WAL receiving.
pub(crate) struct Cancelled;
51 :
52 : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
53 : /// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
54 : /// If storage broker subscription is cancelled, exits.
55 : ///
56 : /// # Cancel-Safety
57 : ///
58 : /// Not cancellation-safe. Use `cancel` token to request cancellation.
59 0 : pub(super) async fn connection_manager_loop_step(
60 0 : broker_client: &mut BrokerClientChannel,
61 0 : connection_manager_state: &mut ConnectionManagerState,
62 0 : ctx: &RequestContext,
63 0 : cancel: &CancellationToken,
64 0 : manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
65 0 : ) -> Result<(), Cancelled> {
66 0 : match tokio::select! {
67 0 : _ = cancel.cancelled() => { return Err(Cancelled); },
68 0 : st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
69 : } {
70 0 : Ok(()) => {}
71 0 : Err(new_state) => {
72 0 : debug!(
73 : ?new_state,
74 0 : "state changed, stopping wal connection manager loop"
75 : );
76 0 : return Err(Cancelled);
77 : }
78 : }
79 :
80 0 : WALRECEIVER_ACTIVE_MANAGERS.inc();
81 0 : scopeguard::defer! {
82 0 : WALRECEIVER_ACTIVE_MANAGERS.dec();
83 0 : }
84 0 :
85 0 : let id = TenantTimelineId {
86 0 : tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
87 0 : timeline_id: connection_manager_state.timeline.timeline_id,
88 0 : };
89 0 :
90 0 : let mut timeline_state_updates = connection_manager_state
91 0 : .timeline
92 0 : .subscribe_for_state_updates();
93 0 :
94 0 : let mut wait_lsn_status = connection_manager_state
95 0 : .timeline
96 0 : .subscribe_for_wait_lsn_updates();
97 0 :
98 0 : // TODO: create a separate config option for discovery request interval
99 0 : let discovery_request_interval = connection_manager_state.conf.lagging_wal_timeout;
100 0 : let mut last_discovery_ts: Option<std::time::Instant> = None;
101 :
102 : // Subscribe to the broker updates. Stream shares underlying TCP connection
103 : // with other streams on this client (other connection managers). When
104 : // object goes out of scope, stream finishes in drop() automatically.
105 0 : let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
106 0 : debug!("Subscribed for broker timeline updates");
107 :
108 : loop {
109 0 : let time_until_next_retry = connection_manager_state.time_until_next_retry();
110 0 : let any_activity = connection_manager_state.wal_connection.is_some()
111 0 : || !connection_manager_state.wal_stream_candidates.is_empty();
112 :
113 : // These things are happening concurrently:
114 : //
115 : // - cancellation request
116 : // - keep receiving WAL on the current connection
117 : // - if the shared state says we need to change connection, disconnect and return
118 : // - this runs in a separate task and we receive updates via a watch channel
119 : // - change connection if the rules decide so, or if the current connection dies
120 : // - receive updates from broker
121 : // - this might change the current desired connection
122 : // - timeline state changes to something that does not allow walreceiver to run concurrently
123 : // - if there's no connection and no candidates, try to send a discovery request
124 :
125 : // NB: make sure each of the select expressions are cancellation-safe
126 : // (no need for arms to be cancellation-safe).
127 0 : tokio::select! {
128 0 : _ = cancel.cancelled() => { return Err(Cancelled); }
129 0 : Some(wal_connection_update) = async {
130 0 : match connection_manager_state.wal_connection.as_mut() {
131 0 : Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
132 0 : None => None,
133 : }
134 0 : } => {
135 0 : let wal_connection = connection_manager_state.wal_connection.as_mut()
136 0 : .expect("Should have a connection, as checked by the corresponding select! guard");
137 0 : match wal_connection_update {
138 0 : TaskEvent::Update(TaskStateUpdate::Started) => {},
139 0 : TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
140 0 : if new_status.has_processed_wal {
141 0 : // We have advanced last_record_lsn by processing the WAL received
142 0 : // from this safekeeper. This is good enough to clean unsuccessful
143 0 : // retries history and allow reconnecting to this safekeeper without
144 0 : // sleeping for a long time.
145 0 : connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
146 0 : }
147 0 : wal_connection.status = new_status;
148 : }
149 0 : TaskEvent::End(walreceiver_task_result) => {
150 0 : match walreceiver_task_result {
151 0 : Ok(()) => debug!("WAL receiving task finished"),
152 0 : Err(e) => error!("wal receiver task finished with an error: {e:?}"),
153 : }
154 0 : connection_manager_state.drop_old_connection(false).await;
155 : },
156 : }
157 : },
158 :
159 : // Got a new update from the broker
160 0 : broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
161 0 : match broker_update {
162 0 : Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
163 0 : Err(status) => {
164 0 : match status.code() {
165 0 : Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
166 0 : // tonic's error handling doesn't provide a clear code for disconnections: we get
167 0 : // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
168 0 : info!("broker disconnected: {status}");
169 : },
170 : _ => {
171 0 : warn!("broker subscription failed: {status}");
172 : }
173 : }
174 0 : return Ok(());
175 : }
176 : Ok(None) => {
177 0 : error!("broker subscription stream ended"); // can't happen
178 0 : return Ok(());
179 : }
180 : }
181 : },
182 :
183 0 : new_event = async {
184 : // Reminder: this match arm needs to be cancellation-safe.
185 : loop {
186 0 : if connection_manager_state.timeline.current_state() == TimelineState::Loading {
187 0 : warn!("wal connection manager should only be launched after timeline has become active");
188 0 : }
189 0 : match timeline_state_updates.changed().await {
190 : Ok(()) => {
191 0 : let new_state = connection_manager_state.timeline.current_state();
192 0 : match new_state {
193 : // we're already active as walreceiver, no need to reactivate
194 0 : TimelineState::Active => continue,
195 : TimelineState::Broken { .. } | TimelineState::Stopping => {
196 0 : debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
197 0 : return ControlFlow::Break(());
198 : }
199 : TimelineState::Loading => {
200 0 : warn!("timeline transitioned back to Loading state, that should not happen");
201 0 : return ControlFlow::Continue(());
202 : }
203 : }
204 : }
205 0 : Err(_sender_dropped_error) => return ControlFlow::Break(()),
206 : }
207 : }
208 0 : } => match new_event {
209 : ControlFlow::Continue(()) => {
210 0 : return Ok(());
211 : }
212 : ControlFlow::Break(()) => {
213 0 : debug!("Timeline is no longer active, stopping wal connection manager loop");
214 0 : return Err(Cancelled);
215 : }
216 : },
217 :
218 0 : Some(()) = async {
219 0 : match time_until_next_retry {
220 0 : Some(sleep_time) => {
221 0 : tokio::time::sleep(sleep_time).await;
222 0 : Some(())
223 : },
224 : None => {
225 0 : debug!("No candidates to retry, waiting indefinitely for the broker events");
226 0 : None
227 : }
228 : }
229 0 : } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
230 :
231 0 : Some(()) = async {
232 0 : // Reminder: this match arm needs to be cancellation-safe.
233 0 : // Calculating time needed to wait until sending the next discovery request.
234 0 : // Current implementation is conservative and sends discovery requests only when there are no candidates.
235 0 :
236 0 : if any_activity {
237 : // No need to send discovery requests if there is an active connection or candidates.
238 0 : return None;
239 0 : }
240 :
241 : // Waiting for an active wait_lsn request.
242 0 : while wait_lsn_status.borrow().is_none() {
243 0 : if wait_lsn_status.changed().await.is_err() {
244 : // wait_lsn_status channel was closed, exiting
245 0 : warn!("wait_lsn_status channel was closed in connection_manager_loop_step");
246 0 : return None;
247 0 : }
248 : }
249 :
250 : // All preconditions met, preparing to send a discovery request.
251 0 : let now = std::time::Instant::now();
252 0 : let next_discovery_ts = last_discovery_ts
253 0 : .map(|ts| ts + discovery_request_interval)
254 0 : .unwrap_or_else(|| now);
255 0 :
256 0 : if next_discovery_ts > now {
257 : // Prevent sending discovery requests too frequently.
258 0 : tokio::time::sleep(next_discovery_ts - now).await;
259 0 : }
260 :
261 0 : let tenant_timeline_id = Some(ProtoTenantTimelineId {
262 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
263 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
264 0 : });
265 0 : let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
266 0 : let msg = TypedMessage {
267 0 : r#type: MessageType::SafekeeperDiscoveryRequest as i32,
268 0 : safekeeper_timeline_info: None,
269 0 : safekeeper_discovery_request: Some(request),
270 0 : safekeeper_discovery_response: None,
271 0 : };
272 0 :
273 0 : last_discovery_ts = Some(std::time::Instant::now());
274 0 : debug!("No active connection and no candidates, sending discovery request to the broker");
275 :
276 : // Cancellation safety: we want to send a message to the broker, but publish_one()
277 : // function can get cancelled by the other select! arm. This is absolutely fine, because
278 : // we just want to receive broker updates and discovery is not important if we already
279 : // receive updates.
280 : //
281 : // It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
282 : // This is totally fine because of the reason above.
283 :
284 : // This is a fire-and-forget request, we don't care about the response
285 0 : let _ = broker_client.publish_one(msg).await;
286 0 : debug!("Discovery request sent to the broker");
287 0 : None
288 0 : } => {}
289 : }
290 :
291 0 : if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
292 0 : info!("Switching to new connection candidate: {new_candidate:?}");
293 0 : connection_manager_state
294 0 : .change_connection(new_candidate, ctx)
295 0 : .await
296 0 : }
297 0 : *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
298 : }
299 0 : }
300 :
301 : /// Endlessly try to subscribe for broker updates for a given timeline.
302 0 : async fn subscribe_for_timeline_updates(
303 0 : broker_client: &mut BrokerClientChannel,
304 0 : id: TenantTimelineId,
305 0 : cancel: &CancellationToken,
306 0 : ) -> Result<Streaming<TypedMessage>, Cancelled> {
307 0 : let mut attempt = 0;
308 : loop {
309 0 : exponential_backoff(
310 0 : attempt,
311 0 : DEFAULT_BASE_BACKOFF_SECONDS,
312 0 : DEFAULT_MAX_BACKOFF_SECONDS,
313 0 : cancel,
314 0 : )
315 0 : .await;
316 0 : attempt += 1;
317 0 :
318 0 : // subscribe to the specific timeline
319 0 : let request = SubscribeByFilterRequest {
320 0 : types: vec![
321 0 : TypeSubscription {
322 0 : r#type: MessageType::SafekeeperTimelineInfo as i32,
323 0 : },
324 0 : TypeSubscription {
325 0 : r#type: MessageType::SafekeeperDiscoveryResponse as i32,
326 0 : },
327 0 : ],
328 0 : tenant_timeline_id: Some(FilterTenantTimelineId {
329 0 : enabled: true,
330 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
331 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
332 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
333 0 : }),
334 0 : }),
335 0 : };
336 0 :
337 0 : match {
338 0 : tokio::select! {
339 0 : r = broker_client.subscribe_by_filter(request) => { r }
340 0 : _ = cancel.cancelled() => { return Err(Cancelled); }
341 : }
342 : } {
343 0 : Ok(resp) => {
344 0 : return Ok(resp.into_inner());
345 : }
346 0 : Err(e) => {
347 0 : // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
348 0 : // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
349 0 : info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
350 0 : continue;
351 : }
352 : }
353 : }
354 0 : }
355 :
/// Initial and minimum backoff, in seconds, before reconnecting to the same safekeeper.
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
/// Upper bound, in seconds, for the per-safekeeper reconnect backoff.
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
/// Factor by which the per-safekeeper backoff grows each time its connection is dropped.
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
359 :
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
pub(super) struct ConnectionManagerState {
    /// Tenant and timeline ids, taken from `timeline` on construction.
    id: TenantTimelineId,
    /// Use pageserver data about the timeline to filter out some of the safekeepers.
    timeline: Arc<Timeline>,
    /// Child token of [`super::WalReceiver::cancel`], inherited to all tasks we spawn.
    cancel: CancellationToken,
    /// WAL receiver configuration (timeouts, lag thresholds, availability zone, batch size).
    conf: WalReceiverConf,
    /// Current connection to safekeeper for WAL streaming.
    wal_connection: Option<WalConnection>,
    /// Info about retries and unsuccessful attempts to connect to safekeepers.
    wal_connection_retries: HashMap<NodeId, RetryInfo>,
    /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
    wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
375 :
/// Information about the connection manager's current connection and connection candidates.
#[derive(Debug, Clone)]
pub struct ConnectionManagerStatus {
    /// Status of the current WAL connection; `None` is reported as "disconnected".
    existing_connection: Option<WalConnectionStatus>,
    /// Safekeeper candidates known to the manager, keyed by safekeeper node id.
    wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
382 :
383 : impl ConnectionManagerStatus {
384 : /// Generates a string, describing current connection status in a form, suitable for logging.
385 0 : pub fn to_human_readable_string(&self) -> String {
386 0 : let mut resulting_string = String::new();
387 0 : match &self.existing_connection {
388 0 : Some(connection) => {
389 0 : if connection.has_processed_wal {
390 0 : resulting_string.push_str(&format!(
391 0 : " (update {}): streaming WAL from node {}, ",
392 0 : connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
393 0 : connection.node,
394 0 : ));
395 0 :
396 0 : match (connection.streaming_lsn, connection.commit_lsn) {
397 0 : (None, None) => resulting_string.push_str("no streaming data"),
398 0 : (None, Some(commit_lsn)) => {
399 0 : resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
400 : }
401 0 : (Some(streaming_lsn), None) => {
402 0 : resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
403 : }
404 0 : (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
405 0 : &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
406 0 : ),
407 : }
408 0 : } else if connection.is_connected {
409 0 : resulting_string.push_str(&format!(
410 0 : " (update {}): connecting to node {}",
411 0 : connection
412 0 : .latest_connection_update
413 0 : .format("%Y-%m-%d %H:%M:%S"),
414 0 : connection.node,
415 0 : ));
416 0 : } else {
417 0 : resulting_string.push_str(&format!(
418 0 : " (update {}): initializing node {} connection",
419 0 : connection
420 0 : .latest_connection_update
421 0 : .format("%Y-%m-%d %H:%M:%S"),
422 0 : connection.node,
423 0 : ));
424 0 : }
425 : }
426 0 : None => resulting_string.push_str(": disconnected"),
427 : }
428 :
429 0 : resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
430 0 : let mut candidates = self.wal_stream_candidates.iter().peekable();
431 0 : while let Some((node_id, candidate_info)) = candidates.next() {
432 0 : resulting_string.push_str(&format!(
433 0 : "({}|{}|{})",
434 0 : node_id,
435 0 : candidate_info.latest_update.format("%H:%M:%S"),
436 0 : Lsn(candidate_info.timeline.commit_lsn)
437 0 : ));
438 0 : if candidates.peek().is_some() {
439 0 : resulting_string.push_str(", ");
440 0 : }
441 : }
442 0 : resulting_string.push(']');
443 0 :
444 0 : resulting_string
445 0 : }
446 : }
447 :
/// Current connection data.
#[derive(Debug)]
struct WalConnection {
    /// Time when the connection was initiated; the next retry time for this safekeeper is
    /// counted from it when the connection gets dropped.
    started_at: NaiveDateTime,
    /// Current safekeeper pageserver is connected to for WAL streaming.
    sk_id: NodeId,
    /// Availability zone of the safekeeper.
    availability_zone: Option<String>,
    /// Status of the connection, replaced with every progress update of the connection task.
    status: WalConnectionStatus,
    /// WAL streaming task handle.
    connection_task: TaskHandle<WalConnectionStatus>,
    /// Have we discovered that other safekeeper has more recent WAL than we do?
    discovered_new_wal: Option<NewCommittedWAL>,
}
464 :
/// Record of newer committed WAL that exists on another safekeeper.
#[derive(Debug, Clone, Copy)]
struct NewCommittedWAL {
    /// LSN of the new committed WAL.
    lsn: Lsn,
    /// When we discovered that the new committed WAL exists on other safekeeper.
    discovered_at: NaiveDateTime,
}
473 :
/// Per-safekeeper reconnect backoff state.
#[derive(Debug, Clone, Copy)]
struct RetryInfo {
    /// Earliest time for the next connection attempt: start of the dropped connection plus the
    /// backoff. `None` until a connection is dropped, or if the addition overflows.
    next_retry_at: Option<NaiveDateTime>,
    /// Backoff, in seconds, to apply on the next drop; multiplied after every drop and kept
    /// within the min/max backoff constants.
    retry_duration_seconds: f64,
}
479 :
/// Data about the timeline to connect to, received from the broker.
#[derive(Debug, Clone)]
struct BrokerSkTimeline {
    /// Latest timeline data reported for the safekeeper. Regular timeline info messages are
    /// converted into the discovery response shape before being stored here.
    timeline: SafekeeperDiscoveryResponse,
    /// Time at which the data was fetched from the broker last time, to track the stale data.
    latest_update: NaiveDateTime,
}
487 :
488 : impl ConnectionManagerState {
489 0 : pub(super) fn new(
490 0 : timeline: Arc<Timeline>,
491 0 : conf: WalReceiverConf,
492 0 : cancel: CancellationToken,
493 0 : ) -> Self {
494 0 : let id = TenantTimelineId {
495 0 : tenant_id: timeline.tenant_shard_id.tenant_id,
496 0 : timeline_id: timeline.timeline_id,
497 0 : };
498 0 : Self {
499 0 : id,
500 0 : timeline,
501 0 : cancel,
502 0 : conf,
503 0 : wal_connection: None,
504 0 : wal_stream_candidates: HashMap::new(),
505 0 : wal_connection_retries: HashMap::new(),
506 0 : }
507 0 : }
508 :
509 10 : fn spawn<Fut>(
510 10 : &self,
511 10 : task: impl FnOnce(
512 10 : tokio::sync::watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
513 10 : CancellationToken,
514 10 : ) -> Fut
515 10 : + Send
516 10 : + 'static,
517 10 : ) -> TaskHandle<WalConnectionStatus>
518 10 : where
519 10 : Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
520 10 : {
521 10 : // TODO: get rid of TaskHandle
522 10 : super::TaskHandle::spawn(&self.cancel, task)
523 10 : }
524 :
525 : /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
526 0 : async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
527 0 : WALRECEIVER_SWITCHES
528 0 : .with_label_values(&[new_sk.reason.name()])
529 0 : .inc();
530 0 :
531 0 : self.drop_old_connection(true).await;
532 :
533 0 : let node_id = new_sk.safekeeper_id;
534 0 : let connect_timeout = self.conf.wal_connect_timeout;
535 0 : let ingest_batch_size = self.conf.ingest_batch_size;
536 0 : let timeline = Arc::clone(&self.timeline);
537 0 : let ctx = ctx.detached_child(
538 0 : TaskKind::WalReceiverConnectionHandler,
539 0 : DownloadBehavior::Download,
540 0 : );
541 :
542 0 : let span = info_span!("connection", %node_id);
543 0 : let connection_handle = self.spawn(move |events_sender, cancellation| {
544 0 : async move {
545 0 : debug_assert_current_span_has_tenant_and_timeline_id();
546 :
547 0 : let res = super::walreceiver_connection::handle_walreceiver_connection(
548 0 : timeline,
549 0 : new_sk.wal_source_connconf,
550 0 : events_sender,
551 0 : cancellation.clone(),
552 0 : connect_timeout,
553 0 : ctx,
554 0 : node_id,
555 0 : ingest_batch_size,
556 0 : )
557 0 : .await;
558 :
559 0 : match res {
560 0 : Ok(()) => Ok(()),
561 0 : Err(e) => {
562 0 : match e {
563 0 : WalReceiverError::SuccessfulCompletion(msg) => {
564 0 : info!("walreceiver connection handling ended with success: {msg}");
565 0 : Ok(())
566 : }
567 0 : WalReceiverError::ExpectedSafekeeperError(e) => {
568 0 : info!("walreceiver connection handling ended: {e}");
569 0 : Ok(())
570 : }
571 : WalReceiverError::ClosedGate => {
572 0 : info!(
573 0 : "walreceiver connection handling ended because of closed gate"
574 : );
575 0 : Ok(())
576 : }
577 0 : WalReceiverError::Other(e) => {
578 0 : // give out an error to have task_mgr give it a really verbose logging
579 0 : if cancellation.is_cancelled() {
580 : // Ideally we would learn about this via some path other than Other, but
581 : // that requires refactoring all the intermediate layers of ingest code
582 : // that only emit anyhow::Error
583 0 : Ok(())
584 : } else {
585 0 : Err(e).context("walreceiver connection handling failure")
586 : }
587 : }
588 : }
589 : }
590 : }
591 0 : }
592 0 : .instrument(span)
593 0 : });
594 0 :
595 0 : let now = Utc::now().naive_utc();
596 0 : self.wal_connection = Some(WalConnection {
597 0 : started_at: now,
598 0 : sk_id: new_sk.safekeeper_id,
599 0 : availability_zone: new_sk.availability_zone,
600 0 : status: WalConnectionStatus {
601 0 : is_connected: false,
602 0 : has_processed_wal: false,
603 0 : latest_connection_update: now,
604 0 : latest_wal_update: now,
605 0 : streaming_lsn: None,
606 0 : commit_lsn: None,
607 0 : node: node_id,
608 0 : },
609 0 : connection_task: connection_handle,
610 0 : discovered_new_wal: None,
611 0 : });
612 0 : }
613 :
614 : /// Drops the current connection (if any) and updates retry timeout for the next
615 : /// connection attempt to the same safekeeper.
616 : ///
617 : /// # Cancel-Safety
618 : ///
619 : /// Not cancellation-safe.
620 0 : async fn drop_old_connection(&mut self, needs_shutdown: bool) {
621 0 : let wal_connection = match self.wal_connection.take() {
622 0 : Some(wal_connection) => wal_connection,
623 0 : None => return,
624 : };
625 :
626 0 : if needs_shutdown {
627 0 : wal_connection
628 0 : .connection_task
629 0 : .shutdown()
630 : // This here is why this function isn't cancellation-safe.
631 : // If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
632 : // Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
633 : // and thus be ineffective.
634 0 : .await;
635 0 : }
636 :
637 0 : let retry = self
638 0 : .wal_connection_retries
639 0 : .entry(wal_connection.sk_id)
640 0 : .or_insert(RetryInfo {
641 0 : next_retry_at: None,
642 0 : retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
643 0 : });
644 0 :
645 0 : let now = Utc::now().naive_utc();
646 0 :
647 0 : // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
648 0 : // and we add backoff to the time when we started the connection attempt. If the connection
649 0 : // was active for a long time, then next_retry_at will be in the past.
650 0 : retry.next_retry_at =
651 0 : wal_connection
652 0 : .started_at
653 0 : .checked_add_signed(chrono::Duration::milliseconds(
654 0 : (retry.retry_duration_seconds * 1000.0) as i64,
655 0 : ));
656 :
657 0 : if let Some(next) = &retry.next_retry_at {
658 0 : if next > &now {
659 0 : info!(
660 0 : "Next connection retry to {:?} is at {}",
661 : wal_connection.sk_id, next
662 : );
663 0 : }
664 0 : }
665 :
666 0 : let next_retry_duration =
667 0 : retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
668 0 : // Clamp the next retry duration to the maximum allowed.
669 0 : let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
670 0 : // Clamp the next retry duration to the minimum allowed.
671 0 : let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
672 0 :
673 0 : retry.retry_duration_seconds = next_retry_duration;
674 0 : }
675 :
676 : /// Returns time needed to wait to have a new candidate for WAL streaming.
677 0 : fn time_until_next_retry(&self) -> Option<Duration> {
678 0 : let now = Utc::now().naive_utc();
679 :
680 0 : let next_retry_at = self
681 0 : .wal_connection_retries
682 0 : .values()
683 0 : .filter_map(|retry| retry.next_retry_at)
684 0 : .filter(|next_retry_at| next_retry_at > &now)
685 0 : .min()?;
686 :
687 0 : (next_retry_at - now).to_std().ok()
688 0 : }
689 :
690 : /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
691 0 : fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
692 0 : let mut is_discovery = false;
693 0 : let timeline_update = match typed_msg.r#type() {
694 : MessageType::SafekeeperTimelineInfo => {
695 0 : let info = match typed_msg.safekeeper_timeline_info {
696 0 : Some(info) => info,
697 : None => {
698 0 : warn!("bad proto message from broker: no safekeeper_timeline_info");
699 0 : return;
700 : }
701 : };
702 0 : SafekeeperDiscoveryResponse {
703 0 : safekeeper_id: info.safekeeper_id,
704 0 : tenant_timeline_id: info.tenant_timeline_id,
705 0 : commit_lsn: info.commit_lsn,
706 0 : safekeeper_connstr: info.safekeeper_connstr,
707 0 : availability_zone: info.availability_zone,
708 0 : standby_horizon: info.standby_horizon,
709 0 : }
710 : }
711 : MessageType::SafekeeperDiscoveryResponse => {
712 0 : is_discovery = true;
713 0 : match typed_msg.safekeeper_discovery_response {
714 0 : Some(response) => response,
715 : None => {
716 0 : warn!("bad proto message from broker: no safekeeper_discovery_response");
717 0 : return;
718 : }
719 : }
720 : }
721 : _ => {
722 : // unexpected message
723 0 : return;
724 : }
725 : };
726 :
727 0 : WALRECEIVER_BROKER_UPDATES.inc();
728 0 :
729 0 : trace!(
730 0 : "safekeeper info update: standby_horizon(cutoff)={}",
731 : timeline_update.standby_horizon
732 : );
733 0 : if timeline_update.standby_horizon != 0 {
734 0 : // ignore reports from safekeepers not connected to replicas
735 0 : self.timeline
736 0 : .standby_horizon
737 0 : .store(Lsn(timeline_update.standby_horizon));
738 0 : self.timeline
739 0 : .metrics
740 0 : .standby_horizon_gauge
741 0 : .set(timeline_update.standby_horizon as i64);
742 0 : }
743 :
744 0 : let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
745 0 : let old_entry = self.wal_stream_candidates.insert(
746 0 : new_safekeeper_id,
747 0 : BrokerSkTimeline {
748 0 : timeline: timeline_update,
749 0 : latest_update: Utc::now().naive_utc(),
750 0 : },
751 0 : );
752 0 :
753 0 : if old_entry.is_none() {
754 0 : info!(
755 : ?is_discovery,
756 : %new_safekeeper_id,
757 0 : "New SK node was added",
758 : );
759 0 : WALRECEIVER_CANDIDATES_ADDED.inc();
760 0 : }
761 0 : }
762 :
763 : /// Cleans up stale broker records and checks the rest for the new connection candidate.
764 : /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
765 : /// The current rules for approving new candidates:
766 : /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attemps
767 : /// * if there's no such entry, no new candidate found, abort
768 : /// * otherwise check if the candidate is much better than the current one
769 : ///
770 : /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
771 : /// General rules are following:
772 : /// * if connected safekeeper is not present, pick the candidate
773 : /// * if we haven't received any updates for some time, pick the candidate
774 : /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
775 : /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
776 : /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
777 : ///
778 : /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
779 : /// Both thresholds are configured per tenant.
780 18 : fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
781 18 : self.cleanup_old_candidates();
782 18 :
783 18 : match &self.wal_connection {
784 10 : Some(existing_wal_connection) => {
785 10 : let connected_sk_node = existing_wal_connection.sk_id;
786 :
787 10 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
788 10 : self.select_connection_candidate(Some(connected_sk_node))?;
789 10 : let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
790 10 :
791 10 : let now = Utc::now().naive_utc();
792 10 : if let Ok(latest_interaciton) =
793 10 : (now - existing_wal_connection.status.latest_connection_update).to_std()
794 : {
795 : // Drop connection if we haven't received keepalive message for a while.
796 10 : if latest_interaciton > self.conf.wal_connect_timeout {
797 2 : return Some(NewWalConnectionCandidate {
798 2 : safekeeper_id: new_sk_id,
799 2 : wal_source_connconf: new_wal_source_connconf,
800 2 : availability_zone: new_availability_zone,
801 2 : reason: ReconnectReason::NoKeepAlives {
802 2 : last_keep_alive: Some(
803 2 : existing_wal_connection.status.latest_connection_update,
804 2 : ),
805 2 : check_time: now,
806 2 : threshold: self.conf.wal_connect_timeout,
807 2 : },
808 2 : });
809 8 : }
810 0 : }
811 :
812 8 : if !existing_wal_connection.status.is_connected {
813 : // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
814 0 : return None;
815 8 : }
816 :
817 8 : if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
818 8 : let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
819 8 : // Check if the new candidate has much more WAL than the current one.
820 8 : match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
821 8 : Some(new_sk_lsn_advantage) => {
822 8 : if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
823 2 : return Some(NewWalConnectionCandidate {
824 2 : safekeeper_id: new_sk_id,
825 2 : wal_source_connconf: new_wal_source_connconf,
826 2 : availability_zone: new_availability_zone,
827 2 : reason: ReconnectReason::LaggingWal {
828 2 : current_commit_lsn,
829 2 : new_commit_lsn,
830 2 : threshold: self.conf.max_lsn_wal_lag,
831 2 : },
832 2 : });
833 6 : }
834 6 : // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
835 6 : // and the current one is not, switch to the new one.
836 6 : if self.conf.availability_zone.is_some()
837 2 : && existing_wal_connection.availability_zone
838 2 : != self.conf.availability_zone
839 2 : && self.conf.availability_zone == new_availability_zone
840 : {
841 2 : return Some(NewWalConnectionCandidate {
842 2 : safekeeper_id: new_sk_id,
843 2 : availability_zone: new_availability_zone,
844 2 : wal_source_connconf: new_wal_source_connconf,
845 2 : reason: ReconnectReason::SwitchAvailabilityZone,
846 2 : });
847 4 : }
848 : }
849 0 : None => debug!(
850 0 : "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
851 : ),
852 : }
853 0 : }
854 :
855 4 : let current_lsn = match existing_wal_connection.status.streaming_lsn {
856 4 : Some(lsn) => lsn,
857 0 : None => self.timeline.get_last_record_lsn(),
858 : };
859 4 : let current_commit_lsn = existing_wal_connection
860 4 : .status
861 4 : .commit_lsn
862 4 : .unwrap_or(current_lsn);
863 4 : let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
864 4 :
865 4 : // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
866 4 : let mut discovered_new_wal = existing_wal_connection
867 4 : .discovered_new_wal
868 4 : .filter(|new_wal| new_wal.lsn > current_commit_lsn);
869 4 :
870 4 : if discovered_new_wal.is_none() {
871 : // Check if the new candidate has more WAL than the current one.
872 : // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
873 2 : discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
874 2 : trace!(
875 0 : "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
876 : candidate_commit_lsn,
877 : current_commit_lsn
878 : );
879 2 : Some(NewCommittedWAL {
880 2 : lsn: candidate_commit_lsn,
881 2 : discovered_at: Utc::now().naive_utc(),
882 2 : })
883 : } else {
884 0 : None
885 : };
886 2 : }
887 :
888 4 : let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
889 : // Connected safekeeper has more WAL, but we haven't received updates for some time.
890 0 : trace!(
891 0 : "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
892 0 : (now - existing_wal_connection.status.latest_wal_update).to_std(),
893 : current_lsn,
894 : current_commit_lsn
895 : );
896 0 : Some(existing_wal_connection.status.latest_wal_update)
897 : } else {
898 4 : discovered_new_wal.as_ref().map(|new_wal| {
899 4 : // We know that new WAL is available on other safekeeper, but connected safekeeper don't have it.
900 4 : new_wal
901 4 : .discovered_at
902 4 : .max(existing_wal_connection.status.latest_wal_update)
903 4 : })
904 : };
905 :
906 : // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
907 4 : if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
908 4 : if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
909 2 : if candidate_commit_lsn > current_commit_lsn
910 2 : && waiting_for_new_wal > self.conf.lagging_wal_timeout
911 : {
912 2 : return Some(NewWalConnectionCandidate {
913 2 : safekeeper_id: new_sk_id,
914 2 : wal_source_connconf: new_wal_source_connconf,
915 2 : availability_zone: new_availability_zone,
916 2 : reason: ReconnectReason::NoWalTimeout {
917 2 : current_lsn,
918 2 : current_commit_lsn,
919 2 : candidate_commit_lsn,
920 2 : last_wal_interaction: Some(
921 2 : existing_wal_connection.status.latest_wal_update,
922 2 : ),
923 2 : check_time: now,
924 2 : threshold: self.conf.lagging_wal_timeout,
925 2 : },
926 2 : });
927 0 : }
928 2 : }
929 0 : }
930 :
931 2 : self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
932 : }
933 : None => {
934 6 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
935 8 : self.select_connection_candidate(None)?;
936 6 : return Some(NewWalConnectionCandidate {
937 6 : safekeeper_id: new_sk_id,
938 6 : availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
939 6 : wal_source_connconf: new_wal_source_connconf,
940 6 : reason: ReconnectReason::NoExistingConnection,
941 6 : });
942 : }
943 : }
944 :
945 2 : None
946 18 : }
947 :
948 : /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
949 : /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
950 : ///
951 : /// The candidate that is chosen:
952 : /// * has no pending retry cooldown
953 : /// * has greatest commit_lsn among the ones that are left
954 18 : fn select_connection_candidate(
955 18 : &self,
956 18 : node_to_omit: Option<NodeId>,
957 18 : ) -> Option<(NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
958 18 : self.applicable_connection_candidates()
959 26 : .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
960 20 : .max_by_key(|(_, info, _)| info.commit_lsn)
961 18 : }
962 :
963 : /// Returns a list of safekeepers that have valid info and ready for connection.
964 : /// Some safekeepers are filtered by the retry cooldown.
965 18 : fn applicable_connection_candidates(
966 18 : &self,
967 18 : ) -> impl Iterator<Item = (NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
968 18 : let now = Utc::now().naive_utc();
969 18 :
970 18 : self.wal_stream_candidates
971 18 : .iter()
972 36 : .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
973 32 : .filter(move |(sk_id, _)| {
974 32 : let next_retry_at = self
975 32 : .wal_connection_retries
976 32 : .get(sk_id)
977 32 : .and_then(|retry_info| {
978 2 : retry_info.next_retry_at
979 32 : });
980 32 :
981 32 : next_retry_at.is_none() || next_retry_at.unwrap() <= now
982 32 : }).filter_map(|(sk_id, broker_info)| {
983 30 : let info = &broker_info.timeline;
984 30 : if info.safekeeper_connstr.is_empty() {
985 4 : return None; // no connection string, ignore sk
986 26 : }
987 26 : match wal_stream_connection_config(
988 26 : self.id,
989 26 : info.safekeeper_connstr.as_ref(),
990 26 : match &self.conf.auth_token {
991 26 : None => None,
992 0 : Some(x) => Some(x),
993 : },
994 26 : self.conf.availability_zone.as_deref(),
995 : ) {
996 26 : Ok(connstr) => Some((*sk_id, info, connstr)),
997 0 : Err(e) => {
998 0 : error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
999 0 : None
1000 : }
1001 : }
1002 30 : })
1003 18 : }
1004 :
1005 : /// Remove candidates which haven't sent broker updates for a while.
1006 18 : fn cleanup_old_candidates(&mut self) {
1007 18 : let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
1008 18 : let lagging_wal_timeout = self.conf.lagging_wal_timeout;
1009 18 :
1010 38 : self.wal_stream_candidates.retain(|node_id, broker_info| {
1011 38 : if let Ok(time_since_latest_broker_update) =
1012 38 : (Utc::now().naive_utc() - broker_info.latest_update).to_std()
1013 : {
1014 38 : let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
1015 38 : if !should_retain {
1016 2 : node_ids_to_remove.push(*node_id);
1017 36 : }
1018 38 : should_retain
1019 : } else {
1020 0 : true
1021 : }
1022 38 : });
1023 18 :
1024 18 : if !node_ids_to_remove.is_empty() {
1025 4 : for node_id in node_ids_to_remove {
1026 2 : info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
1027 2 : self.wal_connection_retries.remove(&node_id);
1028 2 : WALRECEIVER_CANDIDATES_REMOVED.inc();
1029 : }
1030 16 : }
1031 18 : }
1032 :
1033 : /// # Cancel-Safety
1034 : ///
1035 : /// Not cancellation-safe.
1036 0 : pub(super) async fn shutdown(mut self) {
1037 0 : if let Some(wal_connection) = self.wal_connection.take() {
1038 0 : wal_connection.connection_task.shutdown().await;
1039 0 : }
1040 0 : }
1041 :
1042 0 : fn manager_status(&self) -> ConnectionManagerStatus {
1043 0 : ConnectionManagerStatus {
1044 0 : existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
1045 0 : wal_stream_candidates: self.wal_stream_candidates.clone(),
1046 0 : }
1047 0 : }
1048 : }
1049 :
1050 : #[derive(Debug)]
1051 : struct NewWalConnectionCandidate {
1052 : safekeeper_id: NodeId,
1053 : wal_source_connconf: PgConnectionConfig,
1054 : availability_zone: Option<String>,
1055 : reason: ReconnectReason,
1056 : }
1057 :
1058 : /// Stores the reason why WAL connection was switched, for furter debugging purposes.
1059 : #[derive(Debug, PartialEq, Eq)]
1060 : enum ReconnectReason {
1061 : NoExistingConnection,
1062 : LaggingWal {
1063 : current_commit_lsn: Lsn,
1064 : new_commit_lsn: Lsn,
1065 : threshold: NonZeroU64,
1066 : },
1067 : SwitchAvailabilityZone,
1068 : NoWalTimeout {
1069 : current_lsn: Lsn,
1070 : current_commit_lsn: Lsn,
1071 : candidate_commit_lsn: Lsn,
1072 : last_wal_interaction: Option<NaiveDateTime>,
1073 : check_time: NaiveDateTime,
1074 : threshold: Duration,
1075 : },
1076 : NoKeepAlives {
1077 : last_keep_alive: Option<NaiveDateTime>,
1078 : check_time: NaiveDateTime,
1079 : threshold: Duration,
1080 : },
1081 : }
1082 :
1083 : impl ReconnectReason {
1084 0 : fn name(&self) -> &str {
1085 0 : match self {
1086 0 : ReconnectReason::NoExistingConnection => "NoExistingConnection",
1087 0 : ReconnectReason::LaggingWal { .. } => "LaggingWal",
1088 0 : ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
1089 0 : ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
1090 0 : ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
1091 : }
1092 0 : }
1093 : }
1094 :
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
    use url::Host;

    const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";

    /// Builds broker data for a safekeeper with the given `commit_lsn`, connection string
    /// and time of the latest broker update; everything else is left empty.
    fn dummy_broker_sk_timeline(
        commit_lsn: u64,
        safekeeper_connstr: &str,
        latest_update: NaiveDateTime,
    ) -> BrokerSkTimeline {
        BrokerSkTimeline {
            timeline: SafekeeperDiscoveryResponse {
                safekeeper_id: 0,
                tenant_timeline_id: None,
                commit_lsn,
                safekeeper_connstr: safekeeper_connstr.to_owned(),
                availability_zone: None,
                standby_horizon: 0,
            },
            latest_update,
        }
    }

    /// Creates a manager state over a fresh test timeline: no connection, no candidates,
    /// no retries, 1s timeouts and a 1 MiB LSN lag threshold.
    async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
        let (tenant, ctx) = harness.load().await;
        let timeline = tenant
            .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
            .await
            .expect("Failed to create an empty timeline for dummy wal connection manager");

        ConnectionManagerState {
            id: TenantTimelineId {
                tenant_id: harness.tenant_shard_id.tenant_id,
                timeline_id: TIMELINE_ID,
            },
            timeline,
            cancel: CancellationToken::new(),
            conf: WalReceiverConf {
                wal_connect_timeout: Duration::from_secs(1),
                lagging_wal_timeout: Duration::from_secs(1),
                max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
                auth_token: None,
                availability_zone: None,
                ingest_batch_size: 1,
            },
            wal_connection: None,
            wal_stream_candidates: HashMap::new(),
            wal_connection_retries: HashMap::new(),
        }
    }

    /// Without a connection, candidates with an empty connection string, a zero commit_lsn
    /// or a stale broker update must all be rejected.
    #[tokio::test]
    async fn no_connection_no_candidate() -> anyhow::Result<()> {
        let harness = TenantHarness::create("no_connection_no_candidate").await?;
        let mut state = dummy_state(&harness).await;
        let now = Utc::now().naive_utc();

        let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
        let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;

        state.wal_connection = None;
        state.wal_stream_candidates = HashMap::from([
            (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
            (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
            (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
            (
                NodeId(3),
                dummy_broker_sk_timeline(
                    1 + state.conf.max_lsn_wal_lag.get(),
                    "delay_over_threshold",
                    delay_over_threshold,
                ),
            ),
        ]);

        let no_candidate = state.next_connection_candidate();
        assert!(
            no_candidate.is_none(),
            "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
        );

        Ok(())
    }

    /// With a healthy connection, the connected node's own broker data is ignored and other
    /// nodes that are not far enough ahead must not trigger a switch.
    #[tokio::test]
    async fn connection_no_candidate() -> anyhow::Result<()> {
        let harness = TenantHarness::create("connection_no_candidate").await?;
        let mut state = dummy_state(&harness).await;
        let now = Utc::now().naive_utc();

        let connected_sk_id = NodeId(0);
        let current_lsn = 100_000;

        let connection_status = WalConnectionStatus {
            is_connected: true,
            has_processed_wal: true,
            latest_connection_update: now,
            latest_wal_update: now,
            commit_lsn: Some(Lsn(current_lsn)),
            streaming_lsn: Some(Lsn(current_lsn)),
            node: NodeId(1),
        };

        state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
        state.wal_connection = Some(WalConnection {
            started_at: now,
            sk_id: connected_sk_id,
            availability_zone: None,
            status: connection_status,
            connection_task: state.spawn(move |sender, _| async move {
                sender
                    .send(TaskStateUpdate::Progress(connection_status))
                    .ok();
                Ok(())
            }),
            discovered_new_wal: None,
        });
        state.wal_stream_candidates = HashMap::from([
            (
                connected_sk_id,
                dummy_broker_sk_timeline(
                    current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
                    DUMMY_SAFEKEEPER_HOST,
                    now,
                ),
            ),
            (
                NodeId(1),
                dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
            ),
            (
                NodeId(2),
                dummy_broker_sk_timeline(
                    current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
                    "not_enough_advanced_lsn",
                    now,
                ),
            ),
        ]);

        let no_candidate = state.next_connection_candidate();
        assert!(
            no_candidate.is_none(),
            "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
        );

        Ok(())
    }

    /// Without a connection, the applicable candidate with the biggest commit_lsn is chosen
    /// and the reason is always `NoExistingConnection`.
    #[tokio::test]
    async fn no_connection_candidate() -> anyhow::Result<()> {
        let harness = TenantHarness::create("no_connection_candidate").await?;
        let mut state = dummy_state(&harness).await;
        let now = Utc::now().naive_utc();

        state.wal_connection = None;
        state.wal_stream_candidates = HashMap::from([(
            NodeId(0),
            dummy_broker_sk_timeline(
                1 + state.conf.max_lsn_wal_lag.get(),
                DUMMY_SAFEKEEPER_HOST,
                now,
            ),
        )]);

        let only_candidate = state
            .next_connection_candidate()
            .expect("Expected one candidate selected out of the only data option, but got none");
        assert_eq!(only_candidate.safekeeper_id, NodeId(0));
        assert_eq!(
            only_candidate.reason,
            ReconnectReason::NoExistingConnection,
            "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
        );
        assert_eq!(
            only_candidate.wal_source_connconf.host(),
            &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
        );

        let selected_lsn = 100_000;
        state.wal_stream_candidates = HashMap::from([
            (
                NodeId(0),
                dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
            ),
            (
                NodeId(1),
                dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
            ),
            (
                // Biggest commit_lsn, but not connectable: no connection string.
                NodeId(2),
                dummy_broker_sk_timeline(selected_lsn + 100, "", now),
            ),
        ]);
        let biggest_wal_candidate = state.next_connection_candidate().expect(
            "Expected one candidate selected out of multiple valid data options, but got none",
        );

        assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
        assert_eq!(
            biggest_wal_candidate.reason,
            ReconnectReason::NoExistingConnection,
            "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
        );
        assert_eq!(
            biggest_wal_candidate.wal_source_connconf.host(),
            &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
        );

        Ok(())
    }

    /// A candidate still in its retry cooldown is skipped even if it has more WAL.
    #[tokio::test]
    async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
        let harness = TenantHarness::create("candidate_with_many_connection_failures").await?;
        let mut state = dummy_state(&harness).await;
        let now = Utc::now().naive_utc();

        let current_lsn = Lsn(100_000).align();
        let bigger_lsn = Lsn(current_lsn.0 + 100).align();

        state.wal_connection = None;
        state.wal_stream_candidates = HashMap::from([
            (
                NodeId(0),
                dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
            ),
            (
                NodeId(1),
                dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
            ),
        ]);
        state.wal_connection_retries = HashMap::from([(
            NodeId(0),
            RetryInfo {
                next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
                retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
            },
        )]);

        let candidate_with_less_errors = state
            .next_connection_candidate()
            .expect("Expected one candidate selected, but got none");
        assert_eq!(
            candidate_with_less_errors.safekeeper_id,
            NodeId(1),
            "Should select the node with no pending retry cooldown"
        );

        Ok(())
    }

    /// A candidate whose commit_lsn is ahead by more than `max_lsn_wal_lag` replaces the
    /// connected safekeeper with reason `LaggingWal`.
    #[tokio::test]
    async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
        let harness = TenantHarness::create("lsn_wal_over_threshold_current_candidate").await?;
        let mut state = dummy_state(&harness).await;
        let current_lsn = Lsn(100_000).align();
        let now = Utc::now().naive_utc();

        let connected_sk_id = NodeId(0);
        let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);

        let connection_status = WalConnectionStatus {
            is_connected: true,
            has_processed_wal: true,
            latest_connection_update: now,
            latest_wal_update: now,
            commit_lsn: Some(current_lsn),
            streaming_lsn: Some(current_lsn),
            node: connected_sk_id,
        };

        state.wal_connection = Some(WalConnection {
            started_at: now,
            sk_id: connected_sk_id,
            availability_zone: None,
            status: connection_status,
            connection_task: state.spawn(move |sender, _| async move {
                sender
                    .send(TaskStateUpdate::Progress(connection_status))
                    .ok();
                Ok(())
            }),
            discovered_new_wal: None,
        });
        state.wal_stream_candidates = HashMap::from([
            (
                connected_sk_id,
                dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
            ),
            (
                NodeId(1),
                dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
            ),
        ]);

        let over_threshold_candidate = state.next_connection_candidate().expect(
            "Expected one candidate selected out of multiple valid data options, but got none",
        );

        assert_eq!(over_threshold_candidate.safekeeper_id, NodeId(1));
        assert_eq!(
            over_threshold_candidate.reason,
            ReconnectReason::LaggingWal {
                current_commit_lsn: current_lsn,
                new_commit_lsn: new_lsn,
                threshold: state.conf.max_lsn_wal_lag
            },
            "Should select bigger WAL safekeeper if it starts to lag enough"
        );
        assert_eq!(
            over_threshold_candidate.wal_source_connconf.host(),
            &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
        );

        Ok(())
    }

    /// A connection silent for longer than `wal_connect_timeout` is replaced with reason
    /// `NoKeepAlives`, even if the candidate has no newer WAL.
    #[tokio::test]
    async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
        let harness =
            TenantHarness::create("timeout_connection_threshold_current_candidate").await?;
        let mut state = dummy_state(&harness).await;
        let current_lsn = Lsn(100_000).align();
        let now = Utc::now().naive_utc();

        let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
        let time_over_threshold = now - wal_connect_timeout - wal_connect_timeout;

        let connection_status = WalConnectionStatus {
            is_connected: true,
            has_processed_wal: true,
            latest_connection_update: time_over_threshold,
            latest_wal_update: time_over_threshold,
            commit_lsn: Some(current_lsn),
            streaming_lsn: Some(current_lsn),
            node: NodeId(1),
        };

        state.wal_connection = Some(WalConnection {
            started_at: now,
            sk_id: NodeId(1),
            availability_zone: None,
            status: connection_status,
            connection_task: state.spawn(move |sender, _| async move {
                sender
                    .send(TaskStateUpdate::Progress(connection_status))
                    .ok();
                Ok(())
            }),
            discovered_new_wal: None,
        });
        state.wal_stream_candidates = HashMap::from([(
            NodeId(0),
            dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
        )]);

        let over_threshold_candidate = state.next_connection_candidate().expect(
            "Expected one candidate selected out of multiple valid data options, but got none",
        );

        assert_eq!(over_threshold_candidate.safekeeper_id, NodeId(0));
        match over_threshold_candidate.reason {
            ReconnectReason::NoKeepAlives {
                last_keep_alive,
                threshold,
                ..
            } => {
                assert_eq!(last_keep_alive, Some(time_over_threshold));
                // The keepalive check is governed by `wal_connect_timeout`, not by
                // `lagging_wal_timeout`.
                assert_eq!(threshold, state.conf.wal_connect_timeout);
            }
            unexpected => panic!("Unexpected reason: {unexpected:?}"),
        }
        assert_eq!(
            over_threshold_candidate.wal_source_connconf.host(),
            &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
        );

        Ok(())
    }

    /// A connection that keeps sending keepalives but no WAL for longer than
    /// `lagging_wal_timeout`, while newer WAL exists elsewhere, is replaced with reason
    /// `NoWalTimeout`.
    #[tokio::test]
    async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
        let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?;
        let mut state = dummy_state(&harness).await;
        let current_lsn = Lsn(100_000).align();
        let new_lsn = Lsn(100_100).align();
        let now = Utc::now().naive_utc();

        let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
        let time_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;

        let connection_status = WalConnectionStatus {
            is_connected: true,
            has_processed_wal: true,
            latest_connection_update: now,
            latest_wal_update: time_over_threshold,
            commit_lsn: Some(current_lsn),
            streaming_lsn: Some(current_lsn),
            node: NodeId(1),
        };

        state.wal_connection = Some(WalConnection {
            started_at: now,
            sk_id: NodeId(1),
            availability_zone: None,
            status: connection_status,
            connection_task: state.spawn(move |_, _| async move { Ok(()) }),
            discovered_new_wal: Some(NewCommittedWAL {
                discovered_at: time_over_threshold,
                lsn: new_lsn,
            }),
        });
        state.wal_stream_candidates = HashMap::from([(
            NodeId(0),
            dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
        )]);

        let over_threshold_candidate = state.next_connection_candidate().expect(
            "Expected one candidate selected out of multiple valid data options, but got none",
        );

        assert_eq!(over_threshold_candidate.safekeeper_id, NodeId(0));
        match over_threshold_candidate.reason {
            // The reported LSNs get their own names: binding them as `current_lsn` would
            // shadow the expected value and turn the assertions into tautologies.
            ReconnectReason::NoWalTimeout {
                current_lsn: reported_current_lsn,
                current_commit_lsn: reported_current_commit_lsn,
                candidate_commit_lsn,
                last_wal_interaction,
                threshold,
                ..
            } => {
                assert_eq!(reported_current_lsn, current_lsn);
                assert_eq!(reported_current_commit_lsn, current_lsn);
                assert_eq!(candidate_commit_lsn, new_lsn);
                assert_eq!(last_wal_interaction, Some(time_over_threshold));
                assert_eq!(threshold, state.conf.lagging_wal_timeout);
            }
            unexpected => panic!("Unexpected reason: {unexpected:?}"),
        }
        assert_eq!(
            over_threshold_candidate.wal_source_connconf.host(),
            &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
        );

        Ok(())
    }

    #[tokio::test]
    async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
        // Pageserver and one of safekeepers will be in the same availability zone
        // and pageserver should prefer to connect to it.
        let test_az = Some("test_az".to_owned());

        let harness = TenantHarness::create("switch_to_same_availability_zone").await?;
        let mut state = dummy_state(&harness).await;
        state.conf.availability_zone.clone_from(&test_az);
        let current_lsn = Lsn(100_000).align();
        let now = Utc::now().naive_utc();

        let connected_sk_id = NodeId(0);

        let connection_status = WalConnectionStatus {
            is_connected: true,
            has_processed_wal: true,
            latest_connection_update: now,
            latest_wal_update: now,
            commit_lsn: Some(current_lsn),
            streaming_lsn: Some(current_lsn),
            node: connected_sk_id,
        };

        state.wal_connection = Some(WalConnection {
            started_at: now,
            sk_id: connected_sk_id,
            availability_zone: None,
            status: connection_status,
            connection_task: state.spawn(move |sender, _| async move {
                sender
                    .send(TaskStateUpdate::Progress(connection_status))
                    .ok();
                Ok(())
            }),
            discovered_new_wal: None,
        });

        // We have another safekeeper with the same commit_lsn, and it has the same
        // availability zone as the current pageserver.
        let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
        same_az_sk.timeline.availability_zone.clone_from(&test_az);

        state.wal_stream_candidates = HashMap::from([
            (
                connected_sk_id,
                dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
            ),
            (NodeId(1), same_az_sk),
        ]);

        // We expect that pageserver will switch to the safekeeper in the same availability zone,
        // even if it has the same commit_lsn.
        let next_candidate = state.next_connection_candidate().expect(
            "Expected one candidate selected out of multiple valid data options, but got none",
        );

        assert_eq!(next_candidate.safekeeper_id, NodeId(1));
        assert_eq!(
            next_candidate.reason,
            ReconnectReason::SwitchAvailabilityZone,
            "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
        );
        assert_eq!(
            next_candidate.wal_source_connconf.host(),
            &Host::Domain("same_az".to_owned())
        );

        Ok(())
    }
}
|