use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;

use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::thread_rng;
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;

use super::{Node, Scheduler, Service, TenantShard};

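/// Periodically injects scheduling chaos into the storage controller: on each tick of
/// `interval` it swaps the attached and secondary locations of a batch of tenant shards,
/// and, if `chaos_exit_crontab` is set, force-kills the process when that schedule fires.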
pub struct ChaosInjector {
    service: Arc<Service>,
    interval: Duration,
    chaos_exit_crontab: Option<cron::Schedule>,
}

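/// Build a sleep future that completes at the next firing time of the given cron schedule.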
fn cron_to_next_duration(cron: &cron::Schedule) -> anyhow::Result<tokio::time::Sleep> {
    use chrono::Utc;
    let next = cron
        .upcoming(Utc)
        .next()
        .ok_or_else(|| anyhow::anyhow!("cron schedule has no upcoming events"))?;
    let duration = (next - Utc::now()).to_std()?;
    Ok(tokio::time::sleep(duration))
}

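/// Await the sleep if one was provided; with `None`, resolve to `None` immediately so that
/// the corresponding `tokio::select!` branch is disabled.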
async fn maybe_sleep(sleep: Option<tokio::time::Sleep>) -> Option<()> {
    if let Some(sleep) = sleep {
        sleep.await;
        Some(())
    } else {
        None
    }
}

impl ChaosInjector {
    pub fn new(
        service: Arc<Service>,
        interval: Duration,
        chaos_exit_crontab: Option<cron::Schedule>,
    ) -> Self {
        Self {
            service,
            interval,
            chaos_exit_crontab,
        }
    }

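    /// Run the chaos loop until `cancel` fires: each interval tick triggers a round of shard
    /// migrations, and the optional exit crontab triggers a force kill of the process.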
    pub async fn run(&mut self, cancel: CancellationToken) {
        let mut interval = tokio::time::interval(self.interval);

        enum ChaosEvent {
            ShuffleTenant,
            ForceKill,
        }

        loop {
            // The sleep future is consumed by the select below, so rebuild it on every iteration.
            let cron_interval = if let Some(ref chaos_exit_crontab) = self.chaos_exit_crontab {
                match cron_to_next_duration(chaos_exit_crontab) {
                    Ok(interval_exit) => Some(interval_exit),
                    Err(e) => {
                        tracing::error!("Error processing the cron schedule: {e}");
                        None
                    }
                }
            } else {
                None
            };

            let chaos_type = tokio::select! {
                _ = interval.tick() => {
                    ChaosEvent::ShuffleTenant
                }
                Some(_) = maybe_sleep(cron_interval) => {
                    ChaosEvent::ForceKill
                }
                _ = cancel.cancelled() => {
                    tracing::info!("Shutting down");
                    return;
                }
            };

            tracing::info!("Chaos iteration...");

            match chaos_type {
                ChaosEvent::ShuffleTenant => {
                    self.inject_chaos().await;
                }
                ChaosEvent::ForceKill => {
                    self.force_kill().await;
                }
            }
        }
    }

    /// If a shard has both an attached and a secondary location, swap them: promote the
    /// secondary to attached and demote the attached to secondary.
    ///
    /// Only modifies shards whose scheduling policy is Active.
    fn maybe_migrate_to_secondary(
        &self,
        tenant_shard_id: TenantShardId,
        nodes: &Arc<HashMap<NodeId, Node>>,
        tenants: &mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
    ) {
        let shard = tenants
            .get_mut(&tenant_shard_id)
            .expect("Held lock between choosing ID and this get");

        if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
            // Skip non-active scheduling policies, so that a shard with a policy like Pause can
            // be pinned without being disrupted by us.
            tracing::info!(
                "Skipping shard {tenant_shard_id}: scheduling policy is {:?}",
                shard.get_scheduling_policy()
            );
            return;
        }

        // Pick a secondary to promote
        let Some(new_location) = shard
            .intent
            .get_secondary()
            .choose(&mut thread_rng())
            .cloned()
        else {
            tracing::info!(
                "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
            );
            return;
        };

        let Some(old_location) = *shard.intent.get_attached() else {
            tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
            return;
        };

        tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");

        shard.intent.demote_attached(scheduler, old_location);
        shard.intent.promote_attached(scheduler, new_location);
        self.service.maybe_reconcile_shard(
            shard,
            nodes,
            crate::reconciler::ReconcilerPriority::Normal,
        );
    }

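    /// Simulate a crash by terminating the process immediately with a non-zero exit code.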
    async fn force_kill(&mut self) {
        tracing::warn!("Injecting chaos: force kill");
        std::process::exit(1);
    }

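    /// Select a batch of shards and swap each one's attached and secondary locations,
    /// preferring shards that are currently attached outside their preferred AZ.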
    async fn inject_chaos(&mut self) {
        // Pick some shards to interfere with
        let batch_size = 128;
        let mut inner = self.service.inner.write().unwrap();
        let (nodes, tenants, scheduler) = inner.parts_mut();

        // Prefer to migrate tenants that are currently outside their home AZ. This avoids the chaos
        // injector continuously pushing tenants outside their home AZ: instead, we tend to cycle
        // between picking some random tenants to move, migrating them back on the next iteration,
        // and then picking a new set of random tenants after that.
        let (out_of_home_az, in_home_az): (Vec<_>, Vec<_>) = tenants
            .values()
            .map(|shard| {
                (
                    shard.tenant_shard_id,
                    shard.is_attached_outside_preferred_az(nodes),
                )
            })
            .partition(|(_id, is_outside)| *is_outside);

        let mut out_of_home_az: Vec<_> = out_of_home_az.into_iter().map(|(id, _)| id).collect();
        let mut in_home_az: Vec<_> = in_home_az.into_iter().map(|(id, _)| id).collect();

        let mut victims = Vec::with_capacity(batch_size);
        if out_of_home_az.len() >= batch_size {
            tracing::info!(
                "Injecting chaos: found {batch_size} shards to migrate back to home AZ (total {} out of home AZ)",
                out_of_home_az.len()
            );

            out_of_home_az.shuffle(&mut thread_rng());
            victims.extend(out_of_home_az.into_iter().take(batch_size));
        } else {
            tracing::info!(
                "Injecting chaos: found {} shards to migrate back to home AZ, picking {} random shards to migrate",
                out_of_home_az.len(),
                std::cmp::min(batch_size - out_of_home_az.len(), in_home_az.len())
            );

            victims.extend(out_of_home_az);
            in_home_az.shuffle(&mut thread_rng());
            victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
        }

        for victim in victims {
            self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
        }
    }
}