use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use pageserver_api::controller_api::{NodeSchedulingPolicy, ShardSchedulingPolicy};
use utils::id::NodeId;
use utils::shard::TenantShardId;

use crate::background_node_operations::OperationError;
use crate::node::Node;
use crate::scheduler::Scheduler;
use crate::tenant_shard::TenantShard;

/// Check that the state of the node being drained is as expected:
/// the node is present in memory and its scheduling policy matches `expected_policy`.
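///
/// A minimal usage sketch (the caller's drain loop is hypothetical and not part
/// of this module). Re-validating between batches lets a drain abort promptly
/// if an operator changes the node's policy mid-operation:
///
/// ```ignore
/// validate_node_state(&node_id, nodes.clone(), NodeSchedulingPolicy::Draining)?;
/// ```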
pub(crate) fn validate_node_state(
    node_id: &NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
    expected_policy: NodeSchedulingPolicy,
) -> Result<(), OperationError> {
    let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
        format!("node {node_id} was removed").into(),
    ))?;

    let current_policy = node.get_scheduling();
    if current_policy != expected_policy {
        // TODO(vlad): consider cancelling pending reconciles before erroring out
        return Err(OperationError::NodeStateChanged(
            format!("node {node_id} changed state to {current_policy:?}").into(),
        ));
    }

    Ok(())
}

/// Utility methods for draining a pageserver node
pub(crate) struct TenantShardDrain {
    pub(crate) drained_node: NodeId,
    pub(crate) tenant_shard_id: TenantShardId,
}

impl TenantShardDrain {
    /// Check if the tenant shard in question is eligible for draining:
    /// its primary attachment is on the node being drained
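    ///
    /// A minimal usage sketch (the `drain`, `tenants`, and `scheduler` bindings
    /// are hypothetical stand-ins for the caller's state):
    ///
    /// ```ignore
    /// if let Some(destination) = drain.tenant_shard_eligible_for_drain(&tenants, &scheduler) {
    ///     // `destination` is the preferred secondary to migrate this shard to
    /// }
    /// ```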
    pub(crate) fn tenant_shard_eligible_for_drain(
        &self,
        tenants: &BTreeMap<TenantShardId, TenantShard>,
        scheduler: &Scheduler,
    ) -> Option<NodeId> {
        let tenant_shard = tenants.get(&self.tenant_shard_id)?;

        if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
            return None;
        }

        // Only tenants with a normal (Active or Essential) scheduling policy are
        // proactively moved around during a node drain. Shards which have been manually
        // configured to a different policy are only rescheduled by manual intervention.
        match tenant_shard.get_scheduling_policy() {
            ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
                // A migration during drain is classed as 'essential' because it is required to
                // uphold our availability goals for the tenant: this shard is eligible for migration.
            }
            ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
                // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain
                return None;
            }
        }

        match tenant_shard.preferred_secondary(scheduler) {
            Some(node) => Some(node),
            None => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "No eligible secondary while draining {}", self.drained_node
                );

                None
            }
        }
    }

    /// Attempt to reschedule the tenant shard in question to one of its secondary locations.
    /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
    /// should be skipped.
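    ///
    /// A minimal sketch of handling the three outcomes (the bindings and the
    /// reconcile step are hypothetical stand-ins for what a caller would do):
    ///
    /// ```ignore
    /// match drain.reschedule_to_secondary(destination, &mut tenants, &mut scheduler, &nodes)? {
    ///     Some(shard) => { /* spawn a reconcile so the new intent takes effect */ }
    ///     None => { /* shard removed, secondary moved, or scheduling failed: skip it */ }
    /// }
    /// ```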
    pub(crate) fn reschedule_to_secondary<'a>(
        &self,
        destination: NodeId,
        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Result<Option<&'a mut TenantShard>, OperationError> {
        let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
            Some(some) => some,
            None => {
                // Tenant shard was removed in the meantime.
                // Skip to the next one, but don't fail the overall operation.
                return Ok(None);
            }
        };

        if !nodes.contains_key(&destination) {
            return Err(OperationError::NodeStateChanged(
                format!("node {destination} was removed").into(),
            ));
        }

        if !tenant_shard.intent.get_secondary().contains(&destination) {
            tracing::info!(
                tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                "Secondary moved away from {destination} during drain"
            );

            return Ok(None);
        }

        match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
            Err(e) => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Scheduling error when draining pageserver {}: {}", self.drained_node, e
                );

                Ok(None)
            }
            Ok(()) => {
                let scheduled_to = tenant_shard.intent.get_attached();
                tracing::info!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Rescheduled shard while draining: {} -> {:?}",
                    self.drained_node,
                    scheduled_to
                );

                Ok(Some(tenant_shard))
            }
        }
    }
}
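
// End-to-end sketch (hypothetical caller, not part of this module): composing
// the utilities above into a single drain pass. `node_id`, `shard_ids_on_node`,
// `tenants`, `scheduler`, and `nodes` stand in for the storage controller's
// real state.
//
//     validate_node_state(&node_id, nodes.clone(), NodeSchedulingPolicy::Draining)?;
//     for tenant_shard_id in shard_ids_on_node {
//         let drain = TenantShardDrain { drained_node: node_id, tenant_shard_id };
//         let Some(destination) = drain.tenant_shard_eligible_for_drain(&tenants, &scheduler) else {
//             continue;
//         };
//         if let Some(shard) =
//             drain.reschedule_to_secondary(destination, &mut tenants, &mut scheduler, &nodes)?
//         {
//             // spawn a reconcile for `shard` here
//         }
//     }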