LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - service.rs (source / functions)
Test:      aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date: 2024-02-14 18:05:35
Coverage:  Lines: 85.2 % (1262 of 1482 hit)    Functions: 76.5 % (101 of 132 hit)

            Line data    Source code
       1              : use std::{
       2              :     cmp::Ordering,
       3              :     collections::{BTreeMap, HashMap, HashSet},
       4              :     str::FromStr,
       5              :     sync::Arc,
       6              :     time::{Duration, Instant},
       7              : };
       8              : 
       9              : use control_plane::attachment_service::{
      10              :     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, NodeAvailability,
      11              :     NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, TenantCreateResponse,
      12              :     TenantCreateResponseShard, TenantLocateResponse, TenantLocateResponseShard,
      13              :     TenantShardMigrateRequest, TenantShardMigrateResponse,
      14              : };
      15              : use diesel::result::DatabaseErrorKind;
      16              : use futures::StreamExt;
      17              : use hyper::StatusCode;
      18              : use pageserver_api::{
      19              :     control_api::{
      20              :         ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
      21              :         ValidateResponse, ValidateResponseTenant,
      22              :     },
      23              :     models,
      24              :     models::{
      25              :         LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
      26              :         TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
      27              :         TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
      28              :     },
      29              :     shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
      30              : };
      31              : use pageserver_client::mgmt_api;
      32              : use tokio_util::sync::CancellationToken;
      33              : use utils::{
      34              :     backoff,
      35              :     completion::Barrier,
      36              :     generation::Generation,
      37              :     http::error::ApiError,
      38              :     id::{NodeId, TenantId, TimelineId},
      39              :     seqwait::SeqWait,
      40              : };
      41              : 
      42              : use crate::{
      43              :     compute_hook::{self, ComputeHook},
      44              :     node::Node,
      45              :     persistence::{
      46              :         split_state::SplitState, DatabaseError, NodePersistence, Persistence,
      47              :         TenantShardPersistence,
      48              :     },
      49              :     reconciler::attached_location_conf,
      50              :     scheduler::Scheduler,
      51              :     tenant_state::{
      52              :         IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
      53              :         ReconcilerWaiter, TenantState,
      54              :     },
      55              :     PlacementPolicy, Sequence,
      56              : };
      57              : 
      58              : const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      59              : 
      60              : /// How long [`Service::startup_reconcile`] is allowed to take before it should give
      61              : /// up on unresponsive pageservers and proceed.
      62              : pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      63              : 
      64              : // Top level state available to all HTTP handlers
      65              : struct ServiceState {
      66              :     tenants: BTreeMap<TenantShardId, TenantState>,
      67              : 
      68              :     nodes: Arc<HashMap<NodeId, Node>>,
      69              : 
      70              :     compute_hook: Arc<ComputeHook>,
      71              : 
      72              :     result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
      73              : }
      74              : 
      75              : impl ServiceState {
      76          366 :     fn new(
      77          366 :         config: Config,
      78          366 :         result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
      79          366 :         nodes: HashMap<NodeId, Node>,
      80          366 :         tenants: BTreeMap<TenantShardId, TenantState>,
      81          366 :     ) -> Self {
      82          366 :         Self {
      83          366 :             tenants,
      84          366 :             nodes: Arc::new(nodes),
      85          366 :             compute_hook: Arc::new(ComputeHook::new(config)),
      86          366 :             result_tx,
      87          366 :         }
      88          366 :     }
      89              : }
      90              : 
      91          869 : #[derive(Clone)]
      92              : pub struct Config {
      93              :     // All pageservers managed by one instance of this service must have
      94              :     // the same public key.  This JWT token will be used to authenticate
      95              :     // this service to the pageservers it manages.
      96              :     pub jwt_token: Option<String>,
      97              : 
      98              :     // This JWT token will be used to authenticate this service to the control plane.
      99              :     pub control_plane_jwt_token: Option<String>,
     100              : 
     101              :     /// Where the compute hook should send notifications of pageserver attachment locations
     102              :     /// (this URL points to the control plane in prod). If this is None, the compute hook will
     103              :     /// assume it is running in a test environment and try to update neon_local.
     104              :     pub compute_hook_url: Option<String>,
     105              : }
     106              : 
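// A hedged example (illustrative values only, not part of this file): a Config for a
// local/test deployment, where JWT auth is disabled and compute_hook_url is None so the
// compute hook falls back to updating neon_local, as the field comments above describe.
fn example_local_config() -> Config {
    Config {
        jwt_token: None,
        control_plane_jwt_token: None,
        compute_hook_url: None,
    }
}
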
     107              : impl From<DatabaseError> for ApiError {
     108            0 :     fn from(err: DatabaseError) -> ApiError {
     109            0 :         match err {
     110            0 :             DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
     111              :             // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
     112              :             DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
     113            0 :                 ApiError::ShuttingDown
     114              :             }
     115            0 :             DatabaseError::Logical(reason) => {
     116            0 :                 ApiError::InternalServerError(anyhow::anyhow!(reason))
     117              :             }
     118              :         }
     119            0 :     }
     120              : }
     121              : 
     122              : pub struct Service {
     123              :     inner: Arc<std::sync::RwLock<ServiceState>>,
     124              :     config: Config,
     125              :     persistence: Arc<Persistence>,
     126              : 
     127              :     /// This waits for initial reconciliation with pageservers to complete.  Until this barrier
     128              :     /// passes, it isn't safe to do any actions that mutate tenants.
     129              :     pub(crate) startup_complete: Barrier,
     130              : }
     131              : 
     132              : impl From<ReconcileWaitError> for ApiError {
     133            1 :     fn from(value: ReconcileWaitError) -> Self {
     134            1 :         match value {
     135            0 :             ReconcileWaitError::Shutdown => ApiError::ShuttingDown,
     136            0 :             e @ ReconcileWaitError::Timeout(_) => ApiError::Timeout(format!("{e}").into()),
     137            1 :             e @ ReconcileWaitError::Failed(..) => ApiError::InternalServerError(anyhow::anyhow!(e)),
     138              :         }
     139            1 :     }
     140              : }
     141              : 
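// Because DatabaseError and ReconcileWaitError both convert into ApiError (the two From
// impls above), fallible persistence and reconcile-wait calls can be propagated with `?`
// from functions returning Result<_, ApiError>. A minimal sketch (hypothetical helper,
// assuming the same Persistence API that attach_hook and tenant_create use below):
async fn example_insert_shards(
    service: &Service,
    shards: Vec<TenantShardPersistence>,
) -> Result<(), ApiError> {
    // A DatabaseError here becomes an ApiError automatically via the From impl above.
    service.persistence.insert_tenant_shards(shards).await?;
    Ok(())
}
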
     142              : impl Service {
     143            5 :     pub fn get_config(&self) -> &Config {
     144            5 :         &self.config
     145            5 :     }
     146              : 
      147              :     /// TODO: don't allow other API calls or start any background housekeeping
     148              :     /// until this is done.
     149          366 :     async fn startup_reconcile(&self) {
     150          366 :         // For all tenant shards, a vector of observed states on nodes (where None means
     151          366 :         // indeterminate, same as in [`ObservedStateLocation`])
     152          366 :         let mut observed = HashMap::new();
     153          366 : 
     154          366 :         let mut nodes_online = HashSet::new();
     155          366 : 
     156          366 :         // TODO: give Service a cancellation token for clean shutdown
     157          366 :         let cancel = CancellationToken::new();
     158          366 : 
     159          366 :         // TODO: issue these requests concurrently
     160          366 :         {
     161          366 :             let nodes = {
     162          366 :                 let locked = self.inner.read().unwrap();
     163          366 :                 locked.nodes.clone()
     164              :             };
     165          366 :             for node in nodes.values() {
     166           10 :                 let http_client = reqwest::ClientBuilder::new()
     167           10 :                     .timeout(Duration::from_secs(5))
     168           10 :                     .build()
     169           10 :                     .expect("Failed to construct HTTP client");
     170           10 :                 let client = mgmt_api::Client::from_client(
     171           10 :                     http_client,
     172           10 :                     node.base_url(),
     173           10 :                     self.config.jwt_token.as_deref(),
     174           10 :                 );
     175              : 
     176           12 :                 fn is_fatal(e: &mgmt_api::Error) -> bool {
     177           12 :                     use mgmt_api::Error::*;
     178           12 :                     match e {
     179           12 :                         ReceiveBody(_) | ReceiveErrorBody(_) => false,
     180              :                         ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
     181              :                         | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
     182            0 :                         | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
     183            0 :                         ApiError(_, _) => true,
     184              :                     }
     185           12 :                 }
     186              : 
     187           10 :                 let list_response = backoff::retry(
     188           18 :                     || client.list_location_config(),
     189           10 :                     is_fatal,
     190           10 :                     1,
     191           10 :                     5,
     192           10 :                     "Location config listing",
     193           10 :                     &cancel,
     194           10 :                 )
     195           52 :                 .await;
     196            7 :                 let Some(list_response) = list_response else {
     197            0 :                     tracing::info!("Shutdown during startup_reconcile");
     198            0 :                     return;
     199              :                 };
     200              : 
     201            7 :                 tracing::info!("Scanning shards on node {}...", node.id);
     202            7 :                 match list_response {
     203            1 :                     Err(e) => {
     204            1 :                         tracing::warn!("Could not contact pageserver {} ({e})", node.id);
     205              :                         // TODO: be more tolerant, do some retries, in case
     206              :                         // pageserver is being restarted at the same time as we are
     207              :                     }
     208            6 :                     Ok(listing) => {
     209            6 :                         tracing::info!(
     210            6 :                             "Received {} shard statuses from pageserver {}, setting it to Active",
     211            6 :                             listing.tenant_shards.len(),
     212            6 :                             node.id
     213            6 :                         );
     214            6 :                         nodes_online.insert(node.id);
     215              : 
     216           11 :                         for (tenant_shard_id, conf_opt) in listing.tenant_shards {
     217            5 :                             observed.insert(tenant_shard_id, (node.id, conf_opt));
     218            5 :                         }
     219              :                     }
     220              :                 }
     221              :             }
     222              :         }
     223              : 
     224          363 :         let mut cleanup = Vec::new();
     225          363 : 
     226          363 :         let mut compute_notifications = Vec::new();
     227              : 
     228              :         // Populate intent and observed states for all tenants, based on reported state on pageservers
     229          363 :         let (shard_count, nodes) = {
     230          363 :             let mut locked = self.inner.write().unwrap();
     231          363 : 
     232          363 :             // Mark nodes online if they responded to us: nodes are offline by default after a restart.
     233          363 :             let mut nodes = (*locked.nodes).clone();
     234          363 :             for (node_id, node) in nodes.iter_mut() {
     235            7 :                 if nodes_online.contains(node_id) {
     236            6 :                     node.availability = NodeAvailability::Active;
     237            6 :                 }
     238              :             }
     239          363 :             locked.nodes = Arc::new(nodes);
     240          363 :             let nodes = locked.nodes.clone();
     241              : 
     242          368 :             for (tenant_shard_id, (node_id, observed_loc)) in observed {
     243            5 :                 let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
     244            0 :                     cleanup.push((tenant_shard_id, node_id));
     245            0 :                     continue;
     246              :                 };
     247              : 
     248            5 :                 tenant_state
     249            5 :                     .observed
     250            5 :                     .locations
     251            5 :                     .insert(node_id, ObservedStateLocation { conf: observed_loc });
     252              :             }
     253              : 
     254              :             // Populate each tenant's intent state
     255          363 :             let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
     256          363 :             for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
     257            8 :                 tenant_state.intent_from_observed();
     258            8 :                 if let Err(e) = tenant_state.schedule(&mut scheduler) {
     259              :                     // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
     260              :                     // not enough pageservers are available.  The tenant may well still be available
     261              :                     // to clients.
     262            0 :                     tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
     263              :                 } else {
     264              :                     // If we're both intending and observed to be attached at a particular node, we will
     265              :                     // emit a compute notification for this. In the case where our observed state does not
     266              :                     // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
     267            8 :                     if let Some(attached_at) = tenant_state.stably_attached() {
     268            5 :                         compute_notifications.push((*tenant_shard_id, attached_at));
     269            5 :                     }
     270              :                 }
     271              :             }
     272              : 
     273          363 :             (locked.tenants.len(), nodes)
     274              :         };
     275              : 
     276              :         // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
     277              :         // generation_pageserver in the database.
     278              : 
     279              :         // Clean up any tenants that were found on pageservers but are not known to us.
     280          363 :         for (tenant_shard_id, node_id) in cleanup {
     281              :             // A node reported a tenant_shard_id which is unknown to us: detach it.
     282            0 :             let node = nodes
     283            0 :                 .get(&node_id)
     284            0 :                 .expect("Always exists: only known nodes are scanned");
     285            0 : 
     286            0 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
     287            0 :             match client
     288            0 :                 .location_config(
     289            0 :                     tenant_shard_id,
     290            0 :                     LocationConfig {
     291            0 :                         mode: LocationConfigMode::Detached,
     292            0 :                         generation: None,
     293            0 :                         secondary_conf: None,
     294            0 :                         shard_number: tenant_shard_id.shard_number.0,
     295            0 :                         shard_count: tenant_shard_id.shard_count.0,
     296            0 :                         shard_stripe_size: 0,
     297            0 :                         tenant_conf: models::TenantConfig::default(),
     298            0 :                     },
     299            0 :                     None,
     300            0 :                 )
     301            0 :                 .await
     302              :             {
     303              :                 Ok(()) => {
     304            0 :                     tracing::info!(
     305            0 :                         "Detached unknown shard {tenant_shard_id} on pageserver {node_id}"
     306            0 :                     );
     307              :                 }
     308            0 :                 Err(e) => {
     309              :                     // Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't
     310              :                     // break anything.
     311            0 :                     tracing::error!(
      312            0 :                         "Failed to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}"
     313            0 :                     );
     314              :                 }
     315              :             }
     316              :         }
     317              : 
     318              :         // Emit compute hook notifications for all tenants which are already stably attached.  Other tenants
     319              :         // will emit compute hook notifications when they reconcile.
     320              :         //
     321              :         // Ordering: we must complete these notification attempts before doing any other reconciliation for the
     322              :         // tenants named here, because otherwise our calls to notify() might race with more recent values
     323              :         // generated by reconciliation.
     324              : 
     325              :         // Compute notify is fallible.  If it fails here, do not delay overall startup: set the
     326              :         // flag on these shards that they have a pending notification.
     327          363 :         let compute_hook = self.inner.read().unwrap().compute_hook.clone();
     328          363 : 
     329          363 :         // Construct an async stream of futures to invoke the compute notify function: we do this
     330          363 :         // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
     331          363 :         let stream = futures::stream::iter(compute_notifications.into_iter())
     332          363 :             .map(|(tenant_shard_id, node_id)| {
     333            5 :                 let compute_hook = compute_hook.clone();
     334            5 :                 let cancel = cancel.clone();
     335            5 :                 async move {
     336            5 :                     if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
     337            0 :                         tracing::error!(
     338            0 :                             tenant_shard_id=%tenant_shard_id,
     339            0 :                             node_id=%node_id,
     340            0 :                             "Failed to notify compute on startup for shard: {e}"
     341            0 :                         );
     342            0 :                         Some(tenant_shard_id)
     343              :                     } else {
     344            5 :                         None
     345              :                     }
     346            5 :                 }
     347          363 :             })
     348          363 :             .buffered(compute_hook::API_CONCURRENCY);
     349          363 :         let notify_results = stream.collect::<Vec<_>>().await;
     350              : 
     351              :         // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
     352              :         {
     353          363 :             let mut locked = self.inner.write().unwrap();
     354          363 :             for tenant_shard_id in notify_results.into_iter().flatten() {
     355            0 :                 if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
     356            0 :                     shard.pending_compute_notification = true;
     357            0 :                 }
     358              :             }
     359              :         }
     360              : 
     361              :         // Finally, now that the service is up and running, launch reconcile operations for any tenants
     362              :         // which require it: under normal circumstances this should only include tenants that were in some
     363              :         // transient state before we restarted, or any tenants whose compute hooks failed above.
     364          363 :         let reconcile_tasks = self.reconcile_all();
     365              :         // We will not wait for these reconciliation tasks to run here: we're now done with startup and
     366              :         // normal operations may proceed.
     367              : 
     368          363 :         tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
     369          363 :     }
     370              : 
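    // A minimal, self-contained sketch of the bounded-parallelism pattern used by
    // startup_reconcile above (a stream of futures drained with .buffered()). The names
    // example_notify_one / example_notify_all_bounded / MAX_CONCURRENCY are illustrative
    // only; the real code drives ComputeHook::notify with compute_hook::API_CONCURRENCY.
    async fn example_notify_one(id: u64) -> anyhow::Result<()> {
        // Stub standing in for a fallible notification call.
        let _ = id;
        Ok(())
    }

    async fn example_notify_all_bounded(ids: Vec<u64>) -> Vec<Option<u64>> {
        const MAX_CONCURRENCY: usize = 8;
        futures::stream::iter(ids.into_iter())
            // Map each input to a future; nothing runs until the stream is polled.
            .map(|id| async move {
                // Report Some(id) on failure so the caller can flag it for a later retry.
                if Self::example_notify_one(id).await.is_err() {
                    Some(id)
                } else {
                    None
                }
            })
            // Poll at most MAX_CONCURRENCY of these futures concurrently.
            .buffered(MAX_CONCURRENCY)
            .collect::<Vec<_>>()
            .await
    }
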
     371          366 :     pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
     372          366 :         let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
     373              : 
     374          366 :         tracing::info!("Loading nodes from database...");
     375          366 :         let nodes = persistence.list_nodes().await?;
     376          366 :         let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
     377          366 :         tracing::info!("Loaded {} nodes from database.", nodes.len());
     378              : 
     379          366 :         tracing::info!("Loading shards from database...");
     380          728 :         let tenant_shard_persistence = persistence.list_tenant_shards().await?;
     381          366 :         tracing::info!(
     382          366 :             "Loaded {} shards from database.",
     383          366 :             tenant_shard_persistence.len()
     384          366 :         );
     385              : 
     386          366 :         let mut tenants = BTreeMap::new();
     387              : 
     388          378 :         for tsp in tenant_shard_persistence {
     389           12 :             let tenant_shard_id = TenantShardId {
     390           12 :                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
     391           12 :                 shard_number: ShardNumber(tsp.shard_number as u8),
     392           12 :                 shard_count: ShardCount(tsp.shard_count as u8),
     393              :             };
     394           12 :             let shard_identity = if tsp.shard_count == 0 {
     395           12 :                 ShardIdentity::unsharded()
     396              :             } else {
     397            0 :                 ShardIdentity::new(
     398            0 :                     ShardNumber(tsp.shard_number as u8),
     399            0 :                     ShardCount(tsp.shard_count as u8),
     400            0 :                     ShardStripeSize(tsp.shard_stripe_size as u32),
     401            0 :                 )?
     402              :             };
     403              : 
     404              :             // We will populate intent properly later in [`Self::startup_reconcile`], initially populate
     405              :             // it with what we can infer: the node for which a generation was most recently issued.
     406           12 :             let mut intent = IntentState::new();
     407           12 :             if tsp.generation_pageserver != i64::MAX {
     408           11 :                 intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
     409            1 :             }
     410              : 
     411           12 :             let new_tenant = TenantState {
     412           12 :                 tenant_shard_id,
     413           12 :                 shard: shard_identity,
     414           12 :                 sequence: Sequence::initial(),
     415           12 :                 generation: Generation::new(tsp.generation as u32),
     416           12 :                 policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
     417           12 :                 intent,
     418           12 :                 observed: ObservedState::new(),
     419           12 :                 config: serde_json::from_str(&tsp.config).unwrap(),
     420           12 :                 reconciler: None,
     421           12 :                 waiter: Arc::new(SeqWait::new(Sequence::initial())),
     422           12 :                 error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
     423           12 :                 last_error: Arc::default(),
     424           12 :                 pending_compute_notification: false,
     425           12 :             };
     426           12 : 
     427           12 :             tenants.insert(tenant_shard_id, new_tenant);
     428              :         }
     429              : 
     430          366 :         let (startup_completion, startup_complete) = utils::completion::channel();
     431          366 : 
     432          366 :         let this = Arc::new(Self {
     433          366 :             inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
     434          366 :                 config.clone(),
     435          366 :                 result_tx,
     436          366 :                 nodes,
     437          366 :                 tenants,
     438          366 :             ))),
     439          366 :             config,
     440          366 :             persistence,
     441          366 :             startup_complete: startup_complete.clone(),
     442          366 :         });
     443          366 : 
     444          366 :         let result_task_this = this.clone();
     445          366 :         tokio::task::spawn(async move {
     446          867 :             while let Some(result) = result_rx.recv().await {
     447          501 :                 tracing::info!(
     448          501 :                     "Reconcile result for sequence {}, ok={}",
     449          501 :                     result.sequence,
     450          501 :                     result.result.is_ok()
     451          501 :                 );
     452          501 :                 let mut locked = result_task_this.inner.write().unwrap();
     453          501 :                 let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
     454              :                     // A reconciliation result might race with removing a tenant: drop results for
     455              :                     // tenants that aren't in our map.
     456            0 :                     continue;
     457              :                 };
     458              : 
     459              :                 // Usually generation should only be updated via this path, so the max() isn't
      460              :                 // needed, but it is used to handle out-of-band updates, e.g. via the test hook.
     461          501 :                 tenant.generation = std::cmp::max(tenant.generation, result.generation);
     462          501 : 
     463          501 :                 // If the reconciler signals that it failed to notify compute, set this state on
     464          501 :                 // the shard so that a future [`TenantState::maybe_reconcile`] will try again.
     465          501 :                 tenant.pending_compute_notification = result.pending_compute_notification;
     466          501 : 
     467          501 :                 match result.result {
     468              :                     Ok(()) => {
     469         1009 :                         for (node_id, loc) in &result.observed.locations {
     470          509 :                             if let Some(conf) = &loc.conf {
     471          509 :                                 tracing::info!(
     472          509 :                                     "Updating observed location {}: {:?}",
     473          509 :                                     node_id,
     474          509 :                                     conf
     475          509 :                                 );
     476              :                             } else {
     477            0 :                                 tracing::info!("Setting observed location {} to None", node_id,)
     478              :                             }
     479              :                         }
     480          500 :                         tenant.observed = result.observed;
     481          500 :                         tenant.waiter.advance(result.sequence);
     482              :                     }
     483            1 :                     Err(e) => {
     484            1 :                         tracing::warn!(
     485            1 :                             "Reconcile error on tenant {}: {}",
     486            1 :                             tenant.tenant_shard_id,
     487            1 :                             e
     488            1 :                         );
     489              : 
     490              :                         // Ordering: populate last_error before advancing error_seq,
     491              :                         // so that waiters will see the correct error after waiting.
     492            1 :                         *(tenant.last_error.lock().unwrap()) = format!("{e}");
     493            1 :                         tenant.error_waiter.advance(result.sequence);
     494              : 
     495            2 :                         for (node_id, o) in result.observed.locations {
     496            1 :                             tenant.observed.locations.insert(node_id, o);
     497            1 :                         }
     498              :                     }
     499              :                 }
     500              :             }
     501          366 :         });
     502          366 : 
     503          366 :         let startup_reconcile_this = this.clone();
     504          366 :         tokio::task::spawn(async move {
     505          366 :             // Block the [`Service::startup_complete`] barrier until we're done
     506          366 :             let _completion = startup_completion;
     507          366 : 
     508          366 :             startup_reconcile_this.startup_reconcile().await
     509          366 :         });
     510          366 : 
     511          366 :         Ok(this)
     512          366 :     }
     513              : 
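    // A hedged usage sketch (the surrounding setup is an assumption, not shown in this
    // file): the HTTP layer typically holds the Arc<Service> returned by spawn() and uses
    // the startup_complete barrier to gate mutating endpoints until startup_reconcile is done.
    async fn example_startup(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
        let service = Self::spawn(config, persistence).await?;
        // `service.startup_complete` can be cloned into request handlers that must wait
        // for the initial reconcile before mutating tenants.
        Ok(service)
    }
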
     514          207 :     pub(crate) async fn attach_hook(
     515          207 :         &self,
     516          207 :         attach_req: AttachHookRequest,
     517          207 :     ) -> anyhow::Result<AttachHookResponse> {
     518          207 :         // This is a test hook.  To enable using it on tenants that were created directly with
     519          207 :         // the pageserver API (not via this service), we will auto-create any missing tenant
     520          207 :         // shards with default state.
     521          207 :         let insert = {
     522          207 :             let locked = self.inner.write().unwrap();
     523          207 :             !locked.tenants.contains_key(&attach_req.tenant_shard_id)
     524          207 :         };
     525          207 :         if insert {
     526           15 :             let tsp = TenantShardPersistence {
     527           15 :                 tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
     528           15 :                 shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
     529           15 :                 shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
     530           15 :                 shard_stripe_size: 0,
     531           15 :                 generation: 0,
     532           15 :                 generation_pageserver: i64::MAX,
     533           15 :                 placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
     534           15 :                 config: serde_json::to_string(&TenantConfig::default()).unwrap(),
     535           15 :                 splitting: SplitState::default(),
     536           15 :             };
     537           15 : 
     538           16 :             match self.persistence.insert_tenant_shards(vec![tsp]).await {
     539            2 :                 Err(e) => match e {
     540              :                     DatabaseError::Query(diesel::result::Error::DatabaseError(
     541              :                         DatabaseErrorKind::UniqueViolation,
     542              :                         _,
     543              :                     )) => {
     544            2 :                         tracing::info!(
     545            2 :                             "Raced with another request to insert tenant {}",
     546            2 :                             attach_req.tenant_shard_id
     547            2 :                         )
     548              :                     }
     549            0 :                     _ => return Err(e.into()),
     550              :                 },
     551              :                 Ok(()) => {
     552           13 :                     tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
     553              : 
     554           13 :                     let mut locked = self.inner.write().unwrap();
     555           13 :                     locked.tenants.insert(
     556           13 :                         attach_req.tenant_shard_id,
     557           13 :                         TenantState::new(
     558           13 :                             attach_req.tenant_shard_id,
     559           13 :                             ShardIdentity::unsharded(),
     560           13 :                             PlacementPolicy::Single,
     561           13 :                         ),
     562           13 :                     );
     563           13 :                     tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
     564              :                 }
     565              :             }
     566          192 :         }
     567              : 
     568          207 :         let new_generation = if let Some(req_node_id) = attach_req.node_id {
     569              :             Some(
     570          200 :                 self.persistence
     571          200 :                     .increment_generation(attach_req.tenant_shard_id, req_node_id)
     572          216 :                     .await?,
     573              :             )
     574              :         } else {
     575            7 :             self.persistence.detach(attach_req.tenant_shard_id).await?;
     576            7 :             None
     577              :         };
     578              : 
     579          207 :         let mut locked = self.inner.write().unwrap();
     580          207 :         let tenant_state = locked
     581          207 :             .tenants
     582          207 :             .get_mut(&attach_req.tenant_shard_id)
     583          207 :             .expect("Checked for existence above");
     584              : 
     585          207 :         if let Some(new_generation) = new_generation {
     586          200 :             tenant_state.generation = new_generation;
     587          200 :         } else {
     588              :             // This is a detach notification.  We must update placement policy to avoid re-attaching
     589              :             // during background scheduling/reconciliation, or during attachment service restart.
     590            7 :             assert!(attach_req.node_id.is_none());
     591            7 :             tenant_state.policy = PlacementPolicy::Detached;
     592              :         }
     593              : 
     594          207 :         if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
     595          200 :             tracing::info!(
     596          200 :                 tenant_id = %attach_req.tenant_shard_id,
     597          200 :                 ps_id = %attaching_pageserver,
     598          200 :                 generation = ?tenant_state.generation,
     599          200 :                 "issuing",
     600          200 :             );
     601            7 :         } else if let Some(ps_id) = tenant_state.intent.attached {
     602            7 :             tracing::info!(
     603            7 :                 tenant_id = %attach_req.tenant_shard_id,
     604            7 :                 %ps_id,
     605            7 :                 generation = ?tenant_state.generation,
     606            7 :                 "dropping",
     607            7 :             );
     608              :         } else {
     609            0 :             tracing::info!(
     610            0 :             tenant_id = %attach_req.tenant_shard_id,
     611            0 :             "no-op: tenant already has no pageserver");
     612              :         }
     613          207 :         tenant_state.intent.attached = attach_req.node_id;
     614              : 
     615          207 :         tracing::info!(
     616          207 :             "attach_hook: tenant {} set generation {:?}, pageserver {}",
     617          207 :             attach_req.tenant_shard_id,
     618          207 :             tenant_state.generation,
     619          207 :             // TODO: this is an odd number of 0xf's
     620          207 :             attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
     621          207 :         );
     622              : 
     623          207 :         Ok(AttachHookResponse {
     624          207 :             gen: attach_req
     625          207 :                 .node_id
     626          207 :                 .map(|_| tenant_state.generation.into().unwrap()),
     627          207 :         })
     628          207 :     }
     629              : 
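    // A minimal sketch of how this test hook is meant to be exercised, based only on the
    // logic above (and assuming AttachHookRequest carries just the two fields used there):
    // attaching with a node id yields a freshly incremented generation, while node_id: None
    // records a detach and yields no generation.
    async fn example_attach_then_detach(
        &self,
        shard: TenantShardId,
        node: NodeId,
    ) -> anyhow::Result<()> {
        let attached = self
            .attach_hook(AttachHookRequest { tenant_shard_id: shard, node_id: Some(node) })
            .await?;
        assert!(attached.gen.is_some());

        let detached = self
            .attach_hook(AttachHookRequest { tenant_shard_id: shard, node_id: None })
            .await?;
        assert!(detached.gen.is_none());
        Ok(())
    }
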
     630           73 :     pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
     631           73 :         let locked = self.inner.read().unwrap();
     632           73 : 
     633           73 :         let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
     634           73 : 
     635           73 :         InspectResponse {
     636           73 :             attachment: tenant_state.and_then(|s| {
     637           73 :                 s.intent
     638           73 :                     .attached
     639           73 :                     .map(|ps| (s.generation.into().unwrap(), ps))
     640           73 :             }),
     641           73 :         }
     642           73 :     }
     643              : 
     644          624 :     pub(crate) async fn re_attach(
     645          624 :         &self,
     646          624 :         reattach_req: ReAttachRequest,
     647          624 :     ) -> anyhow::Result<ReAttachResponse> {
     648              :         // Ordering: we must persist generation number updates before making them visible in the in-memory state
     649          624 :         let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
     650              : 
     651              :         // Apply the updated generation to our in-memory state
     652          624 :         let mut locked = self.inner.write().unwrap();
     653          624 : 
     654          624 :         let mut response = ReAttachResponse {
     655          624 :             tenants: Vec::new(),
     656          624 :         };
     657              : 
     658          863 :         for (tenant_shard_id, new_gen) in incremented_generations {
     659          239 :             response.tenants.push(ReAttachResponseTenant {
     660          239 :                 id: tenant_shard_id,
     661          239 :                 gen: new_gen.into().unwrap(),
     662          239 :             });
     663          239 : 
     664          239 :             // Apply the new generation number to our in-memory state
     665          239 :             let shard_state = locked.tenants.get_mut(&tenant_shard_id);
     666          239 :             let Some(shard_state) = shard_state else {
     667              :                 // Not fatal.  This edge case requires a re-attach to happen
     668              :                 // between inserting a new tenant shard in to the database, and updating our in-memory
     669              :                 // state to know about the shard, _and_ that the state inserted to the database referenced
     670              :                 // a pageserver.  Should never happen, but handle it rather than panicking, since it should
     671              :                 // be harmless.
     672            0 :                 tracing::error!(
     673            0 :                     "Shard {} is in database for node {} but not in-memory state",
     674            0 :                     tenant_shard_id,
     675            0 :                     reattach_req.node_id
     676            0 :                 );
     677            0 :                 continue;
     678              :             };
     679              : 
     680          239 :             shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
     681              : 
     682              :             // TODO: cancel/restart any running reconciliation for this tenant, it might be trying
     683              :             // to call location_conf API with an old generation.  Wait for cancellation to complete
     684              :             // before responding to this request.  Requires well implemented CancellationToken logic
     685              :             // all the way to where we call location_conf.  Even then, there can still be a location_conf
     686              :             // request in flight over the network: TODO handle that by making location_conf API refuse
     687              :             // to go backward in generations.
     688              :         }
     689          624 :         Ok(response)
     690          624 :     }
     691              : 
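    // A small restatement of the monotonicity rule applied above (illustrative helper,
    // not called anywhere): generations are persisted before they become visible here,
    // so the in-memory value may lag the database but must never move backwards.
    fn example_apply_reattached_generation(in_memory: Generation, from_db: Generation) -> Generation {
        std::cmp::max(in_memory, from_db)
    }
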
     692          441 :     pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
     693          441 :         let locked = self.inner.read().unwrap();
     694          441 : 
     695          441 :         let mut response = ValidateResponse {
     696          441 :             tenants: Vec::new(),
     697          441 :         };
     698              : 
     699          970 :         for req_tenant in validate_req.tenants {
     700          529 :             if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
     701          528 :                 let valid = tenant_state.generation == Generation::new(req_tenant.gen);
     702          528 :                 tracing::info!(
     703          528 :                     "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
     704          528 :                     req_tenant.id,
     705          528 :                     req_tenant.gen,
     706          528 :                     tenant_state.generation
     707          528 :                 );
     708          528 :                 response.tenants.push(ValidateResponseTenant {
     709          528 :                     id: req_tenant.id,
     710          528 :                     valid,
     711          528 :                 });
     712            1 :             } else {
     713            1 :                 // After tenant deletion, we may approve any validation.  This avoids
     714            1 :                 // spurious warnings on the pageserver if it has pending LSN updates
     715            1 :                 // at the point a deletion happens.
     716            1 :                 response.tenants.push(ValidateResponseTenant {
     717            1 :                     id: req_tenant.id,
     718            1 :                     valid: true,
     719            1 :                 });
     720            1 :             }
     721              :         }
     722          441 :         response
     723          441 :     }
     724              : 
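    // A compact restatement of the decision rule above (hypothetical helper, not part of
    // this file): validation requires an exact generation match, except that shards we no
    // longer know about are approved, to avoid spurious warnings after tenant deletion.
    fn example_is_valid_generation(known: Option<Generation>, requested: u32) -> bool {
        match known {
            Some(current) => current == Generation::new(requested),
            None => true,
        }
    }
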
     725          464 :     pub(crate) async fn tenant_create(
     726          464 :         &self,
     727          464 :         create_req: TenantCreateRequest,
     728          464 :     ) -> Result<TenantCreateResponse, ApiError> {
     729              :         // Shard count 0 is valid: it means create a single shard (ShardCount(0) means "unsharded")
     730          464 :         let literal_shard_count = if create_req.shard_parameters.is_unsharded() {
     731          446 :             1
     732              :         } else {
     733           18 :             create_req.shard_parameters.count.0
     734              :         };
     735              : 
     736              :         // This service expects to handle sharding itself: it is an error to try and directly create
     737              :         // a particular shard here.
     738          464 :         let tenant_id = if create_req.new_tenant_id.shard_count > ShardCount(1) {
     739            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
     740            0 :                 "Attempted to create a specific shard, this API is for creating the whole tenant"
     741            0 :             )));
     742              :         } else {
     743          464 :             create_req.new_tenant_id.tenant_id
     744              :         };
     745              : 
     746          464 :         tracing::info!(
     747          464 :             "Creating tenant {}, shard_count={:?}",
     748          464 :             create_req.new_tenant_id,
     749          464 :             create_req.shard_parameters.count,
     750          464 :         );
     751              : 
     752          464 :         let create_ids = (0..literal_shard_count)
     753          492 :             .map(|i| TenantShardId {
     754          492 :                 tenant_id,
     755          492 :                 shard_number: ShardNumber(i),
     756          492 :                 shard_count: create_req.shard_parameters.count,
     757          492 :             })
     758          464 :             .collect::<Vec<_>>();
     759          464 : 
     760          464 :         // TODO: enable specifying this.  Using Single as a default helps legacy tests to work (they
     761          464 :         // have no expectation of HA).
     762          464 :         let placement_policy: PlacementPolicy = PlacementPolicy::Single;
     763          464 : 
     764          464 :         // Ordering: we persist tenant shards before creating them on the pageserver.  This enables a caller
     765          464 :         // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
     766          464 :         // during the creation, rather than risking leaving orphan objects in S3.
     767          464 :         let persist_tenant_shards = create_ids
     768          464 :             .iter()
     769          492 :             .map(|tenant_shard_id| TenantShardPersistence {
     770          492 :                 tenant_id: tenant_shard_id.tenant_id.to_string(),
     771          492 :                 shard_number: tenant_shard_id.shard_number.0 as i32,
     772          492 :                 shard_count: tenant_shard_id.shard_count.0 as i32,
     773          492 :                 shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
     774          492 :                 generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
     775          492 :                 generation_pageserver: i64::MAX,
     776          492 :                 placement_policy: serde_json::to_string(&placement_policy).unwrap(),
     777          492 :                 config: serde_json::to_string(&create_req.config).unwrap(),
     778          492 :                 splitting: SplitState::default(),
     779          492 :             })
     780          464 :             .collect();
     781          464 :         self.persistence
     782          464 :             .insert_tenant_shards(persist_tenant_shards)
     783          464 :             .await
     784          464 :             .map_err(|e| {
     785            0 :                 // TODO: distinguish primary key constraint (idempotent, OK), from other errors
     786            0 :                 ApiError::InternalServerError(anyhow::anyhow!(e))
     787          464 :             })?;
     788              : 
     789          464 :         let (waiters, response_shards) = {
     790          464 :             let mut locked = self.inner.write().unwrap();
     791          464 : 
     792          464 :             let mut response_shards = Vec::new();
     793          464 : 
     794          464 :             let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
     795              : 
     796          956 :             for tenant_shard_id in create_ids {
     797          492 :                 tracing::info!("Creating shard {tenant_shard_id}...");
     798              : 
     799              :                 use std::collections::btree_map::Entry;
     800          492 :                 match locked.tenants.entry(tenant_shard_id) {
     801            0 :                     Entry::Occupied(mut entry) => {
     802            0 :                         tracing::info!(
     803            0 :                             "Tenant shard {tenant_shard_id} already exists while creating"
     804            0 :                         );
     805              : 
     806              :                         // TODO: schedule() should take an anti-affinity expression that pushes
                        807              :                         // attached and secondary locations (independently) away from those
     808              :                         // pageservers also holding a shard for this tenant.
     809              : 
     810            0 :                         entry.get_mut().schedule(&mut scheduler).map_err(|e| {
     811            0 :                             ApiError::Conflict(format!(
     812            0 :                                 "Failed to schedule shard {tenant_shard_id}: {e}"
     813            0 :                             ))
     814            0 :                         })?;
     815              : 
     816            0 :                         response_shards.push(TenantCreateResponseShard {
     817            0 :                             shard_id: tenant_shard_id,
     818            0 :                             node_id: entry
     819            0 :                                 .get()
     820            0 :                                 .intent
     821            0 :                                 .attached
     822            0 :                                 .expect("We just set pageserver if it was None"),
     823            0 :                             generation: entry.get().generation.into().unwrap(),
     824            0 :                         });
     825            0 : 
     826            0 :                         continue;
     827              :                     }
     828          492 :                     Entry::Vacant(entry) => {
     829          492 :                         let mut state = TenantState::new(
     830          492 :                             tenant_shard_id,
     831          492 :                             ShardIdentity::from_params(
     832          492 :                                 tenant_shard_id.shard_number,
     833          492 :                                 &create_req.shard_parameters,
     834          492 :                             ),
     835          492 :                             placement_policy.clone(),
     836          492 :                         );
     837              : 
     838          492 :                         if let Some(create_gen) = create_req.generation {
     839            1 :                             state.generation = Generation::new(create_gen);
     840          491 :                         }
     841          492 :                         state.config = create_req.config.clone();
     842          492 : 
     843          492 :                         state.schedule(&mut scheduler).map_err(|e| {
     844            0 :                             ApiError::Conflict(format!(
     845            0 :                                 "Failed to schedule shard {tenant_shard_id}: {e}"
     846            0 :                             ))
     847          492 :                         })?;
     848              : 
     849          492 :                         response_shards.push(TenantCreateResponseShard {
     850          492 :                             shard_id: tenant_shard_id,
     851          492 :                             node_id: state
     852          492 :                                 .intent
     853          492 :                                 .attached
     854          492 :                                 .expect("We just set pageserver if it was None"),
     855          492 :                             generation: state.generation.into().unwrap(),
     856          492 :                         });
     857          492 :                         entry.insert(state)
     858              :                     }
     859              :                 };
     860              :             }
     861              : 
     862              :             // Take a snapshot of pageservers
     863          464 :             let pageservers = locked.nodes.clone();
     864          464 : 
     865          464 :             let result_tx = locked.result_tx.clone();
     866          464 :             let compute_hook = locked.compute_hook.clone();
     867          464 : 
     868          464 :             let waiters = locked
     869          464 :                 .tenants
     870          464 :                 .range_mut(TenantShardId::tenant_range(tenant_id))
     871          492 :                 .filter_map(|(_shard_id, shard)| {
     872          492 :                     shard.maybe_reconcile(
     873          492 :                         result_tx.clone(),
     874          492 :                         &pageservers,
     875          492 :                         &compute_hook,
     876          492 :                         &self.config,
     877          492 :                         &self.persistence,
     878          492 :                     )
     879          492 :                 })
     880          464 :                 .collect::<Vec<_>>();
     881          464 :             (waiters, response_shards)
     882          464 :         };
     883          464 : 
     884          473 :         self.await_waiters(waiters).await?;
     885              : 
     886          463 :         Ok(TenantCreateResponse {
     887          463 :             shards: response_shards,
     888          463 :         })
     889          464 :     }
     890              : 
     891              :     /// Helper for functions that reconcile a number of shards and want a timeout-bounded
     892              :     /// wait for reconciliation to complete before responding.
     893          465 :     async fn await_waiters(
     894          465 :         &self,
     895          465 :         waiters: Vec<ReconcilerWaiter>,
     896          465 :     ) -> Result<(), ReconcileWaitError> {
     897          465 :         let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
     898          956 :         for waiter in waiters {
     899          492 :             let timeout = deadline.duration_since(Instant::now());
     900          492 :             waiter.wait_timeout(timeout).await?;
     901              :         }
     902              : 
     903          464 :         Ok(())
     904          465 :     }
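A minimal standalone sketch of the shared-deadline pattern used by await_waiters (hypothetical helper, not part of this service): every waiter is polled with only the time remaining until one common 30-second deadline, so the total wall-clock wait stays bounded no matter how many shards are reconciling.

use std::time::{Duration, Instant};

fn remaining_budget(deadline: Instant) -> Duration {
    // Instant::duration_since saturates to zero once the deadline has passed,
    // so waiters that start late are polled with a zero timeout rather than panicking.
    deadline.duration_since(Instant::now())
}

fn main() {
    let deadline = Instant::now() + Duration::from_secs(30);
    for i in 0..3 {
        // Each successive waiter sees a smaller remaining budget.
        println!("waiter {i}: up to {:?} left", remaining_budget(deadline));
    }
}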
     905              : 
     906              :     /// This API is used by the cloud control plane to do coarse-grained control of tenants:
     907              :     /// - Call with mode Attached* to upsert the tenant.
     908              :     /// - Call with mode Detached to switch to PlacementPolicy::Detached
     909              :     ///
     910              :     /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
     911              :     /// secondary locations.
     912            2 :     pub(crate) async fn tenant_location_config(
     913            2 :         &self,
     914            2 :         tenant_id: TenantId,
     915            2 :         req: TenantLocationConfigRequest,
     916            2 :     ) -> Result<TenantLocationConfigResponse, ApiError> {
     917            2 :         if req.tenant_id.shard_count.0 > 1 {
     918            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
     919            0 :                 "This API is for importing single-sharded or unsharded tenants"
     920            0 :             )));
     921            2 :         }
     922            2 : 
     923            2 :         let mut waiters = Vec::new();
     924            2 :         let mut result = TenantLocationConfigResponse { shards: Vec::new() };
     925            2 :         let maybe_create = {
     926            2 :             let mut locked = self.inner.write().unwrap();
     927            2 :             let result_tx = locked.result_tx.clone();
     928            2 :             let compute_hook = locked.compute_hook.clone();
     929            2 :             let pageservers = locked.nodes.clone();
     930            2 : 
     931            2 :             let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
     932            2 : 
     933            2 :             // Maybe we have existing shards
     934            2 :             let mut create = true;
     935            2 :             for (shard_id, shard) in locked
     936            2 :                 .tenants
     937            2 :                 .range_mut(TenantShardId::tenant_range(tenant_id))
     938              :             {
     939              :                 // Saw an existing shard: this is not a creation
     940            1 :                 create = false;
     941            1 : 
     942            1 :                 // Note that for existing tenants we do _not_ respect the generation in the request: this is likely
     943            1 :                 // to be stale.  Once a tenant is created in this service, our view of generation is authoritative, and
     944            1 :                 // callers' generations may be ignored.  This represents a one-way migration of tenants from the outer
     945            1 :                 // cloud control plane into this service.
     946            1 : 
     947            1 :                 // Use location config mode as an indicator of policy: if they ask for
     948            1 :                 // attached we go to default HA attached mode.  If they ask for secondary
     949            1 :                 // we go to secondary-only mode.  If they ask for detached we detach.
     950            1 :                 match req.config.mode {
     951            0 :                     LocationConfigMode::Detached => {
     952            0 :                         shard.policy = PlacementPolicy::Detached;
     953            0 :                     }
     954              :                     LocationConfigMode::Secondary => {
     955              :                         // TODO: implement secondary-only mode.
     956            0 :                         todo!();
     957              :                     }
     958              :                     LocationConfigMode::AttachedMulti
     959              :                     | LocationConfigMode::AttachedSingle
     960              :                     | LocationConfigMode::AttachedStale => {
     961              :                         // TODO: persistence for changes in policy
     962            1 :                         if pageservers.len() > 1 {
     963            0 :                             shard.policy = PlacementPolicy::Double(1)
     964              :                         } else {
     965              :                             // Convenience for dev/test: if we just have one pageserver, import
     966              :                             // tenants into Single mode so that scheduling will succeed.
     967            1 :                             shard.policy = PlacementPolicy::Single
     968              :                         }
     969              :                     }
     970              :                 }
     971              : 
     972            1 :                 shard.schedule(&mut scheduler)?;
     973              : 
     974            1 :                 let maybe_waiter = shard.maybe_reconcile(
     975            1 :                     result_tx.clone(),
     976            1 :                     &pageservers,
     977            1 :                     &compute_hook,
     978            1 :                     &self.config,
     979            1 :                     &self.persistence,
     980            1 :                 );
     981            1 :                 if let Some(waiter) = maybe_waiter {
     982            0 :                     waiters.push(waiter);
     983            1 :                 }
     984              : 
     985            1 :                 if let Some(node_id) = shard.intent.attached {
     986            1 :                     result.shards.push(TenantShardLocation {
     987            1 :                         shard_id: *shard_id,
     988            1 :                         node_id,
     989            1 :                     })
     990            0 :                 }
     991              :             }
     992              : 
     993            2 :             if create {
     994              :                 // Validate request mode
     995            1 :                 match req.config.mode {
     996              :                     LocationConfigMode::Detached | LocationConfigMode::Secondary => {
     997              :                         // When using this API to onboard an existing tenant to this service, it must start in
     998              :                         // an attached state, because we need the request to come with a generation
     999            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
    1000            0 :                             "Imported tenant must be in attached mode"
    1001            0 :                         )));
    1002              :                     }
    1003              : 
    1004              :                     LocationConfigMode::AttachedMulti
    1005              :                     | LocationConfigMode::AttachedSingle
    1006            1 :                     | LocationConfigMode::AttachedStale => {
    1007            1 :                         // Pass
    1008            1 :                     }
    1009              :                 }
    1010              : 
    1011              :                 // Validate request generation
    1012            1 :                 let Some(generation) = req.config.generation else {
    1013              :                     // We can only import attached tenants, because we need the request to come with a generation
    1014            0 :                     return Err(ApiError::BadRequest(anyhow::anyhow!(
    1015            0 :                         "Generation is mandatory when importing tenant"
    1016            0 :                     )));
    1017              :                 };
    1018              : 
    1019              :                 // Synthesize a creation request
    1020            1 :                 Some(TenantCreateRequest {
    1021            1 :                     new_tenant_id: TenantShardId::unsharded(tenant_id),
    1022            1 :                     generation: Some(generation),
    1023            1 :                     shard_parameters: ShardParameters {
    1024            1 :                         // Must preserve the incoming shard_count to distinguish unsharded (0)
    1025            1 :                         // from single-sharded (1): this distinction appears in the S3 keys of the tenant.
    1026            1 :                         count: req.tenant_id.shard_count,
    1027            1 :                         // We only import un-sharded or single-sharded tenants, so stripe
    1028            1 :                         // size can be made up arbitrarily here.
    1029            1 :                         stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
    1030            1 :                     },
    1031            1 :                     config: req.config.tenant_conf,
    1032            1 :                 })
    1033              :             } else {
    1034            1 :                 None
    1035              :             }
    1036              :         };
    1037              : 
    1038              :         // TODO: if we timeout/fail on reconcile, we should still succeed this request,
    1039              :         // because otherwise a broken compute hook causes a feedback loop where
    1040              :         // location_config returns 500 and gets retried forever.
    1041              : 
    1042            2 :         if let Some(create_req) = maybe_create {
    1043            2 :             let create_resp = self.tenant_create(create_req).await?;
    1044            1 :             result.shards = create_resp
    1045            1 :                 .shards
    1046            1 :                 .into_iter()
    1047            1 :                 .map(|s| TenantShardLocation {
    1048            1 :                     node_id: s.node_id,
    1049            1 :                     shard_id: s.shard_id,
    1050            1 :                 })
    1051            1 :                 .collect();
    1052              :         } else {
    1053              :             // This was an update, wait for reconciliation
    1054            1 :             if let Err(e) = self.await_waiters(waiters).await {
    1055              :                 // Do not treat a reconcile error as fatal: we have already applied any requested
    1056              :                 // Intent changes, and the reconcile can fail for external reasons like unavailable
    1057              :                 // compute notification API.  In these cases, it is important that we do not
    1058              :                 // cause the cloud control plane to retry forever on this API.
    1059            0 :                 tracing::warn!(
    1060            0 :                     "Failed to reconcile after /location_config: {e}, returning success anyway"
    1061            0 :                 );
    1062            1 :             }
    1063              :         }
    1064              : 
    1065            2 :         Ok(result)
    1066            2 :     }
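An illustrative sketch of how the update path above maps the requested location mode onto a placement policy; the enums below are local stand-ins, not the real pageserver_api or attachment_service types: Detached detaches, Secondary is still unimplemented, and any Attached* mode becomes Double(1) when more than one pageserver is registered, else Single.

#[derive(Debug, PartialEq)]
enum Mode {
    Detached,
    Secondary,
    Attached, // stands in for AttachedSingle / AttachedMulti / AttachedStale
}

#[derive(Debug, PartialEq)]
enum Policy {
    Detached,
    Single,
    Double(usize),
}

fn policy_for(mode: Mode, pageserver_count: usize) -> Option<Policy> {
    match mode {
        Mode::Detached => Some(Policy::Detached),
        // Secondary-only placement is still a todo!() in the service.
        Mode::Secondary => None,
        Mode::Attached => Some(if pageserver_count > 1 {
            Policy::Double(1) // one attached location plus one secondary
        } else {
            Policy::Single // dev/test convenience with a single pageserver
        }),
    }
}

fn main() {
    assert_eq!(policy_for(Mode::Attached, 3), Some(Policy::Double(1)));
    assert_eq!(policy_for(Mode::Attached, 1), Some(Policy::Single));
    assert_eq!(policy_for(Mode::Detached, 3), Some(Policy::Detached));
}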
    1067              : 
    1068           12 :     pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
    1069              :         // TODO: refactor into helper
    1070           12 :         let targets = {
    1071           12 :             let locked = self.inner.read().unwrap();
    1072           12 :             let mut targets = Vec::new();
    1073              : 
    1074           24 :             for (tenant_shard_id, shard) in
    1075           12 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1076           24 :             {
    1077           24 :                 let node_id = shard.intent.attached.ok_or_else(|| {
    1078            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1079           24 :                 })?;
    1080           24 :                 let node = locked
    1081           24 :                     .nodes
    1082           24 :                     .get(&node_id)
    1083           24 :                     .expect("Pageservers may not be deleted while referenced");
    1084           24 : 
    1085           24 :                 targets.push((*tenant_shard_id, node.clone()));
    1086              :             }
    1087           12 :             targets
    1088           12 :         };
    1089           12 : 
    1090           12 :         // TODO: error out if the tenant is not attached anywhere.
    1091           12 : 
    1092           12 :         // Phase 1: delete on the pageservers
    1093           12 :         let mut any_pending = false;
    1094           36 :         for (tenant_shard_id, node) in targets {
    1095           24 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1096              :             // TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
    1097              :             // surface immediately as an error to our caller.
    1098           96 :             let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
    1099            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
    1100            0 :                     "Error deleting shard {tenant_shard_id} on node {}: {e}",
    1101            0 :                     node.id
    1102            0 :                 ))
    1103           24 :             })?;
    1104           24 :             tracing::info!(
    1105           24 :                 "Shard {tenant_shard_id} on node {}, delete returned {}",
    1106           24 :                 node.id,
    1107           24 :                 status
    1108           24 :             );
    1109           24 :             if status == StatusCode::ACCEPTED {
    1110           12 :                 any_pending = true;
    1111           12 :             }
    1112              :         }
    1113              : 
    1114           12 :         if any_pending {
    1115              :             // Caller should call us again later.  When we eventually see 404s from
    1116              :             // all the shards, we may proceed to delete our records of the tenant.
    1117            6 :             tracing::info!(
    1118            6 :                 "Tenant {} has some shards pending deletion, returning 202",
    1119            6 :                 tenant_id
    1120            6 :             );
    1121            6 :             return Ok(StatusCode::ACCEPTED);
    1122            6 :         }
    1123            6 : 
    1124            6 :         // Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop
    1125            6 :         // our in-memory state and database state.
    1126            6 : 
    1127            6 :         // Ordering: we delete persistent state first: if we then
    1128            6 :         // crash, we will drop the in-memory state.
    1129            6 : 
    1130            6 :         // Drop persistent state.
    1131            6 :         self.persistence.delete_tenant(tenant_id).await?;
    1132              : 
    1133              :         // Drop in-memory state
    1134              :         {
    1135            6 :             let mut locked = self.inner.write().unwrap();
    1136            6 :             locked
    1137            6 :                 .tenants
    1138           42 :                 .retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
    1139            6 :             tracing::info!(
    1140            6 :                 "Deleted tenant {tenant_id}, now have {} tenants",
    1141            6 :                 locked.tenants.len()
    1142            6 :             );
    1143              :         };
    1144              : 
    1145              :         // Success is represented as 404, to imitate the existing pageserver deletion API
    1146            6 :         Ok(StatusCode::NOT_FOUND)
    1147           12 :     }
    1148              : 
    1149          797 :     pub(crate) async fn tenant_timeline_create(
    1150          797 :         &self,
    1151          797 :         tenant_id: TenantId,
    1152          797 :         mut create_req: TimelineCreateRequest,
    1153          797 :     ) -> Result<TimelineInfo, ApiError> {
    1154          797 :         let mut timeline_info = None;
    1155              : 
    1156          797 :         tracing::info!(
    1157          797 :             "Creating timeline {}/{}",
    1158          797 :             tenant_id,
    1159          797 :             create_req.new_timeline_id,
    1160          797 :         );
    1161              : 
    1162          797 :         self.ensure_attached_wait(tenant_id).await?;
    1163              : 
    1164              :         // TODO: refuse to do this if shard splitting is in progress
    1165              :         // (https://github.com/neondatabase/neon/issues/6676)
    1166          797 :         let targets = {
    1167          797 :             let locked = self.inner.read().unwrap();
    1168          797 :             let mut targets = Vec::new();
    1169              : 
    1170          828 :             for (tenant_shard_id, shard) in
    1171          797 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1172          828 :             {
    1173          828 :                 let node_id = shard.intent.attached.ok_or_else(|| {
    1174            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1175          828 :                 })?;
    1176          828 :                 let node = locked
    1177          828 :                     .nodes
    1178          828 :                     .get(&node_id)
    1179          828 :                     .expect("Pageservers may not be deleted while referenced");
    1180          828 : 
    1181          828 :                 targets.push((*tenant_shard_id, node.clone()));
    1182              :             }
    1183          797 :             targets
    1184          797 :         };
    1185          797 : 
    1186          797 :         if targets.is_empty() {
    1187            0 :             return Err(ApiError::NotFound(
    1188            0 :                 anyhow::anyhow!("Tenant not found").into(),
    1189            0 :             ));
    1190          797 :         }
    1191              : 
    1192         1621 :         for (tenant_shard_id, node) in targets {
    1193              :             // TODO: issue shard timeline creates in parallel, once the 0th is done.
    1194              : 
    1195          828 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1196              : 
    1197          828 :             tracing::info!(
    1198          828 :                 "Creating timeline on shard {}/{}, attached to node {}",
    1199          828 :                 tenant_shard_id,
    1200          828 :                 create_req.new_timeline_id,
    1201          828 :                 node.id
    1202          828 :             );
    1203              : 
    1204          828 :             let shard_timeline_info = client
    1205          828 :                 .timeline_create(tenant_shard_id, &create_req)
    1206         3275 :                 .await
    1207          828 :                 .map_err(|e| match e {
    1208            4 :                     mgmt_api::Error::ApiError(status, msg)
    1209            4 :                         if status == StatusCode::INTERNAL_SERVER_ERROR
    1210            4 :                             || status == StatusCode::NOT_ACCEPTABLE =>
    1211            4 :                     {
    1212            4 :                         // TODO: handle more error codes, e.g. 503 should be passed through.  Make a general wrapper
    1213            4 :                         // for pass-through API calls.
    1214            4 :                         ApiError::InternalServerError(anyhow::anyhow!(msg))
    1215              :                     }
    1216            0 :                     _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
    1217          828 :                 })?;
    1218              : 
    1219          824 :             if timeline_info.is_none() {
    1220              :                 // If the caller specified an ancestor but no ancestor LSN, we are responsible for
    1221              :                 // propagating the LSN chosen by the first shard to the other shards: it is important
    1222              :                 // that all shards end up with the same ancestor_start_lsn.
    1223          793 :                 if create_req.ancestor_timeline_id.is_some()
    1224          242 :                     && create_req.ancestor_start_lsn.is_none()
    1225          220 :                 {
    1226          220 :                     create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn;
    1227          573 :                 }
    1228              : 
    1229              :                 // We will return the TimelineInfo from the first shard
    1230          793 :                 timeline_info = Some(shard_timeline_info);
    1231           31 :             }
    1232              :         }
    1233          793 :         Ok(timeline_info.expect("targets cannot be empty"))
    1234          797 :     }
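A small self-contained sketch (hypothetical types, not the real models) of the LSN-propagation rule in the loop above: when a request names an ancestor but no start LSN, the LSN chosen by the first shard is copied into the request before the remaining shards are called, so every shard branches at exactly the same point.

#[derive(Clone, Copy, Debug, PartialEq)]
struct Lsn(u64);

struct CreateReq {
    ancestor_timeline_id: Option<u64>,
    ancestor_start_lsn: Option<Lsn>,
}

fn propagate_ancestor_lsn(req: &mut CreateReq, first_shard_ancestor_lsn: Option<Lsn>) {
    // Only fill in the LSN when the caller asked to branch but left the start point to us.
    if req.ancestor_timeline_id.is_some() && req.ancestor_start_lsn.is_none() {
        req.ancestor_start_lsn = first_shard_ancestor_lsn;
    }
}

fn main() {
    let mut req = CreateReq {
        ancestor_timeline_id: Some(1),
        ancestor_start_lsn: None,
    };
    // Pretend the first shard reported that it branched at LSN 0x1000.
    propagate_ancestor_lsn(&mut req, Some(Lsn(0x1000)));
    assert_eq!(req.ancestor_start_lsn, Some(Lsn(0x1000)));
}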
    1235              : 
    1236            2 :     pub(crate) async fn tenant_timeline_delete(
    1237            2 :         &self,
    1238            2 :         tenant_id: TenantId,
    1239            2 :         timeline_id: TimelineId,
    1240            2 :     ) -> Result<StatusCode, ApiError> {
    1241            2 :         tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
    1242              : 
    1243            2 :         self.ensure_attached_wait(tenant_id).await?;
    1244              : 
    1245              :         // TODO: refuse to do this if shard splitting is in progress
    1246              :         // (https://github.com/neondatabase/neon/issues/6676)
    1247            2 :         let targets = {
    1248            2 :             let locked = self.inner.read().unwrap();
    1249            2 :             let mut targets = Vec::new();
    1250              : 
    1251            4 :             for (tenant_shard_id, shard) in
    1252            2 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1253            4 :             {
    1254            4 :                 let node_id = shard.intent.attached.ok_or_else(|| {
    1255            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1256            4 :                 })?;
    1257            4 :                 let node = locked
    1258            4 :                     .nodes
    1259            4 :                     .get(&node_id)
    1260            4 :                     .expect("Pageservers may not be deleted while referenced");
    1261            4 : 
    1262            4 :                 targets.push((*tenant_shard_id, node.clone()));
    1263              :             }
    1264            2 :             targets
    1265            2 :         };
    1266            2 : 
    1267            2 :         if targets.is_empty() {
    1268            0 :             return Err(ApiError::NotFound(
    1269            0 :                 anyhow::anyhow!("Tenant not found").into(),
    1270            0 :             ));
    1271            2 :         }
    1272            2 : 
    1273            2 :         // TODO: call into shards concurrently
    1274            2 :         let mut any_pending = false;
    1275            6 :         for (tenant_shard_id, node) in targets {
    1276            4 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1277              : 
    1278            4 :             tracing::info!(
    1279            4 :                 "Deleting timeline on shard {}/{}, attached to node {}",
    1280            4 :                 tenant_shard_id,
    1281            4 :                 timeline_id,
    1282            4 :                 node.id
    1283            4 :             );
    1284              : 
    1285            4 :             let status = client
    1286            4 :                 .timeline_delete(tenant_shard_id, timeline_id)
    1287           16 :                 .await
    1288            4 :                 .map_err(|e| {
    1289            0 :                     ApiError::InternalServerError(anyhow::anyhow!(
    1290            0 :                     "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
    1291            0 :                     node.id
    1292            0 :                 ))
    1293            4 :                 })?;
    1294              : 
    1295            4 :             if status == StatusCode::ACCEPTED {
    1296            2 :                 any_pending = true;
    1297            2 :             }
    1298              :         }
    1299              : 
    1300            2 :         if any_pending {
    1301            1 :             Ok(StatusCode::ACCEPTED)
    1302              :         } else {
    1303            1 :             Ok(StatusCode::NOT_FOUND)
    1304              :         }
    1305            2 :     }
    1306              : 
    1307              :     /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
    1308              :     /// function looks it up and returns the URL.  If the tenant isn't found, returns Err(ApiError::NotFound).
    1309           11 :     pub(crate) fn tenant_shard0_baseurl(
    1310           11 :         &self,
    1311           11 :         tenant_id: TenantId,
    1312           11 :     ) -> Result<(String, TenantShardId), ApiError> {
    1313           11 :         let locked = self.inner.read().unwrap();
    1314           11 :         let Some((tenant_shard_id, shard)) = locked
    1315           11 :             .tenants
    1316           11 :             .range(TenantShardId::tenant_range(tenant_id))
    1317           11 :             .next()
    1318              :         else {
    1319            6 :             return Err(ApiError::NotFound(
    1320            6 :                 anyhow::anyhow!("Tenant {tenant_id} not found").into(),
    1321            6 :             ));
    1322              :         };
    1323              : 
    1324              :         // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
    1325              :         // point to somewhere we haven't attached yet.
    1326            5 :         let Some(node_id) = shard.intent.attached else {
    1327            0 :             return Err(ApiError::Conflict(
    1328            0 :                 "Cannot call timeline API on non-attached tenant".to_string(),
    1329            0 :             ));
    1330              :         };
    1331              : 
    1332            5 :         let Some(node) = locked.nodes.get(&node_id) else {
    1333              :             // This should never happen
    1334            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1335            0 :                 "Shard refers to nonexistent node"
    1336            0 :             )));
    1337              :         };
    1338              : 
    1339            5 :         Ok((node.base_url(), *tenant_shard_id))
    1340           11 :     }
    1341              : 
    1342          841 :     pub(crate) fn tenant_locate(
    1343          841 :         &self,
    1344          841 :         tenant_id: TenantId,
    1345          841 :     ) -> Result<TenantLocateResponse, ApiError> {
    1346          841 :         let locked = self.inner.read().unwrap();
    1347          841 :         tracing::info!("Locating shards for tenant {tenant_id}");
    1348              : 
    1349              :         // Take a snapshot of pageservers
    1350          841 :         let pageservers = locked.nodes.clone();
    1351          841 : 
    1352          841 :         let mut result = Vec::new();
    1353          841 :         let mut shard_params: Option<ShardParameters> = None;
    1354              : 
    1355         1047 :         for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1356              :         {
    1357         1047 :             let node_id = shard
    1358         1047 :                 .intent
    1359         1047 :                 .attached
    1360         1047 :                 .ok_or(ApiError::BadRequest(anyhow::anyhow!(
    1361         1047 :                     "Cannot locate a tenant that is not attached"
    1362         1047 :                 )))?;
    1363              : 
    1364         1047 :             let node = pageservers
    1365         1047 :                 .get(&node_id)
    1366         1047 :                 .expect("Pageservers may not be deleted while referenced");
    1367         1047 : 
    1368         1047 :             result.push(TenantLocateResponseShard {
    1369         1047 :                 shard_id: *tenant_shard_id,
    1370         1047 :                 node_id,
    1371         1047 :                 listen_http_addr: node.listen_http_addr.clone(),
    1372         1047 :                 listen_http_port: node.listen_http_port,
    1373         1047 :                 listen_pg_addr: node.listen_pg_addr.clone(),
    1374         1047 :                 listen_pg_port: node.listen_pg_port,
    1375         1047 :             });
    1376         1047 : 
    1377         1047 :             match &shard_params {
    1378          841 :                 None => {
    1379          841 :                     shard_params = Some(ShardParameters {
    1380          841 :                         stripe_size: shard.shard.stripe_size,
    1381          841 :                         count: shard.shard.count,
    1382          841 :                     });
    1383          841 :                 }
    1384          206 :                 Some(params) => {
    1385          206 :                     if params.stripe_size != shard.shard.stripe_size {
    1386              :                         // This should never happen.  We enforce at runtime because it's simpler than
    1387              :                         // adding an extra per-tenant data structure to store the things that should be the same
    1388            0 :                         return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1389            0 :                             "Inconsistent shard stripe size parameters!"
    1390            0 :                         )));
    1391          206 :                     }
    1392              :                 }
    1393              :             }
    1394              :         }
    1395              : 
    1396          841 :         if result.is_empty() {
    1397            0 :             return Err(ApiError::NotFound(
    1398            0 :                 anyhow::anyhow!("No shards for this tenant ID found").into(),
    1399            0 :             ));
    1400          841 :         }
    1401          841 :         let shard_params = shard_params.expect("result is non-empty, therefore this is set");
    1402          841 :         tracing::info!(
    1403          841 :             "Located tenant {} with params {:?} on shards {}",
    1404          841 :             tenant_id,
    1405          841 :             shard_params,
    1406          841 :             result
    1407          841 :                 .iter()
    1408         1047 :                 .map(|s| format!("{:?}", s))
    1409          841 :                 .collect::<Vec<_>>()
    1410          841 :                 .join(",")
    1411          841 :         );
    1412              : 
    1413          841 :         Ok(TenantLocateResponse {
    1414          841 :             shards: result,
    1415          841 :             shard_params,
    1416          841 :         })
    1417          841 :     }
    1418              : 
    1419            2 :     pub(crate) async fn tenant_shard_split(
    1420            2 :         &self,
    1421            2 :         tenant_id: TenantId,
    1422            2 :         split_req: TenantShardSplitRequest,
    1423            2 :     ) -> Result<TenantShardSplitResponse, ApiError> {
    1424            2 :         let mut policy = None;
    1425            2 :         let mut shard_ident = None;
    1426            2 : 
    1427            2 :         // TODO: put a cancellation token on Service for clean shutdown
    1428            2 :         let cancel = CancellationToken::new();
    1429              : 
    1430              :         // A parent shard which will be split
    1431              :         struct SplitTarget {
    1432              :             parent_id: TenantShardId,
    1433              :             node: Node,
    1434              :             child_ids: Vec<TenantShardId>,
    1435              :         }
    1436              : 
    1437              :         // Validate input, and calculate which shards we will create
    1438            2 :         let (old_shard_count, targets, compute_hook) = {
    1439            2 :             let locked = self.inner.read().unwrap();
    1440            2 : 
    1441            2 :             let pageservers = locked.nodes.clone();
    1442            2 : 
    1443            2 :             let mut targets = Vec::new();
    1444            2 : 
    1445            2 :             // In case this is a retry, count how many already-split shards we found
    1446            2 :             let mut children_found = Vec::new();
    1447            2 :             let mut old_shard_count = None;
    1448              : 
    1449            5 :             for (tenant_shard_id, shard) in
    1450            2 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1451              :             {
    1452            5 :                 match shard.shard.count.0.cmp(&split_req.new_shard_count) {
    1453              :                     Ordering::Equal => {
    1454              :                         //  Already split this
    1455            0 :                         children_found.push(*tenant_shard_id);
    1456            0 :                         continue;
    1457              :                     }
    1458              :                     Ordering::Greater => {
    1459            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
    1460            0 :                             "Requested count {} but already have shards at count {}",
    1461            0 :                             split_req.new_shard_count,
    1462            0 :                             shard.shard.count.0
    1463            0 :                         )));
    1464              :                     }
    1465            5 :                     Ordering::Less => {
    1466            5 :                         // Fall through: this shard has lower count than requested,
    1467            5 :                         // is a candidate for splitting.
    1468            5 :                     }
    1469            5 :                 }
    1470            5 : 
    1471            5 :                 match old_shard_count {
    1472            2 :                     None => old_shard_count = Some(shard.shard.count),
    1473            3 :                     Some(old_shard_count) => {
    1474            3 :                         if old_shard_count != shard.shard.count {
    1475              :                             // We may hit this case if a caller asked for two splits to
    1476              :                             // different sizes, before the first one is complete.
    1477              :                             // e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
    1478            0 :                             // e.g. 1->2, 2->4, where the split-to-4 call arrives while we have a mixture
    1479            0 :                             return Err(ApiError::Conflict(
    1480            0 :                                 "Cannot split, currently mid-split".to_string(),
    1481            0 :                             ));
    1482            3 :                         }
    1483              :                     }
    1484              :                 }
    1485            5 :                 if policy.is_none() {
    1486            2 :                     policy = Some(shard.policy.clone());
    1487            3 :                 }
    1488            5 :                 if shard_ident.is_none() {
    1489            2 :                     shard_ident = Some(shard.shard);
    1490            3 :                 }
    1491              : 
    1492            5 :                 if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) {
    1493            0 :                     tracing::info!(
    1494            0 :                         "Tenant shard {} already has shard count {}",
    1495            0 :                         tenant_shard_id,
    1496            0 :                         split_req.new_shard_count
    1497            0 :                     );
    1498            0 :                     continue;
    1499            5 :                 }
    1500              : 
    1501            5 :                 let node_id =
    1502            5 :                     shard
    1503            5 :                         .intent
    1504            5 :                         .attached
    1505            5 :                         .ok_or(ApiError::BadRequest(anyhow::anyhow!(
    1506            5 :                             "Cannot split a tenant that is not attached"
    1507            5 :                         )))?;
    1508              : 
    1509            5 :                 let node = pageservers
    1510            5 :                     .get(&node_id)
    1511            5 :                     .expect("Pageservers may not be deleted while referenced");
    1512            5 : 
    1513            5 :                 // TODO: if any reconciliation is currently in progress for this shard, wait for it.
    1514            5 : 
    1515            5 :                 targets.push(SplitTarget {
    1516            5 :                     parent_id: *tenant_shard_id,
    1517            5 :                     node: node.clone(),
    1518            5 :                     child_ids: tenant_shard_id.split(ShardCount(split_req.new_shard_count)),
    1519            5 :                 });
    1520              :             }
    1521              : 
    1522            2 :             if targets.is_empty() {
    1523            0 :                 if children_found.len() == split_req.new_shard_count as usize {
    1524            0 :                     return Ok(TenantShardSplitResponse {
    1525            0 :                         new_shards: children_found,
    1526            0 :                     });
    1527              :                 } else {
    1528              :                     // No shards found to split, and no existing children found: the
    1529              :                     // tenant doesn't exist at all.
    1530            0 :                     return Err(ApiError::NotFound(
    1531            0 :                         anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
    1532            0 :                     ));
    1533              :                 }
    1534            2 :             }
    1535            2 : 
    1536            2 :             (old_shard_count, targets, locked.compute_hook.clone())
    1537            2 :         };
    1538            2 : 
    1539            2 :         // unwrap safety: we would have returned above if we didn't find at least one shard to split
    1540            2 :         let old_shard_count = old_shard_count.unwrap();
    1541            2 :         let shard_ident = shard_ident.unwrap();
    1542            2 :         let policy = policy.unwrap();
    1543            2 : 
    1544            2 :         // FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
    1545            2 :         // request could occur here, deleting or mutating the tenant.  begin_shard_split checks that the
    1546            2 :         // parent shards exist as expected, but it would be neater to do the above pre-checks within the
    1547            2 :         // same database transaction rather than pre-check in-memory and then maybe-fail the database write.
    1548            2 :         // (https://github.com/neondatabase/neon/issues/6676)
    1549            2 : 
    1550            2 :         // Before creating any new child shards in memory or on the pageservers, persist them: this
    1551            2 :         // enables us to ensure that we will always be able to clean up if something goes wrong.  This also
    1552            2 :         // acts as the protection against two concurrent attempts to split: one of them will get a database
    1553            2 :         // error trying to insert the child shards.
    1554            2 :         let mut child_tsps = Vec::new();
    1555            7 :         for target in &targets {
    1556            5 :             let mut this_child_tsps = Vec::new();
    1557           15 :             for child in &target.child_ids {
    1558           10 :                 let mut child_shard = shard_ident;
    1559           10 :                 child_shard.number = child.shard_number;
    1560           10 :                 child_shard.count = child.shard_count;
    1561           10 : 
    1562           10 :                 this_child_tsps.push(TenantShardPersistence {
    1563           10 :                     tenant_id: child.tenant_id.to_string(),
    1564           10 :                     shard_number: child.shard_number.0 as i32,
    1565           10 :                     shard_count: child.shard_count.0 as i32,
    1566           10 :                     shard_stripe_size: shard_ident.stripe_size.0 as i32,
    1567           10 :                     // Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
    1568           10 :                     // populate the correct generation as part of its transaction, to protect us
    1569           10 :                     // against racing with changes in the state of the parent.
    1570           10 :                     generation: 0,
    1571           10 :                     generation_pageserver: target.node.id.0 as i64,
    1572           10 :                     placement_policy: serde_json::to_string(&policy).unwrap(),
    1573           10 :                     // TODO: get the config out of the map
    1574           10 :                     config: serde_json::to_string(&TenantConfig::default()).unwrap(),
    1575           10 :                     splitting: SplitState::Splitting,
    1576           10 :                 });
    1577           10 :             }
    1578              : 
    1579            5 :             child_tsps.push((target.parent_id, this_child_tsps));
    1580              :         }
    1581              : 
    1582            2 :         if let Err(e) = self
    1583            2 :             .persistence
    1584            2 :             .begin_shard_split(old_shard_count, tenant_id, child_tsps)
    1585            2 :             .await
    1586              :         {
    1587            0 :             match e {
    1588              :                 DatabaseError::Query(diesel::result::Error::DatabaseError(
    1589              :                     DatabaseErrorKind::UniqueViolation,
    1590              :                     _,
    1591              :                 )) => {
    1592              :                     // Inserting a child shard violated a unique constraint: we raced with another call to
    1593              :                     // this function
    1594            0 :                     tracing::warn!("Conflicting attempt to split {tenant_id}: {e}");
    1595            0 :                     return Err(ApiError::Conflict("Tenant is already splitting".into()));
    1596              :                 }
    1597            0 :                 _ => return Err(ApiError::InternalServerError(e.into())),
    1598              :             }
    1599            2 :         }
    1600              : 
    1601              :         // FIXME: we have now committed the shard split state to the database, so any subsequent
    1602              :         // failure needs to roll it back.  We will later wrap this function in logic to roll back
    1603              :         // the split if it fails.
    1604              :         // (https://github.com/neondatabase/neon/issues/6676)
    1605              : 
    1606              :         // TODO: issue split calls concurrently (this only matters once we're splitting
    1607              :         // N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
    1608              : 
    1609            7 :         for target in &targets {
    1610              :             let SplitTarget {
    1611            5 :                 parent_id,
    1612            5 :                 node,
    1613            5 :                 child_ids,
    1614            5 :             } = target;
    1615            5 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1616            5 :             let response = client
    1617            5 :                 .tenant_shard_split(
    1618            5 :                     *parent_id,
    1619            5 :                     TenantShardSplitRequest {
    1620            5 :                         new_shard_count: split_req.new_shard_count,
    1621            5 :                     },
    1622            5 :                 )
    1623           20 :                 .await
    1624            5 :                 .map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
    1625              : 
    1626            5 :             tracing::info!(
    1627            5 :                 "Split {} into {}",
    1628            5 :                 parent_id,
    1629            5 :                 response
    1630            5 :                     .new_shards
    1631            5 :                     .iter()
    1632           10 :                     .map(|s| format!("{:?}", s))
    1633            5 :                     .collect::<Vec<_>>()
    1634            5 :                     .join(",")
    1635            5 :             );
    1636              : 
    1637            5 :             if &response.new_shards != child_ids {
    1638              :                 // This should never happen: the pageserver should agree with us on how shard splits work.
    1639            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1640            0 :                     "Splitting shard {} resulted in unexpected IDs: {:?} (expected {:?})",
    1641            0 :                     parent_id,
    1642            0 :                     response.new_shards,
    1643            0 :                     child_ids
    1644            0 :                 )));
    1645            5 :             }
    1646              :         }
    1647              : 
    1648              :         // TODO: if the pageserver restarted concurrently with our split API call,
    1649              :         // the actual generation of the child shard might differ from the generation
    1650              :         // we expect it to have.  In order for our in-database generation to end up
    1651              :         // correct, we should carry the child generation back in the response and apply it here
    1652              :         // in complete_shard_split (and apply the correct generation in memory)
    1653              :         // (or, we can carry generation in the request and reject the request if
    1654              :         //  it doesn't match, but that requires more retry logic on this side)
    1655              : 
    1656            2 :         self.persistence
    1657            2 :             .complete_shard_split(tenant_id, old_shard_count)
    1658            2 :             .await?;
    1659              : 
    1660              :         // Replace all the shards we just split with their children
    1661            2 :         let mut response = TenantShardSplitResponse {
    1662            2 :             new_shards: Vec::new(),
    1663            2 :         };
    1664            2 :         let mut child_locations = Vec::new();
    1665            2 :         {
    1666            2 :             let mut locked = self.inner.write().unwrap();
    1667            7 :             for target in targets {
    1668              :                 let SplitTarget {
    1669            5 :                     parent_id,
    1670            5 :                     node: _node,
    1671            5 :                     child_ids,
    1672            5 :                 } = target;
    1673            5 :                 let (pageserver, generation, config) = {
    1674            5 :                     let old_state = locked
    1675            5 :                         .tenants
    1676            5 :                         .remove(&parent_id)
    1677            5 :                         .expect("It was present, we just split it");
    1678            5 :                     (
    1679            5 :                         old_state.intent.attached.unwrap(),
    1680            5 :                         old_state.generation,
    1681            5 :                         old_state.config.clone(),
    1682            5 :                     )
    1683            5 :                 };
    1684            5 : 
    1685            5 :                 locked.tenants.remove(&parent_id);
    1686              : 
    1687           15 :                 for child in child_ids {
    1688           10 :                     let mut child_shard = shard_ident;
    1689           10 :                     child_shard.number = child.shard_number;
    1690           10 :                     child_shard.count = child.shard_count;
    1691           10 : 
    1692           10 :                     let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
    1693           10 :                     child_observed.insert(
    1694           10 :                         pageserver,
    1695           10 :                         ObservedStateLocation {
    1696           10 :                             conf: Some(attached_location_conf(generation, &child_shard, &config)),
    1697           10 :                         },
    1698           10 :                     );
    1699           10 : 
    1700           10 :                     let mut child_state = TenantState::new(child, child_shard, policy.clone());
    1701           10 :                     child_state.intent = IntentState::single(Some(pageserver));
    1702           10 :                     child_state.observed = ObservedState {
    1703           10 :                         locations: child_observed,
    1704           10 :                     };
    1705           10 :                     child_state.generation = generation;
    1706           10 :                     child_state.config = config.clone();
    1707           10 : 
    1708           10 :                     child_locations.push((child, pageserver));
    1709           10 : 
    1710           10 :                     locked.tenants.insert(child, child_state);
    1711           10 :                     response.new_shards.push(child);
    1712           10 :                 }
    1713              :             }
    1714              :         }
    1715              : 
    1716              :         // Send compute notifications for all the new shards
    1717            2 :         let mut failed_notifications = Vec::new();
    1718           12 :         for (child_id, child_ps) in child_locations {
    1719           10 :             if let Err(e) = compute_hook.notify(child_id, child_ps, &cancel).await {
    1720            0 :                 tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
    1721            0 :                         child_id, child_ps);
    1722            0 :                 failed_notifications.push(child_id);
    1723           10 :             }
    1724              :         }
    1725              : 
    1726              :         // If we failed any compute notifications, make a note to retry later.
    1727            2 :         if !failed_notifications.is_empty() {
    1728            0 :             let mut locked = self.inner.write().unwrap();
    1729            0 :             for failed in failed_notifications {
    1730            0 :                 if let Some(shard) = locked.tenants.get_mut(&failed) {
    1731            0 :                     shard.pending_compute_notification = true;
    1732            0 :                 }
    1733              :             }
    1734            2 :         }
    1735              : 
    1736            2 :         Ok(response)
    1737            2 :     }
    1738              : 
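                      :     /// Move a tenant shard's attachment to the node named in the request: the new node becomes
                      :     /// the attached location in the shard's intent, any previously attached node is demoted to a
                      :     /// secondary where the placement policy keeps secondaries, and we then wait for the resulting
                      :     /// reconciliation to complete.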
    1739            4 :     pub(crate) async fn tenant_shard_migrate(
    1740            4 :         &self,
    1741            4 :         tenant_shard_id: TenantShardId,
    1742            4 :         migrate_req: TenantShardMigrateRequest,
    1743            4 :     ) -> Result<TenantShardMigrateResponse, ApiError> {
    1744            4 :         let waiter = {
    1745            4 :             let mut locked = self.inner.write().unwrap();
    1746            4 : 
    1747            4 :             let result_tx = locked.result_tx.clone();
    1748            4 :             let pageservers = locked.nodes.clone();
    1749            4 :             let compute_hook = locked.compute_hook.clone();
    1750              : 
    1751            4 :             let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
    1752            0 :                 return Err(ApiError::NotFound(
    1753            0 :                     anyhow::anyhow!("Tenant shard not found").into(),
    1754            0 :                 ));
    1755              :             };
    1756              : 
    1757            4 :             if shard.intent.attached == Some(migrate_req.node_id) {
    1758              :                 // No-op case: we will still proceed to wait for reconciliation in case it is
    1759              :                 // incomplete from an earlier update to the intent.
    1760            0 :                 tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
    1761              :             } else {
    1762            4 :                 let old_attached = shard.intent.attached;
    1763            4 : 
    1764            4 :                 match shard.policy {
    1765            4 :                     PlacementPolicy::Single => {
    1766            4 :                         shard.intent.secondary.clear();
    1767            4 :                     }
    1768            0 :                     PlacementPolicy::Double(_n) => {
    1769            0 :                         // If our new attached node was a secondary, it no longer should be.
    1770            0 :                         shard.intent.secondary.retain(|s| s != &migrate_req.node_id);
    1771              : 
    1772              :                         // If we were already attached to something, demote that to a secondary
    1773            0 :                         if let Some(old_attached) = old_attached {
    1774            0 :                             shard.intent.secondary.push(old_attached);
    1775            0 :                         }
    1776              :                     }
    1777              :                     PlacementPolicy::Detached => {
    1778            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
    1779            0 :                             "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first"
    1780            0 :                         )))
    1781              :                     }
    1782              :                 }
    1783            4 :                 shard.intent.attached = Some(migrate_req.node_id);
    1784              : 
    1785            4 :                 tracing::info!("Migrating: new intent {:?}", shard.intent);
    1786            4 :                 shard.sequence = shard.sequence.next();
    1787              :             }
    1788              : 
    1789            4 :             shard.maybe_reconcile(
    1790            4 :                 result_tx,
    1791            4 :                 &pageservers,
    1792            4 :                 &compute_hook,
    1793            4 :                 &self.config,
    1794            4 :                 &self.persistence,
    1795            4 :             )
    1796              :         };
    1797              : 
    1798            4 :         if let Some(waiter) = waiter {
    1799            4 :             waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
    1800              :         } else {
    1801            0 :             tracing::warn!("Migration is a no-op");
    1802              :         }
    1803              : 
    1804            4 :         Ok(TenantShardMigrateResponse {})
    1805            4 :     }
    1806              : 
    1807              :     /// This is for debug/support only: we simply drop all state for a tenant, without
    1808              :     /// detaching or deleting it on pageservers.
    1809            1 :     pub(crate) async fn tenant_drop(&self, tenant_id: TenantId) -> Result<(), ApiError> {
    1810            1 :         self.persistence.delete_tenant(tenant_id).await?;
    1811              : 
    1812            1 :         let mut locked = self.inner.write().unwrap();
    1813            1 :         let mut shards = Vec::new();
    1814            2 :         for (tenant_shard_id, _) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
    1815            2 :             shards.push(*tenant_shard_id);
    1816            2 :         }
    1817              : 
    1818            3 :         for shard in shards {
    1819            2 :             locked.tenants.remove(&shard);
    1820            2 :         }
    1821              : 
    1822            1 :         Ok(())
    1823            1 :     }
    1824              : 
    1825              :     /// This is for debug/support only: we simply drop all state for a node, without
    1826              :     /// detaching or deleting its tenants on pageservers.  We do not try to re-schedule any
    1827              :     /// tenants that were on this node.
    1828              :     ///
    1829              :     /// TODO: proper node deletion API that unhooks things more gracefully
    1830            1 :     pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
    1831            1 :         self.persistence.delete_node(node_id).await?;
    1832              : 
    1833            1 :         let mut locked = self.inner.write().unwrap();
    1834              : 
    1835            3 :         for shard in locked.tenants.values_mut() {
    1836            3 :             shard.deref_node(node_id);
    1837            3 :         }
    1838              : 
    1839            1 :         let mut nodes = (*locked.nodes).clone();
    1840            1 :         nodes.remove(&node_id);
    1841            1 :         locked.nodes = Arc::new(nodes);
    1842            1 : 
    1843            1 :         Ok(())
    1844            1 :     }
    1845              : 
    1846            5 :     pub(crate) async fn node_list(&self) -> Result<Vec<NodePersistence>, ApiError> {
    1847              :         // Fetching from storage instead of reading in-memory state lets us avoid taking
    1848              :         // the big lock and converting Node to a serializable structure.
    1849            5 :         let nodes = self
    1850            5 :             .persistence
    1851            5 :             .list_nodes()
    1852            5 :             .await?
    1853            5 :             .into_iter()
    1854           10 :             .map(|n| n.to_persistent())
    1855            5 :             .collect();
    1856            5 : 
    1857            5 :         Ok(nodes)
    1858            5 :     }
    1859              : 
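                      :     /// Register a pageserver node so that it becomes eligible for scheduling.  Registration is
                      :     /// idempotent: re-registering with an unchanged address is a no-op, while re-registering with
                      :     /// a different address is rejected with a conflict.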
    1860          624 :     pub(crate) async fn node_register(
    1861          624 :         &self,
    1862          624 :         register_req: NodeRegisterRequest,
    1863          624 :     ) -> Result<(), ApiError> {
    1864          624 :         // Pre-check for an already-existing node
    1865          624 :         {
    1866          624 :             let locked = self.inner.read().unwrap();
    1867          624 :             if let Some(node) = locked.nodes.get(&register_req.node_id) {
    1868              :                 // Note that we do not check total equality of the struct, because we don't require
    1869              :                 // the availability/scheduling states to agree for a POST to be idempotent.
    1870          224 :                 if node.listen_http_addr == register_req.listen_http_addr
    1871          224 :                     && node.listen_http_port == register_req.listen_http_port
    1872          224 :                     && node.listen_pg_addr == register_req.listen_pg_addr
    1873          224 :                     && node.listen_pg_port == register_req.listen_pg_port
    1874              :                 {
    1875          224 :                     tracing::info!(
    1876          224 :                         "Node {} re-registered with matching address",
    1877          224 :                         register_req.node_id
    1878          224 :                     );
    1879          224 :                     return Ok(());
    1880              :                 } else {
    1881              :                     // TODO: decide if we want to allow modifying node addresses without removing and re-adding
    1882              :                     // the node.  Safest/simplest thing is to refuse it, and usually we deploy with
    1883              :                     // a fixed address through the lifetime of a node.
    1884            0 :                     tracing::warn!(
    1885            0 :                         "Node {} tried to register with different address",
    1886            0 :                         register_req.node_id
    1887            0 :                     );
    1888            0 :                     return Err(ApiError::Conflict(
    1889            0 :                         "Node is already registered with different address".to_string(),
    1890            0 :                     ));
    1891              :                 }
    1892          400 :             }
    1893          400 :         }
    1894          400 : 
    1895          400 :         // Ordering: we must persist the new node _before_ adding it to in-memory state.
    1896          400 :         // This ensures that before we use it for anything or expose it via any external
    1897          400 :         // API, it is guaranteed to be available after a restart.
    1898          400 :         let new_node = Node {
    1899          400 :             id: register_req.node_id,
    1900          400 :             listen_http_addr: register_req.listen_http_addr,
    1901          400 :             listen_http_port: register_req.listen_http_port,
    1902          400 :             listen_pg_addr: register_req.listen_pg_addr,
    1903          400 :             listen_pg_port: register_req.listen_pg_port,
    1904          400 :             scheduling: NodeSchedulingPolicy::Filling,
    1905          400 :             // TODO: we shouldn't really call this Active until we've heartbeated it.
    1906          400 :             availability: NodeAvailability::Active,
    1907          400 :         };
    1908          400 :         // TODO: idempotency if the node already exists in the database
    1909          400 :         self.persistence.insert_node(&new_node).await?;
    1910              : 
    1911          400 :         let mut locked = self.inner.write().unwrap();
    1912          400 :         let mut new_nodes = (*locked.nodes).clone();
    1913          400 : 
    1914          400 :         new_nodes.insert(register_req.node_id, new_node);
    1915          400 : 
    1916          400 :         locked.nodes = Arc::new(new_nodes);
    1917              : 
    1918          400 :         tracing::info!(
    1919          400 :             "Registered pageserver {}, now have {} pageservers",
    1920          400 :             register_req.node_id,
    1921          400 :             locked.nodes.len()
    1922          400 :         );
    1923          400 :         Ok(())
    1924          624 :     }
    1925              : 
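                      :     /// Update a registered node's availability and/or scheduling policy.  Marking a node offline
                      :     /// clears our observed state for its locations and re-schedules the shards that intended to
                      :     /// use it; marking it active again reconciles any shard whose observed state on the node is
                      :     /// unknown.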
    1926            4 :     pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
    1927            4 :         let mut locked = self.inner.write().unwrap();
    1928            4 :         let result_tx = locked.result_tx.clone();
    1929            4 :         let compute_hook = locked.compute_hook.clone();
    1930            4 : 
    1931            4 :         let mut new_nodes = (*locked.nodes).clone();
    1932              : 
    1933            4 :         let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
    1934            0 :             return Err(ApiError::NotFound(
    1935            0 :                 anyhow::anyhow!("Node not registered").into(),
    1936            0 :             ));
    1937              :         };
    1938              : 
    1939            4 :         let mut offline_transition = false;
    1940            4 :         let mut active_transition = false;
    1941              : 
    1942            4 :         if let Some(availability) = &config_req.availability {
    1943            3 :             match (availability, &node.availability) {
    1944              :                 (NodeAvailability::Offline, NodeAvailability::Active) => {
    1945            2 :                     tracing::info!("Node {} transition to offline", config_req.node_id);
    1946            2 :                     offline_transition = true;
    1947              :                 }
    1948              :                 (NodeAvailability::Active, NodeAvailability::Offline) => {
    1949            1 :                     tracing::info!("Node {} transition to active", config_req.node_id);
    1950            1 :                     active_transition = true;
    1951              :                 }
    1952              :                 _ => {
    1953            0 :                     tracing::info!("Node {} no change during config", config_req.node_id);
    1954              :                     // No change
    1955              :                 }
    1956              :             };
    1957            3 :             node.availability = *availability;
    1958            1 :         }
    1959              : 
    1960            4 :         if let Some(scheduling) = config_req.scheduling {
    1961            1 :             node.scheduling = scheduling;
    1962            1 : 
    1963            1 :             // TODO: once we have a background scheduling ticker for fill/drain, kick it
    1964            1 :             // to wake up and start working.
    1965            3 :         }
    1966              : 
    1967            4 :         let new_nodes = Arc::new(new_nodes);
    1968            4 : 
    1969            4 :         let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes);
    1970            4 :         if offline_transition {
    1971           13 :             for (tenant_shard_id, tenant_state) in &mut locked.tenants {
    1972            5 :                 if let Some(observed_loc) =
    1973           13 :                     tenant_state.observed.locations.get_mut(&config_req.node_id)
    1974            5 :                 {
    1975            5 :                     // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
    1976            5 :                     // not assume our knowledge of the node's configuration is accurate until it comes back online
    1977            5 :                     observed_loc.conf = None;
    1978            8 :                 }
    1979              : 
    1980           13 :                 if tenant_state.intent.notify_offline(config_req.node_id) {
    1981            5 :                     tenant_state.sequence = tenant_state.sequence.next();
    1982            5 :                     match tenant_state.schedule(&mut scheduler) {
    1983            0 :                         Err(e) => {
    1984            0 :                             // It is possible that some tenants will become unschedulable when too many pageservers
    1985            0 :                             // go offline: in this case there isn't much we can do other than make the issue observable.
    1986            0 :                             // TODO: give TenantState a scheduling error attribute to be queried later.
    1987            0 :                             tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
    1988              :                         }
    1989            5 :                         Ok(()) => {
    1990            5 :                             tenant_state.maybe_reconcile(
    1991            5 :                                 result_tx.clone(),
    1992            5 :                                 &new_nodes,
    1993            5 :                                 &compute_hook,
    1994            5 :                                 &self.config,
    1995            5 :                                 &self.persistence,
    1996            5 :                             );
    1997            5 :                         }
    1998              :                     }
    1999            8 :                 }
    2000              :             }
    2001            2 :         }
    2002              : 
    2003            4 :         if active_transition {
    2004              :             // When a node comes back online, we must reconcile any tenant that has a None observed
    2005              :             // location on the node.
    2006           12 :             for tenant_state in locked.tenants.values_mut() {
    2007            4 :                 if let Some(observed_loc) =
    2008           12 :                     tenant_state.observed.locations.get_mut(&config_req.node_id)
    2009              :                 {
    2010            4 :                     if observed_loc.conf.is_none() {
    2011            4 :                         tenant_state.maybe_reconcile(
    2012            4 :                             result_tx.clone(),
    2013            4 :                             &new_nodes,
    2014            4 :                             &compute_hook,
    2015            4 :                             &self.config,
    2016            4 :                             &self.persistence,
    2017            4 :                         );
    2018            4 :                     }
    2019            8 :                 }
    2020              :             }
    2021              : 
    2022              :             // TODO: in the background, we should balance work back onto this pageserver
    2023            3 :         }
    2024              : 
    2025            4 :         locked.nodes = new_nodes;
    2026            4 : 
    2027            4 :         Ok(())
    2028            4 :     }
    2029              : 
    2030              :     /// Helper for methods that will try and call pageserver APIs for
    2031              :     /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
    2032              :     /// is attached somewhere.
    2033          799 :     fn ensure_attached_schedule(
    2034          799 :         &self,
    2035          799 :         mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
    2036          799 :         tenant_id: TenantId,
    2037          799 :     ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
    2038          799 :         let mut waiters = Vec::new();
    2039          799 :         let result_tx = locked.result_tx.clone();
    2040          799 :         let compute_hook = locked.compute_hook.clone();
    2041          799 :         let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
    2042          799 :         let pageservers = locked.nodes.clone();
    2043              : 
    2044          832 :         for (_tenant_shard_id, shard) in locked
    2045          799 :             .tenants
    2046          799 :             .range_mut(TenantShardId::tenant_range(tenant_id))
    2047              :         {
    2048          832 :             shard.schedule(&mut scheduler)?;
    2049              : 
    2050          832 :             if let Some(waiter) = shard.maybe_reconcile(
    2051          832 :                 result_tx.clone(),
    2052          832 :                 &pageservers,
    2053          832 :                 &compute_hook,
    2054          832 :                 &self.config,
    2055          832 :                 &self.persistence,
    2056          832 :             ) {
    2057            0 :                 waiters.push(waiter);
    2058          832 :             }
    2059              :         }
    2060          799 :         Ok(waiters)
    2061          799 :     }
    2062              : 
    2063          799 :     async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
    2064          799 :         let ensure_waiters = {
    2065          799 :             let locked = self.inner.write().unwrap();
    2066          799 : 
    2067          799 :             self.ensure_attached_schedule(locked, tenant_id)
    2068          799 :                 .map_err(ApiError::InternalServerError)?
    2069              :         };
    2070              : 
    2071          799 :         let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
    2072          799 :         for waiter in ensure_waiters {
    2073            0 :             let timeout = deadline.duration_since(Instant::now());
    2074            0 :             waiter.wait_timeout(timeout).await?;
    2075              :         }
    2076              : 
    2077          799 :         Ok(())
    2078          799 :     }
    2079              : 
    2080              :     /// Check all tenants for pending reconciliation work, and reconcile those in need
    2081              :     ///
    2082              :     /// Returns how many reconciliation tasks were started
    2083          363 :     fn reconcile_all(&self) -> usize {
    2084          363 :         let mut locked = self.inner.write().unwrap();
    2085          363 :         let result_tx = locked.result_tx.clone();
    2086          363 :         let compute_hook = locked.compute_hook.clone();
    2087          363 :         let pageservers = locked.nodes.clone();
    2088          363 :         locked
    2089          363 :             .tenants
    2090          363 :             .iter_mut()
    2091          363 :             .filter_map(|(_tenant_shard_id, shard)| {
    2092            8 :                 shard.maybe_reconcile(
    2093            8 :                     result_tx.clone(),
    2094            8 :                     &pageservers,
    2095            8 :                     &compute_hook,
    2096            8 :                     &self.config,
    2097            8 :                     &self.persistence,
    2098            8 :                 )
    2099          363 :             })
    2100          363 :             .count()
    2101          363 :     }
    2102              : }
        

Generated by: LCOV version 2.1-beta