|             Line data    Source code 
       1              : use std::{
       2              :     collections::{BTreeMap, HashMap},
       3              :     str::FromStr,
       4              :     sync::Arc,
       5              :     time::{Duration, Instant},
       6              : };
       7              : 
       8              : use control_plane::attachment_service::{
       9              :     AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, NodeAvailability,
      10              :     NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, TenantCreateResponse,
      11              :     TenantCreateResponseShard, TenantLocateResponse, TenantLocateResponseShard,
      12              :     TenantShardMigrateRequest, TenantShardMigrateResponse,
      13              : };
      14              : use diesel::result::DatabaseErrorKind;
      15              : use futures::StreamExt;
      16              : use hyper::StatusCode;
      17              : use pageserver_api::{
      18              :     control_api::{
      19              :         ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
      20              :         ValidateResponse, ValidateResponseTenant,
      21              :     },
      22              :     models,
      23              :     models::{
      24              :         LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
      25              :         TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
      26              :         TimelineCreateRequest, TimelineInfo,
      27              :     },
      28              :     shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
      29              : };
      30              : use pageserver_client::mgmt_api;
      31              : use tokio_util::sync::CancellationToken;
      32              : use utils::{
      33              :     completion::Barrier,
      34              :     generation::Generation,
      35              :     http::error::ApiError,
      36              :     id::{NodeId, TenantId, TimelineId},
      37              :     seqwait::SeqWait,
      38              : };
      39              : 
      40              : use crate::{
      41              :     compute_hook::{self, ComputeHook},
      42              :     node::Node,
      43              :     persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence},
      44              :     scheduler::Scheduler,
      45              :     tenant_state::{
      46              :         IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
      47              :         ReconcilerWaiter, TenantState,
      48              :     },
      49              :     PlacementPolicy, Sequence,
      50              : };
      51              : 
                      : /// A 30 second timeout.
                      : // NOTE(review): not referenced in this part of the file — presumably bounds how long a caller
                      : // waits for a reconcile to finish; confirm at the use sites.
      52              : const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      53              : 
      54              : /// How long [`Service::startup_reconcile`] is allowed to take before it should give
      55              : /// up on unresponsive pageservers and proceed.
                      : // NOTE(review): the visible body of `startup_reconcile` does not reference this constant, so the
                      : // limit must be enforced by whoever waits on startup — confirm.
      56              : pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
      57              : 
      58              : /// Top level state available to all HTTP handlers.  Lives behind the `RwLock` in `Service::inner`.
      59              : struct ServiceState {
                      :     /// In-memory state of every tenant shard, keyed (and ordered) by shard ID.
      60              :     tenants: BTreeMap<TenantShardId, TenantState>,
      61              : 
                      :     /// The known nodes.  Wrapped in an `Arc` so a reader can clone the handle and drop the lock
                      :     /// before doing slow work, as `startup_reconcile` does.
      62              :     nodes: Arc<HashMap<NodeId, Node>>,
      63              : 
                      :     /// Hook used to notify compute of attachment locations; cloned out of the lock before use.
      64              :     compute_hook: Arc<ComputeHook>,
      65              : 
                      :     /// Sending half of the reconcile-result channel.  The receiving half is drained by the
                      :     /// background task started in `Service::spawn`.
      66              :     result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
      67              : }
      68              : 
      69              : impl ServiceState {
                      :     /// Assembles the initial state from already-loaded nodes and tenants.
                      :     ///
                      :     /// `config` is consumed only to construct the [`ComputeHook`]; `nodes` and the hook are each
                      :     /// wrapped in a fresh `Arc`, while `tenants` and `result_tx` are stored as given.
      70          361 :     fn new(
      71          361 :         config: Config,
      72          361 :         result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
      73          361 :         nodes: HashMap<NodeId, Node>,
      74          361 :         tenants: BTreeMap<TenantShardId, TenantState>,
      75          361 :     ) -> Self {
      76          361 :         Self {
      77          361 :             tenants,
      78          361 :             nodes: Arc::new(nodes),
      79          361 :             compute_hook: Arc::new(ComputeHook::new(config)),
      80          361 :             result_tx,
      81          361 :         }
      82          361 :     }
      83              : }
      84              : 
      85          854 : #[derive(Clone)]
                      : /// Settings for one [`Service`] instance: the credentials it presents to pageservers and to the
                      : /// control plane, and where compute notifications are sent.  Every field is optional.
      86              : pub struct Config {
      87              :     /// All pageservers managed by one instance of this service must have
      88              :     /// the same public key.  This JWT token will be used to authenticate
      89              :     /// this service to the pageservers it manages.
      90              :     pub jwt_token: Option<String>,
      91              : 
      92              :     /// This JWT token will be used to authenticate this service to the control plane.
      93              :     pub control_plane_jwt_token: Option<String>,
      94              : 
      95              :     /// Where the compute hook should send notifications of pageserver attachment locations
      96              :     /// (this URL points to the control plane in prod). If this is None, the compute hook will
      97              :     /// assume it is running in a test environment and try to update neon_local.
      98              :     pub compute_hook_url: Option<String>,
      99              : }
     100              : 
     101              : impl From<DatabaseError> for ApiError {
                      :     /// Translates a persistence error into the error type returned by HTTP handlers.
                      :     ///
                      :     /// Query and logical errors become `InternalServerError` (the latter built from its reason
                      :     /// text); connection errors become `ShuttingDown` and their detail is discarded.
     102            0 :     fn from(err: DatabaseError) -> ApiError {
     103            0 :         match err {
     104            0 :             DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
     105              :             // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
     106            0 :             DatabaseError::Connection(_e) => ApiError::ShuttingDown,
     107            0 :             DatabaseError::Logical(reason) => {
     108            0 :                 ApiError::InternalServerError(anyhow::anyhow!(reason))
     109              :             }
     110              :         }
     111            0 :     }
     112              : }
     113              : 
                      : /// The service itself: in-memory state, configuration and the persistence handle.  Instances are
                      : /// created by [`Service::spawn`], which hands them out inside an `Arc`.
     114              : pub struct Service {
                      :     /// Mutable in-memory state.  Guarded by a synchronous (std) `RwLock`, so a guard must not be
                      :     /// kept alive across an `.await`.
     115              :     inner: Arc<std::sync::RwLock<ServiceState>>,
                      :     /// The configuration this instance was spawned with.
     116              :     config: Config,
                      :     /// Handle used to load and store nodes and tenant shards in the database.
     117              :     persistence: Arc<Persistence>,
     118              : 
     119              :     /// This waits for initial reconciliation with pageservers to complete.  Until this barrier
     120              :     /// passes, it isn't safe to do any actions that mutate tenants.
     121              :     pub(crate) startup_complete: Barrier,
     122              : }
     123              : 
     123              : impl From<ReconcileWaitError> for ApiError {
                      :     /// Translates the outcome of waiting on a reconcile into an API error.
                      :     ///
                      :     /// `Shutdown` becomes `ShuttingDown`; `Timeout` becomes `Timeout` carrying the error's
                      :     /// `Display` text; `Failed` becomes `InternalServerError` wrapping the original error.
     124            1 :     fn from(value: ReconcileWaitError) -> Self {
     125            1 :         match value {
     126            0 :             ReconcileWaitError::Shutdown => ApiError::ShuttingDown,
     127            0 :             e @ ReconcileWaitError::Timeout(_) => ApiError::Timeout(format!("{e}").into()),
     128            1 :             e @ ReconcileWaitError::Failed(..) => ApiError::InternalServerError(anyhow::anyhow!(e)),
     129              :         }
     130            1 :     }
     131              : }
     133              : 
     134              : impl Service {
     135            5 :     pub fn get_config(&self) -> &Config {
                      :         // Plain accessor: borrows the configuration stored on this instance, no copying.
     136            5 :         &self.config
     137            5 :     }
     138              : 
     139              :     /// Initial reconciliation against the pageservers; run once from the background task started in [`Service::spawn`].
                      :     ///
                      :     /// Steps, in order:
                      :     /// 1. Ask every known node for its location configs and collect every report.  Nodes that
                      :     ///    cannot be reached are logged and skipped.
                      :     /// 2. Copy the reports into each known shard's observed state; shards we have no record of
                      :     ///    are queued for detach.
                      :     /// 3. Derive intent from the observed state and schedule every shard.
                      :     /// 4. Detach the unknown shards found in step 2 (best effort, failures are only logged).
                      :     /// 5. Notify compute about shards that are already stably attached; shards whose notification
                      :     ///    fails get `pending_compute_notification` set.
                      :     /// 6. Call `reconcile_all` and return without waiting for the reconciliations it starts.
                      :     ///
                      :     /// Nothing is returned: every failure along the way is logged and startup carries on.
                      :     ///
                      :     /// TODO: don't allow other API calls until this is done, don't start doing any background housekeeping
     140              :     /// until this is done.
     141          361 :     async fn startup_reconcile(&self) {
     142          361 :         // Every (shard, (node, location config)) report received from the pageservers, where a `None`
     143          361 :         // config means indeterminate, same as in [`ObservedStateLocation`].
                      :         //
                      :         // This is a flat list on purpose: the same shard can be reported by several nodes, and a map
                      :         // keyed by shard ID would silently keep only the last of those reports.
     144          361 :         let mut observed = Vec::new();
     145          361 : 
                      :         // Clone the `Arc` and release the read lock immediately, so that no lock guard is alive
                      :         // across the awaits below.
     146          361 :         let nodes = {
     147          361 :             let locked = self.inner.read().unwrap();
     148          361 :             locked.nodes.clone()
     149              :         };
     150              : 
     151              :         // TODO: issue these requests concurrently
     152          361 :         for node in nodes.values() {
     153            8 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
     154              : 
     155            8 :             tracing::info!("Scanning shards on node {}...", node.id);
     156           23 :             match client.list_location_config().await {
     157            3 :                 Err(e) => {
     158            3 :                     tracing::warn!("Could not contact pageserver {} ({e})", node.id);
     159              :                     // TODO: be more tolerant, apply a generous 5-10 second timeout with retries, in case
     160              :                     // pageserver is being restarted at the same time as we are
     161              :                 }
     162            5 :                 Ok(listing) => {
                      :                     // NOTE(review): the message says "setting it to Active", but nothing in this
                      :                     // function changes the node's availability.
     163            5 :                     tracing::info!(
     164            5 :                         "Received {} shard statuses from pageserver {}, setting it to Active",
     165            5 :                         listing.tenant_shards.len(),
     166            5 :                         node.id
     167            5 :                     );
     168              : 
     169            9 :                     for (tenant_shard_id, conf_opt) in listing.tenant_shards {
     170            4 :                         observed.push((tenant_shard_id, (node.id, conf_opt)));
     171            4 :                     }
     172              :                 }
     173              :             }
     174              :         }
     175              : 
                      :         // (shard, node) pairs reported by a pageserver for shards that are not in our tenant map.
     176          361 :         let mut cleanup = Vec::new();
     177          361 : 
                      :         // (shard, node) pairs for which compute should be told the current attachment.
     178          361 :         let mut compute_notifications = Vec::new();
     179              : 
     180              :         // Populate intent and observed states for all tenants, based on reported state on pageservers
     181          361 :         let shard_count = {
                      :             // The write guard lives only for this block, which contains no `.await`.
     182          361 :             let mut locked = self.inner.write().unwrap();
     183          365 :             for (tenant_shard_id, (node_id, observed_loc)) in observed {
     184            4 :                 let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
     185            0 :                     cleanup.push((tenant_shard_id, node_id));
     186            0 :                     continue;
     187              :                 };
     188              : 
     189            4 :                 tenant_state
     190            4 :                     .observed
     191            4 :                     .locations
     192            4 :                     .insert(node_id, ObservedStateLocation { conf: observed_loc });
     193              :             }
     194              : 
     195              :             // Populate each tenant's intent state
     196          361 :             let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
     197          361 :             for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
     198            9 :                 tenant_state.intent_from_observed();
     199            9 :                 if let Err(e) = tenant_state.schedule(&mut scheduler) {
     200              :                     // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
     201              :                     // not enough pageservers are available.  The tenant may well still be available
     202              :                     // to clients.
     203            0 :                     tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
     204              :                 } else {
     205              :                     // If we're both intending and observed to be attached at a particular node, we will
     206              :                     // emit a compute notification for this. In the case where our observed state does not
     207              :                     // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
     208            9 :                     if let Some(attached_at) = tenant_state.stably_attached() {
     209            4 :                         compute_notifications.push((*tenant_shard_id, attached_at));
     210            5 :                     }
     211              :                 }
     212              :             }
     213              : 
                      :             // Only used for the summary log line at the end.
     214          361 :             locked.tenants.len()
     215              :         };
     216              : 
     217              :         // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
     218              :         // generation_pageserver in the database.
     219              : 
     220              :         // Clean up any tenants that were found on pageservers but are not known to us.
     221          361 :         for (tenant_shard_id, node_id) in cleanup {
     222              :             // A node reported a tenant_shard_id which is unknown to us: detach it.
     223            0 :             let node = nodes
     224            0 :                 .get(&node_id)
     225            0 :                 .expect("Always exists: only known nodes are scanned");
     226            0 : 
     227            0 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
                      :             // A `Detached` location config: no generation, no secondary config, default tenant
                      :             // config, and the shard number/count copied from the reported shard ID.
     228            0 :             match client
     229            0 :                 .location_config(
     230            0 :                     tenant_shard_id,
     231            0 :                     LocationConfig {
     232            0 :                         mode: LocationConfigMode::Detached,
     233            0 :                         generation: None,
     234            0 :                         secondary_conf: None,
     235            0 :                         shard_number: tenant_shard_id.shard_number.0,
     236            0 :                         shard_count: tenant_shard_id.shard_count.0,
     237            0 :                         shard_stripe_size: 0,
     238            0 :                         tenant_conf: models::TenantConfig::default(),
     239            0 :                     },
     240            0 :                     None,
     241            0 :                 )
     242            0 :                 .await
     243              :             {
     244              :                 Ok(()) => {
     245            0 :                     tracing::info!(
     246            0 :                         "Detached unknown shard {tenant_shard_id} on pageserver {node_id}"
     247            0 :                     );
     248              :                 }
     249            0 :                 Err(e) => {
     250              :                     // Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't
     251              :                     // break anything.
     252            0 :                     tracing::error!(
     253            0 :                         "Failed to detach unknkown shard {tenant_shard_id} on pageserver {node_id}: {e}"
     254            0 :                     );
     255              :                 }
     256              :             }
     257              :         }
     258              : 
     259              :         // Emit compute hook notifications for all tenants which are already stably attached.  Other tenants
     260              :         // will emit compute hook notifications when they reconcile.
     261              :         //
     262              :         // Ordering: we must complete these notification attempts before doing any other reconciliation for the
     263              :         // tenants named here, because otherwise our calls to notify() might race with more recent values
     264              :         // generated by reconciliation.
     265              : 
     266              :         // Compute notify is fallible.  If it fails here, do not delay overall startup: set the
     267              :         // flag on these shards that they have a pending notification.
     268          361 :         let compute_hook = self.inner.read().unwrap().compute_hook.clone();
     269          361 : 
     270          361 :         // Construct an async stream of futures to invoke the compute notify function: we do this
     271          361 :         // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
     272          361 :         let stream = futures::stream::iter(compute_notifications.into_iter())
     273          361 :             .map(|(tenant_shard_id, node_id)| {
     274            4 :                 let compute_hook = compute_hook.clone();
     275            4 :                 async move {
     276            4 :                     // TODO: give Service a cancellation token for clean shutdown
     277            4 :                     let cancel = CancellationToken::new();
                      :                     // Each future yields `Some(shard)` when its notification failed and `None`
                      :                     // when it succeeded, so the failures can be picked out with `flatten()` below.
     278            4 :                     if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
     279            0 :                         tracing::error!(
     280            0 :                             tenant_shard_id=%tenant_shard_id,
     281            0 :                             node_id=%node_id,
     282            0 :                             "Failed to notify compute on startup for shard: {e}"
     283            0 :                         );
     284            0 :                         Some(tenant_shard_id)
     285              :                     } else {
     286            4 :                         None
     287              :                     }
     288            4 :                 }
     289          361 :             })
     290          361 :             .buffered(compute_hook::API_CONCURRENCY);
     291          361 :         let notify_results = stream.collect::<Vec<_>>().await;
     292              : 
     293              :         // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
     294              :         {
     295          361 :             let mut locked = self.inner.write().unwrap();
     296          361 :             for tenant_shard_id in notify_results.into_iter().flatten() {
     297            0 :                 if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
     298            0 :                     shard.pending_compute_notification = true;
     299            0 :                 }
     300              :             }
     301              :         }
     302              : 
     303              :         // Finally, now that the service is up and running, launch reconcile operations for any tenants
     304              :         // which require it: under normal circumstances this should only include tenants that were in some
     305              :         // transient state before we restarted, or any tenants whose compute hooks failed above.
     306          361 :         let reconcile_tasks = self.reconcile_all();
     307              :         // We will not wait for these reconciliation tasks to run here: we're now done with startup and
     308              :         // normal operations may proceed.
     309              : 
     310          361 :         tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
     311          361 :     }
     312              : 
                      :     /// Loads persisted state, builds the service and starts its background tasks.
                      :     ///
                      :     /// Nodes and tenant shards are read from `persistence`, and each shard row is turned into an
                      :     /// in-memory [`TenantState`].  Two tasks are then spawned on the Tokio runtime:
                      :     /// - one that drains the reconcile-result channel and applies each result to the shard it names;
                      :     /// - one that runs [`Self::startup_reconcile`] while holding the completion paired with
                      :     ///   [`Self::startup_complete`].
                      :     ///
                      :     /// The returned `Arc` is handed back right after the tasks are spawned; this function does
                      :     /// not itself wait for startup reconciliation.
                      :     ///
                      :     /// # Errors
                      :     ///
                      :     /// Fails if either database listing fails, or if a shard row cannot be converted: an
                      :     /// unparseable tenant ID, shard parameters rejected by `ShardIdentity::new`, or a
                      :     /// `placement_policy` / `config` column that is not valid JSON for its type.
     313          361 :     pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
     314          361 :         let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
     315              : 
     316          361 :         tracing::info!("Loading nodes from database...");
     317          361 :         let nodes = persistence.list_nodes().await?;
     318          361 :         let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
     319          361 :         tracing::info!("Loaded {} nodes from database.", nodes.len());
     320              : 
     321          361 :         tracing::info!("Loading shards from database...");
     322          716 :         let tenant_shard_persistence = persistence.list_tenant_shards().await?;
     323          361 :         tracing::info!(
     324          361 :             "Loaded {} shards from database.",
     325          361 :             tenant_shard_persistence.len()
     326          361 :         );
     327              : 
     328          361 :         let mut tenants = BTreeMap::new();
     329              : 
     330          370 :         for tsp in tenant_shard_persistence {
                      :             // NOTE(review): the `as` casts below truncate silently.  They assume the stored values
                      :             // were written from the narrower types, as `attach_hook` does with `.0 as i32` — confirm
                      :             // for other writers.
     331            9 :             let tenant_shard_id = TenantShardId {
     332            9 :                 tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
     333            9 :                 shard_number: ShardNumber(tsp.shard_number as u8),
     334            9 :                 shard_count: ShardCount(tsp.shard_count as u8),
     335              :             };
                      :             // A stored shard count of zero denotes an unsharded tenant.
     336            9 :             let shard_identity = if tsp.shard_count == 0 {
     337            9 :                 ShardIdentity::unsharded()
     338              :             } else {
     339            0 :                 ShardIdentity::new(
     340            0 :                     ShardNumber(tsp.shard_number as u8),
     341            0 :                     ShardCount(tsp.shard_count as u8),
     342            0 :                     ShardStripeSize(tsp.shard_stripe_size as u32),
     343            0 :                 )?
     344              :             };
     345              : 
     346              :             // We will populate intent properly later in [`Self::startup_reconcile`], initially populate
     347              :             // it with what we can infer: the node for which a generation was most recently issued.
     348            9 :             let mut intent = IntentState::new();
                      :             // `i64::MAX` is the stored sentinel for "no pageserver" (it is what `attach_hook` writes
                      :             // for a freshly created shard).
     349            9 :             if tsp.generation_pageserver != i64::MAX {
     350            8 :                 intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
     351            1 :             }
     352              : 
                      :             // The JSON columns are parsed with `?`, like the other per-row conversions above, so
                      :             // a malformed row makes `spawn` return an error instead of panicking.
     353            9 :             let new_tenant = TenantState {
     354            9 :                 tenant_shard_id,
     355            9 :                 shard: shard_identity,
     356            9 :                 sequence: Sequence::initial(),
     357            9 :                 generation: Generation::new(tsp.generation as u32),
     358            9 :                 policy: serde_json::from_str(&tsp.placement_policy)?,
     359            9 :                 intent,
     360            9 :                 observed: ObservedState::new(),
     361            9 :                 config: serde_json::from_str(&tsp.config)?,
     362            9 :                 reconciler: None,
     363            9 :                 waiter: Arc::new(SeqWait::new(Sequence::initial())),
     364            9 :                 error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
     365            9 :                 last_error: Arc::default(),
     366            9 :                 pending_compute_notification: false,
     367            9 :             };
     368            9 : 
     369            9 :             tenants.insert(tenant_shard_id, new_tenant);
     370              :         }
     371              : 
                      :         // `startup_completion` is moved into the startup task below; `startup_complete` is the
                      :         // barrier stored on the service.
     372          361 :         let (startup_completion, startup_complete) = utils::completion::channel();
     373          361 : 
     374          361 :         let this = Arc::new(Self {
     375          361 :             inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
     376          361 :                 config.clone(),
     377          361 :                 result_tx,
     378          361 :                 nodes,
     379          361 :                 tenants,
     380          361 :             ))),
     381          361 :             config,
     382          361 :             persistence,
     383          361 :             startup_complete,
     384          361 :         });
     385          361 : 
                      :         // Background task: apply each reconcile result to the shard it names.  The loop ends
                      :         // when `recv()` returns `None`.
     386          361 :         let result_task_this = this.clone();
     387          361 :         tokio::task::spawn(async move {
     388          852 :             while let Some(result) = result_rx.recv().await {
     389          491 :                 tracing::info!(
     390          491 :                     "Reconcile result for sequence {}, ok={}",
     391          491 :                     result.sequence,
     392          491 :                     result.result.is_ok()
     393          491 :                 );
                      :                 // The write guard is taken after the `recv().await` and dropped at the end of the
                      :                 // loop body, so it is never held across an await point.
     394          491 :                 let mut locked = result_task_this.inner.write().unwrap();
     395          491 :                 let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
     396              :                     // A reconciliation result might race with removing a tenant: drop results for
     397              :                     // tenants that aren't in our map.
     398            0 :                     continue;
     399              :                 };
     400              : 
     401              :                 // Usually generation should only be updated via this path, so the max() isn't
     402              :                 // needed, but it is used to handle out-of-band updates, e.g. via the test hook.
     403          491 :                 tenant.generation = std::cmp::max(tenant.generation, result.generation);
     404          491 : 
     405          491 :                 // If the reconciler signals that it failed to notify compute, set this state on
     406          491 :                 // the shard so that a future [`TenantState::maybe_reconcile`] will try again.
     407          491 :                 tenant.pending_compute_notification = result.pending_compute_notification;
     408          491 : 
     409          491 :                 match result.result {
     410              :                     Ok(()) => {
     411          981 :                         for (node_id, loc) in &result.observed.locations {
     412          493 :                             if let Some(conf) = &loc.conf {
     413          493 :                                 tracing::info!(
     414          493 :                                     "Updating observed location {}: {:?}",
     415          493 :                                     node_id,
     416          493 :                                     conf
     417          493 :                                 );
     418              :                             } else {
     419            0 :                                 tracing::info!("Setting observed location {} to None", node_id,)
     420              :                             }
     421              :                         }
                      :                         // Success: the reported observed state replaces ours wholesale, and waiters
                      :                         // on this sequence number are released.
     422          488 :                         tenant.observed = result.observed;
     423          488 :                         tenant.waiter.advance(result.sequence);
     424              :                     }
     425            3 :                     Err(e) => {
     426            3 :                         tracing::warn!(
     427            3 :                             "Reconcile error on tenant {}: {}",
     428            3 :                             tenant.tenant_shard_id,
     429            3 :                             e
     430            3 :                         );
     431              : 
     432              :                         // Ordering: populate last_error before advancing error_seq,
     433              :                         // so that waiters will see the correct error after waiting.
     434            3 :                         *(tenant.last_error.lock().unwrap()) = format!("{e}");
     435            3 :                         tenant.error_waiter.advance(result.sequence);
     436              : 
                      :                         // Failure: merge only the locations the result reports, leaving our other
                      :                         // observed entries as they were.  `waiter` is not advanced on this path.
     437            6 :                         for (node_id, o) in result.observed.locations {
     438            3 :                             tenant.observed.locations.insert(node_id, o);
     439            3 :                         }
     440              :                     }
     441              :                 }
     442              :             }
     443          361 :         });
     444          361 : 
     445          361 :         let startup_reconcile_this = this.clone();
     446          361 :         tokio::task::spawn(async move {
     447          361 :             // Block the [`Service::startup_complete`] barrier until we're done
     448          361 :             let _completion = startup_completion;
     449          361 : 
     450          361 :             startup_reconcile_this.startup_reconcile().await
     451          361 :         });
     452          361 : 
     453          361 :         Ok(this)
     454          361 :     }
     455              : 
     456          212 :     pub(crate) async fn attach_hook(
     457          212 :         &self,
     458          212 :         attach_req: AttachHookRequest,
     459          212 :     ) -> anyhow::Result<AttachHookResponse> {
     460          212 :         // This is a test hook.  To enable using it on tenants that were created directly with
     461          212 :         // the pageserver API (not via this service), we will auto-create any missing tenant
     462          212 :         // shards with default state.
     463          212 :         let insert = {
     464          212 :             let locked = self.inner.write().unwrap();
     465          212 :             !locked.tenants.contains_key(&attach_req.tenant_shard_id)
     466          212 :         };
     467          212 :         if insert {
     468           13 :             let tsp = TenantShardPersistence {
     469           13 :                 tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
     470           13 :                 shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
     471           13 :                 shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
     472           13 :                 shard_stripe_size: 0,
     473           13 :                 generation: 0,
     474           13 :                 generation_pageserver: i64::MAX,
     475           13 :                 placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
     476           13 :                 config: serde_json::to_string(&TenantConfig::default()).unwrap(),
     477           13 :             };
     478           13 : 
     479           13 :             match self.persistence.insert_tenant_shards(vec![tsp]).await {
     480            0 :                 Err(e) => match e {
     481              :                     DatabaseError::Query(diesel::result::Error::DatabaseError(
     482              :                         DatabaseErrorKind::UniqueViolation,
     483              :                         _,
     484              :                     )) => {
     485            0 :                         tracing::info!(
     486            0 :                             "Raced with another request to insert tenant {}",
     487            0 :                             attach_req.tenant_shard_id
     488            0 :                         )
     489              :                     }
     490            0 :                     _ => return Err(e.into()),
     491              :                 },
     492              :                 Ok(()) => {
     493           13 :                     tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
     494              : 
     495           13 :                     let mut locked = self.inner.write().unwrap();
     496           13 :                     locked.tenants.insert(
     497           13 :                         attach_req.tenant_shard_id,
     498           13 :                         TenantState::new(
     499           13 :                             attach_req.tenant_shard_id,
     500           13 :                             ShardIdentity::unsharded(),
     501           13 :                             PlacementPolicy::Single,
     502           13 :                         ),
     503           13 :                     );
     504           13 :                     tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
     505              :                 }
     506              :             }
     507          199 :         }
     508              : 
     509          212 :         let new_generation = if let Some(req_node_id) = attach_req.node_id {
     510              :             Some(
     511          205 :                 self.persistence
     512          205 :                     .increment_generation(attach_req.tenant_shard_id, req_node_id)
     513          216 :                     .await?,
     514              :             )
     515              :         } else {
     516            7 :             self.persistence.detach(attach_req.tenant_shard_id).await?;
     517            7 :             None
     518              :         };
     519              : 
     520          212 :         let mut locked = self.inner.write().unwrap();
     521          212 :         let tenant_state = locked
     522          212 :             .tenants
     523          212 :             .get_mut(&attach_req.tenant_shard_id)
     524          212 :             .expect("Checked for existence above");
     525              : 
     526          212 :         if let Some(new_generation) = new_generation {
     527          205 :             tenant_state.generation = new_generation;
     528          205 :         } else {
     529              :             // This is a detach notification.  We must update placement policy to avoid re-attaching
     530              :             // during background scheduling/reconciliation, or during attachment service restart.
     531            7 :             assert!(attach_req.node_id.is_none());
     532            7 :             tenant_state.policy = PlacementPolicy::Detached;
     533              :         }
     534              : 
     535          212 :         if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
     536          205 :             tracing::info!(
     537          205 :                 tenant_id = %attach_req.tenant_shard_id,
     538          205 :                 ps_id = %attaching_pageserver,
     539          205 :                 generation = ?tenant_state.generation,
     540          205 :                 "issuing",
     541          205 :             );
     542            7 :         } else if let Some(ps_id) = tenant_state.intent.attached {
     543            7 :             tracing::info!(
     544            7 :                 tenant_id = %attach_req.tenant_shard_id,
     545            7 :                 %ps_id,
     546            7 :                 generation = ?tenant_state.generation,
     547            7 :                 "dropping",
     548            7 :             );
     549              :         } else {
     550            0 :             tracing::info!(
     551            0 :             tenant_id = %attach_req.tenant_shard_id,
     552            0 :             "no-op: tenant already has no pageserver");
     553              :         }
     554          212 :         tenant_state.intent.attached = attach_req.node_id;
     555              : 
     556          212 :         tracing::info!(
     557          212 :             "attach_hook: tenant {} set generation {:?}, pageserver {}",
     558          212 :             attach_req.tenant_shard_id,
     559          212 :             tenant_state.generation,
     560          212 :             // TODO: this is an odd number of 0xf's
     561          212 :             attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
     562          212 :         );
     563              : 
     564          212 :         Ok(AttachHookResponse {
     565          212 :             gen: attach_req
     566          212 :                 .node_id
     567          212 :                 .map(|_| tenant_state.generation.into().unwrap()),
     568          212 :         })
     569          212 :     }
     570              : 
     571           72 :     pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
     572           72 :         let locked = self.inner.read().unwrap();
     573           72 : 
     574           72 :         let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
     575           72 : 
     576           72 :         InspectResponse {
     577           72 :             attachment: tenant_state.and_then(|s| {
     578           72 :                 s.intent
     579           72 :                     .attached
     580           72 :                     .map(|ps| (s.generation.into().unwrap(), ps))
     581           72 :             }),
     582           72 :         }
     583           72 :     }
     584              : 
     585          603 :     pub(crate) async fn re_attach(
     586          603 :         &self,
     587          603 :         reattach_req: ReAttachRequest,
     588          603 :     ) -> anyhow::Result<ReAttachResponse> {
     589              :         // Ordering: we must persist generation number updates before making them visible in the in-memory state
     590          603 :         let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
     591              : 
     592              :         // Apply the updated generation to our in-memory state
     593          603 :         let mut locked = self.inner.write().unwrap();
     594          603 : 
     595          603 :         let mut response = ReAttachResponse {
     596          603 :             tenants: Vec::new(),
     597          603 :         };
     598              : 
     599          830 :         for (tenant_shard_id, new_gen) in incremented_generations {
     600          227 :             response.tenants.push(ReAttachResponseTenant {
     601          227 :                 id: tenant_shard_id,
     602          227 :                 gen: new_gen.into().unwrap(),
     603          227 :             });
     604          227 : 
     605          227 :             // Apply the new generation number to our in-memory state
     606          227 :             let shard_state = locked.tenants.get_mut(&tenant_shard_id);
     607          227 :             let Some(shard_state) = shard_state else {
     608              :                 // Not fatal.  This edge case requires a re-attach to happen
     609              :                 // between inserting a new tenant shard in to the database, and updating our in-memory
     610              :                 // state to know about the shard, _and_ that the state inserted to the database referenced
     611              :                 // a pageserver.  Should never happen, but handle it rather than panicking, since it should
     612              :                 // be harmless.
     613            0 :                 tracing::error!(
     614            0 :                     "Shard {} is in database for node {} but not in-memory state",
     615            0 :                     tenant_shard_id,
     616            0 :                     reattach_req.node_id
     617            0 :                 );
     618            0 :                 continue;
     619              :             };
     620              : 
     621          227 :             shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
     622              : 
     623              :             // TODO: cancel/restart any running reconciliation for this tenant, it might be trying
     624              :             // to call location_conf API with an old generation.  Wait for cancellation to complete
     625              :             // before responding to this request.  Requires well implemented CancellationToken logic
     626              :             // all the way to where we call location_conf.  Even then, there can still be a location_conf
     627              :             // request in flight over the network: TODO handle that by making location_conf API refuse
     628              :             // to go backward in generations.
     629              :         }
     630          603 :         Ok(response)
     631          603 :     }
     632              : 
     633          411 :     pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
     634          411 :         let locked = self.inner.read().unwrap();
     635          411 : 
     636          411 :         let mut response = ValidateResponse {
     637          411 :             tenants: Vec::new(),
     638          411 :         };
     639              : 
     640          909 :         for req_tenant in validate_req.tenants {
     641          498 :             if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
     642          497 :                 let valid = tenant_state.generation == Generation::new(req_tenant.gen);
     643          497 :                 tracing::info!(
     644          497 :                     "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
     645          497 :                     req_tenant.id,
     646          497 :                     req_tenant.gen,
     647          497 :                     tenant_state.generation
     648          497 :                 );
     649          497 :                 response.tenants.push(ValidateResponseTenant {
     650          497 :                     id: req_tenant.id,
     651          497 :                     valid,
     652          497 :                 });
     653            1 :             } else {
     654            1 :                 // After tenant deletion, we may approve any validation.  This avoids
     655            1 :                 // spurious warnings on the pageserver if it has pending LSN updates
     656            1 :                 // at the point a deletion happens.
     657            1 :                 response.tenants.push(ValidateResponseTenant {
     658            1 :                     id: req_tenant.id,
     659            1 :                     valid: true,
     660            1 :                 });
     661            1 :             }
     662              :         }
     663          411 :         response
     664          411 :     }
     665              : 
    /// Create a whole tenant: persist all of its shards, schedule them onto pageservers, kick
    /// off reconciliation, and wait (bounded by `await_waiters`) for that to complete.
    ///
    /// The number of shards comes from `create_req.shard_parameters`: an unsharded request
    /// creates exactly one shard, otherwise `count.0` shards are created.
    ///
    /// On success the response lists, for every shard, the node it was scheduled to and its
    /// generation.
    ///
    /// # Errors
    ///
    /// - [`ApiError::BadRequest`] if `create_req.new_tenant_id` has a shard count greater
    ///   than 1 (i.e. the caller tried to create one particular shard).
    /// - [`ApiError::InternalServerError`] if persisting the shards fails.
    /// - [`ApiError::Conflict`] if a shard cannot be scheduled.
    /// - Whatever `ReconcileWaitError` converts into if waiting for reconciliation fails.
    ///
    /// NOTE(review): if scheduling fails part-way through the shard loop, the shards are
    /// already in the database and earlier shards are already in the in-memory map; this
    /// function does not roll that back.
    ///
    /// # Panics
    ///
    /// Panics if the `inner` lock is poisoned, or if a shard has no attached node after a
    /// successful `schedule()` call.
    pub(crate) async fn tenant_create(
        &self,
        create_req: TenantCreateRequest,
    ) -> Result<TenantCreateResponse, ApiError> {
        // Shard count 0 is valid: it means create a single shard (ShardCount(0) means "unsharded")
        let literal_shard_count = if create_req.shard_parameters.is_unsharded() {
            1
        } else {
            create_req.shard_parameters.count.0
        };

        // This service expects to handle sharding itself: it is an error to try and directly create
        // a particular shard here.
        let tenant_id = if create_req.new_tenant_id.shard_count > ShardCount(1) {
            return Err(ApiError::BadRequest(anyhow::anyhow!(
                "Attempted to create a specific shard, this API is for creating the whole tenant"
            )));
        } else {
            create_req.new_tenant_id.tenant_id
        };

        tracing::info!(
            "Creating tenant {}, shard_count={:?}",
            create_req.new_tenant_id,
            create_req.shard_parameters.count,
        );

        // One id per shard.  Note that `shard_count` keeps the requested value (which may be 0
        // for unsharded) rather than `literal_shard_count`.
        let create_ids = (0..literal_shard_count)
            .map(|i| TenantShardId {
                tenant_id,
                shard_number: ShardNumber(i),
                shard_count: create_req.shard_parameters.count,
            })
            .collect::<Vec<_>>();

        // TODO: enable specifying this.  Using Single as a default helps legacy tests to work (they
        // have no expectation of HA).
        let placement_policy: PlacementPolicy = PlacementPolicy::Single;

        // Ordering: we persist tenant shards before creating them on the pageserver.  This enables a caller
        // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
        // during the creation, rather than risking leaving orphan objects in S3.
        let persist_tenant_shards = create_ids
            .iter()
            .map(|tenant_shard_id| TenantShardPersistence {
                tenant_id: tenant_shard_id.tenant_id.to_string(),
                shard_number: tenant_shard_id.shard_number.0 as i32,
                shard_count: tenant_shard_id.shard_count.0 as i32,
                shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
                generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
                generation_pageserver: i64::MAX,
                placement_policy: serde_json::to_string(&placement_policy).unwrap(),
                config: serde_json::to_string(&create_req.config).unwrap(),
            })
            .collect();
        self.persistence
            .insert_tenant_shards(persist_tenant_shards)
            .await
            .map_err(|e| {
                // TODO: distinguish primary key constraint (idempotent, OK), from other errors
                ApiError::InternalServerError(anyhow::anyhow!(e))
            })?;

        // Everything in this block happens under the write lock; the guard is dropped at the
        // end of the block, before we await the reconcilers below.
        let (waiters, response_shards) = {
            let mut locked = self.inner.write().unwrap();

            let mut response_shards = Vec::new();

            let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);

            for tenant_shard_id in create_ids {
                tracing::info!("Creating shard {tenant_shard_id}...");

                use std::collections::btree_map::Entry;
                match locked.tenants.entry(tenant_shard_id) {
                    Entry::Occupied(mut entry) => {
                        // The shard is already known in memory: re-schedule the existing state
                        // and report it, rather than replacing it.
                        tracing::info!(
                            "Tenant shard {tenant_shard_id} already exists while creating"
                        );

                        // TODO: schedule() should take an anti-affinity expression that pushes
                        // attached and secondary locations (independently) away from those
                        // pageservers also holding a shard for this tenant.

                        entry.get_mut().schedule(&mut scheduler).map_err(|e| {
                            ApiError::Conflict(format!(
                                "Failed to schedule shard {tenant_shard_id}: {e}"
                            ))
                        })?;

                        response_shards.push(TenantCreateResponseShard {
                            shard_id: tenant_shard_id,
                            node_id: entry
                                .get()
                                .intent
                                .attached
                                .expect("We just set pageserver if it was None"),
                            generation: entry.get().generation.into().unwrap(),
                        });

                        continue;
                    }
                    Entry::Vacant(entry) => {
                        let mut state = TenantState::new(
                            tenant_shard_id,
                            ShardIdentity::from_params(
                                tenant_shard_id.shard_number,
                                &create_req.shard_parameters,
                            ),
                            placement_policy.clone(),
                        );

                        // A caller-supplied generation overrides the one `TenantState::new` set.
                        if let Some(create_gen) = create_req.generation {
                            state.generation = Generation::new(create_gen);
                        }
                        state.config = create_req.config.clone();

                        // Schedule before inserting: if scheduling fails, this shard is not
                        // added to the in-memory map.
                        state.schedule(&mut scheduler).map_err(|e| {
                            ApiError::Conflict(format!(
                                "Failed to schedule shard {tenant_shard_id}: {e}"
                            ))
                        })?;

                        response_shards.push(TenantCreateResponseShard {
                            shard_id: tenant_shard_id,
                            node_id: state
                                .intent
                                .attached
                                .expect("We just set pageserver if it was None"),
                            generation: state.generation.into().unwrap(),
                        });
                        entry.insert(state)
                    }
                };
            }

            // Take a snapshot of pageservers
            let pageservers = locked.nodes.clone();

            let result_tx = locked.result_tx.clone();
            let compute_hook = locked.compute_hook.clone();

            // Ask every shard of this tenant to reconcile; shards for which `maybe_reconcile`
            // returns `None` contribute no waiter.
            let waiters = locked
                .tenants
                .range_mut(TenantShardId::tenant_range(tenant_id))
                .filter_map(|(_shard_id, shard)| {
                    shard.maybe_reconcile(
                        result_tx.clone(),
                        &pageservers,
                        &compute_hook,
                        &self.config,
                        &self.persistence,
                    )
                })
                .collect::<Vec<_>>();
            (waiters, response_shards)
        };

        self.await_waiters(waiters).await?;

        Ok(TenantCreateResponse {
            shards: response_shards,
        })
    }
     830              : 
     831              :     /// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
     832              :     /// wait for reconciliation to complete before responding.
     833          461 :     async fn await_waiters(
     834          461 :         &self,
     835          461 :         waiters: Vec<ReconcilerWaiter>,
     836          461 :     ) -> Result<(), ReconcileWaitError> {
     837          461 :         let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
     838          944 :         for waiter in waiters {
     839          484 :             let timeout = deadline.duration_since(Instant::now());
     840          484 :             waiter.wait_timeout(timeout).await?;
     841              :         }
     842              : 
     843          460 :         Ok(())
     844          461 :     }
     845              : 
     846              :     /// This API is used by the cloud control plane to do coarse-grained control of tenants:
     847              :     /// - Call with mode Attached* to upsert the tenant.
     848              :     /// - Call with mode Detached to switch to PlacementPolicy::Detached
     849              :     ///
     850              :     /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
     851              :     /// secondary locations.
     852            2 :     pub(crate) async fn tenant_location_config(
     853            2 :         &self,
     854            2 :         tenant_id: TenantId,
     855            2 :         req: TenantLocationConfigRequest,
     856            2 :     ) -> Result<TenantLocationConfigResponse, ApiError> {
     857            2 :         if req.tenant_id.shard_count.0 > 1 {
     858            0 :             return Err(ApiError::BadRequest(anyhow::anyhow!(
     859            0 :                 "This API is for importing single-sharded or unsharded tenants"
     860            0 :             )));
     861            2 :         }
     862            2 : 
     863            2 :         let mut waiters = Vec::new();
     864            2 :         let mut result = TenantLocationConfigResponse { shards: Vec::new() };
     865            2 :         let maybe_create = {
     866            2 :             let mut locked = self.inner.write().unwrap();
     867            2 :             let result_tx = locked.result_tx.clone();
     868            2 :             let compute_hook = locked.compute_hook.clone();
     869            2 :             let pageservers = locked.nodes.clone();
     870            2 : 
     871            2 :             let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
     872            2 : 
     873            2 :             // Maybe we have existing shards
     874            2 :             let mut create = true;
     875            2 :             for (shard_id, shard) in locked
     876            2 :                 .tenants
     877            2 :                 .range_mut(TenantShardId::tenant_range(tenant_id))
     878              :             {
     879              :                 // Saw an existing shard: this is not a creation
     880            1 :                 create = false;
     881            1 : 
     882            1 :                 // Note that for existing tenants we do _not_ respect the generation in the request: this is likely
     883            1 :                 // to be stale.  Once a tenant is created in this service, our view of generation is authoritative, and
     884            1 :                 // callers' generations may be ignored.  This represents a one-way migration of tenants from the outer
     885            1 :                 // cloud control plane into this service.
     886            1 : 
     887            1 :                 // Use location config mode as an indicator of policy: if they ask for
     888            1 :                 // attached we go to default HA attached mode.  If they ask for secondary
     889            1 :                 // we go to secondary-only mode.  If they ask for detached we detach.
     890            1 :                 match req.config.mode {
     891            0 :                     LocationConfigMode::Detached => {
     892            0 :                         shard.policy = PlacementPolicy::Detached;
     893            0 :                     }
     894              :                     LocationConfigMode::Secondary => {
     895              :                         // TODO: implement secondary-only mode.
     896            0 :                         todo!();
     897              :                     }
     898              :                     LocationConfigMode::AttachedMulti
     899              :                     | LocationConfigMode::AttachedSingle
     900              :                     | LocationConfigMode::AttachedStale => {
     901              :                         // TODO: persistence for changes in policy
     902            1 :                         if pageservers.len() > 1 {
     903            0 :                             shard.policy = PlacementPolicy::Double(1)
     904              :                         } else {
     905              :                             // Convenience for dev/test: if we just have one pageserver, import
     906              :                             // tenants into Single mode so that scheduling will succeed.
     907            1 :                             shard.policy = PlacementPolicy::Single
     908              :                         }
     909              :                     }
     910              :                 }
     911              : 
     912            1 :                 shard.schedule(&mut scheduler)?;
     913              : 
     914            1 :                 let maybe_waiter = shard.maybe_reconcile(
     915            1 :                     result_tx.clone(),
     916            1 :                     &pageservers,
     917            1 :                     &compute_hook,
     918            1 :                     &self.config,
     919            1 :                     &self.persistence,
     920            1 :                 );
     921            1 :                 if let Some(waiter) = maybe_waiter {
     922            0 :                     waiters.push(waiter);
     923            1 :                 }
     924              : 
     925            1 :                 if let Some(node_id) = shard.intent.attached {
     926            1 :                     result.shards.push(TenantShardLocation {
     927            1 :                         shard_id: *shard_id,
     928            1 :                         node_id,
     929            1 :                     })
     930            0 :                 }
     931              :             }
     932              : 
     933            2 :             if create {
     934              :                 // Validate request mode
     935            1 :                 match req.config.mode {
     936              :                     LocationConfigMode::Detached | LocationConfigMode::Secondary => {
     937              :                         // When using this API to onboard an existing tenant to this service, it must start in
     938              :                         // an attached state, because we need the request to come with a generation
     939            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
     940            0 :                             "Imported tenant must be in attached mode"
     941            0 :                         )));
     942              :                     }
     943              : 
     944              :                     LocationConfigMode::AttachedMulti
     945              :                     | LocationConfigMode::AttachedSingle
     946            1 :                     | LocationConfigMode::AttachedStale => {
     947            1 :                         // Pass
     948            1 :                     }
     949              :                 }
     950              : 
     951              :                 // Validate request generation
     952            1 :                 let Some(generation) = req.config.generation else {
     953              :                     // We can only import attached tenants, because we need the request to come with a generation
     954            0 :                     return Err(ApiError::BadRequest(anyhow::anyhow!(
     955            0 :                         "Generation is mandatory when importing tenant"
     956            0 :                     )));
     957              :                 };
     958              : 
     959              :                 // Synthesize a creation request
     960            1 :                 Some(TenantCreateRequest {
     961            1 :                     new_tenant_id: TenantShardId::unsharded(tenant_id),
     962            1 :                     generation: Some(generation),
     963            1 :                     shard_parameters: ShardParameters {
     964            1 :                         // Must preserve the incoming shard_count to distinguish unsharded (0)
     965            1 :                         // from single-sharded (1): this distinction appears in the S3 keys of the tenant.
     966            1 :                         count: req.tenant_id.shard_count,
     967            1 :                         // We only import un-sharded or single-sharded tenants, so stripe
     968            1 :                         // size can be made up arbitrarily here.
     969            1 :                         stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
     970            1 :                     },
     971            1 :                     config: req.config.tenant_conf,
     972            1 :                 })
     973              :             } else {
     974            1 :                 None
     975              :             }
     976              :         };
     977              : 
     978            2 :         if let Some(create_req) = maybe_create {
     979            2 :             let create_resp = self.tenant_create(create_req).await?;
     980            1 :             result.shards = create_resp
     981            1 :                 .shards
     982            1 :                 .into_iter()
     983            1 :                 .map(|s| TenantShardLocation {
     984            1 :                     node_id: s.node_id,
     985            1 :                     shard_id: s.shard_id,
     986            1 :                 })
     987            1 :                 .collect();
     988              :         } else {
     989              :             // This was an update, wait for reconciliation
     990            1 :             self.await_waiters(waiters).await?;
     991              :         }
     992              : 
     993            2 :         Ok(result)
     994            2 :     }
     995              : 
     996           12 :     pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
     997              :         // TODO: refactor into helper
     998           12 :         let targets = {
     999           12 :             let locked = self.inner.read().unwrap();
    1000           12 :             let mut targets = Vec::new();
    1001              : 
    1002           24 :             for (tenant_shard_id, shard) in
    1003           12 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1004           24 :             {
    1005           24 :                 let node_id = shard.intent.attached.ok_or_else(|| {
    1006            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1007           24 :                 })?;
    1008           24 :                 let node = locked
    1009           24 :                     .nodes
    1010           24 :                     .get(&node_id)
    1011           24 :                     .expect("Pageservers may not be deleted while referenced");
    1012           24 : 
    1013           24 :                 targets.push((*tenant_shard_id, node.clone()));
    1014              :             }
    1015           12 :             targets
    1016           12 :         };
    1017           12 : 
    1018           12 :         // TODO: error out if the tenant is not attached anywhere.
    1019           12 : 
    1020           12 :         // Phase 1: delete on the pageservers
    1021           12 :         let mut any_pending = false;
    1022           36 :         for (tenant_shard_id, node) in targets {
    1023           24 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1024              :             // TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
    1025              :             // surface immediately as an error to our caller.
    1026           94 :             let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
    1027            0 :                 ApiError::InternalServerError(anyhow::anyhow!(
    1028            0 :                     "Error deleting shard {tenant_shard_id} on node {}: {e}",
    1029            0 :                     node.id
    1030            0 :                 ))
    1031           24 :             })?;
    1032           24 :             tracing::info!(
    1033           24 :                 "Shard {tenant_shard_id} on node {}, delete returned {}",
    1034           24 :                 node.id,
    1035           24 :                 status
    1036           24 :             );
    1037           24 :             if status == StatusCode::ACCEPTED {
    1038           12 :                 any_pending = true;
    1039           12 :             }
    1040              :         }
    1041              : 
    1042           12 :         if any_pending {
    1043              :             // Caller should call us again later.  When we eventually see 404s from
    1044              :             // all the shards, we may proceed to delete our records of the tenant.
    1045            6 :             tracing::info!(
    1046            6 :                 "Tenant {} has some shards pending deletion, returning 202",
    1047            6 :                 tenant_id
    1048            6 :             );
    1049            6 :             return Ok(StatusCode::ACCEPTED);
    1050            6 :         }
    1051            6 : 
    1052            6 :         // Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop
    1053            6 :         // our in-memory state and database state.
    1054            6 : 
    1055            6 :         // Ordering: we delete persistent state first: if we then
    1056            6 :         // crash, we will drop the in-memory state.
    1057            6 : 
    1058            6 :         // Drop persistent state.
    1059            6 :         self.persistence.delete_tenant(tenant_id).await?;
    1060              : 
    1061              :         // Drop in-memory state
    1062              :         {
    1063            6 :             let mut locked = self.inner.write().unwrap();
    1064            6 :             locked
    1065            6 :                 .tenants
    1066           42 :                 .retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
    1067            6 :             tracing::info!(
    1068            6 :                 "Deleted tenant {tenant_id}, now have {} tenants",
    1069            6 :                 locked.tenants.len()
    1070            6 :             );
    1071              :         };
    1072              : 
    1073              :         // Success is represented as 404, to imitate the existing pageserver deletion API
    1074            6 :         Ok(StatusCode::NOT_FOUND)
    1075           12 :     }
    1076              : 
    1077          797 :     pub(crate) async fn tenant_timeline_create(
    1078          797 :         &self,
    1079          797 :         tenant_id: TenantId,
    1080          797 :         mut create_req: TimelineCreateRequest,
    1081          797 :     ) -> Result<TimelineInfo, ApiError> {
    1082          797 :         let mut timeline_info = None;
    1083              : 
    1084          797 :         tracing::info!(
    1085          797 :             "Creating timeline {}/{}",
    1086          797 :             tenant_id,
    1087          797 :             create_req.new_timeline_id,
    1088          797 :         );
    1089              : 
    1090          797 :         self.ensure_attached_wait(tenant_id).await?;
    1091              : 
    1092              :         // TODO: refuse to do this if shard splitting is in progress
    1093          797 :         let targets = {
    1094          797 :             let locked = self.inner.read().unwrap();
    1095          797 :             let mut targets = Vec::new();
    1096              : 
    1097          825 :             for (tenant_shard_id, shard) in
    1098          797 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1099          825 :             {
    1100          825 :                 let node_id = shard.intent.attached.ok_or_else(|| {
    1101            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1102          825 :                 })?;
    1103          825 :                 let node = locked
    1104          825 :                     .nodes
    1105          825 :                     .get(&node_id)
    1106          825 :                     .expect("Pageservers may not be deleted while referenced");
    1107          825 : 
    1108          825 :                 targets.push((*tenant_shard_id, node.clone()));
    1109              :             }
    1110          797 :             targets
    1111          797 :         };
    1112          797 : 
    1113          797 :         if targets.is_empty() {
    1114            0 :             return Err(ApiError::NotFound(
    1115            0 :                 anyhow::anyhow!("Tenant not found").into(),
    1116            0 :             ));
    1117          797 :         }
    1118              : 
    1119         1618 :         for (tenant_shard_id, node) in targets {
    1120              :             // TODO: issue shard timeline creates in parallel, once the 0th is done.
    1121              : 
    1122          825 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1123              : 
    1124          825 :             tracing::info!(
    1125          825 :                 "Creating timeline on shard {}/{}, attached to node {}",
    1126          825 :                 tenant_shard_id,
    1127          825 :                 create_req.new_timeline_id,
    1128          825 :                 node.id
    1129          825 :             );
    1130              : 
    1131          825 :             let shard_timeline_info = client
    1132          825 :                 .timeline_create(tenant_shard_id, &create_req)
    1133         3204 :                 .await
    1134          825 :                 .map_err(|e| match e {
    1135            4 :                     mgmt_api::Error::ApiError(status, msg)
    1136            4 :                         if status == StatusCode::INTERNAL_SERVER_ERROR
    1137            4 :                             || status == StatusCode::NOT_ACCEPTABLE =>
    1138            4 :                     {
    1139            4 :                         // TODO: handle more error codes, e.g. 503 should be passed through.  Make a general wrapper
    1140            4 :                         // for pass-through API calls.
    1141            4 :                         ApiError::InternalServerError(anyhow::anyhow!(msg))
    1142              :                     }
    1143            0 :                     _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
    1144          825 :                 })?;
    1145              : 
    1146          821 :             if timeline_info.is_none() {
    1147              :                 // If the caller specified an ancestor but no ancestor LSN, we are responsible for
    1148              :                 // propagating the LSN chosen by the first shard to the other shards: it is important
    1149              :                 // that all shards end up with the same ancestor_start_lsn.
    1150          793 :                 if create_req.ancestor_timeline_id.is_some()
    1151          244 :                     && create_req.ancestor_start_lsn.is_none()
    1152          222 :                 {
    1153          222 :                     create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn;
    1154          571 :                 }
    1155              : 
    1156              :                 // We will return the TimelineInfo from the first shard
    1157          793 :                 timeline_info = Some(shard_timeline_info);
    1158           28 :             }
    1159              :         }
    1160          793 :         Ok(timeline_info.expect("targets cannot be empty"))
    1161          797 :     }
    1162              : 
    1163            2 :     pub(crate) async fn tenant_timeline_delete(
    1164            2 :         &self,
    1165            2 :         tenant_id: TenantId,
    1166            2 :         timeline_id: TimelineId,
    1167            2 :     ) -> Result<StatusCode, ApiError> {
    1168            2 :         tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
    1169              : 
    1170            2 :         self.ensure_attached_wait(tenant_id).await?;
    1171              : 
    1172              :         // TODO: refuse to do this if shard splitting is in progress
    1173            2 :         let targets = {
    1174            2 :             let locked = self.inner.read().unwrap();
    1175            2 :             let mut targets = Vec::new();
    1176              : 
    1177            4 :             for (tenant_shard_id, shard) in
    1178            2 :                 locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1179            4 :             {
    1180            4 :                 let node_id = shard.intent.attached.ok_or_else(|| {
    1181            0 :                     ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
    1182            4 :                 })?;
    1183            4 :                 let node = locked
    1184            4 :                     .nodes
    1185            4 :                     .get(&node_id)
    1186            4 :                     .expect("Pageservers may not be deleted while referenced");
    1187            4 : 
    1188            4 :                 targets.push((*tenant_shard_id, node.clone()));
    1189              :             }
    1190            2 :             targets
    1191            2 :         };
    1192            2 : 
    1193            2 :         if targets.is_empty() {
    1194            0 :             return Err(ApiError::NotFound(
    1195            0 :                 anyhow::anyhow!("Tenant not found").into(),
    1196            0 :             ));
    1197            2 :         }
    1198            2 : 
    1199            2 :         // TODO: call into shards concurrently
    1200            2 :         let mut any_pending = false;
    1201            6 :         for (tenant_shard_id, node) in targets {
    1202            4 :             let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
    1203              : 
    1204            4 :             tracing::info!(
    1205            4 :                 "Deleting timeline on shard {}/{}, attached to node {}",
    1206            4 :                 tenant_shard_id,
    1207            4 :                 timeline_id,
    1208            4 :                 node.id
    1209            4 :             );
    1210              : 
    1211            4 :             let status = client
    1212            4 :                 .timeline_delete(tenant_shard_id, timeline_id)
    1213           16 :                 .await
    1214            4 :                 .map_err(|e| {
    1215            0 :                     ApiError::InternalServerError(anyhow::anyhow!(
    1216            0 :                     "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
    1217            0 :                     node.id
    1218            0 :                 ))
    1219            4 :                 })?;
    1220              : 
    1221            4 :             if status == StatusCode::ACCEPTED {
    1222            2 :                 any_pending = true;
    1223            2 :             }
    1224              :         }
    1225              : 
    1226            2 :         if any_pending {
    1227            1 :             Ok(StatusCode::ACCEPTED)
    1228              :         } else {
    1229            1 :             Ok(StatusCode::NOT_FOUND)
    1230              :         }
    1231            2 :     }
    1232              : 
    1233              :     /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
    1234              :     /// function looks it up and returns the url.  If the tenant isn't found, returns Err(ApiError::NotFound)
    1235           11 :     pub(crate) fn tenant_shard0_baseurl(
    1236           11 :         &self,
    1237           11 :         tenant_id: TenantId,
    1238           11 :     ) -> Result<(String, TenantShardId), ApiError> {
    1239           11 :         let locked = self.inner.read().unwrap();
    1240           11 :         let Some((tenant_shard_id, shard)) = locked
    1241           11 :             .tenants
    1242           11 :             .range(TenantShardId::tenant_range(tenant_id))
    1243           11 :             .next()
    1244              :         else {
    1245            6 :             return Err(ApiError::NotFound(
    1246            6 :                 anyhow::anyhow!("Tenant {tenant_id} not found").into(),
    1247            6 :             ));
    1248              :         };
    1249              : 
    1250              :         // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
    1251              :         // point to somewhere we haven't attached yet.
    1252            5 :         let Some(node_id) = shard.intent.attached else {
    1253            0 :             return Err(ApiError::Conflict(
    1254            0 :                 "Cannot call timeline API on non-attached tenant".to_string(),
    1255            0 :             ));
    1256              :         };
    1257              : 
    1258            5 :         let Some(node) = locked.nodes.get(&node_id) else {
    1259              :             // This should never happen
    1260            0 :             return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1261            0 :                 "Shard refers to nonexistent node"
    1262            0 :             )));
    1263              :         };
    1264              : 
    1265            5 :         Ok((node.base_url(), *tenant_shard_id))
    1266           11 :     }
    1267              : 
    1268          823 :     pub(crate) fn tenant_locate(
    1269          823 :         &self,
    1270          823 :         tenant_id: TenantId,
    1271          823 :     ) -> Result<TenantLocateResponse, ApiError> {
    1272          823 :         let locked = self.inner.read().unwrap();
    1273          823 :         tracing::info!("Locating shards for tenant {tenant_id}");
    1274              : 
    1275              :         // Take a snapshot of pageservers
    1276          823 :         let pageservers = locked.nodes.clone();
    1277          823 : 
    1278          823 :         let mut result = Vec::new();
    1279          823 :         let mut shard_params: Option<ShardParameters> = None;
    1280              : 
    1281          892 :         for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
    1282              :         {
    1283          892 :             let node_id = shard
    1284          892 :                 .intent
    1285          892 :                 .attached
    1286          892 :                 .ok_or(ApiError::BadRequest(anyhow::anyhow!(
    1287          892 :                     "Cannot locate a tenant that is not attached"
    1288          892 :                 )))?;
    1289              : 
    1290          892 :             let node = pageservers
    1291          892 :                 .get(&node_id)
    1292          892 :                 .expect("Pageservers may not be deleted while referenced");
    1293          892 : 
    1294          892 :             result.push(TenantLocateResponseShard {
    1295          892 :                 shard_id: *tenant_shard_id,
    1296          892 :                 node_id,
    1297          892 :                 listen_http_addr: node.listen_http_addr.clone(),
    1298          892 :                 listen_http_port: node.listen_http_port,
    1299          892 :                 listen_pg_addr: node.listen_pg_addr.clone(),
    1300          892 :                 listen_pg_port: node.listen_pg_port,
    1301          892 :             });
    1302          892 : 
    1303          892 :             match &shard_params {
    1304          823 :                 None => {
    1305          823 :                     shard_params = Some(ShardParameters {
    1306          823 :                         stripe_size: shard.shard.stripe_size,
    1307          823 :                         count: shard.shard.count,
    1308          823 :                     });
    1309          823 :                 }
    1310           69 :                 Some(params) => {
    1311           69 :                     if params.stripe_size != shard.shard.stripe_size {
    1312              :                         // This should never happen.  We enforce at runtime because it's simpler than
    1313              :                         // adding an extra per-tenant data structure to store the things that should be the same
    1314            0 :                         return Err(ApiError::InternalServerError(anyhow::anyhow!(
    1315            0 :                             "Inconsistent shard stripe size parameters!"
    1316            0 :                         )));
    1317           69 :                     }
    1318              :                 }
    1319              :             }
    1320              :         }
    1321              : 
    1322          823 :         if result.is_empty() {
    1323            0 :             return Err(ApiError::NotFound(
    1324            0 :                 anyhow::anyhow!("No shards for this tenant ID found").into(),
    1325            0 :             ));
    1326          823 :         }
    1327          823 :         let shard_params = shard_params.expect("result is non-empty, therefore this is set");
    1328          823 :         tracing::info!(
    1329          823 :             "Located tenant {} with params {:?} on shards {}",
    1330          823 :             tenant_id,
    1331          823 :             shard_params,
    1332          823 :             result
    1333          823 :                 .iter()
    1334          892 :                 .map(|s| format!("{:?}", s))
    1335          823 :                 .collect::<Vec<_>>()
    1336          823 :                 .join(",")
    1337          823 :         );
    1338              : 
    1339          823 :         Ok(TenantLocateResponse {
    1340          823 :             shards: result,
    1341          823 :             shard_params,
    1342          823 :         })
    1343          823 :     }
    1344              : 
    1345            0 :     pub(crate) async fn tenant_shard_migrate(
    1346            0 :         &self,
    1347            0 :         tenant_shard_id: TenantShardId,
    1348            0 :         migrate_req: TenantShardMigrateRequest,
    1349            0 :     ) -> Result<TenantShardMigrateResponse, ApiError> {
    1350            0 :         let waiter = {
    1351            0 :             let mut locked = self.inner.write().unwrap();
    1352            0 : 
    1353            0 :             let result_tx = locked.result_tx.clone();
    1354            0 :             let pageservers = locked.nodes.clone();
    1355            0 :             let compute_hook = locked.compute_hook.clone();
    1356              : 
    1357            0 :             let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
    1358            0 :                 return Err(ApiError::NotFound(
    1359            0 :                     anyhow::anyhow!("Tenant shard not found").into(),
    1360            0 :                 ));
    1361              :             };
    1362              : 
    1363            0 :             if shard.intent.attached == Some(migrate_req.node_id) {
    1364              :                 // No-op case: we will still proceed to wait for reconciliation in case it is
    1365              :                 // incomplete from an earlier update to the intent.
    1366            0 :                 tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
    1367              :             } else {
    1368            0 :                 let old_attached = shard.intent.attached;
    1369            0 : 
    1370            0 :                 match shard.policy {
    1371            0 :                     PlacementPolicy::Single => {
    1372            0 :                         shard.intent.secondary.clear();
    1373            0 :                     }
    1374            0 :                     PlacementPolicy::Double(_n) => {
    1375            0 :                         // If our new attached node was a secondary, it no longer should be.
    1376            0 :                         shard.intent.secondary.retain(|s| s != &migrate_req.node_id);
    1377              : 
    1378              :                         // If we were already attached to something, demote that to a secondary
    1379            0 :                         if let Some(old_attached) = old_attached {
    1380            0 :                             shard.intent.secondary.push(old_attached);
    1381            0 :                         }
    1382              :                     }
    1383              :                     PlacementPolicy::Detached => {
    1384            0 :                         return Err(ApiError::BadRequest(anyhow::anyhow!(
    1385            0 :                             "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first"
    1386            0 :                         )))
    1387              :                     }
    1388              :                 }
    1389            0 :                 shard.intent.attached = Some(migrate_req.node_id);
    1390              : 
    1391            0 :                 tracing::info!("Migrating: new intent {:?}", shard.intent);
    1392            0 :                 shard.sequence = shard.sequence.next();
    1393              :             }
    1394              : 
    1395            0 :             shard.maybe_reconcile(
    1396            0 :                 result_tx,
    1397            0 :                 &pageservers,
    1398            0 :                 &compute_hook,
    1399            0 :                 &self.config,
    1400            0 :                 &self.persistence,
    1401            0 :             )
    1402              :         };
    1403              : 
    1404            0 :         if let Some(waiter) = waiter {
    1405            0 :             waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
    1406              :         } else {
    1407            0 :             tracing::warn!("Migration is a no-op");
    1408              :         }
    1409              : 
    1410            0 :         Ok(TenantShardMigrateResponse {})
    1411            0 :     }
    1412              : 
    1413            2 :     pub(crate) async fn node_list(&self) -> Result<Vec<NodePersistence>, ApiError> {
    1414              :         // It is convenient to avoid taking the big lock and converting Node to a serializable
    1415              :         // structure, by fetching from storage instead of reading in-memory state.
    1416            2 :         let nodes = self
    1417            2 :             .persistence
    1418            2 :             .list_nodes()
    1419            2 :             .await?
    1420            2 :             .into_iter()
    1421            5 :             .map(|n| n.to_persistent())
    1422            2 :             .collect();
    1423            2 : 
    1424            2 :         Ok(nodes)
    1425            2 :     }
    1426              : 
    1427          603 :     pub(crate) async fn node_register(
    1428          603 :         &self,
    1429          603 :         register_req: NodeRegisterRequest,
    1430          603 :     ) -> Result<(), ApiError> {
    1431          603 :         // Pre-check for an already-existing node
    1432          603 :         {
    1433          603 :             let locked = self.inner.read().unwrap();
    1434          603 :             if let Some(node) = locked.nodes.get(®ister_req.node_id) {
    1435              :                 // Note that we do not do a total equality of the struct, because we don't require
    1436              :                 // the availability/scheduling states to agree for a POST to be idempotent.
    1437          215 :                 if node.listen_http_addr == register_req.listen_http_addr
    1438          215 :                     && node.listen_http_port == register_req.listen_http_port
    1439          215 :                     && node.listen_pg_addr == register_req.listen_pg_addr
    1440          215 :                     && node.listen_pg_port == register_req.listen_pg_port
    1441              :                 {
    1442          215 :                     tracing::info!(
    1443          215 :                         "Node {} re-registered with matching address",
    1444          215 :                         register_req.node_id
    1445          215 :                     );
    1446          215 :                     return Ok(());
    1447              :                 } else {
    1448              :                     // TODO: decide if we want to allow modifying node addresses without removing and re-adding
    1449              :                     // the node.  Safest/simplest thing is to refuse it, and usually we deploy with
    1450              :                     // a fixed address through the lifetime of a node.
    1451            0 :                     tracing::warn!(
    1452            0 :                         "Node {} tried to register with different address",
    1453            0 :                         register_req.node_id
    1454            0 :                     );
    1455            0 :                     return Err(ApiError::Conflict(
    1456            0 :                         "Node is already registered with different address".to_string(),
    1457            0 :                     ));
    1458              :                 }
    1459          388 :             }
    1460          388 :         }
    1461          388 : 
    1462          388 :         // Ordering: we must persist the new node _before_ adding it to in-memory state.
    1463          388 :         // This ensures that before we use it for anything or expose it via any external
    1464          388 :         // API, it is guaranteed to be available after a restart.
    1465          388 :         let new_node = Node {
    1466          388 :             id: register_req.node_id,
    1467          388 :             listen_http_addr: register_req.listen_http_addr,
    1468          388 :             listen_http_port: register_req.listen_http_port,
    1469          388 :             listen_pg_addr: register_req.listen_pg_addr,
    1470          388 :             listen_pg_port: register_req.listen_pg_port,
    1471          388 :             scheduling: NodeSchedulingPolicy::Filling,
    1472          388 :             // TODO: we shouldn't really call this Active until we've heartbeated it.
    1473          388 :             availability: NodeAvailability::Active,
    1474          388 :         };
    1475          388 :         // TODO: idempotency if the node already exists in the database
    1476          388 :         self.persistence.insert_node(&new_node).await?;
    1477              : 
    1478          388 :         let mut locked = self.inner.write().unwrap();
    1479          388 :         let mut new_nodes = (*locked.nodes).clone();
    1480          388 : 
    1481          388 :         new_nodes.insert(register_req.node_id, new_node);
    1482          388 : 
    1483          388 :         locked.nodes = Arc::new(new_nodes);
    1484              : 
    1485          388 :         tracing::info!(
    1486          388 :             "Registered pageserver {}, now have {} pageservers",
    1487          388 :             register_req.node_id,
    1488          388 :             locked.nodes.len()
    1489          388 :         );
    1490          388 :         Ok(())
    1491          603 :     }
    1492              : 
    1493            4 :     pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
    1494            4 :         let mut locked = self.inner.write().unwrap();
    1495            4 :         let result_tx = locked.result_tx.clone();
    1496            4 :         let compute_hook = locked.compute_hook.clone();
    1497            4 : 
    1498            4 :         let mut new_nodes = (*locked.nodes).clone();
    1499              : 
    1500            4 :         let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
    1501            0 :             return Err(ApiError::NotFound(
    1502            0 :                 anyhow::anyhow!("Node not registered").into(),
    1503            0 :             ));
    1504              :         };
    1505              : 
    1506            4 :         let mut offline_transition = false;
    1507            4 :         let mut active_transition = false;
    1508              : 
    1509            4 :         if let Some(availability) = &config_req.availability {
    1510            3 :             match (availability, &node.availability) {
    1511              :                 (NodeAvailability::Offline, NodeAvailability::Active) => {
    1512            2 :                     tracing::info!("Node {} transition to offline", config_req.node_id);
    1513            2 :                     offline_transition = true;
    1514              :                 }
    1515              :                 (NodeAvailability::Active, NodeAvailability::Offline) => {
    1516            1 :                     tracing::info!("Node {} transition to active", config_req.node_id);
    1517            1 :                     active_transition = true;
    1518              :                 }
    1519              :                 _ => {
    1520            0 :                     tracing::info!("Node {} no change during config", config_req.node_id);
    1521              :                     // No change
    1522              :                 }
    1523              :             };
    1524            3 :             node.availability = *availability;
    1525            1 :         }
    1526              : 
    1527            4 :         if let Some(scheduling) = config_req.scheduling {
    1528            1 :             node.scheduling = scheduling;
    1529            1 : 
    1530            1 :             // TODO: once we have a background scheduling ticker for fill/drain, kick it
    1531            1 :             // to wake up and start working.
    1532            3 :         }
    1533              : 
    1534            4 :         let new_nodes = Arc::new(new_nodes);
    1535            4 : 
    1536            4 :         let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes);
    1537            4 :         if offline_transition {
    1538           13 :             for (tenant_shard_id, tenant_state) in &mut locked.tenants {
    1539            5 :                 if let Some(observed_loc) =
    1540           13 :                     tenant_state.observed.locations.get_mut(&config_req.node_id)
    1541            5 :                 {
    1542            5 :                     // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
    1543            5 :                     // not assume our knowledge of the node's configuration is accurate until it comes back online
    1544            5 :                     observed_loc.conf = None;
    1545            8 :                 }
    1546              : 
    1547           13 :                 if tenant_state.intent.notify_offline(config_req.node_id) {
    1548            5 :                     tenant_state.sequence = tenant_state.sequence.next();
    1549            5 :                     match tenant_state.schedule(&mut scheduler) {
    1550            0 :                         Err(e) => {
    1551            0 :                             // It is possible that some tenants will become unschedulable when too many pageservers
    1552            0 :                             // go offline: in this case there isn't much we can do other than make the issue observable.
    1553            0 :                             // TODO: give TenantState a scheduling error attribute to be queried later.
    1554            0 :                             tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
    1555              :                         }
    1556            5 :                         Ok(()) => {
    1557            5 :                             tenant_state.maybe_reconcile(
    1558            5 :                                 result_tx.clone(),
    1559            5 :                                 &new_nodes,
    1560            5 :                                 &compute_hook,
    1561            5 :                                 &self.config,
    1562            5 :                                 &self.persistence,
    1563            5 :                             );
    1564            5 :                         }
    1565              :                     }
    1566            8 :                 }
    1567              :             }
    1568            2 :         }
    1569              : 
    1570            4 :         if active_transition {
    1571              :             // When a node comes back online, we must reconcile any tenant that has a None observed
    1572              :             // location on the node.
    1573           12 :             for tenant_state in locked.tenants.values_mut() {
    1574            4 :                 if let Some(observed_loc) =
    1575           12 :                     tenant_state.observed.locations.get_mut(&config_req.node_id)
    1576              :                 {
    1577            4 :                     if observed_loc.conf.is_none() {
    1578            4 :                         tenant_state.maybe_reconcile(
    1579            4 :                             result_tx.clone(),
    1580            4 :                             &new_nodes,
    1581            4 :                             &compute_hook,
    1582            4 :                             &self.config,
    1583            4 :                             &self.persistence,
    1584            4 :                         );
    1585            4 :                     }
    1586            8 :                 }
    1587              :             }
    1588              : 
    1589              :             // TODO: in the background, we should balance work back onto this pageserver
    1590            3 :         }
    1591              : 
    1592            4 :         locked.nodes = new_nodes;
    1593            4 : 
    1594            4 :         Ok(())
    1595            4 :     }
    1596              : 
    1597              :     /// Helper for methods that will try and call pageserver APIs for
    1598              :     /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
    1599              :     /// is attached somewhere.
    1600          799 :     fn ensure_attached_schedule(
    1601          799 :         &self,
    1602          799 :         mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
    1603          799 :         tenant_id: TenantId,
    1604          799 :     ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
    1605          799 :         let mut waiters = Vec::new();
    1606          799 :         let result_tx = locked.result_tx.clone();
    1607          799 :         let compute_hook = locked.compute_hook.clone();
    1608          799 :         let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
    1609          799 :         let pageservers = locked.nodes.clone();
    1610              : 
    1611          829 :         for (_tenant_shard_id, shard) in locked
    1612          799 :             .tenants
    1613          799 :             .range_mut(TenantShardId::tenant_range(tenant_id))
    1614              :         {
    1615          829 :             shard.schedule(&mut scheduler)?;
    1616              : 
    1617          829 :             if let Some(waiter) = shard.maybe_reconcile(
    1618          829 :                 result_tx.clone(),
    1619          829 :                 &pageservers,
    1620          829 :                 &compute_hook,
    1621          829 :                 &self.config,
    1622          829 :                 &self.persistence,
    1623          829 :             ) {
    1624            0 :                 waiters.push(waiter);
    1625          829 :             }
    1626              :         }
    1627          799 :         Ok(waiters)
    1628          799 :     }
    1629              : 
    1630          799 :     async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
    1631          799 :         let ensure_waiters = {
    1632          799 :             let locked = self.inner.write().unwrap();
    1633          799 : 
    1634          799 :             self.ensure_attached_schedule(locked, tenant_id)
    1635          799 :                 .map_err(ApiError::InternalServerError)?
    1636              :         };
    1637              : 
    1638          799 :         let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
    1639          799 :         for waiter in ensure_waiters {
    1640            0 :             let timeout = deadline.duration_since(Instant::now());
    1641            0 :             waiter.wait_timeout(timeout).await?;
    1642              :         }
    1643              : 
    1644          799 :         Ok(())
    1645          799 :     }
    1646              : 
    1647              :     /// Check all tenants for pending reconciliation work, and reconcile those in need
    1648              :     ///
    1649              :     /// Returns how many reconciliation tasks were started
    1650          361 :     fn reconcile_all(&self) -> usize {
    1651          361 :         let mut locked = self.inner.write().unwrap();
    1652          361 :         let result_tx = locked.result_tx.clone();
    1653          361 :         let compute_hook = locked.compute_hook.clone();
    1654          361 :         let pageservers = locked.nodes.clone();
    1655          361 :         locked
    1656          361 :             .tenants
    1657          361 :             .iter_mut()
    1658          361 :             .filter_map(|(_tenant_shard_id, shard)| {
    1659            9 :                 shard.maybe_reconcile(
    1660            9 :                     result_tx.clone(),
    1661            9 :                     &pageservers,
    1662            9 :                     &compute_hook,
    1663            9 :                     &self.config,
    1664            9 :                     &self.persistence,
    1665            9 :                 )
    1666          361 :             })
    1667          361 :             .count()
    1668          361 :     }
    1669              : }
         |