LCOV code coverage report
Current view: pageserver/src/tenant/storage_layer/layer/tests.rs
Test: 050dd70dd490b28fffe527eae9fb8a1222b5c59c.info (2024-06-25 21:28:46)
Coverage: lines 99.5 % (808 of 812 hit), functions 100.0 % (41 of 41 hit)

use pageserver_api::key::CONTROLFILE_KEY;
use tokio::task::JoinSet;
use utils::{
    completion::{self, Completion},
    id::TimelineId,
};

use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::context::DownloadBehavior;
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};

/// Used in tests to advance a future to the wanted await point, and no further.
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);

/// Used in tests to indicate a forever-long timeout; it has to be longer than the total amount of
/// ADVANCE the tests use to advance futures.
const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);

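/// A minimal, self-contained sketch (this test and its name are illustrative additions, not part
/// of the original suite) of the paused-clock pattern ADVANCE and FOREVER are built for: under
/// `start_paused = true`, awaiting `timeout(ADVANCE, ..)` auto-advances tokio's mock clock, so
/// the inner future is driven up to its next await point without consuming wall-clock time, while
/// FOREVER stands in for "effectively never" without risking a hung test.
#[tokio::test(start_paused = true)]
async fn advance_and_forever_illustration() {
    let mut sleeper = std::pin::pin!(tokio::time::sleep(FOREVER));

    // The ADVANCE deadline is the nearest pending timer, so the paused clock jumps straight to
    // it and the timeout elapses (in mock time) long before FOREVER would.
    tokio::time::timeout(ADVANCE, &mut sleeper)
        .await
        .expect_err("sleep(FOREVER) must outlive a single ADVANCE");
}
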
/// Demonstrate the API and the resident -> evicted -> resident -> deleted transitions.
#[tokio::test]
async fn smoke_test() {
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("smoke_test").unwrap();
    let span = h.span();
    let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
    let (tenant, _) = h.load().await;

    let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // All layers created at the pageserver are like `layer`: initialized with a strong
    // Arc<DownloadedLayer>.

    let img_before = {
        let mut data = ValueReconstructState::default();
        layer
            .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
            .await
            .unwrap();
        data.img
            .take()
            .expect("tenant harness writes the control file")
    };

    // The important part is evicting the layer, which can only be done when there are no more
    // ResidentLayer instances -- there currently are none, only two `Layer` values: one in the
    // layer map and one in scope.
    layer.evict_and_wait(FOREVER).await.unwrap();

    // Double-evict returns an error; this is valid when both the eviction_task and disk usage
    // based eviction try to evict the same layer at the same time.

    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound));

    // when an evicted layer is accessed, it is automatically downloaded again
    let img_after = {
        let mut data = ValueReconstructState::default();
        layer
            .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
            .instrument(download_span.clone())
            .await
            .unwrap();
        data.img.take().unwrap()
    };

    assert_eq!(img_before, img_after);

    // evict_and_wait can time out, but timing out doesn't cancel the eviction itself
    //
    // A ZERO timeout does not work reliably, so first take up all spawn_blocking slots to
    // artificially slow the eviction down.
    let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;

    match layer
        .evict_and_wait(std::time::Duration::ZERO)
        .await
        .unwrap_err()
    {
        EvictionError::Timeout => {
            // expected, but note that the eviction is "still ongoing"
            helper.release().await;
            // exhaust the spawn_blocking pool to ensure the eviction is now complete
            SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
                .await;
        }
        other => unreachable!("{other:?}"),
    }

    // The only way to query whether a layer is resident is to acquire a ResidentLayer instance.
    // Layer::keep_resident never downloads, but it might initialize the layer if the layer file
    // is found already downloaded locally.
    let none = layer.keep_resident().await;
    assert!(
        none.is_none(),
        "Expected none, because eviction removed the local file, found: {none:?}"
    );

    // plain downloading is rarely needed
    layer
        .download_and_keep_resident()
        .instrument(download_span)
        .await
        .unwrap();

    // The last important part is deletion on drop: gc and compaction use it for compacted L0
    // layers or fully garbage collected layers. Deletion means deleting the local file, and
    // scheduling a deletion of the remote file, which has already been unlinked from
    // index_part.json.
    //
    // Marking a layer to be deleted on drop is irreversible; there is no technical reason against
    // reversibility, but currently it is not needed, so it is not provided.
    layer.delete_on_drop();

    let path = layer.local_path().to_owned();

    // wait_drop produces a future unconnected to the Layer, which will resolve once
    // LayerInner::drop has completed.
    let mut wait_drop = std::pin::pin!(layer.wait_drop());

    // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
    // until here
    tokio::time::pause();
    tokio::time::timeout(ADVANCE, &mut wait_drop)
        .await
        .expect_err("should have timed out because two strong references exist");

    tokio::fs::metadata(&path)
        .await
        .expect("the local layer file still exists");

    let rtc = &timeline.remote_client;

    {
        let layers = &[layer];
        let mut g = timeline.layers.write().await;
        g.finish_gc_timeline(layers);
        // this just updates the remote_physical_size for demonstration purposes
        rtc.schedule_gc_update(layers).unwrap();
    }

    // when the strong references are dropped, the file is deleted and the remote deletion is
    // scheduled
    wait_drop.await;

    let e = tokio::fs::metadata(&path)
        .await
        .expect_err("the local file is deleted");
    assert_eq!(e.kind(), std::io::ErrorKind::NotFound);

    rtc.wait_completion().await.unwrap();

    assert_eq!(rtc.get_remote_physical_size(), 0);
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}
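
// A recap of the transitions smoke_test exercises, as a summary comment: resident -> evicted via
// evict_and_wait; evicted -> resident via the read and download_and_keep_resident (both
// re-download the file); and resident -> deleted via delete_on_drop plus dropping the last strong
// references, which also schedules the remote deletion.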

/// This test demonstrates a previous hang when an eviction and a deletion were requested at the
/// same time. Now both of them complete per Arc drop semantics.
#[tokio::test(start_paused = true)]
async fn evict_and_wait_on_wanted_deleted() {
    // this is the runtime on which Layer spawns its blocking tasks
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("evict_and_wait_on_wanted_deleted").unwrap();
    utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // setup done

    let resident = layer.keep_resident().await.unwrap();

    {
        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");

        layer.delete_on_drop();

        drop(resident);

        // make sure the eviction task gets to run
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

        let resident = layer.keep_resident().await;
        assert!(
            resident.is_none(),
            "keep_resident should not have re-initialized: {resident:?}"
        );

        evict_and_wait
            .await
            .expect("evict_and_wait should have succeeded");

        // works as intended
    }

    // assert that once we remove the `layer` from the layer map and drop our reference,
    // the deletion of the layer in remote_storage happens.
    {
        let mut layers = timeline.layers.write().await;
        layers.finish_gc_timeline(&[layer]);
    }

    SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

    assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
    assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
    assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
    assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

/// This test ensures that we can read the layer while its eviction has been started but not yet
/// completed.
#[test]
fn read_wins_pending_eviction() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .start_paused(true)
        .build()
        .unwrap();

    rt.block_on(async move {
        // this is the runtime on which Layer spawns its blocking tasks
        let handle = tokio::runtime::Handle::current();
        let h = TenantHarness::create("read_wins_pending_eviction").unwrap();
        let (tenant, ctx) = h.load().await;
        let span = h.span();
        let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

        let timeline = tenant
            .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
            .await
            .unwrap();

        let layer = {
            let mut layers = {
                let layers = timeline.layers.read().await;
                layers.likely_resident_layers().collect::<Vec<_>>()
            };

            assert_eq!(layers.len(), 1);

            layers.swap_remove(0)
        };

        // setup done

        let resident = layer.keep_resident().await.unwrap();

        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        let (completion, barrier) = utils::completion::channel();
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

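        // A note on the failpoint handshake used here and in the tests below: the eviction task
        // drops `arrival` once it reaches the failpoint, which releases
        // `arrived_at_barrier.wait()`, and it then stays parked inside the failpoint until our
        // side drops `completion`.
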
        // now the eviction cannot proceed because the threads are consumed while the completion
        // exists
        drop(resident);
        arrived_at_barrier.wait().await;
        assert!(!layer.is_likely_resident());

        // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
        layer
            .0
            .get_or_maybe_download(false, None)
            .instrument(download_span)
            .await
            .expect("should have reinitialized without downloading");

        assert!(layer.is_likely_resident());

        // reinitialization notifies of the new resident status, which should error out all
        // evict_and_wait
        let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect("no timeout, because get_or_maybe_download re-initialized")
            .expect_err("eviction should not have succeeded because re-initialized");

        // works as intended: evictions lose to "downloads"
        assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // this is not wrong: the eviction is technically still "on the way" as it's still queued
        // because of the failpoint
        assert_eq!(
            0,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        drop(completion);

        tokio::time::sleep(ADVANCE).await;
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
            .await;

        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // now we can finally observe the original eviction failing; it would have been possible
        // to observe it earlier, but here it is guaranteed to have happened.
        assert_eq!(
            1,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(
            1,
            LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
    });
}

/// Use a failpoint to delay the start of an eviction so that it is cancelled with
/// VersionCheckFailed.
#[test]
fn multiple_pending_evictions_in_order() {
    let name = "multiple_pending_evictions_in_order";
    let in_order = true;
    multiple_pending_evictions_scenario(name, in_order);
}

/// Use a failpoint to reorder the later eviction before the first one so that the first is
/// cancelled with UnexpectedEvictedState.
#[test]
fn multiple_pending_evictions_out_of_order() {
    let name = "multiple_pending_evictions_out_of_order";
    let in_order = false;
    multiple_pending_evictions_scenario(name, in_order);
}

fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .start_paused(true)
        .build()
        .unwrap();

    rt.block_on(async move {
        // this is the runtime on which Layer spawns its blocking tasks
        let handle = tokio::runtime::Handle::current();
        let h = TenantHarness::create(name).unwrap();
        let (tenant, ctx) = h.load().await;
        let span = h.span();
        let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

        let timeline = tenant
            .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
            .await
            .unwrap();

        let layer = {
            let mut layers = {
                let layers = timeline.layers.read().await;
                layers.likely_resident_layers().collect::<Vec<_>>()
            };

            assert_eq!(layers.len(), 1);

            layers.swap_remove(0)
        };

        // setup done

        let resident = layer.keep_resident().await.unwrap();

        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        let (completion1, barrier) = utils::completion::channel();
        let mut completion1 = Some(completion1);
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        // now the eviction cannot proceed because we are simulating an arbitrarily long delay
        // before the eviction task starts
        drop(resident);
        assert!(!layer.is_likely_resident());

        arrived_at_barrier.wait().await;

        // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
        layer
            .0
            .get_or_maybe_download(false, None)
            .instrument(download_span)
            .await
            .expect("should have reinitialized without downloading");

        assert!(layer.is_likely_resident());

        // reinitialization notifies of the new resident status, which should error out all
        // evict_and_wait
        let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect("no timeout, because get_or_maybe_download re-initialized")
            .expect_err("eviction should not have succeeded because re-initialized");

        // works as intended: evictions lose to "downloads"
        assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // this is not wrong: the eviction is technically still "on the way" as it's still queued
        // because of the failpoint
        assert_eq!(
            0,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // configure another failpoint for the second eviction -- evictions are per
        // initialization, so now that we've reinitialized the inner, we get to run two of them
        // at the same time.
        let (completion2, barrier) = utils::completion::channel();
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // advance to the wait on the queue
        tokio::time::timeout(ADVANCE, &mut second_eviction)
            .await
            .expect_err("timeout because the failpoint is blocking");

        arrived_at_barrier.wait().await;

        assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());

        let mut release_earlier_eviction = |expected_reason| {
            assert_eq!(
                0,
                LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
            );

            drop(completion1.take().unwrap());

            let handle = &handle;

            async move {
                tokio::time::sleep(ADVANCE).await;
                SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
                    handle, 1,
                )
                .await;

                assert_eq!(
                    1,
                    LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
                );
            }
        };

        if in_order {
            release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
        }

        // release the later eviction, which is for the current version
        drop(completion2);
        tokio::time::sleep(ADVANCE).await;
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
            .await;

        if !in_order {
            release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
        }

        tokio::time::timeout(ADVANCE, &mut second_eviction)
            .await
            .expect("eviction goes through now that spawn_blocking is unclogged")
            .expect("eviction should succeed, because the version matches");

        assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());

        // ensure the cancelled counts are unchanged
        assert_eq!(
            1,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
    });
}

/// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
/// a `Layer::keep_resident` call.
///
/// This matters because cancelling the eviction would leave us in a state where the file is on
/// disk but the layer's internal state says it has not been initialized. Furthermore, it allows
/// us to have a non-repairing `Layer::is_likely_resident`.
#[tokio::test(start_paused = true)]
async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
    let handle = tokio::runtime::Handle::current();
    let h =
        TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction").unwrap();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // this failpoint will simulate `get_or_maybe_download` becoming cancelled (by returning an
    // Err) at the right time, as in "during" `LayerInner::needs_download`
    layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);

    let (completion, barrier) = utils::completion::channel();
    let (arrival, arrived_at_barrier) = utils::completion::channel();

    layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
        Some(arrival),
        barrier,
    ));

    tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
        .await
        .expect_err("should have advanced to waiting on the channel");

    arrived_at_barrier.wait().await;

    // simulate a cancelled read which is cancelled before it gets to re-initialize
    let e = layer
        .0
        .get_or_maybe_download(false, None)
        .await
        .unwrap_err();
    assert!(
        matches!(
            e,
            DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
        ),
        "{e:?}"
    );

    assert!(
        layer.0.needs_download().await.unwrap().is_none(),
        "file is still on disk"
    );

    // release the eviction task
    drop(completion);
    tokio::time::sleep(ADVANCE).await;
    SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

    // the failpoint is still enabled, but it is not hit
    let e = layer
        .0
        .get_or_maybe_download(false, None)
        .await
        .unwrap_err();
    assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");

    // the failpoint is not counted as a cancellation either
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

#[tokio::test(start_paused = true)]
async fn evict_and_wait_does_not_wait_for_download() {
    let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download").unwrap();
    let (tenant, ctx) = h.load().await;
    let span = h.span();
    let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // kind of forced setup: start an eviction but do not allow it to progress until we are
    // downloading
    let (eviction_can_continue, barrier) = utils::completion::channel();
    let (arrival, eviction_arrived) = utils::completion::channel();
    layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
        Some(arrival),
        barrier,
    ));

    let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

    // use this once-awaited other_evict to synchronize with the eviction
    let other_evict = layer.evict_and_wait(FOREVER);

    tokio::time::timeout(ADVANCE, &mut evict_and_wait)
        .await
        .expect_err("should have advanced");
    eviction_arrived.wait().await;
    drop(eviction_can_continue);
    other_evict.await.unwrap();

    // now the layer is evicted, and evict_and_wait is waiting on the receiver
    assert!(!layer.is_likely_resident());

    // any following new evict_and_wait will fail until we've completed the download
    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound), "{e:?}");

    let (download_can_continue, barrier) = utils::completion::channel();
    let (arrival, _download_arrived) = utils::completion::channel();
    layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));

    let mut download = std::pin::pin!(layer
        .0
        .get_or_maybe_download(true, None)
        .instrument(download_span));

    assert!(
        !layer.is_likely_resident(),
        "during the download the layer is evicted"
    );

    tokio::time::timeout(ADVANCE, &mut download)
        .await
        .expect_err("should have timed out because of the failpoint");

    // now we finally get to continue, and because the latest state is downloading, we deduce that
    // the original eviction succeeded
    evict_and_wait.await.unwrap();

    // however a new evict_and_wait will fail
    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound), "{e:?}");

    assert!(!layer.is_likely_resident());

    drop(download_can_continue);
    download.await.expect("download should have succeeded");
    assert!(layer.is_likely_resident());

    // only now can we evict
    layer.evict_and_wait(FOREVER).await.unwrap();
}

/// Asserts that there is no miscalculation when a Layer is dropped while it is being kept
/// resident, where the ResidentLayer is the last strong reference.
///
/// Also checks that the same does not happen on a non-evicted layer (regression test).
#[tokio::test(start_paused = true)]
async fn eviction_cancellation_on_drop() {
    use crate::repository::Value;
    use bytes::Bytes;

    // this is the runtime on which Layer spawns its blocking tasks
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("eviction_cancellation_on_drop").unwrap();
    utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    {
        // create_test_timeline wrote us one layer; write another
        let mut writer = timeline.writer().await;
        writer
            .put(
                Key::from_i128(5),
                Lsn(0x20),
                &Value::Image(Bytes::from_static(b"this does not matter either")),
                &ctx,
            )
            .await
            .unwrap();

        writer.finish_write(Lsn(0x20));
    }

    timeline.freeze_and_flush().await.unwrap();

    // wait for the upload to complete so our Arc::strong_count assertion holds
    timeline.remote_client.wait_completion().await.unwrap();

    let (evicted_layer, not_evicted) = {
        let mut layers = {
            let mut guard = timeline.layers.write().await;
            let layers = guard.likely_resident_layers().collect::<Vec<_>>();
            // remove the layers from the layer map
            guard.finish_gc_timeline(&layers);

            layers
        };

        assert_eq!(layers.len(), 2);

        (layers.pop().unwrap(), layers.pop().unwrap())
    };

    let victims = [(evicted_layer, true), (not_evicted, false)];

    for (victim, evict) in victims {
        let resident = victim.keep_resident().await.unwrap();
        drop(victim);

        assert_eq!(Arc::strong_count(&resident.owner.0), 1);

        if evict {
            let evict_and_wait = resident.owner.evict_and_wait(FOREVER);

            // drive the future to await on the status channel, and then drop it
            tokio::time::timeout(ADVANCE, evict_and_wait)
                .await
                .expect_err("should have been a timeout since we are holding the layer resident");
        }

        // 1 == we only evict one of the layers
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        drop(resident);

        // run any spawned tasks
        tokio::time::sleep(ADVANCE).await;

        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

        assert_eq!(
            1,
            LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
        );
    }
}

/// A test case to remind you of the cost of these structures. You can bump the size limit
/// below if it is really necessary to add more fields to the structures.
#[test]
#[cfg(target_arch = "x86_64")]
fn layer_size() {
    assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);
    assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
    assert_eq!(std::mem::size_of::<LayerInner>(), 2344);
    // it also has the utf8 path
}

struct SpawnBlockingPoolHelper {
    /// Dropped by `release` to unblock the parked spawn_blocking closures.
    awaited_by_spawn_blocking_tasks: Completion,
    blocking_tasks: JoinSet<()>,
}

impl SpawnBlockingPoolHelper {
    /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
    /// release is called.
    ///
    /// In the tests this can be used to ensure something cannot be started on the target
    /// runtime's spawn_blocking pool.
    ///
    /// This should be no issue nowadays, because nextest runs each test in its own process.
    async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
        let default_max_blocking_threads = 512;

        Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
    }

    async fn consume_all_spawn_blocking_threads0(
        handle: &tokio::runtime::Handle,
        threads: usize,
    ) -> Self {
        assert_ne!(threads, 0);

        let (completion, barrier) = completion::channel();
        let (started, starts_completed) = completion::channel();

        let mut blocking_tasks = JoinSet::new();

        for _ in 0..threads {
            let barrier = barrier.clone();
            let started = started.clone();
            blocking_tasks.spawn_blocking_on(
                move || {
                    drop(started);
                    tokio::runtime::Handle::current().block_on(barrier.wait());
                },
                handle,
            );
        }

        drop(started);

        starts_completed.wait().await;

        drop(barrier);

        tracing::trace!("consumed all threads");

        SpawnBlockingPoolHelper {
            awaited_by_spawn_blocking_tasks: completion,
            blocking_tasks,
        }
    }

    /// Release all previously blocked spawn_blocking threads
    async fn release(self) {
        let SpawnBlockingPoolHelper {
            awaited_by_spawn_blocking_tasks,
            mut blocking_tasks,
        } = self;

        drop(awaited_by_spawn_blocking_tasks);

        while let Some(res) = blocking_tasks.join_next().await {
            res.expect("none of the tasks should have panicked");
        }

        tracing::trace!("released all threads");
    }

    /// In the tests this is used as an easy way of making sure something scheduled on the target
    /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
    /// before our tasks have a chance to schedule and complete.
    async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
        Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
    }

    async fn consume_and_release_all_of_spawn_blocking_threads0(
        handle: &tokio::runtime::Handle,
        threads: usize,
    ) {
        Self::consume_all_spawn_blocking_threads0(handle, threads)
            .await
            .release()
            .await
    }
}
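
/// A minimal sketch (an illustrative addition, not part of the original suite) of the
/// `utils::completion` handshake that SpawnBlockingPoolHelper and the failpoint tests above are
/// built on, assuming only the behavior already relied upon there: the barrier side waits until
/// the paired `Completion` (and all of its clones) has been dropped.
#[tokio::test]
async fn completion_channel_illustration() {
    let (completion, barrier) = completion::channel();

    let waiter = tokio::spawn(async move { barrier.wait().await });

    // dropping the completion is what releases the waiter
    drop(completion);
    waiter.await.expect("waiter should not have panicked");
}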

#[test]
fn spawn_blocking_pool_helper_actually_works() {
    // create a custom runtime for which we know and control how many blocking threads it has
    //
    // because the amount is not configurable for our helper, expect the same amount as
    // BACKGROUND_RUNTIME using the tokio defaults would have.
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let handle = rt.handle();

    rt.block_on(async move {
        // this will not return until all threads are spun up and actually executing the code
        // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
        let consumed =
            SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;

        println!("consumed");

        let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
            // this will not get to run before we release
        }));

        println!("spawned");

        tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
            .await
            .expect_err("the task should not have gotten to run yet");

        println!("tried to join");

        consumed.release().await;

        println!("released");

        tokio::time::timeout(std::time::Duration::from_secs(1), jh)
            .await
            .expect("no timeout")
            .expect("no join error");

        println!("joined");
    });
}
        

Generated by: LCOV version 2.1-beta