Line data Source code
1 : use std::collections::BTreeMap;
2 : use std::sync::Arc;
3 :
4 : use utils::id::TenantId;
5 : use utils::shard::TenantShardId;
6 :
7 : use crate::scheduler::{ScheduleContext, ScheduleMode};
8 : use crate::tenant_shard::TenantShard;
9 :
10 : use super::Service;
11 :
12 : /// Exclusive iterator over all tenant shards.
13 : /// It is used to iterate over consistent tenants state at specific point in time.
14 : ///
15 : /// When making scheduling decisions, it is useful to have the ScheduleContext for a whole
16 : /// tenant while considering the individual shards within it. This iterator is a helper
17 : /// that gathers all the shards in a tenant and then yields them together with a ScheduleContext
18 : /// for the tenant.
19 : pub(super) struct TenantShardExclusiveIterator<'a> {
20 : schedule_mode: ScheduleMode,
21 : inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>,
22 : }
23 :
24 : impl<'a> TenantShardExclusiveIterator<'a> {
25 1 : pub(super) fn new(
26 1 : tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
27 1 : schedule_mode: ScheduleMode,
28 1 : ) -> Self {
29 1 : Self {
30 1 : schedule_mode,
31 1 : inner: tenants.iter_mut(),
32 1 : }
33 1 : }
34 : }
35 :
36 : impl<'a> Iterator for TenantShardExclusiveIterator<'a> {
37 : type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>);
38 :
39 3 : fn next(&mut self) -> Option<Self::Item> {
40 3 : let mut tenant_shards = Vec::new();
41 3 : let mut schedule_context = ScheduleContext::new(self.schedule_mode.clone());
42 : loop {
43 6 : let (tenant_shard_id, shard) = self.inner.next()?;
44 :
45 6 : if tenant_shard_id.is_shard_zero() {
46 : // Cleared on last shard of previous tenant
47 3 : assert!(tenant_shards.is_empty());
48 3 : }
49 :
50 : // Accumulate the schedule context for all the shards in a tenant
51 6 : schedule_context.avoid(&shard.intent.all_pageservers());
52 6 : tenant_shards.push(shard);
53 :
54 6 : if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 {
55 3 : return Some((tenant_shard_id.tenant_id, schedule_context, tenant_shards));
56 3 : }
57 : }
58 3 : }
59 : }
60 :
61 : /// Shared iterator over all tenant shards.
62 : /// It is used to iterate over all tenants without blocking another code, working with tenants
63 : ///
64 : /// A simple iterator which can be used in tandem with [`crate::service::Service`]
65 : /// to iterate over all known tenant shard ids without holding the lock on the
66 : /// service state at all times.
67 : pub(crate) struct TenantShardSharedIterator<F> {
68 : tenants_accessor: F,
69 : inspected_all_shards: bool,
70 : last_inspected_shard: Option<TenantShardId>,
71 : }
72 :
73 : impl<F> TenantShardSharedIterator<F>
74 : where
75 : F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
76 : {
77 1 : pub(crate) fn new(tenants_accessor: F) -> Self {
78 1 : Self {
79 1 : tenants_accessor,
80 1 : inspected_all_shards: false,
81 1 : last_inspected_shard: None,
82 1 : }
83 1 : }
84 :
85 0 : pub(crate) fn finished(&self) -> bool {
86 0 : self.inspected_all_shards
87 0 : }
88 : }
89 :
90 : impl<F> Iterator for TenantShardSharedIterator<F>
91 : where
92 : F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
93 : {
94 : // TODO(ephemeralsad): consider adding schedule context to the iterator
95 : type Item = TenantShardId;
96 :
97 : /// Returns the next tenant shard id if one exists
98 9 : fn next(&mut self) -> Option<Self::Item> {
99 9 : if self.inspected_all_shards {
100 0 : return None;
101 9 : }
102 :
103 9 : match (self.tenants_accessor)(self.last_inspected_shard) {
104 8 : Some(tid) => {
105 8 : self.last_inspected_shard = Some(tid);
106 8 : Some(tid)
107 : }
108 : None => {
109 1 : self.inspected_all_shards = true;
110 1 : None
111 : }
112 : }
113 9 : }
114 : }
115 :
116 0 : pub(crate) fn create_shared_shard_iterator(
117 0 : service: Arc<Service>,
118 0 : ) -> TenantShardSharedIterator<impl Fn(Option<TenantShardId>) -> Option<TenantShardId>> {
119 0 : let tenants_accessor = move |last_inspected_shard: Option<TenantShardId>| {
120 0 : let locked = &service.inner.read().unwrap();
121 0 : let tenants = &locked.tenants;
122 0 : let entry = match last_inspected_shard {
123 0 : Some(skip_past) => {
124 : // Skip to the last seen tenant shard id
125 0 : let mut cursor = tenants.iter().skip_while(|(tid, _)| **tid != skip_past);
126 :
127 : // Skip past the last seen
128 0 : cursor.nth(1)
129 : }
130 0 : None => tenants.first_key_value(),
131 : };
132 :
133 0 : entry.map(|(tid, _)| tid).copied()
134 0 : };
135 :
136 0 : TenantShardSharedIterator::new(tenants_accessor)
137 0 : }
138 :
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::str::FromStr;
    use std::sync::Arc;

    use pageserver_api::controller_api::PlacementPolicy;
    use utils::id::TenantId;
    use utils::shard::{ShardCount, ShardNumber, TenantShardId};

    use super::*;
    use crate::scheduler::test_utils::make_test_nodes;
    use crate::service::Scheduler;
    use crate::tenant_shard::tests::make_test_tenant_with_id;

    /// The exclusive iterator yields each tenant exactly once, with all of its shards and a
    /// schedule context covering them, and is exhausted afterwards.
    #[test]
    fn test_exclusive_shard_iterator() {
        // Hand-crafted tenant IDs to ensure they appear in the expected order when put into
        // a btreemap & iterated
        let mut t_1_shards = make_test_tenant_with_id(
            TenantId::from_str("af0480929707ee75372337efaa5ecf96").unwrap(),
            PlacementPolicy::Attached(1),
            ShardCount(1),
            None,
        );
        let t_2_shards = make_test_tenant_with_id(
            TenantId::from_str("bf0480929707ee75372337efaa5ecf96").unwrap(),
            PlacementPolicy::Attached(1),
            ShardCount(4),
            None,
        );
        let mut t_3_shards = make_test_tenant_with_id(
            TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(),
            PlacementPolicy::Attached(1),
            ShardCount(1),
            None,
        );

        let t1_id = t_1_shards[0].tenant_shard_id.tenant_id;
        let t2_id = t_2_shards[0].tenant_shard_id.tenant_id;
        let t3_id = t_3_shards[0].tenant_shard_id.tenant_id;

        let mut tenants = BTreeMap::new();
        tenants.insert(t_1_shards[0].tenant_shard_id, t_1_shards.pop().unwrap());
        for shard in t_2_shards {
            tenants.insert(shard.tenant_shard_id, shard);
        }
        tenants.insert(t_3_shards[0].tenant_shard_id, t_3_shards.pop().unwrap());

        let nodes = make_test_nodes(3, &[]);
        let mut scheduler = Scheduler::new(nodes.values());
        let mut context = ScheduleContext::default();
        for shard in tenants.values_mut() {
            shard.schedule(&mut scheduler, &mut context).unwrap();
        }

        let mut iter = TenantShardExclusiveIterator::new(&mut tenants, ScheduleMode::Speculative);
        let (tenant_id, context, shards) = iter.next().unwrap();
        assert_eq!(tenant_id, t1_id);
        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
        assert_eq!(shards.len(), 1);
        assert_eq!(context.location_count(), 2);

        let (tenant_id, context, shards) = iter.next().unwrap();
        assert_eq!(tenant_id, t2_id);
        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
        assert_eq!(shards[1].tenant_shard_id.shard_number, ShardNumber(1));
        assert_eq!(shards[2].tenant_shard_id.shard_number, ShardNumber(2));
        assert_eq!(shards[3].tenant_shard_id.shard_number, ShardNumber(3));
        assert_eq!(shards.len(), 4);
        assert_eq!(context.location_count(), 8);

        let (tenant_id, context, shards) = iter.next().unwrap();
        assert_eq!(tenant_id, t3_id);
        assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
        assert_eq!(shards.len(), 1);
        assert_eq!(context.location_count(), 2);

        // Every tenant has been yielded, so the iterator must now be exhausted.
        assert!(iter.next().is_none());

        for shard in tenants.values_mut() {
            shard.intent.clear(&mut scheduler);
        }
    }

    /// The shared iterator visits every shard in order, then reports completion and stays
    /// exhausted.
    #[test]
    fn test_shared_shard_iterator() {
        let tenant_id = TenantId::generate();
        let shard_count = ShardCount(8);

        let mut tenant_shards = Vec::default();
        for i in 0..shard_count.0 {
            tenant_shards.push((
                TenantShardId {
                    tenant_id,
                    shard_number: ShardNumber(i),
                    shard_count,
                },
                (),
            ))
        }

        let tenant_shards = Arc::new(tenant_shards);

        let mut tid_iter = TenantShardSharedIterator::new({
            let tenants = tenant_shards.clone();
            move |last_inspected_shard: Option<TenantShardId>| {
                let entry = match last_inspected_shard {
                    Some(skip_past) => {
                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
                        cursor.nth(1)
                    }
                    None => tenants.first(),
                };

                entry.map(|(tid, _)| tid).copied()
            }
        });
        assert!(!tid_iter.finished());

        let mut iterated_over = Vec::default();
        for tid in tid_iter.by_ref() {
            iterated_over.push((tid, ()));
        }

        assert_eq!(iterated_over, *tenant_shards);

        // Once exhausted, the iterator reports completion and keeps returning `None`.
        assert!(tid_iter.finished());
        assert!(tid_iter.next().is_none());
    }

    /// An accessor that never reports a shard finishes the iterator on the first call.
    #[test]
    fn test_shared_shard_iterator_empty() {
        let mut tid_iter = TenantShardSharedIterator::new(|_: Option<TenantShardId>| None);
        assert!(!tid_iter.finished());
        assert!(tid_iter.next().is_none());
        assert!(tid_iter.finished());
        assert!(tid_iter.next().is_none());
    }
}
|