LCOV - code coverage report
Current view: top level - storage_controller/src/service - chaos_injector.rs (source / functions)
Test: 07bee600374ccd486c69370d0972d9035964fe68.info
Test Date: 2025-02-20 13:11:02

             Coverage    Total    Hit
Lines:        0.0 %        85      0
Functions:    0.0 %        10      0

            Line data    Source code
       1              : use std::{
       2              :     collections::{BTreeMap, HashMap},
       3              :     sync::Arc,
       4              :     time::Duration,
       5              : };
       6              : 
       7              : use pageserver_api::controller_api::ShardSchedulingPolicy;
       8              : use rand::seq::SliceRandom;
       9              : use rand::thread_rng;
      10              : use tokio_util::sync::CancellationToken;
      11              : use utils::id::NodeId;
      12              : use utils::shard::TenantShardId;
      13              : 
      14              : use super::{Node, Scheduler, Service, TenantShard};
      15              : 
      16              : pub struct ChaosInjector {
      17              :     service: Arc<Service>,
      18              :     interval: Duration,
      19              : }
      20              : 
      21              : impl ChaosInjector {
      22            0 :     pub fn new(service: Arc<Service>, interval: Duration) -> Self {
      23            0 :         Self { service, interval }
      24            0 :     }
      25              : 
      26            0 :     pub async fn run(&mut self, cancel: CancellationToken) {
      27            0 :         let mut interval = tokio::time::interval(self.interval);
      28              : 
      29              :         loop {
      30            0 :             tokio::select! {
      31            0 :                 _ = interval.tick() => {}
      32            0 :                 _ = cancel.cancelled() => {
      33            0 :                     tracing::info!("Shutting down");
      34            0 :                     return;
      35              :                 }
      36              :             }
      37              : 
      38            0 :             self.inject_chaos().await;
      39              : 
      40            0 :             tracing::info!("Chaos iteration...");
      41              :         }
      42            0 :     }
      43              : 
       44              :     /// If a shard has both an attached location and a secondary location, swap their
       45              :     /// roles: promote the secondary to attached and demote the attached to secondary.
       46              :     ///
       47              :     /// Only modifies tenants whose scheduling policy is Active.
      48            0 :     fn maybe_migrate_to_secondary(
      49            0 :         &self,
      50            0 :         tenant_shard_id: TenantShardId,
      51            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
      52            0 :         tenants: &mut BTreeMap<TenantShardId, TenantShard>,
      53            0 :         scheduler: &mut Scheduler,
      54            0 :     ) {
      55            0 :         let shard = tenants
      56            0 :             .get_mut(&tenant_shard_id)
      57            0 :             .expect("Held lock between choosing ID and this get");
      58              : 
      59            0 :         if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
      60              :             // Skip non-active scheduling policies, so that a shard with a policy like Pause can
      61              :             // be pinned without being disrupted by us.
      62            0 :             tracing::info!(
      63            0 :                 "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
      64            0 :                 shard.get_scheduling_policy()
      65              :             );
      66            0 :             return;
      67            0 :         }
      68              : 
      69              :         // Pick a secondary to promote
      70            0 :         let Some(new_location) = shard
      71            0 :             .intent
      72            0 :             .get_secondary()
      73            0 :             .choose(&mut thread_rng())
      74            0 :             .cloned()
      75              :         else {
      76            0 :             tracing::info!(
      77            0 :                 "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
      78              :             );
      79            0 :             return;
      80              :         };
      81              : 
      82            0 :         let Some(old_location) = *shard.intent.get_attached() else {
      83            0 :             tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
      84            0 :             return;
      85              :         };
      86              : 
      87            0 :         tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");
      88              : 
      89            0 :         shard.intent.demote_attached(scheduler, old_location);
      90            0 :         shard.intent.promote_attached(scheduler, new_location);
      91            0 :         self.service.maybe_reconcile_shard(
      92            0 :             shard,
      93            0 :             nodes,
      94            0 :             crate::reconciler::ReconcilerPriority::Normal,
      95            0 :         );
      96            0 :     }
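
The demote/promote pair above amounts to a role swap between the attached location and one randomly chosen secondary. A minimal sketch of that swap, using a simplified, hypothetical Intent type rather than the controller's real IntentState/Scheduler API:

    // Hypothetical type for illustration; the real demote_attached/promote_attached
    // calls also update Scheduler state, which is omitted here.
    struct Intent {
        attached: Option<u64>, // NodeId of the attached location, if any
        secondary: Vec<u64>,   // NodeIds of the secondary locations
    }

    impl Intent {
        /// Make `new` the attached location; the previously attached
        /// location (if any) becomes a secondary.
        fn swap_attached(&mut self, new: u64) {
            self.secondary.retain(|&n| n != new);
            if let Some(old) = self.attached.replace(new) {
                self.secondary.push(old);
            }
        }
    }
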
      97              : 
      98            0 :     async fn inject_chaos(&mut self) {
      99            0 :         // Pick some shards to interfere with
     100            0 :         let batch_size = 128;
     101            0 :         let mut inner = self.service.inner.write().unwrap();
     102            0 :         let (nodes, tenants, scheduler) = inner.parts_mut();
     103            0 : 
      104            0 :         // Prefer to migrate tenants that are currently outside their home AZ.  This stops the chaos
      105            0 :         // injector from continuously pushing tenants out of their home AZ: instead, it tends to cycle
      106            0 :         // between picking some random tenants to move out, moving them back on the next chaos
      107            0 :         // iteration, and then picking new random tenants on the iteration after that.
     108            0 :         let (out_of_home_az, in_home_az): (Vec<_>, Vec<_>) = tenants
     109            0 :             .values()
     110            0 :             .map(|shard| {
     111            0 :                 (
     112            0 :                     shard.tenant_shard_id,
     113            0 :                     shard.is_attached_outside_preferred_az(nodes),
     114            0 :                 )
     115            0 :             })
     116            0 :             .partition(|(_id, is_outside)| *is_outside);
     117            0 : 
     118            0 :         let mut out_of_home_az: Vec<_> = out_of_home_az.into_iter().map(|(id, _)| id).collect();
     119            0 :         let mut in_home_az: Vec<_> = in_home_az.into_iter().map(|(id, _)| id).collect();
     120            0 : 
     121            0 :         let mut victims = Vec::with_capacity(batch_size);
     122            0 :         if out_of_home_az.len() >= batch_size {
     123            0 :             tracing::info!("Injecting chaos: found {batch_size} shards to migrate back to home AZ (total {} out of home AZ)", out_of_home_az.len());
     124              : 
     125            0 :             out_of_home_az.shuffle(&mut thread_rng());
     126            0 :             victims.extend(out_of_home_az.into_iter().take(batch_size));
     127              :         } else {
     128            0 :             tracing::info!("Injecting chaos: found {} shards to migrate back to home AZ, picking {} random shards to migrate", out_of_home_az.len(), std::cmp::min(batch_size - out_of_home_az.len(), in_home_az.len()));
     129              : 
     130            0 :             victims.extend(out_of_home_az);
     131            0 :             in_home_az.shuffle(&mut thread_rng());
     132            0 :             victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
     133              :         }
     134              : 
     135            0 :         for victim in victims {
     136            0 :             self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
     137            0 :         }
     138            0 :     }
     139              : }
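
The victim-selection policy in inject_chaos can be sketched independently of the service internals. A minimal version over plain (id, is_outside_home_az) pairs, assuming the same rand crate the file imports (pick_victims is a hypothetical name, not part of this file):

    use rand::seq::SliceRandom;
    use rand::thread_rng;

    /// Prefer shards already outside their home AZ; only top the batch up
    /// with random in-home shards when out-of-home candidates run short.
    fn pick_victims(shards: &[(u32, bool)], batch_size: usize) -> Vec<u32> {
        let (mut outside, mut inside): (Vec<(u32, bool)>, Vec<(u32, bool)>) =
            shards.iter().copied().partition(|(_, out)| *out);
        outside.shuffle(&mut thread_rng());
        inside.shuffle(&mut thread_rng());

        let mut victims: Vec<u32> = outside.into_iter().map(|(id, _)| id).collect();
        victims.truncate(batch_size);
        let remaining = batch_size.saturating_sub(victims.len());
        victims.extend(inside.into_iter().map(|(id, _)| id).take(remaining));
        victims
    }

For example, pick_victims(&[(1, true), (2, false), (3, true)], 2) always returns shards 1 and 3 (in random order), since out-of-home shards fill the batch before any in-home shard is considered.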
        

Generated by: LCOV version 2.1-beta