Line data Source code
1 : use std::time::UNIX_EPOCH;
2 :
3 : use pageserver_api::key::{CONTROLFILE_KEY, Key};
4 : use postgres_ffi::PgMajorVersion;
5 : use tokio::task::JoinSet;
6 : use utils::completion::{self, Completion};
7 : use utils::id::TimelineId;
8 :
9 : use super::failpoints::{Failpoint, FailpointKind};
10 : use super::*;
11 : use crate::context::DownloadBehavior;
12 : use crate::tenant::harness::{TenantHarness, test_img};
13 : use crate::tenant::storage_layer::{IoConcurrency, LayerVisibilityHint};
14 : use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
15 :
16 : /// Used in tests to advance a future to the wanted await point, and not further.
17 : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
18 :
19 : /// Used in tests to indicate a forever-long timeout; has to be longer than the total amount of
20 : /// ADVANCE the tests use to advance futures.
21 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
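// A minimal, self-contained sketch (not part of the original test module; the function
// name is hypothetical) of how ADVANCE is used below: with tokio's paused clock
// (tokio "test-util" feature), `timeout(ADVANCE, ..)` advances virtual time and drives
// the inner future to its await point without any real waiting.
#[tokio::test(start_paused = true)]
async fn advance_pattern_sketch() {
    let advance = std::time::Duration::from_secs(3600);

    // A future that never completes stays parked at its await point; the timeout
    // elapses on virtual time, not wall-clock time.
    let mut never_completes = std::pin::pin!(std::future::pending::<()>());
    tokio::time::timeout(advance, &mut never_completes)
        .await
        .expect_err("the pending future is still parked; only virtual time advanced");
}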
22 :
23 : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
24 : #[tokio::test]
25 1 : async fn smoke_test() {
26 1 : let handle = tokio::runtime::Handle::current();
27 :
28 1 : let h = TenantHarness::create("smoke_test").await.unwrap();
29 1 : let span = h.span();
30 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
31 1 : let (tenant, ctx) = h.load().await;
32 1 : let io_concurrency = IoConcurrency::spawn_for_test();
33 :
34 1 : let image_layers = vec![(
35 1 : Lsn(0x40),
36 1 : vec![(
37 1 : Key::from_hex("620000000033333333444444445500000000").unwrap(),
38 1 : test_img("foo"),
39 1 : )],
40 1 : )];
41 :
42 : // Create a test timeline with one real layer, and one synthetic test layer. The synthetic
43 : // one is only there so that we can GC the real one without leaving the timeline's metadata
44 : // empty, which is an illegal state (see [`IndexPart::validate`]).
45 1 : let timeline = tenant
46 1 : .create_test_timeline_with_layers(
47 1 : TimelineId::generate(),
48 1 : Lsn(0x10),
49 1 : PgMajorVersion::PG14,
50 1 : &ctx,
51 1 : Default::default(), // in-memory layers
52 1 : Default::default(),
53 1 : image_layers,
54 1 : Lsn(0x100),
55 1 : )
56 1 : .await
57 1 : .unwrap();
58 1 : let ctx = &ctx.with_scope_timeline(&timeline);
59 :
60 : // Grab one of the timeline's layers to exercise in the test, and the other layer that is just
61 : // there to avoid the timeline being illegally empty
62 1 : let (layer, dummy_layer) = {
63 1 : let mut layers = {
64 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
65 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
66 : };
67 :
68 1 : assert_eq!(layers.len(), 2);
69 :
70 2 : layers.sort_by_key(|l| l.layer_desc().get_key_range().start);
71 1 : let synthetic_layer = layers.pop().unwrap();
72 1 : let real_layer = layers.pop().unwrap();
73 1 : tracing::info!(
74 0 : "real_layer={:?} ({}), synthetic_layer={:?} ({})",
75 : real_layer,
76 0 : real_layer.layer_desc().file_size,
77 : synthetic_layer,
78 0 : synthetic_layer.layer_desc().file_size
79 : );
80 1 : (real_layer, synthetic_layer)
81 : };
82 :
83 : // all layers created at the pageserver are like `layer`, initialized with a strong
84 : // Arc<DownloadedLayer>.
85 :
86 1 : let controlfile_keyspace = KeySpace {
87 1 : ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
88 1 : };
89 :
90 1 : let img_before = {
91 1 : let mut data = ValuesReconstructState::new(io_concurrency.clone());
92 1 : layer
93 1 : .get_values_reconstruct_data(
94 1 : controlfile_keyspace.clone(),
95 1 : Lsn(0x10)..Lsn(0x11),
96 1 : &mut data,
97 1 : ctx,
98 1 : )
99 1 : .await
100 1 : .unwrap();
101 :
102 1 : data.keys
103 1 : .remove(&CONTROLFILE_KEY)
104 1 : .expect("must be present")
105 1 : .collect_pending_ios()
106 1 : .await
107 1 : .expect("must not error")
108 : .img
109 1 : .take()
110 1 : .expect("tenant harness writes the control file")
111 : };
112 :
113 : // important part is evicting the layer, which can be done when there are no more ResidentLayer
114 : // instances -- there currently are none, only two `Layer` values, one in the layermap and one
115 : // in scope.
116 1 : layer.evict_and_wait(FOREVER).await.unwrap();
117 :
118 : // double-evict returns an error, which is valid when the eviction_task and disk-usage-based
119 : // eviction both try to evict the same layer at the same time.
120 :
121 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
122 1 : assert!(matches!(e, EvictionError::NotFound));
123 :
124 1 : let dl_ctx = RequestContextBuilder::from(ctx)
125 1 : .download_behavior(DownloadBehavior::Download)
126 1 : .attached_child();
127 :
128 : // on access while the layer is evicted, it will automatically be downloaded.
129 1 : let img_after = {
130 1 : let mut data = ValuesReconstructState::new(io_concurrency.clone());
131 1 : layer
132 1 : .get_values_reconstruct_data(
133 1 : controlfile_keyspace.clone(),
134 1 : Lsn(0x10)..Lsn(0x11),
135 1 : &mut data,
136 1 : &dl_ctx,
137 1 : )
138 1 : .instrument(download_span.clone())
139 1 : .await
140 1 : .unwrap();
141 1 : data.keys
142 1 : .remove(&CONTROLFILE_KEY)
143 1 : .expect("must be present")
144 1 : .collect_pending_ios()
145 1 : .await
146 1 : .expect("must not error")
147 : .img
148 1 : .take()
149 1 : .expect("tenant harness writes the control file")
150 : };
151 :
152 1 : assert_eq!(img_before, img_after);
153 :
154 : // evict_and_wait can time out, but it doesn't cancel the eviction itself
155 : //
156 : // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
157 : // artificially slow it down.
158 1 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
159 :
160 1 : match layer
161 1 : .evict_and_wait(std::time::Duration::ZERO)
162 1 : .await
163 1 : .unwrap_err()
164 : {
165 : EvictionError::Timeout => {
166 : // expected, but note that the eviction is "still ongoing"
167 1 : helper.release().await;
168 : // exhaust spawn_blocking pool to ensure it is now complete
169 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
170 1 : .await;
171 : }
172 0 : other => unreachable!("{other:?}"),
173 : }
174 :
175 : // the only way to query whether a layer is resident is to acquire a ResidentLayer instance.
176 : // Layer::keep_resident never downloads, but it might initialize if the layer file is found
177 : // already downloaded locally.
178 1 : let none = layer.keep_resident().await;
179 1 : assert!(
180 1 : none.is_none(),
181 0 : "Expected none, because eviction removed the local file, found: {none:?}"
182 : );
183 :
184 : // plain downloading is rarely needed
185 1 : layer
186 1 : .download_and_keep_resident(&dl_ctx)
187 1 : .instrument(download_span)
188 1 : .await
189 1 : .unwrap();
190 :
191 : // last important part is deletion on drop: gc and compaction use it for compacted L0 layers
192 : // or fully garbage collected layers. deletion means deleting the local file and scheduling a
193 : // deletion of the remote file, which has already been unlinked from index_part.json.
194 : //
195 : // marking a layer to be deleted on drop is irreversible; there is no technical reason against
196 : // reversibility, but currently it is not needed so it is not provided.
197 1 : layer.delete_on_drop();
198 :
199 1 : let path = layer.local_path().to_owned();
200 :
201 : // wait_drop produces a future, not connected to the Layer, which will resolve when
202 : // LayerInner::drop has completed.
203 1 : let mut wait_drop = std::pin::pin!(layer.wait_drop());
204 :
205 : // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
206 : // until here
207 1 : tokio::time::pause();
208 1 : tokio::time::timeout(ADVANCE, &mut wait_drop)
209 1 : .await
210 1 : .expect_err("should have timed out because two strong references exist");
211 :
212 1 : tokio::fs::metadata(&path)
213 1 : .await
214 1 : .expect("the local layer file still exists");
215 :
216 1 : let rtc = &timeline.remote_client;
217 :
218 : // Simulate GC removing our test layer.
219 : {
220 1 : let mut g = timeline.layers.write(LayerManagerLockHolder::Testing).await;
221 :
222 1 : let layers = &[layer];
223 1 : g.open_mut().unwrap().finish_gc_timeline(layers);
224 :
225 : // this just updates the remote_physical_size for demonstration purposes
226 1 : rtc.schedule_gc_update(layers).unwrap();
227 : }
228 :
229 : // when strong references are dropped, the file is deleted and remote deletion is scheduled
230 1 : wait_drop.await;
231 :
232 1 : let e = tokio::fs::metadata(&path)
233 1 : .await
234 1 : .expect_err("the local file is deleted");
235 1 : assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
236 :
237 1 : rtc.wait_completion().await.unwrap();
238 :
239 1 : assert_eq!(
240 1 : rtc.get_remote_physical_size(),
241 1 : dummy_layer.metadata().file_size
242 : );
243 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
244 1 : }
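// The `delete_on_drop` / `wait_drop` steps in the smoke test above boil down to a
// "notify on drop" pattern: dropping the last strong reference performs the cleanup and
// completes a channel that detached waiters can await. A self-contained sketch of that
// pattern using plain tokio primitives; the type and test names are hypothetical and
// this is not the pageserver implementation.
struct NotifiesOnDrop {
    tx: Option<tokio::sync::oneshot::Sender<()>>,
}

impl Drop for NotifiesOnDrop {
    fn drop(&mut self) {
        // Completing the channel is how a detached waiter learns the value is gone.
        if let Some(tx) = self.tx.take() {
            let _ = tx.send(());
        }
    }
}

#[tokio::test]
async fn wait_drop_sketch() {
    let (tx, rx) = tokio::sync::oneshot::channel();
    let value = NotifiesOnDrop { tx: Some(tx) };

    // While `value` is alive the receiver stays pending; dropping it resolves `rx`.
    drop(value);
    rx.await.expect("drop completed the channel");
}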
245 :
246 : /// This test demonstrates a previous hang when an eviction and a deletion were requested at the
247 : /// same time. Now both of them complete per Arc drop semantics.
248 : #[tokio::test(start_paused = true)]
249 1 : async fn evict_and_wait_on_wanted_deleted() {
250 : // this is the runtime on which Layer spawns the blocking tasks
251 1 : let handle = tokio::runtime::Handle::current();
252 :
253 1 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
254 1 : .await
255 1 : .unwrap();
256 1 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
257 1 : let (tenant, ctx) = h.load().await;
258 :
259 1 : let timeline = tenant
260 1 : .create_test_timeline(
261 1 : TimelineId::generate(),
262 1 : Lsn(0x10),
263 1 : PgMajorVersion::PG14,
264 1 : &ctx,
265 1 : )
266 1 : .await
267 1 : .unwrap();
268 :
269 1 : let layer = {
270 1 : let mut layers = {
271 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
272 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
273 : };
274 :
275 1 : assert_eq!(layers.len(), 1);
276 :
277 1 : layers.swap_remove(0)
278 : };
279 :
280 : // setup done
281 :
282 1 : let resident = layer.keep_resident().await.unwrap();
283 :
284 : {
285 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
286 :
287 : // drive the future to await on the status channel
288 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
289 1 : .await
290 1 : .expect_err("should have been a timeout since we are holding the layer resident");
291 :
292 1 : layer.delete_on_drop();
293 :
294 1 : drop(resident);
295 :
296 : // make sure the eviction task gets to run
297 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
298 :
299 1 : let resident = layer.keep_resident().await;
300 1 : assert!(
301 1 : resident.is_none(),
302 0 : "keep_resident should not have re-initialized: {resident:?}"
303 : );
304 :
305 1 : evict_and_wait
306 1 : .await
307 1 : .expect("evict_and_wait should have succeeded");
308 :
309 : // works as intended
310 : }
311 :
312 : // assert that once we remove the `layer` from the layer map and drop our reference,
313 : // the deletion of the layer in remote_storage happens.
314 : {
315 1 : let mut layers = timeline.layers.write(LayerManagerLockHolder::Testing).await;
316 1 : layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
317 : }
318 :
319 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
320 :
321 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
322 1 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
323 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
324 1 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
325 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
326 1 : }
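// The previous test's doc comment leans on Arc drop semantics: whichever clone of the
// shared state is dropped last runs the cleanup, regardless of which caller asked for
// it. A minimal std-only sketch of that property; the names below are hypothetical.
#[test]
fn arc_drop_semantics_sketch() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    struct Cleanup(Arc<AtomicBool>);
    impl Drop for Cleanup {
        fn drop(&mut self) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    let cleaned_up = Arc::new(AtomicBool::new(false));
    let first = Arc::new(Cleanup(cleaned_up.clone()));
    let second = first.clone();

    drop(first);
    assert!(!cleaned_up.load(Ordering::SeqCst), "one strong reference remains");
    drop(second);
    assert!(cleaned_up.load(Ordering::SeqCst), "the last drop runs the cleanup");
}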
327 :
328 : /// This test ensures we are able to read the layer while the layer eviction has been
329 : /// started but not completed.
330 : #[test]
331 1 : fn read_wins_pending_eviction() {
332 1 : let rt = tokio::runtime::Builder::new_current_thread()
333 1 : .max_blocking_threads(1)
334 1 : .enable_all()
335 1 : .start_paused(true)
336 1 : .build()
337 1 : .unwrap();
338 :
339 1 : rt.block_on(async move {
340 : // this is the runtime on which Layer spawns the blocking tasks
341 1 : let handle = tokio::runtime::Handle::current();
342 1 : let h = TenantHarness::create("read_wins_pending_eviction")
343 1 : .await
344 1 : .unwrap();
345 1 : let (tenant, ctx) = h.load().await;
346 1 : let span = h.span();
347 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
348 :
349 1 : let timeline = tenant
350 1 : .create_test_timeline(
351 1 : TimelineId::generate(),
352 1 : Lsn(0x10),
353 1 : PgMajorVersion::PG14,
354 1 : &ctx,
355 1 : )
356 1 : .await
357 1 : .unwrap();
358 1 : let ctx = ctx.with_scope_timeline(&timeline);
359 :
360 1 : let layer = {
361 1 : let mut layers = {
362 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
363 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
364 : };
365 :
366 1 : assert_eq!(layers.len(), 1);
367 :
368 1 : layers.swap_remove(0)
369 : };
370 :
371 : // setup done
372 :
373 1 : let resident = layer.keep_resident().await.unwrap();
374 :
375 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
376 :
377 : // drive the future to await on the status channel
378 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
379 1 : .await
380 1 : .expect_err("should have been a timeout since we are holding the layer resident");
381 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
382 :
383 1 : let (completion, barrier) = utils::completion::channel();
384 1 : let (arrival, arrived_at_barrier) = utils::completion::channel();
385 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
386 1 : Some(arrival),
387 1 : barrier,
388 1 : ));
389 :
390 : // now the eviction cannot proceed because the threads are consumed while completion exists
391 1 : drop(resident);
392 1 : arrived_at_barrier.wait().await;
393 1 : assert!(!layer.is_likely_resident());
394 :
395 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
396 1 : layer
397 1 : .0
398 1 : .get_or_maybe_download(false, &ctx)
399 1 : .instrument(download_span)
400 1 : .await
401 1 : .expect("should have reinitialized without downloading");
402 :
403 1 : assert!(layer.is_likely_resident());
404 :
405 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
406 1 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
407 1 : .await
408 1 : .expect("no timeout, because get_or_maybe_download re-initialized")
409 1 : .expect_err("eviction should not have succeeded because re-initialized");
410 :
411 : // works as intended: evictions lose to "downloads"
412 1 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
413 1 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
414 :
415 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
416 : // because of a failpoint
417 1 : assert_eq!(
418 : 0,
419 1 : LAYER_IMPL_METRICS
420 1 : .cancelled_evictions
421 1 : .values()
422 9 : .map(|ctr| ctr.get())
423 1 : .sum::<u64>()
424 : );
425 :
426 1 : drop(completion);
427 :
428 1 : tokio::time::sleep(ADVANCE).await;
429 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
430 1 : .await;
431 :
432 1 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
433 :
434 : // now we finally can observe the original eviction failing
435 : // it would have been possible to observe it earlier, but here it is guaranteed to have
436 : // happened.
437 1 : assert_eq!(
438 : 1,
439 1 : LAYER_IMPL_METRICS
440 1 : .cancelled_evictions
441 1 : .values()
442 9 : .map(|ctr| ctr.get())
443 1 : .sum::<u64>()
444 : );
445 :
446 1 : assert_eq!(
447 : 1,
448 1 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
449 : );
450 :
451 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
452 1 : });
453 1 : }
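// The arrival/barrier pair used with Failpoint::WaitBeforeStartingEvicting above forms
// a rendezvous: the test learns that the background task has reached the failpoint, and
// the task stays parked until the test releases it. An illustrative sketch of the same
// shape using plain tokio oneshot channels instead of the project's utils::completion
// types; the function name is hypothetical.
#[tokio::test]
async fn failpoint_rendezvous_sketch() {
    use tokio::sync::oneshot;

    let (arrival_tx, arrival_rx) = oneshot::channel::<()>();
    let (barrier_tx, barrier_rx) = oneshot::channel::<()>();

    let task = tokio::spawn(async move {
        // "arrival": tell the test that we have reached the failpoint.
        let _ = arrival_tx.send(());
        // "barrier": stay parked until the test side releases us.
        let _ = barrier_rx.await;
    });

    // Test side: wait for the arrival, make assertions while the task is parked, then
    // release the barrier by dropping our end of it.
    arrival_rx.await.expect("the task reached the failpoint");
    drop(barrier_tx);
    task.await.unwrap();
}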
454 :
455 : /// Use a failpoint to delay an eviction from starting, in order to get a VersionCheckFailed.
456 : #[test]
457 1 : fn multiple_pending_evictions_in_order() {
458 1 : let name = "multiple_pending_evictions_in_order";
459 1 : let in_order = true;
460 1 : multiple_pending_evictions_scenario(name, in_order);
461 1 : }
462 :
463 : /// Use a failpoint to reorder the later eviction before the first one, in order to get an UnexpectedEvictedState.
464 : #[test]
465 1 : fn multiple_pending_evictions_out_of_order() {
466 1 : let name = "multiple_pending_evictions_out_of_order";
467 1 : let in_order = false;
468 1 : multiple_pending_evictions_scenario(name, in_order);
469 1 : }
470 :
471 2 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
472 2 : let rt = tokio::runtime::Builder::new_current_thread()
473 2 : .max_blocking_threads(1)
474 2 : .enable_all()
475 2 : .start_paused(true)
476 2 : .build()
477 2 : .unwrap();
478 :
479 2 : rt.block_on(async move {
480 : // this is the runtime on which Layer spawns the blocking tasks
481 2 : let handle = tokio::runtime::Handle::current();
482 2 : let h = TenantHarness::create(name).await.unwrap();
483 2 : let (tenant, ctx) = h.load().await;
484 2 : let span = h.span();
485 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
486 :
487 2 : let timeline = tenant
488 2 : .create_test_timeline(
489 2 : TimelineId::generate(),
490 2 : Lsn(0x10),
491 2 : PgMajorVersion::PG14,
492 2 : &ctx,
493 2 : )
494 2 : .await
495 2 : .unwrap();
496 2 : let ctx = ctx.with_scope_timeline(&timeline);
497 :
498 2 : let layer = {
499 2 : let mut layers = {
500 2 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
501 2 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
502 : };
503 :
504 2 : assert_eq!(layers.len(), 1);
505 :
506 2 : layers.swap_remove(0)
507 : };
508 :
509 : // setup done
510 :
511 2 : let resident = layer.keep_resident().await.unwrap();
512 :
513 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
514 :
515 : // drive the future to await on the status channel
516 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
517 2 : .await
518 2 : .expect_err("should have been a timeout since we are holding the layer resident");
519 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
520 :
521 2 : let (completion1, barrier) = utils::completion::channel();
522 2 : let mut completion1 = Some(completion1);
523 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
524 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
525 2 : Some(arrival),
526 2 : barrier,
527 2 : ));
528 :
529 : // now the eviction cannot proceed because we are simulating an arbitrarily long delay before the
530 : // eviction task starts.
531 2 : drop(resident);
532 2 : assert!(!layer.is_likely_resident());
533 :
534 2 : arrived_at_barrier.wait().await;
535 :
536 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
537 2 : layer
538 2 : .0
539 2 : .get_or_maybe_download(false, &ctx)
540 2 : .instrument(download_span)
541 2 : .await
542 2 : .expect("should have reinitialized without downloading");
543 :
544 2 : assert!(layer.is_likely_resident());
545 :
546 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
547 2 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
548 2 : .await
549 2 : .expect("no timeout, because get_or_maybe_download re-initialized")
550 2 : .expect_err("eviction should not have succeeded because re-initialized");
551 :
552 : // works as intended: evictions lose to "downloads"
553 2 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
554 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
555 :
556 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
557 : // because of a failpoint
558 2 : assert_eq!(
559 : 0,
560 2 : LAYER_IMPL_METRICS
561 2 : .cancelled_evictions
562 2 : .values()
563 18 : .map(|ctr| ctr.get())
564 2 : .sum::<u64>()
565 : );
566 :
567 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
568 :
569 : // configure another failpoint for the second eviction -- evictions are per initialization,
570 : // so now that we've reinitialized the inner, we get to run two of them at the same time.
571 2 : let (completion2, barrier) = utils::completion::channel();
572 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
573 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
574 2 : Some(arrival),
575 2 : barrier,
576 2 : ));
577 :
578 2 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
579 :
580 : // advance to the wait on the queue
581 2 : tokio::time::timeout(ADVANCE, &mut second_eviction)
582 2 : .await
583 2 : .expect_err("timeout because failpoint is blocking");
584 :
585 2 : arrived_at_barrier.wait().await;
586 :
587 2 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
588 :
589 2 : let mut release_earlier_eviction = |expected_reason| {
590 2 : assert_eq!(
591 : 0,
592 2 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
593 : );
594 :
595 2 : drop(completion1.take().unwrap());
596 :
597 2 : let handle = &handle;
598 :
599 2 : async move {
600 2 : tokio::time::sleep(ADVANCE).await;
601 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
602 2 : handle, 1,
603 2 : )
604 2 : .await;
605 :
606 2 : assert_eq!(
607 : 1,
608 2 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
609 : );
610 2 : }
611 2 : };
612 :
613 2 : if in_order {
614 1 : release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
615 1 : }
616 :
617 : // release the later eviction which is for the current version
618 2 : drop(completion2);
619 2 : tokio::time::sleep(ADVANCE).await;
620 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
621 2 : .await;
622 :
623 2 : if !in_order {
624 1 : release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
625 1 : }
626 :
627 2 : tokio::time::timeout(ADVANCE, &mut second_eviction)
628 2 : .await
629 2 : .expect("eviction goes through now that spawn_blocking is unclogged")
630 2 : .expect("eviction should succeed, because version matches");
631 :
632 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
633 :
634 : // ensure the cancelled are unchanged
635 2 : assert_eq!(
636 : 1,
637 2 : LAYER_IMPL_METRICS
638 2 : .cancelled_evictions
639 2 : .values()
640 18 : .map(|ctr| ctr.get())
641 2 : .sum::<u64>()
642 : );
643 :
644 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
645 2 : });
646 2 : }
647 :
648 : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
649 : /// a `Layer::keep_resident` call.
650 : ///
651 : /// This matters because cancelling the eviction would leave us in a state where the file is on
652 : /// disk but the layer's internal state says it has not been initialized. Furthermore, it allows us
653 : /// to have a non-repairing `Layer::is_likely_resident`.
654 : #[tokio::test(start_paused = true)]
655 1 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
656 1 : let handle = tokio::runtime::Handle::current();
657 1 : let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
658 1 : .await
659 1 : .unwrap();
660 1 : let (tenant, ctx) = h.load().await;
661 :
662 1 : let timeline = tenant
663 1 : .create_test_timeline(
664 1 : TimelineId::generate(),
665 1 : Lsn(0x10),
666 1 : PgMajorVersion::PG14,
667 1 : &ctx,
668 1 : )
669 1 : .await
670 1 : .unwrap();
671 1 : let ctx = ctx.with_scope_timeline(&timeline);
672 :
673 : // This test does downloads
674 1 : let ctx = RequestContextBuilder::from(&ctx)
675 1 : .download_behavior(DownloadBehavior::Download)
676 1 : .attached_child();
677 :
678 1 : let layer = {
679 1 : let mut layers = {
680 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
681 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
682 : };
683 :
684 1 : assert_eq!(layers.len(), 1);
685 :
686 1 : layers.swap_remove(0)
687 : };
688 :
689 : // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
690 : // Err) at the right time, that is, "during" `LayerInner::needs_download`.
691 1 : layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
692 :
693 1 : let (completion, barrier) = utils::completion::channel();
694 1 : let (arrival, arrived_at_barrier) = utils::completion::channel();
695 :
696 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
697 1 : Some(arrival),
698 1 : barrier,
699 1 : ));
700 :
701 1 : tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
702 1 : .await
703 1 : .expect_err("should have advanced to waiting on channel");
704 :
705 1 : arrived_at_barrier.wait().await;
706 :
707 : // simulate a read which is cancelled before it gets to re-initialize
708 1 : let e = layer
709 1 : .0
710 1 : .get_or_maybe_download(false, &ctx)
711 1 : .await
712 1 : .unwrap_err();
713 1 : assert!(
714 0 : matches!(
715 1 : e,
716 : DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
717 : ),
718 0 : "{e:?}"
719 : );
720 :
721 1 : assert!(
722 1 : layer.0.needs_download().await.unwrap().is_none(),
723 0 : "file is still on disk"
724 : );
725 :
726 : // release the eviction task
727 1 : drop(completion);
728 1 : tokio::time::sleep(ADVANCE).await;
729 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
730 :
731 : // failpoint is still enabled, but it is not hit
732 1 : let e = layer
733 1 : .0
734 1 : .get_or_maybe_download(false, &ctx)
735 1 : .await
736 1 : .unwrap_err();
737 1 : assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
738 :
739 : // failpoint is not counted as cancellation either
740 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
741 1 : }
742 :
743 : #[tokio::test(start_paused = true)]
744 1 : async fn evict_and_wait_does_not_wait_for_download() {
745 : // let handle = tokio::runtime::Handle::current();
746 1 : let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
747 1 : .await
748 1 : .unwrap();
749 1 : let (tenant, ctx) = h.load().await;
750 1 : let span = h.span();
751 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
752 :
753 1 : let timeline = tenant
754 1 : .create_test_timeline(
755 1 : TimelineId::generate(),
756 1 : Lsn(0x10),
757 1 : PgMajorVersion::PG14,
758 1 : &ctx,
759 1 : )
760 1 : .await
761 1 : .unwrap();
762 1 : let ctx = ctx.with_scope_timeline(&timeline);
763 :
764 : // This test does downloads
765 1 : let ctx = RequestContextBuilder::from(&ctx)
766 1 : .download_behavior(DownloadBehavior::Download)
767 1 : .attached_child();
768 :
769 1 : let layer = {
770 1 : let mut layers = {
771 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
772 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
773 : };
774 :
775 1 : assert_eq!(layers.len(), 1);
776 :
777 1 : layers.swap_remove(0)
778 : };
779 :
780 : // kind of forced setup: start an eviction but do not allow it to progress until we are
781 : // downloading
782 1 : let (eviction_can_continue, barrier) = utils::completion::channel();
783 1 : let (arrival, eviction_arrived) = utils::completion::channel();
784 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
785 1 : Some(arrival),
786 1 : barrier,
787 1 : ));
788 :
789 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
790 :
791 : // use this once-awaited other_evict to synchronize with the eviction
792 1 : let other_evict = layer.evict_and_wait(FOREVER);
793 :
794 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
795 1 : .await
796 1 : .expect_err("should have advanced");
797 1 : eviction_arrived.wait().await;
798 1 : drop(eviction_can_continue);
799 1 : other_evict.await.unwrap();
800 :
801 : // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
802 1 : assert!(!layer.is_likely_resident());
803 :
804 : // following new evict_and_wait will fail until we've completed the download
805 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
806 1 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
807 :
808 1 : let (download_can_continue, barrier) = utils::completion::channel();
809 1 : let (arrival, _download_arrived) = utils::completion::channel();
810 1 : layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
811 :
812 1 : let mut download = std::pin::pin!(
813 1 : layer
814 1 : .0
815 1 : .get_or_maybe_download(true, &ctx)
816 1 : .instrument(download_span)
817 : );
818 :
819 1 : assert!(
820 1 : !layer.is_likely_resident(),
821 0 : "during download layer is evicted"
822 : );
823 :
824 1 : tokio::time::timeout(ADVANCE, &mut download)
825 1 : .await
826 1 : .expect_err("should have timed out because of the failpoint");
827 :
828 : // now we finally get to continue, and because the latest state is downloading, we deduce that
829 : // the original eviction succeeded
830 1 : evict_and_wait.await.unwrap();
831 :
832 : // however a new evict_and_wait will fail
833 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
834 1 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
835 :
836 1 : assert!(!layer.is_likely_resident());
837 :
838 1 : drop(download_can_continue);
839 1 : download.await.expect("download should have succeeded");
840 1 : assert!(layer.is_likely_resident());
841 :
842 : // only now can we evict
843 1 : layer.evict_and_wait(FOREVER).await.unwrap();
844 1 : }
845 :
846 : /// Asserts that there is no miscalculation when a Layer is dropped while it is being kept
847 : /// resident, i.e. while the ResidentLayer is the last value keeping it alive.
848 : ///
849 : /// Also checks that the same does not happen on a non-evicted layer (regression test).
850 : #[tokio::test(start_paused = true)]
851 1 : async fn eviction_cancellation_on_drop() {
852 : use bytes::Bytes;
853 : use wal_decoder::models::value::Value;
854 :
855 : // this is the runtime on which Layer spawns the blocking tasks
856 1 : let handle = tokio::runtime::Handle::current();
857 :
858 1 : let h = TenantHarness::create("eviction_cancellation_on_drop")
859 1 : .await
860 1 : .unwrap();
861 1 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
862 1 : let (tenant, ctx) = h.load().await;
863 :
864 1 : let timeline = tenant
865 1 : .create_test_timeline(
866 1 : TimelineId::generate(),
867 1 : Lsn(0x10),
868 1 : PgMajorVersion::PG14,
869 1 : &ctx,
870 1 : )
871 1 : .await
872 1 : .unwrap();
873 :
874 : {
875 : // create_test_timeline wrote us one layer, write another
876 1 : let mut writer = timeline.writer().await;
877 1 : writer
878 1 : .put(
879 1 : pageserver_api::key::Key::from_i128(5),
880 1 : Lsn(0x20),
881 1 : &Value::Image(Bytes::from_static(b"this does not matter either")),
882 1 : &ctx,
883 1 : )
884 1 : .await
885 1 : .unwrap();
886 :
887 1 : writer.finish_write(Lsn(0x20));
888 : }
889 :
890 1 : timeline.freeze_and_flush().await.unwrap();
891 :
892 : // wait for the upload to complete so our Arc::strong_count assertion holds
893 1 : timeline.remote_client.wait_completion().await.unwrap();
894 :
895 1 : let (evicted_layer, not_evicted) = {
896 1 : let mut layers = {
897 1 : let mut guard = timeline.layers.write(LayerManagerLockHolder::Testing).await;
898 1 : let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
899 : // remove the layers from layermap
900 1 : guard.open_mut().unwrap().finish_gc_timeline(&layers);
901 :
902 1 : layers
903 : };
904 :
905 1 : assert_eq!(layers.len(), 2);
906 :
907 1 : (layers.pop().unwrap(), layers.pop().unwrap())
908 : };
909 :
910 1 : let victims = [(evicted_layer, true), (not_evicted, false)];
911 :
912 3 : for (victim, evict) in victims {
913 2 : let resident = victim.keep_resident().await.unwrap();
914 2 : drop(victim);
915 1 :
916 2 : assert_eq!(Arc::strong_count(&resident.owner.0), 1);
917 1 :
918 2 : if evict {
919 1 : let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
920 1 :
921 1 : // drive the future to await on the status channel, and then drop it
922 1 : tokio::time::timeout(ADVANCE, evict_and_wait)
923 1 : .await
924 1 : .expect_err("should had been a timeout since we are holding the layer resident");
925 1 : .expect_err("should have been a timeout since we are holding the layer resident");
926 1 :
927 1 : // 1 == we only evict one of the layers
928 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
929 1 :
930 2 : drop(resident);
931 1 :
932 1 : // run any spawned tasks
933 2 : tokio::time::sleep(ADVANCE).await;
934 1 :
935 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
936 1 :
937 2 : assert_eq!(
938 1 : 1,
939 2 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
940 1 : );
941 1 : }
942 1 : }
943 :
944 : /// A test case to remind you of the cost of these structures. You can bump the size limit
945 : /// below if it is really necessary to add more fields to the structures.
946 : #[test]
947 : #[cfg(target_arch = "x86_64")]
948 1 : fn layer_size() {
949 1 : assert_eq!(size_of::<LayerAccessStats>(), 8);
950 1 : assert_eq!(size_of::<PersistentLayerDesc>(), 104);
951 1 : assert_eq!(size_of::<LayerInner>(), 296);
952 : // it also has the utf8 path
953 1 : }
954 :
955 : struct SpawnBlockingPoolHelper {
956 : awaited_by_spawn_blocking_tasks: Completion,
957 : blocking_tasks: JoinSet<()>,
958 : }
959 :
960 : impl SpawnBlockingPoolHelper {
961 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
962 : /// release is called.
963 : ///
964 : /// In the tests this can be used to ensure something cannot be started on the target runtime's
965 : /// spawn_blocking pool.
966 : ///
967 : /// This should be no issue nowadays, because nextest runs each test in its own process.
968 1 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
969 1 : let default_max_blocking_threads = 512;
970 :
971 1 : Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
972 1 : }
973 :
974 13 : async fn consume_all_spawn_blocking_threads0(
975 13 : handle: &tokio::runtime::Handle,
976 13 : threads: usize,
977 13 : ) -> Self {
978 13 : assert_ne!(threads, 0);
979 :
980 13 : let (completion, barrier) = completion::channel();
981 13 : let (started, starts_completed) = completion::channel();
982 :
983 13 : let mut blocking_tasks = JoinSet::new();
984 :
985 13 : for _ in 0..threads {
986 3590 : let barrier = barrier.clone();
987 3590 : let started = started.clone();
988 3590 : blocking_tasks.spawn_blocking_on(
989 3590 : move || {
990 3590 : drop(started);
991 3590 : tokio::runtime::Handle::current().block_on(barrier.wait());
992 3590 : },
993 3590 : handle,
994 : );
995 : }
996 :
997 13 : drop(started);
998 :
999 13 : starts_completed.wait().await;
1000 :
1001 13 : drop(barrier);
1002 :
1003 13 : tracing::trace!("consumed all threads");
1004 :
1005 13 : SpawnBlockingPoolHelper {
1006 13 : awaited_by_spawn_blocking_tasks: completion,
1007 13 : blocking_tasks,
1008 13 : }
1009 13 : }
1010 :
1011 : /// Release all previously blocked spawn_blocking threads
1012 13 : async fn release(self) {
1013 : let SpawnBlockingPoolHelper {
1014 13 : awaited_by_spawn_blocking_tasks,
1015 13 : mut blocking_tasks,
1016 13 : } = self;
1017 :
1018 13 : drop(awaited_by_spawn_blocking_tasks);
1019 :
1020 3603 : while let Some(res) = blocking_tasks.join_next().await {
1021 3590 : res.expect("none of the tasks should have panicked");
1022 3590 : }
1023 :
1024 13 : tracing::trace!("released all threads");
1025 13 : }
1026 :
1027 : /// In the tests it is used as an easy way of making sure something scheduled on the target
1028 : /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
1029 : /// before our tasks have a chance to schedule and complete.
1030 6 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
1031 6 : Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
1032 6 : }
1033 :
1034 11 : async fn consume_and_release_all_of_spawn_blocking_threads0(
1035 11 : handle: &tokio::runtime::Handle,
1036 11 : threads: usize,
1037 11 : ) {
1038 11 : Self::consume_all_spawn_blocking_threads0(handle, threads)
1039 11 : .await
1040 11 : .release()
1041 11 : .await
1042 11 : }
1043 : }
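// Before the real demonstration below, a stripped-down sketch (hypothetical test name;
// not the helper itself) of why consuming the blocking pool works as a brake: with the
// single blocking thread occupied, a later spawn_blocking task has to queue until the
// occupying task is released.
#[test]
fn blocking_pool_queueing_sketch() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();

        // Occupy the only blocking thread until released.
        let hog = tokio::task::spawn_blocking(move || {
            let _ = release_rx.blocking_recv();
        });

        // This task queues behind the hog and cannot start yet.
        let mut queued = std::pin::pin!(tokio::task::spawn_blocking(|| ()));
        tokio::time::timeout(std::time::Duration::from_millis(100), &mut queued)
            .await
            .expect_err("the queued task cannot run while the pool is saturated");

        // Releasing the hog frees the thread; both tasks now complete.
        drop(release_tx);
        hog.await.unwrap();
        queued.await.unwrap();
    });
}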
1044 :
1045 : #[test]
1046 1 : fn spawn_blocking_pool_helper_actually_works() {
1047 : // create a custom runtime for which we know and control how many blocking threads it has
1048 : //
1049 : // because the amount is not configurable for our helper, expect the same amount as
1050 : // BACKGROUND_RUNTIME using the tokio defaults would have.
1051 1 : let rt = tokio::runtime::Builder::new_current_thread()
1052 1 : .max_blocking_threads(1)
1053 1 : .enable_all()
1054 1 : .build()
1055 1 : .unwrap();
1056 :
1057 1 : let handle = rt.handle();
1058 :
1059 1 : rt.block_on(async move {
1060 : // this will not return until all threads are spun up and actually executing the code
1061 : // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
1062 1 : let consumed =
1063 1 : SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
1064 :
1065 1 : println!("consumed");
1066 :
1067 1 : let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
1068 : // this will not get to run before we release
1069 1 : }));
1070 :
1071 1 : println!("spawned");
1072 :
1073 1 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
1074 1 : .await
1075 1 : .expect_err("the task should not have gotten to run yet");
1076 :
1077 1 : println!("tried to join");
1078 :
1079 1 : consumed.release().await;
1080 :
1081 1 : println!("released");
1082 :
1083 1 : tokio::time::timeout(std::time::Duration::from_secs(1), jh)
1084 1 : .await
1085 1 : .expect("no timeout")
1086 1 : .expect("no join error");
1087 :
1088 1 : println!("joined");
1089 1 : });
1090 1 : }
1091 :
1092 : /// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats
1093 4 : fn lowres_time(hires: SystemTime) -> SystemTime {
1094 4 : let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
1095 4 : UNIX_EPOCH + Duration::from_secs(ts)
1096 4 : }
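// A small usage sketch (hypothetical test, not in the original module): the sub-second
// part is discarded, matching the whole-second precision of LayerAccessStats.
#[test]
fn lowres_time_truncates_subseconds() {
    let hires = UNIX_EPOCH + Duration::from_millis(10_750);
    assert_eq!(lowres_time(hires), UNIX_EPOCH + Duration::from_secs(10));
}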
1097 :
1098 : #[test]
1099 1 : fn access_stats() {
1100 1 : let access_stats = LayerAccessStats::default();
1101 : // Default is visible
1102 1 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1103 :
1104 1 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1105 1 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1106 1 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1107 1 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1108 :
1109 1 : let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
1110 1 : access_stats.record_residence_event_at(rtime);
1111 1 : assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
1112 :
1113 1 : let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
1114 1 : access_stats.record_access_at(atime);
1115 1 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1116 :
1117 : // Setting visibility doesn't clobber access time
1118 1 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1119 1 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1120 1 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1121 1 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1122 :
1123 : // Recording access implicitly makes layer visible, if it wasn't already
1124 1 : let atime = UNIX_EPOCH + Duration::from_secs(2200000000);
1125 1 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1126 1 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1127 1 : assert!(access_stats.record_access_at(atime));
1128 1 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1129 1 : assert!(!access_stats.record_access_at(atime));
1130 1 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1131 1 : }
1132 :
1133 : #[test]
1134 1 : fn access_stats_2038() {
1135 : // The access stats structure uses a timestamp representation that will run out
1136 : // of bits in 2038. One year before that, this unit test will start failing.
1137 :
1138 1 : let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
1139 1 : + Duration::from_secs(3600 * 24 * 365);
1140 :
1141 1 : assert!(one_year_from_now.as_secs() < (2 << 31));
1142 1 : }
|