//! The WAL receiver manages an open connection to a safekeeper and ingests the WAL that the safekeeper streams.
//! To do so, the current implementation needs to do the following:
//!
//! * acknowledge the timelines that it needs to stream WAL into.
//!   Pageserver is able to dynamically (un)load tenants on attach and detach,
//!   hence the WAL receiver needs to react to such events.
//!
//! * get a broker subscription and stream data from it to determine when a timeline needs WAL streaming.
//!   For that, it watches specific keys in storage_broker and pulls the relevant data periodically.
//!   The data is produced by safekeepers, which push it periodically and pull it to synchronize with each other.
//!   Without this data, no WAL streaming is currently possible.
//!
//!   Only one active WAL streaming connection is allowed at a time.
//!   The connection is supposed to be updated periodically, based on safekeeper timeline data
//!   (a hedged sketch of that rule follows the imports below).
//!
//! * handle the actual connection and WAL streaming.
//!
//!   Handling happens dynamically: portions of WAL are processed and registered in the server.
//!   Along with the registration, certain metadata is written to track WAL streaming progress,
//!   and that metadata is relied on when considering safekeepers for a connection.
//!
//! This module contains the high-level primitives used in the submodules: general synchronization,
//! timeline acknowledgement, and shutdown logic.

mod connection_manager;
mod walreceiver_connection;

use std::future::Future;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;

use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::postgres_client::PostgresClientProtocol;

use self::connection_manager::ConnectionManagerStatus;
use super::Timeline;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{TaskKind, WALRECEIVER_RUNTIME};
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::timeline::walreceiver::connection_manager::{
    ConnectionManagerState, connection_manager_loop_step,
};
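
// A hedged sketch (not part of the original file) of the reconnection rule
// described in the module doc comment above: switch safekeepers when the
// current connection is stale or lags too far behind the best candidate.
// The function name and parameters are hypothetical; the real decision logic
// lives in the `connection_manager` submodule.
#[allow(dead_code)]
fn example_should_reconnect(
    time_since_last_record: Duration, // time since the last WAL record arrived
    lag_behind_best_candidate: u64,   // LSN lag, in bytes, behind the best safekeeper
    conf: &WalReceiverConf,
) -> bool {
    time_since_last_record > conf.lagging_wal_timeout
        || lag_behind_best_candidate > conf.max_lsn_wal_lag.get()
}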

#[derive(Clone)]
pub struct WalReceiverConf {
    pub protocol: PostgresClientProtocol,
    /// The timeout on the connection to a safekeeper for WAL streaming.
    pub wal_connect_timeout: Duration,
    /// The timeout used to decide that the current connection is "stale" and a reconnect to another safekeeper is needed.
    pub lagging_wal_timeout: Duration,
    /// The LSN lag used to decide that the current connection lags too far behind and a reconnect to another safekeeper is needed.
    pub max_lsn_wal_lag: NonZeroU64,
    pub auth_token: Option<Arc<String>>,
    pub availability_zone: Option<String>,
    pub ingest_batch_size: u64,
    pub validate_wal_contiguity: bool,
}
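
// For illustration only: a hedged sketch of assembling a `WalReceiverConf`.
// All concrete values here are hypothetical, not recommended defaults.
#[allow(dead_code)]
fn example_walreceiver_conf(protocol: PostgresClientProtocol) -> WalReceiverConf {
    WalReceiverConf {
        protocol,
        wal_connect_timeout: Duration::from_secs(10),
        lagging_wal_timeout: Duration::from_secs(10),
        // Reconnect if we fall more than 10 MiB of WAL behind the best candidate.
        max_lsn_wal_lag: NonZeroU64::new(10 * 1024 * 1024).unwrap(),
        auth_token: None,
        availability_zone: Some("us-east-2a".to_string()),
        ingest_batch_size: 100,
        validate_wal_contiguity: false,
    }
}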

pub struct WalReceiver {
    manager_status: Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>,
    /// All tasks spawned by [`WalReceiver::start`] and its children are sensitive to this token.
    /// It's a child token of [`Timeline`] so that timeline shutdown can cancel WalReceiver tasks early for `freeze_and_flush=true`.
    cancel: CancellationToken,
    task: tokio::task::JoinHandle<()>,
}

impl WalReceiver {
    pub fn start(
        timeline: Arc<Timeline>,
        conf: WalReceiverConf,
        mut broker_client: BrokerClientChannel,
        ctx: &RequestContext,
    ) -> Self {
        let tenant_shard_id = timeline.tenant_shard_id;
        let timeline_id = timeline.timeline_id;
        let walreceiver_ctx =
            ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
        let loop_status = Arc::new(std::sync::RwLock::new(None));
        let manager_status = Arc::clone(&loop_status);
        let cancel = timeline.cancel.child_token();
        let task = WALRECEIVER_RUNTIME.spawn({
            let cancel = cancel.clone();
            async move {
                debug_assert_current_span_has_tenant_and_timeline_id();
                // Acquire the timeline gate so we know the task doesn't outlive the Timeline.
                let Ok(_guard) = timeline.gate.enter() else {
                    debug!("WAL receiver manager could not enter the timeline gate, it's closed already");
                    return;
                };
                debug!("WAL receiver manager started, connecting to broker");
                let mut connection_manager_state = ConnectionManagerState::new(
                    timeline,
                    conf,
                    cancel.clone(),
                );
                while !cancel.is_cancelled() {
                    let loop_step_result = connection_manager_loop_step(
                        &mut broker_client,
                        &mut connection_manager_state,
                        &walreceiver_ctx,
                        &cancel,
                        &loop_status,
                    ).await;
                    match loop_step_result {
                        Ok(()) => continue,
                        Err(_cancelled) => {
                            trace!("Connection manager loop ended, shutting down");
                            break;
                        }
                    }
                }
                connection_manager_state.shutdown().await;
                *loop_status.write().unwrap() = None;
                debug!("task exits");
            }
            .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), timeline_id = %timeline_id))
        });

        Self {
            manager_status,
            cancel,
            task,
        }
    }

    #[instrument(skip_all, level = tracing::Level::DEBUG)]
    pub async fn shutdown(self) {
        debug_assert_current_span_has_tenant_and_timeline_id();
        debug!("cancelling walreceiver tasks");
        self.cancel.cancel();
        match self.task.await {
            Ok(()) => debug!("Shutdown success"),
            Err(je) if je.is_cancelled() => unreachable!("not used"),
            Err(je) if je.is_panic() => {
                // already logged by panic hook
            }
            Err(je) => {
                error!("shutdown walreceiver task join error: {je}")
            }
        }
    }

    pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {
        self.manager_status.read().unwrap().clone()
    }
}
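
// A hedged lifecycle sketch (hypothetical caller, not in the original file):
// the intended order is start -> observe status -> shutdown.
#[allow(dead_code)]
async fn example_walreceiver_lifecycle(
    timeline: Arc<Timeline>,
    conf: WalReceiverConf,
    broker_client: BrokerClientChannel,
    ctx: &RequestContext,
) {
    let walreceiver = WalReceiver::start(timeline, conf, broker_client, ctx);
    // While the manager loop runs, `status` exposes the latest connection
    // manager state for introspection.
    let _status: Option<ConnectionManagerStatus> = walreceiver.status();
    // Shutdown cancels the child token and joins the manager task.
    walreceiver.shutdown().await;
}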

/// A handle to an asynchronous task.
/// The task has a channel that it can use to communicate its lifecycle events in a certain form (see [`TaskEvent`]),
/// and a cancellation token that it can listen to for early interrupts.
///
/// Note that the communication happens via a `watch` channel, which does not accumulate events:
/// the old value is replaced with the newer one on submission. That may lead to certain events
/// not being observed by the listener (illustrated by the sketch right below this struct).
#[derive(Debug)]
struct TaskHandle<E> {
    join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
    events_receiver: watch::Receiver<TaskStateUpdate<E>>,
    cancellation: CancellationToken,
}
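
// A hedged illustration (not in the original file) of the lossy `watch`
// semantics described above: a reader that falls behind observes only the
// latest value, so intermediate updates can be missed.
#[cfg(test)]
mod watch_semantics_example {
    use tokio::sync::watch;

    #[tokio::test]
    async fn slow_reader_sees_only_latest_value() {
        let (tx, mut rx) = watch::channel(0u32);
        // Two rapid updates before the reader gets a chance to look.
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        // The channel holds only the most recent value: the reader wakes once
        // and sees `2`; the intermediate `1` was never observable.
        rx.changed().await.unwrap();
        assert_eq!(*rx.borrow(), 2);
    }
}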

enum TaskEvent<E> {
    Update(TaskStateUpdate<E>),
    End(anyhow::Result<()>),
}

#[derive(Debug, Clone)]
enum TaskStateUpdate<E> {
    Started,
    Progress(E),
}

impl<E: Clone> TaskHandle<E> {
    /// Initializes the task, starting it immediately after creation.
    ///
    /// The second argument to `task` is a child token of `cancel_parent` ([`CancellationToken::child_token`]).
    /// It being a child token enables us to provide a [`Self::shutdown`] method.
    fn spawn<Fut>(
        cancel_parent: &CancellationToken,
        task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
    ) -> Self
    where
        Fut: Future<Output = anyhow::Result<()>> + Send,
        E: Send + Sync + 'static,
    {
        let cancellation = cancel_parent.child_token();
        let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);

        let cancellation_clone = cancellation.clone();
        let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
            events_sender.send(TaskStateUpdate::Started).ok();
            task(events_sender, cancellation_clone).await
            // events_sender is dropped at some point during the .await above.
            // But the task is still running on WALRECEIVER_RUNTIME.
            // That is the window when `!jh.is_finished()`
            // is true inside `fn next_task_event()` below.
        });

        TaskHandle {
            join_handle: Some(join_handle),
            events_receiver,
            cancellation,
        }
    }

    /// # Cancel-Safety
    ///
    /// Cancellation-safe.
    async fn next_task_event(&mut self) -> TaskEvent<E> {
        match self.events_receiver.changed().await {
            Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
            Err(_task_channel_part_dropped) => {
                TaskEvent::End(match self.join_handle.as_mut() {
                    Some(jh) => {
                        if !jh.is_finished() {
                            // See: https://github.com/neondatabase/neon/issues/2885
                            trace!("sender is dropped while join handle is still alive");
                        }

                        let res = match jh.await {
                            Ok(res) => res,
                            Err(je) if je.is_cancelled() => unreachable!("not used"),
                            Err(je) if je.is_panic() => {
                                // already logged
                                Ok(())
                            }
                            Err(je) => Err(anyhow::Error::new(je).context("join walreceiver task")),
                        };

                        // For cancellation-safety, drop join_handle only after a successful .await.
                        self.join_handle = None;

                        res
                    }
                    None => {
                        // An alternative would be an enum holding either the join handle
                        // or the result, handing out a reference to it.
                        Err(anyhow::anyhow!("Task was joined more than once"))
                    }
                })
            }
        }
    }

    /// Aborts the current task, waiting for it to finish.
    async fn shutdown(self) {
        if let Some(jh) = self.join_handle {
            self.cancellation.cancel();
            match jh.await {
                Ok(Ok(())) => debug!("Shutdown success"),
                Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
                Err(je) if je.is_cancelled() => unreachable!("not used"),
                Err(je) if je.is_panic() => {
                    // already logged
                }
                Err(je) => {
                    error!("Shutdown task join error: {je}")
                }
            }
        }
    }
}
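
// Hedged usage sketches (not in the original file) tying the pieces together:
// how a caller might spawn a `TaskHandle` and drive it to completion. The
// function names, the `u64` progress type, and the timings are hypothetical.
#[allow(dead_code)]
fn example_spawn_counter(cancel_parent: &CancellationToken) -> TaskHandle<u64> {
    TaskHandle::spawn(cancel_parent, |events_sender, cancellation| async move {
        let mut processed: u64 = 0;
        while !cancellation.is_cancelled() {
            processed += 1;
            // Publish progress; the watch channel keeps only the latest value.
            if events_sender
                .send(TaskStateUpdate::Progress(processed))
                .is_err()
            {
                break; // the receiver is gone, no one is listening
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        Ok(())
    })
}

// Because `next_task_event` is cancellation-safe, it can be polled inside a
// `select!` together with a shutdown signal without losing the join handle.
#[allow(dead_code)]
async fn example_drive_task<E: Clone + std::fmt::Debug>(
    mut handle: TaskHandle<E>,
    cancel: CancellationToken,
) {
    loop {
        tokio::select! {
            _ = cancel.cancelled() => break,
            event = handle.next_task_event() => match event {
                TaskEvent::Update(update) => debug!("task update: {update:?}"),
                TaskEvent::End(res) => {
                    debug!("task ended: {res:?}");
                    return;
                }
            },
        }
    }
    // Cancellation was requested: abort the task and wait for it to finish.
    handle.shutdown().await;
}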