Line data Source code
1 : use std::time::UNIX_EPOCH;
2 :
3 : use pageserver_api::key::{CONTROLFILE_KEY, Key};
4 : use tokio::task::JoinSet;
5 : use utils::completion::{self, Completion};
6 : use utils::id::TimelineId;
7 :
8 : use super::failpoints::{Failpoint, FailpointKind};
9 : use super::*;
10 : use crate::context::DownloadBehavior;
11 : use crate::tenant::harness::{TenantHarness, test_img};
12 : use crate::tenant::storage_layer::{IoConcurrency, LayerVisibilityHint};
13 :
14 : /// Used in tests to advance a future to wanted await point, and not futher.
15 : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
16 :
17 : /// Used in tests to indicate forever long timeout; has to be longer than the amount of ADVANCE
18 : /// timeout uses to advance futures.
19 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
20 :
21 : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
22 : #[tokio::test]
23 1 : async fn smoke_test() {
24 1 : let handle = tokio::runtime::Handle::current();
25 1 :
26 1 : let h = TenantHarness::create("smoke_test").await.unwrap();
27 1 : let span = h.span();
28 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
29 1 : let (tenant, ctx) = h.load().await;
30 1 : let io_concurrency = IoConcurrency::spawn_for_test();
31 1 :
32 1 : let image_layers = vec![(
33 1 : Lsn(0x40),
34 1 : vec![(
35 1 : Key::from_hex("620000000033333333444444445500000000").unwrap(),
36 1 : test_img("foo"),
37 1 : )],
38 1 : )];
39 1 :
40 1 : // Create a test timeline with one real layer, and one synthetic test layer. The synthetic
41 1 : // one is only there so that we can GC the real one without leaving the timeline's metadata
42 1 : // empty, which is an illegal state (see [`IndexPart::validate`]).
43 1 : let timeline = tenant
44 1 : .create_test_timeline_with_layers(
45 1 : TimelineId::generate(),
46 1 : Lsn(0x10),
47 1 : 14,
48 1 : &ctx,
49 1 : Default::default(), // in-memory layers
50 1 : Default::default(),
51 1 : image_layers,
52 1 : Lsn(0x100),
53 1 : )
54 1 : .await
55 1 : .unwrap();
56 1 : let ctx = &ctx.with_scope_timeline(&timeline);
57 1 :
58 1 : // Grab one of the timeline's layers to exercise in the test, and the other layer that is just
59 1 : // there to avoid the timeline being illegally empty
60 1 : let (layer, dummy_layer) = {
61 1 : let mut layers = {
62 1 : let layers = timeline.layers.read().await;
63 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
64 1 : };
65 1 :
66 1 : assert_eq!(layers.len(), 2);
67 1 :
68 2 : layers.sort_by_key(|l| l.layer_desc().get_key_range().start);
69 1 : let synthetic_layer = layers.pop().unwrap();
70 1 : let real_layer = layers.pop().unwrap();
71 1 : tracing::info!(
72 1 : "real_layer={:?} ({}), synthetic_layer={:?} ({})",
73 0 : real_layer,
74 0 : real_layer.layer_desc().file_size,
75 0 : synthetic_layer,
76 0 : synthetic_layer.layer_desc().file_size
77 1 : );
78 1 : (real_layer, synthetic_layer)
79 1 : };
80 1 :
81 1 : // all layers created at pageserver are like `layer`, initialized with strong
82 1 : // Arc<DownloadedLayer>.
83 1 :
84 1 : let controlfile_keyspace = KeySpace {
85 1 : ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
86 1 : };
87 1 :
88 1 : let img_before = {
89 1 : let mut data = ValuesReconstructState::new(io_concurrency.clone());
90 1 : layer
91 1 : .get_values_reconstruct_data(
92 1 : controlfile_keyspace.clone(),
93 1 : Lsn(0x10)..Lsn(0x11),
94 1 : &mut data,
95 1 : ctx,
96 1 : )
97 1 : .await
98 1 : .unwrap();
99 1 :
100 1 : data.keys
101 1 : .remove(&CONTROLFILE_KEY)
102 1 : .expect("must be present")
103 1 : .collect_pending_ios()
104 1 : .await
105 1 : .expect("must not error")
106 1 : .img
107 1 : .take()
108 1 : .expect("tenant harness writes the control file")
109 1 : };
110 1 :
111 1 : // important part is evicting the layer, which can be done when there are no more ResidentLayer
112 1 : // instances -- there currently are none, only two `Layer` values, one in the layermap and on
113 1 : // in scope.
114 1 : layer.evict_and_wait(FOREVER).await.unwrap();
115 1 :
116 1 : // double-evict returns an error, which is valid if both eviction_task and disk usage based
117 1 : // eviction would both evict the same layer at the same time.
118 1 :
119 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
120 1 : assert!(matches!(e, EvictionError::NotFound));
121 1 :
122 1 : let dl_ctx = RequestContextBuilder::from(ctx)
123 1 : .download_behavior(DownloadBehavior::Download)
124 1 : .attached_child();
125 1 :
126 1 : // on accesses when the layer is evicted, it will automatically be downloaded.
127 1 : let img_after = {
128 1 : let mut data = ValuesReconstructState::new(io_concurrency.clone());
129 1 : layer
130 1 : .get_values_reconstruct_data(
131 1 : controlfile_keyspace.clone(),
132 1 : Lsn(0x10)..Lsn(0x11),
133 1 : &mut data,
134 1 : &dl_ctx,
135 1 : )
136 1 : .instrument(download_span.clone())
137 1 : .await
138 1 : .unwrap();
139 1 : data.keys
140 1 : .remove(&CONTROLFILE_KEY)
141 1 : .expect("must be present")
142 1 : .collect_pending_ios()
143 1 : .await
144 1 : .expect("must not error")
145 1 : .img
146 1 : .take()
147 1 : .expect("tenant harness writes the control file")
148 1 : };
149 1 :
150 1 : assert_eq!(img_before, img_after);
151 1 :
152 1 : // evict_and_wait can timeout, but it doesn't cancel the evicting itself
153 1 : //
154 1 : // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
155 1 : // artificially slow it down.
156 1 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
157 1 :
158 1 : match layer
159 1 : .evict_and_wait(std::time::Duration::ZERO)
160 1 : .await
161 1 : .unwrap_err()
162 1 : {
163 1 : EvictionError::Timeout => {
164 1 : // expected, but note that the eviction is "still ongoing"
165 1 : helper.release().await;
166 1 : // exhaust spawn_blocking pool to ensure it is now complete
167 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
168 1 : .await;
169 1 : }
170 1 : other => unreachable!("{other:?}"),
171 1 : }
172 1 :
173 1 : // only way to query if a layer is resident is to acquire a ResidentLayer instance.
174 1 : // Layer::keep_resident never downloads, but it might initialize if the layer file is found
175 1 : // downloaded locally.
176 1 : let none = layer.keep_resident().await;
177 1 : assert!(
178 1 : none.is_none(),
179 1 : "Expected none, because eviction removed the local file, found: {none:?}"
180 1 : );
181 1 :
182 1 : // plain downloading is rarely needed
183 1 : layer
184 1 : .download_and_keep_resident(&dl_ctx)
185 1 : .instrument(download_span)
186 1 : .await
187 1 : .unwrap();
188 1 :
189 1 : // last important part is deletion on drop: gc and compaction use it for compacted L0 layers
190 1 : // or fully garbage collected layers. deletion means deleting the local file, and scheduling a
191 1 : // deletion of the already unlinked from index_part.json remote file.
192 1 : //
193 1 : // marking a layer to be deleted on drop is irreversible; there is no technical reason against
194 1 : // reversiblity, but currently it is not needed so it is not provided.
195 1 : layer.delete_on_drop();
196 1 :
197 1 : let path = layer.local_path().to_owned();
198 1 :
199 1 : // wait_drop produces an unconnected to Layer future which will resolve when the
200 1 : // LayerInner::drop has completed.
201 1 : let mut wait_drop = std::pin::pin!(layer.wait_drop());
202 1 :
203 1 : // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
204 1 : // until here
205 1 : tokio::time::pause();
206 1 : tokio::time::timeout(ADVANCE, &mut wait_drop)
207 1 : .await
208 1 : .expect_err("should had timed out because two strong references exist");
209 1 :
210 1 : tokio::fs::metadata(&path)
211 1 : .await
212 1 : .expect("the local layer file still exists");
213 1 :
214 1 : let rtc = &timeline.remote_client;
215 1 :
216 1 : // Simulate GC removing our test layer.
217 1 : {
218 1 : let mut g = timeline.layers.write().await;
219 1 :
220 1 : let layers = &[layer];
221 1 : g.open_mut().unwrap().finish_gc_timeline(layers);
222 1 :
223 1 : // this just updates the remote_physical_size for demonstration purposes
224 1 : rtc.schedule_gc_update(layers).unwrap();
225 1 : }
226 1 :
227 1 : // when strong references are dropped, the file is deleted and remote deletion is scheduled
228 1 : wait_drop.await;
229 1 :
230 1 : let e = tokio::fs::metadata(&path)
231 1 : .await
232 1 : .expect_err("the local file is deleted");
233 1 : assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
234 1 :
235 1 : rtc.wait_completion().await.unwrap();
236 1 :
237 1 : assert_eq!(
238 1 : rtc.get_remote_physical_size(),
239 1 : dummy_layer.metadata().file_size
240 1 : );
241 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
242 1 : }
243 :
244 : /// This test demonstrates a previous hang when a eviction and deletion were requested at the same
245 : /// time. Now both of them complete per Arc drop semantics.
246 : #[tokio::test(start_paused = true)]
247 1 : async fn evict_and_wait_on_wanted_deleted() {
248 1 : // this is the runtime on which Layer spawns the blocking tasks on
249 1 : let handle = tokio::runtime::Handle::current();
250 1 :
251 1 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
252 1 : .await
253 1 : .unwrap();
254 1 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
255 1 : let (tenant, ctx) = h.load().await;
256 1 :
257 1 : let timeline = tenant
258 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
259 1 : .await
260 1 : .unwrap();
261 1 :
262 1 : let layer = {
263 1 : let mut layers = {
264 1 : let layers = timeline.layers.read().await;
265 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
266 1 : };
267 1 :
268 1 : assert_eq!(layers.len(), 1);
269 1 :
270 1 : layers.swap_remove(0)
271 1 : };
272 1 :
273 1 : // setup done
274 1 :
275 1 : let resident = layer.keep_resident().await.unwrap();
276 1 :
277 1 : {
278 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
279 1 :
280 1 : // drive the future to await on the status channel
281 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
282 1 : .await
283 1 : .expect_err("should had been a timeout since we are holding the layer resident");
284 1 :
285 1 : layer.delete_on_drop();
286 1 :
287 1 : drop(resident);
288 1 :
289 1 : // make sure the eviction task gets to run
290 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
291 1 :
292 1 : let resident = layer.keep_resident().await;
293 1 : assert!(
294 1 : resident.is_none(),
295 1 : "keep_resident should not have re-initialized: {resident:?}"
296 1 : );
297 1 :
298 1 : evict_and_wait
299 1 : .await
300 1 : .expect("evict_and_wait should had succeeded");
301 1 :
302 1 : // works as intended
303 1 : }
304 1 :
305 1 : // assert that once we remove the `layer` from the layer map and drop our reference,
306 1 : // the deletion of the layer in remote_storage happens.
307 1 : {
308 1 : let mut layers = timeline.layers.write().await;
309 1 : layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
310 1 : }
311 1 :
312 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
313 1 :
314 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
315 1 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
316 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
317 1 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
318 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
319 1 : }
320 :
321 : /// This test ensures we are able to read the layer while the layer eviction has been
322 : /// started but not completed.
323 : #[test]
324 1 : fn read_wins_pending_eviction() {
325 1 : let rt = tokio::runtime::Builder::new_current_thread()
326 1 : .max_blocking_threads(1)
327 1 : .enable_all()
328 1 : .start_paused(true)
329 1 : .build()
330 1 : .unwrap();
331 1 :
332 1 : rt.block_on(async move {
333 1 : // this is the runtime on which Layer spawns the blocking tasks on
334 1 : let handle = tokio::runtime::Handle::current();
335 1 : let h = TenantHarness::create("read_wins_pending_eviction")
336 1 : .await
337 1 : .unwrap();
338 1 : let (tenant, ctx) = h.load().await;
339 1 : let span = h.span();
340 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
341 :
342 1 : let timeline = tenant
343 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
344 1 : .await
345 1 : .unwrap();
346 1 : let ctx = ctx.with_scope_timeline(&timeline);
347 :
348 1 : let layer = {
349 1 : let mut layers = {
350 1 : let layers = timeline.layers.read().await;
351 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
352 1 : };
353 1 :
354 1 : assert_eq!(layers.len(), 1);
355 :
356 1 : layers.swap_remove(0)
357 : };
358 :
359 : // setup done
360 :
361 1 : let resident = layer.keep_resident().await.unwrap();
362 1 :
363 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
364 1 :
365 1 : // drive the future to await on the status channel
366 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
367 1 : .await
368 1 : .expect_err("should had been a timeout since we are holding the layer resident");
369 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
370 :
371 1 : let (completion, barrier) = utils::completion::channel();
372 1 : let (arrival, arrived_at_barrier) = utils::completion::channel();
373 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
374 1 : Some(arrival),
375 1 : barrier,
376 1 : ));
377 1 :
378 1 : // now the eviction cannot proceed because the threads are consumed while completion exists
379 1 : drop(resident);
380 1 : arrived_at_barrier.wait().await;
381 1 : assert!(!layer.is_likely_resident());
382 :
383 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
384 1 : layer
385 1 : .0
386 1 : .get_or_maybe_download(false, &ctx)
387 1 : .instrument(download_span)
388 1 : .await
389 1 : .expect("should had reinitialized without downloading");
390 1 :
391 1 : assert!(layer.is_likely_resident());
392 :
393 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
394 1 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
395 1 : .await
396 1 : .expect("no timeout, because get_or_maybe_download re-initialized")
397 1 : .expect_err("eviction should not have succeeded because re-initialized");
398 1 :
399 1 : // works as intended: evictions lose to "downloads"
400 1 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
401 1 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
402 :
403 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
404 : // because of a failpoint
405 1 : assert_eq!(
406 1 : 0,
407 1 : LAYER_IMPL_METRICS
408 1 : .cancelled_evictions
409 1 : .values()
410 9 : .map(|ctr| ctr.get())
411 1 : .sum::<u64>()
412 1 : );
413 :
414 1 : drop(completion);
415 1 :
416 1 : tokio::time::sleep(ADVANCE).await;
417 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
418 1 : .await;
419 :
420 1 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
421 :
422 : // now we finally can observe the original eviction failing
423 : // it would had been possible to observe it earlier, but here it is guaranteed to have
424 : // happened.
425 1 : assert_eq!(
426 1 : 1,
427 1 : LAYER_IMPL_METRICS
428 1 : .cancelled_evictions
429 1 : .values()
430 9 : .map(|ctr| ctr.get())
431 1 : .sum::<u64>()
432 1 : );
433 :
434 1 : assert_eq!(
435 1 : 1,
436 1 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
437 1 : );
438 :
439 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
440 1 : });
441 1 : }
442 :
443 : /// Use failpoint to delay an eviction starting to get a VersionCheckFailed.
444 : #[test]
445 1 : fn multiple_pending_evictions_in_order() {
446 1 : let name = "multiple_pending_evictions_in_order";
447 1 : let in_order = true;
448 1 : multiple_pending_evictions_scenario(name, in_order);
449 1 : }
450 :
451 : /// Use failpoint to reorder later eviction before first to get a UnexpectedEvictedState.
452 : #[test]
453 1 : fn multiple_pending_evictions_out_of_order() {
454 1 : let name = "multiple_pending_evictions_out_of_order";
455 1 : let in_order = false;
456 1 : multiple_pending_evictions_scenario(name, in_order);
457 1 : }
458 :
459 2 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
460 2 : let rt = tokio::runtime::Builder::new_current_thread()
461 2 : .max_blocking_threads(1)
462 2 : .enable_all()
463 2 : .start_paused(true)
464 2 : .build()
465 2 : .unwrap();
466 2 :
467 2 : rt.block_on(async move {
468 2 : // this is the runtime on which Layer spawns the blocking tasks on
469 2 : let handle = tokio::runtime::Handle::current();
470 2 : let h = TenantHarness::create(name).await.unwrap();
471 2 : let (tenant, ctx) = h.load().await;
472 2 : let span = h.span();
473 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
474 :
475 2 : let timeline = tenant
476 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
477 2 : .await
478 2 : .unwrap();
479 2 : let ctx = ctx.with_scope_timeline(&timeline);
480 :
481 2 : let layer = {
482 2 : let mut layers = {
483 2 : let layers = timeline.layers.read().await;
484 2 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
485 2 : };
486 2 :
487 2 : assert_eq!(layers.len(), 1);
488 :
489 2 : layers.swap_remove(0)
490 : };
491 :
492 : // setup done
493 :
494 2 : let resident = layer.keep_resident().await.unwrap();
495 2 :
496 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
497 2 :
498 2 : // drive the future to await on the status channel
499 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
500 2 : .await
501 2 : .expect_err("should had been a timeout since we are holding the layer resident");
502 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
503 :
504 2 : let (completion1, barrier) = utils::completion::channel();
505 2 : let mut completion1 = Some(completion1);
506 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
507 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
508 2 : Some(arrival),
509 2 : barrier,
510 2 : ));
511 2 :
512 2 : // now the eviction cannot proceed because we are simulating arbitrary long delay for the
513 2 : // eviction task start.
514 2 : drop(resident);
515 2 : assert!(!layer.is_likely_resident());
516 :
517 2 : arrived_at_barrier.wait().await;
518 :
519 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
520 2 : layer
521 2 : .0
522 2 : .get_or_maybe_download(false, &ctx)
523 2 : .instrument(download_span)
524 2 : .await
525 2 : .expect("should had reinitialized without downloading");
526 2 :
527 2 : assert!(layer.is_likely_resident());
528 :
529 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
530 2 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
531 2 : .await
532 2 : .expect("no timeout, because get_or_maybe_download re-initialized")
533 2 : .expect_err("eviction should not have succeeded because re-initialized");
534 2 :
535 2 : // works as intended: evictions lose to "downloads"
536 2 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
537 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
538 :
539 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
540 : // because of a failpoint
541 2 : assert_eq!(
542 2 : 0,
543 2 : LAYER_IMPL_METRICS
544 2 : .cancelled_evictions
545 2 : .values()
546 18 : .map(|ctr| ctr.get())
547 2 : .sum::<u64>()
548 2 : );
549 :
550 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
551 :
552 : // configure another failpoint for the second eviction -- evictions are per initialization,
553 : // so now that we've reinitialized the inner, we get to run two of them at the same time.
554 2 : let (completion2, barrier) = utils::completion::channel();
555 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
556 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
557 2 : Some(arrival),
558 2 : barrier,
559 2 : ));
560 2 :
561 2 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
562 2 :
563 2 : // advance to the wait on the queue
564 2 : tokio::time::timeout(ADVANCE, &mut second_eviction)
565 2 : .await
566 2 : .expect_err("timeout because failpoint is blocking");
567 2 :
568 2 : arrived_at_barrier.wait().await;
569 :
570 2 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
571 :
572 2 : let mut release_earlier_eviction = |expected_reason| {
573 2 : assert_eq!(
574 2 : 0,
575 2 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
576 2 : );
577 :
578 2 : drop(completion1.take().unwrap());
579 2 :
580 2 : let handle = &handle;
581 :
582 2 : async move {
583 2 : tokio::time::sleep(ADVANCE).await;
584 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
585 2 : handle, 1,
586 2 : )
587 2 : .await;
588 :
589 2 : assert_eq!(
590 2 : 1,
591 2 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
592 2 : );
593 2 : }
594 2 : };
595 :
596 2 : if in_order {
597 1 : release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
598 1 : }
599 :
600 : // release the later eviction which is for the current version
601 2 : drop(completion2);
602 2 : tokio::time::sleep(ADVANCE).await;
603 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
604 2 : .await;
605 :
606 2 : if !in_order {
607 1 : release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
608 1 : }
609 :
610 2 : tokio::time::timeout(ADVANCE, &mut second_eviction)
611 2 : .await
612 2 : .expect("eviction goes through now that spawn_blocking is unclogged")
613 2 : .expect("eviction should succeed, because version matches");
614 2 :
615 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
616 :
617 : // ensure the cancelled are unchanged
618 2 : assert_eq!(
619 2 : 1,
620 2 : LAYER_IMPL_METRICS
621 2 : .cancelled_evictions
622 2 : .values()
623 18 : .map(|ctr| ctr.get())
624 2 : .sum::<u64>()
625 2 : );
626 :
627 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
628 2 : });
629 2 : }
630 :
631 : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
632 : /// a `Layer::keep_resident` call.
633 : ///
634 : /// This matters because cancelling the eviction would leave us in a state where the file is on
635 : /// disk but the layer internal state says it has not been initialized. Futhermore, it allows us to
636 : /// have non-repairing `Layer::is_likely_resident`.
637 : #[tokio::test(start_paused = true)]
638 1 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
639 1 : let handle = tokio::runtime::Handle::current();
640 1 : let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
641 1 : .await
642 1 : .unwrap();
643 1 : let (tenant, ctx) = h.load().await;
644 1 :
645 1 : let timeline = tenant
646 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
647 1 : .await
648 1 : .unwrap();
649 1 : let ctx = ctx.with_scope_timeline(&timeline);
650 1 :
651 1 : // This test does downloads
652 1 : let ctx = RequestContextBuilder::from(&ctx)
653 1 : .download_behavior(DownloadBehavior::Download)
654 1 : .attached_child();
655 1 :
656 1 : let layer = {
657 1 : let mut layers = {
658 1 : let layers = timeline.layers.read().await;
659 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
660 1 : };
661 1 :
662 1 : assert_eq!(layers.len(), 1);
663 1 :
664 1 : layers.swap_remove(0)
665 1 : };
666 1 :
667 1 : // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
668 1 : // Err) at the right time as in "during" the `LayerInner::needs_download`.
669 1 : layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
670 1 :
671 1 : let (completion, barrier) = utils::completion::channel();
672 1 : let (arrival, arrived_at_barrier) = utils::completion::channel();
673 1 :
674 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
675 1 : Some(arrival),
676 1 : barrier,
677 1 : ));
678 1 :
679 1 : tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
680 1 : .await
681 1 : .expect_err("should had advanced to waiting on channel");
682 1 :
683 1 : arrived_at_barrier.wait().await;
684 1 :
685 1 : // simulate a cancelled read which is cancelled before it gets to re-initialize
686 1 : let e = layer
687 1 : .0
688 1 : .get_or_maybe_download(false, &ctx)
689 1 : .await
690 1 : .unwrap_err();
691 1 : assert!(
692 1 : matches!(
693 1 : e,
694 1 : DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
695 1 : ),
696 1 : "{e:?}"
697 1 : );
698 1 :
699 1 : assert!(
700 1 : layer.0.needs_download().await.unwrap().is_none(),
701 1 : "file is still on disk"
702 1 : );
703 1 :
704 1 : // release the eviction task
705 1 : drop(completion);
706 1 : tokio::time::sleep(ADVANCE).await;
707 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
708 1 :
709 1 : // failpoint is still enabled, but it is not hit
710 1 : let e = layer
711 1 : .0
712 1 : .get_or_maybe_download(false, &ctx)
713 1 : .await
714 1 : .unwrap_err();
715 1 : assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
716 1 :
717 1 : // failpoint is not counted as cancellation either
718 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
719 1 : }
720 :
721 : #[tokio::test(start_paused = true)]
722 1 : async fn evict_and_wait_does_not_wait_for_download() {
723 1 : // let handle = tokio::runtime::Handle::current();
724 1 : let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
725 1 : .await
726 1 : .unwrap();
727 1 : let (tenant, ctx) = h.load().await;
728 1 : let span = h.span();
729 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
730 1 :
731 1 : let timeline = tenant
732 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
733 1 : .await
734 1 : .unwrap();
735 1 : let ctx = ctx.with_scope_timeline(&timeline);
736 1 :
737 1 : // This test does downloads
738 1 : let ctx = RequestContextBuilder::from(&ctx)
739 1 : .download_behavior(DownloadBehavior::Download)
740 1 : .attached_child();
741 1 :
742 1 : let layer = {
743 1 : let mut layers = {
744 1 : let layers = timeline.layers.read().await;
745 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
746 1 : };
747 1 :
748 1 : assert_eq!(layers.len(), 1);
749 1 :
750 1 : layers.swap_remove(0)
751 1 : };
752 1 :
753 1 : // kind of forced setup: start an eviction but do not allow it progress until we are
754 1 : // downloading
755 1 : let (eviction_can_continue, barrier) = utils::completion::channel();
756 1 : let (arrival, eviction_arrived) = utils::completion::channel();
757 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
758 1 : Some(arrival),
759 1 : barrier,
760 1 : ));
761 1 :
762 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
763 1 :
764 1 : // use this once-awaited other_evict to synchronize with the eviction
765 1 : let other_evict = layer.evict_and_wait(FOREVER);
766 1 :
767 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
768 1 : .await
769 1 : .expect_err("should had advanced");
770 1 : eviction_arrived.wait().await;
771 1 : drop(eviction_can_continue);
772 1 : other_evict.await.unwrap();
773 1 :
774 1 : // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
775 1 : assert!(!layer.is_likely_resident());
776 1 :
777 1 : // following new evict_and_wait will fail until we've completed the download
778 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
779 1 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
780 1 :
781 1 : let (download_can_continue, barrier) = utils::completion::channel();
782 1 : let (arrival, _download_arrived) = utils::completion::channel();
783 1 : layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
784 1 :
785 1 : let mut download = std::pin::pin!(
786 1 : layer
787 1 : .0
788 1 : .get_or_maybe_download(true, &ctx)
789 1 : .instrument(download_span)
790 1 : );
791 1 :
792 1 : assert!(
793 1 : !layer.is_likely_resident(),
794 1 : "during download layer is evicted"
795 1 : );
796 1 :
797 1 : tokio::time::timeout(ADVANCE, &mut download)
798 1 : .await
799 1 : .expect_err("should had timed out because of failpoint");
800 1 :
801 1 : // now we finally get to continue, and because the latest state is downloading, we deduce that
802 1 : // original eviction succeeded
803 1 : evict_and_wait.await.unwrap();
804 1 :
805 1 : // however a new evict_and_wait will fail
806 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
807 1 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
808 1 :
809 1 : assert!(!layer.is_likely_resident());
810 1 :
811 1 : drop(download_can_continue);
812 1 : download.await.expect("download should had succeeded");
813 1 : assert!(layer.is_likely_resident());
814 1 :
815 1 : // only now can we evict
816 1 : layer.evict_and_wait(FOREVER).await.unwrap();
817 1 : }
818 :
819 : /// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
820 : /// which is the last value.
821 : ///
822 : /// Also checks that the same does not happen on a non-evicted layer (regression test).
823 : #[tokio::test(start_paused = true)]
824 1 : async fn eviction_cancellation_on_drop() {
825 1 : use bytes::Bytes;
826 1 : use pageserver_api::value::Value;
827 1 :
828 1 : // this is the runtime on which Layer spawns the blocking tasks on
829 1 : let handle = tokio::runtime::Handle::current();
830 1 :
831 1 : let h = TenantHarness::create("eviction_cancellation_on_drop")
832 1 : .await
833 1 : .unwrap();
834 1 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
835 1 : let (tenant, ctx) = h.load().await;
836 1 :
837 1 : let timeline = tenant
838 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
839 1 : .await
840 1 : .unwrap();
841 1 :
842 1 : {
843 1 : // create_test_timeline wrote us one layer, write another
844 1 : let mut writer = timeline.writer().await;
845 1 : writer
846 1 : .put(
847 1 : pageserver_api::key::Key::from_i128(5),
848 1 : Lsn(0x20),
849 1 : &Value::Image(Bytes::from_static(b"this does not matter either")),
850 1 : &ctx,
851 1 : )
852 1 : .await
853 1 : .unwrap();
854 1 :
855 1 : writer.finish_write(Lsn(0x20));
856 1 : }
857 1 :
858 1 : timeline.freeze_and_flush().await.unwrap();
859 1 :
860 1 : // wait for the upload to complete so our Arc::strong_count assertion holds
861 1 : timeline.remote_client.wait_completion().await.unwrap();
862 1 :
863 1 : let (evicted_layer, not_evicted) = {
864 1 : let mut layers = {
865 1 : let mut guard = timeline.layers.write().await;
866 1 : let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
867 1 : // remove the layers from layermap
868 1 : guard.open_mut().unwrap().finish_gc_timeline(&layers);
869 1 :
870 1 : layers
871 1 : };
872 1 :
873 1 : assert_eq!(layers.len(), 2);
874 1 :
875 1 : (layers.pop().unwrap(), layers.pop().unwrap())
876 1 : };
877 1 :
878 1 : let victims = [(evicted_layer, true), (not_evicted, false)];
879 1 :
880 3 : for (victim, evict) in victims {
881 2 : let resident = victim.keep_resident().await.unwrap();
882 2 : drop(victim);
883 2 :
884 2 : assert_eq!(Arc::strong_count(&resident.owner.0), 1);
885 1 :
886 2 : if evict {
887 1 : let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
888 1 :
889 1 : // drive the future to await on the status channel, and then drop it
890 1 : tokio::time::timeout(ADVANCE, evict_and_wait)
891 1 : .await
892 1 : .expect_err("should had been a timeout since we are holding the layer resident");
893 1 : }
894 1 :
895 1 : // 1 == we only evict one of the layers
896 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
897 1 :
898 2 : drop(resident);
899 2 :
900 2 : // run any spawned
901 2 : tokio::time::sleep(ADVANCE).await;
902 1 :
903 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
904 1 :
905 2 : assert_eq!(
906 2 : 1,
907 2 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
908 2 : );
909 1 : }
910 1 : }
911 :
/// A test case to remind you of the cost of these structures. You can bump the size limits
/// below if it is really necessary to add more fields to the structures.
#[test]
#[cfg(target_arch = "x86_64")]
fn layer_size() {
    assert_eq!(std::mem::size_of::<LayerAccessStats>(), 8);
    assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
    // NOTE(review): the original note "it also has the utf8 path" presumably refers to
    // `LayerInner` owning a utf8 path whose heap data `size_of` does not count — confirm.
    assert_eq!(std::mem::size_of::<LayerInner>(), 296);
}
922 :
923 : struct SpawnBlockingPoolHelper {
924 : awaited_by_spawn_blocking_tasks: Completion,
925 : blocking_tasks: JoinSet<()>,
926 : }
927 :
928 : impl SpawnBlockingPoolHelper {
929 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
930 : /// release is called.
931 : ///
932 : /// In the tests this can be used to ensure something cannot be started on the target runtimes
933 : /// spawn_blocking pool.
934 : ///
935 : /// This should be no issue nowdays, because nextest runs each test in it's own process.
936 1 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
937 1 : let default_max_blocking_threads = 512;
938 1 :
939 1 : Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
940 1 : }
941 :
942 13 : async fn consume_all_spawn_blocking_threads0(
943 13 : handle: &tokio::runtime::Handle,
944 13 : threads: usize,
945 13 : ) -> Self {
946 13 : assert_ne!(threads, 0);
947 :
948 13 : let (completion, barrier) = completion::channel();
949 13 : let (started, starts_completed) = completion::channel();
950 13 :
951 13 : let mut blocking_tasks = JoinSet::new();
952 13 :
953 3590 : for _ in 0..threads {
954 3590 : let barrier = barrier.clone();
955 3590 : let started = started.clone();
956 3590 : blocking_tasks.spawn_blocking_on(
957 3590 : move || {
958 3590 : drop(started);
959 3590 : tokio::runtime::Handle::current().block_on(barrier.wait());
960 3590 : },
961 3590 : handle,
962 3590 : );
963 3590 : }
964 :
965 13 : drop(started);
966 13 :
967 13 : starts_completed.wait().await;
968 :
969 13 : drop(barrier);
970 13 :
971 13 : tracing::trace!("consumed all threads");
972 :
973 13 : SpawnBlockingPoolHelper {
974 13 : awaited_by_spawn_blocking_tasks: completion,
975 13 : blocking_tasks,
976 13 : }
977 13 : }
978 :
979 : /// Release all previously blocked spawn_blocking threads
980 13 : async fn release(self) {
981 13 : let SpawnBlockingPoolHelper {
982 13 : awaited_by_spawn_blocking_tasks,
983 13 : mut blocking_tasks,
984 13 : } = self;
985 13 :
986 13 : drop(awaited_by_spawn_blocking_tasks);
987 :
988 3603 : while let Some(res) = blocking_tasks.join_next().await {
989 3590 : res.expect("none of the tasks should had panicked");
990 3590 : }
991 :
992 13 : tracing::trace!("released all threads");
993 13 : }
994 :
995 : /// In the tests it is used as an easy way of making sure something scheduled on the target
996 : /// runtimes `spawn_blocking` has completed, because it must've been scheduled and completed
997 : /// before our tasks have a chance to schedule and complete.
998 6 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
999 6 : Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
1000 6 : }
1001 :
1002 11 : async fn consume_and_release_all_of_spawn_blocking_threads0(
1003 11 : handle: &tokio::runtime::Handle,
1004 11 : threads: usize,
1005 11 : ) {
1006 11 : Self::consume_all_spawn_blocking_threads0(handle, threads)
1007 11 : .await
1008 11 : .release()
1009 11 : .await
1010 11 : }
1011 : }
1012 :
#[test]
fn spawn_blocking_pool_helper_actually_works() {
    // Create a custom runtime for which we know and control how many blocking threads it has:
    // exactly one. The `...0` helper variant takes that count explicitly, so the helper can
    // occupy the whole pool instead of assuming tokio's default size.
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let handle = rt.handle();

    rt.block_on(async move {
        // this will not return until all threads are spun up and actually executing the code
        // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
        let consumed =
            SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;

        println!("consumed");

        // `JoinHandle` is `Unpin`, so it can be polled through `&mut` without pinning.
        let mut jh = tokio::task::spawn_blocking(move || {
            // this will not get to run before we release
        });

        println!("spawned");

        tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
            .await
            .expect_err("the task should not have gotten to run yet");

        println!("tried to join");

        consumed.release().await;

        println!("released");

        tokio::time::timeout(std::time::Duration::from_secs(1), jh)
            .await
            .expect("no timeout")
            .expect("no join error");

        println!("joined");
    });
}
1059 :
/// Truncates `hires` to whole seconds since the UNIX epoch, emulating the precision loss in
/// `LayerAccessStats`.
///
/// Panics if `hires` is earlier than the UNIX epoch.
fn lowres_time(hires: SystemTime) -> SystemTime {
    let since_epoch = hires.duration_since(UNIX_EPOCH).unwrap();
    let whole_seconds = Duration::from_secs(since_epoch.as_secs());
    UNIX_EPOCH + whole_seconds
}
1065 :
/// Exercises `LayerAccessStats` visibility tracking and its access/residence timestamps.
#[test]
fn access_stats() {
    let access_stats = LayerAccessStats::default();
    // Default is visible
    assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);

    access_stats.set_visibility(LayerVisibilityHint::Covered);
    assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
    access_stats.set_visibility(LayerVisibilityHint::Visible);
    assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);

    let rtime = UNIX_EPOCH + Duration::from_secs(2_000_000_000);
    access_stats.record_residence_event_at(rtime);
    assert_eq!(access_stats.latest_activity(), lowres_time(rtime));

    let atime = UNIX_EPOCH + Duration::from_secs(2_100_000_000);
    access_stats.record_access_at(atime);
    assert_eq!(access_stats.latest_activity(), lowres_time(atime));

    // Setting visibility doesn't clobber access time
    access_stats.set_visibility(LayerVisibilityHint::Covered);
    assert_eq!(access_stats.latest_activity(), lowres_time(atime));
    access_stats.set_visibility(LayerVisibilityHint::Visible);
    assert_eq!(access_stats.latest_activity(), lowres_time(atime));

    // Recording access implicitly makes layer visible, if it wasn't already, and reports
    // whether it changed the visibility.
    let atime = UNIX_EPOCH + Duration::from_secs(2_200_000_000);
    access_stats.set_visibility(LayerVisibilityHint::Covered);
    assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
    assert!(access_stats.record_access_at(atime));
    // Check the implicit transition itself instead of forcing the state with set_visibility.
    assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
    // Already visible: recording another access does not change the visibility.
    assert!(!access_stats.record_access_at(atime));
    assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
}
1100 :
#[test]
fn access_stats_2038() {
    // The access stats structure uses a timestamp representation that will run out
    // of bits in 2038. One year before that, this unit test will start failing.
    //
    // Seconds since the UNIX epoch stop fitting in 31 bits at 2^31 s
    // (2038-01-19T03:14:08Z), so that is the bound to check against.
    // Note: `2 << 31` would be 2^32, i.e. the year 2106.
    const LIMIT_SECS: u64 = 1 << 31;

    let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
        + Duration::from_secs(3600 * 24 * 365);

    assert!(
        one_year_from_now.as_secs() < LIMIT_SECS,
        "access stats timestamps will run out of bits within a year"
    );
}
|