LCOV - code coverage report
Current view: top level - storage_controller/src - drain_utils.rs (source / functions)
Test: 249f165943bd2c492f96a3f7d250276e4addca1a.info
Test Date: 2024-11-20 18:39:52

            Coverage   Total   Hit
Lines:        39.1 %     133    52
Functions:    50.0 %      12     6

            Line data    Source code
       1              : use std::{
       2              :     collections::{BTreeMap, HashMap},
       3              :     sync::Arc,
       4              : };
       5              : 
       6              : use pageserver_api::controller_api::{NodeSchedulingPolicy, ShardSchedulingPolicy};
       7              : use utils::{id::NodeId, shard::TenantShardId};
       8              : 
       9              : use crate::{
      10              :     background_node_operations::OperationError, node::Node, scheduler::Scheduler,
      11              :     tenant_shard::TenantShard,
      12              : };
      13              : 
      14              : pub(crate) struct TenantShardIterator<F> {
      15              :     tenants_accessor: F,
      16              :     inspected_all_shards: bool,
      17              :     last_inspected_shard: Option<TenantShardId>,
      18              : }
      19              : 
      20              : /// A simple iterator which can be used in tandem with [`crate::service::Service`]
      21              : /// to iterate over all known tenant shard ids without holding the lock on the
      22              : /// service state at all times.
      23              : impl<F> TenantShardIterator<F>
      24              : where
      25              :     F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
      26              : {
      27            1 :     pub(crate) fn new(tenants_accessor: F) -> Self {
      28            1 :         Self {
      29            1 :             tenants_accessor,
      30            1 :             inspected_all_shards: false,
      31            1 :             last_inspected_shard: None,
      32            1 :         }
      33            1 :     }
      34              : 
      35              :     /// Returns the next tenant shard id if one exists
      36            9 :     pub(crate) fn next(&mut self) -> Option<TenantShardId> {
      37            9 :         if self.inspected_all_shards {
      38            0 :             return None;
      39            9 :         }
      40            9 : 
      41            9 :         match (self.tenants_accessor)(self.last_inspected_shard) {
      42            8 :             Some(tid) => {
      43            8 :                 self.last_inspected_shard = Some(tid);
      44            8 :                 Some(tid)
      45              :             }
      46              :             None => {
      47            1 :                 self.inspected_all_shards = true;
      48            1 :                 None
      49              :             }
      50              :         }
      51            9 :     }
      52              : 
      53              :     /// Returns true when the end of the iterator is reached and false otherwise
      54            0 :     pub(crate) fn finished(&self) -> bool {
      55            0 :         self.inspected_all_shards
      56            0 :     }
      57              : }
      58              : 
      59              : /// Check that the state of the node being drained is as expected:
      60              : /// node is present in memory and scheduling policy is set to [`NodeSchedulingPolicy::Draining`]
      61            0 : pub(crate) fn validate_node_state(
      62            0 :     node_id: &NodeId,
      63            0 :     nodes: Arc<HashMap<NodeId, Node>>,
      64            0 : ) -> Result<(), OperationError> {
      65            0 :     let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
      66            0 :         format!("node {} was removed", node_id).into(),
      67            0 :     ))?;
      68              : 
      69            0 :     let current_policy = node.get_scheduling();
      70            0 :     if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
      71              :         // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
      72              :         // about it
      73            0 :         return Err(OperationError::NodeStateChanged(
      74            0 :             format!("node {} changed state to {:?}", node_id, current_policy).into(),
      75            0 :         ));
      76            0 :     }
      77            0 : 
      78            0 :     Ok(())
      79            0 : }
      80              : 
      81              : /// Struct that houses a few utility methods for draining pageserver nodes
      82              : pub(crate) struct TenantShardDrain {
      83              :     pub(crate) drained_node: NodeId,
      84              :     pub(crate) tenant_shard_id: TenantShardId,
      85              : }
      86              : 
      87              : impl TenantShardDrain {
      88              :     /// Check if the tenant shard in question is eligible for draining:
      89              :     /// its primary attachment is on the node being drained
      90            0 :     pub(crate) fn tenant_shard_eligible_for_drain(
      91            0 :         &self,
      92            0 :         tenants: &BTreeMap<TenantShardId, TenantShard>,
      93            0 :         scheduler: &Scheduler,
      94            0 :     ) -> Option<NodeId> {
      95            0 :         let tenant_shard = tenants.get(&self.tenant_shard_id)?;
      96              : 
      97            0 :         if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
      98            0 :             return None;
      99            0 :         }
     100            0 : 
     101            0 :         // Only tenant shards with an Active or Essential scheduling policy are proactively moved
     102            0 :         // around during a node drain.  Shards which have been manually configured to a different
     103            0 :         // policy are only rescheduled by manual intervention.
     104            0 :         match tenant_shard.get_scheduling_policy() {
     105            0 :             ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {
     106            0 :                 // A migration during drain is classed as 'essential' because it is required to
     107            0 :                 // uphold our availability goals for the tenant: this shard is eligible for migration.
     108            0 :             }
     109              :             ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
     110              :                 // If we have been asked to avoid rescheduling this shard, then do not migrate it during a drain
     111            0 :                 return None;
     112              :             }
     113              :         }
     114              : 
     115            0 :         match scheduler.node_preferred(tenant_shard.intent.get_secondary()) {
     116            0 :             Some(node) => Some(node),
     117              :             None => {
     118            0 :                 tracing::warn!(
     119            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     120            0 :                     "No eligible secondary while draining {}", self.drained_node
     121              :                 );
     122              : 
     123            0 :                 None
     124              :             }
     125              :         }
     126            0 :     }
     127              : 
     128              :     /// Attempt to reschedule the tenant shard in question to one of its secondary locations.
     129              :     /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
     130              :     /// should be skipped.
     131            0 :     pub(crate) fn reschedule_to_secondary<'a>(
     132            0 :         &self,
     133            0 :         destination: NodeId,
     134            0 :         tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
     135            0 :         scheduler: &mut Scheduler,
     136            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
     137            0 :     ) -> Result<Option<&'a mut TenantShard>, OperationError> {
     138            0 :         let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
     139            0 :             Some(some) => some,
     140              :             None => {
     141              :                 // Tenant shard was removed in the meantime.
     142              :                 // Skip to the next one, but don't fail the overall operation
     143            0 :                 return Ok(None);
     144              :             }
     145              :         };
     146              : 
     147            0 :         if !nodes.contains_key(&destination) {
     148            0 :             return Err(OperationError::NodeStateChanged(
     149            0 :                 format!("node {} was removed", destination).into(),
     150            0 :             ));
     151            0 :         }
     152            0 : 
     153            0 :         if !tenant_shard.intent.get_secondary().contains(&destination) {
     154            0 :             tracing::info!(
     155            0 :                 tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     156            0 :                 "Secondary moved away from {destination} during drain"
     157              :             );
     158              : 
     159            0 :             return Ok(None);
     160            0 :         }
     161            0 : 
     162            0 :         match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
     163            0 :             Err(e) => {
     164            0 :                 tracing::warn!(
     165            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     166            0 :                     "Scheduling error when draining pageserver {} : {}", self.drained_node, e
     167              :                 );
     168              : 
     169            0 :                 Ok(None)
     170              :             }
     171              :             Ok(()) => {
     172            0 :                 let scheduled_to = tenant_shard.intent.get_attached();
     173            0 :                 tracing::info!(
     174            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     175            0 :                     "Rescheduled shard while draining node {}: {} -> {:?}",
     176              :                     self.drained_node,
     177              :                     self.drained_node,
     178              :                     scheduled_to
     179              :                 );
     180              : 
     181            0 :                 Ok(Some(tenant_shard))
     182              :             }
     183              :         }
     184            0 :     }
     185              : }
     186              : 
     187              : #[cfg(test)]
     188              : mod tests {
     189              :     use std::sync::Arc;
     190              : 
     191              :     use utils::{
     192              :         id::TenantId,
     193              :         shard::{ShardCount, ShardNumber, TenantShardId},
     194              :     };
     195              : 
     196              :     use super::TenantShardIterator;
     197              : 
     198              :     #[test]
     199            1 :     fn test_tenant_shard_iterator() {
     200            1 :         let tenant_id = TenantId::generate();
     201            1 :         let shard_count = ShardCount(8);
     202            1 : 
     203            1 :         let mut tenant_shards = Vec::default();
     204            8 :         for i in 0..shard_count.0 {
     205            8 :             tenant_shards.push((
     206            8 :                 TenantShardId {
     207            8 :                     tenant_id,
     208            8 :                     shard_number: ShardNumber(i),
     209            8 :                     shard_count,
     210            8 :                 },
     211            8 :                 (),
     212            8 :             ))
     213              :         }
     214              : 
     215            1 :         let tenant_shards = Arc::new(tenant_shards);
     216            1 : 
     217            1 :         let mut tid_iter = TenantShardIterator::new({
     218            1 :             let tenants = tenant_shards.clone();
     219            9 :             move |last_inspected_shard: Option<TenantShardId>| {
     220            9 :                 let entry = match last_inspected_shard {
     221            8 :                     Some(skip_past) => {
     222           36 :                         let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
     223            8 :                         cursor.nth(1)
     224              :                     }
     225            1 :                     None => tenants.first(),
     226              :                 };
     227              : 
     228            9 :                 entry.map(|(tid, _)| tid).copied()
     229            9 :             }
     230            1 :         });
     231            1 : 
     232            1 :         let mut iterated_over = Vec::default();
     233            9 :         while let Some(tid) = tid_iter.next() {
     234            8 :             iterated_over.push((tid, ()));
     235            8 :         }
     236              : 
     237            1 :         assert_eq!(iterated_over, *tenant_shards);
     238            1 :     }
     239              : }
        

Generated by: LCOV version 2.1-beta