Line data Source code
1 : //! Timeline residence guard is needed to ensure that WAL segments are present on disk,
2 : //! as long as the code is holding the guard. This file implements guard logic, to issue
3 : //! and drop guards, and to notify the manager when the guard is dropped.
4 :
5 : use std::collections::HashSet;
6 :
7 : use tracing::{debug, warn};
8 :
9 : use crate::timeline_manager::ManagerCtlMessage;
10 :
11 : #[derive(Debug, Clone, Copy)]
12 : pub struct GuardId(u64);
13 :
14 : pub struct ResidenceGuard {
15 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
16 : guard_id: GuardId,
17 : }
18 :
19 : impl Drop for ResidenceGuard {
20 0 : fn drop(&mut self) {
21 0 : // notify the manager that the guard is dropped
22 0 : let res = self
23 0 : .manager_tx
24 0 : .send(ManagerCtlMessage::GuardDrop(self.guard_id));
25 0 : if let Err(e) = res {
26 0 : warn!("failed to send GuardDrop message: {:?}", e);
27 0 : }
28 0 : }
29 : }
30 :
31 : /// AccessService is responsible for issuing and dropping residence guards.
32 : /// All guards are stored in the `guards` set.
33 : /// TODO: it's possible to add `String` name to each guard, for better observability.
34 : pub(crate) struct AccessService {
35 : next_guard_id: u64,
36 : guards: HashSet<u64>,
37 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
38 : }
39 :
40 : impl AccessService {
41 0 : pub(crate) fn new(manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>) -> Self {
42 0 : Self {
43 0 : next_guard_id: 0,
44 0 : guards: HashSet::new(),
45 0 : manager_tx,
46 0 : }
47 0 : }
48 :
49 0 : pub(crate) fn is_empty(&self) -> bool {
50 0 : self.guards.is_empty()
51 0 : }
52 :
53 0 : pub(crate) fn create_guard(&mut self) -> ResidenceGuard {
54 0 : let guard_id = self.next_guard_id;
55 0 : self.next_guard_id += 1;
56 0 : self.guards.insert(guard_id);
57 0 :
58 0 : let guard_id = GuardId(guard_id);
59 0 : debug!("issued a new guard {:?}", guard_id);
60 :
61 0 : ResidenceGuard {
62 0 : manager_tx: self.manager_tx.clone(),
63 0 : guard_id,
64 0 : }
65 0 : }
66 :
67 0 : pub(crate) fn drop_guard(&mut self, guard_id: GuardId) {
68 0 : debug!("dropping guard {:?}", guard_id);
69 0 : assert!(self.guards.remove(&guard_id.0));
70 0 : }
71 : }
|