LCOV - code coverage report
Current view: top level - storage_controller/src - compute_hook.rs
Test:      7eb96e224e685167ad85f58f858387d8cf253f63.info
Test Date: 2024-09-23 21:23:07

                 Coverage    Hit    Total
    Lines:       34.7 %      141    406
    Functions:   14.5 %      8      55

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::{collections::HashMap, time::Duration};
       3              : 
       4              : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
       5              : use control_plane::local_env::LocalEnv;
       6              : use futures::StreamExt;
       7              : use hyper::StatusCode;
       8              : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
       9              : use postgres_connection::parse_host_port;
      10              : use serde::{Deserialize, Serialize};
      11              : use tokio_util::sync::CancellationToken;
      12              : use tracing::{info_span, Instrument};
      13              : use utils::{
      14              :     backoff::{self},
      15              :     id::{NodeId, TenantId},
      16              : };
      17              : 
      18              : use crate::service::Config;
      19              : 
      20              : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
      21              : 
      22              : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      23              : 
      24              : pub(crate) const API_CONCURRENCY: usize = 32;
      25              : 
      26              : struct UnshardedComputeHookTenant {
      27              :     // Which node is this tenant attached to
      28              :     node_id: NodeId,
      29              : 
      30              :     // Must hold this lock to send a notification.
      31              :     send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
      32              : }
      33              : struct ShardedComputeHookTenant {
      34              :     stripe_size: ShardStripeSize,
      35              :     shard_count: ShardCount,
      36              :     shards: Vec<(ShardNumber, NodeId)>,
      37              : 
      38              :     // Must hold this lock to send a notification.  The contents represent
      39              :     // the last successfully sent notification, and are used to coalesce multiple
      40              :     // updates by only sending when there is a change since our last successful send.
      41              :     send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
      42              : }
      43              : 
      44              : enum ComputeHookTenant {
      45              :     Unsharded(UnshardedComputeHookTenant),
      46              :     Sharded(ShardedComputeHookTenant),
      47              : }
      48              : 
      49              : impl ComputeHookTenant {
      50              :     /// Construct with at least one shard's information
      51            2 :     fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
      52            2 :         if tenant_shard_id.shard_count.count() > 1 {
      53            1 :             Self::Sharded(ShardedComputeHookTenant {
      54            1 :                 shards: vec![(tenant_shard_id.shard_number, node_id)],
      55            1 :                 stripe_size,
      56            1 :                 shard_count: tenant_shard_id.shard_count,
      57            1 :                 send_lock: Arc::default(),
      58            1 :             })
      59              :         } else {
      60            1 :             Self::Unsharded(UnshardedComputeHookTenant {
      61            1 :                 node_id,
      62            1 :                 send_lock: Arc::default(),
      63            1 :             })
      64              :         }
      65            2 :     }
      66              : 
      67            4 :     fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
      68            4 :         match self {
      69            2 :             Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
      70            2 :             Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
      71              :         }
      72            4 :     }
      73              : 
      74            0 :     fn is_sharded(&self) -> bool {
      75            0 :         matches!(self, ComputeHookTenant::Sharded(_))
      76            0 :     }
      77              : 
      78              :     /// Clear compute hook state for the specified shard.
      79              :     /// Only valid for [`ComputeHookTenant::Sharded`] instances.
      80            0 :     fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
      81            0 :         match self {
      82            0 :             ComputeHookTenant::Sharded(sharded) => {
      83            0 :                 if sharded.stripe_size != stripe_size
      84            0 :                     || sharded.shard_count != tenant_shard_id.shard_count
      85              :                 {
      86            0 :                     tracing::warn!("Shard split detected while handling detach")
      87            0 :                 }
      88              : 
      89            0 :                 let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
      90            0 :                     *shard_number == tenant_shard_id.shard_number
      91            0 :                 });
      92              : 
      93            0 :                 if let Some(shard_idx) = shard_idx {
      94            0 :                     sharded.shards.remove(shard_idx);
      95            0 :                 } else {
      96            0 :                     tracing::warn!("Shard not found while handling detach")
      97              :                 }
      98              :             }
      99              :             ComputeHookTenant::Unsharded(_) => {
     100            0 :                 unreachable!("Detach of unsharded tenants is handled externally");
     101              :             }
     102              :         }
     103            0 :     }
     104              : 
     105              :     /// Set one shard's location.  If stripe size or shard count have changed, Self is reset
     106              :     /// and drops existing content.
     107            2 :     fn update(
     108            2 :         &mut self,
     109            2 :         tenant_shard_id: TenantShardId,
     110            2 :         stripe_size: ShardStripeSize,
     111            2 :         node_id: NodeId,
     112            2 :     ) {
     113            1 :         match self {
     114            1 :             Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
     115            0 :                 unsharded_tenant.node_id = node_id
     116              :             }
     117            1 :             Self::Sharded(sharded_tenant)
     118            1 :                 if sharded_tenant.stripe_size == stripe_size
     119            1 :                     && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
     120              :             {
     121            1 :                 if let Some(existing) = sharded_tenant
     122            1 :                     .shards
     123            1 :                     .iter()
     124            1 :                     .position(|s| s.0 == tenant_shard_id.shard_number)
     125            0 :                 {
     126            0 :                     sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
     127            0 :                 } else {
     128            1 :                     sharded_tenant
     129            1 :                         .shards
     130            1 :                         .push((tenant_shard_id.shard_number, node_id));
     131            2 :                     sharded_tenant.shards.sort_by_key(|s| s.0)
     132              :                 }
     133              :             }
     134            1 :             _ => {
     135            1 :                 // Shard count changed: reset struct.
     136            1 :                 *self = Self::new(tenant_shard_id, stripe_size, node_id);
     137            1 :             }
     138              :         }
     139            2 :     }
     140              : }
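                      : // Note (added for exposition): `update` deliberately resets `self` whenever the reported
                      : // shard_count or stripe_size disagrees with the stored state (the catch-all arm above).
                      : // During a shard split this pauses notifications: the rebuilt Sharded state reports
                      : // "not enough shards" from `maybe_send` until every child shard has called `update`.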
     141              : 
     142            0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
     143              : struct ComputeHookNotifyRequestShard {
     144              :     node_id: NodeId,
     145              :     shard_number: ShardNumber,
     146              : }
     147              : 
     148              : /// Request body that we send to the control plane to notify it of where a tenant is attached
     149            0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
     150              : struct ComputeHookNotifyRequest {
     151              :     tenant_id: TenantId,
     152              :     stripe_size: Option<ShardStripeSize>,
     153              :     shards: Vec<ComputeHookNotifyRequestShard>,
     154              : }
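                      : // Illustrative only (added): roughly the JSON body this serializes to for a two-shard
                      : // tenant.  The exact encodings of TenantId, NodeId, ShardNumber and ShardStripeSize
                      : // depend on their serde impls; the concrete values below are hypothetical.
                      : //
                      : //   {
                      : //     "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
                      : //     "stripe_size": 32768,
                      : //     "shards": [
                      : //       { "node_id": 1, "shard_number": 0 },
                      : //       { "node_id": 2, "shard_number": 1 }
                      : //     ]
                      : //   }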
     155              : 
     156              : /// Error type for attempts to call into the control plane compute notification hook
     157            0 : #[derive(thiserror::Error, Debug)]
     158              : pub(crate) enum NotifyError {
     159              :     // Request was not sent successfully, e.g. transport error
     160              :     #[error("Sending request: {0}")]
     161              :     Request(#[from] reqwest::Error),
     162              :     // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
     163              :     #[error("Control plane tenant busy")]
     164              :     Busy,
     165              :     // Explicit 429 response asking us to retry less frequently
     166              :     #[error("Control plane overloaded")]
     167              :     SlowDown,
     168              :     // A 503 response indicates the control plane can't handle the request right now
     169              :     #[error("Control plane unavailable (status {0})")]
     170              :     Unavailable(StatusCode),
     171              :     // API returned unexpected non-success status.  We will retry, but log a warning.
     172              :     #[error("Control plane returned unexpected status {0}")]
     173              :     Unexpected(StatusCode),
     174              :     // We shutdown while sending
     175              :     #[error("Shutting down")]
     176              :     ShuttingDown,
     177              :     // A response that indicates we will never succeed, such as 400 or 404
     178              :     #[error("Non-retryable error {0}")]
     179              :     Fatal(StatusCode),
     180              : 
     181              :     #[error("neon_local error: {0}")]
     182              :     NeonLocal(anyhow::Error),
     183              : }
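                      : // Summary (added): `do_notify_iteration` below maps control plane responses onto these
                      : // variants roughly as follows: transport failure -> Request, 429 -> SlowDown,
                      : // 423 (Locked) -> Busy, 502/503/504 -> Unavailable, 400/401/403 -> Fatal, and any
                      : // other non-2xx status -> Unexpected.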
     184              : 
     185              : enum MaybeSendResult {
     186              :     // Please send this request while holding the lock, and if you succeed then write
     187              :     // the request into the lock.
     188              :     Transmit(
     189              :         (
     190              :             ComputeHookNotifyRequest,
     191              :             tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
     192              :         ),
     193              :     ),
     194              :     // Something requires sending, but you must wait for the current sender to finish, then call again
     195              :     AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
     196              :     // Nothing requires sending
     197              :     Noop,
     198              : }
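                      : // Caller protocol (added; mirrors `notify_execute` below):
                      : // - Transmit((request, guard)): send `request`, and on success store it into `guard`
                      : //   so that identical follow-up updates coalesce into Noop.
                      : // - AwaitLock(lock): await `lock.lock_owned()`, then call `maybe_send` again with the
                      : //   guard supplied; in that case it can no longer return AwaitLock.
                      : // - Noop: nothing to send (incomplete shard set, or unchanged since the last send).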
     199              : 
     200              : impl ComputeHookTenant {
     201            4 :     fn maybe_send(
     202            4 :         &self,
     203            4 :         tenant_id: TenantId,
     204            4 :         lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
     205            4 :     ) -> MaybeSendResult {
     206            4 :         let locked = match lock {
     207            0 :             Some(already_locked) => already_locked,
     208              :             None => {
     209              :                 // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
     210            4 :                 let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
     211            0 :                     return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
     212              :                 };
     213            4 :                 locked
     214              :             }
     215              :         };
     216              : 
     217            4 :         let request = match self {
     218            2 :             Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
     219            2 :                 tenant_id,
     220            2 :                 shards: vec![ComputeHookNotifyRequestShard {
     221            2 :                     shard_number: ShardNumber(0),
     222            2 :                     node_id: unsharded_tenant.node_id,
     223            2 :                 }],
     224            2 :                 stripe_size: None,
     225            2 :             }),
     226            2 :             Self::Sharded(sharded_tenant)
     227            2 :                 if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
     228            1 :             {
     229            1 :                 Some(ComputeHookNotifyRequest {
     230            1 :                     tenant_id,
     231            1 :                     shards: sharded_tenant
     232            1 :                         .shards
     233            1 :                         .iter()
     234            2 :                         .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
     235            2 :                             shard_number: *shard_number,
     236            2 :                             node_id: *node_id,
     237            2 :                         })
     238            1 :                         .collect(),
     239            1 :                     stripe_size: Some(sharded_tenant.stripe_size),
     240            1 :                 })
     241              :             }
     242            1 :             Self::Sharded(sharded_tenant) => {
     243            1 :                 // Sharded tenant doesn't yet have information for all its shards
     244            1 : 
     245            1 :                 tracing::info!(
     246            0 :                     "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
     247            0 :                     sharded_tenant.shards.len(),
     248            0 :                     sharded_tenant.shard_count.count()
     249              :                 );
     250            1 :                 None
     251              :             }
     252              :         };
     253              : 
     254            3 :         match request {
     255              :             None => {
     256              :                 // Not yet ready to emit a notification
     257            1 :                 tracing::info!("Tenant isn't yet ready to emit a notification");
     258            1 :                 MaybeSendResult::Noop
     259              :             }
     260            3 :             Some(request) if Some(&request) == locked.as_ref() => {
     261            1 :                 // No change from the last value successfully sent
     262            1 :                 MaybeSendResult::Noop
     263              :             }
     264            2 :             Some(request) => MaybeSendResult::Transmit((request, locked)),
     265              :         }
     266            4 :     }
     267              : }
     268              : 
     269              : /// The compute hook is a destination for notifications about changes to tenant:pageserver
     270              : /// mapping.  It aggregates updates for the shards in a tenant, and when appropriate reconfigures
     271              : /// the compute connection string.
     272              : pub(super) struct ComputeHook {
     273              :     config: Config,
     274              :     state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
     275              :     authorization_header: Option<String>,
     276              : 
     277              :     // Concurrency limiter, so that we do not overload the cloud control plane when updating
     278              :     // large numbers of tenants (e.g. when failing over after a node failure)
     279              :     api_concurrency: tokio::sync::Semaphore,
     280              : 
     281              :     // This lock is only used in testing environments, to serialize calls into neon_local
     282              :     neon_local_lock: tokio::sync::Mutex<()>,
     283              : 
     284              :     // We share a client across all notifications to enable connection re-use etc. when
     285              :     // sending large numbers of notifications
     286              :     client: reqwest::Client,
     287              : }
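                      : // Synchronization overview (added for exposition): three mechanisms cooperate here.
                      : // `state` is a synchronous Mutex guarding the in-memory map and is held only for short,
                      : // non-async sections; each tenant's `send_lock` serializes actual sends and remembers
                      : // the last successfully sent request; and `api_concurrency` bounds how many notifications
                      : // are in flight against the control plane at once.  See the lock-order comments in
                      : // `maybe_send` and `notify_execute`.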
     288              : 
     289              : impl ComputeHook {
     290            0 :     pub(super) fn new(config: Config) -> Self {
     291            0 :         let authorization_header = config
     292            0 :             .control_plane_jwt_token
     293            0 :             .clone()
     294            0 :             .map(|jwt| format!("Bearer {}", jwt));
     295            0 : 
     296            0 :         let client = reqwest::ClientBuilder::new()
     297            0 :             .timeout(NOTIFY_REQUEST_TIMEOUT)
     298            0 :             .build()
     299            0 :             .expect("Failed to construct HTTP client");
     300            0 : 
     301            0 :         Self {
     302            0 :             state: Default::default(),
     303            0 :             config,
     304            0 :             authorization_header,
     305            0 :             neon_local_lock: Default::default(),
     306            0 :             api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
     307            0 :             client,
     308            0 :         }
     309            0 :     }
     310              : 
     311              :     /// For test environments: use neon_local's LocalEnv to update compute
     312            0 :     async fn do_notify_local(
     313            0 :         &self,
     314            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     315            0 :     ) -> Result<(), NotifyError> {
     316              :         // neon_local updates are not safe to call concurrently; use a lock to serialize
     317              :         // all calls to this function
     318            0 :         let _locked = self.neon_local_lock.lock().await;
     319              : 
     320            0 :         let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
     321            0 :             tracing::warn!(
     322            0 :                 "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
     323              :             );
     324            0 :             return Ok(());
     325              :         };
     326            0 :         let env = match LocalEnv::load_config(repo_dir) {
     327            0 :             Ok(e) => e,
     328            0 :             Err(e) => {
     329            0 :                 tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
     330            0 :                 return Ok(());
     331              :             }
     332              :         };
     333            0 :         let cplane =
     334            0 :             ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
     335            0 :         let ComputeHookNotifyRequest {
     336            0 :             tenant_id,
     337            0 :             shards,
     338            0 :             stripe_size,
     339            0 :         } = reconfigure_request;
     340            0 : 
     341            0 :         let compute_pageservers = shards
     342            0 :             .iter()
     343            0 :             .map(|shard| {
     344            0 :                 let ps_conf = env
     345            0 :                     .get_pageserver_conf(shard.node_id)
     346            0 :                     .expect("Unknown pageserver");
     347            0 :                 let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
     348            0 :                     .expect("Unable to parse listen_pg_addr");
     349            0 :                 (pg_host, pg_port.unwrap_or(5432))
     350            0 :             })
     351            0 :             .collect::<Vec<_>>();
     352              : 
     353            0 :         for (endpoint_name, endpoint) in &cplane.endpoints {
     354            0 :             if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
     355            0 :                 tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
     356            0 :                 endpoint
     357            0 :                     .reconfigure(compute_pageservers.clone(), *stripe_size, None)
     358            0 :                     .await
     359            0 :                     .map_err(NotifyError::NeonLocal)?;
     360            0 :             }
     361              :         }
     362              : 
     363            0 :         Ok(())
     364            0 :     }
     365              : 
     366            0 :     async fn do_notify_iteration(
     367            0 :         &self,
     368            0 :         url: &String,
     369            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     370            0 :         cancel: &CancellationToken,
     371            0 :     ) -> Result<(), NotifyError> {
     372            0 :         let req = self.client.request(reqwest::Method::PUT, url);
     373            0 :         let req = if let Some(value) = &self.authorization_header {
     374            0 :             req.header(reqwest::header::AUTHORIZATION, value)
     375              :         } else {
     376            0 :             req
     377              :         };
     378              : 
     379            0 :         tracing::info!(
     380            0 :             "Sending notify request to {} ({:?})",
     381              :             url,
     382              :             reconfigure_request
     383              :         );
     384            0 :         let send_result = req.json(&reconfigure_request).send().await;
     385            0 :         let response = match send_result {
     386            0 :             Ok(r) => r,
     387            0 :             Err(e) => return Err(e.into()),
     388              :         };
     389              : 
     390              :         // Treat all 2xx responses as success
     391            0 :         if response.status() >= reqwest::StatusCode::OK
     392            0 :             && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
     393              :         {
     394            0 :             if response.status() != reqwest::StatusCode::OK {
     395              :                 // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
     396              :                 // log a warning.
     397            0 :                 tracing::warn!(
     398            0 :                     "Unexpected 2xx response code {} from control plane",
     399            0 :                     response.status()
     400              :                 );
     401            0 :             }
     402              : 
     403            0 :             return Ok(());
     404            0 :         }
     405            0 : 
     406            0 :         // Error response codes
     407            0 :         match response.status() {
     408              :             reqwest::StatusCode::TOO_MANY_REQUESTS => {
     409              :                 // TODO: 429 handling should be global: set some state visible to other requests
     410              :                 // so that they will delay before starting, rather than all notifications trying
     411              :                 // once before backing off.
     412            0 :                 tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
     413            0 :                     .await
     414            0 :                     .ok();
     415            0 :                 Err(NotifyError::SlowDown)
     416              :             }
     417              :             reqwest::StatusCode::LOCKED => {
     418              :                 // We consider this fatal, because it's possible that the operation blocking the control one is
     419              :                 // also the one that is waiting for this reconcile.  We should let the reconciler calling
     420              :                 // this hook fail, to give control plane a chance to un-lock.
     421            0 :                 tracing::info!("Control plane reports tenant is locked, dropping out of notify");
     422            0 :                 Err(NotifyError::Busy)
     423              :             }
     424              :             reqwest::StatusCode::SERVICE_UNAVAILABLE => {
     425            0 :                 Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
     426              :             }
     427              :             reqwest::StatusCode::GATEWAY_TIMEOUT => {
     428            0 :                 Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
     429              :             }
     430              :             reqwest::StatusCode::BAD_GATEWAY => {
     431            0 :                 Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
     432              :             }
     433              : 
     434            0 :             reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
     435            0 :             reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
     436            0 :             reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
     437            0 :             status => Err(NotifyError::Unexpected(
     438            0 :                 hyper::StatusCode::from_u16(status.as_u16())
     439            0 :                     .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
     440            0 :             )),
     441              :         }
     442            0 :     }
     443              : 
     444            0 :     async fn do_notify(
     445            0 :         &self,
     446            0 :         url: &String,
     447            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     448            0 :         cancel: &CancellationToken,
     449            0 :     ) -> Result<(), NotifyError> {
     450              :         // We hold these semaphore units across all retries, rather than only across each
     451              :         // HTTP request: this is to preserve fairness and avoid a situation where a retry might
     452              :         // time out waiting for a semaphore.
     453            0 :         let _units = self
     454            0 :             .api_concurrency
     455            0 :             .acquire()
     456            0 :             .await
     457              :             // Interpret closed semaphore as shutdown
     458            0 :             .map_err(|_| NotifyError::ShuttingDown)?;
     459              : 
     460            0 :         backoff::retry(
     461            0 :             || self.do_notify_iteration(url, reconfigure_request, cancel),
     462            0 :             |e| {
     463            0 :                 matches!(
     464            0 :                     e,
     465              :                     NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
     466              :                 )
     467            0 :             },
     468            0 :             3,
     469            0 :             10,
     470            0 :             "Send compute notification",
     471            0 :             cancel,
     472            0 :         )
     473            0 :         .await
     474            0 :         .ok_or_else(|| NotifyError::ShuttingDown)
     475            0 :         .and_then(|x| x)
     476            0 :     }
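                      :     // Note (added): the trailing `.ok_or_else(|| NotifyError::ShuttingDown)` treats a `None`
                      :     // from `backoff::retry` (presumably returned when the retry loop is cancelled) as
                      :     // shutdown, and `.and_then(|x| x)` flattens the inner send result.  Reading the
                      :     // `matches!` closure as an "is permanent, do not retry" predicate for Fatal/Unexpected/Busy,
                      :     // and `3`/`10` as warn-threshold/retry-budget, is an assumption about utils::backoff.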
     477              : 
     478              :     /// Synchronous phase: update the per-tenant state for the next intended notification
     479            0 :     fn notify_prepare(
     480            0 :         &self,
     481            0 :         tenant_shard_id: TenantShardId,
     482            0 :         node_id: NodeId,
     483            0 :         stripe_size: ShardStripeSize,
     484            0 :     ) -> MaybeSendResult {
     485            0 :         let mut state_locked = self.state.lock().unwrap();
     486              : 
     487              :         use std::collections::hash_map::Entry;
     488            0 :         let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
     489            0 :             Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
     490            0 :                 tenant_shard_id,
     491            0 :                 stripe_size,
     492            0 :                 node_id,
     493            0 :             )),
     494            0 :             Entry::Occupied(e) => {
     495            0 :                 let tenant = e.into_mut();
     496            0 :                 tenant.update(tenant_shard_id, stripe_size, node_id);
     497            0 :                 tenant
     498              :             }
     499              :         };
     500            0 :         tenant.maybe_send(tenant_shard_id.tenant_id, None)
     501            0 :     }
     502              : 
     503            0 :     async fn notify_execute(
     504            0 :         &self,
     505            0 :         maybe_send_result: MaybeSendResult,
     506            0 :         tenant_shard_id: TenantShardId,
     507            0 :         cancel: &CancellationToken,
     508            0 :     ) -> Result<(), NotifyError> {
     509              :         // Process result: we may get an update to send, or we may have to wait for a lock
     510              :         // before trying again.
     511            0 :         let (request, mut send_lock_guard) = match maybe_send_result {
     512              :             MaybeSendResult::Noop => {
     513            0 :                 return Ok(());
     514              :             }
     515            0 :             MaybeSendResult::AwaitLock(send_lock) => {
     516            0 :                 let send_locked = tokio::select! {
     517            0 :                     guard = send_lock.lock_owned() => {guard},
     518            0 :                     _ = cancel.cancelled() => {
     519            0 :                         return Err(NotifyError::ShuttingDown)
     520              :                     }
     521              :                 };
     522              : 
     523              :                 // Lock order: maybe_send is called within the [`Self::state`] lock, and takes the send lock, but here
     524              :                 // we have acquired the send lock and take the [`Self::state`] lock.  This is safe because maybe_send only uses
     525              :                 // try_lock.
     526            0 :                 let state_locked = self.state.lock().unwrap();
     527            0 :                 let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
     528            0 :                     return Ok(());
     529              :                 };
     530            0 :                 match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
     531              :                     MaybeSendResult::AwaitLock(_) => {
     532            0 :                         unreachable!("We supplied lock guard")
     533              :                     }
     534              :                     MaybeSendResult::Noop => {
     535            0 :                         return Ok(());
     536              :                     }
     537            0 :                     MaybeSendResult::Transmit((request, lock)) => (request, lock),
     538              :                 }
     539              :             }
     540            0 :             MaybeSendResult::Transmit((request, lock)) => (request, lock),
     541              :         };
     542              : 
     543            0 :         let result = if let Some(notify_url) = &self.config.compute_hook_url {
     544            0 :             self.do_notify(notify_url, &request, cancel).await
     545              :         } else {
     546            0 :             self.do_notify_local(&request).await.map_err(|e| {
     547            0 :                 // This path is for testing only, so munge the error into our prod-style error type.
     548            0 :                 tracing::error!("neon_local notification hook failed: {e}");
     549            0 :                 NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
     550            0 :             })
     551              :         };
     552              : 
     553            0 :         if result.is_ok() {
     554            0 :             // Before dropping the send lock, stash the request we just sent so that
     555            0 :             // subsequent callers can avoid redundantly re-sending the same thing.
     556            0 :             *send_lock_guard = Some(request);
     557            0 :         }
     558            0 :         result
     559            0 :     }
     560              : 
     561              :     /// Infallible synchronous fire-and-forget version of notify() that sends its results to
     562              :     /// a channel.  Something should consume the channel and arrange to try notifying again
     563              :     /// if something failed.
     564            0 :     pub(super) fn notify_background(
     565            0 :         self: &Arc<Self>,
     566            0 :         notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
     567            0 :         result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
     568            0 :         cancel: &CancellationToken,
     569            0 :     ) {
     570            0 :         let mut maybe_sends = Vec::new();
     571            0 :         for (tenant_shard_id, node_id, stripe_size) in notifications {
     572            0 :             let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
     573            0 :             maybe_sends.push((tenant_shard_id, maybe_send_result))
     574              :         }
     575              : 
     576            0 :         let this = self.clone();
     577            0 :         let cancel = cancel.clone();
     578            0 : 
     579            0 :         tokio::task::spawn(async move {
     580            0 :             // Construct an async stream of futures to invoke the compute notify function: we do this
     581            0 :             // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.  The
     582            0 :             // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
     583            0 :             // would mostly just be waiting on that semaphore.
     584            0 :             let mut stream = futures::stream::iter(maybe_sends)
     585            0 :                 .map(|(tenant_shard_id, maybe_send_result)| {
     586            0 :                     let this = this.clone();
     587            0 :                     let cancel = cancel.clone();
     588              : 
     589            0 :                     async move {
     590            0 :                         this
     591            0 :                             .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
     592            0 :                             .await.map_err(|e| (tenant_shard_id, e))
     593            0 :                     }.instrument(info_span!(
     594            0 :                         "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
     595              :                     ))
     596            0 :                 })
     597            0 :                 .buffered(API_CONCURRENCY);
     598              : 
     599              :             loop {
     600            0 :                 tokio::select! {
     601            0 :                     next = stream.next() => {
     602            0 :                         match next {
     603            0 :                             Some(r) => {
     604            0 :                                 result_tx.send(r).await.ok();
     605              :                             },
     606              :                             None => {
     607            0 :                                 tracing::info!("Finished sending background compute notifications");
     608            0 :                                 break;
     609              :                             }
     610              :                         }
     611              :                     },
     612            0 :                     _ = cancel.cancelled() => {
     613            0 :                         tracing::info!("Shutdown while running background compute notifications");
     614            0 :                         break;
     615              :                     }
     616              :                 };
     617              :             }
     618            0 :         });
     619            0 :     }
     620              : 
     621              :     /// Call this to notify the compute (postgres) tier of new pageservers to use
     622              :     /// for a tenant.  notify() is called by each shard individually, and this function
     623              :     /// will decide whether an update to the tenant is sent.  An update is sent on the
     624              :     /// condition that:
     625              :     /// - We know a pageserver for every shard.
     626              :     /// - All the shards have the same shard_count (i.e. we are not mid-split)
     627              :     ///
     628              :     /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
     629              :     /// that is cancelled.
     630              :     ///
     631              :     /// This function is fallible, including in the case that the control plane is transiently
     632              :     /// unavailable.  A limited number of retries are done internally to efficiently hide short unavailability
     633              :     /// periods, but we don't retry forever.  The **caller** is responsible for handling failures and
     634              :     /// arranging to call again later, so that the compute is eventually notified of
     635              :     /// the proper pageserver nodes for a tenant.
     636            0 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
     637              :     pub(super) async fn notify(
     638              :         &self,
     639              :         tenant_shard_id: TenantShardId,
     640              :         node_id: NodeId,
     641              :         stripe_size: ShardStripeSize,
     642              :         cancel: &CancellationToken,
     643              :     ) -> Result<(), NotifyError> {
     644              :         let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
     645              :         self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
     646              :             .await
     647              :     }
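                      :     // Illustrative call site (added; surrounding names are hypothetical), showing the
                      :     // contract described above -- the caller owns retrying until a notification sticks:
                      :     //
                      :     //   match compute_hook.notify(tenant_shard_id, node_id, stripe_size, &cancel).await {
                      :     //       Ok(()) => { /* compute now points at the intended pageservers */ }
                      :     //       Err(NotifyError::ShuttingDown) => { /* drop out: service is shutting down */ }
                      :     //       Err(e) => { /* record the failure and arrange to call notify() again later */ }
                      :     //   }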
     648              : 
     649              :     /// Reflect a detach for a particular shard in the compute hook state.
     650              :     ///
     651              :     /// The goal is to avoid sending compute notifications with stale information (i.e.
     652              :     /// including detached pageservers).
     653            0 :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
     654              :     pub(super) fn handle_detach(
     655              :         &self,
     656              :         tenant_shard_id: TenantShardId,
     657              :         stripe_size: ShardStripeSize,
     658              :     ) {
     659              :         use std::collections::hash_map::Entry;
     660              : 
     661              :         let mut state_locked = self.state.lock().unwrap();
     662              :         match state_locked.entry(tenant_shard_id.tenant_id) {
     663              :             Entry::Vacant(_) => {
     664              :                 tracing::warn!("Compute hook tenant not found for detach");
     665              :             }
     666              :             Entry::Occupied(mut e) => {
     667              :                 let sharded = e.get().is_sharded();
     668              :                 if !sharded {
     669              :                     e.remove();
     670              :                 } else {
     671              :                     e.get_mut().remove_shard(tenant_shard_id, stripe_size);
     672              :                 }
     673              : 
     674              :                 tracing::debug!("Compute hook handled shard detach");
     675              :             }
     676              :         }
     677              :     }
     678              : }
     679              : 
     680              : #[cfg(test)]
     681              : pub(crate) mod tests {
     682              :     use pageserver_api::shard::{ShardCount, ShardNumber};
     683              :     use utils::id::TenantId;
     684              : 
     685              :     use super::*;
     686              : 
     687              :     #[test]
     688            1 :     fn tenant_updates() -> anyhow::Result<()> {
     689            1 :         let tenant_id = TenantId::generate();
     690            1 :         let mut tenant_state = ComputeHookTenant::new(
     691            1 :             TenantShardId {
     692            1 :                 tenant_id,
     693            1 :                 shard_count: ShardCount::new(0),
     694            1 :                 shard_number: ShardNumber(0),
     695            1 :             },
     696            1 :             ShardStripeSize(12345),
     697            1 :             NodeId(1),
     698            1 :         );
     699            1 : 
     700            1 :         // An unsharded tenant is always ready to emit a notification, but won't
     701            1 :         // send the same one twice
     702            1 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     703            1 :         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
     704            0 :             anyhow::bail!("Wrong send result");
     705              :         };
     706            1 :         assert_eq!(request.shards.len(), 1);
     707            1 :         assert!(request.stripe_size.is_none());
     708              : 
     709              :         // Simulate successful send
     710            1 :         *guard = Some(request);
     711            1 :         drop(guard);
     712            1 : 
     713            1 :         // Try asking again: this should be a no-op
     714            1 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     715            1 :         assert!(matches!(send_result, MaybeSendResult::Noop));
     716              : 
     717              :         // Writing the first shard of a multi-sharded situation (i.e. in a split)
     718              :         // resets the tenant state and puts it in a non-notifying state (need to
     719              :         // see all shards)
     720            1 :         tenant_state.update(
     721            1 :             TenantShardId {
     722            1 :                 tenant_id,
     723            1 :                 shard_count: ShardCount::new(2),
     724            1 :                 shard_number: ShardNumber(1),
     725            1 :             },
     726            1 :             ShardStripeSize(32768),
     727            1 :             NodeId(1),
     728            1 :         );
     729            1 :         assert!(matches!(
     730            1 :             tenant_state.maybe_send(tenant_id, None),
     731              :             MaybeSendResult::Noop
     732              :         ));
     733              : 
     734              :         // Writing the second shard makes it ready to notify
     735            1 :         tenant_state.update(
     736            1 :             TenantShardId {
     737            1 :                 tenant_id,
     738            1 :                 shard_count: ShardCount::new(2),
     739            1 :                 shard_number: ShardNumber(0),
     740            1 :             },
     741            1 :             ShardStripeSize(32768),
     742            1 :             NodeId(1),
     743            1 :         );
     744            1 : 
     745            1 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     746            1 :         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
     747            0 :             anyhow::bail!("Wrong send result");
     748              :         };
     749            1 :         assert_eq!(request.shards.len(), 2);
     750            1 :         assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
     751              : 
     752              :         // Simulate successful send
     753            1 :         *guard = Some(request);
     754            1 :         drop(guard);
     755            1 : 
     756            1 :         Ok(())
     757            1 :     }
     758              : }
        

Generated by: LCOV version 2.1-beta