Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use utils::id::TimelineId;
4 :
5 : use super::remote_timeline_client::index::GcBlockingReason;
6 :
7 : type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
8 :
9 : #[derive(Default)]
10 : pub(crate) struct GcBlock {
11 : /// The timelines which have current reasons to block gc.
12 : ///
13 : /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
14 : /// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
15 : reasons: std::sync::Mutex<Storage>,
16 : blocking: tokio::sync::Mutex<()>,
17 : }
18 :
19 : impl GcBlock {
20 : /// Start another gc iteration.
21 : ///
22 : /// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
23 : /// it's ending, or if not currently possible, a value describing the reasons why not.
24 : ///
25 : /// Cancellation safe.
26 754 : pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
27 754 : let reasons = {
28 754 : let g = self.reasons.lock().unwrap();
29 754 :
30 754 : // TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
31 754 : // tests, we use everything. we should warn if the gc has been consecutively blocked
32 754 : // for more than 1h (within single tenant session?).
33 754 : BlockingReasons::clean_and_summarize(g)
34 : };
35 :
36 754 : if let Some(reasons) = reasons {
37 0 : Err(reasons)
38 : } else {
39 : Ok(Guard {
40 754 : _inner: self.blocking.lock().await,
41 : })
42 : }
43 754 : }
44 :
45 0 : pub(crate) fn summary(&self) -> Option<BlockingReasons> {
46 0 : let g = self.reasons.lock().unwrap();
47 0 :
48 0 : BlockingReasons::summarize(&g)
49 0 : }
50 :
51 : /// Start blocking gc for this one timeline for the given reason.
52 : ///
53 : /// This is not a guard based API but instead it mimics set API. The returned future will not
54 : /// resolve until an existing gc round has completed.
55 : ///
56 : /// Returns true if this block was new, false if gc was already blocked for this reason.
57 : ///
58 : /// Cancellation safe: cancelling after first poll will keep the reason to block gc, but will
59 : /// keep the gc blocking reason.
60 0 : pub(crate) async fn insert(
61 0 : &self,
62 0 : timeline: &super::Timeline,
63 0 : reason: GcBlockingReason,
64 0 : ) -> anyhow::Result<bool> {
65 0 : let (added, uploaded) = {
66 0 : let mut g = self.reasons.lock().unwrap();
67 0 : let set = g.entry(timeline.timeline_id).or_default();
68 0 : let added = set.insert(reason);
69 :
70 : // LOCK ORDER: intentionally hold the lock, see self.reasons.
71 0 : let uploaded = timeline
72 0 : .remote_client
73 0 : .schedule_insert_gc_block_reason(reason)?;
74 :
75 0 : (added, uploaded)
76 0 : };
77 0 :
78 0 : uploaded.await?;
79 :
80 : // ensure that any ongoing gc iteration has completed
81 0 : drop(self.blocking.lock().await);
82 :
83 0 : Ok(added)
84 0 : }
85 :
86 : /// Remove blocking gc for this one timeline and the given reason.
87 0 : pub(crate) async fn remove(
88 0 : &self,
89 0 : timeline: &super::Timeline,
90 0 : reason: GcBlockingReason,
91 0 : ) -> anyhow::Result<()> {
92 0 : use std::collections::hash_map::Entry;
93 0 :
94 0 : super::span::debug_assert_current_span_has_tenant_and_timeline_id();
95 :
96 0 : let (remaining_blocks, uploaded) = {
97 0 : let mut g = self.reasons.lock().unwrap();
98 0 : match g.entry(timeline.timeline_id) {
99 0 : Entry::Occupied(mut oe) => {
100 0 : let set = oe.get_mut();
101 0 : set.remove(reason);
102 0 : if set.is_empty() {
103 0 : oe.remove();
104 0 : }
105 : }
106 0 : Entry::Vacant(_) => {
107 0 : // we must still do the index_part.json update regardless, in case we had earlier
108 0 : // been cancelled
109 0 : }
110 : }
111 :
112 0 : let remaining_blocks = g.len();
113 :
114 : // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
115 0 : let uploaded = timeline
116 0 : .remote_client
117 0 : .schedule_remove_gc_block_reason(reason)?;
118 :
119 0 : (remaining_blocks, uploaded)
120 0 : };
121 0 : uploaded.await?;
122 :
123 : // no need to synchronize with gc iteration again
124 :
125 0 : if remaining_blocks > 0 {
126 0 : tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
127 : } else {
128 0 : tracing::info!("gc is now unblocked for the tenant");
129 : }
130 :
131 0 : Ok(())
132 0 : }
133 :
134 0 : pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
135 0 : let unblocked = {
136 0 : let mut g = self.reasons.lock().unwrap();
137 0 : if g.is_empty() {
138 0 : return;
139 0 : }
140 0 :
141 0 : g.remove(&timeline.timeline_id);
142 0 :
143 0 : BlockingReasons::clean_and_summarize(g).is_none()
144 0 : };
145 0 :
146 0 : if unblocked {
147 0 : tracing::info!("gc is now unblocked following deletion");
148 0 : }
149 0 : }
150 :
151 : /// Initialize with the non-deleted timelines of this tenant.
152 182 : pub(crate) fn set_scanned(&self, scanned: Storage) {
153 182 : let mut g = self.reasons.lock().unwrap();
154 182 : assert!(g.is_empty());
155 182 : g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
156 :
157 182 : if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
158 0 : tracing::info!(summary=?reasons, "initialized with gc blocked");
159 182 : }
160 182 : }
161 : }
162 :
163 : pub(super) struct Guard<'a> {
164 : _inner: tokio::sync::MutexGuard<'a, ()>,
165 : }
166 :
167 : #[derive(Debug)]
168 : pub(crate) struct BlockingReasons {
169 : timelines: usize,
170 : reasons: enumset::EnumSet<GcBlockingReason>,
171 : }
172 :
173 : impl std::fmt::Display for BlockingReasons {
174 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 0 : write!(
176 0 : f,
177 0 : "{} timelines block for {:?}",
178 0 : self.timelines, self.reasons
179 0 : )
180 0 : }
181 : }
182 :
183 : impl BlockingReasons {
184 936 : fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
185 936 : let mut reasons = enumset::EnumSet::empty();
186 936 : g.retain(|_key, value| {
187 0 : reasons = reasons.union(*value);
188 0 : !value.is_empty()
189 936 : });
190 936 : if !g.is_empty() {
191 0 : Some(BlockingReasons {
192 0 : timelines: g.len(),
193 0 : reasons,
194 0 : })
195 : } else {
196 936 : None
197 : }
198 936 : }
199 :
200 0 : fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
201 0 : if g.is_empty() {
202 0 : None
203 : } else {
204 0 : let reasons = g
205 0 : .values()
206 0 : .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
207 0 : Some(BlockingReasons {
208 0 : timelines: g.len(),
209 0 : reasons,
210 0 : })
211 : }
212 0 : }
213 : }
|