LCOV - code coverage report
Current view: top level - storage_controller/src - operation_utils.rs (source / functions)
Test:         1e20c4f2b28aa592527961bb32170ebbd2c9172f.info
Test Date:    2025-07-16 12:29:03

              Coverage    Total    Hit
Lines:           0.0 %       71      0
Functions:       0.0 %        3      0

            Line data    Source code
       1              : use std::collections::{BTreeMap, HashMap};
       2              : use std::sync::Arc;
       3              : 
       4              : use pageserver_api::controller_api::{NodeSchedulingPolicy, ShardSchedulingPolicy};
       5              : use utils::id::NodeId;
       6              : use utils::shard::TenantShardId;
       7              : 
       8              : use crate::background_node_operations::OperationError;
       9              : use crate::node::Node;
      10              : use crate::scheduler::Scheduler;
      11              : use crate::tenant_shard::TenantShard;
      12              : 
      13              : /// Check that the state of the node being drained is as expected:
      14              : /// the node is present in memory and its scheduling policy is set to expected_policy
      15            0 : pub(crate) fn validate_node_state(
      16            0 :     node_id: &NodeId,
      17            0 :     nodes: Arc<HashMap<NodeId, Node>>,
      18            0 :     expected_policy: NodeSchedulingPolicy,
      19            0 : ) -> Result<(), OperationError> {
      20            0 :     let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
      21            0 :         format!("node {node_id} was removed").into(),
      22            0 :     ))?;
      23              : 
      24            0 :     let current_policy = node.get_scheduling();
      25            0 :     if current_policy != expected_policy {
      26              :         // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
      27              :         // about it
      28            0 :         return Err(OperationError::NodeStateChanged(
      29            0 :             format!("node {node_id} changed state to {current_policy:?}").into(),
      30            0 :         ));
      31            0 :     }
      32              : 
      33            0 :     Ok(())
      34            0 : }
      35              : 
      36              : /// Struct that houses a few utility methods for draining pageserver nodes
      37              : pub(crate) struct TenantShardDrain {
      38              :     pub(crate) drained_node: NodeId,
      39              :     pub(crate) tenant_shard_id: TenantShardId,
      40              : }
      41              : 
      42              : impl TenantShardDrain {
      43              :     /// Check if the tenant shard in question is eligible for draining:
      44              :     /// its primary attachment is on the node being drained
      45            0 :     pub(crate) fn tenant_shard_eligible_for_drain(
      46            0 :         &self,
      47            0 :         tenants: &BTreeMap<TenantShardId, TenantShard>,
      48            0 :         scheduler: &Scheduler,
      49            0 :     ) -> Option<NodeId> {
      50            0 :         let tenant_shard = tenants.get(&self.tenant_shard_id)?;
      51              : 
      52            0 :         if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
      53            0 :             return None;
      54            0 :         }
      55              : 
      56              :         // Only tenant shards with an Active or Essential scheduling policy are proactively
      57              :         // moved around during a node drain. Shards whose policy has been set to Pause or
      58              :         // Stop are only rescheduled by manual intervention.
      59            0 :         match tenant_shard.get_scheduling_policy() {
      60            0 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
      61            0 :                 // A migration during drain is classed as 'essential' because it is required to
      62            0 :                 // uphold our availability goals for the tenant: this shard is eligible for migration.
      63            0 :             }
      64              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
      65              :                 // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain
      66            0 :                 return None;
      67              :             }
      68              :         }
      69              : 
      70            0 :         match tenant_shard.preferred_secondary(scheduler) {
      71            0 :             Some(node) => Some(node),
      72              :             None => {
      73            0 :                 tracing::warn!(
      74            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
      75            0 :                     "No eligible secondary while draining {}", self.drained_node
      76              :                 );
      77              : 
      78            0 :                 None
      79              :             }
      80              :         }
      81            0 :     }
      82              : 
      83              :     /// Attempt to reschedule the tenant shard in question to one of its secondary locations.
      84              :     /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
      85              :     /// should be skipped.
      86            0 :     pub(crate) fn reschedule_to_secondary<'a>(
      87            0 :         &self,
      88            0 :         destination: NodeId,
      89            0 :         tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
      90            0 :         scheduler: &mut Scheduler,
      91            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
      92            0 :     ) -> Result<Option<&'a mut TenantShard>, OperationError> {
      93            0 :         let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
      94            0 :             Some(some) => some,
      95              :             None => {
      96              :                 // Tenant shard was removed in the meantime.
      97              :                 // Skip to the next one, but don't fail the overall operation
      98            0 :                 return Ok(None);
      99              :             }
     100              :         };
     101              : 
     102            0 :         if !nodes.contains_key(&destination) {
     103            0 :             return Err(OperationError::NodeStateChanged(
     104            0 :                 format!("node {destination} was removed").into(),
     105            0 :             ));
     106            0 :         }
     107              : 
     108            0 :         if !tenant_shard.intent.get_secondary().contains(&destination) {
     109            0 :             tracing::info!(
     110            0 :                 tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     111            0 :                 "Secondary moved away from {destination} during drain"
     112              :             );
     113              : 
     114            0 :             return Ok(None);
     115            0 :         }
     116              : 
     117            0 :         match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
     118            0 :             Err(e) => {
     119            0 :                 tracing::warn!(
     120            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     121            0 :                     "Scheduling error when draining pageserver {} : {}", self.drained_node, e
     122              :                 );
     123              : 
     124            0 :                 Ok(None)
     125              :             }
     126              :             Ok(()) => {
     127            0 :                 let scheduled_to = tenant_shard.intent.get_attached();
     128            0 :                 tracing::info!(
     129            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     130            0 :                     "Rescheduled shard while draining node {}: {} -> {:?}",
     131              :                     self.drained_node,
     132              :                     self.drained_node,
     133              :                     scheduled_to
     134              :                 );
     135              : 
     136            0 :                 Ok(Some(tenant_shard))
     137              :             }
     138              :         }
     139            0 :     }
     140              : }
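
A minimal sketch of how validate_node_state might be invoked from a drain background task, under stated assumptions: the module is reachable as crate::operation_utils, the caller snapshots the node map into an Arc, and the helper name check_drain_still_valid is hypothetical. The NodeSchedulingPolicy::Draining variant is assumed to be the policy a drain expects; it is not defined in this file.

    use std::collections::HashMap;
    use std::sync::Arc;

    use pageserver_api::controller_api::NodeSchedulingPolicy;
    use utils::id::NodeId;

    use crate::background_node_operations::OperationError;
    use crate::node::Node;
    use crate::operation_utils::validate_node_state;

    /// Hypothetical helper: re-check the drained node between batches of shard
    /// migrations, aborting if the node was removed or its scheduling policy was
    /// changed (e.g. set back to Active) while the operation was running.
    fn check_drain_still_valid(
        node_id: NodeId,
        nodes: Arc<HashMap<NodeId, Node>>,
    ) -> Result<(), OperationError> {
        validate_node_state(&node_id, nodes, NodeSchedulingPolicy::Draining)
    }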
        
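A sketch, under the same assumptions, of how the two TenantShardDrain methods might be combined in the per-shard step of a drain loop. The drain_one_shard function and the surrounding control flow are illustrative only; spawning a reconcile for the returned shard is left as a comment.

    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use utils::id::NodeId;
    use utils::shard::TenantShardId;

    use crate::background_node_operations::OperationError;
    use crate::node::Node;
    use crate::operation_utils::TenantShardDrain;
    use crate::scheduler::Scheduler;
    use crate::tenant_shard::TenantShard;

    /// Hypothetical per-shard drain step: ineligible shards are skipped, and an
    /// Ok(None) from reschedule_to_secondary is treated as "skip this shard"
    /// rather than as a failure of the whole drain.
    fn drain_one_shard(
        drained_node: NodeId,
        tenant_shard_id: TenantShardId,
        tenants: &mut BTreeMap<TenantShardId, TenantShard>,
        scheduler: &mut Scheduler,
        nodes: &Arc<HashMap<NodeId, Node>>,
    ) -> Result<(), OperationError> {
        let drain = TenantShardDrain {
            drained_node,
            tenant_shard_id,
        };

        // Skip shards that are not attached to the drained node, are paused or
        // stopped, or have no usable secondary location.
        let Some(destination) = drain.tenant_shard_eligible_for_drain(tenants, scheduler) else {
            return Ok(());
        };

        // Err aborts the drain; Ok(None) means the shard changed under us and is skipped.
        if let Some(tenant_shard) =
            drain.reschedule_to_secondary(destination, tenants, scheduler, nodes)?
        {
            // The controller would spawn a reconcile here to apply the updated intent.
            let _ = tenant_shard;
        }

        Ok(())
    }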

Generated by: LCOV version 2.1-beta