Line data Source code
1 : use std::collections::HashMap;
2 : use std::sync::Arc;
3 :
4 : use utils::id::TimelineId;
5 :
6 : use super::remote_timeline_client::index::GcBlockingReason;
7 :
/// Per-timeline set of the reasons that currently block gc.
type Storage = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
9 :
/// GcBlock provides persistent (per-timeline) gc blocking.
///
/// It combines two pieces of state: the in-memory map of blocking reasons, and an async mutex
/// which is held for as long as a gc iteration is running.
#[derive(Default)]
pub(crate) struct GcBlock {
    /// The timelines which have current reasons to block gc.
    ///
    /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
    /// to keep this field up to date with RemoteTimelineClient `upload_queue.dirty`.
    reasons: std::sync::Mutex<Storage>,

    /// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
    ///
    /// Do not add any more features taking and forbidding taking this lock. It should be
    /// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
    /// synchronizes with gc attempts by locking and unlocking this mutex.
    blocking: Arc<tokio::sync::Mutex<()>>,
}
26 :
27 : impl GcBlock {
28 : /// Start another gc iteration.
29 : ///
30 : /// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
31 : /// it's ending, or if not currently possible, a value describing the reasons why not.
32 : ///
33 : /// Cancellation safe.
34 8 : pub(super) async fn start(&self) -> Result<Guard, BlockingReasons> {
35 8 : let reasons = {
36 8 : let g = self.reasons.lock().unwrap();
37 8 :
38 8 : // TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
39 8 : // tests, we use everything. we should warn if the gc has been consecutively blocked
40 8 : // for more than 1h (within single tenant session?).
41 8 : BlockingReasons::clean_and_summarize(g)
42 : };
43 :
44 8 : if let Some(reasons) = reasons {
45 0 : Err(reasons)
46 : } else {
47 : Ok(Guard {
48 8 : _inner: self.blocking.clone().lock_owned().await,
49 : })
50 : }
51 8 : }
52 :
53 : /// Describe the current gc blocking reasons.
54 : ///
55 : /// TODO: make this json serializable.
56 0 : pub(crate) fn summary(&self) -> Option<BlockingReasons> {
57 0 : let g = self.reasons.lock().unwrap();
58 0 :
59 0 : BlockingReasons::summarize(&g)
60 0 : }
61 :
62 : /// Start blocking gc for this one timeline for the given reason.
63 : ///
64 : /// This is not a guard based API but instead it mimics set API. The returned future will not
65 : /// resolve until an existing gc round has completed.
66 : ///
67 : /// Returns true if this block was new, false if gc was already blocked for this reason.
68 : ///
69 : /// Cancellation safe: cancelling after first poll will keep the reason to block gc, but will
70 : /// keep the gc blocking reason.
71 0 : pub(crate) async fn insert(
72 0 : &self,
73 0 : timeline: &super::Timeline,
74 0 : reason: GcBlockingReason,
75 0 : ) -> anyhow::Result<bool> {
76 0 : let (added, uploaded) = {
77 0 : let mut g = self.reasons.lock().unwrap();
78 0 : let set = g.entry(timeline.timeline_id).or_default();
79 0 : let added = set.insert(reason);
80 :
81 : // LOCK ORDER: intentionally hold the lock, see self.reasons.
82 0 : let uploaded = timeline
83 0 : .remote_client
84 0 : .schedule_insert_gc_block_reason(reason)?;
85 :
86 0 : (added, uploaded)
87 0 : };
88 0 :
89 0 : uploaded.await?;
90 :
91 : // ensure that any ongoing gc iteration has completed
92 0 : drop(self.blocking.lock().await);
93 :
94 0 : Ok(added)
95 0 : }
96 :
97 : /// Remove blocking gc for this one timeline and the given reason.
98 0 : pub(crate) async fn remove(
99 0 : &self,
100 0 : timeline: &super::Timeline,
101 0 : reason: GcBlockingReason,
102 0 : ) -> anyhow::Result<()> {
103 : use std::collections::hash_map::Entry;
104 :
105 0 : super::span::debug_assert_current_span_has_tenant_and_timeline_id();
106 :
107 0 : let (remaining_blocks, uploaded) = {
108 0 : let mut g = self.reasons.lock().unwrap();
109 0 : match g.entry(timeline.timeline_id) {
110 0 : Entry::Occupied(mut oe) => {
111 0 : let set = oe.get_mut();
112 0 : set.remove(reason);
113 0 : if set.is_empty() {
114 0 : oe.remove();
115 0 : }
116 : }
117 0 : Entry::Vacant(_) => {
118 0 : // we must still do the index_part.json update regardless, in case we had earlier
119 0 : // been cancelled
120 0 : }
121 : }
122 :
123 0 : let remaining_blocks = g.len();
124 :
125 : // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
126 0 : let uploaded = timeline
127 0 : .remote_client
128 0 : .schedule_remove_gc_block_reason(reason)?;
129 :
130 0 : (remaining_blocks, uploaded)
131 0 : };
132 0 : uploaded.await?;
133 :
134 : // no need to synchronize with gc iteration again
135 :
136 0 : if remaining_blocks > 0 {
137 0 : tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
138 : } else {
139 0 : tracing::info!("gc is now unblocked for the tenant");
140 : }
141 :
142 0 : Ok(())
143 0 : }
144 :
145 0 : pub(crate) fn before_delete(&self, timeline_id: &super::TimelineId) {
146 0 : let unblocked = {
147 0 : let mut g = self.reasons.lock().unwrap();
148 0 : if g.is_empty() {
149 0 : return;
150 0 : }
151 0 :
152 0 : g.remove(timeline_id);
153 0 :
154 0 : BlockingReasons::clean_and_summarize(g).is_none()
155 0 : };
156 0 :
157 0 : if unblocked {
158 0 : tracing::info!("gc is now unblocked following deletion");
159 0 : }
160 0 : }
161 :
162 : /// Initialize with the non-deleted timelines of this tenant.
163 452 : pub(crate) fn set_scanned(&self, scanned: Storage) {
164 452 : let mut g = self.reasons.lock().unwrap();
165 452 : assert!(g.is_empty());
166 452 : g.extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
167 :
168 452 : if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
169 0 : tracing::info!(summary=?reasons, "initialized with gc blocked");
170 452 : }
171 452 : }
172 : }
173 :
/// Proof that a gc iteration is in progress; returned by [`GcBlock::start`].
///
/// Wraps the owned guard of the `GcBlock::blocking` mutex. Dropping it releases that mutex,
/// which is what a pending [`GcBlock::insert`] waits for.
pub(crate) struct Guard {
    // Never read; held only so that the mutex stays locked for the lifetime of the guard.
    _inner: tokio::sync::OwnedMutexGuard<()>,
}
177 :
/// Aggregated view of why gc is currently blocked for the tenant.
#[derive(Debug)]
pub(crate) struct BlockingReasons {
    /// Number of timelines which have an entry in the reasons map.
    timelines: usize,
    /// Union of the blocking reasons over all of those timelines.
    reasons: enumset::EnumSet<GcBlockingReason>,
}
183 :
184 : impl std::fmt::Display for BlockingReasons {
185 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
186 0 : write!(
187 0 : f,
188 0 : "{} timelines block for {:?}",
189 0 : self.timelines, self.reasons
190 0 : )
191 0 : }
192 : }
193 :
194 : impl BlockingReasons {
195 460 : fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
196 460 : let mut reasons = enumset::EnumSet::empty();
197 460 : g.retain(|_key, value| {
198 0 : reasons = reasons.union(*value);
199 0 : !value.is_empty()
200 460 : });
201 460 : if !g.is_empty() {
202 0 : Some(BlockingReasons {
203 0 : timelines: g.len(),
204 0 : reasons,
205 0 : })
206 : } else {
207 460 : None
208 : }
209 460 : }
210 :
211 0 : fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
212 0 : if g.is_empty() {
213 0 : None
214 : } else {
215 0 : let reasons = g
216 0 : .values()
217 0 : .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
218 0 : Some(BlockingReasons {
219 0 : timelines: g.len(),
220 0 : reasons,
221 0 : })
222 : }
223 0 : }
224 : }
|