LCOV - code coverage report
Current view: top level - storage_controller/src - drain_utils.rs (source / functions) Coverage Total Hit
Test: 90b23405d17e36048d3bb64e314067f397803f1b.info Lines: 41.9 % 124 52
Test Date: 2024-09-20 13:14:58 Functions: 50.0 % 12 6

            Line data    Source code
       1              : use std::{
       2              :     collections::{BTreeMap, HashMap},
       3              :     sync::Arc,
       4              : };
       5              : 
       6              : use pageserver_api::controller_api::NodeSchedulingPolicy;
       7              : use utils::{id::NodeId, shard::TenantShardId};
       8              : 
       9              : use crate::{
      10              :     background_node_operations::OperationError, node::Node, scheduler::Scheduler,
      11              :     tenant_shard::TenantShard,
      12              : };
      13              : 
      14              : pub(crate) struct TenantShardIterator<F> {
      15              :     tenants_accessor: F,
      16              :     inspected_all_shards: bool,
      17              :     last_inspected_shard: Option<TenantShardId>,
      18              : }
      19              : 
      20              : /// A simple iterator which can be used in tandem with [`crate::service::Service`]
      21              : /// to iterate over all known tenant shard ids without holding the lock on the
      22              : /// service state at all times.
      23              : impl<F> TenantShardIterator<F>
      24              : where
      25              :     F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
      26              : {
      27            1 :     pub(crate) fn new(tenants_accessor: F) -> Self {
      28            1 :         Self {
      29            1 :             tenants_accessor,
      30            1 :             inspected_all_shards: false,
      31            1 :             last_inspected_shard: None,
      32            1 :         }
      33            1 :     }
      34              : 
      35              :     /// Returns the next tenant shard id if one exists
      36            9 :     pub(crate) fn next(&mut self) -> Option<TenantShardId> {
      37            9 :         if self.inspected_all_shards {
      38            0 :             return None;
      39            9 :         }
      40            9 : 
      41            9 :         match (self.tenants_accessor)(self.last_inspected_shard) {
      42            8 :             Some(tid) => {
      43            8 :                 self.last_inspected_shard = Some(tid);
      44            8 :                 Some(tid)
      45              :             }
      46              :             None => {
      47            1 :                 self.inspected_all_shards = true;
      48            1 :                 None
      49              :             }
      50              :         }
      51            9 :     }
      52              : 
      53              :     /// Returns true when the end of the iterator is reached and false otherwise
      54            0 :     pub(crate) fn finished(&self) -> bool {
      55            0 :         self.inspected_all_shards
      56            0 :     }
      57              : }
      58              : 
      59              : /// Check that the state of the node being drained is as expected:
      60              : /// node is present in memory and scheduling policy is set to [`NodeSchedulingPolicy::Draining`]
      61            0 : pub(crate) fn validate_node_state(
      62            0 :     node_id: &NodeId,
      63            0 :     nodes: Arc<HashMap<NodeId, Node>>,
      64            0 : ) -> Result<(), OperationError> {
      65            0 :     let node = nodes.get(node_id).ok_or(OperationError::NodeStateChanged(
      66            0 :         format!("node {} was removed", node_id).into(),
      67            0 :     ))?;
      68              : 
      69            0 :     let current_policy = node.get_scheduling();
      70            0 :     if !matches!(current_policy, NodeSchedulingPolicy::Draining) {
      71              :         // TODO(vlad): maybe cancel pending reconciles before erroring out. need to think
      72              :         // about it
      73            0 :         return Err(OperationError::NodeStateChanged(
      74            0 :             format!("node {} changed state to {:?}", node_id, current_policy).into(),
      75            0 :         ));
      76            0 :     }
      77            0 : 
      78            0 :     Ok(())
      79            0 : }
      80              : 
      81              : /// Struct that houses a few utility methods for draining pageserver nodes
      82              : pub(crate) struct TenantShardDrain {
      83              :     pub(crate) drained_node: NodeId,
      84              :     pub(crate) tenant_shard_id: TenantShardId,
      85              : }
      86              : 
      87              : impl TenantShardDrain {
      88              :     /// Check if the tenant shard under question is eligible for drainining:
      89              :     /// it's primary attachment is on the node being drained
      90            0 :     pub(crate) fn tenant_shard_eligible_for_drain(
      91            0 :         &self,
      92            0 :         tenants: &BTreeMap<TenantShardId, TenantShard>,
      93            0 :         scheduler: &Scheduler,
      94            0 :     ) -> Option<NodeId> {
      95            0 :         let tenant_shard = tenants.get(&self.tenant_shard_id)?;
      96              : 
      97            0 :         if *tenant_shard.intent.get_attached() != Some(self.drained_node) {
      98            0 :             return None;
      99            0 :         }
     100            0 : 
     101            0 :         match scheduler.node_preferred(tenant_shard.intent.get_secondary()) {
     102            0 :             Some(node) => Some(node),
     103              :             None => {
     104            0 :                 tracing::warn!(
     105            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     106            0 :                     "No eligible secondary while draining {}", self.drained_node
     107              :                 );
     108              : 
     109            0 :                 None
     110              :             }
     111              :         }
     112            0 :     }
     113              : 
     114              :     /// Attempt to reschedule the tenant shard under question to one of its secondary locations
     115              :     /// Returns an Err when the operation should be aborted and Ok(None) when the tenant shard
     116              :     /// should be skipped.
     117            0 :     pub(crate) fn reschedule_to_secondary<'a>(
     118            0 :         &self,
     119            0 :         destination: NodeId,
     120            0 :         tenants: &'a mut BTreeMap<TenantShardId, TenantShard>,
     121            0 :         scheduler: &mut Scheduler,
     122            0 :         nodes: &Arc<HashMap<NodeId, Node>>,
     123            0 :     ) -> Result<Option<&'a mut TenantShard>, OperationError> {
     124            0 :         let tenant_shard = match tenants.get_mut(&self.tenant_shard_id) {
     125            0 :             Some(some) => some,
     126              :             None => {
     127              :                 // Tenant shard was removed in the meantime.
     128              :                 // Skip to the next one, but don't fail the overall operation
     129            0 :                 return Ok(None);
     130              :             }
     131              :         };
     132              : 
     133            0 :         if !nodes.contains_key(&destination) {
     134            0 :             return Err(OperationError::NodeStateChanged(
     135            0 :                 format!("node {} was removed", destination).into(),
     136            0 :             ));
     137            0 :         }
     138            0 : 
     139            0 :         if !tenant_shard.intent.get_secondary().contains(&destination) {
     140            0 :             tracing::info!(
     141            0 :                 tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     142            0 :                 "Secondary moved away from {destination} during drain"
     143              :             );
     144              : 
     145            0 :             return Ok(None);
     146            0 :         }
     147            0 : 
     148            0 :         match tenant_shard.reschedule_to_secondary(Some(destination), scheduler) {
     149            0 :             Err(e) => {
     150            0 :                 tracing::warn!(
     151            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     152            0 :                     "Scheduling error when draining pageserver {} : {}", self.drained_node, e
     153              :                 );
     154              : 
     155            0 :                 Ok(None)
     156              :             }
     157              :             Ok(()) => {
     158            0 :                 let scheduled_to = tenant_shard.intent.get_attached();
     159            0 :                 tracing::info!(
     160            0 :                     tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
     161            0 :                     "Rescheduled shard while draining node {}: {} -> {:?}",
     162              :                     self.drained_node,
     163              :                     self.drained_node,
     164              :                     scheduled_to
     165              :                 );
     166              : 
     167            0 :                 Ok(Some(tenant_shard))
     168              :             }
     169              :         }
     170            0 :     }
     171              : }
     172              : 
     173              : #[cfg(test)]
     174              : mod tests {
     175              :     use std::sync::Arc;
     176              : 
     177              :     use utils::{
     178              :         id::TenantId,
     179              :         shard::{ShardCount, ShardNumber, TenantShardId},
     180              :     };
     181              : 
     182              :     use super::TenantShardIterator;
     183              : 
     184              :     #[test]
     185            1 :     fn test_tenant_shard_iterator() {
     186            1 :         let tenant_id = TenantId::generate();
     187            1 :         let shard_count = ShardCount(8);
     188            1 : 
     189            1 :         let mut tenant_shards = Vec::default();
     190            8 :         for i in 0..shard_count.0 {
     191            8 :             tenant_shards.push((
     192            8 :                 TenantShardId {
     193            8 :                     tenant_id,
     194            8 :                     shard_number: ShardNumber(i),
     195            8 :                     shard_count,
     196            8 :                 },
     197            8 :                 (),
     198            8 :             ))
     199              :         }
     200              : 
     201            1 :         let tenant_shards = Arc::new(tenant_shards);
     202            1 : 
     203            1 :         let mut tid_iter = TenantShardIterator::new({
     204            1 :             let tenants = tenant_shards.clone();
     205            9 :             move |last_inspected_shard: Option<TenantShardId>| {
     206            9 :                 let entry = match last_inspected_shard {
     207            8 :                     Some(skip_past) => {
     208           36 :                         let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
     209            8 :                         cursor.nth(1)
     210              :                     }
     211            1 :                     None => tenants.first(),
     212              :                 };
     213              : 
     214            9 :                 entry.map(|(tid, _)| tid).copied()
     215            9 :             }
     216            1 :         });
     217            1 : 
     218            1 :         let mut iterated_over = Vec::default();
     219            9 :         while let Some(tid) = tid_iter.next() {
     220            8 :             iterated_over.push((tid, ()));
     221            8 :         }
     222              : 
     223            1 :         assert_eq!(iterated_over, *tenant_shards);
     224            1 :     }
     225              : }
        

Generated by: LCOV version 2.1-beta