use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;

use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::thread_rng;
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;

use super::{Node, Scheduler, Service, TenantShard};

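/// Periodically injects chaos into a [`Service`]: at a fixed interval it migrates randomly
/// chosen tenant shards between their attached and secondary locations, and it can optionally
/// force-kill the process on a cron schedule to simulate an unclean crash.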
pub struct ChaosInjector {
    service: Arc<Service>,
    interval: Duration,
    chaos_exit_crontab: Option<cron::Schedule>,
}

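/// Convert a cron schedule into a sleep future that completes at the schedule's next
/// upcoming occurrence.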
fn cron_to_next_duration(cron: &cron::Schedule) -> anyhow::Result<tokio::time::Sleep> {
    use chrono::Utc;
    let next = cron.upcoming(Utc).next().unwrap();
    let duration = (next - Utc::now()).to_std()?;
    Ok(tokio::time::sleep(duration))
}

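/// Await the given sleep future and return `Some(())` once it elapses; if no future was
/// provided, return `None` immediately so that a pattern-matched `select!` branch is disabled.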
async fn maybe_sleep(sleep: Option<tokio::time::Sleep>) -> Option<()> {
    if let Some(sleep) = sleep {
        sleep.await;
        Some(())
    } else {
        None
    }
}

impl ChaosInjector {
    pub fn new(
        service: Arc<Service>,
        interval: Duration,
        chaos_exit_crontab: Option<cron::Schedule>,
    ) -> Self {
        Self {
            service,
            interval,
            chaos_exit_crontab,
        }
    }

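    /// Build a sleep future for the next occurrence of the exit crontab, if one is configured.
    /// Returns `None` when no crontab is set or the schedule cannot be turned into a duration.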
    fn get_cron_interval_sleep_future(&self) -> Option<tokio::time::Sleep> {
        if let Some(ref chaos_exit_crontab) = self.chaos_exit_crontab {
            match cron_to_next_duration(chaos_exit_crontab) {
                Ok(interval_exit) => Some(interval_exit),
                Err(e) => {
                    tracing::error!("Error processing the cron schedule: {e}");
                    None
                }
            }
        } else {
            None
        }
    }

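    /// Main chaos loop: on every interval tick, shuffle a batch of tenant shards; if the exit
    /// crontab fires first, force-kill the process. Exits when `cancel` is triggered.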
    pub async fn run(&mut self, cancel: CancellationToken) {
        let mut interval = tokio::time::interval(self.interval);
        #[derive(Debug)]
        enum ChaosEvent {
            ShuffleTenant,
            ForceKill,
        }
        loop {
            let cron_interval = self.get_cron_interval_sleep_future();
            let chaos_type = tokio::select! {
                _ = interval.tick() => {
                    ChaosEvent::ShuffleTenant
                }
                Some(_) = maybe_sleep(cron_interval) => {
                    ChaosEvent::ForceKill
                }
                _ = cancel.cancelled() => {
                    tracing::info!("Shutting down");
                    return;
                }
            };
            tracing::info!("Chaos iteration: {chaos_type:?}...");
            match chaos_type {
                ChaosEvent::ShuffleTenant => {
                    self.inject_chaos().await;
                }
                ChaosEvent::ForceKill => {
                    self.force_kill().await;
                }
            }
        }
    }

    /// If a shard has both a secondary and an attached location, swap them: promote the
    /// secondary to attached and demote the attached location to secondary.
    ///
    /// Only modifies tenants whose scheduling policy is Active.
    fn maybe_migrate_to_secondary(
        &self,
        tenant_shard_id: TenantShardId,
        nodes: &Arc<HashMap<NodeId, Node>>,
        tenants: &mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
    ) {
        let shard = tenants
            .get_mut(&tenant_shard_id)
            .expect("Held lock between choosing ID and this get");

        if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
            // Skip non-active scheduling policies, so that a shard with a policy like Pause
            // can be pinned without being disrupted by us.
            tracing::info!(
                "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
                shard.get_scheduling_policy()
            );
            return;
        }

        // Pick a secondary to promote
        let Some(new_location) = shard
            .intent
            .get_secondary()
            .choose(&mut thread_rng())
            .cloned()
        else {
            tracing::info!(
                "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
            );
            return;
        };

        let Some(old_location) = *shard.intent.get_attached() else {
            tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
            return;
        };

        tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");

        shard.intent.demote_attached(scheduler, old_location);
        shard.intent.promote_attached(scheduler, new_location);
        self.service.maybe_reconcile_shard(
            shard,
            nodes,
            crate::reconciler::ReconcilerPriority::Normal,
        );
    }

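    /// Abruptly terminate the process with a non-zero exit code, simulating an unclean crash.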
    async fn force_kill(&mut self) {
        tracing::warn!("Injecting chaos: force kill");
        std::process::exit(1);
    }

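    /// Pick a batch of tenant shards and attempt to migrate each one between its attached and
    /// secondary locations, preferring to move shards that are outside their home AZ back home.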
    async fn inject_chaos(&mut self) {
        // Pick some shards to interfere with
        let batch_size = 128;
        let mut inner = self.service.inner.write().unwrap();
        let (nodes, tenants, scheduler) = inner.parts_mut();

        // Prefer to migrate tenants that are currently outside their home AZ. This avoids the
        // chaos injector continuously pushing tenants outside their home AZ: instead, we'll tend
        // to cycle between picking some random tenants to move, moving them back on the next
        // chaos iteration, and then picking some new random tenants on the iteration after that.
        let (out_of_home_az, in_home_az): (Vec<_>, Vec<_>) = tenants
            .values()
            .map(|shard| {
                (
                    shard.tenant_shard_id,
                    shard.is_attached_outside_preferred_az(nodes),
                )
            })
            .partition(|(_id, is_outside)| *is_outside);

        let mut out_of_home_az: Vec<_> = out_of_home_az.into_iter().map(|(id, _)| id).collect();
        let mut in_home_az: Vec<_> = in_home_az.into_iter().map(|(id, _)| id).collect();

        let mut victims = Vec::with_capacity(batch_size);
        if out_of_home_az.len() >= batch_size {
            tracing::info!(
                "Injecting chaos: found {batch_size} shards to migrate back to home AZ (total {} out of home AZ)",
                out_of_home_az.len()
            );

            out_of_home_az.shuffle(&mut thread_rng());
            victims.extend(out_of_home_az.into_iter().take(batch_size));
        } else {
            tracing::info!(
                "Injecting chaos: found {} shards to migrate back to home AZ, picking {} random shards to migrate",
                out_of_home_az.len(),
                std::cmp::min(batch_size - out_of_home_az.len(), in_home_az.len())
            );

            victims.extend(out_of_home_az);
            in_home_az.shuffle(&mut thread_rng());
            victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
        }

        for victim in victims {
            self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
        }
    }
}