use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use pageserver_api::controller_api::{NodeSchedulingPolicy, ShardSchedulingPolicy};
use utils::id::NodeId;
use utils::shard::TenantShardId;

use crate::background_node_operations::OperationError;
use crate::node::Node;
use crate::scheduler::Scheduler;
use crate::tenant_shard::TenantShard;

/// Check that the state of the node being drained is as expected:
/// the node is present in memory and its scheduling policy matches `expected_policy`.
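///
/// A minimal sketch of a caller, assuming a drain loop that holds a `nodes`
/// snapshot and expects the node to be in the `Draining` policy (the loop
/// itself is hypothetical):
/// ```ignore
/// // Re-check before each batch of work; abort the drain if the node changed.
/// validate_node_state(&node_id, nodes.clone(), NodeSchedulingPolicy::Draining)?;
/// ```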
pub(crate) fn validate_node_state(
    node_id: &NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
    expected_policy: NodeSchedulingPolicy,
) -> Result<(), OperationError> {
    let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
        format!("node {node_id} was removed").into(),
    ))?;

    let current_policy = node.get_scheduling();
    if current_policy != expected_policy {
        // TODO(vlad): consider cancelling pending reconciles before erroring out;
        // this needs more thought.
        return Err(OperationError::NodeStateChanged(
            format!("node {node_id} changed state to {current_policy:?}").into(),
        ));
    }

    Ok(())
}

/// Utility methods for draining a tenant shard off a pageserver node.
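///
/// Typically constructed per shard by the drain loop, e.g. (hypothetical values):
/// ```ignore
/// let drain = TenantShardDrain {
///     drained_node: node_id,
///     tenant_shard_id,
/// };
/// ```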
pub(crate) struct TenantShardDrain {
    pub(crate) drained_node: NodeId,
    pub(crate) tenant_shard_id: TenantShardId,
}

impl TenantShardDrain {
    /// Check whether the tenant shard in question is eligible for draining:
    /// its primary attachment must be on the node being drained.
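    ///
    /// A sketch of how the returned action might be consumed (the reconcile-wait
    /// and migration steps are assumptions about the caller, not part of this method):
    /// ```ignore
    /// match drain.tenant_shard_eligible_for_drain(&tenants, &scheduler) {
    ///     TenantShardDrainAction::RescheduleToSecondary(dest) => {
    ///         // Move the attachment to `dest` via reschedule_to_secondary().
    ///     }
    ///     TenantShardDrainAction::Reconcile(intent_node) => {
    ///         // Wait for the in-flight reconciliation towards `intent_node`.
    ///     }
    ///     TenantShardDrainAction::Skip => {
    ///         // Nothing to do for this shard.
    ///     }
    /// }
    /// ```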
    pub(crate) fn tenant_shard_eligible_for_drain(
        &self,
        tenants: &BTreeMap<TenantShardId, TenantShard>,
        scheduler: &Scheduler,
    ) -> TenantShardDrainAction {
        let Some(tenant_shard) = tenants.get(&self.tenant_shard_id) else {
            return TenantShardDrainAction::Skip;
        };

        if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
            // If the intent attached node is not the drained node, check the observed
            // state of the shard on the drained node. If it is Attached*, the shard is
            // still being migrated away from the drained node, and the drain loop needs
            // to wait for that reconciliation to complete for a smooth drain.

            use pageserver_api::models::LocationConfigMode::*;

            let attach_mode = tenant_shard
                .observed
                .locations
                .get(&self.drained_node)
                .and_then(|observed| observed.conf.as_ref().map(|conf| conf.mode));

            return match (attach_mode, tenant_shard.intent.get_attached()) {
                (Some(AttachedSingle | AttachedMulti | AttachedStale), Some(intent_node_id)) => {
                    TenantShardDrainAction::Reconcile(*intent_node_id)
                }
                _ => TenantShardDrainAction::Skip,
            };
        }

        // Only tenants with a normal (Active or Essential) scheduling policy are
        // proactively moved around during a node drain. Shards which have been manually
        // configured to a different policy are only rescheduled by manual intervention.
        match tenant_shard.get_scheduling_policy() {
            ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
                // A migration during drain is classed as 'essential' because it is
                // required to uphold our availability goals for the tenant: this shard
                // is eligible for migration.
            }
            ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
                // If we have been asked to avoid rescheduling this shard, then do not
                // migrate it during a drain.
                return TenantShardDrainAction::Skip;
            }
        }

        match tenant_shard.preferred_secondary(scheduler) {
            Some(node) => TenantShardDrainAction::RescheduleToSecondary(node),
            None => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "No eligible secondary while draining {}", self.drained_node
                );

                TenantShardDrainAction::Skip
            }
        }
    }

    /// Attempt to reschedule the tenant shard in question to one of its secondary
    /// locations. Returns an `Err` when the operation should be aborted and `Ok(None)`
    /// when the tenant shard should be skipped.
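    ///
    /// A minimal sketch of a caller (the reconcile dispatch is an assumption about
    /// the surrounding drain loop, not something this method does):
    /// ```ignore
    /// if let Some(tenant_shard) =
    ///     drain.reschedule_to_secondary(destination, &mut tenants, &mut scheduler, &nodes)?
    /// {
    ///     // The intent now points at `destination`; spawn a reconcile to apply it.
    /// }
    /// ```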
    pub(crate) fn reschedule_to_secondary<'a>(
        &self,
        destination: NodeId,
        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Result<Option<&'a mut TenantShard>, OperationError> {
        let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
            Some(some) => some,
            None => {
                // Tenant shard was removed in the meantime.
                // Skip to the next one, but don't fail the overall operation.
                return Ok(None);
            }
        };

        if !nodes.contains_key(&destination) {
            return Err(OperationError::NodeStateChanged(
                format!("node {destination} was removed").into(),
            ));
        }

        if !tenant_shard.intent.get_secondary().contains(&destination) {
            tracing::info!(
                tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                "Secondary moved away from {destination} during drain"
            );

            return Ok(None);
        }

        match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
            Err(e) => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Scheduling error when draining pageserver {}: {}", self.drained_node, e
                );

                Ok(None)
            }
            Ok(()) => {
                let scheduled_to = tenant_shard.intent.get_attached();
                tracing::info!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Rescheduled shard while draining node {}: {} -> {:?}",
                    self.drained_node,
                    self.drained_node,
                    scheduled_to
                );

                Ok(Some(tenant_shard))
            }
        }
    }
}

/// Action to take when draining a tenant shard.
pub(crate) enum TenantShardDrainAction {
    /// The tenant shard is attached to the node being drained.
    /// Reschedule the tenant shard to a secondary location.
    /// Holds the destination node id to reschedule to.
    RescheduleToSecondary(NodeId),
    /// The tenant shard is being migrated away from the draining node.
    /// Wait for the reconciliation to complete.
    /// Holds the intent attached node id.
    Reconcile(NodeId),
    /// The tenant shard is not eligible for draining; skip it.
    Skip,
}