LCOV - code coverage report
Current view: top level - storage_controller/src/service - tenant_shard_iterator.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 86.6 % 142 123
Test Date: 2025-07-16 12:29:03 Functions: 57.1 % 14 8

            Line data    Source code
       1              : use std::collections::BTreeMap;
       2              : use std::sync::Arc;
       3              : 
       4              : use utils::id::TenantId;
       5              : use utils::shard::TenantShardId;
       6              : 
       7              : use crate::scheduler::{ScheduleContext, ScheduleMode};
       8              : use crate::tenant_shard::TenantShard;
       9              : 
      10              : use super::Service;
      11              : 
      12              : /// Exclusive iterator over all tenant shards, grouped by tenant.
      13              : /// It is used to iterate over a consistent view of the tenants' state at a specific point in time: it holds a mutable iterator over the tenants map for its whole lifetime `'a`.
      14              : ///
      15              : /// When making scheduling decisions, it is useful to have the `ScheduleContext` for a whole
      16              : /// tenant while considering the individual shards within it.  This iterator is a helper
      17              : /// that gathers all the shards in a tenant and then yields them together with a `ScheduleContext`
      18              : /// for the tenant.
      19              : pub(super) struct TenantShardExclusiveIterator<'a> {
      20              :     schedule_mode: ScheduleMode, // cloned into a fresh ScheduleContext for every yielded tenant
      21              :     inner: std::collections::btree_map::IterMut<'a, TenantShardId, TenantShard>, // mutable cursor over the tenants map
      22              : }
      23              : 
      24              : impl<'a> TenantShardExclusiveIterator<'a> { // constructor only; the iteration logic lives in the `Iterator` impl
      25            1 :     pub(super) fn new( // builds an iterator that mutably borrows `tenants` for `'a`
      26            1 :         tenants: &'a mut BTreeMap<TenantShardId, TenantShard>, // map whose shards are yielded, grouped per tenant
      27            1 :         schedule_mode: ScheduleMode, // stored as-is; cloned into each tenant's ScheduleContext by `next`
      28            1 :     ) -> Self {
      29            1 :         Self {
      30            1 :             schedule_mode,
      31            1 :             inner: tenants.iter_mut(), // iteration starts at the first entry of the map
      32            1 :         }
      33            1 :     }
      34              : }
      35              : 
      36              : impl<'a> Iterator for TenantShardExclusiveIterator<'a> {
      37              :     type Item = (TenantId, ScheduleContext, Vec<&'a mut TenantShard>); // (tenant id, context accumulated over its shards, the shards themselves)
      38              : 
      39            3 :     fn next(&mut self) -> Option<Self::Item> { // gathers shards until a tenant's last shard is seen, then yields the whole group
      40            3 :         let mut tenant_shards = Vec::new();
      41            3 :         let mut schedule_context = ScheduleContext::new(self.schedule_mode.clone());
      42              :         loop {
      43            6 :             let (tenant_shard_id, shard) = self.inner.next()?; // `?` ends iteration once the map is exhausted; NOTE(review): shards already gathered for an incomplete tenant are dropped silently
      44              : 
      45            6 :             if tenant_shard_id.is_shard_zero() {
      46              :                 // `tenant_shards` is created empty on every call and a call returns on a tenant's last shard, so shard zero must be the first shard gathered
      47            3 :                 assert!(tenant_shards.is_empty());
      48            3 :             }
      49              : 
      50              :             // Accumulate the schedule context for all the shards in a tenant by passing each shard's `intent.all_pageservers()` to `avoid`
      51            6 :             schedule_context.avoid(&shard.intent.all_pageservers());
      52            6 :             tenant_shards.push(shard);
      53              : 
      54            6 :             if tenant_shard_id.shard_number.0 == tenant_shard_id.shard_count.count() - 1 { // last shard of the tenant (numbers are zero-based); presumably count() is never 0, otherwise the subtraction underflows - TODO confirm
      55            3 :                 return Some((tenant_shard_id.tenant_id, schedule_context, tenant_shards));
      56            3 :             }
      57              :         }
      58            3 :     }
      59              : }
      60              : 
      61              : /// Shared iterator over all tenant shards.
      62              : /// It is used to iterate over all tenants without blocking other code that works with tenants.
      63              : ///
      64              : /// A simple iterator which can be used in tandem with [`crate::service::Service`]
      65              : /// to iterate over all known tenant shard ids without holding the lock on the
      66              : /// service state at all times. It remembers only the last shard id it yielded and asks the accessor `F` for the one that follows.
      67              : pub(crate) struct TenantShardSharedIterator<F> {
      68              :     tenants_accessor: F, // given the last yielded shard id (None on the first call), returns the next one or None
      69              :     inspected_all_shards: bool, // set once the accessor has returned None; reported by `finished`
      70              :     last_inspected_shard: Option<TenantShardId>, // id most recently yielded; None before the first item
      71              : }
      72              : 
      73              : impl<F> TenantShardSharedIterator<F>
      74              : where
      75              :     F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
      76              : {
      77            1 :     pub(crate) fn new(tenants_accessor: F) -> Self { // wraps `tenants_accessor`; nothing is called until the first `next`
      78            1 :         Self {
      79            1 :             tenants_accessor,
      80            1 :             inspected_all_shards: false,
      81            1 :             last_inspected_shard: None, // makes the first accessor call receive None
      82            1 :         }
      83            1 :     }
      84              : 
      85            0 :     pub(crate) fn finished(&self) -> bool { // true once the accessor has returned None, i.e. after `next` has returned None at least once
      86            0 :         self.inspected_all_shards
      87            0 :     }
      88              : }
      89              : 
      90              : impl<F> Iterator for TenantShardSharedIterator<F>
      91              : where
      92              :     F: Fn(Option<TenantShardId>) -> Option<TenantShardId>,
      93              : {
      94              :     // TODO(ephemeralsad): consider adding schedule context to the iterator
      95              :     type Item = TenantShardId;
      96              : 
      97              :     /// Returns the next tenant shard id if one exists. After the accessor has returned `None` once, every later call returns `None` without calling the accessor again.
      98            9 :     fn next(&mut self) -> Option<Self::Item> {
      99            9 :         if self.inspected_all_shards {
     100            0 :             return None;
     101            9 :         }
     102              : 
     103            9 :         match (self.tenants_accessor)(self.last_inspected_shard) { // parentheses are needed to call a closure stored in a field
     104            8 :             Some(tid) => {
     105            8 :                 self.last_inspected_shard = Some(tid); // remember the position for the next call
     106            8 :                 Some(tid)
     107              :             }
     108              :             None => {
     109            1 :                 self.inspected_all_shards = true; // latch the end state so `finished` reports true
     110            1 :                 None
     111              :             }
     112              :         }
     113            9 :     }
     114              : }
     115              : 
     116            0 : pub(crate) fn create_shared_shard_iterator( // builds a TenantShardSharedIterator whose accessor looks shard ids up in `service`'s tenants map
     117            0 :     service: Arc<Service>, // moved into the accessor closure, which reads `service.inner` on every call
     118            0 : ) -> TenantShardSharedIterator<impl Fn(Option<TenantShardId>) -> Option<TenantShardId>> {
     119            0 :     let tenants_accessor = move |last_inspected_shard: Option<TenantShardId>| {
     120            0 :         let locked = &service.inner.read().unwrap(); // read access is acquired anew on each call and lasts only until the closure returns; unwrap() panics if read() returns Err (presumably a poisoned lock - TODO confirm)
     121            0 :         let tenants = &locked.tenants;
     122            0 :         let entry = match last_inspected_shard {
     123            0 :             Some(skip_past) => {
     124              :                 // Skip to the last seen tenant shard id. This scans from the start of the map on every call. NOTE(review): if `skip_past` is no longer in the map the scan consumes every entry and the accessor returns None, ending the iteration early; a `range` lookup strictly after `skip_past` would avoid both, assuming `tenants` is a BTreeMap - TODO confirm
     125            0 :                 let mut cursor = tenants.iter().skip_while(|(tid, _)| **tid != skip_past);
     126              : 
     127              :                 // Skip past the last seen: `nth(1)` discards the matching entry and yields the one after it
     128            0 :                 cursor.nth(1)
     129              :             }
     130            0 :             None => tenants.first_key_value(), // first call: start from the first entry of the map
     131              :         };
     132              : 
     133            0 :         entry.map(|(tid, _)| tid).copied() // keep only the key, copied so that nothing borrowed from `locked` escapes the closure
     134            0 :     };
     135              : 
     136            0 :     TenantShardSharedIterator::new(tenants_accessor)
     137            0 : }
     138              : 
     139              : #[cfg(test)]
     140              : mod tests { // unit tests for both iterators defined in this file
     141              :     use std::collections::BTreeMap;
     142              :     use std::str::FromStr;
     143              :     use std::sync::Arc;
     144              : 
     145              :     use pageserver_api::controller_api::PlacementPolicy;
     146              :     use utils::id::TenantId;
     147              :     use utils::shard::{ShardCount, ShardNumber, TenantShardId};
     148              : 
     149              :     use super::*;
     150              :     use crate::scheduler::test_utils::make_test_nodes;
     151              :     use crate::service::Scheduler;
     152              :     use crate::tenant_shard::tests::make_test_tenant_with_id;
     153              : 
     154              :     #[test]
     155            1 :     fn test_exclusive_shard_iterator() { // three tenants (1, 4 and 1 shards) must come back as three groups, in map order
     156              :         // Hand-crafted tenant IDs (leading "af", "bf", "cf") to ensure they appear in the expected order when put into
     157              :         // a BTreeMap and iterated
     158            1 :         let mut t_1_shards = make_test_tenant_with_id(
     159            1 :             TenantId::from_str("af0480929707ee75372337efaa5ecf96").unwrap(),
     160            1 :             PlacementPolicy::Attached(1),
     161            1 :             ShardCount(1),
     162            1 :             None,
     163              :         );
     164            1 :         let t_2_shards = make_test_tenant_with_id(
     165            1 :             TenantId::from_str("bf0480929707ee75372337efaa5ecf96").unwrap(),
     166            1 :             PlacementPolicy::Attached(1),
     167            1 :             ShardCount(4),
     168            1 :             None,
     169              :         );
     170            1 :         let mut t_3_shards = make_test_tenant_with_id(
     171            1 :             TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap(),
     172            1 :             PlacementPolicy::Attached(1),
     173            1 :             ShardCount(1),
     174            1 :             None,
     175              :         );
     176              : 
     177            1 :         let t1_id = t_1_shards[0].tenant_shard_id.tenant_id; // ids are captured before the shards are moved into the map
     178            1 :         let t2_id = t_2_shards[0].tenant_shard_id.tenant_id;
     179            1 :         let t3_id = t_3_shards[0].tenant_shard_id.tenant_id;
     180              : 
     181            1 :         let mut tenants = BTreeMap::new();
     182            1 :         tenants.insert(t_1_shards[0].tenant_shard_id, t_1_shards.pop().unwrap());
     183            5 :         for shard in t_2_shards {
     184            4 :             tenants.insert(shard.tenant_shard_id, shard);
     185            4 :         }
     186            1 :         tenants.insert(t_3_shards[0].tenant_shard_id, t_3_shards.pop().unwrap());
     187              : 
     188            1 :         let nodes = make_test_nodes(3, &[]);
     189            1 :         let mut scheduler = Scheduler::new(nodes.values());
     190            1 :         let mut context = ScheduleContext::default();
     191            6 :         for shard in tenants.values_mut() { // schedule every shard before iterating, so each has an intent for the iterator to read
     192            6 :             shard.schedule(&mut scheduler, &mut context).unwrap();
     193            6 :         }
     194              : 
     195            1 :         let mut iter = TenantShardExclusiveIterator::new(&mut tenants, ScheduleMode::Speculative);
     196            1 :         let (tenant_id, context, shards) = iter.next().unwrap();
     197            1 :         assert_eq!(tenant_id, t1_id);
     198            1 :         assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
     199            1 :         assert_eq!(shards.len(), 1);
     200            1 :         assert_eq!(context.location_count(), 2); // presumably one attached plus one secondary location for the single shard - TODO confirm
     201              : 
     202            1 :         let (tenant_id, context, shards) = iter.next().unwrap();
     203            1 :         assert_eq!(tenant_id, t2_id);
     204            1 :         assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
     205            1 :         assert_eq!(shards[1].tenant_shard_id.shard_number, ShardNumber(1));
     206            1 :         assert_eq!(shards[2].tenant_shard_id.shard_number, ShardNumber(2));
     207            1 :         assert_eq!(shards[3].tenant_shard_id.shard_number, ShardNumber(3));
     208            1 :         assert_eq!(shards.len(), 4);
     209            1 :         assert_eq!(context.location_count(), 8);
     210              : 
     211            1 :         let (tenant_id, context, shards) = iter.next().unwrap();
     212            1 :         assert_eq!(tenant_id, t3_id);
     213            1 :         assert_eq!(shards[0].tenant_shard_id.shard_number, ShardNumber(0));
     214            1 :         assert_eq!(shards.len(), 1);
     215            1 :         assert_eq!(context.location_count(), 2); // NOTE(review): the test never asserts that a fourth `iter.next()` returns None
     216              : 
     217            6 :         for shard in tenants.values_mut() { // clear every intent against the scheduler before the test ends
     218            6 :             shard.intent.clear(&mut scheduler);
     219            6 :         }
     220            1 :     }
     221              : 
     222              :     #[test]
     223            1 :     fn test_shared_shard_iterator() { // the iterator must yield all 8 shard ids of one tenant, in order, then stop
     224            1 :         let tenant_id = TenantId::generate();
     225            1 :         let shard_count = ShardCount(8);
     226              : 
     227            1 :         let mut tenant_shards = Vec::default();
     228            8 :         for i in 0..shard_count.0 {
     229            8 :             tenant_shards.push((
     230            8 :                 TenantShardId {
     231            8 :                     tenant_id,
     232            8 :                     shard_number: ShardNumber(i),
     233            8 :                     shard_count,
     234            8 :                 },
     235            8 :                 (),
     236            8 :             ))
     237              :         }
     238              : 
     239            1 :         let tenant_shards = Arc::new(tenant_shards);
     240              : 
     241            1 :         let tid_iter = TenantShardSharedIterator::new({ // NOTE(review): this accessor re-implements the closure from `create_shared_shard_iterator` over a Vec, so that function itself is not exercised
     242            1 :             let tenants = tenant_shards.clone();
     243            9 :             move |last_inspected_shard: Option<TenantShardId>| {
     244            9 :                 let entry = match last_inspected_shard {
     245            8 :                     Some(skip_past) => {
     246           36 :                         let mut cursor = tenants.iter().skip_while(|(tid, _)| *tid != skip_past);
     247            8 :                         cursor.nth(1)
     248              :                     }
     249            1 :                     None => tenants.first(),
     250              :                 };
     251              : 
     252            9 :                 entry.map(|(tid, _)| tid).copied()
     253            9 :             }
     254              :         });
     255              : 
     256            1 :         let mut iterated_over = Vec::default();
     257            9 :         for tid in tid_iter {
     258            8 :             iterated_over.push((tid, ())); // re-pair with () so the result is comparable with `tenant_shards`
     259            8 :         }
     260              : 
     261            1 :         assert_eq!(iterated_over, *tenant_shards);
     262            1 :     }
     263              : }
        

Generated by: LCOV version 2.1-beta