LCOV - code coverage report
Current view: top level - storage_controller/src/service - chaos_injector.rs (source / functions)
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info
Test Date: 2025-03-12 00:01:28
Coverage:   Lines: 0.0 % (0 of 127)   Functions: 0.0 % (0 of 16)

            Line data    Source code
       1              : use std::collections::{BTreeMap, HashMap};
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use pageserver_api::controller_api::ShardSchedulingPolicy;
       6              : use rand::seq::SliceRandom;
       7              : use rand::thread_rng;
       8              : use tokio_util::sync::CancellationToken;
       9              : use utils::id::NodeId;
      10              : use utils::shard::TenantShardId;
      11              : 
      12              : use super::{Node, Scheduler, Service, TenantShard};
      13              : 
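/// Periodically injects faults into the storage controller for testing purposes:
/// on every `interval` tick it shuffles some tenant shards between their attached
/// and secondary locations, and, if `chaos_exit_crontab` is set, force-exits the
/// process at the scheduled times. (Editorial doc comment, derived from `run` below.)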
      14              : pub struct ChaosInjector {
      15              :     service: Arc<Service>,
      16              :     interval: Duration,
      17              :     chaos_exit_crontab: Option<cron::Schedule>,
      18              : }
      19              : 
      20            0 : fn cron_to_next_duration(cron: &cron::Schedule) -> anyhow::Result<tokio::time::Sleep> {
      21              :     use chrono::Utc;
       22            0 :     let next = cron.upcoming(Utc).next().ok_or_else(|| anyhow::anyhow!("cron schedule has no upcoming events"))?;
      23            0 :     let duration = (next - Utc::now()).to_std()?;
      24            0 :     Ok(tokio::time::sleep(duration))
      25            0 : }
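// A minimal usage sketch (editorial addition, not part of this module): the `cron`
// crate's `Schedule` implements `FromStr` over a seven-field expression
// (sec min hour day-of-month month day-of-week year), so a caller could do:
//
//     use std::str::FromStr;
//     let schedule = cron::Schedule::from_str("0 0 * * * * *")?; // top of every hour, UTC
//     cron_to_next_duration(&schedule)?.await;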
      26              : 
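/// Adapts an optional `Sleep` for use in `tokio::select!`: when no sleep future is
/// present this resolves to `None` immediately, so the caller's `Some(_) = ...`
/// pattern fails to match and `select!` disables that branch.
/// (Editorial doc comment, based on documented `tokio::select!` semantics.)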
      27            0 : async fn maybe_sleep(sleep: Option<tokio::time::Sleep>) -> Option<()> {
      28            0 :     if let Some(sleep) = sleep {
      29            0 :         sleep.await;
      30            0 :         Some(())
      31              :     } else {
      32            0 :         None
      33              :     }
      34            0 : }
      35              : 
      36              : impl ChaosInjector {
      37            0 :     pub fn new(
      38            0 :         service: Arc<Service>,
      39            0 :         interval: Duration,
      40            0 :         chaos_exit_crontab: Option<cron::Schedule>,
      41            0 :     ) -> Self {
      42            0 :         Self {
      43            0 :             service,
      44            0 :             interval,
      45            0 :             chaos_exit_crontab,
      46            0 :         }
      47            0 :     }
      48              : 
      49            0 :     fn get_cron_interval_sleep_future(&self) -> Option<tokio::time::Sleep> {
      50            0 :         if let Some(ref chaos_exit_crontab) = self.chaos_exit_crontab {
      51            0 :             match cron_to_next_duration(chaos_exit_crontab) {
      52            0 :                 Ok(interval_exit) => Some(interval_exit),
      53            0 :                 Err(e) => {
      54            0 :                     tracing::error!("Error processing the cron schedule: {e}");
      55            0 :                     None
      56              :                 }
      57              :             }
      58              :         } else {
      59            0 :             None
      60              :         }
      61            0 :     }
      62              : 
      63            0 :     pub async fn run(&mut self, cancel: CancellationToken) {
      64            0 :         let mut interval = tokio::time::interval(self.interval);
      65              :         #[derive(Debug)]
      66              :         enum ChaosEvent {
      67              :             ShuffleTenant,
      68              :             ForceKill,
      69              :         }
      70              :         loop {
      71            0 :             let cron_interval = self.get_cron_interval_sleep_future();
      72            0 :             let chaos_type = tokio::select! {
      73            0 :                 _ = interval.tick() => {
      74            0 :                     ChaosEvent::ShuffleTenant
      75              :                 }
      76            0 :                 Some(_) = maybe_sleep(cron_interval) => {
      77            0 :                     ChaosEvent::ForceKill
      78              :                 }
      79            0 :                 _ = cancel.cancelled() => {
      80            0 :                     tracing::info!("Shutting down");
      81            0 :                     return;
      82              :                 }
      83              :             };
      84            0 :             tracing::info!("Chaos iteration: {chaos_type:?}...");
      85            0 :             match chaos_type {
      86              :                 ChaosEvent::ShuffleTenant => {
      87            0 :                     self.inject_chaos().await;
      88              :                 }
      89              :                 ChaosEvent::ForceKill => {
      90            0 :                     self.force_kill().await;
      91              :                 }
      92              :             }
      93              :         }
      94            0 :     }
      95              : 
       96              :     /// If a shard has both a secondary and an attached location, swap them: promote the
       97              :     /// secondary to be attached and demote the attached location to a secondary.
       98              :     ///
       99              :     /// Only modifies shards whose scheduling policy is Active.
     100            0 :     fn maybe_migrate_to_secondary(
     101            0 :         &self,
     102            0 :         tenant_shard_id: TenantShardId,
     103            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
     104            0 :         tenants: &mut BTreeMap<TenantShardId, TenantShard>,
     105            0 :         scheduler: &mut Scheduler,
     106            0 :     ) {
     107            0 :         let shard = tenants
     108            0 :             .get_mut(&tenant_shard_id)
     109            0 :             .expect("Held lock between choosing ID and this get");
     110              : 
     111            0 :         if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
     112              :             // Skip non-active scheduling policies, so that a shard with a policy like Pause can
     113              :             // be pinned without being disrupted by us.
     114            0 :             tracing::info!(
     115            0 :                 "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
     116            0 :                 shard.get_scheduling_policy()
     117              :             );
     118            0 :             return;
     119            0 :         }
     120              : 
     121              :         // Pick a secondary to promote
     122            0 :         let Some(new_location) = shard
     123            0 :             .intent
     124            0 :             .get_secondary()
     125            0 :             .choose(&mut thread_rng())
     126            0 :             .cloned()
     127              :         else {
     128            0 :             tracing::info!(
     129            0 :                 "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
     130              :             );
     131            0 :             return;
     132              :         };
     133              : 
     134            0 :         let Some(old_location) = *shard.intent.get_attached() else {
     135            0 :             tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
     136            0 :             return;
     137              :         };
     138              : 
     139            0 :         tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");
     140              : 
     141            0 :         shard.intent.demote_attached(scheduler, old_location);
     142            0 :         shard.intent.promote_attached(scheduler, new_location);
     143            0 :         self.service.maybe_reconcile_shard(
     144            0 :             shard,
     145            0 :             nodes,
     146            0 :             crate::reconciler::ReconcilerPriority::Normal,
     147            0 :         );
     148            0 :     }
     149              : 
     150            0 :     async fn force_kill(&mut self) {
     151            0 :         tracing::warn!("Injecting chaos: force kill");
     152            0 :         std::process::exit(1);
     153              :     }
     154              : 
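    /// One chaos iteration: pick up to `batch_size` shards, preferring shards that
    /// are attached outside their preferred AZ, and try to migrate each one to one
    /// of its secondary locations. (Editorial doc comment, derived from the body below.)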
     155            0 :     async fn inject_chaos(&mut self) {
     156            0 :         // Pick some shards to interfere with
     157            0 :         let batch_size = 128;
     158            0 :         let mut inner = self.service.inner.write().unwrap();
     159            0 :         let (nodes, tenants, scheduler) = inner.parts_mut();
     160            0 : 
      161            0 :         // Prefer to migrate tenants that are currently outside their home AZ.  This avoids the chaos
      162            0 :         // injector continuously pushing tenants outside their home AZ: instead, we tend to cycle
      163            0 :         // between picking some random tenants to move, moving them back on the next chaos iteration,
      164            0 :         // and then picking a fresh set of random tenants.
     165            0 :         let (out_of_home_az, in_home_az): (Vec<_>, Vec<_>) = tenants
     166            0 :             .values()
     167            0 :             .map(|shard| {
     168            0 :                 (
     169            0 :                     shard.tenant_shard_id,
     170            0 :                     shard.is_attached_outside_preferred_az(nodes),
     171            0 :                 )
     172            0 :             })
     173            0 :             .partition(|(_id, is_outside)| *is_outside);
     174            0 : 
     175            0 :         let mut out_of_home_az: Vec<_> = out_of_home_az.into_iter().map(|(id, _)| id).collect();
     176            0 :         let mut in_home_az: Vec<_> = in_home_az.into_iter().map(|(id, _)| id).collect();
     177            0 : 
     178            0 :         let mut victims = Vec::with_capacity(batch_size);
     179            0 :         if out_of_home_az.len() >= batch_size {
     180            0 :             tracing::info!(
     181            0 :                 "Injecting chaos: found {batch_size} shards to migrate back to home AZ (total {} out of home AZ)",
     182            0 :                 out_of_home_az.len()
     183              :             );
     184              : 
     185            0 :             out_of_home_az.shuffle(&mut thread_rng());
     186            0 :             victims.extend(out_of_home_az.into_iter().take(batch_size));
     187              :         } else {
     188            0 :             tracing::info!(
     189            0 :                 "Injecting chaos: found {} shards to migrate back to home AZ, picking {} random shards to migrate",
     190            0 :                 out_of_home_az.len(),
     191            0 :                 std::cmp::min(batch_size - out_of_home_az.len(), in_home_az.len())
     192              :             );
     193              : 
     194            0 :             victims.extend(out_of_home_az);
     195            0 :             in_home_az.shuffle(&mut thread_rng());
     196            0 :             victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
     197              :         }
     198              : 
     199            0 :         for victim in victims {
     200            0 :             self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
     201            0 :         }
     202            0 :     }
     203              : }
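
A hedged usage sketch (editorial addition; apart from the ChaosInjector API shown above, all names and the schedule are assumptions): a controller binary would typically build the injector from its Arc<Service> at startup and drive it as a background task, stopping it through the CancellationToken.

    use std::str::FromStr;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    // Hypothetical wiring: `service` is assumed to come from controller startup.
    fn spawn_chaos(service: Arc<Service>) -> CancellationToken {
        let cancel = CancellationToken::new();
        // Hypothetical schedule: force-exit daily at 04:00:00 UTC (sec min hour dom month dow year).
        let crontab = cron::Schedule::from_str("0 0 4 * * * *").ok();
        let mut injector = ChaosInjector::new(service, Duration::from_secs(60), crontab);
        let task_cancel = cancel.clone();
        tokio::spawn(async move { injector.run(task_cancel).await });
        cancel
    }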
        

Generated by: LCOV version 2.1-beta