Line data Source code
1 : //! The timeline manager task is responsible for managing the timeline's background tasks.
2 : //!
3 : //! It is spawned alongside each timeline and exits when the timeline is deleted.
4 : //! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
5 : //! It also manages some reactive state, such as whether the timeline should be active for broker pushes.
6 : //!
7 : //! Be aware that you need to be extra careful with manager code, because it is not respawned on panic.
8 : //! Also, if it gets stuck in some branch, it will block any further progress in the timeline.
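//!
//! At a high level, each loop iteration takes a `StateSnapshot`, then updates WAL backup,
//! the broker-active flag, control file persistence, WAL removal and partial backup,
//! possibly evicts the timeline, and finally waits for the next event: a state or
//! compute-count change, a finished background task, a timer, or a `ManagerCtlMessage`.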
9 :
10 : use std::{
11 : sync::{atomic::AtomicUsize, Arc},
12 : time::Duration,
13 : };
14 :
15 : use futures::channel::oneshot;
16 : use postgres_ffi::XLogSegNo;
17 : use serde::{Deserialize, Serialize};
18 : use tokio::{
19 : task::{JoinError, JoinHandle},
20 : time::Instant,
21 : };
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::{debug, info, info_span, instrument, warn, Instrument};
24 : use utils::lsn::Lsn;
25 :
26 : use crate::{
27 : control_file::{FileStorage, Storage},
28 : metrics::{
29 : MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS,
30 : NUM_EVICTED_TIMELINES,
31 : },
32 : rate_limit::{rand_duration, RateLimiter},
33 : recovery::recovery_main,
34 : remove_wal::calc_horizon_lsn,
35 : safekeeper::Term,
36 : send_wal::WalSenders,
37 : state::TimelineState,
38 : timeline::{ManagerTimeline, PeerInfo, ReadGuardSharedState, StateSK, WalResidentTimeline},
39 : timeline_guard::{AccessService, GuardId, ResidenceGuard},
40 : timelines_set::{TimelineSetGuard, TimelinesSet},
41 : wal_backup::{self, WalBackupTaskHandle},
42 : wal_backup_partial::{self, PartialBackup, PartialRemoteSegment},
43 : SafeKeeperConf,
44 : };
45 :
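/// Point-in-time copy of the timeline state, collected once per manager iteration so the
/// loop can make its decisions without holding the shared-state lock.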
46 : pub(crate) struct StateSnapshot {
47 : // inmem values
48 : pub(crate) commit_lsn: Lsn,
49 : pub(crate) backup_lsn: Lsn,
50 : pub(crate) remote_consistent_lsn: Lsn,
51 :
52 : // persistent control file values
53 : pub(crate) cfile_commit_lsn: Lsn,
54 : pub(crate) cfile_remote_consistent_lsn: Lsn,
55 : pub(crate) cfile_backup_lsn: Lsn,
56 :
57 : // latest state
58 : pub(crate) flush_lsn: Lsn,
59 : pub(crate) last_log_term: Term,
60 :
61 : // misc
62 : pub(crate) cfile_last_persist_at: std::time::Instant,
63 : pub(crate) inmem_flush_pending: bool,
64 : pub(crate) wal_removal_on_hold: bool,
65 : pub(crate) peers: Vec<PeerInfo>,
66 : }
67 :
68 : impl StateSnapshot {
69 : /// Create a new snapshot of the timeline state.
70 0 : fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
71 0 : let state = read_guard.sk.state();
72 0 : Self {
73 0 : commit_lsn: state.inmem.commit_lsn,
74 0 : backup_lsn: state.inmem.backup_lsn,
75 0 : remote_consistent_lsn: state.inmem.remote_consistent_lsn,
76 0 : cfile_commit_lsn: state.commit_lsn,
77 0 : cfile_remote_consistent_lsn: state.remote_consistent_lsn,
78 0 : cfile_backup_lsn: state.backup_lsn,
79 0 : flush_lsn: read_guard.sk.flush_lsn(),
80 0 : last_log_term: read_guard.sk.last_log_term(),
81 0 : cfile_last_persist_at: state.pers.last_persist_at(),
82 0 : inmem_flush_pending: Self::has_unflushed_inmem_state(state),
83 0 : wal_removal_on_hold: read_guard.wal_removal_on_hold,
84 0 : peers: read_guard.get_peers(heartbeat_timeout),
85 0 : }
86 0 : }
87 :
88 0 : fn has_unflushed_inmem_state(state: &TimelineState<FileStorage>) -> bool {
89 0 : state.inmem.commit_lsn > state.commit_lsn
90 0 : || state.inmem.backup_lsn > state.backup_lsn
91 0 : || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
92 0 : || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
93 0 : }
94 : }
95 :
96 : /// Controls how often the manager task wakes up to check for updates.
97 : /// There is no need to check for updates more often than this.
98 : const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
99 :
100 : pub enum ManagerCtlMessage {
101 : /// Request to get a guard for WalResidentTimeline, with WAL files available locally.
102 : GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
103 : /// Request to drop the guard.
104 : GuardDrop(GuardId),
105 : /// Request to reset uploaded partial backup state.
106 : BackupPartialReset(oneshot::Sender<anyhow::Result<Vec<String>>>),
107 : }
108 :
109 : impl std::fmt::Debug for ManagerCtlMessage {
110 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
111 0 : match self {
112 0 : ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
113 0 : ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
114 0 : ManagerCtlMessage::BackupPartialReset(_) => write!(f, "BackupPartialReset"),
115 : }
116 0 : }
117 : }
118 :
119 : pub struct ManagerCtl {
120 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
121 :
122 : // This receiver is used to initialize the manager; it is moved out in bootstrap_manager().
123 : init_manager_rx:
124 : std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>>>,
125 : }
126 :
127 : impl Default for ManagerCtl {
128 0 : fn default() -> Self {
129 0 : Self::new()
130 0 : }
131 : }
132 :
133 : impl ManagerCtl {
134 0 : pub fn new() -> Self {
135 0 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
136 0 : Self {
137 0 : manager_tx: tx,
138 0 : init_manager_rx: std::sync::Mutex::new(Some(rx)),
139 0 : }
140 0 : }
141 :
142 : /// Issue a new guard and wait for the manager to prepare the timeline.
143 : /// Sends a message to the manager and waits for the response.
144 : /// Can block indefinitely if the manager is stuck.
145 0 : pub async fn wal_residence_guard(&self) -> anyhow::Result<ResidenceGuard> {
146 0 : let (tx, rx) = tokio::sync::oneshot::channel();
147 0 : self.manager_tx.send(ManagerCtlMessage::GuardRequest(tx))?;
148 :
149 : // wait for the manager to respond with the guard
150 0 : rx.await
151 0 : .map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
152 0 : .and_then(std::convert::identity)
153 0 : }
154 :
155 : /// Request timeline manager to reset uploaded partial segment state and
156 : /// wait for the result.
157 0 : pub async fn backup_partial_reset(&self) -> anyhow::Result<Vec<String>> {
158 0 : let (tx, rx) = oneshot::channel();
159 0 : self.manager_tx
160 0 : .send(ManagerCtlMessage::BackupPartialReset(tx))
161 0 : .expect("manager task is not running");
162 0 : match rx.await {
163 0 : Ok(res) => res,
164 0 : Err(_) => anyhow::bail!("timeline manager is gone"),
165 : }
166 0 : }
167 :
168 : /// Must be called exactly once to bootstrap the manager.
169 0 : pub fn bootstrap_manager(
170 0 : &self,
171 0 : ) -> (
172 0 : tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
173 0 : tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
174 0 : ) {
175 0 : let rx = self
176 0 : .init_manager_rx
177 0 : .lock()
178 0 : .expect("mutex init_manager_rx poisoned")
179 0 : .take()
180 0 : .expect("manager already bootstrapped");
181 0 :
182 0 : (self.manager_tx.clone(), rx)
183 0 : }
184 : }
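// How the pieces above fit together (a sketch; the variable names are illustrative and the
// actual call sites live elsewhere in the crate): `bootstrap_manager()` is called exactly once
// and its channel halves are handed to `main_task`, while everything else talks to the manager
// only through the `ManagerCtl` methods:
//
//     let (manager_tx, manager_rx) = manager_ctl.bootstrap_manager();
//     tokio::spawn(main_task(tli, conf, broker_active_set, manager_tx, manager_rx, rate_limiter));
//     // ...later, from some other task:
//     let guard = manager_ctl.wal_residence_guard().await?;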
185 :
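/// All mutable state the manager task keeps between loop iterations. It is created by
/// `main_task` and owned exclusively by it.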
186 : pub(crate) struct Manager {
187 : // configuration & dependencies
188 : pub(crate) tli: ManagerTimeline,
189 : pub(crate) conf: SafeKeeperConf,
190 : pub(crate) wal_seg_size: usize,
191 : pub(crate) walsenders: Arc<WalSenders>,
192 :
193 : // current state
194 : pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
195 : pub(crate) num_computes_rx: tokio::sync::watch::Receiver<usize>,
196 : pub(crate) tli_broker_active: TimelineSetGuard,
197 : pub(crate) last_removed_segno: XLogSegNo,
198 : pub(crate) is_offloaded: bool,
199 :
200 : // background tasks
201 : pub(crate) backup_task: Option<WalBackupTaskHandle>,
202 : pub(crate) recovery_task: Option<JoinHandle<()>>,
203 : pub(crate) wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>>,
204 :
205 : // partial backup
206 : pub(crate) partial_backup_task:
207 : Option<(JoinHandle<Option<PartialRemoteSegment>>, CancellationToken)>,
208 : pub(crate) partial_backup_uploaded: Option<PartialRemoteSegment>,
209 :
210 : // misc
211 : pub(crate) access_service: AccessService,
212 : pub(crate) global_rate_limiter: RateLimiter,
213 :
214 : // Anti-flapping state: we evict timelines eagerly if they are inactive, but should not
215 : // evict them if they go inactive very soon after being restored.
216 : pub(crate) evict_not_before: Instant,
217 : }
218 :
219 : /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
220 : /// background tasks.
221 : /// Be careful, this task is not respawned on panic, so it should not panic.
222 0 : #[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
223 : pub async fn main_task(
224 : tli: ManagerTimeline,
225 : conf: SafeKeeperConf,
226 : broker_active_set: Arc<TimelinesSet>,
227 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
228 : mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
229 : global_rate_limiter: RateLimiter,
230 : ) {
231 : tli.set_status(Status::Started);
232 :
233 : let defer_tli = tli.tli.clone();
234 : scopeguard::defer! {
235 : if defer_tli.is_cancelled() {
236 : info!("manager task finished");
237 : } else {
238 : warn!("manager task finished prematurely");
239 : }
240 : };
241 :
242 : let mut mgr = Manager::new(
243 : tli,
244 : conf,
245 : broker_active_set,
246 : manager_tx,
247 : global_rate_limiter,
248 : )
249 : .await;
250 :
251 : // Start the recovery task (unless peer recovery is disabled or the timeline is offloaded); it runs for the whole lifetime of the timeline.
252 : if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
253 : let tli = mgr.wal_resident_timeline();
254 : mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
255 : }
256 :
257 : // If timeline is evicted, reflect that in the metric.
258 : if mgr.is_offloaded {
259 : NUM_EVICTED_TIMELINES.inc();
260 : }
261 :
262 : let last_state = 'outer: loop {
263 : MANAGER_ITERATIONS_TOTAL.inc();
264 :
265 : mgr.set_status(Status::StateSnapshot);
266 : let state_snapshot = mgr.state_snapshot().await;
267 :
268 : let mut next_event: Option<Instant> = None;
269 : if !mgr.is_offloaded {
270 : let num_computes = *mgr.num_computes_rx.borrow();
271 :
272 : mgr.set_status(Status::UpdateBackup);
273 : let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
274 : mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);
275 :
276 : mgr.set_status(Status::UpdateControlFile);
277 : mgr.update_control_file_save(&state_snapshot, &mut next_event)
278 : .await;
279 :
280 : mgr.set_status(Status::UpdateWalRemoval);
281 : mgr.update_wal_removal(&state_snapshot).await;
282 :
283 : mgr.set_status(Status::UpdatePartialBackup);
284 : mgr.update_partial_backup(&state_snapshot).await;
285 :
286 : let now = Instant::now();
287 : if mgr.evict_not_before > now {
288 : // we should wait until evict_not_before
289 : update_next_event(&mut next_event, mgr.evict_not_before);
290 : }
291 :
292 : if mgr.conf.enable_offload
293 : && mgr.evict_not_before <= now
294 : && mgr.ready_for_eviction(&next_event, &state_snapshot)
295 : {
296 : // check rate limiter and evict timeline if possible
297 : match mgr.global_rate_limiter.try_acquire_eviction() {
298 : Some(_permit) => {
299 : mgr.set_status(Status::EvictTimeline);
300 : mgr.evict_timeline().await;
301 : }
302 : None => {
303 : // we can't evict timeline now, will try again later
304 : mgr.evict_not_before =
305 : Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
306 : update_next_event(&mut next_event, mgr.evict_not_before);
307 : }
308 : }
309 : }
310 : }
311 :
312 : mgr.set_status(Status::Wait);
313 : // wait until something changes. tx channels are stored under Arc, so they will not be
314 : // dropped until the manager task is finished.
315 : tokio::select! {
316 : _ = mgr.tli.cancel.cancelled() => {
317 : // timeline was deleted
318 : break 'outer state_snapshot;
319 : }
320 0 : _ = async {
321 0 : // don't wake up on every state change, but at most every REFRESH_INTERVAL
322 0 : tokio::time::sleep(REFRESH_INTERVAL).await;
323 0 : let _ = mgr.state_version_rx.changed().await;
324 0 : } => {
325 : // state was updated
326 : }
327 : _ = mgr.num_computes_rx.changed() => {
328 : // number of connected computes was updated
329 : }
330 : _ = sleep_until(&next_event) => {
331 : // we were waiting for some event (e.g. cfile save)
332 : }
333 : res = await_task_finish(mgr.wal_removal_task.as_mut()) => {
334 : // WAL removal task finished
335 : mgr.wal_removal_task = None;
336 : mgr.update_wal_removal_end(res);
337 : }
338 0 : res = await_task_finish(mgr.partial_backup_task.as_mut().map(|(handle, _)| handle)) => {
339 : // partial backup task finished
340 : mgr.partial_backup_task = None;
341 : mgr.update_partial_backup_end(res);
342 : }
343 :
344 : msg = manager_rx.recv() => {
345 : mgr.set_status(Status::HandleMessage);
346 : mgr.handle_message(msg).await;
347 : }
348 : }
349 : };
350 : mgr.set_status(Status::Exiting);
351 :
352 : // remove timeline from the broker active set sooner, before waiting for background tasks
353 : mgr.tli_broker_active.set(false);
354 :
355 : // shutdown background tasks
356 : if mgr.conf.is_wal_backup_enabled() {
357 : wal_backup::update_task(&mut mgr, false, &last_state).await;
358 : }
359 :
360 : if let Some(recovery_task) = &mut mgr.recovery_task {
361 : if let Err(e) = recovery_task.await {
362 : warn!("recovery task failed: {:?}", e);
363 : }
364 : }
365 :
366 : if let Some((handle, cancel)) = &mut mgr.partial_backup_task {
367 : cancel.cancel();
368 : if let Err(e) = handle.await {
369 : warn!("partial backup task failed: {:?}", e);
370 : }
371 : }
372 :
373 : if let Some(wal_removal_task) = &mut mgr.wal_removal_task {
374 : let res = wal_removal_task.await;
375 : mgr.update_wal_removal_end(res);
376 : }
377 :
378 : // If the timeline is deleted while evicted, decrement the gauge.
379 : if mgr.tli.is_cancelled() && mgr.is_offloaded {
380 : NUM_EVICTED_TIMELINES.dec();
381 : }
382 :
383 : mgr.set_status(Status::Finished);
384 : }
385 :
386 : impl Manager {
387 0 : async fn new(
388 0 : tli: ManagerTimeline,
389 0 : conf: SafeKeeperConf,
390 0 : broker_active_set: Arc<TimelinesSet>,
391 0 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
392 0 : global_rate_limiter: RateLimiter,
393 0 : ) -> Manager {
394 0 : let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
395 : Manager {
396 0 : wal_seg_size: tli.get_wal_seg_size().await,
397 0 : walsenders: tli.get_walsenders().clone(),
398 0 : state_version_rx: tli.get_state_version_rx(),
399 0 : num_computes_rx: tli.get_walreceivers().get_num_rx(),
400 0 : tli_broker_active: broker_active_set.guard(tli.clone()),
401 0 : last_removed_segno: 0,
402 0 : is_offloaded,
403 0 : backup_task: None,
404 0 : recovery_task: None,
405 0 : wal_removal_task: None,
406 0 : partial_backup_task: None,
407 0 : partial_backup_uploaded,
408 0 : access_service: AccessService::new(manager_tx),
409 0 : tli,
410 0 : global_rate_limiter,
411 0 : // to smooth out the eviction spike after restart
412 0 : evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident),
413 0 : conf,
414 0 : }
415 0 : }
416 :
417 0 : fn set_status(&self, status: Status) {
418 0 : self.tli.set_status(status);
419 0 : }
420 :
421 : /// Get a WalResidentTimeline.
422 : /// Manager code must use this function instead of the one on `Timeline`
423 : /// directly: that one requests the guard from the manager task itself and would therefore deadlock.
424 0 : pub(crate) fn wal_resident_timeline(&mut self) -> WalResidentTimeline {
425 0 : assert!(!self.is_offloaded);
426 0 : let guard = self.access_service.create_guard();
427 0 : WalResidentTimeline::new(self.tli.clone(), guard)
428 0 : }
429 :
430 : /// Get a snapshot of the timeline state.
431 0 : async fn state_snapshot(&self) -> StateSnapshot {
432 0 : let _timer = MISC_OPERATION_SECONDS
433 0 : .with_label_values(&["state_snapshot"])
434 0 : .start_timer();
435 0 :
436 0 : StateSnapshot::new(
437 0 : self.tli.read_shared_state().await,
438 0 : self.conf.heartbeat_timeout,
439 0 : )
440 0 : }
441 :
442 : /// Spawns/kills backup task and returns true if backup is required.
443 0 : async fn update_backup(&mut self, num_computes: usize, state: &StateSnapshot) -> bool {
444 0 : let is_wal_backup_required =
445 0 : wal_backup::is_wal_backup_required(self.wal_seg_size, num_computes, state);
446 0 :
447 0 : if self.conf.is_wal_backup_enabled() {
448 0 : wal_backup::update_task(self, is_wal_backup_required, state).await;
449 0 : }
450 :
451 : // update the state in Arc<Timeline>
452 0 : self.tli.wal_backup_active.store(
453 0 : self.backup_task.is_some(),
454 0 : std::sync::atomic::Ordering::Relaxed,
455 0 : );
456 0 : is_wal_backup_required
457 0 : }
458 :
459 : /// Update is_active flag and returns its value.
460 0 : fn update_is_active(
461 0 : &mut self,
462 0 : is_wal_backup_required: bool,
463 0 : num_computes: usize,
464 0 : state: &StateSnapshot,
465 0 : ) {
466 0 : let is_active = is_wal_backup_required
467 0 : || num_computes > 0
468 0 : || state.remote_consistent_lsn < state.commit_lsn;
469 :
470 : // update the broker timeline set
471 0 : if self.tli_broker_active.set(is_active) {
472 : // write log if state has changed
473 0 : info!(
474 0 : "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
475 : is_active, state.remote_consistent_lsn, state.commit_lsn,
476 : );
477 :
478 0 : MANAGER_ACTIVE_CHANGES.inc();
479 0 : }
480 :
481 : // update the state in Arc<Timeline>
482 0 : self.tli
483 0 : .broker_active
484 0 : .store(is_active, std::sync::atomic::Ordering::Relaxed);
485 0 : }
486 :
487 : /// Save the control file if needed. If the save should happen later, updates `next_event` with that time.
488 0 : async fn update_control_file_save(
489 0 : &self,
490 0 : state: &StateSnapshot,
491 0 : next_event: &mut Option<Instant>,
492 0 : ) {
493 0 : if !state.inmem_flush_pending {
494 0 : return;
495 0 : }
496 0 :
497 0 : if state.cfile_last_persist_at.elapsed() > self.conf.control_file_save_interval {
498 0 : let mut write_guard = self.tli.write_shared_state().await;
499 : // Ideally this would be done in the background because it blocks the manager task, but flush() should
500 : // be fast enough not to be a problem for now.
501 0 : if let Err(e) = write_guard.sk.state_mut().flush().await {
502 0 : warn!("failed to save control file: {:?}", e);
503 0 : }
504 0 : } else {
505 0 : // we should wait until some time passed until the next save
506 0 : update_next_event(
507 0 : next_event,
508 0 : (state.cfile_last_persist_at + self.conf.control_file_save_interval).into(),
509 0 : );
510 0 : }
511 0 : }
512 :
513 : /// Spawns WAL removal task if needed.
514 0 : async fn update_wal_removal(&mut self, state: &StateSnapshot) {
515 0 : if self.wal_removal_task.is_some() || state.wal_removal_on_hold {
516 : // WAL removal is already in progress or on hold
517 0 : return;
518 0 : }
519 :
520 : // If enabled, we use the LSN of the most lagging walsender as the WAL removal horizon.
521 : // This gives better read speed to pageservers that are lagging behind,
522 : // at the cost of keeping more WAL on disk.
523 0 : let replication_horizon_lsn = if self.conf.walsenders_keep_horizon {
524 0 : self.walsenders.laggard_lsn()
525 : } else {
526 0 : None
527 : };
528 :
529 0 : let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
530 0 : let removal_horizon_segno = removal_horizon_lsn
531 0 : .segment_number(self.wal_seg_size)
532 0 : .saturating_sub(1);
533 0 :
534 0 : if removal_horizon_segno > self.last_removed_segno {
535 : // we need to remove WAL
536 0 : let remover = match self.tli.read_shared_state().await.sk {
537 0 : StateSK::Loaded(ref sk) => {
538 0 : crate::wal_storage::Storage::remove_up_to(&sk.wal_store, removal_horizon_segno)
539 : }
540 : StateSK::Offloaded(_) => {
541 : // we can't remove WAL if it's not loaded
542 0 : warn!("unexpectedly trying to run WAL removal on offloaded timeline");
543 0 : return;
544 : }
545 0 : StateSK::Empty => unreachable!(),
546 : };
547 :
548 0 : self.wal_removal_task = Some(tokio::spawn(
549 0 : async move {
550 0 : remover.await?;
551 0 : Ok(removal_horizon_segno)
552 0 : }
553 0 : .instrument(info_span!("WAL removal", ttid=%self.tli.ttid)),
554 : ));
555 0 : }
556 0 : }
557 :
558 : /// Update the state after WAL removal task finished.
559 0 : fn update_wal_removal_end(&mut self, res: Result<anyhow::Result<u64>, JoinError>) {
560 0 : let new_last_removed_segno = match res {
561 0 : Ok(Ok(segno)) => segno,
562 0 : Err(e) => {
563 0 : warn!("WAL removal task failed: {:?}", e);
564 0 : return;
565 : }
566 0 : Ok(Err(e)) => {
567 0 : warn!("WAL removal task failed: {:?}", e);
568 0 : return;
569 : }
570 : };
571 :
572 0 : self.last_removed_segno = new_last_removed_segno;
573 0 : // update the state in Arc<Timeline>
574 0 : self.tli
575 0 : .last_removed_segno
576 0 : .store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
577 0 : }
578 :
579 : /// Spawns partial WAL backup task if needed.
580 0 : async fn update_partial_backup(&mut self, state: &StateSnapshot) {
581 0 : // check if WAL backup is enabled and should be started
582 0 : if !self.conf.is_wal_backup_enabled() {
583 0 : return;
584 0 : }
585 0 :
586 0 : if self.partial_backup_task.is_some() {
587 : // partial backup is already running
588 0 : return;
589 0 : }
590 0 :
591 0 : if !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded) {
592 : // nothing to upload
593 0 : return;
594 0 : }
595 0 :
596 0 : // Get WalResidentTimeline and start partial backup task.
597 0 : let cancel = CancellationToken::new();
598 0 : let handle = tokio::spawn(wal_backup_partial::main_task(
599 0 : self.wal_resident_timeline(),
600 0 : self.conf.clone(),
601 0 : self.global_rate_limiter.clone(),
602 0 : cancel.clone(),
603 0 : ));
604 0 : self.partial_backup_task = Some((handle, cancel));
605 0 : }
606 :
607 : /// Update the state after partial WAL backup task finished.
608 0 : fn update_partial_backup_end(&mut self, res: Result<Option<PartialRemoteSegment>, JoinError>) {
609 0 : match res {
610 0 : Ok(new_upload_state) => {
611 0 : self.partial_backup_uploaded = new_upload_state;
612 0 : }
613 0 : Err(e) => {
614 0 : warn!("partial backup task panicked: {:?}", e);
615 : }
616 : }
617 0 : }
618 :
619 : /// Reset partial backup state and remove its remote storage data. Since it
620 : /// might be uploading something concurrently, cancel the task first.
621 0 : async fn backup_partial_reset(&mut self) -> anyhow::Result<Vec<String>> {
622 0 : info!("resetting partial backup state");
623 : // If the timeline is evicted, force-unevict it before erasing the partial backup
624 : // state. The intended use of this function is to drop corrupted remote
625 : // state; we haven't enabled local file deletion anywhere yet,
626 : // so switching directly is safe.
627 0 : if self.is_offloaded {
628 0 : self.tli.switch_to_present().await?;
629 : // switch manager state as soon as possible
630 0 : self.is_offloaded = false;
631 0 : }
632 :
633 0 : if let Some((handle, cancel)) = &mut self.partial_backup_task {
634 0 : cancel.cancel();
635 0 : info!("cancelled partial backup task, awaiting it");
636 : // we're going to reset .partial_backup_uploaded to None anyway, so ignore the result
637 0 : handle.await.ok();
638 0 : self.partial_backup_task = None;
639 0 : }
640 :
641 0 : let tli = self.wal_resident_timeline();
642 0 : let mut partial_backup = PartialBackup::new(tli, self.conf.clone()).await;
643 : // Reset might fail, e.g. when the cfile is already reset but s3 removal
644 : // failed, so set the manager state to None beforehand. In any case the caller
645 : // is expected to retry until it succeeds.
646 0 : self.partial_backup_uploaded = None;
647 0 : let res = partial_backup.reset().await?;
648 0 : info!("reset is done");
649 0 : Ok(res)
650 0 : }
651 :
652 : /// Handle a message that arrived from ManagerCtl.
653 0 : async fn handle_message(&mut self, msg: Option<ManagerCtlMessage>) {
654 0 : debug!("received manager message: {:?}", msg);
655 0 : match msg {
656 0 : Some(ManagerCtlMessage::GuardRequest(tx)) => {
657 0 : if self.is_offloaded {
658 : // try to unevict the timeline, but without a guarantee that it will succeed
659 0 : self.unevict_timeline().await;
660 0 : }
661 :
662 0 : let guard = if self.is_offloaded {
663 0 : Err(anyhow::anyhow!("timeline is offloaded, can't get a guard"))
664 : } else {
665 0 : Ok(self.access_service.create_guard())
666 : };
667 :
668 0 : if tx.send(guard).is_err() {
669 0 : warn!("failed to reply with a guard, receiver dropped");
670 0 : }
671 : }
672 0 : Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
673 0 : self.access_service.drop_guard(guard_id);
674 0 : }
675 0 : Some(ManagerCtlMessage::BackupPartialReset(tx)) => {
676 0 : info!("resetting uploaded partial backup state");
677 0 : let res = self.backup_partial_reset().await;
678 0 : if let Err(ref e) = res {
679 0 : warn!("failed to reset partial backup state: {:?}", e);
680 0 : }
681 0 : if tx.send(res).is_err() {
682 0 : warn!("failed to send partial backup reset result, receiver dropped");
683 0 : }
684 : }
685 : None => {
686 : // can't happen, we're holding the sender
687 0 : unreachable!();
688 : }
689 : }
690 0 : }
691 : }
692 :
693 : // utility functions
694 0 : async fn sleep_until(option: &Option<tokio::time::Instant>) {
695 0 : if let Some(timeout) = option {
696 0 : tokio::time::sleep_until(*timeout).await;
697 : } else {
698 0 : futures::future::pending::<()>().await;
699 : }
700 0 : }
701 :
702 : /// Future that resolves when the task is finished or never if the task is None.
703 : ///
704 : /// Note: it accepts Option<&mut> instead of &mut Option<> because mapping the
705 : /// option to get the latter is hard.
706 0 : async fn await_task_finish<T>(option: Option<&mut JoinHandle<T>>) -> Result<T, JoinError> {
707 0 : if let Some(task) = option {
708 0 : task.await
709 : } else {
710 0 : futures::future::pending().await
711 : }
712 0 : }
713 :
714 : /// Update next_event if candidate is earlier.
715 0 : fn update_next_event(next_event: &mut Option<Instant>, candidate: Instant) {
716 0 : if let Some(next) = next_event {
717 0 : if candidate < *next {
718 0 : *next = candidate;
719 0 : }
720 0 : } else {
721 0 : *next_event = Some(candidate);
722 0 : }
723 0 : }
724 :
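/// Current step of the manager loop. The manager records it via `set_status` before each
/// phase, which makes it possible to see where the task currently is (or where it got stuck).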
725 : #[repr(usize)]
726 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
727 : pub enum Status {
728 : NotStarted,
729 : Started,
730 : StateSnapshot,
731 : UpdateBackup,
732 : UpdateControlFile,
733 : UpdateWalRemoval,
734 : UpdatePartialBackup,
735 : EvictTimeline,
736 : Wait,
737 : HandleMessage,
738 : Exiting,
739 : Finished,
740 : }
741 :
742 : /// AtomicStatus is a wrapper around AtomicUsize adapted for the Status enum.
743 : pub struct AtomicStatus {
744 : inner: AtomicUsize,
745 : }
746 :
747 : impl Default for AtomicStatus {
748 0 : fn default() -> Self {
749 0 : Self::new()
750 0 : }
751 : }
752 :
753 : impl AtomicStatus {
754 0 : pub fn new() -> Self {
755 0 : AtomicStatus {
756 0 : inner: AtomicUsize::new(Status::NotStarted as usize),
757 0 : }
758 0 : }
759 :
760 0 : pub fn load(&self, order: std::sync::atomic::Ordering) -> Status {
761 0 : // Safety: This line of code uses `std::mem::transmute` to reinterpret the loaded value as `Status`.
762 0 : // It is safe to use `transmute` in this context because `Status` is a repr(usize) enum,
763 0 : // which means it has the same memory layout as usize.
764 0 : // However, it is important to ensure that the loaded value is a valid variant of `Status`,
765 0 : // otherwise, the behavior will be undefined.
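// In this module the only writers are `AtomicStatus::new()` and `store()`, both of which
// take a valid `Status`, so the invariant holds as long as nobody stores a raw usize directly.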
766 0 : unsafe { std::mem::transmute(self.inner.load(order)) }
767 0 : }
768 :
769 0 : pub fn get(&self) -> Status {
770 0 : self.load(std::sync::atomic::Ordering::Relaxed)
771 0 : }
772 :
773 0 : pub fn store(&self, val: Status, order: std::sync::atomic::Ordering) {
774 0 : self.inner.store(val as usize, order);
775 0 : }
776 : }