use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
};

use pageserver_api::controller_api::NodeSchedulingPolicy;
use utils::{id::NodeId, shard::TenantShardId};

use crate::{
    background_node_operations::OperationError, node::Node, scheduler::Scheduler,
    tenant_shard::TenantShard,
};

pub(crate) struct TenantShardIterator<F> {
    tenants_accessor: F,
    inspected_all_shards: bool,
    last_inspected_shard: Option<TenantShardId>,
}

/// A simple iterator which can be used in tandem with [`crate::service::Service`]
/// to iterate over all known tenant shard ids without holding the lock on the
/// service state for the entire traversal.
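///
/// A rough usage sketch (the `service_state` lock below is hypothetical and only
/// illustrates the intended pattern of re-acquiring the lock on every step):
///
/// ```ignore
/// use std::ops::Bound::{Excluded, Unbounded};
///
/// let mut iter = TenantShardIterator::new(|last| {
///     // The lock is taken and released on each call, not held across the whole iteration.
///     let tenants = service_state.read().unwrap();
///     match last {
///         Some(last) => tenants
///             .range((Excluded(last), Unbounded))
///             .next()
///             .map(|(tid, _)| *tid),
///         None => tenants.keys().next().copied(),
///     }
/// });
///
/// while let Some(tenant_shard_id) = iter.next() {
///     // Inspect or reschedule the shard, re-locking as needed.
/// }
/// ```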
impl<F> TenantShardIterator<F>
where
    F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
{
    pub(crate) fn new(tenants_accessor: F) -> Self {
        Self {
            tenants_accessor,
            inspected_all_shards: false,
            last_inspected_shard: None,
        }
    }

    /// Returns the next tenant shard id if one exists
    pub(crate) fn next(&mut self) -> Option<TenantShardId> {
        if self.inspected_all_shards {
            return None;
        }

        match (self.tenants_accessor)(self.last_inspected_shard) {
            Some(tid) => {
                self.last_inspected_shard = Some(tid);
                Some(tid)
            }
            None => {
                self.inspected_all_shards = true;
                None
            }
        }
    }

    /// Returns true when the end of the iterator is reached and false otherwise
    pub(crate) fn finished(&self) -> bool {
        self.inspected_all_shards
    }
}

/// Check that the state of the node being drained is as expected:
/// the node is present in memory and its scheduling policy is set to [`NodeSchedulingPolicy::Draining`]
pub(crate) fn validate_node_state(
    node_id: &NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
        format!("node {} was removed", node_id).into(),
    ))?;

    let current_policy = node.get_scheduling();
    if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
        // TODO(vlad): maybe cancel pending reconciles before erroring out; needs more thought
        return Err(OperationError::NodeStateChanged(
            format!("node {} changed state to {:?}", node_id, current_policy).into(),
        ));
    }

    Ok(())
}

/// Struct that houses a few utility methods for draining pageserver nodes
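///
/// A simplified sketch of how these utilities are intended to compose inside a
/// drain loop; `tid_iter`, `node_id`, `tenants`, `scheduler` and `nodes` stand in
/// for the surrounding service state, and spawning the reconcile is elided:
///
/// ```ignore
/// while let Some(tid) = tid_iter.next() {
///     // Abort the drain if the node disappeared or left the Draining policy.
///     validate_node_state(&node_id, nodes.clone())?;
///
///     let drain = TenantShardDrain {
///         drained_node: node_id,
///         tenant_shard_id: tid,
///     };
///
///     // Skip shards that are not attached to the drained node or have no usable secondary.
///     let Some(destination) = drain.tenant_shard_eligible_for_drain(&tenants, &scheduler) else {
///         continue;
///     };
///
///     match drain.reschedule_to_secondary(destination, &mut tenants, &mut scheduler, &nodes)? {
///         // Shard was cut over to the secondary: kick off a reconcile for it (elided).
///         Some(tenant_shard) => { /* spawn reconcile */ }
///         // Shard vanished or its secondary moved away: skip it.
///         None => continue,
///     }
/// }
/// ```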
pub(crate) struct TenantShardDrain {
    pub(crate) drained_node: NodeId,
    pub(crate) tenant_shard_id: TenantShardId,
}

impl TenantShardDrain {
    /// Check if the tenant shard in question is eligible for draining:
    /// its primary attachment is on the node being drained
    pub(crate) fn tenant_shard_eligible_for_drain(
        &self,
        tenants: &BTreeMap<TenantShardId, TenantShard>,
        scheduler: &Scheduler,
    ) -> Option<NodeId> {
        let tenant_shard = tenants.get(&self.tenant_shard_id)?;

        if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
            return None;
        }

        match scheduler.node_preferred(tenant_shard.intent.get_secondary()) {
            Some(node) => Some(node),
            None => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "No eligible secondary while draining {}", self.drained_node
                );

                None
            }
        }
    }

    /// Attempt to reschedule the tenant shard in question to one of its secondary locations.
    /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
    /// should be skipped.
    pub(crate) fn reschedule_to_secondary<'a>(
        &self,
        destination: NodeId,
        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Result<Option<&'a mut TenantShard>, OperationError> {
        let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
            Some(some) => some,
            None => {
                // Tenant shard was removed in the meantime.
                // Skip to the next one, but don't fail the overall operation.
                return Ok(None);
            }
        };

        if !nodes.contains_key(&destination) {
            return Err(OperationError::NodeStateChanged(
                format!("node {} was removed", destination).into(),
            ));
        }

        if !tenant_shard.intent.get_secondary().contains(&destination) {
            tracing::info!(
                tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                "Secondary moved away from {destination} during drain"
            );

            return Ok(None);
        }

        match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
            Err(e) => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Scheduling error when draining pageserver {}: {}", self.drained_node, e
                );

                Ok(None)
            }
            Ok(()) => {
                let scheduled_to = tenant_shard.intent.get_attached();
                tracing::info!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Rescheduled shard while draining node {}: {} -> {:?}",
                    self.drained_node,
                    self.drained_node,
                    scheduled_to
                );

                Ok(Some(tenant_shard))
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use utils::{
        id::TenantId,
        shard::{ShardCount, ShardNumber, TenantShardId},
    };

    use super::TenantShardIterator;

    #[test]
    fn test_tenant_shard_iterator() {
        let tenant_id = TenantId::generate();
        let shard_count = ShardCount(8);

        let mut tenant_shards = Vec::default();
        for i in 0..shard_count.0 {
            tenant_shards.push((
                TenantShardId {
                    tenant_id,
                    shard_number: ShardNumber(i),
                    shard_count,
                },
                (),
            ))
        }

        let tenant_shards = Arc::new(tenant_shards);

        let mut tid_iter = TenantShardIterator::new({
            let tenants = tenant_shards.clone();
            move |last_inspected_shard: Option<TenantShardId>| {
                let entry = match last_inspected_shard {
                    Some(skip_past) => {
                        // Skip up to the last inspected shard and yield the entry after it
                        let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
                        cursor.nth(1)
                    }
                    None => tenants.first(),
                };

                entry.map(|(tid, _)| tid).copied()
            }
        });

        let mut iterated_over = Vec::default();
        while let Some(tid) = tid_iter.next() {
            iterated_over.push((tid, ()));
        }

        assert_eq!(iterated_over, *tenant_shards);
    }
}