Line data Source code
1 : use std::{sync::Arc, time::Duration};
2 :
3 : use pageserver_api::controller_api::ShardSchedulingPolicy;
4 : use rand::seq::SliceRandom;
5 : use rand::thread_rng;
6 : use tokio_util::sync::CancellationToken;
7 :
8 : use super::Service;
9 :
10 : pub struct ChaosInjector {
11 : service: Arc<Service>,
12 : interval: Duration,
13 : }
14 :
15 : impl ChaosInjector {
16 0 : pub fn new(service: Arc<Service>, interval: Duration) -> Self {
17 0 : Self { service, interval }
18 0 : }
19 :
20 0 : pub async fn run(&mut self, cancel: CancellationToken) {
21 0 : let mut interval = tokio::time::interval(self.interval);
22 :
23 : loop {
24 0 : tokio::select! {
25 0 : _ = interval.tick() => {}
26 0 : _ = cancel.cancelled() => {
27 0 : tracing::info!("Shutting down");
28 0 : return;
29 : }
30 : }
31 :
32 0 : self.inject_chaos().await;
33 :
34 0 : tracing::info!("Chaos iteration...");
35 : }
36 0 : }
37 :
38 0 : async fn inject_chaos(&mut self) {
39 0 : // Pick some shards to interfere with
40 0 : let batch_size = 128;
41 0 : let mut inner = self.service.inner.write().unwrap();
42 0 : let (nodes, tenants, scheduler) = inner.parts_mut();
43 0 : let tenant_ids = tenants.keys().cloned().collect::<Vec<_>>();
44 0 : let victims = tenant_ids.choose_multiple(&mut thread_rng(), batch_size);
45 :
46 0 : for victim in victims {
47 0 : let shard = tenants
48 0 : .get_mut(victim)
49 0 : .expect("Held lock between choosing ID and this get");
50 :
51 0 : if !matches!(shard.get_scheduling_policy(), ShardSchedulingPolicy::Active) {
52 : // Skip non-active scheduling policies, so that a shard with a policy like Pause can
53 : // be pinned without being disrupted by us.
54 0 : tracing::info!(
55 0 : "Skipping shard {victim}: scheduling policy is {:?}",
56 0 : shard.get_scheduling_policy()
57 : );
58 0 : continue;
59 0 : }
60 :
61 : // Pick a secondary to promote
62 0 : let Some(new_location) = shard
63 0 : .intent
64 0 : .get_secondary()
65 0 : .choose(&mut thread_rng())
66 0 : .cloned()
67 : else {
68 0 : tracing::info!("Skipping shard {victim}: no secondary location, can't migrate");
69 0 : continue;
70 : };
71 :
72 0 : let Some(old_location) = *shard.intent.get_attached() else {
73 0 : tracing::info!("Skipping shard {victim}: currently has no attached location");
74 0 : continue;
75 : };
76 :
77 0 : tracing::info!("Injecting chaos: migrate {victim} {old_location}->{new_location}");
78 :
79 0 : shard.intent.demote_attached(scheduler, old_location);
80 0 : shard.intent.promote_attached(scheduler, new_location);
81 0 : self.service.maybe_reconcile_shard(shard, nodes);
82 : }
83 0 : }
84 : }
|