Line data Source code
1 : use std::time::UNIX_EPOCH;
2 :
3 : use pageserver_api::key::CONTROLFILE_KEY;
4 : use tokio::task::JoinSet;
5 : use utils::{
6 : completion::{self, Completion},
7 : id::TimelineId,
8 : };
9 :
10 : use super::failpoints::{Failpoint, FailpointKind};
11 : use super::*;
12 : use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
13 : use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
14 :
15 : /// Used in tests to advance a future to wanted await point, and not futher.
16 : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
17 :
18 : /// Used in tests to indicate forever long timeout; has to be longer than the amount of ADVANCE
19 : /// timeout uses to advance futures.
20 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
21 :
22 : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
23 : #[tokio::test]
24 2 : async fn smoke_test() {
25 2 : let handle = tokio::runtime::Handle::current();
26 2 :
27 2 : let h = TenantHarness::create("smoke_test").await.unwrap();
28 2 : let span = h.span();
29 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
30 8 : let (tenant, _) = h.load().await;
31 2 :
32 2 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
33 2 :
34 2 : let timeline = tenant
35 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
36 4 : .await
37 2 : .unwrap();
38 2 :
39 2 : let layer = {
40 2 : let mut layers = {
41 2 : let layers = timeline.layers.read().await;
42 2 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
43 2 : };
44 2 :
45 2 : assert_eq!(layers.len(), 1);
46 2 :
47 2 : layers.swap_remove(0)
48 2 : };
49 2 :
50 2 : // all layers created at pageserver are like `layer`, initialized with strong
51 2 : // Arc<DownloadedLayer>.
52 2 :
53 2 : let controlfile_keyspace = KeySpace {
54 2 : ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
55 2 : };
56 2 :
57 2 : let img_before = {
58 2 : let mut data = ValuesReconstructState::default();
59 2 : layer
60 2 : .get_values_reconstruct_data(
61 2 : controlfile_keyspace.clone(),
62 2 : Lsn(0x10)..Lsn(0x11),
63 2 : &mut data,
64 2 : &ctx,
65 2 : )
66 4 : .await
67 2 : .unwrap();
68 2 : data.keys
69 2 : .remove(&CONTROLFILE_KEY)
70 2 : .expect("must be present")
71 2 : .expect("should not error")
72 2 : .img
73 2 : .take()
74 2 : .expect("tenant harness writes the control file")
75 2 : };
76 2 :
77 2 : // important part is evicting the layer, which can be done when there are no more ResidentLayer
78 2 : // instances -- there currently are none, only two `Layer` values, one in the layermap and on
79 2 : // in scope.
80 2 : layer.evict_and_wait(FOREVER).await.unwrap();
81 2 :
82 2 : // double-evict returns an error, which is valid if both eviction_task and disk usage based
83 2 : // eviction would both evict the same layer at the same time.
84 2 :
85 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
86 2 : assert!(matches!(e, EvictionError::NotFound));
87 2 :
88 2 : // on accesses when the layer is evicted, it will automatically be downloaded.
89 2 : let img_after = {
90 2 : let mut data = ValuesReconstructState::default();
91 2 : layer
92 2 : .get_values_reconstruct_data(
93 2 : controlfile_keyspace.clone(),
94 2 : Lsn(0x10)..Lsn(0x11),
95 2 : &mut data,
96 2 : &ctx,
97 2 : )
98 2 : .instrument(download_span.clone())
99 7 : .await
100 2 : .unwrap();
101 2 : data.keys
102 2 : .remove(&CONTROLFILE_KEY)
103 2 : .expect("must be present")
104 2 : .expect("should not error")
105 2 : .img
106 2 : .take()
107 2 : .expect("tenant harness writes the control file")
108 2 : };
109 2 :
110 2 : assert_eq!(img_before, img_after);
111 2 :
112 2 : // evict_and_wait can timeout, but it doesn't cancel the evicting itself
113 2 : //
114 2 : // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
115 2 : // artificially slow it down.
116 2 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
117 2 :
118 2 : match layer
119 2 : .evict_and_wait(std::time::Duration::ZERO)
120 2 : .await
121 2 : .unwrap_err()
122 2 : {
123 2 : EvictionError::Timeout => {
124 2 : // expected, but note that the eviction is "still ongoing"
125 9 : helper.release().await;
126 2 : // exhaust spawn_blocking pool to ensure it is now complete
127 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
128 19 : .await;
129 2 : }
130 2 : other => unreachable!("{other:?}"),
131 2 : }
132 2 :
133 2 : // only way to query if a layer is resident is to acquire a ResidentLayer instance.
134 2 : // Layer::keep_resident never downloads, but it might initialize if the layer file is found
135 2 : // downloaded locally.
136 2 : let none = layer.keep_resident().await;
137 2 : assert!(
138 2 : none.is_none(),
139 2 : "Expected none, because eviction removed the local file, found: {none:?}"
140 2 : );
141 2 :
142 2 : // plain downloading is rarely needed
143 2 : layer
144 2 : .download_and_keep_resident()
145 2 : .instrument(download_span)
146 4 : .await
147 2 : .unwrap();
148 2 :
149 2 : // last important part is deletion on drop: gc and compaction use it for compacted L0 layers
150 2 : // or fully garbage collected layers. deletion means deleting the local file, and scheduling a
151 2 : // deletion of the already unlinked from index_part.json remote file.
152 2 : //
153 2 : // marking a layer to be deleted on drop is irreversible; there is no technical reason against
154 2 : // reversiblity, but currently it is not needed so it is not provided.
155 2 : layer.delete_on_drop();
156 2 :
157 2 : let path = layer.local_path().to_owned();
158 2 :
159 2 : // wait_drop produces an unconnected to Layer future which will resolve when the
160 2 : // LayerInner::drop has completed.
161 2 : let mut wait_drop = std::pin::pin!(layer.wait_drop());
162 2 :
163 2 : // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
164 2 : // until here
165 2 : tokio::time::pause();
166 2 : tokio::time::timeout(ADVANCE, &mut wait_drop)
167 2 : .await
168 2 : .expect_err("should had timed out because two strong references exist");
169 2 :
170 2 : tokio::fs::metadata(&path)
171 2 : .await
172 2 : .expect("the local layer file still exists");
173 2 :
174 2 : let rtc = &timeline.remote_client;
175 2 :
176 2 : {
177 2 : let layers = &[layer];
178 2 : let mut g = timeline.layers.write().await;
179 2 : g.open_mut().unwrap().finish_gc_timeline(layers);
180 2 : // this just updates the remote_physical_size for demonstration purposes
181 2 : rtc.schedule_gc_update(layers).unwrap();
182 2 : }
183 2 :
184 2 : // when strong references are dropped, the file is deleted and remote deletion is scheduled
185 2 : wait_drop.await;
186 2 :
187 2 : let e = tokio::fs::metadata(&path)
188 2 : .await
189 2 : .expect_err("the local file is deleted");
190 2 : assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
191 2 :
192 2 : rtc.wait_completion().await.unwrap();
193 2 :
194 2 : assert_eq!(rtc.get_remote_physical_size(), 0);
195 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
196 2 : }
197 :
198 : /// This test demonstrates a previous hang when a eviction and deletion were requested at the same
199 : /// time. Now both of them complete per Arc drop semantics.
200 : #[tokio::test(start_paused = true)]
201 2 : async fn evict_and_wait_on_wanted_deleted() {
202 2 : // this is the runtime on which Layer spawns the blocking tasks on
203 2 : let handle = tokio::runtime::Handle::current();
204 2 :
205 2 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
206 2 : .await
207 2 : .unwrap();
208 2 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
209 8 : let (tenant, ctx) = h.load().await;
210 2 :
211 2 : let timeline = tenant
212 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
213 4 : .await
214 2 : .unwrap();
215 2 :
216 2 : let layer = {
217 2 : let mut layers = {
218 2 : let layers = timeline.layers.read().await;
219 2 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
220 2 : };
221 2 :
222 2 : assert_eq!(layers.len(), 1);
223 2 :
224 2 : layers.swap_remove(0)
225 2 : };
226 2 :
227 2 : // setup done
228 2 :
229 2 : let resident = layer.keep_resident().await.unwrap();
230 2 :
231 2 : {
232 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
233 2 :
234 2 : // drive the future to await on the status channel
235 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
236 2 : .await
237 2 : .expect_err("should had been a timeout since we are holding the layer resident");
238 2 :
239 2 : layer.delete_on_drop();
240 2 :
241 2 : drop(resident);
242 2 :
243 2 : // make sure the eviction task gets to run
244 8 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
245 2 :
246 2 : let resident = layer.keep_resident().await;
247 2 : assert!(
248 2 : resident.is_none(),
249 2 : "keep_resident should not have re-initialized: {resident:?}"
250 2 : );
251 2 :
252 2 : evict_and_wait
253 2 : .await
254 2 : .expect("evict_and_wait should had succeeded");
255 2 :
256 2 : // works as intended
257 2 : }
258 2 :
259 2 : // assert that once we remove the `layer` from the layer map and drop our reference,
260 2 : // the deletion of the layer in remote_storage happens.
261 2 : {
262 2 : let mut layers = timeline.layers.write().await;
263 2 : layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
264 2 : }
265 2 :
266 8 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
267 2 :
268 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
269 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
270 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
271 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
272 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
273 2 : }
274 :
275 : /// This test ensures we are able to read the layer while the layer eviction has been
276 : /// started but not completed.
277 : #[test]
278 2 : fn read_wins_pending_eviction() {
279 2 : let rt = tokio::runtime::Builder::new_current_thread()
280 2 : .max_blocking_threads(1)
281 2 : .enable_all()
282 2 : .start_paused(true)
283 2 : .build()
284 2 : .unwrap();
285 2 :
286 2 : rt.block_on(async move {
287 2 : // this is the runtime on which Layer spawns the blocking tasks on
288 2 : let handle = tokio::runtime::Handle::current();
289 2 : let h = TenantHarness::create("read_wins_pending_eviction")
290 0 : .await
291 2 : .unwrap();
292 8 : let (tenant, ctx) = h.load().await;
293 2 : let span = h.span();
294 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
295 :
296 2 : let timeline = tenant
297 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
298 4 : .await
299 2 : .unwrap();
300 :
301 2 : let layer = {
302 2 : let mut layers = {
303 2 : let layers = timeline.layers.read().await;
304 2 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
305 2 : };
306 2 :
307 2 : assert_eq!(layers.len(), 1);
308 :
309 2 : layers.swap_remove(0)
310 : };
311 :
312 : // setup done
313 :
314 2 : let resident = layer.keep_resident().await.unwrap();
315 2 :
316 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
317 2 :
318 2 : // drive the future to await on the status channel
319 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
320 2 : .await
321 2 : .expect_err("should had been a timeout since we are holding the layer resident");
322 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
323 :
324 2 : let (completion, barrier) = utils::completion::channel();
325 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
326 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
327 2 : Some(arrival),
328 2 : barrier,
329 2 : ));
330 2 :
331 2 : // now the eviction cannot proceed because the threads are consumed while completion exists
332 2 : drop(resident);
333 2 : arrived_at_barrier.wait().await;
334 2 : assert!(!layer.is_likely_resident());
335 :
336 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
337 2 : layer
338 2 : .0
339 2 : .get_or_maybe_download(false, None)
340 2 : .instrument(download_span)
341 2 : .await
342 2 : .expect("should had reinitialized without downloading");
343 2 :
344 2 : assert!(layer.is_likely_resident());
345 :
346 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
347 2 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
348 0 : .await
349 2 : .expect("no timeout, because get_or_maybe_download re-initialized")
350 2 : .expect_err("eviction should not have succeeded because re-initialized");
351 2 :
352 2 : // works as intended: evictions lose to "downloads"
353 2 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
354 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
355 :
356 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
357 : // because of a failpoint
358 2 : assert_eq!(
359 2 : 0,
360 2 : LAYER_IMPL_METRICS
361 2 : .cancelled_evictions
362 2 : .values()
363 18 : .map(|ctr| ctr.get())
364 2 : .sum::<u64>()
365 2 : );
366 :
367 2 : drop(completion);
368 2 :
369 4 : tokio::time::sleep(ADVANCE).await;
370 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
371 2 : .await;
372 :
373 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
374 :
375 : // now we finally can observe the original eviction failing
376 : // it would had been possible to observe it earlier, but here it is guaranteed to have
377 : // happened.
378 2 : assert_eq!(
379 2 : 1,
380 2 : LAYER_IMPL_METRICS
381 2 : .cancelled_evictions
382 2 : .values()
383 18 : .map(|ctr| ctr.get())
384 2 : .sum::<u64>()
385 2 : );
386 :
387 2 : assert_eq!(
388 2 : 1,
389 2 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
390 2 : );
391 :
392 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
393 2 : });
394 2 : }
395 :
396 : /// Use failpoint to delay an eviction starting to get a VersionCheckFailed.
397 : #[test]
398 2 : fn multiple_pending_evictions_in_order() {
399 2 : let name = "multiple_pending_evictions_in_order";
400 2 : let in_order = true;
401 2 : multiple_pending_evictions_scenario(name, in_order);
402 2 : }
403 :
404 : /// Use failpoint to reorder later eviction before first to get a UnexpectedEvictedState.
405 : #[test]
406 2 : fn multiple_pending_evictions_out_of_order() {
407 2 : let name = "multiple_pending_evictions_out_of_order";
408 2 : let in_order = false;
409 2 : multiple_pending_evictions_scenario(name, in_order);
410 2 : }
411 :
412 4 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
413 4 : let rt = tokio::runtime::Builder::new_current_thread()
414 4 : .max_blocking_threads(1)
415 4 : .enable_all()
416 4 : .start_paused(true)
417 4 : .build()
418 4 : .unwrap();
419 4 :
420 4 : rt.block_on(async move {
421 4 : // this is the runtime on which Layer spawns the blocking tasks on
422 4 : let handle = tokio::runtime::Handle::current();
423 4 : let h = TenantHarness::create(name).await.unwrap();
424 16 : let (tenant, ctx) = h.load().await;
425 4 : let span = h.span();
426 4 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
427 :
428 4 : let timeline = tenant
429 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
430 8 : .await
431 4 : .unwrap();
432 :
433 4 : let layer = {
434 4 : let mut layers = {
435 4 : let layers = timeline.layers.read().await;
436 4 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
437 4 : };
438 4 :
439 4 : assert_eq!(layers.len(), 1);
440 :
441 4 : layers.swap_remove(0)
442 : };
443 :
444 : // setup done
445 :
446 4 : let resident = layer.keep_resident().await.unwrap();
447 4 :
448 4 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
449 4 :
450 4 : // drive the future to await on the status channel
451 4 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
452 4 : .await
453 4 : .expect_err("should had been a timeout since we are holding the layer resident");
454 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
455 :
456 4 : let (completion1, barrier) = utils::completion::channel();
457 4 : let mut completion1 = Some(completion1);
458 4 : let (arrival, arrived_at_barrier) = utils::completion::channel();
459 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
460 4 : Some(arrival),
461 4 : barrier,
462 4 : ));
463 4 :
464 4 : // now the eviction cannot proceed because we are simulating arbitrary long delay for the
465 4 : // eviction task start.
466 4 : drop(resident);
467 4 : assert!(!layer.is_likely_resident());
468 :
469 4 : arrived_at_barrier.wait().await;
470 :
471 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
472 4 : layer
473 4 : .0
474 4 : .get_or_maybe_download(false, None)
475 4 : .instrument(download_span)
476 3 : .await
477 4 : .expect("should had reinitialized without downloading");
478 4 :
479 4 : assert!(layer.is_likely_resident());
480 :
481 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
482 4 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
483 0 : .await
484 4 : .expect("no timeout, because get_or_maybe_download re-initialized")
485 4 : .expect_err("eviction should not have succeeded because re-initialized");
486 4 :
487 4 : // works as intended: evictions lose to "downloads"
488 4 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
489 4 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
490 :
491 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
492 : // because of a failpoint
493 4 : assert_eq!(
494 4 : 0,
495 4 : LAYER_IMPL_METRICS
496 4 : .cancelled_evictions
497 4 : .values()
498 36 : .map(|ctr| ctr.get())
499 4 : .sum::<u64>()
500 4 : );
501 :
502 4 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
503 :
504 : // configure another failpoint for the second eviction -- evictions are per initialization,
505 : // so now that we've reinitialized the inner, we get to run two of them at the same time.
506 4 : let (completion2, barrier) = utils::completion::channel();
507 4 : let (arrival, arrived_at_barrier) = utils::completion::channel();
508 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
509 4 : Some(arrival),
510 4 : barrier,
511 4 : ));
512 4 :
513 4 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
514 4 :
515 4 : // advance to the wait on the queue
516 4 : tokio::time::timeout(ADVANCE, &mut second_eviction)
517 8 : .await
518 4 : .expect_err("timeout because failpoint is blocking");
519 4 :
520 4 : arrived_at_barrier.wait().await;
521 :
522 4 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
523 :
524 4 : let mut release_earlier_eviction = |expected_reason| {
525 4 : assert_eq!(
526 4 : 0,
527 4 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
528 4 : );
529 :
530 4 : drop(completion1.take().unwrap());
531 4 :
532 4 : let handle = &handle;
533 :
534 4 : async move {
535 4 : tokio::time::sleep(ADVANCE).await;
536 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
537 4 : handle, 1,
538 4 : )
539 5 : .await;
540 :
541 4 : assert_eq!(
542 4 : 1,
543 4 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
544 4 : );
545 4 : }
546 4 : };
547 :
548 4 : if in_order {
549 3 : release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
550 2 : }
551 :
552 : // release the later eviction which is for the current version
553 4 : drop(completion2);
554 8 : tokio::time::sleep(ADVANCE).await;
555 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
556 7 : .await;
557 :
558 4 : if !in_order {
559 6 : release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
560 2 : }
561 :
562 4 : tokio::time::timeout(ADVANCE, &mut second_eviction)
563 0 : .await
564 4 : .expect("eviction goes through now that spawn_blocking is unclogged")
565 4 : .expect("eviction should succeed, because version matches");
566 4 :
567 4 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
568 :
569 : // ensure the cancelled are unchanged
570 4 : assert_eq!(
571 4 : 1,
572 4 : LAYER_IMPL_METRICS
573 4 : .cancelled_evictions
574 4 : .values()
575 36 : .map(|ctr| ctr.get())
576 4 : .sum::<u64>()
577 4 : );
578 :
579 4 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
580 4 : });
581 4 : }
582 :
583 : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
584 : /// a `Layer::keep_resident` call.
585 : ///
586 : /// This matters because cancelling the eviction would leave us in a state where the file is on
587 : /// disk but the layer internal state says it has not been initialized. Futhermore, it allows us to
588 : /// have non-repairing `Layer::is_likely_resident`.
589 : #[tokio::test(start_paused = true)]
590 2 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
591 2 : let handle = tokio::runtime::Handle::current();
592 2 : let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
593 2 : .await
594 2 : .unwrap();
595 8 : let (tenant, ctx) = h.load().await;
596 2 :
597 2 : let timeline = tenant
598 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
599 4 : .await
600 2 : .unwrap();
601 2 :
602 2 : let layer = {
603 2 : let mut layers = {
604 2 : let layers = timeline.layers.read().await;
605 2 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
606 2 : };
607 2 :
608 2 : assert_eq!(layers.len(), 1);
609 2 :
610 2 : layers.swap_remove(0)
611 2 : };
612 2 :
613 2 : // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
614 2 : // Err) at the right time as in "during" the `LayerInner::needs_download`.
615 2 : layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
616 2 :
617 2 : let (completion, barrier) = utils::completion::channel();
618 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
619 2 :
620 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
621 2 : Some(arrival),
622 2 : barrier,
623 2 : ));
624 2 :
625 2 : tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
626 2 : .await
627 2 : .expect_err("should had advanced to waiting on channel");
628 2 :
629 2 : arrived_at_barrier.wait().await;
630 2 :
631 2 : // simulate a cancelled read which is cancelled before it gets to re-initialize
632 2 : let e = layer
633 2 : .0
634 2 : .get_or_maybe_download(false, None)
635 2 : .await
636 2 : .unwrap_err();
637 2 : assert!(
638 2 : matches!(
639 2 : e,
640 2 : DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
641 2 : ),
642 2 : "{e:?}"
643 2 : );
644 2 :
645 2 : assert!(
646 2 : layer.0.needs_download().await.unwrap().is_none(),
647 2 : "file is still on disk"
648 2 : );
649 2 :
650 2 : // release the eviction task
651 2 : drop(completion);
652 2 : tokio::time::sleep(ADVANCE).await;
653 11 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
654 2 :
655 2 : // failpoint is still enabled, but it is not hit
656 2 : let e = layer
657 2 : .0
658 2 : .get_or_maybe_download(false, None)
659 2 : .await
660 2 : .unwrap_err();
661 2 : assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
662 2 :
663 2 : // failpoint is not counted as cancellation either
664 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
665 2 : }
666 :
667 : #[tokio::test(start_paused = true)]
668 2 : async fn evict_and_wait_does_not_wait_for_download() {
669 2 : // let handle = tokio::runtime::Handle::current();
670 2 : let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
671 2 : .await
672 2 : .unwrap();
673 8 : let (tenant, ctx) = h.load().await;
674 2 : let span = h.span();
675 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
676 2 :
677 2 : let timeline = tenant
678 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
679 4 : .await
680 2 : .unwrap();
681 2 :
682 2 : let layer = {
683 2 : let mut layers = {
684 2 : let layers = timeline.layers.read().await;
685 2 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
686 2 : };
687 2 :
688 2 : assert_eq!(layers.len(), 1);
689 2 :
690 2 : layers.swap_remove(0)
691 2 : };
692 2 :
693 2 : // kind of forced setup: start an eviction but do not allow it progress until we are
694 2 : // downloading
695 2 : let (eviction_can_continue, barrier) = utils::completion::channel();
696 2 : let (arrival, eviction_arrived) = utils::completion::channel();
697 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
698 2 : Some(arrival),
699 2 : barrier,
700 2 : ));
701 2 :
702 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
703 2 :
704 2 : // use this once-awaited other_evict to synchronize with the eviction
705 2 : let other_evict = layer.evict_and_wait(FOREVER);
706 2 :
707 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
708 2 : .await
709 2 : .expect_err("should had advanced");
710 2 : eviction_arrived.wait().await;
711 2 : drop(eviction_can_continue);
712 2 : other_evict.await.unwrap();
713 2 :
714 2 : // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
715 2 : assert!(!layer.is_likely_resident());
716 2 :
717 2 : // following new evict_and_wait will fail until we've completed the download
718 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
719 2 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
720 2 :
721 2 : let (download_can_continue, barrier) = utils::completion::channel();
722 2 : let (arrival, _download_arrived) = utils::completion::channel();
723 2 : layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
724 2 :
725 2 : let mut download = std::pin::pin!(layer
726 2 : .0
727 2 : .get_or_maybe_download(true, None)
728 2 : .instrument(download_span));
729 2 :
730 2 : assert!(
731 2 : !layer.is_likely_resident(),
732 2 : "during download layer is evicted"
733 2 : );
734 2 :
735 2 : tokio::time::timeout(ADVANCE, &mut download)
736 3 : .await
737 2 : .expect_err("should had timed out because of failpoint");
738 2 :
739 2 : // now we finally get to continue, and because the latest state is downloading, we deduce that
740 2 : // original eviction succeeded
741 2 : evict_and_wait.await.unwrap();
742 2 :
743 2 : // however a new evict_and_wait will fail
744 2 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
745 2 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
746 2 :
747 2 : assert!(!layer.is_likely_resident());
748 2 :
749 2 : drop(download_can_continue);
750 2 : download.await.expect("download should had succeeded");
751 2 : assert!(layer.is_likely_resident());
752 2 :
753 2 : // only now can we evict
754 2 : layer.evict_and_wait(FOREVER).await.unwrap();
755 2 : }
756 :
757 : /// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
758 : /// which is the last value.
759 : ///
760 : /// Also checks that the same does not happen on a non-evicted layer (regression test).
761 : #[tokio::test(start_paused = true)]
762 2 : async fn eviction_cancellation_on_drop() {
763 2 : use crate::repository::Value;
764 2 : use bytes::Bytes;
765 2 :
766 2 : // this is the runtime on which Layer spawns the blocking tasks on
767 2 : let handle = tokio::runtime::Handle::current();
768 2 :
769 2 : let h = TenantHarness::create("eviction_cancellation_on_drop")
770 2 : .await
771 2 : .unwrap();
772 2 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
773 8 : let (tenant, ctx) = h.load().await;
774 2 :
775 2 : let timeline = tenant
776 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
777 4 : .await
778 2 : .unwrap();
779 2 :
780 2 : {
781 2 : // create_test_timeline wrote us one layer, write another
782 2 : let mut writer = timeline.writer().await;
783 2 : writer
784 2 : .put(
785 2 : crate::repository::Key::from_i128(5),
786 2 : Lsn(0x20),
787 2 : &Value::Image(Bytes::from_static(b"this does not matter either")),
788 2 : &ctx,
789 2 : )
790 2 : .await
791 2 : .unwrap();
792 2 :
793 2 : writer.finish_write(Lsn(0x20));
794 2 : }
795 2 :
796 2 : timeline.freeze_and_flush().await.unwrap();
797 2 :
798 2 : // wait for the upload to complete so our Arc::strong_count assertion holds
799 2 : timeline.remote_client.wait_completion().await.unwrap();
800 2 :
801 2 : let (evicted_layer, not_evicted) = {
802 2 : let mut layers = {
803 2 : let mut guard = timeline.layers.write().await;
804 2 : let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
805 2 : // remove the layers from layermap
806 2 : guard.open_mut().unwrap().finish_gc_timeline(&layers);
807 2 :
808 2 : layers
809 2 : };
810 2 :
811 2 : assert_eq!(layers.len(), 2);
812 2 :
813 2 : (layers.pop().unwrap(), layers.pop().unwrap())
814 2 : };
815 2 :
816 2 : let victims = [(evicted_layer, true), (not_evicted, false)];
817 2 :
818 6 : for (victim, evict) in victims {
819 4 : let resident = victim.keep_resident().await.unwrap();
820 4 : drop(victim);
821 4 :
822 4 : assert_eq!(Arc::strong_count(&resident.owner.0), 1);
823 2 :
824 4 : if evict {
825 2 : let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
826 2 :
827 2 : // drive the future to await on the status channel, and then drop it
828 2 : tokio::time::timeout(ADVANCE, evict_and_wait)
829 2 : .await
830 2 : .expect_err("should had been a timeout since we are holding the layer resident");
831 2 : }
832 2 :
833 2 : // 1 == we only evict one of the layers
834 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
835 2 :
836 4 : drop(resident);
837 4 :
838 4 : // run any spawned
839 5 : tokio::time::sleep(ADVANCE).await;
840 2 :
841 17 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
842 2 :
843 4 : assert_eq!(
844 4 : 1,
845 4 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
846 4 : );
847 2 : }
848 2 : }
849 :
850 : /// A test case to remind you the cost of these structures. You can bump the size limit
851 : /// below if it is really necessary to add more fields to the structures.
852 : #[test]
853 : #[cfg(target_arch = "x86_64")]
854 2 : fn layer_size() {
855 2 : assert_eq!(size_of::<LayerAccessStats>(), 8);
856 2 : assert_eq!(size_of::<PersistentLayerDesc>(), 104);
857 2 : assert_eq!(size_of::<LayerInner>(), 296);
858 : // it also has the utf8 path
859 2 : }
860 :
861 : struct SpawnBlockingPoolHelper {
862 : awaited_by_spawn_blocking_tasks: Completion,
863 : blocking_tasks: JoinSet<()>,
864 : }
865 :
866 : impl SpawnBlockingPoolHelper {
867 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
868 : /// release is called.
869 : ///
870 : /// In the tests this can be used to ensure something cannot be started on the target runtimes
871 : /// spawn_blocking pool.
872 : ///
873 : /// This should be no issue nowdays, because nextest runs each test in it's own process.
874 2 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
875 2 : let default_max_blocking_threads = 512;
876 2 :
877 2 : Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
878 2 : }
879 :
880 26 : async fn consume_all_spawn_blocking_threads0(
881 26 : handle: &tokio::runtime::Handle,
882 26 : threads: usize,
883 26 : ) -> Self {
884 26 : assert_ne!(threads, 0);
885 :
886 26 : let (completion, barrier) = completion::channel();
887 26 : let (started, starts_completed) = completion::channel();
888 26 :
889 26 : let mut blocking_tasks = JoinSet::new();
890 26 :
891 7180 : for _ in 0..threads {
892 7180 : let barrier = barrier.clone();
893 7180 : let started = started.clone();
894 7180 : blocking_tasks.spawn_blocking_on(
895 7180 : move || {
896 7180 : drop(started);
897 7180 : tokio::runtime::Handle::current().block_on(barrier.wait());
898 7180 : },
899 7180 : handle,
900 7180 : );
901 7180 : }
902 :
903 26 : drop(started);
904 26 :
905 26 : starts_completed.wait().await;
906 :
907 26 : drop(barrier);
908 26 :
909 26 : tracing::trace!("consumed all threads");
910 :
911 26 : SpawnBlockingPoolHelper {
912 26 : awaited_by_spawn_blocking_tasks: completion,
913 26 : blocking_tasks,
914 26 : }
915 26 : }
916 :
917 : /// Release all previously blocked spawn_blocking threads
918 26 : async fn release(self) {
919 26 : let SpawnBlockingPoolHelper {
920 26 : awaited_by_spawn_blocking_tasks,
921 26 : mut blocking_tasks,
922 26 : } = self;
923 26 :
924 26 : drop(awaited_by_spawn_blocking_tasks);
925 :
926 7206 : while let Some(res) = blocking_tasks.join_next().await {
927 7180 : res.expect("none of the tasks should had panicked");
928 7180 : }
929 :
930 26 : tracing::trace!("released all threads");
931 26 : }
932 :
933 : /// In the tests it is used as an easy way of making sure something scheduled on the target
934 : /// runtimes `spawn_blocking` has completed, because it must've been scheduled and completed
935 : /// before our tasks have a chance to schedule and complete.
936 12 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
937 63 : Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
938 12 : }
939 :
940 22 : async fn consume_and_release_all_of_spawn_blocking_threads0(
941 22 : handle: &tokio::runtime::Handle,
942 22 : threads: usize,
943 22 : ) {
944 22 : Self::consume_all_spawn_blocking_threads0(handle, threads)
945 17 : .await
946 22 : .release()
947 60 : .await
948 22 : }
949 : }
950 :
951 : #[test]
952 2 : fn spawn_blocking_pool_helper_actually_works() {
953 2 : // create a custom runtime for which we know and control how many blocking threads it has
954 2 : //
955 2 : // because the amount is not configurable for our helper, expect the same amount as
956 2 : // BACKGROUND_RUNTIME using the tokio defaults would have.
957 2 : let rt = tokio::runtime::Builder::new_current_thread()
958 2 : .max_blocking_threads(1)
959 2 : .enable_all()
960 2 : .build()
961 2 : .unwrap();
962 2 :
963 2 : let handle = rt.handle();
964 2 :
965 2 : rt.block_on(async move {
966 : // this will not return until all threads are spun up and actually executing the code
967 : // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
968 2 : let consumed =
969 2 : SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
970 :
971 2 : println!("consumed");
972 2 :
973 2 : let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
974 2 : // this will not get to run before we release
975 2 : }));
976 2 :
977 2 : println!("spawned");
978 2 :
979 2 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
980 2 : .await
981 2 : .expect_err("the task should not have gotten to run yet");
982 2 :
983 2 : println!("tried to join");
984 2 :
985 2 : consumed.release().await;
986 :
987 2 : println!("released");
988 2 :
989 2 : tokio::time::timeout(std::time::Duration::from_secs(1), jh)
990 0 : .await
991 2 : .expect("no timeout")
992 2 : .expect("no join error");
993 2 :
994 2 : println!("joined");
995 2 : });
996 2 : }
997 :
998 : /// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats
999 8 : fn lowres_time(hires: SystemTime) -> SystemTime {
1000 8 : let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
1001 8 : UNIX_EPOCH + Duration::from_secs(ts)
1002 8 : }
1003 :
1004 : #[test]
1005 2 : fn access_stats() {
1006 2 : let access_stats = LayerAccessStats::default();
1007 2 : // Default is visible
1008 2 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1009 :
1010 2 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1011 2 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1012 2 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1013 2 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1014 :
1015 2 : let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
1016 2 : access_stats.record_residence_event_at(rtime);
1017 2 : assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
1018 :
1019 2 : let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
1020 2 : access_stats.record_access_at(atime);
1021 2 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1022 :
1023 : // Setting visibility doesn't clobber access time
1024 2 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1025 2 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1026 2 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1027 2 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1028 :
1029 : // Recording access implicitly makes layer visible, if it wasn't already
1030 2 : let atime = UNIX_EPOCH + Duration::from_secs(2200000000);
1031 2 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1032 2 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1033 2 : assert!(access_stats.record_access_at(atime));
1034 2 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1035 2 : assert!(!access_stats.record_access_at(atime));
1036 2 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1037 2 : }
1038 :
1039 : #[test]
1040 2 : fn access_stats_2038() {
1041 2 : // The access stats structure uses a timestamp representation that will run out
1042 2 : // of bits in 2038. One year before that, this unit test will start failing.
1043 2 :
1044 2 : let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
1045 2 : + Duration::from_secs(3600 * 24 * 365);
1046 2 :
1047 2 : assert!(one_year_from_now.as_secs() < (2 << 31));
1048 2 : }
|