Line data Source code
1 : use std::{collections::HashMap, time::Duration};
2 :
3 : use super::remote_timeline_client::index::GcBlockingReason;
4 : use tokio::time::Instant;
5 : use utils::id::TimelineId;
6 :
7 : type TimelinesBlocked = HashMap<TimelineId, enumset::EnumSet<GcBlockingReason>>;
8 :
9 : #[derive(Default)]
10 : struct Storage {
11 : timelines_blocked: TimelinesBlocked,
12 : /// The deadline before which we are blocked from GC so that
13 : /// leases have a chance to be renewed.
14 : lsn_lease_deadline: Option<Instant>,
15 : }
16 :
17 : impl Storage {
18 2832 : fn is_blocked_by_lsn_lease_deadline(&self) -> bool {
19 2832 : self.lsn_lease_deadline
20 2832 : .map(|d| Instant::now() < d)
21 2832 : .unwrap_or(false)
22 2832 : }
23 : }
24 :
25 : /// GcBlock provides persistent (per-timeline) gc blocking and facilitates transient time based gc
26 : /// blocking.
27 : #[derive(Default)]
28 : pub(crate) struct GcBlock {
29 : /// The timelines which have current reasons to block gc.
30 : ///
31 : /// LOCK ORDER: this is held locked while scheduling the next index_part update. This is done
32 : /// to keep the this field up to date with RemoteTimelineClient `upload_queue.dirty`.
33 : reasons: std::sync::Mutex<Storage>,
34 :
35 : /// GC background task or manually run `Tenant::gc_iteration` holds a lock on this.
36 : ///
37 : /// Do not add any more features taking and forbidding taking this lock. It should be
38 : /// `tokio::sync::Notify`, but that is rarely used. On the other side, [`GcBlock::insert`]
39 : /// synchronizes with gc attempts by locking and unlocking this mutex.
40 : blocking: tokio::sync::Mutex<()>,
41 : }
42 :
43 : impl GcBlock {
44 : /// Start another gc iteration.
45 : ///
46 : /// Returns a guard to be held for the duration of gc iteration to allow synchronizing with
47 : /// it's ending, or if not currently possible, a value describing the reasons why not.
48 : ///
49 : /// Cancellation safe.
50 2262 : pub(super) async fn start(&self) -> Result<Guard<'_>, BlockingReasons> {
51 2262 : let reasons = {
52 2262 : let g = self.reasons.lock().unwrap();
53 2262 :
54 2262 : // TODO: the assumption is that this method gets called periodically. in prod, we use 1h, in
55 2262 : // tests, we use everything. we should warn if the gc has been consecutively blocked
56 2262 : // for more than 1h (within single tenant session?).
57 2262 : BlockingReasons::clean_and_summarize(g)
58 : };
59 :
60 2262 : if let Some(reasons) = reasons {
61 0 : Err(reasons)
62 : } else {
63 : Ok(Guard {
64 2262 : _inner: self.blocking.lock().await,
65 : })
66 : }
67 2262 : }
68 :
69 : /// Sets a deadline before which we cannot proceed to GC due to lsn lease.
70 : ///
71 : /// We do this as the leases mapping are not persisted to disk. By delaying GC by lease
72 : /// length, we guarantee that all the leases we granted before will have a chance to renew
73 : /// when we run GC for the first time after restart / transition from AttachedMulti to AttachedSingle.
74 0 : pub(super) fn set_lsn_lease_deadline(&self, lsn_lease_length: Duration) {
75 0 : let deadline = Instant::now() + lsn_lease_length;
76 0 : let mut g = self.reasons.lock().unwrap();
77 0 : g.lsn_lease_deadline = Some(deadline);
78 0 : }
79 :
80 : /// Describe the current gc blocking reasons.
81 : ///
82 : /// TODO: make this json serializable.
83 0 : pub(crate) fn summary(&self) -> Option<BlockingReasons> {
84 0 : let g = self.reasons.lock().unwrap();
85 0 :
86 0 : BlockingReasons::summarize(&g)
87 0 : }
88 :
89 : /// Start blocking gc for this one timeline for the given reason.
90 : ///
91 : /// This is not a guard based API but instead it mimics set API. The returned future will not
92 : /// resolve until an existing gc round has completed.
93 : ///
94 : /// Returns true if this block was new, false if gc was already blocked for this reason.
95 : ///
96 : /// Cancellation safe: cancelling after first poll will keep the reason to block gc, but will
97 : /// keep the gc blocking reason.
98 0 : pub(crate) async fn insert(
99 0 : &self,
100 0 : timeline: &super::Timeline,
101 0 : reason: GcBlockingReason,
102 0 : ) -> anyhow::Result<bool> {
103 0 : let (added, uploaded) = {
104 0 : let mut g = self.reasons.lock().unwrap();
105 0 : let set = g.timelines_blocked.entry(timeline.timeline_id).or_default();
106 0 : let added = set.insert(reason);
107 :
108 : // LOCK ORDER: intentionally hold the lock, see self.reasons.
109 0 : let uploaded = timeline
110 0 : .remote_client
111 0 : .schedule_insert_gc_block_reason(reason)?;
112 :
113 0 : (added, uploaded)
114 0 : };
115 0 :
116 0 : uploaded.await?;
117 :
118 : // ensure that any ongoing gc iteration has completed
119 0 : drop(self.blocking.lock().await);
120 :
121 0 : Ok(added)
122 0 : }
123 :
124 : /// Remove blocking gc for this one timeline and the given reason.
125 0 : pub(crate) async fn remove(
126 0 : &self,
127 0 : timeline: &super::Timeline,
128 0 : reason: GcBlockingReason,
129 0 : ) -> anyhow::Result<()> {
130 : use std::collections::hash_map::Entry;
131 :
132 0 : super::span::debug_assert_current_span_has_tenant_and_timeline_id();
133 :
134 0 : let (remaining_blocks, uploaded) = {
135 0 : let mut g = self.reasons.lock().unwrap();
136 0 : match g.timelines_blocked.entry(timeline.timeline_id) {
137 0 : Entry::Occupied(mut oe) => {
138 0 : let set = oe.get_mut();
139 0 : set.remove(reason);
140 0 : if set.is_empty() {
141 0 : oe.remove();
142 0 : }
143 : }
144 0 : Entry::Vacant(_) => {
145 0 : // we must still do the index_part.json update regardless, in case we had earlier
146 0 : // been cancelled
147 0 : }
148 : }
149 :
150 0 : let remaining_blocks = g.timelines_blocked.len();
151 :
152 : // LOCK ORDER: intentionally hold the lock while scheduling; see self.reasons
153 0 : let uploaded = timeline
154 0 : .remote_client
155 0 : .schedule_remove_gc_block_reason(reason)?;
156 :
157 0 : (remaining_blocks, uploaded)
158 0 : };
159 0 : uploaded.await?;
160 :
161 : // no need to synchronize with gc iteration again
162 :
163 0 : if remaining_blocks > 0 {
164 0 : tracing::info!(remaining_blocks, removed=?reason, "gc blocking removed, but gc remains blocked");
165 : } else {
166 0 : tracing::info!("gc is now unblocked for the tenant");
167 : }
168 :
169 0 : Ok(())
170 0 : }
171 :
172 0 : pub(crate) fn before_delete(&self, timeline: &super::Timeline) {
173 0 : let unblocked = {
174 0 : let mut g = self.reasons.lock().unwrap();
175 0 : if g.timelines_blocked.is_empty() {
176 0 : return;
177 0 : }
178 0 :
179 0 : g.timelines_blocked.remove(&timeline.timeline_id);
180 0 :
181 0 : BlockingReasons::clean_and_summarize(g).is_none()
182 0 : };
183 0 :
184 0 : if unblocked {
185 0 : tracing::info!("gc is now unblocked following deletion");
186 0 : }
187 0 : }
188 :
189 : /// Initialize with the non-deleted timelines of this tenant.
190 570 : pub(crate) fn set_scanned(&self, scanned: TimelinesBlocked) {
191 570 : let mut g = self.reasons.lock().unwrap();
192 570 : assert!(g.timelines_blocked.is_empty());
193 570 : g.timelines_blocked
194 570 : .extend(scanned.into_iter().filter(|(_, v)| !v.is_empty()));
195 :
196 570 : if let Some(reasons) = BlockingReasons::clean_and_summarize(g) {
197 0 : tracing::info!(summary=?reasons, "initialized with gc blocked");
198 570 : }
199 570 : }
200 : }
201 :
202 : pub(super) struct Guard<'a> {
203 : _inner: tokio::sync::MutexGuard<'a, ()>,
204 : }
205 :
206 : #[derive(Debug)]
207 : pub(crate) struct BlockingReasons {
208 : tenant_blocked_by_lsn_lease_deadline: bool,
209 : timelines: usize,
210 : reasons: enumset::EnumSet<GcBlockingReason>,
211 : }
212 :
213 : impl std::fmt::Display for BlockingReasons {
214 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215 0 : write!(
216 0 : f,
217 0 : "tenant_blocked_by_lsn_lease_deadline: {}, {} timelines block for {:?}",
218 0 : self.tenant_blocked_by_lsn_lease_deadline, self.timelines, self.reasons
219 0 : )
220 0 : }
221 : }
222 :
223 : impl BlockingReasons {
224 2832 : fn clean_and_summarize(mut g: std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
225 2832 : let mut reasons = enumset::EnumSet::empty();
226 2832 : g.timelines_blocked.retain(|_key, value| {
227 0 : reasons = reasons.union(*value);
228 0 : !value.is_empty()
229 2832 : });
230 2832 : let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
231 2832 : if !g.timelines_blocked.is_empty() || blocked_by_lsn_lease_deadline {
232 0 : Some(BlockingReasons {
233 0 : tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
234 0 : timelines: g.timelines_blocked.len(),
235 0 : reasons,
236 0 : })
237 : } else {
238 2832 : None
239 : }
240 2832 : }
241 :
242 0 : fn summarize(g: &std::sync::MutexGuard<'_, Storage>) -> Option<Self> {
243 0 : let blocked_by_lsn_lease_deadline = g.is_blocked_by_lsn_lease_deadline();
244 0 : if g.timelines_blocked.is_empty() && !blocked_by_lsn_lease_deadline {
245 0 : None
246 : } else {
247 0 : let reasons = g
248 0 : .timelines_blocked
249 0 : .values()
250 0 : .fold(enumset::EnumSet::empty(), |acc, next| acc.union(*next));
251 0 : Some(BlockingReasons {
252 0 : tenant_blocked_by_lsn_lease_deadline: blocked_by_lsn_lease_deadline,
253 0 : timelines: g.timelines_blocked.len(),
254 0 : reasons,
255 0 : })
256 : }
257 0 : }
258 : }
|