LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline - walreceiver.rs (source / functions)
Test: 32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date: 2024-02-07 07:37:29
Coverage: Lines: 83.5 % (96 of 115 hit) | Functions: 72.7 % (24 of 33 hit)

            Line data    Source code
       1              : //! The WAL receiver manages an open connection to a safekeeper, to receive the WAL it then streams into the timeline.
       2              : //! To do so, the current implementation needs to do the following:
       3              : //!
       4              : //! * acknowledge the timelines that it needs to stream WAL into.
       5              : //! Pageserver is able to dynamically (un)load tenants on attach and detach,
       6              : //! hence the WAL receiver needs to react to such events.
       7              : //!
       8              : //! * get a broker subscription and stream data from it, to determine whether a timeline needs WAL streaming.
       9              : //! For that, it watches specific keys in storage_broker and pulls the relevant data periodically.
      10              : //! The data is produced by safekeepers, which push it periodically and pull it to synchronize with each other.
      11              : //! Without this data, no WAL streaming is possible currently.
      12              : //!
      13              : //! Only one active WAL streaming connection is allowed at a time.
      14              : //! The connection is supposed to be updated periodically, based on safekeeper timeline data.
      15              : //!
      16              : //! * handle the actual connection and WAL streaming
      17              : //!
      18              : //! Handling happens dynamically: portions of WAL get processed and registered in the server.
      19              : //! Along with the registration, certain metadata is written to show WAL streaming progress; that progress data is relied upon when considering safekeepers for connection.
      20              : //!
      21              : //! This module contains high-level primitives used in the submodules: general synchronization, timeline acknowledgement and shutdown logic.
      22              : 
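                      : // A minimal lifecycle sketch (not part of the original source): assuming a
                      : // `timeline: Arc<Timeline>`, a `conf: WalReceiverConf`, a
                      : // `broker_client: BrokerClientChannel` and a `ctx: RequestContext` are already
                      : // at hand (illustrative bindings only), a timeline's WAL receiver is driven as:
                      : //
                      : //     let walreceiver = WalReceiver::start(timeline, conf, broker_client, &ctx);
                      : //     let status = walreceiver.status(); // inspect the connection manager state
                      : //     walreceiver.stop().await;          // shut the manager task down
                      : 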
      23              : mod connection_manager;
      24              : mod walreceiver_connection;
      25              : 
      26              : use crate::context::{DownloadBehavior, RequestContext};
      27              : use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
      28              : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
      29              : use crate::tenant::timeline::walreceiver::connection_manager::{
      30              :     connection_manager_loop_step, ConnectionManagerState,
      31              : };
      32              : 
      33              : use pageserver_api::shard::TenantShardId;
      34              : use std::future::Future;
      35              : use std::num::NonZeroU64;
      36              : use std::ops::ControlFlow;
      37              : use std::sync::Arc;
      38              : use std::time::Duration;
      39              : use storage_broker::BrokerClientChannel;
      40              : use tokio::select;
      41              : use tokio::sync::watch;
      42              : use tokio_util::sync::CancellationToken;
      43              : use tracing::*;
      44              : 
      45              : use utils::id::TimelineId;
      46              : 
      47              : use self::connection_manager::ConnectionManagerStatus;
      48              : 
      49              : use super::Timeline;
      50              : 
      51            0 : #[derive(Clone)]
      52              : pub struct WalReceiverConf {
      53              :     /// The timeout on the connection to safekeeper for WAL streaming.
      54              :     pub wal_connect_timeout: Duration,
      55              :     /// The timeout used to determine when the current connection is "stale" and a reconnection to another safekeeper is needed.
      56              :     pub lagging_wal_timeout: Duration,
      57              :     /// The Lsn lag used to determine when the current connection is lagging too far behind, prompting a reconnection to another safekeeper.
      58              :     pub max_lsn_wal_lag: NonZeroU64,
      59              :     pub auth_token: Option<Arc<String>>,
      60              :     pub availability_zone: Option<String>,
      61              :     pub ingest_batch_size: u64,
      62              : }
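                      : // A hedged construction sketch; the field values below are illustrative
                      : // placeholders, not the defaults pageserver actually uses:
                      : //
                      : //     let conf = WalReceiverConf {
                      : //         wal_connect_timeout: Duration::from_secs(10),
                      : //         lagging_wal_timeout: Duration::from_secs(10),
                      : //         max_lsn_wal_lag: NonZeroU64::new(10 * 1024 * 1024).unwrap(),
                      : //         auth_token: None,
                      : //         availability_zone: None,
                      : //         ingest_batch_size: 100,
                      : //     };
                      : 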
      63              : 
      64              : pub struct WalReceiver {
      65              :     tenant_shard_id: TenantShardId,
      66              :     timeline_id: TimelineId,
      67              :     manager_status: Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>,
      68              : }
      69              : 
      70              : impl WalReceiver {
      71         1235 :     pub fn start(
      72         1235 :         timeline: Arc<Timeline>,
      73         1235 :         conf: WalReceiverConf,
      74         1235 :         mut broker_client: BrokerClientChannel,
      75         1235 :         ctx: &RequestContext,
      76         1235 :     ) -> Self {
      77         1235 :         let tenant_shard_id = timeline.tenant_shard_id;
      78         1235 :         let timeline_id = timeline.timeline_id;
      79         1235 :         let walreceiver_ctx =
      80         1235 :             ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
      81         1235 : 
      82         1235 :         let loop_status = Arc::new(std::sync::RwLock::new(None));
      83         1235 :         let manager_status = Arc::clone(&loop_status);
      84         1235 :         task_mgr::spawn(
      85         1235 :             WALRECEIVER_RUNTIME.handle(),
      86         1235 :             TaskKind::WalReceiverManager,
      87         1235 :             Some(timeline.tenant_shard_id),
      88         1235 :             Some(timeline_id),
      89         1235 :             &format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"),
      90              :             false,
      91         1235 :             async move {
      92         1235 :                 debug_assert_current_span_has_tenant_and_timeline_id();
      93         1235 :                 debug!("WAL receiver manager started, connecting to broker");
      94         1235 :                 let mut connection_manager_state = ConnectionManagerState::new(
      95         1235 :                     timeline,
      96         1235 :                     conf,
      97         1235 :                 );
      98         1235 :                 loop {
      99       717497 :                     select! {
     100              :                         _ = task_mgr::shutdown_watcher() => {
     101            0 :                             trace!("WAL receiver shutdown requested, shutting down");
     102              :                             break;
     103              :                         },
     104          483 :                         loop_step_result = connection_manager_loop_step(
     105              :                             &mut broker_client,
     106              :                             &mut connection_manager_state,
     107              :                             &walreceiver_ctx,
     108              :                             &loop_status,
     109              :                         ) => match loop_step_result {
     110              :                             ControlFlow::Continue(()) => continue,
     111              :                             ControlFlow::Break(()) => {
     112            0 :                                 trace!("Connection manager loop ended, shutting down");
     113              :                                 break;
     114              :                             }
     115              :                         },
     116         1235 :                     }
     117         1235 :                 }
     118              : 
     119          545 :                 connection_manager_state.shutdown().await;
     120          545 :                 *loop_status.write().unwrap() = None;
     121          545 :                 Ok(())
     122          545 :             }
     123         1235 :             .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), timeline_id = %timeline_id))
     124              :         );
     125              : 
     126         1235 :         Self {
     127         1235 :             tenant_shard_id,
     128         1235 :             timeline_id,
     129         1235 :             manager_status,
     130         1235 :         }
     131         1235 :     }
     132              : 
     133          160 :     pub async fn stop(self) {
     134          160 :         task_mgr::shutdown_tasks(
     135          160 :             Some(TaskKind::WalReceiverManager),
     136          160 :             Some(self.tenant_shard_id),
     137          160 :             Some(self.timeline_id),
     138          160 :         )
     139           48 :         .await;
     140          160 :     }
     141              : 
     142         2953 :     pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {
     143         2953 :         self.manager_status.read().unwrap().clone()
     144         2953 :     }
     145              : }
     146              : 
     147              : /// A handle to an asynchronous task.
     148              : /// The task has a channel that it can use to communicate its lifecycle events (see [`TaskEvent`])
     149              : /// and a cancellation token that it can listen to in order to be interrupted early.
     150              : ///
     151              : /// Note that the communication happens via a `watch` channel, which does not accumulate events, but replaces the old value with the newer one on submission.
     152              : /// That may lead to certain events not being observed by the listener.
     153            0 : #[derive(Debug)]
     154              : struct TaskHandle<E> {
     155              :     join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
     156              :     events_receiver: watch::Receiver<TaskStateUpdate<E>>,
     157              :     cancellation: CancellationToken,
     158              : }
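                      : // Why events can be missed: `tokio::sync::watch` keeps only the latest value.
                      : // A standalone sketch (independent of this module) in which only the second
                      : // send is ever observed:
                      : //
                      : //     use tokio::sync::watch;
                      : //
                      : //     #[tokio::main]
                      : //     async fn main() {
                      : //         let (tx, mut rx) = watch::channel(0u32);
                      : //         tx.send(1).unwrap();
                      : //         tx.send(2).unwrap();          // overwrites 1 before anyone reads it
                      : //         rx.changed().await.unwrap();  // a single wakeup covers both sends
                      : //         assert_eq!(*rx.borrow(), 2);  // the intermediate value 1 is lost
                      : //     }
                      : 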
     159              : 
     160              : enum TaskEvent<E> {
     161              :     Update(TaskStateUpdate<E>),
     162              :     End(anyhow::Result<()>),
     163              : }
     164              : 
     165       726284 : #[derive(Debug, Clone)]
     166              : enum TaskStateUpdate<E> {
     167              :     Started,
     168              :     Progress(E),
     169              : }
     170              : 
     171              : impl<E: Clone> TaskHandle<E> {
     172              :     /// Initializes the task, starting it immediately after creation.
     173         1712 :     fn spawn<Fut>(
     174         1712 :         task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
     175         1712 :     ) -> Self
     176         1712 :     where
     177         1712 :         Fut: Future<Output = anyhow::Result<()>> + Send,
     178         1712 :         E: Send + Sync + 'static,
     179         1712 :     {
     180         1712 :         let cancellation = CancellationToken::new();
     181         1712 :         let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
     182         1712 : 
     183         1712 :         let cancellation_clone = cancellation.clone();
     184         1712 :         let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
     185         1712 :             events_sender.send(TaskStateUpdate::Started).ok();
     186       849884 :             task(events_sender, cancellation_clone).await
     187              :             // events_sender is dropped at some point during the .await above.
     188              :             // But the task is still running on WALRECEIVER_RUNTIME.
     189              :             // That is the window when `!jh.is_finished()`
     190              :             // is true inside `fn next_task_event()` below.
     191         1712 :         });
     192         1712 : 
     193         1712 :         TaskHandle {
     194         1712 :             join_handle: Some(join_handle),
     195         1712 :             events_receiver,
     196         1712 :             cancellation,
     197         1712 :         }
     198         1712 :     }
     199              : 
     200       735449 :     async fn next_task_event(&mut self) -> TaskEvent<E> {
     201       735449 :         match self.events_receiver.changed().await {
     202       726284 :             Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
     203         1381 :             Err(_task_channel_part_dropped) => {
     204         1381 :                 TaskEvent::End(match self.join_handle.as_mut() {
     205         1381 :                     Some(jh) => {
     206         1381 :                         if !jh.is_finished() {
     207              :                             // See: https://github.com/neondatabase/neon/issues/2885
     208            0 :                             trace!("sender is dropped while join handle is still alive");
     209         1209 :                         }
     210              : 
     211         1381 :                         let res = match jh.await {
     212         1381 :                             Ok(res) => res,
     213            0 :                             Err(je) if je.is_cancelled() => unreachable!("not used"),
     214            0 :                             Err(je) if je.is_panic() => {
     215            0 :                                 // already logged
     216            0 :                                 Ok(())
     217              :                             }
     218            0 :                             Err(je) => Err(anyhow::Error::new(je).context("join walreceiver task")),
     219              :                         };
     220              : 
     221              :                         // For cancellation-safety, drop join_handle only after successful .await.
     222         1381 :                         self.join_handle = None;
     223         1381 : 
     224         1381 :                         res
     225              :                     }
     226              :                     None => {
     227              :                         // Another option is to have an enum, join handle or result and give away the reference to it
     228            0 :                         Err(anyhow::anyhow!("Task was joined more than once"))
     229              :                     }
     230              :                 })
     231              :             }
     232              :         }
     233       727665 :     }
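                      : // Because the join handle is dropped only after a completed `.await` (see the
                      : // comment above), `next_task_event` is safe to poll inside `select!`. An
                      : // illustrative consumer loop, assuming a `shutdown: CancellationToken` binding:
                      : //
                      : //     loop {
                      : //         tokio::select! {
                      : //             event = handle.next_task_event() => match event {
                      : //                 TaskEvent::Update(_) => continue,
                      : //                 TaskEvent::End(_) => break,
                      : //             },
                      : //             _ = shutdown.cancelled() => break,
                      : //         }
                      : //     }
                      : 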
     234              : 
     235              :     /// Aborts the current task, waiting for it to finish.
     236          279 :     async fn shutdown(self) {
     237          279 :         if let Some(jh) = self.join_handle {
     238          279 :             self.cancellation.cancel();
     239          306 :             match jh.await {
     240            0 :                 Ok(Ok(())) => debug!("Shutdown success"),
     241            1 :                 Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
     242            0 :                 Err(je) if je.is_cancelled() => unreachable!("not used"),
     243            0 :                 Err(je) if je.is_panic() => {
     244            0 :                     // already logged
     245            0 :                 }
     246            0 :                 Err(je) => {
     247            0 :                     error!("Shutdown task join error: {je}")
     248              :                 }
     249              :             }
     250            0 :         }
     251          279 :     }
     252              : }
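                      : 
                      : // A usage sketch for `TaskHandle` (not present in the original source), with
                      : // `u64` as an illustrative progress type: spawn a task that reports progress,
                      : // observe its events, then shut it down.
                      : //
                      : //     let mut handle: TaskHandle<u64> = TaskHandle::spawn(|events, cancellation| async move {
                      : //         events.send(TaskStateUpdate::Progress(42)).ok();
                      : //         cancellation.cancelled().await; // wait for shutdown()
                      : //         Ok(())
                      : //     });
                      : //     match handle.next_task_event().await {
                      : //         TaskEvent::Update(update) => { /* Started or Progress(..) */ }
                      : //         TaskEvent::End(result) => { /* the task exited with `result` */ }
                      : //     }
                      : //     handle.shutdown().await;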
        

Generated by: LCOV version 2.1-beta