LCOV - code coverage report
Current view: top level - storage_controller/src - compute_hook.rs (source / functions) Coverage Total Hit
Test: b837401fb09d2d9818b70e630fdb67e9799b7b0d.info Lines: 49.1 % 287 141
Test Date: 2024-04-18 15:32:49 Functions: 15.1 % 53 8

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::{collections::HashMap, time::Duration};
       3              : 
       4              : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
       5              : use control_plane::local_env::LocalEnv;
       6              : use hyper::{Method, StatusCode};
       7              : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
       8              : use postgres_connection::parse_host_port;
       9              : use serde::{Deserialize, Serialize};
      10              : use tokio_util::sync::CancellationToken;
      11              : use utils::{
      12              :     backoff::{self},
      13              :     id::{NodeId, TenantId},
      14              : };
      15              : 
      16              : use crate::service::Config;
      17              : 
      18              : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
      19              : 
      20              : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      21              : 
      22              : pub(crate) const API_CONCURRENCY: usize = 32;
      23              : 
      24              : struct UnshardedComputeHookTenant {
      25              :     // Which node is this tenant attached to
      26              :     node_id: NodeId,
      27              : 
      28              :     // Must hold this lock to send a notification.
      29              :     send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
      30              : }
      31              : struct ShardedComputeHookTenant {
      32              :     stripe_size: ShardStripeSize,
      33              :     shard_count: ShardCount,
      34              :     shards: Vec<(ShardNumber, NodeId)>,
      35              : 
      36              :     // Must hold this lock to send a notification.  The contents represent
      37              :     // the last successfully sent notification, and are used to coalesce multiple
      38              :     // updates by only sending when there is a chance since our last successful send.
      39              :     send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
      40              : }
      41              : 
      42              : enum ComputeHookTenant {
      43              :     Unsharded(UnshardedComputeHookTenant),
      44              :     Sharded(ShardedComputeHookTenant),
      45              : }
      46              : 
      47              : impl ComputeHookTenant {
      48              :     /// Construct with at least one shard's information
      49            4 :     fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
      50            4 :         if tenant_shard_id.shard_count.count() > 1 {
      51            2 :             Self::Sharded(ShardedComputeHookTenant {
      52            2 :                 shards: vec![(tenant_shard_id.shard_number, node_id)],
      53            2 :                 stripe_size,
      54            2 :                 shard_count: tenant_shard_id.shard_count,
      55            2 :                 send_lock: Arc::default(),
      56            2 :             })
      57              :         } else {
      58            2 :             Self::Unsharded(UnshardedComputeHookTenant {
      59            2 :                 node_id,
      60            2 :                 send_lock: Arc::default(),
      61            2 :             })
      62              :         }
      63            4 :     }
      64              : 
      65            8 :     fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
      66            8 :         match self {
      67            4 :             Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
      68            4 :             Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
      69              :         }
      70            8 :     }
      71              : 
      72              :     /// Set one shard's location.  If stripe size or shard count have changed, Self is reset
      73              :     /// and drops existing content.
      74            4 :     fn update(
      75            4 :         &mut self,
      76            4 :         tenant_shard_id: TenantShardId,
      77            4 :         stripe_size: ShardStripeSize,
      78            4 :         node_id: NodeId,
      79            4 :     ) {
      80            2 :         match self {
      81            2 :             Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
      82            0 :                 unsharded_tenant.node_id = node_id
      83              :             }
      84            2 :             Self::Sharded(sharded_tenant)
      85            2 :                 if sharded_tenant.stripe_size == stripe_size
      86            2 :                     && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
      87              :             {
      88            2 :                 if let Some(existing) = sharded_tenant
      89            2 :                     .shards
      90            2 :                     .iter()
      91            2 :                     .position(|s| s.0 == tenant_shard_id.shard_number)
      92            0 :                 {
      93            0 :                     sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
      94            0 :                 } else {
      95            2 :                     sharded_tenant
      96            2 :                         .shards
      97            2 :                         .push((tenant_shard_id.shard_number, node_id));
      98            4 :                     sharded_tenant.shards.sort_by_key(|s| s.0)
      99              :                 }
     100              :             }
     101            2 :             _ => {
     102            2 :                 // Shard count changed: reset struct.
     103            2 :                 *self = Self::new(tenant_shard_id, stripe_size, node_id);
     104            2 :             }
     105              :         }
     106            4 :     }
     107              : }
     108              : 
     109            0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
     110              : struct ComputeHookNotifyRequestShard {
     111              :     node_id: NodeId,
     112              :     shard_number: ShardNumber,
     113              : }
     114              : 
     115              : /// Request body that we send to the control plane to notify it of where a tenant is attached
     116            0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
     117              : struct ComputeHookNotifyRequest {
     118              :     tenant_id: TenantId,
     119              :     stripe_size: Option<ShardStripeSize>,
     120              :     shards: Vec<ComputeHookNotifyRequestShard>,
     121              : }
     122              : 
     123              : /// Error type for attempts to call into the control plane compute notification hook
     124            0 : #[derive(thiserror::Error, Debug)]
     125              : pub(crate) enum NotifyError {
     126              :     // Request was not send successfully, e.g. transport error
     127              :     #[error("Sending request: {0}")]
     128              :     Request(#[from] reqwest::Error),
     129              :     // Request could not be serviced right now due to ongoing Operation in control plane, but should be possible soon.
     130              :     #[error("Control plane tenant busy")]
     131              :     Busy,
     132              :     // Explicit 429 response asking us to retry less frequently
     133              :     #[error("Control plane overloaded")]
     134              :     SlowDown,
     135              :     // A 503 response indicates the control plane can't handle the request right now
     136              :     #[error("Control plane unavailable (status {0})")]
     137              :     Unavailable(StatusCode),
     138              :     // API returned unexpected non-success status.  We will retry, but log a warning.
     139              :     #[error("Control plane returned unexpected status {0}")]
     140              :     Unexpected(StatusCode),
     141              :     // We shutdown while sending
     142              :     #[error("Shutting down")]
     143              :     ShuttingDown,
     144              :     // A response indicates we will never succeed, such as 400 or 404
     145              :     #[error("Non-retryable error {0}")]
     146              :     Fatal(StatusCode),
     147              : }
     148              : 
     149              : enum MaybeSendResult {
     150              :     // Please send this request while holding the lock, and if you succeed then write
     151              :     // the request into the lock.
     152              :     Transmit(
     153              :         (
     154              :             ComputeHookNotifyRequest,
     155              :             tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
     156              :         ),
     157              :     ),
     158              :     // Something requires sending, but you must wait for a current sender then call again
     159              :     AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
     160              :     // Nothing requires sending
     161              :     Noop,
     162              : }
     163              : 
     164              : impl ComputeHookTenant {
     165            8 :     fn maybe_send(
     166            8 :         &self,
     167            8 :         tenant_id: TenantId,
     168            8 :         lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
     169            8 :     ) -> MaybeSendResult {
     170            8 :         let locked = match lock {
     171            0 :             Some(already_locked) => already_locked,
     172              :             None => {
     173              :                 // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
     174            8 :                 let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
     175            0 :                     return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
     176              :                 };
     177            8 :                 locked
     178              :             }
     179              :         };
     180              : 
     181            8 :         let request = match self {
     182            4 :             Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
     183            4 :                 tenant_id,
     184            4 :                 shards: vec![ComputeHookNotifyRequestShard {
     185            4 :                     shard_number: ShardNumber(0),
     186            4 :                     node_id: unsharded_tenant.node_id,
     187            4 :                 }],
     188            4 :                 stripe_size: None,
     189            4 :             }),
     190            4 :             Self::Sharded(sharded_tenant)
     191            4 :                 if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
     192            2 :             {
     193            2 :                 Some(ComputeHookNotifyRequest {
     194            2 :                     tenant_id,
     195            2 :                     shards: sharded_tenant
     196            2 :                         .shards
     197            2 :                         .iter()
     198            4 :                         .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
     199            4 :                             shard_number: *shard_number,
     200            4 :                             node_id: *node_id,
     201            4 :                         })
     202            2 :                         .collect(),
     203            2 :                     stripe_size: Some(sharded_tenant.stripe_size),
     204            2 :                 })
     205              :             }
     206            2 :             Self::Sharded(sharded_tenant) => {
     207            2 :                 // Sharded tenant doesn't yet have information for all its shards
     208            2 : 
     209            2 :                 tracing::info!(
     210            0 :                     "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
     211            0 :                     sharded_tenant.shards.len(),
     212            0 :                     sharded_tenant.shard_count.count()
     213            0 :                 );
     214            2 :                 None
     215              :             }
     216              :         };
     217              : 
     218            6 :         match request {
     219              :             None => {
     220              :                 // Not yet ready to emit a notification
     221            2 :                 tracing::info!("Tenant isn't yet ready to emit a notification");
     222            2 :                 MaybeSendResult::Noop
     223              :             }
     224            6 :             Some(request) if Some(&request) == locked.as_ref() => {
     225            2 :                 // No change from the last value successfully sent
     226            2 :                 MaybeSendResult::Noop
     227              :             }
     228            4 :             Some(request) => MaybeSendResult::Transmit((request, locked)),
     229              :         }
     230            8 :     }
     231              : }
     232              : 
     233              : /// The compute hook is a destination for notifications about changes to tenant:pageserver
     234              : /// mapping.  It aggregates updates for the shards in a tenant, and when appropriate reconfigures
     235              : /// the compute connection string.
     236              : pub(super) struct ComputeHook {
     237              :     config: Config,
     238              :     state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
     239              :     authorization_header: Option<String>,
     240              : 
     241              :     // Concurrency limiter, so that we do not overload the cloud control plane when updating
     242              :     // large numbers of tenants (e.g. when failing over after a node failure)
     243              :     api_concurrency: tokio::sync::Semaphore,
     244              : 
     245              :     // This lock is only used in testing enviroments, to serialize calls into neon_lock
     246              :     neon_local_lock: tokio::sync::Mutex<()>,
     247              : 
     248              :     // We share a client across all notifications to enable connection re-use etc when
     249              :     // sending large numbers of notifications
     250              :     client: reqwest::Client,
     251              : }
     252              : 
     253              : impl ComputeHook {
     254            0 :     pub(super) fn new(config: Config) -> Self {
     255            0 :         let authorization_header = config
     256            0 :             .control_plane_jwt_token
     257            0 :             .clone()
     258            0 :             .map(|jwt| format!("Bearer {}", jwt));
     259            0 : 
     260            0 :         let client = reqwest::ClientBuilder::new()
     261            0 :             .timeout(NOTIFY_REQUEST_TIMEOUT)
     262            0 :             .build()
     263            0 :             .expect("Failed to construct HTTP client");
     264            0 : 
     265            0 :         Self {
     266            0 :             state: Default::default(),
     267            0 :             config,
     268            0 :             authorization_header,
     269            0 :             neon_local_lock: Default::default(),
     270            0 :             api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
     271            0 :             client,
     272            0 :         }
     273            0 :     }
     274              : 
     275              :     /// For test environments: use neon_local's LocalEnv to update compute
     276            0 :     async fn do_notify_local(
     277            0 :         &self,
     278            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     279            0 :     ) -> anyhow::Result<()> {
     280              :         // neon_local updates are not safe to call concurrently, use a lock to serialize
     281              :         // all calls to this function
     282            0 :         let _locked = self.neon_local_lock.lock().await;
     283              : 
     284            0 :         let env = match LocalEnv::load_config() {
     285            0 :             Ok(e) => e,
     286            0 :             Err(e) => {
     287            0 :                 tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
     288            0 :                 return Ok(());
     289              :             }
     290              :         };
     291            0 :         let cplane =
     292            0 :             ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
     293            0 :         let ComputeHookNotifyRequest {
     294            0 :             tenant_id,
     295            0 :             shards,
     296            0 :             stripe_size,
     297            0 :         } = reconfigure_request;
     298            0 : 
     299            0 :         let compute_pageservers = shards
     300            0 :             .iter()
     301            0 :             .map(|shard| {
     302            0 :                 let ps_conf = env
     303            0 :                     .get_pageserver_conf(shard.node_id)
     304            0 :                     .expect("Unknown pageserver");
     305            0 :                 let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
     306            0 :                     .expect("Unable to parse listen_pg_addr");
     307            0 :                 (pg_host, pg_port.unwrap_or(5432))
     308            0 :             })
     309            0 :             .collect::<Vec<_>>();
     310              : 
     311            0 :         for (endpoint_name, endpoint) in &cplane.endpoints {
     312            0 :             if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
     313            0 :                 tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
     314            0 :                 endpoint
     315            0 :                     .reconfigure(compute_pageservers.clone(), *stripe_size)
     316            0 :                     .await?;
     317            0 :             }
     318              :         }
     319              : 
     320            0 :         Ok(())
     321            0 :     }
     322              : 
     323            0 :     async fn do_notify_iteration(
     324            0 :         &self,
     325            0 :         url: &String,
     326            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     327            0 :         cancel: &CancellationToken,
     328            0 :     ) -> Result<(), NotifyError> {
     329            0 :         let req = self.client.request(Method::PUT, url);
     330            0 :         let req = if let Some(value) = &self.authorization_header {
     331            0 :             req.header(reqwest::header::AUTHORIZATION, value)
     332              :         } else {
     333            0 :             req
     334              :         };
     335              : 
     336            0 :         tracing::info!(
     337            0 :             "Sending notify request to {} ({:?})",
     338            0 :             url,
     339            0 :             reconfigure_request
     340            0 :         );
     341            0 :         let send_result = req.json(&reconfigure_request).send().await;
     342            0 :         let response = match send_result {
     343            0 :             Ok(r) => r,
     344            0 :             Err(e) => return Err(e.into()),
     345              :         };
     346              : 
     347              :         // Treat all 2xx responses as success
     348            0 :         if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES {
     349            0 :             if response.status() != StatusCode::OK {
     350              :                 // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
     351              :                 // log a warning.
     352            0 :                 tracing::warn!(
     353            0 :                     "Unexpected 2xx response code {} from control plane",
     354            0 :                     response.status()
     355            0 :                 );
     356            0 :             }
     357              : 
     358            0 :             return Ok(());
     359            0 :         }
     360            0 : 
     361            0 :         // Error response codes
     362            0 :         match response.status() {
     363              :             StatusCode::TOO_MANY_REQUESTS => {
     364              :                 // TODO: 429 handling should be global: set some state visible to other requests
     365              :                 // so that they will delay before starting, rather than all notifications trying
     366              :                 // once before backing off.
     367            0 :                 tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
     368            0 :                     .await
     369            0 :                     .ok();
     370            0 :                 Err(NotifyError::SlowDown)
     371              :             }
     372              :             StatusCode::LOCKED => {
     373              :                 // We consider this fatal, because it's possible that the operation blocking the control one is
     374              :                 // also the one that is waiting for this reconcile.  We should let the reconciler calling
     375              :                 // this hook fail, to give control plane a chance to un-lock.
     376            0 :                 tracing::info!("Control plane reports tenant is locked, dropping out of notify");
     377            0 :                 Err(NotifyError::Busy)
     378              :             }
     379              :             StatusCode::SERVICE_UNAVAILABLE
     380              :             | StatusCode::GATEWAY_TIMEOUT
     381            0 :             | StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())),
     382              :             StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
     383            0 :                 Err(NotifyError::Fatal(response.status()))
     384              :             }
     385            0 :             _ => Err(NotifyError::Unexpected(response.status())),
     386              :         }
     387            0 :     }
     388              : 
     389            0 :     async fn do_notify(
     390            0 :         &self,
     391            0 :         url: &String,
     392            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     393            0 :         cancel: &CancellationToken,
     394            0 :     ) -> Result<(), NotifyError> {
     395              :         // We hold these semaphore units across all retries, rather than only across each
     396              :         // HTTP request: this is to preserve fairness and avoid a situation where a retry might
     397              :         // time out waiting for a semaphore.
     398            0 :         let _units = self
     399            0 :             .api_concurrency
     400            0 :             .acquire()
     401            0 :             .await
     402              :             // Interpret closed semaphore as shutdown
     403            0 :             .map_err(|_| NotifyError::ShuttingDown)?;
     404              : 
     405            0 :         backoff::retry(
     406            0 :             || self.do_notify_iteration(url, reconfigure_request, cancel),
     407            0 :             |e| {
     408            0 :                 matches!(
     409            0 :                     e,
     410              :                     NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
     411              :                 )
     412            0 :             },
     413            0 :             3,
     414            0 :             10,
     415            0 :             "Send compute notification",
     416            0 :             cancel,
     417            0 :         )
     418            0 :         .await
     419            0 :         .ok_or_else(|| NotifyError::ShuttingDown)
     420            0 :         .and_then(|x| x)
     421            0 :     }
     422              : 
     423              :     /// Call this to notify the compute (postgres) tier of new pageservers to use
     424              :     /// for a tenant.  notify() is called by each shard individually, and this function
     425              :     /// will decide whether an update to the tenant is sent.  An update is sent on the
     426              :     /// condition that:
     427              :     /// - We know a pageserver for every shard.
     428              :     /// - All the shards have the same shard_count (i.e. we are not mid-split)
     429              :     ///
     430              :     /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
     431              :     /// that is cancelled.
     432              :     ///
     433              :     /// This function is fallible, including in the case that the control plane is transiently
     434              :     /// unavailable.  A limited number of retries are done internally to efficiently hide short unavailability
     435              :     /// periods, but we don't retry forever.  The **caller** is responsible for handling failures and
     436              :     /// ensuring that they eventually call again to ensure that the compute is eventually notified of
     437              :     /// the proper pageserver nodes for a tenant.
     438            0 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
     439              :     pub(super) async fn notify(
     440              :         &self,
     441              :         tenant_shard_id: TenantShardId,
     442              :         node_id: NodeId,
     443              :         stripe_size: ShardStripeSize,
     444              :         cancel: &CancellationToken,
     445              :     ) -> Result<(), NotifyError> {
     446              :         let maybe_send_result = {
     447              :             let mut state_locked = self.state.lock().unwrap();
     448              : 
     449              :             use std::collections::hash_map::Entry;
     450              :             let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
     451              :                 Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
     452              :                     tenant_shard_id,
     453              :                     stripe_size,
     454              :                     node_id,
     455              :                 )),
     456              :                 Entry::Occupied(e) => {
     457              :                     let tenant = e.into_mut();
     458              :                     tenant.update(tenant_shard_id, stripe_size, node_id);
     459              :                     tenant
     460              :                 }
     461              :             };
     462              :             tenant.maybe_send(tenant_shard_id.tenant_id, None)
     463              :         };
     464              : 
     465              :         // Process result: we may get an update to send, or we may have to wait for a lock
     466              :         // before trying again.
     467              :         let (request, mut send_lock_guard) = match maybe_send_result {
     468              :             MaybeSendResult::Noop => {
     469              :                 return Ok(());
     470              :             }
     471              :             MaybeSendResult::AwaitLock(send_lock) => {
     472              :                 let send_locked = send_lock.lock_owned().await;
     473              : 
     474              :                 // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
     475              :                 // we have acquired the send lock and take `[Self::state]` lock.  This is safe because maybe_send only uses
     476              :                 // try_lock.
     477              :                 let state_locked = self.state.lock().unwrap();
     478              :                 let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
     479              :                     return Ok(());
     480              :                 };
     481              :                 match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
     482              :                     MaybeSendResult::AwaitLock(_) => {
     483              :                         unreachable!("We supplied lock guard")
     484              :                     }
     485              :                     MaybeSendResult::Noop => {
     486              :                         return Ok(());
     487              :                     }
     488              :                     MaybeSendResult::Transmit((request, lock)) => (request, lock),
     489              :                 }
     490              :             }
     491              :             MaybeSendResult::Transmit((request, lock)) => (request, lock),
     492              :         };
     493              : 
     494              :         let result = if let Some(notify_url) = &self.config.compute_hook_url {
     495              :             self.do_notify(notify_url, &request, cancel).await
     496              :         } else {
     497            0 :             self.do_notify_local(&request).await.map_err(|e| {
     498            0 :                 // This path is for testing only, so munge the error into our prod-style error type.
     499            0 :                 tracing::error!("Local notification hook failed: {e}");
     500            0 :                 NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
     501            0 :             })
     502              :         };
     503              : 
     504              :         if result.is_ok() {
     505              :             // Before dropping the send lock, stash the request we just sent so that
     506              :             // subsequent callers can avoid redundantly re-sending the same thing.
     507              :             *send_lock_guard = Some(request);
     508              :         }
     509              :         result
     510              :     }
     511              : }
     512              : 
     513              : #[cfg(test)]
     514              : pub(crate) mod tests {
     515              :     use pageserver_api::shard::{ShardCount, ShardNumber};
     516              :     use utils::id::TenantId;
     517              : 
     518              :     use super::*;
     519              : 
     520              :     #[test]
     521            2 :     fn tenant_updates() -> anyhow::Result<()> {
     522            2 :         let tenant_id = TenantId::generate();
     523            2 :         let mut tenant_state = ComputeHookTenant::new(
     524            2 :             TenantShardId {
     525            2 :                 tenant_id,
     526            2 :                 shard_count: ShardCount::new(0),
     527            2 :                 shard_number: ShardNumber(0),
     528            2 :             },
     529            2 :             ShardStripeSize(12345),
     530            2 :             NodeId(1),
     531            2 :         );
     532            2 : 
     533            2 :         // An unsharded tenant is always ready to emit a notification, but won't
     534            2 :         // send the same one twice
     535            2 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     536            2 :         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
     537            0 :             anyhow::bail!("Wrong send result");
     538              :         };
     539            2 :         assert_eq!(request.shards.len(), 1);
     540            2 :         assert!(request.stripe_size.is_none());
     541              : 
     542              :         // Simulate successful send
     543            2 :         *guard = Some(request);
     544            2 :         drop(guard);
     545            2 : 
     546            2 :         // Try asking again: this should be a no-op
     547            2 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     548            2 :         assert!(matches!(send_result, MaybeSendResult::Noop));
     549              : 
     550              :         // Writing the first shard of a multi-sharded situation (i.e. in a split)
     551              :         // resets the tenant state and puts it in an non-notifying state (need to
     552              :         // see all shards)
     553            2 :         tenant_state.update(
     554            2 :             TenantShardId {
     555            2 :                 tenant_id,
     556            2 :                 shard_count: ShardCount::new(2),
     557            2 :                 shard_number: ShardNumber(1),
     558            2 :             },
     559            2 :             ShardStripeSize(32768),
     560            2 :             NodeId(1),
     561            2 :         );
     562            2 :         assert!(matches!(
     563            2 :             tenant_state.maybe_send(tenant_id, None),
     564              :             MaybeSendResult::Noop
     565              :         ));
     566              : 
     567              :         // Writing the second shard makes it ready to notify
     568            2 :         tenant_state.update(
     569            2 :             TenantShardId {
     570            2 :                 tenant_id,
     571            2 :                 shard_count: ShardCount::new(2),
     572            2 :                 shard_number: ShardNumber(0),
     573            2 :             },
     574            2 :             ShardStripeSize(32768),
     575            2 :             NodeId(1),
     576            2 :         );
     577            2 : 
     578            2 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     579            2 :         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
     580            0 :             anyhow::bail!("Wrong send result");
     581              :         };
     582            2 :         assert_eq!(request.shards.len(), 2);
     583            2 :         assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
     584              : 
     585              :         // Simulate successful send
     586            2 :         *guard = Some(request);
     587            2 :         drop(guard);
     588            2 : 
     589            2 :         Ok(())
     590            2 :     }
     591              : }
        

Generated by: LCOV version 2.1-beta