Line data Source code
1 : //! WAL receiver logic that ensures the pageserver gets connectected to safekeeper,
2 : //! that contains the latest WAL to stream and this connection does not go stale.
3 : //!
4 : //! To achieve that, a storage broker is used: safekepers propagate their timelines' state in it,
5 : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
6 : //! Current connection state is tracked too, to ensure it's not getting stale.
7 : //!
8 : //! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new conneciton leader,
9 : //! then a (re)connection happens, if necessary.
10 : //! Only WAL streaming task expects to be finished, other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
11 :
12 : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
13 :
14 : use super::{TaskStateUpdate, WalReceiverConf};
15 : use crate::context::{DownloadBehavior, RequestContext};
16 : use crate::metrics::{
17 : WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
18 : WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
19 : };
20 : use crate::task_mgr::TaskKind;
21 : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
22 : use anyhow::Context;
23 : use chrono::{NaiveDateTime, Utc};
24 : use pageserver_api::models::TimelineState;
25 :
26 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
27 : use storage_broker::proto::{
28 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
29 : SubscribeByFilterRequest, TypeSubscription, TypedMessage,
30 : };
31 : use storage_broker::{BrokerClientChannel, Code, Streaming};
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::*;
34 :
35 : use postgres_connection::PgConnectionConfig;
36 : use utils::backoff::{
37 : exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
38 : };
39 : use utils::postgres_client::{
40 : wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol,
41 : };
42 : use utils::{
43 : id::{NodeId, TenantTimelineId},
44 : lsn::Lsn,
45 : };
46 :
47 : use super::{
48 : walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
49 : TaskEvent, TaskHandle,
50 : };
51 :
52 : pub(crate) struct Cancelled;
53 :
54 : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
55 : /// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
56 : /// If storage broker subscription is cancelled, exits.
57 : ///
58 : /// # Cancel-Safety
59 : ///
60 : /// Not cancellation-safe. Use `cancel` token to request cancellation.
61 0 : pub(super) async fn connection_manager_loop_step(
62 0 : broker_client: &mut BrokerClientChannel,
63 0 : connection_manager_state: &mut ConnectionManagerState,
64 0 : ctx: &RequestContext,
65 0 : cancel: &CancellationToken,
66 0 : manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
67 0 : ) -> Result<(), Cancelled> {
68 0 : match tokio::select! {
69 0 : _ = cancel.cancelled() => { return Err(Cancelled); },
70 0 : st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
71 : } {
72 0 : Ok(()) => {}
73 0 : Err(new_state) => {
74 0 : debug!(
75 : ?new_state,
76 0 : "state changed, stopping wal connection manager loop"
77 : );
78 0 : return Err(Cancelled);
79 : }
80 : }
81 :
82 0 : WALRECEIVER_ACTIVE_MANAGERS.inc();
83 0 : scopeguard::defer! {
84 0 : WALRECEIVER_ACTIVE_MANAGERS.dec();
85 0 : }
86 0 :
87 0 : let id = TenantTimelineId {
88 0 : tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
89 0 : timeline_id: connection_manager_state.timeline.timeline_id,
90 0 : };
91 0 :
92 0 : let mut timeline_state_updates = connection_manager_state
93 0 : .timeline
94 0 : .subscribe_for_state_updates();
95 0 :
96 0 : let mut wait_lsn_status = connection_manager_state
97 0 : .timeline
98 0 : .subscribe_for_wait_lsn_updates();
99 0 :
100 0 : // TODO: create a separate config option for discovery request interval
101 0 : let discovery_request_interval = connection_manager_state.conf.lagging_wal_timeout;
102 0 : let mut last_discovery_ts: Option<std::time::Instant> = None;
103 :
104 : // Subscribe to the broker updates. Stream shares underlying TCP connection
105 : // with other streams on this client (other connection managers). When
106 : // object goes out of scope, stream finishes in drop() automatically.
107 0 : let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
108 0 : debug!("Subscribed for broker timeline updates");
109 :
110 : loop {
111 0 : let time_until_next_retry = connection_manager_state.time_until_next_retry();
112 0 : let any_activity = connection_manager_state.wal_connection.is_some()
113 0 : || !connection_manager_state.wal_stream_candidates.is_empty();
114 :
115 : // These things are happening concurrently:
116 : //
117 : // - cancellation request
118 : // - keep receiving WAL on the current connection
119 : // - if the shared state says we need to change connection, disconnect and return
120 : // - this runs in a separate task and we receive updates via a watch channel
121 : // - change connection if the rules decide so, or if the current connection dies
122 : // - receive updates from broker
123 : // - this might change the current desired connection
124 : // - timeline state changes to something that does not allow walreceiver to run concurrently
125 : // - if there's no connection and no candidates, try to send a discovery request
126 :
127 : // NB: make sure each of the select expressions are cancellation-safe
128 : // (no need for arms to be cancellation-safe).
129 0 : tokio::select! {
130 0 : _ = cancel.cancelled() => { return Err(Cancelled); }
131 0 : Some(wal_connection_update) = async {
132 0 : match connection_manager_state.wal_connection.as_mut() {
133 0 : Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
134 0 : None => None,
135 : }
136 0 : } => {
137 0 : let wal_connection = connection_manager_state.wal_connection.as_mut()
138 0 : .expect("Should have a connection, as checked by the corresponding select! guard");
139 0 : match wal_connection_update {
140 0 : TaskEvent::Update(TaskStateUpdate::Started) => {},
141 0 : TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
142 0 : if new_status.has_processed_wal {
143 0 : // We have advanced last_record_lsn by processing the WAL received
144 0 : // from this safekeeper. This is good enough to clean unsuccessful
145 0 : // retries history and allow reconnecting to this safekeeper without
146 0 : // sleeping for a long time.
147 0 : connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
148 0 : }
149 0 : wal_connection.status = new_status;
150 : }
151 0 : TaskEvent::End(walreceiver_task_result) => {
152 0 : match walreceiver_task_result {
153 0 : Ok(()) => debug!("WAL receiving task finished"),
154 0 : Err(e) => error!("wal receiver task finished with an error: {e:?}"),
155 : }
156 0 : connection_manager_state.drop_old_connection(false).await;
157 : },
158 : }
159 : },
160 :
161 : // Got a new update from the broker
162 0 : broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
163 0 : match broker_update {
164 0 : Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
165 0 : Err(status) => {
166 0 : match status.code() {
167 0 : Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
168 0 : // tonic's error handling doesn't provide a clear code for disconnections: we get
169 0 : // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
170 0 : info!("broker disconnected: {status}");
171 : },
172 : _ => {
173 0 : warn!("broker subscription failed: {status}");
174 : }
175 : }
176 0 : return Ok(());
177 : }
178 : Ok(None) => {
179 0 : error!("broker subscription stream ended"); // can't happen
180 0 : return Ok(());
181 : }
182 : }
183 : },
184 :
185 0 : new_event = async {
186 : // Reminder: this match arm needs to be cancellation-safe.
187 : loop {
188 0 : if connection_manager_state.timeline.current_state() == TimelineState::Loading {
189 0 : warn!("wal connection manager should only be launched after timeline has become active");
190 0 : }
191 0 : match timeline_state_updates.changed().await {
192 : Ok(()) => {
193 0 : let new_state = connection_manager_state.timeline.current_state();
194 0 : match new_state {
195 : // we're already active as walreceiver, no need to reactivate
196 0 : TimelineState::Active => continue,
197 : TimelineState::Broken { .. } | TimelineState::Stopping => {
198 0 : debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
199 0 : return ControlFlow::Break(());
200 : }
201 : TimelineState::Loading => {
202 0 : warn!("timeline transitioned back to Loading state, that should not happen");
203 0 : return ControlFlow::Continue(());
204 : }
205 : }
206 : }
207 0 : Err(_sender_dropped_error) => return ControlFlow::Break(()),
208 : }
209 : }
210 0 : } => match new_event {
211 : ControlFlow::Continue(()) => {
212 0 : return Ok(());
213 : }
214 : ControlFlow::Break(()) => {
215 0 : debug!("Timeline is no longer active, stopping wal connection manager loop");
216 0 : return Err(Cancelled);
217 : }
218 : },
219 :
220 0 : Some(()) = async {
221 0 : match time_until_next_retry {
222 0 : Some(sleep_time) => {
223 0 : tokio::time::sleep(sleep_time).await;
224 0 : Some(())
225 : },
226 : None => {
227 0 : debug!("No candidates to retry, waiting indefinitely for the broker events");
228 0 : None
229 : }
230 : }
231 0 : } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
232 :
233 0 : Some(()) = async {
234 0 : // Reminder: this match arm needs to be cancellation-safe.
235 0 : // Calculating time needed to wait until sending the next discovery request.
236 0 : // Current implementation is conservative and sends discovery requests only when there are no candidates.
237 0 :
238 0 : if any_activity {
239 : // No need to send discovery requests if there is an active connection or candidates.
240 0 : return None;
241 0 : }
242 :
243 : // Waiting for an active wait_lsn request.
244 0 : while wait_lsn_status.borrow().is_none() {
245 0 : if wait_lsn_status.changed().await.is_err() {
246 : // wait_lsn_status channel was closed, exiting
247 0 : warn!("wait_lsn_status channel was closed in connection_manager_loop_step");
248 0 : return None;
249 0 : }
250 : }
251 :
252 : // All preconditions met, preparing to send a discovery request.
253 0 : let now = std::time::Instant::now();
254 0 : let next_discovery_ts = last_discovery_ts
255 0 : .map(|ts| ts + discovery_request_interval)
256 0 : .unwrap_or_else(|| now);
257 0 :
258 0 : if next_discovery_ts > now {
259 : // Prevent sending discovery requests too frequently.
260 0 : tokio::time::sleep(next_discovery_ts - now).await;
261 0 : }
262 :
263 0 : let tenant_timeline_id = Some(ProtoTenantTimelineId {
264 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
265 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
266 0 : });
267 0 : let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
268 0 : let msg = TypedMessage {
269 0 : r#type: MessageType::SafekeeperDiscoveryRequest as i32,
270 0 : safekeeper_timeline_info: None,
271 0 : safekeeper_discovery_request: Some(request),
272 0 : safekeeper_discovery_response: None,
273 0 : };
274 0 :
275 0 : last_discovery_ts = Some(std::time::Instant::now());
276 0 : debug!("No active connection and no candidates, sending discovery request to the broker");
277 :
278 : // Cancellation safety: we want to send a message to the broker, but publish_one()
279 : // function can get cancelled by the other select! arm. This is absolutely fine, because
280 : // we just want to receive broker updates and discovery is not important if we already
281 : // receive updates.
282 : //
283 : // It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
284 : // This is totally fine because of the reason above.
285 :
286 : // This is a fire-and-forget request, we don't care about the response
287 0 : let _ = broker_client.publish_one(msg).await;
288 0 : debug!("Discovery request sent to the broker");
289 0 : None
290 0 : } => {}
291 : }
292 :
293 0 : if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
294 0 : info!("Switching to new connection candidate: {new_candidate:?}");
295 0 : connection_manager_state
296 0 : .change_connection(new_candidate, ctx)
297 0 : .await
298 0 : }
299 0 : *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
300 : }
301 0 : }
302 :
303 : /// Endlessly try to subscribe for broker updates for a given timeline.
304 0 : async fn subscribe_for_timeline_updates(
305 0 : broker_client: &mut BrokerClientChannel,
306 0 : id: TenantTimelineId,
307 0 : cancel: &CancellationToken,
308 0 : ) -> Result<Streaming<TypedMessage>, Cancelled> {
309 0 : let mut attempt = 0;
310 : loop {
311 0 : exponential_backoff(
312 0 : attempt,
313 0 : DEFAULT_BASE_BACKOFF_SECONDS,
314 0 : DEFAULT_MAX_BACKOFF_SECONDS,
315 0 : cancel,
316 0 : )
317 0 : .await;
318 0 : attempt += 1;
319 0 :
320 0 : // subscribe to the specific timeline
321 0 : let request = SubscribeByFilterRequest {
322 0 : types: vec![
323 0 : TypeSubscription {
324 0 : r#type: MessageType::SafekeeperTimelineInfo as i32,
325 0 : },
326 0 : TypeSubscription {
327 0 : r#type: MessageType::SafekeeperDiscoveryResponse as i32,
328 0 : },
329 0 : ],
330 0 : tenant_timeline_id: Some(FilterTenantTimelineId {
331 0 : enabled: true,
332 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
333 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
334 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
335 0 : }),
336 0 : }),
337 0 : };
338 0 :
339 0 : match {
340 0 : tokio::select! {
341 0 : r = broker_client.subscribe_by_filter(request) => { r }
342 0 : _ = cancel.cancelled() => { return Err(Cancelled); }
343 : }
344 : } {
345 0 : Ok(resp) => {
346 0 : return Ok(resp.into_inner());
347 : }
348 0 : Err(e) => {
349 0 : // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
350 0 : // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
351 0 : info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
352 0 : continue;
353 : }
354 : }
355 : }
356 0 : }
357 :
358 : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
359 : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
360 : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
361 :
362 : /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
363 : pub(super) struct ConnectionManagerState {
364 : id: TenantTimelineId,
365 : /// Use pageserver data about the timeline to filter out some of the safekeepers.
366 : timeline: Arc<Timeline>,
367 : /// Child token of [`super::WalReceiver::cancel`], inherited to all tasks we spawn.
368 : cancel: CancellationToken,
369 : conf: WalReceiverConf,
370 : /// Current connection to safekeeper for WAL streaming.
371 : wal_connection: Option<WalConnection>,
372 : /// Info about retries and unsuccessful attempts to connect to safekeepers.
373 : wal_connection_retries: HashMap<NodeId, RetryInfo>,
374 : /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
375 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
376 : }
377 :
378 : /// An information about connection manager's current connection and connection candidates.
379 : #[derive(Debug, Clone)]
380 : pub struct ConnectionManagerStatus {
381 : existing_connection: Option<WalConnectionStatus>,
382 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
383 : }
384 :
385 : impl ConnectionManagerStatus {
386 : /// Generates a string, describing current connection status in a form, suitable for logging.
387 0 : pub fn to_human_readable_string(&self) -> String {
388 0 : let mut resulting_string = String::new();
389 0 : match &self.existing_connection {
390 0 : Some(connection) => {
391 0 : if connection.has_processed_wal {
392 0 : resulting_string.push_str(&format!(
393 0 : " (update {}): streaming WAL from node {}, ",
394 0 : connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
395 0 : connection.node,
396 0 : ));
397 0 :
398 0 : match (connection.streaming_lsn, connection.commit_lsn) {
399 0 : (None, None) => resulting_string.push_str("no streaming data"),
400 0 : (None, Some(commit_lsn)) => {
401 0 : resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
402 : }
403 0 : (Some(streaming_lsn), None) => {
404 0 : resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
405 : }
406 0 : (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
407 0 : &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
408 0 : ),
409 : }
410 0 : } else if connection.is_connected {
411 0 : resulting_string.push_str(&format!(
412 0 : " (update {}): connecting to node {}",
413 0 : connection
414 0 : .latest_connection_update
415 0 : .format("%Y-%m-%d %H:%M:%S"),
416 0 : connection.node,
417 0 : ));
418 0 : } else {
419 0 : resulting_string.push_str(&format!(
420 0 : " (update {}): initializing node {} connection",
421 0 : connection
422 0 : .latest_connection_update
423 0 : .format("%Y-%m-%d %H:%M:%S"),
424 0 : connection.node,
425 0 : ));
426 0 : }
427 : }
428 0 : None => resulting_string.push_str(": disconnected"),
429 : }
430 :
431 0 : resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
432 0 : let mut candidates = self.wal_stream_candidates.iter().peekable();
433 0 : while let Some((node_id, candidate_info)) = candidates.next() {
434 0 : resulting_string.push_str(&format!(
435 0 : "({}|{}|{})",
436 0 : node_id,
437 0 : candidate_info.latest_update.format("%H:%M:%S"),
438 0 : Lsn(candidate_info.timeline.commit_lsn)
439 0 : ));
440 0 : if candidates.peek().is_some() {
441 0 : resulting_string.push_str(", ");
442 0 : }
443 : }
444 0 : resulting_string.push(']');
445 0 :
446 0 : resulting_string
447 0 : }
448 : }
449 :
450 : /// Current connection data.
451 : #[derive(Debug)]
452 : struct WalConnection {
453 : /// Time when the connection was initiated.
454 : started_at: NaiveDateTime,
455 : /// Current safekeeper pageserver is connected to for WAL streaming.
456 : sk_id: NodeId,
457 : /// Availability zone of the safekeeper.
458 : availability_zone: Option<String>,
459 : /// Status of the connection.
460 : status: WalConnectionStatus,
461 : /// WAL streaming task handle.
462 : connection_task: TaskHandle<WalConnectionStatus>,
463 : /// Have we discovered that other safekeeper has more recent WAL than we do?
464 : discovered_new_wal: Option<NewCommittedWAL>,
465 : }
466 :
467 : /// Notion of a new committed WAL, which exists on other safekeeper.
468 : #[derive(Debug, Clone, Copy)]
469 : struct NewCommittedWAL {
470 : /// LSN of the new committed WAL.
471 : lsn: Lsn,
472 : /// When we discovered that the new committed WAL exists on other safekeeper.
473 : discovered_at: NaiveDateTime,
474 : }
475 :
476 : #[derive(Debug, Clone, Copy)]
477 : struct RetryInfo {
478 : next_retry_at: Option<NaiveDateTime>,
479 : retry_duration_seconds: f64,
480 : }
481 :
482 : /// Data about the timeline to connect to, received from the broker.
483 : #[derive(Debug, Clone)]
484 : struct BrokerSkTimeline {
485 : timeline: SafekeeperDiscoveryResponse,
486 : /// Time at which the data was fetched from the broker last time, to track the stale data.
487 : latest_update: NaiveDateTime,
488 : }
489 :
490 : impl ConnectionManagerState {
491 0 : pub(super) fn new(
492 0 : timeline: Arc<Timeline>,
493 0 : conf: WalReceiverConf,
494 0 : cancel: CancellationToken,
495 0 : ) -> Self {
496 0 : let id = TenantTimelineId {
497 0 : tenant_id: timeline.tenant_shard_id.tenant_id,
498 0 : timeline_id: timeline.timeline_id,
499 0 : };
500 0 : Self {
501 0 : id,
502 0 : timeline,
503 0 : cancel,
504 0 : conf,
505 0 : wal_connection: None,
506 0 : wal_stream_candidates: HashMap::new(),
507 0 : wal_connection_retries: HashMap::new(),
508 0 : }
509 0 : }
510 :
511 20 : fn spawn<Fut>(
512 20 : &self,
513 20 : task: impl FnOnce(
514 20 : tokio::sync::watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
515 20 : CancellationToken,
516 20 : ) -> Fut
517 20 : + Send
518 20 : + 'static,
519 20 : ) -> TaskHandle<WalConnectionStatus>
520 20 : where
521 20 : Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
522 20 : {
523 20 : // TODO: get rid of TaskHandle
524 20 : super::TaskHandle::spawn(&self.cancel, task)
525 20 : }
526 :
527 : /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
528 0 : async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
529 0 : WALRECEIVER_SWITCHES
530 0 : .with_label_values(&[new_sk.reason.name()])
531 0 : .inc();
532 0 :
533 0 : self.drop_old_connection(true).await;
534 :
535 0 : let node_id = new_sk.safekeeper_id;
536 0 : let connect_timeout = self.conf.wal_connect_timeout;
537 0 : let ingest_batch_size = self.conf.ingest_batch_size;
538 0 : let protocol = self.conf.protocol;
539 0 : let timeline = Arc::clone(&self.timeline);
540 0 : let ctx = ctx.detached_child(
541 0 : TaskKind::WalReceiverConnectionHandler,
542 0 : DownloadBehavior::Download,
543 0 : );
544 :
545 0 : let span = info_span!("connection", %node_id);
546 0 : let connection_handle = self.spawn(move |events_sender, cancellation| {
547 0 : async move {
548 0 : debug_assert_current_span_has_tenant_and_timeline_id();
549 :
550 0 : let res = super::walreceiver_connection::handle_walreceiver_connection(
551 0 : timeline,
552 0 : protocol,
553 0 : new_sk.wal_source_connconf,
554 0 : events_sender,
555 0 : cancellation.clone(),
556 0 : connect_timeout,
557 0 : ctx,
558 0 : node_id,
559 0 : ingest_batch_size,
560 0 : )
561 0 : .await;
562 :
563 0 : match res {
564 0 : Ok(()) => Ok(()),
565 0 : Err(e) => {
566 0 : match e {
567 0 : WalReceiverError::SuccessfulCompletion(msg) => {
568 0 : info!("walreceiver connection handling ended with success: {msg}");
569 0 : Ok(())
570 : }
571 0 : WalReceiverError::ExpectedSafekeeperError(e) => {
572 0 : info!("walreceiver connection handling ended: {e}");
573 0 : Ok(())
574 : }
575 : WalReceiverError::ClosedGate => {
576 0 : info!(
577 0 : "walreceiver connection handling ended because of closed gate"
578 : );
579 0 : Ok(())
580 : }
581 0 : WalReceiverError::Other(e) => {
582 0 : // give out an error to have task_mgr give it a really verbose logging
583 0 : if cancellation.is_cancelled() {
584 : // Ideally we would learn about this via some path other than Other, but
585 : // that requires refactoring all the intermediate layers of ingest code
586 : // that only emit anyhow::Error
587 0 : Ok(())
588 : } else {
589 0 : Err(e).context("walreceiver connection handling failure")
590 : }
591 : }
592 : }
593 : }
594 : }
595 0 : }
596 0 : .instrument(span)
597 0 : });
598 0 :
599 0 : let now = Utc::now().naive_utc();
600 0 : self.wal_connection = Some(WalConnection {
601 0 : started_at: now,
602 0 : sk_id: new_sk.safekeeper_id,
603 0 : availability_zone: new_sk.availability_zone,
604 0 : status: WalConnectionStatus {
605 0 : is_connected: false,
606 0 : has_processed_wal: false,
607 0 : latest_connection_update: now,
608 0 : latest_wal_update: now,
609 0 : streaming_lsn: None,
610 0 : commit_lsn: None,
611 0 : node: node_id,
612 0 : },
613 0 : connection_task: connection_handle,
614 0 : discovered_new_wal: None,
615 0 : });
616 0 : }
617 :
618 : /// Drops the current connection (if any) and updates retry timeout for the next
619 : /// connection attempt to the same safekeeper.
620 : ///
621 : /// # Cancel-Safety
622 : ///
623 : /// Not cancellation-safe.
624 0 : async fn drop_old_connection(&mut self, needs_shutdown: bool) {
625 0 : let wal_connection = match self.wal_connection.take() {
626 0 : Some(wal_connection) => wal_connection,
627 0 : None => return,
628 : };
629 :
630 0 : if needs_shutdown {
631 0 : wal_connection
632 0 : .connection_task
633 0 : .shutdown()
634 0 : // This here is why this function isn't cancellation-safe.
635 0 : // If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
636 0 : // Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
637 0 : // and thus be ineffective.
638 0 : .await;
639 0 : }
640 :
641 0 : let retry = self
642 0 : .wal_connection_retries
643 0 : .entry(wal_connection.sk_id)
644 0 : .or_insert(RetryInfo {
645 0 : next_retry_at: None,
646 0 : retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
647 0 : });
648 0 :
649 0 : let now = Utc::now().naive_utc();
650 0 :
651 0 : // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
652 0 : // and we add backoff to the time when we started the connection attempt. If the connection
653 0 : // was active for a long time, then next_retry_at will be in the past.
654 0 : retry.next_retry_at =
655 0 : wal_connection
656 0 : .started_at
657 0 : .checked_add_signed(chrono::Duration::milliseconds(
658 0 : (retry.retry_duration_seconds * 1000.0) as i64,
659 0 : ));
660 :
661 0 : if let Some(next) = &retry.next_retry_at {
662 0 : if next > &now {
663 0 : info!(
664 0 : "Next connection retry to {:?} is at {}",
665 : wal_connection.sk_id, next
666 : );
667 0 : }
668 0 : }
669 :
670 0 : let next_retry_duration =
671 0 : retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
672 0 : // Clamp the next retry duration to the maximum allowed.
673 0 : let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
674 0 : // Clamp the next retry duration to the minimum allowed.
675 0 : let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
676 0 :
677 0 : retry.retry_duration_seconds = next_retry_duration;
678 0 : }
679 :
680 : /// Returns time needed to wait to have a new candidate for WAL streaming.
681 0 : fn time_until_next_retry(&self) -> Option<Duration> {
682 0 : let now = Utc::now().naive_utc();
683 :
684 0 : let next_retry_at = self
685 0 : .wal_connection_retries
686 0 : .values()
687 0 : .filter_map(|retry| retry.next_retry_at)
688 0 : .filter(|next_retry_at| next_retry_at > &now)
689 0 : .min()?;
690 :
691 0 : (next_retry_at - now).to_std().ok()
692 0 : }
693 :
694 : /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
695 0 : fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
696 0 : let mut is_discovery = false;
697 0 : let timeline_update = match typed_msg.r#type() {
698 : MessageType::SafekeeperTimelineInfo => {
699 0 : let info = match typed_msg.safekeeper_timeline_info {
700 0 : Some(info) => info,
701 : None => {
702 0 : warn!("bad proto message from broker: no safekeeper_timeline_info");
703 0 : return;
704 : }
705 : };
706 0 : SafekeeperDiscoveryResponse {
707 0 : safekeeper_id: info.safekeeper_id,
708 0 : tenant_timeline_id: info.tenant_timeline_id,
709 0 : commit_lsn: info.commit_lsn,
710 0 : safekeeper_connstr: info.safekeeper_connstr,
711 0 : availability_zone: info.availability_zone,
712 0 : standby_horizon: info.standby_horizon,
713 0 : }
714 : }
715 : MessageType::SafekeeperDiscoveryResponse => {
716 0 : is_discovery = true;
717 0 : match typed_msg.safekeeper_discovery_response {
718 0 : Some(response) => response,
719 : None => {
720 0 : warn!("bad proto message from broker: no safekeeper_discovery_response");
721 0 : return;
722 : }
723 : }
724 : }
725 : _ => {
726 : // unexpected message
727 0 : return;
728 : }
729 : };
730 :
731 0 : WALRECEIVER_BROKER_UPDATES.inc();
732 0 :
733 0 : trace!(
734 0 : "safekeeper info update: standby_horizon(cutoff)={}",
735 : timeline_update.standby_horizon
736 : );
737 0 : if timeline_update.standby_horizon != 0 {
738 0 : // ignore reports from safekeepers not connected to replicas
739 0 : self.timeline
740 0 : .standby_horizon
741 0 : .store(Lsn(timeline_update.standby_horizon));
742 0 : self.timeline
743 0 : .metrics
744 0 : .standby_horizon_gauge
745 0 : .set(timeline_update.standby_horizon as i64);
746 0 : }
747 :
748 0 : let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
749 0 : let old_entry = self.wal_stream_candidates.insert(
750 0 : new_safekeeper_id,
751 0 : BrokerSkTimeline {
752 0 : timeline: timeline_update,
753 0 : latest_update: Utc::now().naive_utc(),
754 0 : },
755 0 : );
756 0 :
757 0 : if old_entry.is_none() {
758 0 : info!(
759 : ?is_discovery,
760 : %new_safekeeper_id,
761 0 : "New SK node was added",
762 : );
763 0 : WALRECEIVER_CANDIDATES_ADDED.inc();
764 0 : }
765 0 : }
766 :
767 : /// Cleans up stale broker records and checks the rest for the new connection candidate.
768 : /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
769 : /// The current rules for approving new candidates:
770 : /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attemps
771 : /// * if there's no such entry, no new candidate found, abort
772 : /// * otherwise check if the candidate is much better than the current one
773 : ///
774 : /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
775 : /// General rules are following:
776 : /// * if connected safekeeper is not present, pick the candidate
777 : /// * if we haven't received any updates for some time, pick the candidate
778 : /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
779 : /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
780 : /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
781 : ///
782 : /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
783 : /// Both thresholds are configured per tenant.
784 36 : fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
785 36 : self.cleanup_old_candidates();
786 36 :
787 36 : match &self.wal_connection {
788 20 : Some(existing_wal_connection) => {
789 20 : let connected_sk_node = existing_wal_connection.sk_id;
790 :
791 20 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
792 20 : self.select_connection_candidate(Some(connected_sk_node))?;
793 20 : let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
794 20 :
795 20 : let now = Utc::now().naive_utc();
796 20 : if let Ok(latest_interaciton) =
797 20 : (now - existing_wal_connection.status.latest_connection_update).to_std()
798 : {
799 : // Drop connection if we haven't received keepalive message for a while.
800 20 : if latest_interaciton > self.conf.wal_connect_timeout {
801 4 : return Some(NewWalConnectionCandidate {
802 4 : safekeeper_id: new_sk_id,
803 4 : wal_source_connconf: new_wal_source_connconf,
804 4 : availability_zone: new_availability_zone,
805 4 : reason: ReconnectReason::NoKeepAlives {
806 4 : last_keep_alive: Some(
807 4 : existing_wal_connection.status.latest_connection_update,
808 4 : ),
809 4 : check_time: now,
810 4 : threshold: self.conf.wal_connect_timeout,
811 4 : },
812 4 : });
813 16 : }
814 0 : }
815 :
816 16 : if !existing_wal_connection.status.is_connected {
817 : // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
818 0 : return None;
819 16 : }
820 :
821 16 : if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
822 16 : let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
823 16 : // Check if the new candidate has much more WAL than the current one.
824 16 : match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
825 16 : Some(new_sk_lsn_advantage) => {
826 16 : if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
827 4 : return Some(NewWalConnectionCandidate {
828 4 : safekeeper_id: new_sk_id,
829 4 : wal_source_connconf: new_wal_source_connconf,
830 4 : availability_zone: new_availability_zone,
831 4 : reason: ReconnectReason::LaggingWal {
832 4 : current_commit_lsn,
833 4 : new_commit_lsn,
834 4 : threshold: self.conf.max_lsn_wal_lag,
835 4 : },
836 4 : });
837 12 : }
838 12 : // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
839 12 : // and the current one is not, switch to the new one.
840 12 : if self.conf.availability_zone.is_some()
841 4 : && existing_wal_connection.availability_zone
842 4 : != self.conf.availability_zone
843 4 : && self.conf.availability_zone == new_availability_zone
844 : {
845 4 : return Some(NewWalConnectionCandidate {
846 4 : safekeeper_id: new_sk_id,
847 4 : availability_zone: new_availability_zone,
848 4 : wal_source_connconf: new_wal_source_connconf,
849 4 : reason: ReconnectReason::SwitchAvailabilityZone,
850 4 : });
851 8 : }
852 : }
853 0 : None => debug!(
854 0 : "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
855 : ),
856 : }
857 0 : }
858 :
859 8 : let current_lsn = match existing_wal_connection.status.streaming_lsn {
860 8 : Some(lsn) => lsn,
861 0 : None => self.timeline.get_last_record_lsn(),
862 : };
863 8 : let current_commit_lsn = existing_wal_connection
864 8 : .status
865 8 : .commit_lsn
866 8 : .unwrap_or(current_lsn);
867 8 : let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
868 8 :
869 8 : // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
870 8 : let mut discovered_new_wal = existing_wal_connection
871 8 : .discovered_new_wal
872 8 : .filter(|new_wal| new_wal.lsn > current_commit_lsn);
873 8 :
874 8 : if discovered_new_wal.is_none() {
875 : // Check if the new candidate has more WAL than the current one.
876 : // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
877 4 : discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
878 4 : trace!(
879 0 : "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
880 : candidate_commit_lsn,
881 : current_commit_lsn
882 : );
883 4 : Some(NewCommittedWAL {
884 4 : lsn: candidate_commit_lsn,
885 4 : discovered_at: Utc::now().naive_utc(),
886 4 : })
887 : } else {
888 0 : None
889 : };
890 4 : }
891 :
892 8 : let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
893 : // Connected safekeeper has more WAL, but we haven't received updates for some time.
894 0 : trace!(
895 0 : "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
896 0 : (now - existing_wal_connection.status.latest_wal_update).to_std(),
897 : current_lsn,
898 : current_commit_lsn
899 : );
900 0 : Some(existing_wal_connection.status.latest_wal_update)
901 : } else {
902 8 : discovered_new_wal.as_ref().map(|new_wal| {
903 8 : // We know that new WAL is available on other safekeeper, but connected safekeeper don't have it.
904 8 : new_wal
905 8 : .discovered_at
906 8 : .max(existing_wal_connection.status.latest_wal_update)
907 8 : })
908 : };
909 :
910 : // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
911 8 : if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
912 8 : if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
913 4 : if candidate_commit_lsn > current_commit_lsn
914 4 : && waiting_for_new_wal > self.conf.lagging_wal_timeout
915 : {
916 4 : return Some(NewWalConnectionCandidate {
917 4 : safekeeper_id: new_sk_id,
918 4 : wal_source_connconf: new_wal_source_connconf,
919 4 : availability_zone: new_availability_zone,
920 4 : reason: ReconnectReason::NoWalTimeout {
921 4 : current_lsn,
922 4 : current_commit_lsn,
923 4 : candidate_commit_lsn,
924 4 : last_wal_interaction: Some(
925 4 : existing_wal_connection.status.latest_wal_update,
926 4 : ),
927 4 : check_time: now,
928 4 : threshold: self.conf.lagging_wal_timeout,
929 4 : },
930 4 : });
931 0 : }
932 4 : }
933 0 : }
934 :
935 4 : self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
936 : }
937 : None => {
938 12 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
939 16 : self.select_connection_candidate(None)?;
940 12 : return Some(NewWalConnectionCandidate {
941 12 : safekeeper_id: new_sk_id,
942 12 : availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
943 12 : wal_source_connconf: new_wal_source_connconf,
944 12 : reason: ReconnectReason::NoExistingConnection,
945 12 : });
946 : }
947 : }
948 :
949 4 : None
950 36 : }
951 :
952 : /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
953 : /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
954 : ///
955 : /// The candidate that is chosen:
956 : /// * has no pending retry cooldown
957 : /// * has greatest commit_lsn among the ones that are left
958 36 : fn select_connection_candidate(
959 36 : &self,
960 36 : node_to_omit: Option<NodeId>,
961 36 : ) -> Option<(NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
962 36 : self.applicable_connection_candidates()
963 52 : .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
964 40 : .max_by_key(|(_, info, _)| info.commit_lsn)
965 36 : }
966 :
967 : /// Returns a list of safekeepers that have valid info and ready for connection.
968 : /// Some safekeepers are filtered by the retry cooldown.
969 36 : fn applicable_connection_candidates(
970 36 : &self,
971 36 : ) -> impl Iterator<Item = (NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
972 36 : let now = Utc::now().naive_utc();
973 36 :
974 36 : self.wal_stream_candidates
975 36 : .iter()
976 72 : .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
977 64 : .filter(move |(sk_id, _)| {
978 64 : let next_retry_at = self
979 64 : .wal_connection_retries
980 64 : .get(sk_id)
981 64 : .and_then(|retry_info| {
982 4 : retry_info.next_retry_at
983 64 : });
984 64 :
985 64 : next_retry_at.is_none() || next_retry_at.unwrap() <= now
986 64 : }).filter_map(|(sk_id, broker_info)| {
987 60 : let info = &broker_info.timeline;
988 60 : if info.safekeeper_connstr.is_empty() {
989 8 : return None; // no connection string, ignore sk
990 52 : }
991 :
992 52 : let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
993 : PostgresClientProtocol::Vanilla => {
994 52 : (None, None, None)
995 : },
996 : PostgresClientProtocol::Interpreted { .. } => {
997 0 : let shard_identity = self.timeline.get_shard_identity();
998 0 : (
999 0 : Some(shard_identity.number.0),
1000 0 : Some(shard_identity.count.0),
1001 0 : Some(shard_identity.stripe_size.0),
1002 0 : )
1003 : }
1004 : };
1005 :
1006 52 : let connection_conf_args = ConnectionConfigArgs {
1007 52 : protocol: self.conf.protocol,
1008 52 : ttid: self.id,
1009 52 : shard_number,
1010 52 : shard_count,
1011 52 : shard_stripe_size,
1012 52 : listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
1013 52 : auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
1014 52 : availability_zone: self.conf.availability_zone.as_deref()
1015 52 : };
1016 52 :
1017 52 : match wal_stream_connection_config(connection_conf_args) {
1018 52 : Ok(connstr) => Some((*sk_id, info, connstr)),
1019 0 : Err(e) => {
1020 0 : error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
1021 0 : None
1022 : }
1023 : }
1024 60 : })
1025 36 : }
1026 :
1027 : /// Remove candidates which haven't sent broker updates for a while.
1028 36 : fn cleanup_old_candidates(&mut self) {
1029 36 : let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
1030 36 : let lagging_wal_timeout = self.conf.lagging_wal_timeout;
1031 36 :
1032 76 : self.wal_stream_candidates.retain(|node_id, broker_info| {
1033 76 : if let Ok(time_since_latest_broker_update) =
1034 76 : (Utc::now().naive_utc() - broker_info.latest_update).to_std()
1035 : {
1036 76 : let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
1037 76 : if !should_retain {
1038 4 : node_ids_to_remove.push(*node_id);
1039 72 : }
1040 76 : should_retain
1041 : } else {
1042 0 : true
1043 : }
1044 76 : });
1045 36 :
1046 36 : if !node_ids_to_remove.is_empty() {
1047 8 : for node_id in node_ids_to_remove {
1048 4 : info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
1049 4 : self.wal_connection_retries.remove(&node_id);
1050 4 : WALRECEIVER_CANDIDATES_REMOVED.inc();
1051 : }
1052 32 : }
1053 36 : }
1054 :
1055 : /// # Cancel-Safety
1056 : ///
1057 : /// Not cancellation-safe.
1058 0 : pub(super) async fn shutdown(mut self) {
1059 0 : if let Some(wal_connection) = self.wal_connection.take() {
1060 0 : wal_connection.connection_task.shutdown().await;
1061 0 : }
1062 0 : }
1063 :
1064 0 : fn manager_status(&self) -> ConnectionManagerStatus {
1065 0 : ConnectionManagerStatus {
1066 0 : existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
1067 0 : wal_stream_candidates: self.wal_stream_candidates.clone(),
1068 0 : }
1069 0 : }
1070 : }
1071 :
1072 : #[derive(Debug)]
1073 : struct NewWalConnectionCandidate {
1074 : safekeeper_id: NodeId,
1075 : wal_source_connconf: PgConnectionConfig,
1076 : availability_zone: Option<String>,
1077 : reason: ReconnectReason,
1078 : }
1079 :
1080 : /// Stores the reason why WAL connection was switched, for furter debugging purposes.
1081 : #[derive(Debug, PartialEq, Eq)]
1082 : enum ReconnectReason {
1083 : NoExistingConnection,
1084 : LaggingWal {
1085 : current_commit_lsn: Lsn,
1086 : new_commit_lsn: Lsn,
1087 : threshold: NonZeroU64,
1088 : },
1089 : SwitchAvailabilityZone,
1090 : NoWalTimeout {
1091 : current_lsn: Lsn,
1092 : current_commit_lsn: Lsn,
1093 : candidate_commit_lsn: Lsn,
1094 : last_wal_interaction: Option<NaiveDateTime>,
1095 : check_time: NaiveDateTime,
1096 : threshold: Duration,
1097 : },
1098 : NoKeepAlives {
1099 : last_keep_alive: Option<NaiveDateTime>,
1100 : check_time: NaiveDateTime,
1101 : threshold: Duration,
1102 : },
1103 : }
1104 :
1105 : impl ReconnectReason {
1106 0 : fn name(&self) -> &str {
1107 0 : match self {
1108 0 : ReconnectReason::NoExistingConnection => "NoExistingConnection",
1109 0 : ReconnectReason::LaggingWal { .. } => "LaggingWal",
1110 0 : ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
1111 0 : ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
1112 0 : ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
1113 : }
1114 0 : }
1115 : }
1116 :
1117 : #[cfg(test)]
1118 : mod tests {
1119 : use super::*;
1120 : use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
1121 : use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
1122 : use url::Host;
1123 :
1124 76 : fn dummy_broker_sk_timeline(
1125 76 : commit_lsn: u64,
1126 76 : safekeeper_connstr: &str,
1127 76 : latest_update: NaiveDateTime,
1128 76 : ) -> BrokerSkTimeline {
1129 76 : BrokerSkTimeline {
1130 76 : timeline: SafekeeperDiscoveryResponse {
1131 76 : safekeeper_id: 0,
1132 76 : tenant_timeline_id: None,
1133 76 : commit_lsn,
1134 76 : safekeeper_connstr: safekeeper_connstr.to_owned(),
1135 76 : availability_zone: None,
1136 76 : standby_horizon: 0,
1137 76 : },
1138 76 : latest_update,
1139 76 : }
1140 76 : }
1141 :
1142 : #[tokio::test]
1143 4 : async fn no_connection_no_candidate() -> anyhow::Result<()> {
1144 4 : let harness = TenantHarness::create("no_connection_no_candidate").await?;
1145 4 : let mut state = dummy_state(&harness).await;
1146 4 : let now = Utc::now().naive_utc();
1147 4 :
1148 4 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1149 4 : let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
1150 4 :
1151 4 : state.wal_connection = None;
1152 4 : state.wal_stream_candidates = HashMap::from([
1153 4 : (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
1154 4 : (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1155 4 : (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1156 4 : (
1157 4 : NodeId(3),
1158 4 : dummy_broker_sk_timeline(
1159 4 : 1 + state.conf.max_lsn_wal_lag.get(),
1160 4 : "delay_over_threshold",
1161 4 : delay_over_threshold,
1162 4 : ),
1163 4 : ),
1164 4 : ]);
1165 4 :
1166 4 : let no_candidate = state.next_connection_candidate();
1167 4 : assert!(
1168 4 : no_candidate.is_none(),
1169 4 : "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
1170 4 : );
1171 4 :
1172 4 : Ok(())
1173 4 : }
1174 :
1175 : #[tokio::test]
1176 4 : async fn connection_no_candidate() -> anyhow::Result<()> {
1177 4 : let harness = TenantHarness::create("connection_no_candidate").await?;
1178 4 : let mut state = dummy_state(&harness).await;
1179 4 : let now = Utc::now().naive_utc();
1180 4 :
1181 4 : let connected_sk_id = NodeId(0);
1182 4 : let current_lsn = 100_000;
1183 4 :
1184 4 : let connection_status = WalConnectionStatus {
1185 4 : is_connected: true,
1186 4 : has_processed_wal: true,
1187 4 : latest_connection_update: now,
1188 4 : latest_wal_update: now,
1189 4 : commit_lsn: Some(Lsn(current_lsn)),
1190 4 : streaming_lsn: Some(Lsn(current_lsn)),
1191 4 : node: NodeId(1),
1192 4 : };
1193 4 :
1194 4 : state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
1195 4 : state.wal_connection = Some(WalConnection {
1196 4 : started_at: now,
1197 4 : sk_id: connected_sk_id,
1198 4 : availability_zone: None,
1199 4 : status: connection_status,
1200 4 : connection_task: state.spawn(move |sender, _| async move {
1201 4 : sender
1202 4 : .send(TaskStateUpdate::Progress(connection_status))
1203 4 : .ok();
1204 4 : Ok(())
1205 4 : }),
1206 4 : discovered_new_wal: None,
1207 4 : });
1208 4 : state.wal_stream_candidates = HashMap::from([
1209 4 : (
1210 4 : connected_sk_id,
1211 4 : dummy_broker_sk_timeline(
1212 4 : current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
1213 4 : DUMMY_SAFEKEEPER_HOST,
1214 4 : now,
1215 4 : ),
1216 4 : ),
1217 4 : (
1218 4 : NodeId(1),
1219 4 : dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
1220 4 : ),
1221 4 : (
1222 4 : NodeId(2),
1223 4 : dummy_broker_sk_timeline(
1224 4 : current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
1225 4 : "not_enough_advanced_lsn",
1226 4 : now,
1227 4 : ),
1228 4 : ),
1229 4 : ]);
1230 4 :
1231 4 : let no_candidate = state.next_connection_candidate();
1232 4 : assert!(
1233 4 : no_candidate.is_none(),
1234 4 : "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
1235 4 : );
1236 4 :
1237 4 : Ok(())
1238 4 : }
1239 :
1240 : #[tokio::test]
1241 4 : async fn no_connection_candidate() -> anyhow::Result<()> {
1242 4 : let harness = TenantHarness::create("no_connection_candidate").await?;
1243 4 : let mut state = dummy_state(&harness).await;
1244 4 : let now = Utc::now().naive_utc();
1245 4 :
1246 4 : state.wal_connection = None;
1247 4 : state.wal_stream_candidates = HashMap::from([(
1248 4 : NodeId(0),
1249 4 : dummy_broker_sk_timeline(
1250 4 : 1 + state.conf.max_lsn_wal_lag.get(),
1251 4 : DUMMY_SAFEKEEPER_HOST,
1252 4 : now,
1253 4 : ),
1254 4 : )]);
1255 4 :
1256 4 : let only_candidate = state
1257 4 : .next_connection_candidate()
1258 4 : .expect("Expected one candidate selected out of the only data option, but got none");
1259 4 : assert_eq!(only_candidate.safekeeper_id, NodeId(0));
1260 4 : assert_eq!(
1261 4 : only_candidate.reason,
1262 4 : ReconnectReason::NoExistingConnection,
1263 4 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1264 4 : );
1265 4 : assert_eq!(
1266 4 : only_candidate.wal_source_connconf.host(),
1267 4 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1268 4 : );
1269 4 :
1270 4 : let selected_lsn = 100_000;
1271 4 : state.wal_stream_candidates = HashMap::from([
1272 4 : (
1273 4 : NodeId(0),
1274 4 : dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
1275 4 : ),
1276 4 : (
1277 4 : NodeId(1),
1278 4 : dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
1279 4 : ),
1280 4 : (
1281 4 : NodeId(2),
1282 4 : dummy_broker_sk_timeline(selected_lsn + 100, "", now),
1283 4 : ),
1284 4 : ]);
1285 4 : let biggest_wal_candidate = state.next_connection_candidate().expect(
1286 4 : "Expected one candidate selected out of multiple valid data options, but got none",
1287 4 : );
1288 4 :
1289 4 : assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
1290 4 : assert_eq!(
1291 4 : biggest_wal_candidate.reason,
1292 4 : ReconnectReason::NoExistingConnection,
1293 4 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1294 4 : );
1295 4 : assert_eq!(
1296 4 : biggest_wal_candidate.wal_source_connconf.host(),
1297 4 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1298 4 : );
1299 4 :
1300 4 : Ok(())
1301 4 : }
1302 :
1303 : #[tokio::test]
1304 4 : async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
1305 4 : let harness = TenantHarness::create("candidate_with_many_connection_failures").await?;
1306 4 : let mut state = dummy_state(&harness).await;
1307 4 : let now = Utc::now().naive_utc();
1308 4 :
1309 4 : let current_lsn = Lsn(100_000).align();
1310 4 : let bigger_lsn = Lsn(current_lsn.0 + 100).align();
1311 4 :
1312 4 : state.wal_connection = None;
1313 4 : state.wal_stream_candidates = HashMap::from([
1314 4 : (
1315 4 : NodeId(0),
1316 4 : dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1317 4 : ),
1318 4 : (
1319 4 : NodeId(1),
1320 4 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1321 4 : ),
1322 4 : ]);
1323 4 : state.wal_connection_retries = HashMap::from([(
1324 4 : NodeId(0),
1325 4 : RetryInfo {
1326 4 : next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
1327 4 : retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
1328 4 : },
1329 4 : )]);
1330 4 :
1331 4 : let candidate_with_less_errors = state
1332 4 : .next_connection_candidate()
1333 4 : .expect("Expected one candidate selected, but got none");
1334 4 : assert_eq!(
1335 4 : candidate_with_less_errors.safekeeper_id,
1336 4 : NodeId(1),
1337 4 : "Should select the node with no pending retry cooldown"
1338 4 : );
1339 4 :
1340 4 : Ok(())
1341 4 : }
1342 :
1343 : #[tokio::test]
1344 4 : async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1345 4 : let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate").await?;
1346 4 : let mut state = dummy_state(&harness).await;
1347 4 : let current_lsn = Lsn(100_000).align();
1348 4 : let now = Utc::now().naive_utc();
1349 4 :
1350 4 : let connected_sk_id = NodeId(0);
1351 4 : let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
1352 4 :
1353 4 : let connection_status = WalConnectionStatus {
1354 4 : is_connected: true,
1355 4 : has_processed_wal: true,
1356 4 : latest_connection_update: now,
1357 4 : latest_wal_update: now,
1358 4 : commit_lsn: Some(current_lsn),
1359 4 : streaming_lsn: Some(current_lsn),
1360 4 : node: connected_sk_id,
1361 4 : };
1362 4 :
1363 4 : state.wal_connection = Some(WalConnection {
1364 4 : started_at: now,
1365 4 : sk_id: connected_sk_id,
1366 4 : availability_zone: None,
1367 4 : status: connection_status,
1368 4 : connection_task: state.spawn(move |sender, _| async move {
1369 4 : sender
1370 4 : .send(TaskStateUpdate::Progress(connection_status))
1371 4 : .ok();
1372 4 : Ok(())
1373 4 : }),
1374 4 : discovered_new_wal: None,
1375 4 : });
1376 4 : state.wal_stream_candidates = HashMap::from([
1377 4 : (
1378 4 : connected_sk_id,
1379 4 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1380 4 : ),
1381 4 : (
1382 4 : NodeId(1),
1383 4 : dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
1384 4 : ),
1385 4 : ]);
1386 4 :
1387 4 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1388 4 : "Expected one candidate selected out of multiple valid data options, but got none",
1389 4 : );
1390 4 :
1391 4 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
1392 4 : assert_eq!(
1393 4 : over_threshcurrent_candidate.reason,
1394 4 : ReconnectReason::LaggingWal {
1395 4 : current_commit_lsn: current_lsn,
1396 4 : new_commit_lsn: new_lsn,
1397 4 : threshold: state.conf.max_lsn_wal_lag
1398 4 : },
1399 4 : "Should select bigger WAL safekeeper if it starts to lag enough"
1400 4 : );
1401 4 : assert_eq!(
1402 4 : over_threshcurrent_candidate.wal_source_connconf.host(),
1403 4 : &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
1404 4 : );
1405 4 :
1406 4 : Ok(())
1407 4 : }
1408 :
1409 : #[tokio::test]
1410 4 : async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
1411 4 : let harness =
1412 4 : TenantHarness::create("timeout_connection_threshold_current_candidate").await?;
1413 4 : let mut state = dummy_state(&harness).await;
1414 4 : let current_lsn = Lsn(100_000).align();
1415 4 : let now = Utc::now().naive_utc();
1416 4 :
1417 4 : let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
1418 4 : let time_over_threshold =
1419 4 : Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
1420 4 :
1421 4 : let connection_status = WalConnectionStatus {
1422 4 : is_connected: true,
1423 4 : has_processed_wal: true,
1424 4 : latest_connection_update: time_over_threshold,
1425 4 : latest_wal_update: time_over_threshold,
1426 4 : commit_lsn: Some(current_lsn),
1427 4 : streaming_lsn: Some(current_lsn),
1428 4 : node: NodeId(1),
1429 4 : };
1430 4 :
1431 4 : state.wal_connection = Some(WalConnection {
1432 4 : started_at: now,
1433 4 : sk_id: NodeId(1),
1434 4 : availability_zone: None,
1435 4 : status: connection_status,
1436 4 : connection_task: state.spawn(move |sender, _| async move {
1437 4 : sender
1438 4 : .send(TaskStateUpdate::Progress(connection_status))
1439 4 : .ok();
1440 4 : Ok(())
1441 4 : }),
1442 4 : discovered_new_wal: None,
1443 4 : });
1444 4 : state.wal_stream_candidates = HashMap::from([(
1445 4 : NodeId(0),
1446 4 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1447 4 : )]);
1448 4 :
1449 4 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1450 4 : "Expected one candidate selected out of multiple valid data options, but got none",
1451 4 : );
1452 4 :
1453 4 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1454 4 : match over_threshcurrent_candidate.reason {
1455 4 : ReconnectReason::NoKeepAlives {
1456 4 : last_keep_alive,
1457 4 : threshold,
1458 4 : ..
1459 4 : } => {
1460 4 : assert_eq!(last_keep_alive, Some(time_over_threshold));
1461 4 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1462 4 : }
1463 4 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1464 4 : }
1465 4 : assert_eq!(
1466 4 : over_threshcurrent_candidate.wal_source_connconf.host(),
1467 4 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1468 4 : );
1469 4 :
1470 4 : Ok(())
1471 4 : }
1472 :
1473 : #[tokio::test]
1474 4 : async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1475 4 : let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?;
1476 4 : let mut state = dummy_state(&harness).await;
1477 4 : let current_lsn = Lsn(100_000).align();
1478 4 : let new_lsn = Lsn(100_100).align();
1479 4 : let now = Utc::now().naive_utc();
1480 4 :
1481 4 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1482 4 : let time_over_threshold =
1483 4 : Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
1484 4 :
1485 4 : let connection_status = WalConnectionStatus {
1486 4 : is_connected: true,
1487 4 : has_processed_wal: true,
1488 4 : latest_connection_update: now,
1489 4 : latest_wal_update: time_over_threshold,
1490 4 : commit_lsn: Some(current_lsn),
1491 4 : streaming_lsn: Some(current_lsn),
1492 4 : node: NodeId(1),
1493 4 : };
1494 4 :
1495 4 : state.wal_connection = Some(WalConnection {
1496 4 : started_at: now,
1497 4 : sk_id: NodeId(1),
1498 4 : availability_zone: None,
1499 4 : status: connection_status,
1500 4 : connection_task: state.spawn(move |_, _| async move { Ok(()) }),
1501 4 : discovered_new_wal: Some(NewCommittedWAL {
1502 4 : discovered_at: time_over_threshold,
1503 4 : lsn: new_lsn,
1504 4 : }),
1505 4 : });
1506 4 : state.wal_stream_candidates = HashMap::from([(
1507 4 : NodeId(0),
1508 4 : dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1509 4 : )]);
1510 4 :
1511 4 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1512 4 : "Expected one candidate selected out of multiple valid data options, but got none",
1513 4 : );
1514 4 :
1515 4 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1516 4 : match over_threshcurrent_candidate.reason {
1517 4 : ReconnectReason::NoWalTimeout {
1518 4 : current_lsn,
1519 4 : current_commit_lsn,
1520 4 : candidate_commit_lsn,
1521 4 : last_wal_interaction,
1522 4 : threshold,
1523 4 : ..
1524 4 : } => {
1525 4 : assert_eq!(current_lsn, current_lsn);
1526 4 : assert_eq!(current_commit_lsn, current_lsn);
1527 4 : assert_eq!(candidate_commit_lsn, new_lsn);
1528 4 : assert_eq!(last_wal_interaction, Some(time_over_threshold));
1529 4 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1530 4 : }
1531 4 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1532 4 : }
1533 4 : assert_eq!(
1534 4 : over_threshcurrent_candidate.wal_source_connconf.host(),
1535 4 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1536 4 : );
1537 4 :
1538 4 : Ok(())
1539 4 : }
1540 :
1541 : const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
1542 :
1543 32 : async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
1544 32 : let (tenant, ctx) = harness.load().await;
1545 32 : let timeline = tenant
1546 32 : .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
1547 32 : .await
1548 32 : .expect("Failed to create an empty timeline for dummy wal connection manager");
1549 32 :
1550 32 : ConnectionManagerState {
1551 32 : id: TenantTimelineId {
1552 32 : tenant_id: harness.tenant_shard_id.tenant_id,
1553 32 : timeline_id: TIMELINE_ID,
1554 32 : },
1555 32 : timeline,
1556 32 : cancel: CancellationToken::new(),
1557 32 : conf: WalReceiverConf {
1558 32 : protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
1559 32 : wal_connect_timeout: Duration::from_secs(1),
1560 32 : lagging_wal_timeout: Duration::from_secs(1),
1561 32 : max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
1562 32 : auth_token: None,
1563 32 : availability_zone: None,
1564 32 : ingest_batch_size: 1,
1565 32 : },
1566 32 : wal_connection: None,
1567 32 : wal_stream_candidates: HashMap::new(),
1568 32 : wal_connection_retries: HashMap::new(),
1569 32 : }
1570 32 : }
1571 :
1572 : #[tokio::test]
1573 4 : async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
1574 4 : // Pageserver and one of safekeepers will be in the same availability zone
1575 4 : // and pageserver should prefer to connect to it.
1576 4 : let test_az = Some("test_az".to_owned());
1577 4 :
1578 4 : let harness = TenantHarness::create("switch_to_same_availability_zone").await?;
1579 4 : let mut state = dummy_state(&harness).await;
1580 4 : state.conf.availability_zone.clone_from(&test_az);
1581 4 : let current_lsn = Lsn(100_000).align();
1582 4 : let now = Utc::now().naive_utc();
1583 4 :
1584 4 : let connected_sk_id = NodeId(0);
1585 4 :
1586 4 : let connection_status = WalConnectionStatus {
1587 4 : is_connected: true,
1588 4 : has_processed_wal: true,
1589 4 : latest_connection_update: now,
1590 4 : latest_wal_update: now,
1591 4 : commit_lsn: Some(current_lsn),
1592 4 : streaming_lsn: Some(current_lsn),
1593 4 : node: connected_sk_id,
1594 4 : };
1595 4 :
1596 4 : state.wal_connection = Some(WalConnection {
1597 4 : started_at: now,
1598 4 : sk_id: connected_sk_id,
1599 4 : availability_zone: None,
1600 4 : status: connection_status,
1601 4 : connection_task: state.spawn(move |sender, _| async move {
1602 4 : sender
1603 4 : .send(TaskStateUpdate::Progress(connection_status))
1604 4 : .ok();
1605 4 : Ok(())
1606 4 : }),
1607 4 : discovered_new_wal: None,
1608 4 : });
1609 4 :
1610 4 : // We have another safekeeper with the same commit_lsn, and it have the same availability zone as
1611 4 : // the current pageserver.
1612 4 : let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
1613 4 : same_az_sk.timeline.availability_zone.clone_from(&test_az);
1614 4 :
1615 4 : state.wal_stream_candidates = HashMap::from([
1616 4 : (
1617 4 : connected_sk_id,
1618 4 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1619 4 : ),
1620 4 : (NodeId(1), same_az_sk),
1621 4 : ]);
1622 4 :
1623 4 : // We expect that pageserver will switch to the safekeeper in the same availability zone,
1624 4 : // even if it has the same commit_lsn.
1625 4 : let next_candidate = state.next_connection_candidate().expect(
1626 4 : "Expected one candidate selected out of multiple valid data options, but got none",
1627 4 : );
1628 4 :
1629 4 : assert_eq!(next_candidate.safekeeper_id, NodeId(1));
1630 4 : assert_eq!(
1631 4 : next_candidate.reason,
1632 4 : ReconnectReason::SwitchAvailabilityZone,
1633 4 : "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
1634 4 : );
1635 4 : assert_eq!(
1636 4 : next_candidate.wal_source_connconf.host(),
1637 4 : &Host::Domain("same_az".to_owned())
1638 4 : );
1639 4 :
1640 4 : Ok(())
1641 4 : }
1642 : }
|