LCOV - code coverage report
Current view: top level - storage_controller/src/service - chaos_injector.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 0.0 % 72 0
Test Date: 2025-01-30 15:18:43 Functions: 0.0 % 6 0

            Line data    Source code
       1              : use std::{
       2              :     collections::{BTreeMap, HashMap},
       3              :     sync::Arc,
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use pageserver_api::controller_api::ShardSchedulingPolicy;
       8              : use rand::seq::SliceRandom;
       9              : use rand::thread_rng;
      10              : use tokio_util::sync::CancellationToken;
      11              : use utils::id::NodeId;
      12              : use utils::shard::TenantShardId;
      13              : 
      14              : use super::{Node, Scheduler, Service, TenantShard};
      15              : 
      16              : pub struct ChaosInjector {
      17              :     service: Arc<Service>,
      18              :     interval: Duration,
      19              : }
      20              : 
      21              : impl ChaosInjector {
      22            0 :     pub fn new(service: Arc<Service>, interval: Duration) -> Self {
      23            0 :         Self { service, interval }
      24            0 :     }
      25              : 
      26            0 :     pub async fn run(&mut self, cancel: CancellationToken) {
      27            0 :         let mut interval = tokio::time::interval(self.interval);
      28              : 
      29              :         loop {
      30            0 :             tokio::select! {
      31            0 :                 _ = interval.tick() => {}
      32            0 :                 _ = cancel.cancelled() => {
      33            0 :                     tracing::info!("Shutting down");
      34            0 :                     return;
      35              :                 }
      36              :             }
      37              : 
      38            0 :             self.inject_chaos().await;
      39              : 
      40            0 :             tracing::info!("Chaos iteration...");
      41              :         }
      42            0 :     }
      43              : 
      44              :     /// If a shard has a secondary and attached location, then re-assign the secondary to be
      45              :     /// attached and the attached to be secondary.
      46              :     ///
      47              :     /// Only modifies tenants if they're in Active scheduling policy.
      48            0 :     fn maybe_migrate_to_secondary(
      49            0 :         &self,
      50            0 :         tenant_shard_id: TenantShardId,
      51            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
      52            0 :         tenants: &mut BTreeMap<TenantShardId, TenantShard>,
      53            0 :         scheduler: &mut Scheduler,
      54            0 :     ) {
      55            0 :         let shard = tenants
      56            0 :             .get_mut(&tenant_shard_id)
      57            0 :             .expect("Held lock between choosing ID and this get");
      58              : 
      59            0 :         if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
      60              :             // Skip non-active scheduling policies, so that a shard with a policy like Pause can
      61              :             // be pinned without being disrupted by us.
      62            0 :             tracing::info!(
      63            0 :                 "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
      64            0 :                 shard.get_scheduling_policy()
      65              :             );
      66            0 :             return;
      67            0 :         }
      68              : 
      69              :         // Pick a secondary to promote
      70            0 :         let Some(new_location) = shard
      71            0 :             .intent
      72            0 :             .get_secondary()
      73            0 :             .choose(&mut thread_rng())
      74            0 :             .cloned()
      75              :         else {
      76            0 :             tracing::info!(
      77            0 :                 "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
      78              :             );
      79            0 :             return;
      80              :         };
      81              : 
      82            0 :         let Some(old_location) = *shard.intent.get_attached() else {
      83            0 :             tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
      84            0 :             return;
      85              :         };
      86              : 
      87            0 :         tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");
      88              : 
      89            0 :         shard.intent.demote_attached(scheduler, old_location);
      90            0 :         shard.intent.promote_attached(scheduler, new_location);
      91            0 :         self.service.maybe_reconcile_shard(shard, nodes);
      92            0 :     }
      93              : 
      94            0 :     async fn inject_chaos(&mut self) {
      95            0 :         // Pick some shards to interfere with
      96            0 :         let batch_size = 128;
      97            0 :         let mut inner = self.service.inner.write().unwrap();
      98            0 :         let (nodes, tenants, scheduler) = inner.parts_mut();
      99            0 :         let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
     100            0 : 
     101            0 :         // Prefer to migrate tenants that are currently outside their home AZ.  This avoids the chaos injector
     102            0 :         // continuously pushing tenants outside their home AZ: instead, we'll tend to cycle between picking some
     103            0 :         // random tenants to move, and then on next chaos iteration moving them back, then picking some new
     104            0 :         // random tenants on the next iteration.
     105            0 :         let mut victims = Vec::with_capacity(batch_size);
     106            0 :         for shard in tenants.values() {
     107            0 :             if shard.is_attached_outside_preferred_az(nodes) {
     108            0 :                 victims.push(shard.tenant_shard_id);
     109            0 :             }
     110              : 
     111            0 :             if victims.len() >= batch_size {
     112            0 :                 break;
     113            0 :             }
     114              :         }
     115              : 
     116            0 :         let choose_random = batch_size.saturating_sub(victims.len());
     117            0 :         tracing::info!("Injecting chaos: found {} shards to migrate back to home AZ, picking {choose_random} random shards to migrate", victims.len());
     118              : 
     119            0 :         let random_victims = tenant_ids.choose_multiple(&mut thread_rng(), choose_random);
     120            0 :         victims.extend(random_victims);
     121              : 
     122            0 :         for victim in victims {
     123            0 :             self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
     124            0 :         }
     125            0 :     }
     126              : }
        

Generated by: LCOV version 2.1-beta