LCOV - code coverage report
Current view: top level - storage_controller/src/service - chaos_injector.rs (source / functions) Coverage Total Hit
Test: 49aa928ec5b4b510172d8b5c6d154da28e70a46c.info Lines: 0.0 % 45 0
Test Date: 2024-11-13 18:23:39 Functions: 0.0 % 5 0

            Line data    Source code
       1              : use std::{sync::Arc, time::Duration};
       2              : 
       3              : use pageserver_api::controller_api::ShardSchedulingPolicy;
       4              : use rand::seq::SliceRandom;
       5              : use rand::thread_rng;
       6              : use tokio_util::sync::CancellationToken;
       7              : 
       8              : use super::Service;
       9              : 
      10              : pub struct ChaosInjector {
      11              :     service: Arc<Service>,
      12              :     interval: Duration,
      13              : }
      14              : 
      15              : impl ChaosInjector {
      16            0 :     pub fn new(service: Arc<Service>, interval: Duration) -> Self {
      17            0 :         Self { service, interval }
      18            0 :     }
      19              : 
      20            0 :     pub async fn run(&mut self, cancel: CancellationToken) {
      21            0 :         let mut interval = tokio::time::interval(self.interval);
      22              : 
      23              :         loop {
      24            0 :             tokio::select! {
      25            0 :                 _ = interval.tick() => {}
      26            0 :                 _ = cancel.cancelled() => {
      27            0 :                     tracing::info!("Shutting down");
      28            0 :                     return;
      29              :                 }
      30              :             }
      31              : 
      32            0 :             self.inject_chaos().await;
      33              : 
      34            0 :             tracing::info!("Chaos iteration...");
      35              :         }
      36            0 :     }
      37              : 
      38            0 :     async fn inject_chaos(&mut self) {
      39            0 :         // Pick some shards to interfere with
      40            0 :         let batch_size = 128;
      41            0 :         let mut inner = self.service.inner.write().unwrap();
      42            0 :         let (nodes, tenants, scheduler) = inner.parts_mut();
      43            0 :         let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
      44            0 :         let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size);
      45              : 
      46            0 :         for victim in victims {
      47            0 :             let shard = tenants
      48            0 :                 .get_mut(victim)
      49            0 :                 .expect("Held lock between choosing ID and this get");
      50              : 
      51            0 :             if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
      52              :                 // Skip non-active scheduling policies, so that a shard with a policy like Pause can
      53              :                 // be pinned without being disrupted by us.
      54            0 :                 tracing::info!(
      55            0 :                     "Skipping shard {victim}: scheduling policy is {:?}",
      56            0 :                     shard.get_scheduling_policy()
      57              :                 );
      58            0 :                 continue;
      59            0 :             }
      60              : 
      61              :             // Pick a secondary to promote
      62            0 :             let Some(new_location) = shard
      63            0 :                 .intent
      64            0 :                 .get_secondary()
      65            0 :                 .choose(&mut thread_rng())
      66            0 :                 .cloned()
      67              :             else {
      68            0 :                 tracing::info!("Skipping shard {victim}: no secondary location, can't migrate");
      69            0 :                 continue;
      70              :             };
      71              : 
      72            0 :             let Some(old_location) = *shard.intent.get_attached() else {
      73            0 :                 tracing::info!("Skipping shard {victim}: currently has no attached location");
      74            0 :                 continue;
      75              :             };
      76              : 
      77            0 :             tracing::info!("Injecting chaos: migrate {victim} {old_location}->{new_location}");
      78              : 
      79            0 :             shard.intent.demote_attached(scheduler, old_location);
      80            0 :             shard.intent.promote_attached(scheduler, new_location);
      81            0 :             self.service.maybe_reconcile_shard(shard, nodes);
      82              :         }
      83            0 :     }
      84              : }
        

Generated by: LCOV version 2.1-beta