Line data Source code
1 : //! WAL receiver logic that ensures the pageserver gets connected to the safekeeper
2 : //! that contains the latest WAL to stream, and that this connection does not go stale.
3 : //!
4 : //! To achieve that, a storage broker is used: safekeepers propagate their timelines' state in it,
5 : //! the manager subscribes for changes and accumulates those to query the one with the biggest Lsn for connection.
6 : //! Current connection state is tracked too, to ensure it's not getting stale.
7 : //!
8 : //! After every connection or storage broker update fetched, the state gets updated correspondingly and rechecked for the new connection leader,
9 : //! then a (re)connection happens, if necessary.
10 : //! Only WAL streaming task expects to be finished, other loops (storage broker, connection management) never exit unless cancelled explicitly via the dedicated channel.
11 :
12 : use std::collections::HashMap;
13 : use std::num::NonZeroU64;
14 : use std::ops::ControlFlow;
15 : use std::sync::Arc;
16 : use std::time::Duration;
17 :
18 : use anyhow::Context;
19 : use chrono::{NaiveDateTime, Utc};
20 : use pageserver_api::models::TimelineState;
21 : use postgres_connection::PgConnectionConfig;
22 : use storage_broker::proto::{
23 : FilterTenantTimelineId, MessageType, SafekeeperDiscoveryRequest, SafekeeperDiscoveryResponse,
24 : SubscribeByFilterRequest, TenantTimelineId as ProtoTenantTimelineId, TypeSubscription,
25 : TypedMessage,
26 : };
27 : use storage_broker::{BrokerClientChannel, Code, Streaming};
28 : use tokio_util::sync::CancellationToken;
29 : use tracing::*;
30 : use utils::backoff::{
31 : DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, exponential_backoff,
32 : };
33 : use utils::id::{NodeId, TenantTimelineId};
34 : use utils::lsn::Lsn;
35 : use utils::postgres_client::{
36 : ConnectionConfigArgs, PostgresClientProtocol, wal_stream_connection_config,
37 : };
38 :
39 : use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
40 : use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
41 : use crate::context::{DownloadBehavior, RequestContext};
42 : use crate::metrics::{
43 : WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
44 : WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
45 : };
46 : use crate::task_mgr::TaskKind;
47 : use crate::tenant::{Timeline, debug_assert_current_span_has_tenant_and_timeline_id};
48 :
/// Unit error type returned by the functions in this file when they stop early:
/// either the cancellation token fired or the timeline is no longer active.
49 : pub(crate) struct Cancelled;
50 :
51 : /// Attempts to subscribe for timeline updates, pushed by safekeepers into the broker.
52 : /// Based on the updates, desides whether to start, keep or stop a WAL receiver task.
53 : /// If storage broker subscription is cancelled, exits.
54 : ///
55 : /// # Cancel-Safety
56 : ///
57 : /// Not cancellation-safe. Use `cancel` token to request cancellation.
58 0 : pub(super) async fn connection_manager_loop_step(
59 0 : broker_client: &mut BrokerClientChannel,
60 0 : connection_manager_state: &mut ConnectionManagerState,
61 0 : ctx: &RequestContext,
62 0 : cancel: &CancellationToken,
63 0 : manager_status: &std::sync::RwLock<Option<ConnectionManagerStatus>>,
64 0 : ) -> Result<(), Cancelled> {
65 0 : match tokio::select! {
66 0 : _ = cancel.cancelled() => { return Err(Cancelled); },
67 0 : st = connection_manager_state.timeline.wait_to_become_active(ctx) => { st }
68 : } {
69 0 : Ok(()) => {}
70 0 : Err(new_state) => {
71 0 : debug!(
72 : ?new_state,
73 0 : "state changed, stopping wal connection manager loop"
74 : );
75 0 : return Err(Cancelled);
76 : }
77 : }
78 :
79 0 : WALRECEIVER_ACTIVE_MANAGERS.inc();
80 0 : scopeguard::defer! {
81 0 : WALRECEIVER_ACTIVE_MANAGERS.dec();
82 0 : }
83 0 :
84 0 : let id = TenantTimelineId {
85 0 : tenant_id: connection_manager_state.timeline.tenant_shard_id.tenant_id,
86 0 : timeline_id: connection_manager_state.timeline.timeline_id,
87 0 : };
88 0 :
89 0 : let mut timeline_state_updates = connection_manager_state
90 0 : .timeline
91 0 : .subscribe_for_state_updates();
92 0 :
93 0 : let mut wait_lsn_status = connection_manager_state
94 0 : .timeline
95 0 : .subscribe_for_wait_lsn_updates();
96 0 :
97 0 : // TODO: create a separate config option for discovery request interval
98 0 : let discovery_request_interval = connection_manager_state.conf.lagging_wal_timeout;
99 0 : let mut last_discovery_ts: Option<std::time::Instant> = None;
100 :
101 : // Subscribe to the broker updates. Stream shares underlying TCP connection
102 : // with other streams on this client (other connection managers). When
103 : // object goes out of scope, stream finishes in drop() automatically.
104 0 : let mut broker_subscription = subscribe_for_timeline_updates(broker_client, id, cancel).await?;
105 0 : debug!("Subscribed for broker timeline updates");
106 :
107 : loop {
108 0 : let time_until_next_retry = connection_manager_state.time_until_next_retry();
109 0 : let any_activity = connection_manager_state.wal_connection.is_some()
110 0 : || !connection_manager_state.wal_stream_candidates.is_empty();
111 :
112 : // These things are happening concurrently:
113 : //
114 : // - cancellation request
115 : // - keep receiving WAL on the current connection
116 : // - if the shared state says we need to change connection, disconnect and return
117 : // - this runs in a separate task and we receive updates via a watch channel
118 : // - change connection if the rules decide so, or if the current connection dies
119 : // - receive updates from broker
120 : // - this might change the current desired connection
121 : // - timeline state changes to something that does not allow walreceiver to run concurrently
122 : // - if there's no connection and no candidates, try to send a discovery request
123 :
124 : // NB: make sure each of the select expressions are cancellation-safe
125 : // (no need for arms to be cancellation-safe).
126 0 : tokio::select! {
127 0 : _ = cancel.cancelled() => { return Err(Cancelled); }
128 0 : Some(wal_connection_update) = async {
129 0 : match connection_manager_state.wal_connection.as_mut() {
130 0 : Some(wal_connection) => Some(wal_connection.connection_task.next_task_event().await),
131 0 : None => None,
132 : }
133 0 : } => {
134 0 : let wal_connection = connection_manager_state.wal_connection.as_mut()
135 0 : .expect("Should have a connection, as checked by the corresponding select! guard");
136 0 : match wal_connection_update {
137 0 : TaskEvent::Update(TaskStateUpdate::Started) => {},
138 0 : TaskEvent::Update(TaskStateUpdate::Progress(new_status)) => {
139 0 : if new_status.has_processed_wal {
140 0 : // We have advanced last_record_lsn by processing the WAL received
141 0 : // from this safekeeper. This is good enough to clean unsuccessful
142 0 : // retries history and allow reconnecting to this safekeeper without
143 0 : // sleeping for a long time.
144 0 : connection_manager_state.wal_connection_retries.remove(&wal_connection.sk_id);
145 0 : }
146 0 : wal_connection.status = new_status;
147 : }
148 0 : TaskEvent::End(walreceiver_task_result) => {
149 0 : match walreceiver_task_result {
150 0 : Ok(()) => debug!("WAL receiving task finished"),
151 0 : Err(e) => error!("wal receiver task finished with an error: {e:?}"),
152 : }
153 0 : connection_manager_state.drop_old_connection(false).await;
154 : },
155 : }
156 : },
157 :
158 : // Got a new update from the broker
159 0 : broker_update = broker_subscription.message() /* TODO: review cancellation-safety */ => {
160 0 : match broker_update {
161 0 : Ok(Some(broker_update)) => connection_manager_state.register_timeline_update(broker_update),
162 0 : Err(status) => {
163 0 : match status.code() {
164 0 : Code::Unknown if status.message().contains("stream closed because of a broken pipe") || status.message().contains("connection reset") || status.message().contains("error reading a body from connection") => {
165 0 : // tonic's error handling doesn't provide a clear code for disconnections: we get
166 0 : // "h2 protocol error: error reading a body from connection: stream closed because of a broken pipe"
167 0 : // => https://github.com/neondatabase/neon/issues/9562
168 0 : info!("broker disconnected: {status}");
169 : },
170 : _ => {
171 0 : warn!("broker subscription failed: {status}");
172 : }
173 : }
174 0 : return Ok(());
175 : }
176 : Ok(None) => {
177 0 : error!("broker subscription stream ended"); // can't happen
178 0 : return Ok(());
179 : }
180 : }
181 : },
182 :
183 0 : new_event = async {
184 : // Reminder: this match arm needs to be cancellation-safe.
185 : loop {
186 0 : if connection_manager_state.timeline.current_state() == TimelineState::Loading {
187 0 : warn!("wal connection manager should only be launched after timeline has become active");
188 0 : }
189 0 : match timeline_state_updates.changed().await {
190 : Ok(()) => {
191 0 : let new_state = connection_manager_state.timeline.current_state();
192 0 : match new_state {
193 : // we're already active as walreceiver, no need to reactivate
194 0 : TimelineState::Active => continue,
195 : TimelineState::Broken { .. } | TimelineState::Stopping => {
196 0 : debug!("timeline entered terminal state {new_state:?}, stopping wal connection manager loop");
197 0 : return ControlFlow::Break(());
198 : }
199 : TimelineState::Loading => {
200 0 : warn!("timeline transitioned back to Loading state, that should not happen");
201 0 : return ControlFlow::Continue(());
202 : }
203 : }
204 : }
205 0 : Err(_sender_dropped_error) => return ControlFlow::Break(()),
206 : }
207 : }
208 0 : } => match new_event {
209 : ControlFlow::Continue(()) => {
210 0 : return Ok(());
211 : }
212 : ControlFlow::Break(()) => {
213 0 : debug!("Timeline is no longer active, stopping wal connection manager loop");
214 0 : return Err(Cancelled);
215 : }
216 : },
217 :
218 0 : Some(()) = async {
219 0 : match time_until_next_retry {
220 0 : Some(sleep_time) => {
221 0 : tokio::time::sleep(sleep_time).await;
222 0 : Some(())
223 : },
224 : None => {
225 0 : debug!("No candidates to retry, waiting indefinitely for the broker events");
226 0 : None
227 : }
228 : }
229 0 : } => debug!("Waking up for the next retry after waiting for {time_until_next_retry:?}"),
230 :
231 0 : Some(()) = async {
232 0 : // Reminder: this match arm needs to be cancellation-safe.
233 0 : // Calculating time needed to wait until sending the next discovery request.
234 0 : // Current implementation is conservative and sends discovery requests only when there are no candidates.
235 0 :
236 0 : if any_activity {
237 : // No need to send discovery requests if there is an active connection or candidates.
238 0 : return None;
239 0 : }
240 :
241 : // Waiting for an active wait_lsn request.
242 0 : while wait_lsn_status.borrow().is_none() {
243 0 : if wait_lsn_status.changed().await.is_err() {
244 : // wait_lsn_status channel was closed, exiting
245 0 : warn!("wait_lsn_status channel was closed in connection_manager_loop_step");
246 0 : return None;
247 0 : }
248 : }
249 :
250 : // All preconditions met, preparing to send a discovery request.
251 0 : let now = std::time::Instant::now();
252 0 : let next_discovery_ts = last_discovery_ts
253 0 : .map(|ts| ts + discovery_request_interval)
254 0 : .unwrap_or_else(|| now);
255 0 :
256 0 : if next_discovery_ts > now {
257 : // Prevent sending discovery requests too frequently.
258 0 : tokio::time::sleep(next_discovery_ts - now).await;
259 0 : }
260 :
261 0 : let tenant_timeline_id = Some(ProtoTenantTimelineId {
262 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
263 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
264 0 : });
265 0 : let request = SafekeeperDiscoveryRequest { tenant_timeline_id };
266 0 : let msg = TypedMessage {
267 0 : r#type: MessageType::SafekeeperDiscoveryRequest as i32,
268 0 : safekeeper_timeline_info: None,
269 0 : safekeeper_discovery_request: Some(request),
270 0 : safekeeper_discovery_response: None,
271 0 : };
272 0 :
273 0 : last_discovery_ts = Some(std::time::Instant::now());
274 0 : info!("No active connection and no candidates, sending discovery request to the broker");
275 :
276 : // Cancellation safety: we want to send a message to the broker, but publish_one()
277 : // function can get cancelled by the other select! arm. This is absolutely fine, because
278 : // we just want to receive broker updates and discovery is not important if we already
279 : // receive updates.
280 : //
281 : // It is possible that `last_discovery_ts` will be updated, but the message will not be sent.
282 : // This is totally fine because of the reason above.
283 :
284 : // This is a fire-and-forget request, we don't care about the response
285 0 : let _ = broker_client.publish_one(msg).await;
286 0 : debug!("Discovery request sent to the broker");
287 0 : None
288 0 : } => {}
289 : }
290 :
291 0 : if let Some(new_candidate) = connection_manager_state.next_connection_candidate() {
292 0 : info!("Switching to new connection candidate: {new_candidate:?}");
293 0 : connection_manager_state
294 0 : .change_connection(new_candidate, ctx)
295 0 : .await
296 0 : }
297 0 : *manager_status.write().unwrap() = Some(connection_manager_state.manager_status());
298 : }
299 0 : }
300 :
301 : /// Endlessly try to subscribe for broker updates for a given timeline.
302 0 : async fn subscribe_for_timeline_updates(
303 0 : broker_client: &mut BrokerClientChannel,
304 0 : id: TenantTimelineId,
305 0 : cancel: &CancellationToken,
306 0 : ) -> Result<Streaming<TypedMessage>, Cancelled> {
307 0 : let mut attempt = 0;
308 : loop {
309 0 : exponential_backoff(
310 0 : attempt,
311 0 : DEFAULT_BASE_BACKOFF_SECONDS,
312 0 : DEFAULT_MAX_BACKOFF_SECONDS,
313 0 : cancel,
314 0 : )
315 0 : .await;
316 0 : attempt += 1;
317 0 :
318 0 : // subscribe to the specific timeline
319 0 : let request = SubscribeByFilterRequest {
320 0 : types: vec![
321 0 : TypeSubscription {
322 0 : r#type: MessageType::SafekeeperTimelineInfo as i32,
323 0 : },
324 0 : TypeSubscription {
325 0 : r#type: MessageType::SafekeeperDiscoveryResponse as i32,
326 0 : },
327 0 : ],
328 0 : tenant_timeline_id: Some(FilterTenantTimelineId {
329 0 : enabled: true,
330 0 : tenant_timeline_id: Some(ProtoTenantTimelineId {
331 0 : tenant_id: id.tenant_id.as_ref().to_owned(),
332 0 : timeline_id: id.timeline_id.as_ref().to_owned(),
333 0 : }),
334 0 : }),
335 0 : };
336 0 :
337 0 : match {
338 0 : tokio::select! {
339 0 : r = broker_client.subscribe_by_filter(request) => { r }
340 0 : _ = cancel.cancelled() => { return Err(Cancelled); }
341 : }
342 : } {
343 0 : Ok(resp) => {
344 0 : return Ok(resp.into_inner());
345 : }
346 0 : Err(e) => {
347 0 : // Safekeeper nodes can stop pushing timeline updates to the broker, when no new writes happen and
348 0 : // entire WAL is streamed. Keep this noticeable with logging, but do not warn/error.
349 0 : info!(
350 0 : "Attempt #{attempt}, failed to subscribe for timeline {id} updates in broker: {e:#}"
351 : );
352 0 : continue;
353 : }
354 : }
355 : }
356 0 : }
357 :
// Backoff parameters used by `drop_old_connection` when scheduling reconnects to a safekeeper.

/// Retry delay (seconds) recorded for a safekeeper on its first dropped connection;
/// also the lower clamp for subsequent delays.
358 : const WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS: f64 = 0.1;
/// Upper clamp (seconds) for the per-safekeeper retry delay.
359 : const WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS: f64 = 15.0;
/// Factor the retry delay is multiplied by every time a connection is dropped.
360 : const WALCONNECTION_RETRY_BACKOFF_MULTIPLIER: f64 = 1.5;
361 :
362 : /// All data that's needed to run endless broker loop and keep the WAL streaming connection alive, if possible.
363 : pub(super) struct ConnectionManagerState {
// Tenant and timeline ids, copied from `timeline` in `new`.
364 : id: TenantTimelineId,
365 : /// Use pageserver data about the timeline to filter out some of the safekeepers.
366 : timeline: Arc<Timeline>,
367 : /// Child token of [`super::WalReceiver::cancel`], inherited to all tasks we spawn.
368 : cancel: CancellationToken,
// Walreceiver settings; this file reads e.g. `lagging_wal_timeout`, `wal_connect_timeout`,
// `max_lsn_wal_lag` and `availability_zone` from it.
369 : conf: WalReceiverConf,
370 : /// Current connection to safekeeper for WAL streaming.
371 : wal_connection: Option<WalConnection>,
372 : /// Info about retries and unsuccessful attempts to connect to safekeepers.
373 : wal_connection_retries: HashMap<NodeId, RetryInfo>,
374 : /// Data about all timelines, available for connection, fetched from storage broker, grouped by their corresponding safekeeper node id.
375 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
376 : }
377 :
378 : /// An information about connection manager's current connection and connection candidates.
// Published by `connection_manager_loop_step` into its `manager_status` lock after every processed event.
379 : #[derive(Debug, Clone)]
380 : pub struct ConnectionManagerStatus {
// Status of the current WAL connection; `None` is rendered as "disconnected".
381 : existing_connection: Option<WalConnectionStatus>,
// Known safekeeper candidates, keyed by safekeeper node id.
382 : wal_stream_candidates: HashMap<NodeId, BrokerSkTimeline>,
383 : }
384 :
385 : impl ConnectionManagerStatus {
386 : /// Generates a string, describing current connection status in a form, suitable for logging.
387 0 : pub fn to_human_readable_string(&self) -> String {
388 0 : let mut resulting_string = String::new();
389 0 : match &self.existing_connection {
390 0 : Some(connection) => {
391 0 : if connection.has_processed_wal {
392 0 : resulting_string.push_str(&format!(
393 0 : " (update {}): streaming WAL from node {}, ",
394 0 : connection.latest_wal_update.format("%Y-%m-%d %H:%M:%S"),
395 0 : connection.node,
396 0 : ));
397 0 :
398 0 : match (connection.streaming_lsn, connection.commit_lsn) {
399 0 : (None, None) => resulting_string.push_str("no streaming data"),
400 0 : (None, Some(commit_lsn)) => {
401 0 : resulting_string.push_str(&format!("commit Lsn: {commit_lsn}"))
402 : }
403 0 : (Some(streaming_lsn), None) => {
404 0 : resulting_string.push_str(&format!("streaming Lsn: {streaming_lsn}"))
405 : }
406 0 : (Some(streaming_lsn), Some(commit_lsn)) => resulting_string.push_str(
407 0 : &format!("commit|streaming Lsn: {commit_lsn}|{streaming_lsn}"),
408 0 : ),
409 : }
410 0 : } else if connection.is_connected {
411 0 : resulting_string.push_str(&format!(
412 0 : " (update {}): connecting to node {}",
413 0 : connection
414 0 : .latest_connection_update
415 0 : .format("%Y-%m-%d %H:%M:%S"),
416 0 : connection.node,
417 0 : ));
418 0 : } else {
419 0 : resulting_string.push_str(&format!(
420 0 : " (update {}): initializing node {} connection",
421 0 : connection
422 0 : .latest_connection_update
423 0 : .format("%Y-%m-%d %H:%M:%S"),
424 0 : connection.node,
425 0 : ));
426 0 : }
427 : }
428 0 : None => resulting_string.push_str(": disconnected"),
429 : }
430 :
431 0 : resulting_string.push_str(", safekeeper candidates (id|update_time|commit_lsn): [");
432 0 : let mut candidates = self.wal_stream_candidates.iter().peekable();
433 0 : while let Some((node_id, candidate_info)) = candidates.next() {
434 0 : resulting_string.push_str(&format!(
435 0 : "({}|{}|{})",
436 0 : node_id,
437 0 : candidate_info.latest_update.format("%H:%M:%S"),
438 0 : Lsn(candidate_info.timeline.commit_lsn)
439 0 : ));
440 0 : if candidates.peek().is_some() {
441 0 : resulting_string.push_str(", ");
442 0 : }
443 : }
444 0 : resulting_string.push(']');
445 0 :
446 0 : resulting_string
447 0 : }
448 : }
449 :
450 : /// Current connection data.
// Created by `ConnectionManagerState::change_connection` and taken out of the state by
// `ConnectionManagerState::drop_old_connection`.
451 : #[derive(Debug)]
452 : struct WalConnection {
453 : /// Time when the connection was initiated.
454 : started_at: NaiveDateTime,
455 : /// Current safekeeper pageserver is connected to for WAL streaming.
456 : sk_id: NodeId,
457 : /// Availability zone of the safekeeper.
458 : availability_zone: Option<String>,
459 : /// Status of the connection.
// Overwritten with every `Progress` event received from `connection_task`.
460 : status: WalConnectionStatus,
461 : /// WAL streaming task handle.
462 : connection_task: TaskHandle<WalConnectionStatus>,
463 : /// Have we discovered that other safekeeper has more recent WAL than we do?
464 : discovered_new_wal: Option<NewCommittedWAL>,
465 : }
466 :
467 : /// Notion of a new committed WAL, which exists on other safekeeper.
// Stored in `WalConnection::discovered_new_wal`.
468 : #[derive(Debug, Clone, Copy)]
469 : struct NewCommittedWAL {
470 : /// LSN of the new committed WAL.
471 : lsn: Lsn,
472 : /// When we discovered that the new committed WAL exists on other safekeeper.
473 : discovered_at: NaiveDateTime,
474 : }
475 :
/// Per-safekeeper reconnect backoff bookkeeping, kept in
/// `ConnectionManagerState::wal_connection_retries` and updated by `drop_old_connection`.
476 : #[derive(Debug, Clone, Copy)]
477 : struct RetryInfo {
// Start time of the last dropped connection plus the backoff that applied to it.
// `None` for a freshly inserted entry, or if that addition overflowed.
// May lie in the past if the connection was alive for a long time.
478 : next_retry_at: Option<NaiveDateTime>,
// Backoff, in seconds, that will be applied when the next connection to this safekeeper is
// dropped; multiplied by `WALCONNECTION_RETRY_BACKOFF_MULTIPLIER` and clamped after every drop.
479 : retry_duration_seconds: f64,
480 : }
481 :
482 : /// Data about the timeline to connect to, received from the broker.
483 : #[derive(Debug, Clone)]
484 : struct BrokerSkTimeline {
// Latest data reported for the safekeeper. `SafekeeperTimelineInfo` messages are converted
// into this type by `register_timeline_update`, so both message kinds end up here.
485 : timeline: SafekeeperDiscoveryResponse,
486 : /// Time at which the data was fetched from the broker last time, to track the stale data.
487 : latest_update: NaiveDateTime,
488 : }
489 :
490 : impl ConnectionManagerState {
491 0 : pub(super) fn new(
492 0 : timeline: Arc<Timeline>,
493 0 : conf: WalReceiverConf,
494 0 : cancel: CancellationToken,
495 0 : ) -> Self {
496 0 : let id = TenantTimelineId {
497 0 : tenant_id: timeline.tenant_shard_id.tenant_id,
498 0 : timeline_id: timeline.timeline_id,
499 0 : };
500 0 : Self {
501 0 : id,
502 0 : timeline,
503 0 : cancel,
504 0 : conf,
505 0 : wal_connection: None,
506 0 : wal_stream_candidates: HashMap::new(),
507 0 : wal_connection_retries: HashMap::new(),
508 0 : }
509 0 : }
510 :
511 60 : fn spawn<Fut>(
512 60 : &self,
513 60 : task: impl FnOnce(
514 60 : tokio::sync::watch::Sender<TaskStateUpdate<WalConnectionStatus>>,
515 60 : CancellationToken,
516 60 : ) -> Fut
517 60 : + Send
518 60 : + 'static,
519 60 : ) -> TaskHandle<WalConnectionStatus>
520 60 : where
521 60 : Fut: std::future::Future<Output = anyhow::Result<()>> + Send,
522 60 : {
523 60 : // TODO: get rid of TaskHandle
524 60 : super::TaskHandle::spawn(&self.cancel, task)
525 60 : }
526 :
527 : /// Shuts down the current connection (if any) and immediately starts another one with the given connection string.
528 0 : async fn change_connection(&mut self, new_sk: NewWalConnectionCandidate, ctx: &RequestContext) {
529 0 : WALRECEIVER_SWITCHES
530 0 : .with_label_values(&[new_sk.reason.name()])
531 0 : .inc();
532 0 :
533 0 : self.drop_old_connection(true).await;
534 :
535 0 : let node_id = new_sk.safekeeper_id;
536 0 : let connect_timeout = self.conf.wal_connect_timeout;
537 0 : let ingest_batch_size = self.conf.ingest_batch_size;
538 0 : let protocol = self.conf.protocol;
539 0 : let validate_wal_contiguity = self.conf.validate_wal_contiguity;
540 0 : let timeline = Arc::clone(&self.timeline);
541 0 : let ctx = ctx.detached_child(
542 0 : TaskKind::WalReceiverConnectionHandler,
543 0 : DownloadBehavior::Download,
544 0 : );
545 :
546 0 : let span = info_span!("connection", %node_id);
547 0 : let connection_handle = self.spawn(move |events_sender, cancellation| {
548 0 : async move {
549 0 : debug_assert_current_span_has_tenant_and_timeline_id();
550 :
551 0 : let res = super::walreceiver_connection::handle_walreceiver_connection(
552 0 : timeline,
553 0 : protocol,
554 0 : new_sk.wal_source_connconf,
555 0 : events_sender,
556 0 : cancellation.clone(),
557 0 : connect_timeout,
558 0 : ctx,
559 0 : node_id,
560 0 : ingest_batch_size,
561 0 : validate_wal_contiguity,
562 0 : )
563 0 : .await;
564 :
565 0 : match res {
566 0 : Ok(()) => Ok(()),
567 0 : Err(e) => {
568 0 : match e {
569 0 : WalReceiverError::SuccessfulCompletion(msg) => {
570 0 : info!("walreceiver connection handling ended with success: {msg}");
571 0 : Ok(())
572 : }
573 0 : WalReceiverError::ExpectedSafekeeperError(e) => {
574 0 : info!("walreceiver connection handling ended: {e}");
575 0 : Ok(())
576 : }
577 : WalReceiverError::ClosedGate => {
578 0 : info!(
579 0 : "walreceiver connection handling ended because of closed gate"
580 : );
581 0 : Ok(())
582 : }
583 0 : WalReceiverError::Cancelled => Ok(()),
584 0 : WalReceiverError::Other(e) => {
585 0 : // give out an error to have task_mgr give it a really verbose logging
586 0 : if cancellation.is_cancelled() {
587 : // Ideally we would learn about this via some path other than Other, but
588 : // that requires refactoring all the intermediate layers of ingest code
589 : // that only emit anyhow::Error
590 0 : Ok(())
591 : } else {
592 0 : Err(e).context("walreceiver connection handling failure")
593 : }
594 : }
595 : }
596 : }
597 : }
598 0 : }
599 0 : .instrument(span)
600 0 : });
601 0 :
602 0 : let now = Utc::now().naive_utc();
603 0 : self.wal_connection = Some(WalConnection {
604 0 : started_at: now,
605 0 : sk_id: new_sk.safekeeper_id,
606 0 : availability_zone: new_sk.availability_zone,
607 0 : status: WalConnectionStatus {
608 0 : is_connected: false,
609 0 : has_processed_wal: false,
610 0 : latest_connection_update: now,
611 0 : latest_wal_update: now,
612 0 : streaming_lsn: None,
613 0 : commit_lsn: None,
614 0 : node: node_id,
615 0 : },
616 0 : connection_task: connection_handle,
617 0 : discovered_new_wal: None,
618 0 : });
619 0 : }
620 :
621 : /// Drops the current connection (if any) and updates retry timeout for the next
622 : /// connection attempt to the same safekeeper.
623 : ///
624 : /// # Cancel-Safety
625 : ///
626 : /// Not cancellation-safe.
627 0 : async fn drop_old_connection(&mut self, needs_shutdown: bool) {
628 0 : let wal_connection = match self.wal_connection.take() {
629 0 : Some(wal_connection) => wal_connection,
630 0 : None => return,
631 : };
632 :
633 0 : if needs_shutdown {
634 0 : wal_connection
635 0 : .connection_task
636 0 : .shutdown()
637 0 : // This here is why this function isn't cancellation-safe.
638 0 : // If we got cancelled here, then self.wal_connection is already None and we lose track of the task.
639 0 : // Even if our caller diligently calls Self::shutdown(), it will find a self.wal_connection=None
640 0 : // and thus be ineffective.
641 0 : .await;
642 0 : }
643 :
644 0 : let retry = self
645 0 : .wal_connection_retries
646 0 : .entry(wal_connection.sk_id)
647 0 : .or_insert(RetryInfo {
648 0 : next_retry_at: None,
649 0 : retry_duration_seconds: WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS,
650 0 : });
651 0 :
652 0 : let now = Utc::now().naive_utc();
653 0 :
654 0 : // Schedule the next retry attempt. We want to have exponential backoff for connection attempts,
655 0 : // and we add backoff to the time when we started the connection attempt. If the connection
656 0 : // was active for a long time, then next_retry_at will be in the past.
657 0 : retry.next_retry_at =
658 0 : wal_connection
659 0 : .started_at
660 0 : .checked_add_signed(chrono::Duration::milliseconds(
661 0 : (retry.retry_duration_seconds * 1000.0) as i64,
662 0 : ));
663 :
664 0 : if let Some(next) = &retry.next_retry_at {
665 0 : if next > &now {
666 0 : info!(
667 0 : "Next connection retry to {:?} is at {}",
668 : wal_connection.sk_id, next
669 : );
670 0 : }
671 0 : }
672 :
673 0 : let next_retry_duration =
674 0 : retry.retry_duration_seconds * WALCONNECTION_RETRY_BACKOFF_MULTIPLIER;
675 0 : // Clamp the next retry duration to the maximum allowed.
676 0 : let next_retry_duration = next_retry_duration.min(WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS);
677 0 : // Clamp the next retry duration to the minimum allowed.
678 0 : let next_retry_duration = next_retry_duration.max(WALCONNECTION_RETRY_MIN_BACKOFF_SECONDS);
679 0 :
680 0 : retry.retry_duration_seconds = next_retry_duration;
681 0 : }
682 :
683 : /// Returns time needed to wait to have a new candidate for WAL streaming.
684 0 : fn time_until_next_retry(&self) -> Option<Duration> {
685 0 : let now = Utc::now().naive_utc();
686 :
687 0 : let next_retry_at = self
688 0 : .wal_connection_retries
689 0 : .values()
690 0 : .filter_map(|retry| retry.next_retry_at)
691 0 : .filter(|next_retry_at| next_retry_at > &now)
692 0 : .min()?;
693 :
694 0 : (next_retry_at - now).to_std().ok()
695 0 : }
696 :
697 : /// Adds another broker timeline into the state, if its more recent than the one already added there for the same key.
698 0 : fn register_timeline_update(&mut self, typed_msg: TypedMessage) {
699 0 : let mut is_discovery = false;
700 0 : let timeline_update = match typed_msg.r#type() {
701 : MessageType::SafekeeperTimelineInfo => {
702 0 : let info = match typed_msg.safekeeper_timeline_info {
703 0 : Some(info) => info,
704 : None => {
705 0 : warn!("bad proto message from broker: no safekeeper_timeline_info");
706 0 : return;
707 : }
708 : };
709 0 : SafekeeperDiscoveryResponse {
710 0 : safekeeper_id: info.safekeeper_id,
711 0 : tenant_timeline_id: info.tenant_timeline_id,
712 0 : commit_lsn: info.commit_lsn,
713 0 : safekeeper_connstr: info.safekeeper_connstr,
714 0 : availability_zone: info.availability_zone,
715 0 : standby_horizon: info.standby_horizon,
716 0 : }
717 : }
718 : MessageType::SafekeeperDiscoveryResponse => {
719 0 : is_discovery = true;
720 0 : match typed_msg.safekeeper_discovery_response {
721 0 : Some(response) => response,
722 : None => {
723 0 : warn!("bad proto message from broker: no safekeeper_discovery_response");
724 0 : return;
725 : }
726 : }
727 : }
728 : _ => {
729 : // unexpected message
730 0 : return;
731 : }
732 : };
733 :
734 0 : WALRECEIVER_BROKER_UPDATES.inc();
735 0 :
736 0 : trace!(
737 0 : "safekeeper info update: standby_horizon(cutoff)={}",
738 : timeline_update.standby_horizon
739 : );
740 0 : if timeline_update.standby_horizon != 0 {
741 0 : // ignore reports from safekeepers not connected to replicas
742 0 : self.timeline
743 0 : .standby_horizon
744 0 : .store(Lsn(timeline_update.standby_horizon));
745 0 : self.timeline
746 0 : .metrics
747 0 : .standby_horizon_gauge
748 0 : .set(timeline_update.standby_horizon as i64);
749 0 : }
750 :
751 0 : let new_safekeeper_id = NodeId(timeline_update.safekeeper_id);
752 0 : let old_entry = self.wal_stream_candidates.insert(
753 0 : new_safekeeper_id,
754 0 : BrokerSkTimeline {
755 0 : timeline: timeline_update,
756 0 : latest_update: Utc::now().naive_utc(),
757 0 : },
758 0 : );
759 0 :
760 0 : if old_entry.is_none() {
761 0 : info!(
762 : ?is_discovery,
763 : %new_safekeeper_id,
764 0 : "New SK node was added",
765 : );
766 0 : WALRECEIVER_CANDIDATES_ADDED.inc();
767 0 : }
768 0 : }
769 :
770 : /// Cleans up stale broker records and checks the rest for the new connection candidate.
771 : /// Returns a new candidate, if the current state is absent or somewhat lagging, `None` otherwise.
772 : /// The current rules for approving new candidates:
773 : /// * pick a candidate different from the connected safekeeper with biggest `commit_lsn` and lowest failed connection attempts
774 : /// * if there's no such entry, no new candidate found, abort
775 : /// * otherwise check if the candidate is much better than the current one
776 : ///
777 : /// To understand exact rules for determining if the candidate is better than the current one, refer to this function's implementation.
778 : /// General rules are following:
779 : /// * if connected safekeeper is not present, pick the candidate
780 : /// * if we haven't received any updates for some time, pick the candidate
781 : /// * if the candidate commit_lsn is much higher than the current one, pick the candidate
782 : /// * if the candidate commit_lsn is same, but candidate is located in the same AZ as the pageserver, pick the candidate
783 : /// * if connected safekeeper stopped sending us new WAL which is available on other safekeeper, pick the candidate
784 : ///
785 : /// This way we ensure to keep up with the most up-to-date safekeeper and don't try to jump from one safekeeper to another too frequently.
786 : /// Both thresholds are configured per tenant.
787 108 : fn next_connection_candidate(&mut self) -> Option<NewWalConnectionCandidate> {
788 108 : self.cleanup_old_candidates();
789 108 :
790 108 : match &self.wal_connection {
791 60 : Some(existing_wal_connection) => {
792 60 : let connected_sk_node = existing_wal_connection.sk_id;
793 :
794 60 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
795 60 : self.select_connection_candidate(Some(connected_sk_node))?;
796 60 : let new_availability_zone = new_safekeeper_broker_data.availability_zone.clone();
797 60 :
798 60 : let now = Utc::now().naive_utc();
799 60 : if let Ok(latest_interaciton) =
800 60 : (now - existing_wal_connection.status.latest_connection_update).to_std()
801 : {
802 : // Drop connection if we haven't received keepalive message for a while.
803 60 : if latest_interaciton > self.conf.wal_connect_timeout {
804 12 : return Some(NewWalConnectionCandidate {
805 12 : safekeeper_id: new_sk_id,
806 12 : wal_source_connconf: new_wal_source_connconf,
807 12 : availability_zone: new_availability_zone,
808 12 : reason: ReconnectReason::NoKeepAlives {
809 12 : last_keep_alive: Some(
810 12 : existing_wal_connection.status.latest_connection_update,
811 12 : ),
812 12 : check_time: now,
813 12 : threshold: self.conf.wal_connect_timeout,
814 12 : },
815 12 : });
816 48 : }
817 0 : }
818 :
819 48 : if !existing_wal_connection.status.is_connected {
820 : // We haven't connected yet and we shouldn't switch until connection timeout (condition above).
821 0 : return None;
822 48 : }
823 :
824 48 : if let Some(current_commit_lsn) = existing_wal_connection.status.commit_lsn {
825 48 : let new_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
826 48 : // Check if the new candidate has much more WAL than the current one.
827 48 : match new_commit_lsn.0.checked_sub(current_commit_lsn.0) {
828 48 : Some(new_sk_lsn_advantage) => {
829 48 : if new_sk_lsn_advantage >= self.conf.max_lsn_wal_lag.get() {
830 12 : return Some(NewWalConnectionCandidate {
831 12 : safekeeper_id: new_sk_id,
832 12 : wal_source_connconf: new_wal_source_connconf,
833 12 : availability_zone: new_availability_zone,
834 12 : reason: ReconnectReason::LaggingWal {
835 12 : current_commit_lsn,
836 12 : new_commit_lsn,
837 12 : threshold: self.conf.max_lsn_wal_lag,
838 12 : },
839 12 : });
840 36 : }
841 36 : // If we have a candidate with the same commit_lsn as the current one, which is in the same AZ as pageserver,
842 36 : // and the current one is not, switch to the new one.
843 36 : if self.conf.availability_zone.is_some()
844 12 : && existing_wal_connection.availability_zone
845 12 : != self.conf.availability_zone
846 12 : && self.conf.availability_zone == new_availability_zone
847 : {
848 12 : return Some(NewWalConnectionCandidate {
849 12 : safekeeper_id: new_sk_id,
850 12 : availability_zone: new_availability_zone,
851 12 : wal_source_connconf: new_wal_source_connconf,
852 12 : reason: ReconnectReason::SwitchAvailabilityZone,
853 12 : });
854 24 : }
855 : }
856 0 : None => debug!(
857 0 : "Best SK candidate has its commit_lsn behind connected SK's commit_lsn"
858 : ),
859 : }
860 0 : }
861 :
862 24 : let current_lsn = match existing_wal_connection.status.streaming_lsn {
863 24 : Some(lsn) => lsn,
864 0 : None => self.timeline.get_last_record_lsn(),
865 : };
866 24 : let current_commit_lsn = existing_wal_connection
867 24 : .status
868 24 : .commit_lsn
869 24 : .unwrap_or(current_lsn);
870 24 : let candidate_commit_lsn = Lsn(new_safekeeper_broker_data.commit_lsn);
871 24 :
872 24 : // Keep discovered_new_wal only if connected safekeeper has not caught up yet.
873 24 : let mut discovered_new_wal = existing_wal_connection
874 24 : .discovered_new_wal
875 24 : .filter(|new_wal| new_wal.lsn > current_commit_lsn);
876 24 :
877 24 : if discovered_new_wal.is_none() {
878 : // Check if the new candidate has more WAL than the current one.
879 : // If the new candidate has more WAL than the current one, we consider switching to the new candidate.
880 12 : discovered_new_wal = if candidate_commit_lsn > current_commit_lsn {
881 12 : trace!(
882 0 : "New candidate has commit_lsn {}, higher than current_commit_lsn {}",
883 : candidate_commit_lsn, current_commit_lsn
884 : );
885 12 : Some(NewCommittedWAL {
886 12 : lsn: candidate_commit_lsn,
887 12 : discovered_at: Utc::now().naive_utc(),
888 12 : })
889 : } else {
890 0 : None
891 : };
892 12 : }
893 :
894 24 : let waiting_for_new_lsn_since = if current_lsn < current_commit_lsn {
895 : // Connected safekeeper has more WAL, but we haven't received updates for some time.
896 0 : trace!(
897 0 : "Connected safekeeper has more WAL, but we haven't received updates for {:?}. current_lsn: {}, current_commit_lsn: {}",
898 0 : (now - existing_wal_connection.status.latest_wal_update).to_std(),
899 : current_lsn,
900 : current_commit_lsn
901 : );
902 0 : Some(existing_wal_connection.status.latest_wal_update)
903 : } else {
904 24 : discovered_new_wal.as_ref().map(|new_wal| {
905 24 : // We know that new WAL is available on another safekeeper, but the connected safekeeper doesn't have it.
906 24 : new_wal
907 24 : .discovered_at
908 24 : .max(existing_wal_connection.status.latest_wal_update)
909 24 : })
910 : };
911 :
912 : // If we haven't received any WAL updates for a while and candidate has more WAL, switch to it.
913 24 : if let Some(waiting_for_new_lsn_since) = waiting_for_new_lsn_since {
914 24 : if let Ok(waiting_for_new_wal) = (now - waiting_for_new_lsn_since).to_std() {
915 12 : if candidate_commit_lsn > current_commit_lsn
916 12 : && waiting_for_new_wal > self.conf.lagging_wal_timeout
917 : {
918 12 : return Some(NewWalConnectionCandidate {
919 12 : safekeeper_id: new_sk_id,
920 12 : wal_source_connconf: new_wal_source_connconf,
921 12 : availability_zone: new_availability_zone,
922 12 : reason: ReconnectReason::NoWalTimeout {
923 12 : current_lsn,
924 12 : current_commit_lsn,
925 12 : candidate_commit_lsn,
926 12 : last_wal_interaction: Some(
927 12 : existing_wal_connection.status.latest_wal_update,
928 12 : ),
929 12 : check_time: now,
930 12 : threshold: self.conf.lagging_wal_timeout,
931 12 : },
932 12 : });
933 0 : }
934 12 : }
935 0 : }
936 :
937 12 : self.wal_connection.as_mut().unwrap().discovered_new_wal = discovered_new_wal;
938 : }
939 : None => {
940 36 : let (new_sk_id, new_safekeeper_broker_data, new_wal_source_connconf) =
941 48 : self.select_connection_candidate(None)?;
942 36 : return Some(NewWalConnectionCandidate {
943 36 : safekeeper_id: new_sk_id,
944 36 : availability_zone: new_safekeeper_broker_data.availability_zone.clone(),
945 36 : wal_source_connconf: new_wal_source_connconf,
946 36 : reason: ReconnectReason::NoExistingConnection,
947 36 : });
948 : }
949 : }
950 :
951 12 : None
952 108 : }
953 :
954 : /// Selects the best possible candidate, based on the data collected from the broker updates about the safekeepers.
955 : /// Optionally, omits the given node, to support gracefully switching from a healthy safekeeper to another.
956 : ///
957 : /// The candidate that is chosen:
958 : /// * has no pending retry cooldown
959 : /// * has greatest commit_lsn among the ones that are left
960 108 : fn select_connection_candidate(
961 108 : &self,
962 108 : node_to_omit: Option<NodeId>,
963 108 : ) -> Option<(NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
964 108 : self.applicable_connection_candidates()
965 156 : .filter(|&(sk_id, _, _)| Some(sk_id) != node_to_omit)
966 120 : .max_by_key(|(_, info, _)| info.commit_lsn)
967 108 : }
968 :
969 : /// Returns an iterator over safekeepers that have valid info and are ready for connection.
970 : /// Some safekeepers are filtered by the retry cooldown.
971 108 : fn applicable_connection_candidates(
972 108 : &self,
973 108 : ) -> impl Iterator<Item = (NodeId, &SafekeeperDiscoveryResponse, PgConnectionConfig)> {
974 108 : let now = Utc::now().naive_utc();
975 108 :
976 108 : self.wal_stream_candidates
977 108 : .iter()
978 216 : .filter(|(_, info)| Lsn(info.timeline.commit_lsn) != Lsn::INVALID)
979 192 : .filter(move |(sk_id, _)| {
980 192 : let next_retry_at = self
981 192 : .wal_connection_retries
982 192 : .get(sk_id)
983 192 : .and_then(|retry_info| {
984 12 : retry_info.next_retry_at
985 192 : });
986 192 :
987 192 : next_retry_at.is_none() || next_retry_at.unwrap() <= now
988 192 : }).filter_map(|(sk_id, broker_info)| {
989 180 : let info = &broker_info.timeline;
990 180 : if info.safekeeper_connstr.is_empty() {
991 24 : return None; // no connection string, ignore sk
992 156 : }
993 :
994 156 : let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
995 : PostgresClientProtocol::Vanilla => {
996 156 : (None, None, None)
997 : },
998 : PostgresClientProtocol::Interpreted { .. } => {
999 0 : let shard_identity = self.timeline.get_shard_identity();
1000 0 : (
1001 0 : Some(shard_identity.number.0),
1002 0 : Some(shard_identity.count.0),
1003 0 : Some(shard_identity.stripe_size.0),
1004 0 : )
1005 : }
1006 : };
1007 :
1008 156 : let connection_conf_args = ConnectionConfigArgs {
1009 156 : protocol: self.conf.protocol,
1010 156 : ttid: self.id,
1011 156 : shard_number,
1012 156 : shard_count,
1013 156 : shard_stripe_size,
1014 156 : listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
1015 156 : auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
1016 156 : availability_zone: self.conf.availability_zone.as_deref()
1017 156 : };
1018 156 :
1019 156 : match wal_stream_connection_config(connection_conf_args) {
1020 156 : Ok(connstr) => Some((*sk_id, info, connstr)),
1021 0 : Err(e) => {
1022 0 : error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);
1023 0 : None
1024 : }
1025 : }
1026 180 : })
1027 108 : }
1028 :
1029 : /// Remove candidates which haven't sent broker updates for a while.
1030 108 : fn cleanup_old_candidates(&mut self) {
1031 108 : let mut node_ids_to_remove = Vec::with_capacity(self.wal_stream_candidates.len());
1032 108 : let lagging_wal_timeout = self.conf.lagging_wal_timeout;
1033 108 :
1034 228 : self.wal_stream_candidates.retain(|node_id, broker_info| {
1035 228 : if let Ok(time_since_latest_broker_update) =
1036 228 : (Utc::now().naive_utc() - broker_info.latest_update).to_std()
1037 : {
1038 228 : let should_retain = time_since_latest_broker_update < lagging_wal_timeout;
1039 228 : if !should_retain {
1040 12 : node_ids_to_remove.push(*node_id);
1041 216 : }
1042 228 : should_retain
1043 : } else {
1044 0 : true
1045 : }
1046 228 : });
1047 108 :
1048 108 : if !node_ids_to_remove.is_empty() {
1049 24 : for node_id in node_ids_to_remove {
1050 12 : info!(
1051 0 : "Safekeeper node {node_id} did not send events for over {lagging_wal_timeout:?}, not retrying the connections"
1052 : );
1053 12 : self.wal_connection_retries.remove(&node_id);
1054 12 : WALRECEIVER_CANDIDATES_REMOVED.inc();
1055 : }
1056 96 : }
1057 108 : }
1058 :
1059 : /// # Cancel-Safety
1060 : ///
1061 : /// Not cancellation-safe.
1062 0 : pub(super) async fn shutdown(mut self) {
1063 0 : if let Some(wal_connection) = self.wal_connection.take() {
1064 0 : wal_connection.connection_task.shutdown().await;
1065 0 : }
1066 0 : }
1067 :
1068 0 : fn manager_status(&self) -> ConnectionManagerStatus {
1069 0 : ConnectionManagerStatus {
1070 0 : existing_connection: self.wal_connection.as_ref().map(|conn| conn.status),
1071 0 : wal_stream_candidates: self.wal_stream_candidates.clone(),
1072 0 : }
1073 0 : }
1074 : }
1075 :
1076 : #[derive(Debug)]
1077 : struct NewWalConnectionCandidate {
1078 : safekeeper_id: NodeId,
1079 : wal_source_connconf: PgConnectionConfig,
1080 : availability_zone: Option<String>,
1081 : reason: ReconnectReason,
1082 : }
1083 :
1084 : /// Stores the reason why WAL connection was switched, for further debugging purposes.
1085 : #[derive(Debug, PartialEq, Eq)]
1086 : enum ReconnectReason {
1087 : NoExistingConnection,
1088 : LaggingWal {
1089 : current_commit_lsn: Lsn,
1090 : new_commit_lsn: Lsn,
1091 : threshold: NonZeroU64,
1092 : },
1093 : SwitchAvailabilityZone,
1094 : NoWalTimeout {
1095 : current_lsn: Lsn,
1096 : current_commit_lsn: Lsn,
1097 : candidate_commit_lsn: Lsn,
1098 : last_wal_interaction: Option<NaiveDateTime>,
1099 : check_time: NaiveDateTime,
1100 : threshold: Duration,
1101 : },
1102 : NoKeepAlives {
1103 : last_keep_alive: Option<NaiveDateTime>,
1104 : check_time: NaiveDateTime,
1105 : threshold: Duration,
1106 : },
1107 : }
1108 :
1109 : impl ReconnectReason {
1110 0 : fn name(&self) -> &str {
1111 0 : match self {
1112 0 : ReconnectReason::NoExistingConnection => "NoExistingConnection",
1113 0 : ReconnectReason::LaggingWal { .. } => "LaggingWal",
1114 0 : ReconnectReason::SwitchAvailabilityZone => "SwitchAvailabilityZone",
1115 0 : ReconnectReason::NoWalTimeout { .. } => "NoWalTimeout",
1116 0 : ReconnectReason::NoKeepAlives { .. } => "NoKeepAlives",
1117 : }
1118 0 : }
1119 : }
1120 :
1121 : #[cfg(test)]
1122 : mod tests {
1123 : use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
1124 : use url::Host;
1125 :
1126 : use super::*;
1127 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
1128 :
1129 228 : fn dummy_broker_sk_timeline(
1130 228 : commit_lsn: u64,
1131 228 : safekeeper_connstr: &str,
1132 228 : latest_update: NaiveDateTime,
1133 228 : ) -> BrokerSkTimeline {
1134 228 : BrokerSkTimeline {
1135 228 : timeline: SafekeeperDiscoveryResponse {
1136 228 : safekeeper_id: 0,
1137 228 : tenant_timeline_id: None,
1138 228 : commit_lsn,
1139 228 : safekeeper_connstr: safekeeper_connstr.to_owned(),
1140 228 : availability_zone: None,
1141 228 : standby_horizon: 0,
1142 228 : },
1143 228 : latest_update,
1144 228 : }
1145 228 : }
1146 :
1147 : #[tokio::test]
1148 12 : async fn no_connection_no_candidate() -> anyhow::Result<()> {
1149 12 : let harness = TenantHarness::create("no_connection_no_candidate").await?;
1150 12 : let mut state = dummy_state(&harness).await;
1151 12 : let now = Utc::now().naive_utc();
1152 12 :
1153 12 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1154 12 : let delay_over_threshold = now - lagging_wal_timeout - lagging_wal_timeout;
1155 12 :
1156 12 : state.wal_connection = None;
1157 12 : state.wal_stream_candidates = HashMap::from([
1158 12 : (NodeId(0), dummy_broker_sk_timeline(1, "", now)),
1159 12 : (NodeId(1), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1160 12 : (NodeId(2), dummy_broker_sk_timeline(0, "no_commit_lsn", now)),
1161 12 : (
1162 12 : NodeId(3),
1163 12 : dummy_broker_sk_timeline(
1164 12 : 1 + state.conf.max_lsn_wal_lag.get(),
1165 12 : "delay_over_threshold",
1166 12 : delay_over_threshold,
1167 12 : ),
1168 12 : ),
1169 12 : ]);
1170 12 :
1171 12 : let no_candidate = state.next_connection_candidate();
1172 12 : assert!(
1173 12 : no_candidate.is_none(),
1174 12 : "Expected no candidate selected out of non full data options, but got {no_candidate:?}"
1175 12 : );
1176 12 :
1177 12 : Ok(())
1178 12 : }
1179 :
1180 : #[tokio::test]
1181 12 : async fn connection_no_candidate() -> anyhow::Result<()> {
1182 12 : let harness = TenantHarness::create("connection_no_candidate").await?;
1183 12 : let mut state = dummy_state(&harness).await;
1184 12 : let now = Utc::now().naive_utc();
1185 12 :
1186 12 : let connected_sk_id = NodeId(0);
1187 12 : let current_lsn = 100_000;
1188 12 :
1189 12 : let connection_status = WalConnectionStatus {
1190 12 : is_connected: true,
1191 12 : has_processed_wal: true,
1192 12 : latest_connection_update: now,
1193 12 : latest_wal_update: now,
1194 12 : commit_lsn: Some(Lsn(current_lsn)),
1195 12 : streaming_lsn: Some(Lsn(current_lsn)),
1196 12 : node: NodeId(1),
1197 12 : };
1198 12 :
1199 12 : state.conf.max_lsn_wal_lag = NonZeroU64::new(100).unwrap();
1200 12 : state.wal_connection = Some(WalConnection {
1201 12 : started_at: now,
1202 12 : sk_id: connected_sk_id,
1203 12 : availability_zone: None,
1204 12 : status: connection_status,
1205 12 : connection_task: state.spawn(move |sender, _| async move {
1206 12 : sender
1207 12 : .send(TaskStateUpdate::Progress(connection_status))
1208 12 : .ok();
1209 12 : Ok(())
1210 12 : }),
1211 12 : discovered_new_wal: None,
1212 12 : });
1213 12 : state.wal_stream_candidates = HashMap::from([
1214 12 : (
1215 12 : connected_sk_id,
1216 12 : dummy_broker_sk_timeline(
1217 12 : current_lsn + state.conf.max_lsn_wal_lag.get() * 2,
1218 12 : DUMMY_SAFEKEEPER_HOST,
1219 12 : now,
1220 12 : ),
1221 12 : ),
1222 12 : (
1223 12 : NodeId(1),
1224 12 : dummy_broker_sk_timeline(current_lsn, "not_advanced_lsn", now),
1225 12 : ),
1226 12 : (
1227 12 : NodeId(2),
1228 12 : dummy_broker_sk_timeline(
1229 12 : current_lsn + state.conf.max_lsn_wal_lag.get() / 2,
1230 12 : "not_enough_advanced_lsn",
1231 12 : now,
1232 12 : ),
1233 12 : ),
1234 12 : ]);
1235 12 :
1236 12 : let no_candidate = state.next_connection_candidate();
1237 12 : assert!(
1238 12 : no_candidate.is_none(),
1239 12 : "Expected no candidate selected out of valid options since candidate Lsn data is ignored and others' was not advanced enough, but got {no_candidate:?}"
1240 12 : );
1241 12 :
1242 12 : Ok(())
1243 12 : }
1244 :
1245 : #[tokio::test]
1246 12 : async fn no_connection_candidate() -> anyhow::Result<()> {
1247 12 : let harness = TenantHarness::create("no_connection_candidate").await?;
1248 12 : let mut state = dummy_state(&harness).await;
1249 12 : let now = Utc::now().naive_utc();
1250 12 :
1251 12 : state.wal_connection = None;
1252 12 : state.wal_stream_candidates = HashMap::from([(
1253 12 : NodeId(0),
1254 12 : dummy_broker_sk_timeline(
1255 12 : 1 + state.conf.max_lsn_wal_lag.get(),
1256 12 : DUMMY_SAFEKEEPER_HOST,
1257 12 : now,
1258 12 : ),
1259 12 : )]);
1260 12 :
1261 12 : let only_candidate = state
1262 12 : .next_connection_candidate()
1263 12 : .expect("Expected one candidate selected out of the only data option, but got none");
1264 12 : assert_eq!(only_candidate.safekeeper_id, NodeId(0));
1265 12 : assert_eq!(
1266 12 : only_candidate.reason,
1267 12 : ReconnectReason::NoExistingConnection,
1268 12 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1269 12 : );
1270 12 : assert_eq!(
1271 12 : only_candidate.wal_source_connconf.host(),
1272 12 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1273 12 : );
1274 12 :
1275 12 : let selected_lsn = 100_000;
1276 12 : state.wal_stream_candidates = HashMap::from([
1277 12 : (
1278 12 : NodeId(0),
1279 12 : dummy_broker_sk_timeline(selected_lsn - 100, "smaller_commit_lsn", now),
1280 12 : ),
1281 12 : (
1282 12 : NodeId(1),
1283 12 : dummy_broker_sk_timeline(selected_lsn, DUMMY_SAFEKEEPER_HOST, now),
1284 12 : ),
1285 12 : (
1286 12 : NodeId(2),
1287 12 : dummy_broker_sk_timeline(selected_lsn + 100, "", now),
1288 12 : ),
1289 12 : ]);
1290 12 : let biggest_wal_candidate = state.next_connection_candidate().expect(
1291 12 : "Expected one candidate selected out of multiple valid data options, but got none",
1292 12 : );
1293 12 :
1294 12 : assert_eq!(biggest_wal_candidate.safekeeper_id, NodeId(1));
1295 12 : assert_eq!(
1296 12 : biggest_wal_candidate.reason,
1297 12 : ReconnectReason::NoExistingConnection,
1298 12 : "Should select new safekeeper due to missing connection, even if there's also a lag in the wal over the threshold"
1299 12 : );
1300 12 : assert_eq!(
1301 12 : biggest_wal_candidate.wal_source_connconf.host(),
1302 12 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1303 12 : );
1304 12 :
1305 12 : Ok(())
1306 12 : }
1307 :
1308 : #[tokio::test]
1309 12 : async fn candidate_with_many_connection_failures() -> anyhow::Result<()> {
1310 12 : let harness = TenantHarness::create("candidate_with_many_connection_failures").await?;
1311 12 : let mut state = dummy_state(&harness).await;
1312 12 : let now = Utc::now().naive_utc();
1313 12 :
1314 12 : let current_lsn = Lsn(100_000).align();
1315 12 : let bigger_lsn = Lsn(current_lsn.0 + 100).align();
1316 12 :
1317 12 : state.wal_connection = None;
1318 12 : state.wal_stream_candidates = HashMap::from([
1319 12 : (
1320 12 : NodeId(0),
1321 12 : dummy_broker_sk_timeline(bigger_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1322 12 : ),
1323 12 : (
1324 12 : NodeId(1),
1325 12 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1326 12 : ),
1327 12 : ]);
1328 12 : state.wal_connection_retries = HashMap::from([(
1329 12 : NodeId(0),
1330 12 : RetryInfo {
1331 12 : next_retry_at: now.checked_add_signed(chrono::Duration::hours(1)),
1332 12 : retry_duration_seconds: WALCONNECTION_RETRY_MAX_BACKOFF_SECONDS,
1333 12 : },
1334 12 : )]);
1335 12 :
1336 12 : let candidate_with_less_errors = state
1337 12 : .next_connection_candidate()
1338 12 : .expect("Expected one candidate selected, but got none");
1339 12 : assert_eq!(
1340 12 : candidate_with_less_errors.safekeeper_id,
1341 12 : NodeId(1),
1342 12 : "Should select the node with no pending retry cooldown"
1343 12 : );
1344 12 :
1345 12 : Ok(())
1346 12 : }
1347 :
1348 : #[tokio::test]
1349 12 : async fn lsn_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1350 12 : let harness = TenantHarness::create("lsn_wal_over_threshcurrent_candidate").await?;
1351 12 : let mut state = dummy_state(&harness).await;
1352 12 : let current_lsn = Lsn(100_000).align();
1353 12 : let now = Utc::now().naive_utc();
1354 12 :
1355 12 : let connected_sk_id = NodeId(0);
1356 12 : let new_lsn = Lsn(current_lsn.0 + state.conf.max_lsn_wal_lag.get() + 1);
1357 12 :
1358 12 : let connection_status = WalConnectionStatus {
1359 12 : is_connected: true,
1360 12 : has_processed_wal: true,
1361 12 : latest_connection_update: now,
1362 12 : latest_wal_update: now,
1363 12 : commit_lsn: Some(current_lsn),
1364 12 : streaming_lsn: Some(current_lsn),
1365 12 : node: connected_sk_id,
1366 12 : };
1367 12 :
1368 12 : state.wal_connection = Some(WalConnection {
1369 12 : started_at: now,
1370 12 : sk_id: connected_sk_id,
1371 12 : availability_zone: None,
1372 12 : status: connection_status,
1373 12 : connection_task: state.spawn(move |sender, _| async move {
1374 12 : sender
1375 12 : .send(TaskStateUpdate::Progress(connection_status))
1376 12 : .ok();
1377 12 : Ok(())
1378 12 : }),
1379 12 : discovered_new_wal: None,
1380 12 : });
1381 12 : state.wal_stream_candidates = HashMap::from([
1382 12 : (
1383 12 : connected_sk_id,
1384 12 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1385 12 : ),
1386 12 : (
1387 12 : NodeId(1),
1388 12 : dummy_broker_sk_timeline(new_lsn.0, "advanced_by_lsn_safekeeper", now),
1389 12 : ),
1390 12 : ]);
1391 12 :
1392 12 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1393 12 : "Expected one candidate selected out of multiple valid data options, but got none",
1394 12 : );
1395 12 :
1396 12 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(1));
1397 12 : assert_eq!(
1398 12 : over_threshcurrent_candidate.reason,
1399 12 : ReconnectReason::LaggingWal {
1400 12 : current_commit_lsn: current_lsn,
1401 12 : new_commit_lsn: new_lsn,
1402 12 : threshold: state.conf.max_lsn_wal_lag
1403 12 : },
1404 12 : "Should select bigger WAL safekeeper if it starts to lag enough"
1405 12 : );
1406 12 : assert_eq!(
1407 12 : over_threshcurrent_candidate.wal_source_connconf.host(),
1408 12 : &Host::Domain("advanced_by_lsn_safekeeper".to_owned())
1409 12 : );
1410 12 :
1411 12 : Ok(())
1412 12 : }
1413 :
1414 : #[tokio::test]
1415 12 : async fn timeout_connection_threshold_current_candidate() -> anyhow::Result<()> {
1416 12 : let harness =
1417 12 : TenantHarness::create("timeout_connection_threshold_current_candidate").await?;
1418 12 : let mut state = dummy_state(&harness).await;
1419 12 : let current_lsn = Lsn(100_000).align();
1420 12 : let now = Utc::now().naive_utc();
1421 12 :
1422 12 : let wal_connect_timeout = chrono::Duration::from_std(state.conf.wal_connect_timeout)?;
1423 12 : let time_over_threshold =
1424 12 : Utc::now().naive_utc() - wal_connect_timeout - wal_connect_timeout;
1425 12 :
1426 12 : let connection_status = WalConnectionStatus {
1427 12 : is_connected: true,
1428 12 : has_processed_wal: true,
1429 12 : latest_connection_update: time_over_threshold,
1430 12 : latest_wal_update: time_over_threshold,
1431 12 : commit_lsn: Some(current_lsn),
1432 12 : streaming_lsn: Some(current_lsn),
1433 12 : node: NodeId(1),
1434 12 : };
1435 12 :
1436 12 : state.wal_connection = Some(WalConnection {
1437 12 : started_at: now,
1438 12 : sk_id: NodeId(1),
1439 12 : availability_zone: None,
1440 12 : status: connection_status,
1441 12 : connection_task: state.spawn(move |sender, _| async move {
1442 12 : sender
1443 12 : .send(TaskStateUpdate::Progress(connection_status))
1444 12 : .ok();
1445 12 : Ok(())
1446 12 : }),
1447 12 : discovered_new_wal: None,
1448 12 : });
1449 12 : state.wal_stream_candidates = HashMap::from([(
1450 12 : NodeId(0),
1451 12 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1452 12 : )]);
1453 12 :
1454 12 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1455 12 : "Expected one candidate selected out of multiple valid data options, but got none",
1456 12 : );
1457 12 :
1458 12 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1459 12 : match over_threshcurrent_candidate.reason {
1460 12 : ReconnectReason::NoKeepAlives {
1461 12 : last_keep_alive,
1462 12 : threshold,
1463 12 : ..
1464 12 : } => {
1465 12 : assert_eq!(last_keep_alive, Some(time_over_threshold));
1466 12 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1467 12 : }
1468 12 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1469 12 : }
1470 12 : assert_eq!(
1471 12 : over_threshcurrent_candidate.wal_source_connconf.host(),
1472 12 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1473 12 : );
1474 12 :
1475 12 : Ok(())
1476 12 : }
1477 :
1478 : #[tokio::test]
1479 12 : async fn timeout_wal_over_threshold_current_candidate() -> anyhow::Result<()> {
1480 12 : let harness = TenantHarness::create("timeout_wal_over_threshold_current_candidate").await?;
1481 12 : let mut state = dummy_state(&harness).await;
1482 12 : let current_lsn = Lsn(100_000).align();
1483 12 : let new_lsn = Lsn(100_100).align();
1484 12 : let now = Utc::now().naive_utc();
1485 12 :
1486 12 : let lagging_wal_timeout = chrono::Duration::from_std(state.conf.lagging_wal_timeout)?;
1487 12 : let time_over_threshold =
1488 12 : Utc::now().naive_utc() - lagging_wal_timeout - lagging_wal_timeout;
1489 12 :
1490 12 : let connection_status = WalConnectionStatus {
1491 12 : is_connected: true,
1492 12 : has_processed_wal: true,
1493 12 : latest_connection_update: now,
1494 12 : latest_wal_update: time_over_threshold,
1495 12 : commit_lsn: Some(current_lsn),
1496 12 : streaming_lsn: Some(current_lsn),
1497 12 : node: NodeId(1),
1498 12 : };
1499 12 :
1500 12 : state.wal_connection = Some(WalConnection {
1501 12 : started_at: now,
1502 12 : sk_id: NodeId(1),
1503 12 : availability_zone: None,
1504 12 : status: connection_status,
1505 12 : connection_task: state.spawn(move |_, _| async move { Ok(()) }),
1506 12 : discovered_new_wal: Some(NewCommittedWAL {
1507 12 : discovered_at: time_over_threshold,
1508 12 : lsn: new_lsn,
1509 12 : }),
1510 12 : });
1511 12 : state.wal_stream_candidates = HashMap::from([(
1512 12 : NodeId(0),
1513 12 : dummy_broker_sk_timeline(new_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1514 12 : )]);
1515 12 :
1516 12 : let over_threshcurrent_candidate = state.next_connection_candidate().expect(
1517 12 : "Expected one candidate selected out of multiple valid data options, but got none",
1518 12 : );
1519 12 :
1520 12 : assert_eq!(over_threshcurrent_candidate.safekeeper_id, NodeId(0));
1521 12 : match over_threshcurrent_candidate.reason {
1522 12 : ReconnectReason::NoWalTimeout {
1523 12 : current_lsn,
1524 12 : current_commit_lsn,
1525 12 : candidate_commit_lsn,
1526 12 : last_wal_interaction,
1527 12 : threshold,
1528 12 : ..
1529 12 : } => {
1530 12 : assert_eq!(current_lsn, current_lsn);
1531 12 : assert_eq!(current_commit_lsn, current_lsn);
1532 12 : assert_eq!(candidate_commit_lsn, new_lsn);
1533 12 : assert_eq!(last_wal_interaction, Some(time_over_threshold));
1534 12 : assert_eq!(threshold, state.conf.lagging_wal_timeout);
1535 12 : }
1536 12 : unexpected => panic!("Unexpected reason: {unexpected:?}"),
1537 12 : }
1538 12 : assert_eq!(
1539 12 : over_threshcurrent_candidate.wal_source_connconf.host(),
1540 12 : &Host::Domain(DUMMY_SAFEKEEPER_HOST.to_owned())
1541 12 : );
1542 12 :
1543 12 : Ok(())
1544 12 : }
1545 :
1546 : const DUMMY_SAFEKEEPER_HOST: &str = "safekeeper_connstr";
1547 :
1548 96 : async fn dummy_state(harness: &TenantHarness) -> ConnectionManagerState {
1549 96 : let (tenant, ctx) = harness.load().await;
1550 96 : let timeline = tenant
1551 96 : .create_test_timeline(TIMELINE_ID, Lsn(0x8), crate::DEFAULT_PG_VERSION, &ctx)
1552 96 : .await
1553 96 : .expect("Failed to create an empty timeline for dummy wal connection manager");
1554 96 :
1555 96 : ConnectionManagerState {
1556 96 : id: TenantTimelineId {
1557 96 : tenant_id: harness.tenant_shard_id.tenant_id,
1558 96 : timeline_id: TIMELINE_ID,
1559 96 : },
1560 96 : timeline,
1561 96 : cancel: CancellationToken::new(),
1562 96 : conf: WalReceiverConf {
1563 96 : protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
1564 96 : wal_connect_timeout: Duration::from_secs(1),
1565 96 : lagging_wal_timeout: Duration::from_secs(1),
1566 96 : max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
1567 96 : auth_token: None,
1568 96 : availability_zone: None,
1569 96 : ingest_batch_size: 1,
1570 96 : validate_wal_contiguity: false,
1571 96 : },
1572 96 : wal_connection: None,
1573 96 : wal_stream_candidates: HashMap::new(),
1574 96 : wal_connection_retries: HashMap::new(),
1575 96 : }
1576 96 : }
1577 :
1578 : #[tokio::test]
1579 12 : async fn switch_to_same_availability_zone() -> anyhow::Result<()> {
1580 12 : // Pageserver and one of safekeepers will be in the same availability zone
1581 12 : // and pageserver should prefer to connect to it.
1582 12 : let test_az = Some("test_az".to_owned());
1583 12 :
1584 12 : let harness = TenantHarness::create("switch_to_same_availability_zone").await?;
1585 12 : let mut state = dummy_state(&harness).await;
1586 12 : state.conf.availability_zone.clone_from(&test_az);
1587 12 : let current_lsn = Lsn(100_000).align();
1588 12 : let now = Utc::now().naive_utc();
1589 12 :
1590 12 : let connected_sk_id = NodeId(0);
1591 12 :
1592 12 : let connection_status = WalConnectionStatus {
1593 12 : is_connected: true,
1594 12 : has_processed_wal: true,
1595 12 : latest_connection_update: now,
1596 12 : latest_wal_update: now,
1597 12 : commit_lsn: Some(current_lsn),
1598 12 : streaming_lsn: Some(current_lsn),
1599 12 : node: connected_sk_id,
1600 12 : };
1601 12 :
1602 12 : state.wal_connection = Some(WalConnection {
1603 12 : started_at: now,
1604 12 : sk_id: connected_sk_id,
1605 12 : availability_zone: None,
1606 12 : status: connection_status,
1607 12 : connection_task: state.spawn(move |sender, _| async move {
1608 12 : sender
1609 12 : .send(TaskStateUpdate::Progress(connection_status))
1610 12 : .ok();
1611 12 : Ok(())
1612 12 : }),
1613 12 : discovered_new_wal: None,
1614 12 : });
1615 12 :
1616 12 : // We have another safekeeper with the same commit_lsn, and it has the same availability zone as
1617 12 : // the current pageserver.
1618 12 : let mut same_az_sk = dummy_broker_sk_timeline(current_lsn.0, "same_az", now);
1619 12 : same_az_sk.timeline.availability_zone.clone_from(&test_az);
1620 12 :
1621 12 : state.wal_stream_candidates = HashMap::from([
1622 12 : (
1623 12 : connected_sk_id,
1624 12 : dummy_broker_sk_timeline(current_lsn.0, DUMMY_SAFEKEEPER_HOST, now),
1625 12 : ),
1626 12 : (NodeId(1), same_az_sk),
1627 12 : ]);
1628 12 :
1629 12 : // We expect that pageserver will switch to the safekeeper in the same availability zone,
1630 12 : // even if it has the same commit_lsn.
1631 12 : let next_candidate = state.next_connection_candidate().expect(
1632 12 : "Expected one candidate selected out of multiple valid data options, but got none",
1633 12 : );
1634 12 :
1635 12 : assert_eq!(next_candidate.safekeeper_id, NodeId(1));
1636 12 : assert_eq!(
1637 12 : next_candidate.reason,
1638 12 : ReconnectReason::SwitchAvailabilityZone,
1639 12 : "Should switch to the safekeeper in the same availability zone, if it has the same commit_lsn"
1640 12 : );
1641 12 : assert_eq!(
1642 12 : next_candidate.wal_source_connconf.host(),
1643 12 : &Host::Domain("same_az".to_owned())
1644 12 : );
1645 12 :
1646 12 : Ok(())
1647 12 : }
1648 : }
|