LCOV - code coverage report
Current view: top level - storage_controller/src/service - chaos_injector.rs (source / functions) Coverage Total Hit
Test: feead26e04cdef6e988ff1765b1cb7075eb48d3d.info Lines: 0.0 % 125 0
Test Date: 2025-02-28 12:11:00 Functions: 0.0 % 15 0

            Line data    Source code
       1              : use std::collections::{BTreeMap, HashMap};
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use pageserver_api::controller_api::ShardSchedulingPolicy;
       6              : use rand::seq::SliceRandom;
       7              : use rand::thread_rng;
       8              : use tokio_util::sync::CancellationToken;
       9              : use utils::id::NodeId;
      10              : use utils::shard::TenantShardId;
      11              : 
      12              : use super::{Node, Scheduler, Service, TenantShard};
      13              : 
      14              : pub struct ChaosInjector {
      15              :     service: Arc<Service>,
      16              :     interval: Duration,
      17              :     chaos_exit_crontab: Option<cron::Schedule>,
      18              : }
      19              : 
      20            0 : fn cron_to_next_duration(cron: &cron::Schedule) -> anyhow::Result<tokio::time::Sleep> {
      21              :     use chrono::Utc;
      22            0 :     let next = cron.upcoming(Utc).next().unwrap();
      23            0 :     let duration = (next - Utc::now()).to_std()?;
      24            0 :     Ok(tokio::time::sleep(duration))
      25            0 : }
      26              : 
      27            0 : async fn maybe_sleep(sleep: Option<tokio::time::Sleep>) -> Option<()> {
      28            0 :     if let Some(sleep) = sleep {
      29            0 :         sleep.await;
      30            0 :         Some(())
      31              :     } else {
      32            0 :         None
      33              :     }
      34            0 : }
      35              : 
      36              : impl ChaosInjector {
      37            0 :     pub fn new(
      38            0 :         service: Arc<Service>,
      39            0 :         interval: Duration,
      40            0 :         chaos_exit_crontab: Option<cron::Schedule>,
      41            0 :     ) -> Self {
      42            0 :         Self {
      43            0 :             service,
      44            0 :             interval,
      45            0 :             chaos_exit_crontab,
      46            0 :         }
      47            0 :     }
      48              : 
      49            0 :     pub async fn run(&mut self, cancel: CancellationToken) {
      50            0 :         let mut interval = tokio::time::interval(self.interval);
      51            0 :         let cron_interval = {
      52            0 :             if let Some(ref chaos_exit_crontab) = self.chaos_exit_crontab {
      53            0 :                 match cron_to_next_duration(chaos_exit_crontab) {
      54            0 :                     Ok(interval_exit) => Some(interval_exit),
      55            0 :                     Err(e) => {
      56            0 :                         tracing::error!("Error processing the cron schedule: {e}");
      57            0 :                         None
      58              :                     }
      59              :                 }
      60              :             } else {
      61            0 :                 None
      62              :             }
      63              :         };
      64              :         enum ChaosEvent {
      65              :             ShuffleTenant,
      66              :             ForceKill,
      67              :         }
      68            0 :         let chaos_type = tokio::select! {
      69            0 :             _ = interval.tick() => {
      70            0 :                 ChaosEvent::ShuffleTenant
      71              :             }
      72            0 :             Some(_) = maybe_sleep(cron_interval) => {
      73            0 :                 ChaosEvent::ForceKill
      74              :             }
      75            0 :             _ = cancel.cancelled() => {
      76            0 :                 tracing::info!("Shutting down");
      77            0 :                 return;
      78              :             }
      79              :         };
      80              : 
      81            0 :         match chaos_type {
      82              :             ChaosEvent::ShuffleTenant => {
      83            0 :                 self.inject_chaos().await;
      84              :             }
      85              :             ChaosEvent::ForceKill => {
      86            0 :                 self.force_kill().await;
      87              :             }
      88              :         }
      89              : 
      90            0 :         tracing::info!("Chaos iteration...");
      91            0 :     }
      92              : 
      93              :     /// If a shard has a secondary and attached location, then re-assign the secondary to be
      94              :     /// attached and the attached to be secondary.
      95              :     ///
      96              :     /// Only modifies tenants if they're in Active scheduling policy.
      97            0 :     fn maybe_migrate_to_secondary(
      98            0 :         &self,
      99            0 :         tenant_shard_id: TenantShardId,
     100            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
     101            0 :         tenants: &mut BTreeMap<TenantShardId, TenantShard>,
     102            0 :         scheduler: &mut Scheduler,
     103            0 :     ) {
     104            0 :         let shard = tenants
     105            0 :             .get_mut(&tenant_shard_id)
     106            0 :             .expect("Held lock between choosing ID and this get");
     107              : 
     108            0 :         if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
     109              :             // Skip non-active scheduling policies, so that a shard with a policy like Pause can
     110              :             // be pinned without being disrupted by us.
     111            0 :             tracing::info!(
     112            0 :                 "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
     113            0 :                 shard.get_scheduling_policy()
     114              :             );
     115            0 :             return;
     116            0 :         }
     117              : 
     118              :         // Pick a secondary to promote
     119            0 :         let Some(new_location) = shard
     120            0 :             .intent
     121            0 :             .get_secondary()
     122            0 :             .choose(&mut thread_rng())
     123            0 :             .cloned()
     124              :         else {
     125            0 :             tracing::info!(
     126            0 :                 "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
     127              :             );
     128            0 :             return;
     129              :         };
     130              : 
     131            0 :         let Some(old_location) = *shard.intent.get_attached() else {
     132            0 :             tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
     133            0 :             return;
     134              :         };
     135              : 
     136            0 :         tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");
     137              : 
     138            0 :         shard.intent.demote_attached(scheduler, old_location);
     139            0 :         shard.intent.promote_attached(scheduler, new_location);
     140            0 :         self.service.maybe_reconcile_shard(
     141            0 :             shard,
     142            0 :             nodes,
     143            0 :             crate::reconciler::ReconcilerPriority::Normal,
     144            0 :         );
     145            0 :     }
     146              : 
     147            0 :     async fn force_kill(&mut self) {
     148            0 :         tracing::warn!("Injecting chaos: force kill");
     149            0 :         std::process::exit(1);
     150              :     }
     151              : 
     152            0 :     async fn inject_chaos(&mut self) {
     153            0 :         // Pick some shards to interfere with
     154            0 :         let batch_size = 128;
     155            0 :         let mut inner = self.service.inner.write().unwrap();
     156            0 :         let (nodes, tenants, scheduler) = inner.parts_mut();
     157            0 : 
     158            0 :         // Prefer to migrate tenants that are currently outside their home AZ.  This avoids the chaos injector
     159            0 :         // continuously pushing tenants outside their home AZ: instead, we'll tend to cycle between picking some
     160            0 :         // random tenants to move, and then on next chaos iteration moving them back, then picking some new
     161            0 :         // random tenants on the next iteration.
     162            0 :         let (out_of_home_az, in_home_az): (Vec<_>, Vec<_>) = tenants
     163            0 :             .values()
     164            0 :             .map(|shard| {
     165            0 :                 (
     166            0 :                     shard.tenant_shard_id,
     167            0 :                     shard.is_attached_outside_preferred_az(nodes),
     168            0 :                 )
     169            0 :             })
     170            0 :             .partition(|(_id, is_outside)| *is_outside);
     171            0 : 
     172            0 :         let mut out_of_home_az: Vec<_> = out_of_home_az.into_iter().map(|(id, _)| id).collect();
     173            0 :         let mut in_home_az: Vec<_> = in_home_az.into_iter().map(|(id, _)| id).collect();
     174            0 : 
     175            0 :         let mut victims = Vec::with_capacity(batch_size);
     176            0 :         if out_of_home_az.len() >= batch_size {
     177            0 :             tracing::info!(
     178            0 :                 "Injecting chaos: found {batch_size} shards to migrate back to home AZ (total {} out of home AZ)",
     179            0 :                 out_of_home_az.len()
     180              :             );
     181              : 
     182            0 :             out_of_home_az.shuffle(&mut thread_rng());
     183            0 :             victims.extend(out_of_home_az.into_iter().take(batch_size));
     184              :         } else {
     185            0 :             tracing::info!(
     186            0 :                 "Injecting chaos: found {} shards to migrate back to home AZ, picking {} random shards to migrate",
     187            0 :                 out_of_home_az.len(),
     188            0 :                 std::cmp::min(batch_size - out_of_home_az.len(), in_home_az.len())
     189              :             );
     190              : 
     191            0 :             victims.extend(out_of_home_az);
     192            0 :             in_home_az.shuffle(&mut thread_rng());
     193            0 :             victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
     194              :         }
     195              : 
     196            0 :         for victim in victims {
     197            0 :             self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
     198            0 :         }
     199            0 :     }
     200              : }
        

Generated by: LCOV version 2.1-beta