Line data Source code
1 : use std::{
2 : cmp::Ordering,
3 : collections::{BTreeMap, HashMap, HashSet},
4 : str::FromStr,
5 : sync::Arc,
6 : time::{Duration, Instant},
7 : };
8 :
9 : use control_plane::attachment_service::{
10 : AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, NodeAvailability,
11 : NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, TenantCreateResponse,
12 : TenantCreateResponseShard, TenantLocateResponse, TenantLocateResponseShard,
13 : TenantShardMigrateRequest, TenantShardMigrateResponse,
14 : };
15 : use diesel::result::DatabaseErrorKind;
16 : use futures::StreamExt;
17 : use hyper::StatusCode;
18 : use pageserver_api::{
19 : control_api::{
20 : ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
21 : ValidateResponse, ValidateResponseTenant,
22 : },
23 : models,
24 : models::{
25 : LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
26 : TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
27 : TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
28 : },
29 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
30 : };
31 : use pageserver_client::mgmt_api;
32 : use tokio_util::sync::CancellationToken;
33 : use utils::{
34 : backoff,
35 : completion::Barrier,
36 : generation::Generation,
37 : http::error::ApiError,
38 : id::{NodeId, TenantId, TimelineId},
39 : seqwait::SeqWait,
40 : };
41 :
42 : use crate::{
43 : compute_hook::{self, ComputeHook},
44 : node::Node,
45 : persistence::{
46 : split_state::SplitState, DatabaseError, NodePersistence, Persistence,
47 : TenantShardPersistence,
48 : },
49 : reconciler::attached_location_conf,
50 : scheduler::Scheduler,
51 : tenant_state::{
52 : IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
53 : ReconcilerWaiter, TenantState,
54 : },
55 : PlacementPolicy, Sequence,
56 : };
57 :
58 : const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
59 :
60 : /// How long [`Service::startup_reconcile`] is allowed to take before it should give
61 : /// up on unresponsive pageservers and proceed.
62 : pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
63 :
64 : // Top level state available to all HTTP handlers
65 : struct ServiceState {
66 : tenants: BTreeMap<TenantShardId, TenantState>,
67 :
68 : nodes: Arc<HashMap<NodeId, Node>>,
69 :
70 : compute_hook: Arc<ComputeHook>,
71 :
72 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
73 : }
74 :
75 : impl ServiceState {
76 366 : fn new(
77 366 : config: Config,
78 366 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
79 366 : nodes: HashMap<NodeId, Node>,
80 366 : tenants: BTreeMap<TenantShardId, TenantState>,
81 366 : ) -> Self {
82 366 : Self {
83 366 : tenants,
84 366 : nodes: Arc::new(nodes),
85 366 : compute_hook: Arc::new(ComputeHook::new(config)),
86 366 : result_tx,
87 366 : }
88 366 : }
89 : }
90 :
91 869 : #[derive(Clone)]
92 : pub struct Config {
93 : // All pageservers managed by one instance of this service must have
94 : // the same public key. This JWT token will be used to authenticate
95 : // this service to the pageservers it manages.
96 : pub jwt_token: Option<String>,
97 :
98 : // This JWT token will be used to authenticate this service to the control plane.
99 : pub control_plane_jwt_token: Option<String>,
100 :
101 : /// Where the compute hook should send notifications of pageserver attachment locations
102 : /// (this URL points to the control plane in prod). If this is None, the compute hook will
103 : /// assume it is running in a test environment and try to update neon_local.
104 : pub compute_hook_url: Option<String>,
105 : }
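     :
     : // A minimal construction sketch (assuming a local/test deployment): all three fields may be
     : // None, in which case no JWT is sent to the pageservers or the control plane, and the compute
     : // hook falls back to updating neon_local.
     : //
     : //   let config = Config {
     : //       jwt_token: None,
     : //       control_plane_jwt_token: None,
     : //       compute_hook_url: None,
     : //   };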
106 :
107 : impl From<DatabaseError> for ApiError {
108 0 : fn from(err: DatabaseError) -> ApiError {
109 0 : match err {
110 0 : DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
111 : // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
112 : DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
113 0 : ApiError::ShuttingDown
114 : }
115 0 : DatabaseError::Logical(reason) => {
116 0 : ApiError::InternalServerError(anyhow::anyhow!(reason))
117 : }
118 : }
119 0 : }
120 : }
121 :
122 : pub struct Service {
123 : inner: Arc<std::sync::RwLock<ServiceState>>,
124 : config: Config,
125 : persistence: Arc<Persistence>,
126 :
127 : /// This waits for initial reconciliation with pageservers to complete. Until this barrier
128 : /// passes, it isn't safe to do any actions that mutate tenants.
129 : pub(crate) startup_complete: Barrier,
130 : }
131 :
132 : impl From<ReconcileWaitError> for ApiError {
133 1 : fn from(value: ReconcileWaitError) -> Self {
134 1 : match value {
135 0 : ReconcileWaitError::Shutdown => ApiError::ShuttingDown,
136 0 : e @ ReconcileWaitError::Timeout(_) => ApiError::Timeout(format!("{e}").into()),
137 1 : e @ ReconcileWaitError::Failed(..) => ApiError::InternalServerError(anyhow::anyhow!(e)),
138 : }
139 1 : }
140 : }
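     :
     : // A short sketch of how these From impls are used (the handler shape here is illustrative):
     : // handlers return Result<_, ApiError>, so database and reconcile-wait failures propagate
     : // with `?`, e.g.:
     : //
     : //   self.persistence.delete_tenant(tenant_id).await?;   // DatabaseError -> ApiError
     : //   self.await_waiters(waiters).await?;                 // ReconcileWaitError -> ApiError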
141 :
142 : impl Service {
143 5 : pub fn get_config(&self) -> &Config {
144 5 : &self.config
145 5 : }
146 :
147 : /// TODO: don't allow other API calls until this is done, don't start doing any background housekeeping
148 : /// until this is done.
149 366 : async fn startup_reconcile(&self) {
150 366 : // For all tenant shards, a vector of observed states on nodes (where None means
151 366 : // indeterminate, same as in [`ObservedStateLocation`])
152 366 : let mut observed = HashMap::new();
153 366 :
154 366 : let mut nodes_online = HashSet::new();
155 366 :
156 366 : // TODO: give Service a cancellation token for clean shutdown
157 366 : let cancel = CancellationToken::new();
158 366 :
159 366 : // TODO: issue these requests concurrently
160 366 : {
161 366 : let nodes = {
162 366 : let locked = self.inner.read().unwrap();
163 366 : locked.nodes.clone()
164 : };
165 366 : for node in nodes.values() {
166 10 : let http_client = reqwest::ClientBuilder::new()
167 10 : .timeout(Duration::from_secs(5))
168 10 : .build()
169 10 : .expect("Failed to construct HTTP client");
170 10 : let client = mgmt_api::Client::from_client(
171 10 : http_client,
172 10 : node.base_url(),
173 10 : self.config.jwt_token.as_deref(),
174 10 : );
175 :
176 12 : fn is_fatal(e: &mgmt_api::Error) -> bool {
177 12 : use mgmt_api::Error::*;
178 12 : match e {
179 12 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
180 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
181 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
182 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
183 0 : ApiError(_, _) => true,
184 : }
185 12 : }
186 :
187 10 : let list_response = backoff::retry(
188 18 : || client.list_location_config(),
189 10 : is_fatal,
190 10 : 1,
191 10 : 5,
192 10 : "Location config listing",
193 10 : &cancel,
194 10 : )
195 52 : .await;
196 7 : let Some(list_response) = list_response else {
197 0 : tracing::info!("Shutdown during startup_reconcile");
198 0 : return;
199 : };
200 :
201 7 : tracing::info!("Scanning shards on node {}...", node.id);
202 7 : match list_response {
203 1 : Err(e) => {
204 1 : tracing::warn!("Could not contact pageserver {} ({e})", node.id);
205 : // TODO: be more tolerant, do some retries, in case
206 : // pageserver is being restarted at the same time as we are
207 : }
208 6 : Ok(listing) => {
209 6 : tracing::info!(
210 6 : "Received {} shard statuses from pageserver {}, setting it to Active",
211 6 : listing.tenant_shards.len(),
212 6 : node.id
213 6 : );
214 6 : nodes_online.insert(node.id);
215 :
216 11 : for (tenant_shard_id, conf_opt) in listing.tenant_shards {
217 5 : observed.insert(tenant_shard_id, (node.id, conf_opt));
218 5 : }
219 : }
220 : }
221 : }
222 : }
223 :
224 363 : let mut cleanup = Vec::new();
225 363 :
226 363 : let mut compute_notifications = Vec::new();
227 :
228 : // Populate intent and observed states for all tenants, based on reported state on pageservers
229 363 : let (shard_count, nodes) = {
230 363 : let mut locked = self.inner.write().unwrap();
231 363 :
232 363 : // Mark nodes online if they responded to us: nodes are offline by default after a restart.
233 363 : let mut nodes = (*locked.nodes).clone();
234 363 : for (node_id, node) in nodes.iter_mut() {
235 7 : if nodes_online.contains(node_id) {
236 6 : node.availability = NodeAvailability::Active;
237 6 : }
238 : }
239 363 : locked.nodes = Arc::new(nodes);
240 363 : let nodes = locked.nodes.clone();
241 :
242 368 : for (tenant_shard_id, (node_id, observed_loc)) in observed {
243 5 : let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
244 0 : cleanup.push((tenant_shard_id, node_id));
245 0 : continue;
246 : };
247 :
248 5 : tenant_state
249 5 : .observed
250 5 : .locations
251 5 : .insert(node_id, ObservedStateLocation { conf: observed_loc });
252 : }
253 :
254 : // Populate each tenant's intent state
255 363 : let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
256 363 : for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
257 8 : tenant_state.intent_from_observed();
258 8 : if let Err(e) = tenant_state.schedule(&mut scheduler) {
259 : // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
260 : // not enough pageservers are available. The tenant may well still be available
261 : // to clients.
262 0 : tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
263 : } else {
264 : // If we're both intending and observed to be attached at a particular node, we will
265 : // emit a compute notification for this. In the case where our observed state does not
266 : // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
267 8 : if let Some(attached_at) = tenant_state.stably_attached() {
268 5 : compute_notifications.push((*tenant_shard_id, attached_at));
269 5 : }
270 : }
271 : }
272 :
273 363 : (locked.tenants.len(), nodes)
274 : };
275 :
276 : // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
277 : // generation_pageserver in the database.
278 :
279 : // Clean up any tenants that were found on pageservers but are not known to us.
280 363 : for (tenant_shard_id, node_id) in cleanup {
281 : // A node reported a tenant_shard_id which is unknown to us: detach it.
282 0 : let node = nodes
283 0 : .get(&node_id)
284 0 : .expect("Always exists: only known nodes are scanned");
285 0 :
286 0 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
287 0 : match client
288 0 : .location_config(
289 0 : tenant_shard_id,
290 0 : LocationConfig {
291 0 : mode: LocationConfigMode::Detached,
292 0 : generation: None,
293 0 : secondary_conf: None,
294 0 : shard_number: tenant_shard_id.shard_number.0,
295 0 : shard_count: tenant_shard_id.shard_count.0,
296 0 : shard_stripe_size: 0,
297 0 : tenant_conf: models::TenantConfig::default(),
298 0 : },
299 0 : None,
300 0 : )
301 0 : .await
302 : {
303 : Ok(()) => {
304 0 : tracing::info!(
305 0 : "Detached unknown shard {tenant_shard_id} on pageserver {node_id}"
306 0 : );
307 : }
308 0 : Err(e) => {
309 : // Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't
310 : // break anything.
311 0 : tracing::error!(
312 0 : "Failed to detach unknkown shard {tenant_shard_id} on pageserver {node_id}: {e}"
313 0 : );
314 : }
315 : }
316 : }
317 :
318 : // Emit compute hook notifications for all tenants which are already stably attached. Other tenants
319 : // will emit compute hook notifications when they reconcile.
320 : //
321 : // Ordering: we must complete these notification attempts before doing any other reconciliation for the
322 : // tenants named here, because otherwise our calls to notify() might race with more recent values
323 : // generated by reconciliation.
324 :
325 : // Compute notify is fallible. If it fails here, do not delay overall startup: set the
326 : // flag on these shards that they have a pending notification.
327 363 : let compute_hook = self.inner.read().unwrap().compute_hook.clone();
328 363 :
329 363 : // Construct an async stream of futures to invoke the compute notify function: we do this
330 363 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
331 363 : let stream = futures::stream::iter(compute_notifications.into_iter())
332 363 : .map(|(tenant_shard_id, node_id)| {
333 5 : let compute_hook = compute_hook.clone();
334 5 : let cancel = cancel.clone();
335 5 : async move {
336 5 : if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
337 0 : tracing::error!(
338 0 : tenant_shard_id=%tenant_shard_id,
339 0 : node_id=%node_id,
340 0 : "Failed to notify compute on startup for shard: {e}"
341 0 : );
342 0 : Some(tenant_shard_id)
343 : } else {
344 5 : None
345 : }
346 5 : }
347 363 : })
348 363 : .buffered(compute_hook::API_CONCURRENCY);
349 363 : let notify_results = stream.collect::<Vec<_>>().await;
350 :
351 : // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
352 : {
353 363 : let mut locked = self.inner.write().unwrap();
354 363 : for tenant_shard_id in notify_results.into_iter().flatten() {
355 0 : if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
356 0 : shard.pending_compute_notification = true;
357 0 : }
358 : }
359 : }
360 :
361 : // Finally, now that the service is up and running, launch reconcile operations for any tenants
362 : // which require it: under normal circumstances this should only include tenants that were in some
363 : // transient state before we restarted, or any tenants whose compute hooks failed above.
364 363 : let reconcile_tasks = self.reconcile_all();
365 : // We will not wait for these reconciliation tasks to run here: we're now done with startup and
366 : // normal operations may proceed.
367 :
368 363 : tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
369 363 : }
370 :
371 366 : pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
372 366 : let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
373 :
374 366 : tracing::info!("Loading nodes from database...");
375 366 : let nodes = persistence.list_nodes().await?;
376 366 : let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
377 366 : tracing::info!("Loaded {} nodes from database.", nodes.len());
378 :
379 366 : tracing::info!("Loading shards from database...");
380 728 : let tenant_shard_persistence = persistence.list_tenant_shards().await?;
381 366 : tracing::info!(
382 366 : "Loaded {} shards from database.",
383 366 : tenant_shard_persistence.len()
384 366 : );
385 :
386 366 : let mut tenants = BTreeMap::new();
387 :
388 378 : for tsp in tenant_shard_persistence {
389 12 : let tenant_shard_id = TenantShardId {
390 12 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
391 12 : shard_number: ShardNumber(tsp.shard_number as u8),
392 12 : shard_count: ShardCount(tsp.shard_count as u8),
393 : };
394 12 : let shard_identity = if tsp.shard_count == 0 {
395 12 : ShardIdentity::unsharded()
396 : } else {
397 0 : ShardIdentity::new(
398 0 : ShardNumber(tsp.shard_number as u8),
399 0 : ShardCount(tsp.shard_count as u8),
400 0 : ShardStripeSize(tsp.shard_stripe_size as u32),
401 0 : )?
402 : };
403 :
404 : // We will populate intent properly later in [`Self::startup_reconcile`]; initially we populate
405 : // it with what we can infer: the node for which a generation was most recently issued.
406 12 : let mut intent = IntentState::new();
407 12 : if tsp.generation_pageserver != i64::MAX {
408 11 : intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
409 1 : }
410 :
411 12 : let new_tenant = TenantState {
412 12 : tenant_shard_id,
413 12 : shard: shard_identity,
414 12 : sequence: Sequence::initial(),
415 12 : generation: Generation::new(tsp.generation as u32),
416 12 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
417 12 : intent,
418 12 : observed: ObservedState::new(),
419 12 : config: serde_json::from_str(&tsp.config).unwrap(),
420 12 : reconciler: None,
421 12 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
422 12 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
423 12 : last_error: Arc::default(),
424 12 : pending_compute_notification: false,
425 12 : };
426 12 :
427 12 : tenants.insert(tenant_shard_id, new_tenant);
428 : }
429 :
430 366 : let (startup_completion, startup_complete) = utils::completion::channel();
431 366 :
432 366 : let this = Arc::new(Self {
433 366 : inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
434 366 : config.clone(),
435 366 : result_tx,
436 366 : nodes,
437 366 : tenants,
438 366 : ))),
439 366 : config,
440 366 : persistence,
441 366 : startup_complete: startup_complete.clone(),
442 366 : });
443 366 :
444 366 : let result_task_this = this.clone();
445 366 : tokio::task::spawn(async move {
446 867 : while let Some(result) = result_rx.recv().await {
447 501 : tracing::info!(
448 501 : "Reconcile result for sequence {}, ok={}",
449 501 : result.sequence,
450 501 : result.result.is_ok()
451 501 : );
452 501 : let mut locked = result_task_this.inner.write().unwrap();
453 501 : let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
454 : // A reconciliation result might race with removing a tenant: drop results for
455 : // tenants that aren't in our map.
456 0 : continue;
457 : };
458 :
459 : // Usually generation should only be updated via this path, so the max() isn't
460 : // needed, but it is used to handle out-of-band updates via. e.g. test hook.
461 501 : tenant.generation = std::cmp::max(tenant.generation, result.generation);
462 501 :
463 501 : // If the reconciler signals that it failed to notify compute, set this state on
464 501 : // the shard so that a future [`TenantState::maybe_reconcile`] will try again.
465 501 : tenant.pending_compute_notification = result.pending_compute_notification;
466 501 :
467 501 : match result.result {
468 : Ok(()) => {
469 1009 : for (node_id, loc) in &result.observed.locations {
470 509 : if let Some(conf) = &loc.conf {
471 509 : tracing::info!(
472 509 : "Updating observed location {}: {:?}",
473 509 : node_id,
474 509 : conf
475 509 : );
476 : } else {
477 0 : tracing::info!("Setting observed location {} to None", node_id,)
478 : }
479 : }
480 500 : tenant.observed = result.observed;
481 500 : tenant.waiter.advance(result.sequence);
482 : }
483 1 : Err(e) => {
484 1 : tracing::warn!(
485 1 : "Reconcile error on tenant {}: {}",
486 1 : tenant.tenant_shard_id,
487 1 : e
488 1 : );
489 :
490 : // Ordering: populate last_error before advancing error_seq,
491 : // so that waiters will see the correct error after waiting.
492 1 : *(tenant.last_error.lock().unwrap()) = format!("{e}");
493 1 : tenant.error_waiter.advance(result.sequence);
494 :
495 2 : for (node_id, o) in result.observed.locations {
496 1 : tenant.observed.locations.insert(node_id, o);
497 1 : }
498 : }
499 : }
500 : }
501 366 : });
502 366 :
503 366 : let startup_reconcile_this = this.clone();
504 366 : tokio::task::spawn(async move {
505 366 : // Block the [`Service::startup_complete`] barrier until we're done
506 366 : let _completion = startup_completion;
507 366 :
508 366 : startup_reconcile_this.startup_reconcile().await
509 366 : });
510 366 :
511 366 : Ok(this)
512 366 : }
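     :
     : // Typical startup wiring (a sketch; the actual caller lives outside this file):
     : //
     : //   let service = Service::spawn(config, persistence).await?;
     : //   // HTTP handlers clone this Arc<Service>. Anything that mutates tenants should first
     : //   // wait for `service.startup_complete`, so it does not race with startup_reconcile().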
513 :
514 207 : pub(crate) async fn attach_hook(
515 207 : &self,
516 207 : attach_req: AttachHookRequest,
517 207 : ) -> anyhow::Result<AttachHookResponse> {
518 207 : // This is a test hook. To enable using it on tenants that were created directly with
519 207 : // the pageserver API (not via this service), we will auto-create any missing tenant
520 207 : // shards with default state.
521 207 : let insert = {
522 207 : let locked = self.inner.write().unwrap();
523 207 : !locked.tenants.contains_key(&attach_req.tenant_shard_id)
524 207 : };
525 207 : if insert {
526 15 : let tsp = TenantShardPersistence {
527 15 : tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
528 15 : shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
529 15 : shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
530 15 : shard_stripe_size: 0,
531 15 : generation: 0,
532 15 : generation_pageserver: i64::MAX,
533 15 : placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
534 15 : config: serde_json::to_string(&TenantConfig::default()).unwrap(),
535 15 : splitting: SplitState::default(),
536 15 : };
537 15 :
538 16 : match self.persistence.insert_tenant_shards(vec![tsp]).await {
539 2 : Err(e) => match e {
540 : DatabaseError::Query(diesel::result::Error::DatabaseError(
541 : DatabaseErrorKind::UniqueViolation,
542 : _,
543 : )) => {
544 2 : tracing::info!(
545 2 : "Raced with another request to insert tenant {}",
546 2 : attach_req.tenant_shard_id
547 2 : )
548 : }
549 0 : _ => return Err(e.into()),
550 : },
551 : Ok(()) => {
552 13 : tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
553 :
554 13 : let mut locked = self.inner.write().unwrap();
555 13 : locked.tenants.insert(
556 13 : attach_req.tenant_shard_id,
557 13 : TenantState::new(
558 13 : attach_req.tenant_shard_id,
559 13 : ShardIdentity::unsharded(),
560 13 : PlacementPolicy::Single,
561 13 : ),
562 13 : );
563 13 : tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
564 : }
565 : }
566 192 : }
567 :
568 207 : let new_generation = if let Some(req_node_id) = attach_req.node_id {
569 : Some(
570 200 : self.persistence
571 200 : .increment_generation(attach_req.tenant_shard_id, req_node_id)
572 216 : .await?,
573 : )
574 : } else {
575 7 : self.persistence.detach(attach_req.tenant_shard_id).await?;
576 7 : None
577 : };
578 :
579 207 : let mut locked = self.inner.write().unwrap();
580 207 : let tenant_state = locked
581 207 : .tenants
582 207 : .get_mut(&attach_req.tenant_shard_id)
583 207 : .expect("Checked for existence above");
584 :
585 207 : if let Some(new_generation) = new_generation {
586 200 : tenant_state.generation = new_generation;
587 200 : } else {
588 : // This is a detach notification. We must update placement policy to avoid re-attaching
589 : // during background scheduling/reconciliation, or during attachment service restart.
590 7 : assert!(attach_req.node_id.is_none());
591 7 : tenant_state.policy = PlacementPolicy::Detached;
592 : }
593 :
594 207 : if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
595 200 : tracing::info!(
596 200 : tenant_id = %attach_req.tenant_shard_id,
597 200 : ps_id = %attaching_pageserver,
598 200 : generation = ?tenant_state.generation,
599 200 : "issuing",
600 200 : );
601 7 : } else if let Some(ps_id) = tenant_state.intent.attached {
602 7 : tracing::info!(
603 7 : tenant_id = %attach_req.tenant_shard_id,
604 7 : %ps_id,
605 7 : generation = ?tenant_state.generation,
606 7 : "dropping",
607 7 : );
608 : } else {
609 0 : tracing::info!(
610 0 : tenant_id = %attach_req.tenant_shard_id,
611 0 : "no-op: tenant already has no pageserver");
612 : }
613 207 : tenant_state.intent.attached = attach_req.node_id;
614 :
615 207 : tracing::info!(
616 207 : "attach_hook: tenant {} set generation {:?}, pageserver {}",
617 207 : attach_req.tenant_shard_id,
618 207 : tenant_state.generation,
619 207 : // TODO: this is an odd number of 0xf's
620 207 : attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
621 207 : );
622 :
623 207 : Ok(AttachHookResponse {
624 207 : gen: attach_req
625 207 : .node_id
626 207 : .map(|_| tenant_state.generation.into().unwrap()),
627 207 : })
628 207 : }
629 :
630 73 : pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
631 73 : let locked = self.inner.read().unwrap();
632 73 :
633 73 : let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
634 73 :
635 73 : InspectResponse {
636 73 : attachment: tenant_state.and_then(|s| {
637 73 : s.intent
638 73 : .attached
639 73 : .map(|ps| (s.generation.into().unwrap(), ps))
640 73 : }),
641 73 : }
642 73 : }
643 :
644 624 : pub(crate) async fn re_attach(
645 624 : &self,
646 624 : reattach_req: ReAttachRequest,
647 624 : ) -> anyhow::Result<ReAttachResponse> {
648 : // Ordering: we must persist generation number updates before making them visible in the in-memory state
649 624 : let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
650 :
651 : // Apply the updated generation to our in-memory state
652 624 : let mut locked = self.inner.write().unwrap();
653 624 :
654 624 : let mut response = ReAttachResponse {
655 624 : tenants: Vec::new(),
656 624 : };
657 :
658 863 : for (tenant_shard_id, new_gen) in incremented_generations {
659 239 : response.tenants.push(ReAttachResponseTenant {
660 239 : id: tenant_shard_id,
661 239 : gen: new_gen.into().unwrap(),
662 239 : });
663 239 :
664 239 : // Apply the new generation number to our in-memory state
665 239 : let shard_state = locked.tenants.get_mut(&tenant_shard_id);
666 239 : let Some(shard_state) = shard_state else {
667 : // Not fatal. This edge case requires a re-attach to happen
668 : // between inserting a new tenant shard in to the database, and updating our in-memory
669 : // state to know about the shard, _and_ that the state inserted to the database referenced
670 : // a pageserver. Should never happen, but handle it rather than panicking, since it should
671 : // be harmless.
672 0 : tracing::error!(
673 0 : "Shard {} is in database for node {} but not in-memory state",
674 0 : tenant_shard_id,
675 0 : reattach_req.node_id
676 0 : );
677 0 : continue;
678 : };
679 :
680 239 : shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
681 :
682 : // TODO: cancel/restart any running reconciliation for this tenant, it might be trying
683 : // to call location_conf API with an old generation. Wait for cancellation to complete
684 : // before responding to this request. Requires well implemented CancellationToken logic
685 : // all the way to where we call location_conf. Even then, there can still be a location_conf
686 : // request in flight over the network: TODO handle that by making location_conf API refuse
687 : // to go backward in generations.
688 : }
689 624 : Ok(response)
690 624 : }
691 :
692 441 : pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
693 441 : let locked = self.inner.read().unwrap();
694 441 :
695 441 : let mut response = ValidateResponse {
696 441 : tenants: Vec::new(),
697 441 : };
698 :
699 970 : for req_tenant in validate_req.tenants {
700 529 : if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
701 528 : let valid = tenant_state.generation == Generation::new(req_tenant.gen);
702 528 : tracing::info!(
703 528 : "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
704 528 : req_tenant.id,
705 528 : req_tenant.gen,
706 528 : tenant_state.generation
707 528 : );
708 528 : response.tenants.push(ValidateResponseTenant {
709 528 : id: req_tenant.id,
710 528 : valid,
711 528 : });
712 1 : } else {
713 1 : // After tenant deletion, we may approve any validation. This avoids
714 1 : // spurious warnings on the pageserver if it has pending LSN updates
715 1 : // at the point a deletion happens.
716 1 : response.tenants.push(ValidateResponseTenant {
717 1 : id: req_tenant.id,
718 1 : valid: true,
719 1 : });
720 1 : }
721 : }
722 441 : response
723 441 : }
724 :
725 464 : pub(crate) async fn tenant_create(
726 464 : &self,
727 464 : create_req: TenantCreateRequest,
728 464 : ) -> Result<TenantCreateResponse, ApiError> {
729 : // Shard count 0 is valid: it means create a single shard (ShardCount(0) means "unsharded")
730 464 : let literal_shard_count = if create_req.shard_parameters.is_unsharded() {
731 446 : 1
732 : } else {
733 18 : create_req.shard_parameters.count.0
734 : };
735 :
736 : // This service expects to handle sharding itself: it is an error to try and directly create
737 : // a particular shard here.
738 464 : let tenant_id = if create_req.new_tenant_id.shard_count > ShardCount(1) {
739 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
740 0 : "Attempted to create a specific shard, this API is for creating the whole tenant"
741 0 : )));
742 : } else {
743 464 : create_req.new_tenant_id.tenant_id
744 : };
745 :
746 464 : tracing::info!(
747 464 : "Creating tenant {}, shard_count={:?}",
748 464 : create_req.new_tenant_id,
749 464 : create_req.shard_parameters.count,
750 464 : );
751 :
752 464 : let create_ids = (0..literal_shard_count)
753 492 : .map(|i| TenantShardId {
754 492 : tenant_id,
755 492 : shard_number: ShardNumber(i),
756 492 : shard_count: create_req.shard_parameters.count,
757 492 : })
758 464 : .collect::<Vec<_>>();
759 464 :
760 464 : // TODO: enable specifying this. Using Single as a default helps legacy tests to work (they
761 464 : // have no expectation of HA).
762 464 : let placement_policy: PlacementPolicy = PlacementPolicy::Single;
763 464 :
764 464 : // Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
765 464 : // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
766 464 : // during the creation, rather than risking leaving orphan objects in S3.
767 464 : let persist_tenant_shards = create_ids
768 464 : .iter()
769 492 : .map(|tenant_shard_id| TenantShardPersistence {
770 492 : tenant_id: tenant_shard_id.tenant_id.to_string(),
771 492 : shard_number: tenant_shard_id.shard_number.0 as i32,
772 492 : shard_count: tenant_shard_id.shard_count.0 as i32,
773 492 : shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
774 492 : generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
775 492 : generation_pageserver: i64::MAX,
776 492 : placement_policy: serde_json::to_string(&placement_policy).unwrap(),
777 492 : config: serde_json::to_string(&create_req.config).unwrap(),
778 492 : splitting: SplitState::default(),
779 492 : })
780 464 : .collect();
781 464 : self.persistence
782 464 : .insert_tenant_shards(persist_tenant_shards)
783 464 : .await
784 464 : .map_err(|e| {
785 0 : // TODO: distinguish primary key constraint (idempotent, OK), from other errors
786 0 : ApiError::InternalServerError(anyhow::anyhow!(e))
787 464 : })?;
788 :
789 464 : let (waiters, response_shards) = {
790 464 : let mut locked = self.inner.write().unwrap();
791 464 :
792 464 : let mut response_shards = Vec::new();
793 464 :
794 464 : let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
795 :
796 956 : for tenant_shard_id in create_ids {
797 492 : tracing::info!("Creating shard {tenant_shard_id}...");
798 :
799 : use std::collections::btree_map::Entry;
800 492 : match locked.tenants.entry(tenant_shard_id) {
801 0 : Entry::Occupied(mut entry) => {
802 0 : tracing::info!(
803 0 : "Tenant shard {tenant_shard_id} already exists while creating"
804 0 : );
805 :
806 : // TODO: schedule() should take an anti-affinity expression that pushes
807 : // attached and secondary locations (independently) away from those
808 : // pageservers also holding a shard for this tenant.
809 :
810 0 : entry.get_mut().schedule(&mut scheduler).map_err(|e| {
811 0 : ApiError::Conflict(format!(
812 0 : "Failed to schedule shard {tenant_shard_id}: {e}"
813 0 : ))
814 0 : })?;
815 :
816 0 : response_shards.push(TenantCreateResponseShard {
817 0 : shard_id: tenant_shard_id,
818 0 : node_id: entry
819 0 : .get()
820 0 : .intent
821 0 : .attached
822 0 : .expect("We just set pageserver if it was None"),
823 0 : generation: entry.get().generation.into().unwrap(),
824 0 : });
825 0 :
826 0 : continue;
827 : }
828 492 : Entry::Vacant(entry) => {
829 492 : let mut state = TenantState::new(
830 492 : tenant_shard_id,
831 492 : ShardIdentity::from_params(
832 492 : tenant_shard_id.shard_number,
833 492 : &create_req.shard_parameters,
834 492 : ),
835 492 : placement_policy.clone(),
836 492 : );
837 :
838 492 : if let Some(create_gen) = create_req.generation {
839 1 : state.generation = Generation::new(create_gen);
840 491 : }
841 492 : state.config = create_req.config.clone();
842 492 :
843 492 : state.schedule(&mut scheduler).map_err(|e| {
844 0 : ApiError::Conflict(format!(
845 0 : "Failed to schedule shard {tenant_shard_id}: {e}"
846 0 : ))
847 492 : })?;
848 :
849 492 : response_shards.push(TenantCreateResponseShard {
850 492 : shard_id: tenant_shard_id,
851 492 : node_id: state
852 492 : .intent
853 492 : .attached
854 492 : .expect("We just set pageserver if it was None"),
855 492 : generation: state.generation.into().unwrap(),
856 492 : });
857 492 : entry.insert(state)
858 : }
859 : };
860 : }
861 :
862 : // Take a snapshot of pageservers
863 464 : let pageservers = locked.nodes.clone();
864 464 :
865 464 : let result_tx = locked.result_tx.clone();
866 464 : let compute_hook = locked.compute_hook.clone();
867 464 :
868 464 : let waiters = locked
869 464 : .tenants
870 464 : .range_mut(TenantShardId::tenant_range(tenant_id))
871 492 : .filter_map(|(_shard_id, shard)| {
872 492 : shard.maybe_reconcile(
873 492 : result_tx.clone(),
874 492 : &pageservers,
875 492 : &compute_hook,
876 492 : &self.config,
877 492 : &self.persistence,
878 492 : )
879 492 : })
880 464 : .collect::<Vec<_>>();
881 464 : (waiters, response_shards)
882 464 : };
883 464 :
884 473 : self.await_waiters(waiters).await?;
885 :
886 463 : Ok(TenantCreateResponse {
887 463 : shards: response_shards,
888 463 : })
889 464 : }
890 :
891 : /// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
892 : /// wait for reconciliation to complete before responding.
893 465 : async fn await_waiters(
894 465 : &self,
895 465 : waiters: Vec<ReconcilerWaiter>,
896 465 : ) -> Result<(), ReconcileWaitError> {
897 465 : let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
898 956 : for waiter in waiters {
899 492 : let timeout = deadline.duration_since(Instant::now());
900 492 : waiter.wait_timeout(timeout).await?;
901 : }
902 :
903 464 : Ok(())
904 465 : }
905 :
906 : /// This API is used by the cloud control plane to do coarse-grained control of tenants:
907 : /// - Call with mode Attached* to upsert the tenant.
908 : /// - Call with mode Detached to switch to PolicyMode::Detached
909 : ///
910 : /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
911 : /// secondary locations.
912 2 : pub(crate) async fn tenant_location_config(
913 2 : &self,
914 2 : tenant_id: TenantId,
915 2 : req: TenantLocationConfigRequest,
916 2 : ) -> Result<TenantLocationConfigResponse, ApiError> {
917 2 : if req.tenant_id.shard_count.0 > 1 {
918 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
919 0 : "This API is for importing single-sharded or unsharded tenants"
920 0 : )));
921 2 : }
922 2 :
923 2 : let mut waiters = Vec::new();
924 2 : let mut result = TenantLocationConfigResponse { shards: Vec::new() };
925 2 : let maybe_create = {
926 2 : let mut locked = self.inner.write().unwrap();
927 2 : let result_tx = locked.result_tx.clone();
928 2 : let compute_hook = locked.compute_hook.clone();
929 2 : let pageservers = locked.nodes.clone();
930 2 :
931 2 : let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
932 2 :
933 2 : // Maybe we have existing shards
934 2 : let mut create = true;
935 2 : for (shard_id, shard) in locked
936 2 : .tenants
937 2 : .range_mut(TenantShardId::tenant_range(tenant_id))
938 : {
939 : // Saw an existing shard: this is not a creation
940 1 : create = false;
941 1 :
942 1 : // Note that for existing tenants we do _not_ respect the generation in the request: this is likely
943 1 : // to be stale. Once a tenant is created in this service, our view of generation is authoritative, and
944 1 : // callers' generations may be ignored. This represents a one-way migration of tenants from the outer
945 1 : // cloud control plane into this service.
946 1 :
947 1 : // Use location config mode as an indicator of policy: if they ask for
948 1 : // attached we go to default HA attached mode. If they ask for secondary
949 1 : // we go to secondary-only mode. If they ask for detached we detach.
950 1 : match req.config.mode {
951 0 : LocationConfigMode::Detached => {
952 0 : shard.policy = PlacementPolicy::Detached;
953 0 : }
954 : LocationConfigMode::Secondary => {
955 : // TODO: implement secondary-only mode.
956 0 : todo!();
957 : }
958 : LocationConfigMode::AttachedMulti
959 : | LocationConfigMode::AttachedSingle
960 : | LocationConfigMode::AttachedStale => {
961 : // TODO: persistence for changes in policy
962 1 : if pageservers.len() > 1 {
963 0 : shard.policy = PlacementPolicy::Double(1)
964 : } else {
965 : // Convenience for dev/test: if we just have one pageserver, import
966 : // tenants into Single mode so that scheduling will succeed.
967 1 : shard.policy = PlacementPolicy::Single
968 : }
969 : }
970 : }
971 :
972 1 : shard.schedule(&mut scheduler)?;
973 :
974 1 : let maybe_waiter = shard.maybe_reconcile(
975 1 : result_tx.clone(),
976 1 : &pageservers,
977 1 : &compute_hook,
978 1 : &self.config,
979 1 : &self.persistence,
980 1 : );
981 1 : if let Some(waiter) = maybe_waiter {
982 0 : waiters.push(waiter);
983 1 : }
984 :
985 1 : if let Some(node_id) = shard.intent.attached {
986 1 : result.shards.push(TenantShardLocation {
987 1 : shard_id: *shard_id,
988 1 : node_id,
989 1 : })
990 0 : }
991 : }
992 :
993 2 : if create {
994 : // Validate request mode
995 1 : match req.config.mode {
996 : LocationConfigMode::Detached | LocationConfigMode::Secondary => {
997 : // When using this API to onboard an existing tenant to this service, it must start in
998 : // an attached state, because we need the request to come with a generation
999 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1000 0 : "Imported tenant must be in attached mode"
1001 0 : )));
1002 : }
1003 :
1004 : LocationConfigMode::AttachedMulti
1005 : | LocationConfigMode::AttachedSingle
1006 1 : | LocationConfigMode::AttachedStale => {
1007 1 : // Pass
1008 1 : }
1009 : }
1010 :
1011 : // Validate request generation
1012 1 : let Some(generation) = req.config.generation else {
1013 : // We can only import attached tenants, because we need the request to come with a generation
1014 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1015 0 : "Generation is mandatory when importing tenant"
1016 0 : )));
1017 : };
1018 :
1019 : // Synthesize a creation request
1020 1 : Some(TenantCreateRequest {
1021 1 : new_tenant_id: TenantShardId::unsharded(tenant_id),
1022 1 : generation: Some(generation),
1023 1 : shard_parameters: ShardParameters {
1024 1 : // Must preserve the incoming shard_count do distinguish unsharded (0)
1025 1 : // from single-sharded (1): this distinction appears in the S3 keys of the tenant.
1026 1 : count: req.tenant_id.shard_count,
1027 1 : // We only import un-sharded or single-sharded tenants, so stripe
1028 1 : // size can be made up arbitrarily here.
1029 1 : stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
1030 1 : },
1031 1 : config: req.config.tenant_conf,
1032 1 : })
1033 : } else {
1034 1 : None
1035 : }
1036 : };
1037 :
1038 : // TODO: if we timeout/fail on reconcile, we should still succeed this request,
1039 : // because otherwise a broken compute hook causes a feedback loop where
1040 : // location_config returns 500 and gets retried forever.
1041 :
1042 2 : if let Some(create_req) = maybe_create {
1043 2 : let create_resp = self.tenant_create(create_req).await?;
1044 1 : result.shards = create_resp
1045 1 : .shards
1046 1 : .into_iter()
1047 1 : .map(|s| TenantShardLocation {
1048 1 : node_id: s.node_id,
1049 1 : shard_id: s.shard_id,
1050 1 : })
1051 1 : .collect();
1052 : } else {
1053 : // This was an update, wait for reconciliation
1054 1 : if let Err(e) = self.await_waiters(waiters).await {
1055 : // Do not treat a reconcile error as fatal: we have already applied any requested
1056 : // Intent changes, and the reconcile can fail for external reasons like unavailable
1057 : // compute notification API. In these cases, it is important that we do not
1058 : // cause the cloud control plane to retry forever on this API.
1059 0 : tracing::warn!(
1060 0 : "Failed to reconcile after /location_config: {e}, returning success anyway"
1061 0 : );
1062 1 : }
1063 : }
1064 :
1065 2 : Ok(result)
1066 2 : }
1067 :
1068 12 : pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
1069 : // TODO: refactor into helper
1070 12 : let targets = {
1071 12 : let locked = self.inner.read().unwrap();
1072 12 : let mut targets = Vec::new();
1073 :
1074 24 : for (tenant_shard_id, shard) in
1075 12 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1076 24 : {
1077 24 : let node_id = shard.intent.attached.ok_or_else(|| {
1078 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1079 24 : })?;
1080 24 : let node = locked
1081 24 : .nodes
1082 24 : .get(&node_id)
1083 24 : .expect("Pageservers may not be deleted while referenced");
1084 24 :
1085 24 : targets.push((*tenant_shard_id, node.clone()));
1086 : }
1087 12 : targets
1088 12 : };
1089 12 :
1090 12 : // TODO: error out if the tenant is not attached anywhere.
1091 12 :
1092 12 : // Phase 1: delete on the pageservers
1093 12 : let mut any_pending = false;
1094 36 : for (tenant_shard_id, node) in targets {
1095 24 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1096 : // TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
1097 : // surface immediately as an error to our caller.
1098 96 : let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
1099 0 : ApiError::InternalServerError(anyhow::anyhow!(
1100 0 : "Error deleting shard {tenant_shard_id} on node {}: {e}",
1101 0 : node.id
1102 0 : ))
1103 24 : })?;
1104 24 : tracing::info!(
1105 24 : "Shard {tenant_shard_id} on node {}, delete returned {}",
1106 24 : node.id,
1107 24 : status
1108 24 : );
1109 24 : if status == StatusCode::ACCEPTED {
1110 12 : any_pending = true;
1111 12 : }
1112 : }
1113 :
1114 12 : if any_pending {
1115 : // Caller should call us again later. When we eventually see 404s from
1116 : // all the shards, we may proceed to delete our records of the tenant.
1117 6 : tracing::info!(
1118 6 : "Tenant {} has some shards pending deletion, returning 202",
1119 6 : tenant_id
1120 6 : );
1121 6 : return Ok(StatusCode::ACCEPTED);
1122 6 : }
1123 6 :
1124 6 : // Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop
1125 6 : // our in-memory state and database state.
1126 6 :
1127 6 : // Ordering: we delete persistent state first: if we then
1128 6 : // crash, we will drop the in-memory state.
1129 6 :
1130 6 : // Drop persistent state.
1131 6 : self.persistence.delete_tenant(tenant_id).await?;
1132 :
1133 : // Drop in-memory state
1134 : {
1135 6 : let mut locked = self.inner.write().unwrap();
1136 6 : locked
1137 6 : .tenants
1138 42 : .retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
1139 6 : tracing::info!(
1140 6 : "Deleted tenant {tenant_id}, now have {} tenants",
1141 6 : locked.tenants.len()
1142 6 : );
1143 : };
1144 :
1145 : // Success is represented as 404, to imitate the existing pageserver deletion API
1146 6 : Ok(StatusCode::NOT_FOUND)
1147 12 : }
1148 :
1149 797 : pub(crate) async fn tenant_timeline_create(
1150 797 : &self,
1151 797 : tenant_id: TenantId,
1152 797 : mut create_req: TimelineCreateRequest,
1153 797 : ) -> Result<TimelineInfo, ApiError> {
1154 797 : let mut timeline_info = None;
1155 :
1156 797 : tracing::info!(
1157 797 : "Creating timeline {}/{}",
1158 797 : tenant_id,
1159 797 : create_req.new_timeline_id,
1160 797 : );
1161 :
1162 797 : self.ensure_attached_wait(tenant_id).await?;
1163 :
1164 : // TODO: refuse to do this if shard splitting is in progress
1165 : // (https://github.com/neondatabase/neon/issues/6676)
1166 797 : let targets = {
1167 797 : let locked = self.inner.read().unwrap();
1168 797 : let mut targets = Vec::new();
1169 :
1170 828 : for (tenant_shard_id, shard) in
1171 797 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1172 828 : {
1173 828 : let node_id = shard.intent.attached.ok_or_else(|| {
1174 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1175 828 : })?;
1176 828 : let node = locked
1177 828 : .nodes
1178 828 : .get(&node_id)
1179 828 : .expect("Pageservers may not be deleted while referenced");
1180 828 :
1181 828 : targets.push((*tenant_shard_id, node.clone()));
1182 : }
1183 797 : targets
1184 797 : };
1185 797 :
1186 797 : if targets.is_empty() {
1187 0 : return Err(ApiError::NotFound(
1188 0 : anyhow::anyhow!("Tenant not found").into(),
1189 0 : ));
1190 797 : }
1191 :
1192 1621 : for (tenant_shard_id, node) in targets {
1193 : // TODO: issue shard timeline creates in parallel, once the 0th is done.
1194 :
1195 828 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1196 :
1197 828 : tracing::info!(
1198 828 : "Creating timeline on shard {}/{}, attached to node {}",
1199 828 : tenant_shard_id,
1200 828 : create_req.new_timeline_id,
1201 828 : node.id
1202 828 : );
1203 :
1204 828 : let shard_timeline_info = client
1205 828 : .timeline_create(tenant_shard_id, &create_req)
1206 3275 : .await
1207 828 : .map_err(|e| match e {
1208 4 : mgmt_api::Error::ApiError(status, msg)
1209 4 : if status == StatusCode::INTERNAL_SERVER_ERROR
1210 4 : || status == StatusCode::NOT_ACCEPTABLE =>
1211 4 : {
1212 4 : // TODO: handle more error codes, e.g. 503 should be passed through. Make a general wrapper
1213 4 : // for pass-through API calls.
1214 4 : ApiError::InternalServerError(anyhow::anyhow!(msg))
1215 : }
1216 0 : _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
1217 828 : })?;
1218 :
1219 824 : if timeline_info.is_none() {
1220 : // If the caller specified an ancestor but no ancestor LSN, we are responsible for
1221 : // propagating the LSN chosen by the first shard to the other shards: it is important
1222 : // that all shards end up with the same ancestor_start_lsn.
1223 793 : if create_req.ancestor_timeline_id.is_some()
1224 242 : && create_req.ancestor_start_lsn.is_none()
1225 220 : {
1226 220 : create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn;
1227 573 : }
1228 :
1229 : // We will return the TimelineInfo from the first shard
1230 793 : timeline_info = Some(shard_timeline_info);
1231 31 : }
1232 : }
1233 793 : Ok(timeline_info.expect("targets cannot be empty"))
1234 797 : }
1235 :
1236 2 : pub(crate) async fn tenant_timeline_delete(
1237 2 : &self,
1238 2 : tenant_id: TenantId,
1239 2 : timeline_id: TimelineId,
1240 2 : ) -> Result<StatusCode, ApiError> {
1241 2 : tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
1242 :
1243 2 : self.ensure_attached_wait(tenant_id).await?;
1244 :
1245 : // TODO: refuse to do this if shard splitting is in progress
1246 : // (https://github.com/neondatabase/neon/issues/6676)
1247 2 : let targets = {
1248 2 : let locked = self.inner.read().unwrap();
1249 2 : let mut targets = Vec::new();
1250 :
1251 4 : for (tenant_shard_id, shard) in
1252 2 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1253 4 : {
1254 4 : let node_id = shard.intent.attached.ok_or_else(|| {
1255 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1256 4 : })?;
1257 4 : let node = locked
1258 4 : .nodes
1259 4 : .get(&node_id)
1260 4 : .expect("Pageservers may not be deleted while referenced");
1261 4 :
1262 4 : targets.push((*tenant_shard_id, node.clone()));
1263 : }
1264 2 : targets
1265 2 : };
1266 2 :
1267 2 : if targets.is_empty() {
1268 0 : return Err(ApiError::NotFound(
1269 0 : anyhow::anyhow!("Tenant not found").into(),
1270 0 : ));
1271 2 : }
1272 2 :
1273 2 : // TODO: call into shards concurrently
1274 2 : let mut any_pending = false;
1275 6 : for (tenant_shard_id, node) in targets {
1276 4 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1277 :
1278 4 : tracing::info!(
1279 4 : "Deleting timeline on shard {}/{}, attached to node {}",
1280 4 : tenant_shard_id,
1281 4 : timeline_id,
1282 4 : node.id
1283 4 : );
1284 :
1285 4 : let status = client
1286 4 : .timeline_delete(tenant_shard_id, timeline_id)
1287 16 : .await
1288 4 : .map_err(|e| {
1289 0 : ApiError::InternalServerError(anyhow::anyhow!(
1290 0 : "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
1291 0 : node.id
1292 0 : ))
1293 4 : })?;
1294 :
1295 4 : if status == StatusCode::ACCEPTED {
1296 2 : any_pending = true;
1297 2 : }
1298 : }
1299 :
1300 2 : if any_pending {
1301 1 : Ok(StatusCode::ACCEPTED)
1302 : } else {
1303 1 : Ok(StatusCode::NOT_FOUND)
1304 : }
1305 2 : }
1306 :
1307 : /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
1308 : /// function looks it up and returns the url. If the tenant isn't found, returns Err(ApiError::NotFound)
1309 11 : pub(crate) fn tenant_shard0_baseurl(
1310 11 : &self,
1311 11 : tenant_id: TenantId,
1312 11 : ) -> Result<(String, TenantShardId), ApiError> {
1313 11 : let locked = self.inner.read().unwrap();
1314 11 : let Some((tenant_shard_id, shard)) = locked
1315 11 : .tenants
1316 11 : .range(TenantShardId::tenant_range(tenant_id))
1317 11 : .next()
1318 : else {
1319 6 : return Err(ApiError::NotFound(
1320 6 : anyhow::anyhow!("Tenant {tenant_id} not found").into(),
1321 6 : ));
1322 : };
1323 :
1324 : // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
1325 : // point to somewhere we haven't attached yet.
1326 5 : let Some(node_id) = shard.intent.attached else {
1327 0 : return Err(ApiError::Conflict(
1328 0 : "Cannot call timeline API on non-attached tenant".to_string(),
1329 0 : ));
1330 : };
1331 :
1332 5 : let Some(node) = locked.nodes.get(&node_id) else {
1333 : // This should never happen
1334 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1335 0 : "Shard refers to nonexistent node"
1336 0 : )));
1337 : };
1338 :
1339 5 : Ok((node.base_url(), *tenant_shard_id))
1340 11 : }
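     :
     : // Illustrative use of the helper above (a sketch, mirroring the client pattern used
     : // elsewhere in this file):
     : //
     : //   let (base_url, tenant_shard_id) = self.tenant_shard0_baseurl(tenant_id)?;
     : //   let client = mgmt_api::Client::new(base_url, self.config.jwt_token.as_deref());
     : //   // ...then issue the shard0-scoped pageserver API call for `tenant_shard_id`.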
1341 :
1342 841 : pub(crate) fn tenant_locate(
1343 841 : &self,
1344 841 : tenant_id: TenantId,
1345 841 : ) -> Result<TenantLocateResponse, ApiError> {
1346 841 : let locked = self.inner.read().unwrap();
1347 841 : tracing::info!("Locating shards for tenant {tenant_id}");
1348 :
1349 : // Take a snapshot of pageservers
1350 841 : let pageservers = locked.nodes.clone();
1351 841 :
1352 841 : let mut result = Vec::new();
1353 841 : let mut shard_params: Option<ShardParameters> = None;
1354 :
1355 1047 : for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1356 : {
1357 1047 : let node_id = shard
1358 1047 : .intent
1359 1047 : .attached
1360 1047 : .ok_or(ApiError::BadRequest(anyhow::anyhow!(
1361 1047 : "Cannot locate a tenant that is not attached"
1362 1047 : )))?;
1363 :
1364 1047 : let node = pageservers
1365 1047 : .get(&node_id)
1366 1047 : .expect("Pageservers may not be deleted while referenced");
1367 1047 :
1368 1047 : result.push(TenantLocateResponseShard {
1369 1047 : shard_id: *tenant_shard_id,
1370 1047 : node_id,
1371 1047 : listen_http_addr: node.listen_http_addr.clone(),
1372 1047 : listen_http_port: node.listen_http_port,
1373 1047 : listen_pg_addr: node.listen_pg_addr.clone(),
1374 1047 : listen_pg_port: node.listen_pg_port,
1375 1047 : });
1376 1047 :
1377 1047 : match &shard_params {
1378 841 : None => {
1379 841 : shard_params = Some(ShardParameters {
1380 841 : stripe_size: shard.shard.stripe_size,
1381 841 : count: shard.shard.count,
1382 841 : });
1383 841 : }
1384 206 : Some(params) => {
1385 206 : if params.stripe_size != shard.shard.stripe_size {
1386 : // This should never happen. We enforce at runtime because it's simpler than
1387 : // adding an extra per-tenant data structure to store the things that should be the same
1388 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1389 0 : "Inconsistent shard stripe size parameters!"
1390 0 : )));
1391 206 : }
1392 : }
1393 : }
1394 : }
1395 :
1396 841 : if result.is_empty() {
1397 0 : return Err(ApiError::NotFound(
1398 0 : anyhow::anyhow!("No shards for this tenant ID found").into(),
1399 0 : ));
1400 841 : }
1401 841 : let shard_params = shard_params.expect("result is non-empty, therefore this is set");
1402 841 : tracing::info!(
1403 841 : "Located tenant {} with params {:?} on shards {}",
1404 841 : tenant_id,
1405 841 : shard_params,
1406 841 : result
1407 841 : .iter()
1408 1047 : .map(|s| format!("{:?}", s))
1409 841 : .collect::<Vec<_>>()
1410 841 : .join(",")
1411 841 : );
1412 :
1413 841 : Ok(TenantLocateResponse {
1414 841 : shards: result,
1415 841 : shard_params,
1416 841 : })
1417 841 : }
1418 :
1419 2 : pub(crate) async fn tenant_shard_split(
1420 2 : &self,
1421 2 : tenant_id: TenantId,
1422 2 : split_req: TenantShardSplitRequest,
1423 2 : ) -> Result<TenantShardSplitResponse, ApiError> {
1424 2 : let mut policy = None;
1425 2 : let mut shard_ident = None;
1426 2 :
1427 2 : // TODO: put a cancellation token on Service for clean shutdown
1428 2 : let cancel = CancellationToken::new();
1429 :
1430 : // A parent shard which will be split
1431 : struct SplitTarget {
1432 : parent_id: TenantShardId,
1433 : node: Node,
1434 : child_ids: Vec<TenantShardId>,
1435 : }
1436 :
1437 : // Validate input, and calculate which shards we will create
1438 2 : let (old_shard_count, targets, compute_hook) = {
1439 2 : let locked = self.inner.read().unwrap();
1440 2 :
1441 2 : let pageservers = locked.nodes.clone();
1442 2 :
1443 2 : let mut targets = Vec::new();
1444 2 :
1445 2 : // In case this is a retry, count how many already-split shards we found
1446 2 : let mut children_found = Vec::new();
1447 2 : let mut old_shard_count = None;
1448 :
1449 5 : for (tenant_shard_id, shard) in
1450 2 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1451 : {
1452 5 : match shard.shard.count.0.cmp(&split_req.new_shard_count) {
1453 : Ordering::Equal => {
1454 : // Already split this
1455 0 : children_found.push(*tenant_shard_id);
1456 0 : continue;
1457 : }
1458 : Ordering::Greater => {
1459 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1460 0 : "Requested count {} but already have shards at count {}",
1461 0 : split_req.new_shard_count,
1462 0 : shard.shard.count.0
1463 0 : )));
1464 : }
1465 5 : Ordering::Less => {
1466 5 : // Fall through: this shard has lower count than requested,
1467 5 : // is a candidate for splitting.
1468 5 : }
1469 5 : }
1470 5 :
1471 5 : match old_shard_count {
1472 2 : None => old_shard_count = Some(shard.shard.count),
1473 3 : Some(old_shard_count) => {
1474 3 : if old_shard_count != shard.shard.count {
1475 : // We may hit this case if a caller asked for two splits to
1476 : // different sizes, before the first one is complete.
1477 : // e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
1478 : // of shard_count=1 and shard_count=2 shards in the map.
1479 0 : return Err(ApiError::Conflict(
1480 0 : "Cannot split, currently mid-split".to_string(),
1481 0 : ));
1482 3 : }
1483 : }
1484 : }
1485 5 : if policy.is_none() {
1486 2 : policy = Some(shard.policy.clone());
1487 3 : }
1488 5 : if shard_ident.is_none() {
1489 2 : shard_ident = Some(shard.shard);
1490 3 : }
1491 :
1492 5 : if tenant_shard_id.shard_count == ShardCount(split_req.new_shard_count) {
1493 0 : tracing::info!(
1494 0 : "Tenant shard {} already has shard count {}",
1495 0 : tenant_shard_id,
1496 0 : split_req.new_shard_count
1497 0 : );
1498 0 : continue;
1499 5 : }
1500 :
1501 5 : let node_id =
1502 5 : shard
1503 5 : .intent
1504 5 : .attached
1505 5 : .ok_or(ApiError::BadRequest(anyhow::anyhow!(
1506 5 : "Cannot split a tenant that is not attached"
1507 5 : )))?;
1508 :
1509 5 : let node = pageservers
1510 5 : .get(&node_id)
1511 5 : .expect("Pageservers may not be deleted while referenced");
1512 5 :
1513 5 : // TODO: if any reconciliation is currently in progress for this shard, wait for it.
1514 5 :
1515 5 : targets.push(SplitTarget {
1516 5 : parent_id: *tenant_shard_id,
1517 5 : node: node.clone(),
1518 5 : child_ids: tenant_shard_id.split(ShardCount(split_req.new_shard_count)),
1519 5 : });
1520 : }
1521 :
1522 2 : if targets.is_empty() {
1523 0 : if children_found.len() == split_req.new_shard_count as usize {
1524 0 : return Ok(TenantShardSplitResponse {
1525 0 : new_shards: children_found,
1526 0 : });
1527 : } else {
1528 : // No shards found to split, and no existing children found: the
1529 : // tenant doesn't exist at all.
1530 0 : return Err(ApiError::NotFound(
1531 0 : anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
1532 0 : ));
1533 : }
1534 2 : }
1535 2 :
1536 2 : (old_shard_count, targets, locked.compute_hook.clone())
1537 2 : };
1538 2 :
1539 2 : // unwrap safety: we would have returned above if we didn't find at least one shard to split
1540 2 : let old_shard_count = old_shard_count.unwrap();
1541 2 : let shard_ident = shard_ident.unwrap();
1542 2 : let policy = policy.unwrap();
1543 2 :
1544 2 : // FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
1545 2 : // request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
1546 2 : // parent shards exist as expected, but it would be neater to do the above pre-checks within the
1547 2 : // same database transaction rather than pre-check in-memory and then maybe-fail the database write.
1548 2 : // (https://github.com/neondatabase/neon/issues/6676)
1549 2 :
1550 2 : // Before creating any new child shards in memory or on the pageservers, persist them: this
1551 2 : // enables us to ensure that we will always be able to clean up if something goes wrong. This also
1552 2 : // acts as the protection against two concurrent attempts to split: one of them will get a database
1553 2 : // error trying to insert the child shards.
1554 2 : let mut child_tsps = Vec::new();
1555 7 : for target in &targets {
1556 5 : let mut this_child_tsps = Vec::new();
1557 15 : for child in &target.child_ids {
1558 10 : let mut child_shard = shard_ident;
1559 10 : child_shard.number = child.shard_number;
1560 10 : child_shard.count = child.shard_count;
1561 10 :
1562 10 : this_child_tsps.push(TenantShardPersistence {
1563 10 : tenant_id: child.tenant_id.to_string(),
1564 10 : shard_number: child.shard_number.0 as i32,
1565 10 : shard_count: child.shard_count.0 as i32,
1566 10 : shard_stripe_size: shard_ident.stripe_size.0 as i32,
1567 10 : // Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
1568 10 : // populate the correct generation as part of its transaction, to protect us
1569 10 : // against racing with changes in the state of the parent.
1570 10 : generation: 0,
1571 10 : generation_pageserver: target.node.id.0 as i64,
1572 10 : placement_policy: serde_json::to_string(&policy).unwrap(),
1573 10 : // TODO: get the config out of the map
1574 10 : config: serde_json::to_string(&TenantConfig::default()).unwrap(),
1575 10 : splitting: SplitState::Splitting,
1576 10 : });
1577 10 : }
1578 :
1579 5 : child_tsps.push((target.parent_id, this_child_tsps));
1580 : }
1581 :
1582 2 : if let Err(e) = self
1583 2 : .persistence
1584 2 : .begin_shard_split(old_shard_count, tenant_id, child_tsps)
1585 2 : .await
1586 : {
1587 0 : match e {
1588 : DatabaseError::Query(diesel::result::Error::DatabaseError(
1589 : DatabaseErrorKind::UniqueViolation,
1590 : _,
1591 : )) => {
1592 : // Inserting a child shard violated a unique constraint: we raced with another call to
1593 : // this function
1594 0 : tracing::warn!("Conflicting attempt to split {tenant_id}: {e}");
1595 0 : return Err(ApiError::Conflict("Tenant is already splitting".into()));
1596 : }
1597 0 : _ => return Err(ApiError::InternalServerError(e.into())),
1598 : }
1599 2 : }
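// Illustrative interleaving of the race this guards against (a sketch, not a real trace):
//
//     caller A: begin_shard_split(..) -> Ok              (child rows inserted)
//     caller B: begin_shard_split(..) -> Err(UniqueViolation)
//                -> mapped above to 409 Conflict ("Tenant is already splitting")
//
// so at most one caller proceeds to issue the pageserver split calls below.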
1600 :
1601 : // FIXME: we have now committed the shard split state to the database, so any subsequent
1602 : // failure needs to roll it back. We will later wrap this function in logic to roll back
1603 : // the split if it fails.
1604 : // (https://github.com/neondatabase/neon/issues/6676)
1605 :
1606 : // TODO: issue split calls concurrently (this only matters once we're splitting
1607 : // N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
1608 :
1609 7 : for target in &targets {
1610 : let SplitTarget {
1611 5 : parent_id,
1612 5 : node,
1613 5 : child_ids,
1614 5 : } = target;
1615 5 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1616 5 : let response = client
1617 5 : .tenant_shard_split(
1618 5 : *parent_id,
1619 5 : TenantShardSplitRequest {
1620 5 : new_shard_count: split_req.new_shard_count,
1621 5 : },
1622 5 : )
1623 20 : .await
1624 5 : .map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
1625 :
1626 5 : tracing::info!(
1627 5 : "Split {} into {}",
1628 5 : parent_id,
1629 5 : response
1630 5 : .new_shards
1631 5 : .iter()
1632 10 : .map(|s| format!("{:?}", s))
1633 5 : .collect::<Vec<_>>()
1634 5 : .join(",")
1635 5 : );
1636 :
1637 5 : if &response.new_shards != child_ids {
1638 : // This should never happen: the pageserver should agree with us on how shard splits work.
1639 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1640 0 : "Splitting shard {} resulted in unexpected IDs: {:?} (expected {:?})",
1641 0 : parent_id,
1642 0 : response.new_shards,
1643 0 : child_ids
1644 0 : )));
1645 5 : }
1646 : }
1647 :
1648 : // TODO: if the pageserver restarted concurrently with our split API call,
1649 : // the actual generation of the child shards might differ from the generation
1650 : // we expect them to have. For our in-database generation to end up correct,
1651 : // we should carry the child generation back in the response and apply it here
1652 : // in complete_shard_split (and apply the correct generation in memory).
1653 : // Alternatively, we could carry the generation in the request and reject the
1654 : // request if it doesn't match, but that requires more retry logic on this side.
1655 :
1656 2 : self.persistence
1657 2 : .complete_shard_split(tenant_id, old_shard_count)
1658 2 : .await?;
1659 :
1660 : // Replace all the shards we just split with their children
1661 2 : let mut response = TenantShardSplitResponse {
1662 2 : new_shards: Vec::new(),
1663 2 : };
1664 2 : let mut child_locations = Vec::new();
1665 2 : {
1666 2 : let mut locked = self.inner.write().unwrap();
1667 7 : for target in targets {
1668 : let SplitTarget {
1669 5 : parent_id,
1670 5 : node: _node,
1671 5 : child_ids,
1672 5 : } = target;
1673 5 : let (pageserver, generation, config) = {
1674 5 : let old_state = locked
1675 5 : .tenants
1676 5 : .remove(&parent_id)
1677 5 : .expect("It was present, we just split it");
1678 5 : (
1679 5 : old_state.intent.attached.unwrap(),
1680 5 : old_state.generation,
1681 5 : old_state.config.clone(),
1682 5 : )
1683 5 : };
1684 5 :
1685 5 : locked.tenants.remove(&parent_id);
1686 :
1687 15 : for child in child_ids {
1688 10 : let mut child_shard = shard_ident;
1689 10 : child_shard.number = child.shard_number;
1690 10 : child_shard.count = child.shard_count;
1691 10 :
1692 10 : let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
1693 10 : child_observed.insert(
1694 10 : pageserver,
1695 10 : ObservedStateLocation {
1696 10 : conf: Some(attached_location_conf(generation, &child_shard, &config)),
1697 10 : },
1698 10 : );
1699 10 :
1700 10 : let mut child_state = TenantState::new(child, child_shard, policy.clone());
1701 10 : child_state.intent = IntentState::single(Some(pageserver));
1702 10 : child_state.observed = ObservedState {
1703 10 : locations: child_observed,
1704 10 : };
1705 10 : child_state.generation = generation;
1706 10 : child_state.config = config.clone();
1707 10 :
1708 10 : child_locations.push((child, pageserver));
1709 10 :
1710 10 : locked.tenants.insert(child, child_state);
1711 10 : response.new_shards.push(child);
1712 10 : }
1713 : }
1714 : }
1715 :
1716 : // Send compute notifications for all the new shards
1717 2 : let mut failed_notifications = Vec::new();
1718 12 : for (child_id, child_ps) in child_locations {
1719 10 : if let Err(e) = compute_hook.notify(child_id, child_ps, &cancel).await {
1720 0 : tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
1721 0 : child_id, child_ps);
1722 0 : failed_notifications.push(child_id);
1723 10 : }
1724 : }
1725 :
1726 : // If we failed any compute notifications, make a note to retry later.
1727 2 : if !failed_notifications.is_empty() {
1728 0 : let mut locked = self.inner.write().unwrap();
1729 0 : for failed in failed_notifications {
1730 0 : if let Some(shard) = locked.tenants.get_mut(&failed) {
1731 0 : shard.pending_compute_notification = true;
1732 0 : }
1733 : }
1734 2 : }
1735 :
1736 2 : Ok(response)
1737 2 : }
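// A hedged usage sketch (values are illustrative, not taken from this file): splitting an
// unsharded tenant with `new_shard_count: 4` should return the four child shard IDs:
//
//     let resp = service
//         .tenant_shard_split(tenant_id, TenantShardSplitRequest { new_shard_count: 4 })
//         .await?;
//     assert_eq!(resp.new_shards.len(), 4);
//
// Retrying the same request after it has already succeeded is handled above by returning
// the existing children instead of an error.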
1738 :
1739 4 : pub(crate) async fn tenant_shard_migrate(
1740 4 : &self,
1741 4 : tenant_shard_id: TenantShardId,
1742 4 : migrate_req: TenantShardMigrateRequest,
1743 4 : ) -> Result<TenantShardMigrateResponse, ApiError> {
1744 4 : let waiter = {
1745 4 : let mut locked = self.inner.write().unwrap();
1746 4 :
1747 4 : let result_tx = locked.result_tx.clone();
1748 4 : let pageservers = locked.nodes.clone();
1749 4 : let compute_hook = locked.compute_hook.clone();
1750 :
1751 4 : let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
1752 0 : return Err(ApiError::NotFound(
1753 0 : anyhow::anyhow!("Tenant shard not found").into(),
1754 0 : ));
1755 : };
1756 :
1757 4 : if shard.intent.attached == Some(migrate_req.node_id) {
1758 : // No-op case: we will still proceed to wait for reconciliation in case it is
1759 : // incomplete from an earlier update to the intent.
1760 0 : tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
1761 : } else {
1762 4 : let old_attached = shard.intent.attached;
1763 4 :
1764 4 : match shard.policy {
1765 4 : PlacementPolicy::Single => {
1766 4 : shard.intent.secondary.clear();
1767 4 : }
1768 0 : PlacementPolicy::Double(_n) => {
1769 0 : // If our new attached node was a secondary, it no longer should be.
1770 0 : shard.intent.secondary.retain(|s| s != &migrate_req.node_id);
1771 :
1772 : // If we were already attached to something, demote that to a secondary
1773 0 : if let Some(old_attached) = old_attached {
1774 0 : shard.intent.secondary.push(old_attached);
1775 0 : }
1776 : }
1777 : PlacementPolicy::Detached => {
1778 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1779 0 : "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first"
1780 0 : )))
1781 : }
1782 : }
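// Worked example (node names are illustrative): under PlacementPolicy::Double(1), migrating
// a shard whose intent is { attached: Some(A), secondary: [B] } to node B ends up, after
// the assignment below, as { attached: Some(B), secondary: [A] }: B is removed from the
// secondaries and the previously attached A is demoted to a secondary.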
1783 4 : shard.intent.attached = Some(migrate_req.node_id);
1784 :
1785 4 : tracing::info!("Migrating: new intent {:?}", shard.intent);
1786 4 : shard.sequence = shard.sequence.next();
1787 : }
1788 :
1789 4 : shard.maybe_reconcile(
1790 4 : result_tx,
1791 4 : &pageservers,
1792 4 : &compute_hook,
1793 4 : &self.config,
1794 4 : &self.persistence,
1795 4 : )
1796 : };
1797 :
1798 4 : if let Some(waiter) = waiter {
1799 4 : waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
1800 : } else {
1801 0 : tracing::warn!("Migration is a no-op");
1802 : }
1803 :
1804 4 : Ok(TenantShardMigrateResponse {})
1805 4 : }
1806 :
1807 : /// This is for debug/support only: we simply drop all state for a tenant, without
1808 : /// detaching or deleting it on pageservers.
1809 1 : pub(crate) async fn tenant_drop(&self, tenant_id: TenantId) -> Result<(), ApiError> {
1810 1 : self.persistence.delete_tenant(tenant_id).await?;
1811 :
1812 1 : let mut locked = self.inner.write().unwrap();
1813 1 : let mut shards = Vec::new();
1814 2 : for (tenant_shard_id, _) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
1815 2 : shards.push(*tenant_shard_id);
1816 2 : }
1817 :
1818 3 : for shard in shards {
1819 2 : locked.tenants.remove(&shard);
1820 2 : }
1821 :
1822 1 : Ok(())
1823 1 : }
1824 :
1825 : /// This is for debug/support only: we simply drop all state for a node, without
1826 : /// detaching or deleting anything on pageservers. We do not try to re-schedule any
1827 : /// tenants that were on this node.
1828 : ///
1829 : /// TODO: proper node deletion API that unhooks things more gracefully
1830 1 : pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
1831 1 : self.persistence.delete_node(node_id).await?;
1832 :
1833 1 : let mut locked = self.inner.write().unwrap();
1834 :
1835 3 : for shard in locked.tenants.values_mut() {
1836 3 : shard.deref_node(node_id);
1837 3 : }
1838 :
1839 1 : let mut nodes = (*locked.nodes).clone();
1840 1 : nodes.remove(&node_id);
1841 1 : locked.nodes = Arc::new(nodes);
1842 1 :
1843 1 : Ok(())
1844 1 : }
1845 :
1846 5 : pub(crate) async fn node_list(&self) -> Result<Vec<NodePersistence>, ApiError> {
1847 : // Rather than taking the big lock and converting Node into a serializable structure,
1848 : // it is more convenient to fetch the node list from storage.
1849 5 : let nodes = self
1850 5 : .persistence
1851 5 : .list_nodes()
1852 5 : .await?
1853 5 : .into_iter()
1854 10 : .map(|n| n.to_persistent())
1855 5 : .collect();
1856 5 :
1857 5 : Ok(nodes)
1858 5 : }
1859 :
1860 624 : pub(crate) async fn node_register(
1861 624 : &self,
1862 624 : register_req: NodeRegisterRequest,
1863 624 : ) -> Result<(), ApiError> {
1864 624 : // Pre-check for an already-existing node
1865 624 : {
1866 624 : let locked = self.inner.read().unwrap();
1867 624 : if let Some(node) = locked.nodes.get(®ister_req.node_id) {
1868 : // Note that we do not do a total equality of the struct, because we don't require
1869 : // the availability/scheduling states to agree for a POST to be idempotent.
1870 224 : if node.listen_http_addr == register_req.listen_http_addr
1871 224 : && node.listen_http_port == register_req.listen_http_port
1872 224 : && node.listen_pg_addr == register_req.listen_pg_addr
1873 224 : && node.listen_pg_port == register_req.listen_pg_port
1874 : {
1875 224 : tracing::info!(
1876 224 : "Node {} re-registered with matching address",
1877 224 : register_req.node_id
1878 224 : );
1879 224 : return Ok(());
1880 : } else {
1881 : // TODO: decide if we want to allow modifying node addresses without removing and re-adding
1882 : // the node. Safest/simplest thing is to refuse it, and usually we deploy with
1883 : // a fixed address through the lifetime of a node.
1884 0 : tracing::warn!(
1885 0 : "Node {} tried to register with different address",
1886 0 : register_req.node_id
1887 0 : );
1888 0 : return Err(ApiError::Conflict(
1889 0 : "Node is already registered with different address".to_string(),
1890 0 : ));
1891 : }
1892 400 : }
1893 400 : }
1894 400 :
1895 400 : // Ordering: we must persist the new node _before_ adding it to in-memory state.
1896 400 : // This ensures that before we use it for anything or expose it via any external
1897 400 : // API, it is guaranteed to be available after a restart.
1898 400 : let new_node = Node {
1899 400 : id: register_req.node_id,
1900 400 : listen_http_addr: register_req.listen_http_addr,
1901 400 : listen_http_port: register_req.listen_http_port,
1902 400 : listen_pg_addr: register_req.listen_pg_addr,
1903 400 : listen_pg_port: register_req.listen_pg_port,
1904 400 : scheduling: NodeSchedulingPolicy::Filling,
1905 400 : // TODO: we shouldn't really call this Active until we've heartbeated it.
1906 400 : availability: NodeAvailability::Active,
1907 400 : };
1908 400 : // TODO: idempotency if the node already exists in the database
1909 400 : self.persistence.insert_node(&new_node).await?;
1910 :
1911 400 : let mut locked = self.inner.write().unwrap();
1912 400 : let mut new_nodes = (*locked.nodes).clone();
1913 400 :
1914 400 : new_nodes.insert(register_req.node_id, new_node);
1915 400 :
1916 400 : locked.nodes = Arc::new(new_nodes);
1917 :
1918 400 : tracing::info!(
1919 400 : "Registered pageserver {}, now have {} pageservers",
1920 400 : register_req.node_id,
1921 400 : locked.nodes.len()
1922 400 : );
1923 400 : Ok(())
1924 624 : }
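// A hedged usage sketch of the registration contract (addresses and ports are made up):
// the first call inserts the node, identical retries are idempotent no-ops, and
// re-registering with a different address is refused with 409 Conflict.
//
//     service
//         .node_register(NodeRegisterRequest {
//             node_id: NodeId(1),
//             listen_http_addr: "pageserver-1.local".to_string(),
//             listen_http_port: 9898,
//             listen_pg_addr: "pageserver-1.local".to_string(),
//             listen_pg_port: 6400,
//         })
//         .await?;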
1925 :
1926 4 : pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
1927 4 : let mut locked = self.inner.write().unwrap();
1928 4 : let result_tx = locked.result_tx.clone();
1929 4 : let compute_hook = locked.compute_hook.clone();
1930 4 :
1931 4 : let mut new_nodes = (*locked.nodes).clone();
1932 :
1933 4 : let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
1934 0 : return Err(ApiError::NotFound(
1935 0 : anyhow::anyhow!("Node not registered").into(),
1936 0 : ));
1937 : };
1938 :
1939 4 : let mut offline_transition = false;
1940 4 : let mut active_transition = false;
1941 :
1942 4 : if let Some(availability) = &config_req.availability {
1943 3 : match (availability, &node.availability) {
1944 : (NodeAvailability::Offline, NodeAvailability::Active) => {
1945 2 : tracing::info!("Node {} transition to offline", config_req.node_id);
1946 2 : offline_transition = true;
1947 : }
1948 : (NodeAvailability::Active, NodeAvailability::Offline) => {
1949 1 : tracing::info!("Node {} transition to active", config_req.node_id);
1950 1 : active_transition = true;
1951 : }
1952 : _ => {
1953 0 : tracing::info!("Node {} no change during config", config_req.node_id);
1954 : // No change
1955 : }
1956 : };
1957 3 : node.availability = *availability;
1958 1 : }
1959 :
1960 4 : if let Some(scheduling) = config_req.scheduling {
1961 1 : node.scheduling = scheduling;
1962 1 :
1963 1 : // TODO: once we have a background scheduling ticker for fill/drain, kick it
1964 1 : // to wake up and start working.
1965 3 : }
1966 :
1967 4 : let new_nodes = Arc::new(new_nodes);
1968 4 :
1969 4 : let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes);
1970 4 : if offline_transition {
1971 13 : for (tenant_shard_id, tenant_state) in &mut locked.tenants {
1972 5 : if let Some(observed_loc) =
1973 13 : tenant_state.observed.locations.get_mut(&config_req.node_id)
1974 5 : {
1975 5 : // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
1976 5 : // not assume our knowledge of the node's configuration is accurate until it comes back online
1977 5 : observed_loc.conf = None;
1978 8 : }
1979 :
1980 13 : if tenant_state.intent.notify_offline(config_req.node_id) {
1981 5 : tenant_state.sequence = tenant_state.sequence.next();
1982 5 : match tenant_state.schedule(&mut scheduler) {
1983 0 : Err(e) => {
1984 0 : // It is possible that some tenants will become unschedulable when too many pageservers
1985 0 : // go offline: in this case there isn't much we can do other than make the issue observable.
1986 0 : // TODO: give TenantState a scheduling error attribute to be queried later.
1987 0 : tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
1988 : }
1989 5 : Ok(()) => {
1990 5 : tenant_state.maybe_reconcile(
1991 5 : result_tx.clone(),
1992 5 : &new_nodes,
1993 5 : &compute_hook,
1994 5 : &self.config,
1995 5 : &self.persistence,
1996 5 : );
1997 5 : }
1998 : }
1999 8 : }
2000 : }
2001 2 : }
2002 :
2003 4 : if active_transition {
2004 : // When a node comes back online, we must reconcile any tenant that has a None observed
2005 : // location on the node.
2006 12 : for tenant_state in locked.tenants.values_mut() {
2007 4 : if let Some(observed_loc) =
2008 12 : tenant_state.observed.locations.get_mut(&config_req.node_id)
2009 : {
2010 4 : if observed_loc.conf.is_none() {
2011 4 : tenant_state.maybe_reconcile(
2012 4 : result_tx.clone(),
2013 4 : &new_nodes,
2014 4 : &compute_hook,
2015 4 : &self.config,
2016 4 : &self.persistence,
2017 4 : );
2018 4 : }
2019 8 : }
2020 : }
2021 :
2022 : // TODO: in the background, we should balance work back onto this pageserver
2023 3 : }
2024 :
2025 4 : locked.nodes = new_nodes;
2026 4 :
2027 4 : Ok(())
2028 4 : }
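// A hedged usage sketch (the node ID is illustrative): marking a pageserver offline, which
// clears its observed locations and re-schedules/reconciles the shards that were on it.
//
//     service.node_configure(NodeConfigureRequest {
//         node_id: NodeId(1),
//         availability: Some(NodeAvailability::Offline),
//         scheduling: None,
//     })?;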
2029 :
2030 : /// Helper for methods that will try to call pageserver APIs for
2031 : /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
2032 : /// is attached somewhere.
2033 799 : fn ensure_attached_schedule(
2034 799 : &self,
2035 799 : mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
2036 799 : tenant_id: TenantId,
2037 799 : ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
2038 799 : let mut waiters = Vec::new();
2039 799 : let result_tx = locked.result_tx.clone();
2040 799 : let compute_hook = locked.compute_hook.clone();
2041 799 : let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
2042 799 : let pageservers = locked.nodes.clone();
2043 :
2044 832 : for (_tenant_shard_id, shard) in locked
2045 799 : .tenants
2046 799 : .range_mut(TenantShardId::tenant_range(tenant_id))
2047 : {
2048 832 : shard.schedule(&mut scheduler)?;
2049 :
2050 832 : if let Some(waiter) = shard.maybe_reconcile(
2051 832 : result_tx.clone(),
2052 832 : &pageservers,
2053 832 : &compute_hook,
2054 832 : &self.config,
2055 832 : &self.persistence,
2056 832 : ) {
2057 0 : waiters.push(waiter);
2058 832 : }
2059 : }
2060 799 : Ok(waiters)
2061 799 : }
2062 :
2063 799 : async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
2064 799 : let ensure_waiters = {
2065 799 : let locked = self.inner.write().unwrap();
2066 799 :
2067 799 : self.ensure_attached_schedule(locked, tenant_id)
2068 799 : .map_err(ApiError::InternalServerError)?
2069 : };
2070 :
2071 799 : let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
2072 799 : for waiter in ensure_waiters {
2073 0 : let timeout = deadline.duration_since(Instant::now());
2074 0 : waiter.wait_timeout(timeout).await?;
2075 : }
2076 :
2077 799 : Ok(())
2078 799 : }
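// A minimal sketch of the intended call pattern (the timeline call itself is assumed, not
// shown in this file): ensure the tenant is attached and reconciled before issuing
// per-tenant pageserver API calls such as timeline CRUD.
//
//     self.ensure_attached_wait(tenant_id).await?;
//     // ... now safe to send timeline requests to the attached locations.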
2079 :
2080 : /// Check all tenants for pending reconciliation work, and reconcile those in need
2081 : ///
2082 : /// Returns how many reconciliation tasks were started
2083 363 : fn reconcile_all(&self) -> usize {
2084 363 : let mut locked = self.inner.write().unwrap();
2085 363 : let result_tx = locked.result_tx.clone();
2086 363 : let compute_hook = locked.compute_hook.clone();
2087 363 : let pageservers = locked.nodes.clone();
2088 363 : locked
2089 363 : .tenants
2090 363 : .iter_mut()
2091 363 : .filter_map(|(_tenant_shard_id, shard)| {
2092 8 : shard.maybe_reconcile(
2093 8 : result_tx.clone(),
2094 8 : &pageservers,
2095 8 : &compute_hook,
2096 8 : &self.config,
2097 8 : &self.persistence,
2098 8 : )
2099 363 : })
2100 363 : .count()
2101 363 : }
2102 : }