Line data Source code
1 : //! Timeline residence guard
2 : //!
3 : //! It is needed to ensure that WAL segments are present on disk,
4 : //! as long as the code is holding the guard. This file implements guard logic, to issue
5 : //! and drop guards, and to notify the manager when the guard is dropped.
6 :
7 : use std::collections::HashSet;
8 :
9 : use tracing::debug;
10 :
11 : use crate::timeline_manager::ManagerCtlMessage;
12 :
/// Opaque identifier of a single issued [`ResidenceGuard`].
///
/// The id travels inside `ManagerCtlMessage::GuardDrop` when the guard is dropped
/// and is passed back to `AccessService::drop_guard`. The equality and hashing
/// derives allow ids to be compared and used as keys in sets and maps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GuardId(u64);
15 :
16 : pub struct ResidenceGuard {
17 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
18 : guard_id: GuardId,
19 : }
20 :
21 : impl Drop for ResidenceGuard {
22 0 : fn drop(&mut self) {
23 0 : // notify the manager that the guard is dropped
24 0 : let res = self
25 0 : .manager_tx
26 0 : .send(ManagerCtlMessage::GuardDrop(self.guard_id));
27 0 : if let Err(e) = res {
28 0 : debug!("failed to send GuardDrop message: {:?}", e);
29 0 : }
30 0 : }
31 : }
32 :
33 : /// AccessService is responsible for issuing and dropping residence guards.
34 : /// All guards are stored in the `guards` set.
35 : /// TODO: it's possible to add `String` name to each guard, for better observability.
36 : pub(crate) struct AccessService {
37 : next_guard_id: u64,
38 : guards: HashSet<u64>,
39 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
40 : }
41 :
42 : impl AccessService {
43 0 : pub(crate) fn new(manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>) -> Self {
44 0 : Self {
45 0 : next_guard_id: 0,
46 0 : guards: HashSet::new(),
47 0 : manager_tx,
48 0 : }
49 0 : }
50 :
51 0 : pub(crate) fn is_empty(&self) -> bool {
52 0 : self.guards.is_empty()
53 0 : }
54 :
55 0 : pub(crate) fn create_guard(&mut self) -> ResidenceGuard {
56 0 : let guard_id = self.next_guard_id;
57 0 : self.next_guard_id += 1;
58 0 : self.guards.insert(guard_id);
59 0 :
60 0 : let guard_id = GuardId(guard_id);
61 0 : debug!("issued a new guard {:?}", guard_id);
62 :
63 0 : ResidenceGuard {
64 0 : manager_tx: self.manager_tx.clone(),
65 0 : guard_id,
66 0 : }
67 0 : }
68 :
69 0 : pub(crate) fn drop_guard(&mut self, guard_id: GuardId) {
70 0 : debug!("dropping guard {:?}", guard_id);
71 0 : assert!(self.guards.remove(&guard_id.0));
72 0 : }
73 : }
|