use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
    time::Duration,
};

use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::thread_rng;
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;

use super::{Node, Scheduler, Service, TenantShard};

pub struct ChaosInjector {
    service: Arc<Service>,
    interval: Duration,
}

impl ChaosInjector {
    pub fn new(service: Arc<Service>, interval: Duration) -> Self {
        Self { service, interval }
    }

    pub async fn run(&mut self, cancel: CancellationToken) {
        let mut interval = tokio::time::interval(self.interval);

        loop {
            tokio::select! {
                _ = interval.tick() => {}
                _ = cancel.cancelled() => {
                    tracing::info!("Shutting down");
                    return;
                }
            }

            self.inject_chaos().await;

            tracing::info!("Chaos iteration...");
        }
    }

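    // A sketch of how a caller might drive the injector (not part of the original file). It
    // assumes an already-constructed `service: Arc<Service>`; the interval value and the exact
    // spawn point used by the real controller are illustrative only.
    //
    //     let mut injector = ChaosInjector::new(service, Duration::from_secs(60));
    //     let cancel = CancellationToken::new();
    //     tokio::spawn(async move { injector.run(cancel).await });
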
    /// If a shard has both an attached and a secondary location, swap them: promote the
    /// secondary to be attached, and demote the currently attached location to a secondary.
    ///
    /// Only modifies shards whose scheduling policy is Active.
    fn maybe_migrate_to_secondary(
        &self,
        tenant_shard_id: TenantShardId,
        nodes: &Arc<HashMap<NodeId, Node>>,
        tenants: &mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
    ) {
        let shard = tenants
            .get_mut(&tenant_shard_id)
            .expect("Held lock between choosing ID and this get");

        if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
            // Skip non-active scheduling policies, so that a shard with a policy like Pause can
            // be pinned without being disrupted by us.
            tracing::info!(
                "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
                shard.get_scheduling_policy()
            );
            return;
        }

        // Pick a secondary to promote
        let Some(new_location) = shard
            .intent
            .get_secondary()
            .choose(&mut thread_rng())
            .cloned()
        else {
            tracing::info!(
                "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
            );
            return;
        };

        let Some(old_location) = *shard.intent.get_attached() else {
            tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
            return;
        };

        tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");

        shard.intent.demote_attached(scheduler, old_location);
        shard.intent.promote_attached(scheduler, new_location);
        self.service.maybe_reconcile_shard(
            shard,
            nodes,
            crate::reconciler::ReconcilerPriority::Normal,
        );
    }
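
    // Worked example (illustrative, not from the original file): for a shard whose intent is
    // attached on node A with a single secondary on node B, a chaos iteration that selects it
    // flips the intent to attached on B with A as the secondary, then requests a reconcile so
    // the pageservers are updated to match the new intent.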

    async fn inject_chaos(&mut self) {
        // Pick some shards to interfere with
        let batch_size = 128;
        let mut inner = self.service.inner.write().unwrap();
        let (nodes, tenants, scheduler) = inner.parts_mut();

        // Prefer to migrate tenants that are currently outside their home AZ. This avoids the
        // chaos injector continuously pushing tenants outside their home AZ: instead, we tend to
        // cycle between picking some random tenants to move, moving them back on the next chaos
        // iteration, and then picking some new random tenants on the iteration after that.
        let (out_of_home_az, in_home_az): (Vec<_>, Vec<_>) = tenants
            .values()
            .map(|shard| {
                (
                    shard.tenant_shard_id,
                    shard.is_attached_outside_preferred_az(nodes),
                )
            })
            .partition(|(_id, is_outside)| *is_outside);

        let mut out_of_home_az: Vec<_> = out_of_home_az.into_iter().map(|(id, _)| id).collect();
        let mut in_home_az: Vec<_> = in_home_az.into_iter().map(|(id, _)| id).collect();

        let mut victims = Vec::with_capacity(batch_size);
        if out_of_home_az.len() >= batch_size {
            tracing::info!("Injecting chaos: found {batch_size} shards to migrate back to home AZ (total {} out of home AZ)", out_of_home_az.len());

            out_of_home_az.shuffle(&mut thread_rng());
            victims.extend(out_of_home_az.into_iter().take(batch_size));
        } else {
            tracing::info!("Injecting chaos: found {} shards to migrate back to home AZ, picking {} random shards to migrate", out_of_home_az.len(), std::cmp::min(batch_size - out_of_home_az.len(), in_home_az.len()));

            victims.extend(out_of_home_az);
            in_home_az.shuffle(&mut thread_rng());
            victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
        }

        for victim in victims {
            self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
        }
    }
}