use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
};

use pageserver_api::controller_api::{NodeSchedulingPolicy, ShardSchedulingPolicy};
use utils::{id::NodeId, shard::TenantShardId};

use crate::{
    background_node_operations::OperationError, node::Node, scheduler::Scheduler,
    tenant_shard::TenantShard,
};

pub(crate) struct TenantShardIterator<F> {
    tenants_accessor: F,
    inspected_all_shards: bool,
    last_inspected_shard: Option<TenantShardId>,
}

/// A simple iterator which can be used in tandem with [`crate::service::Service`]
/// to iterate over all known tenant shard ids without holding the lock on the
/// service state at all times.
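///
/// An illustrative sketch of how it might be driven, assuming a hypothetical
/// `tenants` handle of type `Arc<Mutex<BTreeMap<TenantShardId, TenantShard>>>`;
/// the accessor closure and locking shown here are an assumption, not
/// necessarily how [`crate::service::Service`] wires it up:
///
/// ```ignore
/// let mut iter = TenantShardIterator::new(|last| {
///     // Take the lock only for the duration of a single lookup.
///     let locked = tenants.lock().unwrap();
///     match last {
///         // Resume strictly after the shard id returned by the previous call.
///         Some(last) => locked
///             .range(last..)
///             .map(|(tid, _)| *tid)
///             .find(|tid| *tid != last),
///         // First call: start from the smallest shard id.
///         None => locked.first_key_value().map(|(tid, _)| *tid),
///     }
/// });
///
/// while let Some(tid) = iter.next() {
///     // The lock is not held between iterations, so the shard may have been
///     // removed or modified by the time `tid` is processed.
/// }
/// ```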
impl<F> TenantShardIterator<F>
where
    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
{
    pub(crate) fn new(tenants_accessor: F) -> Self {
        Self {
            tenants_accessor,
            inspected_all_shards: false,
            last_inspected_shard: None,
        }
    }

    /// Returns the next tenant shard id if one exists
    pub(crate) fn next(&mut self) -> Option<TenantShardId> {
        if self.inspected_all_shards {
            return None;
        }

        match (self.tenants_accessor)(self.last_inspected_shard) {
            Some(tid) => {
                self.last_inspected_shard = Some(tid);
                Some(tid)
            }
            None => {
                self.inspected_all_shards = true;
                None
            }
        }
    }

    /// Returns true when the end of the iterator is reached and false otherwise
    pub(crate) fn finished(&self) -> bool {
        self.inspected_all_shards
    }
}

/// Check that the state of the node being drained is as expected:
/// the node is present in memory and its scheduling policy is set to
/// [`NodeSchedulingPolicy::Draining`].
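///
/// A minimal sketch of the intended call pattern, assuming a hypothetical drain
/// loop with a `node_id` and a `nodes` snapshot in scope (not the exact service
/// code):
///
/// ```ignore
/// // Re-validate before each chunk of drain work; bail out of the whole
/// // operation if the node was removed or is no longer Draining.
/// validate_node_state(&node_id, nodes.clone())?;
/// ```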
pub(crate) fn validate_node_state(
    node_id: &NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
        format!("node {} was removed", node_id).into(),
    ))?;

    let current_policy = node.get_scheduling();
    if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
        // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
        // about it
        return Err(OperationError::NodeStateChanged(
            format!("node {} changed state to {:?}", node_id, current_policy).into(),
        ));
    }

    Ok(())
}

/// Houses a few utility methods for draining pageserver nodes.
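///
/// An illustrative outline of how the helpers below might be combined for a
/// single shard inside a per-shard drain loop; the `tenants`, `scheduler`, and
/// `nodes` handles are assumed to come from the service state and are
/// hypothetical names:
///
/// ```ignore
/// let drain = TenantShardDrain {
///     drained_node: node_id,
///     tenant_shard_id: tid,
/// };
///
/// // 1. Skip shards that are not attached to the drained node (or are pinned
/// //    by their scheduling policy) and pick the preferred secondary.
/// let Some(destination) = drain.tenant_shard_eligible_for_drain(&tenants, &scheduler) else {
///     continue;
/// };
///
/// // 2. Update the in-memory intent to move the attachment to `destination`;
/// //    see the example on `reschedule_to_secondary` for how its result is interpreted.
/// let rescheduled = drain.reschedule_to_secondary(destination, &mut tenants, &mut scheduler, &nodes)?;
/// ```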
pub(crate) struct TenantShardDrain {
    pub(crate) drained_node: NodeId,
    pub(crate) tenant_shard_id: TenantShardId,
}

impl TenantShardDrain {
    /// Check if the tenant shard in question is eligible for draining:
    /// its primary attachment is on the node being drained.
    pub(crate) fn tenant_shard_eligible_for_drain(
        &self,
        tenants: &BTreeMap<TenantShardId, TenantShard>,
        scheduler: &Scheduler,
    ) -> Option<NodeId> {
        let tenant_shard = tenants.get(&self.tenant_shard_id)?;

        if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
            return None;
        }

        // Only tenants with a normal (Active) scheduling policy are proactively moved
        // around during a node drain. Shards which have been manually configured to a different
        // policy are only rescheduled by manual intervention.
        match tenant_shard.get_scheduling_policy() {
            ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
                // A migration during drain is classed as 'essential' because it is required to
                // uphold our availability goals for the tenant: this shard is eligible for migration.
            }
            ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
                // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain.
                return None;
            }
        }

        match scheduler.node_preferred(tenant_shard.intent.get_secondary()) {
            Some(node) => Some(node),
            None => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "No eligible secondary while draining {}", self.drained_node
                );

                None
            }
        }
    }

    /// Attempt to reschedule the tenant shard in question to one of its secondary locations.
    ///
    /// Returns an `Err` when the operation should be aborted and `Ok(None)` when the tenant shard
    /// should be skipped.
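    ///
    /// A hedged sketch of how a caller might interpret the result, assuming a
    /// `drain` value and a `reconcile` step that are hypothetical here rather
    /// than the exact service code:
    ///
    /// ```ignore
    /// match drain.reschedule_to_secondary(destination, &mut tenants, &mut scheduler, &nodes) {
    ///     // Abort the whole drain (e.g. the destination node was removed).
    ///     Err(err) => return Err(err),
    ///     // Skip this shard and move on to the next one.
    ///     Ok(None) => {}
    ///     // The intent was updated; trigger a reconcile for the shard.
    ///     Ok(Some(tenant_shard)) => reconcile(tenant_shard),
    /// }
    /// ```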
    pub(crate) fn reschedule_to_secondary<'a>(
        &self,
        destination: NodeId,
        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Result<Option<&'a mut TenantShard>, OperationError> {
        let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
            Some(some) => some,
            None => {
                // Tenant shard was removed in the meantime.
                // Skip to the next one, but don't fail the overall operation.
                return Ok(None);
            }
        };

        if !nodes.contains_key(&destination) {
            return Err(OperationError::NodeStateChanged(
                format!("node {} was removed", destination).into(),
            ));
        }

        if !tenant_shard.intent.get_secondary().contains(&destination) {
            tracing::info!(
                tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                "Secondary moved away from {destination} during drain"
            );

            return Ok(None);
        }

        match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
            Err(e) => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Scheduling error when draining pageserver {}: {}", self.drained_node, e
                );

                Ok(None)
            }
            Ok(()) => {
                let scheduled_to = tenant_shard.intent.get_attached();
                tracing::info!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Rescheduled shard while draining node {}: {} -> {:?}",
                    self.drained_node,
                    self.drained_node,
                    scheduled_to
                );

                Ok(Some(tenant_shard))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use utils::{
        id::TenantId,
        shard::{ShardCount, ShardNumber, TenantShardId},
    };

    use super::TenantShardIterator;

    #[test]
    fn test_tenant_shard_iterator() {
        let tenant_id = TenantId::generate();
        let shard_count = ShardCount(8);

        let mut tenant_shards = Vec::default();
        for i in 0..shard_count.0 {
            tenant_shards.push((
                TenantShardId {
                    tenant_id,
                    shard_number: ShardNumber(i),
                    shard_count,
                },
                (),
            ))
        }

        let tenant_shards = Arc::new(tenant_shards);

        let mut tid_iter = TenantShardIterator::new({
            let tenants = tenant_shards.clone();
            move |last_inspected_shard: Option<TenantShardId>| {
                let entry = match last_inspected_shard {
                    Some(skip_past) => {
                        // `skip_while` stops at `skip_past` itself, so `nth(1)`
                        // yields the entry immediately after it.
                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
                        cursor.nth(1)
                    }
                    None => tenants.first(),
                };

                entry.map(|(tid, _)| tid).copied()
            }
        });

        let mut iterated_over = Vec::default();
        while let Some(tid) = tid_iter.next() {
            iterated_over.push((tid, ()));
        }

        assert_eq!(iterated_over, *tenant_shards);
    }
}