Line data Source code
1 : use std::time::UNIX_EPOCH;
2 :
3 : use pageserver_api::key::CONTROLFILE_KEY;
4 : use tokio::task::JoinSet;
5 : use utils::{
6 : completion::{self, Completion},
7 : id::TimelineId,
8 : };
9 :
10 : use super::failpoints::{Failpoint, FailpointKind};
11 : use super::*;
12 : use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
13 : use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
14 :
15 : /// Used in tests to advance a future to the wanted await point, and not further.
16 : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
17 :
18 : /// Used in tests to indicate a forever-long timeout; has to be longer than the total amount of
19 : /// ADVANCE used to advance futures.
20 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
21 :
22 : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
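: ///
: /// A rough sketch of the lifecycle exercised below, using the calls from this test
: /// (not a complete API reference):
: ///
: /// ```ignore
: /// layer.evict_and_wait(FOREVER).await?;                 // resident -> evicted
: /// layer.get_values_reconstruct_data(/* ... */).await?;  // evicted -> resident (downloads on access)
: /// layer.delete_on_drop();                               // deleted once the last references drop
: /// ```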
23 : #[tokio::test]
24 6 : async fn smoke_test() {
25 6 : let handle = tokio::runtime::Handle::current();
26 6 :
27 6 : let h = TenantHarness::create("smoke_test").await.unwrap();
28 6 : let span = h.span();
29 6 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
30 24 : let (tenant, _) = h.load().await;
31 6 :
32 6 : let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
33 6 :
34 6 : let timeline = tenant
35 6 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
36 12 : .await
37 6 : .unwrap();
38 6 :
39 6 : let layer = {
40 6 : let mut layers = {
41 6 : let layers = timeline.layers.read().await;
42 6 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
43 6 : };
44 6 :
45 6 : assert_eq!(layers.len(), 1);
46 6 :
47 6 : layers.swap_remove(0)
48 6 : };
49 6 :
50 6 : // all layers created at the pageserver are like `layer`, initialized with a strong
51 6 : // Arc<DownloadedLayer>.
52 6 :
53 6 : let controlfile_keyspace = KeySpace {
54 6 : ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
55 6 : };
56 6 :
57 6 : let img_before = {
58 6 : let mut data = ValuesReconstructState::default();
59 6 : layer
60 6 : .get_values_reconstruct_data(
61 6 : controlfile_keyspace.clone(),
62 6 : Lsn(0x10)..Lsn(0x11),
63 6 : &mut data,
64 6 : &ctx,
65 6 : )
66 12 : .await
67 6 : .unwrap();
68 6 : data.keys
69 6 : .remove(&CONTROLFILE_KEY)
70 6 : .expect("must be present")
71 6 : .expect("should not error")
72 6 : .img
73 6 : .take()
74 6 : .expect("tenant harness writes the control file")
75 6 : };
76 6 :
77 6 : // the important part is evicting the layer, which can be done when there are no more ResidentLayer
78 6 : // instances -- there currently are none, only two `Layer` values, one in the layermap and one
79 6 : // in scope.
80 6 : layer.evict_and_wait(FOREVER).await.unwrap();
81 6 :
82 6 : // a double eviction returns an error; this is valid when the eviction_task and disk usage based
83 6 : // eviction both try to evict the same layer at the same time.
84 6 :
85 6 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
86 6 : assert!(matches!(e, EvictionError::NotFound));
87 6 :
88 6 : // on access while the layer is evicted, it will automatically be downloaded.
89 6 : let img_after = {
90 6 : let mut data = ValuesReconstructState::default();
91 6 : layer
92 6 : .get_values_reconstruct_data(
93 6 : controlfile_keyspace.clone(),
94 6 : Lsn(0x10)..Lsn(0x11),
95 6 : &mut data,
96 6 : &ctx,
97 6 : )
98 6 : .instrument(download_span.clone())
99 23 : .await
100 6 : .unwrap();
101 6 : data.keys
102 6 : .remove(&CONTROLFILE_KEY)
103 6 : .expect("must be present")
104 6 : .expect("should not error")
105 6 : .img
106 6 : .take()
107 6 : .expect("tenant harness writes the control file")
108 6 : };
109 6 :
110 6 : assert_eq!(img_before, img_after);
111 6 :
112 6 : // evict_and_wait can time out, but that doesn't cancel the eviction itself
113 6 : //
114 6 : // a ZERO timeout does not work reliably, so first take up all spawn_blocking slots to
115 6 : // artificially slow the eviction down.
116 6 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
117 6 :
118 6 : match layer
119 6 : .evict_and_wait(std::time::Duration::ZERO)
120 6 : .await
121 6 : .unwrap_err()
122 6 : {
123 6 : EvictionError::Timeout => {
124 6 : // expected, but note that the eviction is "still ongoing"
125 35 : helper.release().await;
126 6 : // exhaust spawn_blocking pool to ensure it is now complete
127 6 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
128 86 : .await;
129 6 : }
130 6 : other => unreachable!("{other:?}"),
131 6 : }
132 6 :
133 6 : // the only way to query whether a layer is resident is to acquire a ResidentLayer instance.
134 6 : // Layer::keep_resident never downloads, but it might initialize if the layer file is already
135 6 : // present locally.
136 6 : let none = layer.keep_resident().await;
137 6 : assert!(
138 6 : none.is_none(),
139 6 : "Expected none, because eviction removed the local file, found: {none:?}"
140 6 : );
141 6 :
142 6 : // plain downloading is rarely needed
143 6 : layer
144 6 : .download_and_keep_resident()
145 6 : .instrument(download_span)
146 15 : .await
147 6 : .unwrap();
148 6 :
149 6 : // the last important part is deletion on drop: gc and compaction use it for compacted L0 layers
150 6 : // or fully garbage collected layers. Deletion means deleting the local file and scheduling a
151 6 : // deletion of the remote file, which has already been unlinked from index_part.json.
152 6 : //
153 6 : // marking a layer to be deleted on drop is irreversible; there is no technical reason against
154 6 : // reversibility, but currently it is not needed so it is not provided.
155 6 : layer.delete_on_drop();
156 6 :
157 6 : let path = layer.local_path().to_owned();
158 6 :
159 6 : // wait_drop produces a future that is not connected to the Layer and will resolve when
160 6 : // LayerInner::drop has completed.
161 6 : let mut wait_drop = std::pin::pin!(layer.wait_drop());
162 6 :
163 6 : // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
164 6 : // until here
165 6 : tokio::time::pause();
166 6 : tokio::time::timeout(ADVANCE, &mut wait_drop)
167 6 : .await
168 6 : .expect_err("should had timed out because two strong references exist");
169 6 :
170 6 : tokio::fs::metadata(&path)
171 6 : .await
172 6 : .expect("the local layer file still exists");
173 6 :
174 6 : let rtc = &timeline.remote_client;
175 6 :
176 6 : {
177 6 : let layers = &[layer];
178 6 : let mut g = timeline.layers.write().await;
179 6 : g.open_mut().unwrap().finish_gc_timeline(layers);
180 6 : // this just updates the remote_physical_size for demonstration purposes
181 6 : rtc.schedule_gc_update(layers).unwrap();
182 6 : }
183 6 :
184 6 : // when strong references are dropped, the file is deleted and remote deletion is scheduled
185 6 : wait_drop.await;
186 6 :
187 6 : let e = tokio::fs::metadata(&path)
188 6 : .await
189 6 : .expect_err("the local file is deleted");
190 6 : assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
191 6 :
192 6 : rtc.wait_completion().await.unwrap();
193 6 :
194 6 : assert_eq!(rtc.get_remote_physical_size(), 0);
195 6 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
196 6 : }
197 :
198 : /// This test demonstrates a previous hang when an eviction and a deletion were requested at the same
199 : /// time. Now both of them complete per Arc drop semantics.
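: ///
: /// Rough sequence exercised below: hold a ResidentLayer, start `evict_and_wait`, call
: /// `delete_on_drop`, then drop the guard; the eviction completes and the deletion is scheduled.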
200 : #[tokio::test(start_paused = true)]
201 6 : async fn evict_and_wait_on_wanted_deleted() {
202 6 : // this is the runtime on which Layer spawns its blocking tasks
203 6 : let handle = tokio::runtime::Handle::current();
204 6 :
205 6 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
206 6 : .await
207 6 : .unwrap();
208 6 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
209 24 : let (tenant, ctx) = h.load().await;
210 6 :
211 6 : let timeline = tenant
212 6 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
213 12 : .await
214 6 : .unwrap();
215 6 :
216 6 : let layer = {
217 6 : let mut layers = {
218 6 : let layers = timeline.layers.read().await;
219 6 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
220 6 : };
221 6 :
222 6 : assert_eq!(layers.len(), 1);
223 6 :
224 6 : layers.swap_remove(0)
225 6 : };
226 6 :
227 6 : // setup done
228 6 :
229 6 : let resident = layer.keep_resident().await.unwrap();
230 6 :
231 6 : {
232 6 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
233 6 :
234 6 : // drive the future to await on the status channel
235 6 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
236 6 : .await
237 6 : .expect_err("should had been a timeout since we are holding the layer resident");
238 6 :
239 6 : layer.delete_on_drop();
240 6 :
241 6 : drop(resident);
242 6 :
243 6 : // make sure the eviction task gets to run
244 71 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
245 6 :
246 6 : let resident = layer.keep_resident().await;
247 6 : assert!(
248 6 : resident.is_none(),
249 6 : "keep_resident should not have re-initialized: {resident:?}"
250 6 : );
251 6 :
252 6 : evict_and_wait
253 6 : .await
254 6 : .expect("evict_and_wait should had succeeded");
255 6 :
256 6 : // works as intended
257 6 : }
258 6 :
259 6 : // assert that once we remove the `layer` from the layer map and drop our reference,
260 6 : // the deletion of the layer in remote_storage happens.
261 6 : {
262 6 : let mut layers = timeline.layers.write().await;
263 6 : layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
264 6 : }
265 6 :
266 38 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
267 6 :
268 6 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
269 6 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
270 6 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
271 6 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
272 6 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
273 6 : }
274 :
275 : /// This test ensures we are able to read the layer while the layer eviction has been
276 : /// started but not completed.
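: ///
: /// The eviction is held just before it starts via `Failpoint::WaitBeforeStartingEvicting`, so a
: /// concurrent `get_or_maybe_download` can re-initialize the layer and win.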
277 : #[test]
278 6 : fn read_wins_pending_eviction() {
279 6 : let rt = tokio::runtime::Builder::new_current_thread()
280 6 : .max_blocking_threads(1)
281 6 : .enable_all()
282 6 : .start_paused(true)
283 6 : .build()
284 6 : .unwrap();
285 6 :
286 6 : rt.block_on(async move {
287 6 : // this is the runtime on which Layer spawns its blocking tasks
288 6 : let handle = tokio::runtime::Handle::current();
289 6 : let h = TenantHarness::create("read_wins_pending_eviction")
290 0 : .await
291 6 : .unwrap();
292 24 : let (tenant, ctx) = h.load().await;
293 6 : let span = h.span();
294 6 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
295 :
296 6 : let timeline = tenant
297 6 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
298 12 : .await
299 6 : .unwrap();
300 :
301 6 : let layer = {
302 6 : let mut layers = {
303 6 : let layers = timeline.layers.read().await;
304 6 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
305 6 : };
306 6 :
307 6 : assert_eq!(layers.len(), 1);
308 :
309 6 : layers.swap_remove(0)
310 : };
311 :
312 : // setup done
313 :
314 6 : let resident = layer.keep_resident().await.unwrap();
315 6 :
316 6 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
317 6 :
318 6 : // drive the future to await on the status channel
319 6 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
320 6 : .await
321 6 : .expect_err("should had been a timeout since we are holding the layer resident");
322 6 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
323 :
324 6 : let (completion, barrier) = utils::completion::channel();
325 6 : let (arrival, arrived_at_barrier) = utils::completion::channel();
326 6 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
327 6 : Some(arrival),
328 6 : barrier,
329 6 : ));
330 6 :
331 6 : // now the eviction cannot proceed because the threads are consumed while completion exists
332 6 : drop(resident);
333 6 : arrived_at_barrier.wait().await;
334 6 : assert!(!layer.is_likely_resident());
335 :
336 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
337 6 : layer
338 6 : .0
339 6 : .get_or_maybe_download(false, None)
340 6 : .instrument(download_span)
341 6 : .await
342 6 : .expect("should had reinitialized without downloading");
343 6 :
344 6 : assert!(layer.is_likely_resident());
345 :
346 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
347 6 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
348 0 : .await
349 6 : .expect("no timeout, because get_or_maybe_download re-initialized")
350 6 : .expect_err("eviction should not have succeeded because re-initialized");
351 6 :
352 6 : // works as intended: evictions lose to "downloads"
353 6 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
354 6 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
355 :
356 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
357 : // because of a failpoint
358 6 : assert_eq!(
359 6 : 0,
360 6 : LAYER_IMPL_METRICS
361 6 : .cancelled_evictions
362 6 : .values()
363 54 : .map(|ctr| ctr.get())
364 6 : .sum::<u64>()
365 6 : );
366 :
367 6 : drop(completion);
368 6 :
369 12 : tokio::time::sleep(ADVANCE).await;
370 6 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
371 7 : .await;
372 :
373 6 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
374 :
375 : // now we can finally observe the original eviction failing
376 : // it would have been possible to observe it earlier, but here it is guaranteed to have
377 : // happened.
378 6 : assert_eq!(
379 6 : 1,
380 6 : LAYER_IMPL_METRICS
381 6 : .cancelled_evictions
382 6 : .values()
383 54 : .map(|ctr| ctr.get())
384 6 : .sum::<u64>()
385 6 : );
386 :
387 6 : assert_eq!(
388 6 : 1,
389 6 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
390 6 : );
391 :
392 6 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
393 6 : });
394 6 : }
395 :
396 : /// Use a failpoint to delay the start of an eviction in order to get a VersionCheckFailed.
397 : #[test]
398 6 : fn multiple_pending_evictions_in_order() {
399 6 : let name = "multiple_pending_evictions_in_order";
400 6 : let in_order = true;
401 6 : multiple_pending_evictions_scenario(name, in_order);
402 6 : }
403 :
404 : /// Use a failpoint to reorder the later eviction before the first in order to get an UnexpectedEvictedState.
405 : #[test]
406 6 : fn multiple_pending_evictions_out_of_order() {
407 6 : let name = "multiple_pending_evictions_out_of_order";
408 6 : let in_order = false;
409 6 : multiple_pending_evictions_scenario(name, in_order);
410 6 : }
411 :
412 12 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
413 12 : let rt = tokio::runtime::Builder::new_current_thread()
414 12 : .max_blocking_threads(1)
415 12 : .enable_all()
416 12 : .start_paused(true)
417 12 : .build()
418 12 : .unwrap();
419 12 :
420 12 : rt.block_on(async move {
421 12 : // this is the runtime on which Layer spawns its blocking tasks
422 12 : let handle = tokio::runtime::Handle::current();
423 12 : let h = TenantHarness::create(name).await.unwrap();
424 48 : let (tenant, ctx) = h.load().await;
425 12 : let span = h.span();
426 12 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
427 :
428 12 : let timeline = tenant
429 12 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
430 24 : .await
431 12 : .unwrap();
432 :
433 12 : let layer = {
434 12 : let mut layers = {
435 12 : let layers = timeline.layers.read().await;
436 12 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
437 12 : };
438 12 :
439 12 : assert_eq!(layers.len(), 1);
440 :
441 12 : layers.swap_remove(0)
442 : };
443 :
444 : // setup done
445 :
446 12 : let resident = layer.keep_resident().await.unwrap();
447 12 :
448 12 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
449 12 :
450 12 : // drive the future to await on the status channel
451 12 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
452 12 : .await
453 12 : .expect_err("should had been a timeout since we are holding the layer resident");
454 12 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
455 :
456 12 : let (completion1, barrier) = utils::completion::channel();
457 12 : let mut completion1 = Some(completion1);
458 12 : let (arrival, arrived_at_barrier) = utils::completion::channel();
459 12 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
460 12 : Some(arrival),
461 12 : barrier,
462 12 : ));
463 12 :
464 12 : // now the eviction cannot proceed because we are simulating arbitrary long delay for the
465 12 : // eviction task start.
466 12 : drop(resident);
467 12 : assert!(!layer.is_likely_resident());
468 :
469 12 : arrived_at_barrier.wait().await;
470 :
471 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
472 12 : layer
473 12 : .0
474 12 : .get_or_maybe_download(false, None)
475 12 : .instrument(download_span)
476 12 : .await
477 12 : .expect("should had reinitialized without downloading");
478 12 :
479 12 : assert!(layer.is_likely_resident());
480 :
481 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
482 12 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
483 0 : .await
484 12 : .expect("no timeout, because get_or_maybe_download re-initialized")
485 12 : .expect_err("eviction should not have succeeded because re-initialized");
486 12 :
487 12 : // works as intended: evictions lose to "downloads"
488 12 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
489 12 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
490 :
491 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
492 : // because of a failpoint
493 12 : assert_eq!(
494 12 : 0,
495 12 : LAYER_IMPL_METRICS
496 12 : .cancelled_evictions
497 12 : .values()
498 108 : .map(|ctr| ctr.get())
499 12 : .sum::<u64>()
500 12 : );
501 :
502 12 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
503 :
504 : // configure another failpoint for the second eviction -- evictions are per initialization,
505 : // so now that we've reinitialized the inner, we get to run two of them at the same time.
506 12 : let (completion2, barrier) = utils::completion::channel();
507 12 : let (arrival, arrived_at_barrier) = utils::completion::channel();
508 12 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
509 12 : Some(arrival),
510 12 : barrier,
511 12 : ));
512 12 :
513 12 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
514 12 :
515 12 : // advance to the wait on the queue
516 12 : tokio::time::timeout(ADVANCE, &mut second_eviction)
517 24 : .await
518 12 : .expect_err("timeout because failpoint is blocking");
519 12 :
520 12 : arrived_at_barrier.wait().await;
521 :
522 12 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
523 :
524 12 : let mut release_earlier_eviction = |expected_reason| {
525 12 : assert_eq!(
526 12 : 0,
527 12 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
528 12 : );
529 :
530 12 : drop(completion1.take().unwrap());
531 12 :
532 12 : let handle = &handle;
533 :
534 12 : async move {
535 12 : tokio::time::sleep(ADVANCE).await;
536 12 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
537 12 : handle, 1,
538 12 : )
539 16 : .await;
540 :
541 12 : assert_eq!(
542 12 : 1,
543 12 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
544 12 : );
545 12 : }
546 12 : };
547 :
548 12 : if in_order {
549 12 : release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
550 6 : }
551 :
552 : // release the later eviction which is for the current version
553 12 : drop(completion2);
554 24 : tokio::time::sleep(ADVANCE).await;
555 12 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
556 18 : .await;
557 :
558 12 : if !in_order {
559 16 : release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
560 6 : }
561 :
562 12 : tokio::time::timeout(ADVANCE, &mut second_eviction)
563 0 : .await
564 12 : .expect("eviction goes through now that spawn_blocking is unclogged")
565 12 : .expect("eviction should succeed, because version matches");
566 12 :
567 12 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
568 :
569 : // ensure the cancelled are unchanged
570 12 : assert_eq!(
571 12 : 1,
572 12 : LAYER_IMPL_METRICS
573 12 : .cancelled_evictions
574 12 : .values()
575 108 : .map(|ctr| ctr.get())
576 12 : .sum::<u64>()
577 12 : );
578 :
579 12 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
580 12 : });
581 12 : }
582 :
583 : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
584 : /// a `Layer::keep_resident` call.
585 : ///
586 : /// This matters because cancelling the eviction would leave us in a state where the file is on
587 : /// disk but the layer's internal state says it has not been initialized. Furthermore, it allows us to
588 : /// have non-repairing `Layer::is_likely_resident`.
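: ///
: /// The test drives this with two failpoints used below: `Failpoint::AfterDeterminingLayerNeedsNoDownload`
: /// to fail the read at the right moment, and `Failpoint::WaitBeforeStartingEvicting` to hold the
: /// pending eviction in place meanwhile.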
589 : #[tokio::test(start_paused = true)]
590 6 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
591 6 : let handle = tokio::runtime::Handle::current();
592 6 : let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
593 6 : .await
594 6 : .unwrap();
595 24 : let (tenant, ctx) = h.load().await;
596 6 :
597 6 : let timeline = tenant
598 6 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
599 12 : .await
600 6 : .unwrap();
601 6 :
602 6 : let layer = {
603 6 : let mut layers = {
604 6 : let layers = timeline.layers.read().await;
605 6 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
606 6 : };
607 6 :
608 6 : assert_eq!(layers.len(), 1);
609 6 :
610 6 : layers.swap_remove(0)
611 6 : };
612 6 :
613 6 : // this failpoint simulates `get_or_maybe_download` becoming cancelled (by returning an
614 6 : // Err) at the right time, i.e. "during" `LayerInner::needs_download`.
615 6 : layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
616 6 :
617 6 : let (completion, barrier) = utils::completion::channel();
618 6 : let (arrival, arrived_at_barrier) = utils::completion::channel();
619 6 :
620 6 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
621 6 : Some(arrival),
622 6 : barrier,
623 6 : ));
624 6 :
625 6 : tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
626 6 : .await
627 6 : .expect_err("should had advanced to waiting on channel");
628 6 :
629 6 : arrived_at_barrier.wait().await;
630 6 :
631 6 : // simulate a cancelled read which is cancelled before it gets to re-initialize
632 6 : let e = layer
633 6 : .0
634 6 : .get_or_maybe_download(false, None)
635 6 : .await
636 6 : .unwrap_err();
637 6 : assert!(
638 6 : matches!(
639 6 : e,
640 6 : DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
641 6 : ),
642 6 : "{e:?}"
643 6 : );
644 6 :
645 6 : assert!(
646 6 : layer.0.needs_download().await.unwrap().is_none(),
647 6 : "file is still on disk"
648 6 : );
649 6 :
650 6 : // release the eviction task
651 6 : drop(completion);
652 6 : tokio::time::sleep(ADVANCE).await;
653 76 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
654 6 :
655 6 : // failpoint is still enabled, but it is not hit
656 6 : let e = layer
657 6 : .0
658 6 : .get_or_maybe_download(false, None)
659 9 : .await
660 6 : .unwrap_err();
661 6 : assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
662 6 :
663 6 : // failpoint is not counted as cancellation either
664 6 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
665 6 : }
666 :
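: /// Ensures that an `evict_and_wait` waiter is resolved by the outcome of the eviction itself and
: /// does not keep waiting for a concurrent download of the same layer to finish.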
667 : #[tokio::test(start_paused = true)]
668 6 : async fn evict_and_wait_does_not_wait_for_download() {
669 6 : // let handle = tokio::runtime::Handle::current();
670 6 : let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
671 6 : .await
672 6 : .unwrap();
673 24 : let (tenant, ctx) = h.load().await;
674 6 : let span = h.span();
675 6 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
676 6 :
677 6 : let timeline = tenant
678 6 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
679 12 : .await
680 6 : .unwrap();
681 6 :
682 6 : let layer = {
683 6 : let mut layers = {
684 6 : let layers = timeline.layers.read().await;
685 6 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
686 6 : };
687 6 :
688 6 : assert_eq!(layers.len(), 1);
689 6 :
690 6 : layers.swap_remove(0)
691 6 : };
692 6 :
693 6 : // kind of forced setup: start an eviction but do not allow it to progress until we are
694 6 : // downloading
695 6 : let (eviction_can_continue, barrier) = utils::completion::channel();
696 6 : let (arrival, eviction_arrived) = utils::completion::channel();
697 6 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
698 6 : Some(arrival),
699 6 : barrier,
700 6 : ));
701 6 :
702 6 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
703 6 :
704 6 : // use this once-awaited other_evict to synchronize with the eviction
705 6 : let other_evict = layer.evict_and_wait(FOREVER);
706 6 :
707 6 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
708 6 : .await
709 6 : .expect_err("should had advanced");
710 6 : eviction_arrived.wait().await;
711 6 : drop(eviction_can_continue);
712 6 : other_evict.await.unwrap();
713 6 :
714 6 : // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
715 6 : assert!(!layer.is_likely_resident());
716 6 :
717 6 : // following new evict_and_wait will fail until we've completed the download
718 6 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
719 6 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
720 6 :
721 6 : let (download_can_continue, barrier) = utils::completion::channel();
722 6 : let (arrival, _download_arrived) = utils::completion::channel();
723 6 : layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
724 6 :
725 6 : let mut download = std::pin::pin!(layer
726 6 : .0
727 6 : .get_or_maybe_download(true, None)
728 6 : .instrument(download_span));
729 6 :
730 6 : assert!(
731 6 : !layer.is_likely_resident(),
732 6 : "during download layer is evicted"
733 6 : );
734 6 :
735 6 : tokio::time::timeout(ADVANCE, &mut download)
736 12 : .await
737 6 : .expect_err("should had timed out because of failpoint");
738 6 :
739 6 : // now we finally get to continue, and because the latest state is downloading, we deduce that
740 6 : // the original eviction succeeded
741 6 : evict_and_wait.await.unwrap();
742 6 :
743 6 : // however a new evict_and_wait will fail
744 6 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
745 6 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
746 6 :
747 6 : assert!(!layer.is_likely_resident());
748 6 :
749 6 : drop(download_can_continue);
750 6 : download.await.expect("download should have succeeded");
751 6 : assert!(layer.is_likely_resident());
752 6 :
753 6 : // only now can we evict
754 6 : layer.evict_and_wait(FOREVER).await.unwrap();
755 6 : }
756 :
757 : /// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
758 : /// i.e. while the ResidentLayer guard holds the last strong reference.
759 : ///
760 : /// Also checks that the same does not happen on a non-evicted layer (regression test).
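: ///
: /// The expected outcome is a single `EvictionCancelled::LayerGone` increment in total, with no
: /// double counting when the second, non-evicted victim is dropped.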
761 : #[tokio::test(start_paused = true)]
762 6 : async fn eviction_cancellation_on_drop() {
763 6 : use crate::repository::Value;
764 6 : use bytes::Bytes;
765 6 :
766 6 : // this is the runtime on which Layer spawns its blocking tasks
767 6 : let handle = tokio::runtime::Handle::current();
768 6 :
769 6 : let h = TenantHarness::create("eviction_cancellation_on_drop")
770 6 : .await
771 6 : .unwrap();
772 6 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
773 24 : let (tenant, ctx) = h.load().await;
774 6 :
775 6 : let timeline = tenant
776 6 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
777 12 : .await
778 6 : .unwrap();
779 6 :
780 6 : {
781 6 : // create_test_timeline wrote us one layer, write another
782 6 : let mut writer = timeline.writer().await;
783 6 : writer
784 6 : .put(
785 6 : crate::repository::Key::from_i128(5),
786 6 : Lsn(0x20),
787 6 : &Value::Image(Bytes::from_static(b"this does not matter either")),
788 6 : &ctx,
789 6 : )
790 6 : .await
791 6 : .unwrap();
792 6 :
793 6 : writer.finish_write(Lsn(0x20));
794 6 : }
795 6 :
796 7 : timeline.freeze_and_flush().await.unwrap();
797 6 :
798 6 : // wait for the upload to complete so our Arc::strong_count assertion holds
799 6 : timeline.remote_client.wait_completion().await.unwrap();
800 6 :
801 6 : let (evicted_layer, not_evicted) = {
802 6 : let mut layers = {
803 6 : let mut guard = timeline.layers.write().await;
804 6 : let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
805 6 : // remove the layers from layermap
806 6 : guard.open_mut().unwrap().finish_gc_timeline(&layers);
807 6 :
808 6 : layers
809 6 : };
810 6 :
811 6 : assert_eq!(layers.len(), 2);
812 6 :
813 6 : (layers.pop().unwrap(), layers.pop().unwrap())
814 6 : };
815 6 :
816 6 : let victims = [(evicted_layer, true), (not_evicted, false)];
817 6 :
818 18 : for (victim, evict) in victims {
819 12 : let resident = victim.keep_resident().await.unwrap();
820 12 : drop(victim);
821 12 :
822 12 : assert_eq!(Arc::strong_count(&resident.owner.0), 1);
823 6 :
824 12 : if evict {
825 6 : let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
826 6 :
827 6 : // drive the future to await on the status channel, and then drop it
828 6 : tokio::time::timeout(ADVANCE, evict_and_wait)
829 6 : .await
830 6 : .expect_err("should had been a timeout since we are holding the layer resident");
831 6 : }
832 6 :
833 6 : // 1 == we only evict one of the layers
834 12 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
835 6 :
836 12 : drop(resident);
837 12 :
838 12 : // run any spawned tasks to completion
839 17 : tokio::time::sleep(ADVANCE).await;
840 6 :
841 57 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
842 6 :
843 12 : assert_eq!(
844 12 : 1,
845 12 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
846 12 : );
847 6 : }
848 6 : }
849 :
850 : /// A test case to remind you of the cost of these structures. You can bump the size limit
851 : /// below if it is really necessary to add more fields to the structures.
852 : #[test]
853 : #[cfg(target_arch = "x86_64")]
854 6 : fn layer_size() {
855 6 : assert_eq!(size_of::<LayerAccessStats>(), 8);
856 6 : assert_eq!(size_of::<PersistentLayerDesc>(), 104);
857 6 : assert_eq!(size_of::<LayerInner>(), 296);
858 : // it also has the utf8 path
859 6 : }
860 :
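: /// Test helper that clogs a runtime's `spawn_blocking` pool by parking a task on every blocking
: /// thread until `release` is called.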
861 : struct SpawnBlockingPoolHelper {
862 : awaited_by_spawn_blocking_tasks: Completion,
863 : blocking_tasks: JoinSet<()>,
864 : }
865 :
866 : impl SpawnBlockingPoolHelper {
867 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
868 : /// release is called.
869 : ///
870 : /// In the tests this can be used to ensure something cannot be started on the target runtimes
871 : /// spawn_blocking pool.
872 : ///
873 : /// This should be no issue nowdays, because nextest runs each test in it's own process.
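: ///
: /// A minimal usage sketch (mirroring `spawn_blocking_pool_helper_actually_works` below):
: ///
: /// ```ignore
: /// let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
: /// // nothing can be scheduled on the runtime's spawn_blocking pool here
: /// helper.release().await;
: /// ```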
874 6 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
875 6 : let default_max_blocking_threads = 512;
876 6 :
877 6 : Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
878 6 : }
879 :
880 78 : async fn consume_all_spawn_blocking_threads0(
881 78 : handle: &tokio::runtime::Handle,
882 78 : threads: usize,
883 78 : ) -> Self {
884 78 : assert_ne!(threads, 0);
885 :
886 78 : let (completion, barrier) = completion::channel();
887 78 : let (started, starts_completed) = completion::channel();
888 78 :
889 78 : let mut blocking_tasks = JoinSet::new();
890 78 :
891 21540 : for _ in 0..threads {
892 21540 : let barrier = barrier.clone();
893 21540 : let started = started.clone();
894 21540 : blocking_tasks.spawn_blocking_on(
895 21540 : move || {
896 21540 : drop(started);
897 21540 : tokio::runtime::Handle::current().block_on(barrier.wait());
898 21540 : },
899 21540 : handle,
900 21540 : );
901 21540 : }
902 :
903 78 : drop(started);
904 78 :
905 78 : starts_completed.wait().await;
906 :
907 78 : drop(barrier);
908 78 :
909 78 : tracing::trace!("consumed all threads");
910 :
911 78 : SpawnBlockingPoolHelper {
912 78 : awaited_by_spawn_blocking_tasks: completion,
913 78 : blocking_tasks,
914 78 : }
915 78 : }
916 :
917 : /// Release all previously blocked spawn_blocking threads
918 78 : async fn release(self) {
919 78 : let SpawnBlockingPoolHelper {
920 78 : awaited_by_spawn_blocking_tasks,
921 78 : mut blocking_tasks,
922 78 : } = self;
923 78 :
924 78 : drop(awaited_by_spawn_blocking_tasks);
925 :
926 21618 : while let Some(res) = blocking_tasks.join_next().await {
927 21540 : res.expect("none of the tasks should had panicked");
928 21540 : }
929 :
930 78 : tracing::trace!("released all threads");
931 78 : }
932 :
933 : /// In the tests it is used as an easy way of making sure something scheduled on the target
934 : /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
935 : /// before our tasks have a chance to schedule and complete.
936 36 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
937 328 : Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
938 36 : }
939 :
940 66 : async fn consume_and_release_all_of_spawn_blocking_threads0(
941 66 : handle: &tokio::runtime::Handle,
942 66 : threads: usize,
943 66 : ) {
944 66 : Self::consume_all_spawn_blocking_threads0(handle, threads)
945 47 : .await
946 66 : .release()
947 322 : .await
948 66 : }
949 : }
950 :
951 : #[test]
952 6 : fn spawn_blocking_pool_helper_actually_works() {
953 6 : // create a custom runtime where we know and control how many blocking threads it has
954 6 : //
955 6 : // because the amount is not configurable for our helper, expect the same amount as
956 6 : // BACKGROUND_RUNTIME using the tokio defaults would have.
957 6 : let rt = tokio::runtime::Builder::new_current_thread()
958 6 : .max_blocking_threads(1)
959 6 : .enable_all()
960 6 : .build()
961 6 : .unwrap();
962 6 :
963 6 : let handle = rt.handle();
964 6 :
965 6 : rt.block_on(async move {
966 : // this will not return until all threads are spun up and actually executing the code
967 : // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
968 6 : let consumed =
969 6 : SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
970 :
971 6 : println!("consumed");
972 6 :
973 6 : let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
974 6 : // this will not get to run before we release
975 6 : }));
976 6 :
977 6 : println!("spawned");
978 6 :
979 6 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
980 6 : .await
981 6 : .expect_err("the task should not have gotten to run yet");
982 6 :
983 6 : println!("tried to join");
984 6 :
985 6 : consumed.release().await;
986 :
987 6 : println!("released");
988 6 :
989 6 : tokio::time::timeout(std::time::Duration::from_secs(1), jh)
990 0 : .await
991 6 : .expect("no timeout")
992 6 : .expect("no join error");
993 6 :
994 6 : println!("joined");
995 6 : });
996 6 : }
997 :
998 : /// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats
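: /// (sub-second precision is dropped, e.g. 12:00:00.987 becomes 12:00:00).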
999 24 : fn lowres_time(hires: SystemTime) -> SystemTime {
1000 24 : let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
1001 24 : UNIX_EPOCH + Duration::from_secs(ts)
1002 24 : }
1003 :
1004 : #[test]
1005 6 : fn access_stats() {
1006 6 : let access_stats = LayerAccessStats::default();
1007 6 : // Default is visible
1008 6 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1009 :
1010 6 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1011 6 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1012 6 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1013 6 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1014 :
1015 6 : let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
1016 6 : access_stats.record_residence_event_at(rtime);
1017 6 : assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
1018 :
1019 6 : let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
1020 6 : access_stats.record_access_at(atime);
1021 6 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1022 :
1023 : // Setting visibility doesn't clobber access time
1024 6 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1025 6 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1026 6 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1027 6 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1028 :
1029 : // Recording access implicitly makes layer visible, if it wasn't already
1030 6 : let atime = UNIX_EPOCH + Duration::from_secs(2200000000);
1031 6 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1032 6 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1033 6 : assert!(access_stats.record_access_at(atime));
1034 6 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1035 6 : assert!(!access_stats.record_access_at(atime));
1036 6 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1037 6 : }
1038 :
1039 : #[test]
1040 6 : fn access_stats_2038() {
1041 6 : // The access stats structure uses a timestamp representation that will run out
1042 6 : // of bits in 2038. One year before that, this unit test will start failing.
1043 6 :
1044 6 : let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
1045 6 : + Duration::from_secs(3600 * 24 * 365);
1046 6 :
1047 6 : assert!(one_year_from_now.as_secs() < (2 << 31));
1048 6 : }