Line data Source code
1 : use std::{
2 : collections::{BTreeMap, HashMap},
3 : str::FromStr,
4 : sync::Arc,
5 : time::{Duration, Instant},
6 : };
7 :
8 : use control_plane::attachment_service::{
9 : AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse, NodeAvailability,
10 : NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, TenantCreateResponse,
11 : TenantCreateResponseShard, TenantLocateResponse, TenantLocateResponseShard,
12 : TenantShardMigrateRequest, TenantShardMigrateResponse,
13 : };
14 : use diesel::result::DatabaseErrorKind;
15 : use futures::StreamExt;
16 : use hyper::StatusCode;
17 : use pageserver_api::{
18 : control_api::{
19 : ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
20 : ValidateResponse, ValidateResponseTenant,
21 : },
22 : models,
23 : models::{
24 : LocationConfig, LocationConfigMode, ShardParameters, TenantConfig, TenantCreateRequest,
25 : TenantLocationConfigRequest, TenantLocationConfigResponse, TenantShardLocation,
26 : TimelineCreateRequest, TimelineInfo,
27 : },
28 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
29 : };
30 : use pageserver_client::mgmt_api;
31 : use tokio_util::sync::CancellationToken;
32 : use utils::{
33 : completion::Barrier,
34 : generation::Generation,
35 : http::error::ApiError,
36 : id::{NodeId, TenantId, TimelineId},
37 : seqwait::SeqWait,
38 : };
39 :
40 : use crate::{
41 : compute_hook::{self, ComputeHook},
42 : node::Node,
43 : persistence::{DatabaseError, NodePersistence, Persistence, TenantShardPersistence},
44 : scheduler::Scheduler,
45 : tenant_state::{
46 : IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
47 : ReconcilerWaiter, TenantState,
48 : },
49 : PlacementPolicy, Sequence,
50 : };
51 :
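/// Upper bound on how long we will wait for a single reconciliation (e.g. a shard
/// migration) to complete before reporting a timeout to the caller.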
52 : const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
53 :
54 : /// How long [`Service::startup_reconcile`] is allowed to take before it should give
55 : /// up on unresponsive pageservers and proceed.
56 : pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
57 :
58 : // Top level state available to all HTTP handlers
59 : struct ServiceState {
60 : tenants: BTreeMap<TenantShardId, TenantState>,
61 :
62 : nodes: Arc<HashMap<NodeId, Node>>,
63 :
64 : compute_hook: Arc<ComputeHook>,
65 :
66 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
67 : }
68 :
69 : impl ServiceState {
70 361 : fn new(
71 361 : config: Config,
72 361 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
73 361 : nodes: HashMap<NodeId, Node>,
74 361 : tenants: BTreeMap<TenantShardId, TenantState>,
75 361 : ) -> Self {
76 361 : Self {
77 361 : tenants,
78 361 : nodes: Arc::new(nodes),
79 361 : compute_hook: Arc::new(ComputeHook::new(config)),
80 361 : result_tx,
81 361 : }
82 361 : }
83 : }
84 :
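/// Static configuration for the service.
///
/// A minimal construction sketch (assumption: a local/test deployment with no JWT auth
/// and no control plane URL, so the compute hook falls back to updating neon_local):
///
/// ```ignore
/// let config = Config {
///     jwt_token: None,
///     control_plane_jwt_token: None,
///     compute_hook_url: None,
/// };
/// ```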
85 854 : #[derive(Clone)]
86 : pub struct Config {
87 : // All pageservers managed by one instance of this service must have
88 : // the same public key. This JWT token will be used to authenticate
89 : // this service to the pageservers it manages.
90 : pub jwt_token: Option<String>,
91 :
92 : // This JWT token will be used to authenticate this service to the control plane.
93 : pub control_plane_jwt_token: Option<String>,
94 :
95 : /// Where the compute hook should send notifications of pageserver attachment locations
96 : /// (this URL points to the control plane in prod). If this is None, the compute hook will
97 : /// assume it is running in a test environment and try to update neon_local.
98 : pub compute_hook_url: Option<String>,
99 : }
100 :
101 : impl From<DatabaseError> for ApiError {
102 0 : fn from(err: DatabaseError) -> ApiError {
103 0 : match err {
104 0 : DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
105 : // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
106 0 : DatabaseError::Connection(_e) => ApiError::ShuttingDown,
107 0 : DatabaseError::Logical(reason) => {
108 0 : ApiError::InternalServerError(anyhow::anyhow!(reason))
109 : }
110 : }
111 0 : }
112 : }
113 :
114 : pub struct Service {
115 : inner: Arc<std::sync::RwLock<ServiceState>>,
116 : config: Config,
117 : persistence: Arc<Persistence>,
118 :
119 : /// This waits for initial reconciliation with pageservers to complete. Until this barrier
120 : /// passes, it isn't safe to do any actions that mutate tenants.
121 : pub(crate) startup_complete: Barrier,
122 : }
123 :
124 : impl From<ReconcileWaitError> for ApiError {
125 1 : fn from(value: ReconcileWaitError) -> Self {
126 1 : match value {
127 0 : ReconcileWaitError::Shutdown => ApiError::ShuttingDown,
128 0 : e @ ReconcileWaitError::Timeout(_) => ApiError::Timeout(format!("{e}").into()),
129 1 : e @ ReconcileWaitError::Failed(..) => ApiError::InternalServerError(anyhow::anyhow!(e)),
130 : }
131 1 : }
132 : }
133 :
134 : impl Service {
135 5 : pub fn get_config(&self) -> &Config {
136 5 : &self.config
137 5 : }
138 :
139 : /// TODO: don't allow other API calls until this is done, don't start doing any background housekeeping
140 : /// until this is done.
141 361 : async fn startup_reconcile(&self) {
142 361 : // For all tenant shards, a vector of observed states on nodes (where None means
143 361 : // indeterminate, same as in [`ObservedStateLocation`])
144 361 : let mut observed = HashMap::new();
145 361 :
146 361 : let nodes = {
147 361 : let locked = self.inner.read().unwrap();
148 361 : locked.nodes.clone()
149 : };
150 :
151 : // TODO: issue these requests concurrently
152 361 : for node in nodes.values() {
153 8 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
154 :
155 8 : tracing::info!("Scanning shards on node {}...", node.id);
156 23 : match client.list_location_config().await {
157 3 : Err(e) => {
158 3 : tracing::warn!("Could not contact pageserver {} ({e})", node.id);
159 : // TODO: be more tolerant, apply a generous 5-10 second timeout with retries, in case
160 : // pageserver is being restarted at the same time as we are
161 : }
162 5 : Ok(listing) => {
163 5 : tracing::info!(
164 5 : "Received {} shard statuses from pageserver {}, setting it to Active",
165 5 : listing.tenant_shards.len(),
166 5 : node.id
167 5 : );
168 :
169 9 : for (tenant_shard_id, conf_opt) in listing.tenant_shards {
170 4 : observed.insert(tenant_shard_id, (node.id, conf_opt));
171 4 : }
172 : }
173 : }
174 : }
175 :
176 361 : let mut cleanup = Vec::new();
177 361 :
178 361 : let mut compute_notifications = Vec::new();
179 :
180 : // Populate intent and observed states for all tenants, based on reported state on pageservers
181 361 : let shard_count = {
182 361 : let mut locked = self.inner.write().unwrap();
183 365 : for (tenant_shard_id, (node_id, observed_loc)) in observed {
184 4 : let Some(tenant_state) = locked.tenants.get_mut(&tenant_shard_id) else {
185 0 : cleanup.push((tenant_shard_id, node_id));
186 0 : continue;
187 : };
188 :
189 4 : tenant_state
190 4 : .observed
191 4 : .locations
192 4 : .insert(node_id, ObservedStateLocation { conf: observed_loc });
193 : }
194 :
195 : // Populate each tenant's intent state
196 361 : let mut scheduler = Scheduler::new(&locked.tenants, &nodes);
197 361 : for (tenant_shard_id, tenant_state) in locked.tenants.iter_mut() {
198 9 : tenant_state.intent_from_observed();
199 9 : if let Err(e) = tenant_state.schedule(&mut scheduler) {
200 : // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
201 : // not enough pageservers are available. The tenant may well still be available
202 : // to clients.
203 0 : tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
204 : } else {
205 : // If we're both intending and observed to be attached at a particular node, we will
206 : // emit a compute notification for this. In the case where our observed state does not
207 : // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
208 9 : if let Some(attached_at) = tenant_state.stably_attached() {
209 4 : compute_notifications.push((*tenant_shard_id, attached_at));
210 5 : }
211 : }
212 : }
213 :
214 361 : locked.tenants.len()
215 : };
216 :
217 : // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
218 : // generation_pageserver in the database.
219 :
220 : // Clean up any tenants that were found on pageservers but are not known to us.
221 361 : for (tenant_shard_id, node_id) in cleanup {
222 : // A node reported a tenant_shard_id which is unknown to us: detach it.
223 0 : let node = nodes
224 0 : .get(&node_id)
225 0 : .expect("Always exists: only known nodes are scanned");
226 0 :
227 0 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
228 0 : match client
229 0 : .location_config(
230 0 : tenant_shard_id,
231 0 : LocationConfig {
232 0 : mode: LocationConfigMode::Detached,
233 0 : generation: None,
234 0 : secondary_conf: None,
235 0 : shard_number: tenant_shard_id.shard_number.0,
236 0 : shard_count: tenant_shard_id.shard_count.0,
237 0 : shard_stripe_size: 0,
238 0 : tenant_conf: models::TenantConfig::default(),
239 0 : },
240 0 : None,
241 0 : )
242 0 : .await
243 : {
244 : Ok(()) => {
245 0 : tracing::info!(
246 0 : "Detached unknown shard {tenant_shard_id} on pageserver {node_id}"
247 0 : );
248 : }
249 0 : Err(e) => {
250 : // Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't
251 : // break anything.
252 0 : tracing::error!(
253 0 : "Failed to detach unknkown shard {tenant_shard_id} on pageserver {node_id}: {e}"
254 0 : );
255 : }
256 : }
257 : }
258 :
259 : // Emit compute hook notifications for all tenants which are already stably attached. Other tenants
260 : // will emit compute hook notifications when they reconcile.
261 : //
262 : // Ordering: we must complete these notification attempts before doing any other reconciliation for the
263 : // tenants named here, because otherwise our calls to notify() might race with more recent values
264 : // generated by reconciliation.
265 :
266 : // Compute notify is fallible. If it fails here, do not delay overall startup: set the
267 : // flag on these shards that they have a pending notification.
268 361 : let compute_hook = self.inner.read().unwrap().compute_hook.clone();
269 361 :
270 361 : // Construct an async stream of futures to invoke the compute notify function: we do this
271 361 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
272 361 : let stream = futures::stream::iter(compute_notifications.into_iter())
273 361 : .map(|(tenant_shard_id, node_id)| {
274 4 : let compute_hook = compute_hook.clone();
275 4 : async move {
276 4 : // TODO: give Service a cancellation token for clean shutdown
277 4 : let cancel = CancellationToken::new();
278 4 : if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
279 0 : tracing::error!(
280 0 : tenant_shard_id=%tenant_shard_id,
281 0 : node_id=%node_id,
282 0 : "Failed to notify compute on startup for shard: {e}"
283 0 : );
284 0 : Some(tenant_shard_id)
285 : } else {
286 4 : None
287 : }
288 4 : }
289 361 : })
290 361 : .buffered(compute_hook::API_CONCURRENCY);
291 361 : let notify_results = stream.collect::<Vec<_>>().await;
292 :
293 : // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
294 : {
295 361 : let mut locked = self.inner.write().unwrap();
296 361 : for tenant_shard_id in notify_results.into_iter().flatten() {
297 0 : if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
298 0 : shard.pending_compute_notification = true;
299 0 : }
300 : }
301 : }
302 :
303 : // Finally, now that the service is up and running, launch reconcile operations for any tenants
304 : // which require it: under normal circumstances this should only include tenants that were in some
305 : // transient state before we restarted, or any tenants whose compute hooks failed above.
306 361 : let reconcile_tasks = self.reconcile_all();
307 : // We will not wait for these reconciliation tasks to run here: we're now done with startup and
308 : // normal operations may proceed.
309 :
310 361 : tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
311 361 : }
312 :
313 361 : pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
314 361 : let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
315 :
316 361 : tracing::info!("Loading nodes from database...");
317 361 : let nodes = persistence.list_nodes().await?;
318 361 : let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
319 361 : tracing::info!("Loaded {} nodes from database.", nodes.len());
320 :
321 361 : tracing::info!("Loading shards from database...");
322 716 : let tenant_shard_persistence = persistence.list_tenant_shards().await?;
323 361 : tracing::info!(
324 361 : "Loaded {} shards from database.",
325 361 : tenant_shard_persistence.len()
326 361 : );
327 :
328 361 : let mut tenants = BTreeMap::new();
329 :
330 370 : for tsp in tenant_shard_persistence {
331 9 : let tenant_shard_id = TenantShardId {
332 9 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
333 9 : shard_number: ShardNumber(tsp.shard_number as u8),
334 9 : shard_count: ShardCount(tsp.shard_count as u8),
335 : };
336 9 : let shard_identity = if tsp.shard_count == 0 {
337 9 : ShardIdentity::unsharded()
338 : } else {
339 0 : ShardIdentity::new(
340 0 : ShardNumber(tsp.shard_number as u8),
341 0 : ShardCount(tsp.shard_count as u8),
342 0 : ShardStripeSize(tsp.shard_stripe_size as u32),
343 0 : )?
344 : };
345 :
346 : // We will populate intent properly later in [`Self::startup_reconcile`], initially populate
347 : // it with what we can infer: the node for which a generation was most recently issued.
348 9 : let mut intent = IntentState::new();
349 9 : if tsp.generation_pageserver != i64::MAX {
350 8 : intent.attached = Some(NodeId(tsp.generation_pageserver as u64))
351 1 : }
352 :
353 9 : let new_tenant = TenantState {
354 9 : tenant_shard_id,
355 9 : shard: shard_identity,
356 9 : sequence: Sequence::initial(),
357 9 : generation: Generation::new(tsp.generation as u32),
358 9 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
359 9 : intent,
360 9 : observed: ObservedState::new(),
361 9 : config: serde_json::from_str(&tsp.config).unwrap(),
362 9 : reconciler: None,
363 9 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
364 9 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
365 9 : last_error: Arc::default(),
366 9 : pending_compute_notification: false,
367 9 : };
368 9 :
369 9 : tenants.insert(tenant_shard_id, new_tenant);
370 : }
371 :
372 361 : let (startup_completion, startup_complete) = utils::completion::channel();
373 361 :
374 361 : let this = Arc::new(Self {
375 361 : inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
376 361 : config.clone(),
377 361 : result_tx,
378 361 : nodes,
379 361 : tenants,
380 361 : ))),
381 361 : config,
382 361 : persistence,
383 361 : startup_complete,
384 361 : });
385 361 :
386 361 : let result_task_this = this.clone();
387 361 : tokio::task::spawn(async move {
388 852 : while let Some(result) = result_rx.recv().await {
389 491 : tracing::info!(
390 491 : "Reconcile result for sequence {}, ok={}",
391 491 : result.sequence,
392 491 : result.result.is_ok()
393 491 : );
394 491 : let mut locked = result_task_this.inner.write().unwrap();
395 491 : let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
396 : // A reconciliation result might race with removing a tenant: drop results for
397 : // tenants that aren't in our map.
398 0 : continue;
399 : };
400 :
401 : // Usually generation should only be updated via this path, so the max() isn't
402 : // needed, but it is used to handle out-of-band updates, e.g. via the test hook.
403 491 : tenant.generation = std::cmp::max(tenant.generation, result.generation);
404 491 :
405 491 : // If the reconciler signals that it failed to notify compute, set this state on
406 491 : // the shard so that a future [`TenantState::maybe_reconcile`] will try again.
407 491 : tenant.pending_compute_notification = result.pending_compute_notification;
408 491 :
409 491 : match result.result {
410 : Ok(()) => {
411 981 : for (node_id, loc) in &result.observed.locations {
412 493 : if let Some(conf) = &loc.conf {
413 493 : tracing::info!(
414 493 : "Updating observed location {}: {:?}",
415 493 : node_id,
416 493 : conf
417 493 : );
418 : } else {
419 0 : tracing::info!("Setting observed location {} to None", node_id,)
420 : }
421 : }
422 488 : tenant.observed = result.observed;
423 488 : tenant.waiter.advance(result.sequence);
424 : }
425 3 : Err(e) => {
426 3 : tracing::warn!(
427 3 : "Reconcile error on tenant {}: {}",
428 3 : tenant.tenant_shard_id,
429 3 : e
430 3 : );
431 :
432 : // Ordering: populate last_error before advancing error_seq,
433 : // so that waiters will see the correct error after waiting.
434 3 : *(tenant.last_error.lock().unwrap()) = format!("{e}");
435 3 : tenant.error_waiter.advance(result.sequence);
436 :
437 6 : for (node_id, o) in result.observed.locations {
438 3 : tenant.observed.locations.insert(node_id, o);
439 3 : }
440 : }
441 : }
442 : }
443 361 : });
444 361 :
445 361 : let startup_reconcile_this = this.clone();
446 361 : tokio::task::spawn(async move {
447 361 : // Block the [`Service::startup_complete`] barrier until we're done
448 361 : let _completion = startup_completion;
449 361 :
450 361 : startup_reconcile_this.startup_reconcile().await
451 361 : });
452 361 :
453 361 : Ok(this)
454 361 : }
455 :
456 212 : pub(crate) async fn attach_hook(
457 212 : &self,
458 212 : attach_req: AttachHookRequest,
459 212 : ) -> anyhow::Result<AttachHookResponse> {
460 212 : // This is a test hook. To enable using it on tenants that were created directly with
461 212 : // the pageserver API (not via this service), we will auto-create any missing tenant
462 212 : // shards with default state.
463 212 : let insert = {
464 212 : let locked = self.inner.write().unwrap();
465 212 : !locked.tenants.contains_key(&attach_req.tenant_shard_id)
466 212 : };
467 212 : if insert {
468 13 : let tsp = TenantShardPersistence {
469 13 : tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
470 13 : shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
471 13 : shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
472 13 : shard_stripe_size: 0,
473 13 : generation: 0,
474 13 : generation_pageserver: i64::MAX,
475 13 : placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
476 13 : config: serde_json::to_string(&TenantConfig::default()).unwrap(),
477 13 : };
478 13 :
479 13 : match self.persistence.insert_tenant_shards(vec![tsp]).await {
480 0 : Err(e) => match e {
481 : DatabaseError::Query(diesel::result::Error::DatabaseError(
482 : DatabaseErrorKind::UniqueViolation,
483 : _,
484 : )) => {
485 0 : tracing::info!(
486 0 : "Raced with another request to insert tenant {}",
487 0 : attach_req.tenant_shard_id
488 0 : )
489 : }
490 0 : _ => return Err(e.into()),
491 : },
492 : Ok(()) => {
493 13 : tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
494 :
495 13 : let mut locked = self.inner.write().unwrap();
496 13 : locked.tenants.insert(
497 13 : attach_req.tenant_shard_id,
498 13 : TenantState::new(
499 13 : attach_req.tenant_shard_id,
500 13 : ShardIdentity::unsharded(),
501 13 : PlacementPolicy::Single,
502 13 : ),
503 13 : );
504 13 : tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
505 : }
506 : }
507 199 : }
508 :
509 212 : let new_generation = if let Some(req_node_id) = attach_req.node_id {
510 : Some(
511 205 : self.persistence
512 205 : .increment_generation(attach_req.tenant_shard_id, req_node_id)
513 216 : .await?,
514 : )
515 : } else {
516 7 : self.persistence.detach(attach_req.tenant_shard_id).await?;
517 7 : None
518 : };
519 :
520 212 : let mut locked = self.inner.write().unwrap();
521 212 : let tenant_state = locked
522 212 : .tenants
523 212 : .get_mut(&attach_req.tenant_shard_id)
524 212 : .expect("Checked for existence above");
525 :
526 212 : if let Some(new_generation) = new_generation {
527 205 : tenant_state.generation = new_generation;
528 205 : } else {
529 : // This is a detach notification. We must update placement policy to avoid re-attaching
530 : // during background scheduling/reconciliation, or during attachment service restart.
531 7 : assert!(attach_req.node_id.is_none());
532 7 : tenant_state.policy = PlacementPolicy::Detached;
533 : }
534 :
535 212 : if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
536 205 : tracing::info!(
537 205 : tenant_id = %attach_req.tenant_shard_id,
538 205 : ps_id = %attaching_pageserver,
539 205 : generation = ?tenant_state.generation,
540 205 : "issuing",
541 205 : );
542 7 : } else if let Some(ps_id) = tenant_state.intent.attached {
543 7 : tracing::info!(
544 7 : tenant_id = %attach_req.tenant_shard_id,
545 7 : %ps_id,
546 7 : generation = ?tenant_state.generation,
547 7 : "dropping",
548 7 : );
549 : } else {
550 0 : tracing::info!(
551 0 : tenant_id = %attach_req.tenant_shard_id,
552 0 : "no-op: tenant already has no pageserver");
553 : }
554 212 : tenant_state.intent.attached = attach_req.node_id;
555 :
556 212 : tracing::info!(
557 212 : "attach_hook: tenant {} set generation {:?}, pageserver {}",
558 212 : attach_req.tenant_shard_id,
559 212 : tenant_state.generation,
560 212 : // TODO: this is an odd number of 0xf's
561 212 : attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
562 212 : );
563 :
564 212 : Ok(AttachHookResponse {
565 212 : gen: attach_req
566 212 : .node_id
567 212 : .map(|_| tenant_state.generation.into().unwrap()),
568 212 : })
569 212 : }
570 :
571 72 : pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
572 72 : let locked = self.inner.read().unwrap();
573 72 :
574 72 : let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
575 72 :
576 72 : InspectResponse {
577 72 : attachment: tenant_state.and_then(|s| {
578 72 : s.intent
579 72 : .attached
580 72 : .map(|ps| (s.generation.into().unwrap(), ps))
581 72 : }),
582 72 : }
583 72 : }
584 :
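/// Handle a pageserver's re-attach request after restart: generation increments are
/// persisted to the database first and only then applied to in-memory state, so a
/// generation never becomes visible before it is durable.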
585 603 : pub(crate) async fn re_attach(
586 603 : &self,
587 603 : reattach_req: ReAttachRequest,
588 603 : ) -> anyhow::Result<ReAttachResponse> {
589 : // Ordering: we must persist generation number updates before making them visible in the in-memory state
590 603 : let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
591 :
592 : // Apply the updated generation to our in-memory state
593 603 : let mut locked = self.inner.write().unwrap();
594 603 :
595 603 : let mut response = ReAttachResponse {
596 603 : tenants: Vec::new(),
597 603 : };
598 :
599 830 : for (tenant_shard_id, new_gen) in incremented_generations {
600 227 : response.tenants.push(ReAttachResponseTenant {
601 227 : id: tenant_shard_id,
602 227 : gen: new_gen.into().unwrap(),
603 227 : });
604 227 :
605 227 : // Apply the new generation number to our in-memory state
606 227 : let shard_state = locked.tenants.get_mut(&tenant_shard_id);
607 227 : let Some(shard_state) = shard_state else {
608 : // Not fatal. This edge case requires a re-attach to happen
609 : // between inserting a new tenant shard in to the database, and updating our in-memory
610 : // state to know about the shard, _and_ that the state inserted to the database referenced
611 : // a pageserver. Should never happen, but handle it rather than panicking, since it should
612 : // be harmless.
613 0 : tracing::error!(
614 0 : "Shard {} is in database for node {} but not in-memory state",
615 0 : tenant_shard_id,
616 0 : reattach_req.node_id
617 0 : );
618 0 : continue;
619 : };
620 :
621 227 : shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
622 :
623 : // TODO: cancel/restart any running reconciliation for this tenant, it might be trying
624 : // to call location_conf API with an old generation. Wait for cancellation to complete
625 : // before responding to this request. Requires well implemented CancellationToken logic
626 : // all the way to where we call location_conf. Even then, there can still be a location_conf
627 : // request in flight over the network: TODO handle that by making location_conf API refuse
628 : // to go backward in generations.
629 : }
630 603 : Ok(response)
631 603 : }
632 :
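/// Validate generation numbers reported by a pageserver: a tenant is valid only if the
/// reported generation matches the latest one we have issued. Tenants unknown to us
/// (e.g. just deleted) are reported as valid, to avoid spurious warnings on the
/// pageserver.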
633 411 : pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
634 411 : let locked = self.inner.read().unwrap();
635 411 :
636 411 : let mut response = ValidateResponse {
637 411 : tenants: Vec::new(),
638 411 : };
639 :
640 909 : for req_tenant in validate_req.tenants {
641 498 : if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
642 497 : let valid = tenant_state.generation == Generation::new(req_tenant.gen);
643 497 : tracing::info!(
644 497 : "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
645 497 : req_tenant.id,
646 497 : req_tenant.gen,
647 497 : tenant_state.generation
648 497 : );
649 497 : response.tenants.push(ValidateResponseTenant {
650 497 : id: req_tenant.id,
651 497 : valid,
652 497 : });
653 1 : } else {
654 1 : // After tenant deletion, we may approve any validation. This avoids
655 1 : // spurious warnings on the pageserver if it has pending LSN updates
656 1 : // at the point a deletion happens.
657 1 : response.tenants.push(ValidateResponseTenant {
658 1 : id: req_tenant.id,
659 1 : valid: true,
660 1 : });
661 1 : }
662 : }
663 411 : response
664 411 : }
665 :
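/// Create a tenant and all of its shards. Shard records are persisted to the database
/// before anything is created on pageservers, so a failure part-way through can be
/// cleaned up by issuing a tenant deletion. Each shard is scheduled (currently with
/// [`PlacementPolicy::Single`]) and the call waits, with a bounded timeout, for the
/// initial reconciliation of every shard before responding.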
666 460 : pub(crate) async fn tenant_create(
667 460 : &self,
668 460 : create_req: TenantCreateRequest,
669 460 : ) -> Result<TenantCreateResponse, ApiError> {
670 : // Shard count 0 is valid: it means create a single shard (ShardCount(0) means "unsharded")
671 460 : let literal_shard_count = if create_req.shard_parameters.is_unsharded() {
672 444 : 1
673 : } else {
674 16 : create_req.shard_parameters.count.0
675 : };
676 :
677 : // This service expects to handle sharding itself: it is an error to try and directly create
678 : // a particular shard here.
679 460 : let tenant_id = if create_req.new_tenant_id.shard_count > ShardCount(1) {
680 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
681 0 : "Attempted to create a specific shard, this API is for creating the whole tenant"
682 0 : )));
683 : } else {
684 460 : create_req.new_tenant_id.tenant_id
685 : };
686 :
687 460 : tracing::info!(
688 460 : "Creating tenant {}, shard_count={:?}",
689 460 : create_req.new_tenant_id,
690 460 : create_req.shard_parameters.count,
691 460 : );
692 :
693 460 : let create_ids = (0..literal_shard_count)
694 484 : .map(|i| TenantShardId {
695 484 : tenant_id,
696 484 : shard_number: ShardNumber(i),
697 484 : shard_count: create_req.shard_parameters.count,
698 484 : })
699 460 : .collect::<Vec<_>>();
700 460 :
701 460 : // TODO: enable specifying this. Using Single as a default helps legacy tests to work (they
702 460 : // have no expectation of HA).
703 460 : let placement_policy: PlacementPolicy = PlacementPolicy::Single;
704 460 :
705 460 : // Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
706 460 : // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
707 460 : // during the creation, rather than risking leaving orphan objects in S3.
708 460 : let persist_tenant_shards = create_ids
709 460 : .iter()
710 484 : .map(|tenant_shard_id| TenantShardPersistence {
711 484 : tenant_id: tenant_shard_id.tenant_id.to_string(),
712 484 : shard_number: tenant_shard_id.shard_number.0 as i32,
713 484 : shard_count: tenant_shard_id.shard_count.0 as i32,
714 484 : shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
715 484 : generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
716 484 : generation_pageserver: i64::MAX,
717 484 : placement_policy: serde_json::to_string(&placement_policy).unwrap(),
718 484 : config: serde_json::to_string(&create_req.config).unwrap(),
719 484 : })
720 460 : .collect();
721 460 : self.persistence
722 460 : .insert_tenant_shards(persist_tenant_shards)
723 460 : .await
724 460 : .map_err(|e| {
725 0 : // TODO: distinguish primary key constraint (idempotent, OK), from other errors
726 0 : ApiError::InternalServerError(anyhow::anyhow!(e))
727 460 : })?;
728 :
729 460 : let (waiters, response_shards) = {
730 460 : let mut locked = self.inner.write().unwrap();
731 460 :
732 460 : let mut response_shards = Vec::new();
733 460 :
734 460 : let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
735 :
736 944 : for tenant_shard_id in create_ids {
737 484 : tracing::info!("Creating shard {tenant_shard_id}...");
738 :
739 : use std::collections::btree_map::Entry;
740 484 : match locked.tenants.entry(tenant_shard_id) {
741 0 : Entry::Occupied(mut entry) => {
742 0 : tracing::info!(
743 0 : "Tenant shard {tenant_shard_id} already exists while creating"
744 0 : );
745 :
746 : // TODO: schedule() should take an anti-affinity expression that pushes
747 : // attached and secondary locations (independently) away from those
748 : // pageservers also holding a shard for this tenant.
749 :
750 0 : entry.get_mut().schedule(&mut scheduler).map_err(|e| {
751 0 : ApiError::Conflict(format!(
752 0 : "Failed to schedule shard {tenant_shard_id}: {e}"
753 0 : ))
754 0 : })?;
755 :
756 0 : response_shards.push(TenantCreateResponseShard {
757 0 : shard_id: tenant_shard_id,
758 0 : node_id: entry
759 0 : .get()
760 0 : .intent
761 0 : .attached
762 0 : .expect("We just set pageserver if it was None"),
763 0 : generation: entry.get().generation.into().unwrap(),
764 0 : });
765 0 :
766 0 : continue;
767 : }
768 484 : Entry::Vacant(entry) => {
769 484 : let mut state = TenantState::new(
770 484 : tenant_shard_id,
771 484 : ShardIdentity::from_params(
772 484 : tenant_shard_id.shard_number,
773 484 : &create_req.shard_parameters,
774 484 : ),
775 484 : placement_policy.clone(),
776 484 : );
777 :
778 484 : if let Some(create_gen) = create_req.generation {
779 1 : state.generation = Generation::new(create_gen);
780 483 : }
781 484 : state.config = create_req.config.clone();
782 484 :
783 484 : state.schedule(&mut scheduler).map_err(|e| {
784 0 : ApiError::Conflict(format!(
785 0 : "Failed to schedule shard {tenant_shard_id}: {e}"
786 0 : ))
787 484 : })?;
788 :
789 484 : response_shards.push(TenantCreateResponseShard {
790 484 : shard_id: tenant_shard_id,
791 484 : node_id: state
792 484 : .intent
793 484 : .attached
794 484 : .expect("We just set pageserver if it was None"),
795 484 : generation: state.generation.into().unwrap(),
796 484 : });
797 484 : entry.insert(state)
798 : }
799 : };
800 : }
801 :
802 : // Take a snapshot of pageservers
803 460 : let pageservers = locked.nodes.clone();
804 460 :
805 460 : let result_tx = locked.result_tx.clone();
806 460 : let compute_hook = locked.compute_hook.clone();
807 460 :
808 460 : let waiters = locked
809 460 : .tenants
810 460 : .range_mut(TenantShardId::tenant_range(tenant_id))
811 484 : .filter_map(|(_shard_id, shard)| {
812 484 : shard.maybe_reconcile(
813 484 : result_tx.clone(),
814 484 : &pageservers,
815 484 : &compute_hook,
816 484 : &self.config,
817 484 : &self.persistence,
818 484 : )
819 484 : })
820 460 : .collect::<Vec<_>>();
821 460 : (waiters, response_shards)
822 460 : };
823 460 :
824 469 : self.await_waiters(waiters).await?;
825 :
826 459 : Ok(TenantCreateResponse {
827 459 : shards: response_shards,
828 459 : })
829 460 : }
830 :
831 : /// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
832 : /// wait for reconciliation to complete before responding.
833 461 : async fn await_waiters(
834 461 : &self,
835 461 : waiters: Vec<ReconcilerWaiter>,
836 461 : ) -> Result<(), ReconcileWaitError> {
837 461 : let deadline = Instant::now().checked_add(Duration::from_secs(30)).unwrap();
838 944 : for waiter in waiters {
839 484 : let timeout = deadline.duration_since(Instant::now());
840 484 : waiter.wait_timeout(timeout).await?;
841 : }
842 :
843 460 : Ok(())
844 461 : }
845 :
846 : /// This API is used by the cloud control plane to do coarse-grained control of tenants:
847 : /// - Call with mode Attached* to upsert the tenant.
848 : /// - Call with mode Detached to switch to PlacementPolicy::Detached
849 : ///
850 : /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
851 : /// secondary locations.
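///
/// For tenants already known to this service the request's `config.mode` drives the
/// placement policy: `Detached` maps to [`PlacementPolicy::Detached`], `Secondary` is
/// not yet implemented, and any `Attached*` mode maps to `Double(1)` when more than one
/// pageserver is registered (otherwise `Single`). For tenants we do not know about yet,
/// the call synthesizes a creation request, and `config.generation` is then mandatory.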
852 2 : pub(crate) async fn tenant_location_config(
853 2 : &self,
854 2 : tenant_id: TenantId,
855 2 : req: TenantLocationConfigRequest,
856 2 : ) -> Result<TenantLocationConfigResponse, ApiError> {
857 2 : if req.tenant_id.shard_count.0 > 1 {
858 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
859 0 : "This API is for importing single-sharded or unsharded tenants"
860 0 : )));
861 2 : }
862 2 :
863 2 : let mut waiters = Vec::new();
864 2 : let mut result = TenantLocationConfigResponse { shards: Vec::new() };
865 2 : let maybe_create = {
866 2 : let mut locked = self.inner.write().unwrap();
867 2 : let result_tx = locked.result_tx.clone();
868 2 : let compute_hook = locked.compute_hook.clone();
869 2 : let pageservers = locked.nodes.clone();
870 2 :
871 2 : let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
872 2 :
873 2 : // Maybe we have existing shards
874 2 : let mut create = true;
875 2 : for (shard_id, shard) in locked
876 2 : .tenants
877 2 : .range_mut(TenantShardId::tenant_range(tenant_id))
878 : {
879 : // Saw an existing shard: this is not a creation
880 1 : create = false;
881 1 :
882 1 : // Note that for existing tenants we do _not_ respect the generation in the request: this is likely
883 1 : // to be stale. Once a tenant is created in this service, our view of generation is authoritative, and
884 1 : // callers' generations may be ignored. This represents a one-way migration of tenants from the outer
885 1 : // cloud control plane into this service.
886 1 :
887 1 : // Use location config mode as an indicator of policy: if they ask for
888 1 : // attached we go to default HA attached mode. If they ask for secondary
889 1 : // we go to secondary-only mode. If they ask for detached we detach.
890 1 : match req.config.mode {
891 0 : LocationConfigMode::Detached => {
892 0 : shard.policy = PlacementPolicy::Detached;
893 0 : }
894 : LocationConfigMode::Secondary => {
895 : // TODO: implement secondary-only mode.
896 0 : todo!();
897 : }
898 : LocationConfigMode::AttachedMulti
899 : | LocationConfigMode::AttachedSingle
900 : | LocationConfigMode::AttachedStale => {
901 : // TODO: persistence for changes in policy
902 1 : if pageservers.len() > 1 {
903 0 : shard.policy = PlacementPolicy::Double(1)
904 : } else {
905 : // Convenience for dev/test: if we just have one pageserver, import
906 : // tenants into Single mode so that scheduling will succeed.
907 1 : shard.policy = PlacementPolicy::Single
908 : }
909 : }
910 : }
911 :
912 1 : shard.schedule(&mut scheduler)?;
913 :
914 1 : let maybe_waiter = shard.maybe_reconcile(
915 1 : result_tx.clone(),
916 1 : &pageservers,
917 1 : &compute_hook,
918 1 : &self.config,
919 1 : &self.persistence,
920 1 : );
921 1 : if let Some(waiter) = maybe_waiter {
922 0 : waiters.push(waiter);
923 1 : }
924 :
925 1 : if let Some(node_id) = shard.intent.attached {
926 1 : result.shards.push(TenantShardLocation {
927 1 : shard_id: *shard_id,
928 1 : node_id,
929 1 : })
930 0 : }
931 : }
932 :
933 2 : if create {
934 : // Validate request mode
935 1 : match req.config.mode {
936 : LocationConfigMode::Detached | LocationConfigMode::Secondary => {
937 : // When using this API to onboard an existing tenant to this service, it must start in
938 : // an attached state, because we need the request to come with a generation
939 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
940 0 : "Imported tenant must be in attached mode"
941 0 : )));
942 : }
943 :
944 : LocationConfigMode::AttachedMulti
945 : | LocationConfigMode::AttachedSingle
946 1 : | LocationConfigMode::AttachedStale => {
947 1 : // Pass
948 1 : }
949 : }
950 :
951 : // Validate request generation
952 1 : let Some(generation) = req.config.generation else {
953 : // We can only import attached tenants, because we need the request to come with a generation
954 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
955 0 : "Generation is mandatory when importing tenant"
956 0 : )));
957 : };
958 :
959 : // Synthesize a creation request
960 1 : Some(TenantCreateRequest {
961 1 : new_tenant_id: TenantShardId::unsharded(tenant_id),
962 1 : generation: Some(generation),
963 1 : shard_parameters: ShardParameters {
964 1 : // Must preserve the incoming shard_count to distinguish unsharded (0)
965 1 : // from single-sharded (1): this distinction appears in the S3 keys of the tenant.
966 1 : count: req.tenant_id.shard_count,
967 1 : // We only import un-sharded or single-sharded tenants, so stripe
968 1 : // size can be made up arbitrarily here.
969 1 : stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
970 1 : },
971 1 : config: req.config.tenant_conf,
972 1 : })
973 : } else {
974 1 : None
975 : }
976 : };
977 :
978 2 : if let Some(create_req) = maybe_create {
979 2 : let create_resp = self.tenant_create(create_req).await?;
980 1 : result.shards = create_resp
981 1 : .shards
982 1 : .into_iter()
983 1 : .map(|s| TenantShardLocation {
984 1 : node_id: s.node_id,
985 1 : shard_id: s.shard_id,
986 1 : })
987 1 : .collect();
988 : } else {
989 : // This was an update, wait for reconciliation
990 1 : self.await_waiters(waiters).await?;
991 : }
992 :
993 2 : Ok(result)
994 2 : }
995 :
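/// Delete a tenant: issue a deletion to the attached pageserver of every shard. Returns
/// 202 while any shard's deletion is still in progress (the caller should retry later),
/// and 404 once pageserver, database, and in-memory state are all gone, mirroring the
/// pageserver's own deletion API.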
996 12 : pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
997 : // TODO: refactor into helper
998 12 : let targets = {
999 12 : let locked = self.inner.read().unwrap();
1000 12 : let mut targets = Vec::new();
1001 :
1002 24 : for (tenant_shard_id, shard) in
1003 12 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1004 24 : {
1005 24 : let node_id = shard.intent.attached.ok_or_else(|| {
1006 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1007 24 : })?;
1008 24 : let node = locked
1009 24 : .nodes
1010 24 : .get(&node_id)
1011 24 : .expect("Pageservers may not be deleted while referenced");
1012 24 :
1013 24 : targets.push((*tenant_shard_id, node.clone()));
1014 : }
1015 12 : targets
1016 12 : };
1017 12 :
1018 12 : // TODO: error out if the tenant is not attached anywhere.
1019 12 :
1020 12 : // Phase 1: delete on the pageservers
1021 12 : let mut any_pending = false;
1022 36 : for (tenant_shard_id, node) in targets {
1023 24 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1024 : // TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
1025 : // surface immediately as an error to our caller.
1026 94 : let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
1027 0 : ApiError::InternalServerError(anyhow::anyhow!(
1028 0 : "Error deleting shard {tenant_shard_id} on node {}: {e}",
1029 0 : node.id
1030 0 : ))
1031 24 : })?;
1032 24 : tracing::info!(
1033 24 : "Shard {tenant_shard_id} on node {}, delete returned {}",
1034 24 : node.id,
1035 24 : status
1036 24 : );
1037 24 : if status == StatusCode::ACCEPTED {
1038 12 : any_pending = true;
1039 12 : }
1040 : }
1041 :
1042 12 : if any_pending {
1043 : // Caller should call us again later. When we eventually see 404s from
1044 : // all the shards, we may proceed to delete our records of the tenant.
1045 6 : tracing::info!(
1046 6 : "Tenant {} has some shards pending deletion, returning 202",
1047 6 : tenant_id
1048 6 : );
1049 6 : return Ok(StatusCode::ACCEPTED);
1050 6 : }
1051 6 :
1052 6 : // Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop
1053 6 : // our in-memory state and database state.
1054 6 :
1055 6 : // Ordering: we delete persistent state first: if we then
1056 6 : // crash, we will drop the in-memory state.
1057 6 :
1058 6 : // Drop persistent state.
1059 6 : self.persistence.delete_tenant(tenant_id).await?;
1060 :
1061 : // Drop in-memory state
1062 : {
1063 6 : let mut locked = self.inner.write().unwrap();
1064 6 : locked
1065 6 : .tenants
1066 42 : .retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
1067 6 : tracing::info!(
1068 6 : "Deleted tenant {tenant_id}, now have {} tenants",
1069 6 : locked.tenants.len()
1070 6 : );
1071 : };
1072 :
1073 : // Success is represented as 404, to imitate the existing pageserver deletion API
1074 6 : Ok(StatusCode::NOT_FOUND)
1075 12 : }
1076 :
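/// Create a timeline on every shard of a tenant, one shard at a time. If the caller
/// specified an ancestor timeline without an ancestor LSN, the LSN chosen by the first
/// shard is propagated to the remaining shards so that all shards branch at the same
/// point; the first shard's [`TimelineInfo`] is returned.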
1077 797 : pub(crate) async fn tenant_timeline_create(
1078 797 : &self,
1079 797 : tenant_id: TenantId,
1080 797 : mut create_req: TimelineCreateRequest,
1081 797 : ) -> Result<TimelineInfo, ApiError> {
1082 797 : let mut timeline_info = None;
1083 :
1084 797 : tracing::info!(
1085 797 : "Creating timeline {}/{}",
1086 797 : tenant_id,
1087 797 : create_req.new_timeline_id,
1088 797 : );
1089 :
1090 797 : self.ensure_attached_wait(tenant_id).await?;
1091 :
1092 : // TODO: refuse to do this if shard splitting is in progress
1093 797 : let targets = {
1094 797 : let locked = self.inner.read().unwrap();
1095 797 : let mut targets = Vec::new();
1096 :
1097 825 : for (tenant_shard_id, shard) in
1098 797 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1099 825 : {
1100 825 : let node_id = shard.intent.attached.ok_or_else(|| {
1101 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1102 825 : })?;
1103 825 : let node = locked
1104 825 : .nodes
1105 825 : .get(&node_id)
1106 825 : .expect("Pageservers may not be deleted while referenced");
1107 825 :
1108 825 : targets.push((*tenant_shard_id, node.clone()));
1109 : }
1110 797 : targets
1111 797 : };
1112 797 :
1113 797 : if targets.is_empty() {
1114 0 : return Err(ApiError::NotFound(
1115 0 : anyhow::anyhow!("Tenant not found").into(),
1116 0 : ));
1117 797 : }
1118 :
1119 1618 : for (tenant_shard_id, node) in targets {
1120 : // TODO: issue shard timeline creates in parallel, once the 0th is done.
1121 :
1122 825 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1123 :
1124 825 : tracing::info!(
1125 825 : "Creating timeline on shard {}/{}, attached to node {}",
1126 825 : tenant_shard_id,
1127 825 : create_req.new_timeline_id,
1128 825 : node.id
1129 825 : );
1130 :
1131 825 : let shard_timeline_info = client
1132 825 : .timeline_create(tenant_shard_id, &create_req)
1133 3204 : .await
1134 825 : .map_err(|e| match e {
1135 4 : mgmt_api::Error::ApiError(status, msg)
1136 4 : if status == StatusCode::INTERNAL_SERVER_ERROR
1137 4 : || status == StatusCode::NOT_ACCEPTABLE =>
1138 4 : {
1139 4 : // TODO: handle more error codes, e.g. 503 should be passed through. Make a general wrapper
1140 4 : // for pass-through API calls.
1141 4 : ApiError::InternalServerError(anyhow::anyhow!(msg))
1142 : }
1143 0 : _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
1144 825 : })?;
1145 :
1146 821 : if timeline_info.is_none() {
1147 : // If the caller specified an ancestor but no ancestor LSN, we are responsible for
1148 : // propagating the LSN chosen by the first shard to the other shards: it is important
1149 : // that all shards end up with the same ancestor_start_lsn.
1150 793 : if create_req.ancestor_timeline_id.is_some()
1151 244 : && create_req.ancestor_start_lsn.is_none()
1152 222 : {
1153 222 : create_req.ancestor_start_lsn = shard_timeline_info.ancestor_lsn;
1154 571 : }
1155 :
1156 : // We will return the TimelineInfo from the first shard
1157 793 : timeline_info = Some(shard_timeline_info);
1158 28 : }
1159 : }
1160 793 : Ok(timeline_info.expect("targets cannot be empty"))
1161 797 : }
1162 :
1163 2 : pub(crate) async fn tenant_timeline_delete(
1164 2 : &self,
1165 2 : tenant_id: TenantId,
1166 2 : timeline_id: TimelineId,
1167 2 : ) -> Result<StatusCode, ApiError> {
1168 2 : tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
1169 :
1170 2 : self.ensure_attached_wait(tenant_id).await?;
1171 :
1172 : // TODO: refuse to do this if shard splitting is in progress
1173 2 : let targets = {
1174 2 : let locked = self.inner.read().unwrap();
1175 2 : let mut targets = Vec::new();
1176 :
1177 4 : for (tenant_shard_id, shard) in
1178 2 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1179 4 : {
1180 4 : let node_id = shard.intent.attached.ok_or_else(|| {
1181 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1182 4 : })?;
1183 4 : let node = locked
1184 4 : .nodes
1185 4 : .get(&node_id)
1186 4 : .expect("Pageservers may not be deleted while referenced");
1187 4 :
1188 4 : targets.push((*tenant_shard_id, node.clone()));
1189 : }
1190 2 : targets
1191 2 : };
1192 2 :
1193 2 : if targets.is_empty() {
1194 0 : return Err(ApiError::NotFound(
1195 0 : anyhow::anyhow!("Tenant not found").into(),
1196 0 : ));
1197 2 : }
1198 2 :
1199 2 : // TODO: call into shards concurrently
1200 2 : let mut any_pending = false;
1201 6 : for (tenant_shard_id, node) in targets {
1202 4 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1203 :
1204 4 : tracing::info!(
1205 4 : "Deleting timeline on shard {}/{}, attached to node {}",
1206 4 : tenant_shard_id,
1207 4 : timeline_id,
1208 4 : node.id
1209 4 : );
1210 :
1211 4 : let status = client
1212 4 : .timeline_delete(tenant_shard_id, timeline_id)
1213 16 : .await
1214 4 : .map_err(|e| {
1215 0 : ApiError::InternalServerError(anyhow::anyhow!(
1216 0 : "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
1217 0 : node.id
1218 0 : ))
1219 4 : })?;
1220 :
1221 4 : if status == StatusCode::ACCEPTED {
1222 2 : any_pending = true;
1223 2 : }
1224 : }
1225 :
1226 2 : if any_pending {
1227 1 : Ok(StatusCode::ACCEPTED)
1228 : } else {
1229 1 : Ok(StatusCode::NOT_FOUND)
1230 : }
1231 2 : }
1232 :
1233 : /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
1234 : /// function looks it up and returns the URL. If the tenant isn't found, returns Err(ApiError::NotFound).
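///
/// Illustrative use (a sketch, not a real call site in this file):
/// `let (base_url, shard_zero_id) = self.tenant_shard0_baseurl(tenant_id)?;`
/// followed by forwarding the caller's request to that URL.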
1235 11 : pub(crate) fn tenant_shard0_baseurl(
1236 11 : &self,
1237 11 : tenant_id: TenantId,
1238 11 : ) -> Result<(String, TenantShardId), ApiError> {
1239 11 : let locked = self.inner.read().unwrap();
1240 11 : let Some((tenant_shard_id, shard)) = locked
1241 11 : .tenants
1242 11 : .range(TenantShardId::tenant_range(tenant_id))
1243 11 : .next()
1244 : else {
1245 6 : return Err(ApiError::NotFound(
1246 6 : anyhow::anyhow!("Tenant {tenant_id} not found").into(),
1247 6 : ));
1248 : };
1249 :
1250 : // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
1251 : // point to somewhere we haven't attached yet.
1252 5 : let Some(node_id) = shard.intent.attached else {
1253 0 : return Err(ApiError::Conflict(
1254 0 : "Cannot call timeline API on non-attached tenant".to_string(),
1255 0 : ));
1256 : };
1257 :
1258 5 : let Some(node) = locked.nodes.get(&node_id) else {
1259 : // This should never happen
1260 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1261 0 : "Shard refers to nonexistent node"
1262 0 : )));
1263 : };
1264 :
1265 5 : Ok((node.base_url(), *tenant_shard_id))
1266 11 : }
1267 :
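/// Report where every shard of a tenant is attached, including the node's HTTP and
/// Postgres listen addresses, plus the tenant's shard parameters. Fails if any shard is
/// not currently attached, or if the tenant has no shards at all.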
1268 823 : pub(crate) fn tenant_locate(
1269 823 : &self,
1270 823 : tenant_id: TenantId,
1271 823 : ) -> Result<TenantLocateResponse, ApiError> {
1272 823 : let locked = self.inner.read().unwrap();
1273 823 : tracing::info!("Locating shards for tenant {tenant_id}");
1274 :
1275 : // Take a snapshot of pageservers
1276 823 : let pageservers = locked.nodes.clone();
1277 823 :
1278 823 : let mut result = Vec::new();
1279 823 : let mut shard_params: Option<ShardParameters> = None;
1280 :
1281 892 : for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1282 : {
1283 892 : let node_id = shard
1284 892 : .intent
1285 892 : .attached
1286 892 : .ok_or(ApiError::BadRequest(anyhow::anyhow!(
1287 892 : "Cannot locate a tenant that is not attached"
1288 892 : )))?;
1289 :
1290 892 : let node = pageservers
1291 892 : .get(&node_id)
1292 892 : .expect("Pageservers may not be deleted while referenced");
1293 892 :
1294 892 : result.push(TenantLocateResponseShard {
1295 892 : shard_id: *tenant_shard_id,
1296 892 : node_id,
1297 892 : listen_http_addr: node.listen_http_addr.clone(),
1298 892 : listen_http_port: node.listen_http_port,
1299 892 : listen_pg_addr: node.listen_pg_addr.clone(),
1300 892 : listen_pg_port: node.listen_pg_port,
1301 892 : });
1302 892 :
1303 892 : match &shard_params {
1304 823 : None => {
1305 823 : shard_params = Some(ShardParameters {
1306 823 : stripe_size: shard.shard.stripe_size,
1307 823 : count: shard.shard.count,
1308 823 : });
1309 823 : }
1310 69 : Some(params) => {
1311 69 : if params.stripe_size != shard.shard.stripe_size {
1312 : // This should never happen. We enforce at runtime because it's simpler than
1313 : // adding an extra per-tenant data structure to store the things that should be the same
1314 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1315 0 : "Inconsistent shard stripe size parameters!"
1316 0 : )));
1317 69 : }
1318 : }
1319 : }
1320 : }
1321 :
1322 823 : if result.is_empty() {
1323 0 : return Err(ApiError::NotFound(
1324 0 : anyhow::anyhow!("No shards for this tenant ID found").into(),
1325 0 : ));
1326 823 : }
1327 823 : let shard_params = shard_params.expect("result is non-empty, therefore this is set");
1328 823 : tracing::info!(
1329 823 : "Located tenant {} with params {:?} on shards {}",
1330 823 : tenant_id,
1331 823 : shard_params,
1332 823 : result
1333 823 : .iter()
1334 892 : .map(|s| format!("{:?}", s))
1335 823 : .collect::<Vec<_>>()
1336 823 : .join(",")
1337 823 : );
1338 :
1339 823 : Ok(TenantLocateResponse {
1340 823 : shards: result,
1341 823 : shard_params,
1342 823 : })
1343 823 : }
1344 :
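/// Move a tenant shard's attached location to the requested node. Under `Double`
/// placement the previously attached node is demoted to a secondary; tenants with
/// `Detached` policy are rejected. Waits up to [`RECONCILE_TIMEOUT`] for the resulting
/// reconciliation to complete.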
1345 0 : pub(crate) async fn tenant_shard_migrate(
1346 0 : &self,
1347 0 : tenant_shard_id: TenantShardId,
1348 0 : migrate_req: TenantShardMigrateRequest,
1349 0 : ) -> Result<TenantShardMigrateResponse, ApiError> {
1350 0 : let waiter = {
1351 0 : let mut locked = self.inner.write().unwrap();
1352 0 :
1353 0 : let result_tx = locked.result_tx.clone();
1354 0 : let pageservers = locked.nodes.clone();
1355 0 : let compute_hook = locked.compute_hook.clone();
1356 :
1357 0 : let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
1358 0 : return Err(ApiError::NotFound(
1359 0 : anyhow::anyhow!("Tenant shard not found").into(),
1360 0 : ));
1361 : };
1362 :
1363 0 : if shard.intent.attached == Some(migrate_req.node_id) {
1364 : // No-op case: we will still proceed to wait for reconciliation in case it is
1365 : // incomplete from an earlier update to the intent.
1366 0 : tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
1367 : } else {
1368 0 : let old_attached = shard.intent.attached;
1369 0 :
1370 0 : match shard.policy {
1371 0 : PlacementPolicy::Single => {
1372 0 : shard.intent.secondary.clear();
1373 0 : }
1374 0 : PlacementPolicy::Double(_n) => {
1375 0 : // If our new attached node was a secondary, it no longer should be.
1376 0 : shard.intent.secondary.retain(|s| s != &migrate_req.node_id);
1377 :
1378 : // If we were already attached to something, demote that to a secondary
1379 0 : if let Some(old_attached) = old_attached {
1380 0 : shard.intent.secondary.push(old_attached);
1381 0 : }
1382 : }
1383 : PlacementPolicy::Detached => {
1384 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1385 0 : "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first"
1386 0 : )))
1387 : }
1388 : }
1389 0 : shard.intent.attached = Some(migrate_req.node_id);
1390 :
1391 0 : tracing::info!("Migrating: new intent {:?}", shard.intent);
1392 0 : shard.sequence = shard.sequence.next();
1393 : }
1394 :
1395 0 : shard.maybe_reconcile(
1396 0 : result_tx,
1397 0 : &pageservers,
1398 0 : &compute_hook,
1399 0 : &self.config,
1400 0 : &self.persistence,
1401 0 : )
1402 : };
1403 :
1404 0 : if let Some(waiter) = waiter {
1405 0 : waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
1406 : } else {
1407 0 : tracing::warn!("Migration is a no-op");
1408 : }
1409 :
1410 0 : Ok(TenantShardMigrateResponse {})
1411 0 : }
1412 :
1413 2 : pub(crate) async fn node_list(&self) -> Result<Vec<NodePersistence>, ApiError> {
1414 : // It is convenient to avoid taking the big lock and converting Node to a serializable
1415 : // structure, by fetching from storage instead of reading in-memory state.
1416 2 : let nodes = self
1417 2 : .persistence
1418 2 : .list_nodes()
1419 2 : .await?
1420 2 : .into_iter()
1421 5 : .map(|n| n.to_persistent())
1422 2 : .collect();
1423 2 :
1424 2 : Ok(nodes)
1425 2 : }
1426 :
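/// Register a pageserver node. Re-registering with identical listen addresses is an
/// idempotent no-op, while registering with different addresses is rejected. New nodes
/// are persisted to the database before they become visible in in-memory state.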
1427 603 : pub(crate) async fn node_register(
1428 603 : &self,
1429 603 : register_req: NodeRegisterRequest,
1430 603 : ) -> Result<(), ApiError> {
1431 603 : // Pre-check for an already-existing node
1432 603 : {
1433 603 : let locked = self.inner.read().unwrap();
1434 603 : if let Some(node) = locked.nodes.get(®ister_req.node_id) {
1435 : // Note that we do not do a total equality of the struct, because we don't require
1436 : // the availability/scheduling states to agree for a POST to be idempotent.
1437 215 : if node.listen_http_addr == register_req.listen_http_addr
1438 215 : && node.listen_http_port == register_req.listen_http_port
1439 215 : && node.listen_pg_addr == register_req.listen_pg_addr
1440 215 : && node.listen_pg_port == register_req.listen_pg_port
1441 : {
1442 215 : tracing::info!(
1443 215 : "Node {} re-registered with matching address",
1444 215 : register_req.node_id
1445 215 : );
1446 215 : return Ok(());
1447 : } else {
1448 : // TODO: decide if we want to allow modifying node addresses without removing and re-adding
1449 : // the node. Safest/simplest thing is to refuse it, and usually we deploy with
1450 : // a fixed address through the lifetime of a node.
1451 0 : tracing::warn!(
1452 0 : "Node {} tried to register with different address",
1453 0 : register_req.node_id
1454 0 : );
1455 0 : return Err(ApiError::Conflict(
1456 0 : "Node is already registered with different address".to_string(),
1457 0 : ));
1458 : }
1459 388 : }
1460 388 : }
1461 388 :
1462 388 : // Ordering: we must persist the new node _before_ adding it to in-memory state.
1463 388 : // This ensures that before we use it for anything or expose it via any external
1464 388 : // API, it is guaranteed to be available after a restart.
1465 388 : let new_node = Node {
1466 388 : id: register_req.node_id,
1467 388 : listen_http_addr: register_req.listen_http_addr,
1468 388 : listen_http_port: register_req.listen_http_port,
1469 388 : listen_pg_addr: register_req.listen_pg_addr,
1470 388 : listen_pg_port: register_req.listen_pg_port,
1471 388 : scheduling: NodeSchedulingPolicy::Filling,
1472 388 : // TODO: we shouldn't really call this Active until we've heartbeated it.
1473 388 : availability: NodeAvailability::Active,
1474 388 : };
1475 388 : // TODO: idempotency if the node already exists in the database
1476 388 : self.persistence.insert_node(&new_node).await?;
1477 :
1478 388 : let mut locked = self.inner.write().unwrap();
1479 388 : let mut new_nodes = (*locked.nodes).clone();
1480 388 :
1481 388 : new_nodes.insert(register_req.node_id, new_node);
1482 388 :
1483 388 : locked.nodes = Arc::new(new_nodes);
1484 :
1485 388 : tracing::info!(
1486 388 : "Registered pageserver {}, now have {} pageservers",
1487 388 : register_req.node_id,
1488 388 : locked.nodes.len()
1489 388 : );
1490 388 : Ok(())
1491 603 : }
1492 :
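/// Update a node's availability and/or scheduling policy. A transition to `Offline`
/// clears our observed state for locations on that node and reschedules/reconciles any
/// shard that intended to be attached there; a transition back to `Active` reconciles
/// shards whose observed state on that node is unknown.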
1493 4 : pub(crate) fn node_configure(&self, config_req: NodeConfigureRequest) -> Result<(), ApiError> {
1494 4 : let mut locked = self.inner.write().unwrap();
1495 4 : let result_tx = locked.result_tx.clone();
1496 4 : let compute_hook = locked.compute_hook.clone();
1497 4 :
1498 4 : let mut new_nodes = (*locked.nodes).clone();
1499 :
1500 4 : let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
1501 0 : return Err(ApiError::NotFound(
1502 0 : anyhow::anyhow!("Node not registered").into(),
1503 0 : ));
1504 : };
1505 :
1506 4 : let mut offline_transition = false;
1507 4 : let mut active_transition = false;
1508 :
1509 4 : if let Some(availability) = &config_req.availability {
1510 3 : match (availability, &node.availability) {
1511 : (NodeAvailability::Offline, NodeAvailability::Active) => {
1512 2 : tracing::info!("Node {} transition to offline", config_req.node_id);
1513 2 : offline_transition = true;
1514 : }
1515 : (NodeAvailability::Active, NodeAvailability::Offline) => {
1516 1 : tracing::info!("Node {} transition to active", config_req.node_id);
1517 1 : active_transition = true;
1518 : }
1519 : _ => {
1520 0 : tracing::info!("Node {} no change during config", config_req.node_id);
1521 : // No change
1522 : }
1523 : };
1524 3 : node.availability = *availability;
1525 1 : }
1526 :
1527 4 : if let Some(scheduling) = config_req.scheduling {
1528 1 : node.scheduling = scheduling;
1529 1 :
1530 1 : // TODO: once we have a background scheduling ticker for fill/drain, kick it
1531 1 : // to wake up and start working.
1532 3 : }
1533 :
1534 4 : let new_nodes = Arc::new(new_nodes);
1535 4 :
1536 4 : let mut scheduler = Scheduler::new(&locked.tenants, &new_nodes);
1537 4 : if offline_transition {
1538 13 : for (tenant_shard_id, tenant_state) in &mut locked.tenants {
1539 5 : if let Some(observed_loc) =
1540 13 : tenant_state.observed.locations.get_mut(&config_req.node_id)
1541 5 : {
1542 5 : // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
1543 5 : // not assume our knowledge of the node's configuration is accurate until it comes back online
1544 5 : observed_loc.conf = None;
1545 8 : }
1546 :
1547 13 : if tenant_state.intent.notify_offline(config_req.node_id) {
1548 5 : tenant_state.sequence = tenant_state.sequence.next();
1549 5 : match tenant_state.schedule(&mut scheduler) {
1550 0 : Err(e) => {
1551 0 : // It is possible that some tenants will become unschedulable when too many pageservers
1552 0 : // go offline: in this case there isn't much we can do other than make the issue observable.
1553 0 : // TODO: give TenantState a scheduling error attribute to be queried later.
1554 0 : tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
1555 : }
1556 5 : Ok(()) => {
1557 5 : tenant_state.maybe_reconcile(
1558 5 : result_tx.clone(),
1559 5 : &new_nodes,
1560 5 : &compute_hook,
1561 5 : &self.config,
1562 5 : &self.persistence,
1563 5 : );
1564 5 : }
1565 : }
1566 8 : }
1567 : }
1568 2 : }
1569 :
1570 4 : if active_transition {
1571 : // When a node comes back online, we must reconcile any tenant that has a None observed
1572 : // location on the node.
1573 12 : for tenant_state in locked.tenants.values_mut() {
1574 4 : if let Some(observed_loc) =
1575 12 : tenant_state.observed.locations.get_mut(&config_req.node_id)
1576 : {
1577 4 : if observed_loc.conf.is_none() {
1578 4 : tenant_state.maybe_reconcile(
1579 4 : result_tx.clone(),
1580 4 : &new_nodes,
1581 4 : &compute_hook,
1582 4 : &self.config,
1583 4 : &self.persistence,
1584 4 : );
1585 4 : }
1586 8 : }
1587 : }
1588 :
1589 : // TODO: in the background, we should balance work back onto this pageserver
1590 3 : }
1591 :
1592 4 : locked.nodes = new_nodes;
1593 4 :
1594 4 : Ok(())
1595 4 : }
1596 :
1597 : /// Helper for methods that will try and call pageserver APIs for
1598 : /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
1599 : /// is attached somewhere.
1600 799 : fn ensure_attached_schedule(
1601 799 : &self,
1602 799 : mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
1603 799 : tenant_id: TenantId,
1604 799 : ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
1605 799 : let mut waiters = Vec::new();
1606 799 : let result_tx = locked.result_tx.clone();
1607 799 : let compute_hook = locked.compute_hook.clone();
1608 799 : let mut scheduler = Scheduler::new(&locked.tenants, &locked.nodes);
1609 799 : let pageservers = locked.nodes.clone();
1610 :
1611 829 : for (_tenant_shard_id, shard) in locked
1612 799 : .tenants
1613 799 : .range_mut(TenantShardId::tenant_range(tenant_id))
1614 : {
1615 829 : shard.schedule(&mut scheduler)?;
1616 :
1617 829 : if let Some(waiter) = shard.maybe_reconcile(
1618 829 : result_tx.clone(),
1619 829 : &pageservers,
1620 829 : &compute_hook,
1621 829 : &self.config,
1622 829 : &self.persistence,
1623 829 : ) {
1624 0 : waiters.push(waiter);
1625 829 : }
1626 : }
1627 799 : Ok(waiters)
1628 799 : }
1629 :
1630 799 : async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
1631 799 : let ensure_waiters = {
1632 799 : let locked = self.inner.write().unwrap();
1633 799 :
1634 799 : self.ensure_attached_schedule(locked, tenant_id)
1635 799 : .map_err(ApiError::InternalServerError)?
1636 : };
1637 :
1638 799 : let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
1639 799 : for waiter in ensure_waiters {
1640 0 : let timeout = deadline.duration_since(Instant::now());
1641 0 : waiter.wait_timeout(timeout).await?;
1642 : }
1643 :
1644 799 : Ok(())
1645 799 : }
1646 :
1647 : /// Check all tenants for pending reconciliation work, and reconcile those in need
1648 : ///
1649 : /// Returns how many reconciliation tasks were started
1650 361 : fn reconcile_all(&self) -> usize {
1651 361 : let mut locked = self.inner.write().unwrap();
1652 361 : let result_tx = locked.result_tx.clone();
1653 361 : let compute_hook = locked.compute_hook.clone();
1654 361 : let pageservers = locked.nodes.clone();
1655 361 : locked
1656 361 : .tenants
1657 361 : .iter_mut()
1658 361 : .filter_map(|(_tenant_shard_id, shard)| {
1659 9 : shard.maybe_reconcile(
1660 9 : result_tx.clone(),
1661 9 : &pageservers,
1662 9 : &compute_hook,
1663 9 : &self.config,
1664 9 : &self.persistence,
1665 9 : )
1666 361 : })
1667 361 : .count()
1668 361 : }
1669 : }