use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;

use pageserver_api::controller_api::ShardSchedulingPolicy;
use rand::seq::SliceRandom;
use rand::{Rng, thread_rng};
use tokio_util::sync::CancellationToken;
use utils::id::NodeId;
use utils::shard::TenantShardId;

use super::{Node, Scheduler, Service, TenantShard};
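/// Periodically injects chaos events into the service: cutovers of attached shard locations to
/// their secondaries, graceful migrations within a shard's home AZ, and, if a cron schedule is
/// configured, forced termination of the process.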
pub struct ChaosInjector {
    service: Arc<Service>,
    interval: Duration,
    chaos_exit_crontab: Option<cron::Schedule>,
}
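/// Build a sleep future that completes at the next occurrence of the given cron schedule.
/// Panics if the schedule has no upcoming occurrence.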
fn cron_to_next_duration(cron: &cron::Schedule) -> anyhow::Result<tokio::time::Sleep> {
    use chrono::Utc;
    let next = cron.upcoming(Utc).next().unwrap();
    let duration = (next - Utc::now()).to_std()?;
    Ok(tokio::time::sleep(duration))
}
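/// Await the sleep if one was provided, resolving to `Some(())` when it completes; resolve to
/// `None` immediately otherwise. Used with a `Some(_)` pattern in `tokio::select!` so that the
/// branch is disabled when no cron schedule is configured.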
async fn maybe_sleep(sleep: Option<tokio::time::Sleep>) -> Option<()> {
    if let Some(sleep) = sleep {
        sleep.await;
        Some(())
    } else {
        None
    }
}

impl ChaosInjector {
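    /// Create an injector that fires a chaos event every `interval` and, if `chaos_exit_crontab`
    /// is set, force-kills the process on that schedule.
    ///
    /// A minimal usage sketch (assuming the caller already has an `Arc<Service>` and a shutdown
    /// `CancellationToken`; the names here are illustrative):
    ///
    /// ```ignore
    /// let mut injector = ChaosInjector::new(service, Duration::from_secs(60), None);
    /// tokio::spawn(async move { injector.run(cancel).await });
    /// ```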
    pub fn new(
        service: Arc<Service>,
        interval: Duration,
        chaos_exit_crontab: Option<cron::Schedule>,
    ) -> Self {
        Self {
            service,
            interval,
            chaos_exit_crontab,
        }
    }
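    /// Return a sleep future for the next cron-scheduled forced exit, or `None` if no crontab is
    /// configured or the schedule could not be processed.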
    fn get_cron_interval_sleep_future(&self) -> Option<tokio::time::Sleep> {
        if let Some(ref chaos_exit_crontab) = self.chaos_exit_crontab {
            match cron_to_next_duration(chaos_exit_crontab) {
                Ok(interval_exit) => Some(interval_exit),
                Err(e) => {
                    tracing::error!("Error processing the cron schedule: {e}");
                    None
                }
            }
        } else {
            None
        }
    }
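    /// Main chaos loop: on each tick of the configured interval, randomly choose between cutting
    /// shards over to their secondaries and a graceful in-AZ migration; if the exit crontab fires
    /// first, force-kill the process. Returns when `cancel` is triggered.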
    pub async fn run(&mut self, cancel: CancellationToken) {
        let mut interval = tokio::time::interval(self.interval);
        #[derive(Debug)]
        enum ChaosEvent {
            MigrationsToSecondary,
            ForceKillController,
            GracefulMigrationsAnywhere,
        }
        loop {
            let cron_interval = self.get_cron_interval_sleep_future();
            let chaos_type = tokio::select! {
                _ = interval.tick() => {
                    if thread_rng().gen_bool(0.5) {
                        ChaosEvent::MigrationsToSecondary
                    } else {
                        ChaosEvent::GracefulMigrationsAnywhere
                    }
                }
                Some(_) = maybe_sleep(cron_interval) => {
                    ChaosEvent::ForceKillController
                }
                _ = cancel.cancelled() => {
                    tracing::info!("Shutting down");
                    return;
                }
            };
            tracing::info!("Chaos iteration: {chaos_type:?}...");
            match chaos_type {
                ChaosEvent::MigrationsToSecondary => {
                    self.inject_migrations_to_secondary();
                }
                ChaosEvent::GracefulMigrationsAnywhere => {
                    self.inject_graceful_migrations_anywhere();
                }
                ChaosEvent::ForceKillController => {
                    self.force_kill().await;
                }
            }
        }
    }
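    /// Whether this shard is a safe target for chaos injection, per the criteria below.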
    fn is_shard_eligible_for_chaos(&self, shard: &TenantShard) -> bool {
        // - Skip non-active scheduling policies, so that a shard with a policy like Pause can
        //   be pinned without being disrupted by us.
        // - Skip shards doing a graceful migration already, so that we allow these to run to
        //   completion rather than only exercising the first part and then cancelling with
        //   some other chaos.
        matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active)
            && shard.get_preferred_node().is_none()
    }

    /// If a shard has a secondary and attached location, then re-assign the secondary to be
    /// attached and the attached to be secondary.
    ///
    /// Only modifies tenants if they're in the Active scheduling policy.
    fn maybe_migrate_to_secondary(
        &self,
        tenant_shard_id: TenantShardId,
        nodes: &Arc<HashMap<NodeId, Node>>,
        tenants: &mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
    ) {
        let shard = tenants
            .get_mut(&tenant_shard_id)
            .expect("Held lock between choosing ID and this get");

        if !self.is_shard_eligible_for_chaos(shard) {
            return;
        }

        // Pick a secondary to promote
        let Some(new_location) = shard
            .intent
            .get_secondary()
            .choose(&mut thread_rng())
            .cloned()
        else {
            tracing::info!(
                "Skipping shard {tenant_shard_id}: no secondary location, can't migrate"
            );
            return;
        };

        let Some(old_location) = *shard.intent.get_attached() else {
            tracing::info!("Skipping shard {tenant_shard_id}: currently has no attached location");
            return;
        };

        tracing::info!("Injecting chaos: migrate {tenant_shard_id} {old_location}->{new_location}");

        shard.intent.demote_attached(scheduler, old_location);
        shard.intent.promote_attached(scheduler, new_location);
        self.service.maybe_reconcile_shard(
            shard,
            nodes,
            crate::reconciler::ReconcilerPriority::Normal,
        );
    }
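    /// Simulate an abrupt crash by exiting the process immediately.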
    async fn force_kill(&mut self) {
        tracing::warn!("Injecting chaos: force kill");
        std::process::exit(1);
    }
    // Unlike [`Self::inject_migrations_to_secondary`], this function does not just cut over to a
    // secondary: it migrates a tenant to a random node within its home AZ, using a graceful
    // migration of the same type that may be initiated by an API caller using prewarm=true.
    //
    // This is a much more expensive operation in terms of I/O and time, as we will fully warm up
    // some new location in order to migrate the tenant there. For that reason we do far fewer of these.
    fn inject_graceful_migrations_anywhere(&mut self) {
        let batch_size = 1;
        let mut inner = self.service.inner.write().unwrap();
        let (nodes, tenants, _scheduler) = inner.parts_mut();

        let mut candidates = tenants
            .values_mut()
            .filter(|shard| self.is_shard_eligible_for_chaos(shard))
            .collect::<Vec<_>>();

        tracing::info!(
            "Injecting chaos: found {} candidates for graceful migrations anywhere",
            candidates.len()
        );

        let mut victims: Vec<&mut TenantShard> = Vec::new();

        // Pick our victims: use a hand-rolled loop rather than choose_multiple() because we want
        // to take the mutable refs from our candidates rather than ref'ing them.
        while !candidates.is_empty() && victims.len() < batch_size {
            let i = thread_rng().gen_range(0..candidates.len());
            victims.push(candidates.swap_remove(i));
        }

        for victim in victims.into_iter() {
            // Find a node in the shard's preferred AZ, or, if the shard has no AZ preference,
            // any node other than the one it is currently attached to.
            let candidate_nodes = nodes
                .values()
                .filter(|node| {
                    if let Some(preferred_az) = victim.preferred_az() {
                        node.get_availability_zone_id() == preferred_az
                    } else if let Some(attached) = *victim.intent.get_attached() {
                        node.get_id() != attached
                    } else {
                        true
                    }
                })
                .collect::<Vec<_>>();

            let Some(victim_node) = candidate_nodes.choose(&mut thread_rng()) else {
                // This can happen if e.g. we are in a small region with only one pageserver per AZ.
                tracing::info!(
                    "no candidate nodes found for migrating shard {tenant_shard_id} within its home AZ",
                    tenant_shard_id = victim.tenant_shard_id
                );
                continue;
            };
            // This doesn't change intent immediately: the next iteration of Service::optimize_all
            // should do that. We avoid doing it here because applying optimizations requires
            // dropping the lock to do some async work that checks the optimization is valid given
            // remote state, and it would be a shame to duplicate that dance here.
            tracing::info!(
                "Injecting chaos: migrate {} to {}",
                victim.tenant_shard_id,
                victim_node
            );
            victim.set_preferred_node(Some(victim_node.get_id()));
        }
    }
    /// Migrations of attached locations to their secondary location. This exercises reconciliation in general,
    /// live migration in particular, and the pageserver code for cleanly shutting down and starting up tenants
    /// during such migrations.
    fn inject_migrations_to_secondary(&mut self) {
        // Pick some shards to interfere with
        let batch_size = 128;
        let mut inner = self.service.inner.write().unwrap();
        let (nodes, tenants, scheduler) = inner.parts_mut();

        // Prefer to migrate tenants that are currently outside their home AZ. This avoids the chaos injector
        // continuously pushing tenants outside their home AZ: instead, we'll tend to cycle between picking some
        // random tenants to move, and then on next chaos iteration moving them back, then picking some new
        // random tenants on the next iteration.
        let (out_of_home_az, in_home_az): (Vec<_>, Vec<_>) = tenants
            .values()
            .map(|shard| {
                (
                    shard.tenant_shard_id,
                    shard.is_attached_outside_preferred_az(nodes),
                )
            })
            .partition(|(_id, is_outside)| *is_outside);

        let mut out_of_home_az: Vec<_> = out_of_home_az.into_iter().map(|(id, _)| id).collect();
        let mut in_home_az: Vec<_> = in_home_az.into_iter().map(|(id, _)| id).collect();

        let mut victims = Vec::with_capacity(batch_size);
        if out_of_home_az.len() >= batch_size {
            tracing::info!(
                "Injecting chaos: found {batch_size} shards to migrate back to home AZ (total {} out of home AZ)",
                out_of_home_az.len()
            );

            out_of_home_az.shuffle(&mut thread_rng());
            victims.extend(out_of_home_az.into_iter().take(batch_size));
        } else {
            tracing::info!(
                "Injecting chaos: found {} shards to migrate back to home AZ, picking {} random shards to migrate",
                out_of_home_az.len(),
                std::cmp::min(batch_size - out_of_home_az.len(), in_home_az.len())
            );

            victims.extend(out_of_home_az);
            in_home_az.shuffle(&mut thread_rng());
            victims.extend(in_home_az.into_iter().take(batch_size - victims.len()));
        }

        for victim in victims {
            self.maybe_migrate_to_secondary(victim, nodes, tenants, scheduler);
        }
    }
}