LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - service.rs (source / functions) Coverage Total Hit
Test: 322b88762cba8ea666f63cda880cccab6936bf37.info Lines: 0.0 % 1834 0
Test Date: 2024-02-29 11:57:12 Functions: 0.0 % 213 0

            Line data    Source code
       1              : use std::{
       2              :     borrow::Cow,
       3              :     cmp::Ordering,
       4              :     collections::{BTreeMap, HashMap, HashSet},
       5              :     str::FromStr,
       6              :     sync::Arc,
       7              :     time::{Duration, Instant},
       8              : };
       9              : 
      10              : use anyhow::Context;
      11              : use control_plane::attachment_service::{
      12              :     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
      13              : };
      14              : use diesel::result::DatabaseErrorKind;
      15              : use futures::{stream::FuturesUnordered, StreamExt};
      16              : use hyper::StatusCode;
      17              : use pageserver_api::controller_api::{
      18              :     NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy,
      19              :     TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
      20              :     TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse,
      21              : };
      22              : use pageserver_api::{
      23              :     models::{
      24              :         self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
      25              :         TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
      26              :         TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
      27              :         TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
      28              :     },
      29              :     shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
      30              :     upcall_api::{
      31              :         ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
      32              :         ValidateResponse, ValidateResponseTenant,
      33              :     },
      34              : };
      35              : use pageserver_client::mgmt_api;
      36              : use tokio_util::sync::CancellationToken;
      37              : use tracing::instrument;
      38              : use utils::{
      39              :     backoff,
      40              :     completion::Barrier,
      41              :     generation::Generation,
      42              :     http::error::ApiError,
      43              :     id::{NodeId, TenantId, TimelineId},
      44              :     seqwait::SeqWait,
      45              :     sync::gate::Gate,
      46              : };
      47              : 
      48              : use crate::{
      49              :     compute_hook::{self, ComputeHook},
      50              :     node::Node,
      51              :     persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
      52              :     reconciler::attached_location_conf,
      53              :     scheduler::Scheduler,
      54              :     tenant_state::{
      55              :         IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
      56              :         ReconcilerWaiter, TenantState,
      57              :     },
      58              :     PlacementPolicy, Sequence,
      59              : };
      60              : 
      61              : // For operations that should be quick, like attaching a new tenant
      62              : const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
      63              : 
      64              : // For operations that might be slow, like migrating a tenant with
      65              : // some data in it.
      66              : const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      67              : 
      68              : /// How long [`Service::startup_reconcile`] is allowed to take before it should give
      69              : /// up on unresponsive pageservers and proceed.
      70              : pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      71              : 
      72              : // Top level state available to all HTTP handlers
      73              : struct ServiceState {
      74              :     tenants: BTreeMap<TenantShardId, TenantState>,
      75              : 
      76              :     nodes: Arc<HashMap<NodeId, Node>>,
      77              : 
      78              :     scheduler: Scheduler,
      79              : 
      80              :     compute_hook: Arc<ComputeHook>,
      81              : 
      82              :     result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
      83              : }
      84              : 
      85              : impl ServiceState {
      86            0 :     fn new(
      87            0 :         config: Config,
      88            0 :         result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
      89            0 :         nodes: HashMap<NodeId, Node>,
      90            0 :         tenants: BTreeMap<TenantShardId, TenantState>,
      91            0 :         scheduler: Scheduler,
      92            0 :     ) -> Self {
      93            0 :         Self {
      94            0 :             tenants,
      95            0 :             nodes: Arc::new(nodes),
      96            0 :             scheduler,
      97            0 :             compute_hook: Arc::new(ComputeHook::new(config)),
      98            0 :             result_tx,
      99            0 :         }
     100            0 :     }
     101              : 
     102            0 :     fn parts_mut(
     103            0 :         &mut self,
     104            0 :     ) -> (
     105            0 :         &mut Arc<HashMap<NodeId, Node>>,
     106            0 :         &mut BTreeMap<TenantShardId, TenantState>,
     107            0 :         &mut Scheduler,
     108            0 :     ) {
     109            0 :         (&mut self.nodes, &mut self.tenants, &mut self.scheduler)
     110            0 :     }
     111              : }
     112              : 
     113            0 : #[derive(Clone)]
     114              : pub struct Config {
     115              :     // All pageservers managed by one instance of this service must have
     116              :     // the same public key.  This JWT token will be used to authenticate
     117              :     // this service to the pageservers it manages.
     118              :     pub jwt_token: Option<String>,
     119              : 
     120              :     // This JWT token will be used to authenticate this service to the control plane.
     121              :     pub control_plane_jwt_token: Option<String>,
     122              : 
     123              :     /// Where the compute hook should send notifications of pageserver attachment locations
     124              :     /// (this URL points to the control plane in prod). If this is None, the compute hook will
     125              :     /// assume it is running in a test environment and try to update neon_local.
     126              :     pub compute_hook_url: Option<String>,
     127              : }
     128              : 
     129              : impl From<DatabaseError> for ApiError {
     130            0 :     fn from(err: DatabaseError) -> ApiError {
     131            0 :         match err {
     132            0 :             DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
     133              :             // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
     134              :             DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
     135            0 :                 ApiError::ShuttingDown
     136              :             }
     137            0 :             DatabaseError::Logical(reason) => {
     138            0 :                 ApiError::InternalServerError(anyhow::anyhow!(reason))
     139              :             }
     140              :         }
     141            0 :     }
     142              : }
     143              : 
     144              : pub struct Service {
     145              :     inner: Arc<std::sync::RwLock<ServiceState>>,
     146              :     config: Config,
     147              :     persistence: Arc<Persistence>,
     148              : 
     149              :     // Process shutdown will fire this token
     150              :     cancel: CancellationToken,
     151              : 
     152              :     // Background tasks will hold this gate
     153              :     gate: Gate,
     154              : 
     155              :     /// This waits for initial reconciliation with pageservers to complete.  Until this barrier
     156              :     /// passes, it isn't safe to do any actions that mutate tenants.
     157              :     pub(crate) startup_complete: Barrier,
     158              : }
     159              : 
     160              : impl From<ReconcileWaitError> for ApiError {
     161            0 :     fn from(value: ReconcileWaitError) -> Self {
     162            0 :         match value {
     163            0 :             ReconcileWaitError::Shutdown => ApiError::ShuttingDown,
     164            0 :             e @ ReconcileWaitError::Timeout(_) => ApiError::Timeout(format!("{e}").into()),
     165            0 :             e @ ReconcileWaitError::Failed(..) => ApiError::InternalServerError(anyhow::anyhow!(e)),
     166              :         }
     167            0 :     }
     168              : }
     169              : 
     170              : impl Service {
     171            0 :     pub fn get_config(&self) -> &Config {
     172            0 :         &self.config
     173            0 :     }
     174              : 
     175              :     /// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
     176              :     /// view of the world, and determine which pageservers are responsive.
     177            0 :     #[instrument(skip_all)]
     178              :     async fn startup_reconcile(self: &Arc<Service>) {
     179              :         // For each tenant shard, the (node, observed state) pair reported by a pageserver (where None means
     180              :         // indeterminate, same as in [`ObservedStateLocation`])
     181              :         let mut observed = HashMap::new();
     182              : 
     183              :         let mut nodes_online = HashSet::new();
     184              : 
     185              :         // Startup reconciliation does I/O to other services: whether they
     186              :         // are responsive or not, we should aim to finish within our deadline, because:
     187              :         // - If we don't, a k8s readiness hook watching /ready will kill us.
     188              :         // - While we're waiting for startup reconciliation, we are not fully
     189              :         //   available for end user operations like creating/deleting tenants and timelines.
     190              :         //
     191              :         // We set multiple deadlines to break up the time available between the phases of work: this is
     192              :         // arbitrary, but avoids a situation where the first phase could burn our entire timeout period.
     193              :         let start_at = Instant::now();
     194              :         let node_scan_deadline = start_at
     195              :             .checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
     196              :             .expect("Reconcile timeout is a modest constant");
     197              : 
     198              :         let compute_notify_deadline = start_at
     199              :             .checked_add((STARTUP_RECONCILE_TIMEOUT / 4) * 3)
     200              :             .expect("Reconcile timeout is a modest constant");
     201              : 
     202              :         // Accumulate a list of any tenant locations that ought to be detached
     203              :         let mut cleanup = Vec::new();
     204              : 
     205              :         let node_listings = self.scan_node_locations(node_scan_deadline).await;
     206              :         for (node_id, list_response) in node_listings {
     207              :             let tenant_shards = list_response.tenant_shards;
     208            0 :             tracing::info!(
     209            0 :                 "Received {} shard statuses from pageserver {}, setting it to Active",
     210            0 :                 tenant_shards.len(),
     211            0 :                 node_id
     212            0 :             );
     213              :             nodes_online.insert(node_id);
     214              : 
     215              :             for (tenant_shard_id, conf_opt) in tenant_shards {
     216              :                 observed.insert(tenant_shard_id, (node_id, conf_opt));
     217              :             }
     218              :         }
     219              : 
     220              :         // List of tenants for which we will attempt to notify compute of their location at startup
     221              :         let mut compute_notifications = Vec::new();
     222              : 
     223              :         // Populate intent and observed states for all tenants, based on reported state on pageservers
     224              :         let shard_count = {
     225              :             let mut locked = self.inner.write().unwrap();
     226              :             let (nodes, tenants, scheduler) = locked.parts_mut();
     227              : 
     228              :             // Mark nodes online if they responded to us: nodes are offline by default after a restart.
     229              :             let mut new_nodes = (**nodes).clone();
     230              :             for (node_id, node) in new_nodes.iter_mut() {
     231              :                 if nodes_online.contains(node_id) {
     232              :                     node.availability = NodeAvailability::Active;
     233              :                     scheduler.node_upsert(node);
     234              :                 }
     235              :             }
     236              :             *nodes = Arc::new(new_nodes);
     237              : 
     238              :             for (tenant_shard_id, (node_id, observed_loc)) in observed {
     239              :                 let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
     240              :                     cleanup.push((tenant_shard_id, node_id));
     241              :                     continue;
     242              :                 };
     243              : 
     244              :                 tenant_state
     245              :                     .observed
     246              :                     .locations
     247              :                     .insert(node_id, ObservedStateLocation { conf: observed_loc });
     248              :             }
     249              : 
     250              :             // Populate each tenant's intent state
     251              :             for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
     252              :                 tenant_state.intent_from_observed();
     253              :                 if let Err(e) = tenant_state.schedule(scheduler) {
     254              :                     // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
     255              :                     // not enough pageservers are available.  The tenant may well still be available
     256              :                     // to clients.
     257            0 :                     tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
     258              :                 } else {
     259              :                     // If we're both intending and observed to be attached at a particular node, we will
     260              :                     // emit a compute notification for this. In the case where our observed state does not
     261              :                     // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
     262              :                     if let Some(attached_at) = tenant_state.stably_attached() {
     263              :                         compute_notifications.push((*tenant_shard_id, attached_at));
     264              :                     }
     265              :                 }
     266              :             }
     267              : 
     268              :             tenants.len()
     269              :         };
     270              : 
     271              :         // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
     272              :         // generation_pageserver in the database.
     273              : 
     274              :         // Emit compute hook notifications for all tenants which are already stably attached.  Other tenants
     275              :         // will emit compute hook notifications when they reconcile.
     276              :         //
     277              :         // Ordering: we must complete these notification attempts before doing any other reconciliation for the
     278              :         // tenants named here, because otherwise our calls to notify() might race with more recent values
     279              :         // generated by reconciliation.
     280              :         let notify_failures = self
     281              :             .compute_notify_many(compute_notifications, compute_notify_deadline)
     282              :             .await;
     283              : 
     284              :         // Compute notify is fallible.  If it fails here, do not delay overall startup: set the
     285              :         // flag on these shards that they have a pending notification.
     286              :         // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
     287              :         {
     288              :             let mut locked = self.inner.write().unwrap();
     289              :             for tenant_shard_id in notify_failures.into_iter() {
     290              :                 if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
     291              :                     shard.pending_compute_notification = true;
     292              :                 }
     293              :             }
     294              :         }
     295              : 
     296              :         // Finally, now that the service is up and running, launch reconcile operations for any tenants
     297              :         // which require it: under normal circumstances this should only include tenants that were in some
     298              :         // transient state before we restarted, or any tenants whose compute hooks failed above.
     299              :         let reconcile_tasks = self.reconcile_all();
     300              :         // We will not wait for these reconciliation tasks to run here: we're now done with startup and
     301              :         // normal operations may proceed.
     302              : 
     303              :         // Clean up any tenants that were found on pageservers but are not known to us.  Do this in the
     304              :         // background because it does not need to complete in order to proceed with other work.
     305              :         if !cleanup.is_empty() {
     306            0 :             tracing::info!("Cleaning up {} locations in the background", cleanup.len());
     307              :             tokio::task::spawn({
     308              :                 let cleanup_self = self.clone();
     309            0 :                 async move { cleanup_self.cleanup_locations(cleanup).await }
     310              :             });
     311              :         }
     312              : 
     313            0 :         tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
     314              :     }
     315              : 
     316              :     /// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
     317              :     ///
     318              :     /// The result includes only nodes which responded successfully within the deadline.
     319            0 :     async fn scan_node_locations(
     320            0 :         &self,
     321            0 :         deadline: Instant,
     322            0 :     ) -> HashMap<NodeId, LocationConfigListResponse> {
     323            0 :         let nodes = {
     324            0 :             let locked = self.inner.read().unwrap();
     325            0 :             locked.nodes.clone()
     326            0 :         };
     327            0 : 
     328            0 :         let mut node_results = HashMap::new();
     329            0 : 
     330            0 :         let mut node_list_futs = FuturesUnordered::new();
     331              : 
     332            0 :         for node in nodes.values() {
     333            0 :             node_list_futs.push({
     334            0 :                 async move {
     335            0 :                     let http_client = reqwest::ClientBuilder::new()
     336            0 :                         .timeout(Duration::from_secs(5))
     337            0 :                         .build()
     338            0 :                         .expect("Failed to construct HTTP client");
     339            0 :                     let client = mgmt_api::Client::from_client(
     340            0 :                         http_client,
     341            0 :                         node.base_url(),
     342            0 :                         self.config.jwt_token.as_deref(),
     343            0 :                     );
     344            0 : 
     345            0 :                     fn is_fatal(e: &mgmt_api::Error) -> bool {
     346            0 :                         use mgmt_api::Error::*;
     347            0 :                         match e {
     348            0 :                             ReceiveBody(_) | ReceiveErrorBody(_) => false,
     349            0 :                             ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
     350            0 :                             | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
     351            0 :                             | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
     352            0 :                             ApiError(_, _) => true,
     353            0 :                         }
     354            0 :                     }
     355            0 : 
     356            0 :                     tracing::info!("Scanning shards on node {}...", node.id);
     357            0 :                     let description = format!("List locations on {}", node.id);
     358            0 :                     let response = backoff::retry(
     359            0 :                         || client.list_location_config(),
     360            0 :                         is_fatal,
     361            0 :                         1,
     362            0 :                         5,
     363            0 :                         &description,
     364            0 :                         &self.cancel,
     365            0 :                     )
     366            0 :                     .await;
     367              : 
     368            0 :                     (node.id, response)
     369            0 :                 }
     370            0 :             });
     371            0 :         }
     372              : 
     373              :         loop {
     374            0 :             let (node_id, result) = tokio::select! {
     375            0 :                 next = node_list_futs.next() => {
     376              :                     match next {
     377              :                         Some(result) => result,
     378              :                         None =>{
     379              :                             // We got results for all our nodes
     380              :                             break;
     381              :                         }
     382              : 
     383              :                     }
     384              :                 },
     385              :                 _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
     386              :                     // Give up waiting for anyone who hasn't responded: we will yield the results that we have
     387            0 :                     tracing::info!("Reached deadline while waiting for nodes to respond to location listing requests");
     388              :                     break;
     389              :                 }
     390              :             };
     391              : 
     392            0 :             let Some(list_response) = result else {
     393            0 :                 tracing::info!("Shutdown during startup_reconcile");
     394            0 :                 break;
     395              :             };
     396              : 
     397            0 :             match list_response {
     398            0 :                 Err(e) => {
     399            0 :                     tracing::warn!("Could not scan node {} ({e})", node_id);
     400              :                 }
     401            0 :                 Ok(listing) => {
     402            0 :                     node_results.insert(node_id, listing);
     403            0 :                 }
     404              :             }
     405              :         }
     406              : 
     407            0 :         node_results
     408            0 :     }
     409              : 
     410              :     /// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
     411              :     ///
     412              :     /// This is safe to run in the background, because if we don't have this TenantShardId in our map of
     413              :     /// tenants, then it is probably something incompletely deleted before: we will not fight with any
     414              :     /// other task trying to attach it.
     415            0 :     #[instrument(skip_all)]
     416              :     async fn cleanup_locations(&self, cleanup: Vec<(TenantShardId, NodeId)>) {
     417              :         let nodes = self.inner.read().unwrap().nodes.clone();
     418              : 
     419              :         for (tenant_shard_id, node_id) in cleanup {
     420              :             // A node reported a tenant_shard_id which is unknown to us: detach it.
     421              :             let Some(node) = nodes.get(&node_id) else {
     422              :                 // This is legitimate; we run in the background and [`Self::startup_reconcile`] might have identified
     423              :                 // a location to clean up on a node that has since been removed.
     424            0 :                 tracing::info!(
     425            0 :                     "Not cleaning up location {node_id}/{tenant_shard_id}: node not found"
     426            0 :                 );
     427              :                 continue;
     428              :             };
     429              : 
     430              :             if self.cancel.is_cancelled() {
     431              :                 break;
     432              :             }
     433              : 
     434              :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
     435              :             match client
     436              :                 .location_config(
     437              :                     tenant_shard_id,
     438              :                     LocationConfig {
     439              :                         mode: LocationConfigMode::Detached,
     440              :                         generation: None,
     441              :                         secondary_conf: None,
     442              :                         shard_number: tenant_shard_id.shard_number.0,
     443              :                         shard_count: tenant_shard_id.shard_count.literal(),
     444              :                         shard_stripe_size: 0,
     445              :                         tenant_conf: models::TenantConfig::default(),
     446              :                     },
     447              :                     None,
     448              :                 )
     449              :                 .await
     450              :             {
     451              :                 Ok(()) => {
     452            0 :                     tracing::info!(
     453            0 :                         "Detached unknown shard {tenant_shard_id} on pageserver {node_id}"
     454            0 :                     );
     455              :                 }
     456              :                 Err(e) => {
     457              :                     // Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't
     458              :                     // break anything.
     459            0 :                     tracing::error!(
     460            0 :                         "Failed to detach unknkown shard {tenant_shard_id} on pageserver {node_id}: {e}"
     461            0 :                     );
     462              :                 }
     463              :             }
     464              :         }
     465              :     }
     466              : 
     467              :     /// Used during [`Self::startup_reconcile`]: issue many concurrent compute notifications.
     468              :     ///
     469              :     /// Returns a set of any shards for which notifications were not acked within the deadline.
     470            0 :     async fn compute_notify_many(
     471            0 :         &self,
     472            0 :         notifications: Vec<(TenantShardId, NodeId)>,
     473            0 :         deadline: Instant,
     474            0 :     ) -> HashSet<TenantShardId> {
     475            0 :         let compute_hook = self.inner.read().unwrap().compute_hook.clone();
     476            0 : 
     477            0 :         let attempt_shards = notifications.iter().map(|i| i.0).collect::<HashSet<_>>();
     478            0 :         let mut success_shards = HashSet::new();
     479            0 : 
     480            0 :         // Construct an async stream of futures to invoke the compute notify function: we do this
     481            0 :         // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
     482            0 :         let mut stream = futures::stream::iter(notifications.into_iter())
     483            0 :             .map(|(tenant_shard_id, node_id)| {
     484            0 :                 let compute_hook = compute_hook.clone();
     485            0 :                 let cancel = self.cancel.clone();
     486            0 :                 async move {
     487            0 :                     if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
     488            0 :                         tracing::error!(
     489            0 :                             %tenant_shard_id,
     490            0 :                             %node_id,
     491            0 :                             "Failed to notify compute on startup for shard: {e}"
     492            0 :                         );
     493            0 :                         None
     494              :                     } else {
     495            0 :                         Some(tenant_shard_id)
     496              :                     }
     497            0 :                 }
     498            0 :             })
     499            0 :             .buffered(compute_hook::API_CONCURRENCY);
     500              : 
     501              :         loop {
     502            0 :             tokio::select! {
     503            0 :                 next = stream.next() => {
     504              :                     match next {
     505              :                         Some(Some(success_shard)) => {
     506              :                             // A notification succeeded
     507              :                             success_shards.insert(success_shard);
     508              :                             },
     509              :                         Some(None) => {
     510              :                             // A notification that failed
     511              :                         },
     512              :                         None => {
     513            0 :                             tracing::info!("Successfully sent all compute notifications");
     514              :                             break;
     515              :                         }
     516              :                     }
     517              :                 },
     518              :                 _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
     519              :                     // Give up sending any that didn't succeed yet
     520            0 :                     tracing::info!("Reached deadline while sending compute notifications");
     521              :                     break;
     522              :                 }
     523              :             };
     524              :         }
     525              : 
     526            0 :         attempt_shards
     527            0 :             .difference(&success_shards)
     528            0 :             .cloned()
     529            0 :             .collect()
     530            0 :     }
     531              : 
     532              :     /// Long running background task that periodically wakes up and looks for shards that need
     533              :     /// reconciliation.  Reconciliation is fallible, so any reconciliation tasks that fail during
     534              :     /// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
     535              :     /// for those retries.
     536            0 :     #[instrument(skip_all)]
     537              :     async fn background_reconcile(&self) {
     538              :         self.startup_complete.clone().wait().await;
     539              : 
     540              :         const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
     541              : 
     542              :         let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
     543              :         while !self.cancel.is_cancelled() {
     544            0 :             tokio::select! {
     545            0 :               _ = interval.tick() => { self.reconcile_all(); }
     546            0 :               _ = self.cancel.cancelled() => return
     547            0 :             }
     548              :         }
     549              :     }
     550              : 
     551              :     /// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation
     552              :     /// was successful, this will update the observed state of the tenant such that subsequent
     553              :     /// calls to [`TenantState::maybe_reconcile`] will do nothing.
     554            0 :     #[instrument(skip_all, fields(
     555              :         tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
     556              :         sequence=%result.sequence
     557            0 :     ))]
     558              :     fn process_result(&self, result: ReconcileResult) {
     559              :         let mut locked = self.inner.write().unwrap();
     560              :         let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
     561              :             // A reconciliation result might race with removing a tenant: drop results for
     562              :             // tenants that aren't in our map.
     563              :             return;
     564              :         };
     565              : 
     566              :         // Usually generation should only be updated via this path, so the max() isn't
     567              :         // needed, but it is used to handle out-of-band updates via. e.g. test hook.
     568              :         tenant.generation = std::cmp::max(tenant.generation, result.generation);
     569              : 
     570              :         // If the reconciler signals that it failed to notify compute, set this state on
     571              :         // the shard so that a future [`TenantState::maybe_reconcile`] will try again.
     572              :         tenant.pending_compute_notification = result.pending_compute_notification;
     573              : 
     574              :         match result.result {
     575              :             Ok(()) => {
     576              :                 for (node_id, loc) in &result.observed.locations {
     577              :                     if let Some(conf) = &loc.conf {
     578            0 :                         tracing::info!("Updating observed location {}: {:?}", node_id, conf);
     579              :                     } else {
     580            0 :                         tracing::info!("Setting observed location {} to None", node_id,)
     581              :                     }
     582              :                 }
     583              :                 tenant.observed = result.observed;
     584              :                 tenant.waiter.advance(result.sequence);
     585              :             }
     586              :             Err(e) => {
     587            0 :                 tracing::warn!("Reconcile error: {}", e);
     588              : 
     589              :                 // Ordering: populate last_error before advancing error_seq,
     590              :                 // so that waiters will see the correct error after waiting.
     591              :                 *(tenant.last_error.lock().unwrap()) = format!("{e}");
     592              :                 tenant.error_waiter.advance(result.sequence);
     593              : 
     594              :                 for (node_id, o) in result.observed.locations {
     595              :                     tenant.observed.locations.insert(node_id, o);
     596              :                 }
     597              :             }
     598              :         }
     599              :     }
     600              : 
     601            0 :     async fn process_results(
     602            0 :         &self,
     603            0 :         mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
     604            0 :     ) {
     605            0 :         loop {
     606            0 :             // Wait for the next result, or for cancellation
     607            0 :             let result = tokio::select! {
     608            0 :                 r = result_rx.recv() => {
     609              :                     match r {
     610              :                         Some(result) => {result},
     611              :                         None => {break;}
     612              :                     }
     613              :                 }
     614              :                 _ = self.cancel.cancelled() => {
     615              :                     break;
     616              :                 }
     617              :             };
     618              : 
     619            0 :             self.process_result(result);
     620              :         }
     621            0 :     }
     622              : 
     623            0 :     pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
     624            0 :         let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
     625              : 
     626            0 :         tracing::info!("Loading nodes from database...");
     627            0 :         let nodes = persistence
     628            0 :             .list_nodes()
     629            0 :             .await?
     630            0 :             .into_iter()
     631            0 :             .map(|n| Node {
     632            0 :                 id: NodeId(n.node_id as u64),
     633            0 :                 // At startup we consider a node offline until proven otherwise.
     634            0 :                 availability: NodeAvailability::Offline,
     635            0 :                 scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
     636            0 :                     .expect("Bad scheduling policy in DB"),
     637            0 :                 listen_http_addr: n.listen_http_addr,
     638            0 :                 listen_http_port: n.listen_http_port as u16,
     639            0 :                 listen_pg_addr: n.listen_pg_addr,
     640            0 :                 listen_pg_port: n.listen_pg_port as u16,
     641            0 :             })
     642            0 :             .collect::<Vec<_>>();
     643            0 :         let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
     644            0 :         tracing::info!("Loaded {} nodes from database.", nodes.len());
     645              : 
     646            0 :         tracing::info!("Loading shards from database...");
     647            0 :         let tenant_shard_persistence = persistence.list_tenant_shards().await?;
     648            0 :         tracing::info!(
     649            0 :             "Loaded {} shards from database.",
     650            0 :             tenant_shard_persistence.len()
     651            0 :         );
     652              : 
     653            0 :         let mut tenants = BTreeMap::new();
     654            0 : 
     655            0 :         let mut scheduler = Scheduler::new(nodes.values());
     656            0 : 
     657            0 :         #[cfg(feature = "testing")]
     658            0 :         {
     659            0 :             // Hack: insert scheduler state for all nodes referenced by shards, as compatibility
     660            0 :             // tests only store the shards, not the nodes.  The nodes will be loaded shortly
     661            0 :             // after when pageservers start up and register.
     662            0 :             let mut node_ids = HashSet::new();
     663            0 :             for tsp in &tenant_shard_persistence {
     664            0 :                 if tsp.generation_pageserver != i64::MAX {
     665            0 :                     node_ids.insert(tsp.generation_pageserver);
     666            0 :                 }
     667              :             }
     668            0 :             for node_id in node_ids {
     669            0 :                 tracing::info!("Creating node {} in scheduler for tests", node_id);
     670            0 :                 let node = Node {
     671            0 :                     id: NodeId(node_id as u64),
     672            0 :                     availability: NodeAvailability::Active,
     673            0 :                     scheduling: NodeSchedulingPolicy::Active,
     674            0 :                     listen_http_addr: "".to_string(),
     675            0 :                     listen_http_port: 123,
     676            0 :                     listen_pg_addr: "".to_string(),
     677            0 :                     listen_pg_port: 123,
     678            0 :                 };
     679            0 : 
     680            0 :                 scheduler.node_upsert(&node);
     681              :             }
     682              :         }
     683            0 :         for tsp in tenant_shard_persistence {
     684            0 :             let tenant_shard_id = TenantShardId {
     685            0 :                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
     686            0 :                 shard_number: ShardNumber(tsp.shard_number as u8),
     687            0 :                 shard_count: ShardCount::new(tsp.shard_count as u8),
     688              :             };
     689            0 :             let shard_identity = if tsp.shard_count == 0 {
     690            0 :                 ShardIdentity::unsharded()
     691              :             } else {
     692            0 :                 ShardIdentity::new(
     693            0 :                     ShardNumber(tsp.shard_number as u8),
     694            0 :                     ShardCount::new(tsp.shard_count as u8),
     695            0 :                     ShardStripeSize(tsp.shard_stripe_size as u32),
     696            0 :                 )?
     697              :             };
     698              : 
     699              :             // We will populate intent properly later in [`Self::startup_reconcile`], initially populate
     700              :             // it with what we can infer: the node for which a generation was most recently issued.
     701            0 :             let mut intent = IntentState::new();
     702            0 :             if tsp.generation_pageserver != i64::MAX {
     703            0 :                 intent.set_attached(
     704            0 :                     &mut scheduler,
     705            0 :                     Some(NodeId(tsp.generation_pageserver as u64)),
     706            0 :                 );
     707            0 :             }
     708              : 
     709            0 :             let new_tenant = TenantState {
     710            0 :                 tenant_shard_id,
     711            0 :                 shard: shard_identity,
     712            0 :                 sequence: Sequence::initial(),
     713            0 :                 generation: Generation::new(tsp.generation as u32),
     714            0 :                 policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
     715            0 :                 intent,
     716            0 :                 observed: ObservedState::new(),
     717            0 :                 config: serde_json::from_str(&tsp.config).unwrap(),
     718            0 :                 reconciler: None,
     719            0 :                 splitting: tsp.splitting,
     720            0 :                 waiter: Arc::new(SeqWait::new(Sequence::initial())),
     721            0 :                 error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
     722            0 :                 last_error: Arc::default(),
     723            0 :                 pending_compute_notification: false,
     724            0 :             };
     725            0 : 
     726            0 :             tenants.insert(tenant_shard_id, new_tenant);
     727              :         }
     728              : 
     729            0 :         let (startup_completion, startup_complete) = utils::completion::channel();
     730            0 : 
     731            0 :         let this = Arc::new(Self {
     732            0 :             inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
     733            0 :                 config.clone(),
     734            0 :                 result_tx,
     735            0 :                 nodes,
     736            0 :                 tenants,
     737            0 :                 scheduler,
     738            0 :             ))),
     739            0 :             config,
     740            0 :             persistence,
     741            0 :             startup_complete: startup_complete.clone(),
     742            0 :             cancel: CancellationToken::new(),
     743            0 :             gate: Gate::default(),
     744            0 :         });
     745            0 : 
     746            0 :         let result_task_this = this.clone();
     747            0 :         tokio::task::spawn(async move {
     748              :             // Block shutdown until we're done (we must respect self.cancel)
     749            0 :             if let Ok(_gate) = result_task_this.gate.enter() {
     750            0 :                 result_task_this.process_results(result_rx).await
     751            0 :             }
     752            0 :         });
     753            0 : 
     754            0 :         tokio::task::spawn({
     755            0 :             let this = this.clone();
     756            0 :             // We will block the [`Service::startup_complete`] barrier until [`Self::startup_reconcile`]
     757            0 :             // is done.
     758            0 :             let startup_completion = startup_completion.clone();
     759            0 :             async move {
     760              :                 // Block shutdown until we're done (we must respect self.cancel)
     761            0 :                 let Ok(_gate) = this.gate.enter() else {
     762            0 :                     return;
     763              :                 };
     764              : 
     765            0 :                 this.startup_reconcile().await;
     766              : 
     767            0 :                 drop(startup_completion);
     768            0 : 
     769            0 :                 this.background_reconcile().await;
     770            0 :             }
     771            0 :         });
     772            0 : 
     773            0 :         Ok(this)
     774            0 :     }
     775              : 
     776            0 :     pub(crate) async fn attach_hook(
     777            0 :         &self,
     778            0 :         attach_req: AttachHookRequest,
     779            0 :     ) -> anyhow::Result<AttachHookResponse> {
     780            0 :         // This is a test hook.  To enable using it on tenants that were created directly with
     781            0 :         // the pageserver API (not via this service), we will auto-create any missing tenant
     782            0 :         // shards with default state.
     783            0 :         let insert = {
     784            0 :             let locked = self.inner.write().unwrap();
     785            0 :             !locked.tenants.contains_key(&attach_req.tenant_shard_id)
     786            0 :         };
     787            0 :         if insert {
     788            0 :             let tsp = TenantShardPersistence {
     789            0 :                 tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
     790            0 :                 shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
     791            0 :                 shard_count: attach_req.tenant_shard_id.shard_count.literal() as i32,
     792            0 :                 shard_stripe_size: 0,
     793            0 :                 generation: 0,
     794            0 :                 generation_pageserver: i64::MAX,
     795            0 :                 placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
     796            0 :                 config: serde_json::to_string(&TenantConfig::default()).unwrap(),
     797            0 :                 splitting: SplitState::default(),
     798            0 :             };
     799            0 : 
     800            0 :             match self.persistence.insert_tenant_shards(vec![tsp]).await {
     801            0 :                 Err(e) => match e {
     802              :                     DatabaseError::Query(diesel::result::Error::DatabaseError(
     803              :                         DatabaseErrorKind::UniqueViolation,
     804              :                         _,
     805              :                     )) => {
     806            0 :                         tracing::info!(
     807            0 :                             "Raced with another request to insert tenant {}",
     808            0 :                             attach_req.tenant_shard_id
     809            0 :                         )
     810              :                     }
     811            0 :                     _ => return Err(e.into()),
     812              :                 },
     813              :                 Ok(()) => {
     814            0 :                     tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
     815              : 
     816            0 :                     let mut locked = self.inner.write().unwrap();
     817            0 :                     locked.tenants.insert(
     818            0 :                         attach_req.tenant_shard_id,
     819            0 :                         TenantState::new(
     820            0 :                             attach_req.tenant_shard_id,
     821            0 :                             ShardIdentity::unsharded(),
     822            0 :                             PlacementPolicy::Single,
     823            0 :                         ),
     824            0 :                     );
     825            0 :                     tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
     826              :                 }
     827              :             }
     828            0 :         }
     829              : 
     830            0 :         let new_generation = if let Some(req_node_id) = attach_req.node_id {
     831              :             Some(
     832            0 :                 self.persistence
     833            0 :                     .increment_generation(attach_req.tenant_shard_id, req_node_id)
     834            0 :                     .await?,
     835              :             )
     836              :         } else {
     837            0 :             self.persistence.detach(attach_req.tenant_shard_id).await?;
     838            0 :             None
     839              :         };
     840              : 
     841            0 :         let mut locked = self.inner.write().unwrap();
     842            0 :         let (_nodes, tenants, scheduler) = locked.parts_mut();
     843            0 : 
     844            0 :         let tenant_state = tenants
     845            0 :             .get_mut(&attach_req.tenant_shard_id)
     846            0 :             .expect("Checked for existence above");
     847              : 
     848            0 :         if let Some(new_generation) = new_generation {
     849            0 :             tenant_state.generation = new_generation;
     850            0 :         } else {
     851              :             // This is a detach notification.  We must update placement policy to avoid re-attaching
     852              :             // during background scheduling/reconciliation, or during attachment service restart.
     853            0 :             assert!(attach_req.node_id.is_none());
     854            0 :             tenant_state.policy = PlacementPolicy::Detached;
     855              :         }
     856              : 
     857            0 :         if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
     858            0 :             tracing::info!(
     859            0 :                 tenant_id = %attach_req.tenant_shard_id,
     860            0 :                 ps_id = %attaching_pageserver,
     861            0 :                 generation = ?tenant_state.generation,
     862            0 :                 "issuing",
     863            0 :             );
     864            0 :         } else if let Some(ps_id) = tenant_state.intent.get_attached() {
     865            0 :             tracing::info!(
     866            0 :                 tenant_id = %attach_req.tenant_shard_id,
     867            0 :                 %ps_id,
     868            0 :                 generation = ?tenant_state.generation,
     869            0 :                 "dropping",
     870            0 :             );
     871              :         } else {
     872            0 :             tracing::info!(
     873            0 :             tenant_id = %attach_req.tenant_shard_id,
     874            0 :             "no-op: tenant already has no pageserver");
     875              :         }
     876            0 :         tenant_state
     877            0 :             .intent
     878            0 :             .set_attached(scheduler, attach_req.node_id);
     879              : 
     880            0 :         tracing::info!(
     881            0 :             "attach_hook: tenant {} set generation {:?}, pageserver {}",
     882            0 :             attach_req.tenant_shard_id,
     883            0 :             tenant_state.generation,
     884            0 :             // TODO: this is an odd number of 0xf's
     885            0 :             attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
     886            0 :         );
     887              : 
     888              :         // Trick the reconciler into not doing anything for this tenant: this helps
     889              :         // tests that manually configure a tenant on the pagesrever, and then call this
     890              :         // attach hook: they don't want background reconciliation to modify what they
     891              :         // did to the pageserver.
     892              :         #[cfg(feature = "testing")]
     893              :         {
     894            0 :             if let Some(node_id) = attach_req.node_id {
     895            0 :                 tenant_state.observed.locations = HashMap::from([(
     896            0 :                     node_id,
     897            0 :                     ObservedStateLocation {
     898            0 :                         conf: Some(attached_location_conf(
     899            0 :                             tenant_state.generation,
     900            0 :                             &tenant_state.shard,
     901            0 :                             &tenant_state.config,
     902            0 :                         )),
     903            0 :                     },
     904            0 :                 )]);
     905            0 :             } else {
     906            0 :                 tenant_state.observed.locations.clear();
     907            0 :             }
     908              :         }
     909              : 
     910            0 :         Ok(AttachHookResponse {
     911            0 :             gen: attach_req
     912            0 :                 .node_id
     913            0 :                 .map(|_| tenant_state.generation.into().unwrap()),
     914            0 :         })
     915            0 :     }
     916              : 
     917            0 :     pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
     918            0 :         let locked = self.inner.read().unwrap();
     919            0 : 
     920            0 :         let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
     921            0 : 
     922            0 :         InspectResponse {
     923            0 :             attachment: tenant_state.and_then(|s| {
     924            0 :                 s.intent
     925            0 :                     .get_attached()
     926            0 :                     .map(|ps| (s.generation.into().unwrap(), ps))
     927            0 :             }),
     928            0 :         }
     929            0 :     }
     930              : 
     931            0 :     pub(crate) async fn re_attach(
     932            0 :         &self,
     933            0 :         reattach_req: ReAttachRequest,
     934            0 :     ) -> Result<ReAttachResponse, ApiError> {
     935            0 :         // Take a re-attach as indication that the node is available: this is a precursor to proper
     936            0 :         // heartbeating in https://github.com/neondatabase/neon/issues/6844
     937            0 :         self.node_configure(NodeConfigureRequest {
     938            0 :             node_id: reattach_req.node_id,
     939            0 :             availability: Some(NodeAvailability::Active),
     940            0 :             scheduling: None,
     941            0 :         })
     942            0 :         .await?;
     943              : 
     944              :         // Ordering: we must persist generation number updates before making them visible in the in-memory state
     945            0 :         let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
     946              : 
     947              :         // Apply the updated generation to our in-memory state
     948            0 :         let mut locked = self.inner.write().unwrap();
     949            0 : 
     950            0 :         let mut response = ReAttachResponse {
     951            0 :             tenants: Vec::new(),
     952            0 :         };
     953              : 
     954            0 :         for (tenant_shard_id, new_gen) in incremented_generations {
     955            0 :             response.tenants.push(ReAttachResponseTenant {
     956            0 :                 id: tenant_shard_id,
     957            0 :                 gen: new_gen.into().unwrap(),
     958            0 :             });
     959            0 : 
     960            0 :             // Apply the new generation number to our in-memory state
     961            0 :             let shard_state = locked.tenants.get_mut(&tenant_shard_id);
     962            0 :             let Some(shard_state) = shard_state else {
     963              :                 // Not fatal.  This edge case requires a re-attach to happen
     964              :                 // between inserting a new tenant shard in to the database, and updating our in-memory
     965              :                 // state to know about the shard, _and_ that the state inserted to the database referenced
     966              :                 // a pageserver.  Should never happen, but handle it rather than panicking, since it should
     967              :                 // be harmless.
     968            0 :                 tracing::error!(
     969            0 :                     "Shard {} is in database for node {} but not in-memory state",
     970            0 :                     tenant_shard_id,
     971            0 :                     reattach_req.node_id
     972            0 :                 );
     973            0 :                 continue;
     974              :             };
     975              : 
     976            0 :             shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
     977            0 :             if let Some(observed) = shard_state
     978            0 :                 .observed
     979            0 :                 .locations
     980            0 :                 .get_mut(&reattach_req.node_id)
     981              :             {
     982            0 :                 if let Some(conf) = observed.conf.as_mut() {
     983            0 :                     conf.generation = new_gen.into();
     984            0 :                 }
     985            0 :             }
     986              : 
     987              :             // TODO: cancel/restart any running reconciliation for this tenant, it might be trying
     988              :             // to call location_conf API with an old generation.  Wait for cancellation to complete
     989              :             // before responding to this request.  Requires well implemented CancellationToken logic
     990              :             // all the way to where we call location_conf.  Even then, there can still be a location_conf
     991              :             // request in flight over the network: TODO handle that by making location_conf API refuse
     992              :             // to go backward in generations.
     993              :         }
     994            0 :         Ok(response)
     995            0 :     }
     996              : 
     997            0 :     pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
     998            0 :         let locked = self.inner.read().unwrap();
     999            0 : 
    1000            0 :         let mut response = ValidateResponse {
    1001            0 :             tenants: Vec::new(),
    1002            0 :         };
    1003              : 
    1004            0 :         for req_tenant in validate_req.tenants {
    1005            0 :             if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
    1006            0 :                 let valid = tenant_state.generation == Generation::new(req_tenant.gen);
    1007            0 :                 tracing::info!(
    1008            0 :                     "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
    1009            0 :                     req_tenant.id,
    1010            0 :                     req_tenant.gen,
    1011            0 :                     tenant_state.generation
    1012            0 :                 );
    1013            0 :                 response.tenants.push(ValidateResponseTenant {
    1014            0 :                     id: req_tenant.id,
    1015            0 :                     valid,
    1016            0 :                 });
    1017            0 :             } else {
    1018            0 :                 // After tenant deletion, we may approve any validation.  This avoids
    1019            0 :                 // spurious warnings on the pageserver if it has pending LSN updates
    1020            0 :                 // at the point a deletion happens.
    1021            0 :                 response.tenants.push(ValidateResponseTenant {
    1022            0 :                     id: req_tenant.id,
    1023            0 :                     valid: true,
    1024            0 :                 });
    1025            0 :             }
    1026              :         }
    1027            0 :         response
    1028            0 :     }
    1029              : 
    1030            0 :     pub(crate) async fn tenant_create(
    1031            0 :         &self,
    1032            0 :         create_req: TenantCreateRequest,
    1033            0 :     ) -> Result<TenantCreateResponse, ApiError> {
    1034            0 :         let (response, waiters) = self.do_tenant_create(create_req).await?;
    1035              : 
    1036            0 :         self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
    1037            0 :         Ok(response)
    1038            0 :     }
    1039              : 
    1040            0 :     pub(crate) async fn do_tenant_create(
    1041            0 :         &self,
    1042            0 :         create_req: TenantCreateRequest,
    1043            0 :     ) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
    1044              :         // This service expects to handle sharding itself: it is an error to try and directly create
    1045              :         // a particular shard here.
    1046            0 :         let tenant_id = if !create_req.new_tenant_id.is_unsharded() {
    1047            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
    1048            0 :                 "Attempted to create a specific shard, this API is for creating the whole tenant"
    1049            0 :             )));
    1050              :         } else {
    1051            0 :             create_req.new_tenant_id.tenant_id
    1052              :         };
    1053              : 
    1054            0 :         tracing::info!(
    1055            0 :             "Creating tenant {}, shard_count={:?}",
    1056            0 :             create_req.new_tenant_id,
    1057            0 :             create_req.shard_parameters.count,
    1058            0 :         );
    1059              : 
    1060            0 :         let create_ids = (0..create_req.shard_parameters.count.count())
    1061            0 :             .map(|i| TenantShardId {
    1062            0 :                 tenant_id,
    1063            0 :                 shard_number: ShardNumber(i),
    1064            0 :                 shard_count: create_req.shard_parameters.count,
    1065            0 :             })
    1066            0 :             .collect::<Vec<_>>();
    1067            0 : 
    1068            0 :         // TODO: enable specifying this.  Using Single as a default helps legacy tests to work (they
    1069            0 :         // have no expectation of HA).
    1070            0 :         let placement_policy: PlacementPolicy = PlacementPolicy::Single;
    1071            0 : 
    1072            0 :         // Ordering: we persist tenant shards before creating them on the pageserver.  This enables a caller
    1073            0 :         // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
    1074            0 :         // during the creation, rather than risking leaving orphan objects in S3.
    1075            0 :         let persist_tenant_shards = create_ids
    1076            0 :             .iter()
    1077            0 :             .map(|tenant_shard_id| TenantShardPersistence {
    1078            0 :                 tenant_id: tenant_shard_id.tenant_id.to_string(),
    1079            0 :                 shard_number: tenant_shard_id.shard_number.0 as i32,
    1080            0 :                 shard_count: tenant_shard_id.shard_count.literal() as i32,
    1081            0 :                 shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
    1082            0 :                 generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
    1083            0 :                 generation_pageserver: i64::MAX,
    1084            0 :                 placement_policy: serde_json::to_string(&placement_policy).unwrap(),
    1085            0 :                 config: serde_json::to_string(&create_req.config).unwrap(),
    1086            0 :                 splitting: SplitState::default(),
    1087            0 :             })
    1088            0 :             .collect();
    1089            0 :         self.persistence
    1090            0 :             .insert_tenant_shards(persist_tenant_shards)
    1091            0 :             .await
    1092            0 :             .map_err(|e| {
    1093            0 :                 // TODO: distinguish primary key constraint (idempotent, OK), from other errors
    1094            0 :                 ApiError::InternalServerError(anyhow::anyhow!(e))
    1095            0 :             })?;
    1096              : 
    1097            0 :         let (waiters, response_shards) = {
    1098            0 :             let mut locked = self.inner.write().unwrap();
    1099            0 :             let (_nodes, tenants, scheduler) = locked.parts_mut();
    1100            0 : 
    1101            0 :             let mut response_shards = Vec::new();
    1102              : 
    1103            0 :             for tenant_shard_id in create_ids {
    1104            0 :                 tracing::info!("Creating shard {tenant_shard_id}...");
    1105              : 
    1106              :                 use std::collections::btree_map::Entry;
    1107            0 :                 match tenants.entry(tenant_shard_id) {
    1108            0 :                     Entry::Occupied(mut entry) => {
    1109            0 :                         tracing::info!(
    1110            0 :                             "Tenant shard {tenant_shard_id} already exists while creating"
    1111            0 :                         );
    1112              : 
    1113              :                         // TODO: schedule() should take an anti-affinity expression that pushes
    1114              :                         // attached and secondary locations (independently) away frorm those
    1115              :                         // pageservers also holding a shard for this tenant.
    1116              : 
    1117            0 :                         entry.get_mut().schedule(scheduler).map_err(|e| {
    1118            0 :                             ApiError::Conflict(format!(
    1119            0 :                                 "Failed to schedule shard {tenant_shard_id}: {e}"
    1120            0 :                             ))
    1121            0 :                         })?;
    1122              : 
    1123            0 :                         response_shards.push(TenantCreateResponseShard {
    1124            0 :                             shard_id: tenant_shard_id,
    1125            0 :                             node_id: entry
    1126            0 :                                 .get()
    1127            0 :                                 .intent
    1128            0 :                                 .get_attached()
    1129            0 :                                 .expect("We just set pageserver if it was None"),
    1130            0 :                             generation: entry.get().generation.into().unwrap(),
    1131            0 :                         });
    1132            0 : 
    1133            0 :                         continue;
    1134              :                     }
    1135            0 :                     Entry::Vacant(entry) => {
    1136            0 :                         let mut state = TenantState::new(
    1137            0 :                             tenant_shard_id,
    1138            0 :                             ShardIdentity::from_params(
    1139            0 :                                 tenant_shard_id.shard_number,
    1140            0 :                                 &create_req.shard_parameters,
    1141            0 :                             ),
    1142            0 :                             placement_policy.clone(),
    1143            0 :                         );
    1144              : 
    1145            0 :                         if let Some(create_gen) = create_req.generation {
    1146            0 :                             state.generation = Generation::new(create_gen);
    1147            0 :                         }
    1148            0 :                         state.config = create_req.config.clone();
    1149            0 : 
    1150            0 :                         state.schedule(scheduler).map_err(|e| {
    1151            0 :                             ApiError::Conflict(format!(
    1152            0 :                                 "Failed to schedule shard {tenant_shard_id}: {e}"
    1153            0 :                             ))
    1154            0 :                         })?;
    1155              : 
    1156            0 :                         response_shards.push(TenantCreateResponseShard {
    1157            0 :                             shard_id: tenant_shard_id,
    1158            0 :                             node_id: state
    1159            0 :                                 .intent
    1160            0 :                                 .get_attached()
    1161            0 :                                 .expect("We just set pageserver if it was None"),
    1162            0 :                             generation: state.generation.into().unwrap(),
    1163            0 :                         });
    1164            0 :                         entry.insert(state)
    1165              :                     }
    1166              :                 };
    1167              :             }
    1168              : 
    1169              :             // Take a snapshot of pageservers
    1170            0 :             let pageservers = locked.nodes.clone();
    1171            0 : 
    1172            0 :             let result_tx = locked.result_tx.clone();
    1173            0 :             let compute_hook = locked.compute_hook.clone();
    1174            0 : 
    1175            0 :             let waiters = locked
    1176            0 :                 .tenants
    1177            0 :                 .range_mut(TenantShardId::tenant_range(tenant_id))
    1178            0 :                 .filter_map(|(_shard_id, shard)| {
    1179            0 :                     shard.maybe_reconcile(
    1180            0 :                         result_tx.clone(),
    1181            0 :                         &pageservers,
    1182            0 :                         &compute_hook,
    1183            0 :                         &self.config,
    1184            0 :                         &self.persistence,
    1185            0 :                         &self.gate,
    1186            0 :                         &self.cancel,
    1187            0 :                     )
    1188            0 :                 })
    1189            0 :                 .collect::<Vec<_>>();
    1190            0 :             (waiters, response_shards)
    1191            0 :         };
    1192            0 : 
    1193            0 :         Ok((
    1194            0 :             TenantCreateResponse {
    1195            0 :                 shards: response_shards,
    1196            0 :             },
    1197            0 :             waiters,
    1198            0 :         ))
    1199            0 :     }
    1200              : 
    1201              :     /// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
    1202              :     /// wait for reconciliation to complete before responding.
    1203            0 :     async fn await_waiters(
    1204            0 :         &self,
    1205            0 :         waiters: Vec<ReconcilerWaiter>,
    1206            0 :         timeout: Duration,
    1207            0 :     ) -> Result<(), ReconcileWaitError> {
    1208            0 :         let deadline = Instant::now().checked_add(timeout).unwrap();
    1209            0 :         for waiter in waiters {
    1210            0 :             let timeout = deadline.duration_since(Instant::now());
    1211            0 :             waiter.wait_timeout(timeout).await?;
    1212              :         }
    1213              : 
    1214            0 :         Ok(())
    1215            0 :     }
    1216              : 
    1217              :     /// This API is used by the cloud control plane to do coarse-grained control of tenants:
    1218              :     /// - Call with mode Attached* to upsert the tenant.
    1219              :     /// - Call with mode Detached to switch to PolicyMode::Detached
    1220              :     ///
    1221              :     /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
    1222              :     /// secondary locations.
    1223            0 :     pub(crate) async fn tenant_location_config(
    1224            0 :         &self,
    1225            0 :         tenant_id: TenantId,
    1226            0 :         req: TenantLocationConfigRequest,
    1227            0 :     ) -> Result<TenantLocationConfigResponse, ApiError> {
    1228            0 :         if !req.tenant_id.is_unsharded() {
    1229            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
    1230            0 :                 "This API is for importing single-sharded or unsharded tenants"
    1231            0 :             )));
    1232            0 :         }
    1233            0 : 
    1234            0 :         let mut waiters = Vec::new();
    1235            0 :         let mut result = TenantLocationConfigResponse { shards: Vec::new() };
    1236            0 :         let maybe_create = {
    1237            0 :             let mut locked = self.inner.write().unwrap();
    1238            0 :             let result_tx = locked.result_tx.clone();
    1239            0 :             let compute_hook = locked.compute_hook.clone();
    1240            0 :             let (nodes, tenants, scheduler) = locked.parts_mut();
    1241            0 : 
    1242            0 :             // Maybe we have existing shards
    1243            0 :             let mut create = true;
    1244            0 :             for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
    1245              :                 // Saw an existing shard: this is not a creation
    1246            0 :                 create = false;
    1247            0 : 
    1248            0 :                 // Note that for existing tenants we do _not_ respect the generation in the request: this is likely
    1249            0 :                 // to be stale.  Once a tenant is created in this service, our view of generation is authoritative, and
    1250            0 :                 // callers' generations may be ignored.  This represents a one-way migration of tenants from the outer
    1251            0 :                 // cloud control plane into this service.
    1252            0 : 
    1253            0 :                 // Use location config mode as an indicator of policy: if they ask for
    1254            0 :                 // attached we go to default HA attached mode.  If they ask for secondary
    1255            0 :                 // we go to secondary-only mode.  If they ask for detached we detach.
    1256            0 :                 match req.config.mode {
    1257            0 :                     LocationConfigMode::Detached => {
    1258            0 :                         shard.policy = PlacementPolicy::Detached;
    1259            0 :                     }
    1260              :                     LocationConfigMode::Secondary => {
    1261              :                         // TODO: implement secondary-only mode.
    1262            0 :                         todo!();
    1263              :                     }
    1264              :                     LocationConfigMode::AttachedMulti
    1265              :                     | LocationConfigMode::AttachedSingle
    1266              :                     | LocationConfigMode::AttachedStale => {
    1267              :                         // TODO: persistence for changes in policy
    1268            0 :                         if nodes.len() > 1 {
    1269            0 :                             shard.policy = PlacementPolicy::Double(1)
    1270              :                         } else {
    1271              :                             // Convenience for dev/test: if we just have one pageserver, import
    1272              :                             // tenants into Single mode so that scheduling will succeed.
    1273            0 :                             shard.policy = PlacementPolicy::Single
    1274              :                         }
    1275              :                     }
    1276              :                 }
    1277              : 
    1278            0 :                 shard.schedule(scheduler)?;
    1279              : 
    1280            0 :                 let maybe_waiter = shard.maybe_reconcile(
    1281            0 :                     result_tx.clone(),
    1282            0 :                     nodes,
    1283            0 :                     &compute_hook,
    1284            0 :                     &self.config,
    1285            0 :                     &self.persistence,
    1286            0 :                     &self.gate,
    1287            0 :                     &self.cancel,
    1288            0 :                 );
    1289            0 :                 if let Some(waiter) = maybe_waiter {
    1290            0 :                     waiters.push(waiter);
    1291            0 :                 }
    1292              : 
    1293            0 :                 if let Some(node_id) = shard.intent.get_attached() {
    1294            0 :                     result.shards.push(TenantShardLocation {
    1295            0 :                         shard_id: *shard_id,
    1296            0 :                         node_id: *node_id,
    1297            0 :                     })
    1298            0 :                 }
    1299              :             }
    1300              : 
    1301            0 :             if create {
    1302              :                 // Validate request mode
    1303            0 :                 match req.config.mode {
    1304              :                     LocationConfigMode::Detached | LocationConfigMode::Secondary => {
    1305              :                         // When using this API to onboard an existing tenant to this service, it must start in
    1306              :                         // an attached state, because we need the request to come with a generation
    1307            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
    1308            0 :                             "Imported tenant must be in attached mode"
    1309            0 :                         )));
    1310              :                     }
    1311              : 
    1312              :                     LocationConfigMode::AttachedMulti
    1313              :                     | LocationConfigMode::AttachedSingle
    1314            0 :                     | LocationConfigMode::AttachedStale => {
    1315            0 :                         // Pass
    1316            0 :                     }
    1317              :                 }
    1318              : 
    1319              :                 // Validate request generation
    1320            0 :                 let Some(generation) = req.config.generation else {
    1321              :                     // We can only import attached tenants, because we need the request to come with a generation
    1322            0 :                     return Err(ApiError::BadRequest(anyhow::anyhow!(
    1323            0 :                         "Generation is mandatory when importing tenant"
    1324            0 :                     )));
    1325              :                 };
    1326              : 
    1327              :                 // Synthesize a creation request
    1328            0 :                 Some(TenantCreateRequest {
    1329            0 :                     new_tenant_id: TenantShardId::unsharded(tenant_id),
    1330            0 :                     generation: Some(generation),
    1331            0 :                     shard_parameters: ShardParameters {
    1332            0 :                         // Must preserve the incoming shard_count do distinguish unsharded (0)
    1333            0 :                         // from single-sharded (1): this distinction appears in the S3 keys of the tenant.
    1334            0 :                         count: req.tenant_id.shard_count,
    1335            0 :                         // We only import un-sharded or single-sharded tenants, so stripe
    1336            0 :                         // size can be made up arbitrarily here.
    1337            0 :                         stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
    1338            0 :                     },
    1339            0 :                     config: req.config.tenant_conf,
    1340            0 :                 })
    1341              :             } else {
    1342            0 :                 None
    1343              :             }
    1344              :         };
    1345              : 
    1346            0 :         let waiters = if let Some(create_req) = maybe_create {
    1347            0 :             let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
    1348            0 :             result.shards = create_resp
    1349            0 :                 .shards
    1350            0 :                 .into_iter()
    1351            0 :                 .map(|s| TenantShardLocation {
    1352            0 :                     node_id: s.node_id,
    1353            0 :                     shard_id: s.shard_id,
    1354            0 :                 })
    1355            0 :                 .collect();
    1356            0 :             waiters
    1357              :         } else {
    1358            0 :             waiters
    1359              :         };
    1360              : 
    1361            0 :         if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
    1362              :             // Do not treat a reconcile error as fatal: we have already applied any requested
    1363              :             // Intent changes, and the reconcile can fail for external reasons like unavailable
    1364              :             // compute notification API.  In these cases, it is important that we do not
    1365              :             // cause the cloud control plane to retry forever on this API.
    1366            0 :             tracing::warn!(
    1367            0 :                 "Failed to reconcile after /location_config: {e}, returning success anyway"
    1368            0 :             );
    1369            0 :         }
    1370              : 
    1371              :         // Logging the full result is useful because it lets us cross-check what the cloud control
    1372              :         // plane's tenant_shards table should contain.
    1373            0 :         tracing::info!("Complete, returning {result:?}");
    1374              : 
    1375            0 :         Ok(result)
    1376            0 :     }
    1377              : 
    1378            0 :     pub(crate) async fn tenant_time_travel_remote_storage(
    1379            0 :         &self,
    1380            0 :         time_travel_req: &TenantTimeTravelRequest,
    1381            0 :         tenant_id: TenantId,
    1382            0 :         timestamp: Cow<'_, str>,
    1383            0 :         done_if_after: Cow<'_, str>,
    1384            0 :     ) -> Result<(), ApiError> {
    1385            0 :         let node = {
    1386            0 :             let locked = self.inner.read().unwrap();
    1387              :             // Just a sanity check to prevent misuse: the API expects that the tenant is fully
    1388              :             // detached everywhere, and nothing writes to S3 storage. Here, we verify that,
    1389              :             // but only at the start of the process, so it's really just to prevent operator
    1390              :             // mistakes.
    1391            0 :             for (shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
    1392            0 :                 if shard.intent.get_attached().is_some() || !shard.intent.get_secondary().is_empty()
    1393              :                 {
    1394            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1395            0 :                         "We want tenant to be attached in shard with tenant_shard_id={shard_id}"
    1396            0 :                     )));
    1397            0 :                 }
    1398            0 :                 let maybe_attached = shard
    1399            0 :                     .observed
    1400            0 :                     .locations
    1401            0 :                     .iter()
    1402            0 :                     .filter_map(|(node_id, observed_location)| {
    1403            0 :                         observed_location
    1404            0 :                             .conf
    1405            0 :                             .as_ref()
    1406            0 :                             .map(|loc| (node_id, observed_location, loc.mode))
    1407            0 :                     })
    1408            0 :                     .find(|(_, _, mode)| *mode != LocationConfigMode::Detached);
    1409            0 :                 if let Some((node_id, _observed_location, mode)) = maybe_attached {
    1410            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}")));
    1411            0 :                 }
    1412              :             }
    1413            0 :             let scheduler = &locked.scheduler;
    1414              :             // Right now we only perform the operation on a single node without parallelization
    1415              :             // TODO fan out the operation to multiple nodes for better performance
    1416            0 :             let node_id = scheduler.schedule_shard(&[])?;
    1417            0 :             let node = locked
    1418            0 :                 .nodes
    1419            0 :                 .get(&node_id)
    1420            0 :                 .expect("Pageservers may not be deleted while lock is active");
    1421            0 :             node.clone()
    1422            0 :         };
    1423            0 : 
    1424            0 :         // The shard count is encoded in the remote storage's URL, so we need to handle all historically used shard counts
    1425            0 :         let mut counts = time_travel_req
    1426            0 :             .shard_counts
    1427            0 :             .iter()
    1428            0 :             .copied()
    1429            0 :             .collect::<HashSet<_>>()
    1430            0 :             .into_iter()
    1431            0 :             .collect::<Vec<_>>();
    1432            0 :         counts.sort_unstable();
    1433              : 
    1434            0 :         for count in counts {
    1435            0 :             let shard_ids = (0..count.count())
    1436            0 :                 .map(|i| TenantShardId {
    1437            0 :                     tenant_id,
    1438            0 :                     shard_number: ShardNumber(i),
    1439            0 :                     shard_count: count,
    1440            0 :                 })
    1441            0 :                 .collect::<Vec<_>>();
    1442            0 :             for tenant_shard_id in shard_ids {
    1443            0 :                 let client =
    1444            0 :                     mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1445              : 
    1446            0 :                 tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
    1447              : 
    1448            0 :                 client
    1449            0 :                         .tenant_time_travel_remote_storage(
    1450            0 :                             tenant_shard_id,
    1451            0 :                             &timestamp,
    1452            0 :                             &done_if_after,
    1453            0 :                         )
    1454            0 :                         .await
    1455            0 :                         .map_err(|e| {
    1456            0 :                             ApiError::InternalServerError(anyhow::anyhow!(
    1457            0 :                                 "Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
    1458            0 :                                 node.id
    1459            0 :                             ))
    1460            0 :                         })?;
    1461              :             }
    1462              :         }
    1463              : 
    1464            0 :         Ok(())
    1465            0 :     }
    1466              : 
    1467            0 :     pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
    1468            0 :         self.ensure_attached_wait(tenant_id).await?;
    1469              : 
    1470              :         // TODO: refactor into helper
    1471            0 :         let targets = {
    1472            0 :             let locked = self.inner.read().unwrap();
    1473            0 :             let mut targets = Vec::new();
    1474              : 
    1475            0 :             for (tenant_shard_id, shard) in
    1476            0 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1477            0 :             {
    1478            0 :                 let node_id = shard.intent.get_attached().ok_or_else(|| {
    1479            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1480            0 :                 })?;
    1481            0 :                 let node = locked
    1482            0 :                     .nodes
    1483            0 :                     .get(&node_id)
    1484            0 :                     .expect("Pageservers may not be deleted while referenced");
    1485            0 : 
    1486            0 :                 targets.push((*tenant_shard_id, node.clone()));
    1487              :             }
    1488            0 :             targets
    1489            0 :         };
    1490            0 : 
    1491            0 :         // Phase 1: delete on the pageservers
    1492            0 :         let mut any_pending = false;
    1493            0 :         for (tenant_shard_id, node) in targets {
    1494            0 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1495              :             // TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
    1496              :             // surface immediately as an error to our caller.
    1497            0 :             let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
    1498            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
    1499            0 :                     "Error deleting shard {tenant_shard_id} on node {}: {e}",
    1500            0 :                     node.id
    1501            0 :                 ))
    1502            0 :             })?;
    1503            0 :             tracing::info!(
    1504            0 :                 "Shard {tenant_shard_id} on node {}, delete returned {}",
    1505            0 :                 node.id,
    1506            0 :                 status
    1507            0 :             );
    1508            0 :             if status == StatusCode::ACCEPTED {
    1509            0 :                 any_pending = true;
    1510            0 :             }
    1511              :         }
    1512              : 
    1513            0 :         if any_pending {
    1514              :             // Caller should call us again later.  When we eventually see 404s from
    1515              :             // all the shards, we may proceed to delete our records of the tenant.
    1516            0 :             tracing::info!(
    1517            0 :                 "Tenant {} has some shards pending deletion, returning 202",
    1518            0 :                 tenant_id
    1519            0 :             );
    1520            0 :             return Ok(StatusCode::ACCEPTED);
    1521            0 :         }
    1522            0 : 
    1523            0 :         // Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop
    1524            0 :         // our in-memory state and database state.
    1525            0 : 
    1526            0 :         // Ordering: we delete persistent state first: if we then
    1527            0 :         // crash, we will drop the in-memory state.
    1528            0 : 
    1529            0 :         // Drop persistent state.
    1530            0 :         self.persistence.delete_tenant(tenant_id).await?;
    1531              : 
    1532              :         // Drop in-memory state
    1533              :         {
    1534            0 :             let mut locked = self.inner.write().unwrap();
    1535            0 :             let (_nodes, tenants, scheduler) = locked.parts_mut();
    1536              : 
    1537              :             // Dereference Scheduler from shards before dropping them
    1538            0 :             for (_tenant_shard_id, shard) in
    1539            0 :                 tenants.range_mut(TenantShardId::tenant_range(tenant_id))
    1540            0 :             {
    1541            0 :                 shard.intent.clear(scheduler);
    1542            0 :             }
    1543              : 
    1544            0 :             tenants.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
    1545            0 :             tracing::info!(
    1546            0 :                 "Deleted tenant {tenant_id}, now have {} tenants",
    1547            0 :                 locked.tenants.len()
    1548            0 :             );
    1549              :         };
    1550              : 
    1551              :         // Success is represented as 404, to imitate the existing pageserver deletion API
    1552            0 :         Ok(StatusCode::NOT_FOUND)
    1553            0 :     }
    1554              : 
    1555            0 :     pub(crate) async fn tenant_timeline_create(
    1556            0 :         &self,
    1557            0 :         tenant_id: TenantId,
    1558            0 :         mut create_req: TimelineCreateRequest,
    1559            0 :     ) -> Result<TimelineInfo, ApiError> {
    1560            0 :         tracing::info!(
    1561            0 :             "Creating timeline {}/{}",
    1562            0 :             tenant_id,
    1563            0 :             create_req.new_timeline_id,
    1564            0 :         );
    1565              : 
    1566            0 :         self.ensure_attached_wait(tenant_id).await?;
    1567              : 
    1568              :         // TODO: refuse to do this if shard splitting is in progress
    1569              :         // (https://github.com/neondatabase/neon/issues/6676)
    1570            0 :         let mut targets = {
    1571            0 :             let locked = self.inner.read().unwrap();
    1572            0 :             let mut targets = Vec::new();
    1573              : 
    1574            0 :             for (tenant_shard_id, shard) in
    1575            0 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1576            0 :             {
    1577            0 :                 let node_id = shard.intent.get_attached().ok_or_else(|| {
    1578            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1579            0 :                 })?;
    1580            0 :                 let node = locked
    1581            0 :                     .nodes
    1582            0 :                     .get(&node_id)
    1583            0 :                     .expect("Pageservers may not be deleted while referenced");
    1584            0 : 
    1585            0 :                 targets.push((*tenant_shard_id, node.clone()));
    1586              :             }
    1587            0 :             targets
    1588            0 :         };
    1589            0 : 
    1590            0 :         if targets.is_empty() {
    1591            0 :             return Err(ApiError::NotFound(
    1592            0 :                 anyhow::anyhow!("Tenant not found").into(),
    1593            0 :             ));
    1594            0 :         };
    1595            0 :         let shard_zero = targets.remove(0);
    1596              : 
    1597            0 :         async fn create_one(
    1598            0 :             tenant_shard_id: TenantShardId,
    1599            0 :             node: Node,
    1600            0 :             jwt: Option<String>,
    1601            0 :             create_req: TimelineCreateRequest,
    1602            0 :         ) -> Result<TimelineInfo, ApiError> {
    1603            0 :             tracing::info!(
    1604            0 :                 "Creating timeline on shard {}/{}, attached to node {}",
    1605            0 :                 tenant_shard_id,
    1606            0 :                 create_req.new_timeline_id,
    1607            0 :                 node.id
    1608            0 :             );
    1609            0 :             let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
    1610            0 : 
    1611            0 :             client
    1612            0 :                 .timeline_create(tenant_shard_id, &create_req)
    1613            0 :                 .await
    1614            0 :                 .map_err(|e| match e {
    1615            0 :                     mgmt_api::Error::ApiError(status, msg)
    1616            0 :                         if status == StatusCode::INTERNAL_SERVER_ERROR
    1617            0 :                             || status == StatusCode::NOT_ACCEPTABLE =>
    1618            0 :                     {
    1619            0 :                         // TODO: handle more error codes, e.g. 503 should be passed through.  Make a general wrapper
    1620            0 :                         // for pass-through API calls.
    1621            0 :                         ApiError::InternalServerError(anyhow::anyhow!(msg))
    1622              :                     }
    1623            0 :                     _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
    1624            0 :                 })
    1625            0 :         }
    1626              : 
    1627              :         // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
    1628              :         // use whatever LSN that shard picked when creating on subsequent shards.  We arbitrarily use shard zero as the shard
    1629              :         // that will get the first creation request, and propagate the LSN to all the >0 shards.
    1630            0 :         let timeline_info = create_one(
    1631            0 :             shard_zero.0,
    1632            0 :             shard_zero.1,
    1633            0 :             self.config.jwt_token.clone(),
    1634            0 :             create_req.clone(),
    1635            0 :         )
    1636            0 :         .await?;
    1637              : 
    1638              :         // Propagate the LSN that shard zero picked, if caller didn't provide one
    1639            0 :         if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
    1640            0 :             create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
    1641            0 :         }
    1642              : 
    1643              :         // Create timeline on remaining shards with number >0
    1644            0 :         if !targets.is_empty() {
    1645              :             // If we had multiple shards, issue requests for the remainder now.
    1646            0 :             let jwt = self.config.jwt_token.clone();
    1647            0 :             self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
    1648            0 :                 let create_req = create_req.clone();
    1649            0 :                 Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
    1650            0 :             })
    1651            0 :             .await?;
    1652            0 :         }
    1653              : 
    1654            0 :         Ok(timeline_info)
    1655            0 :     }
    1656              : 
    1657              :     /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
    1658              :     ///
    1659              :     /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
    1660            0 :     async fn tenant_for_shards<F, R>(
    1661            0 :         &self,
    1662            0 :         locations: Vec<(TenantShardId, Node)>,
    1663            0 :         mut req_fn: F,
    1664            0 :     ) -> Result<Vec<R>, ApiError>
    1665            0 :     where
    1666            0 :         F: FnMut(
    1667            0 :             TenantShardId,
    1668            0 :             Node,
    1669            0 :         )
    1670            0 :             -> std::pin::Pin<Box<dyn futures::Future<Output = Result<R, ApiError>> + Send>>,
    1671            0 :     {
    1672            0 :         let mut futs = FuturesUnordered::new();
    1673            0 :         let mut results = Vec::with_capacity(locations.len());
    1674              : 
    1675            0 :         for (tenant_shard_id, node) in locations {
    1676            0 :             futs.push(req_fn(tenant_shard_id, node));
    1677            0 :         }
    1678              : 
    1679            0 :         while let Some(r) = futs.next().await {
    1680            0 :             results.push(r?);
    1681              :         }
    1682              : 
    1683            0 :         Ok(results)
    1684            0 :     }
    1685              : 
    1686            0 :     pub(crate) async fn tenant_timeline_delete(
    1687            0 :         &self,
    1688            0 :         tenant_id: TenantId,
    1689            0 :         timeline_id: TimelineId,
    1690            0 :     ) -> Result<StatusCode, ApiError> {
    1691            0 :         tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
    1692              : 
    1693            0 :         self.ensure_attached_wait(tenant_id).await?;
    1694              : 
    1695              :         // TODO: refuse to do this if shard splitting is in progress
    1696              :         // (https://github.com/neondatabase/neon/issues/6676)
    1697            0 :         let mut targets = {
    1698            0 :             let locked = self.inner.read().unwrap();
    1699            0 :             let mut targets = Vec::new();
    1700              : 
    1701            0 :             for (tenant_shard_id, shard) in
    1702            0 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1703            0 :             {
    1704            0 :                 let node_id = shard.intent.get_attached().ok_or_else(|| {
    1705            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1706            0 :                 })?;
    1707            0 :                 let node = locked
    1708            0 :                     .nodes
    1709            0 :                     .get(&node_id)
    1710            0 :                     .expect("Pageservers may not be deleted while referenced");
    1711            0 : 
    1712            0 :                 targets.push((*tenant_shard_id, node.clone()));
    1713              :             }
    1714            0 :             targets
    1715            0 :         };
    1716            0 : 
    1717            0 :         if targets.is_empty() {
    1718            0 :             return Err(ApiError::NotFound(
    1719            0 :                 anyhow::anyhow!("Tenant not found").into(),
    1720            0 :             ));
    1721            0 :         }
    1722            0 :         let shard_zero = targets.remove(0);
    1723              : 
    1724            0 :         async fn delete_one(
    1725            0 :             tenant_shard_id: TenantShardId,
    1726            0 :             timeline_id: TimelineId,
    1727            0 :             node: Node,
    1728            0 :             jwt: Option<String>,
    1729            0 :         ) -> Result<StatusCode, ApiError> {
    1730            0 :             tracing::info!(
    1731            0 :                 "Deleting timeline on shard {}/{}, attached to node {}",
    1732            0 :                 tenant_shard_id,
    1733            0 :                 timeline_id,
    1734            0 :                 node.id
    1735            0 :             );
    1736              : 
    1737            0 :             let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
    1738            0 :             client
    1739            0 :                 .timeline_delete(tenant_shard_id, timeline_id)
    1740            0 :                 .await
    1741            0 :                 .map_err(|e| {
    1742            0 :                     ApiError::InternalServerError(anyhow::anyhow!(
    1743            0 :                     "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
    1744            0 :                     node.id
    1745            0 :                 ))
    1746            0 :                 })
    1747            0 :         }
    1748              : 
    1749            0 :         let statuses = self
    1750            0 :             .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
    1751            0 :                 Box::pin(delete_one(
    1752            0 :                     tenant_shard_id,
    1753            0 :                     timeline_id,
    1754            0 :                     node,
    1755            0 :                     self.config.jwt_token.clone(),
    1756            0 :                 ))
    1757            0 :             })
    1758            0 :             .await?;
    1759              : 
    1760              :         // If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
    1761            0 :         if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
    1762            0 :             return Ok(StatusCode::ACCEPTED);
    1763            0 :         }
    1764              : 
    1765              :         // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
    1766              :         // to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
    1767            0 :         let shard_zero_status = delete_one(
    1768            0 :             shard_zero.0,
    1769            0 :             timeline_id,
    1770            0 :             shard_zero.1,
    1771            0 :             self.config.jwt_token.clone(),
    1772            0 :         )
    1773            0 :         .await?;
    1774              : 
    1775            0 :         Ok(shard_zero_status)
    1776            0 :     }
    1777              : 
    1778              :     /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
    1779              :     /// function looks it up and returns the url.  If the tenant isn't found, returns Err(ApiError::NotFound)
    1780            0 :     pub(crate) fn tenant_shard0_baseurl(
    1781            0 :         &self,
    1782            0 :         tenant_id: TenantId,
    1783            0 :     ) -> Result<(String, TenantShardId), ApiError> {
    1784            0 :         let locked = self.inner.read().unwrap();
    1785            0 :         let Some((tenant_shard_id, shard)) = locked
    1786            0 :             .tenants
    1787            0 :             .range(TenantShardId::tenant_range(tenant_id))
    1788            0 :             .next()
    1789              :         else {
    1790            0 :             return Err(ApiError::NotFound(
    1791            0 :                 anyhow::anyhow!("Tenant {tenant_id} not found").into(),
    1792            0 :             ));
    1793              :         };
    1794              : 
    1795              :         // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
    1796              :         // point to somewhere we haven't attached yet.
    1797            0 :         let Some(node_id) = shard.intent.get_attached() else {
    1798            0 :             tracing::warn!(
    1799            0 :                 tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
    1800            0 :                 "Shard not scheduled (policy {:?}), cannot generate pass-through URL",
    1801            0 :                 shard.policy
    1802            0 :             );
    1803            0 :             return Err(ApiError::Conflict(
    1804            0 :                 "Cannot call timeline API on non-attached tenant".to_string(),
    1805            0 :             ));
    1806              :         };
    1807              : 
    1808            0 :         let Some(node) = locked.nodes.get(node_id) else {
    1809              :             // This should never happen
    1810            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1811            0 :                 "Shard refers to nonexistent node"
    1812            0 :             )));
    1813              :         };
    1814              : 
    1815            0 :         Ok((node.base_url(), *tenant_shard_id))
    1816            0 :     }
    1817              : 
    1818            0 :     pub(crate) fn tenant_locate(
    1819            0 :         &self,
    1820            0 :         tenant_id: TenantId,
    1821            0 :     ) -> Result<TenantLocateResponse, ApiError> {
    1822            0 :         let locked = self.inner.read().unwrap();
    1823            0 :         tracing::info!("Locating shards for tenant {tenant_id}");
    1824              : 
    1825              :         // Take a snapshot of pageservers
    1826            0 :         let pageservers = locked.nodes.clone();
    1827            0 : 
    1828            0 :         let mut result = Vec::new();
    1829            0 :         let mut shard_params: Option<ShardParameters> = None;
    1830              : 
    1831            0 :         for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1832              :         {
    1833            0 :             let node_id =
    1834            0 :                 shard
    1835            0 :                     .intent
    1836            0 :                     .get_attached()
    1837            0 :                     .ok_or(ApiError::BadRequest(anyhow::anyhow!(
    1838            0 :                         "Cannot locate a tenant that is not attached"
    1839            0 :                     )))?;
    1840              : 
    1841            0 :             let node = pageservers
    1842            0 :                 .get(&node_id)
    1843            0 :                 .expect("Pageservers may not be deleted while referenced");
    1844            0 : 
    1845            0 :             result.push(TenantLocateResponseShard {
    1846            0 :                 shard_id: *tenant_shard_id,
    1847            0 :                 node_id,
    1848            0 :                 listen_http_addr: node.listen_http_addr.clone(),
    1849            0 :                 listen_http_port: node.listen_http_port,
    1850            0 :                 listen_pg_addr: node.listen_pg_addr.clone(),
    1851            0 :                 listen_pg_port: node.listen_pg_port,
    1852            0 :             });
    1853            0 : 
    1854            0 :             match &shard_params {
    1855            0 :                 None => {
    1856            0 :                     shard_params = Some(ShardParameters {
    1857            0 :                         stripe_size: shard.shard.stripe_size,
    1858            0 :                         count: shard.shard.count,
    1859            0 :                     });
    1860            0 :                 }
    1861            0 :                 Some(params) => {
    1862            0 :                     if params.stripe_size != shard.shard.stripe_size {
    1863              :                         // This should never happen.  We enforce at runtime because it's simpler than
    1864              :                         // adding an extra per-tenant data structure to store the things that should be the same
    1865            0 :                         return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1866            0 :                             "Inconsistent shard stripe size parameters!"
    1867            0 :                         )));
    1868            0 :                     }
    1869              :                 }
    1870              :             }
    1871              :         }
    1872              : 
    1873            0 :         if result.is_empty() {
    1874            0 :             return Err(ApiError::NotFound(
    1875            0 :                 anyhow::anyhow!("No shards for this tenant ID found").into(),
    1876            0 :             ));
    1877            0 :         }
    1878            0 :         let shard_params = shard_params.expect("result is non-empty, therefore this is set");
    1879            0 :         tracing::info!(
    1880            0 :             "Located tenant {} with params {:?} on shards {}",
    1881            0 :             tenant_id,
    1882            0 :             shard_params,
    1883            0 :             result
    1884            0 :                 .iter()
    1885            0 :                 .map(|s| format!("{:?}", s))
    1886            0 :                 .collect::<Vec<_>>()
    1887            0 :                 .join(",")
    1888            0 :         );
    1889              : 
    1890            0 :         Ok(TenantLocateResponse {
    1891            0 :             shards: result,
    1892            0 :             shard_params,
    1893            0 :         })
    1894            0 :     }
    1895              : 
    1896            0 :     pub(crate) async fn tenant_shard_split(
    1897            0 :         &self,
    1898            0 :         tenant_id: TenantId,
    1899            0 :         split_req: TenantShardSplitRequest,
    1900            0 :     ) -> Result<TenantShardSplitResponse, ApiError> {
    1901            0 :         let mut policy = None;
    1902            0 :         let mut shard_ident = None;
    1903              : 
    1904              :         // A parent shard which will be split
    1905              :         struct SplitTarget {
    1906              :             parent_id: TenantShardId,
    1907              :             node: Node,
    1908              :             child_ids: Vec<TenantShardId>,
    1909              :         }
    1910              : 
    1911              :         // Validate input, and calculate which shards we will create
    1912            0 :         let (old_shard_count, targets, compute_hook) =
    1913              :             {
    1914            0 :                 let locked = self.inner.read().unwrap();
    1915            0 : 
    1916            0 :                 let pageservers = locked.nodes.clone();
    1917            0 : 
    1918            0 :                 let mut targets = Vec::new();
    1919            0 : 
    1920            0 :                 // In case this is a retry, count how many already-split shards we found
    1921            0 :                 let mut children_found = Vec::new();
    1922            0 :                 let mut old_shard_count = None;
    1923              : 
    1924            0 :                 for (tenant_shard_id, shard) in
    1925            0 :                     locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1926              :                 {
    1927            0 :                     match shard.shard.count.count().cmp(&split_req.new_shard_count) {
    1928              :                         Ordering::Equal => {
    1929              :                             //  Already split this
    1930            0 :                             children_found.push(*tenant_shard_id);
    1931            0 :                             continue;
    1932              :                         }
    1933              :                         Ordering::Greater => {
    1934            0 :                             return Err(ApiError::BadRequest(anyhow::anyhow!(
    1935            0 :                                 "Requested count {} but already have shards at count {}",
    1936            0 :                                 split_req.new_shard_count,
    1937            0 :                                 shard.shard.count.count()
    1938            0 :                             )));
    1939              :                         }
    1940            0 :                         Ordering::Less => {
    1941            0 :                             // Fall through: this shard has lower count than requested,
    1942            0 :                             // is a candidate for splitting.
    1943            0 :                         }
    1944            0 :                     }
    1945            0 : 
    1946            0 :                     match old_shard_count {
    1947            0 :                         None => old_shard_count = Some(shard.shard.count),
    1948            0 :                         Some(old_shard_count) => {
    1949            0 :                             if old_shard_count != shard.shard.count {
    1950              :                                 // We may hit this case if a caller asked for two splits to
    1951              :                                 // different sizes, before the first one is complete.
    1952              :                                 // e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
    1953              :                                 // of shard_count=1 and shard_count=2 shards in the map.
    1954            0 :                                 return Err(ApiError::Conflict(
    1955            0 :                                     "Cannot split, currently mid-split".to_string(),
    1956            0 :                                 ));
    1957            0 :                             }
    1958              :                         }
    1959              :                     }
    1960            0 :                     if policy.is_none() {
    1961            0 :                         policy = Some(shard.policy.clone());
    1962            0 :                     }
    1963            0 :                     if shard_ident.is_none() {
    1964            0 :                         shard_ident = Some(shard.shard);
    1965            0 :                     }
    1966              : 
    1967            0 :                     if tenant_shard_id.shard_count.count() == split_req.new_shard_count {
    1968            0 :                         tracing::info!(
    1969            0 :                             "Tenant shard {} already has shard count {}",
    1970            0 :                             tenant_shard_id,
    1971            0 :                             split_req.new_shard_count
    1972            0 :                         );
    1973            0 :                         continue;
    1974            0 :                     }
    1975              : 
    1976            0 :                     let node_id = shard.intent.get_attached().ok_or(ApiError::BadRequest(
    1977            0 :                         anyhow::anyhow!("Cannot split a tenant that is not attached"),
    1978            0 :                     ))?;
    1979              : 
    1980            0 :                     let node = pageservers
    1981            0 :                         .get(&node_id)
    1982            0 :                         .expect("Pageservers may not be deleted while referenced");
    1983            0 : 
    1984            0 :                     // TODO: if any reconciliation is currently in progress for this shard, wait for it.
    1985            0 : 
    1986            0 :                     targets.push(SplitTarget {
    1987            0 :                         parent_id: *tenant_shard_id,
    1988            0 :                         node: node.clone(),
    1989            0 :                         child_ids: tenant_shard_id
    1990            0 :                             .split(ShardCount::new(split_req.new_shard_count)),
    1991            0 :                     });
    1992              :                 }
    1993              : 
    1994            0 :                 if targets.is_empty() {
    1995            0 :                     if children_found.len() == split_req.new_shard_count as usize {
    1996            0 :                         return Ok(TenantShardSplitResponse {
    1997            0 :                             new_shards: children_found,
    1998            0 :                         });
    1999              :                     } else {
    2000              :                         // No shards found to split, and no existing children found: the
    2001              :                         // tenant doesn't exist at all.
    2002            0 :                         return Err(ApiError::NotFound(
    2003            0 :                             anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
    2004            0 :                         ));
    2005              :                     }
    2006            0 :                 }
    2007            0 : 
    2008            0 :                 (old_shard_count, targets, locked.compute_hook.clone())
    2009            0 :             };
    2010            0 : 
    2011            0 :         // unwrap safety: we would have returned above if we didn't find at least one shard to split
    2012            0 :         let old_shard_count = old_shard_count.unwrap();
    2013            0 :         let shard_ident = shard_ident.unwrap();
    2014            0 :         let policy = policy.unwrap();
    2015            0 : 
    2016            0 :         // FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
    2017            0 :         // request could occur here, deleting or mutating the tenant.  begin_shard_split checks that the
    2018            0 :         // parent shards exist as expected, but it would be neater to do the above pre-checks within the
    2019            0 :         // same database transaction rather than pre-check in-memory and then maybe-fail the database write.
    2020            0 :         // (https://github.com/neondatabase/neon/issues/6676)
    2021            0 : 
    2022            0 :         // Before creating any new child shards in memory or on the pageservers, persist them: this
    2023            0 :         // enables us to ensure that we will always be able to clean up if something goes wrong.  This also
    2024            0 :         // acts as the protection against two concurrent attempts to split: one of them will get a database
    2025            0 :         // error trying to insert the child shards.
    2026            0 :         let mut child_tsps = Vec::new();
    2027            0 :         for target in &targets {
    2028            0 :             let mut this_child_tsps = Vec::new();
    2029            0 :             for child in &target.child_ids {
    2030            0 :                 let mut child_shard = shard_ident;
    2031            0 :                 child_shard.number = child.shard_number;
    2032            0 :                 child_shard.count = child.shard_count;
    2033            0 : 
    2034            0 :                 this_child_tsps.push(TenantShardPersistence {
    2035            0 :                     tenant_id: child.tenant_id.to_string(),
    2036            0 :                     shard_number: child.shard_number.0 as i32,
    2037            0 :                     shard_count: child.shard_count.literal() as i32,
    2038            0 :                     shard_stripe_size: shard_ident.stripe_size.0 as i32,
    2039            0 :                     // Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
    2040            0 :                     // populate the correct generation as part of its transaction, to protect us
    2041            0 :                     // against racing with changes in the state of the parent.
    2042            0 :                     generation: 0,
    2043            0 :                     generation_pageserver: target.node.id.0 as i64,
    2044            0 :                     placement_policy: serde_json::to_string(&policy).unwrap(),
    2045            0 :                     // TODO: get the config out of the map
    2046            0 :                     config: serde_json::to_string(&TenantConfig::default()).unwrap(),
    2047            0 :                     splitting: SplitState::Splitting,
    2048            0 :                 });
    2049            0 :             }
    2050              : 
    2051            0 :             child_tsps.push((target.parent_id, this_child_tsps));
    2052              :         }
    2053              : 
    2054            0 :         if let Err(e) = self
    2055            0 :             .persistence
    2056            0 :             .begin_shard_split(old_shard_count, tenant_id, child_tsps)
    2057            0 :             .await
    2058              :         {
    2059            0 :             match e {
    2060              :                 DatabaseError::Query(diesel::result::Error::DatabaseError(
    2061              :                     DatabaseErrorKind::UniqueViolation,
    2062              :                     _,
    2063              :                 )) => {
    2064              :                     // Inserting a child shard violated a unique constraint: we raced with another call to
    2065              :                     // this function
    2066            0 :                     tracing::warn!("Conflicting attempt to split {tenant_id}: {e}");
    2067            0 :                     return Err(ApiError::Conflict("Tenant is already splitting".into()));
    2068              :                 }
    2069            0 :                 _ => return Err(ApiError::InternalServerError(e.into())),
    2070              :             }
    2071            0 :         }
    2072            0 : 
    2073            0 :         // Now that we have persisted the splitting state, apply it in-memory.  This is infallible, so
    2074            0 :         // callers may assume that if splitting is set in memory, then it was persisted, and if splitting
    2075            0 :         // is not set in memory, then it was not persisted.
    2076            0 :         {
    2077            0 :             let mut locked = self.inner.write().unwrap();
    2078            0 :             for target in &targets {
    2079            0 :                 if let Some(parent_shard) = locked.tenants.get_mut(&target.parent_id) {
    2080            0 :                     parent_shard.splitting = SplitState::Splitting;
    2081            0 :                 }
    2082              :             }
    2083              :         }
    2084              : 
    2085              :         // FIXME: we have now committed the shard split state to the database, so any subsequent
    2086              :         // failure needs to roll it back.  We will later wrap this function in logic to roll back
    2087              :         // the split if it fails.
    2088              :         // (https://github.com/neondatabase/neon/issues/6676)
    2089              : 
    2090              :         // TODO: issue split calls concurrently (this only matters once we're splitting
    2091              :         // N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
    2092              : 
    2093            0 :         for target in &targets {
    2094              :             let SplitTarget {
    2095            0 :                 parent_id,
    2096            0 :                 node,
    2097            0 :                 child_ids,
    2098            0 :             } = target;
    2099            0 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    2100            0 :             let response = client
    2101            0 :                 .tenant_shard_split(
    2102            0 :                     *parent_id,
    2103            0 :                     TenantShardSplitRequest {
    2104            0 :                         new_shard_count: split_req.new_shard_count,
    2105            0 :                     },
    2106            0 :                 )
    2107            0 :                 .await
    2108            0 :                 .map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
    2109              : 
    2110            0 :             tracing::info!(
    2111            0 :                 "Split {} into {}",
    2112            0 :                 parent_id,
    2113            0 :                 response
    2114            0 :                     .new_shards
    2115            0 :                     .iter()
    2116            0 :                     .map(|s| format!("{:?}", s))
    2117            0 :                     .collect::<Vec<_>>()
    2118            0 :                     .join(",")
    2119            0 :             );
    2120              : 
    2121            0 :             if &response.new_shards != child_ids {
    2122              :                 // This should never happen: the pageserver should agree with us on how shard splits work.
    2123            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
    2124            0 :                     "Splitting shard {} resulted in unexpected IDs: {:?} (expected {:?})",
    2125            0 :                     parent_id,
    2126            0 :                     response.new_shards,
    2127            0 :                     child_ids
    2128            0 :                 )));
    2129            0 :             }
    2130              :         }
    2131              : 
    2132              :         // TODO: if the pageserver restarted concurrently with our split API call,
    2133              :         // the actual generation of the child shard might differ from the generation
    2134              :         // we expect it to have.  In order for our in-database generation to end up
    2135              :         // correct, we should carry the child generation back in the response and apply it here
    2136              :         // in complete_shard_split (and apply the correct generation in memory)
    2137              :         // (or, we can carry generation in the request and reject the request if
    2138              :         //  it doesn't match, but that requires more retry logic on this side)
    2139              : 
    2140            0 :         self.persistence
    2141            0 :             .complete_shard_split(tenant_id, old_shard_count)
    2142            0 :             .await?;
    2143              : 
    2144              :         // Replace all the shards we just split with their children: this phase is infallible.
    2145            0 :         let mut response = TenantShardSplitResponse {
    2146            0 :             new_shards: Vec::new(),
    2147            0 :         };
    2148            0 :         let mut child_locations = Vec::new();
    2149            0 :         {
    2150            0 :             let mut locked = self.inner.write().unwrap();
    2151            0 :             let (_nodes, tenants, scheduler) = locked.parts_mut();
    2152            0 :             for target in targets {
    2153              :                 let SplitTarget {
    2154            0 :                     parent_id,
    2155            0 :                     node: _node,
    2156            0 :                     child_ids,
    2157            0 :                 } = target;
    2158            0 :                 let (pageserver, generation, config) = {
    2159            0 :                     let mut old_state = tenants
    2160            0 :                         .remove(&parent_id)
    2161            0 :                         .expect("It was present, we just split it");
    2162            0 :                     let old_attached = old_state.intent.get_attached().unwrap();
    2163            0 :                     old_state.intent.clear(scheduler);
    2164            0 :                     (old_attached, old_state.generation, old_state.config.clone())
    2165            0 :                 };
    2166              : 
    2167            0 :                 for child in child_ids {
    2168            0 :                     let mut child_shard = shard_ident;
    2169            0 :                     child_shard.number = child.shard_number;
    2170            0 :                     child_shard.count = child.shard_count;
    2171            0 : 
    2172            0 :                     let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
    2173            0 :                     child_observed.insert(
    2174            0 :                         pageserver,
    2175            0 :                         ObservedStateLocation {
    2176            0 :                             conf: Some(attached_location_conf(generation, &child_shard, &config)),
    2177            0 :                         },
    2178            0 :                     );
    2179            0 : 
    2180            0 :                     let mut child_state = TenantState::new(child, child_shard, policy.clone());
    2181            0 :                     child_state.intent = IntentState::single(scheduler, Some(pageserver));
    2182            0 :                     child_state.observed = ObservedState {
    2183            0 :                         locations: child_observed,
    2184            0 :                     };
    2185            0 :                     child_state.generation = generation;
    2186            0 :                     child_state.config = config.clone();
    2187            0 : 
    2188            0 :                     // The child's TenantState::splitting is intentionally left at the default value of Idle,
    2189            0 :                     // as at this point in the split process we have succeeded and this part is infallible:
    2190            0 :                     // we will never need to do any special recovery from this state.
    2191            0 : 
    2192            0 :                     child_locations.push((child, pageserver));
    2193            0 : 
    2194            0 :                     tenants.insert(child, child_state);
    2195            0 :                     response.new_shards.push(child);
    2196            0 :                 }
    2197              :             }
    2198              :         }
    2199              : 
    2200              :         // Send compute notifications for all the new shards
    2201            0 :         let mut failed_notifications = Vec::new();
    2202            0 :         for (child_id, child_ps) in child_locations {
    2203            0 :             if let Err(e) = compute_hook.notify(child_id, child_ps, &self.cancel).await {
    2204            0 :                 tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
    2205            0 :                         child_id, child_ps);
    2206            0 :                 failed_notifications.push(child_id);
    2207            0 :             }
    2208              :         }
    2209              : 
    2210              :         // If we failed any compute notifications, make a note to retry later.
    2211            0 :         if !failed_notifications.is_empty() {
    2212            0 :             let mut locked = self.inner.write().unwrap();
    2213            0 :             for failed in failed_notifications {
    2214            0 :                 if let Some(shard) = locked.tenants.get_mut(&failed) {
    2215            0 :                     shard.pending_compute_notification = true;
    2216            0 :                 }
    2217              :             }
    2218            0 :         }
    2219              : 
    2220            0 :         Ok(response)
    2221            0 :     }
    2222              : 
    2223            0 :     pub(crate) async fn tenant_shard_migrate(
    2224            0 :         &self,
    2225            0 :         tenant_shard_id: TenantShardId,
    2226            0 :         migrate_req: TenantShardMigrateRequest,
    2227            0 :     ) -> Result<TenantShardMigrateResponse, ApiError> {
    2228            0 :         let waiter = {
    2229            0 :             let mut locked = self.inner.write().unwrap();
    2230            0 :             let result_tx = locked.result_tx.clone();
    2231            0 :             let compute_hook = locked.compute_hook.clone();
    2232            0 :             let (nodes, tenants, scheduler) = locked.parts_mut();
    2233              : 
    2234            0 :             let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
    2235            0 :                 return Err(ApiError::NotFound(
    2236            0 :                     anyhow::anyhow!("Tenant shard not found").into(),
    2237            0 :                 ));
    2238              :             };
    2239              : 
    2240            0 :             if shard.intent.get_attached() == &Some(migrate_req.node_id) {
    2241              :                 // No-op case: we will still proceed to wait for reconciliation in case it is
    2242              :                 // incomplete from an earlier update to the intent.
    2243            0 :                 tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
    2244              :             } else {
    2245            0 :                 let old_attached = *shard.intent.get_attached();
    2246            0 : 
    2247            0 :                 match shard.policy {
    2248            0 :                     PlacementPolicy::Single => {
    2249            0 :                         shard.intent.clear_secondary(scheduler);
    2250            0 :                     }
    2251            0 :                     PlacementPolicy::Double(_n) => {
    2252            0 :                         // If our new attached node was a secondary, it no longer should be.
    2253            0 :                         shard.intent.remove_secondary(scheduler, migrate_req.node_id);
    2254              : 
    2255              :                         // If we were already attached to something, demote that to a secondary
    2256            0 :                         if let Some(old_attached) = old_attached {
    2257            0 :                             shard.intent.push_secondary(scheduler, old_attached);
    2258            0 :                         }
    2259              :                     }
    2260              :                     PlacementPolicy::Detached => {
    2261            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
    2262            0 :                             "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first"
    2263            0 :                         )))
    2264              :                     }
    2265              :                 }
    2266            0 :                 shard
    2267            0 :                     .intent
    2268            0 :                     .set_attached(scheduler, Some(migrate_req.node_id));
    2269              : 
    2270            0 :                 tracing::info!("Migrating: new intent {:?}", shard.intent);
    2271            0 :                 shard.sequence = shard.sequence.next();
    2272              :             }
    2273              : 
    2274            0 :             shard.maybe_reconcile(
    2275            0 :                 result_tx,
    2276            0 :                 nodes,
    2277            0 :                 &compute_hook,
    2278            0 :                 &self.config,
    2279            0 :                 &self.persistence,
    2280            0 :                 &self.gate,
    2281            0 :                 &self.cancel,
    2282            0 :             )
    2283              :         };
    2284              : 
    2285            0 :         if let Some(waiter) = waiter {
    2286            0 :             waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
    2287              :         } else {
    2288            0 :             tracing::warn!("Migration is a no-op");
    2289              :         }
    2290              : 
    2291            0 :         Ok(TenantShardMigrateResponse {})
    2292            0 :     }
    2293              : 
    2294              :     /// This is for debug/support only: we simply drop all state for a tenant, without
    2295              :     /// detaching or deleting it on pageservers.
    2296            0 :     pub(crate) async fn tenant_drop(&self, tenant_id: TenantId) -> Result<(), ApiError> {
    2297            0 :         self.persistence.delete_tenant(tenant_id).await?;
    2298              : 
    2299            0 :         let mut locked = self.inner.write().unwrap();
    2300            0 :         let (_nodes, tenants, scheduler) = locked.parts_mut();
    2301            0 :         let mut shards = Vec::new();
    2302            0 :         for (tenant_shard_id, _) in tenants.range(TenantShardId::tenant_range(tenant_id)) {
    2303            0 :             shards.push(*tenant_shard_id);
    2304            0 :         }
    2305              : 
    2306            0 :         for shard_id in shards {
    2307            0 :             if let Some(mut shard) = tenants.remove(&shard_id) {
    2308            0 :                 shard.intent.clear(scheduler);
    2309            0 :             }
    2310              :         }
    2311              : 
    2312            0 :         Ok(())
    2313            0 :     }
    2314              : 
    2315              :     /// For debug/support: a full JSON dump of TenantStates.  Returns a response so that
    2316              :     /// we don't have to make TenantState clonable in the return path.
    2317            0 :     pub(crate) fn tenants_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
    2318            0 :         let serialized = {
    2319            0 :             let locked = self.inner.read().unwrap();
    2320            0 :             let result = locked.tenants.values().collect::<Vec<_>>();
    2321            0 :             serde_json::to_string(&result).map_err(|e| ApiError::InternalServerError(e.into()))?
    2322              :         };
    2323              : 
    2324            0 :         hyper::Response::builder()
    2325            0 :             .status(hyper::StatusCode::OK)
    2326            0 :             .header(hyper::header::CONTENT_TYPE, "application/json")
    2327            0 :             .body(hyper::Body::from(serialized))
    2328            0 :             .map_err(|e| ApiError::InternalServerError(e.into()))
    2329            0 :     }
    2330              : 
    2331              :     /// Check the consistency of in-memory state vs. persistent state, and check that the
    2332              :     /// scheduler's statistics are up to date.
    2333              :     ///
    2334              :     /// These consistency checks expect an **idle** system.  If changes are going on while
    2335              :     /// we run, then we can falsely indicate a consistency issue.  This is sufficient for end-of-test
    2336              :     /// checks, but not suitable for running continuously in the background in the field.
    2337            0 :     pub(crate) async fn consistency_check(&self) -> Result<(), ApiError> {
    2338            0 :         let (mut expect_nodes, mut expect_shards) = {
    2339            0 :             let locked = self.inner.read().unwrap();
    2340            0 : 
    2341            0 :             locked
    2342            0 :                 .scheduler
    2343            0 :                 .consistency_check(locked.nodes.values(), locked.tenants.values())
    2344            0 :                 .context("Scheduler checks")
    2345            0 :                 .map_err(ApiError::InternalServerError)?;
    2346              : 
    2347            0 :             let expect_nodes = locked
    2348            0 :                 .nodes
    2349            0 :                 .values()
    2350            0 :                 .map(|n| n.to_persistent())
    2351            0 :                 .collect::<Vec<_>>();
    2352            0 : 
    2353            0 :             let expect_shards = locked
    2354            0 :                 .tenants
    2355            0 :                 .values()
    2356            0 :                 .map(|t| t.to_persistent())
    2357            0 :                 .collect::<Vec<_>>();
    2358            0 : 
    2359            0 :             (expect_nodes, expect_shards)
    2360              :         };
    2361              : 
    2362            0 :         let mut nodes = self.persistence.list_nodes().await?;
    2363            0 :         expect_nodes.sort_by_key(|n| n.node_id);
    2364            0 :         nodes.sort_by_key(|n| n.node_id);
    2365            0 : 
    2366            0 :         if nodes != expect_nodes {
    2367            0 :             tracing::error!("Consistency check failed on nodes.");
    2368            0 :             tracing::error!(
    2369            0 :                 "Nodes in memory: {}",
    2370            0 :                 serde_json::to_string(&expect_nodes)
    2371            0 :                     .map_err(|e| ApiError::InternalServerError(e.into()))?
    2372            0 :             );
    2373            0 :             tracing::error!(
    2374            0 :                 "Nodes in database: {}",
    2375            0 :                 serde_json::to_string(&nodes)
    2376            0 :                     .map_err(|e| ApiError::InternalServerError(e.into()))?
    2377            0 :             );
    2378            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    2379            0 :                 "Node consistency failure"
    2380            0 :             )));
    2381            0 :         }
    2382              : 
    2383            0 :         let mut shards = self.persistence.list_tenant_shards().await?;
    2384            0 :         shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
    2385            0 :         expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
    2386            0 : 
    2387            0 :         if shards != expect_shards {
    2388            0 :             tracing::error!("Consistency check failed on shards.");
    2389            0 :             tracing::error!(
    2390            0 :                 "Shards in memory: {}",
    2391            0 :                 serde_json::to_string(&expect_shards)
    2392            0 :                     .map_err(|e| ApiError::InternalServerError(e.into()))?
    2393            0 :             );
    2394            0 :             tracing::error!(
    2395            0 :                 "Shards in database: {}",
    2396            0 :                 serde_json::to_string(&shards)
    2397            0 :                     .map_err(|e| ApiError::InternalServerError(e.into()))?
    2398            0 :             );
    2399            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    2400            0 :                 "Shard consistency failure"
    2401            0 :             )));
    2402            0 :         }
    2403            0 : 
    2404            0 :         Ok(())
    2405            0 :     }
    2406              : 
    2407              :     /// For debug/support: a JSON dump of the [`Scheduler`].  Returns a response so that
    2408              :     /// we don't have to make the scheduler clonable in the return path.
    2409            0 :     pub(crate) fn scheduler_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
    2410            0 :         let serialized = {
    2411            0 :             let locked = self.inner.read().unwrap();
    2412            0 :             serde_json::to_string(&locked.scheduler)
    2413            0 :                 .map_err(|e| ApiError::InternalServerError(e.into()))?
    2414              :         };
    2415              : 
    2416            0 :         hyper::Response::builder()
    2417            0 :             .status(hyper::StatusCode::OK)
    2418            0 :             .header(hyper::header::CONTENT_TYPE, "application/json")
    2419            0 :             .body(hyper::Body::from(serialized))
    2420            0 :             .map_err(|e| ApiError::InternalServerError(e.into()))
    2421            0 :     }
    2422              : 
    2423              :     /// This is for debug/support only: we simply drop all state for a node, without
    2424              :     /// detaching or deleting anything on the pageserver itself.  We do not try and
    2425              :     /// re-schedule any tenants that were on this node.
    2426              :     ///
    2427              :     /// TODO: proper node deletion API that unhooks things more gracefully
    2428            0 :     pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
    2429            0 :         self.persistence.delete_node(node_id).await?;
    2430              : 
    2431            0 :         let mut locked = self.inner.write().unwrap();
    2432              : 
    2433            0 :         for shard in locked.tenants.values_mut() {
    2434            0 :             shard.deref_node(node_id);
    2435            0 :         }
    2436              : 
    2437            0 :         let mut nodes = (*locked.nodes).clone();
    2438            0 :         nodes.remove(&node_id);
    2439            0 :         locked.nodes = Arc::new(nodes);
    2440            0 : 
    2441            0 :         locked.scheduler.node_remove(node_id);
    2442            0 : 
    2443            0 :         Ok(())
    2444            0 :     }
    2445              :     /// Return a cloned snapshot of every node currently in the in-memory node map (read lock only; never returns `Err` here).
    2446            0 :     pub(crate) async fn node_list(&self) -> Result<Vec<Node>, ApiError> {
    2447            0 :         let nodes = {
    2448            0 :             self.inner
    2449            0 :                 .read()
    2450            0 :                 .unwrap()
    2451            0 :                 .nodes
    2452            0 :                 .values()
    2453            0 :                 .cloned()
    2454            0 :                 .collect::<Vec<_>>()
    2455            0 :         };
    2456            0 : 
    2457            0 :         Ok(nodes)
    2458            0 :     }
    2459              :     /// Register a pageserver node: persist it, then add it to the in-memory node map and the scheduler.
    2460            0 :     pub(crate) async fn node_register(
    2461            0 :         &self,
    2462            0 :         register_req: NodeRegisterRequest,
    2463            0 :     ) -> Result<(), ApiError> {
    2464            0 :         // Pre-check for an already-existing node (read lock only; the guard is released at the end of this block)
    2465            0 :         {
    2466            0 :             let locked = self.inner.read().unwrap();
    2467            0 :             if let Some(node) = locked.nodes.get(&register_req.node_id) {
    2468              :                 // Note that we do not do a total equality of the struct, because we don't require
    2469              :                 // the availability/scheduling states to agree for a POST to be idempotent.
    2470            0 :                 if node.listen_http_addr == register_req.listen_http_addr
    2471            0 :                     && node.listen_http_port == register_req.listen_http_port
    2472            0 :                     && node.listen_pg_addr == register_req.listen_pg_addr
    2473            0 :                     && node.listen_pg_port == register_req.listen_pg_port
    2474              :                 {
    2475            0 :                     tracing::info!(
    2476            0 :                         "Node {} re-registered with matching address",
    2477            0 :                         register_req.node_id
    2478            0 :                     );
    2479            0 :                     return Ok(());
    2480              :                 } else {
    2481              :                     // TODO: decide if we want to allow modifying node addresses without removing and re-adding
    2482              :                     // the node.  Safest/simplest thing is to refuse it, and usually we deploy with
    2483              :                     // a fixed address through the lifetime of a node.
    2484            0 :                     tracing::warn!(
    2485            0 :                         "Node {} tried to register with different address",
    2486            0 :                         register_req.node_id
    2487            0 :                     );
    2488            0 :                     return Err(ApiError::Conflict(
    2489            0 :                         "Node is already registered with different address".to_string(),
    2490            0 :                     ));
    2491              :                 }
    2492            0 :             }
    2493            0 :         }
    2494            0 :         // NOTE(review): no lock is held between the pre-check above and the insert below, so two concurrent registrations of one id can both get here.
    2495            0 :         // Ordering: we must persist the new node _before_ adding it to in-memory state.
    2496            0 :         // This ensures that before we use it for anything or expose it via any external
    2497            0 :         // API, it is guaranteed to be available after a restart.
    2498            0 :         let new_node = Node {
    2499            0 :             id: register_req.node_id,
    2500            0 :             listen_http_addr: register_req.listen_http_addr,
    2501            0 :             listen_http_port: register_req.listen_http_port,
    2502            0 :             listen_pg_addr: register_req.listen_pg_addr,
    2503            0 :             listen_pg_port: register_req.listen_pg_port,
    2504            0 :             scheduling: NodeSchedulingPolicy::Filling,
    2505            0 :             // TODO: we shouldn't really call this Active until we've heartbeated it.
    2506            0 :             availability: NodeAvailability::Active,
    2507            0 :         };
    2508            0 :         // TODO: idempotency if the node already exists in the database
    2509            0 :         self.persistence.insert_node(&new_node).await?;
    2510              :         // The node map lives behind an `Arc`: build an updated copy and swap it in, updating the scheduler alongside.
    2511            0 :         let mut locked = self.inner.write().unwrap();
    2512            0 :         let mut new_nodes = (*locked.nodes).clone();
    2513            0 : 
    2514            0 :         locked.scheduler.node_upsert(&new_node);
    2515            0 :         new_nodes.insert(register_req.node_id, new_node);
    2516            0 : 
    2517            0 :         locked.nodes = Arc::new(new_nodes);
    2518              : 
    2519            0 :         tracing::info!(
    2520            0 :             "Registered pageserver {}, now have {} pageservers",
    2521            0 :             register_req.node_id,
    2522            0 :             locked.nodes.len()
    2523            0 :         );
    2524            0 :         Ok(())
    2525            0 :     }
    2526              :     /// Apply a [`NodeConfigureRequest`]: optionally change a node's scheduling policy and/or availability, then kick reconciles for affected tenants.  Returns `ApiError::NotFound` if the node is not in the in-memory node map.
    2527            0 :     pub(crate) async fn node_configure(
    2528            0 :         &self,
    2529            0 :         config_req: NodeConfigureRequest,
    2530            0 :     ) -> Result<(), ApiError> {
    2531            0 :         if let Some(scheduling) = config_req.scheduling {
    2532              :             // Scheduling is a persistent part of Node: we must write updates to the database before
    2533              :             // applying them in memory
    2534            0 :             self.persistence
    2535            0 :                 .update_node(config_req.node_id, scheduling)
    2536            0 :                 .await?;
    2537            0 :         }
    2538              :         // NOTE(review): the scheduling update above reaches the database before we check (below) that the node is registered in memory.
    2539            0 :         let mut locked = self.inner.write().unwrap();
    2540            0 :         let result_tx = locked.result_tx.clone();
    2541            0 :         let compute_hook = locked.compute_hook.clone();
    2542            0 :         let (nodes, tenants, scheduler) = locked.parts_mut();
    2543            0 :         // Edit a private copy of the node map: `locked.nodes` is only replaced at the very end of this function.
    2544            0 :         let mut new_nodes = (**nodes).clone();
    2545              : 
    2546            0 :         let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
    2547            0 :             return Err(ApiError::NotFound(
    2548            0 :                 anyhow::anyhow!("Node not registered").into(),
    2549            0 :             ));
    2550              :         };
    2551              :         // Only the Active->Offline and Offline->Active edges trigger the per-tenant handling further down.
    2552            0 :         let mut offline_transition = false;
    2553            0 :         let mut active_transition = false;
    2554              : 
    2555            0 :         if let Some(availability) = &config_req.availability {
    2556            0 :             match (availability, &node.availability) {
    2557              :                 (NodeAvailability::Offline, NodeAvailability::Active) => {
    2558            0 :                     tracing::info!("Node {} transition to offline", config_req.node_id);
    2559            0 :                     offline_transition = true;
    2560              :                 }
    2561              :                 (NodeAvailability::Active, NodeAvailability::Offline) => {
    2562            0 :                     tracing::info!("Node {} transition to active", config_req.node_id);
    2563            0 :                     active_transition = true;
    2564              :                 }
    2565              :                 _ => {
    2566            0 :                     tracing::info!("Node {} no change during config", config_req.node_id);
    2567              :                     // No change
    2568              :                 }
    2569              :             };
    2570            0 :             node.availability = *availability;
    2571            0 :         }
    2572              : 
    2573            0 :         if let Some(scheduling) = config_req.scheduling {
    2574            0 :             node.scheduling = scheduling;
    2575            0 : 
    2576            0 :             // TODO: once we have a background scheduling ticker for fill/drain, kick it
    2577            0 :             // to wake up and start working.
    2578            0 :         }
    2579              : 
    2580              :         // Update the scheduler, in case the eligibility of the node for new shards has changed
    2581            0 :         scheduler.node_upsert(node);
    2582            0 : 
    2583            0 :         let new_nodes = Arc::new(new_nodes);
    2584            0 : 
    2585            0 :         if offline_transition {
    2586            0 :             let mut tenants_affected: usize = 0;
    2587            0 :             for (tenant_shard_id, tenant_state) in tenants {
    2588            0 :                 if let Some(observed_loc) =
    2589            0 :                     tenant_state.observed.locations.get_mut(&config_req.node_id)
    2590            0 :                 {
    2591            0 :                     // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
    2592            0 :                     // not assume our knowledge of the node's configuration is accurate until it comes back online
    2593            0 :                     observed_loc.conf = None;
    2594            0 :                 }
    2595              :                 // When `notify_offline` returns true we bump the shard's sequence and re-run scheduling for it.
    2596            0 :                 if tenant_state.intent.notify_offline(config_req.node_id) {
    2597            0 :                     tenant_state.sequence = tenant_state.sequence.next();
    2598            0 :                     match tenant_state.schedule(scheduler) {
    2599            0 :                         Err(e) => {
    2600              :                             // It is possible that some tenants will become unschedulable when too many pageservers
    2601              :                             // go offline: in this case there isn't much we can do other than make the issue observable.
    2602              :                             // TODO: give TenantState a scheduling error attribute to be queried later.
    2603            0 :                             tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
    2604              :                         }
    2605              :                         Ok(()) => {
    2606            0 :                             if tenant_state
    2607            0 :                                 .maybe_reconcile(
    2608            0 :                                     result_tx.clone(),
    2609            0 :                                     &new_nodes,
    2610            0 :                                     &compute_hook,
    2611            0 :                                     &self.config,
    2612            0 :                                     &self.persistence,
    2613            0 :                                     &self.gate,
    2614            0 :                                     &self.cancel,
    2615            0 :                                 )
    2616            0 :                                 .is_some()
    2617            0 :                             {
    2618            0 :                                 tenants_affected += 1;
    2619            0 :                             };
    2620              :                         }
    2621              :                     }
    2622            0 :                 }
    2623              :             }
    2624            0 :             tracing::info!(
    2625            0 :                 "Launched {} reconciler tasks for tenants affected by node {} going offline",
    2626            0 :                 tenants_affected,
    2627            0 :                 config_req.node_id
    2628            0 :             )
    2629            0 :         }
    2630              : 
    2631            0 :         if active_transition {
    2632              :             // When a node comes back online, we must reconcile any tenant that has a None observed
    2633              :             // location on the node.  The value returned by `maybe_reconcile` is not used here.
    2634            0 :             for tenant_state in locked.tenants.values_mut() {
    2635            0 :                 if let Some(observed_loc) =
    2636            0 :                     tenant_state.observed.locations.get_mut(&config_req.node_id)
    2637              :                 {
    2638            0 :                     if observed_loc.conf.is_none() {
    2639            0 :                         tenant_state.maybe_reconcile(
    2640            0 :                             result_tx.clone(),
    2641            0 :                             &new_nodes,
    2642            0 :                             &compute_hook,
    2643            0 :                             &self.config,
    2644            0 :                             &self.persistence,
    2645            0 :                             &self.gate,
    2646            0 :                             &self.cancel,
    2647            0 :                         );
    2648            0 :                     }
    2649            0 :                 }
    2650              :             }
    2651              : 
    2652              :             // TODO: in the background, we should balance work back onto this pageserver
    2653            0 :         }
    2654              :         // Publish the edited node map.
    2655            0 :         locked.nodes = new_nodes;
    2656            0 : 
    2657            0 :         Ok(())
    2658            0 :     }
    2659              : 
    2660              :     /// Helper for methods that will try and call pageserver APIs for
    2661              :     /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
    2662              :     /// is attached somewhere.  Takes the write guard by value and returns one waiter per shard for which `maybe_reconcile` produced one.
    2663            0 :     fn ensure_attached_schedule(
    2664            0 :         &self,
    2665            0 :         mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
    2666            0 :         tenant_id: TenantId,
    2667            0 :     ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
    2668            0 :         let mut waiters = Vec::new();
    2669            0 :         let result_tx = locked.result_tx.clone();
    2670            0 :         let compute_hook = locked.compute_hook.clone();
    2671            0 :         let (nodes, tenants, scheduler) = locked.parts_mut();
    2672              :         // Visit only this tenant's shards; a scheduling error on any shard aborts the whole call via `?`.
    2673            0 :         for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
    2674            0 :             shard.schedule(scheduler)?;
    2675              : 
    2676            0 :             if let Some(waiter) = shard.maybe_reconcile(
    2677            0 :                 result_tx.clone(),
    2678            0 :                 nodes,
    2679            0 :                 &compute_hook,
    2680            0 :                 &self.config,
    2681            0 :                 &self.persistence,
    2682            0 :                 &self.gate,
    2683            0 :                 &self.cancel,
    2684            0 :             ) {
    2685            0 :                 waiters.push(waiter);
    2686            0 :             }
    2687              :         }
    2688            0 :         Ok(waiters)
    2689            0 :     }
    2690              :     /// Schedule and reconcile every shard of `tenant_id`, then wait (5 seconds in total) for the reconciles that were started; fails with `ApiError::ResourceUnavailable` if any shard is splitting.
    2691            0 :     async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
    2692            0 :         let ensure_waiters = {
    2693            0 :             let locked = self.inner.write().unwrap();
    2694              : 
    2695              :             // Check if the tenant is splitting: in this case, even if it is attached,
    2696              :             // we must act as if it is not: this blocks e.g. timeline creation/deletion
    2697              :             // operations during the split.
    2698            0 :             for (_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
    2699            0 :                 if !matches!(shard.splitting, SplitState::Idle) {
    2700            0 :                     return Err(ApiError::ResourceUnavailable(
    2701            0 :                         "Tenant shards are currently splitting".into(),
    2702            0 :                     ));
    2703            0 :                 }
    2704              :             }
    2705              :             // The write guard is moved into the helper, so it is gone before we start awaiting below.
    2706            0 :             self.ensure_attached_schedule(locked, tenant_id)
    2707            0 :                 .map_err(ApiError::InternalServerError)?
    2708              :         };
    2709              :         // All waiters share one deadline: each wait only gets the time that is left.  NOTE(review): `saturating_duration_since` would state the "zero once expired" intent explicitly.
    2710            0 :         let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
    2711            0 :         for waiter in ensure_waiters {
    2712            0 :             let timeout = deadline.duration_since(Instant::now());
    2713            0 :             waiter.wait_timeout(timeout).await?;
    2714              :         }
    2715              : 
    2716            0 :         Ok(())
    2717            0 :     }
    2718              : 
    2719              :     /// Check all tenants for pending reconciliation work, and reconcile those in need.
    2720              :     /// Holds the write lock on `self.inner` for the whole pass and calls `maybe_reconcile` on every shard.
    2721              :     /// Returns how many reconciliation tasks were started (shards for which `maybe_reconcile` returned `Some`)
    2722            0 :     fn reconcile_all(&self) -> usize {
    2723            0 :         let mut locked = self.inner.write().unwrap();
    2724            0 :         let result_tx = locked.result_tx.clone();
    2725            0 :         let compute_hook = locked.compute_hook.clone();
    2726            0 :         let pageservers = locked.nodes.clone();
    2727            0 :         locked
    2728            0 :             .tenants
    2729            0 :             .iter_mut()
    2730            0 :             .filter_map(|(_tenant_shard_id, shard)| {
    2731            0 :                 shard.maybe_reconcile(
    2732            0 :                     result_tx.clone(),
    2733            0 :                     &pageservers,
    2734            0 :                     &compute_hook,
    2735            0 :                     &self.config,
    2736            0 :                     &self.persistence,
    2737            0 :                     &self.gate,
    2738            0 :                     &self.cancel,
    2739            0 :                 )
    2740            0 :             })
    2741            0 :             .count()
    2742            0 :     }
    2743              :     /// Shut the service down: fire `self.cancel`, then wait until `self.gate` has closed.
    2744            0 :     pub async fn shutdown(&self) {
    2745            0 :         // Note that this already stops processing any results from reconciles: so
    2746            0 :         // we do not expect that our [`TenantState`] objects will reach a neat
    2747            0 :         // final state.
    2748            0 :         self.cancel.cancel();
    2749            0 : 
    2750            0 :         // The cancellation tokens in [`crate::reconciler::Reconciler`] are children
    2751            0 :         // of our cancellation token, so we do not need to explicitly cancel each of
    2752            0 :         // them.
    2753            0 : 
    2754            0 :         // Background tasks and reconcilers hold gate guards: this waits for them all
    2755            0 :         // to complete.
    2756            0 :         self.gate.close().await;
    2757            0 :     }
    2758              : }
        

Generated by: LCOV version 2.1-beta