LCOV - code coverage report
Current view: top level - storage_controller/src - drain_utils.rs (source / functions)
Test:       6df3fc19ec669bcfbbf9aba41d1338898d24eaa0.info
Test Date:  2025-03-12 18:28:53

                Coverage    Total    Hit
Lines:          39.1 %      133      52
Functions:      50.0 %      12       6

            Line data    Source code
       1              : use std::collections::{BTreeMap, HashMap};
       2              : use std::sync::Arc;
       3              : 
       4              : use pageserver_api::controller_api::{NodeSchedulingPolicy, ShardSchedulingPolicy};
       5              : use utils::id::NodeId;
       6              : use utils::shard::TenantShardId;
       7              : 
       8              : use crate::background_node_operations::OperationError;
       9              : use crate::node::Node;
      10              : use crate::scheduler::Scheduler;
      11              : use crate::tenant_shard::TenantShard;
      12              : 
      13              : pub(crate) struct TenantShardIterator<F> {
      14              :     tenants_accessor: F,
      15              :     inspected_all_shards: bool,
      16              :     last_inspected_shard: Option<TenantShardId>,
      17              : }
      18              : 
      19              : /// A simple iterator which can be used in tandem with [`crate::service::Service`]
      20              : /// to iterate over all known tenant shard ids without holding the lock on the
      21              : /// service state at all times.
      22              : impl<F> TenantShardIterator<F>
      23              : where
      24              :     F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
      25              : {
      26            1 :     pub(crate) fn new(tenants_accessor: F) -> Self {
      27            1 :         Self {
      28            1 :             tenants_accessor,
      29            1 :             inspected_all_shards: false,
      30            1 :             last_inspected_shard: None,
      31            1 :         }
      32            1 :     }
      33              : 
      34              :     /// Returns the next tenant shard id if one exists
      35            9 :     pub(crate) fn next(&mut self) -> Option<TenantShardId> {
      36            9 :         if self.inspected_all_shards {
      37            0 :             return None;
      38            9 :         }
      39            9 : 
      40            9 :         match (self.tenants_accessor)(self.last_inspected_shard) {
      41            8 :             Some(tid) => {
      42            8 :                 self.last_inspected_shard = Some(tid);
      43            8 :                 Some(tid)
      44              :             }
      45              :             None => {
      46            1 :                 self.inspected_all_shards = true;
      47            1 :                 None
      48              :             }
      49              :         }
      50            9 :     }
      51              : 
      52              :     /// Returns true when the end of the iterator is reached and false otherwise
      53            0 :     pub(crate) fn finished(&self) -> bool {
      54            0 :         self.inspected_all_shards
      55            0 :     }
      56              : }
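
The accessor closure handed to `TenantShardIterator::new` receives the last shard id that was returned and yields the next one, so each call can briefly take the service lock and release it again before the caller does any per-shard work. Below is a minimal sketch of that pattern, assuming a `Mutex<BTreeMap<TenantShardId, _>>` stands in for the service's tenant map; the `visit_all_shards` function and `ShardState` alias are illustrative and not part of this module.

use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded};
use std::sync::{Arc, Mutex};

use utils::shard::TenantShardId;

// Hypothetical payload standing in for the service's per-shard state.
type ShardState = ();

fn visit_all_shards(tenants: Arc<Mutex<BTreeMap<TenantShardId, ShardState>>>) {
    let mut iter = TenantShardIterator::new({
        let tenants = tenants.clone();
        move |last: Option<TenantShardId>| {
            // Re-lock on every call so the map is never held across iterations.
            let locked = tenants.lock().unwrap();
            match last {
                // Resume strictly after the previously returned shard id.
                Some(prev) => locked
                    .range((Excluded(prev), Unbounded))
                    .next()
                    .map(|(tid, _)| *tid),
                None => locked.keys().next().copied(),
            }
        }
    });

    while let Some(tid) = iter.next() {
        // The lock is not held here; fetch whatever per-shard state is needed.
        let _ = tid;
    }
}
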
      57              : 
      58              : /// Check that the state of the node being drained is as expected:
       59              : /// the node is present in memory and its scheduling policy is set to [`NodeSchedulingPolicy::Draining`]
      60            0 : pub(crate) fn validate_node_state(
      61            0 :     node_id: &NodeId,
      62            0 :     nodes: Arc<HashMap<NodeId, Node>>,
      63            0 : ) -> Result<(), OperationError> {
      64            0 :     let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
      65            0 :         format!("node {} was removed", node_id).into(),
      66            0 :     ))?;
      67              : 
      68            0 :     let current_policy = node.get_scheduling();
      69            0 :     if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
      70              :         // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
      71              :         // about it
      72            0 :         return Err(OperationError::NodeStateChanged(
      73            0 :             format!("node {} changed state to {:?}", node_id, current_policy).into(),
      74            0 :         ));
      75            0 :     }
      76            0 : 
      77            0 :     Ok(())
      78            0 : }
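
A minimal usage sketch, assuming the drain loop re-clones the Arc node map from the service state before each batch of shards; the `recheck_drained_node` wrapper name is illustrative only.

// Hedged sketch: re-validate the drained node between batches of shards, using a
// freshly cloned Arc snapshot of the service's node map.
fn recheck_drained_node(
    node_id: NodeId,
    nodes: Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    // Surfaces OperationError::NodeStateChanged if the node was removed or is no
    // longer in the Draining scheduling policy, which aborts the drain operation.
    validate_node_state(&node_id, nodes)
}
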
      79              : 
      80              : /// Struct that houses a few utility methods for draining pageserver nodes
      81              : pub(crate) struct TenantShardDrain {
      82              :     pub(crate) drained_node: NodeId,
      83              :     pub(crate) tenant_shard_id: TenantShardId,
      84              : }
      85              : 
      86              : impl TenantShardDrain {
       87              :     /// Check whether the tenant shard in question is eligible for draining:
       88              :     /// its primary attachment is on the node being drained
      89            0 :     pub(crate) fn tenant_shard_eligible_for_drain(
      90            0 :         &self,
      91            0 :         tenants: &BTreeMap<TenantShardId, TenantShard>,
      92            0 :         scheduler: &Scheduler,
      93            0 :     ) -> Option<NodeId> {
      94            0 :         let tenant_shard = tenants.get(&self.tenant_shard_id)?;
      95              : 
      96            0 :         if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
      97            0 :             return None;
      98            0 :         }
      99            0 : 
      100            0 :         // Only tenants with an Active or Essential scheduling policy are proactively moved
      101            0 :         // around during a node drain. Shards which have been manually paused or stopped are
      102            0 :         // only rescheduled by manual intervention.
     103            0 :         match tenant_shard.get_scheduling_policy() {
     104            0 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
     105            0 :                 // A migration during drain is classed as 'essential' because it is required to
      106            0 :                 // uphold our availability goals for the tenant: this shard is eligible for migration.
     107            0 :             }
     108              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
     109              :                 // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain
     110            0 :                 return None;
     111              :             }
     112              :         }
     113              : 
     114            0 :         match tenant_shard.preferred_secondary(scheduler) {
     115            0 :             Some(node) => Some(node),
     116              :             None => {
     117            0 :                 tracing::warn!(
     118            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     119            0 :                     "No eligible secondary while draining {}", self.drained_node
     120              :                 );
     121              : 
     122            0 :                 None
     123              :             }
     124              :         }
     125            0 :     }
     126              : 
      127              :     /// Attempt to reschedule the tenant shard in question to one of its secondary locations.
     128              :     /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
     129              :     /// should be skipped.
     130            0 :     pub(crate) fn reschedule_to_secondary<'a>(
     131            0 :         &self,
     132            0 :         destination: NodeId,
     133            0 :         tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
     134            0 :         scheduler: &mut Scheduler,
     135            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
     136            0 :     ) -> Result<Option<&'a mut TenantShard>, OperationError> {
     137            0 :         let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
     138            0 :             Some(some) => some,
     139              :             None => {
     140              :                 // Tenant shard was removed in the meantime.
     141              :                 // Skip to the next one, but don't fail the overall operation
     142            0 :                 return Ok(None);
     143              :             }
     144              :         };
     145              : 
     146            0 :         if !nodes.contains_key(&destination) {
     147            0 :             return Err(OperationError::NodeStateChanged(
     148            0 :                 format!("node {} was removed", destination).into(),
     149            0 :             ));
     150            0 :         }
     151            0 : 
     152            0 :         if !tenant_shard.intent.get_secondary().contains(&destination) {
     153            0 :             tracing::info!(
     154            0 :                 tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     155            0 :                 "Secondary moved away from {destination} during drain"
     156              :             );
     157              : 
     158            0 :             return Ok(None);
     159            0 :         }
     160            0 : 
     161            0 :         match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
     162            0 :             Err(e) => {
     163            0 :                 tracing::warn!(
     164            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     165            0 :                     "Scheduling error when draining pageserver {} : {}", self.drained_node, e
     166              :                 );
     167              : 
     168            0 :                 Ok(None)
     169              :             }
     170              :             Ok(()) => {
     171            0 :                 let scheduled_to = tenant_shard.intent.get_attached();
     172            0 :                 tracing::info!(
     173            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     174            0 :                     "Rescheduled shard while draining node {}: {} -> {:?}",
     175              :                     self.drained_node,
     176              :                     self.drained_node,
     177              :                     scheduled_to
     178              :                 );
     179              : 
     180            0 :                 Ok(Some(tenant_shard))
     181              :             }
     182              :         }
     183            0 :     }
     184              : }
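
A sketch of how these helpers might compose into a single per-shard drain step. The real loop lives in the service's background drain operation; taking `tenants`, `scheduler` and `nodes` together as plain arguments, and the `drain_one_shard` name itself, are assumptions for illustration.

fn drain_one_shard(
    drained_node: NodeId,
    tenant_shard_id: TenantShardId,
    tenants: &mut BTreeMap<TenantShardId, TenantShard>,
    scheduler: &mut Scheduler,
    nodes: &Arc<HashMap<NodeId, Node>>,
) -> Result<(), OperationError> {
    let drain = TenantShardDrain {
        drained_node,
        tenant_shard_id,
    };

    // Skip shards that are not attached to the drained node, have a Pause/Stop
    // scheduling policy, or have no eligible secondary to move to.
    let Some(destination) = drain.tenant_shard_eligible_for_drain(tenants, scheduler) else {
        return Ok(());
    };

    // Move the attachment intent to the chosen secondary. Ok(None) means "skip
    // this shard"; Err aborts the whole drain.
    if let Some(tenant_shard) = drain.reschedule_to_secondary(destination, tenants, scheduler, nodes)? {
        // The intent has changed; the caller would now spawn a reconcile for
        // `tenant_shard` (omitted here).
        let _ = tenant_shard;
    }

    Ok(())
}
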
     185              : 
     186              : #[cfg(test)]
     187              : mod tests {
     188              :     use std::sync::Arc;
     189              : 
     190              :     use utils::id::TenantId;
     191              :     use utils::shard::{ShardCount, ShardNumber, TenantShardId};
     192              : 
     193              :     use super::TenantShardIterator;
     194              : 
     195              :     #[test]
     196            1 :     fn test_tenant_shard_iterator() {
     197            1 :         let tenant_id = TenantId::generate();
     198            1 :         let shard_count = ShardCount(8);
     199            1 : 
     200            1 :         let mut tenant_shards = Vec::default();
     201            8 :         for i in 0..shard_count.0 {
     202            8 :             tenant_shards.push((
     203            8 :                 TenantShardId {
     204            8 :                     tenant_id,
     205            8 :                     shard_number: ShardNumber(i),
     206            8 :                     shard_count,
     207            8 :                 },
     208            8 :                 (),
     209            8 :             ))
     210              :         }
     211              : 
     212            1 :         let tenant_shards = Arc::new(tenant_shards);
     213            1 : 
     214            1 :         let mut tid_iter = TenantShardIterator::new({
     215            1 :             let tenants = tenant_shards.clone();
     216            9 :             move |last_inspected_shard: Option<TenantShardId>| {
     217            9 :                 let entry = match last_inspected_shard {
     218            8 :                     Some(skip_past) => {
     219           36 :                         let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
     220            8 :                         cursor.nth(1)
     221              :                     }
     222            1 :                     None => tenants.first(),
     223              :                 };
     224              : 
     225            9 :                 entry.map(|(tid, _)| tid).copied()
     226            9 :             }
     227            1 :         });
     228            1 : 
     229            1 :         let mut iterated_over = Vec::default();
     230            9 :         while let Some(tid) = tid_iter.next() {
     231            8 :             iterated_over.push((tid, ()));
     232            8 :         }
     233              : 
     234            1 :         assert_eq!(iterated_over, *tenant_shards);
     235            1 :     }
     236              : }
        

Generated by: LCOV version 2.1-beta