Line data Source code
1 : //! Timeline residence guard
2 : //!
3 : //! It is needed to ensure that WAL segments are present on disk,
4 : //! as long as the code is holding the guard. This file implements guard logic, to issue
5 : //! and drop guards, and to notify the manager when the guard is dropped.
6 :
7 : use std::collections::HashSet;
8 :
9 : use tracing::debug;
10 : use utils::sync::gate::GateGuard;
11 :
12 : use crate::timeline_manager::ManagerCtlMessage;
13 :
14 : #[derive(Debug, Clone, Copy)]
15 : pub struct GuardId(u64);
16 :
17 : pub struct ResidenceGuard {
18 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
19 : guard_id: GuardId,
20 :
21 : /// [`ResidenceGuard`] represents a guarantee that a timeline's data remains resident,
22 : /// which by extension also means the timeline is not shut down (since after shut down
23 : /// our data may be deleted). Therefore everyone holding a residence guard must also
24 : /// hold a guard on [`crate::timeline::Timeline::gate`]
25 : _gate_guard: GateGuard,
26 : }
27 :
28 : impl Drop for ResidenceGuard {
29 9 : fn drop(&mut self) {
30 9 : // notify the manager that the guard is dropped
31 9 : let res = self
32 9 : .manager_tx
33 9 : .send(ManagerCtlMessage::GuardDrop(self.guard_id));
34 9 : if let Err(e) = res {
35 0 : debug!("failed to send GuardDrop message: {:?}", e);
36 9 : }
37 9 : }
38 : }
39 :
40 : /// AccessService is responsible for issuing and dropping residence guards.
41 : /// All guards are stored in the `guards` set.
42 : /// TODO: it's possible to add `String` name to each guard, for better observability.
43 : pub(crate) struct AccessService {
44 : next_guard_id: u64,
45 : guards: HashSet<u64>,
46 : manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>,
47 : }
48 :
49 : impl AccessService {
50 3 : pub(crate) fn new(manager_tx: tokio::sync::mpsc::UnboundedSender<ManagerCtlMessage>) -> Self {
51 3 : Self {
52 3 : next_guard_id: 0,
53 3 : guards: HashSet::new(),
54 3 : manager_tx,
55 3 : }
56 3 : }
57 :
58 0 : pub(crate) fn is_empty(&self) -> bool {
59 0 : self.guards.is_empty()
60 0 : }
61 :
62 : /// `timeline_gate_guard` is a guarantee that the timeline is not shut down
63 9 : pub(crate) fn create_guard(&mut self, timeline_gate_guard: GateGuard) -> ResidenceGuard {
64 9 : let guard_id = self.next_guard_id;
65 9 : self.next_guard_id += 1;
66 9 : self.guards.insert(guard_id);
67 9 :
68 9 : let guard_id = GuardId(guard_id);
69 9 : debug!("issued a new guard {:?}", guard_id);
70 :
71 9 : ResidenceGuard {
72 9 : manager_tx: self.manager_tx.clone(),
73 9 : guard_id,
74 9 : _gate_guard: timeline_gate_guard,
75 9 : }
76 9 : }
77 :
78 5 : pub(crate) fn drop_guard(&mut self, guard_id: GuardId) {
79 5 : debug!("dropping guard {:?}", guard_id);
80 5 : assert!(self.guards.remove(&guard_id.0));
81 5 : }
82 : }
|