LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - service.rs (source / functions)
Test: 190869232aac3a234374e5bb62582e91cf5f5818.info
Test Date: 2024-02-23 13:21:27
Coverage:  Lines: 0.0 % (0 of 1834 hit)   Functions: 0.0 % (0 of 213 hit)

            Line data    Source code
       1              : use std::{
       2              :     borrow::Cow,
       3              :     cmp::Ordering,
       4              :     collections::{BTreeMap, HashMap, HashSet},
       5              :     str::FromStr,
       6              :     sync::Arc,
       7              :     time::{Duration, Instant},
       8              : };
       9              : 
      10              : use anyhow::Context;
      11              : use control_plane::attachment_service::{
      12              :     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, NodeAvailability,
      13              :     NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, TenantCreateResponse,
      14              :     TenantCreateResponseShard, TenantLocateResponse, TenantLocateResponseShard,
      15              :     TenantShardMigrateRequest, TenantShardMigrateResponse,
      16              : };
      17              : use diesel::result::DatabaseErrorKind;
      18              : use futures::{stream::FuturesUnordered, StreamExt};
      19              : use hyper::StatusCode;
      20              : use pageserver_api::{
      21              :     control_api::{
      22              :         ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
      23              :         ValidateResponse, ValidateResponseTenant,
      24              :     },
      25              :     models::{
      26              :         self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
      27              :         TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
      28              :         TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
      29              :         TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
      30              :     },
      31              :     shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
      32              : };
      33              : use pageserver_client::mgmt_api;
      34              : use tokio_util::sync::CancellationToken;
      35              : use tracing::instrument;
      36              : use utils::{
      37              :     backoff,
      38              :     completion::Barrier,
      39              :     generation::Generation,
      40              :     http::error::ApiError,
      41              :     id::{NodeId, TenantId, TimelineId},
      42              :     seqwait::SeqWait,
      43              :     sync::gate::Gate,
      44              : };
      45              : 
      46              : use crate::{
      47              :     compute_hook::{self, ComputeHook},
      48              :     node::Node,
      49              :     persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
      50              :     reconciler::attached_location_conf,
      51              :     scheduler::Scheduler,
      52              :     tenant_state::{
      53              :         IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
      54              :         ReconcilerWaiter, TenantState,
      55              :     },
      56              :     PlacementPolicy, Sequence,
      57              : };
      58              : 
      59              : // For operations that should be quick, like attaching a new tenant
      60              : const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
      61              : 
      62              : // For operations that might be slow, like migrating a tenant with
      63              : // some data in it.
      64              : const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      65              : 
      66              : /// How long [`Service::startup_reconcile`] is allowed to take before it should give
      67              : /// up on unresponsive pageservers and proceed.
      68              : pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      69              : 
      70              : // Top level state available to all HTTP handlers
      71              : struct ServiceState {
      72              :     tenants: BTreeMap<TenantShardId, TenantState>,
      73              : 
      74              :     nodes: Arc<HashMap<NodeId, Node>>,
      75              : 
      76              :     scheduler: Scheduler,
      77              : 
      78              :     compute_hook: Arc<ComputeHook>,
      79              : 
      80              :     result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
      81              : }
      82              : 
      83              : impl ServiceState {
      84            0 :     fn new(
      85            0 :         config: Config,
      86            0 :         result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
      87            0 :         nodes: HashMap<NodeId, Node>,
      88            0 :         tenants: BTreeMap<TenantShardId, TenantState>,
      89            0 :         scheduler: Scheduler,
      90            0 :     ) -> Self {
      91            0 :         Self {
      92            0 :             tenants,
      93            0 :             nodes: Arc::new(nodes),
      94            0 :             scheduler,
      95            0 :             compute_hook: Arc::new(ComputeHook::new(config)),
      96            0 :             result_tx,
      97            0 :         }
      98            0 :     }
      99              : 
     100            0 :     fn parts_mut(
     101            0 :         &mut self,
     102            0 :     ) -> (
     103            0 :         &mut Arc<HashMap<NodeId, Node>>,
     104            0 :         &mut BTreeMap<TenantShardId, TenantState>,
     105            0 :         &mut Scheduler,
     106            0 :     ) {
     107            0 :         (&mut self.nodes, &mut self.tenants, &mut self.scheduler)
     108            0 :     }
     109              : }
     110              : 
     111            0 : #[derive(Clone)]
     112              : pub struct Config {
     113              :     // All pageservers managed by one instance of this service must have
     114              :     // the same public key.  This JWT token will be used to authenticate
     115              :     // this service to the pageservers it manages.
     116              :     pub jwt_token: Option<String>,
     117              : 
     118              :     // This JWT token will be used to authenticate this service to the control plane.
     119              :     pub control_plane_jwt_token: Option<String>,
     120              : 
     121              :     /// Where the compute hook should send notifications of pageserver attachment locations
     122              :     /// (this URL points to the control plane in prod). If this is None, the compute hook will
     123              :     /// assume it is running in a test environment and try to update neon_local.
     124              :     pub compute_hook_url: Option<String>,
     125              : }
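                       : 
                       : // A minimal sketch (illustrative; the helper name below is hypothetical and not part of
                       : // this file): a Config for a purely local/test setup carries no JWT tokens and no compute
                       : // hook URL, so the compute hook falls back to updating neon_local.
                       : #[allow(dead_code)]
                       : fn example_local_config() -> Config {
                       :     Config {
                       :         jwt_token: None,
                       :         control_plane_jwt_token: None,
                       :         compute_hook_url: None,
                       :     }
                       : }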
     126              : 
     127              : impl From<DatabaseError> for ApiError {
     128            0 :     fn from(err: DatabaseError) -> ApiError {
     129            0 :         match err {
     130            0 :             DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
     131              :             // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
     132              :             DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
     133            0 :                 ApiError::ShuttingDown
     134              :             }
     135            0 :             DatabaseError::Logical(reason) => {
     136            0 :                 ApiError::InternalServerError(anyhow::anyhow!(reason))
     137              :             }
     138              :         }
     139            0 :     }
     140              : }
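                       : 
                       : // Usage note (informal sketch, not from the file itself): with this conversion, a handler
                       : // returning Result<_, ApiError> can apply `?` directly to persistence calls that yield a
                       : // DatabaseError, e.g. `self.persistence.insert_tenant_shards(vec![tsp]).await?`, and a
                       : // connection or pool failure then surfaces as a 503 (ShuttingDown) rather than a 500.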
     141              : 
     142              : pub struct Service {
     143              :     inner: Arc<std::sync::RwLock<ServiceState>>,
     144              :     config: Config,
     145              :     persistence: Arc<Persistence>,
     146              : 
     147              :     // Process shutdown will fire this token
     148              :     cancel: CancellationToken,
     149              : 
     150              :     // Background tasks will hold this gate
     151              :     gate: Gate,
     152              : 
     153              :     /// This waits for initial reconciliation with pageservers to complete.  Until this barrier
     154              :     /// passes, it isn't safe to do any actions that mutate tenants.
     155              :     pub(crate) startup_complete: Barrier,
     156              : }
     157              : 
     158              : impl From<ReconcileWaitError> for ApiError {
     159            0 :     fn from(value: ReconcileWaitError) -> Self {
     160            0 :         match value {
     161            0 :             ReconcileWaitError::Shutdown => ApiError::ShuttingDown,
     162            0 :             e @ ReconcileWaitError::Timeout(_) => ApiError::Timeout(format!("{e}").into()),
     163            0 :             e @ ReconcileWaitError::Failed(..) => ApiError::InternalServerError(anyhow::anyhow!(e)),
     164              :         }
     165            0 :     }
     166              : }
     167              : 
     168              : impl Service {
     169            0 :     pub fn get_config(&self) -> &Config {
     170            0 :         &self.config
     171            0 :     }
     172              : 
     173              :     /// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
     174              :     /// view of the world, and determine which pageservers are responsive.
     175            0 :     #[instrument(skip_all)]
     176              :     async fn startup_reconcile(self: &Arc<Service>) {
     177              :         // For all tenant shards, a vector of observed states on nodes (where None means
     178              :         // indeterminate, same as in [`ObservedStateLocation`])
     179              :         let mut observed = HashMap::new();
     180              : 
     181              :         let mut nodes_online = HashSet::new();
     182              : 
     183              :         // Startup reconciliation does I/O to other services: whether they
     184              :         // are responsive or not, we should aim to finish within our deadline, because:
     185              :         // - If we don't, a k8s readiness hook watching /ready will kill us.
     186              :         // - While we're waiting for startup reconciliation, we are not fully
     187              :         //   available for end user operations like creating/deleting tenants and timelines.
     188              :         //
     189              :         // We set multiple deadlines to break up the time available between the phases of work: this is
     190              :         // arbitrary, but avoids a situation where the first phase could burn our entire timeout period.
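                       :         // (With the 30 second STARTUP_RECONCILE_TIMEOUT, this gives the node scan roughly the
                       :         // first 15 seconds and compute notifications until about 22.5 seconds after startup.)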
     191              :         let start_at = Instant::now();
     192              :         let node_scan_deadline = start_at
     193              :             .checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
     194              :             .expect("Reconcile timeout is a modest constant");
     195              : 
     196              :         let compute_notify_deadline = start_at
     197              :             .checked_add((STARTUP_RECONCILE_TIMEOUT / 4) * 3)
     198              :             .expect("Reconcile timeout is a modest constant");
     199              : 
     200              :         // Accumulate a list of any tenant locations that ought to be detached
     201              :         let mut cleanup = Vec::new();
     202              : 
     203              :         let node_listings = self.scan_node_locations(node_scan_deadline).await;
     204              :         for (node_id, list_response) in node_listings {
     205              :             let tenant_shards = list_response.tenant_shards;
     206            0 :             tracing::info!(
     207            0 :                 "Received {} shard statuses from pageserver {}, setting it to Active",
     208            0 :                 tenant_shards.len(),
     209            0 :                 node_id
     210            0 :             );
     211              :             nodes_online.insert(node_id);
     212              : 
     213              :             for (tenant_shard_id, conf_opt) in tenant_shards {
     214              :                 observed.insert(tenant_shard_id, (node_id, conf_opt));
     215              :             }
     216              :         }
     217              : 
     218              :         // List of tenants for which we will attempt to notify compute of their location at startup
     219              :         let mut compute_notifications = Vec::new();
     220              : 
     221              :         // Populate intent and observed states for all tenants, based on reported state on pageservers
     222              :         let shard_count = {
     223              :             let mut locked = self.inner.write().unwrap();
     224              :             let (nodes, tenants, scheduler) = locked.parts_mut();
     225              : 
     226              :             // Mark nodes online if they responded to us: nodes are offline by default after a restart.
     227              :             let mut new_nodes = (**nodes).clone();
     228              :             for (node_id, node) in new_nodes.iter_mut() {
     229              :                 if nodes_online.contains(node_id) {
     230              :                     node.availability = NodeAvailability::Active;
     231              :                     scheduler.node_upsert(node);
     232              :                 }
     233              :             }
     234              :             *nodes = Arc::new(new_nodes);
     235              : 
     236              :             for (tenant_shard_id, (node_id, observed_loc)) in observed {
     237              :                 let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
     238              :                     cleanup.push((tenant_shard_id, node_id));
     239              :                     continue;
     240              :                 };
     241              : 
     242              :                 tenant_state
     243              :                     .observed
     244              :                     .locations
     245              :                     .insert(node_id, ObservedStateLocation { conf: observed_loc });
     246              :             }
     247              : 
     248              :             // Populate each tenant's intent state
     249              :             for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
     250              :                 tenant_state.intent_from_observed();
     251              :                 if let Err(e) = tenant_state.schedule(scheduler) {
     252              :                     // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
     253              :                     // not enough pageservers are available.  The tenant may well still be available
     254              :                     // to clients.
     255            0 :                     tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
     256              :                 } else {
     257              :                     // If we're both intending and observed to be attached at a particular node, we will
     258              :                     // emit a compute notification for this. In the case where our observed state does not
     259              :                     // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
     260              :                     if let Some(attached_at) = tenant_state.stably_attached() {
     261              :                         compute_notifications.push((*tenant_shard_id, attached_at));
     262              :                     }
     263              :                 }
     264              :             }
     265              : 
     266              :             tenants.len()
     267              :         };
     268              : 
     269              :         // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
     270              :         // generation_pageserver in the database.
     271              : 
     272              :         // Emit compute hook notifications for all tenants which are already stably attached.  Other tenants
     273              :         // will emit compute hook notifications when they reconcile.
     274              :         //
     275              :         // Ordering: we must complete these notification attempts before doing any other reconciliation for the
     276              :         // tenants named here, because otherwise our calls to notify() might race with more recent values
     277              :         // generated by reconciliation.
     278              :         let notify_failures = self
     279              :             .compute_notify_many(compute_notifications, compute_notify_deadline)
     280              :             .await;
     281              : 
     282              :         // Compute notify is fallible.  If it fails here, do not delay overall startup: set the
     283              :         // flag on these shards that they have a pending notification.
     284              :         // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
     285              :         {
     286              :             let mut locked = self.inner.write().unwrap();
     287              :             for tenant_shard_id in notify_failures.into_iter() {
     288              :                 if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
     289              :                     shard.pending_compute_notification = true;
     290              :                 }
     291              :             }
     292              :         }
     293              : 
     294              :         // Finally, now that the service is up and running, launch reconcile operations for any tenants
     295              :         // which require it: under normal circumstances this should only include tenants that were in some
     296              :         // transient state before we restarted, or any tenants whose compute hooks failed above.
     297              :         let reconcile_tasks = self.reconcile_all();
     298              :         // We will not wait for these reconciliation tasks to run here: we're now done with startup and
     299              :         // normal operations may proceed.
     300              : 
     301              :         // Clean up any tenants that were found on pageservers but are not known to us.  Do this in the
     302              :         // background because it does not need to complete in order to proceed with other work.
     303              :         if !cleanup.is_empty() {
     304            0 :             tracing::info!("Cleaning up {} locations in the background", cleanup.len());
     305              :             tokio::task::spawn({
     306              :                 let cleanup_self = self.clone();
     307            0 :                 async move { cleanup_self.cleanup_locations(cleanup).await }
     308              :             });
     309              :         }
     310              : 
     311            0 :         tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
     312              :     }
     313              : 
     314              :     /// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
     315              :     ///
     316              :     /// The result includes only nodes which responded within the deadline
     317            0 :     async fn scan_node_locations(
     318            0 :         &self,
     319            0 :         deadline: Instant,
     320            0 :     ) -> HashMap<NodeId, LocationConfigListResponse> {
     321            0 :         let nodes = {
     322            0 :             let locked = self.inner.read().unwrap();
     323            0 :             locked.nodes.clone()
     324            0 :         };
     325            0 : 
     326            0 :         let mut node_results = HashMap::new();
     327            0 : 
     328            0 :         let mut node_list_futs = FuturesUnordered::new();
     329              : 
     330            0 :         for node in nodes.values() {
     331            0 :             node_list_futs.push({
     332            0 :                 async move {
     333            0 :                     let http_client = reqwest::ClientBuilder::new()
     334            0 :                         .timeout(Duration::from_secs(5))
     335            0 :                         .build()
     336            0 :                         .expect("Failed to construct HTTP client");
     337            0 :                     let client = mgmt_api::Client::from_client(
     338            0 :                         http_client,
     339            0 :                         node.base_url(),
     340            0 :                         self.config.jwt_token.as_deref(),
     341            0 :                     );
     342            0 : 
     343            0 :                     fn is_fatal(e: &mgmt_api::Error) -> bool {
     344            0 :                         use mgmt_api::Error::*;
     345            0 :                         match e {
     346            0 :                             ReceiveBody(_) | ReceiveErrorBody(_) => false,
     347            0 :                             ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
     348            0 :                             | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
     349            0 :                             | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
     350            0 :                             ApiError(_, _) => true,
     351            0 :                         }
     352            0 :                     }
     353            0 : 
     354            0 :                     tracing::info!("Scanning shards on node {}...", node.id);
     355            0 :                     let description = format!("List locations on {}", node.id);
     356            0 :                     let response = backoff::retry(
     357            0 :                         || client.list_location_config(),
     358            0 :                         is_fatal,
     359            0 :                         1,
     360            0 :                         5,
     361            0 :                         &description,
     362            0 :                         &self.cancel,
     363            0 :                     )
     364            0 :                     .await;
     365              : 
     366            0 :                     (node.id, response)
     367            0 :                 }
     368            0 :             });
     369            0 :         }
     370              : 
     371              :         loop {
     372            0 :             let (node_id, result) = tokio::select! {
     373            0 :                 next = node_list_futs.next() => {
     374              :                     match next {
     375              :                         Some(result) => result,
     376              :                         None =>{
     377              :                             // We got results for all our nodes
     378              :                             break;
     379              :                         }
     380              : 
     381              :                     }
     382              :                 },
     383              :                 _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
     384              :                     // Give up waiting for anyone who hasn't responded: we will yield the results that we have
     385            0 :                     tracing::info!("Reached deadline while waiting for nodes to respond to location listing requests");
     386              :                     break;
     387              :                 }
     388              :             };
     389              : 
     390            0 :             let Some(list_response) = result else {
     391            0 :                 tracing::info!("Shutdown during startup_reconcile");
     392            0 :                 break;
     393              :             };
     394              : 
     395            0 :             match list_response {
     396            0 :                 Err(e) => {
     397            0 :                     tracing::warn!("Could not scan node {} ({e})", node_id);
     398              :                 }
     399            0 :                 Ok(listing) => {
     400            0 :                     node_results.insert(node_id, listing);
     401            0 :                 }
     402              :             }
     403              :         }
     404              : 
     405            0 :         node_results
     406            0 :     }
     407              : 
     408              :     /// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
     409              :     ///
     410              :     /// This is safe to run in the background, because if we don't have this TenantShardId in our map of
      411              :     /// tenants, then it is probably something that was incompletely deleted earlier: we will not fight with any
     412              :     /// other task trying to attach it.
     413            0 :     #[instrument(skip_all)]
     414              :     async fn cleanup_locations(&self, cleanup: Vec<(TenantShardId, NodeId)>) {
     415              :         let nodes = self.inner.read().unwrap().nodes.clone();
     416              : 
     417              :         for (tenant_shard_id, node_id) in cleanup {
     418              :             // A node reported a tenant_shard_id which is unknown to us: detach it.
     419              :             let Some(node) = nodes.get(&node_id) else {
     420              :                 // This is legitimate; we run in the background and [`Self::startup_reconcile`] might have identified
     421              :                 // a location to clean up on a node that has since been removed.
     422            0 :                 tracing::info!(
     423            0 :                     "Not cleaning up location {node_id}/{tenant_shard_id}: node not found"
     424            0 :                 );
     425              :                 continue;
     426              :             };
     427              : 
     428              :             if self.cancel.is_cancelled() {
     429              :                 break;
     430              :             }
     431              : 
     432              :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
     433              :             match client
     434              :                 .location_config(
     435              :                     tenant_shard_id,
     436              :                     LocationConfig {
     437              :                         mode: LocationConfigMode::Detached,
     438              :                         generation: None,
     439              :                         secondary_conf: None,
     440              :                         shard_number: tenant_shard_id.shard_number.0,
     441              :                         shard_count: tenant_shard_id.shard_count.literal(),
     442              :                         shard_stripe_size: 0,
     443              :                         tenant_conf: models::TenantConfig::default(),
     444              :                     },
     445              :                     None,
     446              :                 )
     447              :                 .await
     448              :             {
     449              :                 Ok(()) => {
     450            0 :                     tracing::info!(
     451            0 :                         "Detached unknown shard {tenant_shard_id} on pageserver {node_id}"
     452            0 :                     );
     453              :                 }
     454              :                 Err(e) => {
     455              :                     // Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't
     456              :                     // break anything.
     457            0 :                     tracing::error!(
      458            0 :                         "Failed to detach unknown shard {tenant_shard_id} on pageserver {node_id}: {e}"
     459            0 :                     );
     460              :                 }
     461              :             }
     462              :         }
     463              :     }
     464              : 
     465              :     /// Used during [`Self::startup_reconcile`]: issue many concurrent compute notifications.
     466              :     ///
      467              :     /// Returns a set of any shards for which notifications were not acked within the deadline.
     468            0 :     async fn compute_notify_many(
     469            0 :         &self,
     470            0 :         notifications: Vec<(TenantShardId, NodeId)>,
     471            0 :         deadline: Instant,
     472            0 :     ) -> HashSet<TenantShardId> {
     473            0 :         let compute_hook = self.inner.read().unwrap().compute_hook.clone();
     474            0 : 
     475            0 :         let attempt_shards = notifications.iter().map(|i| i.0).collect::<HashSet<_>>();
     476            0 :         let mut success_shards = HashSet::new();
     477            0 : 
     478            0 :         // Construct an async stream of futures to invoke the compute notify function: we do this
     479            0 :         // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
     480            0 :         let mut stream = futures::stream::iter(notifications.into_iter())
     481            0 :             .map(|(tenant_shard_id, node_id)| {
     482            0 :                 let compute_hook = compute_hook.clone();
     483            0 :                 let cancel = self.cancel.clone();
     484            0 :                 async move {
     485            0 :                     if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
     486            0 :                         tracing::error!(
     487            0 :                             %tenant_shard_id,
     488            0 :                             %node_id,
     489            0 :                             "Failed to notify compute on startup for shard: {e}"
     490            0 :                         );
     491            0 :                         None
     492              :                     } else {
     493            0 :                         Some(tenant_shard_id)
     494              :                     }
     495            0 :                 }
     496            0 :             })
     497            0 :             .buffered(compute_hook::API_CONCURRENCY);
     498              : 
     499              :         loop {
     500            0 :             tokio::select! {
     501            0 :                 next = stream.next() => {
     502              :                     match next {
     503              :                         Some(Some(success_shard)) => {
     504              :                             // A notification succeeded
     505              :                             success_shards.insert(success_shard);
     506              :                             },
     507              :                         Some(None) => {
     508              :                             // A notification that failed
     509              :                         },
     510              :                         None => {
     511            0 :                             tracing::info!("Successfully sent all compute notifications");
     512              :                             break;
     513              :                         }
     514              :                     }
     515              :                 },
     516              :                 _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
     517              :                     // Give up sending any that didn't succeed yet
     518            0 :                     tracing::info!("Reached deadline while sending compute notifications");
     519              :                     break;
     520              :                 }
     521              :             };
     522              :         }
     523              : 
     524            0 :         attempt_shards
     525            0 :             .difference(&success_shards)
     526            0 :             .cloned()
     527            0 :             .collect()
     528            0 :     }
     529              : 
      530              :     /// Long-running background task that periodically wakes up and looks for shards that need
     531              :     /// reconciliation.  Reconciliation is fallible, so any reconciliation tasks that fail during
     532              :     /// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
     533              :     /// for those retries.
     534            0 :     #[instrument(skip_all)]
     535              :     async fn background_reconcile(&self) {
     536              :         self.startup_complete.clone().wait().await;
     537              : 
     538              :         const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
     539              : 
     540              :         let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
     541              :         while !self.cancel.is_cancelled() {
     542            0 :             tokio::select! {
     543            0 :               _ = interval.tick() => { self.reconcile_all(); }
     544            0 :               _ = self.cancel.cancelled() => return
     545            0 :             }
     546              :         }
     547              :     }
     548              : 
     549              :     /// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation
     550              :     /// was successful, this will update the observed state of the tenant such that subsequent
     551              :     /// calls to [`TenantState::maybe_reconcile`] will do nothing.
     552            0 :     #[instrument(skip_all, fields(
     553              :         tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
     554              :         sequence=%result.sequence
     555            0 :     ))]
     556              :     fn process_result(&self, result: ReconcileResult) {
     557              :         let mut locked = self.inner.write().unwrap();
     558              :         let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
     559              :             // A reconciliation result might race with removing a tenant: drop results for
     560              :             // tenants that aren't in our map.
     561              :             return;
     562              :         };
     563              : 
     564              :         // Usually generation should only be updated via this path, so the max() isn't
      565              :         // needed, but it is used to handle out-of-band updates, e.g. via a test hook.
     566              :         tenant.generation = std::cmp::max(tenant.generation, result.generation);
     567              : 
     568              :         // If the reconciler signals that it failed to notify compute, set this state on
     569              :         // the shard so that a future [`TenantState::maybe_reconcile`] will try again.
     570              :         tenant.pending_compute_notification = result.pending_compute_notification;
     571              : 
     572              :         match result.result {
     573              :             Ok(()) => {
     574              :                 for (node_id, loc) in &result.observed.locations {
     575              :                     if let Some(conf) = &loc.conf {
     576            0 :                         tracing::info!("Updating observed location {}: {:?}", node_id, conf);
     577              :                     } else {
     578            0 :                         tracing::info!("Setting observed location {} to None", node_id,)
     579              :                     }
     580              :                 }
     581              :                 tenant.observed = result.observed;
     582              :                 tenant.waiter.advance(result.sequence);
     583              :             }
     584              :             Err(e) => {
     585            0 :                 tracing::warn!("Reconcile error: {}", e);
     586              : 
     587              :                 // Ordering: populate last_error before advancing error_seq,
     588              :                 // so that waiters will see the correct error after waiting.
     589              :                 *(tenant.last_error.lock().unwrap()) = format!("{e}");
     590              :                 tenant.error_waiter.advance(result.sequence);
     591              : 
     592              :                 for (node_id, o) in result.observed.locations {
     593              :                     tenant.observed.locations.insert(node_id, o);
     594              :                 }
     595              :             }
     596              :         }
     597              :     }
     598              : 
     599            0 :     async fn process_results(
     600            0 :         &self,
     601            0 :         mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
     602            0 :     ) {
     603            0 :         loop {
     604            0 :             // Wait for the next result, or for cancellation
     605            0 :             let result = tokio::select! {
     606            0 :                 r = result_rx.recv() => {
     607              :                     match r {
     608              :                         Some(result) => {result},
     609              :                         None => {break;}
     610              :                     }
     611              :                 }
     612              :                 _ = self.cancel.cancelled() => {
     613              :                     break;
     614              :                 }
     615              :             };
     616              : 
     617            0 :             self.process_result(result);
     618              :         }
     619            0 :     }
     620              : 
     621            0 :     pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
     622            0 :         let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
     623              : 
     624            0 :         tracing::info!("Loading nodes from database...");
     625            0 :         let nodes = persistence
     626            0 :             .list_nodes()
     627            0 :             .await?
     628            0 :             .into_iter()
     629            0 :             .map(|n| Node {
     630            0 :                 id: NodeId(n.node_id as u64),
     631            0 :                 // At startup we consider a node offline until proven otherwise.
     632            0 :                 availability: NodeAvailability::Offline,
     633            0 :                 scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
     634            0 :                     .expect("Bad scheduling policy in DB"),
     635            0 :                 listen_http_addr: n.listen_http_addr,
     636            0 :                 listen_http_port: n.listen_http_port as u16,
     637            0 :                 listen_pg_addr: n.listen_pg_addr,
     638            0 :                 listen_pg_port: n.listen_pg_port as u16,
     639            0 :             })
     640            0 :             .collect::<Vec<_>>();
     641            0 :         let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
     642            0 :         tracing::info!("Loaded {} nodes from database.", nodes.len());
     643              : 
     644            0 :         tracing::info!("Loading shards from database...");
     645            0 :         let tenant_shard_persistence = persistence.list_tenant_shards().await?;
     646            0 :         tracing::info!(
     647            0 :             "Loaded {} shards from database.",
     648            0 :             tenant_shard_persistence.len()
     649            0 :         );
     650              : 
     651            0 :         let mut tenants = BTreeMap::new();
     652            0 : 
     653            0 :         let mut scheduler = Scheduler::new(nodes.values());
     654            0 : 
     655            0 :         #[cfg(feature = "testing")]
     656            0 :         {
     657            0 :             // Hack: insert scheduler state for all nodes referenced by shards, as compatibility
     658            0 :             // tests only store the shards, not the nodes.  The nodes will be loaded shortly
     659            0 :             // after when pageservers start up and register.
     660            0 :             let mut node_ids = HashSet::new();
     661            0 :             for tsp in &tenant_shard_persistence {
     662            0 :                 if tsp.generation_pageserver != i64::MAX {
     663            0 :                     node_ids.insert(tsp.generation_pageserver);
     664            0 :                 }
     665              :             }
     666            0 :             for node_id in node_ids {
     667            0 :                 tracing::info!("Creating node {} in scheduler for tests", node_id);
     668            0 :                 let node = Node {
     669            0 :                     id: NodeId(node_id as u64),
     670            0 :                     availability: NodeAvailability::Active,
     671            0 :                     scheduling: NodeSchedulingPolicy::Active,
     672            0 :                     listen_http_addr: "".to_string(),
     673            0 :                     listen_http_port: 123,
     674            0 :                     listen_pg_addr: "".to_string(),
     675            0 :                     listen_pg_port: 123,
     676            0 :                 };
     677            0 : 
     678            0 :                 scheduler.node_upsert(&node);
     679              :             }
     680              :         }
     681            0 :         for tsp in tenant_shard_persistence {
     682            0 :             let tenant_shard_id = TenantShardId {
     683            0 :                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
     684            0 :                 shard_number: ShardNumber(tsp.shard_number as u8),
     685            0 :                 shard_count: ShardCount::new(tsp.shard_count as u8),
     686              :             };
     687            0 :             let shard_identity = if tsp.shard_count == 0 {
     688            0 :                 ShardIdentity::unsharded()
     689              :             } else {
     690            0 :                 ShardIdentity::new(
     691            0 :                     ShardNumber(tsp.shard_number as u8),
     692            0 :                     ShardCount::new(tsp.shard_count as u8),
     693            0 :                     ShardStripeSize(tsp.shard_stripe_size as u32),
     694            0 :                 )?
     695              :             };
     696              : 
      697              :             // We will populate intent properly later in [`Self::startup_reconcile`]; for now, populate
     698              :             // it with what we can infer: the node for which a generation was most recently issued.
     699            0 :             let mut intent = IntentState::new();
     700            0 :             if tsp.generation_pageserver != i64::MAX {
     701            0 :                 intent.set_attached(
     702            0 :                     &mut scheduler,
     703            0 :                     Some(NodeId(tsp.generation_pageserver as u64)),
     704            0 :                 );
     705            0 :             }
     706              : 
     707            0 :             let new_tenant = TenantState {
     708            0 :                 tenant_shard_id,
     709            0 :                 shard: shard_identity,
     710            0 :                 sequence: Sequence::initial(),
     711            0 :                 generation: Generation::new(tsp.generation as u32),
     712            0 :                 policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
     713            0 :                 intent,
     714            0 :                 observed: ObservedState::new(),
     715            0 :                 config: serde_json::from_str(&tsp.config).unwrap(),
     716            0 :                 reconciler: None,
     717            0 :                 splitting: tsp.splitting,
     718            0 :                 waiter: Arc::new(SeqWait::new(Sequence::initial())),
     719            0 :                 error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
     720            0 :                 last_error: Arc::default(),
     721            0 :                 pending_compute_notification: false,
     722            0 :             };
     723            0 : 
     724            0 :             tenants.insert(tenant_shard_id, new_tenant);
     725              :         }
     726              : 
     727            0 :         let (startup_completion, startup_complete) = utils::completion::channel();
     728            0 : 
     729            0 :         let this = Arc::new(Self {
     730            0 :             inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
     731            0 :                 config.clone(),
     732            0 :                 result_tx,
     733            0 :                 nodes,
     734            0 :                 tenants,
     735            0 :                 scheduler,
     736            0 :             ))),
     737            0 :             config,
     738            0 :             persistence,
     739            0 :             startup_complete: startup_complete.clone(),
     740            0 :             cancel: CancellationToken::new(),
     741            0 :             gate: Gate::default(),
     742            0 :         });
     743            0 : 
     744            0 :         let result_task_this = this.clone();
     745            0 :         tokio::task::spawn(async move {
     746              :             // Block shutdown until we're done (we must respect self.cancel)
     747            0 :             if let Ok(_gate) = result_task_this.gate.enter() {
     748            0 :                 result_task_this.process_results(result_rx).await
     749            0 :             }
     750            0 :         });
     751            0 : 
     752            0 :         tokio::task::spawn({
     753            0 :             let this = this.clone();
     754            0 :             // We will block the [`Service::startup_complete`] barrier until [`Self::startup_reconcile`]
     755            0 :             // is done.
     756            0 :             let startup_completion = startup_completion.clone();
     757            0 :             async move {
     758              :                 // Block shutdown until we're done (we must respect self.cancel)
     759            0 :                 let Ok(_gate) = this.gate.enter() else {
     760            0 :                     return;
     761              :                 };
     762              : 
     763            0 :                 this.startup_reconcile().await;
     764              : 
     765            0 :                 drop(startup_completion);
     766            0 : 
     767            0 :                 this.background_reconcile().await;
     768            0 :             }
     769            0 :         });
     770            0 : 
     771            0 :         Ok(this)
     772            0 :     }
     773              : 
     774            0 :     pub(crate) async fn attach_hook(
     775            0 :         &self,
     776            0 :         attach_req: AttachHookRequest,
     777            0 :     ) -> anyhow::Result<AttachHookResponse> {
     778            0 :         // This is a test hook.  To enable using it on tenants that were created directly with
     779            0 :         // the pageserver API (not via this service), we will auto-create any missing tenant
     780            0 :         // shards with default state.
     781            0 :         let insert = {
     782            0 :             let locked = self.inner.write().unwrap();
     783            0 :             !locked.tenants.contains_key(&attach_req.tenant_shard_id)
     784            0 :         };
     785            0 :         if insert {
     786            0 :             let tsp = TenantShardPersistence {
     787            0 :                 tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
     788            0 :                 shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
     789            0 :                 shard_count: attach_req.tenant_shard_id.shard_count.literal() as i32,
     790            0 :                 shard_stripe_size: 0,
     791            0 :                 generation: 0,
     792            0 :                 generation_pageserver: i64::MAX,
     793            0 :                 placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
     794            0 :                 config: serde_json::to_string(&TenantConfig::default()).unwrap(),
     795            0 :                 splitting: SplitState::default(),
     796            0 :             };
     797            0 : 
     798            0 :             match self.persistence.insert_tenant_shards(vec![tsp]).await {
     799            0 :                 Err(e) => match e {
     800              :                     DatabaseError::Query(diesel::result::Error::DatabaseError(
     801              :                         DatabaseErrorKind::UniqueViolation,
     802              :                         _,
     803              :                     )) => {
     804            0 :                         tracing::info!(
     805            0 :                             "Raced with another request to insert tenant {}",
     806            0 :                             attach_req.tenant_shard_id
     807            0 :                         )
     808              :                     }
     809            0 :                     _ => return Err(e.into()),
     810              :                 },
     811              :                 Ok(()) => {
     812            0 :                     tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
     813              : 
     814            0 :                     let mut locked = self.inner.write().unwrap();
     815            0 :                     locked.tenants.insert(
     816            0 :                         attach_req.tenant_shard_id,
     817            0 :                         TenantState::new(
     818            0 :                             attach_req.tenant_shard_id,
     819            0 :                             ShardIdentity::unsharded(),
     820            0 :                             PlacementPolicy::Single,
     821            0 :                         ),
     822            0 :                     );
     823            0 :                     tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
     824              :                 }
     825              :             }
     826            0 :         }
     827              : 
     828            0 :         let new_generation = if let Some(req_node_id) = attach_req.node_id {
     829              :             Some(
     830            0 :                 self.persistence
     831            0 :                     .increment_generation(attach_req.tenant_shard_id, req_node_id)
     832            0 :                     .await?,
     833              :             )
     834              :         } else {
     835            0 :             self.persistence.detach(attach_req.tenant_shard_id).await?;
     836            0 :             None
     837              :         };
     838              : 
     839            0 :         let mut locked = self.inner.write().unwrap();
     840            0 :         let (_nodes, tenants, scheduler) = locked.parts_mut();
     841            0 : 
     842            0 :         let tenant_state = tenants
     843            0 :             .get_mut(&attach_req.tenant_shard_id)
     844            0 :             .expect("Checked for existence above");
     845              : 
     846            0 :         if let Some(new_generation) = new_generation {
     847            0 :             tenant_state.generation = new_generation;
     848            0 :         } else {
     849              :             // This is a detach notification.  We must update placement policy to avoid re-attaching
     850              :             // during background scheduling/reconciliation, or during attachment service restart.
     851            0 :             assert!(attach_req.node_id.is_none());
     852            0 :             tenant_state.policy = PlacementPolicy::Detached;
     853              :         }
     854              : 
     855            0 :         if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
     856            0 :             tracing::info!(
     857            0 :                 tenant_id = %attach_req.tenant_shard_id,
     858            0 :                 ps_id = %attaching_pageserver,
     859            0 :                 generation = ?tenant_state.generation,
     860            0 :                 "issuing",
     861            0 :             );
     862            0 :         } else if let Some(ps_id) = tenant_state.intent.get_attached() {
     863            0 :             tracing::info!(
     864            0 :                 tenant_id = %attach_req.tenant_shard_id,
     865            0 :                 %ps_id,
     866            0 :                 generation = ?tenant_state.generation,
     867            0 :                 "dropping",
     868            0 :             );
     869              :         } else {
     870            0 :             tracing::info!(
     871            0 :                 tenant_id = %attach_req.tenant_shard_id,
     872            0 :                 "no-op: tenant already has no pageserver");
     873              :         }
     874            0 :         tenant_state
     875            0 :             .intent
     876            0 :             .set_attached(scheduler, attach_req.node_id);
     877              : 
     878            0 :         tracing::info!(
     879            0 :             "attach_hook: tenant {} set generation {:?}, pageserver {}",
     880            0 :             attach_req.tenant_shard_id,
     881            0 :             tenant_state.generation,
     882            0 :             // TODO: this is an odd number of 0xf's
     883            0 :             attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
     884            0 :         );
     885              : 
     886              :         // Trick the reconciler into not doing anything for this tenant: this helps
     887              :         // tests that manually configure a tenant on the pageserver, and then call this
     888              :         // attach hook: they don't want background reconciliation to modify what they
     889              :         // did to the pageserver.
     890              :         #[cfg(feature = "testing")]
     891              :         {
     892            0 :             if let Some(node_id) = attach_req.node_id {
     893            0 :                 tenant_state.observed.locations = HashMap::from([(
     894            0 :                     node_id,
     895            0 :                     ObservedStateLocation {
     896            0 :                         conf: Some(attached_location_conf(
     897            0 :                             tenant_state.generation,
     898            0 :                             &tenant_state.shard,
     899            0 :                             &tenant_state.config,
     900            0 :                         )),
     901            0 :                     },
     902            0 :                 )]);
     903            0 :             } else {
     904            0 :                 tenant_state.observed.locations.clear();
     905            0 :             }
     906              :         }
     907              : 
     908            0 :         Ok(AttachHookResponse {
     909            0 :             gen: attach_req
     910            0 :                 .node_id
     911            0 :                 .map(|_| tenant_state.generation.into().unwrap()),
     912            0 :         })
     913            0 :     }
     914              : 
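                      :     /// Report the generation and the pageserver (if any) that this service intends the
                      :     /// tenant shard to be attached to, based on in-memory state only.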
     915            0 :     pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
     916            0 :         let locked = self.inner.read().unwrap();
     917            0 : 
     918            0 :         let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
     919            0 : 
     920            0 :         InspectResponse {
     921            0 :             attachment: tenant_state.and_then(|s| {
     922            0 :                 s.intent
     923            0 :                     .get_attached()
     924            0 :                     .map(|ps| (s.generation.into().unwrap(), ps))
     925            0 :             }),
     926            0 :         }
     927            0 :     }
     928              : 
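                      :     /// Handle a pageserver's re-attach request: mark the node as available, persist
                      :     /// incremented generation numbers for the shards it holds, apply those generations to
                      :     /// our in-memory state, and return them to the calling pageserver.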
     929            0 :     pub(crate) async fn re_attach(
     930            0 :         &self,
     931            0 :         reattach_req: ReAttachRequest,
     932            0 :     ) -> Result<ReAttachResponse, ApiError> {
     933            0 :         // Take a re-attach as indication that the node is available: this is a precursor to proper
     934            0 :         // heartbeating in https://github.com/neondatabase/neon/issues/6844
     935            0 :         self.node_configure(NodeConfigureRequest {
     936            0 :             node_id: reattach_req.node_id,
     937            0 :             availability: Some(NodeAvailability::Active),
     938            0 :             scheduling: None,
     939            0 :         })
     940            0 :         .await?;
     941              : 
     942              :         // Ordering: we must persist generation number updates before making them visible in the in-memory state
     943            0 :         let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
     944              : 
     945              :         // Apply the updated generation to our in-memory state
     946            0 :         let mut locked = self.inner.write().unwrap();
     947            0 : 
     948            0 :         let mut response = ReAttachResponse {
     949            0 :             tenants: Vec::new(),
     950            0 :         };
     951              : 
     952            0 :         for (tenant_shard_id, new_gen) in incremented_generations {
     953            0 :             response.tenants.push(ReAttachResponseTenant {
     954            0 :                 id: tenant_shard_id,
     955            0 :                 gen: new_gen.into().unwrap(),
     956            0 :             });
     957            0 : 
     958            0 :             // Apply the new generation number to our in-memory state
     959            0 :             let shard_state = locked.tenants.get_mut(&tenant_shard_id);
     960            0 :             let Some(shard_state) = shard_state else {
     961              :                 // Not fatal.  This edge case requires a re-attach to happen
     962              :                 // between inserting a new tenant shard into the database, and updating our in-memory
     963              :                 // state to know about the shard, _and_ that the state inserted to the database referenced
     964              :                 // a pageserver.  Should never happen, but handle it rather than panicking, since it should
     965              :                 // be harmless.
     966            0 :                 tracing::error!(
     967            0 :                     "Shard {} is in database for node {} but not in-memory state",
     968            0 :                     tenant_shard_id,
     969            0 :                     reattach_req.node_id
     970            0 :                 );
     971            0 :                 continue;
     972              :             };
     973              : 
     974            0 :             shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
     975            0 :             if let Some(observed) = shard_state
     976            0 :                 .observed
     977            0 :                 .locations
     978            0 :                 .get_mut(&reattach_req.node_id)
     979              :             {
     980            0 :                 if let Some(conf) = observed.conf.as_mut() {
     981            0 :                     conf.generation = new_gen.into();
     982            0 :                 }
     983            0 :             }
     984              : 
     985              :             // TODO: cancel/restart any running reconciliation for this tenant, it might be trying
     986              :             // to call location_conf API with an old generation.  Wait for cancellation to complete
     987              :             // before responding to this request.  Requires well implemented CancellationToken logic
     988              :             // all the way to where we call location_conf.  Even then, there can still be a location_conf
     989              :             // request in flight over the network: TODO handle that by making location_conf API refuse
     990              :             // to go backward in generations.
     991              :         }
     992            0 :         Ok(response)
     993            0 :     }
     994              : 
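                      :     /// Check a pageserver's view of tenant generations: each requested (tenant, generation)
                      :     /// pair is valid only if it matches the latest generation this service has issued for
                      :     /// that tenant.  Tenants we no longer know about are reported as valid (see comment below).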
     995            0 :     pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
     996            0 :         let locked = self.inner.read().unwrap();
     997            0 : 
     998            0 :         let mut response = ValidateResponse {
     999            0 :             tenants: Vec::new(),
    1000            0 :         };
    1001              : 
    1002            0 :         for req_tenant in validate_req.tenants {
    1003            0 :             if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
    1004            0 :                 let valid = tenant_state.generation == Generation::new(req_tenant.gen);
    1005            0 :                 tracing::info!(
    1006            0 :                     "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
    1007            0 :                     req_tenant.id,
    1008            0 :                     req_tenant.gen,
    1009            0 :                     tenant_state.generation
    1010            0 :                 );
    1011            0 :                 response.tenants.push(ValidateResponseTenant {
    1012            0 :                     id: req_tenant.id,
    1013            0 :                     valid,
    1014            0 :                 });
    1015            0 :             } else {
    1016            0 :                 // After tenant deletion, we may approve any validation.  This avoids
    1017            0 :                 // spurious warnings on the pageserver if it has pending LSN updates
    1018            0 :                 // at the point a deletion happens.
    1019            0 :                 response.tenants.push(ValidateResponseTenant {
    1020            0 :                     id: req_tenant.id,
    1021            0 :                     valid: true,
    1022            0 :                 });
    1023            0 :             }
    1024              :         }
    1025            0 :         response
    1026            0 :     }
    1027              : 
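                      :     /// Create a tenant (all of its shards) via [`Self::do_tenant_create`], then wait a bounded
                      :     /// time for the resulting reconciliations to complete before responding.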
    1028            0 :     pub(crate) async fn tenant_create(
    1029            0 :         &self,
    1030            0 :         create_req: TenantCreateRequest,
    1031            0 :     ) -> Result<TenantCreateResponse, ApiError> {
    1032            0 :         let (response, waiters) = self.do_tenant_create(create_req).await?;
    1033              : 
    1034            0 :         self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
    1035            0 :         Ok(response)
    1036            0 :     }
    1037              : 
    1038            0 :     pub(crate) async fn do_tenant_create(
    1039            0 :         &self,
    1040            0 :         create_req: TenantCreateRequest,
    1041            0 :     ) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
    1042              :         // This service expects to handle sharding itself: it is an error to try and directly create
    1043              :         // a particular shard here.
    1044            0 :         let tenant_id = if !create_req.new_tenant_id.is_unsharded() {
    1045            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
    1046            0 :                 "Attempted to create a specific shard, this API is for creating the whole tenant"
    1047            0 :             )));
    1048              :         } else {
    1049            0 :             create_req.new_tenant_id.tenant_id
    1050              :         };
    1051              : 
    1052            0 :         tracing::info!(
    1053            0 :             "Creating tenant {}, shard_count={:?}",
    1054            0 :             create_req.new_tenant_id,
    1055            0 :             create_req.shard_parameters.count,
    1056            0 :         );
    1057              : 
    1058            0 :         let create_ids = (0..create_req.shard_parameters.count.count())
    1059            0 :             .map(|i| TenantShardId {
    1060            0 :                 tenant_id,
    1061            0 :                 shard_number: ShardNumber(i),
    1062            0 :                 shard_count: create_req.shard_parameters.count,
    1063            0 :             })
    1064            0 :             .collect::<Vec<_>>();
    1065            0 : 
    1066            0 :         // TODO: enable specifying this.  Using Single as a default helps legacy tests to work (they
    1067            0 :         // have no expectation of HA).
    1068            0 :         let placement_policy: PlacementPolicy = PlacementPolicy::Single;
    1069            0 : 
    1070            0 :         // Ordering: we persist tenant shards before creating them on the pageserver.  This enables a caller
    1071            0 :         // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
    1072            0 :         // during the creation, rather than risking leaving orphan objects in S3.
    1073            0 :         let persist_tenant_shards = create_ids
    1074            0 :             .iter()
    1075            0 :             .map(|tenant_shard_id| TenantShardPersistence {
    1076            0 :                 tenant_id: tenant_shard_id.tenant_id.to_string(),
    1077            0 :                 shard_number: tenant_shard_id.shard_number.0 as i32,
    1078            0 :                 shard_count: tenant_shard_id.shard_count.literal() as i32,
    1079            0 :                 shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
    1080            0 :                 generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
    1081            0 :                 generation_pageserver: i64::MAX,
    1082            0 :                 placement_policy: serde_json::to_string(&placement_policy).unwrap(),
    1083            0 :                 config: serde_json::to_string(&create_req.config).unwrap(),
    1084            0 :                 splitting: SplitState::default(),
    1085            0 :             })
    1086            0 :             .collect();
    1087            0 :         self.persistence
    1088            0 :             .insert_tenant_shards(persist_tenant_shards)
    1089            0 :             .await
    1090            0 :             .map_err(|e| {
    1091            0 :                 // TODO: distinguish primary key constraint (idempotent, OK), from other errors
    1092            0 :                 ApiError::InternalServerError(anyhow::anyhow!(e))
    1093            0 :             })?;
    1094              : 
    1095            0 :         let (waiters, response_shards) = {
    1096            0 :             let mut locked = self.inner.write().unwrap();
    1097            0 :             let (_nodes, tenants, scheduler) = locked.parts_mut();
    1098            0 : 
    1099            0 :             let mut response_shards = Vec::new();
    1100              : 
    1101            0 :             for tenant_shard_id in create_ids {
    1102            0 :                 tracing::info!("Creating shard {tenant_shard_id}...");
    1103              : 
    1104              :                 use std::collections::btree_map::Entry;
    1105            0 :                 match tenants.entry(tenant_shard_id) {
    1106            0 :                     Entry::Occupied(mut entry) => {
    1107            0 :                         tracing::info!(
    1108            0 :                             "Tenant shard {tenant_shard_id} already exists while creating"
    1109            0 :                         );
    1110              : 
    1111              :                         // TODO: schedule() should take an anti-affinity expression that pushes
    1112              :                         // attached and secondary locations (independently) away from those
    1113              :                         // pageservers also holding a shard for this tenant.
    1114              : 
    1115            0 :                         entry.get_mut().schedule(scheduler).map_err(|e| {
    1116            0 :                             ApiError::Conflict(format!(
    1117            0 :                                 "Failed to schedule shard {tenant_shard_id}: {e}"
    1118            0 :                             ))
    1119            0 :                         })?;
    1120              : 
    1121            0 :                         response_shards.push(TenantCreateResponseShard {
    1122            0 :                             shard_id: tenant_shard_id,
    1123            0 :                             node_id: entry
    1124            0 :                                 .get()
    1125            0 :                                 .intent
    1126            0 :                                 .get_attached()
    1127            0 :                                 .expect("We just set pageserver if it was None"),
    1128            0 :                             generation: entry.get().generation.into().unwrap(),
    1129            0 :                         });
    1130            0 : 
    1131            0 :                         continue;
    1132              :                     }
    1133            0 :                     Entry::Vacant(entry) => {
    1134            0 :                         let mut state = TenantState::new(
    1135            0 :                             tenant_shard_id,
    1136            0 :                             ShardIdentity::from_params(
    1137            0 :                                 tenant_shard_id.shard_number,
    1138            0 :                                 &create_req.shard_parameters,
    1139            0 :                             ),
    1140            0 :                             placement_policy.clone(),
    1141            0 :                         );
    1142              : 
    1143            0 :                         if let Some(create_gen) = create_req.generation {
    1144            0 :                             state.generation = Generation::new(create_gen);
    1145            0 :                         }
    1146            0 :                         state.config = create_req.config.clone();
    1147            0 : 
    1148            0 :                         state.schedule(scheduler).map_err(|e| {
    1149            0 :                             ApiError::Conflict(format!(
    1150            0 :                                 "Failed to schedule shard {tenant_shard_id}: {e}"
    1151            0 :                             ))
    1152            0 :                         })?;
    1153              : 
    1154            0 :                         response_shards.push(TenantCreateResponseShard {
    1155            0 :                             shard_id: tenant_shard_id,
    1156            0 :                             node_id: state
    1157            0 :                                 .intent
    1158            0 :                                 .get_attached()
    1159            0 :                                 .expect("We just set pageserver if it was None"),
    1160            0 :                             generation: state.generation.into().unwrap(),
    1161            0 :                         });
    1162            0 :                         entry.insert(state)
    1163              :                     }
    1164              :                 };
    1165              :             }
    1166              : 
    1167              :             // Take a snapshot of pageservers
    1168            0 :             let pageservers = locked.nodes.clone();
    1169            0 : 
    1170            0 :             let result_tx = locked.result_tx.clone();
    1171            0 :             let compute_hook = locked.compute_hook.clone();
    1172            0 : 
    1173            0 :             let waiters = locked
    1174            0 :                 .tenants
    1175            0 :                 .range_mut(TenantShardId::tenant_range(tenant_id))
    1176            0 :                 .filter_map(|(_shard_id, shard)| {
    1177            0 :                     shard.maybe_reconcile(
    1178            0 :                         result_tx.clone(),
    1179            0 :                         &pageservers,
    1180            0 :                         &compute_hook,
    1181            0 :                         &self.config,
    1182            0 :                         &self.persistence,
    1183            0 :                         &self.gate,
    1184            0 :                         &self.cancel,
    1185            0 :                     )
    1186            0 :                 })
    1187            0 :                 .collect::<Vec<_>>();
    1188            0 :             (waiters, response_shards)
    1189            0 :         };
    1190            0 : 
    1191            0 :         Ok((
    1192            0 :             TenantCreateResponse {
    1193            0 :                 shards: response_shards,
    1194            0 :             },
    1195            0 :             waiters,
    1196            0 :         ))
    1197            0 :     }
    1198              : 
    1199              :     /// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
    1200              :     /// wait for reconciliation to complete before responding.
    1201            0 :     async fn await_waiters(
    1202            0 :         &self,
    1203            0 :         waiters: Vec<ReconcilerWaiter>,
    1204            0 :         timeout: Duration,
    1205            0 :     ) -> Result<(), ReconcileWaitError> {
    1206            0 :         let deadline = Instant::now().checked_add(timeout).unwrap();
    1207            0 :         for waiter in waiters {
    1208            0 :             let timeout = deadline.duration_since(Instant::now());
    1209            0 :             waiter.wait_timeout(timeout).await?;
    1210              :         }
    1211              : 
    1212            0 :         Ok(())
    1213            0 :     }
    1214              : 
    1215              :     /// This API is used by the cloud control plane to do coarse-grained control of tenants:
    1216              :     /// - Call with mode Attached* to upsert the tenant.
    1217              :     /// - Call with mode Detached to switch to PlacementPolicy::Detached
    1218              :     ///
    1219              :     /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
    1220              :     /// secondary locations.
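                      :     ///
                      :     /// For existing tenants, an Attached* mode maps to PlacementPolicy::Double(1) when more than
                      :     /// one pageserver is registered, or PlacementPolicy::Single otherwise (a dev/test convenience).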
    1221            0 :     pub(crate) async fn tenant_location_config(
    1222            0 :         &self,
    1223            0 :         tenant_id: TenantId,
    1224            0 :         req: TenantLocationConfigRequest,
    1225            0 :     ) -> Result<TenantLocationConfigResponse, ApiError> {
    1226            0 :         if !req.tenant_id.is_unsharded() {
    1227            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
    1228            0 :                 "This API is for importing single-sharded or unsharded tenants"
    1229            0 :             )));
    1230            0 :         }
    1231            0 : 
    1232            0 :         let mut waiters = Vec::new();
    1233            0 :         let mut result = TenantLocationConfigResponse { shards: Vec::new() };
    1234            0 :         let maybe_create = {
    1235            0 :             let mut locked = self.inner.write().unwrap();
    1236            0 :             let result_tx = locked.result_tx.clone();
    1237            0 :             let compute_hook = locked.compute_hook.clone();
    1238            0 :             let (nodes, tenants, scheduler) = locked.parts_mut();
    1239            0 : 
    1240            0 :             // Maybe we have existing shards
    1241            0 :             let mut create = true;
    1242            0 :             for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
    1243              :                 // Saw an existing shard: this is not a creation
    1244            0 :                 create = false;
    1245            0 : 
    1246            0 :                 // Note that for existing tenants we do _not_ respect the generation in the request: this is likely
    1247            0 :                 // to be stale.  Once a tenant is created in this service, our view of generation is authoritative, and
    1248            0 :                 // callers' generations may be ignored.  This represents a one-way migration of tenants from the outer
    1249            0 :                 // cloud control plane into this service.
    1250            0 : 
    1251            0 :                 // Use location config mode as an indicator of policy: if they ask for
    1252            0 :                 // attached we go to default HA attached mode.  If they ask for secondary
    1253            0 :                 // we go to secondary-only mode.  If they ask for detached we detach.
    1254            0 :                 match req.config.mode {
    1255            0 :                     LocationConfigMode::Detached => {
    1256            0 :                         shard.policy = PlacementPolicy::Detached;
    1257            0 :                     }
    1258              :                     LocationConfigMode::Secondary => {
    1259              :                         // TODO: implement secondary-only mode.
    1260            0 :                         todo!();
    1261              :                     }
    1262              :                     LocationConfigMode::AttachedMulti
    1263              :                     | LocationConfigMode::AttachedSingle
    1264              :                     | LocationConfigMode::AttachedStale => {
    1265              :                         // TODO: persistence for changes in policy
    1266            0 :                         if nodes.len() > 1 {
    1267            0 :                             shard.policy = PlacementPolicy::Double(1)
    1268              :                         } else {
    1269              :                             // Convenience for dev/test: if we just have one pageserver, import
    1270              :                             // tenants into Single mode so that scheduling will succeed.
    1271            0 :                             shard.policy = PlacementPolicy::Single
    1272              :                         }
    1273              :                     }
    1274              :                 }
    1275              : 
    1276            0 :                 shard.schedule(scheduler)?;
    1277              : 
    1278            0 :                 let maybe_waiter = shard.maybe_reconcile(
    1279            0 :                     result_tx.clone(),
    1280            0 :                     nodes,
    1281            0 :                     &compute_hook,
    1282            0 :                     &self.config,
    1283            0 :                     &self.persistence,
    1284            0 :                     &self.gate,
    1285            0 :                     &self.cancel,
    1286            0 :                 );
    1287            0 :                 if let Some(waiter) = maybe_waiter {
    1288            0 :                     waiters.push(waiter);
    1289            0 :                 }
    1290              : 
    1291            0 :                 if let Some(node_id) = shard.intent.get_attached() {
    1292            0 :                     result.shards.push(TenantShardLocation {
    1293            0 :                         shard_id: *shard_id,
    1294            0 :                         node_id: *node_id,
    1295            0 :                     })
    1296            0 :                 }
    1297              :             }
    1298              : 
    1299            0 :             if create {
    1300              :                 // Validate request mode
    1301            0 :                 match req.config.mode {
    1302              :                     LocationConfigMode::Detached | LocationConfigMode::Secondary => {
    1303              :                         // When using this API to onboard an existing tenant to this service, it must start in
    1304              :                         // an attached state, because we need the request to come with a generation
    1305            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
    1306            0 :                             "Imported tenant must be in attached mode"
    1307            0 :                         )));
    1308              :                     }
    1309              : 
    1310              :                     LocationConfigMode::AttachedMulti
    1311              :                     | LocationConfigMode::AttachedSingle
    1312            0 :                     | LocationConfigMode::AttachedStale => {
    1313            0 :                         // Pass
    1314            0 :                     }
    1315              :                 }
    1316              : 
    1317              :                 // Validate request generation
    1318            0 :                 let Some(generation) = req.config.generation else {
    1319              :                     // We can only import attached tenants, because we need the request to come with a generation
    1320            0 :                     return Err(ApiError::BadRequest(anyhow::anyhow!(
    1321            0 :                         "Generation is mandatory when importing tenant"
    1322            0 :                     )));
    1323              :                 };
    1324              : 
    1325              :                 // Synthesize a creation request
    1326            0 :                 Some(TenantCreateRequest {
    1327            0 :                     new_tenant_id: TenantShardId::unsharded(tenant_id),
    1328            0 :                     generation: Some(generation),
    1329            0 :                     shard_parameters: ShardParameters {
    1330            0 :                         // Must preserve the incoming shard_count to distinguish unsharded (0)
    1331            0 :                         // from single-sharded (1): this distinction appears in the S3 keys of the tenant.
    1332            0 :                         count: req.tenant_id.shard_count,
    1333            0 :                         // We only import unsharded or single-sharded tenants, so stripe
    1334            0 :                         // size can be made up arbitrarily here.
    1335            0 :                         stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
    1336            0 :                     },
    1337            0 :                     config: req.config.tenant_conf,
    1338            0 :                 })
    1339              :             } else {
    1340            0 :                 None
    1341              :             }
    1342              :         };
    1343              : 
    1344            0 :         let waiters = if let Some(create_req) = maybe_create {
    1345            0 :             let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
    1346            0 :             result.shards = create_resp
    1347            0 :                 .shards
    1348            0 :                 .into_iter()
    1349            0 :                 .map(|s| TenantShardLocation {
    1350            0 :                     node_id: s.node_id,
    1351            0 :                     shard_id: s.shard_id,
    1352            0 :                 })
    1353            0 :                 .collect();
    1354            0 :             waiters
    1355              :         } else {
    1356            0 :             waiters
    1357              :         };
    1358              : 
    1359            0 :         if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
    1360              :             // Do not treat a reconcile error as fatal: we have already applied any requested
    1361              :             // Intent changes, and the reconcile can fail for external reasons like unavailable
    1362              :             // compute notification API.  In these cases, it is important that we do not
    1363              :             // cause the cloud control plane to retry forever on this API.
    1364            0 :             tracing::warn!(
    1365            0 :                 "Failed to reconcile after /location_config: {e}, returning success anyway"
    1366            0 :             );
    1367            0 :         }
    1368              : 
    1369              :         // Logging the full result is useful because it lets us cross-check what the cloud control
    1370              :         // plane's tenant_shards table should contain.
    1371            0 :         tracing::info!("Complete, returning {result:?}");
    1372              : 
    1373            0 :         Ok(result)
    1374            0 :     }
    1375              : 
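                      :     /// Run remote storage time travel recovery for a tenant.  The tenant must be fully detached
                      :     /// (no attached or secondary locations intended anywhere), and the operation is repeated for
                      :     /// every shard count the caller has historically used, since shard counts appear in remote paths.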
    1376            0 :     pub(crate) async fn tenant_time_travel_remote_storage(
    1377            0 :         &self,
    1378            0 :         time_travel_req: &TenantTimeTravelRequest,
    1379            0 :         tenant_id: TenantId,
    1380            0 :         timestamp: Cow<'_, str>,
    1381            0 :         done_if_after: Cow<'_, str>,
    1382            0 :     ) -> Result<(), ApiError> {
    1383            0 :         let node = {
    1384            0 :             let locked = self.inner.read().unwrap();
    1385              :             // Just a sanity check to prevent misuse: the API expects that the tenant is fully
    1386              :             // detached everywhere, and nothing writes to S3 storage. Here, we verify that,
    1387              :             // but only at the start of the process, so it's really just to prevent operator
    1388              :             // mistakes.
    1389            0 :             for (shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
    1390            0 :                 if shard.intent.get_attached().is_some() || !shard.intent.get_secondary().is_empty()
    1391              :                 {
    1392            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1393            0 :                         "We want tenant to be attached in shard with tenant_shard_id={shard_id}"
    1394            0 :                     )));
    1395            0 :                 }
    1396            0 :                 let maybe_attached = shard
    1397            0 :                     .observed
    1398            0 :                     .locations
    1399            0 :                     .iter()
    1400            0 :                     .filter_map(|(node_id, observed_location)| {
    1401            0 :                         observed_location
    1402            0 :                             .conf
    1403            0 :                             .as_ref()
    1404            0 :                             .map(|loc| (node_id, observed_location, loc.mode))
    1405            0 :                     })
    1406            0 :                     .find(|(_, _, mode)| *mode != LocationConfigMode::Detached);
    1407            0 :                 if let Some((node_id, _observed_location, mode)) = maybe_attached {
    1408            0 :                     return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}")));
    1409            0 :                 }
    1410              :             }
    1411            0 :             let scheduler = &locked.scheduler;
    1412              :             // Right now we only perform the operation on a single node without parallelization
    1413              :             // TODO fan out the operation to multiple nodes for better performance
    1414            0 :             let node_id = scheduler.schedule_shard(&[])?;
    1415            0 :             let node = locked
    1416            0 :                 .nodes
    1417            0 :                 .get(&node_id)
    1418            0 :                 .expect("Pageservers may not be deleted while lock is active");
    1419            0 :             node.clone()
    1420            0 :         };
    1421            0 : 
    1422            0 :         // The shard count is encoded in the remote storage's URL, so we need to handle all historically used shard counts
    1423            0 :         let mut counts = time_travel_req
    1424            0 :             .shard_counts
    1425            0 :             .iter()
    1426            0 :             .copied()
    1427            0 :             .collect::<HashSet<_>>()
    1428            0 :             .into_iter()
    1429            0 :             .collect::<Vec<_>>();
    1430            0 :         counts.sort_unstable();
    1431              : 
    1432            0 :         for count in counts {
    1433            0 :             let shard_ids = (0..count.count())
    1434            0 :                 .map(|i| TenantShardId {
    1435            0 :                     tenant_id,
    1436            0 :                     shard_number: ShardNumber(i),
    1437            0 :                     shard_count: count,
    1438            0 :                 })
    1439            0 :                 .collect::<Vec<_>>();
    1440            0 :             for tenant_shard_id in shard_ids {
    1441            0 :                 let client =
    1442            0 :                     mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1443              : 
    1444            0 :                 tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
    1445              : 
    1446            0 :                 client
    1447            0 :                         .tenant_time_travel_remote_storage(
    1448            0 :                             tenant_shard_id,
    1449            0 :                             &timestamp,
    1450            0 :                             &done_if_after,
    1451            0 :                         )
    1452            0 :                         .await
    1453            0 :                         .map_err(|e| {
    1454            0 :                             ApiError::InternalServerError(anyhow::anyhow!(
    1455            0 :                                 "Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
    1456            0 :                                 node.id
    1457            0 :                             ))
    1458            0 :                         })?;
    1459              :             }
    1460              :         }
    1461              : 
    1462            0 :         Ok(())
    1463            0 :     }
    1464              : 
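                      :     /// Delete a tenant: remove all of its shards from their attached pageservers, then drop our
                      :     /// database and in-memory records.  Returns 202 while any shard deletion is still pending
                      :     /// (callers should retry), and 404 once complete, mirroring the pageserver deletion API.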
    1465            0 :     pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
    1466            0 :         self.ensure_attached_wait(tenant_id).await?;
    1467              : 
    1468              :         // TODO: refactor into helper
    1469            0 :         let targets = {
    1470            0 :             let locked = self.inner.read().unwrap();
    1471            0 :             let mut targets = Vec::new();
    1472              : 
    1473            0 :             for (tenant_shard_id, shard) in
    1474            0 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1475            0 :             {
    1476            0 :                 let node_id = shard.intent.get_attached().ok_or_else(|| {
    1477            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1478            0 :                 })?;
    1479            0 :                 let node = locked
    1480            0 :                     .nodes
    1481            0 :                     .get(&node_id)
    1482            0 :                     .expect("Pageservers may not be deleted while referenced");
    1483            0 : 
    1484            0 :                 targets.push((*tenant_shard_id, node.clone()));
    1485              :             }
    1486            0 :             targets
    1487            0 :         };
    1488            0 : 
    1489            0 :         // Phase 1: delete on the pageservers
    1490            0 :         let mut any_pending = false;
    1491            0 :         for (tenant_shard_id, node) in targets {
    1492            0 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1493              :             // TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
    1494              :             // surface immediately as an error to our caller.
    1495            0 :             let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
    1496            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
    1497            0 :                     "Error deleting shard {tenant_shard_id} on node {}: {e}",
    1498            0 :                     node.id
    1499            0 :                 ))
    1500            0 :             })?;
    1501            0 :             tracing::info!(
    1502            0 :                 "Shard {tenant_shard_id} on node {}, delete returned {}",
    1503            0 :                 node.id,
    1504            0 :                 status
    1505            0 :             );
    1506            0 :             if status == StatusCode::ACCEPTED {
    1507            0 :                 any_pending = true;
    1508            0 :             }
    1509              :         }
    1510              : 
    1511            0 :         if any_pending {
    1512              :             // Caller should call us again later.  When we eventually see 404s from
    1513              :             // all the shards, we may proceed to delete our records of the tenant.
    1514            0 :             tracing::info!(
    1515            0 :                 "Tenant {} has some shards pending deletion, returning 202",
    1516            0 :                 tenant_id
    1517            0 :             );
    1518            0 :             return Ok(StatusCode::ACCEPTED);
    1519            0 :         }
    1520            0 : 
    1521            0 :         // Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop
    1522            0 :         // our in-memory state and database state.
    1523            0 : 
    1524            0 :         // Ordering: we delete persistent state first: if we then crash, the tenant's
    1525            0 :         // in-memory state will simply not be rebuilt from the database on restart.
    1526            0 : 
    1527            0 :         // Drop persistent state.
    1528            0 :         self.persistence.delete_tenant(tenant_id).await?;
    1529              : 
    1530              :         // Drop in-memory state
    1531              :         {
    1532            0 :             let mut locked = self.inner.write().unwrap();
    1533            0 :             let (_nodes, tenants, scheduler) = locked.parts_mut();
    1534              : 
    1535              :             // Dereference Scheduler from shards before dropping them
    1536            0 :             for (_tenant_shard_id, shard) in
    1537            0 :                 tenants.range_mut(TenantShardId::tenant_range(tenant_id))
    1538            0 :             {
    1539            0 :                 shard.intent.clear(scheduler);
    1540            0 :             }
    1541              : 
    1542            0 :             tenants.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
    1543            0 :             tracing::info!(
    1544            0 :                 "Deleted tenant {tenant_id}, now have {} tenants",
    1545            0 :                 locked.tenants.len()
    1546            0 :             );
    1547              :         };
    1548              : 
    1549              :         // Success is represented as 404, to imitate the existing pageserver deletion API
    1550            0 :         Ok(StatusCode::NOT_FOUND)
    1551            0 :     }
    1552              : 
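                      :     /// Create a timeline on all shards of a tenant.  Shard zero is created first so that, when
                      :     /// branching without an explicit ancestor start LSN, the LSN chosen by shard zero can be
                      :     /// propagated to the remaining shards.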
    1553            0 :     pub(crate) async fn tenant_timeline_create(
    1554            0 :         &self,
    1555            0 :         tenant_id: TenantId,
    1556            0 :         mut create_req: TimelineCreateRequest,
    1557            0 :     ) -> Result<TimelineInfo, ApiError> {
    1558            0 :         tracing::info!(
    1559            0 :             "Creating timeline {}/{}",
    1560            0 :             tenant_id,
    1561            0 :             create_req.new_timeline_id,
    1562            0 :         );
    1563              : 
    1564            0 :         self.ensure_attached_wait(tenant_id).await?;
    1565              : 
    1566              :         // TODO: refuse to do this if shard splitting is in progress
    1567              :         // (https://github.com/neondatabase/neon/issues/6676)
    1568            0 :         let mut targets = {
    1569            0 :             let locked = self.inner.read().unwrap();
    1570            0 :             let mut targets = Vec::new();
    1571              : 
    1572            0 :             for (tenant_shard_id, shard) in
    1573            0 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1574            0 :             {
    1575            0 :                 let node_id = shard.intent.get_attached().ok_or_else(|| {
    1576            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1577            0 :                 })?;
    1578            0 :                 let node = locked
    1579            0 :                     .nodes
    1580            0 :                     .get(&node_id)
    1581            0 :                     .expect("Pageservers may not be deleted while referenced");
    1582            0 : 
    1583            0 :                 targets.push((*tenant_shard_id, node.clone()));
    1584              :             }
    1585            0 :             targets
    1586            0 :         };
    1587            0 : 
    1588            0 :         if targets.is_empty() {
    1589            0 :             return Err(ApiError::NotFound(
    1590            0 :                 anyhow::anyhow!("Tenant not found").into(),
    1591            0 :             ));
    1592            0 :         };
    1593            0 :         let shard_zero = targets.remove(0);
    1594              : 
    1595            0 :         async fn create_one(
    1596            0 :             tenant_shard_id: TenantShardId,
    1597            0 :             node: Node,
    1598            0 :             jwt: Option<String>,
    1599            0 :             create_req: TimelineCreateRequest,
    1600            0 :         ) -> Result<TimelineInfo, ApiError> {
    1601            0 :             tracing::info!(
    1602            0 :                 "Creating timeline on shard {}/{}, attached to node {}",
    1603            0 :                 tenant_shard_id,
    1604            0 :                 create_req.new_timeline_id,
    1605            0 :                 node.id
    1606            0 :             );
    1607            0 :             let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
    1608            0 : 
    1609            0 :             client
    1610            0 :                 .timeline_create(tenant_shard_id, &create_req)
    1611            0 :                 .await
    1612            0 :                 .map_err(|e| match e {
    1613            0 :                     mgmt_api::Error::ApiError(status, msg)
    1614            0 :                         if status == StatusCode::INTERNAL_SERVER_ERROR
    1615            0 :                             || status == StatusCode::NOT_ACCEPTABLE =>
    1616            0 :                     {
    1617            0 :                         // TODO: handle more error codes, e.g. 503 should be passed through.  Make a general wrapper
    1618            0 :                         // for pass-through API calls.
    1619            0 :                         ApiError::InternalServerError(anyhow::anyhow!(msg))
    1620              :                     }
    1621            0 :                     _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
    1622            0 :                 })
    1623            0 :         }
    1624              : 
    1625              :         // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
    1626              :         // use whatever LSN that shard picked when creating on subsequent shards.  We arbitrarily use shard zero as the shard
    1627              :         // that will get the first creation request, and propagate the LSN to all the >0 shards.
    1628            0 :         let timeline_info = create_one(
    1629            0 :             shard_zero.0,
    1630            0 :             shard_zero.1,
    1631            0 :             self.config.jwt_token.clone(),
    1632            0 :             create_req.clone(),
    1633            0 :         )
    1634            0 :         .await?;
    1635              : 
    1636              :         // Propagate the LSN that shard zero picked, if caller didn't provide one
    1637            0 :         if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
    1638            0 :             create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
    1639            0 :         }
    1640              : 
    1641              :         // Create timeline on remaining shards with number >0
    1642            0 :         if !targets.is_empty() {
    1643              :             // If we had multiple shards, issue requests for the remainder now.
    1644            0 :             let jwt = self.config.jwt_token.clone();
    1645            0 :             self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
    1646            0 :                 let create_req = create_req.clone();
    1647            0 :                 Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
    1648            0 :             })
    1649            0 :             .await?;
    1650            0 :         }
    1651              : 
    1652            0 :         Ok(timeline_info)
    1653            0 :     }
    1654              : 
    1655              :     /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
    1656              :     ///
    1657              :     /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
    1658            0 :     async fn tenant_for_shards<F, R>(
    1659            0 :         &self,
    1660            0 :         locations: Vec<(TenantShardId, Node)>,
    1661            0 :         mut req_fn: F,
    1662            0 :     ) -> Result<Vec<R>, ApiError>
    1663            0 :     where
    1664            0 :         F: FnMut(
    1665            0 :             TenantShardId,
    1666            0 :             Node,
    1667            0 :         )
    1668            0 :             -> std::pin::Pin<Box<dyn futures::Future<Output = Result<R, ApiError>> + Send>>,
    1669            0 :     {
    1670            0 :         let mut futs = FuturesUnordered::new();
    1671            0 :         let mut results = Vec::with_capacity(locations.len());
    1672              : 
    1673            0 :         for (tenant_shard_id, node) in locations {
    1674            0 :             futs.push(req_fn(tenant_shard_id, node));
    1675            0 :         }
    1676              : 
    1677            0 :         while let Some(r) = futs.next().await {
    1678            0 :             results.push(r?);
    1679              :         }
    1680              : 
    1681            0 :         Ok(results)
    1682            0 :     }
    1683              : 
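                      :     /// Delete a timeline on all shards of a tenant.  Shards >0 are deleted first, and shard zero
                      :     /// last, so that GETs (which route to shard zero) keep resolving until deletion is complete.
                      :     /// Returns 202 while any shard >0 is still deleting; otherwise returns shard zero's status.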
    1684            0 :     pub(crate) async fn tenant_timeline_delete(
    1685            0 :         &self,
    1686            0 :         tenant_id: TenantId,
    1687            0 :         timeline_id: TimelineId,
    1688            0 :     ) -> Result<StatusCode, ApiError> {
    1689            0 :         tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
    1690              : 
    1691            0 :         self.ensure_attached_wait(tenant_id).await?;
    1692              : 
    1693              :         // TODO: refuse to do this if shard splitting is in progress
    1694              :         // (https://github.com/neondatabase/neon/issues/6676)
    1695            0 :         let mut targets = {
    1696            0 :             let locked = self.inner.read().unwrap();
    1697            0 :             let mut targets = Vec::new();
    1698              : 
    1699            0 :             for (tenant_shard_id, shard) in
    1700            0 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1701            0 :             {
    1702            0 :                 let node_id = shard.intent.get_attached().ok_or_else(|| {
    1703            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1704            0 :                 })?;
    1705            0 :                 let node = locked
    1706            0 :                     .nodes
    1707            0 :                     .get(&node_id)
    1708            0 :                     .expect("Pageservers may not be deleted while referenced");
    1709            0 : 
    1710            0 :                 targets.push((*tenant_shard_id, node.clone()));
    1711              :             }
    1712            0 :             targets
    1713            0 :         };
    1714            0 : 
    1715            0 :         if targets.is_empty() {
    1716            0 :             return Err(ApiError::NotFound(
    1717            0 :                 anyhow::anyhow!("Tenant not found").into(),
    1718            0 :             ));
    1719            0 :         }
    1720            0 :         let shard_zero = targets.remove(0);
    1721              : 
    1722            0 :         async fn delete_one(
    1723            0 :             tenant_shard_id: TenantShardId,
    1724            0 :             timeline_id: TimelineId,
    1725            0 :             node: Node,
    1726            0 :             jwt: Option<String>,
    1727            0 :         ) -> Result<StatusCode, ApiError> {
    1728            0 :             tracing::info!(
    1729            0 :                 "Deleting timeline on shard {}/{}, attached to node {}",
    1730            0 :                 tenant_shard_id,
    1731            0 :                 timeline_id,
    1732            0 :                 node.id
    1733            0 :             );
    1734              : 
    1735            0 :             let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
    1736            0 :             client
    1737            0 :                 .timeline_delete(tenant_shard_id, timeline_id)
    1738            0 :                 .await
    1739            0 :                 .map_err(|e| {
    1740            0 :                     ApiError::InternalServerError(anyhow::anyhow!(
    1741            0 :                     "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
    1742            0 :                     node.id
    1743            0 :                 ))
    1744            0 :                 })
    1745            0 :         }
    1746              : 
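                      :         // Fan out `delete_one` to every shard except shard zero (which was removed from
                      :         // `targets` above), collecting one HTTP status per shard.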
    1747            0 :         let statuses = self
    1748            0 :             .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
    1749            0 :                 Box::pin(delete_one(
    1750            0 :                     tenant_shard_id,
    1751            0 :                     timeline_id,
    1752            0 :                     node,
    1753            0 :                     self.config.jwt_token.clone(),
    1754            0 :                 ))
    1755            0 :             })
    1756            0 :             .await?;
    1757              : 
    1758              :         // If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
    1759            0 :         if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
    1760            0 :             return Ok(StatusCode::ACCEPTED);
    1761            0 :         }
    1762              : 
    1763              :         // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
    1764              :         // to shard zero, deleting it last gives the more intuitive behavior that a GET starts returning 404 only once the deletion is complete.
    1765            0 :         let shard_zero_status = delete_one(
    1766            0 :             shard_zero.0,
    1767            0 :             timeline_id,
    1768            0 :             shard_zero.1,
    1769            0 :             self.config.jwt_token.clone(),
    1770            0 :         )
    1771            0 :         .await?;
    1772              : 
    1773            0 :         Ok(shard_zero_status)
    1774            0 :     }
    1775              : 
    1776              :     /// When you need to send an HTTP request to the pageserver that holds shard 0 of a tenant, this
    1777              :     /// function looks it up and returns its base URL.  If the tenant isn't found, returns Err(ApiError::NotFound).
    1778            0 :     pub(crate) fn tenant_shard0_baseurl(
    1779            0 :         &self,
    1780            0 :         tenant_id: TenantId,
    1781            0 :     ) -> Result<(String, TenantShardId), ApiError> {
    1782            0 :         let locked = self.inner.read().unwrap();
    1783            0 :         let Some((tenant_shard_id, shard)) = locked
    1784            0 :             .tenants
    1785            0 :             .range(TenantShardId::tenant_range(tenant_id))
    1786            0 :             .next()
    1787              :         else {
    1788            0 :             return Err(ApiError::NotFound(
    1789            0 :                 anyhow::anyhow!("Tenant {tenant_id} not found").into(),
    1790            0 :             ));
    1791              :         };
    1792              : 
    1793              :         // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
    1794              :         // point to somewhere we haven't attached yet.
    1795            0 :         let Some(node_id) = shard.intent.get_attached() else {
    1796            0 :             tracing::warn!(
    1797            0 :                 tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
    1798            0 :                 "Shard not scheduled (policy {:?}), cannot generate pass-through URL",
    1799            0 :                 shard.policy
    1800            0 :             );
    1801            0 :             return Err(ApiError::Conflict(
    1802            0 :                 "Cannot call timeline API on non-attached tenant".to_string(),
    1803            0 :             ));
    1804              :         };
    1805              : 
    1806            0 :         let Some(node) = locked.nodes.get(node_id) else {
    1807              :             // This should never happen
    1808            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1809            0 :                 "Shard refers to nonexistent node"
    1810            0 :             )));
    1811              :         };
    1812              : 
    1813            0 :         Ok((node.base_url(), *tenant_shard_id))
    1814            0 :     }
    1815              : 
    1816            0 :     pub(crate) fn tenant_locate(
    1817            0 :         &self,
    1818            0 :         tenant_id: TenantId,
    1819            0 :     ) -> Result<TenantLocateResponse, ApiError> {
    1820            0 :         let locked = self.inner.read().unwrap();
    1821            0 :         tracing::info!("Locating shards for tenant {tenant_id}");
    1822              : 
    1823              :         // Take a snapshot of pageservers
    1824            0 :         let pageservers = locked.nodes.clone();
    1825            0 : 
    1826            0 :         let mut result = Vec::new();
    1827            0 :         let mut shard_params: Option<ShardParameters> = None;
    1828              : 
    1829            0 :         for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1830              :         {
    1831            0 :             let node_id =
    1832            0 :                 shard
    1833            0 :                     .intent
    1834            0 :                     .get_attached()
    1835            0 :                     .ok_or(ApiError::BadRequest(anyhow::anyhow!(
    1836            0 :                         "Cannot locate a tenant that is not attached"
    1837            0 :                     )))?;
    1838              : 
    1839            0 :             let node = pageservers
    1840            0 :                 .get(&node_id)
    1841            0 :                 .expect("Pageservers may not be deleted while referenced");
    1842            0 : 
    1843            0 :             result.push(TenantLocateResponseShard {
    1844            0 :                 shard_id: *tenant_shard_id,
    1845            0 :                 node_id,
    1846            0 :                 listen_http_addr: node.listen_http_addr.clone(),
    1847            0 :                 listen_http_port: node.listen_http_port,
    1848            0 :                 listen_pg_addr: node.listen_pg_addr.clone(),
    1849            0 :                 listen_pg_port: node.listen_pg_port,
    1850            0 :             });
    1851            0 : 
    1852            0 :             match &shard_params {
    1853            0 :                 None => {
    1854            0 :                     shard_params = Some(ShardParameters {
    1855            0 :                         stripe_size: shard.shard.stripe_size,
    1856            0 :                         count: shard.shard.count,
    1857            0 :                     });
    1858            0 :                 }
    1859            0 :                 Some(params) => {
    1860            0 :                     if params.stripe_size != shard.shard.stripe_size {
    1861              :                         // This should never happen.  We enforce at runtime because it's simpler than
    1862              :                         // adding an extra per-tenant data structure to store the parameters that must be identical across all shards.
    1863            0 :                         return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1864            0 :                             "Inconsistent shard stripe size parameters!"
    1865            0 :                         )));
    1866            0 :                     }
    1867              :                 }
    1868              :             }
    1869              :         }
    1870              : 
    1871            0 :         if result.is_empty() {
    1872            0 :             return Err(ApiError::NotFound(
    1873            0 :                 anyhow::anyhow!("No shards for this tenant ID found").into(),
    1874            0 :             ));
    1875            0 :         }
    1876            0 :         let shard_params = shard_params.expect("result is non-empty, therefore this is set");
    1877            0 :         tracing::info!(
    1878            0 :             "Located tenant {} with params {:?} on shards {}",
    1879            0 :             tenant_id,
    1880            0 :             shard_params,
    1881            0 :             result
    1882            0 :                 .iter()
    1883            0 :                 .map(|s| format!("{:?}", s))
    1884            0 :                 .collect::<Vec<_>>()
    1885            0 :                 .join(",")
    1886            0 :         );
    1887              : 
    1888            0 :         Ok(TenantLocateResponse {
    1889            0 :             shards: result,
    1890            0 :             shard_params,
    1891            0 :         })
    1892            0 :     }
    1893              : 
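                      :     /// Split a tenant so that it ends up with `new_shard_count` shards in total.  At a high level the
                      :     /// flow below is: validate the request against in-memory state and compute the child shard IDs,
                      :     /// persist the prospective children (a unique constraint also guards against concurrent splits),
                      :     /// mark the parents as splitting in memory, invoke the split API on each parent's attached
                      :     /// pageserver, mark the split complete in the database, replace the parents with their children
                      :     /// in memory, and finally notify computes (recording any failed notifications for retry).
                      :     ///
                      :     /// An illustrative request body (sketch only; the value 4 is just an example):
                      :     /// ```ignore
                      :     /// let req = TenantShardSplitRequest { new_shard_count: 4 };
                      :     /// ```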
    1894            0 :     pub(crate) async fn tenant_shard_split(
    1895            0 :         &self,
    1896            0 :         tenant_id: TenantId,
    1897            0 :         split_req: TenantShardSplitRequest,
    1898            0 :     ) -> Result<TenantShardSplitResponse, ApiError> {
    1899            0 :         let mut policy = None;
    1900            0 :         let mut shard_ident = None;
    1901              : 
    1902              :         // A parent shard which will be split
    1903              :         struct SplitTarget {
    1904              :             parent_id: TenantShardId,
    1905              :             node: Node,
    1906              :             child_ids: Vec<TenantShardId>,
    1907              :         }
    1908              : 
    1909              :         // Validate input, and calculate which shards we will create
    1910            0 :         let (old_shard_count, targets, compute_hook) =
    1911              :             {
    1912            0 :                 let locked = self.inner.read().unwrap();
    1913            0 : 
    1914            0 :                 let pageservers = locked.nodes.clone();
    1915            0 : 
    1916            0 :                 let mut targets = Vec::new();
    1917            0 : 
    1918            0 :                 // In case this is a retry, count how many already-split shards we found
    1919            0 :                 let mut children_found = Vec::new();
    1920            0 :                 let mut old_shard_count = None;
    1921              : 
    1922            0 :                 for (tenant_shard_id, shard) in
    1923            0 :                     locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1924              :                 {
    1925            0 :                     match shard.shard.count.count().cmp(&split_req.new_shard_count) {
    1926              :                         Ordering::Equal => {
    1927              :                             //  Already split this
    1928            0 :                             children_found.push(*tenant_shard_id);
    1929            0 :                             continue;
    1930              :                         }
    1931              :                         Ordering::Greater => {
    1932            0 :                             return Err(ApiError::BadRequest(anyhow::anyhow!(
    1933            0 :                                 "Requested count {} but already have shards at count {}",
    1934            0 :                                 split_req.new_shard_count,
    1935            0 :                                 shard.shard.count.count()
    1936            0 :                             )));
    1937              :                         }
    1938            0 :                         Ordering::Less => {
    1939            0 :                             // Fall through: this shard has lower count than requested,
    1940            0 :                             // so it is a candidate for splitting.
    1941            0 :                         }
    1942            0 :                     }
    1943            0 : 
    1944            0 :                     match old_shard_count {
    1945            0 :                         None => old_shard_count = Some(shard.shard.count),
    1946            0 :                         Some(old_shard_count) => {
    1947            0 :                             if old_shard_count != shard.shard.count {
    1948              :                                 // We may hit this case if a caller asked for two splits to
    1949              :                                 // different sizes, before the first one is complete.
    1950              :                                 // e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
    1951              :                                 // of shard_count=1 and shard_count=2 shards in the map.
    1952            0 :                                 return Err(ApiError::Conflict(
    1953            0 :                                     "Cannot split, currently mid-split".to_string(),
    1954            0 :                                 ));
    1955            0 :                             }
    1956              :                         }
    1957              :                     }
    1958            0 :                     if policy.is_none() {
    1959            0 :                         policy = Some(shard.policy.clone());
    1960            0 :                     }
    1961            0 :                     if shard_ident.is_none() {
    1962            0 :                         shard_ident = Some(shard.shard);
    1963            0 :                     }
    1964              : 
    1965            0 :                     if tenant_shard_id.shard_count.count() == split_req.new_shard_count {
    1966            0 :                         tracing::info!(
    1967            0 :                             "Tenant shard {} already has shard count {}",
    1968            0 :                             tenant_shard_id,
    1969            0 :                             split_req.new_shard_count
    1970            0 :                         );
    1971            0 :                         continue;
    1972            0 :                     }
    1973              : 
    1974            0 :                     let node_id = shard.intent.get_attached().ok_or(ApiError::BadRequest(
    1975            0 :                         anyhow::anyhow!("Cannot split a tenant that is not attached"),
    1976            0 :                     ))?;
    1977              : 
    1978            0 :                     let node = pageservers
    1979            0 :                         .get(&node_id)
    1980            0 :                         .expect("Pageservers may not be deleted while referenced");
    1981            0 : 
    1982            0 :                     // TODO: if any reconciliation is currently in progress for this shard, wait for it.
    1983            0 : 
    1984            0 :                     targets.push(SplitTarget {
    1985            0 :                         parent_id: *tenant_shard_id,
    1986            0 :                         node: node.clone(),
    1987            0 :                         child_ids: tenant_shard_id
    1988            0 :                             .split(ShardCount::new(split_req.new_shard_count)),
    1989            0 :                     });
    1990              :                 }
    1991              : 
    1992            0 :                 if targets.is_empty() {
    1993            0 :                     if children_found.len() == split_req.new_shard_count as usize {
    1994            0 :                         return Ok(TenantShardSplitResponse {
    1995            0 :                             new_shards: children_found,
    1996            0 :                         });
    1997              :                     } else {
    1998              :                         // No shards found to split, and no existing children found: the
    1999              :                         // tenant doesn't exist at all.
    2000            0 :                         return Err(ApiError::NotFound(
    2001            0 :                             anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
    2002            0 :                         ));
    2003              :                     }
    2004            0 :                 }
    2005            0 : 
    2006            0 :                 (old_shard_count, targets, locked.compute_hook.clone())
    2007            0 :             };
    2008            0 : 
    2009            0 :         // unwrap safety: we would have returned above if we didn't find at least one shard to split
    2010            0 :         let old_shard_count = old_shard_count.unwrap();
    2011            0 :         let shard_ident = shard_ident.unwrap();
    2012            0 :         let policy = policy.unwrap();
    2013            0 : 
    2014            0 :         // FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
    2015            0 :         // request could occur here, deleting or mutating the tenant.  begin_shard_split checks that the
    2016            0 :         // parent shards exist as expected, but it would be neater to do the above pre-checks within the
    2017            0 :         // same database transaction rather than pre-check in-memory and then maybe-fail the database write.
    2018            0 :         // (https://github.com/neondatabase/neon/issues/6676)
    2019            0 : 
    2020            0 :         // Before creating any new child shards in memory or on the pageservers, persist them: this
    2021            0 :         // enables us to ensure that we will always be able to clean up if something goes wrong.  This also
    2022            0 :         // acts as the protection against two concurrent attempts to split: one of them will get a database
    2023            0 :         // error trying to insert the child shards.
    2024            0 :         let mut child_tsps = Vec::new();
    2025            0 :         for target in &targets {
    2026            0 :             let mut this_child_tsps = Vec::new();
    2027            0 :             for child in &target.child_ids {
    2028            0 :                 let mut child_shard = shard_ident;
    2029            0 :                 child_shard.number = child.shard_number;
    2030            0 :                 child_shard.count = child.shard_count;
    2031            0 : 
    2032            0 :                 this_child_tsps.push(TenantShardPersistence {
    2033            0 :                     tenant_id: child.tenant_id.to_string(),
    2034            0 :                     shard_number: child.shard_number.0 as i32,
    2035            0 :                     shard_count: child.shard_count.literal() as i32,
    2036            0 :                     shard_stripe_size: shard_ident.stripe_size.0 as i32,
    2037            0 :                     // Note: this generation is a placeholder; [`Persistence::begin_shard_split`] will
    2038            0 :                     // populate the correct generation as part of its transaction, to protect us
    2039            0 :                     // against racing with changes in the state of the parent.
    2040            0 :                     generation: 0,
    2041            0 :                     generation_pageserver: target.node.id.0 as i64,
    2042            0 :                     placement_policy: serde_json::to_string(&policy).unwrap(),
    2043            0 :                     // TODO: get the config out of the map
    2044            0 :                     config: serde_json::to_string(&TenantConfig::default()).unwrap(),
    2045            0 :                     splitting: SplitState::Splitting,
    2046            0 :                 });
    2047            0 :             }
    2048              : 
    2049            0 :             child_tsps.push((target.parent_id, this_child_tsps));
    2050              :         }
    2051              : 
    2052            0 :         if let Err(e) = self
    2053            0 :             .persistence
    2054            0 :             .begin_shard_split(old_shard_count, tenant_id, child_tsps)
    2055            0 :             .await
    2056              :         {
    2057            0 :             match e {
    2058              :                 DatabaseError::Query(diesel::result::Error::DatabaseError(
    2059              :                     DatabaseErrorKind::UniqueViolation,
    2060              :                     _,
    2061              :                 )) => {
    2062              :                     // Inserting a child shard violated a unique constraint: we raced with another call to
    2063              :                     // this function
    2064            0 :                     tracing::warn!("Conflicting attempt to split {tenant_id}: {e}");
    2065            0 :                     return Err(ApiError::Conflict("Tenant is already splitting".into()));
    2066              :                 }
    2067            0 :                 _ => return Err(ApiError::InternalServerError(e.into())),
    2068              :             }
    2069            0 :         }
    2070            0 : 
    2071            0 :         // Now that I have persisted the splitting state, apply it in-memory.  This is infallible, so
    2072            0 :         // callers may assume that if splitting is set in memory, then it was persisted, and if splitting
    2073            0 :         // is not set in memory, then it was not persisted.
    2074            0 :         {
    2075            0 :             let mut locked = self.inner.write().unwrap();
    2076            0 :             for target in &targets {
    2077            0 :                 if let Some(parent_shard) = locked.tenants.get_mut(&target.parent_id) {
    2078            0 :                     parent_shard.splitting = SplitState::Splitting;
    2079            0 :                 }
    2080              :             }
    2081              :         }
    2082              : 
    2083              :         // FIXME: we have now committed the shard split state to the database, so any subsequent
    2084              :         // failure needs to roll it back.  We will later wrap this function in logic to roll back
    2085              :         // the split if it fails.
    2086              :         // (https://github.com/neondatabase/neon/issues/6676)
    2087              : 
    2088              :         // TODO: issue split calls concurrently (this only matters once we're splitting
    2089              :         // N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
    2090              : 
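                      :         // Issue the split request to each parent's attached pageserver.  The pageserver replies
                      :         // with the child shard IDs it created; a mismatch with the IDs we computed above is
                      :         // treated as an internal error.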
    2091            0 :         for target in &targets {
    2092              :             let SplitTarget {
    2093            0 :                 parent_id,
    2094            0 :                 node,
    2095            0 :                 child_ids,
    2096            0 :             } = target;
    2097            0 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    2098            0 :             let response = client
    2099            0 :                 .tenant_shard_split(
    2100            0 :                     *parent_id,
    2101            0 :                     TenantShardSplitRequest {
    2102            0 :                         new_shard_count: split_req.new_shard_count,
    2103            0 :                     },
    2104            0 :                 )
    2105            0 :                 .await
    2106            0 :                 .map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
    2107              : 
    2108            0 :             tracing::info!(
    2109            0 :                 "Split {} into {}",
    2110            0 :                 parent_id,
    2111            0 :                 response
    2112            0 :                     .new_shards
    2113            0 :                     .iter()
    2114            0 :                     .map(|s| format!("{:?}", s))
    2115            0 :                     .collect::<Vec<_>>()
    2116            0 :                     .join(",")
    2117            0 :             );
    2118              : 
    2119            0 :             if &response.new_shards != child_ids {
    2120              :                 // This should never happen: the pageserver should agree with us on how shard splits work.
    2121            0 :                 return Err(ApiError::InternalServerError(anyhow::anyhow!(
    2122            0 :                     "Splitting shard {} resulted in unexpected IDs: {:?} (expected {:?})",
    2123            0 :                     parent_id,
    2124            0 :                     response.new_shards,
    2125            0 :                     child_ids
    2126            0 :                 )));
    2127            0 :             }
    2128              :         }
    2129              : 
    2130              :         // TODO: if the pageserver restarted concurrently with our split API call,
    2131              :         // the actual generation of the child shard might differ from the generation
    2132              :         // we expect it to have.  In order for our in-database generation to end up
    2133              :         // correct, we should carry the child generation back in the response and apply it here
    2134              :         // in complete_shard_split (and apply the correct generation in memory)
    2135              :         // (or, we can carry generation in the request and reject the request if
    2136              :         //  it doesn't match, but that requires more retry logic on this side)
    2137              : 
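                      :         // Record in the database that the split has completed (the exact persistent-state change
                      :         // lives in `Persistence::complete_shard_split`); the in-memory state is updated to match below.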
    2138            0 :         self.persistence
    2139            0 :             .complete_shard_split(tenant_id, old_shard_count)
    2140            0 :             .await?;
    2141              : 
    2142              :         // Replace all the shards we just split with their children: this phase is infallible.
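                      :         // Concretely: each parent is removed from the in-memory map (dropping its scheduler
                      :         // references via `intent.clear`), and a child TenantState is inserted per new shard,
                      :         // attached to the same pageserver with the parent's generation and config, and with its
                      :         // observed location pre-populated.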
    2143            0 :         let mut response = TenantShardSplitResponse {
    2144            0 :             new_shards: Vec::new(),
    2145            0 :         };
    2146            0 :         let mut child_locations = Vec::new();
    2147            0 :         {
    2148            0 :             let mut locked = self.inner.write().unwrap();
    2149            0 :             let (_nodes, tenants, scheduler) = locked.parts_mut();
    2150            0 :             for target in targets {
    2151              :                 let SplitTarget {
    2152            0 :                     parent_id,
    2153            0 :                     node: _node,
    2154            0 :                     child_ids,
    2155            0 :                 } = target;
    2156            0 :                 let (pageserver, generation, config) = {
    2157            0 :                     let mut old_state = tenants
    2158            0 :                         .remove(&parent_id)
    2159            0 :                         .expect("It was present, we just split it");
    2160            0 :                     let old_attached = old_state.intent.get_attached().unwrap();
    2161            0 :                     old_state.intent.clear(scheduler);
    2162            0 :                     (old_attached, old_state.generation, old_state.config.clone())
    2163            0 :                 };
    2164              : 
    2165            0 :                 for child in child_ids {
    2166            0 :                     let mut child_shard = shard_ident;
    2167            0 :                     child_shard.number = child.shard_number;
    2168            0 :                     child_shard.count = child.shard_count;
    2169            0 : 
    2170            0 :                     let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
    2171            0 :                     child_observed.insert(
    2172            0 :                         pageserver,
    2173            0 :                         ObservedStateLocation {
    2174            0 :                             conf: Some(attached_location_conf(generation, &child_shard, &config)),
    2175            0 :                         },
    2176            0 :                     );
    2177            0 : 
    2178            0 :                     let mut child_state = TenantState::new(child, child_shard, policy.clone());
    2179            0 :                     child_state.intent = IntentState::single(scheduler, Some(pageserver));
    2180            0 :                     child_state.observed = ObservedState {
    2181            0 :                         locations: child_observed,
    2182            0 :                     };
    2183            0 :                     child_state.generation = generation;
    2184            0 :                     child_state.config = config.clone();
    2185            0 : 
    2186            0 :                     // The child's TenantState::splitting is intentionally left at the default value of Idle,
    2187            0 :                     // as at this point in the split process we have succeeded and this part is infallible:
    2188            0 :                     // we will never need to do any special recovery from this state.
    2189            0 : 
    2190            0 :                     child_locations.push((child, pageserver));
    2191            0 : 
    2192            0 :                     tenants.insert(child, child_state);
    2193            0 :                     response.new_shards.push(child);
    2194            0 :                 }
    2195              :             }
    2196              :         }
    2197              : 
    2198              :         // Send compute notifications for all the new shards
    2199            0 :         let mut failed_notifications = Vec::new();
    2200            0 :         for (child_id, child_ps) in child_locations {
    2201            0 :             if let Err(e) = compute_hook.notify(child_id, child_ps, &self.cancel).await {
    2202            0 :                 tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
    2203            0 :                         child_id, child_ps);
    2204            0 :                 failed_notifications.push(child_id);
    2205            0 :             }
    2206              :         }
    2207              : 
    2208              :         // If we failed any compute notifications, make a note to retry later.
    2209            0 :         if !failed_notifications.is_empty() {
    2210            0 :             let mut locked = self.inner.write().unwrap();
    2211            0 :             for failed in failed_notifications {
    2212            0 :                 if let Some(shard) = locked.tenants.get_mut(&failed) {
    2213            0 :                     shard.pending_compute_notification = true;
    2214            0 :                 }
    2215              :             }
    2216            0 :         }
    2217              : 
    2218            0 :         Ok(response)
    2219            0 :     }
    2220              : 
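                      :     /// Move a single tenant shard's attached location to the node named in the request.  Under
                      :     /// PlacementPolicy::Single any secondary locations are dropped; under Double the previously
                      :     /// attached node is demoted to a secondary.  Migrating a Detached tenant is rejected.  Waits
                      :     /// (with a timeout) for the resulting reconciliation before returning.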
    2221            0 :     pub(crate) async fn tenant_shard_migrate(
    2222            0 :         &self,
    2223            0 :         tenant_shard_id: TenantShardId,
    2224            0 :         migrate_req: TenantShardMigrateRequest,
    2225            0 :     ) -> Result<TenantShardMigrateResponse, ApiError> {
    2226            0 :         let waiter = {
    2227            0 :             let mut locked = self.inner.write().unwrap();
    2228            0 :             let result_tx = locked.result_tx.clone();
    2229            0 :             let compute_hook = locked.compute_hook.clone();
    2230            0 :             let (nodes, tenants, scheduler) = locked.parts_mut();
    2231              : 
    2232            0 :             let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
    2233            0 :                 return Err(ApiError::NotFound(
    2234            0 :                     anyhow::anyhow!("Tenant shard not found").into(),
    2235            0 :                 ));
    2236              :             };
    2237              : 
    2238            0 :             if shard.intent.get_attached() == &Some(migrate_req.node_id) {
    2239              :                 // No-op case: we will still proceed to wait for reconciliation in case it is
    2240              :                 // incomplete from an earlier update to the intent.
    2241            0 :                 tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
    2242              :             } else {
    2243            0 :                 let old_attached = *shard.intent.get_attached();
    2244            0 : 
    2245            0 :                 match shard.policy {
    2246            0 :                     PlacementPolicy::Single => {
    2247            0 :                         shard.intent.clear_secondary(scheduler);
    2248            0 :                     }
    2249            0 :                     PlacementPolicy::Double(_n) => {
    2250            0 :                         // If our new attached node was a secondary, it no longer should be.
    2251            0 :                         shard.intent.remove_secondary(scheduler, migrate_req.node_id);
    2252              : 
    2253              :                         // If we were already attached to something, demote that to a secondary
    2254            0 :                         if let Some(old_attached) = old_attached {
    2255            0 :                             shard.intent.push_secondary(scheduler, old_attached);
    2256            0 :                         }
    2257              :                     }
    2258              :                     PlacementPolicy::Detached => {
    2259            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
    2260            0 :                             "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first"
    2261            0 :                         )))
    2262              :                     }
    2263              :                 }
    2264            0 :                 shard
    2265            0 :                     .intent
    2266            0 :                     .set_attached(scheduler, Some(migrate_req.node_id));
    2267              : 
    2268            0 :                 tracing::info!("Migrating: new intent {:?}", shard.intent);
    2269            0 :                 shard.sequence = shard.sequence.next();
    2270              :             }
    2271              : 
    2272            0 :             shard.maybe_reconcile(
    2273            0 :                 result_tx,
    2274            0 :                 nodes,
    2275            0 :                 &compute_hook,
    2276            0 :                 &self.config,
    2277            0 :                 &self.persistence,
    2278            0 :                 &self.gate,
    2279            0 :                 &self.cancel,
    2280            0 :             )
    2281              :         };
    2282              : 
    2283            0 :         if let Some(waiter) = waiter {
    2284            0 :             waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
    2285              :         } else {
    2286            0 :             tracing::warn!("Migration is a no-op");
    2287              :         }
    2288              : 
    2289            0 :         Ok(TenantShardMigrateResponse {})
    2290            0 :     }
    2291              : 
    2292              :     /// This is for debug/support only: we simply drop all state for a tenant, without
    2293              :     /// detaching or deleting it on pageservers.
    2294            0 :     pub(crate) async fn tenant_drop(&self, tenant_id: TenantId) -> Result<(), ApiError> {
    2295            0 :         self.persistence.delete_tenant(tenant_id).await?;
    2296              : 
    2297            0 :         let mut locked = self.inner.write().unwrap();
    2298            0 :         let (_nodes, tenants, scheduler) = locked.parts_mut();
    2299            0 :         let mut shards = Vec::new();
    2300            0 :         for (tenant_shard_id, _) in tenants.range(TenantShardId::tenant_range(tenant_id)) {
    2301            0 :             shards.push(*tenant_shard_id);
    2302            0 :         }
    2303              : 
    2304            0 :         for shard_id in shards {
    2305            0 :             if let Some(mut shard) = tenants.remove(&shard_id) {
    2306            0 :                 shard.intent.clear(scheduler);
    2307            0 :             }
    2308              :         }
    2309              : 
    2310            0 :         Ok(())
    2311            0 :     }
    2312              : 
    2313              :     /// For debug/support: a full JSON dump of TenantStates.  Returns a response so that
    2314              :     /// we don't have to make TenantState clonable in the return path.
    2315            0 :     pub(crate) fn tenants_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
    2316            0 :         let serialized = {
    2317            0 :             let locked = self.inner.read().unwrap();
    2318            0 :             let result = locked.tenants.values().collect::<Vec<_>>();
    2319            0 :             serde_json::to_string(&result).map_err(|e| ApiError::InternalServerError(e.into()))?
    2320              :         };
    2321              : 
    2322            0 :         hyper::Response::builder()
    2323            0 :             .status(hyper::StatusCode::OK)
    2324            0 :             .header(hyper::header::CONTENT_TYPE, "application/json")
    2325            0 :             .body(hyper::Body::from(serialized))
    2326            0 :             .map_err(|e| ApiError::InternalServerError(e.into()))
    2327            0 :     }
    2328              : 
    2329              :     /// Check the consistency of in-memory state vs. persistent state, and check that the
    2330              :     /// scheduler's statistics are up to date.
    2331              :     ///
    2332              :     /// These consistency checks expect an **idle** system.  If changes are going on while
    2333              :     /// we run, then we can falsely indicate a consistency issue.  This is sufficient for end-of-test
    2334              :     /// checks, but not suitable for running continuously in the background in the field.
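                      :     /// Concretely, nodes and tenant shards are converted via `to_persistent()` and compared
                      :     /// against what `list_nodes()` / `list_tenant_shards()` return from the database.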
    2335            0 :     pub(crate) async fn consistency_check(&self) -> Result<(), ApiError> {
    2336            0 :         let (mut expect_nodes, mut expect_shards) = {
    2337            0 :             let locked = self.inner.read().unwrap();
    2338            0 : 
    2339            0 :             locked
    2340            0 :                 .scheduler
    2341            0 :                 .consistency_check(locked.nodes.values(), locked.tenants.values())
    2342            0 :                 .context("Scheduler checks")
    2343            0 :                 .map_err(ApiError::InternalServerError)?;
    2344              : 
    2345            0 :             let expect_nodes = locked
    2346            0 :                 .nodes
    2347            0 :                 .values()
    2348            0 :                 .map(|n| n.to_persistent())
    2349            0 :                 .collect::<Vec<_>>();
    2350            0 : 
    2351            0 :             let expect_shards = locked
    2352            0 :                 .tenants
    2353            0 :                 .values()
    2354            0 :                 .map(|t| t.to_persistent())
    2355            0 :                 .collect::<Vec<_>>();
    2356            0 : 
    2357            0 :             (expect_nodes, expect_shards)
    2358              :         };
    2359              : 
    2360            0 :         let mut nodes = self.persistence.list_nodes().await?;
    2361            0 :         expect_nodes.sort_by_key(|n| n.node_id);
    2362            0 :         nodes.sort_by_key(|n| n.node_id);
    2363            0 : 
    2364            0 :         if nodes != expect_nodes {
    2365            0 :             tracing::error!("Consistency check failed on nodes.");
    2366            0 :             tracing::error!(
    2367            0 :                 "Nodes in memory: {}",
    2368            0 :                 serde_json::to_string(&expect_nodes)
    2369            0 :                     .map_err(|e| ApiError::InternalServerError(e.into()))?
    2370            0 :             );
    2371            0 :             tracing::error!(
    2372            0 :                 "Nodes in database: {}",
    2373            0 :                 serde_json::to_string(&nodes)
    2374            0 :                     .map_err(|e| ApiError::InternalServerError(e.into()))?
    2375            0 :             );
    2376            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    2377            0 :                 "Node consistency failure"
    2378            0 :             )));
    2379            0 :         }
    2380              : 
    2381            0 :         let mut shards = self.persistence.list_tenant_shards().await?;
    2382            0 :         shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
    2383            0 :         expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
    2384            0 : 
    2385            0 :         if shards != expect_shards {
    2386            0 :             tracing::error!("Consistency check failed on shards.");
    2387            0 :             tracing::error!(
    2388            0 :                 "Shards in memory: {}",
    2389            0 :                 serde_json::to_string(&expect_shards)
    2390            0 :                     .map_err(|e| ApiError::InternalServerError(e.into()))?
    2391            0 :             );
    2392            0 :             tracing::error!(
    2393            0 :                 "Shards in database: {}",
    2394            0 :                 serde_json::to_string(&shards)
    2395            0 :                     .map_err(|e| ApiError::InternalServerError(e.into()))?
    2396            0 :             );
    2397            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    2398            0 :                 "Shard consistency failure"
    2399            0 :             )));
    2400            0 :         }
    2401            0 : 
    2402            0 :         Ok(())
    2403            0 :     }
    2404              : 
    2405              :     /// For debug/support: a JSON dump of the [`Scheduler`].  Returns a response so that
    2406              :     /// we don't have to make the [`Scheduler`] clonable in the return path.
    2407            0 :     pub(crate) fn scheduler_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
    2408            0 :         let serialized = {
    2409            0 :             let locked = self.inner.read().unwrap();
    2410            0 :             serde_json::to_string(&locked.scheduler)
    2411            0 :                 .map_err(|e| ApiError::InternalServerError(e.into()))?
    2412              :         };
    2413              : 
    2414            0 :         hyper::Response::builder()
    2415            0 :             .status(hyper::StatusCode::OK)
    2416            0 :             .header(hyper::header::CONTENT_TYPE, "application/json")
    2417            0 :             .body(hyper::Body::from(serialized))
    2418            0 :             .map_err(|e| ApiError::InternalServerError(e.into()))
    2419            0 :     }
    2420              : 
    2421              :     /// This is for debug/support only: we simply drop all state for a node, without
    2422              :     /// detaching or deleting anything on pageservers.  We do not try to re-schedule any
    2423              :     /// tenants that were on this node.
    2424              :     ///
    2425              :     /// TODO: proper node deletion API that unhooks things more gracefully
    2426            0 :     pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
    2427            0 :         self.persistence.delete_node(node_id).await?;
    2428              : 
    2429            0 :         let mut locked = self.inner.write().unwrap();
    2430              : 
    2431            0 :         for shard in locked.tenants.values_mut() {
    2432            0 :             shard.deref_node(node_id);
    2433            0 :         }
    2434              : 
    2435            0 :         let mut nodes = (*locked.nodes).clone();
    2436            0 :         nodes.remove(&node_id);
    2437            0 :         locked.nodes = Arc::new(nodes);
    2438            0 : 
    2439            0 :         locked.scheduler.node_remove(node_id);
    2440            0 : 
    2441            0 :         Ok(())
    2442            0 :     }
    2443              : 
    2444            0 :     pub(crate) async fn node_list(&self) -> Result<Vec<Node>, ApiError> {
    2445            0 :         let nodes = {
    2446            0 :             self.inner
    2447            0 :                 .read()
    2448            0 :                 .unwrap()
    2449            0 :                 .nodes
    2450            0 :                 .values()
    2451            0 :                 .cloned()
    2452            0 :                 .collect::<Vec<_>>()
    2453            0 :         };
    2454            0 : 
    2455            0 :         Ok(nodes)
    2456            0 :     }
    2457              : 
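                      :     /// Register a pageserver node.  Re-registration with the same listen addresses/ports is treated
                      :     /// as an idempotent no-op; re-registration with different addresses is rejected with a conflict.
                      :     /// The node is persisted to the database before being added to in-memory state and the scheduler.
                      :     ///
                      :     /// A sketch of a request (all field values here are illustrative only):
                      :     /// ```ignore
                      :     /// let req = NodeRegisterRequest {
                      :     ///     node_id: NodeId(1),
                      :     ///     listen_http_addr: "pageserver-1.local".to_string(),
                      :     ///     listen_http_port: 9898,
                      :     ///     listen_pg_addr: "pageserver-1.local".to_string(),
                      :     ///     listen_pg_port: 6400,
                      :     /// };
                      :     /// ```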
    2458            0 :     pub(crate) async fn node_register(
    2459            0 :         &self,
    2460            0 :         register_req: NodeRegisterRequest,
    2461            0 :     ) -> Result<(), ApiError> {
    2462            0 :         // Pre-check for an already-existing node
    2463            0 :         {
    2464            0 :             let locked = self.inner.read().unwrap();
    2465            0 :             if let Some(node) = locked.nodes.get(&register_req.node_id) {
    2466              :                 // Note that we do not do a total equality of the struct, because we don't require
    2467              :                 // the availability/scheduling states to agree for a POST to be idempotent.
    2468            0 :                 if node.listen_http_addr == register_req.listen_http_addr
    2469            0 :                     && node.listen_http_port == register_req.listen_http_port
    2470            0 :                     && node.listen_pg_addr == register_req.listen_pg_addr
    2471            0 :                     && node.listen_pg_port == register_req.listen_pg_port
    2472              :                 {
    2473            0 :                     tracing::info!(
    2474            0 :                         "Node {} re-registered with matching address",
    2475            0 :                         register_req.node_id
    2476            0 :                     );
    2477            0 :                     return Ok(());
    2478              :                 } else {
    2479              :                     // TODO: decide if we want to allow modifying node addresses without removing and re-adding
    2480              :                     // the node.  Safest/simplest thing is to refuse it, and usually we deploy with
    2481              :                     // a fixed address through the lifetime of a node.
    2482            0 :                     tracing::warn!(
    2483            0 :                         "Node {} tried to register with different address",
    2484            0 :                         register_req.node_id
    2485            0 :                     );
    2486            0 :                     return Err(ApiError::Conflict(
    2487            0 :                         "Node is already registered with different address".to_string(),
    2488            0 :                     ));
    2489              :                 }
    2490            0 :             }
    2491            0 :         }
    2492            0 : 
    2493            0 :         // Ordering: we must persist the new node _before_ adding it to in-memory state.
    2494            0 :         // This ensures that before we use it for anything or expose it via any external
    2495            0 :         // API, it is guaranteed to be available after a restart.
    2496            0 :         let new_node = Node {
    2497            0 :             id: register_req.node_id,
    2498            0 :             listen_http_addr: register_req.listen_http_addr,
    2499            0 :             listen_http_port: register_req.listen_http_port,
    2500            0 :             listen_pg_addr: register_req.listen_pg_addr,
    2501            0 :             listen_pg_port: register_req.listen_pg_port,
    2502            0 :             scheduling: NodeSchedulingPolicy::Filling,
    2503            0 :             // TODO: we shouldn't really call this Active until we've heartbeated it.
    2504            0 :             availability: NodeAvailability::Active,
    2505            0 :         };
    2506            0 :         // TODO: idempotency if the node already exists in the database
    2507            0 :         self.persistence.insert_node(&new_node).await?;
    2508              : 
    2509            0 :         let mut locked = self.inner.write().unwrap();
    2510            0 :         let mut new_nodes = (*locked.nodes).clone();
    2511            0 : 
    2512            0 :         locked.scheduler.node_upsert(&new_node);
    2513            0 :         new_nodes.insert(register_req.node_id, new_node);
    2514            0 : 
    2515            0 :         locked.nodes = Arc::new(new_nodes);
    2516              : 
    2517            0 :         tracing::info!(
    2518            0 :             "Registered pageserver {}, now have {} pageservers",
    2519            0 :             register_req.node_id,
    2520            0 :             locked.nodes.len()
    2521            0 :         );
    2522            0 :         Ok(())
    2523            0 :     }
    2524              : 
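                      :     /// Reconfigure a registered node.  Scheduling policy changes are persisted to the database
                      :     /// before being applied in memory.  Marking a node Offline clears our observed state for it and
                      :     /// reschedules/reconciles the affected shards; marking it Active again reconciles shards whose
                      :     /// observed state on that node is unknown.
                      :     ///
                      :     /// A sketch of marking a node offline (field values are illustrative only):
                      :     /// ```ignore
                      :     /// let req = NodeConfigureRequest {
                      :     ///     node_id: NodeId(1),
                      :     ///     availability: Some(NodeAvailability::Offline),
                      :     ///     scheduling: None,
                      :     /// };
                      :     /// ```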
    2525            0 :     pub(crate) async fn node_configure(
    2526            0 :         &self,
    2527            0 :         config_req: NodeConfigureRequest,
    2528            0 :     ) -> Result<(), ApiError> {
    2529            0 :         if let Some(scheduling) = config_req.scheduling {
    2530              :             // Scheduling is a persistent part of Node: we must write updates to the database before
    2531              :             // applying them in memory
    2532            0 :             self.persistence
    2533            0 :                 .update_node(config_req.node_id, scheduling)
    2534            0 :                 .await?;
    2535            0 :         }
    2536              : 
    2537            0 :         let mut locked = self.inner.write().unwrap();
    2538            0 :         let result_tx = locked.result_tx.clone();
    2539            0 :         let compute_hook = locked.compute_hook.clone();
    2540            0 :         let (nodes, tenants, scheduler) = locked.parts_mut();
    2541            0 : 
    2542            0 :         let mut new_nodes = (**nodes).clone();
    2543              : 
    2544            0 :         let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
    2545            0 :             return Err(ApiError::NotFound(
    2546            0 :                 anyhow::anyhow!("Node not registered").into(),
    2547            0 :             ));
    2548              :         };
    2549              : 
    2550            0 :         let mut offline_transition = false;
    2551            0 :         let mut active_transition = false;
    2552              : 
    2553            0 :         if let Some(availability) = &config_req.availability {
    2554            0 :             match (availability, &node.availability) {
    2555              :                 (NodeAvailability::Offline, NodeAvailability::Active) => {
    2556            0 :                     tracing::info!("Node {} transition to offline", config_req.node_id);
    2557            0 :                     offline_transition = true;
    2558              :                 }
    2559              :                 (NodeAvailability::Active, NodeAvailability::Offline) => {
    2560            0 :                     tracing::info!("Node {} transition to active", config_req.node_id);
    2561            0 :                     active_transition = true;
    2562              :                 }
    2563              :                 _ => {
    2564            0 :                     tracing::info!("Node {} no change during config", config_req.node_id);
    2565              :                     // No change
    2566              :                 }
    2567              :             };
    2568            0 :             node.availability = *availability;
    2569            0 :         }
    2570              : 
    2571            0 :         if let Some(scheduling) = config_req.scheduling {
    2572            0 :             node.scheduling = scheduling;
    2573            0 : 
    2574            0 :             // TODO: once we have a background scheduling ticker for fill/drain, kick it
    2575            0 :             // to wake up and start working.
    2576            0 :         }
    2577              : 
    2578              :         // Update the scheduler, in case the eligibility of the node for new shards has changed
    2579            0 :         scheduler.node_upsert(node);
    2580            0 : 
    2581            0 :         let new_nodes = Arc::new(new_nodes);
    2582            0 : 
    2583            0 :         if offline_transition {
    2584            0 :             let mut tenants_affected: usize = 0;
    2585            0 :             for (tenant_shard_id, tenant_state) in tenants {
    2586            0 :                 if let Some(observed_loc) =
    2587            0 :                     tenant_state.observed.locations.get_mut(&config_req.node_id)
    2588            0 :                 {
    2589            0 :                     // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
    2590            0 :                     // not assume our knowledge of the node's configuration is accurate until it comes back online
    2591            0 :                     observed_loc.conf = None;
    2592            0 :                 }
    2593              : 
    2594            0 :                 if tenant_state.intent.notify_offline(config_req.node_id) {
    2595            0 :                     tenant_state.sequence = tenant_state.sequence.next();
    2596            0 :                     match tenant_state.schedule(scheduler) {
    2597            0 :                         Err(e) => {
    2598              :                             // It is possible that some tenants will become unschedulable when too many pageservers
    2599              :                             // go offline: in this case there isn't much we can do other than make the issue observable.
    2600              :                             // TODO: give TenantState a scheduling error attribute to be queried later.
    2601            0 :                             tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
    2602              :                         }
    2603              :                         Ok(()) => {
    2604            0 :                             if tenant_state
    2605            0 :                                 .maybe_reconcile(
    2606            0 :                                     result_tx.clone(),
    2607            0 :                                     &new_nodes,
    2608            0 :                                     &compute_hook,
    2609            0 :                                     &self.config,
    2610            0 :                                     &self.persistence,
    2611            0 :                                     &self.gate,
    2612            0 :                                     &self.cancel,
    2613            0 :                                 )
    2614            0 :                                 .is_some()
    2615            0 :                             {
    2616            0 :                                 tenants_affected += 1;
    2617            0 :                             };
    2618              :                         }
    2619              :                     }
    2620            0 :                 }
    2621              :             }
    2622            0 :             tracing::info!(
    2623            0 :                 "Launched {} reconciler tasks for tenants affected by node {} going offline",
    2624            0 :                 tenants_affected,
    2625            0 :                 config_req.node_id
    2626            0 :             )
    2627            0 :         }
    2628              : 
    2629            0 :         if active_transition {
    2630              :             // When a node comes back online, we must reconcile any tenant that has a None observed
    2631              :             // location on the node.
    2632            0 :             for tenant_state in locked.tenants.values_mut() {
    2633            0 :                 if let Some(observed_loc) =
    2634            0 :                     tenant_state.observed.locations.get_mut(&config_req.node_id)
    2635              :                 {
    2636            0 :                     if observed_loc.conf.is_none() {
    2637            0 :                         tenant_state.maybe_reconcile(
    2638            0 :                             result_tx.clone(),
    2639            0 :                             &new_nodes,
    2640            0 :                             &compute_hook,
    2641            0 :                             &self.config,
    2642            0 :                             &self.persistence,
    2643            0 :                             &self.gate,
    2644            0 :                             &self.cancel,
    2645            0 :                         );
    2646            0 :                     }
    2647            0 :                 }
    2648              :             }
    2649              : 
    2650              :             // TODO: in the background, we should balance work back onto this pageserver
    2651            0 :         }
    2652              : 
    2653            0 :         locked.nodes = new_nodes;
    2654            0 : 
    2655            0 :         Ok(())
    2656            0 :     }
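    // The availability handling above can be read as a small edge-detection step: the
    // sketch below restates it as a standalone helper with an illustrative name (it is
    // not part of the service's API). Only a genuine Active<->Offline flip counts as a
    // transition worth reacting to.
    fn classify_availability_transition(
        new: NodeAvailability,
        old: NodeAvailability,
    ) -> (bool, bool) {
        // Returns (offline_transition, active_transition).
        match (new, old) {
            (NodeAvailability::Offline, NodeAvailability::Active) => (true, false),
            (NodeAvailability::Active, NodeAvailability::Offline) => (false, true),
            // Same state as before: nothing to reconcile.
            _ => (false, false),
        }
    }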
    2657              : 
    2658              :     /// Helper for methods that will try to call pageserver APIs for
    2659              :     /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
    2660              :     /// is attached somewhere.
    2661            0 :     fn ensure_attached_schedule(
    2662            0 :         &self,
    2663            0 :         mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
    2664            0 :         tenant_id: TenantId,
    2665            0 :     ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
    2666            0 :         let mut waiters = Vec::new();
    2667            0 :         let result_tx = locked.result_tx.clone();
    2668            0 :         let compute_hook = locked.compute_hook.clone();
    2669            0 :         let (nodes, tenants, scheduler) = locked.parts_mut();
    2670              : 
    2671            0 :         for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
    2672            0 :             shard.schedule(scheduler)?;
    2673              : 
    2674            0 :             if let Some(waiter) = shard.maybe_reconcile(
    2675            0 :                 result_tx.clone(),
    2676            0 :                 nodes,
    2677            0 :                 &compute_hook,
    2678            0 :                 &self.config,
    2679            0 :                 &self.persistence,
    2680            0 :                 &self.gate,
    2681            0 :                 &self.cancel,
    2682            0 :             ) {
    2683            0 :                 waiters.push(waiter);
    2684            0 :             }
    2685              :         }
    2686            0 :         Ok(waiters)
    2687            0 :     }
    2688              : 
    2689            0 :     async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
    2690            0 :         let ensure_waiters = {
    2691            0 :             let locked = self.inner.write().unwrap();
    2692              : 
    2693              :             // Check if the tenant is splitting: if so, even if it is attached,
    2694              :             // we must act as if it is not, so that e.g. timeline creation/deletion
    2695              :             // operations are blocked for the duration of the split.
    2696            0 :             for (_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
    2697            0 :                 if !matches!(shard.splitting, SplitState::Idle) {
    2698            0 :                     return Err(ApiError::ResourceUnavailable(
    2699            0 :                         "Tenant shards are currently splitting".into(),
    2700            0 :                     ));
    2701            0 :                 }
    2702              :             }
    2703              : 
    2704            0 :             self.ensure_attached_schedule(locked, tenant_id)
    2705            0 :                 .map_err(ApiError::InternalServerError)?
    2706              :         };
    2707              : 
    2708            0 :         let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
    2709            0 :         for waiter in ensure_waiters {
    2710            0 :             let timeout = deadline.duration_since(Instant::now());
    2711            0 :             waiter.wait_timeout(timeout).await?;
    2712              :         }
    2713              : 
    2714            0 :         Ok(())
    2715            0 :     }
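    // A minimal, illustrative sketch (not part of the service's API) of the
    // shared-deadline pattern used by ensure_attached_wait above: every waiter
    // draws from one overall time budget instead of receiving a fresh timeout.
    async fn wait_all_within_budget<W, Fut, E>(waiters: Vec<W>, budget: Duration) -> Result<(), E>
    where
        W: FnOnce(Duration) -> Fut,
        Fut: std::future::Future<Output = Result<(), E>>,
    {
        let deadline = Instant::now() + budget;
        for wait in waiters {
            // Later waiters only get whatever remains of the shared budget.
            let remaining = deadline.saturating_duration_since(Instant::now());
            wait(remaining).await?;
        }
        Ok(())
    }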
    2716              : 
    2717              :     /// Check all tenants for pending reconciliation work, and reconcile those in need
    2718              :     ///
    2719              :     /// Returns how many reconciliation tasks were started
    2720            0 :     fn reconcile_all(&self) -> usize {
    2721            0 :         let mut locked = self.inner.write().unwrap();
    2722            0 :         let result_tx = locked.result_tx.clone();
    2723            0 :         let compute_hook = locked.compute_hook.clone();
    2724            0 :         let pageservers = locked.nodes.clone();
    2725            0 :         locked
    2726            0 :             .tenants
    2727            0 :             .iter_mut()
    2728            0 :             .filter_map(|(_tenant_shard_id, shard)| {
    2729            0 :                 shard.maybe_reconcile(
    2730            0 :                     result_tx.clone(),
    2731            0 :                     &pageservers,
    2732            0 :                     &compute_hook,
    2733            0 :                     &self.config,
    2734            0 :                     &self.persistence,
    2735            0 :                     &self.gate,
    2736            0 :                     &self.cancel,
    2737            0 :                 )
    2738            0 :             })
    2739            0 :             .count()
    2740            0 :     }
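    // Hypothetical, illustration-only driver for reconcile_all(): a periodic pass
    // wired to the service's cancellation token. This is a sketch of how such a
    // ticker could look, not the crate's actual background loop.
    async fn background_reconcile_loop(&self, period: Duration) {
        let mut ticker = tokio::time::interval(period);
        loop {
            tokio::select! {
                _ = self.cancel.cancelled() => break,
                _ = ticker.tick() => {
                    let started = self.reconcile_all();
                    if started > 0 {
                        tracing::info!("Background reconcile pass started {} reconcilers", started);
                    }
                }
            }
        }
    }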
    2741              : 
    2742            0 :     pub async fn shutdown(&self) {
    2743            0 :         // Note that this already stops processing any results from reconciles, so
    2744            0 :         // we do not expect our [`TenantState`] objects to reach a neat
    2745            0 :         // final state.
    2746            0 :         self.cancel.cancel();
    2747            0 : 
    2748            0 :         // The cancellation tokens in [`crate::reconciler::Reconciler`] are children
    2749            0 :         // of our cancellation token, so we do not need to explicitly cancel each of
    2750            0 :         // them.
    2751            0 : 
    2752            0 :         // Background tasks and reconcilers hold gate guards: this waits for them all
    2753            0 :         // to complete.
    2754            0 :         self.gate.close().await;
    2755            0 :     }
    2756              : }
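// A minimal, self-contained sketch of the cancel-then-drain shutdown pattern that
// shutdown() above relies on. It assumes tokio plus a recent tokio_util; the crate's
// own utils Gate is approximated here with tokio_util::task::TaskTracker, and the
// MiniService type is purely illustrative.
struct MiniService {
    cancel: tokio_util::sync::CancellationToken,
    tasks: tokio_util::task::TaskTracker,
}

impl MiniService {
    fn spawn_worker(&self) {
        // Each worker holds a child token, so cancelling the parent stops it too.
        let cancel = self.cancel.child_token();
        self.tasks.spawn(async move {
            loop {
                tokio::select! {
                    _ = cancel.cancelled() => break,
                    _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                        // ... one unit of background work ...
                    }
                }
            }
        });
    }

    async fn shutdown(&self) {
        // 1. Signal cancellation; child tokens observe it without being cancelled one by one.
        self.cancel.cancel();
        // 2. Wait for every spawned task to finish draining before returning.
        self.tasks.close();
        self.tasks.wait().await;
    }
}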
        

Generated by: LCOV version 2.1-beta