Line data Source code
1 : use std::time::UNIX_EPOCH;
2 :
3 : use pageserver_api::key::{CONTROLFILE_KEY, Key};
4 : use tokio::task::JoinSet;
5 : use utils::completion::{self, Completion};
6 : use utils::id::TimelineId;
7 :
8 : use super::failpoints::{Failpoint, FailpointKind};
9 : use super::*;
10 : use crate::context::DownloadBehavior;
11 : use crate::tenant::harness::{TenantHarness, test_img};
12 : use crate::tenant::storage_layer::{IoConcurrency, LayerVisibilityHint};
13 :
14 : /// Used in tests to advance a future to wanted await point, and not futher.
15 : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
16 :
17 : /// Used in tests to indicate forever long timeout; has to be longer than the amount of ADVANCE
18 : /// timeout uses to advance futures.
19 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
20 :
21 : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
22 : #[tokio::test]
23 4 : async fn smoke_test() {
24 4 : let handle = tokio::runtime::Handle::current();
25 4 :
26 4 : let h = TenantHarness::create("smoke_test").await.unwrap();
27 4 : let span = h.span();
28 4 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
29 4 : let (tenant, ctx) = h.load().await;
30 4 : let io_concurrency = IoConcurrency::spawn_for_test();
31 4 :
32 4 : let image_layers = vec![(
33 4 : Lsn(0x40),
34 4 : vec![(
35 4 : Key::from_hex("620000000033333333444444445500000000").unwrap(),
36 4 : test_img("foo"),
37 4 : )],
38 4 : )];
39 4 :
40 4 : // Create a test timeline with one real layer, and one synthetic test layer. The synthetic
41 4 : // one is only there so that we can GC the real one without leaving the timeline's metadata
42 4 : // empty, which is an illegal state (see [`IndexPart::validate`]).
43 4 : let timeline = tenant
44 4 : .create_test_timeline_with_layers(
45 4 : TimelineId::generate(),
46 4 : Lsn(0x10),
47 4 : 14,
48 4 : &ctx,
49 4 : Default::default(), // in-memory layers
50 4 : Default::default(),
51 4 : image_layers,
52 4 : Lsn(0x100),
53 4 : )
54 4 : .await
55 4 : .unwrap();
56 4 : let ctx = &ctx.with_scope_timeline(&timeline);
57 4 :
58 4 : // Grab one of the timeline's layers to exercise in the test, and the other layer that is just
59 4 : // there to avoid the timeline being illegally empty
60 4 : let (layer, dummy_layer) = {
61 4 : let mut layers = {
62 4 : let layers = timeline.layers.read().await;
63 4 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
64 4 : };
65 4 :
66 4 : assert_eq!(layers.len(), 2);
67 4 :
68 8 : layers.sort_by_key(|l| l.layer_desc().get_key_range().start);
69 4 : let synthetic_layer = layers.pop().unwrap();
70 4 : let real_layer = layers.pop().unwrap();
71 4 : tracing::info!(
72 4 : "real_layer={:?} ({}), synthetic_layer={:?} ({})",
73 0 : real_layer,
74 0 : real_layer.layer_desc().file_size,
75 0 : synthetic_layer,
76 0 : synthetic_layer.layer_desc().file_size
77 4 : );
78 4 : (real_layer, synthetic_layer)
79 4 : };
80 4 :
81 4 : // all layers created at pageserver are like `layer`, initialized with strong
82 4 : // Arc<DownloadedLayer>.
83 4 :
84 4 : let controlfile_keyspace = KeySpace {
85 4 : ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
86 4 : };
87 4 :
88 4 : let img_before = {
89 4 : let mut data = ValuesReconstructState::new(io_concurrency.clone());
90 4 : layer
91 4 : .get_values_reconstruct_data(
92 4 : controlfile_keyspace.clone(),
93 4 : Lsn(0x10)..Lsn(0x11),
94 4 : &mut data,
95 4 : ctx,
96 4 : )
97 4 : .await
98 4 : .unwrap();
99 4 :
100 4 : data.keys
101 4 : .remove(&CONTROLFILE_KEY)
102 4 : .expect("must be present")
103 4 : .collect_pending_ios()
104 4 : .await
105 4 : .expect("must not error")
106 4 : .img
107 4 : .take()
108 4 : .expect("tenant harness writes the control file")
109 4 : };
110 4 :
111 4 : // important part is evicting the layer, which can be done when there are no more ResidentLayer
112 4 : // instances -- there currently are none, only two `Layer` values, one in the layermap and on
113 4 : // in scope.
114 4 : layer.evict_and_wait(FOREVER).await.unwrap();
115 4 :
116 4 : // double-evict returns an error, which is valid if both eviction_task and disk usage based
117 4 : // eviction would both evict the same layer at the same time.
118 4 :
119 4 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
120 4 : assert!(matches!(e, EvictionError::NotFound));
121 4 :
122 4 : // on accesses when the layer is evicted, it will automatically be downloaded.
123 4 : let img_after = {
124 4 : let mut data = ValuesReconstructState::new(io_concurrency.clone());
125 4 : layer
126 4 : .get_values_reconstruct_data(
127 4 : controlfile_keyspace.clone(),
128 4 : Lsn(0x10)..Lsn(0x11),
129 4 : &mut data,
130 4 : ctx,
131 4 : )
132 4 : .instrument(download_span.clone())
133 4 : .await
134 4 : .unwrap();
135 4 : data.keys
136 4 : .remove(&CONTROLFILE_KEY)
137 4 : .expect("must be present")
138 4 : .collect_pending_ios()
139 4 : .await
140 4 : .expect("must not error")
141 4 : .img
142 4 : .take()
143 4 : .expect("tenant harness writes the control file")
144 4 : };
145 4 :
146 4 : assert_eq!(img_before, img_after);
147 4 :
148 4 : // evict_and_wait can timeout, but it doesn't cancel the evicting itself
149 4 : //
150 4 : // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
151 4 : // artificially slow it down.
152 4 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
153 4 :
154 4 : match layer
155 4 : .evict_and_wait(std::time::Duration::ZERO)
156 4 : .await
157 4 : .unwrap_err()
158 4 : {
159 4 : EvictionError::Timeout => {
160 4 : // expected, but note that the eviction is "still ongoing"
161 4 : helper.release().await;
162 4 : // exhaust spawn_blocking pool to ensure it is now complete
163 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
164 4 : .await;
165 4 : }
166 4 : other => unreachable!("{other:?}"),
167 4 : }
168 4 :
169 4 : // only way to query if a layer is resident is to acquire a ResidentLayer instance.
170 4 : // Layer::keep_resident never downloads, but it might initialize if the layer file is found
171 4 : // downloaded locally.
172 4 : let none = layer.keep_resident().await;
173 4 : assert!(
174 4 : none.is_none(),
175 4 : "Expected none, because eviction removed the local file, found: {none:?}"
176 4 : );
177 4 :
178 4 : // plain downloading is rarely needed
179 4 : layer
180 4 : .download_and_keep_resident(ctx)
181 4 : .instrument(download_span)
182 4 : .await
183 4 : .unwrap();
184 4 :
185 4 : // last important part is deletion on drop: gc and compaction use it for compacted L0 layers
186 4 : // or fully garbage collected layers. deletion means deleting the local file, and scheduling a
187 4 : // deletion of the already unlinked from index_part.json remote file.
188 4 : //
189 4 : // marking a layer to be deleted on drop is irreversible; there is no technical reason against
190 4 : // reversiblity, but currently it is not needed so it is not provided.
191 4 : layer.delete_on_drop();
192 4 :
193 4 : let path = layer.local_path().to_owned();
194 4 :
195 4 : // wait_drop produces an unconnected to Layer future which will resolve when the
196 4 : // LayerInner::drop has completed.
197 4 : let mut wait_drop = std::pin::pin!(layer.wait_drop());
198 4 :
199 4 : // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
200 4 : // until here
201 4 : tokio::time::pause();
202 4 : tokio::time::timeout(ADVANCE, &mut wait_drop)
203 4 : .await
204 4 : .expect_err("should had timed out because two strong references exist");
205 4 :
206 4 : tokio::fs::metadata(&path)
207 4 : .await
208 4 : .expect("the local layer file still exists");
209 4 :
210 4 : let rtc = &timeline.remote_client;
211 4 :
212 4 : // Simulate GC removing our test layer.
213 4 : {
214 4 : let mut g = timeline.layers.write().await;
215 4 :
216 4 : let layers = &[layer];
217 4 : g.open_mut().unwrap().finish_gc_timeline(layers);
218 4 :
219 4 : // this just updates the remote_physical_size for demonstration purposes
220 4 : rtc.schedule_gc_update(layers).unwrap();
221 4 : }
222 4 :
223 4 : // when strong references are dropped, the file is deleted and remote deletion is scheduled
224 4 : wait_drop.await;
225 4 :
226 4 : let e = tokio::fs::metadata(&path)
227 4 : .await
228 4 : .expect_err("the local file is deleted");
229 4 : assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
230 4 :
231 4 : rtc.wait_completion().await.unwrap();
232 4 :
233 4 : assert_eq!(
234 4 : rtc.get_remote_physical_size(),
235 4 : dummy_layer.metadata().file_size
236 4 : );
237 4 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
238 4 : }
239 :
240 : /// This test demonstrates a previous hang when a eviction and deletion were requested at the same
241 : /// time. Now both of them complete per Arc drop semantics.
242 : #[tokio::test(start_paused = true)]
243 4 : async fn evict_and_wait_on_wanted_deleted() {
244 4 : // this is the runtime on which Layer spawns the blocking tasks on
245 4 : let handle = tokio::runtime::Handle::current();
246 4 :
247 4 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
248 4 : .await
249 4 : .unwrap();
250 4 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
251 4 : let (tenant, ctx) = h.load().await;
252 4 :
253 4 : let timeline = tenant
254 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
255 4 : .await
256 4 : .unwrap();
257 4 :
258 4 : let layer = {
259 4 : let mut layers = {
260 4 : let layers = timeline.layers.read().await;
261 4 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
262 4 : };
263 4 :
264 4 : assert_eq!(layers.len(), 1);
265 4 :
266 4 : layers.swap_remove(0)
267 4 : };
268 4 :
269 4 : // setup done
270 4 :
271 4 : let resident = layer.keep_resident().await.unwrap();
272 4 :
273 4 : {
274 4 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
275 4 :
276 4 : // drive the future to await on the status channel
277 4 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
278 4 : .await
279 4 : .expect_err("should had been a timeout since we are holding the layer resident");
280 4 :
281 4 : layer.delete_on_drop();
282 4 :
283 4 : drop(resident);
284 4 :
285 4 : // make sure the eviction task gets to run
286 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
287 4 :
288 4 : let resident = layer.keep_resident().await;
289 4 : assert!(
290 4 : resident.is_none(),
291 4 : "keep_resident should not have re-initialized: {resident:?}"
292 4 : );
293 4 :
294 4 : evict_and_wait
295 4 : .await
296 4 : .expect("evict_and_wait should had succeeded");
297 4 :
298 4 : // works as intended
299 4 : }
300 4 :
301 4 : // assert that once we remove the `layer` from the layer map and drop our reference,
302 4 : // the deletion of the layer in remote_storage happens.
303 4 : {
304 4 : let mut layers = timeline.layers.write().await;
305 4 : layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
306 4 : }
307 4 :
308 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
309 4 :
310 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
311 4 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
312 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
313 4 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
314 4 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
315 4 : }
316 :
317 : /// This test ensures we are able to read the layer while the layer eviction has been
318 : /// started but not completed.
319 : #[test]
320 4 : fn read_wins_pending_eviction() {
321 4 : let rt = tokio::runtime::Builder::new_current_thread()
322 4 : .max_blocking_threads(1)
323 4 : .enable_all()
324 4 : .start_paused(true)
325 4 : .build()
326 4 : .unwrap();
327 4 :
328 4 : rt.block_on(async move {
329 4 : // this is the runtime on which Layer spawns the blocking tasks on
330 4 : let handle = tokio::runtime::Handle::current();
331 4 : let h = TenantHarness::create("read_wins_pending_eviction")
332 4 : .await
333 4 : .unwrap();
334 4 : let (tenant, ctx) = h.load().await;
335 4 : let span = h.span();
336 4 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
337 :
338 4 : let timeline = tenant
339 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
340 4 : .await
341 4 : .unwrap();
342 4 : let ctx = ctx.with_scope_timeline(&timeline);
343 :
344 4 : let layer = {
345 4 : let mut layers = {
346 4 : let layers = timeline.layers.read().await;
347 4 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
348 4 : };
349 4 :
350 4 : assert_eq!(layers.len(), 1);
351 :
352 4 : layers.swap_remove(0)
353 : };
354 :
355 : // setup done
356 :
357 4 : let resident = layer.keep_resident().await.unwrap();
358 4 :
359 4 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
360 4 :
361 4 : // drive the future to await on the status channel
362 4 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
363 4 : .await
364 4 : .expect_err("should had been a timeout since we are holding the layer resident");
365 4 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
366 :
367 4 : let (completion, barrier) = utils::completion::channel();
368 4 : let (arrival, arrived_at_barrier) = utils::completion::channel();
369 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
370 4 : Some(arrival),
371 4 : barrier,
372 4 : ));
373 4 :
374 4 : // now the eviction cannot proceed because the threads are consumed while completion exists
375 4 : drop(resident);
376 4 : arrived_at_barrier.wait().await;
377 4 : assert!(!layer.is_likely_resident());
378 :
379 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
380 4 : layer
381 4 : .0
382 4 : .get_or_maybe_download(false, &ctx)
383 4 : .instrument(download_span)
384 4 : .await
385 4 : .expect("should had reinitialized without downloading");
386 4 :
387 4 : assert!(layer.is_likely_resident());
388 :
389 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
390 4 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
391 4 : .await
392 4 : .expect("no timeout, because get_or_maybe_download re-initialized")
393 4 : .expect_err("eviction should not have succeeded because re-initialized");
394 4 :
395 4 : // works as intended: evictions lose to "downloads"
396 4 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
397 4 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
398 :
399 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
400 : // because of a failpoint
401 4 : assert_eq!(
402 4 : 0,
403 4 : LAYER_IMPL_METRICS
404 4 : .cancelled_evictions
405 4 : .values()
406 36 : .map(|ctr| ctr.get())
407 4 : .sum::<u64>()
408 4 : );
409 :
410 4 : drop(completion);
411 4 :
412 4 : tokio::time::sleep(ADVANCE).await;
413 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
414 4 : .await;
415 :
416 4 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
417 :
418 : // now we finally can observe the original eviction failing
419 : // it would had been possible to observe it earlier, but here it is guaranteed to have
420 : // happened.
421 4 : assert_eq!(
422 4 : 1,
423 4 : LAYER_IMPL_METRICS
424 4 : .cancelled_evictions
425 4 : .values()
426 36 : .map(|ctr| ctr.get())
427 4 : .sum::<u64>()
428 4 : );
429 :
430 4 : assert_eq!(
431 4 : 1,
432 4 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
433 4 : );
434 :
435 4 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
436 4 : });
437 4 : }
438 :
439 : /// Use failpoint to delay an eviction starting to get a VersionCheckFailed.
440 : #[test]
441 4 : fn multiple_pending_evictions_in_order() {
442 4 : let name = "multiple_pending_evictions_in_order";
443 4 : let in_order = true;
444 4 : multiple_pending_evictions_scenario(name, in_order);
445 4 : }
446 :
447 : /// Use failpoint to reorder later eviction before first to get a UnexpectedEvictedState.
448 : #[test]
449 4 : fn multiple_pending_evictions_out_of_order() {
450 4 : let name = "multiple_pending_evictions_out_of_order";
451 4 : let in_order = false;
452 4 : multiple_pending_evictions_scenario(name, in_order);
453 4 : }
454 :
455 8 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
456 8 : let rt = tokio::runtime::Builder::new_current_thread()
457 8 : .max_blocking_threads(1)
458 8 : .enable_all()
459 8 : .start_paused(true)
460 8 : .build()
461 8 : .unwrap();
462 8 :
463 8 : rt.block_on(async move {
464 8 : // this is the runtime on which Layer spawns the blocking tasks on
465 8 : let handle = tokio::runtime::Handle::current();
466 8 : let h = TenantHarness::create(name).await.unwrap();
467 8 : let (tenant, ctx) = h.load().await;
468 8 : let span = h.span();
469 8 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
470 :
471 8 : let timeline = tenant
472 8 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
473 8 : .await
474 8 : .unwrap();
475 8 : let ctx = ctx.with_scope_timeline(&timeline);
476 :
477 8 : let layer = {
478 8 : let mut layers = {
479 8 : let layers = timeline.layers.read().await;
480 8 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
481 8 : };
482 8 :
483 8 : assert_eq!(layers.len(), 1);
484 :
485 8 : layers.swap_remove(0)
486 : };
487 :
488 : // setup done
489 :
490 8 : let resident = layer.keep_resident().await.unwrap();
491 8 :
492 8 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
493 8 :
494 8 : // drive the future to await on the status channel
495 8 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
496 8 : .await
497 8 : .expect_err("should had been a timeout since we are holding the layer resident");
498 8 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
499 :
500 8 : let (completion1, barrier) = utils::completion::channel();
501 8 : let mut completion1 = Some(completion1);
502 8 : let (arrival, arrived_at_barrier) = utils::completion::channel();
503 8 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
504 8 : Some(arrival),
505 8 : barrier,
506 8 : ));
507 8 :
508 8 : // now the eviction cannot proceed because we are simulating arbitrary long delay for the
509 8 : // eviction task start.
510 8 : drop(resident);
511 8 : assert!(!layer.is_likely_resident());
512 :
513 8 : arrived_at_barrier.wait().await;
514 :
515 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
516 8 : layer
517 8 : .0
518 8 : .get_or_maybe_download(false, &ctx)
519 8 : .instrument(download_span)
520 8 : .await
521 8 : .expect("should had reinitialized without downloading");
522 8 :
523 8 : assert!(layer.is_likely_resident());
524 :
525 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
526 8 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
527 8 : .await
528 8 : .expect("no timeout, because get_or_maybe_download re-initialized")
529 8 : .expect_err("eviction should not have succeeded because re-initialized");
530 8 :
531 8 : // works as intended: evictions lose to "downloads"
532 8 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
533 8 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
534 :
535 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
536 : // because of a failpoint
537 8 : assert_eq!(
538 8 : 0,
539 8 : LAYER_IMPL_METRICS
540 8 : .cancelled_evictions
541 8 : .values()
542 72 : .map(|ctr| ctr.get())
543 8 : .sum::<u64>()
544 8 : );
545 :
546 8 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
547 :
548 : // configure another failpoint for the second eviction -- evictions are per initialization,
549 : // so now that we've reinitialized the inner, we get to run two of them at the same time.
550 8 : let (completion2, barrier) = utils::completion::channel();
551 8 : let (arrival, arrived_at_barrier) = utils::completion::channel();
552 8 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
553 8 : Some(arrival),
554 8 : barrier,
555 8 : ));
556 8 :
557 8 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
558 8 :
559 8 : // advance to the wait on the queue
560 8 : tokio::time::timeout(ADVANCE, &mut second_eviction)
561 8 : .await
562 8 : .expect_err("timeout because failpoint is blocking");
563 8 :
564 8 : arrived_at_barrier.wait().await;
565 :
566 8 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
567 :
568 8 : let mut release_earlier_eviction = |expected_reason| {
569 8 : assert_eq!(
570 8 : 0,
571 8 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
572 8 : );
573 :
574 8 : drop(completion1.take().unwrap());
575 8 :
576 8 : let handle = &handle;
577 :
578 8 : async move {
579 8 : tokio::time::sleep(ADVANCE).await;
580 8 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
581 8 : handle, 1,
582 8 : )
583 8 : .await;
584 :
585 8 : assert_eq!(
586 8 : 1,
587 8 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
588 8 : );
589 8 : }
590 8 : };
591 :
592 8 : if in_order {
593 4 : release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
594 4 : }
595 :
596 : // release the later eviction which is for the current version
597 8 : drop(completion2);
598 8 : tokio::time::sleep(ADVANCE).await;
599 8 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
600 8 : .await;
601 :
602 8 : if !in_order {
603 4 : release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
604 4 : }
605 :
606 8 : tokio::time::timeout(ADVANCE, &mut second_eviction)
607 8 : .await
608 8 : .expect("eviction goes through now that spawn_blocking is unclogged")
609 8 : .expect("eviction should succeed, because version matches");
610 8 :
611 8 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
612 :
613 : // ensure the cancelled are unchanged
614 8 : assert_eq!(
615 8 : 1,
616 8 : LAYER_IMPL_METRICS
617 8 : .cancelled_evictions
618 8 : .values()
619 72 : .map(|ctr| ctr.get())
620 8 : .sum::<u64>()
621 8 : );
622 :
623 8 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
624 8 : });
625 8 : }
626 :
627 : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
628 : /// a `Layer::keep_resident` call.
629 : ///
630 : /// This matters because cancelling the eviction would leave us in a state where the file is on
631 : /// disk but the layer internal state says it has not been initialized. Futhermore, it allows us to
632 : /// have non-repairing `Layer::is_likely_resident`.
633 : #[tokio::test(start_paused = true)]
634 4 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
635 4 : let handle = tokio::runtime::Handle::current();
636 4 : let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
637 4 : .await
638 4 : .unwrap();
639 4 : let (tenant, ctx) = h.load().await;
640 4 :
641 4 : let timeline = tenant
642 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
643 4 : .await
644 4 : .unwrap();
645 4 : let ctx = ctx.with_scope_timeline(&timeline);
646 4 :
647 4 : // This test does downloads
648 4 : let ctx = RequestContextBuilder::extend(&ctx)
649 4 : .download_behavior(DownloadBehavior::Download)
650 4 : .build();
651 4 : let layer = {
652 4 : let mut layers = {
653 4 : let layers = timeline.layers.read().await;
654 4 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
655 4 : };
656 4 :
657 4 : assert_eq!(layers.len(), 1);
658 4 :
659 4 : layers.swap_remove(0)
660 4 : };
661 4 :
662 4 : // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
663 4 : // Err) at the right time as in "during" the `LayerInner::needs_download`.
664 4 : layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
665 4 :
666 4 : let (completion, barrier) = utils::completion::channel();
667 4 : let (arrival, arrived_at_barrier) = utils::completion::channel();
668 4 :
669 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
670 4 : Some(arrival),
671 4 : barrier,
672 4 : ));
673 4 :
674 4 : tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
675 4 : .await
676 4 : .expect_err("should had advanced to waiting on channel");
677 4 :
678 4 : arrived_at_barrier.wait().await;
679 4 :
680 4 : // simulate a cancelled read which is cancelled before it gets to re-initialize
681 4 : let e = layer
682 4 : .0
683 4 : .get_or_maybe_download(false, &ctx)
684 4 : .await
685 4 : .unwrap_err();
686 4 : assert!(
687 4 : matches!(
688 4 : e,
689 4 : DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
690 4 : ),
691 4 : "{e:?}"
692 4 : );
693 4 :
694 4 : assert!(
695 4 : layer.0.needs_download().await.unwrap().is_none(),
696 4 : "file is still on disk"
697 4 : );
698 4 :
699 4 : // release the eviction task
700 4 : drop(completion);
701 4 : tokio::time::sleep(ADVANCE).await;
702 4 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
703 4 :
704 4 : // failpoint is still enabled, but it is not hit
705 4 : let e = layer
706 4 : .0
707 4 : .get_or_maybe_download(false, &ctx)
708 4 : .await
709 4 : .unwrap_err();
710 4 : assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
711 4 :
712 4 : // failpoint is not counted as cancellation either
713 4 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
714 4 : }
715 :
716 : #[tokio::test(start_paused = true)]
717 4 : async fn evict_and_wait_does_not_wait_for_download() {
718 4 : // let handle = tokio::runtime::Handle::current();
719 4 : let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
720 4 : .await
721 4 : .unwrap();
722 4 : let (tenant, ctx) = h.load().await;
723 4 : let span = h.span();
724 4 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
725 4 :
726 4 : let timeline = tenant
727 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
728 4 : .await
729 4 : .unwrap();
730 4 : let ctx = ctx.with_scope_timeline(&timeline);
731 4 :
732 4 : // This test does downloads
733 4 : let ctx = RequestContextBuilder::extend(&ctx)
734 4 : .download_behavior(DownloadBehavior::Download)
735 4 : .build();
736 4 :
737 4 : let layer = {
738 4 : let mut layers = {
739 4 : let layers = timeline.layers.read().await;
740 4 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
741 4 : };
742 4 :
743 4 : assert_eq!(layers.len(), 1);
744 4 :
745 4 : layers.swap_remove(0)
746 4 : };
747 4 :
748 4 : // kind of forced setup: start an eviction but do not allow it progress until we are
749 4 : // downloading
750 4 : let (eviction_can_continue, barrier) = utils::completion::channel();
751 4 : let (arrival, eviction_arrived) = utils::completion::channel();
752 4 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
753 4 : Some(arrival),
754 4 : barrier,
755 4 : ));
756 4 :
757 4 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
758 4 :
759 4 : // use this once-awaited other_evict to synchronize with the eviction
760 4 : let other_evict = layer.evict_and_wait(FOREVER);
761 4 :
762 4 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
763 4 : .await
764 4 : .expect_err("should had advanced");
765 4 : eviction_arrived.wait().await;
766 4 : drop(eviction_can_continue);
767 4 : other_evict.await.unwrap();
768 4 :
769 4 : // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
770 4 : assert!(!layer.is_likely_resident());
771 4 :
772 4 : // following new evict_and_wait will fail until we've completed the download
773 4 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
774 4 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
775 4 :
776 4 : let (download_can_continue, barrier) = utils::completion::channel();
777 4 : let (arrival, _download_arrived) = utils::completion::channel();
778 4 : layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
779 4 :
780 4 : let mut download = std::pin::pin!(
781 4 : layer
782 4 : .0
783 4 : .get_or_maybe_download(true, &ctx)
784 4 : .instrument(download_span)
785 4 : );
786 4 :
787 4 : assert!(
788 4 : !layer.is_likely_resident(),
789 4 : "during download layer is evicted"
790 4 : );
791 4 :
792 4 : tokio::time::timeout(ADVANCE, &mut download)
793 4 : .await
794 4 : .expect_err("should had timed out because of failpoint");
795 4 :
796 4 : // now we finally get to continue, and because the latest state is downloading, we deduce that
797 4 : // original eviction succeeded
798 4 : evict_and_wait.await.unwrap();
799 4 :
800 4 : // however a new evict_and_wait will fail
801 4 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
802 4 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
803 4 :
804 4 : assert!(!layer.is_likely_resident());
805 4 :
806 4 : drop(download_can_continue);
807 4 : download.await.expect("download should had succeeded");
808 4 : assert!(layer.is_likely_resident());
809 4 :
810 4 : // only now can we evict
811 4 : layer.evict_and_wait(FOREVER).await.unwrap();
812 4 : }
813 :
814 : /// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
815 : /// which is the last value.
816 : ///
817 : /// Also checks that the same does not happen on a non-evicted layer (regression test).
818 : #[tokio::test(start_paused = true)]
819 4 : async fn eviction_cancellation_on_drop() {
820 4 : use bytes::Bytes;
821 4 : use pageserver_api::value::Value;
822 4 :
823 4 : // this is the runtime on which Layer spawns the blocking tasks on
824 4 : let handle = tokio::runtime::Handle::current();
825 4 :
826 4 : let h = TenantHarness::create("eviction_cancellation_on_drop")
827 4 : .await
828 4 : .unwrap();
829 4 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
830 4 : let (tenant, ctx) = h.load().await;
831 4 :
832 4 : let timeline = tenant
833 4 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
834 4 : .await
835 4 : .unwrap();
836 4 :
837 4 : {
838 4 : // create_test_timeline wrote us one layer, write another
839 4 : let mut writer = timeline.writer().await;
840 4 : writer
841 4 : .put(
842 4 : pageserver_api::key::Key::from_i128(5),
843 4 : Lsn(0x20),
844 4 : &Value::Image(Bytes::from_static(b"this does not matter either")),
845 4 : &ctx,
846 4 : )
847 4 : .await
848 4 : .unwrap();
849 4 :
850 4 : writer.finish_write(Lsn(0x20));
851 4 : }
852 4 :
853 4 : timeline.freeze_and_flush().await.unwrap();
854 4 :
855 4 : // wait for the upload to complete so our Arc::strong_count assertion holds
856 4 : timeline.remote_client.wait_completion().await.unwrap();
857 4 :
858 4 : let (evicted_layer, not_evicted) = {
859 4 : let mut layers = {
860 4 : let mut guard = timeline.layers.write().await;
861 4 : let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
862 4 : // remove the layers from layermap
863 4 : guard.open_mut().unwrap().finish_gc_timeline(&layers);
864 4 :
865 4 : layers
866 4 : };
867 4 :
868 4 : assert_eq!(layers.len(), 2);
869 4 :
870 4 : (layers.pop().unwrap(), layers.pop().unwrap())
871 4 : };
872 4 :
873 4 : let victims = [(evicted_layer, true), (not_evicted, false)];
874 4 :
875 12 : for (victim, evict) in victims {
876 8 : let resident = victim.keep_resident().await.unwrap();
877 8 : drop(victim);
878 8 :
879 8 : assert_eq!(Arc::strong_count(&resident.owner.0), 1);
880 4 :
881 8 : if evict {
882 4 : let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
883 4 :
884 4 : // drive the future to await on the status channel, and then drop it
885 4 : tokio::time::timeout(ADVANCE, evict_and_wait)
886 4 : .await
887 4 : .expect_err("should had been a timeout since we are holding the layer resident");
888 4 : }
889 4 :
890 4 : // 1 == we only evict one of the layers
891 8 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
892 4 :
893 8 : drop(resident);
894 8 :
895 8 : // run any spawned
896 8 : tokio::time::sleep(ADVANCE).await;
897 4 :
898 8 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
899 4 :
900 8 : assert_eq!(
901 8 : 1,
902 8 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
903 8 : );
904 4 : }
905 4 : }
906 :
907 : /// A test case to remind you the cost of these structures. You can bump the size limit
908 : /// below if it is really necessary to add more fields to the structures.
909 : #[test]
910 : #[cfg(target_arch = "x86_64")]
911 4 : fn layer_size() {
912 4 : assert_eq!(size_of::<LayerAccessStats>(), 8);
913 4 : assert_eq!(size_of::<PersistentLayerDesc>(), 104);
914 4 : assert_eq!(size_of::<LayerInner>(), 296);
915 : // it also has the utf8 path
916 4 : }
917 :
918 : struct SpawnBlockingPoolHelper {
919 : awaited_by_spawn_blocking_tasks: Completion,
920 : blocking_tasks: JoinSet<()>,
921 : }
922 :
923 : impl SpawnBlockingPoolHelper {
924 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
925 : /// release is called.
926 : ///
927 : /// In the tests this can be used to ensure something cannot be started on the target runtimes
928 : /// spawn_blocking pool.
929 : ///
930 : /// This should be no issue nowdays, because nextest runs each test in it's own process.
931 4 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
932 4 : let default_max_blocking_threads = 512;
933 4 :
934 4 : Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
935 4 : }
936 :
937 52 : async fn consume_all_spawn_blocking_threads0(
938 52 : handle: &tokio::runtime::Handle,
939 52 : threads: usize,
940 52 : ) -> Self {
941 52 : assert_ne!(threads, 0);
942 :
943 52 : let (completion, barrier) = completion::channel();
944 52 : let (started, starts_completed) = completion::channel();
945 52 :
946 52 : let mut blocking_tasks = JoinSet::new();
947 52 :
948 14360 : for _ in 0..threads {
949 14360 : let barrier = barrier.clone();
950 14360 : let started = started.clone();
951 14360 : blocking_tasks.spawn_blocking_on(
952 14360 : move || {
953 14360 : drop(started);
954 14360 : tokio::runtime::Handle::current().block_on(barrier.wait());
955 14360 : },
956 14360 : handle,
957 14360 : );
958 14360 : }
959 :
960 52 : drop(started);
961 52 :
962 52 : starts_completed.wait().await;
963 :
964 52 : drop(barrier);
965 52 :
966 52 : tracing::trace!("consumed all threads");
967 :
968 52 : SpawnBlockingPoolHelper {
969 52 : awaited_by_spawn_blocking_tasks: completion,
970 52 : blocking_tasks,
971 52 : }
972 52 : }
973 :
974 : /// Release all previously blocked spawn_blocking threads
975 52 : async fn release(self) {
976 52 : let SpawnBlockingPoolHelper {
977 52 : awaited_by_spawn_blocking_tasks,
978 52 : mut blocking_tasks,
979 52 : } = self;
980 52 :
981 52 : drop(awaited_by_spawn_blocking_tasks);
982 :
983 14412 : while let Some(res) = blocking_tasks.join_next().await {
984 14360 : res.expect("none of the tasks should had panicked");
985 14360 : }
986 :
987 52 : tracing::trace!("released all threads");
988 52 : }
989 :
990 : /// In the tests it is used as an easy way of making sure something scheduled on the target
991 : /// runtimes `spawn_blocking` has completed, because it must've been scheduled and completed
992 : /// before our tasks have a chance to schedule and complete.
993 24 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
994 24 : Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
995 24 : }
996 :
997 44 : async fn consume_and_release_all_of_spawn_blocking_threads0(
998 44 : handle: &tokio::runtime::Handle,
999 44 : threads: usize,
1000 44 : ) {
1001 44 : Self::consume_all_spawn_blocking_threads0(handle, threads)
1002 44 : .await
1003 44 : .release()
1004 44 : .await
1005 44 : }
1006 : }
1007 :
1008 : #[test]
1009 4 : fn spawn_blocking_pool_helper_actually_works() {
1010 4 : // create a custom runtime for which we know and control how many blocking threads it has
1011 4 : //
1012 4 : // because the amount is not configurable for our helper, expect the same amount as
1013 4 : // BACKGROUND_RUNTIME using the tokio defaults would have.
1014 4 : let rt = tokio::runtime::Builder::new_current_thread()
1015 4 : .max_blocking_threads(1)
1016 4 : .enable_all()
1017 4 : .build()
1018 4 : .unwrap();
1019 4 :
1020 4 : let handle = rt.handle();
1021 4 :
1022 4 : rt.block_on(async move {
1023 : // this will not return until all threads are spun up and actually executing the code
1024 : // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
1025 4 : let consumed =
1026 4 : SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
1027 :
1028 4 : println!("consumed");
1029 4 :
1030 4 : let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
1031 4 : // this will not get to run before we release
1032 4 : }));
1033 4 :
1034 4 : println!("spawned");
1035 4 :
1036 4 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
1037 4 : .await
1038 4 : .expect_err("the task should not have gotten to run yet");
1039 4 :
1040 4 : println!("tried to join");
1041 4 :
1042 4 : consumed.release().await;
1043 :
1044 4 : println!("released");
1045 4 :
1046 4 : tokio::time::timeout(std::time::Duration::from_secs(1), jh)
1047 4 : .await
1048 4 : .expect("no timeout")
1049 4 : .expect("no join error");
1050 4 :
1051 4 : println!("joined");
1052 4 : });
1053 4 : }
1054 :
1055 : /// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats
1056 16 : fn lowres_time(hires: SystemTime) -> SystemTime {
1057 16 : let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
1058 16 : UNIX_EPOCH + Duration::from_secs(ts)
1059 16 : }
1060 :
1061 : #[test]
1062 4 : fn access_stats() {
1063 4 : let access_stats = LayerAccessStats::default();
1064 4 : // Default is visible
1065 4 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1066 :
1067 4 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1068 4 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1069 4 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1070 4 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1071 :
1072 4 : let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
1073 4 : access_stats.record_residence_event_at(rtime);
1074 4 : assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
1075 :
1076 4 : let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
1077 4 : access_stats.record_access_at(atime);
1078 4 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1079 :
1080 : // Setting visibility doesn't clobber access time
1081 4 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1082 4 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1083 4 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1084 4 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1085 :
1086 : // Recording access implicitly makes layer visible, if it wasn't already
1087 4 : let atime = UNIX_EPOCH + Duration::from_secs(2200000000);
1088 4 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1089 4 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1090 4 : assert!(access_stats.record_access_at(atime));
1091 4 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1092 4 : assert!(!access_stats.record_access_at(atime));
1093 4 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1094 4 : }
1095 :
1096 : #[test]
1097 4 : fn access_stats_2038() {
1098 4 : // The access stats structure uses a timestamp representation that will run out
1099 4 : // of bits in 2038. One year before that, this unit test will start failing.
1100 4 :
1101 4 : let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
1102 4 : + Duration::from_secs(3600 * 24 * 365);
1103 4 :
1104 4 : assert!(one_year_from_now.as_secs() < (2 << 31));
1105 4 : }
|