use pageserver_api::key::CONTROLFILE_KEY;
use tokio::task::JoinSet;
use utils::{
    completion::{self, Completion},
    id::TimelineId,
};

use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::context::DownloadBehavior;
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};

/// Used in tests to advance a future to the wanted await point, and no further.
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);

/// Used in tests to indicate a forever-long timeout; it has to be longer than the ADVANCE amount
/// the tests use to advance futures.
const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
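
// The hour-plus durations above are cheap because the tests run with tokio's paused clock (via
// `start_paused` or `tokio::time::pause()`), so they never translate into real waiting.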

/// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
#[tokio::test]
async fn smoke_test() {
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("smoke_test").await.unwrap();
    let span = h.span();
    let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
    let (tenant, _) = h.load().await;

    let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // all layers created at the pageserver are like `layer`, initialized with a strong
    // Arc<DownloadedLayer>.

    let img_before = {
        let mut data = ValueReconstructState::default();
        layer
            .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
            .await
            .unwrap();
        data.img
            .take()
            .expect("tenant harness writes the control file")
    };

    // the important part is evicting the layer, which can be done when there are no more
    // ResidentLayer instances -- there currently are none, only two `Layer` values, one in the
    // layer map and one in scope.
    layer.evict_and_wait(FOREVER).await.unwrap();

    // a double-evict returns an error, which is valid if both the eviction_task and disk usage
    // based eviction were to evict the same layer at the same time.

    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound));

    // on access while the layer is evicted, it will automatically be downloaded.
    let img_after = {
        let mut data = ValueReconstructState::default();
        layer
            .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
            .instrument(download_span.clone())
            .await
            .unwrap();
        data.img.take().unwrap()
    };

    assert_eq!(img_before, img_after);

    // evict_and_wait can time out, but it doesn't cancel the eviction itself
    //
    // a ZERO timeout does not work reliably, so first take up all spawn_blocking slots to
    // artificially slow the eviction down.
    let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;

    match layer
        .evict_and_wait(std::time::Duration::ZERO)
        .await
        .unwrap_err()
    {
        EvictionError::Timeout => {
            // expected, but note that the eviction is "still ongoing"
            helper.release().await;
            // exhaust the spawn_blocking pool to ensure it is now complete
            SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
                .await;
        }
        other => unreachable!("{other:?}"),
    }
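
    // at this point the timed-out eviction has had its chance to run to completion on the
    // spawn_blocking pool, so the local layer file is gone again.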

    // the only way to query whether a layer is resident is to acquire a ResidentLayer instance.
    // Layer::keep_resident never downloads, but it might initialize if the layer file is found
    // downloaded locally.
    let none = layer.keep_resident().await;
    assert!(
        none.is_none(),
        "Expected none, because eviction removed the local file, found: {none:?}"
    );

    // plain downloading is rarely needed
    layer
        .download_and_keep_resident()
        .instrument(download_span)
        .await
        .unwrap();

    // the last important part is deletion on drop: gc and compaction use it for compacted L0
    // layers or fully garbage collected layers. deletion means deleting the local file, and
    // scheduling a deletion of the remote file which has already been unlinked from
    // index_part.json.
    //
    // marking a layer to be deleted on drop is irreversible; there is no technical reason against
    // reversibility, but currently it is not needed so it is not provided.
    layer.delete_on_drop();

    let path = layer.local_path().to_owned();

    // wait_drop produces a future unconnected to the Layer, which will resolve when the
    // LayerInner::drop has completed.
    let mut wait_drop = std::pin::pin!(layer.wait_drop());

    // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
    // until here
    tokio::time::pause();
    tokio::time::timeout(ADVANCE, &mut wait_drop)
        .await
        .expect_err("should have timed out because two strong references exist");

    tokio::fs::metadata(&path)
        .await
        .expect("the local layer file still exists");

    let rtc = &timeline.remote_client;

    {
        let layers = &[layer];
        let mut g = timeline.layers.write().await;
        g.finish_gc_timeline(layers);
        // this just updates the remote_physical_size for demonstration purposes
        rtc.schedule_gc_update(layers).unwrap();
    }
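
    // note: `&[layer]` moved `layer` into a temporary array whose lifetime was extended to the
    // block above, so leaving the block dropped the last strong reference.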

    // when strong references are dropped, the file is deleted and remote deletion is scheduled
    wait_drop.await;

    let e = tokio::fs::metadata(&path)
        .await
        .expect_err("the local file is deleted");
    assert_eq!(e.kind(), std::io::ErrorKind::NotFound);

    rtc.wait_completion().await.unwrap();

    assert_eq!(rtc.get_remote_physical_size(), 0);
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

/// This test demonstrates a previous hang when an eviction and a deletion were requested at the
/// same time. Now both of them complete per Arc drop semantics.
#[tokio::test(start_paused = true)]
async fn evict_and_wait_on_wanted_deleted() {
    // this is the runtime on which Layer spawns the blocking tasks
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
        .await
        .unwrap();
    utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // setup done

    let resident = layer.keep_resident().await.unwrap();

    {
        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");

        layer.delete_on_drop();

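        // deletion is now wanted while the eviction is already waiting on the status channel;
        // dropping the ResidentLayer below lets both proceed instead of hanging.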
        drop(resident);

        // make sure the eviction task gets to run
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

        let resident = layer.keep_resident().await;
        assert!(
            resident.is_none(),
            "keep_resident should not have re-initialized: {resident:?}"
        );

        evict_and_wait
            .await
            .expect("evict_and_wait should have succeeded");

        // works as intended
    }

    // assert that once we remove the `layer` from the layer map and drop our reference,
    // the deletion of the layer in remote_storage happens.
    {
        let mut layers = timeline.layers.write().await;
        layers.finish_gc_timeline(&[layer]);
    }

    SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

    assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
    assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
    assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
    assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

/// This test ensures we are able to read the layer while its eviction has been started but not
/// completed.
#[test]
fn read_wins_pending_eviction() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .start_paused(true)
        .build()
        .unwrap();

    rt.block_on(async move {
        // this is the runtime on which Layer spawns the blocking tasks
        let handle = tokio::runtime::Handle::current();
        let h = TenantHarness::create("read_wins_pending_eviction")
            .await
            .unwrap();
        let (tenant, ctx) = h.load().await;
        let span = h.span();
        let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

        let timeline = tenant
            .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
            .await
            .unwrap();

        let layer = {
            let mut layers = {
                let layers = timeline.layers.read().await;
                layers.likely_resident_layers().collect::<Vec<_>>()
            };

            assert_eq!(layers.len(), 1);

            layers.swap_remove(0)
        };

        // setup done

        let resident = layer.keep_resident().await.unwrap();

        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        let (completion, barrier) = utils::completion::channel();
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        // now the eviction cannot proceed because the threads are consumed while the completion
        // exists
        drop(resident);
        arrived_at_barrier.wait().await;
        assert!(!layer.is_likely_resident());

        // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
        layer
            .0
            .get_or_maybe_download(false, None)
            .instrument(download_span)
            .await
            .expect("should have reinitialized without downloading");

        assert!(layer.is_likely_resident());

        // reinitialization notifies of the new resident status, which should error out all
        // evict_and_wait
        let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect("no timeout, because get_or_maybe_download re-initialized")
            .expect_err("eviction should not have succeeded because re-initialized");

        // works as intended: evictions lose to "downloads"
        assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // this is not wrong: the eviction is technically still "on the way" as it's still queued
        // because of the failpoint
        assert_eq!(
            0,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

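        // release the failpoint so the stalled eviction task finally runs; it will observe that
        // the layer was reinitialized in the meantime and cancel itself.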
        drop(completion);

        tokio::time::sleep(ADVANCE).await;
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
            .await;

        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // now we can finally observe the original eviction failing
        // it would have been possible to observe it earlier, but here it is guaranteed to have
        // happened.
        assert_eq!(
            1,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(
            1,
            LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
    });
}

/// Use a failpoint to delay an eviction from starting, producing a VersionCheckFailed.
#[test]
fn multiple_pending_evictions_in_order() {
    let name = "multiple_pending_evictions_in_order";
    let in_order = true;
    multiple_pending_evictions_scenario(name, in_order);
}

/// Use a failpoint to reorder the later eviction before the first, producing an
/// UnexpectedEvictedState.
#[test]
fn multiple_pending_evictions_out_of_order() {
    let name = "multiple_pending_evictions_out_of_order";
    let in_order = false;
    multiple_pending_evictions_scenario(name, in_order);
}

fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .start_paused(true)
        .build()
        .unwrap();

    rt.block_on(async move {
        // this is the runtime on which Layer spawns the blocking tasks
        let handle = tokio::runtime::Handle::current();
        let h = TenantHarness::create(name).await.unwrap();
        let (tenant, ctx) = h.load().await;
        let span = h.span();
        let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

        let timeline = tenant
            .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
            .await
            .unwrap();

        let layer = {
            let mut layers = {
                let layers = timeline.layers.read().await;
                layers.likely_resident_layers().collect::<Vec<_>>()
            };

            assert_eq!(layers.len(), 1);

            layers.swap_remove(0)
        };

        // setup done

        let resident = layer.keep_resident().await.unwrap();

        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        let (completion1, barrier) = utils::completion::channel();
        let mut completion1 = Some(completion1);
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        // now the eviction cannot proceed because we are simulating an arbitrarily long delay for
        // the eviction task start.
        drop(resident);
        assert!(!layer.is_likely_resident());

        arrived_at_barrier.wait().await;

        // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
        layer
            .0
            .get_or_maybe_download(false, None)
            .instrument(download_span)
            .await
            .expect("should have reinitialized without downloading");

        assert!(layer.is_likely_resident());

        // reinitialization notifies of the new resident status, which should error out all
        // evict_and_wait
        let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect("no timeout, because get_or_maybe_download re-initialized")
            .expect_err("eviction should not have succeeded because re-initialized");

        // works as intended: evictions lose to "downloads"
        assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // this is not wrong: the eviction is technically still "on the way" as it's still queued
        // because of the failpoint
        assert_eq!(
            0,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // configure another failpoint for the second eviction -- evictions are per
        // initialization, so now that we've reinitialized the inner, we get to run two of them at
        // the same time.
        let (completion2, barrier) = utils::completion::channel();
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // advance to the wait on the queue
        tokio::time::timeout(ADVANCE, &mut second_eviction)
            .await
            .expect_err("timeout because the failpoint is blocking");

        arrived_at_barrier.wait().await;

        assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());

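        // two evictions are now pending: the first targets the stale pre-reinitialization
        // version, the second the current one. This helper unblocks the earlier eviction and
        // asserts it gets cancelled for `expected_reason`.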
        let mut release_earlier_eviction = |expected_reason| {
            assert_eq!(
                0,
                LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
            );

            drop(completion1.take().unwrap());

            let handle = &handle;

            async move {
                tokio::time::sleep(ADVANCE).await;
                SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
                    handle, 1,
                )
                .await;

                assert_eq!(
                    1,
                    LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
                );
            }
        };

        if in_order {
            release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
        }

        // release the later eviction, which is for the current version
        drop(completion2);
        tokio::time::sleep(ADVANCE).await;
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
            .await;

        if !in_order {
            release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
        }

        tokio::time::timeout(ADVANCE, &mut second_eviction)
            .await
            .expect("eviction goes through now that spawn_blocking is unclogged")
            .expect("eviction should succeed, because the version matches");

        assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());

        // ensure the cancelled counts are unchanged
        assert_eq!(
            1,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
    });
}

/// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
/// a `Layer::keep_resident` call.
///
/// This matters because cancelling the eviction would leave us in a state where the file is on
/// disk but the layer's internal state says it has not been initialized. Furthermore, it allows us
/// to have a non-repairing `Layer::is_likely_resident`.
#[tokio::test(start_paused = true)]
async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
    let handle = tokio::runtime::Handle::current();
    let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
        .await
        .unwrap();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning
    // an Err) at the right time, as in "during" the `LayerInner::needs_download`.
    layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);

    let (completion, barrier) = utils::completion::channel();
    let (arrival, arrived_at_barrier) = utils::completion::channel();

    layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
        Some(arrival),
        barrier,
    ));

    tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
        .await
        .expect_err("should have advanced to waiting on the channel");

    arrived_at_barrier.wait().await;

    // simulate a cancelled read which is cancelled before it gets to re-initialize
    let e = layer
        .0
        .get_or_maybe_download(false, None)
        .await
        .unwrap_err();
    assert!(
        matches!(
            e,
            DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
        ),
        "{e:?}"
    );

    assert!(
        layer.0.needs_download().await.unwrap().is_none(),
        "file is still on disk"
    );
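
    // crucially, the failed read did not flip the layer back to resident, so the pending
    // eviction remains valid and proceeds once unblocked.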

    // release the eviction task
    drop(completion);
    tokio::time::sleep(ADVANCE).await;
    SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

    // the failpoint is still enabled, but it is not hit
    let e = layer
        .0
        .get_or_maybe_download(false, None)
        .await
        .unwrap_err();
    assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");

    // the failpoint is not counted as a cancellation either
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

#[tokio::test(start_paused = true)]
async fn evict_and_wait_does_not_wait_for_download() {
    // let handle = tokio::runtime::Handle::current();
    let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
        .await
        .unwrap();
    let (tenant, ctx) = h.load().await;
    let span = h.span();
    let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // kind of forced setup: start an eviction but do not allow it to progress until we are
    // downloading
    let (eviction_can_continue, barrier) = utils::completion::channel();
    let (arrival, eviction_arrived) = utils::completion::channel();
    layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
        Some(arrival),
        barrier,
    ));

    let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

    // use this once-awaited other_evict to synchronize with the eviction
    let other_evict = layer.evict_and_wait(FOREVER);

    tokio::time::timeout(ADVANCE, &mut evict_and_wait)
        .await
        .expect_err("should have advanced");
    eviction_arrived.wait().await;
    drop(eviction_can_continue);
    other_evict.await.unwrap();

    // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
    assert!(!layer.is_likely_resident());

    // any following new evict_and_wait will fail until we've completed the download
    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound), "{e:?}");

    let (download_can_continue, barrier) = utils::completion::channel();
    let (arrival, _download_arrived) = utils::completion::channel();
    layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));

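    // start a download but hold it at the failpoint, so that evict_and_wait can complete while
    // the download is still in progress.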
    let mut download = std::pin::pin!(layer
        .0
        .get_or_maybe_download(true, None)
        .instrument(download_span));

    assert!(
        !layer.is_likely_resident(),
        "during the download the layer is evicted"
    );

    tokio::time::timeout(ADVANCE, &mut download)
        .await
        .expect_err("should have timed out because of the failpoint");

    // now we finally get to continue, and because the latest state is downloading, we deduce that
    // the original eviction succeeded
    evict_and_wait.await.unwrap();

    // however a new evict_and_wait will fail
    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound), "{e:?}");

    assert!(!layer.is_likely_resident());

    drop(download_can_continue);
    download.await.expect("download should have succeeded");
    assert!(layer.is_likely_resident());

    // only now can we evict
    layer.evict_and_wait(FOREVER).await.unwrap();
}

/// Asserts that there is no miscalculation when a Layer is dropped while it is being kept
/// resident, which is the last value.
///
/// Also checks that the same does not happen on a non-evicted layer (regression test).
#[tokio::test(start_paused = true)]
async fn eviction_cancellation_on_drop() {
    use crate::repository::Value;
    use bytes::Bytes;

    // this is the runtime on which Layer spawns the blocking tasks
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("eviction_cancellation_on_drop")
        .await
        .unwrap();
    utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    {
        // create_test_timeline wrote us one layer, write another
        let mut writer = timeline.writer().await;
        writer
            .put(
                Key::from_i128(5),
                Lsn(0x20),
                &Value::Image(Bytes::from_static(b"this does not matter either")),
                &ctx,
            )
            .await
            .unwrap();

        writer.finish_write(Lsn(0x20));
    }

    timeline.freeze_and_flush().await.unwrap();

    // wait for the upload to complete so our Arc::strong_count assertion holds
    timeline.remote_client.wait_completion().await.unwrap();

    let (evicted_layer, not_evicted) = {
        let mut layers = {
            let mut guard = timeline.layers.write().await;
            let layers = guard.likely_resident_layers().collect::<Vec<_>>();
            // remove the layers from the layer map
            guard.finish_gc_timeline(&layers);

            layers
        };

        assert_eq!(layers.len(), 2);

        (layers.pop().unwrap(), layers.pop().unwrap())
    };

    let victims = [(evicted_layer, true), (not_evicted, false)];

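    // exercise both variants: one layer with an eviction pending at drop time, one without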
    for (victim, evict) in victims {
        let resident = victim.keep_resident().await.unwrap();
        drop(victim);

        assert_eq!(Arc::strong_count(&resident.owner.0), 1);

        if evict {
            let evict_and_wait = resident.owner.evict_and_wait(FOREVER);

            // drive the future to await on the status channel, and then drop it
            tokio::time::timeout(ADVANCE, evict_and_wait)
                .await
                .expect_err("should have been a timeout since we are holding the layer resident");
        }

        // 1 == we only evict one of the layers
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        drop(resident);

        // run anything spawned
        tokio::time::sleep(ADVANCE).await;

        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

        assert_eq!(
            1,
            LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
        );
    }
}

/// A test case to remind you of the cost of these structures. You can bump the size limit
/// below if it is really necessary to add more fields to the structures.
#[test]
#[cfg(target_arch = "x86_64")]
fn layer_size() {
    assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);
    assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
    assert_eq!(std::mem::size_of::<LayerInner>(), 2344);
    // it also has the utf8 path
}

struct SpawnBlockingPoolHelper {
    awaited_by_spawn_blocking_tasks: Completion,
    blocking_tasks: JoinSet<()>,
}

impl SpawnBlockingPoolHelper {
    /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
    /// release is called.
    ///
    /// In the tests this can be used to ensure something cannot be started on the target
    /// runtime's spawn_blocking pool.
    ///
    /// This should be no issue nowadays, because nextest runs each test in its own process.
    async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
        let default_max_blocking_threads = 512;
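        // 512 matches tokio's documented default for `Builder::max_blocking_threads`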

        Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
    }

    async fn consume_all_spawn_blocking_threads0(
        handle: &tokio::runtime::Handle,
        threads: usize,
    ) -> Self {
        assert_ne!(threads, 0);

        let (completion, barrier) = completion::channel();
        let (started, starts_completed) = completion::channel();

        let mut blocking_tasks = JoinSet::new();

        for _ in 0..threads {
            let barrier = barrier.clone();
            let started = started.clone();
            blocking_tasks.spawn_blocking_on(
                move || {
                    drop(started);
                    tokio::runtime::Handle::current().block_on(barrier.wait());
                },
                handle,
            );
        }

        drop(started);

        starts_completed.wait().await;

        drop(barrier);

        tracing::trace!("consumed all threads");

        SpawnBlockingPoolHelper {
            awaited_by_spawn_blocking_tasks: completion,
            blocking_tasks,
        }
    }

    /// Release all previously blocked spawn_blocking threads
    async fn release(self) {
        let SpawnBlockingPoolHelper {
            awaited_by_spawn_blocking_tasks,
            mut blocking_tasks,
        } = self;

        drop(awaited_by_spawn_blocking_tasks);

        while let Some(res) = blocking_tasks.join_next().await {
            res.expect("none of the tasks should have panicked");
        }

        tracing::trace!("released all threads");
    }

    /// In the tests this is used as an easy way of making sure something scheduled on the target
    /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
    /// before our tasks have a chance to schedule and complete.
    async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
        Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
    }

    async fn consume_and_release_all_of_spawn_blocking_threads0(
        handle: &tokio::runtime::Handle,
        threads: usize,
    ) {
        Self::consume_all_spawn_blocking_threads0(handle, threads)
            .await
            .release()
            .await
    }
}

#[test]
fn spawn_blocking_pool_helper_actually_works() {
    // create a custom runtime for which we know and control how many blocking threads it has
    //
    // because the amount is not configurable for our helper, expect the same amount as
    // BACKGROUND_RUNTIME using the tokio defaults would have.
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let handle = rt.handle();

    rt.block_on(async move {
        // this will not return until all threads are spun up and actually executing the code
        // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
        let consumed =
            SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;

        println!("consumed");

        let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
            // this will not get to run before we release
        }));

        println!("spawned");

        tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
            .await
            .expect_err("the task should not have gotten to run yet");

        println!("tried to join");

        consumed.release().await;

        println!("released");

        tokio::time::timeout(std::time::Duration::from_secs(1), jh)
            .await
            .expect("no timeout")
            .expect("no join error");

        println!("joined");
    });
}