Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use utils::id::TimelineId;
4 :
5 : use super::remote_timeline_client::index::GcBlockingReason;
6 :
7 : type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
8 :
9 : /// GcBlock provides persistent (per-timeline) gc blocking.
10 : #[derive(Default)]
11 : pub(crate) struct GcBlock {
12 : /// The timelines which have current reasons to block gc.
13 : ///
14 : /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
15 : /// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
16 : reasons: std::sync::Mutex<Storage>,
17 :
18 : /// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
19 : ///
20 : /// Do not add any more features taking and forbidding taking this lock. It should be
21 : /// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
22 : /// synchronizes with gc attempts by locking and unlocking this mutex.
23 : blocking: tokio::sync::Mutex<()>,
24 : }
25 :
26 : impl GcBlock {
27 : /// Start another gc iteration.
28 : ///
29 : /// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
30 : /// it's ending, or if not currently possible, a value describing the reasons why not.
31 : ///
32 : /// Cancellation safe.
33 4 : pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
34 4 : let reasons = {
35 4 : let g = self.reasons.lock().unwrap();
36 4 :
37 4 : // TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
38 4 : // tests, we use everything. we should warn if the gc has been consecutively blocked
39 4 : // for more than 1h (within single tenant session?).
40 4 : BlockingReasons::clean_and_summarize(g)
41 : };
42 :
43 4 : if let Some(reasons) = reasons {
44 0 : Err(reasons)
45 : } else {
46 : Ok(Guard {
47 4 : _inner: self.blocking.lock().await,
48 : })
49 : }
50 4 : }
51 :
52 : /// Describe the current gc blocking reasons.
53 : ///
54 : /// TODO: make this json serializable.
55 0 : pub(crate) fn summary(&self) -> Option<BlockingReasons> {
56 0 : let g = self.reasons.lock().unwrap();
57 0 :
58 0 : BlockingReasons::summarize(&g)
59 0 : }
60 :
61 : /// Start blocking gc for this one timeline for the given reason.
62 : ///
63 : /// This is not a guard based API but instead it mimics set API. The returned future will not
64 : /// resolve until an existing gc round has completed.
65 : ///
66 : /// Returns true if this block was new, false if gc was already blocked for this reason.
67 : ///
68 : /// Cancellation safe: cancelling after first poll will keep the reason to block gc, but will
69 : /// keep the gc blocking reason.
70 0 : pub(crate) async fn insert(
71 0 : &self,
72 0 : timeline: &super::Timeline,
73 0 : reason: GcBlockingReason,
74 0 : ) -> anyhow::Result<bool> {
75 0 : let (added, uploaded) = {
76 0 : let mut g = self.reasons.lock().unwrap();
77 0 : let set = g.entry(timeline.timeline_id).or_default();
78 0 : let added = set.insert(reason);
79 :
80 : // LOCK ORDER: intentionally hold the lock, see self.reasons.
81 0 : let uploaded = timeline
82 0 : .remote_client
83 0 : .schedule_insert_gc_block_reason(reason)?;
84 :
85 0 : (added, uploaded)
86 0 : };
87 0 :
88 0 : uploaded.await?;
89 :
90 : // ensure that any ongoing gc iteration has completed
91 0 : drop(self.blocking.lock().await);
92 :
93 0 : Ok(added)
94 0 : }
95 :
96 : /// Remove blocking gc for this one timeline and the given reason.
97 0 : pub(crate) async fn remove(
98 0 : &self,
99 0 : timeline: &super::Timeline,
100 0 : reason: GcBlockingReason,
101 0 : ) -> anyhow::Result<()> {
102 : use std::collections::hash_map::Entry;
103 :
104 0 : super::span::debug_assert_current_span_has_tenant_and_timeline_id();
105 :
106 0 : let (remaining_blocks, uploaded) = {
107 0 : let mut g = self.reasons.lock().unwrap();
108 0 : match g.entry(timeline.timeline_id) {
109 0 : Entry::Occupied(mut oe) => {
110 0 : let set = oe.get_mut();
111 0 : set.remove(reason);
112 0 : if set.is_empty() {
113 0 : oe.remove();
114 0 : }
115 : }
116 0 : Entry::Vacant(_) => {
117 0 : // we must still do the index_part.json update regardless, in case we had earlier
118 0 : // been cancelled
119 0 : }
120 : }
121 :
122 0 : let remaining_blocks = g.len();
123 :
124 : // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
125 0 : let uploaded = timeline
126 0 : .remote_client
127 0 : .schedule_remove_gc_block_reason(reason)?;
128 :
129 0 : (remaining_blocks, uploaded)
130 0 : };
131 0 : uploaded.await?;
132 :
133 : // no need to synchronize with gc iteration again
134 :
135 0 : if remaining_blocks > 0 {
136 0 : tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
137 : } else {
138 0 : tracing::info!("gc is now unblocked for the tenant");
139 : }
140 :
141 0 : Ok(())
142 0 : }
143 :
144 0 : pub(crate) fn before_delete(&self, timeline_id: &super::TimelineId) {
145 0 : let unblocked = {
146 0 : let mut g = self.reasons.lock().unwrap();
147 0 : if g.is_empty() {
148 0 : return;
149 0 : }
150 0 :
151 0 : g.remove(timeline_id);
152 0 :
153 0 : BlockingReasons::clean_and_summarize(g).is_none()
154 0 : };
155 0 :
156 0 : if unblocked {
157 0 : tracing::info!("gc is now unblocked following deletion");
158 0 : }
159 0 : }
160 :
161 : /// Initialize with the non-deleted timelines of this tenant.
162 190 : pub(crate) fn set_scanned(&self, scanned: Storage) {
163 190 : let mut g = self.reasons.lock().unwrap();
164 190 : assert!(g.is_empty());
165 190 : g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
166 :
167 190 : if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
168 0 : tracing::info!(summary=?reasons, "initialized with gc blocked");
169 190 : }
170 190 : }
171 : }
172 :
173 : pub(super) struct Guard<'a> {
174 : _inner: tokio::sync::MutexGuard<'a, ()>,
175 : }
176 :
177 : #[derive(Debug)]
178 : pub(crate) struct BlockingReasons {
179 : timelines: usize,
180 : reasons: enumset::EnumSet<GcBlockingReason>,
181 : }
182 :
183 : impl std::fmt::Display for BlockingReasons {
184 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
185 0 : write!(
186 0 : f,
187 0 : "{} timelines block for {:?}",
188 0 : self.timelines, self.reasons
189 0 : )
190 0 : }
191 : }
192 :
193 : impl BlockingReasons {
194 194 : fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
195 194 : let mut reasons = enumset::EnumSet::empty();
196 194 : g.retain(|_key, value| {
197 0 : reasons = reasons.union(*value);
198 0 : !value.is_empty()
199 194 : });
200 194 : if !g.is_empty() {
201 0 : Some(BlockingReasons {
202 0 : timelines: g.len(),
203 0 : reasons,
204 0 : })
205 : } else {
206 194 : None
207 : }
208 194 : }
209 :
210 0 : fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
211 0 : if g.is_empty() {
212 0 : None
213 : } else {
214 0 : let reasons = g
215 0 : .values()
216 0 : .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
217 0 : Some(BlockingReasons {
218 0 : timelines: g.len(),
219 0 : reasons,
220 0 : })
221 : }
222 0 : }
223 : }
|