LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer/layer - tests.rs (source / functions) Coverage Total Hit
Test: 42f947419473a288706e86ecdf7c2863d760d5d7.info Lines: 99.4 % 853 848
Test Date: 2024-08-02 21:34:27 Functions: 100.0 % 44 44

            Line data    Source code
       1              : use std::time::UNIX_EPOCH;
       2              : 
       3              : use pageserver_api::key::CONTROLFILE_KEY;
       4              : use tokio::task::JoinSet;
       5              : use utils::{
       6              :     completion::{self, Completion},
       7              :     id::TimelineId,
       8              : };
       9              : 
      10              : use super::failpoints::{Failpoint, FailpointKind};
      11              : use super::*;
      12              : use crate::{context::DownloadBehavior, tenant::storage_layer::LayerVisibilityHint};
      13              : use crate::{task_mgr::TaskKind, tenant::harness::TenantHarness};
      14              : 
      15              : /// Used in tests to advance a future to the wanted await point, and no further.
      16              : const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
      17              : 
      18              : /// Used in tests to indicate a forever-long timeout; has to be longer than the total amount of
      19              : /// ADVANCE timeouts used to advance futures.
      20              : const FOREVER: std::time::Duration = std::time::Duration::from_secs(ADVANCE.as_secs() * 24 * 7);
      21              : 
      22              : /// Demonstrate the API and resident -> evicted -> resident -> deleted transitions.
      23              : #[tokio::test]
      24            2 : async fn smoke_test() {
      25            2 :     let handle = tokio::runtime::Handle::current();
      26            2 : 
      27            2 :     let h = TenantHarness::create("smoke_test").await.unwrap();
      28            2 :     let span = h.span();
      29            2 :     let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
      30            8 :     let (tenant, _) = h.load().await;
      31            2 : 
      32            2 :     let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Download);
      33            2 : 
      34            2 :     let timeline = tenant
      35            2 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
      36            4 :         .await
      37            2 :         .unwrap();
      38            2 : 
      39            2 :     let layer = {
      40            2 :         let mut layers = {
      41            2 :             let layers = timeline.layers.read().await;
      42            2 :             layers.likely_resident_layers().collect::<Vec<_>>()
      43            2 :         };
      44            2 : 
      45            2 :         assert_eq!(layers.len(), 1);
      46            2 : 
      47            2 :         layers.swap_remove(0)
      48            2 :     };
      49            2 : 
      50            2 :     // all layers created at pageserver are like `layer`, initialized with strong
      51            2 :     // Arc<DownloadedLayer>.
      52            2 : 
      53            2 :     let img_before = {
      54            2 :         let mut data = ValueReconstructState::default();
      55            2 :         layer
      56            2 :             .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
      57            4 :             .await
      58            2 :             .unwrap();
      59            2 :         data.img
      60            2 :             .take()
      61            2 :             .expect("tenant harness writes the control file")
      62            2 :     };
      63            2 : 
      64            2 :     // important part is evicting the layer, which can be done when there are no more ResidentLayer
      65            2 :     // instances -- there currently are none, only two `Layer` values, one in the layermap and one
      66            2 :     // in scope.
      67            2 :     layer.evict_and_wait(FOREVER).await.unwrap();
      68            2 : 
      69            2 :     // double-evict returns an error, which is valid if both eviction_task and disk usage based
      70            2 :     // eviction would both evict the same layer at the same time.
      71            2 : 
      72            2 :     let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
      73            2 :     assert!(matches!(e, EvictionError::NotFound));
      74            2 : 
      75            2 :     // on accesses when the layer is evicted, it will automatically be downloaded.
      76            2 :     let img_after = {
      77            2 :         let mut data = ValueReconstructState::default();
      78            2 :         layer
      79            2 :             .get_value_reconstruct_data(CONTROLFILE_KEY, Lsn(0x10)..Lsn(0x11), &mut data, &ctx)
      80            2 :             .instrument(download_span.clone())
      81            9 :             .await
      82            2 :             .unwrap();
      83            2 :         data.img.take().unwrap()
      84            2 :     };
      85            2 : 
      86            2 :     assert_eq!(img_before, img_after);
      87            2 : 
      88            2 :     // evict_and_wait can time out, but it doesn't cancel the eviction itself
      89            2 :     //
      90            2 :     // ZERO for timeout does not work reliably, so first take up all spawn_blocking slots to
      91            2 :     // artificially slow it down.
      92            2 :     let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(&handle).await;
      93            2 : 
      94            2 :     match layer
      95            2 :         .evict_and_wait(std::time::Duration::ZERO)
      96            2 :         .await
      97            2 :         .unwrap_err()
      98            2 :     {
      99            2 :         EvictionError::Timeout => {
     100            2 :             // expected, but note that the eviction is "still ongoing"
     101           51 :             helper.release().await;
     102            2 :             // exhaust spawn_blocking pool to ensure it is now complete
     103            2 :             SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle)
     104            8 :                 .await;
     105            2 :         }
     106            2 :         other => unreachable!("{other:?}"),
     107            2 :     }
     108            2 : 
     109            2 :     // only way to query if a layer is resident is to acquire a ResidentLayer instance.
     110            2 :     // Layer::keep_resident never downloads, but it might initialize if the layer file is found
     111            2 :     // downloaded locally.
     112            2 :     let none = layer.keep_resident().await;
     113            2 :     assert!(
     114            2 :         none.is_none(),
     115            2 :         "Expected none, because eviction removed the local file, found: {none:?}"
     116            2 :     );
     117            2 : 
     118            2 :     // plain downloading is rarely needed
     119            2 :     layer
     120            2 :         .download_and_keep_resident()
     121            2 :         .instrument(download_span)
     122            6 :         .await
     123            2 :         .unwrap();
     124            2 : 
     125            2 :     // last important part is deletion on drop: gc and compaction use it for compacted L0 layers
     126            2 :     // or fully garbage collected layers. deletion means deleting the local file, and scheduling a
     127            2 :     // deletion of the already unlinked from index_part.json remote file.
     128            2 :     //
     129            2 :     // marking a layer to be deleted on drop is irreversible; there is no technical reason against
     130            2 :     // reversibility, but currently it is not needed so it is not provided.
     131            2 :     layer.delete_on_drop();
     132            2 : 
     133            2 :     let path = layer.local_path().to_owned();
     134            2 : 
     135            2 :     // wait_drop produces an unconnected to Layer future which will resolve when the
     136            2 :     // LayerInner::drop has completed.
     137            2 :     let mut wait_drop = std::pin::pin!(layer.wait_drop());
     138            2 : 
     139            2 :     // paused time doesn't really work well with timeouts and evict_and_wait, so delay pausing
     140            2 :     // until here
     141            2 :     tokio::time::pause();
     142            2 :     tokio::time::timeout(ADVANCE, &mut wait_drop)
     143            2 :         .await
     144            2 :         .expect_err("should had timed out because two strong references exist");
     145            2 : 
     146            2 :     tokio::fs::metadata(&path)
     147            2 :         .await
     148            2 :         .expect("the local layer file still exists");
     149            2 : 
     150            2 :     let rtc = &timeline.remote_client;
     151            2 : 
     152            2 :     {
     153            2 :         let layers = &[layer];
     154            2 :         let mut g = timeline.layers.write().await;
     155            2 :         g.finish_gc_timeline(layers);
     156            2 :         // this just updates the remote_physical_size for demonstration purposes
     157            2 :         rtc.schedule_gc_update(layers).unwrap();
     158            2 :     }
     159            2 : 
     160            2 :     // when strong references are dropped, the file is deleted and remote deletion is scheduled
     161            2 :     wait_drop.await;
     162            2 : 
     163            2 :     let e = tokio::fs::metadata(&path)
     164            2 :         .await
     165            2 :         .expect_err("the local file is deleted");
     166            2 :     assert_eq!(e.kind(), std::io::ErrorKind::NotFound);
     167            2 : 
     168            2 :     rtc.wait_completion().await.unwrap();
     169            2 : 
     170            2 :     assert_eq!(rtc.get_remote_physical_size(), 0);
     171            2 :     assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     172            2 : }
     173              : 
     174              : /// This test demonstrates a previous hang when an eviction and deletion were requested at the same
     175              : /// time. Now both of them complete per Arc drop semantics.
     176              : #[tokio::test(start_paused = true)]
     177            2 : async fn evict_and_wait_on_wanted_deleted() {
     178            2 :     // this is the runtime on which Layer spawns the blocking tasks
     179            2 :     let handle = tokio::runtime::Handle::current();
     180            2 : 
     181            2 :     let h = TenantHarness::create("evict_and_wait_on_wanted_deleted")
     182            2 :         .await
     183            2 :         .unwrap();
     184            2 :     utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
     185            8 :     let (tenant, ctx) = h.load().await;
     186            2 : 
     187            2 :     let timeline = tenant
     188            2 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     189            4 :         .await
     190            2 :         .unwrap();
     191            2 : 
     192            2 :     let layer = {
     193            2 :         let mut layers = {
     194            2 :             let layers = timeline.layers.read().await;
     195            2 :             layers.likely_resident_layers().collect::<Vec<_>>()
     196            2 :         };
     197            2 : 
     198            2 :         assert_eq!(layers.len(), 1);
     199            2 : 
     200            2 :         layers.swap_remove(0)
     201            2 :     };
     202            2 : 
     203            2 :     // setup done
     204            2 : 
     205            2 :     let resident = layer.keep_resident().await.unwrap();
     206            2 : 
     207            2 :     {
     208            2 :         let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
     209            2 : 
     210            2 :         // drive the future to await on the status channel
     211            2 :         tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     212            2 :             .await
     213            2 :             .expect_err("should had been a timeout since we are holding the layer resident");
     214            2 : 
     215            2 :         layer.delete_on_drop();
     216            2 : 
     217            2 :         drop(resident);
     218            2 : 
     219            2 :         // make sure the eviction task gets to run
     220            9 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
     221            2 : 
     222            2 :         let resident = layer.keep_resident().await;
     223            2 :         assert!(
     224            2 :             resident.is_none(),
     225            2 :             "keep_resident should not have re-initialized: {resident:?}"
     226            2 :         );
     227            2 : 
     228            2 :         evict_and_wait
     229            2 :             .await
     230            2 :             .expect("evict_and_wait should had succeeded");
     231            2 : 
     232            2 :         // works as intended
     233            2 :     }
     234            2 : 
     235            2 :     // assert that once we remove the `layer` from the layer map and drop our reference,
     236            2 :     // the deletion of the layer in remote_storage happens.
     237            2 :     {
     238            2 :         let mut layers = timeline.layers.write().await;
     239            2 :         layers.finish_gc_timeline(&[layer]);
     240            2 :     }
     241            2 : 
     242            8 :     SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
     243            2 : 
     244            2 :     assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
     245            2 :     assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
     246            2 :     assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     247            2 :     assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
     248            2 :     assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     249            2 : }
     250              : 
     251              : /// This test ensures we are able to read the layer while the layer eviction has been
     252              : /// started but not completed.
     253              : #[test]
     254            2 : fn read_wins_pending_eviction() {
     255            2 :     let rt = tokio::runtime::Builder::new_current_thread()
     256            2 :         .max_blocking_threads(1)
     257            2 :         .enable_all()
     258            2 :         .start_paused(true)
     259            2 :         .build()
     260            2 :         .unwrap();
     261            2 : 
     262            2 :     rt.block_on(async move {
     263            2 :         // this is the runtime on which Layer spawns the blocking tasks
     264            2 :         let handle = tokio::runtime::Handle::current();
     265            2 :         let h = TenantHarness::create("read_wins_pending_eviction")
     266            0 :             .await
     267            2 :             .unwrap();
     268            8 :         let (tenant, ctx) = h.load().await;
     269            2 :         let span = h.span();
     270            2 :         let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
     271              : 
     272            2 :         let timeline = tenant
     273            2 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     274            4 :             .await
     275            2 :             .unwrap();
     276              : 
     277            2 :         let layer = {
     278            2 :             let mut layers = {
     279            2 :                 let layers = timeline.layers.read().await;
     280            2 :                 layers.likely_resident_layers().collect::<Vec<_>>()
     281            2 :             };
     282            2 : 
     283            2 :             assert_eq!(layers.len(), 1);
     284              : 
     285            2 :             layers.swap_remove(0)
     286              :         };
     287              : 
     288              :         // setup done
     289              : 
     290            2 :         let resident = layer.keep_resident().await.unwrap();
     291            2 : 
     292            2 :         let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
     293            2 : 
     294            2 :         // drive the future to await on the status channel
     295            2 :         tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     296            2 :             .await
     297            2 :             .expect_err("should had been a timeout since we are holding the layer resident");
     298            2 :         assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     299              : 
     300            2 :         let (completion, barrier) = utils::completion::channel();
     301            2 :         let (arrival, arrived_at_barrier) = utils::completion::channel();
     302            2 :         layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     303            2 :             Some(arrival),
     304            2 :             barrier,
     305            2 :         ));
     306            2 : 
     307            2 :         // now the eviction cannot proceed because the threads are consumed while completion exists
     308            2 :         drop(resident);
     309            2 :         arrived_at_barrier.wait().await;
     310            2 :         assert!(!layer.is_likely_resident());
     311              : 
     312              :         // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
     313            2 :         layer
     314            2 :             .0
     315            2 :             .get_or_maybe_download(false, None)
     316            2 :             .instrument(download_span)
     317            2 :             .await
     318            2 :             .expect("should had reinitialized without downloading");
     319            2 : 
     320            2 :         assert!(layer.is_likely_resident());
     321              : 
     322              :         // reinitialization notifies of new resident status, which should error out all evict_and_wait
     323            2 :         let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     324            0 :             .await
     325            2 :             .expect("no timeout, because get_or_maybe_download re-initialized")
     326            2 :             .expect_err("eviction should not have succeeded because re-initialized");
     327            2 : 
     328            2 :         // works as intended: evictions lose to "downloads"
     329            2 :         assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
     330            2 :         assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     331              : 
     332              :         // this is not wrong: the eviction is technically still "on the way" as it's still queued
     333              :         // because of a failpoint
     334            2 :         assert_eq!(
     335            2 :             0,
     336            2 :             LAYER_IMPL_METRICS
     337            2 :                 .cancelled_evictions
     338            2 :                 .values()
     339           18 :                 .map(|ctr| ctr.get())
     340            2 :                 .sum::<u64>()
     341            2 :         );
     342              : 
     343            2 :         drop(completion);
     344            2 : 
     345            4 :         tokio::time::sleep(ADVANCE).await;
     346            2 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
     347            2 :             .await;
     348              : 
     349            2 :         assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     350              : 
     351              :         // now we finally can observe the original eviction failing
     352              :         // it would have been possible to observe it earlier, but here it is guaranteed to have
     353              :         // happened.
     354            2 :         assert_eq!(
     355            2 :             1,
     356            2 :             LAYER_IMPL_METRICS
     357            2 :                 .cancelled_evictions
     358            2 :                 .values()
     359           18 :                 .map(|ctr| ctr.get())
     360            2 :                 .sum::<u64>()
     361            2 :         );
     362              : 
     363            2 :         assert_eq!(
     364            2 :             1,
     365            2 :             LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::AlreadyReinitialized].get()
     366            2 :         );
     367              : 
     368            2 :         assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     369            2 :     });
     370            2 : }
     371              : 
     372              : /// Use failpoint to delay an eviction starting to get a VersionCheckFailed.
     373              : #[test]
     374            2 : fn multiple_pending_evictions_in_order() {
     375            2 :     let name = "multiple_pending_evictions_in_order";
     376            2 :     let in_order = true;
     377            2 :     multiple_pending_evictions_scenario(name, in_order);
     378            2 : }
     379              : 
     380              : /// Use failpoint to reorder the later eviction before the first to get an UnexpectedEvictedState.
     381              : #[test]
     382            2 : fn multiple_pending_evictions_out_of_order() {
     383            2 :     let name = "multiple_pending_evictions_out_of_order";
     384            2 :     let in_order = false;
     385            2 :     multiple_pending_evictions_scenario(name, in_order);
     386            2 : }
     387              : 
     388            4 : fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
     389            4 :     let rt = tokio::runtime::Builder::new_current_thread()
     390            4 :         .max_blocking_threads(1)
     391            4 :         .enable_all()
     392            4 :         .start_paused(true)
     393            4 :         .build()
     394            4 :         .unwrap();
     395            4 : 
     396            4 :     rt.block_on(async move {
     397            4 :         // this is the runtime on which Layer spawns the blocking tasks
     398            4 :         let handle = tokio::runtime::Handle::current();
     399            4 :         let h = TenantHarness::create(name).await.unwrap();
     400           16 :         let (tenant, ctx) = h.load().await;
     401            4 :         let span = h.span();
     402            4 :         let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
     403              : 
     404            4 :         let timeline = tenant
     405            4 :             .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     406            8 :             .await
     407            4 :             .unwrap();
     408              : 
     409            4 :         let layer = {
     410            4 :             let mut layers = {
     411            4 :                 let layers = timeline.layers.read().await;
     412            4 :                 layers.likely_resident_layers().collect::<Vec<_>>()
     413            4 :             };
     414            4 : 
     415            4 :             assert_eq!(layers.len(), 1);
     416              : 
     417            4 :             layers.swap_remove(0)
     418              :         };
     419              : 
     420              :         // setup done
     421              : 
     422            4 :         let resident = layer.keep_resident().await.unwrap();
     423            4 : 
     424            4 :         let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
     425            4 : 
     426            4 :         // drive the future to await on the status channel
     427            4 :         tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     428            4 :             .await
     429            4 :             .expect_err("should had been a timeout since we are holding the layer resident");
     430            4 :         assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     431              : 
     432            4 :         let (completion1, barrier) = utils::completion::channel();
     433            4 :         let mut completion1 = Some(completion1);
     434            4 :         let (arrival, arrived_at_barrier) = utils::completion::channel();
     435            4 :         layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     436            4 :             Some(arrival),
     437            4 :             barrier,
     438            4 :         ));
     439            4 : 
     440            4 :         // now the eviction cannot proceed because we are simulating arbitrary long delay for the
     441            4 :         // eviction task start.
     442            4 :         drop(resident);
     443            4 :         assert!(!layer.is_likely_resident());
     444              : 
     445            4 :         arrived_at_barrier.wait().await;
     446              : 
     447              :         // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
     448            4 :         layer
     449            4 :             .0
     450            4 :             .get_or_maybe_download(false, None)
     451            4 :             .instrument(download_span)
     452            4 :             .await
     453            4 :             .expect("should had reinitialized without downloading");
     454            4 : 
     455            4 :         assert!(layer.is_likely_resident());
     456              : 
     457              :         // reinitialization notifies of new resident status, which should error out all evict_and_wait
     458            4 :         let e = tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     459            0 :             .await
     460            4 :             .expect("no timeout, because get_or_maybe_download re-initialized")
     461            4 :             .expect_err("eviction should not have succeeded because re-initialized");
     462            4 : 
     463            4 :         // works as intended: evictions lose to "downloads"
     464            4 :         assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
     465            4 :         assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     466              : 
     467              :         // this is not wrong: the eviction is technically still "on the way" as it's still queued
     468              :         // because of a failpoint
     469            4 :         assert_eq!(
     470            4 :             0,
     471            4 :             LAYER_IMPL_METRICS
     472            4 :                 .cancelled_evictions
     473            4 :                 .values()
     474           36 :                 .map(|ctr| ctr.get())
     475            4 :                 .sum::<u64>()
     476            4 :         );
     477              : 
     478            4 :         assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     479              : 
     480              :         // configure another failpoint for the second eviction -- evictions are per initialization,
     481              :         // so now that we've reinitialized the inner, we get to run two of them at the same time.
     482            4 :         let (completion2, barrier) = utils::completion::channel();
     483            4 :         let (arrival, arrived_at_barrier) = utils::completion::channel();
     484            4 :         layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     485            4 :             Some(arrival),
     486            4 :             barrier,
     487            4 :         ));
     488            4 : 
     489            4 :         let mut second_eviction = std::pin::pin!(layer.evict_and_wait(FOREVER));
     490            4 : 
     491            4 :         // advance to the wait on the queue
     492            4 :         tokio::time::timeout(ADVANCE, &mut second_eviction)
     493            8 :             .await
     494            4 :             .expect_err("timeout because failpoint is blocking");
     495            4 : 
     496            4 :         arrived_at_barrier.wait().await;
     497              : 
     498            4 :         assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
     499              : 
     500            4 :         let mut release_earlier_eviction = |expected_reason| {
     501            4 :             assert_eq!(
     502            4 :                 0,
     503            4 :                 LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
     504            4 :             );
     505              : 
     506            4 :             drop(completion1.take().unwrap());
     507            4 : 
     508            4 :             let handle = &handle;
     509              : 
     510            4 :             async move {
     511            4 :                 tokio::time::sleep(ADVANCE).await;
     512            4 :                 SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(
     513            4 :                     handle, 1,
     514            4 :                 )
     515            6 :                 .await;
     516              : 
     517            4 :                 assert_eq!(
     518            4 :                     1,
     519            4 :                     LAYER_IMPL_METRICS.cancelled_evictions[expected_reason].get(),
     520            4 :                 );
     521            4 :             }
     522            4 :         };
     523              : 
     524            4 :         if in_order {
     525            4 :             release_earlier_eviction(EvictionCancelled::VersionCheckFailed).await;
     526            2 :         }
     527              : 
     528              :         // release the later eviction which is for the current version
     529            4 :         drop(completion2);
     530            8 :         tokio::time::sleep(ADVANCE).await;
     531            4 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads0(&handle, 1)
     532            6 :             .await;
     533              : 
     534            4 :         if !in_order {
     535            6 :             release_earlier_eviction(EvictionCancelled::UnexpectedEvictedState).await;
     536            2 :         }
     537              : 
     538            4 :         tokio::time::timeout(ADVANCE, &mut second_eviction)
     539            0 :             .await
     540            4 :             .expect("eviction goes through now that spawn_blocking is unclogged")
     541            4 :             .expect("eviction should succeed, because version matches");
     542            4 : 
     543            4 :         assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
     544              : 
     545              :         // ensure the cancelled are unchanged
     546            4 :         assert_eq!(
     547            4 :             1,
     548            4 :             LAYER_IMPL_METRICS
     549            4 :                 .cancelled_evictions
     550            4 :                 .values()
     551           36 :                 .map(|ctr| ctr.get())
     552            4 :                 .sum::<u64>()
     553            4 :         );
     554              : 
     555            4 :         assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     556            4 :     });
     557            4 : }
     558              : 
     559              : /// The test ensures with a failpoint that a pending eviction is not cancelled by what is currently
     560              : /// a `Layer::keep_resident` call.
     561              : ///
     562              : /// This matters because cancelling the eviction would leave us in a state where the file is on
     563              : /// disk but the layer's internal state says it has not been initialized. Furthermore, it allows
     564              : /// us to have non-repairing `Layer::is_likely_resident`.
     565              : #[tokio::test(start_paused = true)]
     566            2 : async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
     567            2 :     let handle = tokio::runtime::Handle::current();
     568            2 :     let h = TenantHarness::create("cancelled_get_or_maybe_download_does_not_cancel_eviction")
     569            2 :         .await
     570            2 :         .unwrap();
     571            8 :     let (tenant, ctx) = h.load().await;
     572            2 : 
     573            2 :     let timeline = tenant
     574            2 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     575            4 :         .await
     576            2 :         .unwrap();
     577            2 :     // take the only resident layer of the new timeline; the assertion below checks there is one
     578            2 :     let layer = {
     579            2 :         let mut layers = {
     580            2 :             let layers = timeline.layers.read().await;
     581            2 :             layers.likely_resident_layers().collect::<Vec<_>>()
     582            2 :         };
     583            2 : 
     584            2 :         assert_eq!(layers.len(), 1);
     585            2 : 
     586            2 :         layers.swap_remove(0)
     587            2 :     };
     588            2 : 
     589            2 :     // this failpoint will simulate the `get_or_maybe_download` becoming cancelled (by returning an
     590            2 :     // Err) at the right time as in "during" the `LayerInner::needs_download`.
     591            2 :     layer.enable_failpoint(Failpoint::AfterDeterminingLayerNeedsNoDownload);
     592            2 :     // channels for the eviction failpoint: `completion` is dropped later to release the eviction
     593            2 :     let (completion, barrier) = utils::completion::channel();
     594            2 :     let (arrival, arrived_at_barrier) = utils::completion::channel();
     595            2 : 
     596            2 :     layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     597            2 :         Some(arrival),
     598            2 :         barrier,
     599            2 :     ));
     600            2 :     // start the eviction; the timeout is expected to fire while it waits on the channel
     601            2 :     tokio::time::timeout(ADVANCE, layer.evict_and_wait(FOREVER))
     602            2 :         .await
     603            2 :         .expect_err("should had advanced to waiting on channel");
     604            2 : 
     605            2 :     arrived_at_barrier.wait().await;
     606            2 : 
     607            2 :     // simulate a cancelled read which is cancelled before it gets to re-initialize
     608            2 :     let e = layer
     609            2 :         .0
     610            2 :         .get_or_maybe_download(false, None)
     611            2 :         .await
     612            2 :         .unwrap_err();
     613            2 :     assert!(
     614            2 :         matches!(
     615            2 :             e,
     616            2 :             DownloadError::Failpoint(FailpointKind::AfterDeterminingLayerNeedsNoDownload)
     617            2 :         ),
     618            2 :         "{e:?}"
     619            2 :     );
     620            2 : 
     621            2 :     assert!(
     622            2 :         layer.0.needs_download().await.unwrap().is_none(),
     623            2 :         "file is still on disk"
     624            2 :     );
     625            2 : 
     626            2 :     // release the eviction task
     627            2 :     drop(completion);
     628            2 :     tokio::time::sleep(ADVANCE).await;
     629           10 :     SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
     630            2 : 
     631            2 :     // failpoint is still enabled, but it is not hit
     632            2 :     let e = layer
     633            2 :         .0
     634            2 :         .get_or_maybe_download(false, None)
     635            4 :         .await
     636            2 :         .unwrap_err();
     637            2 :     assert!(matches!(e, DownloadError::DownloadRequired), "{e:?}");
     638            2 : 
     639            2 :     // failpoint is not counted as cancellation either
     640            2 :     assert_eq!(0, LAYER_IMPL_METRICS.inits_cancelled.get())
     641            2 : }
     642              : 
     643              : #[tokio::test(start_paused = true)]
     644            2 : async fn evict_and_wait_does_not_wait_for_download() {
     645            2 :     // scenario: a pending `evict_and_wait` must resolve while a later download is still blocked
     646            2 :     let h = TenantHarness::create("evict_and_wait_does_not_wait_for_download")
     647            2 :         .await
     648            2 :         .unwrap();
     649            8 :     let (tenant, ctx) = h.load().await;
     650            2 :     let span = h.span();
     651            2 :     let download_span = span.in_scope(|| tracing::info_span!("downloading", timeline_id = 1));
     652            2 : 
     653            2 :     let timeline = tenant
     654            2 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     655            4 :         .await
     656            2 :         .unwrap();
     657            2 : 
     658            2 :     let layer = {
     659            2 :         let mut layers = {
     660            2 :             let layers = timeline.layers.read().await;
     661            2 :             layers.likely_resident_layers().collect::<Vec<_>>()
     662            2 :         };
     663            2 : 
     664            2 :         assert_eq!(layers.len(), 1);
     665            2 : 
     666            2 :         layers.swap_remove(0)
     667            2 :     };
     668            2 : 
     669            2 :     // kind of forced setup: start an eviction but do not allow it to progress until we are
     670            2 :     // downloading
     671            2 :     let (eviction_can_continue, barrier) = utils::completion::channel();
     672            2 :     let (arrival, eviction_arrived) = utils::completion::channel();
     673            2 :     layer.enable_failpoint(Failpoint::WaitBeforeStartingEvicting(
     674            2 :         Some(arrival),
     675            2 :         barrier,
     676            2 :     ));
     677            2 : 
     678            2 :     let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait(FOREVER));
     679            2 : 
     680            2 :     // use this once-awaited other_evict to synchronize with the eviction
     681            2 :     let other_evict = layer.evict_and_wait(FOREVER);
     682            2 : 
     683            2 :     tokio::time::timeout(ADVANCE, &mut evict_and_wait)
     684            2 :         .await
     685            2 :         .expect_err("should had advanced");
     686            2 :     eviction_arrived.wait().await;
     687            2 :     drop(eviction_can_continue);
     688            2 :     other_evict.await.unwrap();
     689            2 : 
     690            2 :     // now the layer is evicted, and the "evict_and_wait" is waiting on the receiver
     691            2 :     assert!(!layer.is_likely_resident());
     692            2 : 
     693            2 :     // following new evict_and_wait will fail until we've completed the download
     694            2 :     let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
     695            2 :     assert!(matches!(e, EvictionError::NotFound), "{e:?}");
     696            2 :     // hold the upcoming download at a failpoint until `download_can_continue` is dropped
     697            2 :     let (download_can_continue, barrier) = utils::completion::channel();
     698            2 :     let (arrival, _download_arrived) = utils::completion::channel();
     699            2 :     layer.enable_failpoint(Failpoint::WaitBeforeDownloading(Some(arrival), barrier));
     700            2 : 
     701            2 :     let mut download = std::pin::pin!(layer
     702            2 :         .0
     703            2 :         .get_or_maybe_download(true, None)
     704            2 :         .instrument(download_span));
     705            2 : 
     706            2 :     assert!(
     707            2 :         !layer.is_likely_resident(),
     708            2 :         "during download layer is evicted"
     709            2 :     );
     710            2 : 
     711            2 :     tokio::time::timeout(ADVANCE, &mut download)
     712            5 :         .await
     713            2 :         .expect_err("should had timed out because of failpoint");
     714            2 : 
     715            2 :     // now we finally get to continue, and because the latest state is downloading, we deduce that
     716            2 :     // original eviction succeeded
     717            2 :     evict_and_wait.await.unwrap();
     718            2 : 
     719            2 :     // however a new evict_and_wait will fail
     720            2 :     let e = layer.evict_and_wait(FOREVER).await.unwrap_err();
     721            2 :     assert!(matches!(e, EvictionError::NotFound), "{e:?}");
     722            2 : 
     723            2 :     assert!(!layer.is_likely_resident());
     724            2 : 
     725            2 :     drop(download_can_continue);
     726            2 :     download.await.expect("download should had succeeded");
     727            2 :     assert!(layer.is_likely_resident());
     728            2 : 
     729            2 :     // only now can we evict
     730            2 :     layer.evict_and_wait(FOREVER).await.unwrap();
     731            2 : }
     732              : 
     733              : /// Asserts that there is no miscalculation when Layer is dropped while it is being kept resident,
     734              : /// i.e. while the resident guard holds the last strong reference to it.
     735              : ///
     736              : /// Also checks that the same does not happen on a non-evicted layer (regression test).
     737              : #[tokio::test(start_paused = true)]
     738            2 : async fn eviction_cancellation_on_drop() {
     739            2 :     use crate::repository::Value;
     740            2 :     use bytes::Bytes;
     741            2 : 
     742            2 :     // this is the runtime on which Layer spawns the blocking tasks on
     743            2 :     let handle = tokio::runtime::Handle::current();
     744            2 : 
     745            2 :     let h = TenantHarness::create("eviction_cancellation_on_drop")
     746            2 :         .await
     747            2 :         .unwrap();
     748            2 :     utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
     749            8 :     let (tenant, ctx) = h.load().await;
     750            2 : 
     751            2 :     let timeline = tenant
     752            2 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     753            4 :         .await
     754            2 :         .unwrap();
     755            2 : 
     756            2 :     {
     757            2 :         // create_test_timeline wrote us one layer, write another
     758            2 :         let mut writer = timeline.writer().await;
     759            2 :         writer
     760            2 :             .put(
     761            2 :                 Key::from_i128(5),
     762            2 :                 Lsn(0x20),
     763            2 :                 &Value::Image(Bytes::from_static(b"this does not matter either")),
     764            2 :                 &ctx,
     765            2 :             )
     766            2 :             .await
     767            2 :             .unwrap();
     768            2 : 
     769            2 :         writer.finish_write(Lsn(0x20));
     770            2 :     }
     771            2 : 
     772            2 :     timeline.freeze_and_flush().await.unwrap();
     773            2 : 
     774            2 :     // wait for the upload to complete so our Arc::strong_count assertion holds
     775            2 :     timeline.remote_client.wait_completion().await.unwrap();
     776            2 : 
     777            2 :     let (evicted_layer, not_evicted) = {
     778            2 :         let mut layers = {
     779            2 :             let mut guard = timeline.layers.write().await;
     780            2 :             let layers = guard.likely_resident_layers().collect::<Vec<_>>();
     781            2 :             // remove the layers from layermap
     782            2 :             guard.finish_gc_timeline(&layers);
     783            2 : 
     784            2 :             layers
     785            2 :         };
     786            2 : 
     787            2 :         assert_eq!(layers.len(), 2);
     788            2 : 
     789            2 :         (layers.pop().unwrap(), layers.pop().unwrap())
     790            2 :     };
     791            2 :     // the bool selects whether an eviction is attempted on the layer before it is dropped
     792            2 :     let victims = [(evicted_layer, true), (not_evicted, false)];
     793            2 : 
     794            6 :     for (victim, evict) in victims {
     795            4 :         let resident = victim.keep_resident().await.unwrap();
     796            4 :         drop(victim);
     797            4 : 
     798            4 :         assert_eq!(Arc::strong_count(&resident.owner.0), 1);
     799            2 : 
     800            4 :         if evict {
     801            2 :             let evict_and_wait = resident.owner.evict_and_wait(FOREVER);
     802            2 : 
     803            2 :             // drive the future to await on the status channel, and then drop it
     804            2 :             tokio::time::timeout(ADVANCE, evict_and_wait)
     805            2 :                 .await
     806            2 :                 .expect_err("should had been a timeout since we are holding the layer resident");
     807            2 :         }
     808            2 : 
     809            2 :         // 1 == we only evict one of the layers
     810            4 :         assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     811            2 : 
     812            4 :         drop(resident);
     813            4 : 
     814            4 :         // let any spawned tasks run
     815            5 :         tokio::time::sleep(ADVANCE).await;
     816            2 : 
     817           17 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(&handle).await;
     818            2 : 
     819            4 :         assert_eq!(
     820            4 :             1,
     821            4 :             LAYER_IMPL_METRICS.cancelled_evictions[EvictionCancelled::LayerGone].get()
     822            4 :         );
     823            2 :     }
     824            2 : }
     825              : 
     826              : /// A test case to remind you of the cost of these structures. You can bump the size limits
     827              : /// below if it is really necessary to add more fields to the structures.
     828              : #[test]
     829              : #[cfg(target_arch = "x86_64")]
     830            2 : fn layer_size() {
     831            2 :     assert_eq!(size_of::<LayerAccessStats>(), 8);
     832            2 :     assert_eq!(size_of::<PersistentLayerDesc>(), 104);
     833            2 :     assert_eq!(size_of::<LayerInner>(), 312);
     834              :     // it also has the utf8 path
     835            2 : }
     836              : 
     837              : struct SpawnBlockingPoolHelper {
     838              :     awaited_by_spawn_blocking_tasks: Completion, // dropped in `release` to let the tasks finish
     839              :     blocking_tasks: JoinSet<()>, // the spawned blocking tasks, joined in `release`
     840              : }
     841              : 
     842              : impl SpawnBlockingPoolHelper {
     843              :     /// All spawn_blocking threads of the `handle` runtime (assumed to be 512) will be consumed
     844              :     /// until release is called.
     845              :     ///
     846              :     /// In the tests this can be used to ensure something cannot be started on the target runtime's
     847              :     /// spawn_blocking pool.
     848              :     ///
     849              :     /// This should be no issue nowadays, because nextest runs each test in its own process.
     850            2 :     async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
     851            2 :         let default_max_blocking_threads = 512; // tokio's default `max_blocking_threads`
     852            2 : 
     853            2 :         Self::consume_all_spawn_blocking_threads0(handle, default_max_blocking_threads).await
     854            2 :     }
     855              :     /// Spawns `threads` (must be non-zero) blocking tasks on `handle` that wait until `release`.
     856           26 :     async fn consume_all_spawn_blocking_threads0(
     857           26 :         handle: &tokio::runtime::Handle,
     858           26 :         threads: usize,
     859           26 :     ) -> Self {
     860           26 :         assert_ne!(threads, 0);
     861              :         // each task drops its `started` clone and then blocks on its `barrier` clone
     862           26 :         let (completion, barrier) = completion::channel();
     863           26 :         let (started, starts_completed) = completion::channel();
     864           26 : 
     865           26 :         let mut blocking_tasks = JoinSet::new();
     866           26 : 
     867         7180 :         for _ in 0..threads {
     868         7180 :             let barrier = barrier.clone();
     869         7180 :             let started = started.clone();
     870         7180 :             blocking_tasks.spawn_blocking_on(
     871         7180 :                 move || {
     872         7180 :                     drop(started);
     873         7180 :                     tokio::runtime::Handle::current().block_on(barrier.wait());
     874         7180 :                 },
     875         7180 :                 handle,
     876         7180 :             );
     877         7180 :         }
     878              :         // drop our own `started` so that only the clones moved into the tasks remain
     879           26 :         drop(started);
     880           26 : 
     881           26 :         starts_completed.wait().await;
     882              : 
     883           26 :         drop(barrier);
     884           26 : 
     885           26 :         tracing::trace!("consumed all threads");
     886              : 
     887           26 :         SpawnBlockingPoolHelper {
     888           26 :             awaited_by_spawn_blocking_tasks: completion,
     889           26 :             blocking_tasks,
     890           26 :         }
     891           26 :     }
     892              : 
     893              :     /// Release all previously blocked spawn_blocking threads
     894           26 :     async fn release(self) {
     895           26 :         let SpawnBlockingPoolHelper {
     896           26 :             awaited_by_spawn_blocking_tasks,
     897           26 :             mut blocking_tasks,
     898           26 :         } = self;
     899           26 : 
     900           26 :         drop(awaited_by_spawn_blocking_tasks);
     901              : 
     902         7206 :         while let Some(res) = blocking_tasks.join_next().await {
     903         7180 :             res.expect("none of the tasks should had panicked");
     904         7180 :         }
     905              : 
     906           26 :         tracing::trace!("released all threads");
     907           26 :     }
     908              : 
     909              :     /// In the tests it is used as an easy way of making sure something scheduled on the target
     910              :     /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
     911              :     /// before our tasks have a chance to schedule and complete.
     912           12 :     async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
     913           52 :         Self::consume_and_release_all_of_spawn_blocking_threads0(handle, 512).await
     914           12 :     }
     915              :     /// Consumes `threads` blocking tasks on `handle` and releases them right away.
     916           22 :     async fn consume_and_release_all_of_spawn_blocking_threads0(
     917           22 :         handle: &tokio::runtime::Handle,
     918           22 :         threads: usize,
     919           22 :     ) {
     920           22 :         Self::consume_all_spawn_blocking_threads0(handle, threads)
     921           16 :             .await
     922           22 :             .release()
     923           50 :             .await
     924           22 :     }
     925              : }
     926              : 
     927              : #[test]
     928            2 : fn spawn_blocking_pool_helper_actually_works() {
     929            2 :     // create a custom runtime for which we know and control how many blocking threads it has
     930            2 :     //
     931            2 :     // the runtime is limited to one blocking thread, which matches the thread count of 1 passed
     932            2 :     // to the helper below.
     933            2 :     let rt = tokio::runtime::Builder::new_current_thread()
     934            2 :         .max_blocking_threads(1)
     935            2 :         .enable_all()
     936            2 :         .build()
     937            2 :         .unwrap();
     938            2 : 
     939            2 :     let handle = rt.handle();
     940            2 : 
     941            2 :     rt.block_on(async move {
     942              :         // this will not return until all threads are spun up and actually executing the code
     943              :         // waiting on `consumed` to be `SpawnBlockingPoolHelper::release`'d.
     944            2 :         let consumed =
     945            2 :             SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads0(handle, 1).await;
     946              : 
     947            2 :         println!("consumed");
     948            2 : 
     949            2 :         let mut jh = std::pin::pin!(tokio::task::spawn_blocking(move || {
     950            2 :             // this will not get to run before we release
     951            2 :         }));
     952            2 : 
     953            2 :         println!("spawned");
     954            2 : 
     955            2 :         tokio::time::timeout(std::time::Duration::from_secs(1), &mut jh)
     956            2 :             .await
     957            2 :             .expect_err("the task should not have gotten to run yet");
     958            2 : 
     959            2 :         println!("tried to join");
     960            2 : 
     961            2 :         consumed.release().await;
     962              : 
     963            2 :         println!("released");
     964            2 : 
     965            2 :         tokio::time::timeout(std::time::Duration::from_secs(1), jh)
     966            0 :             .await
     967            2 :             .expect("no timeout")
     968            2 :             .expect("no join error");
     969            2 : 
     970            2 :         println!("joined");
     971            2 :     });
     972            2 : }
     973              : 
     974              : /// Truncate a time to whole seconds, to emulate the precision loss in LayerAccessStats
     975            8 : fn lowres_time(hires: SystemTime) -> SystemTime {
     976            8 :     let ts = hires.duration_since(UNIX_EPOCH).unwrap().as_secs();
     977            8 :     UNIX_EPOCH + Duration::from_secs(ts)
     978            8 : }
     979              : 
     980              : #[test]
     981            2 : fn access_stats() {
     982            2 :     let access_stats = LayerAccessStats::default();
     983            2 :     // Default is visible
     984            2 :     assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
     985              :     // Visibility can be switched back and forth
     986            2 :     access_stats.set_visibility(LayerVisibilityHint::Covered);
     987            2 :     assert_eq!(access_stats.visibility(), LayerVisibilityHint::Covered);
     988            2 :     access_stats.set_visibility(LayerVisibilityHint::Visible);
     989            2 :     assert_eq!(access_stats.visibility(), LayerVisibilityHint::Visible);
     990              :     // A residence event is reported as the latest activity
     991            2 :     let rtime = UNIX_EPOCH + Duration::from_secs(2000000000);
     992            2 :     access_stats.record_residence_event_at(rtime);
     993            2 :     assert_eq!(access_stats.latest_activity(), lowres_time(rtime));
     994              :     // After recording an access, the latest activity is the access time
     995            2 :     let atime = UNIX_EPOCH + Duration::from_secs(2100000000);
     996            2 :     access_stats.record_access_at(atime);
     997            2 :     assert_eq!(access_stats.latest_activity(), lowres_time(atime));
     998              : 
     999              :     // Setting visibility doesn't clobber access time
    1000            2 :     access_stats.set_visibility(LayerVisibilityHint::Covered);
    1001            2 :     assert_eq!(access_stats.latest_activity(), lowres_time(atime));
    1002            2 :     access_stats.set_visibility(LayerVisibilityHint::Visible);
    1003            2 :     assert_eq!(access_stats.latest_activity(), lowres_time(atime));
    1004            2 : }
    1005              : 
    1006              : #[test]
    1007            2 : fn access_stats_2038() {
    1008            2 :     // The access stats structure uses a timestamp representation that will run out
    1009            2 :     // of bits in 2038.  One year before that, this unit test will start failing.
    1010            2 :     // 2038 corresponds to 2^31 seconds since the epoch, hence the `1 << 31` bound below.
    1011            2 :     let one_year_from_now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
    1012            2 :         + Duration::from_secs(3600 * 24 * 365);
    1013            2 : 
    1014            2 :     assert!(one_year_from_now.as_secs() < (1 << 31));
    1015            2 : }
        

Generated by: LCOV version 2.1-beta