LCOV - code coverage report
Current view: top level - storage_controller/src - operation_utils.rs (source / functions)
Test: 5713ff31fc16472ab3f92425989ca6addc3dcf9c.info
Test Date: 2025-07-30 16:18:19
Coverage: Lines 0.0 % (0 of 80 hit), Functions 0.0 % (0 of 4 hit)

Source code
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use pageserver_api::controller_api::{NodeSchedulingPolicy, ShardSchedulingPolicy};
use utils::id::NodeId;
use utils::shard::TenantShardId;

use crate::background_node_operations::OperationError;
use crate::node::Node;
use crate::scheduler::Scheduler;
use crate::tenant_shard::TenantShard;

/// Check that the state of the node being drained is as expected:
/// the node is present in memory and its scheduling policy is set to `expected_policy`.
pub(crate) fn validate_node_state(
    node_id: &NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
    expected_policy: NodeSchedulingPolicy,
) -> Result<(), OperationError> {
    let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
        format!("node {node_id} was removed").into(),
    ))?;

    let current_policy = node.get_scheduling();
    if current_policy != expected_policy {
        // TODO(vlad): maybe cancel pending reconciles before erroring out. Need to think
        // about it.
        return Err(OperationError::NodeStateChanged(
            format!("node {node_id} changed state to {current_policy:?}").into(),
        ));
    }

    Ok(())
}
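
As a usage illustration (not part of the original file), a drain task would typically re-run this check between batches of migrations so it can abort early if the node was removed or an operator changed its policy. The sketch below assumes a Draining variant on NodeSchedulingPolicy and an externally supplied snapshot of the node map:

// Hedged sketch only; NodeSchedulingPolicy::Draining and the call site are assumptions.
fn check_drain_still_valid(
    node_id: &NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    // Returns NodeStateChanged if the node vanished or its policy no longer says
    // "draining"; a caller would treat that as a signal to stop the drain.
    validate_node_state(node_id, nodes, NodeSchedulingPolicy::Draining)
}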

/// Struct that houses a few utility methods for draining pageserver nodes
pub(crate) struct TenantShardDrain {
    pub(crate) drained_node: NodeId,
    pub(crate) tenant_shard_id: TenantShardId,
}

impl TenantShardDrain {
    /// Check whether the tenant shard in question is eligible for draining:
    /// its primary attachment is on the node being drained.
    pub(crate) fn tenant_shard_eligible_for_drain(
        &self,
        tenants: &BTreeMap<TenantShardId, TenantShard>,
        scheduler: &Scheduler,
    ) -> TenantShardDrainAction {
        let Some(tenant_shard) = tenants.get(&self.tenant_shard_id) else {
            return TenantShardDrainAction::Skip;
        };

        if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
            // If the intent attached node is not the drained node, check the observed state
            // of the shard on the drained node. If it is Attached*, the shard is being
            // migrated away from the drained node. The drain loop needs to wait for that
            // reconciliation to complete for a smooth drain.

            use pageserver_api::models::LocationConfigMode::*;

            let attach_mode = tenant_shard
                .observed
                .locations
                .get(&self.drained_node)
                .and_then(|observed| observed.conf.as_ref().map(|conf| conf.mode));

            return match (attach_mode, tenant_shard.intent.get_attached()) {
                (Some(AttachedSingle | AttachedMulti | AttachedStale), Some(intent_node_id)) => {
                    TenantShardDrainAction::Reconcile(*intent_node_id)
                }
                _ => TenantShardDrainAction::Skip,
            };
        }

        // Only tenants with a normal (Active) scheduling policy are proactively moved
        // around during a node drain.  Shards which have been manually configured to a different
        // policy are only rescheduled by manual intervention.
        match tenant_shard.get_scheduling_policy() {
            ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
                // A migration during drain is classed as 'essential' because it is required to
                // uphold our availability goals for the tenant: this shard is eligible for migration.
            }
            ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
                // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain
                return TenantShardDrainAction::Skip;
            }
        }

        match tenant_shard.preferred_secondary(scheduler) {
            Some(node) => TenantShardDrainAction::RescheduleToSecondary(node),
            None => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "No eligible secondary while draining {}", self.drained_node
                );

                TenantShardDrainAction::Skip
            }
        }
    }

    /// Attempt to reschedule the tenant shard in question to one of its secondary locations.
    /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
    /// should be skipped.
    pub(crate) fn reschedule_to_secondary<'a>(
        &self,
        destination: NodeId,
        tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Result<Option<&'a mut TenantShard>, OperationError> {
        let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
            Some(some) => some,
            None => {
                // Tenant shard was removed in the meantime.
                // Skip to the next one, but don't fail the overall operation
                return Ok(None);
            }
        };

        if !nodes.contains_key(&destination) {
            return Err(OperationError::NodeStateChanged(
                format!("node {destination} was removed").into(),
            ));
        }

        if !tenant_shard.intent.get_secondary().contains(&destination) {
            tracing::info!(
                tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                "Secondary moved away from {destination} during drain"
            );

            return Ok(None);
        }

        match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
            Err(e) => {
                tracing::warn!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Scheduling error when draining pageserver {} : {}", self.drained_node, e
                );

                Ok(None)
            }
            Ok(()) => {
                let scheduled_to = tenant_shard.intent.get_attached();
                tracing::info!(
                    tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
                    "Rescheduled shard while draining node {}: {} -> {:?}",
                    self.drained_node,
                    self.drained_node,
                    scheduled_to
                );

                Ok(Some(tenant_shard))
            }
        }
    }
}
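
The Err / Ok(None) / Ok(Some) contract above maps onto "abort the drain", "skip this shard", and "proceed to reconcile". A hedged sketch of a caller, assuming the caller owns the collections and that reconciles are spawned elsewhere (the helper name and comments are illustrative, not part of this module):

fn apply_reschedule(
    drain: &TenantShardDrain,
    destination: NodeId,
    tenants: &mut BTreeMap<TenantShardId, TenantShard>,
    scheduler: &mut Scheduler,
    nodes: &Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    // `?` propagates NodeStateChanged, aborting the overall drain operation.
    match drain.reschedule_to_secondary(destination, tenants, scheduler, nodes)? {
        // Shard disappeared or its secondary moved away: skip it and keep draining.
        None => {}
        // Intent updated: the caller is expected to spawn a reconcile so the new
        // attachment takes effect (the reconcile machinery lives outside this module).
        Some(_tenant_shard) => { /* spawn a reconcile here */ }
    }
    Ok(())
}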

/// Action to take when draining a tenant shard.
pub(crate) enum TenantShardDrainAction {
    /// The tenant shard is on the draining node.
    /// Reschedule the tenant shard to a secondary location.
    /// Holds a destination node id to reschedule to.
    RescheduleToSecondary(NodeId),
    /// The tenant shard is being migrated from the draining node.
    /// Wait for the reconciliation to complete.
    /// Holds the intent attached node id.
    Reconcile(NodeId),
    /// The tenant shard is not eligible for draining, skip it.
    Skip,
}
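
Putting the pieces together, one drain-loop iteration might look like the following sketch. The function name, the parameter plumbing, and the reconcile handling are hypothetical; only the TenantShardDrain calls and the TenantShardDrainAction dispatch come from this module:

fn drain_one_shard(
    drained_node: NodeId,
    tenant_shard_id: TenantShardId,
    tenants: &mut BTreeMap<TenantShardId, TenantShard>,
    scheduler: &mut Scheduler,
    nodes: &Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    let drain = TenantShardDrain {
        drained_node,
        tenant_shard_id,
    };

    match drain.tenant_shard_eligible_for_drain(tenants, scheduler) {
        // Attached to the drained node: move the attachment to a secondary location.
        TenantShardDrainAction::RescheduleToSecondary(destination) => {
            if drain
                .reschedule_to_secondary(destination, tenants, scheduler, nodes)?
                .is_some()
            {
                // A reconcile would be spawned here for the updated intent.
            }
        }
        // Already migrating away from the drained node: wait for that reconciliation.
        TenantShardDrainAction::Reconcile(_intent_node) => { /* wait for the reconcile */ }
        // Not eligible: nothing to do for this shard.
        TenantShardDrainAction::Skip => {}
    }

    Ok(())
}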
        

Generated by: LCOV version 2.1-beta