LCOV - code coverage report
Current view: top level - pageserver/src/tenant/storage_layer/layer - tests.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 100.0 % 220 220
Test Date: 2024-02-29 11:57:12 Functions: 100.0 % 13 13

            Line data    Source code
       1              : use futures::StreamExt;
       2              : use tokio::task::JoinSet;
       3              : use utils::{
       4              :     completion::{self, Completion},
       5              :     id::TimelineId,
       6              : };
       7              : 
       8              : use super::*;
       9              : use crate::task_mgr::BACKGROUND_RUNTIME;
      10              : use crate::tenant::harness::TenantHarness;
      11              : 
      12              : /// This test demonstrates a previous hang when an eviction and deletion were requested at the same
      13              : /// time. Now both of them complete per Arc drop semantics.
      14            2 : #[tokio::test(start_paused = true)]
      15            2 : async fn evict_and_wait_on_wanted_deleted() {
      16            2 :     // this is the runtime on which Layer spawns the blocking tasks
      17            2 :     let handle = BACKGROUND_RUNTIME.handle();
      18            2 : 
      19            2 :     let h = TenantHarness::create("evict_and_wait_on_wanted_deleted").unwrap();
      20            2 :     utils::logging::replace_panic_hook_with_tracing_panic_hook().forget();
      21            2 :     let (tenant, ctx) = h.load().await;
      22            2 : 
      23            2 :     let timeline = tenant
      24            2 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
      25            6 :         .await
      26            2 :         .unwrap();
      27            2 : 
      28            2 :     let layer = {
      29            2 :         let mut layers = {
      30            2 :             let layers = timeline.layers.read().await;
      31            2 :             layers.resident_layers().collect::<Vec<_>>().await
      32            2 :         };
      33            2 : 
      34            2 :         assert_eq!(layers.len(), 1);
      35            2 : 
      36            2 :         layers.swap_remove(0)
      37            2 :     };
      38            2 : 
      39            2 :     // setup done
      40            2 : 
      41            2 :     let resident = layer.keep_resident().await.unwrap();
      42            2 : 
      43            2 :     {
      44            2 :         let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait());
      45            2 : 
      46            2 :         // drive the future to await on the status channel
      47            2 :         tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
      48            2 :             .await
      49            2 :             .expect_err("should had been a timeout since we are holding the layer resident");
      50            2 : 
      51            2 :         layer.delete_on_drop();
      52            2 : 
      53            2 :         drop(resident);
      54            2 : 
      55            2 :         // make sure the eviction task gets to run
      56           20 :         SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
      57            2 : 
      58            2 :         let resident = layer.keep_resident().await;
      59            2 :         assert!(
      60            2 :             matches!(resident, Ok(None)),
      61            2 :             "keep_resident should not have re-initialized: {resident:?}"
      62            2 :         );
      63            2 : 
      64            2 :         evict_and_wait
      65            2 :             .await
      66            2 :             .expect("evict_and_wait should had succeeded");
      67            2 : 
      68            2 :         // works as intended
      69            2 :     }
      70            2 : 
      71            2 :     // assert that once we remove the `layer` from the layer map and drop our reference,
      72            2 :     // the deletion of the layer in remote_storage happens.
      73            2 :     {
      74            2 :         let mut layers = timeline.layers.write().await;
      75            2 :         layers.finish_gc_timeline(&[layer]);
      76            2 :     }
      77            2 : 
      78           35 :     SpawnBlockingPoolHelper::consume_and_release_all_of_spawn_blocking_threads(handle).await;
      79            2 : 
      80            2 :     assert_eq!(1, LAYER_IMPL_METRICS.started_deletes.get());
      81            2 :     assert_eq!(1, LAYER_IMPL_METRICS.completed_deletes.get());
      82            2 :     assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
      83            2 :     assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
      84            2 : }
      85              : 
      86              : /// This test ensures that we are able to read the layer while the layer eviction has been
      87              : /// started but not completed due to spawn_blocking pool being blocked.
      88              : ///
      89              : /// Here `Layer::keep_resident` is used to "simulate" reads, because it cannot download.
      90            2 : #[tokio::test(start_paused = true)]
      91            2 : async fn residency_check_while_evict_and_wait_on_clogged_spawn_blocking() {
      92            2 :     // this is the runtime on which Layer spawns the blocking tasks
      93            2 :     let handle = BACKGROUND_RUNTIME.handle();
      94            2 :     let h = TenantHarness::create("residency_check_while_evict_and_wait_on_clogged_spawn_blocking")
      95            2 :         .unwrap();
      96            2 :     let (tenant, ctx) = h.load().await;
      97            2 : 
      98            2 :     let timeline = tenant
      99            2 :         .create_test_timeline(TimelineId::generate(), Lsn(0x10), 14, &ctx)
     100            6 :         .await
     101            2 :         .unwrap();
     102            2 : 
     103            2 :     let layer = {
     104            2 :         let mut layers = {
     105            2 :             let layers = timeline.layers.read().await;
     106            2 :             layers.resident_layers().collect::<Vec<_>>().await
     107            2 :         };
     108            2 : 
     109            2 :         assert_eq!(layers.len(), 1);
     110            2 : 
     111            2 :         layers.swap_remove(0)
     112            2 :     };
     113            2 : 
     114            2 :     // setup done
     115            2 : 
     116            2 :     let resident = layer.keep_resident().await.unwrap();
     117            2 : 
     118            2 :     let mut evict_and_wait = std::pin::pin!(layer.evict_and_wait());
     119            2 : 
     120            2 :     // drive the future to await on the status channel
     121            2 :     tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
     122            2 :         .await
     123            2 :         .expect_err("should had been a timeout since we are holding the layer resident");
     124            2 :     assert_eq!(1, LAYER_IMPL_METRICS.started_evictions.get());
     125            2 : 
     126            2 :     // clog up BACKGROUND_RUNTIME spawn_blocking
     127           48 :     let helper = SpawnBlockingPoolHelper::consume_all_spawn_blocking_threads(handle).await;
     128            2 : 
     129            2 :     // now the eviction cannot proceed because the threads are consumed while completion exists
     130            2 :     drop(resident);
     131            2 : 
     132            2 :     // because no actual eviction happened, we get to just reinitialize the DownloadedLayer
     133            2 :     layer
     134            2 :         .keep_resident()
     135            4 :         .await
     136            2 :         .expect("keep_resident should had reinitialized without downloading")
     137            2 :         .expect("ResidentLayer");
     138            2 : 
     139            2 :     // because the keep_resident check alters wanted evicted without sending a message, we will
     140            2 :     // never get completed
     141            2 :     let e = tokio::time::timeout(std::time::Duration::from_secs(3600), &mut evict_and_wait)
     142            2 :         .await
     143            2 :         .expect("no timeout, because keep_resident re-initialized")
     144            2 :         .expect_err("eviction should not have succeeded because re-initialized");
     145            2 : 
     146            2 :     // works as intended: evictions lose to "downloads"
     147            2 :     assert!(matches!(e, EvictionError::Downloaded), "{e:?}");
     148            2 :     assert_eq!(0, LAYER_IMPL_METRICS.completed_evictions.get());
     149            2 : 
     150            2 :     // this is not wrong: the eviction is technically still "on the way" as it's still queued
     151            2 :     // because spawn_blocking is clogged up
     152            2 :     assert_eq!(
     153            2 :         0,
     154            2 :         LAYER_IMPL_METRICS
     155            2 :             .cancelled_evictions
     156            2 :             .values()
     157           16 :             .map(|ctr| ctr.get())
     158            2 :             .sum::<u64>()
     159            2 :     );
     160            2 : 
     161            2 :     let mut second_eviction = std::pin::pin!(layer.evict_and_wait());
     162            2 : 
     163            2 :     tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction)
     164            2 :         .await
     165            2 :         .expect_err("timeout because spawn_blocking is clogged");
     166            2 : 
     167            2 :     // in this case we don't leak started evictions, but I think there is still a chance of that
     168            2 :     // happening, because we could have upgrades race multiple evictions while only one of them
     169            2 :     // happens?
     170            2 :     assert_eq!(2, LAYER_IMPL_METRICS.started_evictions.get());
     171            2 : 
     172            8 :     helper.release().await;
     173            2 : 
     174            2 :     tokio::time::timeout(std::time::Duration::from_secs(3600), &mut second_eviction)
     175            2 :         .await
     176            2 :         .expect("eviction goes through now that spawn_blocking is unclogged")
     177            2 :         .expect("eviction should succeed, because version matches");
     178            2 : 
     179            2 :     assert_eq!(1, LAYER_IMPL_METRICS.completed_evictions.get());
     180            2 : 
     181            2 :     // now we finally can observe the original spawn_blocking failing
     182            2 :     // it would have been possible to observe it earlier, but here it is guaranteed to have
     183            2 :     // happened.
     184            2 :     assert_eq!(
     185            2 :         1,
     186            2 :         LAYER_IMPL_METRICS
     187            2 :             .cancelled_evictions
     188            2 :             .values()
     189           16 :             .map(|ctr| ctr.get())
     190            2 :             .sum::<u64>()
     191            2 :     );
     192            2 : }
     193              : 
     194              : struct SpawnBlockingPoolHelper {
     195              :     awaited_by_spawn_blocking_tasks: Completion,
     196              :     blocking_tasks: JoinSet<()>,
     197              : }
     198              : 
     199              : impl SpawnBlockingPoolHelper {
     200              :     /// All `crate::task_mgr::BACKGROUND_RUNTIME` spawn_blocking threads will be consumed until
     201              :     /// release is called.
     202              :     ///
     203              :     /// In the tests this can be used to ensure something cannot be started on the target runtime's
     204              :     /// spawn_blocking pool.
     205              :     ///
     206              :     /// This should be no issue nowadays, because nextest runs each test in its own process.
     207            6 :     async fn consume_all_spawn_blocking_threads(handle: &tokio::runtime::Handle) -> Self {
     208            6 :         let (completion, barrier) = completion::channel();
     209            6 :         let (tx, mut rx) = tokio::sync::mpsc::channel(8);
     210            6 : 
     211            6 :         let assumed_max_blocking_threads = 512;
     212            6 : 
     213            6 :         let mut blocking_tasks = JoinSet::new();
     214            6 : 
     215         3072 :         for _ in 0..assumed_max_blocking_threads {
     216         3072 :             let barrier = barrier.clone();
     217         3072 :             let tx = tx.clone();
     218         3072 :             blocking_tasks.spawn_blocking_on(
     219         3072 :                 move || {
     220         3072 :                     tx.blocking_send(()).unwrap();
     221         3072 :                     drop(tx);
     222         3072 :                     tokio::runtime::Handle::current().block_on(barrier.wait());
     223         3072 :                 },
     224         3072 :                 handle,
     225         3072 :             );
     226         3072 :         }
     227              : 
     228            6 :         drop(barrier);
     229            6 : 
     230            6 :         for _ in 0..assumed_max_blocking_threads {
     231         3072 :             rx.recv().await.unwrap();
     232              :         }
     233              : 
     234            6 :         SpawnBlockingPoolHelper {
     235            6 :             awaited_by_spawn_blocking_tasks: completion,
     236            6 :             blocking_tasks,
     237            6 :         }
     238            6 :     }
     239              : 
     240              :     /// Release all previously blocked spawn_blocking threads
     241            6 :     async fn release(self) {
     242            6 :         let SpawnBlockingPoolHelper {
     243            6 :             awaited_by_spawn_blocking_tasks,
     244            6 :             mut blocking_tasks,
     245            6 :         } = self;
     246            6 : 
     247            6 :         drop(awaited_by_spawn_blocking_tasks);
     248              : 
     249         3078 :         while let Some(res) = blocking_tasks.join_next().await {
     250         3072 :             res.expect("none of the tasks should had panicked");
     251         3072 :         }
     252            6 :     }
     253              : 
     254              :     /// In the tests it is used as an easy way of making sure something scheduled on the target
     255              :     /// runtime's `spawn_blocking` has completed, because it must've been scheduled and completed
     256              :     /// before our tasks have a chance to schedule and complete.
     257            4 :     async fn consume_and_release_all_of_spawn_blocking_threads(handle: &tokio::runtime::Handle) {
     258            4 :         Self::consume_all_spawn_blocking_threads(handle)
     259           39 :             .await
     260            4 :             .release()
     261           16 :             .await
     262            4 :     }
     263              : }
        

Generated by: LCOV version 2.1-beta