use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
    time::Duration,
};

use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::thread_rng;
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;

use super::{Node, Scheduler, Service, TenantShard};

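/// Background task that periodically migrates tenant shards between their attached and
/// secondary locations to inject churn (see `inject_chaos`).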
pub struct ChaosInjector {
    service: Arc<Service>,
    interval: Duration,
}

impl ChaosInjector {
    pub fn new(service: Arc<Service>, interval: Duration) -> Self {
        Self { service, interval }
    }

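    /// Run the chaos loop: on each interval tick, perform one round of `inject_chaos`;
    /// exits when the cancellation token fires.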
    pub async fn run(&mut self, cancel: CancellationToken) {
        let mut interval = tokio::time::interval(self.interval);

        loop {
            tokio::select! {
                _ = interval.tick() => {}
                _ = cancel.cancelled() => {
                    tracing::info!("Shutting down");
                    return;
                }
            }

            self.inject_chaos().await;

            tracing::info!("Chaos iteration...");
        }
    }

    /// If a shard has a secondary and attached location, then re-assign the secondary to be
    /// attached and the attached to be secondary.
    ///
    /// Only modifies tenants if they're in Active scheduling policy.
    fn maybe_migrate_to_secondary(
        &self,
        tenant_shard_id: TenantShardId,
        nodes: &Arc<HashMap<NodeId, Node>>,
        tenants: &mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
    ) {
        let shard = tenants
            .get_mut(&tenant_shard_id)
            .expect("Held lock between choosing ID and this get");

        if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
            // Skip non-active scheduling policies, so that a shard with a policy like Pause can
            // be pinned without being disrupted by us.
            tracing::info!(
                "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
                shard.get_scheduling_policy()
            );
            return;
        }

        // Pick a secondary to promote
        let Some(new_location) = shard
            .intent
            .get_secondary()
            .choose(&mut thread_rng())
            .cloned()
        else {
            tracing::info!(
                "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
            );
            return;
        };

        let Some(old_location) = *shard.intent.get_attached() else {
            tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
            return;
        };

        tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");

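        // Swap roles: demote the current attached location to a secondary, promote the chosen
        // secondary to attached, then kick off a reconcile so the new intent is applied.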
        shard.intent.demote_attached(scheduler, old_location);
        shard.intent.promote_attached(scheduler, new_location);
        self.service.maybe_reconcile_shard(shard, nodes);
    }

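    /// Perform one round of chaos: select up to `batch_size` shards, preferring those that are
    /// currently attached outside their preferred AZ (so that they get migrated back), topping
    /// up the batch with randomly chosen shards, and try to migrate each one to a secondary.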
    async fn inject_chaos(&mut self) {
        // Pick some shards to interfere with
        let batch_size = 128;
        let mut inner = self.service.inner.write().unwrap();
        let (nodes, tenants, scheduler) = inner.parts_mut();
        let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();

        // Prefer to migrate tenants that are currently outside their home AZ. This avoids the
        // chaos injector continuously pushing tenants outside their home AZ: instead, we'll tend
        // to cycle between picking some random tenants to move, then on the next chaos iteration
        // moving them back, then picking some new random tenants on the iteration after that.
        let mut victims = Vec::with_capacity(batch_size);
        for shard in tenants.values() {
            if shard.is_attached_outside_preferred_az(nodes) {
                victims.push(shard.tenant_shard_id);
            }

            if victims.len() >= batch_size {
                break;
            }
        }

        let choose_random = batch_size.saturating_sub(victims.len());
        tracing::info!(
            "Injecting chaos: found {} shards to migrate back to home AZ, picking {choose_random} random shards to migrate",
            victims.len()
        );

        let random_victims = tenant_ids.choose_multiple(&mut thread_rng(), choose_random);
        victims.extend(random_victims);

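        // Try to swap attached and secondary for each selected shard; shards with no secondary
        // or a non-Active scheduling policy are skipped inside the helper.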
        for victim in victims {
            self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
        }
    }
}