Line data Source code
1 : //! WAL receiver logic that ensures the pageserver gets connected to safekeeper,
2 : //! that contains the latest WAL to stream and this connection does not go stale.
3 : //!
4 : //! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
5 : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
6 : //! Current connection state is tracked too, to ensure it's not getting stale.
7 : //!
8 : //! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new connection leader,
9 : //! then a (re)connection happens, if necessary.
10 : //! Only WAL streaming task expects to be finished, other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
11 :
12 : use std::{collections::HashMap, num::NonZeroU64, ops::ControlFlow, sync::Arc, time::Duration};
13 :
14 : use super::{TaskStateUpdate, WalReceiverConf};
15 : use crate::context::{DownloadBehavior, RequestContext};
16 : use crate::metrics::{
17 : WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
18 : WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
19 : };
20 : use crate::task_mgr::TaskKind;
21 : use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
22 : use anyhow::Context;
23 : use chrono::{NaiveDateTime, Utc};
24 : use pageserver_api::models::TimelineState;
25 :
26 : use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId;
27 : use storage_broker::proto::{
28 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
29 : SubscribeByFilterRequest, TypeSubscription, TypedMessage,
30 : };
31 : use storage_broker::{BrokerClientChannel, Code, Streaming};
32 : use tokio_util::sync::CancellationToken;
33 : use tracing::*;
34 :
35 : use postgres_connection::PgConnectionConfig;
36 : use utils::backoff::{
37 : exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
38 : };
39 : use utils::postgres_client::wal_stream_connection_config;
40 : use utils::{
41 : id::{NodeId, TenantTimelineId},
42 : lsn::Lsn,
43 : };
44 :
45 : use super::{
46 : walreceiver_connection::WalConnectionStatus, walreceiver_connection::WalReceiverError,
47 : TaskEvent, TaskHandle,
48 : };
49 :
50 : pub(crate) struct Cancelled;
51 :
52 : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
53 : /// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
54 : /// If storage broker subscription is cancelled, exits.
55 : ///
56 : /// # Cancel-Safety
57 : ///
58 : /// Not cancellation-safe. Use `cancel` token to request cancellation.
59 0 : pub(super) async fn connection_manager_loop_step(
60 0 : broker_client: &mut BrokerClientChannel,
61 0 : connection_manager_state: &mut ConnectionManagerState,
62 0 : ctx: &RequestContext,
63 0 : cancel: &CancellationToken,
64 0 : manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
65 0 : ) -> Result<(), Cancelled> {
66 : match tokio::select! {
67 : _ = cancel.cancelled() => { return Err(Cancelled); },
68 : st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
69 : } {
70 0 : Ok(()) => {}
71 0 : Err(new_state) => {
72 0 : debug!(
73 : ?new_state,
74 0 : "state changed, stopping wal connection manager loop"
75 : );
76 0 : return Err(Cancelled);
77 : }
78 : }
79 :
80 0 : WALRECEIVER_ACTIVE_MANAGERS.inc();
81 : scopeguard::defer! {
82 : WALRECEIVER_ACTIVE_MANAGERS.dec();
83 : }
84 :
85 0 : let id = TenantTimelineId {
86 0 : tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
87 0 : timeline_id: connection_manager_state.timeline.timeline_id,
88 0 : };
89 0 :
90 0 : let mut timeline_state_updates = connection_manager_state
91 0 : .timeline
92 0 : .subscribe_for_state_updates();
93 0 :
94 0 : let mut wait_lsn_status = connection_manager_state
95 0 : .timeline
96 0 : .subscribe_for_wait_lsn_updates();
97 0 :
98 0 : // TODO: create a separate config option for discovery request interval
99 0 : let discovery_request_interval = connection_manager_state.conf.lagging_wal_timeout;
100 0 : let mut last_discovery_ts: Option<std::time::Instant> = None;
101 :
102 : // Subscribe to the broker updates. Stream shares underlying TCP connection
103 : // with other streams on this client (other connection managers). When
104 : // object goes out of scope, stream finishes in drop() automatically.
105 0 : let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
106 0 : debug!("Subscribed for broker timeline updates");
107 :
108 : loop {
109 0 : let time_until_next_retry = connection_manager_state.time_until_next_retry();
110 0 : let any_activity = connection_manager_state.wal_connection.is_some()
111 0 : || !connection_manager_state.wal_stream_candidates.is_empty();
112 :
113 : // These things are happening concurrently:
114 : //
115 : // - cancellation request
116 : // - keep receiving WAL on the current connection
117 : // - if the shared state says we need to change connection, disconnect and return
118 : // - this runs in a separate task and we receive updates via a watch channel
119 : // - change connection if the rules decide so, or if the current connection dies
120 : // - receive updates from broker
121 : // - this might change the current desired connection
122 : // - timeline state changes to something that does not allow walreceiver to run concurrently
123 : // - if there's no connection and no candidates, try to send a discovery request
124 :
125 : // NB: make sure each of the select expressions are cancellation-safe
126 : // (no need for arms to be cancellation-safe).
127 : tokio::select! {
128 : _ = cancel.cancelled() => { return Err(Cancelled); }
129 0 : Some(wal_connection_update) = async {
130 0 : match connection_manager_state.wal_connection.as_mut() {
131 0 : Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
132 0 : None => None,
133 : }
134 0 : } => {
135 : let wal_connection = connection_manager_state.wal_connection.as_mut()
136 : .expect("Should have a connection, as checked by the corresponding select! guard");
137 : match wal_connection_update {
138 : TaskEvent::Update(TaskStateUpdate::Started) => {},
139 : TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
140 : if new_status.has_processed_wal {
141 : // We have advanced last_record_lsn by processing the WAL received
142 : // from this safekeeper. This is good enough to clean unsuccessful
143 : // retries history and allow reconnecting to this safekeeper without
144 : // sleeping for a long time.
145 : connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
146 : }
147 : wal_connection.status = new_status;
148 : }
149 : TaskEvent::End(walreceiver_task_result) => {
150 : match walreceiver_task_result {
151 : Ok(()) => debug!("WAL receiving task finished"),
152 : Err(e) => error!("wal receiver task finished with an error: {e:?}"),
153 : }
154 : connection_manager_state.drop_old_connection(false).await;
155 : },
156 : }
157 : },
158 :
159 : // Got a new update from the broker
160 : broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
161 : match broker_update {
162 : Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
163 : Err(status) => {
164 : match status.code() {
165 : Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") => {
166 : // tonic's error handling doesn't provide a clear code for disconnections: we get
167 : // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
168 : info!("broker disconnected: {status}");
169 : },
170 : _ => {
171 : warn!("broker subscription failed: {status}");
172 : }
173 : }
174 : return Ok(());
175 : }
176 : Ok(None) => {
177 : error!("broker subscription stream ended"); // can't happen
178 : return Ok(());
179 : }
180 : }
181 : },
182 :
183 0 : new_event = async {
184 : // Reminder: this match arm needs to be cancellation-safe.
185 0 : loop {
186 0 : if connection_manager_state.timeline.current_state() == TimelineState::Loading {
187 0 : warn!("wal connection manager should only be launched after timeline has become active");
188 0 : }
189 0 : match timeline_state_updates.changed().await {
190 : Ok(()) => {
191 0 : let new_state = connection_manager_state.timeline.current_state();
192 0 : match new_state {
193 : // we're already active as walreceiver, no need to reactivate
194 0 : TimelineState::Active => continue,
195 : TimelineState::Broken { .. } | TimelineState::Stopping => {
196 0 : debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
197 0 : return ControlFlow::Break(());
198 : }
199 : TimelineState::Loading => {
200 0 : warn!("timeline transitioned back to Loading state, that should not happen");
201 0 : return ControlFlow::Continue(());
202 : }
203 : }
204 : }
205 0 : Err(_sender_dropped_error) => return ControlFlow::Break(()),
206 : }
207 : }
208 0 : } => match new_event {
209 : ControlFlow::Continue(()) => {
210 : return Ok(());
211 : }
212 : ControlFlow::Break(()) => {
213 : debug!("Timeline is no longer active, stopping wal connection manager loop");
214 : return Err(Cancelled);
215 : }
216 : },
217 :
218 0 : Some(()) = async {
219 0 : match time_until_next_retry {
220 0 : Some(sleep_time) => {
221 0 : tokio::time::sleep(sleep_time).await;
222 0 : Some(())
223 : },
224 : None => {
225 0 : debug!("No candidates to retry, waiting indefinitely for the broker events");
226 0 : None
227 : }
228 : }
229 0 : } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
230 :
231 0 : Some(()) = async {
232 0 : // Reminder: this match arm needs to be cancellation-safe.
233 0 : // Calculating time needed to wait until sending the next discovery request.
234 0 : // Current implementation is conservative and sends discovery requests only when there are no candidates.
235 0 :
236 0 : if any_activity {
237 : // No need to send discovery requests if there is an active connection or candidates.
238 0 : return None;
239 0 : }
240 :
241 : // Waiting for an active wait_lsn request.
242 0 : while wait_lsn_status.borrow().is_none() {
243 0 : if wait_lsn_status.changed().await.is_err() {
244 : // wait_lsn_status channel was closed, exiting
245 0 : warn!("wait_lsn_status channel was closed in connection_manager_loop_step");
246 0 : return None;
247 0 : }
248 : }
249 :
250 : // All preconditions met, preparing to send a discovery request.
251 0 : let now = std::time::Instant::now();
252 0 : let next_discovery_ts = last_discovery_ts
253 0 : .map(|ts| ts + discovery_request_interval)
254 0 : .unwrap_or_else(|| now);
255 0 :
256 0 : if next_discovery_ts > now {
257 : // Prevent sending discovery requests too frequently.
258 0 : tokio::time::sleep(next_discovery_ts - now).await;
259 0 : }
260 :
261 0 : let tenant_timeline_id = Some(ProtoTenantTimelineId {
262 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
263 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
264 0 : });
265 0 : let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
266 0 : let msg = TypedMessage {
267 0 : r#type: MessageType::SafekeeperDiscoveryRequest as i32,
268 0 : safekeeper_timeline_info: None,
269 0 : safekeeper_discovery_request: Some(request),
270 0 : safekeeper_discovery_response: None,
271 0 : };
272 0 :
273 0 : last_discovery_ts = Some(std::time::Instant::now());
274 0 : debug!("No active connection and no candidates, sending discovery request to the broker");
275 :
276 : // Cancellation safety: we want to send a message to the broker, but publish_one()
277 : // function can get cancelled by the other select! arm. This is absolutely fine, because
278 : // we just want to receive broker updates and discovery is not important if we already
279 : // receive updates.
280 : //
281 : // It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
282 : // This is totally fine because of the reason above.
283 :
284 : // This is a fire-and-forget request, we don't care about the response
285 0 : let _ = broker_client.publish_one(msg).await;
286 0 : debug!("Discovery request sent to the broker");
287 0 : None
288 0 : } => {}
289 : }
290 :
291 0 : if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
292 0 : info!("Switching to new connection candidate: {new_candidate:?}");
293 0 : connection_manager_state
294 0 : .change_connection(new_candidate, ctx)
295 0 : .await
296 0 : }
297 0 : *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
298 : }
299 0 : }
300 :
301 : /// Endlessly try to subscribe for broker updates for a given timeline.
302 0 : async fn subscribe_for_timeline_updates(
303 0 : broker_client: &mut BrokerClientChannel,
304 0 : id: TenantTimelineId,
305 0 : cancel: &CancellationToken,
306 0 : ) -> Result<Streaming<TypedMessage>, Cancelled> {
307 0 : let mut attempt = 0;
308 : loop {
309 0 : exponential_backoff(
310 0 : attempt,
311 0 : DEFAULT_BASE_BACKOFF_SECONDS,
312 0 : DEFAULT_MAX_BACKOFF_SECONDS,
313 0 : cancel,
314 0 : )
315 0 : .await;
316 0 : attempt += 1;
317 0 :
318 0 : // subscribe to the specific timeline
319 0 : let request = SubscribeByFilterRequest {
320 0 : types: vec![
321 0 : TypeSubscription {
322 0 : r#type: MessageType::SafekeeperTimelineInfo as i32,
323 0 : },
324 0 : TypeSubscription {
325 0 : r#type: MessageType::SafekeeperDiscoveryResponse as i32,
326 0 : },
327 0 : ],
328 0 : tenant_timeline_id: Some(FilterTenantTimelineId {
329 0 : enabled: true,
330 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
331 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
332 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
333 0 : }),
334 0 : }),
335 0 : };
336 :
337 0 : match {
338 0 : tokio::select! {
339 : r = broker_client.subscribe_by_filter(request) => { r }
340 : _ = cancel.cancelled() => { return Err(Cancelled); }
341 0 : }
342 0 : } {
343 0 : Ok(resp) => {
344 0 : return Ok(resp.into_inner());
345 : }
346 0 : Err(e) => {
347 0 : // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
348 0 : // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
349 0 : info!("Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}");
350 0 : continue;
351 : }
352 : }
353 : }
354 0 : }
355 :
/// Initial retry backoff for a safekeeper, and the lower clamp applied to the backoff
/// after it is multiplied (see `drop_old_connection`).
const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
/// Upper clamp applied to the per-safekeeper retry backoff.
const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
/// Factor the per-safekeeper retry backoff is multiplied by each time a connection to that
/// safekeeper is dropped.
const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
359 :
/// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
pub(super) struct ConnectionManagerState {
    /// Tenant and timeline ids, derived from `timeline` when the state is created.
    id: TenantTimelineId,
    /// Use pageserver data about the timeline to filter out some of the safekeepers.
    timeline: Arc<Timeline>,
    /// Child token of [`super::WalReceiver::cancel`], inherited to all tasks we spawn.
    cancel: CancellationToken,
    /// WAL receiver configuration (timeouts, lag thresholds, availability zone, etc.).
    conf: WalReceiverConf,
    /// Current connection to safekeeper for WAL streaming.
    wal_connection: Option<WalConnection>,
    /// Info about retries and unsuccessful attempts to connect to safekeepers.
    wal_connection_retries: HashMap<NodeId, RetryInfo>,
    /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
    wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
375 :
/// Information about the connection manager's current connection and connection candidates.
#[derive(Debug, Clone)]
pub struct ConnectionManagerStatus {
    /// Status of the current WAL streaming connection, `None` when disconnected.
    existing_connection: Option<WalConnectionStatus>,
    /// Known safekeeper candidates, keyed by safekeeper node id.
    wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
}
382 :
383 : impl ConnectionManagerStatus {
384 : /// Generates a string, describing current connection status in a form, suitable for logging.
385 0 : pub fn to_human_readable_string(&self) -> String {
386 0 : let mut resulting_string = String::new();
387 0 : match &self.existing_connection {
388 0 : Some(connection) => {
389 0 : if connection.has_processed_wal {
390 0 : resulting_string.push_str(&format!(
391 0 : " (update {}): streaming WAL from node {}, ",
392 0 : connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
393 0 : connection.node,
394 0 : ));
395 0 :
396 0 : match (connection.streaming_lsn, connection.commit_lsn) {
397 0 : (None, None) => resulting_string.push_str("no streaming data"),
398 0 : (None, Some(commit_lsn)) => {
399 0 : resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
400 : }
401 0 : (Some(streaming_lsn), None) => {
402 0 : resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
403 : }
404 0 : (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
405 0 : &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
406 0 : ),
407 : }
408 0 : } else if connection.is_connected {
409 0 : resulting_string.push_str(&format!(
410 0 : " (update {}): connecting to node {}",
411 0 : connection
412 0 : .latest_connection_update
413 0 : .format("%Y-%m-%d %H:%M:%S"),
414 0 : connection.node,
415 0 : ));
416 0 : } else {
417 0 : resulting_string.push_str(&format!(
418 0 : " (update {}): initializing node {} connection",
419 0 : connection
420 0 : .latest_connection_update
421 0 : .format("%Y-%m-%d %H:%M:%S"),
422 0 : connection.node,
423 0 : ));
424 0 : }
425 : }
426 0 : None => resulting_string.push_str(": disconnected"),
427 : }
428 :
429 0 : resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
430 0 : let mut candidates = self.wal_stream_candidates.iter().peekable();
431 0 : while let Some((node_id, candidate_info)) = candidates.next() {
432 0 : resulting_string.push_str(&format!(
433 0 : "({}|{}|{})",
434 0 : node_id,
435 0 : candidate_info.latest_update.format("%H:%M:%S"),
436 0 : Lsn(candidate_info.timeline.commit_lsn)
437 0 : ));
438 0 : if candidates.peek().is_some() {
439 0 : resulting_string.push_str(", ");
440 0 : }
441 : }
442 0 : resulting_string.push(']');
443 0 :
444 0 : resulting_string
445 0 : }
446 : }
447 :
/// Current connection data.
#[derive(Debug)]
struct WalConnection {
    /// Time when the connection was initiated.
    /// Also serves as the base for computing the next retry time when the connection is dropped.
    started_at: NaiveDateTime,
    /// Current safekeeper pageserver is connected to for WAL streaming.
    sk_id: NodeId,
    /// Availability zone of the safekeeper.
    availability_zone: Option<String>,
    /// Status of the connection.
    status: WalConnectionStatus,
    /// WAL streaming task handle.
    connection_task: TaskHandle<WalConnectionStatus>,
    /// Have we discovered that other safekeeper has more recent WAL than we do?
    discovered_new_wal: Option<NewCommittedWAL>,
}
464 :
/// Record of newly committed WAL that exists on another safekeeper.
#[derive(Debug, Clone, Copy)]
struct NewCommittedWAL {
    /// LSN of the new committed WAL.
    lsn: Lsn,
    /// When we discovered that the new committed WAL exists on other safekeeper.
    discovered_at: NaiveDateTime,
}
473 :
/// Per-safekeeper reconnection backoff state.
#[derive(Debug, Clone, Copy)]
struct RetryInfo {
    /// Earliest time for the next connection attempt to this safekeeper;
    /// `None` until a retry has been scheduled.
    next_retry_at: Option<NaiveDateTime>,
    /// Backoff, in seconds, that is added to the connection start time
    /// the next time a retry is scheduled.
    retry_duration_seconds: f64,
}
479 :
/// Data about the timeline to connect to, received from the broker.
#[derive(Debug, Clone)]
struct BrokerSkTimeline {
    /// Latest timeline data reported for the safekeeper; regular timeline info messages
    /// are converted into the discovery response shape before being stored here.
    timeline: SafekeeperDiscoveryResponse,
    /// Time at which the data was fetched from the broker last time, to track the stale data.
    latest_update: NaiveDateTime,
}
487 :
488 : impl ConnectionManagerState {
489 0 : pub(super) fn new(
490 0 : timeline: Arc<Timeline>,
491 0 : conf: WalReceiverConf,
492 0 : cancel: CancellationToken,
493 0 : ) -> Self {
494 0 : let id = TenantTimelineId {
495 0 : tenant_id: timeline.tenant_shard_id.tenant_id,
496 0 : timeline_id: timeline.timeline_id,
497 0 : };
498 0 : Self {
499 0 : id,
500 0 : timeline,
501 0 : cancel,
502 0 : conf,
503 0 : wal_connection: None,
504 0 : wal_stream_candidates: HashMap::new(),
505 0 : wal_connection_retries: HashMap::new(),
506 0 : }
507 0 : }
508 :
509 10 : fn spawn<Fut>(
510 10 : &self,
511 10 : task: impl FnOnce(
512 10 : tokio::sync::watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
513 10 : CancellationToken,
514 10 : ) -> Fut
515 10 : + Send
516 10 : + 'static,
517 10 : ) -> TaskHandle<WalConnectionStatus>
518 10 : where
519 10 : Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
520 10 : {
521 10 : // TODO: get rid of TaskHandle
522 10 : super::TaskHandle::spawn(&self.cancel, task)
523 10 : }
524 :
525 : /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
526 0 : async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
527 0 : WALRECEIVER_SWITCHES
528 0 : .with_label_values(&[new_sk.reason.name()])
529 0 : .inc();
530 0 :
531 0 : self.drop_old_connection(true).await;
532 :
533 0 : let node_id = new_sk.safekeeper_id;
534 0 : let connect_timeout = self.conf.wal_connect_timeout;
535 0 : let ingest_batch_size = self.conf.ingest_batch_size;
536 0 : let timeline = Arc::clone(&self.timeline);
537 0 : let ctx = ctx.detached_child(
538 0 : TaskKind::WalReceiverConnectionHandler,
539 0 : DownloadBehavior::Download,
540 0 : );
541 :
542 0 : let span = info_span!("connection", %node_id);
543 0 : let connection_handle = self.spawn(move |events_sender, cancellation| {
544 0 : async move {
545 0 : debug_assert_current_span_has_tenant_and_timeline_id();
546 :
547 0 : let res = super::walreceiver_connection::handle_walreceiver_connection(
548 0 : timeline,
549 0 : new_sk.wal_source_connconf,
550 0 : events_sender,
551 0 : cancellation.clone(),
552 0 : connect_timeout,
553 0 : ctx,
554 0 : node_id,
555 0 : ingest_batch_size,
556 0 : )
557 0 : .await;
558 :
559 0 : match res {
560 0 : Ok(()) => Ok(()),
561 0 : Err(e) => {
562 0 : match e {
563 0 : WalReceiverError::SuccessfulCompletion(msg) => {
564 0 : info!("walreceiver connection handling ended with success: {msg}");
565 0 : Ok(())
566 : }
567 0 : WalReceiverError::ExpectedSafekeeperError(e) => {
568 0 : info!("walreceiver connection handling ended: {e}");
569 0 : Ok(())
570 : }
571 : WalReceiverError::ClosedGate => {
572 0 : info!(
573 0 : "walreceiver connection handling ended because of closed gate"
574 : );
575 0 : Ok(())
576 : }
577 0 : WalReceiverError::Other(e) => {
578 0 : // give out an error to have task_mgr give it a really verbose logging
579 0 : if cancellation.is_cancelled() {
580 : // Ideally we would learn about this via some path other than Other, but
581 : // that requires refactoring all the intermediate layers of ingest code
582 : // that only emit anyhow::Error
583 0 : Ok(())
584 : } else {
585 0 : Err(e).context("walreceiver connection handling failure")
586 : }
587 : }
588 : }
589 : }
590 : }
591 0 : }
592 0 : .instrument(span)
593 0 : });
594 0 :
595 0 : let now = Utc::now().naive_utc();
596 0 : self.wal_connection = Some(WalConnection {
597 0 : started_at: now,
598 0 : sk_id: new_sk.safekeeper_id,
599 0 : availability_zone: new_sk.availability_zone,
600 0 : status: WalConnectionStatus {
601 0 : is_connected: false,
602 0 : has_processed_wal: false,
603 0 : latest_connection_update: now,
604 0 : latest_wal_update: now,
605 0 : streaming_lsn: None,
606 0 : commit_lsn: None,
607 0 : node: node_id,
608 0 : },
609 0 : connection_task: connection_handle,
610 0 : discovered_new_wal: None,
611 0 : });
612 0 : }
613 :
614 : /// Drops the current connection (if any) and updates retry timeout for the next
615 : /// connection attempt to the same safekeeper.
616 : ///
617 : /// # Cancel-Safety
618 : ///
619 : /// Not cancellation-safe.
620 0 : async fn drop_old_connection(&mut self, needs_shutdown: bool) {
621 0 : let wal_connection = match self.wal_connection.take() {
622 0 : Some(wal_connection) => wal_connection,
623 0 : None => return,
624 : };
625 :
626 0 : if needs_shutdown {
627 0 : wal_connection
628 0 : .connection_task
629 0 : .shutdown()
630 : // This here is why this function isn't cancellation-safe.
631 : // If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
632 : // Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
633 : // and thus be ineffective.
634 0 : .await;
635 0 : }
636 :
637 0 : let retry = self
638 0 : .wal_connection_retries
639 0 : .entry(wal_connection.sk_id)
640 0 : .or_insert(RetryInfo {
641 0 : next_retry_at: None,
642 0 : retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
643 0 : });
644 0 :
645 0 : let now = Utc::now().naive_utc();
646 0 :
647 0 : // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
648 0 : // and we add backoff to the time when we started the connection attempt. If the connection
649 0 : // was active for a long time, then next_retry_at will be in the past.
650 0 : retry.next_retry_at =
651 0 : wal_connection
652 0 : .started_at
653 0 : .checked_add_signed(chrono::Duration::milliseconds(
654 0 : (retry.retry_duration_seconds * 1000.0) as i64,
655 0 : ));
656 :
657 0 : if let Some(next) = &retry.next_retry_at {
658 0 : if next > &now {
659 0 : info!(
660 0 : "Next connection retry to {:?} is at {}",
661 : wal_connection.sk_id, next
662 : );
663 0 : }
664 0 : }
665 :
666 0 : let next_retry_duration =
667 0 : retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
668 0 : // Clamp the next retry duration to the maximum allowed.
669 0 : let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
670 0 : // Clamp the next retry duration to the minimum allowed.
671 0 : let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
672 0 :
673 0 : retry.retry_duration_seconds = next_retry_duration;
674 0 : }
675 :
676 : /// Returns time needed to wait to have a new candidate for WAL streaming.
677 0 : fn time_until_next_retry(&self) -> Option<Duration> {
678 0 : let now = Utc::now().naive_utc();
679 :
680 0 : let next_retry_at = self
681 0 : .wal_connection_retries
682 0 : .values()
683 0 : .filter_map(|retry| retry.next_retry_at)
684 0 : .filter(|next_retry_at| next_retry_at > &now)
685 0 : .min()?;
686 :
687 0 : (next_retry_at - now).to_std().ok()
688 0 : }
689 :
690 : /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
691 0 : fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
692 0 : let mut is_discovery = false;
693 0 : let timeline_update = match typed_msg.r#type() {
694 : MessageType::SafekeeperTimelineInfo => {
695 0 : let info = match typed_msg.safekeeper_timeline_info {
696 0 : Some(info) => info,
697 : None => {
698 0 : warn!("bad proto message from broker: no safekeeper_timeline_info");
699 0 : return;
700 : }
701 : };
702 0 : SafekeeperDiscoveryResponse {
703 0 : safekeeper_id: info.safekeeper_id,
704 0 : tenant_timeline_id: info.tenant_timeline_id,
705 0 : commit_lsn: info.commit_lsn,
706 0 : safekeeper_connstr: info.safekeeper_connstr,
707 0 : availability_zone: info.availability_zone,
708 0 : }
709 : }
710 : MessageType::SafekeeperDiscoveryResponse => {
711 0 : is_discovery = true;
712 0 : match typed_msg.safekeeper_discovery_response {
713 0 : Some(response) => response,
714 : None => {
715 0 : warn!("bad proto message from broker: no safekeeper_discovery_response");
716 0 : return;
717 : }
718 : }
719 : }
720 : _ => {
721 : // unexpected message
722 0 : return;
723 : }
724 : };
725 :
726 0 : WALRECEIVER_BROKER_UPDATES.inc();
727 0 :
728 0 : let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
729 0 : let old_entry = self.wal_stream_candidates.insert(
730 0 : new_safekeeper_id,
731 0 : BrokerSkTimeline {
732 0 : timeline: timeline_update,
733 0 : latest_update: Utc::now().naive_utc(),
734 0 : },
735 0 : );
736 0 :
737 0 : if old_entry.is_none() {
738 0 : info!(
739 : ?is_discovery,
740 : %new_safekeeper_id,
741 0 : "New SK node was added",
742 : );
743 0 : WALRECEIVER_CANDIDATES_ADDED.inc();
744 0 : }
745 0 : }
746 :
747 : /// Cleans up stale broker records and checks the rest for the new connection candidate.
748 : /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
749 : /// The current rules for approving new candidates:
750 : /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attempts
751 : /// * if there's no such entry, no new candidate found, abort
752 : /// * otherwise check if the candidate is much better than the current one
753 : ///
754 : /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
755 : /// General rules are following:
756 : /// * if connected safekeeper is not present, pick the candidate
757 : /// * if we haven't received any updates for some time, pick the candidate
758 : /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
759 : /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
760 : /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
761 : ///
762 : /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
763 : /// Both thresholds are configured per tenant.
fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
    // Overview of the decision order implemented below:
    //   1. drop broker candidates that stopped sending updates;
    //   2. without an existing connection, take the best applicable candidate;
    //   3. with an existing connection, consider the best *other* safekeeper and switch when
    //      (a) the current connection got no updates for longer than `wal_connect_timeout`,
    //      (b) the candidate's commit_lsn is ahead by at least `max_lsn_wal_lag`,
    //      (c) the candidate is in the pageserver's availability zone and the current one is not,
    //      (d) the candidate has more WAL and no WAL progress was seen for longer than `lagging_wal_timeout`.
    // Returns `None` when no (re)connection is needed or no applicable candidate exists.
    self.cleanup_old_candidates();

    match &self.wal_connection {
        Some(existing_wal_connection) => {
            let connected_sk_node = existing_wal_connection.sk_id;

            // The currently connected safekeeper is excluded from the selection;
            // `?` returns `None` early if there is no other applicable safekeeper.
            let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
                self.select_connection_candidate(Some(connected_sk_node))?;
            let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();

            let now = Utc::now().naive_utc();
            // `to_std()` returns an error for a negative duration (i.e. the recorded update
            // lies in the future relative to `now`); in that case the check is skipped.
            if let Ok(latest_interaciton) =
                (now - existing_wal_connection.status.latest_connection_update).to_std()
            {
                // Drop connection if we haven't received keepalive message for a while.
                if latest_interaciton > self.conf.wal_connect_timeout {
                    return Some(NewWalConnectionCandidate {
                        safekeeper_id: new_sk_id,
                        wal_source_connconf: new_wal_source_connconf,
                        availability_zone: new_availability_zone,
                        reason: ReconnectReason::NoKeepAlives {
                            last_keep_alive: Some(
                                existing_wal_connection.status.latest_connection_update,
                            ),
                            check_time: now,
                            threshold: self.conf.wal_connect_timeout,
                        },
                    });
                }
            }

            if !existing_wal_connection.status.is_connected {
                // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
                return None;
            }

            if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
                let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
                // Check if the new candidate has much more WAL than the current one.
                // `checked_sub` yields `None` when the candidate is behind the connected safekeeper.
                match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
                    Some(new_sk_lsn_advantage) => {
                        if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
                            return Some(NewWalConnectionCandidate {
                                safekeeper_id: new_sk_id,
                                wal_source_connconf: new_wal_source_connconf,
                                availability_zone: new_availability_zone,
                                reason: ReconnectReason::LaggingWal {
                                    current_commit_lsn,
                                    new_commit_lsn,
                                    threshold: self.conf.max_lsn_wal_lag,
                                },
                            });
                        }
                        // If we have a candidate whose commit_lsn is not behind the current one
                        // (equal, or ahead by less than `max_lsn_wal_lag`), which is in the same AZ
                        // as pageserver, and the current one is not, switch to the new one.
                        if self.conf.availability_zone.is_some()
                            && existing_wal_connection.availability_zone
                                != self.conf.availability_zone
                            && self.conf.availability_zone == new_availability_zone
                        {
                            return Some(NewWalConnectionCandidate {
                                safekeeper_id: new_sk_id,
                                availability_zone: new_availability_zone,
                                wal_source_connconf: new_wal_source_connconf,
                                reason: ReconnectReason::SwitchAvailabilityZone,
                            });
                        }
                    }
                    None => debug!(
                        "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
                    ),
                }
            }

            // Position we have streamed up to: the connection's streaming_lsn if it reported one,
            // otherwise the timeline's last record LSN.
            let current_lsn = match existing_wal_connection.status.streaming_lsn {
                Some(lsn) => lsn,
                None => self.timeline.get_last_record_lsn(),
            };
            // If the connected safekeeper has not reported a commit_lsn, fall back to `current_lsn`.
            let current_commit_lsn = existing_wal_connection
                .status
                .commit_lsn
                .unwrap_or(current_lsn);
            let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);

            // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
            let mut discovered_new_wal = existing_wal_connection
                .discovered_new_wal
                .filter(|new_wal| new_wal.lsn > current_commit_lsn);

            if discovered_new_wal.is_none() {
                // Check if the new candidate has more WAL than the current one.
                // If so, remember the LSN and the moment it was first noticed, so that the
                // `lagging_wal_timeout` below is measured from the time of discovery.
                discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
                    trace!(
                        "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
                        candidate_commit_lsn,
                        current_commit_lsn
                    );
                    Some(NewCommittedWAL {
                        lsn: candidate_commit_lsn,
                        discovered_at: Utc::now().naive_utc(),
                    })
                } else {
                    None
                };
            }

            // Moment since which we have been waiting for WAL that is known to exist, if any.
            let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
                // Connected safekeeper has more WAL, but we haven't received updates for some time.
                trace!(
                    "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
                    (now - existing_wal_connection.status.latest_wal_update).to_std(),
                    current_lsn,
                    current_commit_lsn
                );
                Some(existing_wal_connection.status.latest_wal_update)
            } else {
                discovered_new_wal.as_ref().map(|new_wal| {
                    // We know that new WAL is available on another safekeeper, but the connected safekeeper doesn't have it.
                    // Wait from whichever is later: the discovery or the latest WAL update.
                    new_wal
                        .discovered_at
                        .max(existing_wal_connection.status.latest_wal_update)
                })
            };

            // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
            if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
                if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
                    if candidate_commit_lsn > current_commit_lsn
                        && waiting_for_new_wal > self.conf.lagging_wal_timeout
                    {
                        return Some(NewWalConnectionCandidate {
                            safekeeper_id: new_sk_id,
                            wal_source_connconf: new_wal_source_connconf,
                            availability_zone: new_availability_zone,
                            reason: ReconnectReason::NoWalTimeout {
                                current_lsn,
                                current_commit_lsn,
                                candidate_commit_lsn,
                                last_wal_interaction: Some(
                                    existing_wal_connection.status.latest_wal_update,
                                ),
                                check_time: now,
                                threshold: self.conf.lagging_wal_timeout,
                            },
                        });
                    }
                }
            }

            // No switch this time: persist the (possibly updated) discovery record for the next call.
            // The `unwrap` cannot fail because this arm is only reached when `wal_connection` is `Some`.
            self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
        }
        None => {
            // Not connected to anything: connect to the best applicable candidate, if there is one.
            let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
                self.select_connection_candidate(None)?;
            return Some(NewWalConnectionCandidate {
                safekeeper_id: new_sk_id,
                availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
                wal_source_connconf: new_wal_source_connconf,
                reason: ReconnectReason::NoExistingConnection,
            });
        }
    }

    None
}
931 :
932 : /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
933 : /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
934 : ///
935 : /// The candidate that is chosen:
936 : /// * has no pending retry cooldown
937 : /// * has greatest commit_lsn among the ones that are left
938 18 : fn select_connection_candidate(
939 18 : &self,
940 18 : node_to_omit: Option<NodeId>,
941 18 : ) -> Option<(NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
942 18 : self.applicable_connection_candidates()
943 26 : .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
944 20 : .max_by_key(|(_, info, _)| info.commit_lsn)
945 18 : }
946 :
947 : /// Returns a list of safekeepers that have valid info and ready for connection.
948 : /// Some safekeepers are filtered by the retry cooldown.
949 18 : fn applicable_connection_candidates(
950 18 : &self,
951 18 : ) -> impl Iterator<Item = (NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
952 18 : let now = Utc::now().naive_utc();
953 18 :
954 18 : self.wal_stream_candidates
955 18 : .iter()
956 36 : .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
957 32 : .filter(move |(sk_id, _)| {
958 32 : let next_retry_at = self
959 32 : .wal_connection_retries
960 32 : .get(sk_id)
961 32 : .and_then(|retry_info| {
962 2 : retry_info.next_retry_at
963 32 : });
964 32 :
965 32 : next_retry_at.is_none() || next_retry_at.unwrap() <= now
966 32 : }).filter_map(|(sk_id, broker_info)| {
967 30 : let info = &broker_info.timeline;
968 30 : if info.safekeeper_connstr.is_empty() {
969 4 : return None; // no connection string, ignore sk
970 26 : }
971 26 : match wal_stream_connection_config(
972 26 : self.id,
973 26 : info.safekeeper_connstr.as_ref(),
974 26 : match &self.conf.auth_token {
975 26 : None => None,
976 0 : Some(x) => Some(x),
977 : },
978 26 : self.conf.availability_zone.as_deref(),
979 : ) {
980 26 : Ok(connstr) => Some((*sk_id, info, connstr)),
981 0 : Err(e) => {
982 0 : error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
983 0 : None
984 : }
985 : }
986 30 : })
987 18 : }
988 :
989 : /// Remove candidates which haven't sent broker updates for a while.
990 18 : fn cleanup_old_candidates(&mut self) {
991 18 : let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
992 18 : let lagging_wal_timeout = self.conf.lagging_wal_timeout;
993 18 :
994 38 : self.wal_stream_candidates.retain(|node_id, broker_info| {
995 38 : if let Ok(time_since_latest_broker_update) =
996 38 : (Utc::now().naive_utc() - broker_info.latest_update).to_std()
997 : {
998 38 : let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
999 38 : if !should_retain {
1000 2 : node_ids_to_remove.push(*node_id);
1001 36 : }
1002 38 : should_retain
1003 : } else {
1004 0 : true
1005 : }
1006 38 : });
1007 18 :
1008 18 : if !node_ids_to_remove.is_empty() {
1009 4 : for node_id in node_ids_to_remove {
1010 2 : info!("Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections");
1011 2 : self.wal_connection_retries.remove(&node_id);
1012 2 : WALRECEIVER_CANDIDATES_REMOVED.inc();
1013 : }
1014 16 : }
1015 18 : }
1016 :
1017 : /// # Cancel-Safety
1018 : ///
1019 : /// Not cancellation-safe.
1020 0 : pub(super) async fn shutdown(mut self) {
1021 0 : if let Some(wal_connection) = self.wal_connection.take() {
1022 0 : wal_connection.connection_task.shutdown().await;
1023 0 : }
1024 0 : }
1025 :
1026 0 : fn manager_status(&self) -> ConnectionManagerStatus {
1027 0 : ConnectionManagerStatus {
1028 0 : existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
1029 0 : wal_stream_candidates: self.wal_stream_candidates.clone(),
1030 0 : }
1031 0 : }
1032 : }
1033 :
/// A safekeeper selected by the connection manager as the next WAL source,
/// together with the reason for (re)connecting to it.
#[derive(Debug)]
struct NewWalConnectionCandidate {
    /// Id of the selected safekeeper node.
    safekeeper_id: NodeId,
    /// Connection config built from the safekeeper's connection string reported via the broker.
    wal_source_connconf: PgConnectionConfig,
    /// Availability zone of the selected safekeeper, as reported in its broker data (if any).
    availability_zone: Option<String>,
    /// Why this candidate was chosen.
    reason: ReconnectReason,
}
1041 :
/// Stores the reason why WAL connection was switched, for further debugging purposes.
#[derive(Debug, PartialEq, Eq)]
enum ReconnectReason {
    /// There was no WAL connection at all, so the best applicable candidate was taken.
    NoExistingConnection,
    /// The candidate's commit_lsn is ahead of the connected safekeeper's commit_lsn by at least `threshold`.
    LaggingWal {
        /// commit_lsn reported over the existing connection.
        current_commit_lsn: Lsn,
        /// commit_lsn of the candidate, taken from its broker data.
        new_commit_lsn: Lsn,
        /// The configured `max_lsn_wal_lag`.
        threshold: NonZeroU64,
    },
    /// The candidate is in the pageserver's availability zone while the current connection is not,
    /// and the candidate's commit_lsn is not behind the connected safekeeper's.
    SwitchAvailabilityZone,
    /// The candidate has more WAL, and no WAL progress was observed for longer than `threshold`.
    NoWalTimeout {
        /// LSN streamed so far (or the timeline's last record LSN if the connection reported none).
        current_lsn: Lsn,
        /// commit_lsn of the connected safekeeper (falls back to `current_lsn` if not reported).
        current_commit_lsn: Lsn,
        /// commit_lsn of the candidate, taken from its broker data.
        candidate_commit_lsn: Lsn,
        /// Time of the latest WAL update on the existing connection.
        last_wal_interaction: Option<NaiveDateTime>,
        /// Time at which the check was performed.
        check_time: NaiveDateTime,
        /// The configured `lagging_wal_timeout`.
        threshold: Duration,
    },
    /// The existing connection received no updates for longer than `threshold`.
    NoKeepAlives {
        /// Time of the latest update on the existing connection.
        last_keep_alive: Option<NaiveDateTime>,
        /// Time at which the check was performed.
        check_time: NaiveDateTime,
        /// The configured `wal_connect_timeout`.
        threshold: Duration,
    },
}
1066 :
1067 : impl ReconnectReason {
1068 0 : fn name(&self) -> &str {
1069 0 : match self {
1070 0 : ReconnectReason::NoExistingConnection => "NoExistingConnection",
1071 0 : ReconnectReason::LaggingWal { .. } => "LaggingWal",
1072 0 : ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
1073 0 : ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
1074 0 : ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
1075 : }
1076 0 : }
1077 : }
1078 :
1079 : #[cfg(test)]
1080 : mod tests {
1081 : use super::*;
1082 : use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
1083 : use url::Host;
1084 :
1085 38 : fn dummy_broker_sk_timeline(
1086 38 : commit_lsn: u64,
1087 38 : safekeeper_connstr: &str,
1088 38 : latest_update: NaiveDateTime,
1089 38 : ) -> BrokerSkTimeline {
1090 38 : BrokerSkTimeline {
1091 38 : timeline: SafekeeperDiscoveryResponse {
1092 38 : safekeeper_id: 0,
1093 38 : tenant_timeline_id: None,
1094 38 : commit_lsn,
1095 38 : safekeeper_connstr: safekeeper_connstr.to_owned(),
1096 38 : availability_zone: None,
1097 38 : },
1098 38 : latest_update,
1099 38 : }
1100 38 : }
1101 :
1102 : #[tokio::test]
1103 2 : async fn no_connection_no_candidate() -> anyhow::Result<()> {
1104 2 : let harness = TenantHarness::create("no_connection_no_candidate")?;
1105 14 : let mut state = dummy_state(&harness).await;
1106 2 : let now = Utc::now().naive_utc();
1107 2 :
1108 2 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1109 2 : let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
1110 2 :
1111 2 : state.wal_connection = None;
1112 2 : state.wal_stream_candidates = HashMap::from([
1113 2 : (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
1114 2 : (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1115 2 : (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1116 2 : (
1117 2 : NodeId(3),
1118 2 : dummy_broker_sk_timeline(
1119 2 : 1 + state.conf.max_lsn_wal_lag.get(),
1120 2 : "delay_over_threshold",
1121 2 : delay_over_threshold,
1122 2 : ),
1123 2 : ),
1124 2 : ]);
1125 2 :
1126 2 : let no_candidate = state.next_connection_candidate();
1127 2 : assert!(
1128 2 : no_candidate.is_none(),
1129 2 : "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
1130 2 : );
1131 2 :
1132 2 : Ok(())
1133 2 : }
1134 :
1135 : #[tokio::test]
1136 2 : async fn connection_no_candidate() -> anyhow::Result<()> {
1137 2 : let harness = TenantHarness::create("connection_no_candidate")?;
1138 14 : let mut state = dummy_state(&harness).await;
1139 2 : let now = Utc::now().naive_utc();
1140 2 :
1141 2 : let connected_sk_id = NodeId(0);
1142 2 : let current_lsn = 100_000;
1143 2 :
1144 2 : let connection_status = WalConnectionStatus {
1145 2 : is_connected: true,
1146 2 : has_processed_wal: true,
1147 2 : latest_connection_update: now,
1148 2 : latest_wal_update: now,
1149 2 : commit_lsn: Some(Lsn(current_lsn)),
1150 2 : streaming_lsn: Some(Lsn(current_lsn)),
1151 2 : node: NodeId(1),
1152 2 : };
1153 2 :
1154 2 : state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
1155 2 : state.wal_connection = Some(WalConnection {
1156 2 : started_at: now,
1157 2 : sk_id: connected_sk_id,
1158 2 : availability_zone: None,
1159 2 : status: connection_status,
1160 2 : connection_task: state.spawn(move |sender, _| async move {
1161 2 : sender
1162 2 : .send(TaskStateUpdate::Progress(connection_status))
1163 2 : .ok();
1164 2 : Ok(())
1165 2 : }),
1166 2 : discovered_new_wal: None,
1167 2 : });
1168 2 : state.wal_stream_candidates = HashMap::from([
1169 2 : (
1170 2 : connected_sk_id,
1171 2 : dummy_broker_sk_timeline(
1172 2 : current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
1173 2 : DUMMY_SAFEKEEPER_HOST,
1174 2 : now,
1175 2 : ),
1176 2 : ),
1177 2 : (
1178 2 : NodeId(1),
1179 2 : dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
1180 2 : ),
1181 2 : (
1182 2 : NodeId(2),
1183 2 : dummy_broker_sk_timeline(
1184 2 : current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
1185 2 : "not_enough_advanced_lsn",
1186 2 : now,
1187 2 : ),
1188 2 : ),
1189 2 : ]);
1190 2 :
1191 2 : let no_candidate = state.next_connection_candidate();
1192 2 : assert!(
1193 2 : no_candidate.is_none(),
1194 2 : "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
1195 2 : );
1196 2 :
1197 2 : Ok(())
1198 2 : }
1199 :
1200 : #[tokio::test]
1201 2 : async fn no_connection_candidate() -> anyhow::Result<()> {
1202 2 : let harness = TenantHarness::create("no_connection_candidate")?;
1203 13 : let mut state = dummy_state(&harness).await;
1204 2 : let now = Utc::now().naive_utc();
1205 2 :
1206 2 : state.wal_connection = None;
1207 2 : state.wal_stream_candidates = HashMap::from([(
1208 2 : NodeId(0),
1209 2 : dummy_broker_sk_timeline(
1210 2 : 1 + state.conf.max_lsn_wal_lag.get(),
1211 2 : DUMMY_SAFEKEEPER_HOST,
1212 2 : now,
1213 2 : ),
1214 2 : )]);
1215 2 :
1216 2 : let only_candidate = state
1217 2 : .next_connection_candidate()
1218 2 : .expect("Expected one candidate selected out of the only data option, but got none");
1219 2 : assert_eq!(only_candidate.safekeeper_id, NodeId(0));
1220 2 : assert_eq!(
1221 2 : only_candidate.reason,
1222 2 : ReconnectReason::NoExistingConnection,
1223 2 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1224 2 : );
1225 2 : assert_eq!(
1226 2 : only_candidate.wal_source_connconf.host(),
1227 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1228 2 : );
1229 2 :
1230 2 : let selected_lsn = 100_000;
1231 2 : state.wal_stream_candidates = HashMap::from([
1232 2 : (
1233 2 : NodeId(0),
1234 2 : dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
1235 2 : ),
1236 2 : (
1237 2 : NodeId(1),
1238 2 : dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
1239 2 : ),
1240 2 : (
1241 2 : NodeId(2),
1242 2 : dummy_broker_sk_timeline(selected_lsn + 100, "", now),
1243 2 : ),
1244 2 : ]);
1245 2 : let biggest_wal_candidate = state.next_connection_candidate().expect(
1246 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1247 2 : );
1248 2 :
1249 2 : assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
1250 2 : assert_eq!(
1251 2 : biggest_wal_candidate.reason,
1252 2 : ReconnectReason::NoExistingConnection,
1253 2 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1254 2 : );
1255 2 : assert_eq!(
1256 2 : biggest_wal_candidate.wal_source_connconf.host(),
1257 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1258 2 : );
1259 2 :
1260 2 : Ok(())
1261 2 : }
1262 :
1263 : #[tokio::test]
1264 2 : async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
1265 2 : let harness = TenantHarness::create("candidate_with_many_connection_failures")?;
1266 11 : let mut state = dummy_state(&harness).await;
1267 2 : let now = Utc::now().naive_utc();
1268 2 :
1269 2 : let current_lsn = Lsn(100_000).align();
1270 2 : let bigger_lsn = Lsn(current_lsn.0 + 100).align();
1271 2 :
1272 2 : state.wal_connection = None;
1273 2 : state.wal_stream_candidates = HashMap::from([
1274 2 : (
1275 2 : NodeId(0),
1276 2 : dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1277 2 : ),
1278 2 : (
1279 2 : NodeId(1),
1280 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1281 2 : ),
1282 2 : ]);
1283 2 : state.wal_connection_retries = HashMap::from([(
1284 2 : NodeId(0),
1285 2 : RetryInfo {
1286 2 : next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
1287 2 : retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
1288 2 : },
1289 2 : )]);
1290 2 :
1291 2 : let candidate_with_less_errors = state
1292 2 : .next_connection_candidate()
1293 2 : .expect("Expected one candidate selected, but got none");
1294 2 : assert_eq!(
1295 2 : candidate_with_less_errors.safekeeper_id,
1296 2 : NodeId(1),
1297 2 : "Should select the node with no pending retry cooldown"
1298 2 : );
1299 2 :
1300 2 : Ok(())
1301 2 : }
1302 :
1303 : #[tokio::test]
1304 2 : async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1305 2 : let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate")?;
1306 12 : let mut state = dummy_state(&harness).await;
1307 2 : let current_lsn = Lsn(100_000).align();
1308 2 : let now = Utc::now().naive_utc();
1309 2 :
1310 2 : let connected_sk_id = NodeId(0);
1311 2 : let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
1312 2 :
1313 2 : let connection_status = WalConnectionStatus {
1314 2 : is_connected: true,
1315 2 : has_processed_wal: true,
1316 2 : latest_connection_update: now,
1317 2 : latest_wal_update: now,
1318 2 : commit_lsn: Some(current_lsn),
1319 2 : streaming_lsn: Some(current_lsn),
1320 2 : node: connected_sk_id,
1321 2 : };
1322 2 :
1323 2 : state.wal_connection = Some(WalConnection {
1324 2 : started_at: now,
1325 2 : sk_id: connected_sk_id,
1326 2 : availability_zone: None,
1327 2 : status: connection_status,
1328 2 : connection_task: state.spawn(move |sender, _| async move {
1329 2 : sender
1330 2 : .send(TaskStateUpdate::Progress(connection_status))
1331 2 : .ok();
1332 2 : Ok(())
1333 2 : }),
1334 2 : discovered_new_wal: None,
1335 2 : });
1336 2 : state.wal_stream_candidates = HashMap::from([
1337 2 : (
1338 2 : connected_sk_id,
1339 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1340 2 : ),
1341 2 : (
1342 2 : NodeId(1),
1343 2 : dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
1344 2 : ),
1345 2 : ]);
1346 2 :
1347 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1348 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1349 2 : );
1350 2 :
1351 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
1352 2 : assert_eq!(
1353 2 : over_threshcurrent_candidate.reason,
1354 2 : ReconnectReason::LaggingWal {
1355 2 : current_commit_lsn: current_lsn,
1356 2 : new_commit_lsn: new_lsn,
1357 2 : threshold: state.conf.max_lsn_wal_lag
1358 2 : },
1359 2 : "Should select bigger WAL safekeeper if it starts to lag enough"
1360 2 : );
1361 2 : assert_eq!(
1362 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1363 2 : &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
1364 2 : );
1365 2 :
1366 2 : Ok(())
1367 2 : }
1368 :
1369 : #[tokio::test]
1370 2 : async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
1371 2 : let harness = TenantHarness::create("timeout_connection_threshold_current_candidate")?;
1372 14 : let mut state = dummy_state(&harness).await;
1373 2 : let current_lsn = Lsn(100_000).align();
1374 2 : let now = Utc::now().naive_utc();
1375 2 :
1376 2 : let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
1377 2 : let time_over_threshold =
1378 2 : Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
1379 2 :
1380 2 : let connection_status = WalConnectionStatus {
1381 2 : is_connected: true,
1382 2 : has_processed_wal: true,
1383 2 : latest_connection_update: time_over_threshold,
1384 2 : latest_wal_update: time_over_threshold,
1385 2 : commit_lsn: Some(current_lsn),
1386 2 : streaming_lsn: Some(current_lsn),
1387 2 : node: NodeId(1),
1388 2 : };
1389 2 :
1390 2 : state.wal_connection = Some(WalConnection {
1391 2 : started_at: now,
1392 2 : sk_id: NodeId(1),
1393 2 : availability_zone: None,
1394 2 : status: connection_status,
1395 2 : connection_task: state.spawn(move |sender, _| async move {
1396 2 : sender
1397 2 : .send(TaskStateUpdate::Progress(connection_status))
1398 2 : .ok();
1399 2 : Ok(())
1400 2 : }),
1401 2 : discovered_new_wal: None,
1402 2 : });
1403 2 : state.wal_stream_candidates = HashMap::from([(
1404 2 : NodeId(0),
1405 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1406 2 : )]);
1407 2 :
1408 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1409 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1410 2 : );
1411 2 :
1412 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1413 2 : match over_threshcurrent_candidate.reason {
1414 2 : ReconnectReason::NoKeepAlives {
1415 2 : last_keep_alive,
1416 2 : threshold,
1417 2 : ..
1418 2 : } => {
1419 2 : assert_eq!(last_keep_alive, Some(time_over_threshold));
1420 2 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1421 2 : }
1422 2 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1423 2 : }
1424 2 : assert_eq!(
1425 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1426 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1427 2 : );
1428 2 :
1429 2 : Ok(())
1430 2 : }
1431 :
1432 : #[tokio::test]
1433 2 : async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1434 2 : let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate")?;
1435 14 : let mut state = dummy_state(&harness).await;
1436 2 : let current_lsn = Lsn(100_000).align();
1437 2 : let new_lsn = Lsn(100_100).align();
1438 2 : let now = Utc::now().naive_utc();
1439 2 :
1440 2 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1441 2 : let time_over_threshold =
1442 2 : Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
1443 2 :
1444 2 : let connection_status = WalConnectionStatus {
1445 2 : is_connected: true,
1446 2 : has_processed_wal: true,
1447 2 : latest_connection_update: now,
1448 2 : latest_wal_update: time_over_threshold,
1449 2 : commit_lsn: Some(current_lsn),
1450 2 : streaming_lsn: Some(current_lsn),
1451 2 : node: NodeId(1),
1452 2 : };
1453 2 :
1454 2 : state.wal_connection = Some(WalConnection {
1455 2 : started_at: now,
1456 2 : sk_id: NodeId(1),
1457 2 : availability_zone: None,
1458 2 : status: connection_status,
1459 2 : connection_task: state.spawn(move |_, _| async move { Ok(()) }),
1460 2 : discovered_new_wal: Some(NewCommittedWAL {
1461 2 : discovered_at: time_over_threshold,
1462 2 : lsn: new_lsn,
1463 2 : }),
1464 2 : });
1465 2 : state.wal_stream_candidates = HashMap::from([(
1466 2 : NodeId(0),
1467 2 : dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1468 2 : )]);
1469 2 :
1470 2 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1471 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1472 2 : );
1473 2 :
1474 2 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1475 2 : match over_threshcurrent_candidate.reason {
1476 2 : ReconnectReason::NoWalTimeout {
1477 2 : current_lsn,
1478 2 : current_commit_lsn,
1479 2 : candidate_commit_lsn,
1480 2 : last_wal_interaction,
1481 2 : threshold,
1482 2 : ..
1483 2 : } => {
1484 2 : assert_eq!(current_lsn, current_lsn);
1485 2 : assert_eq!(current_commit_lsn, current_lsn);
1486 2 : assert_eq!(candidate_commit_lsn, new_lsn);
1487 2 : assert_eq!(last_wal_interaction, Some(time_over_threshold));
1488 2 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1489 2 : }
1490 2 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1491 2 : }
1492 2 : assert_eq!(
1493 2 : over_threshcurrent_candidate.wal_source_connconf.host(),
1494 2 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1495 2 : );
1496 2 :
1497 2 : Ok(())
1498 2 : }
1499 :
/// Connection string given to the test safekeepers that are expected to be selectable;
/// tests compare the host of the chosen candidate's connection config against it.
const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
1501 :
1502 16 : async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
1503 61 : let (tenant, ctx) = harness.load().await;
1504 16 : let timeline = tenant
1505 16 : .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
1506 45 : .await
1507 16 : .expect("Failed to create an empty timeline for dummy wal connection manager");
1508 16 :
1509 16 : ConnectionManagerState {
1510 16 : id: TenantTimelineId {
1511 16 : tenant_id: harness.tenant_shard_id.tenant_id,
1512 16 : timeline_id: TIMELINE_ID,
1513 16 : },
1514 16 : timeline,
1515 16 : cancel: CancellationToken::new(),
1516 16 : conf: WalReceiverConf {
1517 16 : wal_connect_timeout: Duration::from_secs(1),
1518 16 : lagging_wal_timeout: Duration::from_secs(1),
1519 16 : max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
1520 16 : auth_token: None,
1521 16 : availability_zone: None,
1522 16 : ingest_batch_size: 1,
1523 16 : },
1524 16 : wal_connection: None,
1525 16 : wal_stream_candidates: HashMap::new(),
1526 16 : wal_connection_retries: HashMap::new(),
1527 16 : }
1528 16 : }
1529 :
1530 : #[tokio::test]
1531 2 : async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
1532 2 : // Pageserver and one of safekeepers will be in the same availability zone
1533 2 : // and pageserver should prefer to connect to it.
1534 2 : let test_az = Some("test_az".to_owned());
1535 2 :
1536 2 : let harness = TenantHarness::create("switch_to_same_availability_zone")?;
1537 14 : let mut state = dummy_state(&harness).await;
1538 2 : state.conf.availability_zone.clone_from(&test_az);
1539 2 : let current_lsn = Lsn(100_000).align();
1540 2 : let now = Utc::now().naive_utc();
1541 2 :
1542 2 : let connected_sk_id = NodeId(0);
1543 2 :
1544 2 : let connection_status = WalConnectionStatus {
1545 2 : is_connected: true,
1546 2 : has_processed_wal: true,
1547 2 : latest_connection_update: now,
1548 2 : latest_wal_update: now,
1549 2 : commit_lsn: Some(current_lsn),
1550 2 : streaming_lsn: Some(current_lsn),
1551 2 : node: connected_sk_id,
1552 2 : };
1553 2 :
1554 2 : state.wal_connection = Some(WalConnection {
1555 2 : started_at: now,
1556 2 : sk_id: connected_sk_id,
1557 2 : availability_zone: None,
1558 2 : status: connection_status,
1559 2 : connection_task: state.spawn(move |sender, _| async move {
1560 2 : sender
1561 2 : .send(TaskStateUpdate::Progress(connection_status))
1562 2 : .ok();
1563 2 : Ok(())
1564 2 : }),
1565 2 : discovered_new_wal: None,
1566 2 : });
1567 2 :
1568 2 : // We have another safekeeper with the same commit_lsn, and it have the same availability zone as
1569 2 : // the current pageserver.
1570 2 : let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
1571 2 : same_az_sk.timeline.availability_zone.clone_from(&test_az);
1572 2 :
1573 2 : state.wal_stream_candidates = HashMap::from([
1574 2 : (
1575 2 : connected_sk_id,
1576 2 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1577 2 : ),
1578 2 : (NodeId(1), same_az_sk),
1579 2 : ]);
1580 2 :
1581 2 : // We expect that pageserver will switch to the safekeeper in the same availability zone,
1582 2 : // even if it has the same commit_lsn.
1583 2 : let next_candidate = state.next_connection_candidate().expect(
1584 2 : "Expected one candidate selected out of multiple valid data options, but got none",
1585 2 : );
1586 2 :
1587 2 : assert_eq!(next_candidate.safekeeper_id, NodeId(1));
1588 2 : assert_eq!(
1589 2 : next_candidate.reason,
1590 2 : ReconnectReason::SwitchAvailabilityZone,
1591 2 : "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
1592 2 : );
1593 2 : assert_eq!(
1594 2 : next_candidate.wal_source_connconf.host(),
1595 2 : &Host::Domain("same_az".to_owned())
1596 2 : );
1597 2 :
1598 2 : Ok(())
1599 2 : }
1600 : }
|