LCOV - code coverage report
Current view: top level - storage_controller/src/service - chaos_injector.rs (source / functions) Coverage Total Hit
Test: 98683a8629f0f7f0031d02e04512998d589d76ea.info Lines: 0.0 % 173 0
Test Date: 2025-04-11 16:58:57 Functions: 0.0 % 19 0

            Line data    Source code
       1              : use std::collections::{BTreeMap, HashMap};
       2              : use std::sync::Arc;
       3              : use std::time::Duration;
       4              : 
       5              : use pageserver_api::controller_api::ShardSchedulingPolicy;
       6              : use rand::seq::SliceRandom;
       7              : use rand::{Rng, thread_rng};
       8              : use tokio_util::sync::CancellationToken;
       9              : use utils::id::NodeId;
      10              : use utils::shard::TenantShardId;
      11              : 
      12              : use super::{Node, Scheduler, Service, TenantShard};
      13              : 
      14              : pub struct ChaosInjector {
      15              :     service: Arc<Service>,
      16              :     interval: Duration,
      17              :     chaos_exit_crontab: Option<cron::Schedule>,
      18              : }
      19              : 
      20            0 : fn cron_to_next_duration(cron: &cron::Schedule) -> anyhow::Result<tokio::time::Sleep> {
      21              :     use chrono::Utc;
      22            0 :     let next = cron.upcoming(Utc).next().unwrap();
      23            0 :     let duration = (next - Utc::now()).to_std()?;
      24            0 :     Ok(tokio::time::sleep(duration))
      25            0 : }
      26              : 
      27            0 : async fn maybe_sleep(sleep: Option<tokio::time::Sleep>) -> Option<()> {
      28            0 :     if let Some(sleep) = sleep {
      29            0 :         sleep.await;
      30            0 :         Some(())
      31              :     } else {
      32            0 :         None
      33              :     }
      34            0 : }
      35              : 
      36              : impl ChaosInjector {
      37            0 :     pub fn new(
      38            0 :         service: Arc<Service>,
      39            0 :         interval: Duration,
      40            0 :         chaos_exit_crontab: Option<cron::Schedule>,
      41            0 :     ) -> Self {
      42            0 :         Self {
      43            0 :             service,
      44            0 :             interval,
      45            0 :             chaos_exit_crontab,
      46            0 :         }
      47            0 :     }
      48              : 
      49            0 :     fn get_cron_interval_sleep_future(&self) -> Option<tokio::time::Sleep> {
      50            0 :         if let Some(ref chaos_exit_crontab) = self.chaos_exit_crontab {
      51            0 :             match cron_to_next_duration(chaos_exit_crontab) {
      52            0 :                 Ok(interval_exit) => Some(interval_exit),
      53            0 :                 Err(e) => {
      54            0 :                     tracing::error!("Error processing the cron schedule: {e}");
      55            0 :                     None
      56              :                 }
      57              :             }
      58              :         } else {
      59            0 :             None
      60              :         }
      61            0 :     }
      62              : 
      63            0 :     pub async fn run(&mut self, cancel: CancellationToken) {
      64            0 :         let mut interval = tokio::time::interval(self.interval);
      65              :         #[derive(Debug)]
      66              :         enum ChaosEvent {
      67              :             MigrationsToSecondary,
      68              :             ForceKillController,
      69              :             GracefulMigrationsAnywhere,
      70              :         }
      71              :         loop {
      72            0 :             let cron_interval = self.get_cron_interval_sleep_future();
      73            0 :             let chaos_type = tokio::select! {
      74            0 :                 _ = interval.tick() => {
      75            0 :                     if thread_rng().gen_bool(0.5) {
      76            0 :                         ChaosEvent::MigrationsToSecondary
      77              :                     } else {
      78            0 :                         ChaosEvent::GracefulMigrationsAnywhere
      79              :                     }
      80              :                 }
      81            0 :                 Some(_) = maybe_sleep(cron_interval) => {
      82            0 :                     ChaosEvent::ForceKillController
      83              :                 }
      84            0 :                 _ = cancel.cancelled() => {
      85            0 :                     tracing::info!("Shutting down");
      86            0 :                     return;
      87              :                 }
      88              :             };
      89            0 :             tracing::info!("Chaos iteration: {chaos_type:?}...");
      90            0 :             match chaos_type {
      91            0 :                 ChaosEvent::MigrationsToSecondary => {
      92            0 :                     self.inject_migrations_to_secondary();
      93            0 :                 }
      94            0 :                 ChaosEvent::GracefulMigrationsAnywhere => {
      95            0 :                     self.inject_graceful_migrations_anywhere();
      96            0 :                 }
      97              :                 ChaosEvent::ForceKillController => {
      98            0 :                     self.force_kill().await;
      99              :                 }
     100              :             }
     101              :         }
     102            0 :     }
     103              : 
     104            0 :     fn is_shard_eligible_for_chaos(&self, shard: &TenantShard) -> bool {
     105              :         // - Skip non-active scheduling policies, so that a shard with a policy like Pause can
     106              :         //   be pinned without being disrupted by us.
     107              :         // - Skip shards doing a graceful migration already, so that we allow these to run to
     108              :         //   completion rather than only exercising the first part and then cancelling with
     109              :         //   some other chaos.
     110            0 :         !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
     111            0 :             && shard.get_preferred_node().is_none()
     112            0 :     }
     113              : 
     114              :     /// If a shard has a secondary and attached location, then re-assign the secondary to be
     115              :     /// attached and the attached to be secondary.
     116              :     ///
     117              :     /// Only modifies tenants if they're in Active scheduling policy.
     118            0 :     fn maybe_migrate_to_secondary(
     119            0 :         &self,
     120            0 :         tenant_shard_id: TenantShardId,
     121            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
     122            0 :         tenants: &mut BTreeMap<TenantShardId, TenantShard>,
     123            0 :         scheduler: &mut Scheduler,
     124            0 :     ) {
     125            0 :         let shard = tenants
     126            0 :             .get_mut(&tenant_shard_id)
     127            0 :             .expect("Held lock between choosing ID and this get");
     128            0 : 
     129            0 :         if !self.is_shard_eligible_for_chaos(shard) {
     130            0 :             return;
     131            0 :         }
     132              : 
     133              :         // Pick a secondary to promote
     134            0 :         let Some(new_location) = shard
     135            0 :             .intent
     136            0 :             .get_secondary()
     137            0 :             .choose(&mut thread_rng())
     138            0 :             .cloned()
     139              :         else {
     140            0 :             tracing::info!(
     141            0 :                 "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
     142              :             );
     143            0 :             return;
     144              :         };
     145              : 
     146            0 :         let Some(old_location) = *shard.intent.get_attached() else {
     147            0 :             tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
     148            0 :             return;
     149              :         };
     150              : 
     151            0 :         tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");
     152              : 
     153            0 :         shard.intent.demote_attached(scheduler, old_location);
     154            0 :         shard.intent.promote_attached(scheduler, new_location);
     155            0 :         self.service.maybe_reconcile_shard(
     156            0 :             shard,
     157            0 :             nodes,
     158            0 :             crate::reconciler::ReconcilerPriority::Normal,
     159            0 :         );
     160            0 :     }
     161              : 
     162            0 :     async fn force_kill(&mut self) {
     163            0 :         tracing::warn!("Injecting chaos: force kill");
     164            0 :         std::process::exit(1);
     165              :     }
     166              : 
     167              :     // Unlike [`Self::inject_migrations_to_secondary`], this function will not only cut over to secondary, it
     168              :     // will migrate a tenant to a random node in its home AZ using a graceful migration of the same type
     169              :     // that my be initiated by an API caller using prewarm=true.
     170              :     //
     171              :     // This is a much more expensive operation in terms of I/O and time, as we will fully warm up
     172              :     // some new location in order to migrate the tenant there.  For that reason we do far fewer of these.
     173            0 :     fn inject_graceful_migrations_anywhere(&mut self) {
     174            0 :         let batch_size = 1;
     175            0 :         let mut inner = self.service.inner.write().unwrap();
     176            0 :         let (nodes, tenants, _scheduler) = inner.parts_mut();
     177            0 : 
     178            0 :         let mut candidates = tenants
     179            0 :             .values_mut()
     180            0 :             .filter(|shard| self.is_shard_eligible_for_chaos(shard))
     181            0 :             .collect::<Vec<_>>();
     182            0 : 
     183            0 :         tracing::info!(
     184            0 :             "Injecting chaos: found {} candidates for graceful migrations anywhere",
     185            0 :             candidates.len()
     186              :         );
     187              : 
     188            0 :         let mut victims: Vec<&mut TenantShard> = Vec::new();
     189              : 
     190              :         // Pick our victims: use a hand-rolled loop rather than choose_multiple() because we want
     191              :         // to take the mutable refs from our candidates rather than ref'ing them.
     192            0 :         while !candidates.is_empty() && victims.len() < batch_size {
     193            0 :             let i = thread_rng().gen_range(0..candidates.len());
     194            0 :             victims.push(candidates.swap_remove(i));
     195            0 :         }
     196              : 
     197            0 :         for victim in victims.into_iter() {
     198              :             // Find a node in the same AZ as the shard, or if the shard has no AZ preference, which
     199              :             // is not where they are currently attached.
     200            0 :             let candidate_nodes = nodes
     201            0 :                 .values()
     202            0 :                 .filter(|node| {
     203            0 :                     if let Some(preferred_az) = victim.preferred_az() {
     204            0 :                         node.get_availability_zone_id() == preferred_az
     205            0 :                     } else if let Some(attached) = *victim.intent.get_attached() {
     206            0 :                         node.get_id() != attached
     207              :                     } else {
     208            0 :                         true
     209              :                     }
     210            0 :                 })
     211            0 :                 .collect::<Vec<_>>();
     212              : 
     213            0 :             let Some(victim_node) = candidate_nodes.choose(&mut thread_rng()) else {
     214              :                 // This can happen if e.g. we are in a small region with only one pageserver per AZ.
     215            0 :                 tracing::info!(
     216            0 :                     "no candidate nodes found for migrating shard {tenant_shard_id} within its home AZ",
     217              :                     tenant_shard_id = victim.tenant_shard_id
     218              :                 );
     219            0 :                 continue;
     220              :             };
     221              : 
     222              :             // This doesn't change intent immediately: next iteration of Service::optimize_all should do that.  We avoid
     223              :             // doing it here because applying optimizations requires dropping lock to do some async work to check the optimisation
     224              :             // is valid given remote state, and it would be a shame to duplicate that dance here.
     225            0 :             tracing::info!(
     226            0 :                 "Injecting chaos: migrate {} to {}",
     227              :                 victim.tenant_shard_id,
     228              :                 victim_node
     229              :             );
     230            0 :             victim.set_preferred_node(Some(victim_node.get_id()));
     231              :         }
     232            0 :     }
     233              : 
     234              :     /// Migrations of attached locations to their secondary location.  This exercises reconciliation in general,
     235              :     /// live migration in particular, and the pageserver code for cleanly shutting down and starting up tenants
     236              :     /// during such migrations.
     237            0 :     fn inject_migrations_to_secondary(&mut self) {
     238            0 :         // Pick some shards to interfere with
     239            0 :         let batch_size = 128;
     240            0 :         let mut inner = self.service.inner.write().unwrap();
     241            0 :         let (nodes, tenants, scheduler) = inner.parts_mut();
     242            0 : 
     243            0 :         // Prefer to migrate tenants that are currently outside their home AZ.  This avoids the chaos injector
     244            0 :         // continuously pushing tenants outside their home AZ: instead, we'll tend to cycle between picking some
     245            0 :         // random tenants to move, and then on next chaos iteration moving them back, then picking some new
     246            0 :         // random tenants on the next iteration.
     247            0 :         let (out_of_home_az, in_home_az): (Vec<_>, Vec<_>) = tenants
     248            0 :             .values()
     249            0 :             .map(|shard| {
     250            0 :                 (
     251            0 :                     shard.tenant_shard_id,
     252            0 :                     shard.is_attached_outside_preferred_az(nodes),
     253            0 :                 )
     254            0 :             })
     255            0 :             .partition(|(_id, is_outside)| *is_outside);
     256            0 : 
     257            0 :         let mut out_of_home_az: Vec<_> = out_of_home_az.into_iter().map(|(id, _)| id).collect();
     258            0 :         let mut in_home_az: Vec<_> = in_home_az.into_iter().map(|(id, _)| id).collect();
     259            0 : 
     260            0 :         let mut victims = Vec::with_capacity(batch_size);
     261            0 :         if out_of_home_az.len() >= batch_size {
     262            0 :             tracing::info!(
     263            0 :                 "Injecting chaos: found {batch_size} shards to migrate back to home AZ (total {} out of home AZ)",
     264            0 :                 out_of_home_az.len()
     265              :             );
     266              : 
     267            0 :             out_of_home_az.shuffle(&mut thread_rng());
     268            0 :             victims.extend(out_of_home_az.into_iter().take(batch_size));
     269              :         } else {
     270            0 :             tracing::info!(
     271            0 :                 "Injecting chaos: found {} shards to migrate back to home AZ, picking {} random shards to migrate",
     272            0 :                 out_of_home_az.len(),
     273            0 :                 std::cmp::min(batch_size - out_of_home_az.len(), in_home_az.len())
     274              :             );
     275              : 
     276            0 :             victims.extend(out_of_home_az);
     277            0 :             in_home_az.shuffle(&mut thread_rng());
     278            0 :             victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
     279              :         }
     280              : 
     281            0 :         for victim in victims {
     282            0 :             self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
     283            0 :         }
     284            0 :     }
     285              : }
        

Generated by: LCOV version 2.1-beta