Line data Source code
1 : //! The timeline manager task is responsible for managing the timeline's background tasks.
2 : //!
3 : //! It is spawned alongside each timeline and exits when the timeline is deleted.
4 : //! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
5 : //! It also can manage some reactive state, like should the timeline be active for broker pushes or not.
6 : //!
7 : //! Be aware that you need to be extra careful with manager code, because it is not respawned on panic.
8 : //! Also, if it gets stuck in some branch, it will prevent any further progress in the timeline.
9 :
10 : use std::sync::Arc;
11 : use std::sync::atomic::AtomicUsize;
12 : use std::time::Duration;
13 :
14 : use futures::channel::oneshot;
15 : use postgres_ffi::XLogSegNo;
16 : use safekeeper_api::Term;
17 : use safekeeper_api::models::PeerInfo;
18 : use serde::{Deserialize, Serialize};
19 : use tokio::task::{JoinError, JoinHandle};
20 : use tokio::time::Instant;
21 : use tokio_util::sync::CancellationToken;
22 : use tracing::{Instrument, debug, info, info_span, instrument, warn};
23 : use utils::lsn::Lsn;
24 :
25 : use crate::SafeKeeperConf;
26 : use crate::control_file::{FileStorage, Storage};
27 : use crate::metrics::{
28 : MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS, NUM_EVICTED_TIMELINES,
29 : };
30 : use crate::rate_limit::{RateLimiter, rand_duration};
31 : use crate::recovery::recovery_main;
32 : use crate::remove_wal::calc_horizon_lsn;
33 : use crate::send_wal::WalSenders;
34 : use crate::state::TimelineState;
35 : use crate::timeline::{ManagerTimeline, ReadGuardSharedState, StateSK, WalResidentTimeline};
36 : use crate::timeline_guard::{AccessService, GuardId, ResidenceGuard};
37 : use crate::timelines_set::{TimelineSetGuard, TimelinesSet};
38 : use crate::wal_backup::{self, WalBackupTaskHandle};
39 : use crate::wal_backup_partial::{self, PartialBackup, PartialRemoteSegment};
40 :
41 : pub(crate) struct StateSnapshot {
42 : // inmem values
43 : pub(crate) commit_lsn: Lsn,
44 : pub(crate) backup_lsn: Lsn,
45 : pub(crate) remote_consistent_lsn: Lsn,
46 :
47 : // persistent control file values
48 : pub(crate) cfile_commit_lsn: Lsn,
49 : pub(crate) cfile_remote_consistent_lsn: Lsn,
50 : pub(crate) cfile_backup_lsn: Lsn,
51 :
52 : // latest state
53 : pub(crate) flush_lsn: Lsn,
54 : pub(crate) last_log_term: Term,
55 :
56 : // misc
57 : pub(crate) cfile_last_persist_at: std::time::Instant,
58 : pub(crate) inmem_flush_pending: bool,
59 : pub(crate) wal_removal_on_hold: bool,
60 : pub(crate) peers: Vec<PeerInfo>,
61 : }
62 :
63 : impl StateSnapshot {
64 : /// Create a new snapshot of the timeline state.
65 37 : fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
66 37 : let state = read_guard.sk.state();
67 37 : Self {
68 37 : commit_lsn: state.inmem.commit_lsn,
69 37 : backup_lsn: state.inmem.backup_lsn,
70 37 : remote_consistent_lsn: state.inmem.remote_consistent_lsn,
71 37 : cfile_commit_lsn: state.commit_lsn,
72 37 : cfile_remote_consistent_lsn: state.remote_consistent_lsn,
73 37 : cfile_backup_lsn: state.backup_lsn,
74 37 : flush_lsn: read_guard.sk.flush_lsn(),
75 37 : last_log_term: read_guard.sk.last_log_term(),
76 37 : cfile_last_persist_at: state.pers.last_persist_at(),
77 37 : inmem_flush_pending: Self::has_unflushed_inmem_state(state),
78 37 : wal_removal_on_hold: read_guard.wal_removal_on_hold,
79 37 : peers: read_guard.get_peers(heartbeat_timeout),
80 37 : }
81 37 : }
82 :
83 37 : fn has_unflushed_inmem_state(state: &TimelineState<FileStorage>) -> bool {
84 37 : state.inmem.commit_lsn > state.commit_lsn
85 24 : || state.inmem.backup_lsn > state.backup_lsn
86 24 : || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
87 24 : || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
88 37 : }
89 : }
90 :
91 : /// Control how often the manager task should wake up to check updates.
92 : /// There is no need to check for updates more often than this.
93 : const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
94 :
95 : pub enum ManagerCtlMessage {
96 : /// Request to get a guard for WalResidentTimeline, with WAL files available locally.
97 : GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
98 : /// Get a guard for WalResidentTimeline if the timeline is not currently offloaded, else None
99 : TryGuardRequest(tokio::sync::oneshot::Sender<Option<ResidenceGuard>>),
100 : /// Request to drop the guard.
101 : GuardDrop(GuardId),
102 : /// Request to reset uploaded partial backup state.
103 : BackupPartialReset(oneshot::Sender<anyhow::Result<Vec<String>>>),
104 : }
105 :
106 : impl std::fmt::Debug for ManagerCtlMessage {
107 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 0 : match self {
109 0 : ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
110 0 : ManagerCtlMessage::TryGuardRequest(_) => write!(f, "TryGuardRequest"),
111 0 : ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
112 0 : ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
113 : }
114 0 : }
115 : }
116 :
117 : pub struct ManagerCtl {
118 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
119 :
120 : // this is used to initialize the manager; it will be moved out in bootstrap_manager().
121 : init_manager_rx:
122 : std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>>>,
123 : }
124 :
125 : impl Default for ManagerCtl {
126 0 : fn default() -> Self {
127 0 : Self::new()
128 0 : }
129 : }
130 :
131 : impl ManagerCtl {
132 5 : pub fn new() -> Self {
133 5 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
134 5 : Self {
135 5 : manager_tx: tx,
136 5 : init_manager_rx: std::sync::Mutex::new(Some(rx)),
137 5 : }
138 5 : }
139 :
140 : /// Issue a new guard and wait for manager to prepare the timeline.
141 : /// Sends a message to the manager and waits for the response.
142 : /// Can be blocked indefinitely if the manager is stuck.
143 10 : pub async fn wal_residence_guard(&self) -> anyhow::Result<ResidenceGuard> {
144 10 : let (tx, rx) = tokio::sync::oneshot::channel();
145 10 : self.manager_tx.send(ManagerCtlMessage::GuardRequest(tx))?;
146 :
147 : // wait for the manager to respond with the guard
148 10 : rx.await
149 10 : .map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
150 10 : .and_then(std::convert::identity)
151 10 : }
152 :
153 : /// Issue a new guard if the timeline is currently not offloaded, else return None
154 : /// Sends a message to the manager and waits for the response.
155 : /// Can be blocked indefinitely if the manager is stuck.
156 0 : pub async fn try_wal_residence_guard(&self) -> anyhow::Result<Option<ResidenceGuard>> {
157 0 : let (tx, rx) = tokio::sync::oneshot::channel();
158 0 : self.manager_tx
159 0 : .send(ManagerCtlMessage::TryGuardRequest(tx))?;
160 :
161 : // wait for the manager to respond with the guard
162 0 : rx.await
163 0 : .map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
164 0 : }
165 :
166 : /// Request timeline manager to reset uploaded partial segment state and
167 : /// wait for the result.
168 0 : pub async fn backup_partial_reset(&self) -> anyhow::Result<Vec<String>> {
169 0 : let (tx, rx) = oneshot::channel();
170 0 : self.manager_tx
171 0 : .send(ManagerCtlMessage::BackupPartialReset(tx))
172 0 : .expect("manager task is not running");
173 0 : match rx.await {
174 0 : Ok(res) => res,
175 0 : Err(_) => anyhow::bail!("timeline manager is gone"),
176 : }
177 0 : }
178 :
179 : /// Must be called exactly once to bootstrap the manager.
180 5 : pub fn bootstrap_manager(
181 5 : &self,
182 5 : ) -> (
183 5 : tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
184 5 : tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
185 5 : ) {
186 5 : let rx = self
187 5 : .init_manager_rx
188 5 : .lock()
189 5 : .expect("mutex init_manager_rx poisoned")
190 5 : .take()
191 5 : .expect("manager already bootstrapped");
192 5 :
193 5 : (self.manager_tx.clone(), rx)
194 5 : }
195 : }
196 :
197 : pub(crate) struct Manager {
198 : // configuration & dependencies
199 : pub(crate) tli: ManagerTimeline,
200 : pub(crate) conf: SafeKeeperConf,
201 : pub(crate) wal_seg_size: usize,
202 : pub(crate) walsenders: Arc<WalSenders>,
203 :
204 : // current state
205 : pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
206 : pub(crate) num_computes_rx: tokio::sync::watch::Receiver<usize>,
207 : pub(crate) tli_broker_active: TimelineSetGuard,
208 : pub(crate) last_removed_segno: XLogSegNo,
209 : pub(crate) is_offloaded: bool,
210 :
211 : // background tasks
212 : pub(crate) backup_task: Option<WalBackupTaskHandle>,
213 : pub(crate) recovery_task: Option<JoinHandle<()>>,
214 : pub(crate) wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>>,
215 :
216 : // partial backup
217 : pub(crate) partial_backup_task:
218 : Option<(JoinHandle<Option<PartialRemoteSegment>>, CancellationToken)>,
219 : pub(crate) partial_backup_uploaded: Option<PartialRemoteSegment>,
220 :
221 : // misc
222 : pub(crate) access_service: AccessService,
223 : pub(crate) global_rate_limiter: RateLimiter,
224 :
225 : // Anti-flapping state: we evict timelines eagerly if they are inactive, but should not
226 : // evict them if they go inactive very soon after being restored.
227 : pub(crate) evict_not_before: Instant,
228 : }
229 :
230 : /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
231 : /// background tasks.
232 : /// Be careful, this task is not respawned on panic, so it should not panic.
233 : #[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
234 : pub async fn main_task(
235 : tli: ManagerTimeline,
236 : conf: SafeKeeperConf,
237 : broker_active_set: Arc<TimelinesSet>,
238 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
239 : mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
240 : global_rate_limiter: RateLimiter,
241 : ) {
242 : tli.set_status(Status::Started);
243 :
244 : let defer_tli = tli.tli.clone();
245 : scopeguard::defer! {
246 : if defer_tli.is_cancelled() {
247 : info!("manager task finished");
248 : } else {
249 : warn!("manager task finished prematurely");
250 : }
251 : };
252 :
253 : let mut mgr = Manager::new(
254 : tli,
255 : conf,
256 : broker_active_set,
257 : manager_tx,
258 : global_rate_limiter,
259 : )
260 : .await;
261 :
262 : // Start the recovery task, which runs for the lifetime of the timeline (if peer recovery is enabled).
263 : if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
264 : // Recovery task is only spawned if we can get a residence guard (i.e. timeline is not already shutting down)
265 : if let Ok(tli) = mgr.wal_resident_timeline() {
266 : mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
267 : }
268 : }
269 :
270 : // If timeline is evicted, reflect that in the metric.
271 : if mgr.is_offloaded {
272 : NUM_EVICTED_TIMELINES.inc();
273 : }
274 :
275 : let last_state = 'outer: loop {
276 : MANAGER_ITERATIONS_TOTAL.inc();
277 :
278 : mgr.set_status(Status::StateSnapshot);
279 : let state_snapshot = mgr.state_snapshot().await;
280 :
281 : let mut next_event: Option<Instant> = None;
282 : if !mgr.is_offloaded {
283 : let num_computes = *mgr.num_computes_rx.borrow();
284 :
285 : mgr.set_status(Status::UpdateBackup);
286 : let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
287 : mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);
288 :
289 : mgr.set_status(Status::UpdateControlFile);
290 : mgr.update_control_file_save(&state_snapshot, &mut next_event)
291 : .await;
292 :
293 : mgr.set_status(Status::UpdateWalRemoval);
294 : mgr.update_wal_removal(&state_snapshot).await;
295 :
296 : mgr.set_status(Status::UpdatePartialBackup);
297 : mgr.update_partial_backup(&state_snapshot).await;
298 :
299 : let now = Instant::now();
300 : if mgr.evict_not_before > now {
301 : // we should wait until evict_not_before
302 : update_next_event(&mut next_event, mgr.evict_not_before);
303 : }
304 :
305 : if mgr.conf.enable_offload
306 : && mgr.evict_not_before <= now
307 : && mgr.ready_for_eviction(&next_event, &state_snapshot)
308 : {
309 : // check rate limiter and evict timeline if possible
310 : match mgr.global_rate_limiter.try_acquire_eviction() {
311 : Some(_permit) => {
312 : mgr.set_status(Status::EvictTimeline);
313 : if !mgr.evict_timeline().await {
314 : // eviction failed, try again later
315 : mgr.evict_not_before =
316 : Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
317 : update_next_event(&mut next_event, mgr.evict_not_before);
318 : }
319 : }
320 : None => {
321 : // we can't evict timeline now, will try again later
322 : mgr.evict_not_before =
323 : Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
324 : update_next_event(&mut next_event, mgr.evict_not_before);
325 : }
326 : }
327 : }
328 : }
329 :
330 : mgr.set_status(Status::Wait);
331 : // wait until something changes. tx channels are stored under Arc, so they will not be
332 : // dropped until the manager task is finished.
333 : tokio::select! {
334 : _ = mgr.tli.cancel.cancelled() => {
335 : // timeline was deleted
336 : break 'outer state_snapshot;
337 : }
338 33 : _ = async {
339 33 : // don't wake up on every state change, but at most every REFRESH_INTERVAL
340 33 : tokio::time::sleep(REFRESH_INTERVAL).await;
341 5 : let _ = mgr.state_version_rx.changed().await;
342 5 : } => {
343 : // state was updated
344 : }
345 : _ = mgr.num_computes_rx.changed() => {
346 : // number of connected computes was updated
347 : }
348 : _ = sleep_until(&next_event) => {
349 : // we were waiting for some event (e.g. cfile save)
350 : }
351 : res = await_task_finish(mgr.wal_removal_task.as_mut()) => {
352 : // WAL removal task finished
353 : mgr.wal_removal_task = None;
354 : mgr.update_wal_removal_end(res);
355 : }
356 0 : res = await_task_finish(mgr.partial_backup_task.as_mut().map(|(handle, _)| handle)) => {
357 : // partial backup task finished
358 : mgr.partial_backup_task = None;
359 : mgr.update_partial_backup_end(res);
360 : }
361 :
362 : msg = manager_rx.recv() => {
363 : mgr.set_status(Status::HandleMessage);
364 : mgr.handle_message(msg).await;
365 : }
366 : }
367 : };
368 : mgr.set_status(Status::Exiting);
369 :
370 : // remove timeline from the broker active set sooner, before waiting for background tasks
371 : mgr.tli_broker_active.set(false);
372 :
373 : // shutdown background tasks
374 : if mgr.conf.is_wal_backup_enabled() {
375 : if let Some(backup_task) = mgr.backup_task.take() {
376 : // If we fell through here, then the timeline is shutting down. This is important
377 : // because otherwise joining on the wal_backup handle might hang.
378 : assert!(mgr.tli.cancel.is_cancelled());
379 :
380 : backup_task.join().await;
381 : }
382 : wal_backup::update_task(&mut mgr, false, &last_state).await;
383 : }
384 :
385 : if let Some(recovery_task) = &mut mgr.recovery_task {
386 : if let Err(e) = recovery_task.await {
387 : warn!("recovery task failed: {:?}", e);
388 : }
389 : }
390 :
391 : if let Some((handle, cancel)) = &mut mgr.partial_backup_task {
392 : cancel.cancel();
393 : if let Err(e) = handle.await {
394 : warn!("partial backup task failed: {:?}", e);
395 : }
396 : }
397 :
398 : if let Some(wal_removal_task) = &mut mgr.wal_removal_task {
399 : let res = wal_removal_task.await;
400 : mgr.update_wal_removal_end(res);
401 : }
402 :
403 : // If the timeline is deleted while evicted, decrement the gauge.
404 : if mgr.tli.is_cancelled() && mgr.is_offloaded {
405 : NUM_EVICTED_TIMELINES.dec();
406 : }
407 :
408 : mgr.set_status(Status::Finished);
409 : }
410 :
411 : impl Manager {
412 5 : async fn new(
413 5 : tli: ManagerTimeline,
414 5 : conf: SafeKeeperConf,
415 5 : broker_active_set: Arc<TimelinesSet>,
416 5 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
417 5 : global_rate_limiter: RateLimiter,
418 5 : ) -> Manager {
419 5 : let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
420 : Manager {
421 5 : wal_seg_size: tli.get_wal_seg_size().await,
422 5 : walsenders: tli.get_walsenders().clone(),
423 5 : state_version_rx: tli.get_state_version_rx(),
424 5 : num_computes_rx: tli.get_walreceivers().get_num_rx(),
425 5 : tli_broker_active: broker_active_set.guard(tli.clone()),
426 5 : last_removed_segno: 0,
427 5 : is_offloaded,
428 5 : backup_task: None,
429 5 : recovery_task: None,
430 5 : wal_removal_task: None,
431 5 : partial_backup_task: None,
432 5 : partial_backup_uploaded,
433 5 : access_service: AccessService::new(manager_tx),
434 5 : tli,
435 5 : global_rate_limiter,
436 5 : // to smooth out evictions spike after restart
437 5 : evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident),
438 5 : conf,
439 5 : }
440 5 : }
441 :
442 239 : fn set_status(&self, status: Status) {
443 239 : self.tli.set_status(status);
444 239 : }
445 :
446 : /// Get a WalResidentTimeline.
447 : /// Manager code must use this function instead of the one from `Timeline`
448 : /// directly, because the latter would deadlock.
449 : ///
450 : /// This function is fallible because the guard may not be created if the timeline is
451 : /// shutting down.
452 5 : pub(crate) fn wal_resident_timeline(&mut self) -> anyhow::Result<WalResidentTimeline> {
453 5 : assert!(!self.is_offloaded);
454 5 : let guard = self.access_service.create_guard(
455 5 : self.tli
456 5 : .gate
457 5 : .enter()
458 5 : .map_err(|_| anyhow::anyhow!("Timeline shutting down"))?,
459 : );
460 5 : Ok(WalResidentTimeline::new(self.tli.clone(), guard))
461 5 : }
462 :
463 : /// Get a snapshot of the timeline state.
464 37 : async fn state_snapshot(&self) -> StateSnapshot {
465 37 : let _timer = MISC_OPERATION_SECONDS
466 37 : .with_label_values(&["state_snapshot"])
467 37 : .start_timer();
468 37 :
469 37 : StateSnapshot::new(
470 37 : self.tli.read_shared_state().await,
471 37 : self.conf.heartbeat_timeout,
472 37 : )
473 37 : }
474 :
475 : /// Spawns/kills backup task and returns true if backup is required.
476 37 : async fn update_backup(&mut self, num_computes: usize, state: &StateSnapshot) -> bool {
477 37 : let is_wal_backup_required =
478 37 : wal_backup::is_wal_backup_required(self.wal_seg_size, num_computes, state);
479 37 :
480 37 : if self.conf.is_wal_backup_enabled() {
481 0 : wal_backup::update_task(self, is_wal_backup_required, state).await;
482 37 : }
483 :
484 : // update the state in Arc<Timeline>
485 37 : self.tli.wal_backup_active.store(
486 37 : self.backup_task.is_some(),
487 37 : std::sync::atomic::Ordering::Relaxed,
488 37 : );
489 37 : is_wal_backup_required
490 37 : }
491 :
492 : /// Update is_active flag and returns its value.
493 37 : fn update_is_active(
494 37 : &mut self,
495 37 : is_wal_backup_required: bool,
496 37 : num_computes: usize,
497 37 : state: &StateSnapshot,
498 37 : ) {
499 37 : let is_active = is_wal_backup_required
500 26 : || num_computes > 0
501 26 : || state.remote_consistent_lsn < state.commit_lsn;
502 :
503 : // update the broker timeline set
504 37 : if self.tli_broker_active.set(is_active) {
505 : // write log if state has changed
506 5 : info!(
507 0 : "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
508 : is_active, state.remote_consistent_lsn, state.commit_lsn,
509 : );
510 :
511 5 : MANAGER_ACTIVE_CHANGES.inc();
512 32 : }
513 :
514 : // update the state in Arc<Timeline>
515 37 : self.tli
516 37 : .broker_active
517 37 : .store(is_active, std::sync::atomic::Ordering::Relaxed);
518 37 : }
519 :
520 : /// Save the control file if needed. Otherwise, updates `next_event` with the Instant at which the control file should be persisted.
521 37 : async fn update_control_file_save(
522 37 : &self,
523 37 : state: &StateSnapshot,
524 37 : next_event: &mut Option<Instant>,
525 37 : ) {
526 37 : if !state.inmem_flush_pending {
527 24 : return;
528 13 : }
529 13 :
530 13 : if state.cfile_last_persist_at.elapsed() > self.conf.control_file_save_interval
531 : // If the control file's commit_lsn lags more than one segment behind the current
532 : // commit_lsn, flush immediately to limit recovery time in case of a crash. We don't do
533 : // this on the WAL ingest hot path since it incurs fsync latency.
534 9 : || state.commit_lsn.saturating_sub(state.cfile_commit_lsn).0 >= self.wal_seg_size as u64
535 : {
536 4 : let mut write_guard = self.tli.write_shared_state().await;
537 : // Ideally this should be done in the background, because it blocks the manager task, but
538 : // flush() should be fast enough not to be a problem for now.
539 4 : if let Err(e) = write_guard.sk.state_mut().flush().await {
540 0 : warn!("failed to save control file: {:?}", e);
541 4 : }
542 9 : } else {
543 9 : // we should wait until some time passed until the next save
544 9 : update_next_event(
545 9 : next_event,
546 9 : (state.cfile_last_persist_at + self.conf.control_file_save_interval).into(),
547 9 : );
548 9 : }
549 37 : }
550 :
551 : /// Spawns WAL removal task if needed.
552 37 : async fn update_wal_removal(&mut self, state: &StateSnapshot) {
553 37 : if self.wal_removal_task.is_some() || state.wal_removal_on_hold {
554 : // WAL removal is already in progress or hold off
555 0 : return;
556 37 : }
557 :
558 : // If enabled, we use the LSN of the most lagging walsender as the WAL removal horizon.
559 : // This gives better read speed to pageservers that are lagging behind,
560 : // at the cost of keeping more WAL on disk.
561 37 : let replication_horizon_lsn = if self.conf.walsenders_keep_horizon {
562 0 : self.walsenders.laggard_lsn()
563 : } else {
564 37 : None
565 : };
566 :
567 37 : let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
568 37 : let removal_horizon_segno = removal_horizon_lsn
569 37 : .segment_number(self.wal_seg_size)
570 37 : .saturating_sub(1);
571 37 :
572 37 : if removal_horizon_segno > self.last_removed_segno {
573 : // we need to remove WAL
574 0 : let Ok(timeline_gate_guard) = self.tli.gate.enter() else {
575 0 : tracing::info!("Timeline shutdown, not spawning WAL removal task");
576 0 : return;
577 : };
578 :
579 0 : let remover = match self.tli.read_shared_state().await.sk {
580 0 : StateSK::Loaded(ref sk) => {
581 0 : crate::wal_storage::Storage::remove_up_to(&sk.wal_store, removal_horizon_segno)
582 : }
583 : StateSK::Offloaded(_) => {
584 : // we can't remove WAL if it's not loaded
585 0 : warn!("unexpectedly trying to run WAL removal on offloaded timeline");
586 0 : return;
587 : }
588 0 : StateSK::Empty => unreachable!(),
589 : };
590 :
591 0 : self.wal_removal_task = Some(tokio::spawn(
592 0 : async move {
593 0 : let _timeline_gate_guard = timeline_gate_guard;
594 0 :
595 0 : remover.await?;
596 0 : Ok(removal_horizon_segno)
597 0 : }
598 0 : .instrument(info_span!("WAL removal", ttid=%self.tli.ttid)),
599 : ));
600 37 : }
601 37 : }
602 :
603 : /// Update the state after WAL removal task finished.
604 0 : fn update_wal_removal_end(&mut self, res: Result<anyhow::Result<u64>, JoinError>) {
605 0 : let new_last_removed_segno = match res {
606 0 : Ok(Ok(segno)) => segno,
607 0 : Err(e) => {
608 0 : warn!("WAL removal task failed: {:?}", e);
609 0 : return;
610 : }
611 0 : Ok(Err(e)) => {
612 0 : warn!("WAL removal task failed: {:?}", e);
613 0 : return;
614 : }
615 : };
616 :
617 0 : self.last_removed_segno = new_last_removed_segno;
618 0 : // update the state in Arc<Timeline>
619 0 : self.tli
620 0 : .last_removed_segno
621 0 : .store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
622 0 : }
623 :
624 : /// Spawns partial WAL backup task if needed.
625 37 : async fn update_partial_backup(&mut self, state: &StateSnapshot) {
626 37 : // check if WAL backup is enabled and should be started
627 37 : if !self.conf.is_wal_backup_enabled() {
628 37 : return;
629 0 : }
630 0 :
631 0 : if self.partial_backup_task.is_some() {
632 : // partial backup is already running
633 0 : return;
634 0 : }
635 0 :
636 0 : if !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded) {
637 : // nothing to upload
638 0 : return;
639 0 : }
640 :
641 0 : let Ok(resident) = self.wal_resident_timeline() else {
642 : // Shutting down
643 0 : return;
644 : };
645 :
646 : // Start the partial backup task on the acquired WalResidentTimeline.
647 0 : let cancel = CancellationToken::new();
648 0 : let handle = tokio::spawn(wal_backup_partial::main_task(
649 0 : resident,
650 0 : self.conf.clone(),
651 0 : self.global_rate_limiter.clone(),
652 0 : cancel.clone(),
653 0 : ));
654 0 : self.partial_backup_task = Some((handle, cancel));
655 37 : }
656 :
657 : /// Update the state after partial WAL backup task finished.
658 0 : fn update_partial_backup_end(&mut self, res: Result<Option<PartialRemoteSegment>, JoinError>) {
659 0 : match res {
660 0 : Ok(new_upload_state) => {
661 0 : self.partial_backup_uploaded = new_upload_state;
662 0 : }
663 0 : Err(e) => {
664 0 : warn!("partial backup task panicked: {:?}", e);
665 : }
666 : }
667 0 : }
668 :
669 : /// Reset partial backup state and remove its remote storage data. Since it
670 : /// might be concurrently uploading something, cancel the task first.
671 0 : async fn backup_partial_reset(&mut self) -> anyhow::Result<Vec<String>> {
672 0 : info!("resetting partial backup state");
673 : // Force unevict timeline if it is evicted before erasing partial backup
674 : // state. The intended use of this function is to drop corrupted remote
675 : // state; we haven't enabled local file deletion anywhere yet,
676 : // so direct switch is safe.
677 0 : if self.is_offloaded {
678 0 : self.tli.switch_to_present().await?;
679 : // switch manager state as soon as possible
680 0 : self.is_offloaded = false;
681 0 : }
682 :
683 0 : if let Some((handle, cancel)) = &mut self.partial_backup_task {
684 0 : cancel.cancel();
685 0 : info!("cancelled partial backup task, awaiting it");
686 : // we're going to reset .partial_backup_uploaded to None anyway, so ignore the result
687 0 : handle.await.ok();
688 0 : self.partial_backup_task = None;
689 0 : }
690 :
691 0 : let tli = self.wal_resident_timeline()?;
692 0 : let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await;
693 : // Reset might fail e.g. when cfile is already reset but s3 removal
694 : // failed, so set manager state to None beforehand. In any case caller
695 : // is expected to retry until success.
696 0 : self.partial_backup_uploaded = None;
697 0 : let res = partial_backup.reset().await?;
698 0 : info!("reset is done");
699 0 : Ok(res)
700 0 : }
701 :
702 : /// Handle message arrived from ManagerCtl.
703 17 : async fn handle_message(&mut self, msg: Option<ManagerCtlMessage>) {
704 17 : debug!("received manager message: {:?}", msg);
705 17 : match msg {
706 10 : Some(ManagerCtlMessage::GuardRequest(tx)) => {
707 10 : if self.is_offloaded {
708 : // trying to unevict the timeline, but without a guarantee that it will be successful
709 0 : self.unevict_timeline().await;
710 10 : }
711 :
712 10 : let guard = if self.is_offloaded {
713 0 : Err(anyhow::anyhow!("timeline is offloaded, can't get a guard"))
714 : } else {
715 10 : match self.tli.gate.enter() {
716 10 : Ok(gate_guard) => Ok(self.access_service.create_guard(gate_guard)),
717 0 : Err(_) => Err(anyhow::anyhow!(
718 0 : "timeline is shutting down, can't get a guard"
719 0 : )),
720 : }
721 : };
722 :
723 10 : if tx.send(guard).is_err() {
724 0 : warn!("failed to reply with a guard, receiver dropped");
725 10 : }
726 : }
727 0 : Some(ManagerCtlMessage::TryGuardRequest(tx)) => {
728 0 : let result = if self.is_offloaded {
729 0 : None
730 : } else {
731 0 : match self.tli.gate.enter() {
732 0 : Ok(gate_guard) => Some(self.access_service.create_guard(gate_guard)),
733 0 : Err(_) => None,
734 : }
735 : };
736 :
737 0 : if tx.send(result).is_err() {
738 0 : warn!("failed to reply with a guard, receiver dropped");
739 0 : }
740 : }
741 7 : Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
742 7 : self.access_service.drop_guard(guard_id);
743 7 : }
744 0 : Some(ManagerCtlMessage::BackupPartialReset(tx)) => {
745 0 : info!("resetting uploaded partial backup state");
746 0 : let res = self.backup_partial_reset().await;
747 0 : if let Err(ref e) = res {
748 0 : warn!("failed to reset partial backup state: {:?}", e);
749 0 : }
750 0 : if tx.send(res).is_err() {
751 0 : warn!("failed to send partial backup reset result, receiver dropped");
752 0 : }
753 : }
754 : None => {
755 : // can't happen, we're holding the sender
756 0 : unreachable!();
757 : }
758 : }
759 17 : }
760 : }
761 :
762 : // utility functions
763 37 : async fn sleep_until(option: &Option<tokio::time::Instant>) {
764 34 : if let Some(timeout) = option {
765 8 : tokio::time::sleep_until(*timeout).await;
766 : } else {
767 26 : futures::future::pending::<()>().await;
768 : }
769 1 : }
770 :
771 : /// Future that resolves when the task is finished or never if the task is None.
772 : ///
773 : /// Note: it accepts Option<&mut> instead of &mut Option<> because mapping the
774 : /// option to get the latter is hard.
775 74 : async fn await_task_finish<T>(option: Option<&mut JoinHandle<T>>) -> Result<T, JoinError> {
776 71 : if let Some(task) = option {
777 0 : task.await
778 : } else {
779 71 : futures::future::pending().await
780 : }
781 0 : }
782 :
783 : /// Update next_event if candidate is earlier.
784 9 : fn update_next_event(next_event: &mut Option<Instant>, candidate: Instant) {
785 9 : if let Some(next) = next_event {
786 0 : if candidate < *next {
787 0 : *next = candidate;
788 0 : }
789 9 : } else {
790 9 : *next_event = Some(candidate);
791 9 : }
792 9 : }
793 :
794 : #[repr(usize)]
795 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
796 : pub enum Status {
797 : NotStarted,
798 : Started,
799 : StateSnapshot,
800 : UpdateBackup,
801 : UpdateControlFile,
802 : UpdateWalRemoval,
803 : UpdatePartialBackup,
804 : EvictTimeline,
805 : Wait,
806 : HandleMessage,
807 : Exiting,
808 : Finished,
809 : }
810 :
811 : /// AtomicStatus is a wrapper around AtomicUsize adapted for the Status enum.
812 : pub struct AtomicStatus {
813 : inner: AtomicUsize,
814 : }
815 :
816 : impl Default for AtomicStatus {
817 0 : fn default() -> Self {
818 0 : Self::new()
819 0 : }
820 : }
821 :
822 : impl AtomicStatus {
823 5 : pub fn new() -> Self {
824 5 : AtomicStatus {
825 5 : inner: AtomicUsize::new(Status::NotStarted as usize),
826 5 : }
827 5 : }
828 :
829 10 : pub fn load(&self, order: std::sync::atomic::Ordering) -> Status {
830 10 : // Safety: This line of code uses `std::mem::transmute` to reinterpret the loaded value as `Status`.
831 10 : // It is safe to use `transmute` in this context because `Status` is a repr(usize) enum,
832 10 : // which means it has the same memory layout as usize.
833 10 : // However, it is important to ensure that the loaded value is a valid variant of `Status`,
834 10 : // otherwise, the behavior will be undefined.
835 10 : unsafe { std::mem::transmute(self.inner.load(order)) }
836 10 : }
837 :
838 10 : pub fn get(&self) -> Status {
839 10 : self.load(std::sync::atomic::Ordering::Relaxed)
840 10 : }
841 :
842 244 : pub fn store(&self, val: Status, order: std::sync::atomic::Ordering) {
843 244 : self.inner.store(val as usize, order);
844 244 : }
845 : }