Line data Source code
1 : //! The WAL receiver manages an open connection to a safekeeper, to receive the WAL it streams to the pageserver.
2 : //! To do so, the current implementation needs to do the following:
3 : //!
4 : //! * acknowledge the timelines that it needs to stream WAL into.
5 : //! The pageserver is able to dynamically (un)load tenants on attach and detach,
6 : //! hence the WAL receiver needs to react to such events.
7 : //!
8 : //! * get a broker subscription and stream data from it to determine whether a timeline needs WAL streaming.
9 : //! For that, it watches specific keys in storage_broker and pulls the relevant data periodically.
10 : //! The data is produced by safekeepers, which push it periodically and pull it to synchronize with each other.
11 : //! Without this data, no WAL streaming is currently possible.
12 : //!
13 : //! Only one active WAL streaming connection is allowed at a time.
14 : //! The connection is re-evaluated periodically, based on safekeeper timeline data, and may be switched to another safekeeper.
15 : //!
16 : //! * handle the actual connection and WAL streaming
17 : //!
18 : //! Handling happens dynamically: portions of WAL are processed and registered in the server as they arrive.
19 : //! Along with the registration, certain metadata is written to track WAL streaming progress; that metadata is later relied on when choosing a safekeeper to connect to.
20 : //!
21 : //! The current module contains the high-level primitives used in the submodules: general synchronization, timeline acknowledgement and shutdown logic. An illustrative usage sketch follows below.
22 :
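// Illustrative usage sketch (a minimal example, not one of this module's real call sites):
// roughly how a caller drives this API for one timeline. How `timeline`, `broker_client`
// and `ctx` are obtained is assumed here, and the concrete configuration values are
// placeholders rather than recommended settings; only `WalReceiverConf`,
// `WalReceiver::start` and `WalReceiver::stop` come from this file.
#[allow(dead_code)]
async fn walreceiver_usage_sketch(
    timeline: std::sync::Arc<super::Timeline>,
    broker_client: storage_broker::BrokerClientChannel,
    ctx: &crate::context::RequestContext,
) {
    let conf = WalReceiverConf {
        wal_connect_timeout: std::time::Duration::from_secs(10),
        lagging_wal_timeout: std::time::Duration::from_secs(10),
        max_lsn_wal_lag: std::num::NonZeroU64::new(10 * 1024 * 1024).unwrap(),
        auth_token: None,
        availability_zone: None,
        ingest_batch_size: 100,
    };
    // Spawns the connection manager task on WALRECEIVER_RUNTIME and returns a handle.
    let receiver = WalReceiver::start(timeline, conf, broker_client, ctx);
    // ... WAL is streamed into the timeline while it is active ...
    // On timeline shutdown, stop the manager task and wait for it to exit.
    receiver.stop().await;
}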
23 : mod connection_manager;
24 : mod walreceiver_connection;
25 :
26 : use crate::context::{DownloadBehavior, RequestContext};
27 : use crate::task_mgr::{self, TaskKind, WALRECEIVER_RUNTIME};
28 : use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
29 : use crate::tenant::timeline::walreceiver::connection_manager::{
30 : connection_manager_loop_step, ConnectionManagerState,
31 : };
32 :
33 : use pageserver_api::shard::TenantShardId;
34 : use std::future::Future;
35 : use std::num::NonZeroU64;
36 : use std::ops::ControlFlow;
37 : use std::sync::Arc;
38 : use std::time::Duration;
39 : use storage_broker::BrokerClientChannel;
40 : use tokio::select;
41 : use tokio::sync::watch;
42 : use tokio_util::sync::CancellationToken;
43 : use tracing::*;
44 :
45 : use utils::id::TimelineId;
46 :
47 : use self::connection_manager::ConnectionManagerStatus;
48 :
49 : use super::Timeline;
50 :
51 0 : #[derive(Clone)]
52 : pub struct WalReceiverConf {
53 : /// The timeout on the connection to a safekeeper for WAL streaming.
54 : pub wal_connect_timeout: Duration,
55 : /// The timeout used to determine when the current connection is "stale" and a reconnection to another safekeeper should be attempted.
56 : pub lagging_wal_timeout: Duration,
57 : /// The LSN lag threshold used to determine when the current connection is lagging too far behind and a reconnection to another safekeeper should be attempted.
58 : pub max_lsn_wal_lag: NonZeroU64,
59 : pub auth_token: Option<Arc<String>>,
60 : pub availability_zone: Option<String>,
61 : pub ingest_batch_size: u64,
62 : }
63 :
64 : pub struct WalReceiver {
65 : tenant_shard_id: TenantShardId,
66 : timeline_id: TimelineId,
67 : manager_status: Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>,
68 : }
69 :
70 : impl WalReceiver {
71 1235 : pub fn start(
72 1235 : timeline: Arc<Timeline>,
73 1235 : conf: WalReceiverConf,
74 1235 : mut broker_client: BrokerClientChannel,
75 1235 : ctx: &RequestContext,
76 1235 : ) -> Self {
77 1235 : let tenant_shard_id = timeline.tenant_shard_id;
78 1235 : let timeline_id = timeline.timeline_id;
79 1235 : let walreceiver_ctx =
80 1235 : ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
81 1235 :
82 1235 : let loop_status = Arc::new(std::sync::RwLock::new(None));
83 1235 : let manager_status = Arc::clone(&loop_status);
84 1235 : task_mgr::spawn(
85 1235 : WALRECEIVER_RUNTIME.handle(),
86 1235 : TaskKind::WalReceiverManager,
87 1235 : Some(timeline.tenant_shard_id),
88 1235 : Some(timeline_id),
89 1235 : &format!("walreceiver for timeline {tenant_shard_id}/{timeline_id}"),
90 : false,
91 1235 : async move {
92 1235 : debug_assert_current_span_has_tenant_and_timeline_id();
93 1235 : debug!("WAL receiver manager started, connecting to broker");
94 1235 : let mut connection_manager_state = ConnectionManagerState::new(
95 1235 : timeline,
96 1235 : conf,
97 1235 : );
98 1235 : loop {
99 717497 : select! {
100 : _ = task_mgr::shutdown_watcher() => {
101 0 : trace!("WAL receiver shutdown requested, shutting down");
102 : break;
103 : },
104 483 : loop_step_result = connection_manager_loop_step(
105 : &mut broker_client,
106 : &mut connection_manager_state,
107 : &walreceiver_ctx,
108 : &loop_status,
109 : ) => match loop_step_result {
110 : ControlFlow::Continue(()) => continue,
111 : ControlFlow::Break(()) => {
112 0 : trace!("Connection manager loop ended, shutting down");
113 : break;
114 : }
115 : },
116 1235 : }
117 1235 : }
118 :
119 545 : connection_manager_state.shutdown().await;
120 545 : *loop_status.write().unwrap() = None;
121 545 : Ok(())
122 545 : }
123 1235 : .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), timeline_id = %timeline_id))
124 : );
125 :
126 1235 : Self {
127 1235 : tenant_shard_id,
128 1235 : timeline_id,
129 1235 : manager_status,
130 1235 : }
131 1235 : }
132 :
133 160 : pub async fn stop(self) {
134 160 : task_mgr::shutdown_tasks(
135 160 : Some(TaskKind::WalReceiverManager),
136 160 : Some(self.tenant_shard_id),
137 160 : Some(self.timeline_id),
138 160 : )
139 48 : .await;
140 160 : }
141 :
142 2953 : pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {
143 2953 : self.manager_status.read().unwrap().clone()
144 2953 : }
145 : }
146 :
147 : /// A handle to an asynchronous task.
148 : /// The task has a channel that it can use to communicate its lifecycle events (see [`TaskEvent`]),
149 : /// and a cancellation token that it can listen to for early interruption.
150 : ///
151 : /// Note that the communication happens via a `watch` channel, which does not accumulate events: each new event replaces the previous one on submission.
152 : /// That may lead to certain events not being observed by the listener (a small sketch of this behaviour follows the struct definition).
153 0 : #[derive(Debug)]
154 : struct TaskHandle<E> {
155 : join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
156 : events_receiver: watch::Receiver<TaskStateUpdate<E>>,
157 : cancellation: CancellationToken,
158 : }
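// A minimal sketch of the `watch` semantics described above (assumes a Tokio runtime;
// the function name is made up for illustration): a receiver only ever observes the
// latest value, so intermediate updates can be lost.
#[allow(dead_code)]
async fn watch_keeps_only_latest_value() {
    let (sender, mut receiver) = tokio::sync::watch::channel(0u32);
    // Two updates are submitted before the receiver runs; the first one is
    // overwritten and never becomes observable.
    sender.send(1).ok();
    sender.send(2).ok();
    receiver.changed().await.ok();
    assert_eq!(*receiver.borrow(), 2);
}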
159 :
160 : enum TaskEvent<E> {
161 : Update(TaskStateUpdate<E>),
162 : End(anyhow::Result<()>),
163 : }
164 :
165 726284 : #[derive(Debug, Clone)]
166 : enum TaskStateUpdate<E> {
167 : Started,
168 : Progress(E),
169 : }
170 :
171 : impl<E: Clone> TaskHandle<E> {
172 : /// Initializes the task, starting it immediately after creation.
173 1712 : fn spawn<Fut>(
174 1712 : task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
175 1712 : ) -> Self
176 1712 : where
177 1712 : Fut: Future<Output = anyhow::Result<()>> + Send,
178 1712 : E: Send + Sync + 'static,
179 1712 : {
180 1712 : let cancellation = CancellationToken::new();
181 1712 : let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);
182 1712 :
183 1712 : let cancellation_clone = cancellation.clone();
184 1712 : let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
185 1712 : events_sender.send(TaskStateUpdate::Started).ok();
186 849884 : task(events_sender, cancellation_clone).await
187 : // events_sender is dropped at some point during the .await above.
188 : // But the task is still running on WALRECEIVER_RUNTIME.
189 : // That is the window when `!jh.is_finished()`
190 : // is true inside `fn next_task_event()` below.
191 1712 : });
192 1712 :
193 1712 : TaskHandle {
194 1712 : join_handle: Some(join_handle),
195 1712 : events_receiver,
196 1712 : cancellation,
197 1712 : }
198 1712 : }
199 :
200 735449 : async fn next_task_event(&mut self) -> TaskEvent<E> {
201 735449 : match self.events_receiver.changed().await {
202 726284 : Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
203 1381 : Err(_task_channel_part_dropped) => {
204 1381 : TaskEvent::End(match self.join_handle.as_mut() {
205 1381 : Some(jh) => {
206 1381 : if !jh.is_finished() {
207 : // See: https://github.com/neondatabase/neon/issues/2885
208 0 : trace!("sender is dropped while join handle is still alive");
209 1209 : }
210 :
211 1381 : let res = match jh.await {
212 1381 : Ok(res) => res,
213 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
214 0 : Err(je) if je.is_panic() => {
215 0 : // already logged
216 0 : Ok(())
217 : }
218 0 : Err(je) => Err(anyhow::Error::new(je).context("join walreceiver task")),
219 : };
220 :
221 : // For cancellation-safety, drop join_handle only after successful .await.
222 1381 : self.join_handle = None;
223 1381 :
224 1381 : res
225 : }
226 : None => {
227 : // Another option is to store an enum of either the join handle or the result, and give away a reference to it
228 0 : Err(anyhow::anyhow!("Task was joined more than once"))
229 : }
230 : })
231 : }
232 : }
233 727665 : }
234 :
235 : /// Aborts the current task, waiting for it to finish.
236 279 : async fn shutdown(self) {
237 279 : if let Some(jh) = self.join_handle {
238 279 : self.cancellation.cancel();
239 306 : match jh.await {
240 0 : Ok(Ok(())) => debug!("Shutdown success"),
241 1 : Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
242 0 : Err(je) if je.is_cancelled() => unreachable!("not used"),
243 0 : Err(je) if je.is_panic() => {
244 0 : // already logged
245 0 : }
246 0 : Err(je) => {
247 0 : error!("Shutdown task join error: {je}")
248 : }
249 : }
250 0 : }
251 279 : }
252 : }
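// Illustrative sketch of the TaskHandle lifecycle (the function and its body are made
// up for the example; the real driver lives in the connection_manager submodule):
// spawn a task that reports progress, observe one event, then shut it down.
#[allow(dead_code)]
async fn task_handle_usage_sketch() {
    let mut handle = TaskHandle::<u32>::spawn(|events, cancellation| async move {
        let mut progress = 0u32;
        loop {
            tokio::select! {
                // Stop promptly once `shutdown()` cancels the token.
                _ = cancellation.cancelled() => return anyhow::Ok(()),
                _ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {
                    progress += 1;
                    // Publish progress; an unobserved previous update is overwritten.
                    events.send(TaskStateUpdate::Progress(progress)).ok();
                }
            }
        }
    });

    match handle.next_task_event().await {
        TaskEvent::Update(TaskStateUpdate::Started) => debug!("task started"),
        TaskEvent::Update(TaskStateUpdate::Progress(p)) => debug!("progress: {p}"),
        TaskEvent::End(res) => debug!("task ended: {res:?}"),
    }
    // Cancel the task and wait for it to finish.
    handle.shutdown().await;
}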