LCOV - differential code coverage report
Current view:   top level - pageserver/src/tenant/timeline - walreceiver.rs (source / functions)
Current:        f6946e90941b557c917ac98cd5a7e9506d180f3e.info (2023-10-19 02:04:12)
Baseline:       c8637f37369098875162f194f92736355783b050.info (2023-10-18 20:25:20)

                Coverage    Total    Hit    UBC    CBC
Lines:            82.0 %      111     91     20     91
Functions:        69.7 %       33     23     10     23

           TLA  Line data    Source code
       1                 : //! The WAL receiver manages an open connection to a safekeeper, from which it streams WAL into the timeline.
       2                 : //! To do so, the current implementation needs to do the following:
       3                 : //!
       4                 : //! * register the timelines that it needs to stream WAL into.
       5                 : //! The pageserver can dynamically load and unload tenants on attach and detach,
       6                 : //! hence the WAL receiver needs to react to such events.
       7                 : //!
       8                 : //! * get a broker subscription and stream data from it to determine which timelines need WAL streaming.
       9                 : //! For that, it watches specific keys in storage_broker and pulls the relevant data periodically.
      10                 : //! The data is produced by safekeepers, which push it periodically and pull it to stay synchronized with each other.
      11                 : //! Without this data, no WAL streaming is possible currently.
      12                 : //!
      13                 : //! Only one active WAL streaming connection is allowed at a time.
      14                 : //! The connection is re-evaluated periodically, based on safekeeper timeline data.
      15                 : //!
      16                 : //! * handle the actual connection and WAL streaming
      17                 : //!
      18                 : //! Handling happens dynamically: portions of WAL are processed and registered in the server.
      19                 : //! Along with the registration, metadata is written that records WAL streaming progress; this metadata is consulted when considering safekeepers for connection.
      20                 : //!
      21                 : //! The current module contains the high-level primitives used in the submodules: general synchronization, timeline registration, and shutdown logic.
      22                 : 
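The switching policy described above (one active connection, periodically re-evaluated against broker data) lives in the `connection_manager` submodule. As a rough, hypothetical illustration of such a rule, and not the actual `connection_manager` logic, a candidate safekeeper could be preferred once its WAL position runs ahead of the current connection by at least `max_lsn_wal_lag`:

    use std::num::NonZeroU64;

    /// Hypothetical per-safekeeper view, as assembled from broker updates.
    struct SafekeeperInfo {
        commit_lsn: u64, // latest WAL position this safekeeper can serve
    }

    /// Sketch of a switch decision; the real logic also weighs connection
    /// timeouts, staleness, and availability zones.
    fn should_switch(
        current: &SafekeeperInfo,
        candidate: &SafekeeperInfo,
        max_lsn_wal_lag: NonZeroU64,
    ) -> bool {
        candidate.commit_lsn.saturating_sub(current.commit_lsn) >= max_lsn_wal_lag.get()
    }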
      23                 : mod connection_manager;
      24                 : mod walreceiver_connection;
      25                 : 
      26                 : use crate::context::{DownloadBehavior, RequestContext};
      27                 : use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
      28                 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
      29                 : use crate::tenant::timeline::walreceiver::connection_manager::{
      30                 :     connection_manager_loop_step, ConnectionManagerState,
      31                 : };
      32                 : 
      33                 : use std::future::Future;
      34                 : use std::num::NonZeroU64;
      35                 : use std::ops::ControlFlow;
      36                 : use std::sync::Arc;
      37                 : use std::time::Duration;
      38                 : use storage_broker::BrokerClientChannel;
      39                 : use tokio::select;
      40                 : use tokio::sync::watch;
      41                 : use tokio_util::sync::CancellationToken;
      42                 : use tracing::*;
      43                 : 
      44                 : use utils::id::TenantTimelineId;
      45                 : 
      46                 : use self::connection_manager::ConnectionManagerStatus;
      47                 : 
      48                 : use super::Timeline;
      49                 : 
      50 UBC           0 : #[derive(Clone)]
      51                 : pub struct WalReceiverConf {
      52                 :     /// The timeout on the connection to a safekeeper for WAL streaming.
      53                 :     pub wal_connect_timeout: Duration,
      54                 :     /// The timeout used to determine when the current connection is "stale" and a reconnect to another safekeeper is needed.
      55                 :     pub lagging_wal_timeout: Duration,
      56                 :     /// The Lsn lag used to determine when the current connection is lagging too far behind and a reconnect to another safekeeper is needed.
      57                 :     pub max_lsn_wal_lag: NonZeroU64,
      58                 :     pub auth_token: Option<Arc<String>>,
      59                 :     pub availability_zone: Option<String>,
      60                 : }
      61                 : 
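For a sense of the shape of this config, a `WalReceiverConf` could be constructed as follows (the values are illustrative, not the pageserver's defaults):

    use std::num::NonZeroU64;
    use std::time::Duration;

    fn example_conf() -> WalReceiverConf {
        WalReceiverConf {
            wal_connect_timeout: Duration::from_secs(10),
            lagging_wal_timeout: Duration::from_secs(10),
            max_lsn_wal_lag: NonZeroU64::new(10 * 1024 * 1024).unwrap(), // 10 MiB of WAL
            auth_token: None,
            availability_zone: Some("us-east-1a".to_string()),
        }
    }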
      62                 : pub struct WalReceiver {
      63                 :     timeline: TenantTimelineId,
      64                 :     manager_status: Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>,
      65                 : }
      66                 : 
      67                 : impl WalReceiver {
      68 CBC        1103 :     pub fn start(
      69            1103 :         timeline: Arc<Timeline>,
      70            1103 :         conf: WalReceiverConf,
      71            1103 :         mut broker_client: BrokerClientChannel,
      72            1103 :         ctx: &RequestContext,
      73            1103 :     ) -> Self {
      74            1103 :         let tenant_id = timeline.tenant_id;
      75            1103 :         let timeline_id = timeline.timeline_id;
      76            1103 :         let walreceiver_ctx =
      77            1103 :             ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
      78            1103 : 
      79            1103 :         let loop_status = Arc::new(std::sync::RwLock::new(None));
      80            1103 :         let manager_status = Arc::clone(&loop_status);
      81            1103 :         task_mgr::spawn(
      82            1103 :             WALRECEIVER_RUNTIME.handle(),
      83            1103 :             TaskKind::WalReceiverManager,
      84            1103 :             Some(tenant_id),
      85            1103 :             Some(timeline_id),
      86            1103 :             &format!("walreceiver for timeline {tenant_id}/{timeline_id}"),
      87                 :             false,
      88            1103 :             async move {
      89            1103 :                 debug_assert_current_span_has_tenant_and_timeline_id();
      90            1103 :                 debug!("WAL receiver manager started, connecting to broker");
      91            1103 :                 let mut connection_manager_state = ConnectionManagerState::new(
      92            1103 :                     timeline,
      93            1103 :                     conf,
      94            1103 :                 );
      95                 :                 loop {
      96            1103 :                     select! {
      97                 :                         _ = task_mgr::shutdown_watcher() => {
      98 UBC           0 :                             trace!("WAL receiver shutdown requested, shutting down");
      99                 :                             break;
     100                 :                         },
     101 CBC         343 :                         loop_step_result = connection_manager_loop_step(
     102                 :                             &mut broker_client,
     103                 :                             &mut connection_manager_state,
     104                 :                             &walreceiver_ctx,
     105                 :                             &loop_status,
     106                 :                         ) => match loop_step_result {
     107                 :                             ControlFlow::Continue(()) => continue,
     108                 :                             ControlFlow::Break(()) => {
     109 UBC           0 :                                 trace!("Connection manager loop ended, shutting down");
     110                 :                                 break;
     111                 :                             }
     112                 :                         },
     113                 :                     }
     114                 :                 }
     115                 : 
     116 CBC         415 :                 connection_manager_state.shutdown().await;
     117             415 :                 *loop_status.write().unwrap() = None;
     118             415 :                 Ok(())
     119             415 :             }
     120            1103 :             .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_id, timeline_id = %timeline_id))
     121                 :         );
     122                 : 
     123            1103 :         Self {
     124            1103 :             timeline: TenantTimelineId::new(tenant_id, timeline_id),
     125            1103 :             manager_status,
     126            1103 :         }
     127            1103 :     }
     128                 : 
     129             175 :     pub async fn stop(self) {
     130             175 :         task_mgr::shutdown_tasks(
     131             175 :             Some(TaskKind::WalReceiverManager),
     132             175 :             Some(self.timeline.tenant_id),
     133             175 :             Some(self.timeline.timeline_id),
     134             175 :         )
     135              75 :         .await;
     136             175 :     }
     137                 : 
     138            2513 :     pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {
     139            2513 :         self.manager_status.read().unwrap().clone()
     140            2513 :     }
     141                 : }
     142                 : 
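`start()` and `status()` communicate through a shared `Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>`: the manager task writes into `loop_status`, while `status()` reads the clone kept in `manager_status`. A minimal self-contained sketch of that pattern (the `Status` type here is made up for illustration):

    use std::sync::{Arc, RwLock};

    #[derive(Clone, Debug)]
    struct Status {
        connected: bool,
    }

    fn main() {
        let shared: Arc<RwLock<Option<Status>>> = Arc::new(RwLock::new(None));

        // The "manager task" side publishes its current state...
        let writer = Arc::clone(&shared);
        *writer.write().unwrap() = Some(Status { connected: true });

        // ...and the "status()" side takes a snapshot of it.
        let snapshot = shared.read().unwrap().clone();
        assert!(snapshot.unwrap().connected);
    }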
      143                 : /// A handle to an asynchronous task.
      144                 : /// The task has a channel that it can use to communicate its lifecycle events in a certain form (see [`TaskEvent`])
      145                 : /// and a cancellation token that it can listen to for early interruption.
      146                 : ///
      147                 : /// Note that the communication happens via a `watch` channel, which does not accumulate events: each submission replaces the old value with the new one.
      148                 : /// That may lead to certain events not being observed by the listener.
     149 UBC           0 : #[derive(Debug)]
     150                 : struct TaskHandle<E> {
     151                 :     join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
     152                 :     events_receiver: watch::Receiver<TaskStateUpdate<E>>,
     153                 :     cancellation: CancellationToken,
     154                 : }
     155                 : 
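The lossiness mentioned in the note above is inherent to `tokio::sync::watch`: the channel stores only the latest value, so a reader that falls behind sees the most recent update and misses the intermediate ones. A small self-contained demonstration:

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = watch::channel(0u32);

        // Three rapid updates; the channel keeps only the last one.
        tx.send(1).unwrap();
        tx.send(2).unwrap();
        tx.send(3).unwrap();

        // A single `changed()` wakeup is pending; it yields the latest value.
        rx.changed().await.unwrap();
        assert_eq!(*rx.borrow(), 3); // updates 1 and 2 were never observed
    }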
     156                 : enum TaskEvent<E> {
     157                 :     Update(TaskStateUpdate<E>),
     158                 :     End(anyhow::Result<()>),
     159                 : }
     160                 : 
     161 CBC      751840 : #[derive(Debug, Clone)]
     162                 : enum TaskStateUpdate<E> {
     163                 :     Started,
     164                 :     Progress(E),
     165                 : }
     166                 : 
     167                 : impl<E: Clone> TaskHandle<E> {
     168                 :     /// Initializes the task, starting it immediately after the creation.
     169            1193 :     fn spawn<Fut>(
     170            1193 :         task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
     171            1193 :     ) -> Self
     172            1193 :     where
     173            1193 :         Fut: Future<Output = anyhow::Result<()>> + Send,
     174            1193 :         E: Send + Sync + 'static,
     175            1193 :     {
     176            1193 :         let cancellation = CancellationToken::new();
     177            1193 :         let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
     178            1193 : 
     179            1193 :         let cancellation_clone = cancellation.clone();
     180            1193 :         let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
     181            1193 :             events_sender.send(TaskStateUpdate::Started).ok();
     182         5085286 :             task(events_sender, cancellation_clone).await
     183                 :             // events_sender is dropped at some point during the .await above.
     184                 :             // But the task is still running on WALRECEIVER_RUNTIME.
     185                 :             // That is the window when `!jh.is_finished()`
     186                 :             // is true inside `fn next_task_event()` below.
     187            1193 :         });
     188            1193 : 
     189            1193 :         TaskHandle {
     190            1193 :             join_handle: Some(join_handle),
     191            1193 :             events_receiver,
     192            1193 :             cancellation,
     193            1193 :         }
     194            1193 :     }
     195                 : 
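A sketch of how a caller might use `TaskHandle::spawn` (assuming the module's types are in scope; the `u64` progress payload is made up for illustration). The closure receives the events sender and the cancellation token, reports progress through the former, and exits promptly when the latter fires:

    use std::time::Duration;

    fn spawn_example() -> TaskHandle<u64> {
        TaskHandle::spawn(|events, cancellation| async move {
            for step in 0..10u64 {
                events.send(TaskStateUpdate::Progress(step)).ok();
                tokio::select! {
                    _ = cancellation.cancelled() => return Ok(()), // early interrupt
                    _ = tokio::time::sleep(Duration::from_millis(100)) => {}
                }
            }
            Ok(())
        })
    }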
     196          758252 :     async fn next_task_event(&mut self) -> TaskEvent<E> {
     197          758252 :         match self.events_receiver.changed().await {
     198          751840 :             Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
     199             895 :             Err(_task_channel_part_dropped) => {
     200             895 :                 TaskEvent::End(match self.join_handle.as_mut() {
     201             895 :                     Some(jh) => {
     202             895 :                         if !jh.is_finished() {
     203                 :                             // See: https://github.com/neondatabase/neon/issues/2885
     204 UBC           0 :                             trace!("sender is dropped while join handle is still alive");
     205 CBC         867 :                         }
     206                 : 
     207             895 :                         let res = match jh.await {
     208             895 :                             Ok(res) => res,
     209 UBC           0 :                             Err(je) if je.is_cancelled() => unreachable!("not used"),
     210               0 :                             Err(je) if je.is_panic() => {
     211               0 :                                 // already logged
     212               0 :                                 Ok(())
     213                 :                             }
     214               0 :                             Err(je) => Err(anyhow::Error::new(je).context("join walreceiver task")),
     215                 :                         };
     216                 : 
     217                 :                         // For cancellation-safety, drop join_handle only after successful .await.
     218 CBC         895 :                         self.join_handle = None;
     219             895 : 
     220             895 :                         res
     221                 :                     }
     222                 :                     None => {
     223                 :                         // Another option is to have an enum, join handle or result and give away the reference to it
      224 UBC           0 :                         // Another option would be an enum holding either the join handle or the result, handing out a reference to it.
     225                 :                     }
     226                 :                 })
     227                 :             }
     228                 :         }
     229 CBC      752735 :     }
     230                 : 
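On the consuming side, `next_task_event()` is typically driven in a loop until the task reports its end (a hypothetical sketch, reusing the handle from the example above):

    async fn drive(mut handle: TaskHandle<u64>) {
        loop {
            match handle.next_task_event().await {
                TaskEvent::Update(TaskStateUpdate::Started) => { /* task is up */ }
                TaskEvent::Update(TaskStateUpdate::Progress(step)) => {
                    println!("progress: {step}");
                }
                TaskEvent::End(res) => {
                    // The sender was dropped: the task has finished or is about to.
                    if let Err(e) = res {
                        eprintln!("walreceiver task failed: {e:#}");
                    }
                    break;
                }
            }
        }
    }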
      231                 :     /// Aborts the current task and waits for it to finish.
     232             227 :     async fn shutdown(self) {
     233             227 :         if let Some(jh) = self.join_handle {
     234             227 :             self.cancellation.cancel();
     235             275 :             match jh.await {
     236 UBC           0 :                 Ok(Ok(())) => debug!("Shutdown success"),
     237               0 :                 Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
     238               0 :                 Err(je) if je.is_cancelled() => unreachable!("not used"),
     239               0 :                 Err(je) if je.is_panic() => {
     240               0 :                     // already logged
     241               0 :                 }
     242               0 :                 Err(je) => {
     243               0 :                     error!("Shutdown task join error: {je}")
     244                 :                 }
     245                 :             }
     246               0 :         }
     247 CBC         227 :     }
     248                 : }
        

Generated by: LCOV version 2.1-beta