LCOV - code coverage report
Current view: top level - storage_controller/src - compute_hook.rs (source / functions) Coverage Total Hit
Test: 1d5975439f3c9882b18414799141ebf9a3922c58.info Lines: 26.8 % 578 155
Test Date: 2025-07-31 15:59:03 Functions: 9.9 % 71 7

            Line data    Source code
       1              : use std::borrow::Cow;
       2              : use std::collections::HashMap;
       3              : use std::error::Error as _;
       4              : use std::sync::Arc;
       5              : use std::time::Duration;
       6              : 
       7              : use anyhow::Context;
       8              : use compute_api::spec::PageserverProtocol;
       9              : use compute_api::spec::PageserverShardInfo;
      10              : use control_plane::endpoint::{
      11              :     ComputeControlPlane, EndpointStatus, PageserverConnectionInfo, PageserverShardConnectionInfo,
      12              : };
      13              : use control_plane::local_env::LocalEnv;
      14              : use futures::StreamExt;
      15              : use hyper::StatusCode;
      16              : use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
      17              : use pageserver_api::controller_api::AvailabilityZone;
      18              : use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId};
      19              : use postgres_connection::parse_host_port;
      20              : use safekeeper_api::membership::SafekeeperGeneration;
      21              : use serde::{Deserialize, Serialize};
      22              : use tokio_util::sync::CancellationToken;
      23              : use tracing::{Instrument, info_span};
      24              : use utils::backoff::{self};
      25              : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
      26              : 
      27              : use crate::service::Config;
      28              : 
/// NOTE(review): presumably the pause applied when the control plane asks us to retry less
/// frequently (see `NotifyError::SlowDown`) -- the usage is not visible here, confirm.
const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);

/// NOTE(review): presumably the timeout applied to a single notification request -- confirm
/// against the code that sends the request.
const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

/// NOTE(review): presumably an upper bound on concurrently running notification API calls --
/// confirm against the callers in the rest of the crate.
pub(crate) const API_CONCURRENCY: usize = 32;
      34              : 
/// Attach-notification state for a tenant that has a single (unsharded) location.
struct UnshardedComputeHookTenant {
    /// Which node is this tenant attached to
    node_id: NodeId,

    /// The tenant's preferred AZ, so that we may pass this on to the control plane
    preferred_az: Option<AvailabilityZone>,

    /// Must hold this lock to send a notification.  The guarded value holds what we know
    /// about the compute's state (`None` when that is unknown).
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
}
/// Attach-notification state for a tenant that is split into more than one shard.
struct ShardedComputeHookTenant {
    /// Stripe size of the tenant; included in the notification sent for sharded tenants.
    stripe_size: ShardStripeSize,
    /// Total number of shards the tenant has.  A notification is only produced once
    /// `shards` contains this many entries.
    shard_count: ShardCount,
    /// The shards we currently know the location of, as `(shard number, node)` pairs.
    shards: Vec<(ShardNumber, NodeId)>,

    /// The tenant's preferred AZ, so that we may pass this on to the control plane
    preferred_az: Option<AvailabilityZone>,

    /// Must hold this lock to send a notification.  The contents represent
    /// the last successfully sent notification, and are used to coalesce multiple
    /// updates by only sending when there is a change since our last successful send.
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
}
      58              : 
/// Represents our knowledge of the compute's state: we can update this when we get a
/// response from a notify API call, which tells us what has been applied.
///
/// Should be wrapped in an Option<>, as we cannot always know the remote state.
///
/// `R` is the request body type of the notification this state belongs to.
#[derive(PartialEq, Eq, Debug)]
struct ComputeRemoteState<R> {
    /// The request body which was acked by the compute
    request: R,

    /// Whether the cplane indicated that the state was applied to running computes, or just
    /// persisted.  In the Neon control plane, this is the difference between a 423 response (meaning
    /// persisted but not applied), and a 2xx response (both persisted and applied)
    applied: bool,
}
      73              : 
/// Remote state tracked for tenant attach notifications.
type ComputeRemoteTenantState = ComputeRemoteState<NotifyAttachRequest>;
/// Remote state tracked for safekeeper change notifications.
type ComputeRemoteTimelineState = ComputeRemoteState<NotifySafekeepersRequest>;
      76              : 
/// The trait which defines the handler-specific types and methods.
/// We have two implementations of this trait so far:
/// - [`ComputeHookTenant`] for tenant attach notifications ("/notify-attach")
/// - [`ComputeHookTimeline`] for safekeeper change notifications ("/notify-safekeepers")
trait ApiMethod {
    /// Type of the key which identifies the resource.
    /// It's either TenantId for tenant attach notifications,
    /// or TenantTimelineId for safekeeper change notifications.
    type Key: std::cmp::Eq + std::hash::Hash + Clone;

    /// Type of the request body that is sent for this kind of notification.
    type Request: serde::Serialize + std::fmt::Debug;

    /// Name of the API endpoint for this kind of notification (e.g. "notify-attach").
    const API_PATH: &'static str;

    /// Decide whether a notification needs to be sent for `key`.
    ///
    /// `lock` is the send lock guard if the caller already holds it; when it is `None` the
    /// implementations in this file only *try* to take the lock and return
    /// [`MaybeSendResult::AwaitLock`] if it is currently held by someone else.
    fn maybe_send(
        &self,
        key: Self::Key,
        lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<Self::Request>>>>,
    ) -> MaybeSendResult<Self::Request, Self::Key>;

    /// Apply the notification `req` to the endpoints known to `cplane`.
    ///
    /// NOTE(review): presumably this is the neon_local (test/dev environment) path used in
    /// place of an HTTP call to the control plane -- confirm against the caller.
    async fn notify_local(
        env: &LocalEnv,
        cplane: &ComputeControlPlane,
        req: &Self::Request,
    ) -> Result<(), NotifyError>;
}
     103              : 
/// The state of a tenant we need to notify the compute about.  Unsharded and sharded
/// tenants are tracked with different structures.
enum ComputeHookTenant {
    /// Tenant with a single location.
    Unsharded(UnshardedComputeHookTenant),
    /// Tenant with more than one shard; locations are collected shard by shard.
    Sharded(ShardedComputeHookTenant),
}
     108              : 
     109              : impl ComputeHookTenant {
     110              :     /// Construct with at least one shard's information
     111            2 :     fn new(
     112            2 :         tenant_shard_id: TenantShardId,
     113            2 :         stripe_size: ShardStripeSize,
     114            2 :         preferred_az: Option<AvailabilityZone>,
     115            2 :         node_id: NodeId,
     116            2 :     ) -> Self {
     117            2 :         if tenant_shard_id.shard_count.count() > 1 {
     118            1 :             Self::Sharded(ShardedComputeHookTenant {
     119            1 :                 shards: vec![(tenant_shard_id.shard_number, node_id)],
     120            1 :                 stripe_size,
     121            1 :                 shard_count: tenant_shard_id.shard_count,
     122            1 :                 preferred_az,
     123            1 :                 send_lock: Arc::default(),
     124            1 :             })
     125              :         } else {
     126            1 :             Self::Unsharded(UnshardedComputeHookTenant {
     127            1 :                 node_id,
     128            1 :                 preferred_az,
     129            1 :                 send_lock: Arc::default(),
     130            1 :             })
     131              :         }
     132            2 :     }
     133              : 
     134            4 :     fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>> {
     135            4 :         match self {
     136            2 :             Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
     137            2 :             Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
     138              :         }
     139            4 :     }
     140              : 
     141            0 :     fn is_sharded(&self) -> bool {
     142            0 :         matches!(self, ComputeHookTenant::Sharded(_))
     143            0 :     }
     144              : 
     145              :     /// Clear compute hook state for the specified shard.
     146              :     /// Only valid for [`ComputeHookTenant::Sharded`] instances.
     147            0 :     fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
     148            0 :         match self {
     149            0 :             ComputeHookTenant::Sharded(sharded) => {
     150            0 :                 if sharded.stripe_size != stripe_size
     151            0 :                     || sharded.shard_count != tenant_shard_id.shard_count
     152              :                 {
     153            0 :                     tracing::warn!("Shard split detected while handling detach")
     154            0 :                 }
     155              : 
     156            0 :                 let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
     157            0 :                     *shard_number == tenant_shard_id.shard_number
     158            0 :                 });
     159              : 
     160            0 :                 if let Some(shard_idx) = shard_idx {
     161            0 :                     sharded.shards.remove(shard_idx);
     162            0 :                 } else {
     163              :                     // This is a valid but niche case, where the tenant was previously attached
     164              :                     // as a Secondary location and then detached, so has no previously notified
     165              :                     // state.
     166            0 :                     tracing::info!("Shard not found while handling detach")
     167              :                 }
     168              :             }
     169              :             ComputeHookTenant::Unsharded(_) => {
     170            0 :                 unreachable!("Detach of unsharded tenants is handled externally");
     171              :             }
     172              :         }
     173            0 :     }
     174              : 
     175              :     /// Set one shard's location.  If stripe size or shard count have changed, Self is reset
     176              :     /// and drops existing content.
     177            2 :     fn update(&mut self, shard_update: ShardUpdate) {
     178            2 :         let tenant_shard_id = shard_update.tenant_shard_id;
     179            2 :         let node_id = shard_update.node_id;
     180            2 :         let stripe_size = shard_update.stripe_size;
     181            2 :         let preferred_az = shard_update.preferred_az;
     182              : 
     183            1 :         match self {
     184            1 :             Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
     185            0 :                 unsharded_tenant.node_id = node_id;
     186            0 :                 if unsharded_tenant.preferred_az.as_ref()
     187            0 :                     != preferred_az.as_ref().map(|az| az.as_ref())
     188              :                 {
     189            0 :                     unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
     190            0 :                 }
     191              :             }
     192            1 :             Self::Sharded(sharded_tenant)
     193            1 :                 if sharded_tenant.stripe_size == stripe_size
     194            1 :                     && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
     195              :             {
     196            1 :                 if let Some(existing) = sharded_tenant
     197            1 :                     .shards
     198            1 :                     .iter()
     199            1 :                     .position(|s| s.0 == tenant_shard_id.shard_number)
     200            0 :                 {
     201            0 :                     sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
     202            0 :                 } else {
     203            1 :                     sharded_tenant
     204            1 :                         .shards
     205            1 :                         .push((tenant_shard_id.shard_number, node_id));
     206            1 :                     sharded_tenant.shards.sort_by_key(|s| s.0)
     207              :                 }
     208              : 
     209            1 :                 if sharded_tenant.preferred_az.as_ref()
     210            1 :                     != preferred_az.as_ref().map(|az| az.as_ref())
     211              :                 {
     212            0 :                     sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
     213            1 :                 }
     214              :             }
     215              :             _ => {
     216              :                 // Shard count changed: reset struct.
     217            1 :                 *self = Self::new(
     218            1 :                     tenant_shard_id,
     219            1 :                     stripe_size,
     220            1 :                     preferred_az.map(|az| az.into_owned()),
     221            1 :                     node_id,
     222              :                 );
     223              :             }
     224              :         }
     225            2 :     }
     226              : }
     227              : 
/// The state of a timeline we need to notify the compute about.
struct ComputeHookTimeline {
    /// Generation of the safekeeper set held in `safekeepers`.  Updates are only applied
    /// when they carry a greater generation.
    generation: SafekeeperGeneration,
    /// The safekeepers to report to the compute for this timeline.
    safekeepers: Vec<SafekeeperInfo>,

    /// Lock taken before producing a notification; the guarded value is what we know
    /// about the compute's state (`None` when that is unknown).
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTimelineState>>>,
}
     235              : 
     236              : impl ComputeHookTimeline {
     237              :     /// Construct a new ComputeHookTimeline with the given safekeepers and generation.
     238            0 :     fn new(generation: SafekeeperGeneration, safekeepers: Vec<SafekeeperInfo>) -> Self {
     239            0 :         Self {
     240            0 :             generation,
     241            0 :             safekeepers,
     242            0 :             send_lock: Arc::default(),
     243            0 :         }
     244            0 :     }
     245              : 
     246              :     /// Update the state with a new SafekeepersUpdate.
     247              :     /// Noop if the update generation is not greater than the current generation.
     248            0 :     fn update(&mut self, sk_update: SafekeepersUpdate) {
     249            0 :         if sk_update.generation > self.generation {
     250            0 :             self.generation = sk_update.generation;
     251            0 :             self.safekeepers = sk_update.safekeepers;
     252            0 :         }
     253            0 :     }
     254              : }
     255              : 
     256              : impl ApiMethod for ComputeHookTimeline {
     257              :     type Key = TenantTimelineId;
     258              :     type Request = NotifySafekeepersRequest;
     259              : 
     260              :     const API_PATH: &'static str = "notify-safekeepers";
     261              : 
     262            0 :     fn maybe_send(
     263            0 :         &self,
     264            0 :         ttid: TenantTimelineId,
     265            0 :         lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTimelineState>>>,
     266            0 :     ) -> MaybeSendNotifySafekeepersResult {
     267            0 :         let locked = match lock {
     268            0 :             Some(already_locked) => already_locked,
     269              :             None => {
     270              :                 // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::timelines`] lock.
     271            0 :                 let Ok(locked) = self.send_lock.clone().try_lock_owned() else {
     272            0 :                     return MaybeSendResult::AwaitLock((ttid, self.send_lock.clone()));
     273              :                 };
     274            0 :                 locked
     275              :             }
     276              :         };
     277              : 
     278            0 :         if locked
     279            0 :             .as_ref()
     280            0 :             .is_some_and(|s| s.request.generation >= self.generation)
     281              :         {
     282            0 :             return MaybeSendResult::Noop;
     283            0 :         }
     284              : 
     285            0 :         MaybeSendResult::Transmit((
     286            0 :             NotifySafekeepersRequest {
     287            0 :                 tenant_id: ttid.tenant_id,
     288            0 :                 timeline_id: ttid.timeline_id,
     289            0 :                 generation: self.generation,
     290            0 :                 safekeepers: self.safekeepers.clone(),
     291            0 :             },
     292            0 :             locked,
     293            0 :         ))
     294            0 :     }
     295              : 
     296            0 :     async fn notify_local(
     297            0 :         _env: &LocalEnv,
     298            0 :         cplane: &ComputeControlPlane,
     299            0 :         req: &NotifySafekeepersRequest,
     300            0 :     ) -> Result<(), NotifyError> {
     301              :         let NotifySafekeepersRequest {
     302            0 :             tenant_id,
     303            0 :             timeline_id,
     304            0 :             generation,
     305            0 :             safekeepers,
     306            0 :         } = req;
     307              : 
     308            0 :         for (endpoint_name, endpoint) in &cplane.endpoints {
     309            0 :             if endpoint.tenant_id == *tenant_id
     310            0 :                 && endpoint.timeline_id == *timeline_id
     311            0 :                 && endpoint.status() == EndpointStatus::Running
     312              :             {
     313            0 :                 tracing::info!("Reconfiguring safekeepers for endpoint {endpoint_name}");
     314              : 
     315            0 :                 let safekeepers = safekeepers.iter().map(|sk| sk.id).collect::<Vec<_>>();
     316              : 
     317            0 :                 endpoint
     318            0 :                     .reconfigure_safekeepers(safekeepers, *generation)
     319            0 :                     .await
     320            0 :                     .map_err(NotifyError::NeonLocal)?;
     321            0 :             }
     322              :         }
     323              : 
     324            0 :         Ok(())
     325            0 :     }
     326              : }
     327              : 
/// Location of a single shard, as carried in a [`NotifyAttachRequest`].
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct NotifyAttachRequestShard {
    /// The node the shard is attached to.
    node_id: NodeId,
    /// Which shard of the tenant this entry describes.
    shard_number: ShardNumber,
}
     333              : 
/// Request body that we send to the control plane to notify it of where a tenant is attached
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct NotifyAttachRequest {
    /// The tenant the notification is about.
    tenant_id: TenantId,
    /// The tenant's preferred availability zone, if one is set.
    preferred_az: Option<String>,
    /// Stripe size of a sharded tenant; `None` for unsharded tenants.
    stripe_size: Option<ShardStripeSize>,
    /// One entry per shard; an unsharded tenant has a single entry for shard number 0.
    shards: Vec<NotifyAttachRequestShard>,
}
     342              : 
/// Description of one safekeeper, as carried in a [`NotifySafekeepersRequest`].
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
pub(crate) struct SafekeeperInfo {
    /// Node id of the safekeeper.
    pub id: NodeId,
    /// Hostname of the safekeeper.
    /// It exists for better debuggability. Might be missing.
    /// Should not be used for anything else.
    pub hostname: Option<String>,
}
     351              : 
/// Request body of a safekeeper change notification ("notify-safekeepers").
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct NotifySafekeepersRequest {
    /// The tenant that owns the timeline.
    tenant_id: TenantId,
    /// The timeline whose safekeeper set is being reported.
    timeline_id: TimelineId,
    /// Generation of the safekeeper set in `safekeepers`.
    generation: SafekeeperGeneration,
    /// The safekeepers of the timeline.
    safekeepers: Vec<SafekeeperInfo>,
}
     359              : 
/// Error type for attempts to call into the control plane compute notification hook
#[derive(thiserror::Error, Debug)]
pub(crate) enum NotifyError {
    /// Request was not sent successfully, e.g. transport error
    #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
    Request(#[from] reqwest::Error),
    /// Request could not be serviced right now due to ongoing Operation in control plane, but should be possible soon.
    #[error("Control plane tenant busy")]
    Busy,
    /// Explicit 429 response asking us to retry less frequently
    #[error("Control plane overloaded")]
    SlowDown,
    /// A 503 response indicates the control plane can't handle the request right now
    #[error("Control plane unavailable (status {0})")]
    Unavailable(StatusCode),
    /// API returned unexpected non-success status.  We will retry, but log a warning.
    #[error("Control plane returned unexpected status {0}")]
    Unexpected(StatusCode),
    /// We shutdown while sending
    #[error("Shutting down")]
    ShuttingDown,
    /// A response indicates we will never succeed, such as 400 or 403
    #[error("Non-retryable error {0}")]
    Fatal(StatusCode),

    /// An error returned while reconfiguring an endpoint through `notify_local`.
    #[error("neon_local error: {0}")]
    NeonLocal(anyhow::Error),
}
     388              : 
/// Outcome of [`ApiMethod::maybe_send`]: tells the caller what, if anything, to do next.
///
/// `R` is the request body type and `K` the key type identifying the resource.
enum MaybeSendResult<R, K> {
    /// Please send this request while holding the lock, and if you succeed then write
    /// the request into the lock.
    Transmit(
        (
            R,
            tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<R>>>,
        ),
    ),
    /// Something requires sending, but you must wait for a current sender then call again
    AwaitLock((K, Arc<tokio::sync::Mutex<Option<ComputeRemoteState<R>>>>)),
    /// Nothing requires sending
    Noop,
}
     403              : 
/// [`MaybeSendResult`] for tenant attach notifications, keyed by tenant.
type MaybeSendNotifyAttachResult = MaybeSendResult<NotifyAttachRequest, TenantId>;
/// [`MaybeSendResult`] for safekeeper change notifications, keyed by tenant and timeline.
type MaybeSendNotifySafekeepersResult = MaybeSendResult<NotifySafekeepersRequest, TenantTimelineId>;
     406              : 
     407              : impl ApiMethod for ComputeHookTenant {
     408              :     type Key = TenantId;
     409              :     type Request = NotifyAttachRequest;
     410              : 
     411              :     const API_PATH: &'static str = "notify-attach";
     412              : 
     413            4 :     fn maybe_send(
     414            4 :         &self,
     415            4 :         tenant_id: TenantId,
     416            4 :         lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTenantState>>>,
     417            4 :     ) -> MaybeSendNotifyAttachResult {
     418            4 :         let locked = match lock {
     419            0 :             Some(already_locked) => already_locked,
     420              :             None => {
     421              :                 // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::tenants`] lock.
     422            4 :                 let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
     423            0 :                     return MaybeSendResult::AwaitLock((tenant_id, self.get_send_lock().clone()));
     424              :                 };
     425            4 :                 locked
     426              :             }
     427              :         };
     428              : 
     429            4 :         let request = match self {
     430            2 :             Self::Unsharded(unsharded_tenant) => Some(NotifyAttachRequest {
     431            2 :                 tenant_id,
     432            2 :                 shards: vec![NotifyAttachRequestShard {
     433            2 :                     shard_number: ShardNumber(0),
     434            2 :                     node_id: unsharded_tenant.node_id,
     435            2 :                 }],
     436            2 :                 stripe_size: None,
     437            2 :                 preferred_az: unsharded_tenant
     438            2 :                     .preferred_az
     439            2 :                     .as_ref()
     440            2 :                     .map(|az| az.0.clone()),
     441              :             }),
     442            2 :             Self::Sharded(sharded_tenant)
     443            2 :                 if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
     444              :             {
     445              :                 Some(NotifyAttachRequest {
     446            1 :                     tenant_id,
     447            1 :                     shards: sharded_tenant
     448            1 :                         .shards
     449            1 :                         .iter()
     450            1 :                         .map(|(shard_number, node_id)| NotifyAttachRequestShard {
     451            2 :                             shard_number: *shard_number,
     452            2 :                             node_id: *node_id,
     453            2 :                         })
     454            1 :                         .collect(),
     455            1 :                     stripe_size: Some(sharded_tenant.stripe_size),
     456            1 :                     preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
     457              :                 })
     458              :             }
     459            1 :             Self::Sharded(sharded_tenant) => {
     460              :                 // Sharded tenant doesn't yet have information for all its shards
     461              : 
     462            1 :                 tracing::info!(
     463            0 :                     "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
     464            0 :                     sharded_tenant.shards.len(),
     465            0 :                     sharded_tenant.shard_count.count()
     466              :                 );
     467            1 :                 None
     468              :             }
     469              :         };
     470              : 
     471            3 :         match request {
     472              :             None => {
     473              :                 // Not yet ready to emit a notification
     474            1 :                 tracing::info!("Tenant isn't yet ready to emit a notification");
     475            1 :                 MaybeSendResult::Noop
     476              :             }
     477            1 :             Some(request)
     478            3 :                 if Some(&request) == locked.as_ref().map(|s| &s.request)
     479            1 :                     && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
     480              :             {
     481            1 :                 tracing::info!(
     482            0 :                     "Skipping notification because remote state already matches ({:?})",
     483            0 :                     &request
     484              :                 );
     485              :                 // No change from the last value successfully sent, and our state indicates that the last
     486              :                 // value sent was fully applied on the control plane side.
     487            1 :                 MaybeSendResult::Noop
     488              :             }
     489            2 :             Some(request) => {
     490              :                 // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
     491            2 :                 MaybeSendResult::Transmit((request, locked))
     492              :             }
     493              :         }
     494            4 :     }
     495              : 
     496            0 :     async fn notify_local(
     497            0 :         env: &LocalEnv,
     498            0 :         cplane: &ComputeControlPlane,
     499            0 :         req: &NotifyAttachRequest,
     500            0 :     ) -> Result<(), NotifyError> {
     501              :         let NotifyAttachRequest {
     502            0 :             tenant_id,
     503            0 :             shards,
     504            0 :             stripe_size,
     505            0 :             preferred_az: _preferred_az,
     506            0 :         } = req;
     507              : 
     508            0 :         for (endpoint_name, endpoint) in &cplane.endpoints {
     509            0 :             if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
     510            0 :                 tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}");
     511              : 
     512            0 :                 let shard_count = match shards.len() {
     513            0 :                     1 => ShardCount::unsharded(),
     514            0 :                     n => ShardCount(n.try_into().expect("too many shards")),
     515              :                 };
     516              : 
     517            0 :                 let mut shard_infos: HashMap<ShardIndex, PageserverShardInfo> = HashMap::new();
     518              : 
     519            0 :                 let prefer_protocol = if endpoint.grpc {
     520            0 :                     PageserverProtocol::Grpc
     521              :                 } else {
     522            0 :                     PageserverProtocol::Libpq
     523              :                 };
     524              : 
     525            0 :                 for shard in shards.iter() {
     526            0 :                     let ps_conf = env
     527            0 :                         .get_pageserver_conf(shard.node_id)
     528            0 :                         .expect("Unknown pageserver");
     529              : 
     530            0 :                     let libpq_url = Some({
     531            0 :                         let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
     532            0 :                             .expect("Unable to parse listen_pg_addr");
     533            0 :                         let port = port.unwrap_or(5432);
     534            0 :                         format!("postgres://no_user@{host}:{port}")
     535            0 :                     });
     536            0 :                     let grpc_url = if let Some(grpc_addr) = &ps_conf.listen_grpc_addr {
     537            0 :                         let (host, port) =
     538            0 :                             parse_host_port(grpc_addr).expect("invalid gRPC address");
     539            0 :                         let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
     540            0 :                         Some(format!("grpc://no_user@{host}:{port}"))
     541              :                     } else {
     542            0 :                         None
     543              :                     };
     544            0 :                     let pageserver = PageserverShardConnectionInfo {
     545            0 :                         id: Some(shard.node_id),
     546            0 :                         libpq_url,
     547            0 :                         grpc_url,
     548            0 :                     };
     549            0 :                     let shard_info = PageserverShardInfo {
     550            0 :                         pageservers: vec![pageserver],
     551            0 :                     };
     552            0 :                     shard_infos.insert(
     553            0 :                         ShardIndex {
     554            0 :                             shard_number: shard.shard_number,
     555            0 :                             shard_count,
     556            0 :                         },
     557            0 :                         shard_info,
     558              :                     );
     559              :                 }
     560              : 
     561            0 :                 let pageserver_conninfo = PageserverConnectionInfo {
     562            0 :                     shard_count,
     563            0 :                     stripe_size: stripe_size.map(|val| ShardStripeSize(val.0)),
     564            0 :                     shards: shard_infos,
     565            0 :                     prefer_protocol,
     566              :                 };
     567              : 
     568            0 :                 endpoint
     569            0 :                     .reconfigure_pageservers(&pageserver_conninfo)
     570            0 :                     .await
     571            0 :                     .map_err(NotifyError::NeonLocal)?;
     572            0 :             }
     573              :         }
     574              : 
     575            0 :         Ok(())
     576            0 :     }
     577              : }
     578              : 
/// The compute hook is a destination for notifications about changes to tenant:pageserver
/// mapping.  It aggregates updates for the shards in a tenant, and when appropriate reconfigures
/// the compute connection string.
pub(super) struct ComputeHook {
    // Storage controller configuration this hook was constructed with.
    config: Config,
    // Per-tenant notification state, keyed by tenant ID.
    tenants: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
    // Per-timeline notification state, keyed by tenant+timeline ID.
    timelines: std::sync::Mutex<HashMap<TenantTimelineId, ComputeHookTimeline>>,
    // Pre-formatted `Bearer <jwt>` value for the HTTP Authorization header, or `None` when the
    // configuration carries no control plane JWT token.
    authorization_header: Option<String>,

    // Concurrency limiter, so that we do not overload the cloud control plane when updating
    // large numbers of tenants (e.g. when failing over after a node failure)
    api_concurrency: tokio::sync::Semaphore,

    // This lock is only used in testing environments, to serialize calls into neon_local
    neon_local_lock: tokio::sync::Mutex<()>,

    // We share a client across all notifications to enable connection re-use etc when
    // sending large numbers of notifications
    client: reqwest::Client,
}
     599              : 
/// Callers may give us a list of these when asking us to send a bulk batch
/// of notifications in the background.  This is a 'notification' in the sense of
/// other code notifying us of a shard's status, rather than being the final notification
/// that we send upwards to the control plane for the whole tenant.
pub(crate) struct ShardUpdate<'a> {
    // The shard this update is about; its tenant ID selects the per-tenant state to update.
    pub(crate) tenant_shard_id: TenantShardId,
    // NOTE(review): presumably the pageserver node the shard is now attached to -- confirm
    // against ComputeHookTenant.
    pub(crate) node_id: NodeId,
    // Stripe size reported for the shard.
    pub(crate) stripe_size: ShardStripeSize,
    // Optional availability zone; held as a `Cow` so callers may pass a borrowed or an owned
    // value.  It is converted to an owned value when a new tenant entry is created.
    pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
}
     610              : 
/// An update about the safekeepers of a single timeline.  The per-timeline state it creates
/// or updates is keyed by the (`tenant_id`, `timeline_id`) pair.
pub(crate) struct SafekeepersUpdate {
    // Tenant that owns the timeline.
    pub(crate) tenant_id: TenantId,
    // Timeline whose safekeepers are described.
    pub(crate) timeline_id: TimelineId,
    // Generation associated with this safekeeper list.
    // NOTE(review): presumably the membership configuration generation -- confirm.
    pub(crate) generation: SafekeeperGeneration,
    // The safekeepers to report for the timeline.
    pub(crate) safekeepers: Vec<SafekeeperInfo>,
}
     617              : 
     618              : impl ComputeHook {
     619            0 :     pub(super) fn new(config: Config) -> anyhow::Result<Self> {
     620            0 :         let authorization_header = config
     621            0 :             .control_plane_jwt_token
     622            0 :             .clone()
     623            0 :             .map(|jwt| format!("Bearer {jwt}"));
     624              : 
     625            0 :         let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
     626            0 :         for cert in &config.ssl_ca_certs {
     627            0 :             client = client.add_root_certificate(cert.clone());
     628            0 :         }
     629            0 :         let client = client
     630            0 :             .build()
     631            0 :             .context("Failed to build http client for compute hook")?;
     632              : 
     633            0 :         Ok(Self {
     634            0 :             tenants: Default::default(),
     635            0 :             timelines: Default::default(),
     636            0 :             config,
     637            0 :             authorization_header,
     638            0 :             neon_local_lock: Default::default(),
     639            0 :             api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
     640            0 :             client,
     641            0 :         })
     642            0 :     }
     643              : 
     644              :     /// For test environments: use neon_local's LocalEnv to update compute
     645            0 :     async fn do_notify_local<M: ApiMethod>(&self, req: &M::Request) -> Result<(), NotifyError> {
     646              :         // neon_local updates are not safe to call concurrently, use a lock to serialize
     647              :         // all calls to this function
     648            0 :         let _locked = self.neon_local_lock.lock().await;
     649              : 
     650            0 :         let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
     651            0 :             tracing::warn!(
     652            0 :                 "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
     653              :             );
     654            0 :             return Ok(());
     655              :         };
     656            0 :         let env = match LocalEnv::load_config(repo_dir) {
     657            0 :             Ok(e) => e,
     658            0 :             Err(e) => {
     659            0 :                 tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
     660            0 :                 return Ok(());
     661              :             }
     662              :         };
     663            0 :         let cplane =
     664            0 :             ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
     665              : 
     666            0 :         M::notify_local(&env, &cplane, req).await
     667            0 :     }
     668              : 
     669            0 :     async fn do_notify_iteration<Req: serde::Serialize + std::fmt::Debug>(
     670            0 :         &self,
     671            0 :         url: &String,
     672            0 :         reconfigure_request: &Req,
     673            0 :         cancel: &CancellationToken,
     674            0 :     ) -> Result<(), NotifyError> {
     675            0 :         let req = self.client.request(reqwest::Method::PUT, url);
     676            0 :         let req = if let Some(value) = &self.authorization_header {
     677            0 :             req.header(reqwest::header::AUTHORIZATION, value)
     678              :         } else {
     679            0 :             req
     680              :         };
     681              : 
     682            0 :         tracing::info!(
     683            0 :             "Sending notify request to {} ({:?})",
     684              :             url,
     685              :             reconfigure_request
     686              :         );
     687            0 :         let send_result = req.json(&reconfigure_request).send().await;
     688            0 :         let response = match send_result {
     689            0 :             Ok(r) => r,
     690            0 :             Err(e) => return Err(e.into()),
     691              :         };
     692              : 
     693              :         // Treat all 2xx responses as success
     694            0 :         if response.status().is_success() {
     695            0 :             if response.status() != reqwest::StatusCode::OK {
     696              :                 // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
     697              :                 // log a warning.
     698            0 :                 tracing::warn!(
     699            0 :                     "Unexpected 2xx response code {} from control plane",
     700            0 :                     response.status()
     701              :                 );
     702            0 :             }
     703              : 
     704            0 :             return Ok(());
     705            0 :         }
     706              : 
     707              :         // Error response codes
     708            0 :         match response.status() {
     709              :             reqwest::StatusCode::TOO_MANY_REQUESTS => {
     710              :                 // TODO: 429 handling should be global: set some state visible to other requests
     711              :                 // so that they will delay before starting, rather than all notifications trying
     712              :                 // once before backing off.
     713            0 :                 tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
     714            0 :                     .await
     715            0 :                     .ok();
     716            0 :                 Err(NotifyError::SlowDown)
     717              :             }
     718              :             reqwest::StatusCode::LOCKED => {
     719              :                 // We consider this fatal, because it's possible that the operation blocking the control one is
     720              :                 // also the one that is waiting for this reconcile.  We should let the reconciler calling
     721              :                 // this hook fail, to give control plane a chance to un-lock.
     722            0 :                 tracing::info!("Control plane reports tenant is locked, dropping out of notify");
     723            0 :                 Err(NotifyError::Busy)
     724              :             }
     725              :             reqwest::StatusCode::SERVICE_UNAVAILABLE => {
     726            0 :                 Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
     727              :             }
     728              :             reqwest::StatusCode::GATEWAY_TIMEOUT => {
     729            0 :                 Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
     730              :             }
     731              :             reqwest::StatusCode::BAD_GATEWAY => {
     732            0 :                 Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
     733              :             }
     734              : 
     735            0 :             reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
     736            0 :             reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
     737            0 :             reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
     738            0 :             status => Err(NotifyError::Unexpected(
     739            0 :                 hyper::StatusCode::from_u16(status.as_u16())
     740            0 :                     .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
     741            0 :             )),
     742              :         }
     743            0 :     }
     744              : 
     745            0 :     async fn do_notify<R: serde::Serialize + std::fmt::Debug>(
     746            0 :         &self,
     747            0 :         url: &String,
     748            0 :         reconfigure_request: &R,
     749            0 :         cancel: &CancellationToken,
     750            0 :     ) -> Result<(), NotifyError> {
     751              :         // We hold these semaphore units across all retries, rather than only across each
     752              :         // HTTP request: this is to preserve fairness and avoid a situation where a retry might
     753              :         // time out waiting for a semaphore.
     754            0 :         let _units = self
     755            0 :             .api_concurrency
     756            0 :             .acquire()
     757            0 :             .await
     758              :             // Interpret closed semaphore as shutdown
     759            0 :             .map_err(|_| NotifyError::ShuttingDown)?;
     760              : 
     761            0 :         backoff::retry(
     762            0 :             || self.do_notify_iteration(url, reconfigure_request, cancel),
     763            0 :             |e| {
     764            0 :                 matches!(
     765            0 :                     e,
     766              :                     NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
     767              :                 )
     768            0 :             },
     769              :             3,
     770              :             10,
     771            0 :             "Send compute notification",
     772            0 :             cancel,
     773              :         )
     774            0 :         .await
     775            0 :         .ok_or_else(|| NotifyError::ShuttingDown)
     776            0 :         .and_then(|x| x)
     777            0 :     }
     778              : 
     779              :     /// Synchronous phase: update the per-tenant state for the next intended notification
     780            0 :     fn notify_attach_prepare(&self, shard_update: ShardUpdate) -> MaybeSendNotifyAttachResult {
     781            0 :         let mut tenants_locked = self.tenants.lock().unwrap();
     782              : 
     783              :         use std::collections::hash_map::Entry;
     784            0 :         let tenant_shard_id = shard_update.tenant_shard_id;
     785              : 
     786            0 :         let tenant = match tenants_locked.entry(tenant_shard_id.tenant_id) {
     787            0 :             Entry::Vacant(e) => {
     788              :                 let ShardUpdate {
     789            0 :                     tenant_shard_id,
     790            0 :                     node_id,
     791            0 :                     stripe_size,
     792            0 :                     preferred_az,
     793            0 :                 } = shard_update;
     794            0 :                 e.insert(ComputeHookTenant::new(
     795            0 :                     tenant_shard_id,
     796            0 :                     stripe_size,
     797            0 :                     preferred_az.map(|az| az.into_owned()),
     798            0 :                     node_id,
     799              :                 ))
     800              :             }
     801            0 :             Entry::Occupied(e) => {
     802            0 :                 let tenant = e.into_mut();
     803            0 :                 tenant.update(shard_update);
     804            0 :                 tenant
     805              :             }
     806              :         };
     807            0 :         tenant.maybe_send(tenant_shard_id.tenant_id, None)
     808            0 :     }
     809              : 
     810            0 :     fn notify_safekeepers_prepare(
     811            0 :         &self,
     812            0 :         safekeepers_update: SafekeepersUpdate,
     813            0 :     ) -> MaybeSendNotifySafekeepersResult {
     814            0 :         let mut timelines_locked = self.timelines.lock().unwrap();
     815              : 
     816            0 :         let ttid = TenantTimelineId {
     817            0 :             tenant_id: safekeepers_update.tenant_id,
     818            0 :             timeline_id: safekeepers_update.timeline_id,
     819            0 :         };
     820              : 
     821              :         use std::collections::hash_map::Entry;
     822            0 :         let timeline = match timelines_locked.entry(ttid) {
     823            0 :             Entry::Vacant(e) => e.insert(ComputeHookTimeline::new(
     824            0 :                 safekeepers_update.generation,
     825            0 :                 safekeepers_update.safekeepers,
     826              :             )),
     827            0 :             Entry::Occupied(e) => {
     828            0 :                 let timeline = e.into_mut();
     829            0 :                 timeline.update(safekeepers_update);
     830            0 :                 timeline
     831              :             }
     832              :         };
     833              : 
     834            0 :         timeline.maybe_send(ttid, None)
     835            0 :     }
     836              : 
     837            0 :     async fn notify_execute<M: ApiMethod>(
     838            0 :         &self,
     839            0 :         state: &std::sync::Mutex<HashMap<M::Key, M>>,
     840            0 :         maybe_send_result: MaybeSendResult<M::Request, M::Key>,
     841            0 :         cancel: &CancellationToken,
     842            0 :     ) -> Result<(), NotifyError> {
     843              :         // Process result: we may get an update to send, or we may have to wait for a lock
     844              :         // before trying again.
     845            0 :         let (request, mut send_lock_guard) = match maybe_send_result {
     846              :             MaybeSendResult::Noop => {
     847            0 :                 return Ok(());
     848              :             }
     849            0 :             MaybeSendResult::AwaitLock((key, send_lock)) => {
     850            0 :                 let send_locked = tokio::select! {
     851            0 :                     guard = send_lock.lock_owned() => {guard},
     852            0 :                     _ = cancel.cancelled() => {
     853            0 :                         tracing::info!("Notification cancelled while waiting for lock");
     854            0 :                         return Err(NotifyError::ShuttingDown)
     855              :                     }
     856              :                 };
     857              : 
     858              :                 // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
     859              :                 // we have acquired the send lock and take `[Self::state]` lock.  This is safe because maybe_send only uses
     860              :                 // try_lock.
     861            0 :                 let state_locked = state.lock().unwrap();
     862            0 :                 let Some(resource_state) = state_locked.get(&key) else {
     863            0 :                     return Ok(());
     864              :                 };
     865            0 :                 match resource_state.maybe_send(key, Some(send_locked)) {
     866              :                     MaybeSendResult::AwaitLock(_) => {
     867            0 :                         unreachable!("We supplied lock guard")
     868              :                     }
     869              :                     MaybeSendResult::Noop => {
     870            0 :                         return Ok(());
     871              :                     }
     872            0 :                     MaybeSendResult::Transmit((request, lock)) => (request, lock),
     873              :                 }
     874              :             }
     875            0 :             MaybeSendResult::Transmit((request, lock)) => (request, lock),
     876              :         };
     877              : 
     878            0 :         let result = if !self.config.use_local_compute_notifications {
     879            0 :             let compute_hook_url =
     880            0 :                 self.config
     881            0 :                     .control_plane_url
     882            0 :                     .as_ref()
     883            0 :                     .map(|control_plane_url| {
     884            0 :                         format!(
     885            0 :                             "{}/{}",
     886            0 :                             control_plane_url.trim_end_matches('/'),
     887              :                             M::API_PATH
     888              :                         )
     889            0 :                     });
     890              : 
     891              :             // We validate this at startup
     892            0 :             let notify_url = compute_hook_url.as_ref().unwrap();
     893            0 :             self.do_notify(notify_url, &request, cancel).await
     894              :         } else {
     895            0 :             match self.do_notify_local::<M>(&request).await.map_err(|e| {
     896              :                 // This path is for testing only, so munge the error into our prod-style error type.
     897            0 :                 if e.to_string().contains("refresh-configuration-pending") {
     898              :                     // If the error message mentions "refresh-configuration-pending", it means the compute node
     899              :                     // rejected our notification request because it already trying to reconfigure itself. We
     900              :                     // can proceed with the rest of the reconcliation process as the compute node already
     901              :                     // discovers the need to reconfigure and will eventually update its configuration once
     902              :                     // we update the pageserver mappings. In fact, it is important that we continue with
     903              :                     // reconcliation to make sure we update the pageserver mappings to unblock the compute node.
     904            0 :                     tracing::info!("neon_local notification hook failed: {e}");
     905            0 :                     tracing::info!("Notification failed likely due to compute node self-reconfiguration, will retry.");
     906            0 :                     Ok(())
     907              :                 } else {
     908            0 :                     tracing::error!("neon_local notification hook failed: {e}");
     909            0 :                     Err(NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR))
     910              :                 }
     911            0 :             }) {
     912              :                 // Compute node accepted the notification request. Ok to proceed.
     913            0 :                 Ok(_) => Ok(()),
     914              :                 // Compute node rejected our request but it is already self-reconfiguring. Ok to proceed.
     915            0 :                 Err(Ok(_)) => Ok(()),
     916              :                 // Fail the reconciliation attempt in all other cases. Recall that this whole code path involving
     917              :                 // neon_local is for testing only. In production we always retry failed reconcliations so we
     918              :                 // don't have any deadends here.
     919            0 :                 Err(Err(e)) => Err(e),
     920              :             }
     921              :         };
     922              : 
     923            0 :         match result {
     924            0 :             Ok(_) => {
     925            0 :                 // Before dropping the send lock, stash the request we just sent so that
     926            0 :                 // subsequent callers can avoid redundantly re-sending the same thing.
     927            0 :                 *send_lock_guard = Some(ComputeRemoteState {
     928            0 :                     request,
     929            0 :                     applied: true,
     930            0 :                 });
     931            0 :             }
     932            0 :             Err(NotifyError::Busy) => {
     933            0 :                 // Busy result means that the server responded and has stored the new configuration,
     934            0 :                 // but was not able to fully apply it to the compute
     935            0 :                 *send_lock_guard = Some(ComputeRemoteState {
     936            0 :                     request,
     937            0 :                     applied: false,
     938            0 :                 });
     939            0 :             }
     940            0 :             Err(_) => {
     941            0 :                 // General error case: we can no longer know the remote state, so clear it.  This will result in
     942            0 :                 // the logic in maybe_send recognizing that we should call the hook again.
     943            0 :                 *send_lock_guard = None;
     944            0 :             }
     945              :         }
     946            0 :         result
     947            0 :     }
     948              : 
     949              :     /// Infallible synchronous fire-and-forget version of notify(), that sends its results to
     950              :     /// a channel.  Something should consume the channel and arrange to try notifying again
     951              :     /// if something failed.
     952            0 :     pub(super) fn notify_attach_background(
     953            0 :         self: &Arc<Self>,
     954            0 :         notifications: Vec<ShardUpdate>,
     955            0 :         result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
     956            0 :         cancel: &CancellationToken,
     957            0 :     ) {
     958            0 :         let mut maybe_sends = Vec::new();
     959            0 :         for shard_update in notifications {
     960            0 :             let tenant_shard_id = shard_update.tenant_shard_id;
     961            0 :             let maybe_send_result = self.notify_attach_prepare(shard_update);
     962            0 :             maybe_sends.push((tenant_shard_id, maybe_send_result))
     963              :         }
     964              : 
     965            0 :         let this = self.clone();
     966            0 :         let cancel = cancel.clone();
     967              : 
     968            0 :         tokio::task::spawn(async move {
     969              :             // Construct an async stream of futures to invoke the compute notify function: we do this
     970              :             // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.  The
     971              :             // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
     972              :             // would mostly just be waiting on that semaphore.
     973            0 :             let mut stream = futures::stream::iter(maybe_sends)
     974            0 :                 .map(|(tenant_shard_id, maybe_send_result)| {
     975            0 :                     let this = this.clone();
     976            0 :                     let cancel = cancel.clone();
     977              : 
     978            0 :                     async move {
     979            0 :                         this
     980            0 :                             .notify_execute(&this.tenants, maybe_send_result, &cancel)
     981            0 :                             .await.map_err(|e| (tenant_shard_id, e))
     982            0 :                     }.instrument(info_span!(
     983            0 :                         "notify_attach_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
     984              :                     ))
     985            0 :                 })
     986            0 :                 .buffered(API_CONCURRENCY);
     987              : 
     988              :             loop {
     989            0 :                 tokio::select! {
     990            0 :                     next = stream.next() => {
     991            0 :                         match next {
     992            0 :                             Some(r) => {
     993            0 :                                 result_tx.send(r).await.ok();
     994              :                             },
     995              :                             None => {
     996            0 :                                 tracing::info!("Finished sending background compute notifications");
     997            0 :                                 break;
     998              :                             }
     999              :                         }
    1000              :                     },
    1001            0 :                     _ = cancel.cancelled() => {
    1002            0 :                         tracing::info!("Shutdown while running background compute notifications");
    1003            0 :                         break;
    1004              :                     }
    1005              :                 };
    1006              :             }
    1007            0 :         });
    1008            0 :     }
    1009              : 
    1010              :     /// Call this to notify the compute (postgres) tier of new pageservers to use
    1011              :     /// for a tenant.  notify() is called by each shard individually, and this function
    1012              :     /// will decide whether an update to the tenant is sent.  An update is sent on the
    1013              :     /// condition that:
    1014              :     /// - We know a pageserver for every shard.
    1015              :     /// - All the shards have the same shard_count (i.e. we are not mid-split)
    1016              :     ///
    1017              :     /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
    1018              :     /// that is cancelled.
    1019              :     ///
    1020              :     /// This function is fallible, including in the case that the control plane is transiently
    1021              :     /// unavailable.  A limited number of retries are done internally to efficiently hide short unavailability
    1022              :     /// periods, but we don't retry forever.  The **caller** is responsible for handling failures and
    1023              :     /// ensuring that they eventually call again to ensure that the compute is eventually notified of
    1024              :     /// the proper pageserver nodes for a tenant.
    1025              :     #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
    1026              :     pub(super) async fn notify_attach<'a>(
    1027              :         &self,
    1028              :         shard_update: ShardUpdate<'a>,
    1029              :         cancel: &CancellationToken,
    1030              :     ) -> Result<(), NotifyError> {
    1031              :         let maybe_send_result = self.notify_attach_prepare(shard_update);
    1032              :         self.notify_execute(&self.tenants, maybe_send_result, cancel)
    1033              :             .await
    1034              :     }
    1035              : 
    1036            0 :     pub(super) async fn notify_safekeepers(
    1037            0 :         &self,
    1038            0 :         safekeepers_update: SafekeepersUpdate,
    1039            0 :         cancel: &CancellationToken,
    1040            0 :     ) -> Result<(), NotifyError> {
    1041            0 :         let maybe_send_result = self.notify_safekeepers_prepare(safekeepers_update);
    1042            0 :         self.notify_execute(&self.timelines, maybe_send_result, cancel)
    1043            0 :             .await
    1044            0 :     }
    1045              : 
    1046              :     /// Reflect a detach for a particular shard in the compute hook state.
    1047              :     ///
    1048              :     /// The goal is to avoid sending compute notifications with stale information (i.e.
    1049              :     /// including detach pageservers).
    1050              :     #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
    1051              :     pub(super) fn handle_detach(
    1052              :         &self,
    1053              :         tenant_shard_id: TenantShardId,
    1054              :         stripe_size: ShardStripeSize,
    1055              :     ) {
    1056              :         use std::collections::hash_map::Entry;
    1057              : 
    1058              :         let mut tenants_locked = self.tenants.lock().unwrap();
    1059              :         match tenants_locked.entry(tenant_shard_id.tenant_id) {
    1060              :             Entry::Vacant(_) => {
    1061              :                 // This is a valid but niche case, where the tenant was previously attached
    1062              :                 // as a Secondary location and then detached, so has no previously notified
    1063              :                 // state.
    1064              :                 tracing::info!("Compute hook tenant not found for detach");
    1065              :             }
    1066              :             Entry::Occupied(mut e) => {
    1067              :                 let sharded = e.get().is_sharded();
    1068              :                 if !sharded {
    1069              :                     e.remove();
    1070              :                 } else {
    1071              :                     e.get_mut().remove_shard(tenant_shard_id, stripe_size);
    1072              :                 }
    1073              : 
    1074              :                 tracing::debug!("Compute hook handled shard detach");
    1075              :             }
    1076              :         }
    1077              :     }
    1078              : }
    1079              : 
    #[cfg(test)]
    pub(crate) mod tests {
        use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
        use utils::id::TenantId;

        use super::*;

        /// Walks one `ComputeHookTenant` through an unsharded notification, a repeated
        /// (no-op) request, and then a two-shard layout that only becomes notifiable
        /// once both shards have been reported.
        #[test]
        fn tenant_updates() -> anyhow::Result<()> {
            let tenant_id = TenantId::generate();
            let stripe_size = DEFAULT_STRIPE_SIZE;

            // Shorthand for building this tenant's shard ids.
            let shard_id = |count, number| TenantShardId {
                tenant_id,
                shard_count: ShardCount::new(count),
                shard_number: ShardNumber(number),
            };

            let mut tenant_state =
                ComputeHookTenant::new(shard_id(0, 0), ShardStripeSize(12345), None, NodeId(1));

            // An unsharded tenant can always emit a notification, but it must not
            // emit the same one twice.
            let (request, mut guard) = match tenant_state.maybe_send(tenant_id, None) {
                MaybeSendResult::Transmit(pending) => pending,
                _ => anyhow::bail!("Wrong send result"),
            };
            assert_eq!(request.shards.len(), 1);
            assert!(request.stripe_size.is_none());

            // Pretend the send succeeded.
            *guard = Some(ComputeRemoteState {
                request,
                applied: true,
            });
            drop(guard);

            // Asking a second time should be a no-op.
            assert!(matches!(
                tenant_state.maybe_send(tenant_id, None),
                MaybeSendResult::Noop
            ));

            // Reporting the first shard of a multi-shard layout (i.e. during a split)
            // resets the tenant state and leaves it in a non-notifying state until
            // every shard has been seen.
            tenant_state.update(ShardUpdate {
                tenant_shard_id: shard_id(2, 1),
                stripe_size,
                preferred_az: None,
                node_id: NodeId(1),
            });
            assert!(matches!(
                tenant_state.maybe_send(tenant_id, None),
                MaybeSendResult::Noop
            ));

            // Reporting the second shard makes the tenant ready to notify.
            tenant_state.update(ShardUpdate {
                tenant_shard_id: shard_id(2, 0),
                stripe_size,
                preferred_az: None,
                node_id: NodeId(1),
            });

            let (request, mut guard) = match tenant_state.maybe_send(tenant_id, None) {
                MaybeSendResult::Transmit(pending) => pending,
                _ => anyhow::bail!("Wrong send result"),
            };
            assert_eq!(request.shards.len(), 2);
            assert_eq!(request.stripe_size, Some(stripe_size));

            // Pretend the send succeeded.
            *guard = Some(ComputeRemoteState {
                request,
                applied: true,
            });
            drop(guard);

            Ok(())
        }
    }
        

Generated by: LCOV version 2.1-beta