Line data Source code
1 : //! The timeline manager task is responsible for managing the timeline's background tasks.
2 : //! It is spawned alongside each timeline and exits when the timeline is deleted.
3 : //! It watches for changes in the timeline state and decides when to spawn or kill background tasks.
4 : //! It also manages some reactive state, such as whether the timeline should be active for broker pushes.
5 : //!
6 : //! Be extra careful with manager code: it is not respawned on panic.
7 : //! Also, if it gets stuck in some branch, it will prevent any further progress in the timeline.
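//!
//! As a rough, illustrative sketch (names like `tli`, `conf`, `broker_active_set` and
//! `limiter` are placeholders; the real wiring lives in the timeline setup code), the
//! manager is driven through [`ManagerCtl`] roughly like this:
//!
//! ```ignore
//! let ctl = ManagerCtl::new();
//! let (tx, rx) = ctl.bootstrap_manager();
//! // the manager task runs until the timeline is deleted
//! tokio::spawn(main_task(tli, conf, broker_active_set, tx, rx, limiter));
//! // other tasks obtain WAL residence guards through the same ManagerCtl
//! let guard = ctl.wal_residence_guard().await?;
//! ```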
8 :
9 : use std::{
10 : sync::{atomic::AtomicUsize, Arc},
11 : time::Duration,
12 : };
13 :
14 : use postgres_ffi::XLogSegNo;
15 : use serde::{Deserialize, Serialize};
16 : use tokio::{
17 : task::{JoinError, JoinHandle},
18 : time::Instant,
19 : };
20 : use tracing::{debug, info, info_span, instrument, warn, Instrument};
21 : use utils::lsn::Lsn;
22 :
23 : use crate::{
24 : control_file::{FileStorage, Storage},
25 : metrics::{MANAGER_ACTIVE_CHANGES, MANAGER_ITERATIONS_TOTAL, MISC_OPERATION_SECONDS},
26 : rate_limit::{rand_duration, RateLimiter},
27 : recovery::recovery_main,
28 : remove_wal::calc_horizon_lsn,
29 : safekeeper::Term,
30 : send_wal::WalSenders,
31 : state::TimelineState,
32 : timeline::{ManagerTimeline, PeerInfo, ReadGuardSharedState, StateSK, WalResidentTimeline},
33 : timeline_guard::{AccessService, GuardId, ResidenceGuard},
34 : timelines_set::{TimelineSetGuard, TimelinesSet},
35 : wal_backup::{self, WalBackupTaskHandle},
36 : wal_backup_partial::{self, PartialRemoteSegment},
37 : SafeKeeperConf,
38 : };
39 :
40 : pub(crate) struct StateSnapshot {
41 : // inmem values
42 : pub(crate) commit_lsn: Lsn,
43 : pub(crate) backup_lsn: Lsn,
44 : pub(crate) remote_consistent_lsn: Lsn,
45 :
46 : // persistent control file values
47 : pub(crate) cfile_peer_horizon_lsn: Lsn,
48 : pub(crate) cfile_remote_consistent_lsn: Lsn,
49 : pub(crate) cfile_backup_lsn: Lsn,
50 :
51 : // latest state
52 : pub(crate) flush_lsn: Lsn,
53 : pub(crate) last_log_term: Term,
54 :
55 : // misc
56 : pub(crate) cfile_last_persist_at: std::time::Instant,
57 : pub(crate) inmem_flush_pending: bool,
58 : pub(crate) wal_removal_on_hold: bool,
59 : pub(crate) peers: Vec<PeerInfo>,
60 : }
61 :
62 : impl StateSnapshot {
63 : /// Create a new snapshot of the timeline state.
64 0 : fn new(read_guard: ReadGuardSharedState, heartbeat_timeout: Duration) -> Self {
65 0 : let state = read_guard.sk.state();
66 0 : Self {
67 0 : commit_lsn: state.inmem.commit_lsn,
68 0 : backup_lsn: state.inmem.backup_lsn,
69 0 : remote_consistent_lsn: state.inmem.remote_consistent_lsn,
70 0 : cfile_peer_horizon_lsn: state.peer_horizon_lsn,
71 0 : cfile_remote_consistent_lsn: state.remote_consistent_lsn,
72 0 : cfile_backup_lsn: state.backup_lsn,
73 0 : flush_lsn: read_guard.sk.flush_lsn(),
74 0 : last_log_term: read_guard.sk.last_log_term(),
75 0 : cfile_last_persist_at: state.pers.last_persist_at(),
76 0 : inmem_flush_pending: Self::has_unflushed_inmem_state(state),
77 0 : wal_removal_on_hold: read_guard.wal_removal_on_hold,
78 0 : peers: read_guard.get_peers(heartbeat_timeout),
79 0 : }
80 0 : }
81 :
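/// Returns true if any in-memory LSN is ahead of its persisted (control file) counterpart,
/// i.e. the control file has pending changes to flush.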
82 0 : fn has_unflushed_inmem_state(state: &TimelineState<FileStorage>) -> bool {
83 0 : state.inmem.commit_lsn > state.commit_lsn
84 0 : || state.inmem.backup_lsn > state.backup_lsn
85 0 : || state.inmem.peer_horizon_lsn > state.peer_horizon_lsn
86 0 : || state.inmem.remote_consistent_lsn > state.remote_consistent_lsn
87 0 : }
88 : }
89 :
90 : /// Control how often the manager task should wake up to check for updates.
91 : /// There is no need to check for updates more often than this.
92 : const REFRESH_INTERVAL: Duration = Duration::from_millis(300);
93 :
94 : pub enum ManagerCtlMessage {
95 : /// Request to get a guard for WalResidentTimeline, with WAL files available locally.
96 : GuardRequest(tokio::sync::oneshot::Sender<anyhow::Result<ResidenceGuard>>),
97 : /// Request to drop the guard.
98 : GuardDrop(GuardId),
99 : }
100 :
101 : impl std::fmt::Debug for ManagerCtlMessage {
102 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 0 : match self {
104 0 : ManagerCtlMessage::GuardRequest(_) => write!(f, "GuardRequest"),
105 0 : ManagerCtlMessage::GuardDrop(id) => write!(f, "GuardDrop({:?})", id),
106 : }
107 0 : }
108 : }
109 :
110 : pub struct ManagerCtl {
111 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
112 :
113 : // this is used to initialize the manager; it is moved out in bootstrap_manager().
114 : init_manager_rx:
115 : std::sync::Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>>>,
116 : }
117 :
118 : impl Default for ManagerCtl {
119 0 : fn default() -> Self {
120 0 : Self::new()
121 0 : }
122 : }
123 :
124 : impl ManagerCtl {
125 0 : pub fn new() -> Self {
126 0 : let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
127 0 : Self {
128 0 : manager_tx: tx,
129 0 : init_manager_rx: std::sync::Mutex::new(Some(rx)),
130 0 : }
131 0 : }
132 :
133 : /// Issue a new guard and wait for the manager to prepare the timeline.
134 : /// Sends a message to the manager and waits for the response.
135 : /// May block indefinitely if the manager is stuck.
136 0 : pub async fn wal_residence_guard(&self) -> anyhow::Result<ResidenceGuard> {
137 0 : let (tx, rx) = tokio::sync::oneshot::channel();
138 0 : self.manager_tx.send(ManagerCtlMessage::GuardRequest(tx))?;
139 :
140 : // wait for the manager to respond with the guard
141 0 : rx.await
142 0 : .map_err(|e| anyhow::anyhow!("response read fail: {:?}", e))
143 0 : .and_then(std::convert::identity)
144 0 : }
145 :
146 : /// Must be called exactly once to bootstrap the manager.
147 0 : pub fn bootstrap_manager(
148 0 : &self,
149 0 : ) -> (
150 0 : tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
151 0 : tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
152 0 : ) {
153 0 : let rx = self
154 0 : .init_manager_rx
155 0 : .lock()
156 0 : .expect("mutex init_manager_rx poisoned")
157 0 : .take()
158 0 : .expect("manager already bootstrapped");
159 0 :
160 0 : (self.manager_tx.clone(), rx)
161 0 : }
162 : }
163 :
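/// Mutable state owned by the manager task: configuration, the watch channels it reacts to,
/// and handles to the background tasks it spawns.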
164 : pub(crate) struct Manager {
165 : // configuration & dependencies
166 : pub(crate) tli: ManagerTimeline,
167 : pub(crate) conf: SafeKeeperConf,
168 : pub(crate) wal_seg_size: usize,
169 : pub(crate) walsenders: Arc<WalSenders>,
170 :
171 : // current state
172 : pub(crate) state_version_rx: tokio::sync::watch::Receiver<usize>,
173 : pub(crate) num_computes_rx: tokio::sync::watch::Receiver<usize>,
174 : pub(crate) tli_broker_active: TimelineSetGuard,
175 : pub(crate) last_removed_segno: XLogSegNo,
176 : pub(crate) is_offloaded: bool,
177 :
178 : // background tasks
179 : pub(crate) backup_task: Option<WalBackupTaskHandle>,
180 : pub(crate) recovery_task: Option<JoinHandle<()>>,
181 : pub(crate) wal_removal_task: Option<JoinHandle<anyhow::Result<u64>>>,
182 :
183 : // partial backup
184 : pub(crate) partial_backup_task: Option<JoinHandle<Option<PartialRemoteSegment>>>,
185 : pub(crate) partial_backup_uploaded: Option<PartialRemoteSegment>,
186 :
187 : // misc
188 : pub(crate) access_service: AccessService,
189 : pub(crate) global_rate_limiter: RateLimiter,
190 :
191 : // Anti-flapping state: we evict timelines eagerly if they are inactive, but should not
192 : // evict them if they go inactive very soon after being restored.
193 : pub(crate) evict_not_before: Instant,
194 : }
195 :
196 : /// This task gets spawned alongside each timeline and is responsible for managing the timeline's
197 : /// background tasks.
198 : /// Be careful: this task is not respawned on panic, so it must not panic.
199 0 : #[instrument(name = "manager", skip_all, fields(ttid = %tli.ttid))]
200 : pub async fn main_task(
201 : tli: ManagerTimeline,
202 : conf: SafeKeeperConf,
203 : broker_active_set: Arc<TimelinesSet>,
204 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
205 : mut manager_rx: tokio::sync::mpsc::UnboundedReceiver<ManagerCtlMessage>,
206 : global_rate_limiter: RateLimiter,
207 : ) {
208 : tli.set_status(Status::Started);
209 :
210 : let defer_tli = tli.tli.clone();
211 : scopeguard::defer! {
212 : if defer_tli.is_cancelled() {
213 : info!("manager task finished");
214 : } else {
215 : warn!("manager task finished prematurely");
216 : }
217 : };
218 :
219 : let mut mgr = Manager::new(
220 : tli,
221 : conf,
222 : broker_active_set,
223 : manager_tx,
224 : global_rate_limiter,
225 : )
226 : .await;
227 :
228 : // Start recovery task which always runs on the timeline.
229 : if !mgr.is_offloaded && mgr.conf.peer_recovery_enabled {
230 : let tli = mgr.wal_resident_timeline();
231 : mgr.recovery_task = Some(tokio::spawn(recovery_main(tli, mgr.conf.clone())));
232 : }
233 :
234 : let last_state = 'outer: loop {
235 : MANAGER_ITERATIONS_TOTAL.inc();
236 :
237 : mgr.set_status(Status::StateSnapshot);
238 : let state_snapshot = mgr.state_snapshot().await;
239 :
240 : let mut next_event: Option<Instant> = None;
241 : if !mgr.is_offloaded {
242 : let num_computes = *mgr.num_computes_rx.borrow();
243 :
244 : mgr.set_status(Status::UpdateBackup);
245 : let is_wal_backup_required = mgr.update_backup(num_computes, &state_snapshot).await;
246 : mgr.update_is_active(is_wal_backup_required, num_computes, &state_snapshot);
247 :
248 : mgr.set_status(Status::UpdateControlFile);
249 : mgr.update_control_file_save(&state_snapshot, &mut next_event)
250 : .await;
251 :
252 : mgr.set_status(Status::UpdateWalRemoval);
253 : mgr.update_wal_removal(&state_snapshot).await;
254 :
255 : mgr.set_status(Status::UpdatePartialBackup);
256 : mgr.update_partial_backup(&state_snapshot).await;
257 :
258 : let now = Instant::now();
259 : if mgr.evict_not_before > now {
260 : // we should wait until evict_not_before
261 : update_next_event(&mut next_event, mgr.evict_not_before);
262 : }
263 :
264 : if mgr.conf.enable_offload
265 : && mgr.evict_not_before <= now
266 : && mgr.ready_for_eviction(&next_event, &state_snapshot)
267 : {
268 : // check rate limiter and evict timeline if possible
269 : match mgr.global_rate_limiter.try_acquire_eviction() {
270 : Some(_permit) => {
271 : mgr.set_status(Status::EvictTimeline);
272 : mgr.evict_timeline().await;
273 : }
274 : None => {
275 : // we can't evict timeline now, will try again later
276 : mgr.evict_not_before =
277 : Instant::now() + rand_duration(&mgr.conf.eviction_min_resident);
278 : update_next_event(&mut next_event, mgr.evict_not_before);
279 : }
280 : }
281 : }
282 : }
283 :
284 : mgr.set_status(Status::Wait);
285 : // wait until something changes. tx channels are stored under Arc, so they will not be
286 : // dropped until the manager task is finished.
287 : tokio::select! {
288 : _ = mgr.tli.cancel.cancelled() => {
289 : // timeline was deleted
290 : break 'outer state_snapshot;
291 : }
292 0 : _ = async {
293 0 : // don't wake up on every state change, but at most every REFRESH_INTERVAL
294 0 : tokio::time::sleep(REFRESH_INTERVAL).await;
295 0 : let _ = mgr.state_version_rx.changed().await;
296 0 : } => {
297 : // state was updated
298 : }
299 : _ = mgr.num_computes_rx.changed() => {
300 : // number of connected computes was updated
301 : }
302 : _ = sleep_until(&next_event) => {
303 : // we were waiting for some event (e.g. cfile save)
304 : }
305 : res = await_task_finish(&mut mgr.wal_removal_task) => {
306 : // WAL removal task finished
307 : mgr.wal_removal_task = None;
308 : mgr.update_wal_removal_end(res);
309 : }
310 : res = await_task_finish(&mut mgr.partial_backup_task) => {
311 : // partial backup task finished
312 : mgr.partial_backup_task = None;
313 : mgr.update_partial_backup_end(res);
314 : }
315 :
316 : msg = manager_rx.recv() => {
317 : mgr.set_status(Status::HandleMessage);
318 : mgr.handle_message(msg).await;
319 : }
320 : }
321 : };
322 : mgr.set_status(Status::Exiting);
323 :
324 : // remove timeline from the broker active set sooner, before waiting for background tasks
325 : mgr.tli_broker_active.set(false);
326 :
327 : // shutdown background tasks
328 : if mgr.conf.is_wal_backup_enabled() {
329 : wal_backup::update_task(&mut mgr, false, &last_state).await;
330 : }
331 :
332 : if let Some(recovery_task) = &mut mgr.recovery_task {
333 : if let Err(e) = recovery_task.await {
334 : warn!("recovery task failed: {:?}", e);
335 : }
336 : }
337 :
338 : if let Some(partial_backup_task) = &mut mgr.partial_backup_task {
339 : if let Err(e) = partial_backup_task.await {
340 : warn!("partial backup task failed: {:?}", e);
341 : }
342 : }
343 :
344 : if let Some(wal_removal_task) = &mut mgr.wal_removal_task {
345 : let res = wal_removal_task.await;
346 : mgr.update_wal_removal_end(res);
347 : }
348 :
349 : mgr.set_status(Status::Finished);
350 : }
351 :
352 : impl Manager {
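    /// Create a new manager, reading the current offloaded and partial-backup state from the timeline.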
353 0 : async fn new(
354 0 : tli: ManagerTimeline,
355 0 : conf: SafeKeeperConf,
356 0 : broker_active_set: Arc<TimelinesSet>,
357 0 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
358 0 : global_rate_limiter: RateLimiter,
359 0 : ) -> Manager {
360 0 : let (is_offloaded, partial_backup_uploaded) = tli.bootstrap_mgr().await;
361 : Manager {
362 0 : wal_seg_size: tli.get_wal_seg_size().await,
363 0 : walsenders: tli.get_walsenders().clone(),
364 0 : state_version_rx: tli.get_state_version_rx(),
365 0 : num_computes_rx: tli.get_walreceivers().get_num_rx(),
366 0 : tli_broker_active: broker_active_set.guard(tli.clone()),
367 0 : last_removed_segno: 0,
368 0 : is_offloaded,
369 0 : backup_task: None,
370 0 : recovery_task: None,
371 0 : wal_removal_task: None,
372 0 : partial_backup_task: None,
373 0 : partial_backup_uploaded,
374 0 : access_service: AccessService::new(manager_tx),
375 0 : tli,
376 0 : global_rate_limiter,
377 0 : // to smooth out evictions spike after restart
378 0 : evict_not_before: Instant::now() + rand_duration(&conf.eviction_min_resident),
379 0 : conf,
380 0 : }
381 0 : }
382 :
383 0 : fn set_status(&self, status: Status) {
384 0 : self.tli.set_status(status);
385 0 : }
386 :
387 : /// Get a WalResidentTimeline.
388 : /// Manager code must use this function instead of one from `Timeline`
389 : /// directly, because the latter would deadlock.
390 0 : pub(crate) fn wal_resident_timeline(&mut self) -> WalResidentTimeline {
391 0 : assert!(!self.is_offloaded);
392 0 : let guard = self.access_service.create_guard();
393 0 : WalResidentTimeline::new(self.tli.clone(), guard)
394 0 : }
395 :
396 : /// Get a snapshot of the timeline state.
397 0 : async fn state_snapshot(&self) -> StateSnapshot {
398 0 : let _timer = MISC_OPERATION_SECONDS
399 0 : .with_label_values(&["state_snapshot"])
400 0 : .start_timer();
401 0 :
402 0 : StateSnapshot::new(
403 0 : self.tli.read_shared_state().await,
404 0 : self.conf.heartbeat_timeout,
405 0 : )
406 0 : }
407 :
408 : /// Spawns/kills backup task and returns true if backup is required.
409 0 : async fn update_backup(&mut self, num_computes: usize, state: &StateSnapshot) -> bool {
410 0 : let is_wal_backup_required =
411 0 : wal_backup::is_wal_backup_required(self.wal_seg_size, num_computes, state);
412 0 :
413 0 : if self.conf.is_wal_backup_enabled() {
414 0 : wal_backup::update_task(self, is_wal_backup_required, state).await;
415 0 : }
416 :
417 : // update the state in Arc<Timeline>
418 0 : self.tli.wal_backup_active.store(
419 0 : self.backup_task.is_some(),
420 0 : std::sync::atomic::Ordering::Relaxed,
421 0 : );
422 0 : is_wal_backup_required
423 0 : }
424 :
425 : /// Update is_active flag and returns its value.
426 0 : fn update_is_active(
427 0 : &mut self,
428 0 : is_wal_backup_required: bool,
429 0 : num_computes: usize,
430 0 : state: &StateSnapshot,
431 0 : ) {
432 0 : let is_active = is_wal_backup_required
433 0 : || num_computes > 0
434 0 : || state.remote_consistent_lsn < state.commit_lsn;
435 :
436 : // update the broker timeline set
437 0 : if self.tli_broker_active.set(is_active) {
438 : // write log if state has changed
439 0 : info!(
440 0 : "timeline active={} now, remote_consistent_lsn={}, commit_lsn={}",
441 : is_active, state.remote_consistent_lsn, state.commit_lsn,
442 : );
443 :
444 0 : MANAGER_ACTIVE_CHANGES.inc();
445 0 : }
446 :
447 : // update the state in Arc<Timeline>
448 0 : self.tli
449 0 : .broker_active
450 0 : .store(is_active, std::sync::atomic::Ordering::Relaxed);
451 0 : }
452 :
453 : /// Save the control file if needed. If the save is deferred, sets `next_event` to the time when it should be persisted.
454 0 : async fn update_control_file_save(
455 0 : &self,
456 0 : state: &StateSnapshot,
457 0 : next_event: &mut Option<Instant>,
458 0 : ) {
459 0 : if !state.inmem_flush_pending {
460 0 : return;
461 0 : }
462 0 :
463 0 : if state.cfile_last_persist_at.elapsed() > self.conf.control_file_save_interval {
464 0 : let mut write_guard = self.tli.write_shared_state().await;
465 : // ideally this would run in the background, because it blocks the manager task, but
466 : // flush() should be fast enough not to be a problem for now
467 0 : if let Err(e) = write_guard.sk.state_mut().flush().await {
468 0 : warn!("failed to save control file: {:?}", e);
469 0 : }
470 0 : } else {
471 0 : // wait until enough time has passed before the next save
472 0 : update_next_event(
473 0 : next_event,
474 0 : (state.cfile_last_persist_at + self.conf.control_file_save_interval).into(),
475 0 : );
476 0 : }
477 0 : }
478 :
479 : /// Spawns WAL removal task if needed.
480 0 : async fn update_wal_removal(&mut self, state: &StateSnapshot) {
481 0 : if self.wal_removal_task.is_some() || state.wal_removal_on_hold {
482 : // WAL removal is already in progress or on hold
483 0 : return;
484 0 : }
485 :
486 : // If enabled, we use the LSN of the most lagging walsender as the WAL removal horizon.
487 : // This gives better read speed to pageservers that are lagging behind,
488 : // at the cost of keeping more WAL on disk.
489 0 : let replication_horizon_lsn = if self.conf.walsenders_keep_horizon {
490 0 : self.walsenders.laggard_lsn()
491 : } else {
492 0 : None
493 : };
494 :
495 0 : let removal_horizon_lsn = calc_horizon_lsn(state, replication_horizon_lsn);
496 0 : let removal_horizon_segno = removal_horizon_lsn
497 0 : .segment_number(self.wal_seg_size)
498 0 : .saturating_sub(1);
499 0 :
500 0 : if removal_horizon_segno > self.last_removed_segno {
501 : // we need to remove WAL
502 0 : let remover = match self.tli.read_shared_state().await.sk {
503 0 : StateSK::Loaded(ref sk) => {
504 0 : crate::wal_storage::Storage::remove_up_to(&sk.wal_store, removal_horizon_segno)
505 : }
506 : StateSK::Offloaded(_) => {
507 : // we can't remove WAL if it's not loaded
508 0 : warn!("unexpectedly trying to run WAL removal on offloaded timeline");
509 0 : return;
510 : }
511 0 : StateSK::Empty => unreachable!(),
512 : };
513 :
514 0 : self.wal_removal_task = Some(tokio::spawn(
515 0 : async move {
516 0 : remover.await?;
517 0 : Ok(removal_horizon_segno)
518 0 : }
519 0 : .instrument(info_span!("WAL removal", ttid=%self.tli.ttid)),
520 : ));
521 0 : }
522 0 : }
523 :
524 : /// Update the state after WAL removal task finished.
525 0 : fn update_wal_removal_end(&mut self, res: Result<anyhow::Result<u64>, JoinError>) {
526 0 : let new_last_removed_segno = match res {
527 0 : Ok(Ok(segno)) => segno,
528 0 : Err(e) => {
529 0 : warn!("WAL removal task failed: {:?}", e);
530 0 : return;
531 : }
532 0 : Ok(Err(e)) => {
533 0 : warn!("WAL removal task failed: {:?}", e);
534 0 : return;
535 : }
536 : };
537 :
538 0 : self.last_removed_segno = new_last_removed_segno;
539 0 : // update the state in Arc<Timeline>
540 0 : self.tli
541 0 : .last_removed_segno
542 0 : .store(new_last_removed_segno, std::sync::atomic::Ordering::Relaxed);
543 0 : }
544 :
545 : /// Spawns partial WAL backup task if needed.
546 0 : async fn update_partial_backup(&mut self, state: &StateSnapshot) {
547 0 : // check if partial backup is enabled and should be started
548 0 : if !self.conf.is_wal_backup_enabled() || !self.conf.partial_backup_enabled {
549 0 : return;
550 0 : }
551 0 :
552 0 : if self.partial_backup_task.is_some() {
553 : // partial backup is already running
554 0 : return;
555 0 : }
556 0 :
557 0 : if !wal_backup_partial::needs_uploading(state, &self.partial_backup_uploaded) {
558 : // nothing to upload
559 0 : return;
560 0 : }
561 0 :
562 0 : // Get WalResidentTimeline and start partial backup task.
563 0 : self.partial_backup_task = Some(tokio::spawn(wal_backup_partial::main_task(
564 0 : self.wal_resident_timeline(),
565 0 : self.conf.clone(),
566 0 : self.global_rate_limiter.clone(),
567 0 : )));
568 0 : }
569 :
570 : /// Update the state after partial WAL backup task finished.
571 0 : fn update_partial_backup_end(&mut self, res: Result<Option<PartialRemoteSegment>, JoinError>) {
572 0 : match res {
573 0 : Ok(new_upload_state) => {
574 0 : self.partial_backup_uploaded = new_upload_state;
575 0 : }
576 0 : Err(e) => {
577 0 : warn!("partial backup task panicked: {:?}", e);
578 : }
579 : }
580 0 : }
581 :
582 : /// Handle message arrived from ManagerCtl.
583 0 : async fn handle_message(&mut self, msg: Option<ManagerCtlMessage>) {
584 0 : debug!("received manager message: {:?}", msg);
585 0 : match msg {
586 0 : Some(ManagerCtlMessage::GuardRequest(tx)) => {
587 0 : if self.is_offloaded {
588 : // try to unevict the timeline, but without a guarantee that it will succeed
589 0 : self.unevict_timeline().await;
590 0 : }
591 :
592 0 : let guard = if self.is_offloaded {
593 0 : Err(anyhow::anyhow!("timeline is offloaded, can't get a guard"))
594 : } else {
595 0 : Ok(self.access_service.create_guard())
596 : };
597 :
598 0 : if tx.send(guard).is_err() {
599 0 : warn!("failed to reply with a guard, receiver dropped");
600 0 : }
601 : }
602 0 : Some(ManagerCtlMessage::GuardDrop(guard_id)) => {
603 0 : self.access_service.drop_guard(guard_id);
604 0 : }
605 : None => {
606 : // can't happen, we're holding the sender
607 0 : unreachable!();
608 : }
609 : }
610 0 : }
611 : }
612 :
613 : // utility functions
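/// Sleep until the given instant, or forever if no instant is set.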
614 0 : async fn sleep_until(option: &Option<tokio::time::Instant>) {
615 0 : if let Some(timeout) = option {
616 0 : tokio::time::sleep_until(*timeout).await;
617 : } else {
618 0 : futures::future::pending::<()>().await;
619 : }
620 0 : }
621 :
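/// Await the task if present; pend forever otherwise, so this can be used as a `select!`
/// branch for optional tasks.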
622 0 : async fn await_task_finish<T>(option: &mut Option<JoinHandle<T>>) -> Result<T, JoinError> {
623 0 : if let Some(task) = option {
624 0 : task.await
625 : } else {
626 0 : futures::future::pending().await
627 : }
628 0 : }
629 :
630 : /// Update next_event if candidate is earlier.
631 0 : fn update_next_event(next_event: &mut Option<Instant>, candidate: Instant) {
632 0 : if let Some(next) = next_event {
633 0 : if candidate < *next {
634 0 : *next = candidate;
635 0 : }
636 0 : } else {
637 0 : *next_event = Some(candidate);
638 0 : }
639 0 : }
640 :
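/// The current step of the manager loop; stored via [`AtomicStatus`] so it can be read from other tasks.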
641 : #[repr(usize)]
642 0 : #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
643 : pub enum Status {
644 : NotStarted,
645 : Started,
646 : StateSnapshot,
647 : UpdateBackup,
648 : UpdateControlFile,
649 : UpdateWalRemoval,
650 : UpdatePartialBackup,
651 : EvictTimeline,
652 : Wait,
653 : HandleMessage,
654 : Exiting,
655 : Finished,
656 : }
657 :
658 : /// AtomicStatus is a wrapper around AtomicUsize adapted for the Status enum.
659 : pub struct AtomicStatus {
660 : inner: AtomicUsize,
661 : }
662 :
663 : impl Default for AtomicStatus {
664 0 : fn default() -> Self {
665 0 : Self::new()
666 0 : }
667 : }
668 :
669 : impl AtomicStatus {
670 0 : pub fn new() -> Self {
671 0 : AtomicStatus {
672 0 : inner: AtomicUsize::new(Status::NotStarted as usize),
673 0 : }
674 0 : }
675 :
676 0 : pub fn load(&self, order: std::sync::atomic::Ordering) -> Status {
677 0 : // Safety: This line of code uses `std::mem::transmute` to reinterpret the loaded value as `Status`.
678 0 : // It is safe to use `transmute` in this context because `Status` is a repr(usize) enum,
679 0 : // which means it has the same memory layout as usize.
680 0 : // However, it is important to ensure that the loaded value is a valid variant of `Status`,
681 0 : // otherwise, the behavior will be undefined.
682 0 : unsafe { std::mem::transmute(self.inner.load(order)) }
683 0 : }
684 :
685 0 : pub fn get(&self) -> Status {
686 0 : self.load(std::sync::atomic::Ordering::Relaxed)
687 0 : }
688 :
689 0 : pub fn store(&self, val: Status, order: std::sync::atomic::Ordering) {
690 0 : self.inner.store(val as usize, order);
691 0 : }
692 : }