//! The WAL receiver manages an open connection to a safekeeper to receive the WAL that it then ingests into the timeline.
//! To do so, the current implementation needs to:
//!
//! * acknowledge the timelines that it needs to stream WAL into.
//!   The pageserver dynamically (un)loads tenants on attach and detach,
//!   so the WAL receiver needs to react to such events.
//!
//! * obtain a broker subscription and stream data from it to determine which timelines need WAL streaming.
//!   For that, it watches specific keys in storage_broker and pulls the relevant data periodically.
//!   The data is produced by safekeepers, which push it periodically and pull it to synchronize with each other.
//!   Without this data, no WAL streaming is currently possible.
//!
//!   Only one active WAL streaming connection is allowed at a time.
//!   The connection is re-evaluated periodically, based on safekeeper timeline data.
//!
//! * handle the actual connection and WAL streaming.
//!
//!   Handling happens dynamically: portions of WAL are processed and registered in the server as they arrive.
//!   Along with the registration, metadata is written to track WAL streaming progress; that metadata is later relied on when choosing a safekeeper to connect to.
//!
//! This module contains the high-level primitives used in the submodules: general synchronization, timeline acknowledgement and shutdown logic.
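//!
//! As an illustrative sketch (not a doctest), a timeline wires the receiver up and tears it
//! down roughly like this, assuming a `Timeline`, a broker client and a `RequestContext` are at hand:
//!
//! ```ignore
//! let walreceiver = WalReceiver::start(timeline, conf, broker_client, &ctx);
//! // ... WAL is streamed and ingested in the background ...
//! let _status = walreceiver.status(); // inspect the connection manager state
//! walreceiver.shutdown().await;       // cancel the tasks and wait for them to exit
//! ```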

mod connection_manager;
mod walreceiver_connection;

use std::future::Future;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Duration;

use storage_broker::BrokerClientChannel;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::postgres_client::PostgresClientProtocol;

use self::connection_manager::ConnectionManagerStatus;
use super::Timeline;
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{TaskKind, WALRECEIVER_RUNTIME};
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::timeline::walreceiver::connection_manager::{
    ConnectionManagerState, connection_manager_loop_step,
};

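/// Configuration for the WAL receiver: connection timeouts and the thresholds used to decide
/// when to switch to another safekeeper.
///
/// An illustrative construction is sketched below; the values and the protocol variant are
/// assumptions for the example, not recommended defaults:
///
/// ```ignore
/// let conf = WalReceiverConf {
///     protocol: PostgresClientProtocol::Vanilla, // variant name assumed
///     wal_connect_timeout: Duration::from_secs(10),
///     lagging_wal_timeout: Duration::from_secs(10),
///     max_lsn_wal_lag: NonZeroU64::new(10 * 1024 * 1024).unwrap(),
///     auth_token: None,
///     availability_zone: None,
///     ingest_batch_size: 100,
///     validate_wal_contiguity: false,
/// };
/// ```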
#[derive(Clone)]
pub struct WalReceiverConf {
    pub protocol: PostgresClientProtocol,
    /// The timeout on the connection to a safekeeper for WAL streaming.
    pub wal_connect_timeout: Duration,
    /// The timeout used to determine that the current connection is "stale" and a reconnect to another safekeeper is needed.
    pub lagging_wal_timeout: Duration,
    /// The LSN lag threshold used to determine that the current connection is lagging too far behind and a reconnect to another safekeeper is needed.
    pub max_lsn_wal_lag: NonZeroU64,
    pub auth_token: Option<Arc<String>>,
    pub availability_zone: Option<String>,
    pub ingest_batch_size: u64,
    pub validate_wal_contiguity: bool,
}

pub struct WalReceiver {
    manager_status: Arc<std::sync::RwLock<Option<ConnectionManagerStatus>>>,
    /// All tasks spawned by [`WalReceiver::start`] and its children are sensitive to this token.
    /// It's a child of the [`Timeline`]'s cancellation token, so timeline shutdown can cancel WalReceiver tasks early when `freeze_and_flush=true`.
    cancel: CancellationToken,
    task: tokio::task::JoinHandle<()>,
}

impl WalReceiver {
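    /// Spawns the WAL receiver manager task on [`WALRECEIVER_RUNTIME`].
    /// The task subscribes to the storage broker and keeps (re)establishing the single active
    /// safekeeper connection for this timeline until it is cancelled.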
    pub fn start(
        timeline: Arc<Timeline>,
        conf: WalReceiverConf,
        mut broker_client: BrokerClientChannel,
        ctx: &RequestContext,
    ) -> Self {
        let tenant_shard_id = timeline.tenant_shard_id;
        let timeline_id = timeline.timeline_id;
        let walreceiver_ctx =
            ctx.detached_child(TaskKind::WalReceiverManager, DownloadBehavior::Error);
        let loop_status = Arc::new(std::sync::RwLock::new(None));
        let manager_status = Arc::clone(&loop_status);
        let cancel = timeline.cancel.child_token();
        let task = WALRECEIVER_RUNTIME.spawn({
            let cancel = cancel.clone();
            async move {
                debug_assert_current_span_has_tenant_and_timeline_id();
                // acquire timeline gate so we know the task doesn't outlive the Timeline
                let Ok(_guard) = timeline.gate.enter() else {
                    debug!("WAL receiver manager could not enter the timeline gate, it's closed already");
                    return;
                };
                debug!("WAL receiver manager started, connecting to broker");
                let mut connection_manager_state = ConnectionManagerState::new(
                    timeline,
                    conf,
                    cancel.clone(),
                );
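                // Drive the connection manager until cancellation: each step keeps the broker
                // subscription alive and maintains at most one active safekeeper connection;
                // an Err from the step means cancellation was observed.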
                while !cancel.is_cancelled() {
                    let loop_step_result = connection_manager_loop_step(
                        &mut broker_client,
                        &mut connection_manager_state,
                        &walreceiver_ctx,
                        &cancel,
                        &loop_status,
                    ).await;
                    match loop_step_result {
                        Ok(()) => continue,
                        Err(_cancelled) => {
                            trace!("Connection manager loop ended, shutting down");
                            break;
                        }
                    }
                }
                connection_manager_state.shutdown().await;
                *loop_status.write().unwrap() = None;
                debug!("task exits");
            }
            .instrument(info_span!(parent: None, "wal_connection_manager", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), timeline_id = %timeline_id))
        });

        Self {
            manager_status,
            cancel,
            task,
        }
    }

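    /// Cancels all walreceiver tasks for this timeline and waits for the manager task to exit.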
    #[instrument(skip_all, level = tracing::Level::DEBUG)]
    pub async fn shutdown(self) {
        debug_assert_current_span_has_tenant_and_timeline_id();
        debug!("cancelling walreceiver tasks");
        self.cancel.cancel();
        match self.task.await {
            Ok(()) => debug!("Shutdown success"),
            Err(je) if je.is_cancelled() => unreachable!("not used"),
            Err(je) if je.is_panic() => {
                // already logged by panic hook
            }
            Err(je) => {
                error!("shutdown walreceiver task join error: {je}")
            }
        }
    }

    pub(crate) fn status(&self) -> Option<ConnectionManagerStatus> {
        self.manager_status.read().unwrap().clone()
    }
}

/// A handle to an asynchronous task.
/// The task has a channel that it can use to communicate its lifecycle events in a certain form (see [`TaskEvent`]),
/// and a cancellation token that it can listen to for early interrupts.
///
/// Note that the communication happens via a `watch` channel, which does not accumulate events: each submission replaces the previous value.
/// That may lead to certain events not being observed by the listener.
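///
/// Illustrative use only (not a doctest); the closure receives the events sender and a child
/// cancellation token:
///
/// ```ignore
/// let mut handle = TaskHandle::spawn(&cancel, |events_sender, cancellation| async move {
///     events_sender.send(TaskStateUpdate::Progress(42)).ok();
///     // ... do the actual work, periodically checking `cancellation` ...
///     Ok(())
/// });
/// match handle.next_task_event().await {
///     TaskEvent::Update(update) => { /* the task reported progress */ }
///     TaskEvent::End(result) => { /* the task finished with `result` */ }
/// }
/// ```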
#[derive(Debug)]
struct TaskHandle<E> {
    join_handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
    events_receiver: watch::Receiver<TaskStateUpdate<E>>,
    cancellation: CancellationToken,
}

enum TaskEvent<E> {
    Update(TaskStateUpdate<E>),
    End(anyhow::Result<()>),
}

#[derive(Debug, Clone)]
enum TaskStateUpdate<E> {
    Started,
    Progress(E),
}

impl<E: Clone> TaskHandle<E> {
    /// Initializes the task, starting it immediately upon creation.
    ///
    /// The second argument to `task` is a child token of `cancel_parent` ([`CancellationToken::child_token`]).
    /// Because it is a child token, we can provide a [`Self::shutdown`] method.
    fn spawn<Fut>(
        cancel_parent: &CancellationToken,
        task: impl FnOnce(watch::Sender<TaskStateUpdate<E>>, CancellationToken) -> Fut + Send + 'static,
    ) -> Self
    where
        Fut: Future<Output = anyhow::Result<()>> + Send,
        E: Send + Sync + 'static,
    {
        let cancellation = cancel_parent.child_token();
        let (events_sender, events_receiver) = watch::channel(TaskStateUpdate::Started);

        let cancellation_clone = cancellation.clone();
        let join_handle = WALRECEIVER_RUNTIME.spawn(async move {
            events_sender.send(TaskStateUpdate::Started).ok();
            task(events_sender, cancellation_clone).await
            // events_sender is dropped at some point during the .await above.
            // But the task is still running on WALRECEIVER_RUNTIME.
            // That is the window when `!jh.is_finished()`
            // is true inside `fn next_task_event()` below.
        });

        TaskHandle {
            join_handle: Some(join_handle),
            events_receiver,
            cancellation,
        }
    }

    /// # Cancel-Safety
    ///
    /// Cancellation-safe.
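    /// If this future is dropped mid-await, the join handle is retained, so a later call can
    /// still observe the task's result.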
    async fn next_task_event(&mut self) -> TaskEvent<E> {
        match self.events_receiver.changed().await {
            Ok(()) => TaskEvent::Update((self.events_receiver.borrow()).clone()),
            Err(_task_channel_part_dropped) => {
                TaskEvent::End(match self.join_handle.as_mut() {
                    Some(jh) => {
                        if !jh.is_finished() {
                            // See: https://github.com/neondatabase/neon/issues/2885
                            trace!("sender is dropped while join handle is still alive");
                        }

                        let res = match jh.await {
                            Ok(res) => res,
                            Err(je) if je.is_cancelled() => unreachable!("not used"),
                            Err(je) if je.is_panic() => {
                                // already logged
                                Ok(())
                            }
                            Err(je) => Err(anyhow::Error::new(je).context("join walreceiver task")),
                        };

                        // For cancellation-safety, drop join_handle only after successful .await.
                        self.join_handle = None;

                        res
                    }
                    None => {
                        // Another option is to have an enum, join handle or result and give away the reference to it
                        Err(anyhow::anyhow!("Task was joined more than once"))
                    }
                })
            }
        }
    }

    /// Cancels the task and waits for it to finish.
    async fn shutdown(self) {
        if let Some(jh) = self.join_handle {
            self.cancellation.cancel();
            match jh.await {
                Ok(Ok(())) => debug!("Shutdown success"),
                Ok(Err(e)) => error!("Shutdown task error: {e:?}"),
                Err(je) if je.is_cancelled() => unreachable!("not used"),
                Err(je) if je.is_panic() => {
                    // already logged
                }
                Err(je) => {
                    error!("Shutdown task join error: {je}")
                }
            }
        }
    }
}