Line data Source code
1 : use std::collections::BTreeMap;
2 :
3 : use utils::id::TenantId;
4 : use utils::shard::TenantShardId;
5 :
6 : use crate::scheduler::{ScheduleContext, ScheduleMode};
7 : use crate::tenant_shard::TenantShard;
8 :
9 : /// When making scheduling decisions, it is useful to have the ScheduleContext for a whole
10 : /// tenant while considering the individual shards within it. This iterator is a helper
11 : /// that gathers all the shards in a tenant and then yields them together with a ScheduleContext
12 : /// for the tenant.
13 : pub(super) struct TenantShardContextIterator<'a> {
14 : schedule_mode: ScheduleMode,
15 : inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>,
16 : }
17 :
18 : impl<'a> TenantShardContextIterator<'a> {
19 1 : pub(super) fn new(
20 1 : tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
21 1 : schedule_mode: ScheduleMode,
22 1 : ) -> Self {
23 1 : Self {
24 1 : schedule_mode,
25 1 : inner: tenants.iter_mut(),
26 1 : }
27 1 : }
28 : }
29 :
30 : impl<'a> Iterator for TenantShardContextIterator<'a> {
31 : type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>);
32 :
33 3 : fn next(&mut self) -> Option<Self::Item> {
34 3 : let mut tenant_shards = Vec::new();
35 3 : let mut schedule_context = ScheduleContext::new(self.schedule_mode.clone());
36 : loop {
37 6 : let (tenant_shard_id, shard) = self.inner.next()?;
38 :
39 6 : if tenant_shard_id.is_shard_zero() {
40 : // Cleared on last shard of previous tenant
41 3 : assert!(tenant_shards.is_empty());
42 3 : }
43 :
44 : // Accumulate the schedule context for all the shards in a tenant
45 6 : schedule_context.avoid(&shard.intent.all_pageservers());
46 6 : tenant_shards.push(shard);
47 6 :
48 6 : if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 {
49 3 : return Some((tenant_shard_id.tenant_id, schedule_context, tenant_shards));
50 3 : }
51 : }
52 3 : }
53 : }
54 :
#[cfg(test)]
mod tests {
    use std::{collections::BTreeMap, str::FromStr};

    use pageserver_api::controller_api::PlacementPolicy;
    use utils::shard::{ShardCount, ShardNumber};

    use crate::{
        scheduler::test_utils::make_test_nodes, service::Scheduler,
        tenant_shard::tests::make_test_tenant_with_id,
    };

    use super::*;

    /// Three tenants (1, 4 and 1 shards) are scheduled and then walked with the
    /// iterator; each tenant must come out as a single item carrying all of its
    /// shards in order and a context covering all of their locations.
    #[test]
    fn test_context_iterator() {
        // The tenant IDs are hand-crafted so that they sort as t1 < t2 < t3 once
        // they are placed in the BTreeMap and iterated.
        let t_1_shards = make_test_tenant_with_id(
            TenantId::from_str("af0480929707ee75372337efaa5ecf96").unwrap(),
            PlacementPolicy::Attached(1),
            ShardCount(1),
            None,
        );
        let t_2_shards = make_test_tenant_with_id(
            TenantId::from_str("bf0480929707ee75372337efaa5ecf96").unwrap(),
            PlacementPolicy::Attached(1),
            ShardCount(4),
            None,
        );
        let t_3_shards = make_test_tenant_with_id(
            TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(),
            PlacementPolicy::Attached(1),
            ShardCount(1),
            None,
        );

        let t1_id = t_1_shards[0].tenant_shard_id.tenant_id;
        let t2_id = t_2_shards[0].tenant_shard_id.tenant_id;
        let t3_id = t_3_shards[0].tenant_shard_id.tenant_id;

        // Key every shard by its own tenant shard ID.
        let mut tenants: BTreeMap<_, _> = t_1_shards
            .into_iter()
            .chain(t_2_shards)
            .chain(t_3_shards)
            .map(|shard| (shard.tenant_shard_id, shard))
            .collect();

        // Give every shard a scheduled intent so the per-tenant contexts have
        // locations to count.
        let nodes = make_test_nodes(3, &[]);
        let mut scheduler = Scheduler::new(nodes.values());
        let mut initial_context = ScheduleContext::default();
        tenants.values_mut().for_each(|shard| {
            shard
                .schedule(&mut scheduler, &mut initial_context)
                .unwrap();
        });

        // (tenant, shard numbers in yield order, expected location count)
        let expectations = [
            (t1_id, vec![ShardNumber(0)], 2),
            (
                t2_id,
                vec![
                    ShardNumber(0),
                    ShardNumber(1),
                    ShardNumber(2),
                    ShardNumber(3),
                ],
                8,
            ),
            (t3_id, vec![ShardNumber(0)], 2),
        ];

        let mut iter = TenantShardContextIterator::new(&mut tenants, ScheduleMode::Speculative);
        for (expected_tenant, expected_numbers, expected_locations) in expectations {
            let (tenant_id, context, shards) = iter.next().unwrap();
            assert_eq!(tenant_id, expected_tenant);
            assert_eq!(shards.len(), expected_numbers.len());
            for (shard, expected_number) in shards.iter().zip(expected_numbers) {
                assert_eq!(shard.tenant_shard_id.shard_number, expected_number);
            }
            assert_eq!(context.location_count(), expected_locations);
        }

        // Release the scheduled intents against the scheduler before the shards
        // are dropped.
        for shard in tenants.values_mut() {
            shard.intent.clear(&mut scheduler);
        }
    }
}
|