use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use pageserver_api::controller_api::{NodeSchedulingPolicy, ShardSchedulingPolicy};
use utils::id::NodeId;
use utils::shard::TenantShardId;

use crate::background_node_operations::OperationError;
use crate::node::Node;
use crate::scheduler::Scheduler;
use crate::tenant_shard::TenantShard;

pub(crate) struct TenantShardIterator<F> {
    tenants_accessor: F,
    inspected_all_shards: bool,
    last_inspected_shard: Option<TenantShardId>,
}

/// A simple iterator which can be used in tandem with [`crate::service::Service`]
/// to iterate over all known tenant shard ids without holding the lock on the
/// service state at all times.
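///
/// Rough usage sketch, not the real call site: `tenants` is a hypothetical lock-protected
/// map of tenant shards; the point is that the accessor closure takes and releases the lock
/// on every call instead of holding it for the whole iteration.
///
/// ```ignore
/// let mut iter = TenantShardIterator::new(|last_inspected_shard| {
///     // Lock is held only for the duration of this call.
///     let tenants = tenants.lock().unwrap();
///     match last_inspected_shard {
///         // Resume from the entry following the last shard we handed out.
///         Some(last) => tenants.range(last..).nth(1).map(|(tid, _)| *tid),
///         // First call: start from the beginning of the map.
///         None => tenants.first_key_value().map(|(tid, _)| *tid),
///     }
/// });
///
/// while let Some(tenant_shard_id) = iter.next() {
///     // Inspect `tenant_shard_id` here, re-locking the service state as needed.
/// }
/// ```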
impl<F> TenantShardIterator<F>
where
    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
{
    pub(crate) fn new(tenants_accessor: F) -> Self {
        Self {
            tenants_accessor,
            inspected_all_shards: false,
            last_inspected_shard: None,
        }
    }

    /// Returns the next tenant shard id if one exists
    pub(crate) fn next(&mut self) -> Option<TenantShardId> {
        if self.inspected_all_shards {
            return None;
        }

        match (self.tenants_accessor)(self.last_inspected_shard) {
            Some(tid) => {
                self.last_inspected_shard = Some(tid);
                Some(tid)
            }
            None => {
                self.inspected_all_shards = true;
                None
            }
        }
    }

    /// Returns true when the end of the iterator is reached and false otherwise
    pub(crate) fn finished(&self) -> bool {
        self.inspected_all_shards
    }
}

/// Check that the state of the node being drained is as expected:
/// the node is present in memory and its scheduling policy is set to [`NodeSchedulingPolicy::Draining`]
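///
/// Illustrative sketch only (the surrounding drain loop and the `nodes_snapshot` variable are
/// assumptions, not the real call site): the check is cheap, so it can be repeated between
/// batches of work to abort promptly if the node is removed or its policy changes.
///
/// ```ignore
/// // `nodes_snapshot` is assumed to be an `Arc<HashMap<NodeId, Node>>` cloned out of the
/// // service state just before this check.
/// validate_node_state(&node_id, nodes_snapshot)?;
/// // Reaching this point means the node still exists and is still Draining.
/// ```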
pub(crate) fn validate_node_state(
    node_id: &NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
        format!("node {} was removed", node_id).into(),
    ))?;

    let current_policy = node.get_scheduling();
    if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
        // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
        // about it
        return Err(OperationError::NodeStateChanged(
            format!("node {} changed state to {:?}", node_id, current_policy).into(),
        ));
    }

    Ok(())
}

/// Struct that houses a few utility methods for draining pageserver nodes
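///
/// Rough sketch of how the helpers fit together (loop structure and names such as `tenants`,
/// `scheduler` and `node_id` are assumptions, not the real drain loop): for each shard attached
/// to the drained node, check eligibility and pick a destination, then perform the move via
/// [`TenantShardDrain::reschedule_to_secondary`].
///
/// ```ignore
/// let drain = TenantShardDrain {
///     drained_node: node_id,
///     tenant_shard_id,
/// };
///
/// // None means: not attached to the drained node, policy forbids moving it,
/// // or no usable secondary location exists.
/// let Some(destination) = drain.tenant_shard_eligible_for_drain(&tenants, &scheduler) else {
///     continue;
/// };
/// ```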
pub(crate) struct TenantShardDrain {
    pub(crate) drained_node: NodeId,
    pub(crate) tenant_shard_id: TenantShardId,
}

impl TenantShardDrain {
    /// Check if the tenant shard in question is eligible for draining:
    /// its primary attachment is on the node being drained
    pub(crate) fn tenant_shard_eligible_for_drain(
        &self,
        tenants: &BTreeMap<TenantShardId, TenantShard>,
        scheduler: &Scheduler,
    ) -> Option<NodeId> {
        let tenant_shard = tenants.get(&self.tenant_shard_id)?;

        if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
            return None;
        }

        // Only tenants with a normal (Active) scheduling policy are proactively moved
        // around during a node drain. Shards which have been manually configured to a different
        // policy are only rescheduled by manual intervention.
        match tenant_shard.get_scheduling_policy() {
            ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
                // A migration during drain is classed as 'essential' because it is required to
                // uphold our availability goals for the tenant: this shard is eligible for migration.
            }
            ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
                // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain
                return None;
            }
        }

        match tenant_shard.preferred_secondary(scheduler) {
            Some(node) => Some(node),
            None => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "No eligible secondary while draining {}", self.drained_node
                );

                None
            }
        }
    }

    /// Attempt to reschedule the tenant shard in question to one of its secondary locations.
    /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
    /// should be skipped.
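    ///
    /// Illustrative sketch of how a caller might react to the three outcomes (the match body
    /// is an assumption, not the real drain loop):
    ///
    /// ```ignore
    /// match drain.reschedule_to_secondary(destination, &mut tenants, &mut scheduler, &nodes) {
    ///     // Unrecoverable for the drain as a whole (e.g. destination node removed): abort.
    ///     Err(e) => return Err(e),
    ///     // Shard removed, secondary moved away, or scheduling failed: skip this shard.
    ///     Ok(None) => continue,
    ///     // Rescheduled: spawn a reconciliation so the new attachment takes effect.
    ///     Ok(Some(tenant_shard)) => { /* reconcile `tenant_shard` here */ }
    /// }
    /// ```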
    pub(crate) fn reschedule_to_secondary<'a>(
        &self,
        destination: NodeId,
        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Result<Option<&'a mut TenantShard>, OperationError> {
        let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
            Some(some) => some,
            None => {
                // Tenant shard was removed in the meantime.
                // Skip to the next one, but don't fail the overall operation
                return Ok(None);
            }
        };

        if !nodes.contains_key(&destination) {
            return Err(OperationError::NodeStateChanged(
                format!("node {} was removed", destination).into(),
            ));
        }

        if !tenant_shard.intent.get_secondary().contains(&destination) {
            tracing::info!(
                tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                "Secondary moved away from {destination} during drain"
            );

            return Ok(None);
        }

        match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
            Err(e) => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Scheduling error when draining pageserver {} : {}", self.drained_node, e
                );

                Ok(None)
            }
            Ok(()) => {
                let scheduled_to = tenant_shard.intent.get_attached();
                tracing::info!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Rescheduled shard while draining node {}: {} -> {:?}",
                    self.drained_node,
                    self.drained_node,
                    scheduled_to
                );

                Ok(Some(tenant_shard))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use utils::id::TenantId;
    use utils::shard::{ShardCount, ShardNumber, TenantShardId};

    use super::TenantShardIterator;

    #[test]
    fn test_tenant_shard_iterator() {
        let tenant_id = TenantId::generate();
        let shard_count = ShardCount(8);

        let mut tenant_shards = Vec::default();
        for i in 0..shard_count.0 {
            tenant_shards.push((
                TenantShardId {
                    tenant_id,
                    shard_number: ShardNumber(i),
                    shard_count,
                },
                (),
            ))
        }

        let tenant_shards = Arc::new(tenant_shards);

        let mut tid_iter = TenantShardIterator::new({
            let tenants = tenant_shards.clone();
            move |last_inspected_shard: Option<TenantShardId>| {
                let entry = match last_inspected_shard {
                    Some(skip_past) => {
                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
                        cursor.nth(1)
                    }
                    None => tenants.first(),
                };

                entry.map(|(tid, _)| tid).copied()
            }
        });

        let mut iterated_over = Vec::default();
        while let Some(tid) = tid_iter.next() {
            iterated_over.push((tid, ()));
        }

        assert_eq!(iterated_over, *tenant_shards);
    }
}