Line data Source code
1 : //! The timeline manager task is responsible for managing the timeline's background tasks.
2 : //! It is spawned alongside each timeline and exits when the timeline is deleted.
3 : //! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
4 : //! It can also manage some reactive state, such as whether the timeline should be active for broker pushes.
5 :
6 : use std::{
7 : sync::Arc,
8 : time::{Duration, Instant},
9 : };
10 :
11 : use postgres_ffi::XLogSegNo;
12 : use tokio::task::{JoinError, JoinHandle};
13 : use tracing::{info, info_span, instrument, warn, Instrument};
14 : use utils::lsn::Lsn;
15 :
16 : use crate::{
17 : control_file::Storage,
18 : metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL},
19 : recovery::recovery_main,
20 : remove_wal::calc_horizon_lsn,
21 : send_wal::WalSenders,
22 : timeline::{PeerInfo, ReadGuardSharedState, Timeline},
23 : timelines_set::{TimelineSetGuard, TimelinesSet},
24 : wal_backup::{self, WalBackupTaskHandle},
25 : wal_backup_partial, SafeKeeperConf,
26 : };
27 :
28 : pub struct StateSnapshot {
29 : // inmem values
30 : pub commit_lsn: Lsn,
31 : pub backup_lsn: Lsn,
32 : pub remote_consistent_lsn: Lsn,
33 :
34 : // persistent control file values
35 : pub cfile_peer_horizon_lsn: Lsn,
36 : pub cfile_remote_consistent_lsn: Lsn,
37 : pub cfile_backup_lsn: Lsn,
38 :
39 : // misc
40 : pub cfile_last_persist_at: Instant,
41 : pub inmem_flush_pending: bool,
42 : pub wal_removal_on_hold: bool,
43 : pub peers: Vec<PeerInfo>,
44 : }
45 :
46 : impl StateSnapshot {
47 : /// Create a new snapshot of the timeline state.
48 0 : fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
49 0 : Self {
50 0 : commit_lsn: read_guard.sk.state.inmem.commit_lsn,
51 0 : backup_lsn: read_guard.sk.state.inmem.backup_lsn,
52 0 : remote_consistent_lsn: read_guard.sk.state.inmem.remote_consistent_lsn,
53 0 : cfile_peer_horizon_lsn: read_guard.sk.state.peer_horizon_lsn,
54 0 : cfile_remote_consistent_lsn: read_guard.sk.state.remote_consistent_lsn,
55 0 : cfile_backup_lsn: read_guard.sk.state.backup_lsn,
56 0 : cfile_last_persist_at: read_guard.sk.state.pers.last_persist_at(),
57 0 : inmem_flush_pending: Self::has_unflushed_inmem_state(&read_guard),
58 0 : wal_removal_on_hold: read_guard.wal_removal_on_hold,
59 0 : peers: read_guard.get_peers(heartbeat_timeout),
60 0 : }
61 0 : }
62 :
63 0 : fn has_unflushed_inmem_state(read_guard: &ReadGuardSharedState) -> bool {
64 0 : let state = &read_guard.sk.state;
65 0 : state.inmem.commit_lsn > state.commit_lsn
66 0 : || state.inmem.backup_lsn > state.backup_lsn
67 0 : || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
68 0 : || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
69 0 : }
70 : }
71 :
72 : /// Controls how often the manager task wakes up to check for state updates.
73 : /// There is no need to check for updates more often than this.
74 : const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
75 :
76 : /// How often to save the control file if there is no other activity.
77 : const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300);
78 :
79 : /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
80 : /// background tasks.
81 : /// Be careful, this task is not respawned on panic, so it should not panic.
82 0 : #[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
83 : pub async fn main_task(
84 : tli: Arc<Timeline>,
85 : conf: SafeKeeperConf,
86 : broker_active_set: Arc<TimelinesSet>,
87 : ) {
88 : scopeguard::defer! {
89 : if tli.is_cancelled() {
90 : info!("manager task finished");
91 : } else {
92 : warn!("manager task finished prematurely");
93 : }
94 : };
95 :
96 : // configuration & dependencies
97 : let wal_seg_size = tli.get_wal_seg_size().await;
98 : let heartbeat_timeout = conf.heartbeat_timeout;
99 : let walsenders = tli.get_walsenders();
100 : let walreceivers = tli.get_walreceivers();
101 :
102 : // current state
103 : let mut state_version_rx = tli.get_state_version_rx();
104 : let mut num_computes_rx = walreceivers.get_num_rx();
105 : let mut tli_broker_active = broker_active_set.guard(tli.clone());
106 : let mut last_removed_segno = 0 as XLogSegNo;
107 :
108 : // list of background tasks
109 : let mut backup_task: Option<WalBackupTaskHandle> = None;
110 : let mut recovery_task: Option<JoinHandle<()>> = None;
111 : let mut partial_backup_task: Option<JoinHandle<()>> = None;
112 : let mut wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>> = None;
113 :
114 : // Start recovery task which always runs on the timeline.
115 : if conf.peer_recovery_enabled {
116 : match tli.full_access_guard().await {
117 : Ok(tli) => {
118 : recovery_task = Some(tokio::spawn(recovery_main(tli, conf.clone())));
119 : }
120 : Err(e) => {
121 : warn!("failed to start recovery task: {:?}", e);
122 : }
123 : }
124 : }
125 :
126 : // Start partial backup task which always runs on the timeline.
127 : if conf.is_wal_backup_enabled() && conf.partial_backup_enabled {
128 : match tli.full_access_guard().await {
129 : Ok(tli) => {
130 : partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
131 : tli,
132 : conf.clone(),
133 : )));
134 : }
135 : Err(e) => {
136 : warn!("failed to start partial backup task: {:?}", e);
137 : }
138 : }
139 : }
140 :
141 : let last_state = 'outer: loop {
142 : MANAGER_ITERATIONS_TOTAL.inc();
143 :
144 : let state_snapshot = StateSnapshot::new(tli.read_shared_state().await, heartbeat_timeout);
145 : let num_computes = *num_computes_rx.borrow();
146 :
147 : let is_wal_backup_required = update_backup(
148 : &conf,
149 : &tli,
150 : wal_seg_size,
151 : num_computes,
152 : &state_snapshot,
153 : &mut backup_task,
154 : )
155 : .await;
156 :
157 : let _is_active = update_is_active(
158 : is_wal_backup_required,
159 : num_computes,
160 : &state_snapshot,
161 : &mut tli_broker_active,
162 : &tli,
163 : );
164 :
165 : let next_cfile_save = update_control_file_save(&state_snapshot, &tli).await;
166 :
167 : update_wal_removal(
168 : &conf,
169 : walsenders,
170 : &tli,
171 : wal_seg_size,
172 : &state_snapshot,
173 : last_removed_segno,
174 : &mut wal_removal_task,
175 : )
176 : .await;
177 :
178 : // wait until something changes. tx channels are stored under Arc, so they will not be
179 : // dropped until the manager task is finished.
180 : tokio::select! {
181 : _ = tli.cancel.cancelled() => {
182 : // timeline was deleted
183 : break 'outer state_snapshot;
184 : }
185 0 : _ = async {
186 0 : // don't wake up on every state change, but at most every REFRESH_INTERVAL
187 0 : tokio::time::sleep(REFRESH_INTERVAL).await;
188 0 : let _ = state_version_rx.changed().await;
189 0 : } => {
190 : // state was updated
191 : }
192 : _ = num_computes_rx.changed() => {
193 : // number of connected computes was updated
194 : }
195 0 : _ = async {
196 0 : if let Some(timeout) = next_cfile_save {
197 0 : tokio::time::sleep_until(timeout).await
198 : } else {
199 0 : futures::future::pending().await
200 : }
201 0 : } => {
202 : // it's time to save the control file
203 : }
204 0 : res = async {
205 0 : if let Some(task) = &mut wal_removal_task {
206 0 : task.await
207 : } else {
208 0 : futures::future::pending().await
209 : }
210 0 : } => {
211 : // WAL removal task finished
212 : wal_removal_task = None;
213 : update_wal_removal_end(res, &tli, &mut last_removed_segno);
214 : }
215 : }
216 : };
217 :
218 : // remove timeline from the broker active set sooner, before waiting for background tasks
219 : tli_broker_active.set(false);
220 :
221 : // shutdown background tasks
222 : if conf.is_wal_backup_enabled() {
223 : wal_backup::update_task(&conf, &tli, false, &last_state, &mut backup_task).await;
224 : }
225 :
226 : if let Some(recovery_task) = recovery_task {
227 : if let Err(e) = recovery_task.await {
228 : warn!("recovery task failed: {:?}", e);
229 : }
230 : }
231 :
232 : if let Some(partial_backup_task) = partial_backup_task {
233 : if let Err(e) = partial_backup_task.await {
234 : warn!("partial backup task failed: {:?}", e);
235 : }
236 : }
237 :
238 : if let Some(wal_removal_task) = wal_removal_task {
239 : let res = wal_removal_task.await;
240 : update_wal_removal_end(res, &tli, &mut last_removed_segno);
241 : }
242 : }
243 :
244 : /// Spawns/kills backup task and returns true if backup is required.
245 0 : async fn update_backup(
246 0 : conf: &SafeKeeperConf,
247 0 : tli: &Arc<Timeline>,
248 0 : wal_seg_size: usize,
249 0 : num_computes: usize,
250 0 : state: &StateSnapshot,
251 0 : backup_task: &mut Option<WalBackupTaskHandle>,
252 0 : ) -> bool {
253 0 : let is_wal_backup_required =
254 0 : wal_backup::is_wal_backup_required(wal_seg_size, num_computes, state);
255 0 :
256 0 : if conf.is_wal_backup_enabled() {
257 0 : wal_backup::update_task(conf, tli, is_wal_backup_required, state, backup_task).await;
258 0 : }
259 :
260 : // update the state in Arc<Timeline>
261 0 : tli.wal_backup_active
262 0 : .store(backup_task.is_some(), std::sync::atomic::Ordering::Relaxed);
263 0 : is_wal_backup_required
264 0 : }
265 :
266 : /// Update is_active flag and returns its value.
267 0 : fn update_is_active(
268 0 : is_wal_backup_required: bool,
269 0 : num_computes: usize,
270 0 : state: &StateSnapshot,
271 0 : tli_broker_active: &mut TimelineSetGuard,
272 0 : tli: &Arc<Timeline>,
273 0 : ) -> bool {
274 0 : let is_active = is_wal_backup_required
275 0 : || num_computes > 0
276 0 : || state.remote_consistent_lsn < state.commit_lsn;
277 :
278 : // update the broker timeline set
279 0 : if tli_broker_active.set(is_active) {
280 : // write log if state has changed
281 0 : info!(
282 0 : "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
283 : is_active, state.remote_consistent_lsn, state.commit_lsn,
284 : );
285 :
286 0 : MANAGER_ACTIVE_CHANGES.inc();
287 0 : }
288 :
289 : // update the state in Arc<Timeline>
290 0 : tli.broker_active
291 0 : .store(is_active, std::sync::atomic::Ordering::Relaxed);
292 0 : is_active
293 0 : }
294 :
295 : /// Save control file if needed. Returns Instant if we should persist the control file in the future.
296 0 : async fn update_control_file_save(
297 0 : state: &StateSnapshot,
298 0 : tli: &Arc<Timeline>,
299 0 : ) -> Option<tokio::time::Instant> {
300 0 : if !state.inmem_flush_pending {
301 0 : return None;
302 0 : }
303 0 :
304 0 : if state.cfile_last_persist_at.elapsed() > CF_SAVE_INTERVAL {
305 0 : let mut write_guard = tli.write_shared_state().await;
306 : // this can be done in the background because it blocks manager task, but flush() should
307 : // be fast enough not to be a problem now
308 0 : if let Err(e) = write_guard.sk.state.flush().await {
309 0 : warn!("failed to save control file: {:?}", e);
310 0 : }
311 :
312 0 : None
313 : } else {
314 : // we should wait until next CF_SAVE_INTERVAL
315 0 : Some((state.cfile_last_persist_at + CF_SAVE_INTERVAL).into())
316 : }
317 0 : }
318 :
319 : /// Spawns WAL removal task if needed.
320 0 : async fn update_wal_removal(
321 0 : conf: &SafeKeeperConf,
322 0 : walsenders: &Arc<WalSenders>,
323 0 : tli: &Arc<Timeline>,
324 0 : wal_seg_size: usize,
325 0 : state: &StateSnapshot,
326 0 : last_removed_segno: u64,
327 0 : wal_removal_task: &mut Option<JoinHandle<anyhow::Result<u64>>>,
328 0 : ) {
329 0 : if wal_removal_task.is_some() || state.wal_removal_on_hold {
330 : // WAL removal is already in progress or hold off
331 0 : return;
332 0 : }
333 :
334 : // If enabled, we use LSN of the most lagging walsender as a WAL removal horizon.
335 : // This allows to get better read speed for pageservers that are lagging behind,
336 : // at the cost of keeping more WAL on disk.
337 0 : let replication_horizon_lsn = if conf.walsenders_keep_horizon {
338 0 : walsenders.laggard_lsn()
339 : } else {
340 0 : None
341 : };
342 :
343 0 : let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
344 0 : let removal_horizon_segno = removal_horizon_lsn
345 0 : .segment_number(wal_seg_size)
346 0 : .saturating_sub(1);
347 0 :
348 0 : if removal_horizon_segno > last_removed_segno {
349 : // we need to remove WAL
350 0 : let remover = crate::wal_storage::Storage::remove_up_to(
351 0 : &tli.read_shared_state().await.sk.wal_store,
352 0 : removal_horizon_segno,
353 : );
354 0 : *wal_removal_task = Some(tokio::spawn(
355 0 : async move {
356 0 : remover.await?;
357 0 : Ok(removal_horizon_segno)
358 0 : }
359 0 : .instrument(info_span!("WAL removal", ttid=%tli.ttid)),
360 : ));
361 0 : }
362 0 : }
363 :
364 : /// Update the state after WAL removal task finished.
365 0 : fn update_wal_removal_end(
366 0 : res: Result<anyhow::Result<u64>, JoinError>,
367 0 : tli: &Arc<Timeline>,
368 0 : last_removed_segno: &mut u64,
369 0 : ) {
370 0 : let new_last_removed_segno = match res {
371 0 : Ok(Ok(segno)) => segno,
372 0 : Err(e) => {
373 0 : warn!("WAL removal task failed: {:?}", e);
374 0 : return;
375 : }
376 0 : Ok(Err(e)) => {
377 0 : warn!("WAL removal task failed: {:?}", e);
378 0 : return;
379 : }
380 : };
381 :
382 0 : *last_removed_segno = new_last_removed_segno;
383 0 : // update the state in Arc<Timeline>
384 0 : tli.last_removed_segno
385 0 : .store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
386 0 : }
|