LCOV - code coverage report
Current view: top level - storage_controller/src - compute_hook.rs
Test:      36bb8dd7c7efcb53483d1a7d9f7cb33e8406dcf0.info
Test Date: 2024-04-08 10:22:05
Coverage:  Lines: 49.8 % (141 of 283 hit)    Functions: 15.1 % (8 of 53 hit)

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::{collections::HashMap, time::Duration};
       3              : 
       4              : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
       5              : use control_plane::local_env::LocalEnv;
       6              : use hyper::{Method, StatusCode};
       7              : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
       8              : use postgres_connection::parse_host_port;
       9              : use serde::{Deserialize, Serialize};
      10              : use tokio_util::sync::CancellationToken;
      11              : use utils::{
      12              :     backoff::{self},
      13              :     id::{NodeId, TenantId},
      14              : };
      15              : 
      16              : use crate::service::Config;
      17              : 
      18              : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
      19              : 
      20              : pub(crate) const API_CONCURRENCY: usize = 32;
      21              : 
      22              : struct UnshardedComputeHookTenant {
      23              :     // Which node is this tenant attached to
      24              :     node_id: NodeId,
      25              : 
      26              :     // Must hold this lock to send a notification.
      27              :     send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
      28              : }
      29              : struct ShardedComputeHookTenant {
      30              :     stripe_size: ShardStripeSize,
      31              :     shard_count: ShardCount,
      32              :     shards: Vec<(ShardNumber, NodeId)>,
      33              : 
      34              :     // Must hold this lock to send a notification.  The contents represent
      35              :     // the last successfully sent notification, and are used to coalesce multiple
       36              :     // updates by only sending when there is a change since our last successful send.
      37              :     send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
      38              : }
      39              : 
      40              : enum ComputeHookTenant {
      41              :     Unsharded(UnshardedComputeHookTenant),
      42              :     Sharded(ShardedComputeHookTenant),
      43              : }
      44              : 
      45              : impl ComputeHookTenant {
      46              :     /// Construct with at least one shard's information
      47            4 :     fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
      48            4 :         if tenant_shard_id.shard_count.count() > 1 {
      49            2 :             Self::Sharded(ShardedComputeHookTenant {
      50            2 :                 shards: vec![(tenant_shard_id.shard_number, node_id)],
      51            2 :                 stripe_size,
      52            2 :                 shard_count: tenant_shard_id.shard_count,
      53            2 :                 send_lock: Arc::default(),
      54            2 :             })
      55              :         } else {
      56            2 :             Self::Unsharded(UnshardedComputeHookTenant {
      57            2 :                 node_id,
      58            2 :                 send_lock: Arc::default(),
      59            2 :             })
      60              :         }
      61            4 :     }
      62              : 
      63            8 :     fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
      64            8 :         match self {
      65            4 :             Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
      66            4 :             Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
      67              :         }
      68            8 :     }
      69              : 
      70              :     /// Set one shard's location.  If stripe size or shard count have changed, Self is reset
      71              :     /// and drops existing content.
      72            4 :     fn update(
      73            4 :         &mut self,
      74            4 :         tenant_shard_id: TenantShardId,
      75            4 :         stripe_size: ShardStripeSize,
      76            4 :         node_id: NodeId,
      77            4 :     ) {
      78            2 :         match self {
      79            2 :             Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
      80            0 :                 unsharded_tenant.node_id = node_id
      81              :             }
      82            2 :             Self::Sharded(sharded_tenant)
      83            2 :                 if sharded_tenant.stripe_size == stripe_size
      84            2 :                     && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
      85              :             {
      86            2 :                 if let Some(existing) = sharded_tenant
      87            2 :                     .shards
      88            2 :                     .iter()
      89            2 :                     .position(|s| s.0 == tenant_shard_id.shard_number)
      90            0 :                 {
      91            0 :                     sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
      92            0 :                 } else {
      93            2 :                     sharded_tenant
      94            2 :                         .shards
      95            2 :                         .push((tenant_shard_id.shard_number, node_id));
      96            4 :                     sharded_tenant.shards.sort_by_key(|s| s.0)
      97              :                 }
      98              :             }
      99            2 :             _ => {
     100            2 :                 // Shard count changed: reset struct.
     101            2 :                 *self = Self::new(tenant_shard_id, stripe_size, node_id);
     102            2 :             }
     103              :         }
     104            4 :     }
     105              : }
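// Illustrative sketch (not part of the measured source): how `update` either merges a
// shard placement or resets the whole struct.  The `shard_0_of_2` / `shard_1_of_2`
// TenantShardIds below are hypothetical placeholders.
//
//     // Tenant was created unsharded on NodeId(1).  Reporting shard 0 of a 2-shard
//     // layout changes the shard count, so the struct is reset to Sharded with only
//     // one shard known; maybe_send() (below) would return Noop at this point.
//     tenant.update(shard_0_of_2, ShardStripeSize(32768), NodeId(1));
//
//     // Reporting the remaining shard with the same stripe size and shard count is
//     // merged into the existing Sharded state, so maybe_send() can now Transmit.
//     tenant.update(shard_1_of_2, ShardStripeSize(32768), NodeId(2));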
     106              : 
     107            0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
     108              : struct ComputeHookNotifyRequestShard {
     109              :     node_id: NodeId,
     110              :     shard_number: ShardNumber,
     111              : }
     112              : 
     113              : /// Request body that we send to the control plane to notify it of where a tenant is attached
     114            0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
     115              : struct ComputeHookNotifyRequest {
     116              :     tenant_id: TenantId,
     117              :     stripe_size: Option<ShardStripeSize>,
     118              :     shards: Vec<ComputeHookNotifyRequestShard>,
     119              : }
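// Illustrative sketch (an assumption about the wire format, not taken from this file):
// for a fully-known two-shard tenant, serde would produce a body roughly like the
// following, assuming TenantId serializes as a hex string and the NodeId / ShardNumber /
// ShardStripeSize newtypes serialize as plain integers:
//
//     {
//       "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
//       "stripe_size": 32768,
//       "shards": [
//         { "node_id": 1, "shard_number": 0 },
//         { "node_id": 2, "shard_number": 1 }
//       ]
//     }
//
// For an unsharded tenant, `shards` has a single entry with shard_number 0 and
// `stripe_size` is null (see `maybe_send` below).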
     120              : 
     121              : /// Error type for attempts to call into the control plane compute notification hook
     122            0 : #[derive(thiserror::Error, Debug)]
     123              : pub(crate) enum NotifyError {
      124              :     // Request was not sent successfully, e.g. transport error
     125              :     #[error("Sending request: {0}")]
     126              :     Request(#[from] reqwest::Error),
      127              :     // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
     128              :     #[error("Control plane tenant busy")]
     129              :     Busy,
     130              :     // Explicit 429 response asking us to retry less frequently
     131              :     #[error("Control plane overloaded")]
     132              :     SlowDown,
      133              :     // A 502/503/504 response indicates the control plane can't handle the request right now
     134              :     #[error("Control plane unavailable (status {0})")]
     135              :     Unavailable(StatusCode),
     136              :     // API returned unexpected non-success status.  We will retry, but log a warning.
     137              :     #[error("Control plane returned unexpected status {0}")]
     138              :     Unexpected(StatusCode),
      139              :     // We shut down while sending
     140              :     #[error("Shutting down")]
     141              :     ShuttingDown,
      142              :     // A response that indicates we will never succeed, such as 400 or 404
     143              :     #[error("Non-retryable error {0}")]
     144              :     Fatal(StatusCode),
     145              : }
     146              : 
     147              : enum MaybeSendResult {
     148              :     // Please send this request while holding the lock, and if you succeed then write
     149              :     // the request into the lock.
     150              :     Transmit(
     151              :         (
     152              :             ComputeHookNotifyRequest,
     153              :             tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
     154              :         ),
     155              :     ),
      156              :     // Something requires sending, but you must wait for the current sender to finish, then call again
     157              :     AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
     158              :     // Nothing requires sending
     159              :     Noop,
     160              : }
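// Illustrative sketch (not part of the measured source) of the calling protocol these
// variants imply; `ComputeHook::notify` further down implements it for real:
//
//     match tenant.maybe_send(tenant_id, None) {
//         MaybeSendResult::Noop => { /* nothing new to tell the control plane */ }
//         MaybeSendResult::AwaitLock(lock) => {
//             // Another sender is in flight: await the lock *outside* the state
//             // mutex, then call maybe_send again, passing in the acquired guard.
//         }
//         MaybeSendResult::Transmit((request, mut guard)) => {
//             // Send `request`; on success store it into `guard` so that an
//             // identical follow-up update coalesces into a Noop.
//         }
//     }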
     161              : 
     162              : impl ComputeHookTenant {
     163            8 :     fn maybe_send(
     164            8 :         &self,
     165            8 :         tenant_id: TenantId,
     166            8 :         lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
     167            8 :     ) -> MaybeSendResult {
     168            8 :         let locked = match lock {
     169            0 :             Some(already_locked) => already_locked,
     170              :             None => {
     171              :                 // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
     172            8 :                 let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
     173            0 :                     return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
     174              :                 };
     175            8 :                 locked
     176              :             }
     177              :         };
     178              : 
     179            8 :         let request = match self {
     180            4 :             Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
     181            4 :                 tenant_id,
     182            4 :                 shards: vec![ComputeHookNotifyRequestShard {
     183            4 :                     shard_number: ShardNumber(0),
     184            4 :                     node_id: unsharded_tenant.node_id,
     185            4 :                 }],
     186            4 :                 stripe_size: None,
     187            4 :             }),
     188            4 :             Self::Sharded(sharded_tenant)
     189            4 :                 if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
     190            2 :             {
     191            2 :                 Some(ComputeHookNotifyRequest {
     192            2 :                     tenant_id,
     193            2 :                     shards: sharded_tenant
     194            2 :                         .shards
     195            2 :                         .iter()
     196            4 :                         .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
     197            4 :                             shard_number: *shard_number,
     198            4 :                             node_id: *node_id,
     199            4 :                         })
     200            2 :                         .collect(),
     201            2 :                     stripe_size: Some(sharded_tenant.stripe_size),
     202            2 :                 })
     203              :             }
     204            2 :             Self::Sharded(sharded_tenant) => {
     205            2 :                 // Sharded tenant doesn't yet have information for all its shards
     206            2 : 
     207            2 :                 tracing::info!(
     208            0 :                     "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
     209            0 :                     sharded_tenant.shards.len(),
     210            0 :                     sharded_tenant.shard_count.count()
     211            0 :                 );
     212            2 :                 None
     213              :             }
     214              :         };
     215              : 
     216            6 :         match request {
     217              :             None => {
     218              :                 // Not yet ready to emit a notification
     219            2 :                 tracing::info!("Tenant isn't yet ready to emit a notification");
     220            2 :                 MaybeSendResult::Noop
     221              :             }
     222            6 :             Some(request) if Some(&request) == locked.as_ref() => {
     223            2 :                 // No change from the last value successfully sent
     224            2 :                 MaybeSendResult::Noop
     225              :             }
     226            4 :             Some(request) => MaybeSendResult::Transmit((request, locked)),
     227              :         }
     228            8 :     }
     229              : }
     230              : 
     231              : /// The compute hook is a destination for notifications about changes to tenant:pageserver
     232              : /// mapping.  It aggregates updates for the shards in a tenant, and when appropriate reconfigures
     233              : /// the compute connection string.
     234              : pub(super) struct ComputeHook {
     235              :     config: Config,
     236              :     state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
     237              :     authorization_header: Option<String>,
     238              : 
     239              :     // Concurrency limiter, so that we do not overload the cloud control plane when updating
     240              :     // large numbers of tenants (e.g. when failing over after a node failure)
     241              :     api_concurrency: tokio::sync::Semaphore,
     242              : 
      243              :     // This lock is only used in testing environments, to serialize calls into neon_local
     244              :     neon_local_lock: tokio::sync::Mutex<()>,
     245              : }
     246              : 
     247              : impl ComputeHook {
     248            0 :     pub(super) fn new(config: Config) -> Self {
     249            0 :         let authorization_header = config
     250            0 :             .control_plane_jwt_token
     251            0 :             .clone()
     252            0 :             .map(|jwt| format!("Bearer {}", jwt));
     253            0 : 
     254            0 :         Self {
     255            0 :             state: Default::default(),
     256            0 :             config,
     257            0 :             authorization_header,
     258            0 :             neon_local_lock: Default::default(),
     259            0 :             api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
     260            0 :         }
     261            0 :     }
     262              : 
     263              :     /// For test environments: use neon_local's LocalEnv to update compute
     264            0 :     async fn do_notify_local(
     265            0 :         &self,
     266            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     267            0 :     ) -> anyhow::Result<()> {
      268              :         // neon_local updates are not safe to call concurrently; use a lock to serialize
     269              :         // all calls to this function
     270            0 :         let _locked = self.neon_local_lock.lock().await;
     271              : 
     272            0 :         let env = match LocalEnv::load_config() {
     273            0 :             Ok(e) => e,
     274            0 :             Err(e) => {
     275            0 :                 tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
     276            0 :                 return Ok(());
     277              :             }
     278              :         };
     279            0 :         let cplane =
     280            0 :             ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
     281            0 :         let ComputeHookNotifyRequest {
     282            0 :             tenant_id,
     283            0 :             shards,
     284            0 :             stripe_size,
     285            0 :         } = reconfigure_request;
     286            0 : 
     287            0 :         let compute_pageservers = shards
     288            0 :             .iter()
     289            0 :             .map(|shard| {
     290            0 :                 let ps_conf = env
     291            0 :                     .get_pageserver_conf(shard.node_id)
     292            0 :                     .expect("Unknown pageserver");
     293            0 :                 let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
     294            0 :                     .expect("Unable to parse listen_pg_addr");
     295            0 :                 (pg_host, pg_port.unwrap_or(5432))
     296            0 :             })
     297            0 :             .collect::<Vec<_>>();
     298              : 
     299            0 :         for (endpoint_name, endpoint) in &cplane.endpoints {
     300            0 :             if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
     301            0 :                 tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
     302            0 :                 endpoint
     303            0 :                     .reconfigure(compute_pageservers.clone(), *stripe_size)
     304            0 :                     .await?;
     305            0 :             }
     306              :         }
     307              : 
     308            0 :         Ok(())
     309            0 :     }
     310              : 
     311            0 :     async fn do_notify_iteration(
     312            0 :         &self,
     313            0 :         client: &reqwest::Client,
     314            0 :         url: &String,
     315            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     316            0 :         cancel: &CancellationToken,
     317            0 :     ) -> Result<(), NotifyError> {
     318            0 :         let req = client.request(Method::PUT, url);
     319            0 :         let req = if let Some(value) = &self.authorization_header {
     320            0 :             req.header(reqwest::header::AUTHORIZATION, value)
     321              :         } else {
     322            0 :             req
     323              :         };
     324              : 
     325            0 :         tracing::info!(
     326            0 :             "Sending notify request to {} ({:?})",
     327            0 :             url,
     328            0 :             reconfigure_request
     329            0 :         );
     330            0 :         let send_result = req.json(&reconfigure_request).send().await;
     331            0 :         let response = match send_result {
     332            0 :             Ok(r) => r,
     333            0 :             Err(e) => return Err(e.into()),
     334              :         };
     335              : 
     336              :         // Treat all 2xx responses as success
     337            0 :         if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES {
     338            0 :             if response.status() != StatusCode::OK {
     339              :                 // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
     340              :                 // log a warning.
     341            0 :                 tracing::warn!(
     342            0 :                     "Unexpected 2xx response code {} from control plane",
     343            0 :                     response.status()
     344            0 :                 );
     345            0 :             }
     346              : 
     347            0 :             return Ok(());
     348            0 :         }
     349            0 : 
     350            0 :         // Error response codes
     351            0 :         match response.status() {
     352              :             StatusCode::TOO_MANY_REQUESTS => {
     353              :                 // TODO: 429 handling should be global: set some state visible to other requests
     354              :                 // so that they will delay before starting, rather than all notifications trying
     355              :                 // once before backing off.
     356            0 :                 tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
     357            0 :                     .await
     358            0 :                     .ok();
     359            0 :                 Err(NotifyError::SlowDown)
     360              :             }
     361              :             StatusCode::LOCKED => {
      362              :                 // We consider this fatal, because it's possible that the operation blocking the control plane is
      363              :                 // also the one that is waiting for this reconcile.  We should let the reconciler calling
      364              :                 // this hook fail, to give the control plane a chance to unlock.
     365            0 :                 tracing::info!("Control plane reports tenant is locked, dropping out of notify");
     366            0 :                 Err(NotifyError::Busy)
     367              :             }
     368              :             StatusCode::SERVICE_UNAVAILABLE
     369              :             | StatusCode::GATEWAY_TIMEOUT
     370            0 :             | StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())),
     371              :             StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
     372            0 :                 Err(NotifyError::Fatal(response.status()))
     373              :             }
     374            0 :             _ => Err(NotifyError::Unexpected(response.status())),
     375              :         }
     376            0 :     }
     377              : 
     378            0 :     async fn do_notify(
     379            0 :         &self,
     380            0 :         url: &String,
     381            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     382            0 :         cancel: &CancellationToken,
     383            0 :     ) -> Result<(), NotifyError> {
     384            0 :         let client = reqwest::Client::new();
     385              : 
     386              :         // We hold these semaphore units across all retries, rather than only across each
     387              :         // HTTP request: this is to preserve fairness and avoid a situation where a retry might
     388              :         // time out waiting for a semaphore.
     389            0 :         let _units = self
     390            0 :             .api_concurrency
     391            0 :             .acquire()
     392            0 :             .await
     393              :             // Interpret closed semaphore as shutdown
     394            0 :             .map_err(|_| NotifyError::ShuttingDown)?;
     395              : 
     396            0 :         backoff::retry(
     397            0 :             || self.do_notify_iteration(&client, url, reconfigure_request, cancel),
     398            0 :             |e| {
     399            0 :                 matches!(
     400            0 :                     e,
     401              :                     NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
     402              :                 )
     403            0 :             },
     404            0 :             3,
     405            0 :             10,
     406            0 :             "Send compute notification",
     407            0 :             cancel,
     408            0 :         )
     409            0 :         .await
     410            0 :         .ok_or_else(|| NotifyError::ShuttingDown)
     411            0 :         .and_then(|x| x)
     412            0 :     }
     413              : 
     414              :     /// Call this to notify the compute (postgres) tier of new pageservers to use
     415              :     /// for a tenant.  notify() is called by each shard individually, and this function
     416              :     /// will decide whether an update to the tenant is sent.  An update is sent on the
     417              :     /// condition that:
     418              :     /// - We know a pageserver for every shard.
     419              :     /// - All the shards have the same shard_count (i.e. we are not mid-split)
     420              :     ///
     421              :     /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
     422              :     /// that is cancelled.
     423              :     ///
     424              :     /// This function is fallible, including in the case that the control plane is transiently
     425              :     /// unavailable.  A limited number of retries are done internally to efficiently hide short unavailability
     426              :     /// periods, but we don't retry forever.  The **caller** is responsible for handling failures and
     427              :     /// ensuring that they eventually call again to ensure that the compute is eventually notified of
      428              :     /// ensuring that they eventually call again, so that the compute eventually learns the
      429              :     /// proper pageserver nodes for the tenant.
     430              :     pub(super) async fn notify(
     431              :         &self,
     432              :         tenant_shard_id: TenantShardId,
     433              :         node_id: NodeId,
     434              :         stripe_size: ShardStripeSize,
     435              :         cancel: &CancellationToken,
     436              :     ) -> Result<(), NotifyError> {
     437              :         let maybe_send_result = {
     438              :             let mut state_locked = self.state.lock().unwrap();
     439              : 
     440              :             use std::collections::hash_map::Entry;
     441              :             let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
     442              :                 Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
     443              :                     tenant_shard_id,
     444              :                     stripe_size,
     445              :                     node_id,
     446              :                 )),
     447              :                 Entry::Occupied(e) => {
     448              :                     let tenant = e.into_mut();
     449              :                     tenant.update(tenant_shard_id, stripe_size, node_id);
     450              :                     tenant
     451              :                 }
     452              :             };
     453              :             tenant.maybe_send(tenant_shard_id.tenant_id, None)
     454              :         };
     455              : 
     456              :         // Process result: we may get an update to send, or we may have to wait for a lock
     457              :         // before trying again.
     458              :         let (request, mut send_lock_guard) = match maybe_send_result {
     459              :             MaybeSendResult::Noop => {
     460              :                 return Ok(());
     461              :             }
     462              :             MaybeSendResult::AwaitLock(send_lock) => {
     463              :                 let send_locked = send_lock.lock_owned().await;
     464              : 
     465              :                 // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
     466              :                 // we have acquired the send lock and take `[Self::state]` lock.  This is safe because maybe_send only uses
     467              :                 // try_lock.
     468              :                 let state_locked = self.state.lock().unwrap();
     469              :                 let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
     470              :                     return Ok(());
     471              :                 };
     472              :                 match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
     473              :                     MaybeSendResult::AwaitLock(_) => {
     474              :                         unreachable!("We supplied lock guard")
     475              :                     }
     476              :                     MaybeSendResult::Noop => {
     477              :                         return Ok(());
     478              :                     }
     479              :                     MaybeSendResult::Transmit((request, lock)) => (request, lock),
     480              :                 }
     481              :             }
     482              :             MaybeSendResult::Transmit((request, lock)) => (request, lock),
     483              :         };
     484              : 
     485              :         let result = if let Some(notify_url) = &self.config.compute_hook_url {
     486              :             self.do_notify(notify_url, &request, cancel).await
     487              :         } else {
     488            0 :             self.do_notify_local(&request).await.map_err(|e| {
     489            0 :                 // This path is for testing only, so munge the error into our prod-style error type.
     490            0 :                 tracing::error!("Local notification hook failed: {e}");
     491            0 :                 NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
     492            0 :             })
     493              :         };
     494              : 
     495              :         if result.is_ok() {
     496              :             // Before dropping the send lock, stash the request we just sent so that
     497              :             // subsequent callers can avoid redundantly re-sending the same thing.
     498              :             *send_lock_guard = Some(request);
     499              :         }
     500              :         result
     501              :     }
     502              : }
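// Illustrative caller sketch (hypothetical, not part of the measured source): the kind
// of retry loop the `notify` doc comment above asks callers to run.  `hook`,
// `tenant_shard_id`, `node_id`, `stripe_size` and `cancel` are assumed to be in scope.
//
//     loop {
//         match hook.notify(tenant_shard_id, node_id, stripe_size, &cancel).await {
//             Ok(()) => break,
//             Err(NotifyError::ShuttingDown) => break,
//             Err(e) => {
//                 // notify() only retries a few times internally; eventual delivery is
//                 // the caller's responsibility, so back off and call again later.
//                 tracing::warn!("Compute notification failed, will retry: {e}");
//                 tokio::time::sleep(SLOWDOWN_DELAY).await;
//             }
//         }
//     }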
     503              : 
     504              : #[cfg(test)]
     505              : pub(crate) mod tests {
     506              :     use pageserver_api::shard::{ShardCount, ShardNumber};
     507              :     use utils::id::TenantId;
     508              : 
     509              :     use super::*;
     510              : 
     511              :     #[test]
     512            2 :     fn tenant_updates() -> anyhow::Result<()> {
     513            2 :         let tenant_id = TenantId::generate();
     514            2 :         let mut tenant_state = ComputeHookTenant::new(
     515            2 :             TenantShardId {
     516            2 :                 tenant_id,
     517            2 :                 shard_count: ShardCount::new(0),
     518            2 :                 shard_number: ShardNumber(0),
     519            2 :             },
     520            2 :             ShardStripeSize(12345),
     521            2 :             NodeId(1),
     522            2 :         );
     523            2 : 
     524            2 :         // An unsharded tenant is always ready to emit a notification, but won't
     525            2 :         // send the same one twice
     526            2 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     527            2 :         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
     528            0 :             anyhow::bail!("Wrong send result");
     529              :         };
     530            2 :         assert_eq!(request.shards.len(), 1);
     531            2 :         assert!(request.stripe_size.is_none());
     532              : 
     533              :         // Simulate successful send
     534            2 :         *guard = Some(request);
     535            2 :         drop(guard);
     536            2 : 
     537            2 :         // Try asking again: this should be a no-op
     538            2 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     539            2 :         assert!(matches!(send_result, MaybeSendResult::Noop));
     540              : 
     541              :         // Writing the first shard of a multi-sharded situation (i.e. in a split)
      542              :         // resets the tenant state and puts it in a non-notifying state (need to
     543              :         // see all shards)
     544            2 :         tenant_state.update(
     545            2 :             TenantShardId {
     546            2 :                 tenant_id,
     547            2 :                 shard_count: ShardCount::new(2),
     548            2 :                 shard_number: ShardNumber(1),
     549            2 :             },
     550            2 :             ShardStripeSize(32768),
     551            2 :             NodeId(1),
     552            2 :         );
     553            2 :         assert!(matches!(
     554            2 :             tenant_state.maybe_send(tenant_id, None),
     555              :             MaybeSendResult::Noop
     556              :         ));
     557              : 
     558              :         // Writing the second shard makes it ready to notify
     559            2 :         tenant_state.update(
     560            2 :             TenantShardId {
     561            2 :                 tenant_id,
     562            2 :                 shard_count: ShardCount::new(2),
     563            2 :                 shard_number: ShardNumber(0),
     564            2 :             },
     565            2 :             ShardStripeSize(32768),
     566            2 :             NodeId(1),
     567            2 :         );
     568            2 : 
     569            2 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     570            2 :         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
     571            0 :             anyhow::bail!("Wrong send result");
     572              :         };
     573            2 :         assert_eq!(request.shards.len(), 2);
     574            2 :         assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
     575              : 
     576              :         // Simulate successful send
     577            2 :         *guard = Some(request);
     578            2 :         drop(guard);
     579            2 : 
     580            2 :         Ok(())
     581            2 :     }
     582              : }
        

Generated by: LCOV version 2.1-beta