Line data Source code
1 : use std::time::UNIX_EPOCH;
2 :
3 : use pageserver_api::key::CONTROLFILE_KEY;
4 : use tokio::task::JoinSet;
5 : use utils::{
6 : completion::{self, Completion},
7 : id::TimelineId,
8 : };
9 :
10 : use super::failpoints::{Failpoint, FailpointKind};
11 : use super::*;
12 : use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
13 : use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
14 :
15 : /// Used in tests to advance a future to a wanted await point, and not further.
16 : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
17 :
18 : /// Used in tests to indicate a forever-long timeout; has to be longer than the total amount of ADVANCE
19 : /// used to advance futures.
20 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
21 :
22 : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
23 : #[tokio::test]
24 2 : async fn smoke_test() {
25 2 : let handle = tokio::runtime::Handle::current();
26 2 :
27 2 : let h = TenantHarness::create("smoke_test").await.unwrap();
28 2 : let span = h.span();
29 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
30 8 : let (tenant, _) = h.load().await;
31 2 :
32 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
33 2 :
34 2 : let timeline = tenant
35 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
36 4 : .await
37 2 : .unwrap();
38 2 :
39 2 : let layer = {
40 2 : let mut layers = {
41 2 : let layers = timeline.layers.read().await;
42 2 : layers.likely_resident_layers().collect::<Vec<_>>()
43 2 : };
44 2 :
45 2 : assert_eq!(layers.len(), 1);
46 2 :
47 2 : layers.swap_remove(0)
48 2 : };
49 2 :
50 2 : // all layers created at the pageserver are like `layer`, initialized with a strong
51 2 : // Arc<DownloadedLayer>.
52 2 :
53 2 : let img_before = {
54 2 : let mut data = ValueReconstructState::default();
55 2 : layer
56 2 : .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
57 4 : .await
58 2 : .unwrap();
59 2 : data.img
60 2 : .take()
61 2 : .expect("tenant harness writes the control file")
62 2 : };
63 2 :
64 2 : // the important part is evicting the layer, which can be done when there are no more ResidentLayer
65 2 : // instances -- there currently are none, only two `Layer` values, one in the layermap and one
66 2 : // in scope.
67 2 : layer.evict_and_wait(FOREVER).await.unwrap();
68 2 :
69 2 : // double-evict returns an error, which is valid when the eviction_task and disk usage based
70 2 : // eviction both try to evict the same layer at the same time.
71 2 :
72 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
73 2 : assert!(matches!(e, EvictionError::NotFound));
74 2 :
75 2 : // on accesses when the layer is evicted, it will automatically be downloaded.
76 2 : let img_after = {
77 2 : let mut data = ValueReconstructState::default();
78 2 : layer
79 2 : .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
80 2 : .instrument(download_span.clone())
81 9 : .await
82 2 : .unwrap();
83 2 : data.img.take().unwrap()
84 2 : };
85 2 :
86 2 : assert_eq!(img_before, img_after);
87 2 :
88 2 : // evict_and_wait can time out, but it doesn't cancel the eviction itself
89 2 : //
90 2 : // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
91 2 : // artificially slow it down.
92 2 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
93 2 :
94 2 : match layer
95 2 : .evict_and_wait(std::time::Duration::ZERO)
96 2 : .await
97 2 : .unwrap_err()
98 2 : {
99 2 : EvictionError::Timeout => {
100 2 : // expected, but note that the eviction is "still ongoing"
101 51 : helper.release().await;
102 2 : // exhaust the spawn_blocking pool to ensure the eviction is now complete
103 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
104 8 : .await;
105 2 : }
106 2 : other => unreachable!("{other:?}"),
107 2 : }
108 2 :
109 2 : // the only way to query whether a layer is resident is to acquire a ResidentLayer instance.
110 2 : // Layer::keep_resident never downloads, but it might initialize if the layer file is found
111 2 : // downloaded locally.
112 2 : let none = layer.keep_resident().await;
113 2 : assert!(
114 2 : none.is_none(),
115 2 : "Expected none, because eviction removed the local file, found: {none:?}"
116 2 : );
117 2 :
118 2 : // plain downloading is rarely needed
119 2 : layer
120 2 : .download_and_keep_resident()
121 2 : .instrument(download_span)
122 6 : .await
123 2 : .unwrap();
124 2 :
125 2 : // the last important part is deletion on drop: gc and compaction use it for compacted L0 layers
126 2 : // or fully garbage collected layers. deletion means deleting the local file, and scheduling a
127 2 : // deletion of the remote file, which has already been unlinked from index_part.json.
128 2 : //
129 2 : // marking a layer to be deleted on drop is irreversible; there is no technical reason against
130 2 : // reversibility, but currently it is not needed so it is not provided.
131 2 : layer.delete_on_drop();
132 2 :
133 2 : let path = layer.local_path().to_owned();
134 2 :
135 2 : // wait_drop produces a future unconnected to the Layer, which will resolve when
136 2 : // LayerInner::drop has completed.
137 2 : let mut wait_drop = std::pin::pin!(layer.wait_drop());
138 2 :
139 2 : // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
140 2 : // until here
141 2 : tokio::time::pause();
142 2 : tokio::time::timeout(ADVANCE, &mut wait_drop)
143 2 : .await
144 2 : .expect_err("should had timed out because two strong references exist");
145 2 :
146 2 : tokio::fs::metadata(&path)
147 2 : .await
148 2 : .expect("the local layer file still exists");
149 2 :
150 2 : let rtc = &timeline.remote_client;
151 2 :
152 2 : {
153 2 : let layers = &[layer];
154 2 : let mut g = timeline.layers.write().await;
155 2 : g.finish_gc_timeline(layers);
156 2 : // this just updates the remote_physical_size for demonstration purposes
157 2 : rtc.schedule_gc_update(layers).unwrap();
158 2 : }
159 2 :
160 2 : // when strong references are dropped, the file is deleted and remote deletion is scheduled
161 2 : wait_drop.await;
162 2 :
163 2 : let e = tokio::fs::metadata(&path)
164 2 : .await
165 2 : .expect_err("the local file is deleted");
166 2 : assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
167 2 :
168 2 : rtc.wait_completion().await.unwrap();
169 2 :
170 2 : assert_eq!(rtc.get_remote_physical_size(), 0);
171 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
172 2 : }
173 :
174 : /// This test demonstrates a previous hang when an eviction and a deletion were requested at the same
175 : /// time. Now both of them complete per Arc drop semantics.
176 : #[tokio::test(start_paused = true)]
177 2 : async fn evict_and_wait_on_wanted_deleted() {
178 2 : // this is the runtime on which Layer spawns the blocking tasks
179 2 : let handle = tokio::runtime::Handle::current();
180 2 :
181 2 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
182 2 : .await
183 2 : .unwrap();
184 2 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
185 8 : let (tenant, ctx) = h.load().await;
186 2 :
187 2 : let timeline = tenant
188 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
189 4 : .await
190 2 : .unwrap();
191 2 :
192 2 : let layer = {
193 2 : let mut layers = {
194 2 : let layers = timeline.layers.read().await;
195 2 : layers.likely_resident_layers().collect::<Vec<_>>()
196 2 : };
197 2 :
198 2 : assert_eq!(layers.len(), 1);
199 2 :
200 2 : layers.swap_remove(0)
201 2 : };
202 2 :
203 2 : // setup done
204 2 :
205 2 : let resident = layer.keep_resident().await.unwrap();
206 2 :
207 2 : {
208 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
209 2 :
210 2 : // drive the future to await on the status channel
211 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
212 2 : .await
213 2 : .expect_err("should had been a timeout since we are holding the layer resident");
214 2 :
215 2 : layer.delete_on_drop();
216 2 :
217 2 : drop(resident);
218 2 :
219 2 : // make sure the eviction task gets to run
220 9 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
221 2 :
222 2 : let resident = layer.keep_resident().await;
223 2 : assert!(
224 2 : resident.is_none(),
225 2 : "keep_resident should not have re-initialized: {resident:?}"
226 2 : );
227 2 :
228 2 : evict_and_wait
229 2 : .await
230 2 : .expect("evict_and_wait should had succeeded");
231 2 :
232 2 : // works as intended
233 2 : }
234 2 :
235 2 : // assert that once we remove the `layer` from the layer map and drop our reference,
236 2 : // the deletion of the layer in remote_storage happens.
237 2 : {
238 2 : let mut layers = timeline.layers.write().await;
239 2 : layers.finish_gc_timeline(&[layer]);
240 2 : }
241 2 :
242 8 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
243 2 :
244 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
245 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
246 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
247 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
248 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
249 2 : }
250 :
251 : /// This test ensures we are able to read the layer while the layer eviction has been
252 : /// started but not completed.
253 : #[test]
254 2 : fn read_wins_pending_eviction() {
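// single blocking thread so the test can exhaust (and later flush) the spawn_blocking pool
// deterministically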
255 2 : let rt = tokio::runtime::Builder::new_current_thread()
256 2 : .max_blocking_threads(1)
257 2 : .enable_all()
258 2 : .start_paused(true)
259 2 : .build()
260 2 : .unwrap();
261 2 :
262 2 : rt.block_on(async move {
263 2 : // this is the runtime on which Layer spawns the blocking tasks
264 2 : let handle = tokio::runtime::Handle::current();
265 2 : let h = TenantHarness::create("read_wins_pending_eviction")
266 0 : .await
267 2 : .unwrap();
268 8 : let (tenant, ctx) = h.load().await;
269 2 : let span = h.span();
270 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
271 :
272 2 : let timeline = tenant
273 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
274 4 : .await
275 2 : .unwrap();
276 :
277 2 : let layer = {
278 2 : let mut layers = {
279 2 : let layers = timeline.layers.read().await;
280 2 : layers.likely_resident_layers().collect::<Vec<_>>()
281 2 : };
282 2 :
283 2 : assert_eq!(layers.len(), 1);
284 :
285 2 : layers.swap_remove(0)
286 : };
287 :
288 : // setup done
289 :
290 2 : let resident = layer.keep_resident().await.unwrap();
291 2 :
292 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
293 2 :
294 2 : // drive the future to await on the status channel
295 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
296 2 : .await
297 2 : .expect_err("should had been a timeout since we are holding the layer resident");
298 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
299 :
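// install a failpoint for the spawned eviction task: it signals `arrival` when it reaches the
// failpoint and then blocks on `barrier` until `completion` is dropped further below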
300 2 : let (completion, barrier) = utils::completion::channel();
301 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
302 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
303 2 : Some(arrival),
304 2 : barrier,
305 2 : ));
306 2 :
307 2 : // now the eviction cannot proceed because the threads are consumed while completion exists
308 2 : drop(resident);
309 2 : arrived_at_barrier.wait().await;
310 2 : assert!(!layer.is_likely_resident());
311 :
312 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
313 2 : layer
314 2 : .0
315 2 : .get_or_maybe_download(false, None)
316 2 : .instrument(download_span)
317 2 : .await
318 2 : .expect("should had reinitialized without downloading");
319 2 :
320 2 : assert!(layer.is_likely_resident());
321 :
322 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
323 2 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
324 0 : .await
325 2 : .expect("no timeout, because get_or_maybe_download re-initialized")
326 2 : .expect_err("eviction should not have succeeded because re-initialized");
327 2 :
328 2 : // works as intended: evictions lose to "downloads"
329 2 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
330 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
331 :
332 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
333 : // because of a failpoint
334 2 : assert_eq!(
335 2 : 0,
336 2 : LAYER_IMPL_METRICS
337 2 : .cancelled_evictions
338 2 : .values()
339 18 : .map(|ctr| ctr.get())
340 2 : .sum::<u64>()
341 2 : );
342 :
343 2 : drop(completion);
344 2 :
345 4 : tokio::time::sleep(ADVANCE).await;
346 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
347 2 : .await;
348 :
349 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
350 :
351 : // now we can finally observe the original eviction failing
352 : // it would have been possible to observe it earlier, but here it is guaranteed to have
353 : // happened.
354 2 : assert_eq!(
355 2 : 1,
356 2 : LAYER_IMPL_METRICS
357 2 : .cancelled_evictions
358 2 : .values()
359 18 : .map(|ctr| ctr.get())
360 2 : .sum::<u64>()
361 2 : );
362 :
363 2 : assert_eq!(
364 2 : 1,
365 2 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
366 2 : );
367 :
368 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
369 2 : });
370 2 : }
371 :
372 : /// Use a failpoint to delay the start of an eviction to get a VersionCheckFailed.
373 : #[test]
374 2 : fn multiple_pending_evictions_in_order() {
375 2 : let name = "multiple_pending_evictions_in_order";
376 2 : let in_order = true;
377 2 : multiple_pending_evictions_scenario(name, in_order);
378 2 : }
379 :
380 : /// Use a failpoint to reorder a later eviction before the first to get an UnexpectedEvictedState.
381 : #[test]
382 2 : fn multiple_pending_evictions_out_of_order() {
383 2 : let name = "multiple_pending_evictions_out_of_order";
384 2 : let in_order = false;
385 2 : multiple_pending_evictions_scenario(name, in_order);
386 2 : }
387 :
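/// Shared body of the two tests above: sets up two pending evictions for the same layer and
/// releases their failpoints either in the order they were started or in reverse.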
388 4 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
389 4 : let rt = tokio::runtime::Builder::new_current_thread()
390 4 : .max_blocking_threads(1)
391 4 : .enable_all()
392 4 : .start_paused(true)
393 4 : .build()
394 4 : .unwrap();
395 4 :
396 4 : rt.block_on(async move {
397 4 : // this is the runtime on which Layer spawns the blocking tasks
398 4 : let handle = tokio::runtime::Handle::current();
399 4 : let h = TenantHarness::create(name).await.unwrap();
400 16 : let (tenant, ctx) = h.load().await;
401 4 : let span = h.span();
402 4 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
403 :
404 4 : let timeline = tenant
405 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
406 8 : .await
407 4 : .unwrap();
408 :
409 4 : let layer = {
410 4 : let mut layers = {
411 4 : let layers = timeline.layers.read().await;
412 4 : layers.likely_resident_layers().collect::<Vec<_>>()
413 4 : };
414 4 :
415 4 : assert_eq!(layers.len(), 1);
416 :
417 4 : layers.swap_remove(0)
418 : };
419 :
420 : // setup done
421 :
422 4 : let resident = layer.keep_resident().await.unwrap();
423 4 :
424 4 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
425 4 :
426 4 : // drive the future to await on the status channel
427 4 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
428 4 : .await
429 4 : .expect_err("should had been a timeout since we are holding the layer resident");
430 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
431 :
432 4 : let (completion1, barrier) = utils::completion::channel();
433 4 : let mut completion1 = Some(completion1);
434 4 : let (arrival, arrived_at_barrier) = utils::completion::channel();
435 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
436 4 : Some(arrival),
437 4 : barrier,
438 4 : ));
439 4 :
440 4 : // now the eviction cannot proceed because we are simulating an arbitrarily long delay for the
441 4 : // eviction task start.
442 4 : drop(resident);
443 4 : assert!(!layer.is_likely_resident());
444 :
445 4 : arrived_at_barrier.wait().await;
446 :
447 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
448 4 : layer
449 4 : .0
450 4 : .get_or_maybe_download(false, None)
451 4 : .instrument(download_span)
452 4 : .await
453 4 : .expect("should had reinitialized without downloading");
454 4 :
455 4 : assert!(layer.is_likely_resident());
456 :
457 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
458 4 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
459 0 : .await
460 4 : .expect("no timeout, because get_or_maybe_download re-initialized")
461 4 : .expect_err("eviction should not have succeeded because re-initialized");
462 4 :
463 4 : // works as intended: evictions lose to "downloads"
464 4 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
465 4 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
466 :
467 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
468 : // because of a failpoint
469 4 : assert_eq!(
470 4 : 0,
471 4 : LAYER_IMPL_METRICS
472 4 : .cancelled_evictions
473 4 : .values()
474 36 : .map(|ctr| ctr.get())
475 4 : .sum::<u64>()
476 4 : );
477 :
478 4 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
479 :
480 : // configure another failpoint for the second eviction -- evictions are per initialization,
481 : // so now that we've reinitialized the inner, we get to run two of them at the same time.
482 4 : let (completion2, barrier) = utils::completion::channel();
483 4 : let (arrival, arrived_at_barrier) = utils::completion::channel();
484 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
485 4 : Some(arrival),
486 4 : barrier,
487 4 : ));
488 4 :
489 4 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
490 4 :
491 4 : // advance to the wait on the queue
492 4 : tokio::time::timeout(ADVANCE, &mut second_eviction)
493 8 : .await
494 4 : .expect_err("timeout because failpoint is blocking");
495 4 :
496 4 : arrived_at_barrier.wait().await;
497 :
498 4 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
499 :
500 4 : let mut release_earlier_eviction = |expected_reason| {
501 4 : assert_eq!(
502 4 : 0,
503 4 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
504 4 : );
505 :
506 4 : drop(completion1.take().unwrap());
507 4 :
508 4 : let handle = &handle;
509 :
510 4 : async move {
511 4 : tokio::time::sleep(ADVANCE).await;
512 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
513 4 : handle, 1,
514 4 : )
515 6 : .await;
516 :
517 4 : assert_eq!(
518 4 : 1,
519 4 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
520 4 : );
521 4 : }
522 4 : };
523 :
524 4 : if in_order {
525 4 : release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
526 2 : }
527 :
528 : // release the later eviction which is for the current version
529 4 : drop(completion2);
530 8 : tokio::time::sleep(ADVANCE).await;
531 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
532 6 : .await;
533 :
534 4 : if !in_order {
535 6 : release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
536 2 : }
537 :
538 4 : tokio::time::timeout(ADVANCE, &mut second_eviction)
539 0 : .await
540 4 : .expect("eviction goes through now that spawn_blocking is unclogged")
541 4 : .expect("eviction should succeed, because version matches");
542 4 :
543 4 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
544 :
545 : // ensure the cancelled eviction counts are unchanged
546 4 : assert_eq!(
547 4 : 1,
548 4 : LAYER_IMPL_METRICS
549 4 : .cancelled_evictions
550 4 : .values()
551 36 : .map(|ctr| ctr.get())
552 4 : .sum::<u64>()
553 4 : );
554 :
555 4 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
556 4 : });
557 4 : }
558 :
559 : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
560 : /// a `Layer::keep_resident` call.
561 : ///
562 : /// This matters because cancelling the eviction would leave us in a state where the file is on
563 : /// disk but the layer's internal state says it has not been initialized. Furthermore, it allows us to
564 : /// have non-repairing `Layer::is_likely_resident`.
565 : #[tokio::test(start_paused = true)]
566 2 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
567 2 : let handle = tokio::runtime::Handle::current();
568 2 : let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
569 2 : .await
570 2 : .unwrap();
571 8 : let (tenant, ctx) = h.load().await;
572 2 :
573 2 : let timeline = tenant
574 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
575 4 : .await
576 2 : .unwrap();
577 2 :
578 2 : let layer = {
579 2 : let mut layers = {
580 2 : let layers = timeline.layers.read().await;
581 2 : layers.likely_resident_layers().collect::<Vec<_>>()
582 2 : };
583 2 :
584 2 : assert_eq!(layers.len(), 1);
585 2 :
586 2 : layers.swap_remove(0)
587 2 : };
588 2 :
589 2 : // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
590 2 : // Err) at the right time as in "during" the `LayerInner::needs_download`.
591 2 : layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
592 2 :
593 2 : let (completion, barrier) = utils::completion::channel();
594 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
595 2 :
596 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
597 2 : Some(arrival),
598 2 : barrier,
599 2 : ));
600 2 :
601 2 : tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
602 2 : .await
603 2 : .expect_err("should had advanced to waiting on channel");
604 2 :
605 2 : arrived_at_barrier.wait().await;
606 2 :
607 2 : // simulate a cancelled read which is cancelled before it gets to re-initialize
608 2 : let e = layer
609 2 : .0
610 2 : .get_or_maybe_download(false, None)
611 2 : .await
612 2 : .unwrap_err();
613 2 : assert!(
614 2 : matches!(
615 2 : e,
616 2 : DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
617 2 : ),
618 2 : "{e:?}"
619 2 : );
620 2 :
621 2 : assert!(
622 2 : layer.0.needs_download().await.unwrap().is_none(),
623 2 : "file is still on disk"
624 2 : );
625 2 :
626 2 : // release the eviction task
627 2 : drop(completion);
628 2 : tokio::time::sleep(ADVANCE).await;
629 10 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
630 2 :
631 2 : // failpoint is still enabled, but it is not hit
632 2 : let e = layer
633 2 : .0
634 2 : .get_or_maybe_download(false, None)
635 4 : .await
636 2 : .unwrap_err();
637 2 : assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
638 2 :
639 2 : // failpoint is not counted as cancellation either
640 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
641 2 : }
642 :
643 : #[tokio::test(start_paused = true)]
644 2 : async fn evict_and_wait_does_not_wait_for_download() {
645 2 : // let handle = tokio::runtime::Handle::current();
646 2 : let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
647 2 : .await
648 2 : .unwrap();
649 8 : let (tenant, ctx) = h.load().await;
650 2 : let span = h.span();
651 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
652 2 :
653 2 : let timeline = tenant
654 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
655 4 : .await
656 2 : .unwrap();
657 2 :
658 2 : let layer = {
659 2 : let mut layers = {
660 2 : let layers = timeline.layers.read().await;
661 2 : layers.likely_resident_layers().collect::<Vec<_>>()
662 2 : };
663 2 :
664 2 : assert_eq!(layers.len(), 1);
665 2 :
666 2 : layers.swap_remove(0)
667 2 : };
668 2 :
669 2 : // kind of forced setup: start an eviction but do not allow it to progress until we are
670 2 : // downloading
671 2 : let (eviction_can_continue, barrier) = utils::completion::channel();
672 2 : let (arrival, eviction_arrived) = utils::completion::channel();
673 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
674 2 : Some(arrival),
675 2 : barrier,
676 2 : ));
677 2 :
678 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
679 2 :
680 2 : // use this once-awaited other_evict to synchronize with the eviction
681 2 : let other_evict = layer.evict_and_wait(FOREVER);
682 2 :
683 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
684 2 : .await
685 2 : .expect_err("should had advanced");
686 2 : eviction_arrived.wait().await;
687 2 : drop(eviction_can_continue);
688 2 : other_evict.await.unwrap();
689 2 :
690 2 : // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
691 2 : assert!(!layer.is_likely_resident());
692 2 :
693 2 : // following new evict_and_wait will fail until we've completed the download
694 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
695 2 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
696 2 :
697 2 : let (download_can_continue, barrier) = utils::completion::channel();
698 2 : let (arrival, _download_arrived) = utils::completion::channel();
699 2 : layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
700 2 :
701 2 : let mut download = std::pin::pin!(layer
702 2 : .0
703 2 : .get_or_maybe_download(true, None)
704 2 : .instrument(download_span));
705 2 :
706 2 : assert!(
707 2 : !layer.is_likely_resident(),
708 2 : "during download layer is evicted"
709 2 : );
710 2 :
711 2 : tokio::time::timeout(ADVANCE, &mut download)
712 5 : .await
713 2 : .expect_err("should had timed out because of failpoint");
714 2 :
715 2 : // now we finally get to continue, and because the latest state is downloading, we deduce that
716 2 : // the original eviction succeeded
717 2 : evict_and_wait.await.unwrap();
718 2 :
719 2 : // however a new evict_and_wait will fail
720 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
721 2 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
722 2 :
723 2 : assert!(!layer.is_likely_resident());
724 2 :
725 2 : drop(download_can_continue);
726 2 : download.await.expect("download should have succeeded");
727 2 : assert!(layer.is_likely_resident());
728 2 :
729 2 : // only now can we evict
730 2 : layer.evict_and_wait(FOREVER).await.unwrap();
731 2 : }
732 :
733 : /// Asserts that there is no miscalculation when a Layer is dropped while it is being kept resident,
734 : /// and that kept-resident value is the last remaining reference.
735 : ///
736 : /// Also checks that the same does not happen on a non-evicted layer (regression test).
737 : #[tokio::test(start_paused = true)]
738 2 : async fn eviction_cancellation_on_drop() {
739 2 : use crate::repository::Value;
740 2 : use bytes::Bytes;
741 2 :
742 2 : // this is the runtime on which Layer spawns the blocking tasks on
743 2 : let handle = tokio::runtime::Handle::current();
744 2 :
745 2 : let h = TenantHarness::create("eviction_cancellation_on_drop")
746 2 : .await
747 2 : .unwrap();
748 2 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
749 8 : let (tenant, ctx) = h.load().await;
750 2 :
751 2 : let timeline = tenant
752 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
753 4 : .await
754 2 : .unwrap();
755 2 :
756 2 : {
757 2 : // create_test_timeline wrote us one layer, write another
758 2 : let mut writer = timeline.writer().await;
759 2 : writer
760 2 : .put(
761 2 : Key::from_i128(5),
762 2 : Lsn(0x20),
763 2 : &Value::Image(Bytes::from_static(b"this does not matter either")),
764 2 : &ctx,
765 2 : )
766 2 : .await
767 2 : .unwrap();
768 2 :
769 2 : writer.finish_write(Lsn(0x20));
770 2 : }
771 2 :
772 2 : timeline.freeze_and_flush().await.unwrap();
773 2 :
774 2 : // wait for the upload to complete so our Arc::strong_count assertion holds
775 2 : timeline.remote_client.wait_completion().await.unwrap();
776 2 :
777 2 : let (evicted_layer, not_evicted) = {
778 2 : let mut layers = {
779 2 : let mut guard = timeline.layers.write().await;
780 2 : let layers = guard.likely_resident_layers().collect::<Vec<_>>();
781 2 : // remove the layers from layermap
782 2 : guard.finish_gc_timeline(&layers);
783 2 :
784 2 : layers
785 2 : };
786 2 :
787 2 : assert_eq!(layers.len(), 2);
788 2 :
789 2 : (layers.pop().unwrap(), layers.pop().unwrap())
790 2 : };
791 2 :
792 2 : let victims = [(evicted_layer, true), (not_evicted, false)];
793 2 :
794 6 : for (victim, evict) in victims {
795 4 : let resident = victim.keep_resident().await.unwrap();
796 4 : drop(victim);
797 4 :
798 4 : assert_eq!(Arc::strong_count(&resident.owner.0), 1);
799 2 :
800 4 : if evict {
801 2 : let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
802 2 :
803 2 : // drive the future to await on the status channel, and then drop it
804 2 : tokio::time::timeout(ADVANCE, evict_and_wait)
805 2 : .await
806 2 : .expect_err("should had been a timeout since we are holding the layer resident");
807 2 : }
808 2 :
809 2 : // 1 == we only evict one of the layers
810 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
811 2 :
812 4 : drop(resident);
813 4 :
814 4 : // run any spawned tasks
815 5 : tokio::time::sleep(ADVANCE).await;
816 2 :
817 17 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
818 2 :
819 4 : assert_eq!(
820 4 : 1,
821 4 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
822 4 : );
823 2 : }
824 2 : }
825 :
826 : /// A test case to remind you of the cost of these structures. You can bump the size limit
827 : /// below if it is really necessary to add more fields to the structures.
828 : #[test]
829 : #[cfg(target_arch = "x86_64")]
830 2 : fn layer_size() {
831 2 : assert_eq!(size_of::<LayerAccessStats>(), 8);
832 2 : assert_eq!(size_of::<PersistentLayerDesc>(), 104);
833 2 : assert_eq!(size_of::<LayerInner>(), 312);
834 : // it also has the utf8 path
835 2 : }
836 :
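/// Test helper that occupies every thread of a runtime's spawn_blocking pool until released,
/// so tests can deterministically block, or flush, work scheduled via spawn_blocking.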
837 : struct SpawnBlockingPoolHelper {
838 : awaited_by_spawn_blocking_tasks: Completion,
839 : blocking_tasks: JoinSet<()>,
840 : }
841 :
842 : impl SpawnBlockingPoolHelper {
843 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
844 : /// release is called.
845 : ///
846 : /// In the tests this can be used to ensure something cannot be started on the target runtime's
847 : /// spawn_blocking pool.
848 : ///
849 : /// This should be no issue nowadays, because nextest runs each test in its own process.
850 2 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
851 2 : let default_max_blocking_threads = 512;
852 2 :
853 2 : Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
854 2 : }
855 :
856 26 : async fn consume_all_spawn_blocking_threads0(
857 26 : handle: &tokio::runtime::Handle,
858 26 : threads: usize,
859 26 : ) -> Self {
860 26 : assert_ne!(threads, 0);
861 :
862 26 : let (completion, barrier) = completion::channel();
863 26 : let (started, starts_completed) = completion::channel();
864 26 :
865 26 : let mut blocking_tasks = JoinSet::new();
866 26 :
867 7180 : for _ in 0..threads {
868 7180 : let barrier = barrier.clone();
869 7180 : let started = started.clone();
870 7180 : blocking_tasks.spawn_blocking_on(
871 7180 : move || {
872 7180 : drop(started);
873 7180 : tokio::runtime::Handle::current().block_on(barrier.wait());
874 7180 : },
875 7180 : handle,
876 7180 : );
877 7180 : }
878 :
879 26 : drop(started);
880 26 :
881 26 : starts_completed.wait().await;
882 :
883 26 : drop(barrier);
884 26 :
885 26 : tracing::trace!("consumed all threads");
886 :
887 26 : SpawnBlockingPoolHelper {
888 26 : awaited_by_spawn_blocking_tasks: completion,
889 26 : blocking_tasks,
890 26 : }
891 26 : }
892 :
893 : /// Release all previously blocked spawn_blocking threads
894 26 : async fn release(self) {
895 26 : let SpawnBlockingPoolHelper {
896 26 : awaited_by_spawn_blocking_tasks,
897 26 : mut blocking_tasks,
898 26 : } = self;
899 26 :
900 26 : drop(awaited_by_spawn_blocking_tasks);
901 :
902 7206 : while let Some(res) = blocking_tasks.join_next().await {
903 7180 : res.expect("none of the tasks should had panicked");
904 7180 : }
905 :
906 26 : tracing::trace!("released all threads");
907 26 : }
908 :
909 : /// In the tests it is used as an easy way of making sure something scheduled on the target
910 : /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
911 : /// before our tasks have a chance to schedule and complete.
912 12 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
913 52 : Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
914 12 : }
915 :
916 22 : async fn consume_and_release_all_of_spawn_blocking_threads0(
917 22 : handle: &tokio::runtime::Handle,
918 22 : threads: usize,
919 22 : ) {
920 22 : Self::consume_all_spawn_blocking_threads0(handle, threads)
921 16 : .await
922 22 : .release()
923 50 : .await
924 22 : }
925 : }
926 :
927 : #[test]
928 2 : fn spawn_blocking_pool_helper_actually_works() {
929 2 : // create a custom runtime for which we know and control how many blocking threads it has
930 2 : //
931 2 : // because the amount is not configurable for our helper, expect the same amount as
932 2 : // BACKGROUND_RUNTIME using the tokio defaults would have.
933 2 : let rt = tokio::runtime::Builder::new_current_thread()
934 2 : .max_blocking_threads(1)
935 2 : .enable_all()
936 2 : .build()
937 2 : .unwrap();
938 2 :
939 2 : let handle = rt.handle();
940 2 :
941 2 : rt.block_on(async move {
942 : // this will not return until all threads are spun up and actually executing the code
943 : // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
944 2 : let consumed =
945 2 : SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
946 :
947 2 : println!("consumed");
948 2 :
949 2 : let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
950 2 : // this will not get to run before we release
951 2 : }));
952 2 :
953 2 : println!("spawned");
954 2 :
955 2 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
956 2 : .await
957 2 : .expect_err("the task should not have gotten to run yet");
958 2 :
959 2 : println!("tried to join");
960 2 :
961 2 : consumed.release().await;
962 :
963 2 : println!("released");
964 2 :
965 2 : tokio::time::timeout(std::time::Duration::from_secs(1), jh)
966 0 : .await
967 2 : .expect("no timeout")
968 2 : .expect("no join error");
969 2 :
970 2 : println!("joined");
971 2 : });
972 2 : }
973 :
974 : /// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats
975 8 : fn lowres_time(hires: SystemTime) -> SystemTime {
976 8 : let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
977 8 : UNIX_EPOCH + Duration::from_secs(ts)
978 8 : }
979 :
980 : #[test]
981 2 : fn access_stats() {
982 2 : let access_stats = LayerAccessStats::default();
983 2 : // Default is visible
984 2 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
985 :
986 2 : access_stats.set_visibility(LayerVisibilityHint::Covered);
987 2 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
988 2 : access_stats.set_visibility(LayerVisibilityHint::Visible);
989 2 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
990 :
991 2 : let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
992 2 : access_stats.record_residence_event_at(rtime);
993 2 : assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
994 :
995 2 : let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
996 2 : access_stats.record_access_at(atime);
997 2 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
998 :
999 : // Setting visibility doesn't clobber access time
1000 2 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1001 2 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1002 2 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1003 2 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1004 2 : }
1005 :
1006 : #[test]
1007 2 : fn access_stats_2038() {
1008 2 : // The access stats structure uses a timestamp representation that will run out
1009 2 : // of bits in 2038. One year before that, this unit test will start failing.
1010 2 :
1011 2 : let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
1012 2 : + Duration::from_secs(3600 * 24 * 365);
1013 2 :
1014 2 : assert!(one_year_from_now.as_secs() < (2 << 31));
1015 2 : }