Line data Source code
1 : use std::time::UNIX_EPOCH;
2 :
3 : use pageserver_api::key::{CONTROLFILE_KEY, Key};
4 : use tokio::task::JoinSet;
5 : use utils::completion::{self, Completion};
6 : use utils::id::TimelineId;
7 :
8 : use super::failpoints::{Failpoint, FailpointKind};
9 : use super::*;
10 : use crate::context::DownloadBehavior;
11 : use crate::tenant::harness::{TenantHarness, test_img};
12 : use crate::tenant::storage_layer::{IoConcurrency, LayerVisibilityHint};
13 : use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
14 :
15 : /// Used in tests to advance a future to the wanted await point, and not further.
16 : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
17 :
18 : /// Used in tests to indicate a forever-long timeout; has to be longer than the total amount of
19 : /// ADVANCE the tests use to advance futures.
20 : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
21 :
22 : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
23 : #[tokio::test]
24 1 : async fn smoke_test() {
25 1 : let handle = tokio::runtime::Handle::current();
26 1 :
27 1 : let h = TenantHarness::create("smoke_test").await.unwrap();
28 1 : let span = h.span();
29 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
30 1 : let (tenant, ctx) = h.load().await;
31 1 : let io_concurrency = IoConcurrency::spawn_for_test();
32 1 :
33 1 : let image_layers = vec![(
34 1 : Lsn(0x40),
35 1 : vec![(
36 1 : Key::from_hex("620000000033333333444444445500000000").unwrap(),
37 1 : test_img("foo"),
38 1 : )],
39 1 : )];
40 1 :
41 1 : // Create a test timeline with one real layer, and one synthetic test layer. The synthetic
42 1 : // one is only there so that we can GC the real one without leaving the timeline's metadata
43 1 : // empty, which is an illegal state (see [`IndexPart::validate`]).
44 1 : let timeline = tenant
45 1 : .create_test_timeline_with_layers(
46 1 : TimelineId::generate(),
47 1 : Lsn(0x10),
48 1 : 14,
49 1 : &ctx,
50 1 : Default::default(), // in-memory layers
51 1 : Default::default(),
52 1 : image_layers,
53 1 : Lsn(0x100),
54 1 : )
55 1 : .await
56 1 : .unwrap();
57 1 : let ctx = &ctx.with_scope_timeline(&timeline);
58 1 :
59 1 : // Grab one of the timeline's layers to exercise in the test, and the other layer that is just
60 1 : // there to avoid the timeline being illegally empty
61 1 : let (layer, dummy_layer) = {
62 1 : let mut layers = {
63 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
64 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
65 1 : };
66 1 :
67 1 : assert_eq!(layers.len(), 2);
68 1 :
69 2 : layers.sort_by_key(|l| l.layer_desc().get_key_range().start);
70 1 : let synthetic_layer = layers.pop().unwrap();
71 1 : let real_layer = layers.pop().unwrap();
72 1 : tracing::info!(
73 1 : "real_layer={:?} ({}), synthetic_layer={:?} ({})",
74 0 : real_layer,
75 0 : real_layer.layer_desc().file_size,
76 0 : synthetic_layer,
77 0 : synthetic_layer.layer_desc().file_size
78 1 : );
79 1 : (real_layer, synthetic_layer)
80 1 : };
81 1 :
82 1 : // All layers created at the pageserver are like `layer`, initialized with a strong
83 1 : // Arc<DownloadedLayer>.
84 1 :
85 1 : let controlfile_keyspace = KeySpace {
86 1 : ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
87 1 : };
88 1 :
89 1 : let img_before = {
90 1 : let mut data = ValuesReconstructState::new(io_concurrency.clone());
91 1 : layer
92 1 : .get_values_reconstruct_data(
93 1 : controlfile_keyspace.clone(),
94 1 : Lsn(0x10)..Lsn(0x11),
95 1 : &mut data,
96 1 : ctx,
97 1 : )
98 1 : .await
99 1 : .unwrap();
100 1 :
101 1 : data.keys
102 1 : .remove(&CONTROLFILE_KEY)
103 1 : .expect("must be present")
104 1 : .collect_pending_ios()
105 1 : .await
106 1 : .expect("must not error")
107 1 : .img
108 1 : .take()
109 1 : .expect("tenant harness writes the control file")
110 1 : };
111 1 :
112 1 : // The important part is evicting the layer, which can be done when there are no more ResidentLayer
113 1 : // instances -- there currently are none, only two `Layer` values, one in the layermap and one
114 1 : // in scope.
115 1 : layer.evict_and_wait(FOREVER).await.unwrap();
116 1 :
117 1 : // A double eviction returns an error, which is valid if the eviction_task and disk usage based
118 1 : // eviction try to evict the same layer at the same time.
119 1 :
120 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
121 1 : assert!(matches!(e, EvictionError::NotFound));
122 1 :
123 1 : let dl_ctx = RequestContextBuilder::from(ctx)
124 1 : .download_behavior(DownloadBehavior::Download)
125 1 : .attached_child();
126 1 :
127 1 : // On access while the layer is evicted, it will automatically be downloaded again.
128 1 : let img_after = {
129 1 : let mut data = ValuesReconstructState::new(io_concurrency.clone());
130 1 : layer
131 1 : .get_values_reconstruct_data(
132 1 : controlfile_keyspace.clone(),
133 1 : Lsn(0x10)..Lsn(0x11),
134 1 : &mut data,
135 1 : &dl_ctx,
136 1 : )
137 1 : .instrument(download_span.clone())
138 1 : .await
139 1 : .unwrap();
140 1 : data.keys
141 1 : .remove(&CONTROLFILE_KEY)
142 1 : .expect("must be present")
143 1 : .collect_pending_ios()
144 1 : .await
145 1 : .expect("must not error")
146 1 : .img
147 1 : .take()
148 1 : .expect("tenant harness writes the control file")
149 1 : };
150 1 :
151 1 : assert_eq!(img_before, img_after);
152 1 :
153 1 : // evict_and_wait can time out, but it doesn't cancel the eviction itself
154 1 : //
155 1 : // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
156 1 : // artificially slow it down.
157 1 : let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
158 1 :
159 1 : match layer
160 1 : .evict_and_wait(std::time::Duration::ZERO)
161 1 : .await
162 1 : .unwrap_err()
163 1 : {
164 1 : EvictionError::Timeout => {
165 1 : // expected, but note that the eviction is "still ongoing"
166 1 : helper.release().await;
167 1 : // exhaust spawn_blocking pool to ensure it is now complete
168 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
169 1 : .await;
170 1 : }
171 1 : other => unreachable!("{other:?}"),
172 1 : }
173 1 :
174 1 : // The only way to query whether a layer is resident is to acquire a ResidentLayer instance.
175 1 : // Layer::keep_resident never downloads, but it might initialize if the layer file is found
176 1 : // already downloaded locally.
177 1 : let none = layer.keep_resident().await;
178 1 : assert!(
179 1 : none.is_none(),
180 1 : "Expected none, because eviction removed the local file, found: {none:?}"
181 1 : );
182 1 :
183 1 : // plain downloading is rarely needed
184 1 : layer
185 1 : .download_and_keep_resident(&dl_ctx)
186 1 : .instrument(download_span)
187 1 : .await
188 1 : .unwrap();
189 1 :
190 1 : // The last important part is deletion on drop: gc and compaction use it for compacted L0 layers
191 1 : // or fully garbage collected layers. Deletion means deleting the local file and scheduling a
192 1 : // deletion of the remote file, which has already been unlinked from index_part.json.
193 1 : //
194 1 : // Marking a layer to be deleted on drop is irreversible; there is no technical reason against
195 1 : // reversibility, but currently it is not needed so it is not provided.
196 1 : layer.delete_on_drop();
197 1 :
198 1 : let path = layer.local_path().to_owned();
199 1 :
200 1 : // wait_drop produces a future that is not connected to the Layer and which will resolve when
201 1 : // LayerInner::drop has completed.
202 1 : let mut wait_drop = std::pin::pin!(layer.wait_drop());
203 1 :
204 1 : // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
205 1 : // until here
206 1 : tokio::time::pause();
207 1 : tokio::time::timeout(ADVANCE, &mut wait_drop)
208 1 : .await
209 1 : .expect_err("should had timed out because two strong references exist");
210 1 :
211 1 : tokio::fs::metadata(&path)
212 1 : .await
213 1 : .expect("the local layer file still exists");
214 1 :
215 1 : let rtc = &timeline.remote_client;
216 1 :
217 1 : // Simulate GC removing our test layer.
218 1 : {
219 1 : let mut g = timeline.layers.write(LayerManagerLockHolder::Testing).await;
220 1 :
221 1 : let layers = &[layer];
222 1 : g.open_mut().unwrap().finish_gc_timeline(layers);
223 1 :
224 1 : // this just updates the remote_physical_size for demonstration purposes
225 1 : rtc.schedule_gc_update(layers).unwrap();
226 1 : }
227 1 :
228 1 : // when strong references are dropped, the file is deleted and remote deletion is scheduled
229 1 : wait_drop.await;
230 1 :
231 1 : let e = tokio::fs::metadata(&path)
232 1 : .await
233 1 : .expect_err("the local file is deleted");
234 1 : assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
235 1 :
236 1 : rtc.wait_completion().await.unwrap();
237 1 :
238 1 : assert_eq!(
239 1 : rtc.get_remote_physical_size(),
240 1 : dummy_layer.metadata().file_size
241 1 : );
242 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
243 1 : }
244 :
245 : /// This test demonstrates a previous hang when an eviction and a deletion were requested at the same
246 : /// time. Now both of them complete per Arc drop semantics.
247 : #[tokio::test(start_paused = true)]
248 1 : async fn evict_and_wait_on_wanted_deleted() {
249 1 : // this is the runtime on which Layer spawns its blocking tasks
250 1 : let handle = tokio::runtime::Handle::current();
251 1 :
252 1 : let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
253 1 : .await
254 1 : .unwrap();
255 1 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
256 1 : let (tenant, ctx) = h.load().await;
257 1 :
258 1 : let timeline = tenant
259 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
260 1 : .await
261 1 : .unwrap();
262 1 :
263 1 : let layer = {
264 1 : let mut layers = {
265 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
266 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
267 1 : };
268 1 :
269 1 : assert_eq!(layers.len(), 1);
270 1 :
271 1 : layers.swap_remove(0)
272 1 : };
273 1 :
274 1 : // setup done
275 1 :
276 1 : let resident = layer.keep_resident().await.unwrap();
277 1 :
278 1 : {
279 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
280 1 :
281 1 : // drive the future to await on the status channel
282 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
283 1 : .await
284 1 : .expect_err("should had been a timeout since we are holding the layer resident");
285 1 :
286 1 : layer.delete_on_drop();
287 1 :
288 1 : drop(resident);
289 1 :
290 1 : // make sure the eviction task gets to run
291 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
292 1 :
293 1 : let resident = layer.keep_resident().await;
294 1 : assert!(
295 1 : resident.is_none(),
296 1 : "keep_resident should not have re-initialized: {resident:?}"
297 1 : );
298 1 :
299 1 : evict_and_wait
300 1 : .await
301 1 : .expect("evict_and_wait should had succeeded");
302 1 :
303 1 : // works as intended
304 1 : }
305 1 :
306 1 : // assert that once we remove the `layer` from the layer map and drop our reference,
307 1 : // the deletion of the layer in remote_storage happens.
308 1 : {
309 1 : let mut layers = timeline.layers.write(LayerManagerLockHolder::Testing).await;
310 1 : layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
311 1 : }
312 1 :
313 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
314 1 :
315 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
316 1 : assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
317 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
318 1 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
319 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
320 1 : }
321 :
322 : /// This test ensures we are able to read the layer while the layer eviction has been
323 : /// started but not completed.
324 : #[test]
325 1 : fn read_wins_pending_eviction() {
326 1 : let rt = tokio::runtime::Builder::new_current_thread()
327 1 : .max_blocking_threads(1)
328 1 : .enable_all()
329 1 : .start_paused(true)
330 1 : .build()
331 1 : .unwrap();
332 1 :
333 1 : rt.block_on(async move {
334 1 : // this is the runtime on which Layer spawns its blocking tasks
335 1 : let handle = tokio::runtime::Handle::current();
336 1 : let h = TenantHarness::create("read_wins_pending_eviction")
337 1 : .await
338 1 : .unwrap();
339 1 : let (tenant, ctx) = h.load().await;
340 1 : let span = h.span();
341 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
342 :
343 1 : let timeline = tenant
344 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
345 1 : .await
346 1 : .unwrap();
347 1 : let ctx = ctx.with_scope_timeline(&timeline);
348 :
349 1 : let layer = {
350 1 : let mut layers = {
351 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
352 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
353 1 : };
354 1 :
355 1 : assert_eq!(layers.len(), 1);
356 :
357 1 : layers.swap_remove(0)
358 : };
359 :
360 : // setup done
361 :
362 1 : let resident = layer.keep_resident().await.unwrap();
363 1 :
364 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
365 1 :
366 1 : // drive the future to await on the status channel
367 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
368 1 : .await
369 1 : .expect_err("should had been a timeout since we are holding the layer resident");
370 1 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
371 :
372 1 : let (completion, barrier) = utils::completion::channel();
373 1 : let (arrival, arrived_at_barrier) = utils::completion::channel();
374 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
375 1 : Some(arrival),
376 1 : barrier,
377 1 : ));
378 1 :
379 1 : // now the eviction cannot proceed because the threads are consumed while completion exists
380 1 : drop(resident);
381 1 : arrived_at_barrier.wait().await;
382 1 : assert!(!layer.is_likely_resident());
383 :
384 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
385 1 : layer
386 1 : .0
387 1 : .get_or_maybe_download(false, &ctx)
388 1 : .instrument(download_span)
389 1 : .await
390 1 : .expect("should had reinitialized without downloading");
391 1 :
392 1 : assert!(layer.is_likely_resident());
393 :
394 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
395 1 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
396 1 : .await
397 1 : .expect("no timeout, because get_or_maybe_download re-initialized")
398 1 : .expect_err("eviction should not have succeeded because re-initialized");
399 1 :
400 1 : // works as intended: evictions lose to "downloads"
401 1 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
402 1 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
403 :
404 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
405 : // because of a failpoint
406 1 : assert_eq!(
407 1 : 0,
408 1 : LAYER_IMPL_METRICS
409 1 : .cancelled_evictions
410 1 : .values()
411 9 : .map(|ctr| ctr.get())
412 1 : .sum::<u64>()
413 1 : );
414 :
415 1 : drop(completion);
416 1 :
417 1 : tokio::time::sleep(ADVANCE).await;
418 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
419 1 : .await;
420 :
421 1 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
422 :
423 : // now we finally can observe the original eviction failing
424 : // it would have been possible to observe it earlier, but here it is guaranteed to have
425 : // happened.
426 1 : assert_eq!(
427 1 : 1,
428 1 : LAYER_IMPL_METRICS
429 1 : .cancelled_evictions
430 1 : .values()
431 9 : .map(|ctr| ctr.get())
432 1 : .sum::<u64>()
433 1 : );
434 :
435 1 : assert_eq!(
436 1 : 1,
437 1 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
438 1 : );
439 :
440 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
441 1 : });
442 1 : }
443 :
444 : /// Use a failpoint to delay the start of an eviction in order to get a VersionCheckFailed.
445 : #[test]
446 1 : fn multiple_pending_evictions_in_order() {
447 1 : let name = "multiple_pending_evictions_in_order";
448 1 : let in_order = true;
449 1 : multiple_pending_evictions_scenario(name, in_order);
450 1 : }
451 :
452 : /// Use a failpoint to reorder the later eviction before the first one in order to get an UnexpectedEvictedState.
453 : #[test]
454 1 : fn multiple_pending_evictions_out_of_order() {
455 1 : let name = "multiple_pending_evictions_out_of_order";
456 1 : let in_order = false;
457 1 : multiple_pending_evictions_scenario(name, in_order);
458 1 : }
459 :
460 2 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
461 2 : let rt = tokio::runtime::Builder::new_current_thread()
462 2 : .max_blocking_threads(1)
463 2 : .enable_all()
464 2 : .start_paused(true)
465 2 : .build()
466 2 : .unwrap();
467 2 :
468 2 : rt.block_on(async move {
469 2 : // this is the runtime on which Layer spawns its blocking tasks
470 2 : let handle = tokio::runtime::Handle::current();
471 2 : let h = TenantHarness::create(name).await.unwrap();
472 2 : let (tenant, ctx) = h.load().await;
473 2 : let span = h.span();
474 2 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
475 :
476 2 : let timeline = tenant
477 2 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
478 2 : .await
479 2 : .unwrap();
480 2 : let ctx = ctx.with_scope_timeline(&timeline);
481 :
482 2 : let layer = {
483 2 : let mut layers = {
484 2 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
485 2 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
486 2 : };
487 2 :
488 2 : assert_eq!(layers.len(), 1);
489 :
490 2 : layers.swap_remove(0)
491 : };
492 :
493 : // setup done
494 :
495 2 : let resident = layer.keep_resident().await.unwrap();
496 2 :
497 2 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
498 2 :
499 2 : // drive the future to await on the status channel
500 2 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
501 2 : .await
502 2 : .expect_err("should had been a timeout since we are holding the layer resident");
503 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
504 :
505 2 : let (completion1, barrier) = utils::completion::channel();
506 2 : let mut completion1 = Some(completion1);
507 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
508 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
509 2 : Some(arrival),
510 2 : barrier,
511 2 : ));
512 2 :
513 2 : // now the eviction cannot proceed because we are simulating an arbitrarily long delay for the
514 2 : // eviction task start.
515 2 : drop(resident);
516 2 : assert!(!layer.is_likely_resident());
517 :
518 2 : arrived_at_barrier.wait().await;
519 :
520 : // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
521 2 : layer
522 2 : .0
523 2 : .get_or_maybe_download(false, &ctx)
524 2 : .instrument(download_span)
525 2 : .await
526 2 : .expect("should had reinitialized without downloading");
527 2 :
528 2 : assert!(layer.is_likely_resident());
529 :
530 : // reinitialization notifies of new resident status, which should error out all evict_and_wait
531 2 : let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
532 2 : .await
533 2 : .expect("no timeout, because get_or_maybe_download re-initialized")
534 2 : .expect_err("eviction should not have succeeded because re-initialized");
535 2 :
536 2 : // works as intended: evictions lose to "downloads"
537 2 : assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
538 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
539 :
540 : // this is not wrong: the eviction is technically still "on the way" as it's still queued
541 : // because of a failpoint
542 2 : assert_eq!(
543 2 : 0,
544 2 : LAYER_IMPL_METRICS
545 2 : .cancelled_evictions
546 2 : .values()
547 18 : .map(|ctr| ctr.get())
548 2 : .sum::<u64>()
549 2 : );
550 :
551 2 : assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
552 :
553 : // configure another failpoint for the second eviction -- evictions are per initialization,
554 : // so now that we've reinitialized the inner, we get to run two of them at the same time.
555 2 : let (completion2, barrier) = utils::completion::channel();
556 2 : let (arrival, arrived_at_barrier) = utils::completion::channel();
557 2 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
558 2 : Some(arrival),
559 2 : barrier,
560 2 : ));
561 2 :
562 2 : let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
563 2 :
564 2 : // advance to the wait on the queue
565 2 : tokio::time::timeout(ADVANCE, &mut second_eviction)
566 2 : .await
567 2 : .expect_err("timeout because failpoint is blocking");
568 2 :
569 2 : arrived_at_barrier.wait().await;
570 :
571 2 : assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
572 :
573 2 : let mut release_earlier_eviction = |expected_reason| {
574 2 : assert_eq!(
575 2 : 0,
576 2 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
577 2 : );
578 :
579 2 : drop(completion1.take().unwrap());
580 2 :
581 2 : let handle = &handle;
582 :
583 2 : async move {
584 2 : tokio::time::sleep(ADVANCE).await;
585 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
586 2 : handle, 1,
587 2 : )
588 2 : .await;
589 :
590 2 : assert_eq!(
591 2 : 1,
592 2 : LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
593 2 : );
594 2 : }
595 2 : };
596 :
597 2 : if in_order {
598 1 : release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
599 1 : }
600 :
601 : // release the later eviction which is for the current version
602 2 : drop(completion2);
603 2 : tokio::time::sleep(ADVANCE).await;
604 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
605 2 : .await;
606 :
607 2 : if !in_order {
608 1 : release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
609 1 : }
610 :
611 2 : tokio::time::timeout(ADVANCE, &mut second_eviction)
612 2 : .await
613 2 : .expect("eviction goes through now that spawn_blocking is unclogged")
614 2 : .expect("eviction should succeed, because version matches");
615 2 :
616 2 : assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
617 :
618 : // ensure the cancelled eviction counters are unchanged
619 2 : assert_eq!(
620 2 : 1,
621 2 : LAYER_IMPL_METRICS
622 2 : .cancelled_evictions
623 2 : .values()
624 18 : .map(|ctr| ctr.get())
625 2 : .sum::<u64>()
626 2 : );
627 :
628 2 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
629 2 : });
630 2 : }
631 :
632 : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
633 : /// a `Layer::keep_resident` call.
634 : ///
635 : /// This matters because cancelling the eviction would leave us in a state where the file is on
636 : /// disk but the layer's internal state says it has not been initialized. Furthermore, it allows us to
637 : /// have a non-repairing `Layer::is_likely_resident`.
638 : #[tokio::test(start_paused = true)]
639 1 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
640 1 : let handle = tokio::runtime::Handle::current();
641 1 : let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
642 1 : .await
643 1 : .unwrap();
644 1 : let (tenant, ctx) = h.load().await;
645 1 :
646 1 : let timeline = tenant
647 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
648 1 : .await
649 1 : .unwrap();
650 1 : let ctx = ctx.with_scope_timeline(&timeline);
651 1 :
652 1 : // This test does downloads
653 1 : let ctx = RequestContextBuilder::from(&ctx)
654 1 : .download_behavior(DownloadBehavior::Download)
655 1 : .attached_child();
656 1 :
657 1 : let layer = {
658 1 : let mut layers = {
659 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
660 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
661 1 : };
662 1 :
663 1 : assert_eq!(layers.len(), 1);
664 1 :
665 1 : layers.swap_remove(0)
666 1 : };
667 1 :
668 1 : // this failpoint will simulate `get_or_maybe_download` becoming cancelled (by returning an
669 1 : // Err) at the right time, that is, "during" `LayerInner::needs_download`.
670 1 : layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
671 1 :
672 1 : let (completion, barrier) = utils::completion::channel();
673 1 : let (arrival, arrived_at_barrier) = utils::completion::channel();
674 1 :
675 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
676 1 : Some(arrival),
677 1 : barrier,
678 1 : ));
679 1 :
680 1 : tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
681 1 : .await
682 1 : .expect_err("should had advanced to waiting on channel");
683 1 :
684 1 : arrived_at_barrier.wait().await;
685 1 :
686 1 : // simulate a read which is cancelled before it gets to re-initialize
687 1 : let e = layer
688 1 : .0
689 1 : .get_or_maybe_download(false, &ctx)
690 1 : .await
691 1 : .unwrap_err();
692 1 : assert!(
693 1 : matches!(
694 1 : e,
695 1 : DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
696 1 : ),
697 1 : "{e:?}"
698 1 : );
699 1 :
700 1 : assert!(
701 1 : layer.0.needs_download().await.unwrap().is_none(),
702 1 : "file is still on disk"
703 1 : );
704 1 :
705 1 : // release the eviction task
706 1 : drop(completion);
707 1 : tokio::time::sleep(ADVANCE).await;
708 1 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
709 1 :
710 1 : // failpoint is still enabled, but it is not hit
711 1 : let e = layer
712 1 : .0
713 1 : .get_or_maybe_download(false, &ctx)
714 1 : .await
715 1 : .unwrap_err();
716 1 : assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
717 1 :
718 1 : // failpoint is not counted as cancellation either
719 1 : assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
720 1 : }
721 :
722 : #[tokio::test(start_paused = true)]
723 1 : async fn evict_and_wait_does_not_wait_for_download() {
724 1 : // let handle = tokio::runtime::Handle::current();
725 1 : let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
726 1 : .await
727 1 : .unwrap();
728 1 : let (tenant, ctx) = h.load().await;
729 1 : let span = h.span();
730 1 : let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
731 1 :
732 1 : let timeline = tenant
733 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
734 1 : .await
735 1 : .unwrap();
736 1 : let ctx = ctx.with_scope_timeline(&timeline);
737 1 :
738 1 : // This test does downloads
739 1 : let ctx = RequestContextBuilder::from(&ctx)
740 1 : .download_behavior(DownloadBehavior::Download)
741 1 : .attached_child();
742 1 :
743 1 : let layer = {
744 1 : let mut layers = {
745 1 : let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
746 1 : layers.likely_resident_layers().cloned().collect::<Vec<_>>()
747 1 : };
748 1 :
749 1 : assert_eq!(layers.len(), 1);
750 1 :
751 1 : layers.swap_remove(0)
752 1 : };
753 1 :
754 1 : // kind of a forced setup: start an eviction but do not allow it to progress until we are
755 1 : // downloading
756 1 : let (eviction_can_continue, barrier) = utils::completion::channel();
757 1 : let (arrival, eviction_arrived) = utils::completion::channel();
758 1 : layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
759 1 : Some(arrival),
760 1 : barrier,
761 1 : ));
762 1 :
763 1 : let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
764 1 :
765 1 : // use this once-awaited other_evict to synchronize with the eviction
766 1 : let other_evict = layer.evict_and_wait(FOREVER);
767 1 :
768 1 : tokio::time::timeout(ADVANCE, &mut evict_and_wait)
769 1 : .await
770 1 : .expect_err("should had advanced");
771 1 : eviction_arrived.wait().await;
772 1 : drop(eviction_can_continue);
773 1 : other_evict.await.unwrap();
774 1 :
775 1 : // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
776 1 : assert!(!layer.is_likely_resident());
777 1 :
778 1 : // following new evict_and_wait will fail until we've completed the download
779 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
780 1 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
781 1 :
782 1 : let (download_can_continue, barrier) = utils::completion::channel();
783 1 : let (arrival, _download_arrived) = utils::completion::channel();
784 1 : layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
785 1 :
786 1 : let mut download = std::pin::pin!(
787 1 : layer
788 1 : .0
789 1 : .get_or_maybe_download(true, &ctx)
790 1 : .instrument(download_span)
791 1 : );
792 1 :
793 1 : assert!(
794 1 : !layer.is_likely_resident(),
795 1 : "during download layer is evicted"
796 1 : );
797 1 :
798 1 : tokio::time::timeout(ADVANCE, &mut download)
799 1 : .await
800 1 : .expect_err("should had timed out because of failpoint");
801 1 :
802 1 : // now we finally get to continue, and because the latest state is downloading, we deduce that
803 1 : // the original eviction succeeded
804 1 : evict_and_wait.await.unwrap();
805 1 :
806 1 : // however a new evict_and_wait will fail
807 1 : let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
808 1 : assert!(matches!(e, EvictionError::NotFound), "{e:?}");
809 1 :
810 1 : assert!(!layer.is_likely_resident());
811 1 :
812 1 : drop(download_can_continue);
813 1 : download.await.expect("download should had succeeded");
814 1 : assert!(layer.is_likely_resident());
815 1 :
816 1 : // only now can we evict
817 1 : layer.evict_and_wait(FOREVER).await.unwrap();
818 1 : }
819 :
820 : /// Asserts that there is no miscalculation when a Layer is dropped while it is being kept resident,
821 : /// i.e. the ResidentLayer is the last value keeping it alive.
822 : ///
823 : /// Also checks that the same does not happen on a non-evicted layer (regression test).
824 : #[tokio::test(start_paused = true)]
825 1 : async fn eviction_cancellation_on_drop() {
826 1 : use bytes::Bytes;
827 1 : use pageserver_api::value::Value;
828 1 :
829 1 : // this is the runtime on which Layer spawns its blocking tasks
830 1 : let handle = tokio::runtime::Handle::current();
831 1 :
832 1 : let h = TenantHarness::create("eviction_cancellation_on_drop")
833 1 : .await
834 1 : .unwrap();
835 1 : utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
836 1 : let (tenant, ctx) = h.load().await;
837 1 :
838 1 : let timeline = tenant
839 1 : .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
840 1 : .await
841 1 : .unwrap();
842 1 :
843 1 : {
844 1 : // create_test_timeline wrote us one layer, write another
845 1 : let mut writer = timeline.writer().await;
846 1 : writer
847 1 : .put(
848 1 : pageserver_api::key::Key::from_i128(5),
849 1 : Lsn(0x20),
850 1 : &Value::Image(Bytes::from_static(b"this does not matter either")),
851 1 : &ctx,
852 1 : )
853 1 : .await
854 1 : .unwrap();
855 1 :
856 1 : writer.finish_write(Lsn(0x20));
857 1 : }
858 1 :
859 1 : timeline.freeze_and_flush().await.unwrap();
860 1 :
861 1 : // wait for the upload to complete so our Arc::strong_count assertion holds
862 1 : timeline.remote_client.wait_completion().await.unwrap();
863 1 :
864 1 : let (evicted_layer, not_evicted) = {
865 1 : let mut layers = {
866 1 : let mut guard = timeline.layers.write(LayerManagerLockHolder::Testing).await;
867 1 : let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
868 1 : // remove the layers from layermap
869 1 : guard.open_mut().unwrap().finish_gc_timeline(&layers);
870 1 :
871 1 : layers
872 1 : };
873 1 :
874 1 : assert_eq!(layers.len(), 2);
875 1 :
876 1 : (layers.pop().unwrap(), layers.pop().unwrap())
877 1 : };
878 1 :
879 1 : let victims = [(evicted_layer, true), (not_evicted, false)];
880 1 :
881 3 : for (victim, evict) in victims {
882 2 : let resident = victim.keep_resident().await.unwrap();
883 2 : drop(victim);
884 2 :
885 2 : assert_eq!(Arc::strong_count(&resident.owner.0), 1);
886 1 :
887 2 : if evict {
888 1 : let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
889 1 :
890 1 : // drive the future to await on the status channel, and then drop it
891 1 : tokio::time::timeout(ADVANCE, evict_and_wait)
892 1 : .await
893 1 : .expect_err("should had been a timeout since we are holding the layer resident");
894 1 : }
895 1 :
896 1 : // 1 == we only evict one of the layers
897 2 : assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
898 1 :
899 2 : drop(resident);
900 2 :
901 2 : // run any spawned tasks
902 2 : tokio::time::sleep(ADVANCE).await;
903 1 :
904 2 : SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
905 1 :
906 2 : assert_eq!(
907 2 : 1,
908 2 : LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
909 2 : );
910 1 : }
911 1 : }
912 :
913 : /// A test case to remind you of the cost of these structures. You can bump the size limit
914 : /// below if it is really necessary to add more fields to the structures.
915 : #[test]
916 : #[cfg(target_arch = "x86_64")]
917 1 : fn layer_size() {
918 1 : assert_eq!(size_of::<LayerAccessStats>(), 8);
919 1 : assert_eq!(size_of::<PersistentLayerDesc>(), 104);
920 1 : assert_eq!(size_of::<LayerInner>(), 296);
921 : // it also has the utf8 path
922 1 : }
923 :
924 : struct SpawnBlockingPoolHelper {
925 : awaited_by_spawn_blocking_tasks: Completion,
926 : blocking_tasks: JoinSet<()>,
927 : }
928 :
929 : impl SpawnBlockingPoolHelper {
930 : /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
931 : /// release is called.
932 : ///
933 : /// In the tests this can be used to ensure something cannot be started on the target runtime's
934 : /// spawn_blocking pool.
935 : ///
936 : /// This should be no issue nowadays, because nextest runs each test in its own process.
937 1 : async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
938 1 : let default_max_blocking_threads = 512;
939 1 :
940 1 : Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
941 1 : }
942 :
943 13 : async fn consume_all_spawn_blocking_threads0(
944 13 : handle: &tokio::runtime::Handle,
945 13 : threads: usize,
946 13 : ) -> Self {
947 13 : assert_ne!(threads, 0);
948 :
949 13 : let (completion, barrier) = completion::channel();
950 13 : let (started, starts_completed) = completion::channel();
951 13 :
952 13 : let mut blocking_tasks = JoinSet::new();
953 13 :
954 3590 : for _ in 0..threads {
955 3590 : let barrier = barrier.clone();
956 3590 : let started = started.clone();
957 3590 : blocking_tasks.spawn_blocking_on(
958 3590 : move || {
959 3590 : drop(started);
960 3590 : tokio::runtime::Handle::current().block_on(barrier.wait());
961 3590 : },
962 3590 : handle,
963 3590 : );
964 3590 : }
965 :
966 13 : drop(started);
967 13 :
968 13 : starts_completed.wait().await;
969 :
970 13 : drop(barrier);
971 13 :
972 13 : tracing::trace!("consumed all threads");
973 :
974 13 : SpawnBlockingPoolHelper {
975 13 : awaited_by_spawn_blocking_tasks: completion,
976 13 : blocking_tasks,
977 13 : }
978 13 : }
979 :
980 : /// Release all previously blocked spawn_blocking threads
981 13 : async fn release(self) {
982 13 : let SpawnBlockingPoolHelper {
983 13 : awaited_by_spawn_blocking_tasks,
984 13 : mut blocking_tasks,
985 13 : } = self;
986 13 :
987 13 : drop(awaited_by_spawn_blocking_tasks);
988 :
989 3603 : while let Some(res) = blocking_tasks.join_next().await {
990 3590 : res.expect("none of the tasks should had panicked");
991 3590 : }
992 :
993 13 : tracing::trace!("released all threads");
994 13 : }
995 :
996 : /// In the tests it is used as an easy way of making sure something scheduled on the target
997 : /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
998 : /// before our tasks have a chance to schedule and complete.
999 6 : async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
1000 6 : Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
1001 6 : }
1002 :
1003 11 : async fn consume_and_release_all_of_spawn_blocking_threads0(
1004 11 : handle: &tokio::runtime::Handle,
1005 11 : threads: usize,
1006 11 : ) {
1007 11 : Self::consume_all_spawn_blocking_threads0(handle, threads)
1008 11 : .await
1009 11 : .release()
1010 11 : .await
1011 11 : }
1012 : }
1013 :
1014 : #[test]
1015 1 : fn spawn_blocking_pool_helper_actually_works() {
1016 1 : // create a custom runtime whose number of blocking threads we know and control
1017 1 : //
1018 1 : // because the amount is not configurable for our helper, expect the same amount as
1019 1 : // BACKGROUND_RUNTIME using the tokio defaults would have.
1020 1 : let rt = tokio::runtime::Builder::new_current_thread()
1021 1 : .max_blocking_threads(1)
1022 1 : .enable_all()
1023 1 : .build()
1024 1 : .unwrap();
1025 1 :
1026 1 : let handle = rt.handle();
1027 1 :
1028 1 : rt.block_on(async move {
1029 : // this will not return until all threads are spun up and actually executing the code
1030 : // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
1031 1 : let consumed =
1032 1 : SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
1033 :
1034 1 : println!("consumed");
1035 1 :
1036 1 : let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
1037 1 : // this will not get to run before we release
1038 1 : }));
1039 1 :
1040 1 : println!("spawned");
1041 1 :
1042 1 : tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
1043 1 : .await
1044 1 : .expect_err("the task should not have gotten to run yet");
1045 1 :
1046 1 : println!("tried to join");
1047 1 :
1048 1 : consumed.release().await;
1049 :
1050 1 : println!("released");
1051 1 :
1052 1 : tokio::time::timeout(std::time::Duration::from_secs(1), jh)
1053 1 : .await
1054 1 : .expect("no timeout")
1055 1 : .expect("no join error");
1056 1 :
1057 1 : println!("joined");
1058 1 : });
1059 1 : }
1060 :
1061 : /// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats
1062 4 : fn lowres_time(hires: SystemTime) -> SystemTime {
1063 4 : let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
1064 4 : UNIX_EPOCH + Duration::from_secs(ts)
1065 4 : }
1066 :
1067 : #[test]
1068 1 : fn access_stats() {
1069 1 : let access_stats = LayerAccessStats::default();
1070 1 : // Default is visible
1071 1 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1072 :
1073 1 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1074 1 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1075 1 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1076 1 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
1077 :
1078 1 : let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
1079 1 : access_stats.record_residence_event_at(rtime);
1080 1 : assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
1081 :
1082 1 : let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
1083 1 : access_stats.record_access_at(atime);
1084 1 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1085 :
1086 : // Setting visibility doesn't clobber access time
1087 1 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1088 1 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1089 1 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1090 1 : assert_eq!(access_stats.latest_activity(), lowres_time(atime));
1091 :
1092 : // Recording an access implicitly makes the layer visible, if it wasn't already
1093 1 : let atime = UNIX_EPOCH + Duration::from_secs(2200000000);
1094 1 : access_stats.set_visibility(LayerVisibilityHint::Covered);
1095 1 : assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
1096 1 : assert!(access_stats.record_access_at(atime));
1097 1 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1098 1 : assert!(!access_stats.record_access_at(atime));
1099 1 : access_stats.set_visibility(LayerVisibilityHint::Visible);
1100 1 : }
1101 :
1102 : #[test]
1103 1 : fn access_stats_2038() {
1104 1 : // The access stats structure uses a timestamp representation that will run out
1105 1 : // of bits in 2038. One year before that, this unit test will start failing.
1106 1 :
1107 1 : let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
1108 1 : + Duration::from_secs(3600 * 24 * 365);
1109 1 :
1110 1 : assert!(one_year_from_now.as_secs() < (2 << 31));
1111 1 : }