LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer/layer - tests.rs (source / functions)
Test: 90b23405d17e36048d3bb64e314067f397803f1b.info
Test Date: 2024-09-20 13:14:58
Coverage: Lines: 99.4 % (872 of 877 hit)   Functions: 100.0 % (44 of 44 hit)

            Line data    Source code
       1              : use std::time::UNIX_EPOCH;
       2              : 
       3              : use pageserver_api::key::CONTROLFILE_KEY;
       4              : use tokio::task::JoinSet;
       5              : use utils::{
       6              :     completion::{self, Completion},
       7              :     id::TimelineId,
       8              : };
       9              : 
      10              : use super::failpoints::{Failpoint, FailpointKind};
      11              : use super::*;
      12              : use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
      13              : use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
      14              : 
      15              : /// Used in tests to advance a future to the wanted await point, and no further.
      16              : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
      17              : 
      18              : /// Used in tests to indicate a timeout that is effectively forever; has to be longer than the
      19              : /// total amount of ADVANCE the tests use to advance futures.
      20              : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
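                      : // (3600 s * 24 * 7 = 604_800 s: FOREVER amounts to one week)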
      21              : 
      22              : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
      23              : #[tokio::test]
      24            6 : async fn smoke_test() {
      25            6 :     let handle = tokio::runtime::Handle::current();
      26            6 : 
      27            6 :     let h = TenantHarness::create("smoke_test").await.unwrap();
      28            6 :     let span = h.span();
      29            6 :     let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
      30           24 :     let (tenant, _) = h.load().await;
      31            6 : 
      32            6 :     let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
      33            6 : 
      34            6 :     let timeline = tenant
      35            6 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
      36           12 :         .await
      37            6 :         .unwrap();
      38            6 : 
      39            6 :     let layer = {
      40            6 :         let mut layers = {
      41            6 :             let layers = timeline.layers.read().await;
      42            6 :             layers.likely_resident_layers().cloned().collect::<Vec<_>>()
      43            6 :         };
      44            6 : 
      45            6 :         assert_eq!(layers.len(), 1);
      46            6 : 
      47            6 :         layers.swap_remove(0)
      48            6 :     };
      49            6 : 
      50            6 :     // all layers created at the pageserver are like `layer`, initialized with a strong
      51            6 :     // Arc<DownloadedLayer>.
      52            6 : 
      53            6 :     let controlfile_keyspace = KeySpace {
      54            6 :         ranges: vec![CONTROLFILE_KEY..CONTROLFILE_KEY.next()],
      55            6 :     };
      56            6 : 
      57            6 :     let img_before = {
      58            6 :         let mut data = ValuesReconstructState::default();
      59            6 :         layer
      60            6 :             .get_values_reconstruct_data(
      61            6 :                 controlfile_keyspace.clone(),
      62            6 :                 Lsn(0x10)..Lsn(0x11),
      63            6 :                 &mut data,
      64            6 :                 &ctx,
      65            6 :             )
      66           12 :             .await
      67            6 :             .unwrap();
      68            6 :         data.keys
      69            6 :             .remove(&CONTROLFILE_KEY)
      70            6 :             .expect("must be present")
      71            6 :             .expect("should not error")
      72            6 :             .img
      73            6 :             .take()
      74            6 :             .expect("tenant harness writes the control file")
      75            6 :     };
      76            6 : 
      77            6 :     // the important part is evicting the layer, which can be done when there are no more
      78            6 :     // ResidentLayer instances -- there currently are none, only two `Layer` values, one in the
      79            6 :     // layermap and one in scope.
      80            6 :     layer.evict_and_wait(FOREVER).await.unwrap();
      81            6 : 
      82            6 :     // a double evict returns an error, which is valid when both the eviction_task and disk usage
      83            6 :     // based eviction try to evict the same layer at the same time.
      84            6 : 
      85            6 :     let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
      86            6 :     assert!(matches!(e, EvictionError::NotFound));
      87            6 : 
      88            6 :     // on access, an evicted layer will automatically be downloaded.
      89            6 :     let img_after = {
      90            6 :         let mut data = ValuesReconstructState::default();
      91            6 :         layer
      92            6 :             .get_values_reconstruct_data(
      93            6 :                 controlfile_keyspace.clone(),
      94            6 :                 Lsn(0x10)..Lsn(0x11),
      95            6 :                 &mut data,
      96            6 :                 &ctx,
      97            6 :             )
      98            6 :             .instrument(download_span.clone())
      99           24 :             .await
     100            6 :             .unwrap();
     101            6 :         data.keys
     102            6 :             .remove(&CONTROLFILE_KEY)
     103            6 :             .expect("must be present")
     104            6 :             .expect("should not error")
     105            6 :             .img
     106            6 :             .take()
     107            6 :             .expect("tenant harness writes the control file")
     108            6 :     };
     109            6 : 
     110            6 :     assert_eq!(img_before, img_after);
     111            6 : 
     112            6 :     // evict_and_wait can time out, but timing out doesn't cancel the eviction itself
     113            6 :     //
     114            6 :     // a ZERO timeout does not work reliably, so first take up all spawn_blocking slots to
     115            6 :     // artificially slow it down.
     116            6 :     let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
     117            6 : 
     118            6 :     match layer
     119            6 :         .evict_and_wait(std::time::Duration::ZERO)
     120            6 :         .await
     121            6 :         .unwrap_err()
     122            6 :     {
     123            6 :         EvictionError::Timeout => {
     124            6 :             // expected, but note that the eviction is "still ongoing"
     125           32 :             helper.release().await;
     126            6 :             // exhaust spawn_blocking pool to ensure it is now complete
     127            6 :             SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
     128           32 :                 .await;
     129            6 :         }
     130            6 :         other => unreachable!("{other:?}"),
     131            6 :     }
     132            6 : 
     133            6 :     // the only way to query whether a layer is resident is to acquire a ResidentLayer instance.
     134            6 :     // Layer::keep_resident never downloads, but it might initialize if the layer file is found
     135            6 :     // downloaded locally.
     136            6 :     let none = layer.keep_resident().await;
     137            6 :     assert!(
     138            6 :         none.is_none(),
     139            6 :         "Expected none, because eviction removed the local file, found: {none:?}"
     140            6 :     );
     141            6 : 
     142            6 :     // plain downloading is rarely needed
     143            6 :     layer
     144            6 :         .download_and_keep_resident()
     145            6 :         .instrument(download_span)
     146           14 :         .await
     147            6 :         .unwrap();
     148            6 : 
     149            6 :     // last important part is deletion on drop: gc and compaction use it for compacted L0 layers
     150            6 :     // or fully garbage collected layers. deletion means deleting the local file, and scheduling a
     151            6 :     // deletion of the already unlinked from index_part.json remote file.
     152            6 :     //
     153            6 :     // marking a layer to be deleted on drop is irreversible; there is no technical reason against
     154            6 :     // reversibility, but currently it is not needed so it is not provided.
     155            6 :     layer.delete_on_drop();
     156            6 : 
     157            6 :     let path = layer.local_path().to_owned();
     158            6 : 
     159            6 :     // wait_drop produces a future unconnected to the Layer which will resolve when
     160            6 :     // LayerInner::drop has completed.
     161            6 :     let mut wait_drop = std::pin::pin!(layer.wait_drop());
     162            6 : 
     163            6 :     // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
     164            6 :     // until here
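                      :     // (under tokio's paused clock, time auto-advances whenever all tasks are idle, so the
                      :     // ADVANCE-long timeout below fires without waiting in real time)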
     165            6 :     tokio::time::pause();
     166            6 :     tokio::time::timeout(ADVANCE, &mut wait_drop)
     167            6 :         .await
     168            6 :         .expect_err("should have timed out because two strong references exist");
     169            6 : 
     170            6 :     tokio::fs::metadata(&path)
     171            6 :         .await
     172            6 :         .expect("the local layer file still exists");
     173            6 : 
     174            6 :     let rtc = &timeline.remote_client;
     175            6 : 
     176            6 :     {
     177            6 :         let layers = &[layer];
     178            6 :         let mut g = timeline.layers.write().await;
     179            6 :         g.open_mut().unwrap().finish_gc_timeline(layers);
     180            6 :         // this just updates the remote_physical_size for demonstration purposes
     181            6 :         rtc.schedule_gc_update(layers).unwrap();
     182            6 :     }
     183            6 : 
     184            6 :     // when strong references are dropped, the file is deleted and remote deletion is scheduled
     185            6 :     wait_drop.await;
     186            6 : 
     187            6 :     let e = tokio::fs::metadata(&path)
     188            6 :         .await
     189            6 :         .expect_err("the local file is deleted");
     190            6 :     assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
     191            6 : 
     192            6 :     rtc.wait_completion().await.unwrap();
     193            6 : 
     194            6 :     assert_eq!(rtc.get_remote_physical_size(), 0);
     195            6 :     assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     196            6 : }
     197              : 
     198              : /// This test demonstrates a previous hang when an eviction and a deletion were requested at the
     199              : /// same time. Now both of them complete per Arc drop semantics.
     200              : #[tokio::test(start_paused = true)]
     201            6 : async fn evict_and_wait_on_wanted_deleted() {
     202            6 :     // this is the runtime on which Layer spawns the blocking tasks
     203            6 :     let handle = tokio::runtime::Handle::current();
     204            6 : 
     205            6 :     let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
     206            6 :         .await
     207            6 :         .unwrap();
     208            6 :     utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
     209           24 :     let (tenant, ctx) = h.load().await;
     210            6 : 
     211            6 :     let timeline = tenant
     212            6 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     213           12 :         .await
     214            6 :         .unwrap();
     215            6 : 
     216            6 :     let layer = {
     217            6 :         let mut layers = {
     218            6 :             let layers = timeline.layers.read().await;
     219            6 :             layers.likely_resident_layers().cloned().collect::<Vec<_>>()
     220            6 :         };
     221            6 : 
     222            6 :         assert_eq!(layers.len(), 1);
     223            6 : 
     224            6 :         layers.swap_remove(0)
     225            6 :     };
     226            6 : 
     227            6 :     // setup done
     228            6 : 
     229            6 :     let resident = layer.keep_resident().await.unwrap();
     230            6 : 
     231            6 :     {
     232            6 :         let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
     233            6 : 
     234            6 :         // drive the future to await on the status channel
     235            6 :         tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     236            6 :             .await
     237            6 :             .expect_err("should have been a timeout since we are holding the layer resident");
     238            6 : 
     239            6 :         layer.delete_on_drop();
     240            6 : 
     241            6 :         drop(resident);
     242            6 : 
     243            6 :         // make sure the eviction task gets to run
     244           73 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
     245            6 : 
     246            6 :         let resident = layer.keep_resident().await;
     247            6 :         assert!(
     248            6 :             resident.is_none(),
     249            6 :             "keep_resident should not have re-initialized: {resident:?}"
     250            6 :         );
     251            6 : 
     252            6 :         evict_and_wait
     253            6 :             .await
     254            6 :             .expect("evict_and_wait should have succeeded");
     255            6 : 
     256            6 :         // works as intended
     257            6 :     }
     258            6 : 
     259            6 :     // assert that once we remove the `layer` from the layer map and drop our reference,
     260            6 :     // the deletion of the layer in remote_storage happens.
     261            6 :     {
     262            6 :         let mut layers = timeline.layers.write().await;
     263            6 :         layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
     264            6 :     }
     265            6 : 
     266           41 :     SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
     267            6 : 
     268            6 :     assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
     269            6 :     assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
     270            6 :     assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     271            6 :     assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
     272            6 :     assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     273            6 : }
     274              : 
     275              : /// This test ensures we are able to read the layer while the layer eviction has been
     276              : /// started but not completed.
     277              : #[test]
     278            6 : fn read_wins_pending_eviction() {
     279            6 :     let rt = tokio::runtime::Builder::new_current_thread()
     280            6 :         .max_blocking_threads(1)
     281            6 :         .enable_all()
     282            6 :         .start_paused(true)
     283            6 :         .build()
     284            6 :         .unwrap();
     285            6 : 
     286            6 :     rt.block_on(async move {
     287            6 :         // this is the runtime on which Layer spawns the blocking tasks
     288            6 :         let handle = tokio::runtime::Handle::current();
     289            6 :         let h = TenantHarness::create("read_wins_pending_eviction")
     290            0 :             .await
     291            6 :             .unwrap();
     292           23 :         let (tenant, ctx) = h.load().await;
     293            6 :         let span = h.span();
     294            6 :         let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
     295              : 
     296            6 :         let timeline = tenant
     297            6 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     298           12 :             .await
     299            6 :             .unwrap();
     300              : 
     301            6 :         let layer = {
     302            6 :             let mut layers = {
     303            6 :                 let layers = timeline.layers.read().await;
     304            6 :                 layers.likely_resident_layers().cloned().collect::<Vec<_>>()
     305            6 :             };
     306            6 : 
     307            6 :             assert_eq!(layers.len(), 1);
     308              : 
     309            6 :             layers.swap_remove(0)
     310              :         };
     311              : 
     312              :         // setup done
     313              : 
     314            6 :         let resident = layer.keep_resident().await.unwrap();
     315            6 : 
     316            6 :         let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
     317            6 : 
     318            6 :         // drive the future to await on the status channel
     319            6 :         tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     320            6 :             .await
     321            6 :             .expect_err("should have been a timeout since we are holding the layer resident");
     322            6 :         assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     323              : 
     324            6 :         let (completion, barrier) = utils::completion::channel();
     325            6 :         let (arrival, arrived_at_barrier) = utils::completion::channel();
     326            6 :         layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     327            6 :             Some(arrival),
     328            6 :             barrier,
     329            6 :         ));
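                      :         // the failpoint drops `arrival` when the eviction task reaches it (observed via
                      :         // `arrived_at_barrier.wait()` below) and then parks the task on `barrier` until
                      :         // `completion` is dropped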
     330            6 : 
     331            6 :         // now the eviction cannot proceed because the threads are consumed while completion exists
     332            6 :         drop(resident);
     333            6 :         arrived_at_barrier.wait().await;
     334            6 :         assert!(!layer.is_likely_resident());
     335              : 
     336              :         // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
     337            6 :         layer
     338            6 :             .0
     339            6 :             .get_or_maybe_download(false, None)
     340            6 :             .instrument(download_span)
     341            6 :             .await
     342            6 :             .expect("should have reinitialized without downloading");
     343            6 : 
     344            6 :         assert!(layer.is_likely_resident());
     345              : 
     346              :         // reinitialization notifies of new resident status, which should error out all evict_and_wait
     347            6 :         let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     348            0 :             .await
     349            6 :             .expect("no timeout, because get_or_maybe_download re-initialized")
     350            6 :             .expect_err("eviction should not have succeeded because re-initialized");
     351            6 : 
     352            6 :         // works as intended: evictions lose to "downloads"
     353            6 :         assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
     354            6 :         assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     355              : 
     356              :         // this is not wrong: the eviction is technically still "on the way" as it's still queued
     357              :         // because of a failpoint
     358            6 :         assert_eq!(
     359            6 :             0,
     360            6 :             LAYER_IMPL_METRICS
     361            6 :                 .cancelled_evictions
     362            6 :                 .values()
     363           54 :                 .map(|ctr| ctr.get())
     364            6 :                 .sum::<u64>()
     365            6 :         );
     366              : 
     367            6 :         drop(completion);
     368            6 : 
     369           12 :         tokio::time::sleep(ADVANCE).await;
     370            6 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
     371            7 :             .await;
     372              : 
     373            6 :         assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     374              : 
     375              :         // now we finally can observe the original eviction failing
     376              :         // it would have been possible to observe it earlier, but here it is guaranteed to have
     377              :         // happened.
     378            6 :         assert_eq!(
     379            6 :             1,
     380            6 :             LAYER_IMPL_METRICS
     381            6 :                 .cancelled_evictions
     382            6 :                 .values()
     383           54 :                 .map(|ctr| ctr.get())
     384            6 :                 .sum::<u64>()
     385            6 :         );
     386              : 
     387            6 :         assert_eq!(
     388            6 :             1,
     389            6 :             LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
     390            6 :         );
     391              : 
     392            6 :         assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     393            6 :     });
     394            6 : }
     395              : 
     396              : /// Use a failpoint to delay the start of an eviction in order to get a VersionCheckFailed.
     397              : #[test]
     398            6 : fn multiple_pending_evictions_in_order() {
     399            6 :     let name = "multiple_pending_evictions_in_order";
     400            6 :     let in_order = true;
     401            6 :     multiple_pending_evictions_scenario(name, in_order);
     402            6 : }
     403              : 
     404              : /// Use a failpoint to reorder a later eviction before the first one to get an UnexpectedEvictedState.
     405              : #[test]
     406            6 : fn multiple_pending_evictions_out_of_order() {
     407            6 :     let name = "multiple_pending_evictions_out_of_order";
     408            6 :     let in_order = false;
     409            6 :     multiple_pending_evictions_scenario(name, in_order);
     410            6 : }
     411              : 
     412           12 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
     413           12 :     let rt = tokio::runtime::Builder::new_current_thread()
     414           12 :         .max_blocking_threads(1)
     415           12 :         .enable_all()
     416           12 :         .start_paused(true)
     417           12 :         .build()
     418           12 :         .unwrap();
     419           12 : 
     420           12 :     rt.block_on(async move {
     421           12 :         // this is the runtime on which Layer spawns the blocking tasks
     422           12 :         let handle = tokio::runtime::Handle::current();
     423           12 :         let h = TenantHarness::create(name).await.unwrap();
     424           48 :         let (tenant, ctx) = h.load().await;
     425           12 :         let span = h.span();
     426           12 :         let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
     427              : 
     428           12 :         let timeline = tenant
     429           12 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     430           24 :             .await
     431           12 :             .unwrap();
     432              : 
     433           12 :         let layer = {
     434           12 :             let mut layers = {
     435           12 :                 let layers = timeline.layers.read().await;
     436           12 :                 layers.likely_resident_layers().cloned().collect::<Vec<_>>()
     437           12 :             };
     438           12 : 
     439           12 :             assert_eq!(layers.len(), 1);
     440              : 
     441           12 :             layers.swap_remove(0)
     442              :         };
     443              : 
     444              :         // setup done
     445              : 
     446           12 :         let resident = layer.keep_resident().await.unwrap();
     447           12 : 
     448           12 :         let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
     449           12 : 
     450           12 :         // drive the future to await on the status channel
     451           12 :         tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     452           12 :             .await
     453           12 :             .expect_err("should have been a timeout since we are holding the layer resident");
     454           12 :         assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     455              : 
     456           12 :         let (completion1, barrier) = utils::completion::channel();
     457           12 :         let mut completion1 = Some(completion1);
     458           12 :         let (arrival, arrived_at_barrier) = utils::completion::channel();
     459           12 :         layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     460           12 :             Some(arrival),
     461           12 :             barrier,
     462           12 :         ));
     463           12 : 
     464           12 :         // now the eviction cannot proceed because we are simulating arbitrary long delay for the
     465           12 :         // eviction task start.
     466           12 :         drop(resident);
     467           12 :         assert!(!layer.is_likely_resident());
     468              : 
     469           12 :         arrived_at_barrier.wait().await;
     470              : 
     471              :         // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
     472           12 :         layer
     473           12 :             .0
     474           12 :             .get_or_maybe_download(false, None)
     475           12 :             .instrument(download_span)
     476           12 :             .await
     477           12 :             .expect("should have reinitialized without downloading");
     478           12 : 
     479           12 :         assert!(layer.is_likely_resident());
     480              : 
     481              :         // reinitialization notifies of new resident status, which should error out all evict_and_wait
     482           12 :         let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     483            0 :             .await
     484           12 :             .expect("no timeout, because get_or_maybe_download re-initialized")
     485           12 :             .expect_err("eviction should not have succeeded because re-initialized");
     486           12 : 
     487           12 :         // works as intended: evictions lose to "downloads"
     488           12 :         assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
     489           12 :         assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     490              : 
     491              :         // this is not wrong: the eviction is technically still "on the way" as it's still queued
     492              :         // because of a failpoint
     493           12 :         assert_eq!(
     494           12 :             0,
     495           12 :             LAYER_IMPL_METRICS
     496           12 :                 .cancelled_evictions
     497           12 :                 .values()
     498          108 :                 .map(|ctr| ctr.get())
     499           12 :                 .sum::<u64>()
     500           12 :         );
     501              : 
     502           12 :         assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     503              : 
     504              :         // configure another failpoint for the second eviction -- evictions are per initialization,
     505              :         // so now that we've reinitialized the inner, we get to run two of them at the same time.
     506           12 :         let (completion2, barrier) = utils::completion::channel();
     507           12 :         let (arrival, arrived_at_barrier) = utils::completion::channel();
     508           12 :         layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     509           12 :             Some(arrival),
     510           12 :             barrier,
     511           12 :         ));
     512           12 : 
     513           12 :         let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
     514           12 : 
     515           12 :         // advance to the wait on the queue
     516           12 :         tokio::time::timeout(ADVANCE, &mut second_eviction)
     517           24 :             .await
     518           12 :             .expect_err("timeout because failpoint is blocking");
     519           12 : 
     520           12 :         arrived_at_barrier.wait().await;
     521              : 
     522           12 :         assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
     523              : 
     524           12 :         let mut release_earlier_eviction = |expected_reason| {
     525           12 :             assert_eq!(
     526           12 :                 0,
     527           12 :                 LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
     528           12 :             );
     529              : 
     530           12 :             drop(completion1.take().unwrap());
     531           12 : 
     532           12 :             let handle = &handle;
     533              : 
     534           12 :             async move {
     535           12 :                 tokio::time::sleep(ADVANCE).await;
     536           12 :                 SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
     537           12 :                     handle, 1,
     538           12 :                 )
     539           19 :                 .await;
     540              : 
     541           12 :                 assert_eq!(
     542           12 :                     1,
     543           12 :                     LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
     544           12 :                 );
     545           12 :             }
     546           12 :         };
     547              : 
     548           12 :         if in_order {
     549           15 :             release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
     550            6 :         }
     551              : 
     552              :         // release the later eviction which is for the current version
     553           12 :         drop(completion2);
     554           24 :         tokio::time::sleep(ADVANCE).await;
     555           12 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
     556           19 :             .await;
     557              : 
     558           12 :         if !in_order {
     559           16 :             release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
     560            6 :         }
     561              : 
     562           12 :         tokio::time::timeout(ADVANCE, &mut second_eviction)
     563            0 :             .await
     564           12 :             .expect("eviction goes through now that spawn_blocking is unclogged")
     565           12 :             .expect("eviction should succeed, because version matches");
     566           12 : 
     567           12 :         assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
     568              : 
     569              :         // ensure the cancelled are unchanged
     570           12 :         assert_eq!(
     571           12 :             1,
     572           12 :             LAYER_IMPL_METRICS
     573           12 :                 .cancelled_evictions
     574           12 :                 .values()
     575          108 :                 .map(|ctr| ctr.get())
     576           12 :                 .sum::<u64>()
     577           12 :         );
     578              : 
     579           12 :         assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     580           12 :     });
     581           12 : }
     582              : 
     583              : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
     584              : /// a `Layer::keep_resident` call.
     585              : ///
     586              : /// This matters because cancelling the eviction would leave us in a state where the file is on
     587              : /// disk but the layer internal state says it has not been initialized. Furthermore, it allows us to
     588              : /// have non-repairing `Layer::is_likely_resident`.
     589              : #[tokio::test(start_paused = true)]
     590            6 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
     591            6 :     let handle = tokio::runtime::Handle::current();
     592            6 :     let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
     593            6 :         .await
     594            6 :         .unwrap();
     595           24 :     let (tenant, ctx) = h.load().await;
     596            6 : 
     597            6 :     let timeline = tenant
     598            6 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     599           12 :         .await
     600            6 :         .unwrap();
     601            6 : 
     602            6 :     let layer = {
     603            6 :         let mut layers = {
     604            6 :             let layers = timeline.layers.read().await;
     605            6 :             layers.likely_resident_layers().cloned().collect::<Vec<_>>()
     606            6 :         };
     607            6 : 
     608            6 :         assert_eq!(layers.len(), 1);
     609            6 : 
     610            6 :         layers.swap_remove(0)
     611            6 :     };
     612            6 : 
     613            6 :     // this failpoint will simulate `get_or_maybe_download` becoming cancelled (by returning an
     614            6 :     // Err) at the right time, that is, "during" `LayerInner::needs_download`.
     615            6 :     layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
     616            6 : 
     617            6 :     let (completion, barrier) = utils::completion::channel();
     618            6 :     let (arrival, arrived_at_barrier) = utils::completion::channel();
     619            6 : 
     620            6 :     layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     621            6 :         Some(arrival),
     622            6 :         barrier,
     623            6 :     ));
     624            6 : 
     625            6 :     tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
     626            6 :         .await
     627            6 :         .expect_err("should have advanced to waiting on the channel");
     628            6 : 
     629            6 :     arrived_at_barrier.wait().await;
     630            6 : 
     631            6 :     // simulate a cancelled read which is cancelled before it gets to re-initialize
     632            6 :     let e = layer
     633            6 :         .0
     634            6 :         .get_or_maybe_download(false, None)
     635            6 :         .await
     636            6 :         .unwrap_err();
     637            6 :     assert!(
     638            6 :         matches!(
     639            6 :             e,
     640            6 :             DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
     641            6 :         ),
     642            6 :         "{e:?}"
     643            6 :     );
     644            6 : 
     645            6 :     assert!(
     646            6 :         layer.0.needs_download().await.unwrap().is_none(),
     647            6 :         "file is still on disk"
     648            6 :     );
     649            6 : 
     650            6 :     // release the eviction task
     651            6 :     drop(completion);
     652            6 :     tokio::time::sleep(ADVANCE).await;
     653          118 :     SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
     654            6 : 
     655            6 :     // failpoint is still enabled, but it is not hit
     656            6 :     let e = layer
     657            6 :         .0
     658            6 :         .get_or_maybe_download(false, None)
     659           11 :         .await
     660            6 :         .unwrap_err();
     661            6 :     assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
     662            6 : 
     663            6 :     // failpoint is not counted as cancellation either
     664            6 :     assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     665            6 : }
     666              : 
     667              : #[tokio::test(start_paused = true)]
     668            6 : async fn evict_and_wait_does_not_wait_for_download() {
     669            6 :     // let handle = tokio::runtime::Handle::current();
     670            6 :     let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
     671            6 :         .await
     672            6 :         .unwrap();
     673           24 :     let (tenant, ctx) = h.load().await;
     674            6 :     let span = h.span();
     675            6 :     let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
     676            6 : 
     677            6 :     let timeline = tenant
     678            6 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     679           12 :         .await
     680            6 :         .unwrap();
     681            6 : 
     682            6 :     let layer = {
     683            6 :         let mut layers = {
     684            6 :             let layers = timeline.layers.read().await;
     685            6 :             layers.likely_resident_layers().cloned().collect::<Vec<_>>()
     686            6 :         };
     687            6 : 
     688            6 :         assert_eq!(layers.len(), 1);
     689            6 : 
     690            6 :         layers.swap_remove(0)
     691            6 :     };
     692            6 : 
     693            6 :     // kind of a forced setup: start an eviction but do not allow it to progress until we are
     694            6 :     // downloading
     695            6 :     let (eviction_can_continue, barrier) = utils::completion::channel();
     696            6 :     let (arrival, eviction_arrived) = utils::completion::channel();
     697            6 :     layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     698            6 :         Some(arrival),
     699            6 :         barrier,
     700            6 :     ));
     701            6 : 
     702            6 :     let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
     703            6 : 
     704            6 :     // use this once-awaited other_evict to synchronize with the eviction
     705            6 :     let other_evict = layer.evict_and_wait(FOREVER);
     706            6 : 
     707            6 :     tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     708            6 :         .await
     709            6 :         .expect_err("should have advanced");
     710            6 :     eviction_arrived.wait().await;
     711            6 :     drop(eviction_can_continue);
     712            6 :     other_evict.await.unwrap();
     713            6 : 
     714            6 :     // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
     715            6 :     assert!(!layer.is_likely_resident());
     716            6 : 
     717            6 :     // any following new evict_and_wait will fail until we've completed the download
     718            6 :     let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
     719            6 :     assert!(matches!(e, EvictionError::NotFound), "{e:?}");
     720            6 : 
     721            6 :     let (download_can_continue, barrier) = utils::completion::channel();
     722            6 :     let (arrival, _download_arrived) = utils::completion::channel();
     723            6 :     layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
     724            6 : 
     725            6 :     let mut download = std::pin::pin!(layer
     726            6 :         .0
     727            6 :         .get_or_maybe_download(true, None)
     728            6 :         .instrument(download_span));
     729            6 : 
     730            6 :     assert!(
     731            6 :         !layer.is_likely_resident(),
     732            6 :         "during download layer is evicted"
     733            6 :     );
     734            6 : 
     735            6 :     tokio::time::timeout(ADVANCE, &mut download)
     736           12 :         .await
     737            6 :         .expect_err("should have timed out because of the failpoint");
     738            6 : 
     739            6 :     // now we finally get to continue, and because the latest state is downloading, we deduce that
     740            6 :     // the original eviction succeeded
     741            6 :     evict_and_wait.await.unwrap();
     742            6 : 
     743            6 :     // however a new evict_and_wait will fail
     744            6 :     let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
     745            6 :     assert!(matches!(e, EvictionError::NotFound), "{e:?}");
     746            6 : 
     747            6 :     assert!(!layer.is_likely_resident());
     748            6 : 
     749            6 :     drop(download_can_continue);
     750            6 :     download.await.expect("download should have succeeded");
     751            6 :     assert!(layer.is_likely_resident());
     752            6 : 
     753            6 :     // only now can we evict
     754            6 :     layer.evict_and_wait(FOREVER).await.unwrap();
     755            6 : }
     756              : 
     757              : /// Asserts that there is no miscalculation when a Layer is dropped while it is being kept
     758              : /// resident, with the ResidentLayer being the last remaining value.
     759              : ///
     760              : /// Also checks that the same does not happen on a non-evicted layer (regression test).
     761              : #[tokio::test(start_paused = true)]
     762            6 : async fn eviction_cancellation_on_drop() {
     763            6 :     use crate::repository::Value;
     764            6 :     use bytes::Bytes;
     765            6 : 
     766            6 :     // this is the runtime on which Layer spawns the blocking tasks
     767            6 :     let handle = tokio::runtime::Handle::current();
     768            6 : 
     769            6 :     let h = TenantHarness::create("eviction_cancellation_on_drop")
     770            6 :         .await
     771            6 :         .unwrap();
     772            6 :     utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
     773           24 :     let (tenant, ctx) = h.load().await;
     774            6 : 
     775            6 :     let timeline = tenant
     776            6 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     777           12 :         .await
     778            6 :         .unwrap();
     779            6 : 
     780            6 :     {
     781            6 :         // create_test_timeline wrote us one layer, write another
     782            6 :         let mut writer = timeline.writer().await;
     783            6 :         writer
     784            6 :             .put(
     785            6 :                 crate::repository::Key::from_i128(5),
     786            6 :                 Lsn(0x20),
     787            6 :                 &Value::Image(Bytes::from_static(b"this does not matter either")),
     788            6 :                 &ctx,
     789            6 :             )
     790            6 :             .await
     791            6 :             .unwrap();
     792            6 : 
     793            6 :         writer.finish_write(Lsn(0x20));
     794            6 :     }
     795            6 : 
     796            6 :     timeline.freeze_and_flush().await.unwrap();
     797            6 : 
     798            6 :     // wait for the upload to complete so our Arc::strong_count assertion holds
     799            6 :     timeline.remote_client.wait_completion().await.unwrap();
     800            6 : 
     801            6 :     let (evicted_layer, not_evicted) = {
     802            6 :         let mut layers = {
     803            6 :             let mut guard = timeline.layers.write().await;
     804            6 :             let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
     805            6 :             // remove the layers from layermap
     806            6 :             guard.open_mut().unwrap().finish_gc_timeline(&layers);
     807            6 : 
     808            6 :             layers
     809            6 :         };
     810            6 : 
     811            6 :         assert_eq!(layers.len(), 2);
     812            6 : 
     813            6 :         (layers.pop().unwrap(), layers.pop().unwrap())
     814            6 :     };
     815            6 : 
     816            6 :     let victims = [(evicted_layer, true), (not_evicted, false)];
     817            6 : 
     818           18 :     for (victim, evict) in victims {
     819           12 :         let resident = victim.keep_resident().await.unwrap();
     820           12 :         drop(victim);
     821           12 : 
     822           12 :         assert_eq!(Arc::strong_count(&resident.owner.0), 1);
     823            6 : 
     824           12 :         if evict {
     825            6 :             let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
     826            6 : 
     827            6 :             // drive the future to await on the status channel, and then drop it
     828            6 :             tokio::time::timeout(ADVANCE, evict_and_wait)
     829            6 :                 .await
     830            6 :                 .expect_err("should have been a timeout since we are holding the layer resident");
     831            6 :         }
     832            6 : 
     833            6 :         // 1 == we only evict one of the layers
     834           12 :         assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     835            6 : 
     836           12 :         drop(resident);
     837           12 : 
     838           12 :         // run any spawned tasks
     839           16 :         tokio::time::sleep(ADVANCE).await;
     840            6 : 
     841           54 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
     842            6 : 
     843           12 :         assert_eq!(
     844           12 :             1,
     845           12 :             LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
     846           12 :         );
     847            6 :     }
     848            6 : }
     849              : 
     850              : /// A test case to remind you of the cost of these structures. You can bump the size limit
     851              : /// below if it is really necessary to add more fields to the structures.
     852              : #[test]
     853              : #[cfg(target_arch = "x86_64")]
     854            6 : fn layer_size() {
     855            6 :     assert_eq!(size_of::<LayerAccessStats>(), 8);
     856            6 :     assert_eq!(size_of::<PersistentLayerDesc>(), 104);
     857            6 :     assert_eq!(size_of::<LayerInner>(), 296);
     858              :     // LayerInner also holds the utf8 path
     859            6 : }
     860              : 
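                      : /// Test helper which clogs a runtime's spawn_blocking pool: it parks one task on every
                      : /// blocking thread until `release` is called.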
     861              : struct SpawnBlockingPoolHelper {
     862              :     awaited_by_spawn_blocking_tasks: Completion,
     863              :     blocking_tasks: JoinSet<()>,
     864              : }
     865              : 
     866              : impl SpawnBlockingPoolHelper {
     867              :     /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
     868              :     /// release is called.
     869              :     ///
     870              :     /// In the tests this can be used to ensure something cannot be started on the target
     871              :     /// runtime's spawn_blocking pool.
     872              :     ///
     873              :     /// This should be no issue nowadays, because nextest runs each test in its own process.
     874            6 :     async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
     875            6 :         let default_max_blocking_threads = 512;
     876            6 : 
     877            6 :         Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
     878            6 :     }
     879              : 
     880           78 :     async fn consume_all_spawn_blocking_threads0(
     881           78 :         handle: &tokio::runtime::Handle,
     882           78 :         threads: usize,
     883           78 :     ) -> Self {
     884           78 :         assert_ne!(threads, 0);
     885              : 
     886           78 :         let (completion, barrier) = completion::channel();
     887           78 :         let (started, starts_completed) = completion::channel();
     888           78 : 
     889           78 :         let mut blocking_tasks = JoinSet::new();
     890           78 : 
     891        21540 :         for _ in 0..threads {
     892        21540 :             let barrier = barrier.clone();
     893        21540 :             let started = started.clone();
     894        21540 :             blocking_tasks.spawn_blocking_on(
     895        21540 :                 move || {
     896        21540 :                     drop(started);
     897        21540 :                     tokio::runtime::Handle::current().block_on(barrier.wait());
     898        21540 :                 },
     899        21540 :                 handle,
     900        21540 :             );
     901        21540 :         }
     902              : 
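                      :         // each spawned task drops its `started` clone before blocking on `barrier`; once our
                      :         // own clone is dropped too, `starts_completed.wait()` returns only after every task
                      :         // is actually running on a blocking thread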
     903           78 :         drop(started);
     904           78 : 
     905           78 :         starts_completed.wait().await;
     906              : 
     907           78 :         drop(barrier);
     908           78 : 
     909           78 :         tracing::trace!("consumed all threads");
     910              : 
     911           78 :         SpawnBlockingPoolHelper {
     912           78 :             awaited_by_spawn_blocking_tasks: completion,
     913           78 :             blocking_tasks,
     914           78 :         }
     915           78 :     }
     916              : 
     917              :     /// Release all previously blocked spawn_blocking threads
     918           78 :     async fn release(self) {
     919           78 :         let SpawnBlockingPoolHelper {
     920           78 :             awaited_by_spawn_blocking_tasks,
     921           78 :             mut blocking_tasks,
     922           78 :         } = self;
     923           78 : 
     924           78 :         drop(awaited_by_spawn_blocking_tasks);
     925              : 
     926        21618 :         while let Some(res) = blocking_tasks.join_next().await {
     927        21540 :             res.expect("none of the tasks should have panicked");
     928        21540 :         }
     929              : 
     930           78 :         tracing::trace!("released all threads");
     931           78 :     }
     932              : 
     933              :     /// In the tests it is used as an easy way of making sure something scheduled on the target
     934              :     /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
     935              :     /// before our tasks have a chance to schedule and complete.
     936           36 :     async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
     937          318 :         Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
     938           36 :     }
     939              : 
     940           66 :     async fn consume_and_release_all_of_spawn_blocking_threads0(
     941           66 :         handle: &tokio::runtime::Handle,
     942           66 :         threads: usize,
     943           66 :     ) {
     944           66 :         Self::consume_all_spawn_blocking_threads0(handle, threads)
     945           51 :             .await
     946           66 :             .release()
     947          312 :             .await
     948           66 :     }
     949              : }
     950              : 
     951              : #[test]
     952            6 : fn spawn_blocking_pool_helper_actually_works() {
     953            6 :     // create a custom runtime whose number of blocking threads we know and control
     954            6 :     //
     955            6 :     // because the amount is not configurable for our helper, expect the same amount as
     956            6 :     // BACKGROUND_RUNTIME using the tokio defaults would have.
     957            6 :     let rt = tokio::runtime::Builder::new_current_thread()
     958            6 :         .max_blocking_threads(1)
     959            6 :         .enable_all()
     960            6 :         .build()
     961            6 :         .unwrap();
     962            6 : 
     963            6 :     let handle = rt.handle();
     964            6 : 
     965            6 :     rt.block_on(async move {
     966              :         // this will not return until all threads are spun up and actually executing the code
     967              :         // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
     968            6 :         let consumed =
     969            6 :             SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
     970              : 
     971            6 :         println!("consumed");
     972            6 : 
     973            6 :         let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
     974            6 :             // this will not get to run before we release
     975            6 :         }));
     976            6 : 
     977            6 :         println!("spawned");
     978            6 : 
     979            6 :         tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
     980            6 :             .await
     981            6 :             .expect_err("the task should not have gotten to run yet");
     982            6 : 
     983            6 :         println!("tried to join");
     984            6 : 
     985            6 :         consumed.release().await;
     986              : 
     987            6 :         println!("released");
     988            6 : 
     989            6 :         tokio::time::timeout(std::time::Duration::from_secs(1), jh)
     990            0 :             .await
     991            6 :             .expect("no timeout")
     992            6 :             .expect("no join error");
     993            6 : 
     994            6 :         println!("joined");
     995            6 :     });
     996            6 : }
     997              : 
     998              : /// Drop the low bits from a time, to emulate the precision loss in LayerAccessStats
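                      : /// (e.g. 12:34:56.789 becomes 12:34:56: only whole seconds are retained)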
     999           24 : fn lowres_time(hires: SystemTime) -> SystemTime {
    1000           24 :     let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
    1001           24 :     UNIX_EPOCH + Duration::from_secs(ts)
    1002           24 : }
    1003              : 
    1004              : #[test]
    1005            6 : fn access_stats() {
    1006            6 :     let access_stats = LayerAccessStats::default();
    1007            6 :     // Default is visible
    1008            6 :     assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
    1009              : 
    1010            6 :     access_stats.set_visibility(LayerVisibilityHint::Covered);
    1011            6 :     assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
    1012            6 :     access_stats.set_visibility(LayerVisibilityHint::Visible);
    1013            6 :     assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
    1014              : 
    1015            6 :     let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
    1016            6 :     access_stats.record_residence_event_at(rtime);
    1017            6 :     assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
    1018              : 
    1019            6 :     let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
    1020            6 :     access_stats.record_access_at(atime);
    1021            6 :     assert_eq!(access_stats.latest_activity(), lowres_time(atime));
    1022              : 
    1023              :     // Setting visibility doesn't clobber access time
    1024            6 :     access_stats.set_visibility(LayerVisibilityHint::Covered);
    1025            6 :     assert_eq!(access_stats.latest_activity(), lowres_time(atime));
    1026            6 :     access_stats.set_visibility(LayerVisibilityHint::Visible);
    1027            6 :     assert_eq!(access_stats.latest_activity(), lowres_time(atime));
    1028            6 : }
    1029              : 
    1030              : #[test]
    1031            6 : fn access_stats_2038() {
    1032            6 :     // The access stats structure uses a timestamp representation that will run out
    1033            6 :     // of bits in 2038.  One year before that, this unit test will start failing.
    1034            6 : 
    1035            6 :     let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
    1036            6 :         + Duration::from_secs(3600 * 24 * 365);
    1037            6 : 
    1038            6 :     assert!(one_year_from_now.as_secs() < (2 << 31));
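                      :     // (2 << 31 == 2^32 == 4_294_967_296, the number of seconds representable in 32 bits)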
    1039            6 : }
        

Generated by: LCOV version 2.1-beta