LCOV code coverage report
Source: pageserver/src/tenant/storage_layer/layer/tests.rs
Test: bb522999b2ee0ee028df22bb188d3a84170ba700.info (2024-07-21 16:16:09)
Lines: 99.4 % (816 of 821 hit)    Functions: 100.0 % (41 of 41 hit)

use pageserver_api::key::CONTROLFILE_KEY;
use tokio::task::JoinSet;
use utils::{
    completion::{self, Completion},
    id::TimelineId,
};

use super::failpoints::{Failpoint, FailpointKind};
use super::*;
use crate::context::DownloadBehavior;
use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};

/// Used in tests to advance a future to a wanted await point, and not further.
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);

/// Used in tests to indicate a forever-long timeout; it has to be longer than the total amount of
/// ADVANCE the tests use to advance futures.
const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
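
// A minimal illustrative sketch (added for exposition; not one of the original tests) of the
// paused-clock pattern ADVANCE and FOREVER support: under tokio's paused virtual clock,
// `timeout(ADVANCE, fut)` advances time by ADVANCE instantly, so a future parked on an await
// point "times out" deterministically instead of after a real hour.
#[tokio::test(start_paused = true)]
async fn paused_clock_sketch() {
    // sleep(FOREVER) cannot complete within ADVANCE of virtual time
    let mut fut = std::pin::pin!(tokio::time::sleep(FOREVER));
    tokio::time::timeout(ADVANCE, &mut fut)
        .await
        .expect_err("only ADVANCE of virtual time elapsed, so FOREVER has not");
}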

/// Demonstrate the API and the resident -> evicted -> resident -> deleted transitions.
#[tokio::test]
async fn smoke_test() {
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("smoke_test").await.unwrap();
    let span = h.span();
    let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
    let (tenant, _) = h.load().await;

    let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // all layers created at the pageserver are like `layer`, initialized with a strong
    // Arc<DownloadedLayer>.

    let img_before = {
        let mut data = ValueReconstructState::default();
        layer
            .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
            .await
            .unwrap();
        data.img
            .take()
            .expect("tenant harness writes the control file")
    };

    // the important part is evicting the layer, which can be done when there are no more
    // ResidentLayer instances -- there currently are none, only two `Layer` values, one in the
    // layer map and one in scope.
    layer.evict_and_wait(FOREVER).await.unwrap();

    // a double-evict returns an error, which is valid if both eviction_task and disk usage based
    // eviction try to evict the same layer at the same time.

    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound));

    // on accesses while the layer is evicted, it will automatically be downloaded.
    let img_after = {
        let mut data = ValueReconstructState::default();
        layer
            .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
            .instrument(download_span.clone())
            .await
            .unwrap();
        data.img.take().unwrap()
    };

    assert_eq!(img_before, img_after);

    // evict_and_wait can time out, but that does not cancel the eviction itself
    //
    // a ZERO timeout does not work reliably, so first take up all spawn_blocking slots to
    // artificially slow the eviction down.
    let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;

    match layer
        .evict_and_wait(std::time::Duration::ZERO)
        .await
        .unwrap_err()
    {
        EvictionError::Timeout => {
            // expected, but note that the eviction is "still ongoing"
            helper.release().await;
            // exhaust the spawn_blocking pool to ensure the eviction is now complete
            SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
                .await;
        }
        other => unreachable!("{other:?}"),
    }

    // the only way to query whether a layer is resident is to acquire a ResidentLayer instance.
    // Layer::keep_resident never downloads, but it might initialize if the layer file is found
    // downloaded locally.
    let none = layer.keep_resident().await;
    assert!(
        none.is_none(),
        "Expected none, because eviction removed the local file, found: {none:?}"
    );

    // plain downloading is rarely needed
    layer
        .download_and_keep_resident()
        .instrument(download_span)
        .await
        .unwrap();

    // the last important part is deletion on drop: gc and compaction use it for compacted L0
    // layers and fully garbage collected layers. deletion means deleting the local file, and
    // scheduling a deletion of the remote file, which has already been unlinked from
    // index_part.json.
    //
    // marking a layer to be deleted on drop is irreversible; there is no technical reason against
    // reversibility, but currently it is not needed, so it is not provided.
    layer.delete_on_drop();

    let path = layer.local_path().to_owned();

    // wait_drop produces a future unconnected to the Layer, which will resolve when
    // LayerInner::drop has completed.
    let mut wait_drop = std::pin::pin!(layer.wait_drop());

    // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
    // until here
    tokio::time::pause();
    tokio::time::timeout(ADVANCE, &mut wait_drop)
        .await
        .expect_err("should have timed out because two strong references exist");

    tokio::fs::metadata(&path)
        .await
        .expect("the local layer file still exists");

    let rtc = &timeline.remote_client;

    {
        let layers = &[layer];
        let mut g = timeline.layers.write().await;
        g.finish_gc_timeline(layers);
        // this just updates the remote_physical_size for demonstration purposes
        rtc.schedule_gc_update(layers).unwrap();
    }

    // when the strong references are dropped, the file is deleted and the remote deletion is
    // scheduled
    wait_drop.await;

    let e = tokio::fs::metadata(&path)
        .await
        .expect_err("the local file is deleted");
    assert_eq!(e.kind(), std::io::ErrorKind::NotFound);

    rtc.wait_completion().await.unwrap();

    assert_eq!(rtc.get_remote_physical_size(), 0);
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

/// This test demonstrates a previous hang when an eviction and a deletion were requested at the
/// same time. Now both of them complete per Arc drop semantics.
#[tokio::test(start_paused = true)]
async fn evict_and_wait_on_wanted_deleted() {
    // this is the runtime on which Layer spawns its blocking tasks
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
        .await
        .unwrap();
    utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // setup done

    let resident = layer.keep_resident().await.unwrap();

    {
        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");

        layer.delete_on_drop();

        drop(resident);

        // make sure the eviction task gets to run
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

        let resident = layer.keep_resident().await;
        assert!(
            resident.is_none(),
            "keep_resident should not have re-initialized: {resident:?}"
        );

        evict_and_wait
            .await
            .expect("evict_and_wait should have succeeded");

        // works as intended
    }

    // assert that once we remove the `layer` from the layer map and drop our reference,
    // the deletion of the layer in remote_storage happens.
    {
        let mut layers = timeline.layers.write().await;
        layers.finish_gc_timeline(&[layer]);
    }

    SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

    assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
    assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
    assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
    assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

/// This test ensures that we are able to read the layer while its eviction has been started but
/// not yet completed.
#[test]
fn read_wins_pending_eviction() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .start_paused(true)
        .build()
        .unwrap();

    rt.block_on(async move {
        // this is the runtime on which Layer spawns its blocking tasks
        let handle = tokio::runtime::Handle::current();
        let h = TenantHarness::create("read_wins_pending_eviction")
            .await
            .unwrap();
        let (tenant, ctx) = h.load().await;
        let span = h.span();
        let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

        let timeline = tenant
            .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
            .await
            .unwrap();

        let layer = {
            let mut layers = {
                let layers = timeline.layers.read().await;
                layers.likely_resident_layers().collect::<Vec<_>>()
            };

            assert_eq!(layers.len(), 1);

            layers.swap_remove(0)
        };

        // setup done

        let resident = layer.keep_resident().await.unwrap();

        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        let (completion, barrier) = utils::completion::channel();
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        // now the eviction cannot proceed because the threads are consumed while completion exists
        drop(resident);
        arrived_at_barrier.wait().await;
        assert!(!layer.is_likely_resident());

        // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
        layer
            .0
            .get_or_maybe_download(false, None)
            .instrument(download_span)
            .await
            .expect("should have reinitialized without downloading");

        assert!(layer.is_likely_resident());

        // reinitialization notifies of the new resident status, which should error out all
        // evict_and_wait
        let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect("no timeout, because get_or_maybe_download re-initialized")
            .expect_err("eviction should not have succeeded because re-initialized");

        // works as intended: evictions lose to "downloads"
        assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // this is not wrong: the eviction is technically still "on the way" as it's still queued
        // because of a failpoint
        assert_eq!(
            0,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        drop(completion);

        tokio::time::sleep(ADVANCE).await;
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
            .await;

        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // now we finally get to observe the original eviction failing
        // it would have been possible to observe it earlier, but here it is guaranteed to have
        // happened.
        assert_eq!(
            1,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(
            1,
            LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
    });
}

/// Use a failpoint to delay the start of an eviction, producing a VersionCheckFailed.
#[test]
fn multiple_pending_evictions_in_order() {
    let name = "multiple_pending_evictions_in_order";
    let in_order = true;
    multiple_pending_evictions_scenario(name, in_order);
}

/// Use a failpoint to reorder a later eviction before the first, producing an
/// UnexpectedEvictedState.
#[test]
fn multiple_pending_evictions_out_of_order() {
    let name = "multiple_pending_evictions_out_of_order";
    let in_order = false;
    multiple_pending_evictions_scenario(name, in_order);
}

fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .start_paused(true)
        .build()
        .unwrap();

    rt.block_on(async move {
        // this is the runtime on which Layer spawns its blocking tasks
        let handle = tokio::runtime::Handle::current();
        let h = TenantHarness::create(name).await.unwrap();
        let (tenant, ctx) = h.load().await;
        let span = h.span();
        let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

        let timeline = tenant
            .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
            .await
            .unwrap();

        let layer = {
            let mut layers = {
                let layers = timeline.layers.read().await;
                layers.likely_resident_layers().collect::<Vec<_>>()
            };

            assert_eq!(layers.len(), 1);

            layers.swap_remove(0)
        };

        // setup done

        let resident = layer.keep_resident().await.unwrap();

        let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // drive the future to await on the status channel
        tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect_err("should have been a timeout since we are holding the layer resident");
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        let (completion1, barrier) = utils::completion::channel();
        let mut completion1 = Some(completion1);
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        // now the eviction cannot proceed because we are simulating an arbitrarily long delay for
        // the start of the eviction task.
        drop(resident);
        assert!(!layer.is_likely_resident());

        arrived_at_barrier.wait().await;

        // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
        layer
            .0
            .get_or_maybe_download(false, None)
            .instrument(download_span)
            .await
            .expect("should have reinitialized without downloading");

        assert!(layer.is_likely_resident());

        // reinitialization notifies of the new resident status, which should error out all
        // evict_and_wait
        let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
            .await
            .expect("no timeout, because get_or_maybe_download re-initialized")
            .expect_err("eviction should not have succeeded because re-initialized");

        // works as intended: evictions lose to "downloads"
        assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // this is not wrong: the eviction is technically still "on the way" as it's still queued
        // because of a failpoint
        assert_eq!(
            0,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());

        // configure another failpoint for the second eviction -- evictions are per initialization,
        // so now that we've reinitialized the inner, we get to run two of them at the same time.
        let (completion2, barrier) = utils::completion::channel();
        let (arrival, arrived_at_barrier) = utils::completion::channel();
        layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
            Some(arrival),
            barrier,
        ));

        let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));

        // advance to the wait on the queue
        tokio::time::timeout(ADVANCE, &mut second_eviction)
            .await
            .expect_err("timeout because failpoint is blocking");

        arrived_at_barrier.wait().await;

        assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());

        let mut release_earlier_eviction = |expected_reason| {
            assert_eq!(
                0,
                LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
            );

            drop(completion1.take().unwrap());

            let handle = &handle;

            async move {
                tokio::time::sleep(ADVANCE).await;
                SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
                    handle, 1,
                )
                .await;

                assert_eq!(
                    1,
                    LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
                );
            }
        };

        if in_order {
            release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
        }

        // release the later eviction which is for the current version
        drop(completion2);
        tokio::time::sleep(ADVANCE).await;
        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
            .await;

        if !in_order {
            release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
        }

        tokio::time::timeout(ADVANCE, &mut second_eviction)
            .await
            .expect("eviction goes through now that spawn_blocking is unclogged")
            .expect("eviction should succeed, because version matches");

        assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());

        // ensure the cancellation counts are unchanged
        assert_eq!(
            1,
            LAYER_IMPL_METRICS
                .cancelled_evictions
                .values()
                .map(|ctr| ctr.get())
                .sum::<u64>()
        );

        assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
    });
}

/// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
/// a `Layer::keep_resident` call.
///
/// This matters because cancelling the eviction would leave us in a state where the file is on
/// disk but the layer's internal state says it has not been initialized. Furthermore, it allows us
/// to have a non-repairing `Layer::is_likely_resident`.
#[tokio::test(start_paused = true)]
async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
    let handle = tokio::runtime::Handle::current();
    let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
        .await
        .unwrap();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
    // Err) at the right time, as in "during" the `LayerInner::needs_download`.
    layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);

    let (completion, barrier) = utils::completion::channel();
    let (arrival, arrived_at_barrier) = utils::completion::channel();

    layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
        Some(arrival),
        barrier,
    ));

    tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
        .await
        .expect_err("should have advanced to waiting on channel");

    arrived_at_barrier.wait().await;

    // simulate a cancelled read which is cancelled before it gets to re-initialize
    let e = layer
        .0
        .get_or_maybe_download(false, None)
        .await
        .unwrap_err();
    assert!(
        matches!(
            e,
            DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
        ),
        "{e:?}"
    );

    assert!(
        layer.0.needs_download().await.unwrap().is_none(),
        "file is still on disk"
    );

    // release the eviction task
    drop(completion);
    tokio::time::sleep(ADVANCE).await;
    SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

    // the failpoint is still enabled, but it is not hit
    let e = layer
        .0
        .get_or_maybe_download(false, None)
        .await
        .unwrap_err();
    assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");

    // the failpoint is not counted as a cancellation either
    assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
}

#[tokio::test(start_paused = true)]
async fn evict_and_wait_does_not_wait_for_download() {
    // let handle = tokio::runtime::Handle::current();
    let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
        .await
        .unwrap();
    let (tenant, ctx) = h.load().await;
    let span = h.span();
    let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    let layer = {
        let mut layers = {
            let layers = timeline.layers.read().await;
            layers.likely_resident_layers().collect::<Vec<_>>()
        };

        assert_eq!(layers.len(), 1);

        layers.swap_remove(0)
    };

    // kind of forced setup: start an eviction but do not allow it to progress until we are
    // downloading
    let (eviction_can_continue, barrier) = utils::completion::channel();
    let (arrival, eviction_arrived) = utils::completion::channel();
    layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
        Some(arrival),
        barrier,
    ));

    let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));

    // use this once-awaited other_evict to synchronize with the eviction
    let other_evict = layer.evict_and_wait(FOREVER);

    tokio::time::timeout(ADVANCE, &mut evict_and_wait)
        .await
        .expect_err("should have advanced");
    eviction_arrived.wait().await;
    drop(eviction_can_continue);
    other_evict.await.unwrap();

    // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
    assert!(!layer.is_likely_resident());

    // any following new evict_and_wait will fail until we've completed the download
    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound), "{e:?}");

    let (download_can_continue, barrier) = utils::completion::channel();
    let (arrival, _download_arrived) = utils::completion::channel();
    layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));

    let mut download = std::pin::pin!(layer
        .0
        .get_or_maybe_download(true, None)
        .instrument(download_span));

    assert!(
        !layer.is_likely_resident(),
        "during download layer is evicted"
    );

    tokio::time::timeout(ADVANCE, &mut download)
        .await
        .expect_err("should have timed out because of failpoint");

    // now we finally get to continue, and because the latest state is downloading, we deduce that
    // the original eviction succeeded
    evict_and_wait.await.unwrap();

    // however a new evict_and_wait will fail
    let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
    assert!(matches!(e, EvictionError::NotFound), "{e:?}");

    assert!(!layer.is_likely_resident());

    drop(download_can_continue);
    download.await.expect("download should have succeeded");
    assert!(layer.is_likely_resident());

    // only now can we evict
    layer.evict_and_wait(FOREVER).await.unwrap();
}

/// Asserts that there is no miscalculation when a Layer is dropped while it is being kept
/// resident, the ResidentLayer being the last value.
///
/// Also checks that the same does not happen on a non-evicted layer (regression test).
#[tokio::test(start_paused = true)]
async fn eviction_cancellation_on_drop() {
    use crate::repository::Value;
    use bytes::Bytes;

    // this is the runtime on which Layer spawns its blocking tasks
    let handle = tokio::runtime::Handle::current();

    let h = TenantHarness::create("eviction_cancellation_on_drop")
        .await
        .unwrap();
    utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
    let (tenant, ctx) = h.load().await;

    let timeline = tenant
        .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
        .await
        .unwrap();

    {
        // create_test_timeline wrote us one layer, write another
        let mut writer = timeline.writer().await;
        writer
            .put(
                Key::from_i128(5),
                Lsn(0x20),
                &Value::Image(Bytes::from_static(b"this does not matter either")),
                &ctx,
            )
            .await
            .unwrap();

        writer.finish_write(Lsn(0x20));
    }

    timeline.freeze_and_flush().await.unwrap();

    // wait for the upload to complete so our Arc::strong_count assertion holds
    timeline.remote_client.wait_completion().await.unwrap();

    let (evicted_layer, not_evicted) = {
        let mut layers = {
            let mut guard = timeline.layers.write().await;
            let layers = guard.likely_resident_layers().collect::<Vec<_>>();
            // remove the layers from the layer map
            guard.finish_gc_timeline(&layers);

            layers
        };

        assert_eq!(layers.len(), 2);

        (layers.pop().unwrap(), layers.pop().unwrap())
    };

    let victims = [(evicted_layer, true), (not_evicted, false)];

    for (victim, evict) in victims {
        let resident = victim.keep_resident().await.unwrap();
        drop(victim);

        assert_eq!(Arc::strong_count(&resident.owner.0), 1);

        if evict {
            let evict_and_wait = resident.owner.evict_and_wait(FOREVER);

            // drive the future to await on the status channel, and then drop it
            tokio::time::timeout(ADVANCE, evict_and_wait)
                .await
                .expect_err("should have been a timeout since we are holding the layer resident");
        }

        // 1 == we only evict one of the layers
        assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());

        drop(resident);

        // run any spawned
        tokio::time::sleep(ADVANCE).await;

        SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;

        assert_eq!(
            1,
            LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
        );
    }
}

/// A test case to remind you of the cost of these structures. You can bump the size limits below
/// if it is really necessary to add more fields to the structures.
#[test]
#[cfg(target_arch = "x86_64")]
fn layer_size() {
    assert_eq!(std::mem::size_of::<LayerAccessStats>(), 2040);
    assert_eq!(std::mem::size_of::<PersistentLayerDesc>(), 104);
    assert_eq!(std::mem::size_of::<LayerInner>(), 2344);
    // it also has the utf8 path
}

struct SpawnBlockingPoolHelper {
    awaited_by_spawn_blocking_tasks: Completion,
    blocking_tasks: JoinSet<()>,
}

impl SpawnBlockingPoolHelper {
    /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
    /// release is called.
    ///
    /// In the tests this can be used to ensure something cannot be started on the target runtime's
    /// spawn_blocking pool.
    ///
    /// This should be no issue nowadays, because nextest runs each test in its own process.
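    ///
    /// A typical use, sketched (assuming `handle` is the runtime on which the layer code spawns
    /// its blocking tasks):
    ///
    /// ```ignore
    /// let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
    /// // nothing else can run on the runtime's spawn_blocking pool here
    /// helper.release().await;
    /// ```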
    async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
        let default_max_blocking_threads = 512;

        Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
    }

    async fn consume_all_spawn_blocking_threads0(
        handle: &tokio::runtime::Handle,
        threads: usize,
    ) -> Self {
        assert_ne!(threads, 0);

        let (completion, barrier) = completion::channel();
        let (started, starts_completed) = completion::channel();

        let mut blocking_tasks = JoinSet::new();

        for _ in 0..threads {
            let barrier = barrier.clone();
            let started = started.clone();
            blocking_tasks.spawn_blocking_on(
                move || {
                    drop(started);
                    tokio::runtime::Handle::current().block_on(barrier.wait());
                },
                handle,
            );
        }

        drop(started);

        starts_completed.wait().await;

        drop(barrier);

        tracing::trace!("consumed all threads");

        SpawnBlockingPoolHelper {
            awaited_by_spawn_blocking_tasks: completion,
            blocking_tasks,
        }
    }

    /// Release all previously blocked spawn_blocking threads
    async fn release(self) {
        let SpawnBlockingPoolHelper {
            awaited_by_spawn_blocking_tasks,
            mut blocking_tasks,
        } = self;

        drop(awaited_by_spawn_blocking_tasks);

        while let Some(res) = blocking_tasks.join_next().await {
            res.expect("none of the tasks should have panicked");
        }

        tracing::trace!("released all threads");
    }

    /// In the tests this is used as an easy way of making sure something scheduled on the target
    /// runtime's `spawn_blocking` has completed, because it must have been scheduled and completed
    /// before our tasks have a chance to schedule and complete.
    async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
        Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
    }

    async fn consume_and_release_all_of_spawn_blocking_threads0(
        handle: &tokio::runtime::Handle,
        threads: usize,
    ) {
        Self::consume_all_spawn_blocking_threads0(handle, threads)
            .await
            .release()
            .await
    }
}

#[test]
fn spawn_blocking_pool_helper_actually_works() {
    // create a custom runtime for which we know and control how many blocking threads it has
    //
    // because the amount is not configurable for our helper, expect the same amount as
    // BACKGROUND_RUNTIME using the tokio defaults would have.
    let rt = tokio::runtime::Builder::new_current_thread()
        .max_blocking_threads(1)
        .enable_all()
        .build()
        .unwrap();

    let handle = rt.handle();

    rt.block_on(async move {
        // this will not return until all threads are spun up and actually executing the code
        // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
        let consumed =
            SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;

        println!("consumed");

        let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
            // this will not get to run before we release
        }));

        println!("spawned");

        tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
            .await
            .expect_err("the task should not have gotten to run yet");

        println!("tried to join");

        consumed.release().await;

        println!("released");

        tokio::time::timeout(std::time::Duration::from_secs(1), jh)
            .await
            .expect("no timeout")
            .expect("no join error");

        println!("joined");
    });
}