Line data Source code
1 : use futures::StreamExt;
2 : use tokio::task::JoinSet;
3 : use utils::{
4 : completion::{self, Completion},
5 : id::TimelineId,
6 : };
7 :
8 : use super::*;
9 : use crate::task_mgr::BACKGROUND_RUNTIME;
10 : use crate::tenant::harness::TenantHarness;
11 :
12 : /// This test demonstrates a previous hang when a eviction and deletion were requested at the same
13 : /// time. Now both of them complete per Arc drop semantics.
14 2 : #[tokio::test(start_paused = true)]
15 2 : async fn evict_and_wait_on_wanted_deleted() {
16 2 : // this is the runtime on which Layer spawns the blocking tasks on
17 2 : let handle = BACKGROUND_RUNTIME.handle();
18 2 :
19 2 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted").unwrap();
20 2 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
21 2 : let (tenant, ctx) = h.load().await;
22 2 :
23 2 : let timeline = tenant
24 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
25 6 : .await
26 2 : .unwrap();
27 2 :
28 2 : let layer = {
29 2 : let mut layers = {
30 2 : let layers = timeline.layers.read().await;
31 2 : layers.resident_layers().collect::<Vec<_>>().await
32 2 : };
33 2 :
34 2 : assert_eq!(layers.len(), 1);
35 2 :
36 2 : layers.swap_remove(0)
37 2 : };
38 2 :
39 2 : // setup done
40 2 :
41 2 : let resident = layer.keep_resident().await.unwrap();
42 2 :
43 2 : {
44 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait());
45 2 :
46 2 : // drive the future to await on the status channel
47 2 : tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
48 2 : .await
49 2 : .expect_err("should had been a timeout since we are holding the layer resident");
50 2 :
51 2 : layer.delete_on_drop();
52 2 :
53 2 : drop(resident);
54 2 :
55 2 : // make sure the eviction task gets to run
56 20 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
57 2 :
58 2 : let resident = layer.keep_resident().await;
59 2 : assert!(
60 2 : matches!(resident, Ok(None)),
61 2 : "keep_resident should not have re-initialized: {resident:?}"
62 2 : );
63 2 :
64 2 : evict_and_wait
65 2 : .await
66 2 : .expect("evict_and_wait should had succeeded");
67 2 :
68 2 : // works as intended
69 2 : }
70 2 :
71 2 : // assert that once we remove the `layer` from the layer map and drop our reference,
72 2 : // the deletion of the layer in remote_storage happens.
73 2 : {
74 2 : let mut layers = timeline.layers.write().await;
75 2 : layers.finish_gc_timeline(&[layer]);
76 2 : }
77 2 :
78 35 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
79 2 :
80 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
81 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
82 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
83 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
84 2 : }
85 :
86 : /// This test shows that ensures we are able to read the layer while the layer eviction has been
87 : /// started but not completed due to spawn_blocking pool being blocked.
88 : ///
89 : /// Here `Layer::keep_resident` is used to "simulate" reads, because it cannot download.
90 2 : #[tokio::test(start_paused = true)]
91 2 : async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
92 2 : // this is the runtime on which Layer spawns the blocking tasks on
93 2 : let handle = BACKGROUND_RUNTIME.handle();
94 2 : let h = TenantHarness::create("residency_check_while_evict_and_wait_on_clogged_spawn_blocking")
95 2 : .unwrap();
96 2 : let (tenant, ctx) = h.load().await;
97 2 :
98 2 : let timeline = tenant
99 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
100 6 : .await
101 2 : .unwrap();
102 2 :
103 2 : let layer = {
104 2 : let mut layers = {
105 2 : let layers = timeline.layers.read().await;
106 2 : layers.resident_layers().collect::<Vec<_>>().await
107 2 : };
108 2 :
109 2 : assert_eq!(layers.len(), 1);
110 2 :
111 2 : layers.swap_remove(0)
112 2 : };
113 2 :
114 2 : // setup done
115 2 :
116 2 : let resident = layer.keep_resident().await.unwrap();
117 2 :
118 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait());
119 2 :
120 2 : // drive the future to await on the status channel
121 2 : tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
122 2 : .await
123 2 : .expect_err("should had been a timeout since we are holding the layer resident");
124 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
125 2 :
126 2 : // clog up BACKGROUND_RUNTIME spawn_blocking
127 48 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
128 2 :
129 2 : // now the eviction cannot proceed because the threads are consumed while completion exists
130 2 : drop(resident);
131 2 :
132 2 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
133 2 : layer
134 2 : .keep_resident()
135 4 : .await
136 2 : .expect("keep_resident should had reinitialized without downloading")
137 2 : .expect("ResidentLayer");
138 2 :
139 2 : // because the keep_resident check alters wanted evicted without sending a message, we will
140 2 : // never get completed
141 2 : let e = tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
142 2 : .await
143 2 : .expect("no timeout, because keep_resident re-initialized")
144 2 : .expect_err("eviction should not have succeeded because re-initialized");
145 2 :
146 2 : // works as intended: evictions lose to "downloads"
147 2 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
148 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
149 2 :
150 2 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
151 2 : // because spawn_blocking is clogged up
152 2 : assert_eq!(
153 2 : 0,
154 2 : LAYER_IMPL_METRICS
155 2 : .cancelled_evictions
156 2 : .values()
157 16 : .map(|ctr| ctr.get())
158 2 : .sum::<u64>()
159 2 : );
160 2 :
161 2 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait());
162 2 :
163 2 : tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction)
164 2 : .await
165 2 : .expect_err("timeout because spawn_blocking is clogged");
166 2 :
167 2 : // in this case we don't leak started evictions, but I think there is still a chance of that
168 2 : // happening, because we could have upgrades race multiple evictions while only one of them
169 2 : // happens?
170 2 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
171 2 :
172 8 : helper.release().await;
173 2 :
174 2 : tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction)
175 2 : .await
176 2 : .expect("eviction goes through now that spawn_blocking is unclogged")
177 2 : .expect("eviction should succeed, because version matches");
178 2 :
179 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
180 2 :
181 2 : // now we finally can observe the original spawn_blocking failing
182 2 : // it would had been possible to observe it earlier, but here it is guaranteed to have
183 2 : // happened.
184 2 : assert_eq!(
185 2 : 1,
186 2 : LAYER_IMPL_METRICS
187 2 : .cancelled_evictions
188 2 : .values()
189 16 : .map(|ctr| ctr.get())
190 2 : .sum::<u64>()
191 2 : );
192 2 : }
193 :
/// Test helper that keeps a runtime's `spawn_blocking` threads occupied until
/// [`SpawnBlockingPoolHelper::release`] is called.
struct SpawnBlockingPoolHelper {
    /// Every spawned blocking task waits on the barrier side of this completion; `release`
    /// drops it before joining the tasks.
    awaited_by_spawn_blocking_tasks: Completion,
    /// The blocking tasks occupying the pool; joined one by one in `release`.
    blocking_tasks: JoinSet<()>,
}
198 :
199 : impl SpawnBlockingPoolHelper {
200 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
201 : /// release is called.
202 : ///
203 : /// In the tests this can be used to ensure something cannot be started on the target runtimes
204 : /// spawn_blocking pool.
205 : ///
206 : /// This should be no issue nowdays, because nextest runs each test in it's own process.
207 6 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
208 6 : let (completion, barrier) = completion::channel();
209 6 : let (tx, mut rx) = tokio::sync::mpsc::channel(8);
210 6 :
211 6 : let assumed_max_blocking_threads = 512;
212 6 :
213 6 : let mut blocking_tasks = JoinSet::new();
214 6 :
215 3072 : for _ in 0..assumed_max_blocking_threads {
216 3072 : let barrier = barrier.clone();
217 3072 : let tx = tx.clone();
218 3072 : blocking_tasks.spawn_blocking_on(
219 3072 : move || {
220 3072 : tx.blocking_send(()).unwrap();
221 3072 : drop(tx);
222 3072 : tokio::runtime::Handle::current().block_on(barrier.wait());
223 3072 : },
224 3072 : handle,
225 3072 : );
226 3072 : }
227 :
228 6 : drop(barrier);
229 6 :
230 6 : for _ in 0..assumed_max_blocking_threads {
231 3072 : rx.recv().await.unwrap();
232 : }
233 :
234 6 : SpawnBlockingPoolHelper {
235 6 : awaited_by_spawn_blocking_tasks: completion,
236 6 : blocking_tasks,
237 6 : }
238 6 : }
239 :
240 : /// Release all previously blocked spawn_blocking threads
241 6 : async fn release(self) {
242 6 : let SpawnBlockingPoolHelper {
243 6 : awaited_by_spawn_blocking_tasks,
244 6 : mut blocking_tasks,
245 6 : } = self;
246 6 :
247 6 : drop(awaited_by_spawn_blocking_tasks);
248 :
249 3078 : while let Some(res) = blocking_tasks.join_next().await {
250 3072 : res.expect("none of the tasks should had panicked");
251 3072 : }
252 6 : }
253 :
254 : /// In the tests it is used as an easy way of making sure something scheduled on the target
255 : /// runtimes `spawn_blocking` has completed, because it must've been scheduled and completed
256 : /// before our tasks have a chance to schedule and complete.
257 4 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
258 4 : Self::consume_all_spawn_blocking_threads(handle)
259 39 : .await
260 4 : .release()
261 16 : .await
262 4 : }
263 : }
|