LCOV - code coverage report
Current view: top level - storage_controller/src - compute_hook.rs (source / functions) Coverage Total Hit
Test: 98683a8629f0f7f0031d02e04512998d589d76ea.info Lines: 36.6 % 478 175
Test Date: 2025-04-11 16:58:57 Functions: 18.2 % 55 10

            Line data    Source code
       1              : use std::borrow::Cow;
       2              : use std::collections::HashMap;
       3              : use std::error::Error as _;
       4              : use std::sync::Arc;
       5              : use std::time::Duration;
       6              : 
       7              : use anyhow::Context;
       8              : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
       9              : use control_plane::local_env::LocalEnv;
      10              : use futures::StreamExt;
      11              : use hyper::StatusCode;
      12              : use pageserver_api::controller_api::AvailabilityZone;
      13              : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
      14              : use postgres_connection::parse_host_port;
      15              : use serde::{Deserialize, Serialize};
      16              : use tokio_util::sync::CancellationToken;
      17              : use tracing::{Instrument, info_span};
      18              : use utils::backoff::{self};
      19              : use utils::id::{NodeId, TenantId};
      20              : 
      21              : use crate::service::Config;
      22              : 
      23              : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
      24              : 
      25              : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
      26              : 
      27              : pub(crate) const API_CONCURRENCY: usize = 32;
      28              : 
      29              : struct UnshardedComputeHookTenant {
      30              :     // Which node is this tenant attached to
      31              :     node_id: NodeId,
      32              : 
      33              :     // The tenant's preferred AZ, so that we may pass this on to the control plane
      34              :     preferred_az: Option<AvailabilityZone>,
      35              : 
      36              :     // Must hold this lock to send a notification.
      37              :     send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
      38              : }
      39              : struct ShardedComputeHookTenant {
      40              :     stripe_size: ShardStripeSize,
      41              :     shard_count: ShardCount,
      42              :     shards: Vec<(ShardNumber, NodeId)>,
      43              : 
      44              :     // The tenant's preferred AZ, so that we may pass this on to the control plane
      45              :     preferred_az: Option<AvailabilityZone>,
      46              : 
      47              :     // Must hold this lock to send a notification.  The contents represent
      48              :     // the last successfully sent notification, and are used to coalesce multiple
      49              :     // updates by only sending when there is a chance since our last successful send.
      50              :     send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
      51              : }
      52              : 
      53              : /// Represents our knowledge of the compute's state: we can update this when we get a
      54              : /// response from a notify API call, which tells us what has been applied.
      55              : ///
      56              : /// Should be wrapped in an Option<>, as we cannot always know the remote state.
      57              : #[derive(PartialEq, Eq, Debug)]
      58              : struct ComputeRemoteState {
      59              :     // The request body which was acked by the compute
      60              :     request: ComputeHookNotifyRequest,
      61              : 
      62              :     // Whether the cplane indicated that the state was applied to running computes, or just
      63              :     // persisted.  In the Neon control plane, this is the difference between a 423 response (meaning
      64              :     // persisted but not applied), and a 2xx response (both persisted and applied)
      65              :     applied: bool,
      66              : }
      67              : 
      68              : enum ComputeHookTenant {
      69              :     Unsharded(UnshardedComputeHookTenant),
      70              :     Sharded(ShardedComputeHookTenant),
      71              : }
      72              : 
      73              : impl ComputeHookTenant {
      74              :     /// Construct with at least one shard's information
      75            2 :     fn new(
      76            2 :         tenant_shard_id: TenantShardId,
      77            2 :         stripe_size: ShardStripeSize,
      78            2 :         preferred_az: Option<AvailabilityZone>,
      79            2 :         node_id: NodeId,
      80            2 :     ) -> Self {
      81            2 :         if tenant_shard_id.shard_count.count() > 1 {
      82            1 :             Self::Sharded(ShardedComputeHookTenant {
      83            1 :                 shards: vec![(tenant_shard_id.shard_number, node_id)],
      84            1 :                 stripe_size,
      85            1 :                 shard_count: tenant_shard_id.shard_count,
      86            1 :                 preferred_az,
      87            1 :                 send_lock: Arc::default(),
      88            1 :             })
      89              :         } else {
      90            1 :             Self::Unsharded(UnshardedComputeHookTenant {
      91            1 :                 node_id,
      92            1 :                 preferred_az,
      93            1 :                 send_lock: Arc::default(),
      94            1 :             })
      95              :         }
      96            2 :     }
      97              : 
      98            4 :     fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
      99            4 :         match self {
     100            2 :             Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
     101            2 :             Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
     102              :         }
     103            4 :     }
     104              : 
     105            0 :     fn is_sharded(&self) -> bool {
     106            0 :         matches!(self, ComputeHookTenant::Sharded(_))
     107            0 :     }
     108              : 
     109              :     /// Clear compute hook state for the specified shard.
     110              :     /// Only valid for [`ComputeHookTenant::Sharded`] instances.
     111            0 :     fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
     112            0 :         match self {
     113            0 :             ComputeHookTenant::Sharded(sharded) => {
     114            0 :                 if sharded.stripe_size != stripe_size
     115            0 :                     || sharded.shard_count != tenant_shard_id.shard_count
     116              :                 {
     117            0 :                     tracing::warn!("Shard split detected while handling detach")
     118            0 :                 }
     119              : 
     120            0 :                 let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
     121            0 :                     *shard_number == tenant_shard_id.shard_number
     122            0 :                 });
     123              : 
     124            0 :                 if let Some(shard_idx) = shard_idx {
     125            0 :                     sharded.shards.remove(shard_idx);
     126            0 :                 } else {
     127              :                     // This is a valid but niche case, where the tenant was previously attached
     128              :                     // as a Secondary location and then detached, so has no previously notified
     129              :                     // state.
     130            0 :                     tracing::info!("Shard not found while handling detach")
     131              :                 }
     132              :             }
     133              :             ComputeHookTenant::Unsharded(_) => {
     134            0 :                 unreachable!("Detach of unsharded tenants is handled externally");
     135              :             }
     136              :         }
     137            0 :     }
     138              : 
     139              :     /// Set one shard's location.  If stripe size or shard count have changed, Self is reset
     140              :     /// and drops existing content.
     141            2 :     fn update(&mut self, shard_update: ShardUpdate) {
     142            2 :         let tenant_shard_id = shard_update.tenant_shard_id;
     143            2 :         let node_id = shard_update.node_id;
     144            2 :         let stripe_size = shard_update.stripe_size;
     145            2 :         let preferred_az = shard_update.preferred_az;
     146              : 
     147            1 :         match self {
     148            1 :             Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
     149            0 :                 unsharded_tenant.node_id = node_id;
     150            0 :                 if unsharded_tenant.preferred_az.as_ref()
     151            0 :                     != preferred_az.as_ref().map(|az| az.as_ref())
     152            0 :                 {
     153            0 :                     unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
     154            0 :                 }
     155              :             }
     156            1 :             Self::Sharded(sharded_tenant)
     157            1 :                 if sharded_tenant.stripe_size == stripe_size
     158            1 :                     && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
     159              :             {
     160            1 :                 if let Some(existing) = sharded_tenant
     161            1 :                     .shards
     162            1 :                     .iter()
     163            1 :                     .position(|s| s.0 == tenant_shard_id.shard_number)
     164            0 :                 {
     165            0 :                     sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
     166            0 :                 } else {
     167            1 :                     sharded_tenant
     168            1 :                         .shards
     169            1 :                         .push((tenant_shard_id.shard_number, node_id));
     170            2 :                     sharded_tenant.shards.sort_by_key(|s| s.0)
     171              :                 }
     172              : 
     173            1 :                 if sharded_tenant.preferred_az.as_ref()
     174            1 :                     != preferred_az.as_ref().map(|az| az.as_ref())
     175            0 :                 {
     176            0 :                     sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
     177            1 :                 }
     178              :             }
     179            1 :             _ => {
     180            1 :                 // Shard count changed: reset struct.
     181            1 :                 *self = Self::new(
     182            1 :                     tenant_shard_id,
     183            1 :                     stripe_size,
     184            1 :                     preferred_az.map(|az| az.into_owned()),
     185            1 :                     node_id,
     186            1 :                 );
     187            1 :             }
     188              :         }
     189            2 :     }
     190              : }
     191              : 
     192            0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
     193              : struct ComputeHookNotifyRequestShard {
     194              :     node_id: NodeId,
     195              :     shard_number: ShardNumber,
     196              : }
     197              : 
     198              : /// Request body that we send to the control plane to notify it of where a tenant is attached
     199            0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
     200              : struct ComputeHookNotifyRequest {
     201              :     tenant_id: TenantId,
     202              :     preferred_az: Option<String>,
     203              :     stripe_size: Option<ShardStripeSize>,
     204              :     shards: Vec<ComputeHookNotifyRequestShard>,
     205              : }
     206              : 
     207              : /// Error type for attempts to call into the control plane compute notification hook
     208              : #[derive(thiserror::Error, Debug)]
     209              : pub(crate) enum NotifyError {
     210              :     // Request was not send successfully, e.g. transport error
     211            0 :     #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
     212              :     Request(#[from] reqwest::Error),
     213              :     // Request could not be serviced right now due to ongoing Operation in control plane, but should be possible soon.
     214              :     #[error("Control plane tenant busy")]
     215              :     Busy,
     216              :     // Explicit 429 response asking us to retry less frequently
     217              :     #[error("Control plane overloaded")]
     218              :     SlowDown,
     219              :     // A 503 response indicates the control plane can't handle the request right now
     220              :     #[error("Control plane unavailable (status {0})")]
     221              :     Unavailable(StatusCode),
     222              :     // API returned unexpected non-success status.  We will retry, but log a warning.
     223              :     #[error("Control plane returned unexpected status {0}")]
     224              :     Unexpected(StatusCode),
     225              :     // We shutdown while sending
     226              :     #[error("Shutting down")]
     227              :     ShuttingDown,
     228              :     // A response indicates we will never succeed, such as 400 or 403
     229              :     #[error("Non-retryable error {0}")]
     230              :     Fatal(StatusCode),
     231              : 
     232              :     #[error("neon_local error: {0}")]
     233              :     NeonLocal(anyhow::Error),
     234              : }
     235              : 
     236              : enum MaybeSendResult {
     237              :     // Please send this request while holding the lock, and if you succeed then write
     238              :     // the request into the lock.
     239              :     Transmit(
     240              :         (
     241              :             ComputeHookNotifyRequest,
     242              :             tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
     243              :         ),
     244              :     ),
     245              :     // Something requires sending, but you must wait for a current sender then call again
     246              :     AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
     247              :     // Nothing requires sending
     248              :     Noop,
     249              : }
     250              : 
     251              : impl ComputeHookTenant {
     252            4 :     fn maybe_send(
     253            4 :         &self,
     254            4 :         tenant_id: TenantId,
     255            4 :         lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
     256            4 :     ) -> MaybeSendResult {
     257            4 :         let locked = match lock {
     258            0 :             Some(already_locked) => already_locked,
     259              :             None => {
     260              :                 // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
     261            4 :                 let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
     262            0 :                     return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
     263              :                 };
     264            4 :                 locked
     265              :             }
     266              :         };
     267              : 
     268            4 :         let request = match self {
     269            2 :             Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
     270            2 :                 tenant_id,
     271            2 :                 shards: vec![ComputeHookNotifyRequestShard {
     272            2 :                     shard_number: ShardNumber(0),
     273            2 :                     node_id: unsharded_tenant.node_id,
     274            2 :                 }],
     275            2 :                 stripe_size: None,
     276            2 :                 preferred_az: unsharded_tenant
     277            2 :                     .preferred_az
     278            2 :                     .as_ref()
     279            2 :                     .map(|az| az.0.clone()),
     280            2 :             }),
     281            2 :             Self::Sharded(sharded_tenant)
     282            2 :                 if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
     283            1 :             {
     284            1 :                 Some(ComputeHookNotifyRequest {
     285            1 :                     tenant_id,
     286            1 :                     shards: sharded_tenant
     287            1 :                         .shards
     288            1 :                         .iter()
     289            2 :                         .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
     290            2 :                             shard_number: *shard_number,
     291            2 :                             node_id: *node_id,
     292            2 :                         })
     293            1 :                         .collect(),
     294            1 :                     stripe_size: Some(sharded_tenant.stripe_size),
     295            1 :                     preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
     296            1 :                 })
     297              :             }
     298            1 :             Self::Sharded(sharded_tenant) => {
     299            1 :                 // Sharded tenant doesn't yet have information for all its shards
     300            1 : 
     301            1 :                 tracing::info!(
     302            0 :                     "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
     303            0 :                     sharded_tenant.shards.len(),
     304            0 :                     sharded_tenant.shard_count.count()
     305              :                 );
     306            1 :                 None
     307              :             }
     308              :         };
     309              : 
     310            3 :         match request {
     311              :             None => {
     312              :                 // Not yet ready to emit a notification
     313            1 :                 tracing::info!("Tenant isn't yet ready to emit a notification");
     314            1 :                 MaybeSendResult::Noop
     315              :             }
     316            1 :             Some(request)
     317            3 :                 if Some(&request) == locked.as_ref().map(|s| &s.request)
     318            1 :                     && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
     319            1 :             {
     320            1 :                 tracing::info!(
     321            0 :                     "Skipping notification because remote state already matches ({:?})",
     322            0 :                     &request
     323              :                 );
     324              :                 // No change from the last value successfully sent, and our state indicates that the last
     325              :                 // value sent was fully applied on the control plane side.
     326            1 :                 MaybeSendResult::Noop
     327              :             }
     328            2 :             Some(request) => {
     329            2 :                 // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
     330            2 :                 MaybeSendResult::Transmit((request, locked))
     331              :             }
     332              :         }
     333            4 :     }
     334              : }
     335              : 
     336              : /// The compute hook is a destination for notifications about changes to tenant:pageserver
     337              : /// mapping.  It aggregates updates for the shards in a tenant, and when appropriate reconfigures
     338              : /// the compute connection string.
     339              : pub(super) struct ComputeHook {
     340              :     config: Config,
     341              :     state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
     342              :     authorization_header: Option<String>,
     343              : 
     344              :     // Concurrency limiter, so that we do not overload the cloud control plane when updating
     345              :     // large numbers of tenants (e.g. when failing over after a node failure)
     346              :     api_concurrency: tokio::sync::Semaphore,
     347              : 
     348              :     // This lock is only used in testing enviroments, to serialize calls into neon_lock
     349              :     neon_local_lock: tokio::sync::Mutex<()>,
     350              : 
     351              :     // We share a client across all notifications to enable connection re-use etc when
     352              :     // sending large numbers of notifications
     353              :     client: reqwest::Client,
     354              : }
     355              : 
     356              : /// Callers may give us a list of these when asking us to send a bulk batch
     357              : /// of notifications in the background.  This is a 'notification' in the sense of
     358              : /// other code notifying us of a shard's status, rather than being the final notification
     359              : /// that we send upwards to the control plane for the whole tenant.
     360              : pub(crate) struct ShardUpdate<'a> {
     361              :     pub(crate) tenant_shard_id: TenantShardId,
     362              :     pub(crate) node_id: NodeId,
     363              :     pub(crate) stripe_size: ShardStripeSize,
     364              :     pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
     365              : }
     366              : 
     367              : impl ComputeHook {
     368            0 :     pub(super) fn new(config: Config) -> anyhow::Result<Self> {
     369            0 :         let authorization_header = config
     370            0 :             .control_plane_jwt_token
     371            0 :             .clone()
     372            0 :             .map(|jwt| format!("Bearer {}", jwt));
     373            0 : 
     374            0 :         let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
     375            0 :         for cert in &config.ssl_ca_certs {
     376            0 :             client = client.add_root_certificate(cert.clone());
     377            0 :         }
     378            0 :         let client = client
     379            0 :             .build()
     380            0 :             .context("Failed to build http client for compute hook")?;
     381              : 
     382            0 :         Ok(Self {
     383            0 :             state: Default::default(),
     384            0 :             config,
     385            0 :             authorization_header,
     386            0 :             neon_local_lock: Default::default(),
     387            0 :             api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
     388            0 :             client,
     389            0 :         })
     390            0 :     }
     391              : 
     392              :     /// For test environments: use neon_local's LocalEnv to update compute
     393            0 :     async fn do_notify_local(
     394            0 :         &self,
     395            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     396            0 :     ) -> Result<(), NotifyError> {
     397              :         // neon_local updates are not safe to call concurrently, use a lock to serialize
     398              :         // all calls to this function
     399            0 :         let _locked = self.neon_local_lock.lock().await;
     400              : 
     401            0 :         let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
     402            0 :             tracing::warn!(
     403            0 :                 "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
     404              :             );
     405            0 :             return Ok(());
     406              :         };
     407            0 :         let env = match LocalEnv::load_config(repo_dir) {
     408            0 :             Ok(e) => e,
     409            0 :             Err(e) => {
     410            0 :                 tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
     411            0 :                 return Ok(());
     412              :             }
     413              :         };
     414            0 :         let cplane =
     415            0 :             ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
     416            0 :         let ComputeHookNotifyRequest {
     417            0 :             tenant_id,
     418            0 :             shards,
     419            0 :             stripe_size,
     420            0 :             preferred_az: _preferred_az,
     421            0 :         } = reconfigure_request;
     422            0 : 
     423            0 :         let compute_pageservers = shards
     424            0 :             .iter()
     425            0 :             .map(|shard| {
     426            0 :                 let ps_conf = env
     427            0 :                     .get_pageserver_conf(shard.node_id)
     428            0 :                     .expect("Unknown pageserver");
     429            0 :                 let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
     430            0 :                     .expect("Unable to parse listen_pg_addr");
     431            0 :                 (pg_host, pg_port.unwrap_or(5432))
     432            0 :             })
     433            0 :             .collect::<Vec<_>>();
     434              : 
     435            0 :         for (endpoint_name, endpoint) in &cplane.endpoints {
     436            0 :             if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
     437            0 :                 tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
     438            0 :                 endpoint
     439            0 :                     .reconfigure(compute_pageservers.clone(), *stripe_size, None)
     440            0 :                     .await
     441            0 :                     .map_err(NotifyError::NeonLocal)?;
     442            0 :             }
     443              :         }
     444              : 
     445            0 :         Ok(())
     446            0 :     }
     447              : 
     448            0 :     async fn do_notify_iteration(
     449            0 :         &self,
     450            0 :         url: &String,
     451            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     452            0 :         cancel: &CancellationToken,
     453            0 :     ) -> Result<(), NotifyError> {
     454            0 :         let req = self.client.request(reqwest::Method::PUT, url);
     455            0 :         let req = if let Some(value) = &self.authorization_header {
     456            0 :             req.header(reqwest::header::AUTHORIZATION, value)
     457              :         } else {
     458            0 :             req
     459              :         };
     460              : 
     461            0 :         tracing::info!(
     462            0 :             "Sending notify request to {} ({:?})",
     463              :             url,
     464              :             reconfigure_request
     465              :         );
     466            0 :         let send_result = req.json(&reconfigure_request).send().await;
     467            0 :         let response = match send_result {
     468            0 :             Ok(r) => r,
     469            0 :             Err(e) => return Err(e.into()),
     470              :         };
     471              : 
     472              :         // Treat all 2xx responses as success
     473            0 :         if response.status() >= reqwest::StatusCode::OK
     474            0 :             && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
     475              :         {
     476            0 :             if response.status() != reqwest::StatusCode::OK {
     477              :                 // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
     478              :                 // log a warning.
     479            0 :                 tracing::warn!(
     480            0 :                     "Unexpected 2xx response code {} from control plane",
     481            0 :                     response.status()
     482              :                 );
     483            0 :             }
     484              : 
     485            0 :             return Ok(());
     486            0 :         }
     487            0 : 
     488            0 :         // Error response codes
     489            0 :         match response.status() {
     490              :             reqwest::StatusCode::TOO_MANY_REQUESTS => {
     491              :                 // TODO: 429 handling should be global: set some state visible to other requests
     492              :                 // so that they will delay before starting, rather than all notifications trying
     493              :                 // once before backing off.
     494            0 :                 tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
     495            0 :                     .await
     496            0 :                     .ok();
     497            0 :                 Err(NotifyError::SlowDown)
     498              :             }
     499              :             reqwest::StatusCode::LOCKED => {
     500              :                 // We consider this fatal, because it's possible that the operation blocking the control one is
     501              :                 // also the one that is waiting for this reconcile.  We should let the reconciler calling
     502              :                 // this hook fail, to give control plane a chance to un-lock.
     503            0 :                 tracing::info!("Control plane reports tenant is locked, dropping out of notify");
     504            0 :                 Err(NotifyError::Busy)
     505              :             }
     506              :             reqwest::StatusCode::SERVICE_UNAVAILABLE => {
     507            0 :                 Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
     508              :             }
     509              :             reqwest::StatusCode::GATEWAY_TIMEOUT => {
     510            0 :                 Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
     511              :             }
     512              :             reqwest::StatusCode::BAD_GATEWAY => {
     513            0 :                 Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
     514              :             }
     515              : 
     516            0 :             reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
     517            0 :             reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
     518            0 :             reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
     519            0 :             status => Err(NotifyError::Unexpected(
     520            0 :                 hyper::StatusCode::from_u16(status.as_u16())
     521            0 :                     .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
     522            0 :             )),
     523              :         }
     524            0 :     }
     525              : 
     526            0 :     async fn do_notify(
     527            0 :         &self,
     528            0 :         url: &String,
     529            0 :         reconfigure_request: &ComputeHookNotifyRequest,
     530            0 :         cancel: &CancellationToken,
     531            0 :     ) -> Result<(), NotifyError> {
     532              :         // We hold these semaphore units across all retries, rather than only across each
     533              :         // HTTP request: this is to preserve fairness and avoid a situation where a retry might
     534              :         // time out waiting for a semaphore.
     535            0 :         let _units = self
     536            0 :             .api_concurrency
     537            0 :             .acquire()
     538            0 :             .await
     539              :             // Interpret closed semaphore as shutdown
     540            0 :             .map_err(|_| NotifyError::ShuttingDown)?;
     541              : 
     542            0 :         backoff::retry(
     543            0 :             || self.do_notify_iteration(url, reconfigure_request, cancel),
     544            0 :             |e| {
     545            0 :                 matches!(
     546            0 :                     e,
     547              :                     NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
     548              :                 )
     549            0 :             },
     550            0 :             3,
     551            0 :             10,
     552            0 :             "Send compute notification",
     553            0 :             cancel,
     554            0 :         )
     555            0 :         .await
     556            0 :         .ok_or_else(|| NotifyError::ShuttingDown)
     557            0 :         .and_then(|x| x)
     558            0 :     }
     559              : 
     560              :     /// Synchronous phase: update the per-tenant state for the next intended notification
     561            0 :     fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
     562            0 :         let mut state_locked = self.state.lock().unwrap();
     563              : 
     564              :         use std::collections::hash_map::Entry;
     565            0 :         let tenant_shard_id = shard_update.tenant_shard_id;
     566              : 
     567            0 :         let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
     568            0 :             Entry::Vacant(e) => {
     569            0 :                 let ShardUpdate {
     570            0 :                     tenant_shard_id,
     571            0 :                     node_id,
     572            0 :                     stripe_size,
     573            0 :                     preferred_az,
     574            0 :                 } = shard_update;
     575            0 :                 e.insert(ComputeHookTenant::new(
     576            0 :                     tenant_shard_id,
     577            0 :                     stripe_size,
     578            0 :                     preferred_az.map(|az| az.into_owned()),
     579            0 :                     node_id,
     580            0 :                 ))
     581              :             }
     582            0 :             Entry::Occupied(e) => {
     583            0 :                 let tenant = e.into_mut();
     584            0 :                 tenant.update(shard_update);
     585            0 :                 tenant
     586              :             }
     587              :         };
     588            0 :         tenant.maybe_send(tenant_shard_id.tenant_id, None)
     589            0 :     }
     590              : 
     591            0 :     async fn notify_execute(
     592            0 :         &self,
     593            0 :         maybe_send_result: MaybeSendResult,
     594            0 :         tenant_shard_id: TenantShardId,
     595            0 :         cancel: &CancellationToken,
     596            0 :     ) -> Result<(), NotifyError> {
     597              :         // Process result: we may get an update to send, or we may have to wait for a lock
     598              :         // before trying again.
     599            0 :         let (request, mut send_lock_guard) = match maybe_send_result {
     600              :             MaybeSendResult::Noop => {
     601            0 :                 return Ok(());
     602              :             }
     603            0 :             MaybeSendResult::AwaitLock(send_lock) => {
     604            0 :                 let send_locked = tokio::select! {
     605            0 :                     guard = send_lock.lock_owned() => {guard},
     606            0 :                     _ = cancel.cancelled() => {
     607            0 :                         return Err(NotifyError::ShuttingDown)
     608              :                     }
     609              :                 };
     610              : 
     611              :                 // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
     612              :                 // we have acquired the send lock and take `[Self::state]` lock.  This is safe because maybe_send only uses
     613              :                 // try_lock.
     614            0 :                 let state_locked = self.state.lock().unwrap();
     615            0 :                 let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
     616            0 :                     return Ok(());
     617              :                 };
     618            0 :                 match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
     619              :                     MaybeSendResult::AwaitLock(_) => {
     620            0 :                         unreachable!("We supplied lock guard")
     621              :                     }
     622              :                     MaybeSendResult::Noop => {
     623            0 :                         return Ok(());
     624              :                     }
     625            0 :                     MaybeSendResult::Transmit((request, lock)) => (request, lock),
     626              :                 }
     627              :             }
     628            0 :             MaybeSendResult::Transmit((request, lock)) => (request, lock),
     629              :         };
     630              : 
     631            0 :         let result = if !self.config.use_local_compute_notifications {
     632            0 :             let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
     633            0 :                 Some(if control_plane_url.ends_with('/') {
     634            0 :                     format!("{control_plane_url}notify-attach")
     635              :                 } else {
     636            0 :                     format!("{control_plane_url}/notify-attach")
     637              :                 })
     638              :             } else {
     639            0 :                 self.config.compute_hook_url.clone()
     640              :             };
     641              : 
     642              :             // We validate this at startup
     643            0 :             let notify_url = compute_hook_url.as_ref().unwrap();
     644            0 :             self.do_notify(notify_url, &request, cancel).await
     645              :         } else {
     646            0 :             self.do_notify_local(&request).await.map_err(|e| {
     647            0 :                 // This path is for testing only, so munge the error into our prod-style error type.
     648            0 :                 tracing::error!("neon_local notification hook failed: {e}");
     649            0 :                 NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
     650            0 :             })
     651              :         };
     652              : 
     653            0 :         match result {
     654            0 :             Ok(_) => {
     655            0 :                 // Before dropping the send lock, stash the request we just sent so that
     656            0 :                 // subsequent callers can avoid redundantly re-sending the same thing.
     657            0 :                 *send_lock_guard = Some(ComputeRemoteState {
     658            0 :                     request,
     659            0 :                     applied: true,
     660            0 :                 });
     661            0 :             }
     662            0 :             Err(NotifyError::Busy) => {
     663            0 :                 // Busy result means that the server responded and has stored the new configuration,
     664            0 :                 // but was not able to fully apply it to the compute
     665            0 :                 *send_lock_guard = Some(ComputeRemoteState {
     666            0 :                     request,
     667            0 :                     applied: false,
     668            0 :                 });
     669            0 :             }
     670            0 :             Err(_) => {
     671            0 :                 // General error case: we can no longer know the remote state, so clear it.  This will result in
     672            0 :                 // the logic in maybe_send recognizing that we should call the hook again.
     673            0 :                 *send_lock_guard = None;
     674            0 :             }
     675              :         }
     676            0 :         result
     677            0 :     }
     678              : 
     679              :     /// Infallible synchronous fire-and-forget version of notify(), that sends its results to
     680              :     /// a channel.  Something should consume the channel and arrange to try notifying again
     681              :     /// if something failed.
     682            0 :     pub(super) fn notify_background(
     683            0 :         self: &Arc<Self>,
     684            0 :         notifications: Vec<ShardUpdate>,
     685            0 :         result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
     686            0 :         cancel: &CancellationToken,
     687            0 :     ) {
     688            0 :         let mut maybe_sends = Vec::new();
     689            0 :         for shard_update in notifications {
     690            0 :             let tenant_shard_id = shard_update.tenant_shard_id;
     691            0 :             let maybe_send_result = self.notify_prepare(shard_update);
     692            0 :             maybe_sends.push((tenant_shard_id, maybe_send_result))
     693              :         }
     694              : 
     695            0 :         let this = self.clone();
     696            0 :         let cancel = cancel.clone();
     697            0 : 
     698            0 :         tokio::task::spawn(async move {
     699            0 :             // Construct an async stream of futures to invoke the compute notify function: we do this
     700            0 :             // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.  The
     701            0 :             // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
     702            0 :             // would mostly just be waiting on that semaphore.
     703            0 :             let mut stream = futures::stream::iter(maybe_sends)
     704            0 :                 .map(|(tenant_shard_id, maybe_send_result)| {
     705            0 :                     let this = this.clone();
     706            0 :                     let cancel = cancel.clone();
     707              : 
     708            0 :                     async move {
     709            0 :                         this
     710            0 :                             .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
     711            0 :                             .await.map_err(|e| (tenant_shard_id, e))
     712            0 :                     }.instrument(info_span!(
     713            0 :                         "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
     714              :                     ))
     715            0 :                 })
     716            0 :                 .buffered(API_CONCURRENCY);
     717              : 
     718              :             loop {
     719            0 :                 tokio::select! {
     720            0 :                     next = stream.next() => {
     721            0 :                         match next {
     722            0 :                             Some(r) => {
     723            0 :                                 result_tx.send(r).await.ok();
     724              :                             },
     725              :                             None => {
     726            0 :                                 tracing::info!("Finished sending background compute notifications");
     727            0 :                                 break;
     728              :                             }
     729              :                         }
     730              :                     },
     731            0 :                     _ = cancel.cancelled() => {
     732            0 :                         tracing::info!("Shutdown while running background compute notifications");
     733            0 :                         break;
     734              :                     }
     735              :                 };
     736              :             }
     737            0 :         });
     738            0 :     }
     739              : 
     740              :     /// Call this to notify the compute (postgres) tier of new pageservers to use
     741              :     /// for a tenant.  notify() is called by each shard individually, and this function
     742              :     /// will decide whether an update to the tenant is sent.  An update is sent on the
     743              :     /// condition that:
     744              :     /// - We know a pageserver for every shard.
     745              :     /// - All the shards have the same shard_count (i.e. we are not mid-split)
     746              :     ///
     747              :     /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
     748              :     /// that is cancelled.
     749              :     ///
     750              :     /// This function is fallible, including in the case that the control plane is transiently
     751              :     /// unavailable.  A limited number of retries are done internally to efficiently hide short unavailability
     752              :     /// periods, but we don't retry forever.  The **caller** is responsible for handling failures and
     753              :     /// ensuring that they eventually call again to ensure that the compute is eventually notified of
     754              :     /// the proper pageserver nodes for a tenant.
     755              :     #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
     756              :     pub(super) async fn notify<'a>(
     757              :         &self,
     758              :         shard_update: ShardUpdate<'a>,
     759              :         cancel: &CancellationToken,
     760              :     ) -> Result<(), NotifyError> {
     761              :         let tenant_shard_id = shard_update.tenant_shard_id;
     762              :         let maybe_send_result = self.notify_prepare(shard_update);
     763              :         self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
     764              :             .await
     765              :     }
     766              : 
     767              :     /// Reflect a detach for a particular shard in the compute hook state.
     768              :     ///
     769              :     /// The goal is to avoid sending compute notifications with stale information (i.e.
     770              :     /// including detach pageservers).
     771              :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
     772              :     pub(super) fn handle_detach(
     773              :         &self,
     774              :         tenant_shard_id: TenantShardId,
     775              :         stripe_size: ShardStripeSize,
     776              :     ) {
     777              :         use std::collections::hash_map::Entry;
     778              : 
     779              :         let mut state_locked = self.state.lock().unwrap();
     780              :         match state_locked.entry(tenant_shard_id.tenant_id) {
     781              :             Entry::Vacant(_) => {
     782              :                 // This is a valid but niche case, where the tenant was previously attached
     783              :                 // as a Secondary location and then detached, so has no previously notified
     784              :                 // state.
     785              :                 tracing::info!("Compute hook tenant not found for detach");
     786              :             }
     787              :             Entry::Occupied(mut e) => {
     788              :                 let sharded = e.get().is_sharded();
     789              :                 if !sharded {
     790              :                     e.remove();
     791              :                 } else {
     792              :                     e.get_mut().remove_shard(tenant_shard_id, stripe_size);
     793              :                 }
     794              : 
     795              :                 tracing::debug!("Compute hook handled shard detach");
     796              :             }
     797              :         }
     798              :     }
     799              : }
     800              : 
     801              : #[cfg(test)]
     802              : pub(crate) mod tests {
     803              :     use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
     804              :     use utils::id::TenantId;
     805              : 
     806              :     use super::*;
     807              : 
     808              :     #[test]
     809            1 :     fn tenant_updates() -> anyhow::Result<()> {
     810            1 :         let tenant_id = TenantId::generate();
     811            1 :         let stripe_size = DEFAULT_STRIPE_SIZE;
     812            1 :         let mut tenant_state = ComputeHookTenant::new(
     813            1 :             TenantShardId {
     814            1 :                 tenant_id,
     815            1 :                 shard_count: ShardCount::new(0),
     816            1 :                 shard_number: ShardNumber(0),
     817            1 :             },
     818            1 :             ShardStripeSize(12345),
     819            1 :             None,
     820            1 :             NodeId(1),
     821            1 :         );
     822            1 : 
     823            1 :         // An unsharded tenant is always ready to emit a notification, but won't
     824            1 :         // send the same one twice
     825            1 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     826            1 :         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
     827            0 :             anyhow::bail!("Wrong send result");
     828              :         };
     829            1 :         assert_eq!(request.shards.len(), 1);
     830            1 :         assert!(request.stripe_size.is_none());
     831              : 
     832              :         // Simulate successful send
     833            1 :         *guard = Some(ComputeRemoteState {
     834            1 :             request,
     835            1 :             applied: true,
     836            1 :         });
     837            1 :         drop(guard);
     838            1 : 
     839            1 :         // Try asking again: this should be a no-op
     840            1 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     841            1 :         assert!(matches!(send_result, MaybeSendResult::Noop));
     842              : 
     843              :         // Writing the first shard of a multi-sharded situation (i.e. in a split)
     844              :         // resets the tenant state and puts it in an non-notifying state (need to
     845              :         // see all shards)
     846            1 :         tenant_state.update(ShardUpdate {
     847            1 :             tenant_shard_id: TenantShardId {
     848            1 :                 tenant_id,
     849            1 :                 shard_count: ShardCount::new(2),
     850            1 :                 shard_number: ShardNumber(1),
     851            1 :             },
     852            1 :             stripe_size,
     853            1 :             preferred_az: None,
     854            1 :             node_id: NodeId(1),
     855            1 :         });
     856            1 :         assert!(matches!(
     857            1 :             tenant_state.maybe_send(tenant_id, None),
     858              :             MaybeSendResult::Noop
     859              :         ));
     860              : 
     861              :         // Writing the second shard makes it ready to notify
     862            1 :         tenant_state.update(ShardUpdate {
     863            1 :             tenant_shard_id: TenantShardId {
     864            1 :                 tenant_id,
     865            1 :                 shard_count: ShardCount::new(2),
     866            1 :                 shard_number: ShardNumber(0),
     867            1 :             },
     868            1 :             stripe_size,
     869            1 :             preferred_az: None,
     870            1 :             node_id: NodeId(1),
     871            1 :         });
     872            1 : 
     873            1 :         let send_result = tenant_state.maybe_send(tenant_id, None);
     874            1 :         let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
     875            0 :             anyhow::bail!("Wrong send result");
     876              :         };
     877            1 :         assert_eq!(request.shards.len(), 2);
     878            1 :         assert_eq!(request.stripe_size, Some(stripe_size));
     879              : 
     880              :         // Simulate successful send
     881            1 :         *guard = Some(ComputeRemoteState {
     882            1 :             request,
     883            1 :             applied: true,
     884            1 :         });
     885            1 :         drop(guard);
     886            1 : 
     887            1 :         Ok(())
     888            1 :     }
     889              : }
        

Generated by: LCOV version 2.1-beta