Line data Source code
1 : use pageserver_api::key::CONTROLFILE_KEY;
2 : use tokio::task::JoinSet;
3 : use utils::{
4 : completion::{self, Completion},
5 : id::TimelineId,
6 : };
7 :
8 : use super::failpoints::{Failpoint, FailpointKind};
9 : use super::*;
10 : use crate::context::DownloadBehavior;
11 : use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
12 :
/// Used in tests to advance a future to the wanted await point, and not further.
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);

/// Used in tests to indicate a forever-long timeout; has to be longer than the `ADVANCE` amount
/// which the tests use as a timeout to advance futures.
const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
19 :
20 : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
21 : #[tokio::test]
22 2 : async fn smoke_test() {
23 2 : let handle = tokio::runtime::Handle::current();
24 2 :
25 2 : let h = TenantHarness::create("smoke_test").unwrap();
26 2 : let span = h.span();
27 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
28 8 : let (tenant, _) = h.load().await;
29 2 :
30 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
31 2 :
32 2 : let timeline = tenant
33 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
34 6 : .await
35 2 : .unwrap();
36 2 :
37 2 : let layer = {
38 2 : let mut layers = {
39 2 : let layers = timeline.layers.read().await;
40 2 : layers.likely_resident_layers().collect::<Vec<_>>()
41 2 : };
42 2 :
43 2 : assert_eq!(layers.len(), 1);
44 2 :
45 2 : layers.swap_remove(0)
46 2 : };
47 2 :
48 2 : // all layers created at pageserver are like `layer`, initialized with strong
49 2 : // Arc<DownloadedLayer>.
50 2 :
51 2 : let img_before = {
52 2 : let mut data = ValueReconstructState::default();
53 2 : layer
54 2 : .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
55 4 : .await
56 2 : .unwrap();
57 2 : data.img
58 2 : .take()
59 2 : .expect("tenant harness writes the control file")
60 2 : };
61 2 :
62 2 : // important part is evicting the layer, which can be done when there are no more ResidentLayer
63 2 : // instances -- there currently are none, only two `Layer` values, one in the layermap and on
64 2 : // in scope.
65 2 : layer.evict_and_wait(FOREVER).await.unwrap();
66 2 :
67 2 : // double-evict returns an error, which is valid if both eviction_task and disk usage based
68 2 : // eviction would both evict the same layer at the same time.
69 2 :
70 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
71 2 : assert!(matches!(e, EvictionError::NotFound));
72 2 :
73 2 : // on accesses when the layer is evicted, it will automatically be downloaded.
74 2 : let img_after = {
75 2 : let mut data = ValueReconstructState::default();
76 2 : layer
77 2 : .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
78 2 : .instrument(download_span.clone())
79 8 : .await
80 2 : .unwrap();
81 2 : data.img.take().unwrap()
82 2 : };
83 2 :
84 2 : assert_eq!(img_before, img_after);
85 2 :
86 2 : // evict_and_wait can timeout, but it doesn't cancel the evicting itself
87 2 : //
88 2 : // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
89 2 : // artificially slow it down.
90 2 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
91 2 :
92 2 : match layer
93 2 : .evict_and_wait(std::time::Duration::ZERO)
94 2 : .await
95 2 : .unwrap_err()
96 2 : {
97 2 : EvictionError::Timeout => {
98 2 : // expected, but note that the eviction is "still ongoing"
99 8 : helper.release().await;
100 2 : // exhaust spawn_blocking pool to ensure it is now complete
101 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
102 24 : .await;
103 2 : }
104 2 : other => unreachable!("{other:?}"),
105 2 : }
106 2 :
107 2 : // only way to query if a layer is resident is to acquire a ResidentLayer instance.
108 2 : // Layer::keep_resident never downloads, but it might initialize if the layer file is found
109 2 : // downloaded locally.
110 2 : let none = layer.keep_resident().await;
111 2 : assert!(
112 2 : none.is_none(),
113 2 : "Expected none, because eviction removed the local file, found: {none:?}"
114 2 : );
115 2 :
116 2 : // plain downloading is rarely needed
117 2 : layer
118 2 : .download_and_keep_resident()
119 2 : .instrument(download_span)
120 5 : .await
121 2 : .unwrap();
122 2 :
123 2 : // last important part is deletion on drop: gc and compaction use it for compacted L0 layers
124 2 : // or fully garbage collected layers. deletion means deleting the local file, and scheduling a
125 2 : // deletion of the already unlinked from index_part.json remote file.
126 2 : //
127 2 : // marking a layer to be deleted on drop is irreversible; there is no technical reason against
128 2 : // reversiblity, but currently it is not needed so it is not provided.
129 2 : layer.delete_on_drop();
130 2 :
131 2 : let path = layer.local_path().to_owned();
132 2 :
133 2 : // wait_drop produces an unconnected to Layer future which will resolve when the
134 2 : // LayerInner::drop has completed.
135 2 : let mut wait_drop = std::pin::pin!(layer.wait_drop());
136 2 :
137 2 : // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
138 2 : // until here
139 2 : tokio::time::pause();
140 2 : tokio::time::timeout(ADVANCE, &mut wait_drop)
141 2 : .await
142 2 : .expect_err("should had timed out because two strong references exist");
143 2 :
144 2 : tokio::fs::metadata(&path)
145 2 : .await
146 2 : .expect("the local layer file still exists");
147 2 :
148 2 : let rtc = &timeline.remote_client;
149 2 :
150 2 : {
151 2 : let layers = &[layer];
152 2 : let mut g = timeline.layers.write().await;
153 2 : g.finish_gc_timeline(layers);
154 2 : // this just updates the remote_physical_size for demonstration purposes
155 2 : rtc.schedule_gc_update(layers).unwrap();
156 2 : }
157 2 :
158 2 : // when strong references are dropped, the file is deleted and remote deletion is scheduled
159 2 : wait_drop.await;
160 2 :
161 2 : let e = tokio::fs::metadata(&path)
162 2 : .await
163 2 : .expect_err("the local file is deleted");
164 2 : assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
165 2 :
166 2 : rtc.wait_completion().await.unwrap();
167 2 :
168 2 : assert_eq!(rtc.get_remote_physical_size(), 0);
169 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
170 2 : }
171 :
172 : /// This test demonstrates a previous hang when a eviction and deletion were requested at the same
173 : /// time. Now both of them complete per Arc drop semantics.
174 : #[tokio::test(start_paused = true)]
175 2 : async fn evict_and_wait_on_wanted_deleted() {
176 2 : // this is the runtime on which Layer spawns the blocking tasks on
177 2 : let handle = tokio::runtime::Handle::current();
178 2 :
179 2 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted").unwrap();
180 2 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
181 8 : let (tenant, ctx) = h.load().await;
182 2 :
183 2 : let timeline = tenant
184 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
185 6 : .await
186 2 : .unwrap();
187 2 :
188 2 : let layer = {
189 2 : let mut layers = {
190 2 : let layers = timeline.layers.read().await;
191 2 : layers.likely_resident_layers().collect::<Vec<_>>()
192 2 : };
193 2 :
194 2 : assert_eq!(layers.len(), 1);
195 2 :
196 2 : layers.swap_remove(0)
197 2 : };
198 2 :
199 2 : // setup done
200 2 :
201 2 : let resident = layer.keep_resident().await.unwrap();
202 2 :
203 2 : {
204 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
205 2 :
206 2 : // drive the future to await on the status channel
207 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
208 2 : .await
209 2 : .expect_err("should had been a timeout since we are holding the layer resident");
210 2 :
211 2 : layer.delete_on_drop();
212 2 :
213 2 : drop(resident);
214 2 :
215 2 : // make sure the eviction task gets to run
216 12 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
217 2 :
218 2 : let resident = layer.keep_resident().await;
219 2 : assert!(
220 2 : resident.is_none(),
221 2 : "keep_resident should not have re-initialized: {resident:?}"
222 2 : );
223 2 :
224 2 : evict_and_wait
225 2 : .await
226 2 : .expect("evict_and_wait should had succeeded");
227 2 :
228 2 : // works as intended
229 2 : }
230 2 :
231 2 : // assert that once we remove the `layer` from the layer map and drop our reference,
232 2 : // the deletion of the layer in remote_storage happens.
233 2 : {
234 2 : let mut layers = timeline.layers.write().await;
235 2 : layers.finish_gc_timeline(&[layer]);
236 2 : }
237 2 :
238 8 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
239 2 :
240 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
241 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
242 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
243 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
244 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
245 2 : }
246 :
/// This test ensures we are able to read the layer while the layer eviction has been
/// started but not completed.
///
/// Runs on a hand-built current-thread runtime with a single blocking thread and paused time,
/// so that the blocking pool can be drained with a helper sized for exactly one thread.
#[test]
fn read_wins_pending_eviction() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .start_paused(true)
        .build()
        .unwrap();

    rt.block_on(async move {
        // this is the runtime on which Layer spawns the blocking tasks on
        let handle = tokio::runtime::Handle::current();
        let h = TenantHarness::create("read_wins_pending_eviction").unwrap();
        let (tenant, ctx) = h.load().await;
        let span = h.span();
        let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

        let timeline = tenant
            .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
            .await
            .unwrap();

        let layer = {
            let mut layers = {
                let layers = timeline.layers.read().await;
                layers.likely_resident_layers().collect::<Vec<_>>()
            };

            assert_eq!(layers.len(), 1);

            layers.swap_remove(0)
        };

        // setup done

        let resident = layer.keep_resident().await.unwrap();

        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should had been a timeout since we are holding the layer resident");
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        let (completion, barrier) = utils::completion::channel();
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        // now the eviction cannot proceed: the failpoint holds it at the barrier for as long as
        // `completion` exists
        drop(resident);
        arrived_at_barrier.wait().await;
        assert!(!layer.is_likely_resident());

        // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
        layer
            .0
            .get_or_maybe_download(false, None)
            .instrument(download_span)
            .await
            .expect("should had reinitialized without downloading");

        assert!(layer.is_likely_resident());

        // reinitialization notifies of new resident status, which should error out all evict_and_wait
        let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect("no timeout, because get_or_maybe_download re-initialized")
            .expect_err("eviction should not have succeeded because re-initialized");

        // works as intended: evictions lose to "downloads"
        assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // this is not wrong: the eviction is technically still "on the way" as it's still queued
        // because of a failpoint
        assert_eq!(
            0,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        // release the failpoint and let the queued eviction run to its conclusion
        drop(completion);

        tokio::time::sleep(ADVANCE).await;
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
            .await;

        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // now we finally can observe the original eviction failing
        // it would had been possible to observe it earlier, but here it is guaranteed to have
        // happened.
        assert_eq!(
            1,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(
            1,
            LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get());
    });
}
365 :
/// Use failpoint to delay an eviction starting to get a VersionCheckFailed.
///
/// Runs the shared scenario with `in_order = true`, i.e. the earlier (stale) eviction is
/// released before the later one.
#[test]
fn multiple_pending_evictions_in_order() {
    let name = "multiple_pending_evictions_in_order";
    let in_order = true;
    multiple_pending_evictions_scenario(name, in_order);
}
373 :
/// Use failpoint to reorder later eviction before first to get a UnexpectedEvictedState.
///
/// Runs the shared scenario with `in_order = false`, i.e. the later eviction is released
/// before the earlier (stale) one.
#[test]
fn multiple_pending_evictions_out_of_order() {
    let name = "multiple_pending_evictions_out_of_order";
    let in_order = false;
    multiple_pending_evictions_scenario(name, in_order);
}
381 :
382 4 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
383 4 : let rt = tokio::runtime::Builder::new_current_thread()
384 4 : .max_blocking_threads(1)
385 4 : .enable_all()
386 4 : .start_paused(true)
387 4 : .build()
388 4 : .unwrap();
389 4 :
390 4 : rt.block_on(async move {
391 4 : // this is the runtime on which Layer spawns the blocking tasks on
392 4 : let handle = tokio::runtime::Handle::current();
393 4 : let h = TenantHarness::create(name).unwrap();
394 16 : let (tenant, ctx) = h.load().await;
395 4 : let span = h.span();
396 4 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
397 :
398 4 : let timeline = tenant
399 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
400 12 : .await
401 4 : .unwrap();
402 :
403 4 : let layer = {
404 4 : let mut layers = {
405 4 : let layers = timeline.layers.read().await;
406 4 : layers.likely_resident_layers().collect::<Vec<_>>()
407 4 : };
408 4 :
409 4 : assert_eq!(layers.len(), 1);
410 :
411 4 : layers.swap_remove(0)
412 : };
413 :
414 : // setup done
415 :
416 4 : let resident = layer.keep_resident().await.unwrap();
417 4 :
418 4 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
419 4 :
420 4 : // drive the future to await on the status channel
421 4 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
422 4 : .await
423 4 : .expect_err("should had been a timeout since we are holding the layer resident");
424 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
425 :
426 4 : let (completion1, barrier) = utils::completion::channel();
427 4 : let mut completion1 = Some(completion1);
428 4 : let (arrival, arrived_at_barrier) = utils::completion::channel();
429 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
430 4 : Some(arrival),
431 4 : barrier,
432 4 : ));
433 4 :
434 4 : // now the eviction cannot proceed because we are simulating arbitrary long delay for the
435 4 : // eviction task start.
436 4 : drop(resident);
437 4 : assert!(!layer.is_likely_resident());
438 :
439 4 : arrived_at_barrier.wait().await;
440 :
441 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
442 4 : layer
443 4 : .0
444 4 : .get_or_maybe_download(false, None)
445 4 : .instrument(download_span)
446 2 : .await
447 4 : .expect("should had reinitialized without downloading");
448 4 :
449 4 : assert!(layer.is_likely_resident());
450 :
451 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
452 4 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
453 0 : .await
454 4 : .expect("no timeout, because get_or_maybe_download re-initialized")
455 4 : .expect_err("eviction should not have succeeded because re-initialized");
456 4 :
457 4 : // works as intended: evictions lose to "downloads"
458 4 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
459 4 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
460 :
461 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
462 : // because of a failpoint
463 4 : assert_eq!(
464 4 : 0,
465 4 : LAYER_IMPL_METRICS
466 4 : .cancelled_evictions
467 4 : .values()
468 36 : .map(|ctr| ctr.get())
469 4 : .sum::<u64>()
470 4 : );
471 :
472 4 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
473 :
474 : // configure another failpoint for the second eviction -- evictions are per initialization,
475 : // so now that we've reinitialized the inner, we get to run two of them at the same time.
476 4 : let (completion2, barrier) = utils::completion::channel();
477 4 : let (arrival, arrived_at_barrier) = utils::completion::channel();
478 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
479 4 : Some(arrival),
480 4 : barrier,
481 4 : ));
482 4 :
483 4 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
484 4 :
485 4 : // advance to the wait on the queue
486 4 : tokio::time::timeout(ADVANCE, &mut second_eviction)
487 8 : .await
488 4 : .expect_err("timeout because failpoint is blocking");
489 4 :
490 4 : arrived_at_barrier.wait().await;
491 :
492 4 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
493 :
494 4 : let mut release_earlier_eviction = |expected_reason| {
495 4 : assert_eq!(
496 4 : 0,
497 4 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
498 4 : );
499 :
500 4 : drop(completion1.take().unwrap());
501 4 :
502 4 : let handle = &handle;
503 :
504 4 : async move {
505 4 : tokio::time::sleep(ADVANCE).await;
506 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
507 4 : handle, 1,
508 4 : )
509 7 : .await;
510 :
511 4 : assert_eq!(
512 4 : 1,
513 4 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
514 4 : );
515 4 : }
516 4 : };
517 :
518 4 : if in_order {
519 5 : release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
520 2 : }
521 :
522 : // release the later eviction which is for the current version
523 4 : drop(completion2);
524 8 : tokio::time::sleep(ADVANCE).await;
525 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
526 7 : .await;
527 :
528 4 : if !in_order {
529 6 : release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
530 2 : }
531 :
532 4 : tokio::time::timeout(ADVANCE, &mut second_eviction)
533 0 : .await
534 4 : .expect("eviction goes through now that spawn_blocking is unclogged")
535 4 : .expect("eviction should succeed, because version matches");
536 4 :
537 4 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
538 :
539 : // ensure the cancelled are unchanged
540 4 : assert_eq!(
541 4 : 1,
542 4 : LAYER_IMPL_METRICS
543 4 : .cancelled_evictions
544 4 : .values()
545 36 : .map(|ctr| ctr.get())
546 4 : .sum::<u64>()
547 4 : );
548 :
549 4 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
550 4 : });
551 4 : }
552 :
553 : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
554 : /// a `Layer::keep_resident` call.
555 : ///
556 : /// This matters because cancelling the eviction would leave us in a state where the file is on
557 : /// disk but the layer internal state says it has not been initialized. Futhermore, it allows us to
558 : /// have non-repairing `Layer::is_likely_resident`.
559 : #[tokio::test(start_paused = true)]
560 2 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
561 2 : let handle = tokio::runtime::Handle::current();
562 2 : let h =
563 2 : TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction").unwrap();
564 8 : let (tenant, ctx) = h.load().await;
565 2 :
566 2 : let timeline = tenant
567 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
568 6 : .await
569 2 : .unwrap();
570 2 :
571 2 : let layer = {
572 2 : let mut layers = {
573 2 : let layers = timeline.layers.read().await;
574 2 : layers.likely_resident_layers().collect::<Vec<_>>()
575 2 : };
576 2 :
577 2 : assert_eq!(layers.len(), 1);
578 2 :
579 2 : layers.swap_remove(0)
580 2 : };
581 2 :
582 2 : // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
583 2 : // Err) at the right time as in "during" the `LayerInner::needs_download`.
584 2 : layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
585 2 :
586 2 : let (completion, barrier) = utils::completion::channel();
587 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
588 2 :
589 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
590 2 : Some(arrival),
591 2 : barrier,
592 2 : ));
593 2 :
594 2 : tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
595 2 : .await
596 2 : .expect_err("should had advanced to waiting on channel");
597 2 :
598 2 : arrived_at_barrier.wait().await;
599 2 :
600 2 : // simulate a cancelled read which is cancelled before it gets to re-initialize
601 2 : let e = layer
602 2 : .0
603 2 : .get_or_maybe_download(false, None)
604 2 : .await
605 2 : .unwrap_err();
606 2 : assert!(
607 2 : matches!(
608 2 : e,
609 2 : DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
610 2 : ),
611 2 : "{e:?}"
612 2 : );
613 2 :
614 2 : assert!(
615 2 : layer.0.needs_download().await.unwrap().is_none(),
616 2 : "file is still on disk"
617 2 : );
618 2 :
619 2 : // release the eviction task
620 2 : drop(completion);
621 2 : tokio::time::sleep(ADVANCE).await;
622 29 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
623 2 :
624 2 : // failpoint is still enabled, but it is not hit
625 2 : let e = layer
626 2 : .0
627 2 : .get_or_maybe_download(false, None)
628 3 : .await
629 2 : .unwrap_err();
630 2 : assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
631 2 :
632 2 : // failpoint is not counted as cancellation either
633 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
634 2 : }
635 :
636 : #[tokio::test(start_paused = true)]
637 2 : async fn evict_and_wait_does_not_wait_for_download() {
638 2 : // let handle = tokio::runtime::Handle::current();
639 2 : let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download").unwrap();
640 8 : let (tenant, ctx) = h.load().await;
641 2 : let span = h.span();
642 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
643 2 :
644 2 : let timeline = tenant
645 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
646 6 : .await
647 2 : .unwrap();
648 2 :
649 2 : let layer = {
650 2 : let mut layers = {
651 2 : let layers = timeline.layers.read().await;
652 2 : layers.likely_resident_layers().collect::<Vec<_>>()
653 2 : };
654 2 :
655 2 : assert_eq!(layers.len(), 1);
656 2 :
657 2 : layers.swap_remove(0)
658 2 : };
659 2 :
660 2 : // kind of forced setup: start an eviction but do not allow it progress until we are
661 2 : // downloading
662 2 : let (eviction_can_continue, barrier) = utils::completion::channel();
663 2 : let (arrival, eviction_arrived) = utils::completion::channel();
664 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
665 2 : Some(arrival),
666 2 : barrier,
667 2 : ));
668 2 :
669 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
670 2 :
671 2 : // use this once-awaited other_evict to synchronize with the eviction
672 2 : let other_evict = layer.evict_and_wait(FOREVER);
673 2 :
674 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
675 2 : .await
676 2 : .expect_err("should had advanced");
677 2 : eviction_arrived.wait().await;
678 2 : drop(eviction_can_continue);
679 2 : other_evict.await.unwrap();
680 2 :
681 2 : // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
682 2 : assert!(!layer.is_likely_resident());
683 2 :
684 2 : // following new evict_and_wait will fail until we've completed the download
685 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
686 2 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
687 2 :
688 2 : let (download_can_continue, barrier) = utils::completion::channel();
689 2 : let (arrival, _download_arrived) = utils::completion::channel();
690 2 : layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
691 2 :
692 2 : let mut download = std::pin::pin!(layer
693 2 : .0
694 2 : .get_or_maybe_download(true, None)
695 2 : .instrument(download_span));
696 2 :
697 2 : assert!(
698 2 : !layer.is_likely_resident(),
699 2 : "during download layer is evicted"
700 2 : );
701 2 :
702 2 : tokio::time::timeout(ADVANCE, &mut download)
703 4 : .await
704 2 : .expect_err("should had timed out because of failpoint");
705 2 :
706 2 : // now we finally get to continue, and because the latest state is downloading, we deduce that
707 2 : // original eviction succeeded
708 2 : evict_and_wait.await.unwrap();
709 2 :
710 2 : // however a new evict_and_wait will fail
711 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
712 2 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
713 2 :
714 2 : assert!(!layer.is_likely_resident());
715 2 :
716 2 : drop(download_can_continue);
717 2 : download.await.expect("download should had succeeded");
718 2 : assert!(layer.is_likely_resident());
719 2 :
720 2 : // only now can we evict
721 2 : layer.evict_and_wait(FOREVER).await.unwrap();
722 2 : }
723 :
724 : /// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
725 : /// which is the last value.
726 : ///
727 : /// Also checks that the same does not happen on a non-evicted layer (regression test).
728 : #[tokio::test(start_paused = true)]
729 2 : async fn eviction_cancellation_on_drop() {
730 2 : use crate::repository::Value;
731 2 : use bytes::Bytes;
732 2 :
733 2 : // this is the runtime on which Layer spawns the blocking tasks on
734 2 : let handle = tokio::runtime::Handle::current();
735 2 :
736 2 : let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap();
737 2 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
738 8 : let (tenant, ctx) = h.load().await;
739 2 :
740 2 : let timeline = tenant
741 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
742 5 : .await
743 2 : .unwrap();
744 2 :
745 2 : {
746 2 : // create_test_timeline wrote us one layer, write another
747 2 : let mut writer = timeline.writer().await;
748 2 : writer
749 2 : .put(
750 2 : Key::from_i128(5),
751 2 : Lsn(0x20),
752 2 : &Value::Image(Bytes::from_static(b"this does not matter either")),
753 2 : &ctx,
754 2 : )
755 2 : .await
756 2 : .unwrap();
757 2 :
758 2 : writer.finish_write(Lsn(0x20));
759 2 : }
760 2 :
761 2 : timeline.freeze_and_flush().await.unwrap();
762 2 :
763 2 : // wait for the upload to complete so our Arc::strong_count assertion holds
764 2 : timeline.remote_client.wait_completion().await.unwrap();
765 2 :
766 2 : let (evicted_layer, not_evicted) = {
767 2 : let mut layers = {
768 2 : let mut guard = timeline.layers.write().await;
769 2 : let layers = guard.likely_resident_layers().collect::<Vec<_>>();
770 2 : // remove the layers from layermap
771 2 : guard.finish_gc_timeline(&layers);
772 2 :
773 2 : layers
774 2 : };
775 2 :
776 2 : assert_eq!(layers.len(), 2);
777 2 :
778 2 : (layers.pop().unwrap(), layers.pop().unwrap())
779 2 : };
780 2 :
781 2 : let victims = [(evicted_layer, true), (not_evicted, false)];
782 2 :
783 6 : for (victim, evict) in victims {
784 4 : let resident = victim.keep_resident().await.unwrap();
785 4 : drop(victim);
786 4 :
787 4 : assert_eq!(Arc::strong_count(&resident.owner.0), 1);
788 2 :
789 4 : if evict {
790 2 : let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
791 2 :
792 2 : // drive the future to await on the status channel, and then drop it
793 2 : tokio::time::timeout(ADVANCE, evict_and_wait)
794 2 : .await
795 2 : .expect_err("should had been a timeout since we are holding the layer resident");
796 2 : }
797 2 :
798 2 : // 1 == we only evict one of the layers
799 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
800 2 :
801 4 : drop(resident);
802 4 :
803 4 : // run any spawned
804 4 : tokio::time::sleep(ADVANCE).await;
805 2 :
806 21 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
807 2 :
808 4 : assert_eq!(
809 4 : 1,
810 4 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
811 4 : );
812 2 : }
813 2 : }
814 :
/// A test case to remind you of the cost of these structures. You can bump the size limits
/// below if it is really necessary to add more fields to the structures.
///
/// The expected sizes are only checked on `x86_64`.
#[test]
#[cfg(target_arch = "x86_64")]
fn layer_size() {
    assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);
    assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
    assert_eq!(std::mem::size_of::<LayerInner>(), 2344);
    // it also has the utf8 path
    // NOTE(review): presumably refers to a path owned by `LayerInner` whose heap contents are
    // not included in `size_of` -- confirm against the struct definition.
}
825 :
826 : struct SpawnBlockingPoolHelper {
827 : awaited_by_spawn_blocking_tasks: Completion,
828 : blocking_tasks: JoinSet<()>,
829 : }
830 :
831 : impl SpawnBlockingPoolHelper {
832 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
833 : /// release is called.
834 : ///
835 : /// In the tests this can be used to ensure something cannot be started on the target runtimes
836 : /// spawn_blocking pool.
837 : ///
838 : /// This should be no issue nowdays, because nextest runs each test in it's own process.
839 2 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
840 2 : let default_max_blocking_threads = 512;
841 2 :
842 2 : Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
843 2 : }
844 :
845 26 : async fn consume_all_spawn_blocking_threads0(
846 26 : handle: &tokio::runtime::Handle,
847 26 : threads: usize,
848 26 : ) -> Self {
849 26 : assert_ne!(threads, 0);
850 :
851 26 : let (completion, barrier) = completion::channel();
852 26 : let (started, starts_completed) = completion::channel();
853 26 :
854 26 : let mut blocking_tasks = JoinSet::new();
855 26 :
856 7180 : for _ in 0..threads {
857 7180 : let barrier = barrier.clone();
858 7180 : let started = started.clone();
859 7180 : blocking_tasks.spawn_blocking_on(
860 7180 : move || {
861 7180 : drop(started);
862 7180 : tokio::runtime::Handle::current().block_on(barrier.wait());
863 7180 : },
864 7180 : handle,
865 7180 : );
866 7180 : }
867 :
868 26 : drop(started);
869 26 :
870 26 : starts_completed.wait().await;
871 :
872 26 : drop(barrier);
873 26 :
874 26 : tracing::trace!("consumed all threads");
875 :
876 26 : SpawnBlockingPoolHelper {
877 26 : awaited_by_spawn_blocking_tasks: completion,
878 26 : blocking_tasks,
879 26 : }
880 26 : }
881 :
882 : /// Release all previously blocked spawn_blocking threads
883 26 : async fn release(self) {
884 26 : let SpawnBlockingPoolHelper {
885 26 : awaited_by_spawn_blocking_tasks,
886 26 : mut blocking_tasks,
887 26 : } = self;
888 26 :
889 26 : drop(awaited_by_spawn_blocking_tasks);
890 :
891 7206 : while let Some(res) = blocking_tasks.join_next().await {
892 7180 : res.expect("none of the tasks should had panicked");
893 7180 : }
894 :
895 26 : tracing::trace!("released all threads");
896 26 : }
897 :
898 : /// In the tests it is used as an easy way of making sure something scheduled on the target
899 : /// runtimes `spawn_blocking` has completed, because it must've been scheduled and completed
900 : /// before our tasks have a chance to schedule and complete.
901 12 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
902 94 : Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
903 12 : }
904 :
905 22 : async fn consume_and_release_all_of_spawn_blocking_threads0(
906 22 : handle: &tokio::runtime::Handle,
907 22 : threads: usize,
908 22 : ) {
909 22 : Self::consume_all_spawn_blocking_threads0(handle, threads)
910 18 : .await
911 22 : .release()
912 92 : .await
913 22 : }
914 : }
915 :
/// Checks that a task spawned with `spawn_blocking` does not get to run while
/// `SpawnBlockingPoolHelper` holds the only blocking thread, and that it runs after release.
#[test]
fn spawn_blocking_pool_helper_actually_works() {
    // create a custom runtime for which we know and control how many blocking threads it has
    //
    // the runtime is limited to a single blocking thread, and the helper is asked to consume
    // exactly that one thread.
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let handle = rt.handle();

    rt.block_on(async move {
        // this will not return until all threads are spun up and actually executing the code
        // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
        let consumed =
            SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;

        println!("consumed");

        let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
            // this will not get to run before we release
        }));

        println!("spawned");

        tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
            .await
            .expect_err("the task should not have gotten to run yet");

        println!("tried to join");

        consumed.release().await;

        println!("released");

        tokio::time::timeout(std::time::Duration::from_secs(1), jh)
            .await
            .expect("no timeout")
            .expect("no join error");

        println!("joined");
    });
}
|