Line data Source code
1 : use std::{
2 : borrow::Cow,
3 : cmp::Ordering,
4 : collections::{BTreeMap, HashMap, HashSet},
5 : str::FromStr,
6 : sync::Arc,
7 : time::{Duration, Instant},
8 : };
9 :
10 : use anyhow::Context;
11 : use control_plane::attachment_service::{
12 : AttachHookRequest, AttachHookResponse, InspectRequest, InspectResponse,
13 : };
14 : use diesel::result::DatabaseErrorKind;
15 : use futures::{stream::FuturesUnordered, StreamExt};
16 : use hyper::StatusCode;
17 : use pageserver_api::controller_api::{
18 : NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy,
19 : TenantCreateResponse, TenantCreateResponseShard, TenantLocateResponse,
20 : TenantLocateResponseShard, TenantShardMigrateRequest, TenantShardMigrateResponse,
21 : };
22 : use pageserver_api::{
23 : models::{
24 : self, LocationConfig, LocationConfigListResponse, LocationConfigMode, ShardParameters,
25 : TenantConfig, TenantCreateRequest, TenantLocationConfigRequest,
26 : TenantLocationConfigResponse, TenantShardLocation, TenantShardSplitRequest,
27 : TenantShardSplitResponse, TenantTimeTravelRequest, TimelineCreateRequest, TimelineInfo,
28 : },
29 : shard::{ShardCount, ShardIdentity, ShardNumber, ShardStripeSize, TenantShardId},
30 : upcall_api::{
31 : ReAttachRequest, ReAttachResponse, ReAttachResponseTenant, ValidateRequest,
32 : ValidateResponse, ValidateResponseTenant,
33 : },
34 : };
35 : use pageserver_client::mgmt_api;
36 : use tokio_util::sync::CancellationToken;
37 : use tracing::instrument;
38 : use utils::{
39 : backoff,
40 : completion::Barrier,
41 : generation::Generation,
42 : http::error::ApiError,
43 : id::{NodeId, TenantId, TimelineId},
44 : seqwait::SeqWait,
45 : sync::gate::Gate,
46 : };
47 :
48 : use crate::{
49 : compute_hook::{self, ComputeHook},
50 : node::Node,
51 : persistence::{split_state::SplitState, DatabaseError, Persistence, TenantShardPersistence},
52 : reconciler::attached_location_conf,
53 : scheduler::Scheduler,
54 : tenant_state::{
55 : IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
56 : ReconcilerWaiter, TenantState,
57 : },
58 : PlacementPolicy, Sequence,
59 : };
60 :
61 : // For operations that should be quick, like attaching a new tenant
62 : const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
63 :
64 : // For operations that might be slow, like migrating a tenant with
65 : // some data in it.
66 : const RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
67 :
68 : /// How long [`Service::startup_reconcile`] is allowed to take before it should give
69 : /// up on unresponsive pageservers and proceed.
70 : pub(crate) const STARTUP_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
71 :
72 : // Top level state available to all HTTP handlers
73 : struct ServiceState {
74 : tenants: BTreeMap<TenantShardId, TenantState>,
75 :
76 : nodes: Arc<HashMap<NodeId, Node>>,
77 :
78 : scheduler: Scheduler,
79 :
80 : compute_hook: Arc<ComputeHook>,
81 :
82 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
83 : }
84 :
85 : impl ServiceState {
86 0 : fn new(
87 0 : config: Config,
88 0 : result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,
89 0 : nodes: HashMap<NodeId, Node>,
90 0 : tenants: BTreeMap<TenantShardId, TenantState>,
91 0 : scheduler: Scheduler,
92 0 : ) -> Self {
93 0 : Self {
94 0 : tenants,
95 0 : nodes: Arc::new(nodes),
96 0 : scheduler,
97 0 : compute_hook: Arc::new(ComputeHook::new(config)),
98 0 : result_tx,
99 0 : }
100 0 : }
101 :
102 0 : fn parts_mut(
103 0 : &mut self,
104 0 : ) -> (
105 0 : &mut Arc<HashMap<NodeId, Node>>,
106 0 : &mut BTreeMap<TenantShardId, TenantState>,
107 0 : &mut Scheduler,
108 0 : ) {
109 0 : (&mut self.nodes, &mut self.tenants, &mut self.scheduler)
110 0 : }
111 : }
112 :
113 0 : #[derive(Clone)]
114 : pub struct Config {
115 : // All pageservers managed by one instance of this service must have
116 : // the same public key. This JWT token will be used to authenticate
117 : // this service to the pageservers it manages.
118 : pub jwt_token: Option<String>,
119 :
120 : // This JWT token will be used to authenticate this service to the control plane.
121 : pub control_plane_jwt_token: Option<String>,
122 :
123 : /// Where the compute hook should send notifications of pageserver attachment locations
124 : /// (this URL points to the control plane in prod). If this is None, the compute hook will
125 : /// assume it is running in a test environment and try to update neon_local.
126 : pub compute_hook_url: Option<String>,
127 : }
128 :
129 : impl From<DatabaseError> for ApiError {
130 0 : fn from(err: DatabaseError) -> ApiError {
131 0 : match err {
132 0 : DatabaseError::Query(e) => ApiError::InternalServerError(e.into()),
133 : // FIXME: ApiError doesn't have an Unavailable variant, but ShuttingDown maps to 503.
134 : DatabaseError::Connection(_) | DatabaseError::ConnectionPool(_) => {
135 0 : ApiError::ShuttingDown
136 : }
137 0 : DatabaseError::Logical(reason) => {
138 0 : ApiError::InternalServerError(anyhow::anyhow!(reason))
139 : }
140 : }
141 0 : }
142 : }
143 :
144 : pub struct Service {
145 : inner: Arc<std::sync::RwLock<ServiceState>>,
146 : config: Config,
147 : persistence: Arc<Persistence>,
148 :
149 : // Process shutdown will fire this token
150 : cancel: CancellationToken,
151 :
152 : // Background tasks will hold this gate
153 : gate: Gate,
154 :
155 : /// This waits for initial reconciliation with pageservers to complete. Until this barrier
156 : /// passes, it isn't safe to do any actions that mutate tenants.
157 : pub(crate) startup_complete: Barrier,
158 : }
159 :
160 : impl From<ReconcileWaitError> for ApiError {
161 0 : fn from(value: ReconcileWaitError) -> Self {
162 0 : match value {
163 0 : ReconcileWaitError::Shutdown => ApiError::ShuttingDown,
164 0 : e @ ReconcileWaitError::Timeout(_) => ApiError::Timeout(format!("{e}").into()),
165 0 : e @ ReconcileWaitError::Failed(..) => ApiError::InternalServerError(anyhow::anyhow!(e)),
166 : }
167 0 : }
168 : }
169 :
170 : impl Service {
171 0 : pub fn get_config(&self) -> &Config {
172 0 : &self.config
173 0 : }
174 :
175 : /// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
176 : /// view of the world, and determine which pageservers are responsive.
177 0 : #[instrument(skip_all)]
178 : async fn startup_reconcile(self: &Arc<Service>) {
179 : // For all tenant shards, a vector of observed states on nodes (where None means
180 : // indeterminate, same as in [`ObservedStateLocation`])
181 : let mut observed = HashMap::new();
182 :
183 : let mut nodes_online = HashSet::new();
184 :
185 : // Startup reconciliation does I/O to other services: whether they
186 : // are responsive or not, we should aim to finish within our deadline, because:
187 : // - If we don't, a k8s readiness hook watching /ready will kill us.
188 : // - While we're waiting for startup reconciliation, we are not fully
189 : // available for end user operations like creating/deleting tenants and timelines.
190 : //
191 : // We set multiple deadlines to break up the time available between the phases of work: this is
192 : // arbitrary, but avoids a situation where the first phase could burn our entire timeout period.
193 : let start_at = Instant::now();
194 : let node_scan_deadline = start_at
195 : .checked_add(STARTUP_RECONCILE_TIMEOUT / 2)
196 : .expect("Reconcile timeout is a modest constant");
197 :
198 : let compute_notify_deadline = start_at
199 : .checked_add((STARTUP_RECONCILE_TIMEOUT / 4) * 3)
200 : .expect("Reconcile timeout is a modest constant");
201 :
202 : // Accumulate a list of any tenant locations that ought to be detached
203 : let mut cleanup = Vec::new();
204 :
205 : let node_listings = self.scan_node_locations(node_scan_deadline).await;
206 : for (node_id, list_response) in node_listings {
207 : let tenant_shards = list_response.tenant_shards;
208 0 : tracing::info!(
209 0 : "Received {} shard statuses from pageserver {}, setting it to Active",
210 0 : tenant_shards.len(),
211 0 : node_id
212 0 : );
213 : nodes_online.insert(node_id);
214 :
215 : for (tenant_shard_id, conf_opt) in tenant_shards {
216 : observed.insert(tenant_shard_id, (node_id, conf_opt));
217 : }
218 : }
219 :
220 : // List of tenants for which we will attempt to notify compute of their location at startup
221 : let mut compute_notifications = Vec::new();
222 :
223 : // Populate intent and observed states for all tenants, based on reported state on pageservers
224 : let shard_count = {
225 : let mut locked = self.inner.write().unwrap();
226 : let (nodes, tenants, scheduler) = locked.parts_mut();
227 :
228 : // Mark nodes online if they responded to us: nodes are offline by default after a restart.
229 : let mut new_nodes = (**nodes).clone();
230 : for (node_id, node) in new_nodes.iter_mut() {
231 : if nodes_online.contains(node_id) {
232 : node.availability = NodeAvailability::Active;
233 : scheduler.node_upsert(node);
234 : }
235 : }
236 : *nodes = Arc::new(new_nodes);
237 :
238 : for (tenant_shard_id, (node_id, observed_loc)) in observed {
239 : let Some(tenant_state) = tenants.get_mut(&tenant_shard_id) else {
240 : cleanup.push((tenant_shard_id, node_id));
241 : continue;
242 : };
243 :
244 : tenant_state
245 : .observed
246 : .locations
247 : .insert(node_id, ObservedStateLocation { conf: observed_loc });
248 : }
249 :
250 : // Populate each tenant's intent state
251 : for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
252 : tenant_state.intent_from_observed();
253 : if let Err(e) = tenant_state.schedule(scheduler) {
254 : // Non-fatal error: we are unable to properly schedule the tenant, perhaps because
255 : // not enough pageservers are available. The tenant may well still be available
256 : // to clients.
257 0 : tracing::error!("Failed to schedule tenant {tenant_shard_id} at startup: {e}");
258 : } else {
259 : // If we're both intending and observed to be attached at a particular node, we will
260 : // emit a compute notification for this. In the case where our observed state does not
261 : // yet match our intent, we will eventually reconcile, and that will emit a compute notification.
262 : if let Some(attached_at) = tenant_state.stably_attached() {
263 : compute_notifications.push((*tenant_shard_id, attached_at));
264 : }
265 : }
266 : }
267 :
268 : tenants.len()
269 : };
270 :
271 : // TODO: if any tenant's intent now differs from its loaded generation_pageserver, we should clear that
272 : // generation_pageserver in the database.
273 :
274 : // Emit compute hook notifications for all tenants which are already stably attached. Other tenants
275 : // will emit compute hook notifications when they reconcile.
276 : //
277 : // Ordering: we must complete these notification attempts before doing any other reconciliation for the
278 : // tenants named here, because otherwise our calls to notify() might race with more recent values
279 : // generated by reconciliation.
280 : let notify_failures = self
281 : .compute_notify_many(compute_notifications, compute_notify_deadline)
282 : .await;
283 :
284 : // Compute notify is fallible. If it fails here, do not delay overall startup: set the
285 : // flag on these shards that they have a pending notification.
286 : // Update tenant state for any that failed to do their initial compute notify, so that they'll retry later.
287 : {
288 : let mut locked = self.inner.write().unwrap();
289 : for tenant_shard_id in notify_failures.into_iter() {
290 : if let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) {
291 : shard.pending_compute_notification = true;
292 : }
293 : }
294 : }
295 :
296 : // Finally, now that the service is up and running, launch reconcile operations for any tenants
297 : // which require it: under normal circumstances this should only include tenants that were in some
298 : // transient state before we restarted, or any tenants whose compute hooks failed above.
299 : let reconcile_tasks = self.reconcile_all();
300 : // We will not wait for these reconciliation tasks to run here: we're now done with startup and
301 : // normal operations may proceed.
302 :
303 : // Clean up any tenants that were found on pageservers but are not known to us. Do this in the
304 : // background because it does not need to complete in order to proceed with other work.
305 : if !cleanup.is_empty() {
306 0 : tracing::info!("Cleaning up {} locations in the background", cleanup.len());
307 : tokio::task::spawn({
308 : let cleanup_self = self.clone();
309 0 : async move { cleanup_self.cleanup_locations(cleanup).await }
310 : });
311 : }
312 :
313 0 : tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
314 : }
315 :
316 : /// Used during [`Self::startup_reconcile`]: issue GETs to all nodes concurrently, with a deadline.
317 : ///
318 : /// The result includes only nodes which responded within the deadline
319 0 : async fn scan_node_locations(
320 0 : &self,
321 0 : deadline: Instant,
322 0 : ) -> HashMap<NodeId, LocationConfigListResponse> {
323 0 : let nodes = {
324 0 : let locked = self.inner.read().unwrap();
325 0 : locked.nodes.clone()
326 0 : };
327 0 :
328 0 : let mut node_results = HashMap::new();
329 0 :
330 0 : let mut node_list_futs = FuturesUnordered::new();
331 :
332 0 : for node in nodes.values() {
333 0 : node_list_futs.push({
334 0 : async move {
335 0 : let http_client = reqwest::ClientBuilder::new()
336 0 : .timeout(Duration::from_secs(5))
337 0 : .build()
338 0 : .expect("Failed to construct HTTP client");
339 0 : let client = mgmt_api::Client::from_client(
340 0 : http_client,
341 0 : node.base_url(),
342 0 : self.config.jwt_token.as_deref(),
343 0 : );
344 0 :
345 0 : fn is_fatal(e: &mgmt_api::Error) -> bool {
346 0 : use mgmt_api::Error::*;
347 0 : match e {
348 0 : ReceiveBody(_) | ReceiveErrorBody(_) => false,
349 0 : ApiError(StatusCode::SERVICE_UNAVAILABLE, _)
350 0 : | ApiError(StatusCode::GATEWAY_TIMEOUT, _)
351 0 : | ApiError(StatusCode::REQUEST_TIMEOUT, _) => false,
352 0 : ApiError(_, _) => true,
353 0 : }
354 0 : }
355 0 :
356 0 : tracing::info!("Scanning shards on node {}...", node.id);
357 0 : let description = format!("List locations on {}", node.id);
358 0 : let response = backoff::retry(
359 0 : || client.list_location_config(),
360 0 : is_fatal,
361 0 : 1,
362 0 : 5,
363 0 : &description,
364 0 : &self.cancel,
365 0 : )
366 0 : .await;
367 :
368 0 : (node.id, response)
369 0 : }
370 0 : });
371 0 : }
372 :
373 : loop {
374 0 : let (node_id, result) = tokio::select! {
375 0 : next = node_list_futs.next() => {
376 : match next {
377 : Some(result) => result,
378 : None =>{
379 : // We got results for all our nodes
380 : break;
381 : }
382 :
383 : }
384 : },
385 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
386 : // Give up waiting for anyone who hasn't responded: we will yield the results that we have
387 0 : tracing::info!("Reached deadline while waiting for nodes to respond to location listing requests");
388 : break;
389 : }
390 : };
391 :
392 0 : let Some(list_response) = result else {
393 0 : tracing::info!("Shutdown during startup_reconcile");
394 0 : break;
395 : };
396 :
397 0 : match list_response {
398 0 : Err(e) => {
399 0 : tracing::warn!("Could not scan node {} ({e})", node_id);
400 : }
401 0 : Ok(listing) => {
402 0 : node_results.insert(node_id, listing);
403 0 : }
404 : }
405 : }
406 :
407 0 : node_results
408 0 : }
409 :
410 : /// Used during [`Self::startup_reconcile`]: detach a list of unknown-to-us tenants from pageservers.
411 : ///
412 : /// This is safe to run in the background, because if we don't have this TenantShardId in our map of
413 : /// tenants, then it is probably something incompletely deleted before: we will not fight with any
414 : /// other task trying to attach it.
415 0 : #[instrument(skip_all)]
416 : async fn cleanup_locations(&self, cleanup: Vec<(TenantShardId, NodeId)>) {
417 : let nodes = self.inner.read().unwrap().nodes.clone();
418 :
419 : for (tenant_shard_id, node_id) in cleanup {
420 : // A node reported a tenant_shard_id which is unknown to us: detach it.
421 : let Some(node) = nodes.get(&node_id) else {
422 : // This is legitimate; we run in the background and [`Self::startup_reconcile`] might have identified
423 : // a location to clean up on a node that has since been removed.
424 0 : tracing::info!(
425 0 : "Not cleaning up location {node_id}/{tenant_shard_id}: node not found"
426 0 : );
427 : continue;
428 : };
429 :
430 : if self.cancel.is_cancelled() {
431 : break;
432 : }
433 :
434 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
435 : match client
436 : .location_config(
437 : tenant_shard_id,
438 : LocationConfig {
439 : mode: LocationConfigMode::Detached,
440 : generation: None,
441 : secondary_conf: None,
442 : shard_number: tenant_shard_id.shard_number.0,
443 : shard_count: tenant_shard_id.shard_count.literal(),
444 : shard_stripe_size: 0,
445 : tenant_conf: models::TenantConfig::default(),
446 : },
447 : None,
448 : )
449 : .await
450 : {
451 : Ok(()) => {
452 0 : tracing::info!(
453 0 : "Detached unknown shard {tenant_shard_id} on pageserver {node_id}"
454 0 : );
455 : }
456 : Err(e) => {
457 : // Non-fatal error: leaving a tenant shard behind that we are not managing shouldn't
458 : // break anything.
459 0 : tracing::error!(
460 0 : "Failed to detach unknkown shard {tenant_shard_id} on pageserver {node_id}: {e}"
461 0 : );
462 : }
463 : }
464 : }
465 : }
466 :
467 : /// Used during [`Self::startup_reconcile`]: issue many concurrent compute notifications.
468 : ///
469 : /// Returns a set of any shards for which notifications where not acked within the deadline.
470 0 : async fn compute_notify_many(
471 0 : &self,
472 0 : notifications: Vec<(TenantShardId, NodeId)>,
473 0 : deadline: Instant,
474 0 : ) -> HashSet<TenantShardId> {
475 0 : let compute_hook = self.inner.read().unwrap().compute_hook.clone();
476 0 :
477 0 : let attempt_shards = notifications.iter().map(|i| i.0).collect::<HashSet<_>>();
478 0 : let mut success_shards = HashSet::new();
479 0 :
480 0 : // Construct an async stream of futures to invoke the compute notify function: we do this
481 0 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism.
482 0 : let mut stream = futures::stream::iter(notifications.into_iter())
483 0 : .map(|(tenant_shard_id, node_id)| {
484 0 : let compute_hook = compute_hook.clone();
485 0 : let cancel = self.cancel.clone();
486 0 : async move {
487 0 : if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
488 0 : tracing::error!(
489 0 : %tenant_shard_id,
490 0 : %node_id,
491 0 : "Failed to notify compute on startup for shard: {e}"
492 0 : );
493 0 : None
494 : } else {
495 0 : Some(tenant_shard_id)
496 : }
497 0 : }
498 0 : })
499 0 : .buffered(compute_hook::API_CONCURRENCY);
500 :
501 : loop {
502 0 : tokio::select! {
503 0 : next = stream.next() => {
504 : match next {
505 : Some(Some(success_shard)) => {
506 : // A notification succeeded
507 : success_shards.insert(success_shard);
508 : },
509 : Some(None) => {
510 : // A notification that failed
511 : },
512 : None => {
513 0 : tracing::info!("Successfully sent all compute notifications");
514 : break;
515 : }
516 : }
517 : },
518 : _ = tokio::time::sleep(deadline.duration_since(Instant::now())) => {
519 : // Give up sending any that didn't succeed yet
520 0 : tracing::info!("Reached deadline while sending compute notifications");
521 : break;
522 : }
523 : };
524 : }
525 :
526 0 : attempt_shards
527 0 : .difference(&success_shards)
528 0 : .cloned()
529 0 : .collect()
530 0 : }
531 :
532 : /// Long running background task that periodically wakes up and looks for shards that need
533 : /// reconciliation. Reconciliation is fallible, so any reconciliation tasks that fail during
534 : /// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
535 : /// for those retries.
536 0 : #[instrument(skip_all)]
537 : async fn background_reconcile(&self) {
538 : self.startup_complete.clone().wait().await;
539 :
540 : const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);
541 :
542 : let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
543 : while !self.cancel.is_cancelled() {
544 0 : tokio::select! {
545 0 : _ = interval.tick() => { self.reconcile_all(); }
546 0 : _ = self.cancel.cancelled() => return
547 0 : }
548 : }
549 : }
550 :
551 : /// Apply the contents of a [`ReconcileResult`] to our in-memory state: if the reconciliation
552 : /// was successful, this will update the observed state of the tenant such that subsequent
553 : /// calls to [`TenantState::maybe_reconcile`] will do nothing.
554 0 : #[instrument(skip_all, fields(
555 : tenant_id=%result.tenant_shard_id.tenant_id, shard_id=%result.tenant_shard_id.shard_slug(),
556 : sequence=%result.sequence
557 0 : ))]
558 : fn process_result(&self, result: ReconcileResult) {
559 : let mut locked = self.inner.write().unwrap();
560 : let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
561 : // A reconciliation result might race with removing a tenant: drop results for
562 : // tenants that aren't in our map.
563 : return;
564 : };
565 :
566 : // Usually generation should only be updated via this path, so the max() isn't
567 : // needed, but it is used to handle out-of-band updates via. e.g. test hook.
568 : tenant.generation = std::cmp::max(tenant.generation, result.generation);
569 :
570 : // If the reconciler signals that it failed to notify compute, set this state on
571 : // the shard so that a future [`TenantState::maybe_reconcile`] will try again.
572 : tenant.pending_compute_notification = result.pending_compute_notification;
573 :
574 : match result.result {
575 : Ok(()) => {
576 : for (node_id, loc) in &result.observed.locations {
577 : if let Some(conf) = &loc.conf {
578 0 : tracing::info!("Updating observed location {}: {:?}", node_id, conf);
579 : } else {
580 0 : tracing::info!("Setting observed location {} to None", node_id,)
581 : }
582 : }
583 : tenant.observed = result.observed;
584 : tenant.waiter.advance(result.sequence);
585 : }
586 : Err(e) => {
587 0 : tracing::warn!("Reconcile error: {}", e);
588 :
589 : // Ordering: populate last_error before advancing error_seq,
590 : // so that waiters will see the correct error after waiting.
591 : *(tenant.last_error.lock().unwrap()) = format!("{e}");
592 : tenant.error_waiter.advance(result.sequence);
593 :
594 : for (node_id, o) in result.observed.locations {
595 : tenant.observed.locations.insert(node_id, o);
596 : }
597 : }
598 : }
599 : }
600 :
601 0 : async fn process_results(
602 0 : &self,
603 0 : mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
604 0 : ) {
605 0 : loop {
606 0 : // Wait for the next result, or for cancellation
607 0 : let result = tokio::select! {
608 0 : r = result_rx.recv() => {
609 : match r {
610 : Some(result) => {result},
611 : None => {break;}
612 : }
613 : }
614 : _ = self.cancel.cancelled() => {
615 : break;
616 : }
617 : };
618 :
619 0 : self.process_result(result);
620 : }
621 0 : }
622 :
623 0 : pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
624 0 : let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();
625 :
626 0 : tracing::info!("Loading nodes from database...");
627 0 : let nodes = persistence
628 0 : .list_nodes()
629 0 : .await?
630 0 : .into_iter()
631 0 : .map(|n| Node {
632 0 : id: NodeId(n.node_id as u64),
633 0 : // At startup we consider a node offline until proven otherwise.
634 0 : availability: NodeAvailability::Offline,
635 0 : scheduling: NodeSchedulingPolicy::from_str(&n.scheduling_policy)
636 0 : .expect("Bad scheduling policy in DB"),
637 0 : listen_http_addr: n.listen_http_addr,
638 0 : listen_http_port: n.listen_http_port as u16,
639 0 : listen_pg_addr: n.listen_pg_addr,
640 0 : listen_pg_port: n.listen_pg_port as u16,
641 0 : })
642 0 : .collect::<Vec<_>>();
643 0 : let nodes: HashMap<NodeId, Node> = nodes.into_iter().map(|n| (n.id, n)).collect();
644 0 : tracing::info!("Loaded {} nodes from database.", nodes.len());
645 :
646 0 : tracing::info!("Loading shards from database...");
647 0 : let tenant_shard_persistence = persistence.list_tenant_shards().await?;
648 0 : tracing::info!(
649 0 : "Loaded {} shards from database.",
650 0 : tenant_shard_persistence.len()
651 0 : );
652 :
653 0 : let mut tenants = BTreeMap::new();
654 0 :
655 0 : let mut scheduler = Scheduler::new(nodes.values());
656 0 :
657 0 : #[cfg(feature = "testing")]
658 0 : {
659 0 : // Hack: insert scheduler state for all nodes referenced by shards, as compatibility
660 0 : // tests only store the shards, not the nodes. The nodes will be loaded shortly
661 0 : // after when pageservers start up and register.
662 0 : let mut node_ids = HashSet::new();
663 0 : for tsp in &tenant_shard_persistence {
664 0 : if tsp.generation_pageserver != i64::MAX {
665 0 : node_ids.insert(tsp.generation_pageserver);
666 0 : }
667 : }
668 0 : for node_id in node_ids {
669 0 : tracing::info!("Creating node {} in scheduler for tests", node_id);
670 0 : let node = Node {
671 0 : id: NodeId(node_id as u64),
672 0 : availability: NodeAvailability::Active,
673 0 : scheduling: NodeSchedulingPolicy::Active,
674 0 : listen_http_addr: "".to_string(),
675 0 : listen_http_port: 123,
676 0 : listen_pg_addr: "".to_string(),
677 0 : listen_pg_port: 123,
678 0 : };
679 0 :
680 0 : scheduler.node_upsert(&node);
681 : }
682 : }
683 0 : for tsp in tenant_shard_persistence {
684 0 : let tenant_shard_id = TenantShardId {
685 0 : tenant_id: TenantId::from_str(tsp.tenant_id.as_str())?,
686 0 : shard_number: ShardNumber(tsp.shard_number as u8),
687 0 : shard_count: ShardCount::new(tsp.shard_count as u8),
688 : };
689 0 : let shard_identity = if tsp.shard_count == 0 {
690 0 : ShardIdentity::unsharded()
691 : } else {
692 0 : ShardIdentity::new(
693 0 : ShardNumber(tsp.shard_number as u8),
694 0 : ShardCount::new(tsp.shard_count as u8),
695 0 : ShardStripeSize(tsp.shard_stripe_size as u32),
696 0 : )?
697 : };
698 :
699 : // We will populate intent properly later in [`Self::startup_reconcile`], initially populate
700 : // it with what we can infer: the node for which a generation was most recently issued.
701 0 : let mut intent = IntentState::new();
702 0 : if tsp.generation_pageserver != i64::MAX {
703 0 : intent.set_attached(
704 0 : &mut scheduler,
705 0 : Some(NodeId(tsp.generation_pageserver as u64)),
706 0 : );
707 0 : }
708 :
709 0 : let new_tenant = TenantState {
710 0 : tenant_shard_id,
711 0 : shard: shard_identity,
712 0 : sequence: Sequence::initial(),
713 0 : generation: Generation::new(tsp.generation as u32),
714 0 : policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
715 0 : intent,
716 0 : observed: ObservedState::new(),
717 0 : config: serde_json::from_str(&tsp.config).unwrap(),
718 0 : reconciler: None,
719 0 : splitting: tsp.splitting,
720 0 : waiter: Arc::new(SeqWait::new(Sequence::initial())),
721 0 : error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
722 0 : last_error: Arc::default(),
723 0 : pending_compute_notification: false,
724 0 : };
725 0 :
726 0 : tenants.insert(tenant_shard_id, new_tenant);
727 : }
728 :
729 0 : let (startup_completion, startup_complete) = utils::completion::channel();
730 0 :
731 0 : let this = Arc::new(Self {
732 0 : inner: Arc::new(std::sync::RwLock::new(ServiceState::new(
733 0 : config.clone(),
734 0 : result_tx,
735 0 : nodes,
736 0 : tenants,
737 0 : scheduler,
738 0 : ))),
739 0 : config,
740 0 : persistence,
741 0 : startup_complete: startup_complete.clone(),
742 0 : cancel: CancellationToken::new(),
743 0 : gate: Gate::default(),
744 0 : });
745 0 :
746 0 : let result_task_this = this.clone();
747 0 : tokio::task::spawn(async move {
748 : // Block shutdown until we're done (we must respect self.cancel)
749 0 : if let Ok(_gate) = result_task_this.gate.enter() {
750 0 : result_task_this.process_results(result_rx).await
751 0 : }
752 0 : });
753 0 :
754 0 : tokio::task::spawn({
755 0 : let this = this.clone();
756 0 : // We will block the [`Service::startup_complete`] barrier until [`Self::startup_reconcile`]
757 0 : // is done.
758 0 : let startup_completion = startup_completion.clone();
759 0 : async move {
760 : // Block shutdown until we're done (we must respect self.cancel)
761 0 : let Ok(_gate) = this.gate.enter() else {
762 0 : return;
763 : };
764 :
765 0 : this.startup_reconcile().await;
766 :
767 0 : drop(startup_completion);
768 0 :
769 0 : this.background_reconcile().await;
770 0 : }
771 0 : });
772 0 :
773 0 : Ok(this)
774 0 : }
775 :
776 0 : pub(crate) async fn attach_hook(
777 0 : &self,
778 0 : attach_req: AttachHookRequest,
779 0 : ) -> anyhow::Result<AttachHookResponse> {
780 0 : // This is a test hook. To enable using it on tenants that were created directly with
781 0 : // the pageserver API (not via this service), we will auto-create any missing tenant
782 0 : // shards with default state.
783 0 : let insert = {
784 0 : let locked = self.inner.write().unwrap();
785 0 : !locked.tenants.contains_key(&attach_req.tenant_shard_id)
786 0 : };
787 0 : if insert {
788 0 : let tsp = TenantShardPersistence {
789 0 : tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
790 0 : shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
791 0 : shard_count: attach_req.tenant_shard_id.shard_count.literal() as i32,
792 0 : shard_stripe_size: 0,
793 0 : generation: 0,
794 0 : generation_pageserver: i64::MAX,
795 0 : placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
796 0 : config: serde_json::to_string(&TenantConfig::default()).unwrap(),
797 0 : splitting: SplitState::default(),
798 0 : };
799 0 :
800 0 : match self.persistence.insert_tenant_shards(vec![tsp]).await {
801 0 : Err(e) => match e {
802 : DatabaseError::Query(diesel::result::Error::DatabaseError(
803 : DatabaseErrorKind::UniqueViolation,
804 : _,
805 : )) => {
806 0 : tracing::info!(
807 0 : "Raced with another request to insert tenant {}",
808 0 : attach_req.tenant_shard_id
809 0 : )
810 : }
811 0 : _ => return Err(e.into()),
812 : },
813 : Ok(()) => {
814 0 : tracing::info!("Inserted shard {} in database", attach_req.tenant_shard_id);
815 :
816 0 : let mut locked = self.inner.write().unwrap();
817 0 : locked.tenants.insert(
818 0 : attach_req.tenant_shard_id,
819 0 : TenantState::new(
820 0 : attach_req.tenant_shard_id,
821 0 : ShardIdentity::unsharded(),
822 0 : PlacementPolicy::Single,
823 0 : ),
824 0 : );
825 0 : tracing::info!("Inserted shard {} in memory", attach_req.tenant_shard_id);
826 : }
827 : }
828 0 : }
829 :
830 0 : let new_generation = if let Some(req_node_id) = attach_req.node_id {
831 : Some(
832 0 : self.persistence
833 0 : .increment_generation(attach_req.tenant_shard_id, req_node_id)
834 0 : .await?,
835 : )
836 : } else {
837 0 : self.persistence.detach(attach_req.tenant_shard_id).await?;
838 0 : None
839 : };
840 :
841 0 : let mut locked = self.inner.write().unwrap();
842 0 : let (_nodes, tenants, scheduler) = locked.parts_mut();
843 0 :
844 0 : let tenant_state = tenants
845 0 : .get_mut(&attach_req.tenant_shard_id)
846 0 : .expect("Checked for existence above");
847 :
848 0 : if let Some(new_generation) = new_generation {
849 0 : tenant_state.generation = new_generation;
850 0 : } else {
851 : // This is a detach notification. We must update placement policy to avoid re-attaching
852 : // during background scheduling/reconciliation, or during attachment service restart.
853 0 : assert!(attach_req.node_id.is_none());
854 0 : tenant_state.policy = PlacementPolicy::Detached;
855 : }
856 :
857 0 : if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
858 0 : tracing::info!(
859 0 : tenant_id = %attach_req.tenant_shard_id,
860 0 : ps_id = %attaching_pageserver,
861 0 : generation = ?tenant_state.generation,
862 0 : "issuing",
863 0 : );
864 0 : } else if let Some(ps_id) = tenant_state.intent.get_attached() {
865 0 : tracing::info!(
866 0 : tenant_id = %attach_req.tenant_shard_id,
867 0 : %ps_id,
868 0 : generation = ?tenant_state.generation,
869 0 : "dropping",
870 0 : );
871 : } else {
872 0 : tracing::info!(
873 0 : tenant_id = %attach_req.tenant_shard_id,
874 0 : "no-op: tenant already has no pageserver");
875 : }
876 0 : tenant_state
877 0 : .intent
878 0 : .set_attached(scheduler, attach_req.node_id);
879 :
880 0 : tracing::info!(
881 0 : "attach_hook: tenant {} set generation {:?}, pageserver {}",
882 0 : attach_req.tenant_shard_id,
883 0 : tenant_state.generation,
884 0 : // TODO: this is an odd number of 0xf's
885 0 : attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
886 0 : );
887 :
888 : // Trick the reconciler into not doing anything for this tenant: this helps
889 : // tests that manually configure a tenant on the pagesrever, and then call this
890 : // attach hook: they don't want background reconciliation to modify what they
891 : // did to the pageserver.
892 : #[cfg(feature = "testing")]
893 : {
894 0 : if let Some(node_id) = attach_req.node_id {
895 0 : tenant_state.observed.locations = HashMap::from([(
896 0 : node_id,
897 0 : ObservedStateLocation {
898 0 : conf: Some(attached_location_conf(
899 0 : tenant_state.generation,
900 0 : &tenant_state.shard,
901 0 : &tenant_state.config,
902 0 : )),
903 0 : },
904 0 : )]);
905 0 : } else {
906 0 : tenant_state.observed.locations.clear();
907 0 : }
908 : }
909 :
910 0 : Ok(AttachHookResponse {
911 0 : gen: attach_req
912 0 : .node_id
913 0 : .map(|_| tenant_state.generation.into().unwrap()),
914 0 : })
915 0 : }
916 :
917 0 : pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
918 0 : let locked = self.inner.read().unwrap();
919 0 :
920 0 : let tenant_state = locked.tenants.get(&inspect_req.tenant_shard_id);
921 0 :
922 0 : InspectResponse {
923 0 : attachment: tenant_state.and_then(|s| {
924 0 : s.intent
925 0 : .get_attached()
926 0 : .map(|ps| (s.generation.into().unwrap(), ps))
927 0 : }),
928 0 : }
929 0 : }
930 :
931 0 : pub(crate) async fn re_attach(
932 0 : &self,
933 0 : reattach_req: ReAttachRequest,
934 0 : ) -> Result<ReAttachResponse, ApiError> {
935 0 : // Take a re-attach as indication that the node is available: this is a precursor to proper
936 0 : // heartbeating in https://github.com/neondatabase/neon/issues/6844
937 0 : self.node_configure(NodeConfigureRequest {
938 0 : node_id: reattach_req.node_id,
939 0 : availability: Some(NodeAvailability::Active),
940 0 : scheduling: None,
941 0 : })
942 0 : .await?;
943 :
944 : // Ordering: we must persist generation number updates before making them visible in the in-memory state
945 0 : let incremented_generations = self.persistence.re_attach(reattach_req.node_id).await?;
946 :
947 : // Apply the updated generation to our in-memory state
948 0 : let mut locked = self.inner.write().unwrap();
949 0 :
950 0 : let mut response = ReAttachResponse {
951 0 : tenants: Vec::new(),
952 0 : };
953 :
954 0 : for (tenant_shard_id, new_gen) in incremented_generations {
955 0 : response.tenants.push(ReAttachResponseTenant {
956 0 : id: tenant_shard_id,
957 0 : gen: new_gen.into().unwrap(),
958 0 : });
959 0 :
960 0 : // Apply the new generation number to our in-memory state
961 0 : let shard_state = locked.tenants.get_mut(&tenant_shard_id);
962 0 : let Some(shard_state) = shard_state else {
963 : // Not fatal. This edge case requires a re-attach to happen
964 : // between inserting a new tenant shard in to the database, and updating our in-memory
965 : // state to know about the shard, _and_ that the state inserted to the database referenced
966 : // a pageserver. Should never happen, but handle it rather than panicking, since it should
967 : // be harmless.
968 0 : tracing::error!(
969 0 : "Shard {} is in database for node {} but not in-memory state",
970 0 : tenant_shard_id,
971 0 : reattach_req.node_id
972 0 : );
973 0 : continue;
974 : };
975 :
976 0 : shard_state.generation = std::cmp::max(shard_state.generation, new_gen);
977 0 : if let Some(observed) = shard_state
978 0 : .observed
979 0 : .locations
980 0 : .get_mut(&reattach_req.node_id)
981 : {
982 0 : if let Some(conf) = observed.conf.as_mut() {
983 0 : conf.generation = new_gen.into();
984 0 : }
985 0 : }
986 :
987 : // TODO: cancel/restart any running reconciliation for this tenant, it might be trying
988 : // to call location_conf API with an old generation. Wait for cancellation to complete
989 : // before responding to this request. Requires well implemented CancellationToken logic
990 : // all the way to where we call location_conf. Even then, there can still be a location_conf
991 : // request in flight over the network: TODO handle that by making location_conf API refuse
992 : // to go backward in generations.
993 : }
994 0 : Ok(response)
995 0 : }
996 :
997 0 : pub(crate) fn validate(&self, validate_req: ValidateRequest) -> ValidateResponse {
998 0 : let locked = self.inner.read().unwrap();
999 0 :
1000 0 : let mut response = ValidateResponse {
1001 0 : tenants: Vec::new(),
1002 0 : };
1003 :
1004 0 : for req_tenant in validate_req.tenants {
1005 0 : if let Some(tenant_state) = locked.tenants.get(&req_tenant.id) {
1006 0 : let valid = tenant_state.generation == Generation::new(req_tenant.gen);
1007 0 : tracing::info!(
1008 0 : "handle_validate: {}(gen {}): valid={valid} (latest {:?})",
1009 0 : req_tenant.id,
1010 0 : req_tenant.gen,
1011 0 : tenant_state.generation
1012 0 : );
1013 0 : response.tenants.push(ValidateResponseTenant {
1014 0 : id: req_tenant.id,
1015 0 : valid,
1016 0 : });
1017 0 : } else {
1018 0 : // After tenant deletion, we may approve any validation. This avoids
1019 0 : // spurious warnings on the pageserver if it has pending LSN updates
1020 0 : // at the point a deletion happens.
1021 0 : response.tenants.push(ValidateResponseTenant {
1022 0 : id: req_tenant.id,
1023 0 : valid: true,
1024 0 : });
1025 0 : }
1026 : }
1027 0 : response
1028 0 : }
1029 :
1030 0 : pub(crate) async fn tenant_create(
1031 0 : &self,
1032 0 : create_req: TenantCreateRequest,
1033 0 : ) -> Result<TenantCreateResponse, ApiError> {
1034 0 : let (response, waiters) = self.do_tenant_create(create_req).await?;
1035 :
1036 0 : self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await?;
1037 0 : Ok(response)
1038 0 : }
1039 :
1040 0 : pub(crate) async fn do_tenant_create(
1041 0 : &self,
1042 0 : create_req: TenantCreateRequest,
1043 0 : ) -> Result<(TenantCreateResponse, Vec<ReconcilerWaiter>), ApiError> {
1044 : // This service expects to handle sharding itself: it is an error to try and directly create
1045 : // a particular shard here.
1046 0 : let tenant_id = if !create_req.new_tenant_id.is_unsharded() {
1047 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1048 0 : "Attempted to create a specific shard, this API is for creating the whole tenant"
1049 0 : )));
1050 : } else {
1051 0 : create_req.new_tenant_id.tenant_id
1052 : };
1053 :
1054 0 : tracing::info!(
1055 0 : "Creating tenant {}, shard_count={:?}",
1056 0 : create_req.new_tenant_id,
1057 0 : create_req.shard_parameters.count,
1058 0 : );
1059 :
1060 0 : let create_ids = (0..create_req.shard_parameters.count.count())
1061 0 : .map(|i| TenantShardId {
1062 0 : tenant_id,
1063 0 : shard_number: ShardNumber(i),
1064 0 : shard_count: create_req.shard_parameters.count,
1065 0 : })
1066 0 : .collect::<Vec<_>>();
1067 0 :
1068 0 : // TODO: enable specifying this. Using Single as a default helps legacy tests to work (they
1069 0 : // have no expectation of HA).
1070 0 : let placement_policy: PlacementPolicy = PlacementPolicy::Single;
1071 0 :
1072 0 : // Ordering: we persist tenant shards before creating them on the pageserver. This enables a caller
1073 0 : // to clean up after themselves by issuing a tenant deletion if something goes wrong and we restart
1074 0 : // during the creation, rather than risking leaving orphan objects in S3.
1075 0 : let persist_tenant_shards = create_ids
1076 0 : .iter()
1077 0 : .map(|tenant_shard_id| TenantShardPersistence {
1078 0 : tenant_id: tenant_shard_id.tenant_id.to_string(),
1079 0 : shard_number: tenant_shard_id.shard_number.0 as i32,
1080 0 : shard_count: tenant_shard_id.shard_count.literal() as i32,
1081 0 : shard_stripe_size: create_req.shard_parameters.stripe_size.0 as i32,
1082 0 : generation: create_req.generation.map(|g| g as i32).unwrap_or(0),
1083 0 : generation_pageserver: i64::MAX,
1084 0 : placement_policy: serde_json::to_string(&placement_policy).unwrap(),
1085 0 : config: serde_json::to_string(&create_req.config).unwrap(),
1086 0 : splitting: SplitState::default(),
1087 0 : })
1088 0 : .collect();
1089 0 : self.persistence
1090 0 : .insert_tenant_shards(persist_tenant_shards)
1091 0 : .await
1092 0 : .map_err(|e| {
1093 0 : // TODO: distinguish primary key constraint (idempotent, OK), from other errors
1094 0 : ApiError::InternalServerError(anyhow::anyhow!(e))
1095 0 : })?;
1096 :
1097 0 : let (waiters, response_shards) = {
1098 0 : let mut locked = self.inner.write().unwrap();
1099 0 : let (_nodes, tenants, scheduler) = locked.parts_mut();
1100 0 :
1101 0 : let mut response_shards = Vec::new();
1102 :
1103 0 : for tenant_shard_id in create_ids {
1104 0 : tracing::info!("Creating shard {tenant_shard_id}...");
1105 :
1106 : use std::collections::btree_map::Entry;
1107 0 : match tenants.entry(tenant_shard_id) {
1108 0 : Entry::Occupied(mut entry) => {
1109 0 : tracing::info!(
1110 0 : "Tenant shard {tenant_shard_id} already exists while creating"
1111 0 : );
1112 :
1113 : // TODO: schedule() should take an anti-affinity expression that pushes
1114 : // attached and secondary locations (independently) away frorm those
1115 : // pageservers also holding a shard for this tenant.
1116 :
1117 0 : entry.get_mut().schedule(scheduler).map_err(|e| {
1118 0 : ApiError::Conflict(format!(
1119 0 : "Failed to schedule shard {tenant_shard_id}: {e}"
1120 0 : ))
1121 0 : })?;
1122 :
1123 0 : response_shards.push(TenantCreateResponseShard {
1124 0 : shard_id: tenant_shard_id,
1125 0 : node_id: entry
1126 0 : .get()
1127 0 : .intent
1128 0 : .get_attached()
1129 0 : .expect("We just set pageserver if it was None"),
1130 0 : generation: entry.get().generation.into().unwrap(),
1131 0 : });
1132 0 :
1133 0 : continue;
1134 : }
1135 0 : Entry::Vacant(entry) => {
1136 0 : let mut state = TenantState::new(
1137 0 : tenant_shard_id,
1138 0 : ShardIdentity::from_params(
1139 0 : tenant_shard_id.shard_number,
1140 0 : &create_req.shard_parameters,
1141 0 : ),
1142 0 : placement_policy.clone(),
1143 0 : );
1144 :
1145 0 : if let Some(create_gen) = create_req.generation {
1146 0 : state.generation = Generation::new(create_gen);
1147 0 : }
1148 0 : state.config = create_req.config.clone();
1149 0 :
1150 0 : state.schedule(scheduler).map_err(|e| {
1151 0 : ApiError::Conflict(format!(
1152 0 : "Failed to schedule shard {tenant_shard_id}: {e}"
1153 0 : ))
1154 0 : })?;
1155 :
1156 0 : response_shards.push(TenantCreateResponseShard {
1157 0 : shard_id: tenant_shard_id,
1158 0 : node_id: state
1159 0 : .intent
1160 0 : .get_attached()
1161 0 : .expect("We just set pageserver if it was None"),
1162 0 : generation: state.generation.into().unwrap(),
1163 0 : });
1164 0 : entry.insert(state)
1165 : }
1166 : };
1167 : }
1168 :
1169 : // Take a snapshot of pageservers
1170 0 : let pageservers = locked.nodes.clone();
1171 0 :
1172 0 : let result_tx = locked.result_tx.clone();
1173 0 : let compute_hook = locked.compute_hook.clone();
1174 0 :
1175 0 : let waiters = locked
1176 0 : .tenants
1177 0 : .range_mut(TenantShardId::tenant_range(tenant_id))
1178 0 : .filter_map(|(_shard_id, shard)| {
1179 0 : shard.maybe_reconcile(
1180 0 : result_tx.clone(),
1181 0 : &pageservers,
1182 0 : &compute_hook,
1183 0 : &self.config,
1184 0 : &self.persistence,
1185 0 : &self.gate,
1186 0 : &self.cancel,
1187 0 : )
1188 0 : })
1189 0 : .collect::<Vec<_>>();
1190 0 : (waiters, response_shards)
1191 0 : };
1192 0 :
1193 0 : Ok((
1194 0 : TenantCreateResponse {
1195 0 : shards: response_shards,
1196 0 : },
1197 0 : waiters,
1198 0 : ))
1199 0 : }
1200 :
1201 : /// Helper for functions that reconcile a number of shards, and would like to do a timeout-bounded
1202 : /// wait for reconciliation to complete before responding.
1203 0 : async fn await_waiters(
1204 0 : &self,
1205 0 : waiters: Vec<ReconcilerWaiter>,
1206 0 : timeout: Duration,
1207 0 : ) -> Result<(), ReconcileWaitError> {
1208 0 : let deadline = Instant::now().checked_add(timeout).unwrap();
1209 0 : for waiter in waiters {
1210 0 : let timeout = deadline.duration_since(Instant::now());
1211 0 : waiter.wait_timeout(timeout).await?;
1212 : }
1213 :
1214 0 : Ok(())
1215 0 : }
1216 :
1217 : /// This API is used by the cloud control plane to do coarse-grained control of tenants:
1218 : /// - Call with mode Attached* to upsert the tenant.
1219 : /// - Call with mode Detached to switch to PolicyMode::Detached
1220 : ///
1221 : /// In future, calling with mode Secondary may switch to a detach-lite mode in which a tenant only has
1222 : /// secondary locations.
1223 0 : pub(crate) async fn tenant_location_config(
1224 0 : &self,
1225 0 : tenant_id: TenantId,
1226 0 : req: TenantLocationConfigRequest,
1227 0 : ) -> Result<TenantLocationConfigResponse, ApiError> {
1228 0 : if !req.tenant_id.is_unsharded() {
1229 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1230 0 : "This API is for importing single-sharded or unsharded tenants"
1231 0 : )));
1232 0 : }
1233 0 :
1234 0 : let mut waiters = Vec::new();
1235 0 : let mut result = TenantLocationConfigResponse { shards: Vec::new() };
1236 0 : let maybe_create = {
1237 0 : let mut locked = self.inner.write().unwrap();
1238 0 : let result_tx = locked.result_tx.clone();
1239 0 : let compute_hook = locked.compute_hook.clone();
1240 0 : let (nodes, tenants, scheduler) = locked.parts_mut();
1241 0 :
1242 0 : // Maybe we have existing shards
1243 0 : let mut create = true;
1244 0 : for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
1245 : // Saw an existing shard: this is not a creation
1246 0 : create = false;
1247 0 :
1248 0 : // Note that for existing tenants we do _not_ respect the generation in the request: this is likely
1249 0 : // to be stale. Once a tenant is created in this service, our view of generation is authoritative, and
1250 0 : // callers' generations may be ignored. This represents a one-way migration of tenants from the outer
1251 0 : // cloud control plane into this service.
1252 0 :
1253 0 : // Use location config mode as an indicator of policy: if they ask for
1254 0 : // attached we go to default HA attached mode. If they ask for secondary
1255 0 : // we go to secondary-only mode. If they ask for detached we detach.
1256 0 : match req.config.mode {
1257 0 : LocationConfigMode::Detached => {
1258 0 : shard.policy = PlacementPolicy::Detached;
1259 0 : }
1260 : LocationConfigMode::Secondary => {
1261 : // TODO: implement secondary-only mode.
1262 0 : todo!();
1263 : }
1264 : LocationConfigMode::AttachedMulti
1265 : | LocationConfigMode::AttachedSingle
1266 : | LocationConfigMode::AttachedStale => {
1267 : // TODO: persistence for changes in policy
1268 0 : if nodes.len() > 1 {
1269 0 : shard.policy = PlacementPolicy::Double(1)
1270 : } else {
1271 : // Convenience for dev/test: if we just have one pageserver, import
1272 : // tenants into Single mode so that scheduling will succeed.
1273 0 : shard.policy = PlacementPolicy::Single
1274 : }
1275 : }
1276 : }
1277 :
1278 0 : shard.schedule(scheduler)?;
1279 :
1280 0 : let maybe_waiter = shard.maybe_reconcile(
1281 0 : result_tx.clone(),
1282 0 : nodes,
1283 0 : &compute_hook,
1284 0 : &self.config,
1285 0 : &self.persistence,
1286 0 : &self.gate,
1287 0 : &self.cancel,
1288 0 : );
1289 0 : if let Some(waiter) = maybe_waiter {
1290 0 : waiters.push(waiter);
1291 0 : }
1292 :
1293 0 : if let Some(node_id) = shard.intent.get_attached() {
1294 0 : result.shards.push(TenantShardLocation {
1295 0 : shard_id: *shard_id,
1296 0 : node_id: *node_id,
1297 0 : })
1298 0 : }
1299 : }
1300 :
1301 0 : if create {
1302 : // Validate request mode
1303 0 : match req.config.mode {
1304 : LocationConfigMode::Detached | LocationConfigMode::Secondary => {
1305 : // When using this API to onboard an existing tenant to this service, it must start in
1306 : // an attached state, because we need the request to come with a generation
1307 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1308 0 : "Imported tenant must be in attached mode"
1309 0 : )));
1310 : }
1311 :
1312 : LocationConfigMode::AttachedMulti
1313 : | LocationConfigMode::AttachedSingle
1314 0 : | LocationConfigMode::AttachedStale => {
1315 0 : // Pass
1316 0 : }
1317 : }
1318 :
1319 : // Validate request generation
1320 0 : let Some(generation) = req.config.generation else {
1321 : // We can only import attached tenants, because we need the request to come with a generation
1322 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1323 0 : "Generation is mandatory when importing tenant"
1324 0 : )));
1325 : };
1326 :
1327 : // Synthesize a creation request
1328 0 : Some(TenantCreateRequest {
1329 0 : new_tenant_id: TenantShardId::unsharded(tenant_id),
1330 0 : generation: Some(generation),
1331 0 : shard_parameters: ShardParameters {
1332 0 : // Must preserve the incoming shard_count do distinguish unsharded (0)
1333 0 : // from single-sharded (1): this distinction appears in the S3 keys of the tenant.
1334 0 : count: req.tenant_id.shard_count,
1335 0 : // We only import un-sharded or single-sharded tenants, so stripe
1336 0 : // size can be made up arbitrarily here.
1337 0 : stripe_size: ShardParameters::DEFAULT_STRIPE_SIZE,
1338 0 : },
1339 0 : config: req.config.tenant_conf,
1340 0 : })
1341 : } else {
1342 0 : None
1343 : }
1344 : };
1345 :
1346 0 : let waiters = if let Some(create_req) = maybe_create {
1347 0 : let (create_resp, waiters) = self.do_tenant_create(create_req).await?;
1348 0 : result.shards = create_resp
1349 0 : .shards
1350 0 : .into_iter()
1351 0 : .map(|s| TenantShardLocation {
1352 0 : node_id: s.node_id,
1353 0 : shard_id: s.shard_id,
1354 0 : })
1355 0 : .collect();
1356 0 : waiters
1357 : } else {
1358 0 : waiters
1359 : };
1360 :
1361 0 : if let Err(e) = self.await_waiters(waiters, SHORT_RECONCILE_TIMEOUT).await {
1362 : // Do not treat a reconcile error as fatal: we have already applied any requested
1363 : // Intent changes, and the reconcile can fail for external reasons like unavailable
1364 : // compute notification API. In these cases, it is important that we do not
1365 : // cause the cloud control plane to retry forever on this API.
1366 0 : tracing::warn!(
1367 0 : "Failed to reconcile after /location_config: {e}, returning success anyway"
1368 0 : );
1369 0 : }
1370 :
1371 : // Logging the full result is useful because it lets us cross-check what the cloud control
1372 : // plane's tenant_shards table should contain.
1373 0 : tracing::info!("Complete, returning {result:?}");
1374 :
1375 0 : Ok(result)
1376 0 : }
1377 :
1378 0 : pub(crate) async fn tenant_time_travel_remote_storage(
1379 0 : &self,
1380 0 : time_travel_req: &TenantTimeTravelRequest,
1381 0 : tenant_id: TenantId,
1382 0 : timestamp: Cow<'_, str>,
1383 0 : done_if_after: Cow<'_, str>,
1384 0 : ) -> Result<(), ApiError> {
1385 0 : let node = {
1386 0 : let locked = self.inner.read().unwrap();
1387 : // Just a sanity check to prevent misuse: the API expects that the tenant is fully
1388 : // detached everywhere, and nothing writes to S3 storage. Here, we verify that,
1389 : // but only at the start of the process, so it's really just to prevent operator
1390 : // mistakes.
1391 0 : for (shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
1392 0 : if shard.intent.get_attached().is_some() || !shard.intent.get_secondary().is_empty()
1393 : {
1394 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1395 0 : "We want tenant to be attached in shard with tenant_shard_id={shard_id}"
1396 0 : )));
1397 0 : }
1398 0 : let maybe_attached = shard
1399 0 : .observed
1400 0 : .locations
1401 0 : .iter()
1402 0 : .filter_map(|(node_id, observed_location)| {
1403 0 : observed_location
1404 0 : .conf
1405 0 : .as_ref()
1406 0 : .map(|loc| (node_id, observed_location, loc.mode))
1407 0 : })
1408 0 : .find(|(_, _, mode)| *mode != LocationConfigMode::Detached);
1409 0 : if let Some((node_id, _observed_location, mode)) = maybe_attached {
1410 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!("We observed attached={mode:?} tenant in node_id={node_id} shard with tenant_shard_id={shard_id}")));
1411 0 : }
1412 : }
1413 0 : let scheduler = &locked.scheduler;
1414 : // Right now we only perform the operation on a single node without parallelization
1415 : // TODO fan out the operation to multiple nodes for better performance
1416 0 : let node_id = scheduler.schedule_shard(&[])?;
1417 0 : let node = locked
1418 0 : .nodes
1419 0 : .get(&node_id)
1420 0 : .expect("Pageservers may not be deleted while lock is active");
1421 0 : node.clone()
1422 0 : };
1423 0 :
1424 0 : // The shard count is encoded in the remote storage's URL, so we need to handle all historically used shard counts
1425 0 : let mut counts = time_travel_req
1426 0 : .shard_counts
1427 0 : .iter()
1428 0 : .copied()
1429 0 : .collect::<HashSet<_>>()
1430 0 : .into_iter()
1431 0 : .collect::<Vec<_>>();
1432 0 : counts.sort_unstable();
1433 :
1434 0 : for count in counts {
1435 0 : let shard_ids = (0..count.count())
1436 0 : .map(|i| TenantShardId {
1437 0 : tenant_id,
1438 0 : shard_number: ShardNumber(i),
1439 0 : shard_count: count,
1440 0 : })
1441 0 : .collect::<Vec<_>>();
1442 0 : for tenant_shard_id in shard_ids {
1443 0 : let client =
1444 0 : mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1445 :
1446 0 : tracing::info!("Doing time travel recovery for shard {tenant_shard_id}",);
1447 :
1448 0 : client
1449 0 : .tenant_time_travel_remote_storage(
1450 0 : tenant_shard_id,
1451 0 : ×tamp,
1452 0 : &done_if_after,
1453 0 : )
1454 0 : .await
1455 0 : .map_err(|e| {
1456 0 : ApiError::InternalServerError(anyhow::anyhow!(
1457 0 : "Error doing time travel recovery for shard {tenant_shard_id} on node {}: {e}",
1458 0 : node.id
1459 0 : ))
1460 0 : })?;
1461 : }
1462 : }
1463 :
1464 0 : Ok(())
1465 0 : }
1466 :
1467 0 : pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
1468 0 : self.ensure_attached_wait(tenant_id).await?;
1469 :
1470 : // TODO: refactor into helper
1471 0 : let targets = {
1472 0 : let locked = self.inner.read().unwrap();
1473 0 : let mut targets = Vec::new();
1474 :
1475 0 : for (tenant_shard_id, shard) in
1476 0 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1477 0 : {
1478 0 : let node_id = shard.intent.get_attached().ok_or_else(|| {
1479 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1480 0 : })?;
1481 0 : let node = locked
1482 0 : .nodes
1483 0 : .get(&node_id)
1484 0 : .expect("Pageservers may not be deleted while referenced");
1485 0 :
1486 0 : targets.push((*tenant_shard_id, node.clone()));
1487 : }
1488 0 : targets
1489 0 : };
1490 0 :
1491 0 : // Phase 1: delete on the pageservers
1492 0 : let mut any_pending = false;
1493 0 : for (tenant_shard_id, node) in targets {
1494 0 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
1495 : // TODO: this, like many other places, requires proper retry handling for 503, timeout: those should not
1496 : // surface immediately as an error to our caller.
1497 0 : let status = client.tenant_delete(tenant_shard_id).await.map_err(|e| {
1498 0 : ApiError::InternalServerError(anyhow::anyhow!(
1499 0 : "Error deleting shard {tenant_shard_id} on node {}: {e}",
1500 0 : node.id
1501 0 : ))
1502 0 : })?;
1503 0 : tracing::info!(
1504 0 : "Shard {tenant_shard_id} on node {}, delete returned {}",
1505 0 : node.id,
1506 0 : status
1507 0 : );
1508 0 : if status == StatusCode::ACCEPTED {
1509 0 : any_pending = true;
1510 0 : }
1511 : }
1512 :
1513 0 : if any_pending {
1514 : // Caller should call us again later. When we eventually see 404s from
1515 : // all the shards, we may proceed to delete our records of the tenant.
1516 0 : tracing::info!(
1517 0 : "Tenant {} has some shards pending deletion, returning 202",
1518 0 : tenant_id
1519 0 : );
1520 0 : return Ok(StatusCode::ACCEPTED);
1521 0 : }
1522 0 :
1523 0 : // Fall through: deletion of the tenant on pageservers is complete, we may proceed to drop
1524 0 : // our in-memory state and database state.
1525 0 :
1526 0 : // Ordering: we delete persistent state first: if we then
1527 0 : // crash, we will drop the in-memory state.
1528 0 :
1529 0 : // Drop persistent state.
1530 0 : self.persistence.delete_tenant(tenant_id).await?;
1531 :
1532 : // Drop in-memory state
1533 : {
1534 0 : let mut locked = self.inner.write().unwrap();
1535 0 : let (_nodes, tenants, scheduler) = locked.parts_mut();
1536 :
1537 : // Dereference Scheduler from shards before dropping them
1538 0 : for (_tenant_shard_id, shard) in
1539 0 : tenants.range_mut(TenantShardId::tenant_range(tenant_id))
1540 0 : {
1541 0 : shard.intent.clear(scheduler);
1542 0 : }
1543 :
1544 0 : tenants.retain(|tenant_shard_id, _shard| tenant_shard_id.tenant_id != tenant_id);
1545 0 : tracing::info!(
1546 0 : "Deleted tenant {tenant_id}, now have {} tenants",
1547 0 : locked.tenants.len()
1548 0 : );
1549 : };
1550 :
1551 : // Success is represented as 404, to imitate the existing pageserver deletion API
1552 0 : Ok(StatusCode::NOT_FOUND)
1553 0 : }
1554 :
1555 0 : pub(crate) async fn tenant_timeline_create(
1556 0 : &self,
1557 0 : tenant_id: TenantId,
1558 0 : mut create_req: TimelineCreateRequest,
1559 0 : ) -> Result<TimelineInfo, ApiError> {
1560 0 : tracing::info!(
1561 0 : "Creating timeline {}/{}",
1562 0 : tenant_id,
1563 0 : create_req.new_timeline_id,
1564 0 : );
1565 :
1566 0 : self.ensure_attached_wait(tenant_id).await?;
1567 :
1568 : // TODO: refuse to do this if shard splitting is in progress
1569 : // (https://github.com/neondatabase/neon/issues/6676)
1570 0 : let mut targets = {
1571 0 : let locked = self.inner.read().unwrap();
1572 0 : let mut targets = Vec::new();
1573 :
1574 0 : for (tenant_shard_id, shard) in
1575 0 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1576 0 : {
1577 0 : let node_id = shard.intent.get_attached().ok_or_else(|| {
1578 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1579 0 : })?;
1580 0 : let node = locked
1581 0 : .nodes
1582 0 : .get(&node_id)
1583 0 : .expect("Pageservers may not be deleted while referenced");
1584 0 :
1585 0 : targets.push((*tenant_shard_id, node.clone()));
1586 : }
1587 0 : targets
1588 0 : };
1589 0 :
1590 0 : if targets.is_empty() {
1591 0 : return Err(ApiError::NotFound(
1592 0 : anyhow::anyhow!("Tenant not found").into(),
1593 0 : ));
1594 0 : };
1595 0 : let shard_zero = targets.remove(0);
1596 :
1597 0 : async fn create_one(
1598 0 : tenant_shard_id: TenantShardId,
1599 0 : node: Node,
1600 0 : jwt: Option<String>,
1601 0 : create_req: TimelineCreateRequest,
1602 0 : ) -> Result<TimelineInfo, ApiError> {
1603 0 : tracing::info!(
1604 0 : "Creating timeline on shard {}/{}, attached to node {}",
1605 0 : tenant_shard_id,
1606 0 : create_req.new_timeline_id,
1607 0 : node.id
1608 0 : );
1609 0 : let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
1610 0 :
1611 0 : client
1612 0 : .timeline_create(tenant_shard_id, &create_req)
1613 0 : .await
1614 0 : .map_err(|e| match e {
1615 0 : mgmt_api::Error::ApiError(status, msg)
1616 0 : if status == StatusCode::INTERNAL_SERVER_ERROR
1617 0 : || status == StatusCode::NOT_ACCEPTABLE =>
1618 0 : {
1619 0 : // TODO: handle more error codes, e.g. 503 should be passed through. Make a general wrapper
1620 0 : // for pass-through API calls.
1621 0 : ApiError::InternalServerError(anyhow::anyhow!(msg))
1622 : }
1623 0 : _ => ApiError::Conflict(format!("Failed to create timeline: {e}")),
1624 0 : })
1625 0 : }
1626 :
1627 : // Because the caller might not provide an explicit LSN, we must do the creation first on a single shard, and then
1628 : // use whatever LSN that shard picked when creating on subsequent shards. We arbitrarily use shard zero as the shard
1629 : // that will get the first creation request, and propagate the LSN to all the >0 shards.
1630 0 : let timeline_info = create_one(
1631 0 : shard_zero.0,
1632 0 : shard_zero.1,
1633 0 : self.config.jwt_token.clone(),
1634 0 : create_req.clone(),
1635 0 : )
1636 0 : .await?;
1637 :
1638 : // Propagate the LSN that shard zero picked, if caller didn't provide one
1639 0 : if create_req.ancestor_timeline_id.is_some() && create_req.ancestor_start_lsn.is_none() {
1640 0 : create_req.ancestor_start_lsn = timeline_info.ancestor_lsn;
1641 0 : }
1642 :
1643 : // Create timeline on remaining shards with number >0
1644 0 : if !targets.is_empty() {
1645 : // If we had multiple shards, issue requests for the remainder now.
1646 0 : let jwt = self.config.jwt_token.clone();
1647 0 : self.tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
1648 0 : let create_req = create_req.clone();
1649 0 : Box::pin(create_one(tenant_shard_id, node, jwt.clone(), create_req))
1650 0 : })
1651 0 : .await?;
1652 0 : }
1653 :
1654 0 : Ok(timeline_info)
1655 0 : }
1656 :
1657 : /// Helper for concurrently calling a pageserver API on a number of shards, such as timeline creation.
1658 : ///
1659 : /// On success, the returned vector contains exactly the same number of elements as the input `locations`.
1660 0 : async fn tenant_for_shards<F, R>(
1661 0 : &self,
1662 0 : locations: Vec<(TenantShardId, Node)>,
1663 0 : mut req_fn: F,
1664 0 : ) -> Result<Vec<R>, ApiError>
1665 0 : where
1666 0 : F: FnMut(
1667 0 : TenantShardId,
1668 0 : Node,
1669 0 : )
1670 0 : -> std::pin::Pin<Box<dyn futures::Future<Output = Result<R, ApiError>> + Send>>,
1671 0 : {
1672 0 : let mut futs = FuturesUnordered::new();
1673 0 : let mut results = Vec::with_capacity(locations.len());
1674 :
1675 0 : for (tenant_shard_id, node) in locations {
1676 0 : futs.push(req_fn(tenant_shard_id, node));
1677 0 : }
1678 :
1679 0 : while let Some(r) = futs.next().await {
1680 0 : results.push(r?);
1681 : }
1682 :
1683 0 : Ok(results)
1684 0 : }
1685 :
1686 0 : pub(crate) async fn tenant_timeline_delete(
1687 0 : &self,
1688 0 : tenant_id: TenantId,
1689 0 : timeline_id: TimelineId,
1690 0 : ) -> Result<StatusCode, ApiError> {
1691 0 : tracing::info!("Deleting timeline {}/{}", tenant_id, timeline_id,);
1692 :
1693 0 : self.ensure_attached_wait(tenant_id).await?;
1694 :
1695 : // TODO: refuse to do this if shard splitting is in progress
1696 : // (https://github.com/neondatabase/neon/issues/6676)
1697 0 : let mut targets = {
1698 0 : let locked = self.inner.read().unwrap();
1699 0 : let mut targets = Vec::new();
1700 :
1701 0 : for (tenant_shard_id, shard) in
1702 0 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1703 0 : {
1704 0 : let node_id = shard.intent.get_attached().ok_or_else(|| {
1705 0 : ApiError::InternalServerError(anyhow::anyhow!("Shard not scheduled"))
1706 0 : })?;
1707 0 : let node = locked
1708 0 : .nodes
1709 0 : .get(&node_id)
1710 0 : .expect("Pageservers may not be deleted while referenced");
1711 0 :
1712 0 : targets.push((*tenant_shard_id, node.clone()));
1713 : }
1714 0 : targets
1715 0 : };
1716 0 :
1717 0 : if targets.is_empty() {
1718 0 : return Err(ApiError::NotFound(
1719 0 : anyhow::anyhow!("Tenant not found").into(),
1720 0 : ));
1721 0 : }
1722 0 : let shard_zero = targets.remove(0);
1723 :
1724 0 : async fn delete_one(
1725 0 : tenant_shard_id: TenantShardId,
1726 0 : timeline_id: TimelineId,
1727 0 : node: Node,
1728 0 : jwt: Option<String>,
1729 0 : ) -> Result<StatusCode, ApiError> {
1730 0 : tracing::info!(
1731 0 : "Deleting timeline on shard {}/{}, attached to node {}",
1732 0 : tenant_shard_id,
1733 0 : timeline_id,
1734 0 : node.id
1735 0 : );
1736 :
1737 0 : let client = mgmt_api::Client::new(node.base_url(), jwt.as_deref());
1738 0 : client
1739 0 : .timeline_delete(tenant_shard_id, timeline_id)
1740 0 : .await
1741 0 : .map_err(|e| {
1742 0 : ApiError::InternalServerError(anyhow::anyhow!(
1743 0 : "Error deleting timeline {timeline_id} on {tenant_shard_id} on node {}: {e}",
1744 0 : node.id
1745 0 : ))
1746 0 : })
1747 0 : }
1748 :
1749 0 : let statuses = self
1750 0 : .tenant_for_shards(targets, |tenant_shard_id: TenantShardId, node: Node| {
1751 0 : Box::pin(delete_one(
1752 0 : tenant_shard_id,
1753 0 : timeline_id,
1754 0 : node,
1755 0 : self.config.jwt_token.clone(),
1756 0 : ))
1757 0 : })
1758 0 : .await?;
1759 :
1760 : // If any shards >0 haven't finished deletion yet, don't start deletion on shard zero
1761 0 : if statuses.iter().any(|s| s != &StatusCode::NOT_FOUND) {
1762 0 : return Ok(StatusCode::ACCEPTED);
1763 0 : }
1764 :
1765 : // Delete shard zero last: this is not strictly necessary, but since a caller's GET on a timeline will be routed
1766 : // to shard zero, it gives a more obvious behavior that a GET returns 404 once the deletion is done.
1767 0 : let shard_zero_status = delete_one(
1768 0 : shard_zero.0,
1769 0 : timeline_id,
1770 0 : shard_zero.1,
1771 0 : self.config.jwt_token.clone(),
1772 0 : )
1773 0 : .await?;
1774 :
1775 0 : Ok(shard_zero_status)
1776 0 : }
1777 :
1778 : /// When you need to send an HTTP request to the pageserver that holds shard0 of a tenant, this
1779 : /// function looks it up and returns the url. If the tenant isn't found, returns Err(ApiError::NotFound)
1780 0 : pub(crate) fn tenant_shard0_baseurl(
1781 0 : &self,
1782 0 : tenant_id: TenantId,
1783 0 : ) -> Result<(String, TenantShardId), ApiError> {
1784 0 : let locked = self.inner.read().unwrap();
1785 0 : let Some((tenant_shard_id, shard)) = locked
1786 0 : .tenants
1787 0 : .range(TenantShardId::tenant_range(tenant_id))
1788 0 : .next()
1789 : else {
1790 0 : return Err(ApiError::NotFound(
1791 0 : anyhow::anyhow!("Tenant {tenant_id} not found").into(),
1792 0 : ));
1793 : };
1794 :
1795 : // TODO: should use the ID last published to compute_hook, rather than the intent: the intent might
1796 : // point to somewhere we haven't attached yet.
1797 0 : let Some(node_id) = shard.intent.get_attached() else {
1798 0 : tracing::warn!(
1799 0 : tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(),
1800 0 : "Shard not scheduled (policy {:?}), cannot generate pass-through URL",
1801 0 : shard.policy
1802 0 : );
1803 0 : return Err(ApiError::Conflict(
1804 0 : "Cannot call timeline API on non-attached tenant".to_string(),
1805 0 : ));
1806 : };
1807 :
1808 0 : let Some(node) = locked.nodes.get(node_id) else {
1809 : // This should never happen
1810 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1811 0 : "Shard refers to nonexistent node"
1812 0 : )));
1813 : };
1814 :
1815 0 : Ok((node.base_url(), *tenant_shard_id))
1816 0 : }
1817 :
1818 0 : pub(crate) fn tenant_locate(
1819 0 : &self,
1820 0 : tenant_id: TenantId,
1821 0 : ) -> Result<TenantLocateResponse, ApiError> {
1822 0 : let locked = self.inner.read().unwrap();
1823 0 : tracing::info!("Locating shards for tenant {tenant_id}");
1824 :
1825 : // Take a snapshot of pageservers
1826 0 : let pageservers = locked.nodes.clone();
1827 0 :
1828 0 : let mut result = Vec::new();
1829 0 : let mut shard_params: Option<ShardParameters> = None;
1830 :
1831 0 : for (tenant_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1832 : {
1833 0 : let node_id =
1834 0 : shard
1835 0 : .intent
1836 0 : .get_attached()
1837 0 : .ok_or(ApiError::BadRequest(anyhow::anyhow!(
1838 0 : "Cannot locate a tenant that is not attached"
1839 0 : )))?;
1840 :
1841 0 : let node = pageservers
1842 0 : .get(&node_id)
1843 0 : .expect("Pageservers may not be deleted while referenced");
1844 0 :
1845 0 : result.push(TenantLocateResponseShard {
1846 0 : shard_id: *tenant_shard_id,
1847 0 : node_id,
1848 0 : listen_http_addr: node.listen_http_addr.clone(),
1849 0 : listen_http_port: node.listen_http_port,
1850 0 : listen_pg_addr: node.listen_pg_addr.clone(),
1851 0 : listen_pg_port: node.listen_pg_port,
1852 0 : });
1853 0 :
1854 0 : match &shard_params {
1855 0 : None => {
1856 0 : shard_params = Some(ShardParameters {
1857 0 : stripe_size: shard.shard.stripe_size,
1858 0 : count: shard.shard.count,
1859 0 : });
1860 0 : }
1861 0 : Some(params) => {
1862 0 : if params.stripe_size != shard.shard.stripe_size {
1863 : // This should never happen. We enforce at runtime because it's simpler than
1864 : // adding an extra per-tenant data structure to store the things that should be the same
1865 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
1866 0 : "Inconsistent shard stripe size parameters!"
1867 0 : )));
1868 0 : }
1869 : }
1870 : }
1871 : }
1872 :
1873 0 : if result.is_empty() {
1874 0 : return Err(ApiError::NotFound(
1875 0 : anyhow::anyhow!("No shards for this tenant ID found").into(),
1876 0 : ));
1877 0 : }
1878 0 : let shard_params = shard_params.expect("result is non-empty, therefore this is set");
1879 0 : tracing::info!(
1880 0 : "Located tenant {} with params {:?} on shards {}",
1881 0 : tenant_id,
1882 0 : shard_params,
1883 0 : result
1884 0 : .iter()
1885 0 : .map(|s| format!("{:?}", s))
1886 0 : .collect::<Vec<_>>()
1887 0 : .join(",")
1888 0 : );
1889 :
1890 0 : Ok(TenantLocateResponse {
1891 0 : shards: result,
1892 0 : shard_params,
1893 0 : })
1894 0 : }
1895 :
1896 0 : pub(crate) async fn tenant_shard_split(
1897 0 : &self,
1898 0 : tenant_id: TenantId,
1899 0 : split_req: TenantShardSplitRequest,
1900 0 : ) -> Result<TenantShardSplitResponse, ApiError> {
1901 0 : let mut policy = None;
1902 0 : let mut shard_ident = None;
1903 :
1904 : // A parent shard which will be split
1905 : struct SplitTarget {
1906 : parent_id: TenantShardId,
1907 : node: Node,
1908 : child_ids: Vec<TenantShardId>,
1909 : }
1910 :
1911 : // Validate input, and calculate which shards we will create
1912 0 : let (old_shard_count, targets, compute_hook) =
1913 : {
1914 0 : let locked = self.inner.read().unwrap();
1915 0 :
1916 0 : let pageservers = locked.nodes.clone();
1917 0 :
1918 0 : let mut targets = Vec::new();
1919 0 :
1920 0 : // In case this is a retry, count how many already-split shards we found
1921 0 : let mut children_found = Vec::new();
1922 0 : let mut old_shard_count = None;
1923 :
1924 0 : for (tenant_shard_id, shard) in
1925 0 : locked.tenants.range(TenantShardId::tenant_range(tenant_id))
1926 : {
1927 0 : match shard.shard.count.count().cmp(&split_req.new_shard_count) {
1928 : Ordering::Equal => {
1929 : // Already split this
1930 0 : children_found.push(*tenant_shard_id);
1931 0 : continue;
1932 : }
1933 : Ordering::Greater => {
1934 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
1935 0 : "Requested count {} but already have shards at count {}",
1936 0 : split_req.new_shard_count,
1937 0 : shard.shard.count.count()
1938 0 : )));
1939 : }
1940 0 : Ordering::Less => {
1941 0 : // Fall through: this shard has lower count than requested,
1942 0 : // is a candidate for splitting.
1943 0 : }
1944 0 : }
1945 0 :
1946 0 : match old_shard_count {
1947 0 : None => old_shard_count = Some(shard.shard.count),
1948 0 : Some(old_shard_count) => {
1949 0 : if old_shard_count != shard.shard.count {
1950 : // We may hit this case if a caller asked for two splits to
1951 : // different sizes, before the first one is complete.
1952 : // e.g. 1->2, 2->4, where the 4 call comes while we have a mixture
1953 : // of shard_count=1 and shard_count=2 shards in the map.
1954 0 : return Err(ApiError::Conflict(
1955 0 : "Cannot split, currently mid-split".to_string(),
1956 0 : ));
1957 0 : }
1958 : }
1959 : }
1960 0 : if policy.is_none() {
1961 0 : policy = Some(shard.policy.clone());
1962 0 : }
1963 0 : if shard_ident.is_none() {
1964 0 : shard_ident = Some(shard.shard);
1965 0 : }
1966 :
1967 0 : if tenant_shard_id.shard_count.count() == split_req.new_shard_count {
1968 0 : tracing::info!(
1969 0 : "Tenant shard {} already has shard count {}",
1970 0 : tenant_shard_id,
1971 0 : split_req.new_shard_count
1972 0 : );
1973 0 : continue;
1974 0 : }
1975 :
1976 0 : let node_id = shard.intent.get_attached().ok_or(ApiError::BadRequest(
1977 0 : anyhow::anyhow!("Cannot split a tenant that is not attached"),
1978 0 : ))?;
1979 :
1980 0 : let node = pageservers
1981 0 : .get(&node_id)
1982 0 : .expect("Pageservers may not be deleted while referenced");
1983 0 :
1984 0 : // TODO: if any reconciliation is currently in progress for this shard, wait for it.
1985 0 :
1986 0 : targets.push(SplitTarget {
1987 0 : parent_id: *tenant_shard_id,
1988 0 : node: node.clone(),
1989 0 : child_ids: tenant_shard_id
1990 0 : .split(ShardCount::new(split_req.new_shard_count)),
1991 0 : });
1992 : }
1993 :
1994 0 : if targets.is_empty() {
1995 0 : if children_found.len() == split_req.new_shard_count as usize {
1996 0 : return Ok(TenantShardSplitResponse {
1997 0 : new_shards: children_found,
1998 0 : });
1999 : } else {
2000 : // No shards found to split, and no existing children found: the
2001 : // tenant doesn't exist at all.
2002 0 : return Err(ApiError::NotFound(
2003 0 : anyhow::anyhow!("Tenant {} not found", tenant_id).into(),
2004 0 : ));
2005 : }
2006 0 : }
2007 0 :
2008 0 : (old_shard_count, targets, locked.compute_hook.clone())
2009 0 : };
2010 0 :
2011 0 : // unwrap safety: we would have returned above if we didn't find at least one shard to split
2012 0 : let old_shard_count = old_shard_count.unwrap();
2013 0 : let shard_ident = shard_ident.unwrap();
2014 0 : let policy = policy.unwrap();
2015 0 :
2016 0 : // FIXME: we have dropped self.inner lock, and not yet written anything to the database: another
2017 0 : // request could occur here, deleting or mutating the tenant. begin_shard_split checks that the
2018 0 : // parent shards exist as expected, but it would be neater to do the above pre-checks within the
2019 0 : // same database transaction rather than pre-check in-memory and then maybe-fail the database write.
2020 0 : // (https://github.com/neondatabase/neon/issues/6676)
2021 0 :
2022 0 : // Before creating any new child shards in memory or on the pageservers, persist them: this
2023 0 : // enables us to ensure that we will always be able to clean up if something goes wrong. This also
2024 0 : // acts as the protection against two concurrent attempts to split: one of them will get a database
2025 0 : // error trying to insert the child shards.
2026 0 : let mut child_tsps = Vec::new();
2027 0 : for target in &targets {
2028 0 : let mut this_child_tsps = Vec::new();
2029 0 : for child in &target.child_ids {
2030 0 : let mut child_shard = shard_ident;
2031 0 : child_shard.number = child.shard_number;
2032 0 : child_shard.count = child.shard_count;
2033 0 :
2034 0 : this_child_tsps.push(TenantShardPersistence {
2035 0 : tenant_id: child.tenant_id.to_string(),
2036 0 : shard_number: child.shard_number.0 as i32,
2037 0 : shard_count: child.shard_count.literal() as i32,
2038 0 : shard_stripe_size: shard_ident.stripe_size.0 as i32,
2039 0 : // Note: this generation is a placeholder, [`Persistence::begin_shard_split`] will
2040 0 : // populate the correct generation as part of its transaction, to protect us
2041 0 : // against racing with changes in the state of the parent.
2042 0 : generation: 0,
2043 0 : generation_pageserver: target.node.id.0 as i64,
2044 0 : placement_policy: serde_json::to_string(&policy).unwrap(),
2045 0 : // TODO: get the config out of the map
2046 0 : config: serde_json::to_string(&TenantConfig::default()).unwrap(),
2047 0 : splitting: SplitState::Splitting,
2048 0 : });
2049 0 : }
2050 :
2051 0 : child_tsps.push((target.parent_id, this_child_tsps));
2052 : }
2053 :
2054 0 : if let Err(e) = self
2055 0 : .persistence
2056 0 : .begin_shard_split(old_shard_count, tenant_id, child_tsps)
2057 0 : .await
2058 : {
2059 0 : match e {
2060 : DatabaseError::Query(diesel::result::Error::DatabaseError(
2061 : DatabaseErrorKind::UniqueViolation,
2062 : _,
2063 : )) => {
2064 : // Inserting a child shard violated a unique constraint: we raced with another call to
2065 : // this function
2066 0 : tracing::warn!("Conflicting attempt to split {tenant_id}: {e}");
2067 0 : return Err(ApiError::Conflict("Tenant is already splitting".into()));
2068 : }
2069 0 : _ => return Err(ApiError::InternalServerError(e.into())),
2070 : }
2071 0 : }
2072 0 :
2073 0 : // Now that I have persisted the splitting state, apply it in-memory. This is infallible, so
2074 0 : // callers may assume that if splitting is set in memory, then it was persisted, and if splitting
2075 0 : // is not set in memory, then it was not persisted.
2076 0 : {
2077 0 : let mut locked = self.inner.write().unwrap();
2078 0 : for target in &targets {
2079 0 : if let Some(parent_shard) = locked.tenants.get_mut(&target.parent_id) {
2080 0 : parent_shard.splitting = SplitState::Splitting;
2081 0 : }
2082 : }
2083 : }
2084 :
2085 : // FIXME: we have now committed the shard split state to the database, so any subsequent
2086 : // failure needs to roll it back. We will later wrap this function in logic to roll back
2087 : // the split if it fails.
2088 : // (https://github.com/neondatabase/neon/issues/6676)
2089 :
2090 : // TODO: issue split calls concurrently (this only matters once we're splitting
2091 : // N>1 shards into M shards -- initially we're usually splitting 1 shard into N).
2092 :
2093 0 : for target in &targets {
2094 : let SplitTarget {
2095 0 : parent_id,
2096 0 : node,
2097 0 : child_ids,
2098 0 : } = target;
2099 0 : let client = mgmt_api::Client::new(node.base_url(), self.config.jwt_token.as_deref());
2100 0 : let response = client
2101 0 : .tenant_shard_split(
2102 0 : *parent_id,
2103 0 : TenantShardSplitRequest {
2104 0 : new_shard_count: split_req.new_shard_count,
2105 0 : },
2106 0 : )
2107 0 : .await
2108 0 : .map_err(|e| ApiError::Conflict(format!("Failed to split {}: {}", parent_id, e)))?;
2109 :
2110 0 : tracing::info!(
2111 0 : "Split {} into {}",
2112 0 : parent_id,
2113 0 : response
2114 0 : .new_shards
2115 0 : .iter()
2116 0 : .map(|s| format!("{:?}", s))
2117 0 : .collect::<Vec<_>>()
2118 0 : .join(",")
2119 0 : );
2120 :
2121 0 : if &response.new_shards != child_ids {
2122 : // This should never happen: the pageserver should agree with us on how shard splits work.
2123 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
2124 0 : "Splitting shard {} resulted in unexpected IDs: {:?} (expected {:?})",
2125 0 : parent_id,
2126 0 : response.new_shards,
2127 0 : child_ids
2128 0 : )));
2129 0 : }
2130 : }
2131 :
2132 : // TODO: if the pageserver restarted concurrently with our split API call,
2133 : // the actual generation of the child shard might differ from the generation
2134 : // we expect it to have. In order for our in-database generation to end up
2135 : // correct, we should carry the child generation back in the response and apply it here
2136 : // in complete_shard_split (and apply the correct generation in memory)
2137 : // (or, we can carry generation in the request and reject the request if
2138 : // it doesn't match, but that requires more retry logic on this side)
2139 :
2140 0 : self.persistence
2141 0 : .complete_shard_split(tenant_id, old_shard_count)
2142 0 : .await?;
2143 :
2144 : // Replace all the shards we just split with their children: this phase is infallible.
2145 0 : let mut response = TenantShardSplitResponse {
2146 0 : new_shards: Vec::new(),
2147 0 : };
2148 0 : let mut child_locations = Vec::new();
2149 0 : {
2150 0 : let mut locked = self.inner.write().unwrap();
2151 0 : let (_nodes, tenants, scheduler) = locked.parts_mut();
2152 0 : for target in targets {
2153 : let SplitTarget {
2154 0 : parent_id,
2155 0 : node: _node,
2156 0 : child_ids,
2157 0 : } = target;
2158 0 : let (pageserver, generation, config) = {
2159 0 : let mut old_state = tenants
2160 0 : .remove(&parent_id)
2161 0 : .expect("It was present, we just split it");
2162 0 : let old_attached = old_state.intent.get_attached().unwrap();
2163 0 : old_state.intent.clear(scheduler);
2164 0 : (old_attached, old_state.generation, old_state.config.clone())
2165 0 : };
2166 :
2167 0 : for child in child_ids {
2168 0 : let mut child_shard = shard_ident;
2169 0 : child_shard.number = child.shard_number;
2170 0 : child_shard.count = child.shard_count;
2171 0 :
2172 0 : let mut child_observed: HashMap<NodeId, ObservedStateLocation> = HashMap::new();
2173 0 : child_observed.insert(
2174 0 : pageserver,
2175 0 : ObservedStateLocation {
2176 0 : conf: Some(attached_location_conf(generation, &child_shard, &config)),
2177 0 : },
2178 0 : );
2179 0 :
2180 0 : let mut child_state = TenantState::new(child, child_shard, policy.clone());
2181 0 : child_state.intent = IntentState::single(scheduler, Some(pageserver));
2182 0 : child_state.observed = ObservedState {
2183 0 : locations: child_observed,
2184 0 : };
2185 0 : child_state.generation = generation;
2186 0 : child_state.config = config.clone();
2187 0 :
2188 0 : // The child's TenantState::splitting is intentionally left at the default value of Idle,
2189 0 : // as at this point in the split process we have succeeded and this part is infallible:
2190 0 : // we will never need to do any special recovery from this state.
2191 0 :
2192 0 : child_locations.push((child, pageserver));
2193 0 :
2194 0 : tenants.insert(child, child_state);
2195 0 : response.new_shards.push(child);
2196 0 : }
2197 : }
2198 : }
2199 :
2200 : // Send compute notifications for all the new shards
2201 0 : let mut failed_notifications = Vec::new();
2202 0 : for (child_id, child_ps) in child_locations {
2203 0 : if let Err(e) = compute_hook.notify(child_id, child_ps, &self.cancel).await {
2204 0 : tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
2205 0 : child_id, child_ps);
2206 0 : failed_notifications.push(child_id);
2207 0 : }
2208 : }
2209 :
2210 : // If we failed any compute notifications, make a note to retry later.
2211 0 : if !failed_notifications.is_empty() {
2212 0 : let mut locked = self.inner.write().unwrap();
2213 0 : for failed in failed_notifications {
2214 0 : if let Some(shard) = locked.tenants.get_mut(&failed) {
2215 0 : shard.pending_compute_notification = true;
2216 0 : }
2217 : }
2218 0 : }
2219 :
2220 0 : Ok(response)
2221 0 : }
2222 :
2223 0 : pub(crate) async fn tenant_shard_migrate(
2224 0 : &self,
2225 0 : tenant_shard_id: TenantShardId,
2226 0 : migrate_req: TenantShardMigrateRequest,
2227 0 : ) -> Result<TenantShardMigrateResponse, ApiError> {
2228 0 : let waiter = {
2229 0 : let mut locked = self.inner.write().unwrap();
2230 0 : let result_tx = locked.result_tx.clone();
2231 0 : let compute_hook = locked.compute_hook.clone();
2232 0 : let (nodes, tenants, scheduler) = locked.parts_mut();
2233 :
2234 0 : let Some(shard) = tenants.get_mut(&tenant_shard_id) else {
2235 0 : return Err(ApiError::NotFound(
2236 0 : anyhow::anyhow!("Tenant shard not found").into(),
2237 0 : ));
2238 : };
2239 :
2240 0 : if shard.intent.get_attached() == &Some(migrate_req.node_id) {
2241 : // No-op case: we will still proceed to wait for reconciliation in case it is
2242 : // incomplete from an earlier update to the intent.
2243 0 : tracing::info!("Migrating: intent is unchanged {:?}", shard.intent);
2244 : } else {
2245 0 : let old_attached = *shard.intent.get_attached();
2246 0 :
2247 0 : match shard.policy {
2248 0 : PlacementPolicy::Single => {
2249 0 : shard.intent.clear_secondary(scheduler);
2250 0 : }
2251 0 : PlacementPolicy::Double(_n) => {
2252 0 : // If our new attached node was a secondary, it no longer should be.
2253 0 : shard.intent.remove_secondary(scheduler, migrate_req.node_id);
2254 :
2255 : // If we were already attached to something, demote that to a secondary
2256 0 : if let Some(old_attached) = old_attached {
2257 0 : shard.intent.push_secondary(scheduler, old_attached);
2258 0 : }
2259 : }
2260 : PlacementPolicy::Detached => {
2261 0 : return Err(ApiError::BadRequest(anyhow::anyhow!(
2262 0 : "Cannot migrate a tenant that is PlacementPolicy::Detached: configure it to an attached policy first"
2263 0 : )))
2264 : }
2265 : }
2266 0 : shard
2267 0 : .intent
2268 0 : .set_attached(scheduler, Some(migrate_req.node_id));
2269 :
2270 0 : tracing::info!("Migrating: new intent {:?}", shard.intent);
2271 0 : shard.sequence = shard.sequence.next();
2272 : }
2273 :
2274 0 : shard.maybe_reconcile(
2275 0 : result_tx,
2276 0 : nodes,
2277 0 : &compute_hook,
2278 0 : &self.config,
2279 0 : &self.persistence,
2280 0 : &self.gate,
2281 0 : &self.cancel,
2282 0 : )
2283 : };
2284 :
2285 0 : if let Some(waiter) = waiter {
2286 0 : waiter.wait_timeout(RECONCILE_TIMEOUT).await?;
2287 : } else {
2288 0 : tracing::warn!("Migration is a no-op");
2289 : }
2290 :
2291 0 : Ok(TenantShardMigrateResponse {})
2292 0 : }
2293 :
2294 : /// This is for debug/support only: we simply drop all state for a tenant, without
2295 : /// detaching or deleting it on pageservers.
2296 0 : pub(crate) async fn tenant_drop(&self, tenant_id: TenantId) -> Result<(), ApiError> {
2297 0 : self.persistence.delete_tenant(tenant_id).await?;
2298 :
2299 0 : let mut locked = self.inner.write().unwrap();
2300 0 : let (_nodes, tenants, scheduler) = locked.parts_mut();
2301 0 : let mut shards = Vec::new();
2302 0 : for (tenant_shard_id, _) in tenants.range(TenantShardId::tenant_range(tenant_id)) {
2303 0 : shards.push(*tenant_shard_id);
2304 0 : }
2305 :
2306 0 : for shard_id in shards {
2307 0 : if let Some(mut shard) = tenants.remove(&shard_id) {
2308 0 : shard.intent.clear(scheduler);
2309 0 : }
2310 : }
2311 :
2312 0 : Ok(())
2313 0 : }
2314 :
2315 : /// For debug/support: a full JSON dump of TenantStates. Returns a response so that
2316 : /// we don't have to make TenantState clonable in the return path.
2317 0 : pub(crate) fn tenants_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
2318 0 : let serialized = {
2319 0 : let locked = self.inner.read().unwrap();
2320 0 : let result = locked.tenants.values().collect::<Vec<_>>();
2321 0 : serde_json::to_string(&result).map_err(|e| ApiError::InternalServerError(e.into()))?
2322 : };
2323 :
2324 0 : hyper::Response::builder()
2325 0 : .status(hyper::StatusCode::OK)
2326 0 : .header(hyper::header::CONTENT_TYPE, "application/json")
2327 0 : .body(hyper::Body::from(serialized))
2328 0 : .map_err(|e| ApiError::InternalServerError(e.into()))
2329 0 : }
2330 :
2331 : /// Check the consistency of in-memory state vs. persistent state, and check that the
2332 : /// scheduler's statistics are up to date.
2333 : ///
2334 : /// These consistency checks expect an **idle** system. If changes are going on while
2335 : /// we run, then we can falsely indicate a consistency issue. This is sufficient for end-of-test
2336 : /// checks, but not suitable for running continuously in the background in the field.
2337 0 : pub(crate) async fn consistency_check(&self) -> Result<(), ApiError> {
2338 0 : let (mut expect_nodes, mut expect_shards) = {
2339 0 : let locked = self.inner.read().unwrap();
2340 0 :
2341 0 : locked
2342 0 : .scheduler
2343 0 : .consistency_check(locked.nodes.values(), locked.tenants.values())
2344 0 : .context("Scheduler checks")
2345 0 : .map_err(ApiError::InternalServerError)?;
2346 :
2347 0 : let expect_nodes = locked
2348 0 : .nodes
2349 0 : .values()
2350 0 : .map(|n| n.to_persistent())
2351 0 : .collect::<Vec<_>>();
2352 0 :
2353 0 : let expect_shards = locked
2354 0 : .tenants
2355 0 : .values()
2356 0 : .map(|t| t.to_persistent())
2357 0 : .collect::<Vec<_>>();
2358 0 :
2359 0 : (expect_nodes, expect_shards)
2360 : };
2361 :
2362 0 : let mut nodes = self.persistence.list_nodes().await?;
2363 0 : expect_nodes.sort_by_key(|n| n.node_id);
2364 0 : nodes.sort_by_key(|n| n.node_id);
2365 0 :
2366 0 : if nodes != expect_nodes {
2367 0 : tracing::error!("Consistency check failed on nodes.");
2368 0 : tracing::error!(
2369 0 : "Nodes in memory: {}",
2370 0 : serde_json::to_string(&expect_nodes)
2371 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?
2372 0 : );
2373 0 : tracing::error!(
2374 0 : "Nodes in database: {}",
2375 0 : serde_json::to_string(&nodes)
2376 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?
2377 0 : );
2378 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
2379 0 : "Node consistency failure"
2380 0 : )));
2381 0 : }
2382 :
2383 0 : let mut shards = self.persistence.list_tenant_shards().await?;
2384 0 : shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
2385 0 : expect_shards.sort_by_key(|tsp| (tsp.tenant_id.clone(), tsp.shard_number, tsp.shard_count));
2386 0 :
2387 0 : if shards != expect_shards {
2388 0 : tracing::error!("Consistency check failed on shards.");
2389 0 : tracing::error!(
2390 0 : "Shards in memory: {}",
2391 0 : serde_json::to_string(&expect_shards)
2392 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?
2393 0 : );
2394 0 : tracing::error!(
2395 0 : "Shards in database: {}",
2396 0 : serde_json::to_string(&shards)
2397 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?
2398 0 : );
2399 0 : return Err(ApiError::InternalServerError(anyhow::anyhow!(
2400 0 : "Shard consistency failure"
2401 0 : )));
2402 0 : }
2403 0 :
2404 0 : Ok(())
2405 0 : }
2406 :
2407 : /// For debug/support: a JSON dump of the [`Scheduler`]. Returns a response so that
2408 : /// we don't have to make TenantState clonable in the return path.
2409 0 : pub(crate) fn scheduler_dump(&self) -> Result<hyper::Response<hyper::Body>, ApiError> {
2410 0 : let serialized = {
2411 0 : let locked = self.inner.read().unwrap();
2412 0 : serde_json::to_string(&locked.scheduler)
2413 0 : .map_err(|e| ApiError::InternalServerError(e.into()))?
2414 : };
2415 :
2416 0 : hyper::Response::builder()
2417 0 : .status(hyper::StatusCode::OK)
2418 0 : .header(hyper::header::CONTENT_TYPE, "application/json")
2419 0 : .body(hyper::Body::from(serialized))
2420 0 : .map_err(|e| ApiError::InternalServerError(e.into()))
2421 0 : }
2422 :
2423 : /// This is for debug/support only: we simply drop all state for a tenant, without
2424 : /// detaching or deleting it on pageservers. We do not try and re-schedule any
2425 : /// tenants that were on this node.
2426 : ///
2427 : /// TODO: proper node deletion API that unhooks things more gracefully
2428 0 : pub(crate) async fn node_drop(&self, node_id: NodeId) -> Result<(), ApiError> {
2429 0 : self.persistence.delete_node(node_id).await?;
2430 :
2431 0 : let mut locked = self.inner.write().unwrap();
2432 :
2433 0 : for shard in locked.tenants.values_mut() {
2434 0 : shard.deref_node(node_id);
2435 0 : }
2436 :
2437 0 : let mut nodes = (*locked.nodes).clone();
2438 0 : nodes.remove(&node_id);
2439 0 : locked.nodes = Arc::new(nodes);
2440 0 :
2441 0 : locked.scheduler.node_remove(node_id);
2442 0 :
2443 0 : Ok(())
2444 0 : }
2445 :
2446 0 : pub(crate) async fn node_list(&self) -> Result<Vec<Node>, ApiError> {
2447 0 : let nodes = {
2448 0 : self.inner
2449 0 : .read()
2450 0 : .unwrap()
2451 0 : .nodes
2452 0 : .values()
2453 0 : .cloned()
2454 0 : .collect::<Vec<_>>()
2455 0 : };
2456 0 :
2457 0 : Ok(nodes)
2458 0 : }
2459 :
2460 0 : pub(crate) async fn node_register(
2461 0 : &self,
2462 0 : register_req: NodeRegisterRequest,
2463 0 : ) -> Result<(), ApiError> {
2464 0 : // Pre-check for an already-existing node
2465 0 : {
2466 0 : let locked = self.inner.read().unwrap();
2467 0 : if let Some(node) = locked.nodes.get(®ister_req.node_id) {
2468 : // Note that we do not do a total equality of the struct, because we don't require
2469 : // the availability/scheduling states to agree for a POST to be idempotent.
2470 0 : if node.listen_http_addr == register_req.listen_http_addr
2471 0 : && node.listen_http_port == register_req.listen_http_port
2472 0 : && node.listen_pg_addr == register_req.listen_pg_addr
2473 0 : && node.listen_pg_port == register_req.listen_pg_port
2474 : {
2475 0 : tracing::info!(
2476 0 : "Node {} re-registered with matching address",
2477 0 : register_req.node_id
2478 0 : );
2479 0 : return Ok(());
2480 : } else {
2481 : // TODO: decide if we want to allow modifying node addresses without removing and re-adding
2482 : // the node. Safest/simplest thing is to refuse it, and usually we deploy with
2483 : // a fixed address through the lifetime of a node.
2484 0 : tracing::warn!(
2485 0 : "Node {} tried to register with different address",
2486 0 : register_req.node_id
2487 0 : );
2488 0 : return Err(ApiError::Conflict(
2489 0 : "Node is already registered with different address".to_string(),
2490 0 : ));
2491 : }
2492 0 : }
2493 0 : }
2494 0 :
2495 0 : // Ordering: we must persist the new node _before_ adding it to in-memory state.
2496 0 : // This ensures that before we use it for anything or expose it via any external
2497 0 : // API, it is guaranteed to be available after a restart.
2498 0 : let new_node = Node {
2499 0 : id: register_req.node_id,
2500 0 : listen_http_addr: register_req.listen_http_addr,
2501 0 : listen_http_port: register_req.listen_http_port,
2502 0 : listen_pg_addr: register_req.listen_pg_addr,
2503 0 : listen_pg_port: register_req.listen_pg_port,
2504 0 : scheduling: NodeSchedulingPolicy::Filling,
2505 0 : // TODO: we shouldn't really call this Active until we've heartbeated it.
2506 0 : availability: NodeAvailability::Active,
2507 0 : };
2508 0 : // TODO: idempotency if the node already exists in the database
2509 0 : self.persistence.insert_node(&new_node).await?;
2510 :
2511 0 : let mut locked = self.inner.write().unwrap();
2512 0 : let mut new_nodes = (*locked.nodes).clone();
2513 0 :
2514 0 : locked.scheduler.node_upsert(&new_node);
2515 0 : new_nodes.insert(register_req.node_id, new_node);
2516 0 :
2517 0 : locked.nodes = Arc::new(new_nodes);
2518 :
2519 0 : tracing::info!(
2520 0 : "Registered pageserver {}, now have {} pageservers",
2521 0 : register_req.node_id,
2522 0 : locked.nodes.len()
2523 0 : );
2524 0 : Ok(())
2525 0 : }
2526 :
2527 0 : pub(crate) async fn node_configure(
2528 0 : &self,
2529 0 : config_req: NodeConfigureRequest,
2530 0 : ) -> Result<(), ApiError> {
2531 0 : if let Some(scheduling) = config_req.scheduling {
2532 : // Scheduling is a persistent part of Node: we must write updates to the database before
2533 : // applying them in memory
2534 0 : self.persistence
2535 0 : .update_node(config_req.node_id, scheduling)
2536 0 : .await?;
2537 0 : }
2538 :
2539 0 : let mut locked = self.inner.write().unwrap();
2540 0 : let result_tx = locked.result_tx.clone();
2541 0 : let compute_hook = locked.compute_hook.clone();
2542 0 : let (nodes, tenants, scheduler) = locked.parts_mut();
2543 0 :
2544 0 : let mut new_nodes = (**nodes).clone();
2545 :
2546 0 : let Some(node) = new_nodes.get_mut(&config_req.node_id) else {
2547 0 : return Err(ApiError::NotFound(
2548 0 : anyhow::anyhow!("Node not registered").into(),
2549 0 : ));
2550 : };
2551 :
2552 0 : let mut offline_transition = false;
2553 0 : let mut active_transition = false;
2554 :
2555 0 : if let Some(availability) = &config_req.availability {
2556 0 : match (availability, &node.availability) {
2557 : (NodeAvailability::Offline, NodeAvailability::Active) => {
2558 0 : tracing::info!("Node {} transition to offline", config_req.node_id);
2559 0 : offline_transition = true;
2560 : }
2561 : (NodeAvailability::Active, NodeAvailability::Offline) => {
2562 0 : tracing::info!("Node {} transition to active", config_req.node_id);
2563 0 : active_transition = true;
2564 : }
2565 : _ => {
2566 0 : tracing::info!("Node {} no change during config", config_req.node_id);
2567 : // No change
2568 : }
2569 : };
2570 0 : node.availability = *availability;
2571 0 : }
2572 :
2573 0 : if let Some(scheduling) = config_req.scheduling {
2574 0 : node.scheduling = scheduling;
2575 0 :
2576 0 : // TODO: once we have a background scheduling ticker for fill/drain, kick it
2577 0 : // to wake up and start working.
2578 0 : }
2579 :
2580 : // Update the scheduler, in case the elegibility of the node for new shards has changed
2581 0 : scheduler.node_upsert(node);
2582 0 :
2583 0 : let new_nodes = Arc::new(new_nodes);
2584 0 :
2585 0 : if offline_transition {
2586 0 : let mut tenants_affected: usize = 0;
2587 0 : for (tenant_shard_id, tenant_state) in tenants {
2588 0 : if let Some(observed_loc) =
2589 0 : tenant_state.observed.locations.get_mut(&config_req.node_id)
2590 0 : {
2591 0 : // When a node goes offline, we set its observed configuration to None, indicating unknown: we will
2592 0 : // not assume our knowledge of the node's configuration is accurate until it comes back online
2593 0 : observed_loc.conf = None;
2594 0 : }
2595 :
2596 0 : if tenant_state.intent.notify_offline(config_req.node_id) {
2597 0 : tenant_state.sequence = tenant_state.sequence.next();
2598 0 : match tenant_state.schedule(scheduler) {
2599 0 : Err(e) => {
2600 : // It is possible that some tenants will become unschedulable when too many pageservers
2601 : // go offline: in this case there isn't much we can do other than make the issue observable.
2602 : // TODO: give TenantState a scheduling error attribute to be queried later.
2603 0 : tracing::warn!(%tenant_shard_id, "Scheduling error when marking pageserver {} offline: {e}", config_req.node_id);
2604 : }
2605 : Ok(()) => {
2606 0 : if tenant_state
2607 0 : .maybe_reconcile(
2608 0 : result_tx.clone(),
2609 0 : &new_nodes,
2610 0 : &compute_hook,
2611 0 : &self.config,
2612 0 : &self.persistence,
2613 0 : &self.gate,
2614 0 : &self.cancel,
2615 0 : )
2616 0 : .is_some()
2617 0 : {
2618 0 : tenants_affected += 1;
2619 0 : };
2620 : }
2621 : }
2622 0 : }
2623 : }
2624 0 : tracing::info!(
2625 0 : "Launched {} reconciler tasks for tenants affected by node {} going offline",
2626 0 : tenants_affected,
2627 0 : config_req.node_id
2628 0 : )
2629 0 : }
2630 :
2631 0 : if active_transition {
2632 : // When a node comes back online, we must reconcile any tenant that has a None observed
2633 : // location on the node.
2634 0 : for tenant_state in locked.tenants.values_mut() {
2635 0 : if let Some(observed_loc) =
2636 0 : tenant_state.observed.locations.get_mut(&config_req.node_id)
2637 : {
2638 0 : if observed_loc.conf.is_none() {
2639 0 : tenant_state.maybe_reconcile(
2640 0 : result_tx.clone(),
2641 0 : &new_nodes,
2642 0 : &compute_hook,
2643 0 : &self.config,
2644 0 : &self.persistence,
2645 0 : &self.gate,
2646 0 : &self.cancel,
2647 0 : );
2648 0 : }
2649 0 : }
2650 : }
2651 :
2652 : // TODO: in the background, we should balance work back onto this pageserver
2653 0 : }
2654 :
2655 0 : locked.nodes = new_nodes;
2656 0 :
2657 0 : Ok(())
2658 0 : }
2659 :
2660 : /// Helper for methods that will try and call pageserver APIs for
2661 : /// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
2662 : /// is attached somewhere.
2663 0 : fn ensure_attached_schedule(
2664 0 : &self,
2665 0 : mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
2666 0 : tenant_id: TenantId,
2667 0 : ) -> Result<Vec<ReconcilerWaiter>, anyhow::Error> {
2668 0 : let mut waiters = Vec::new();
2669 0 : let result_tx = locked.result_tx.clone();
2670 0 : let compute_hook = locked.compute_hook.clone();
2671 0 : let (nodes, tenants, scheduler) = locked.parts_mut();
2672 :
2673 0 : for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
2674 0 : shard.schedule(scheduler)?;
2675 :
2676 0 : if let Some(waiter) = shard.maybe_reconcile(
2677 0 : result_tx.clone(),
2678 0 : nodes,
2679 0 : &compute_hook,
2680 0 : &self.config,
2681 0 : &self.persistence,
2682 0 : &self.gate,
2683 0 : &self.cancel,
2684 0 : ) {
2685 0 : waiters.push(waiter);
2686 0 : }
2687 : }
2688 0 : Ok(waiters)
2689 0 : }
2690 :
2691 0 : async fn ensure_attached_wait(&self, tenant_id: TenantId) -> Result<(), ApiError> {
2692 0 : let ensure_waiters = {
2693 0 : let locked = self.inner.write().unwrap();
2694 :
2695 : // Check if the tenant is splitting: in this case, even if it is attached,
2696 : // we must act as if it is not: this blocks e.g. timeline creation/deletion
2697 : // operations during the split.
2698 0 : for (_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
2699 0 : if !matches!(shard.splitting, SplitState::Idle) {
2700 0 : return Err(ApiError::ResourceUnavailable(
2701 0 : "Tenant shards are currently splitting".into(),
2702 0 : ));
2703 0 : }
2704 : }
2705 :
2706 0 : self.ensure_attached_schedule(locked, tenant_id)
2707 0 : .map_err(ApiError::InternalServerError)?
2708 : };
2709 :
2710 0 : let deadline = Instant::now().checked_add(Duration::from_secs(5)).unwrap();
2711 0 : for waiter in ensure_waiters {
2712 0 : let timeout = deadline.duration_since(Instant::now());
2713 0 : waiter.wait_timeout(timeout).await?;
2714 : }
2715 :
2716 0 : Ok(())
2717 0 : }
2718 :
2719 : /// Check all tenants for pending reconciliation work, and reconcile those in need
2720 : ///
2721 : /// Returns how many reconciliation tasks were started
2722 0 : fn reconcile_all(&self) -> usize {
2723 0 : let mut locked = self.inner.write().unwrap();
2724 0 : let result_tx = locked.result_tx.clone();
2725 0 : let compute_hook = locked.compute_hook.clone();
2726 0 : let pageservers = locked.nodes.clone();
2727 0 : locked
2728 0 : .tenants
2729 0 : .iter_mut()
2730 0 : .filter_map(|(_tenant_shard_id, shard)| {
2731 0 : shard.maybe_reconcile(
2732 0 : result_tx.clone(),
2733 0 : &pageservers,
2734 0 : &compute_hook,
2735 0 : &self.config,
2736 0 : &self.persistence,
2737 0 : &self.gate,
2738 0 : &self.cancel,
2739 0 : )
2740 0 : })
2741 0 : .count()
2742 0 : }
2743 :
2744 0 : pub async fn shutdown(&self) {
2745 0 : // Note that this already stops processing any results from reconciles: so
2746 0 : // we do not expect that our [`TenantState`] objects will reach a neat
2747 0 : // final state.
2748 0 : self.cancel.cancel();
2749 0 :
2750 0 : // The cancellation tokens in [`crate::reconciler::Reconciler`] are children
2751 0 : // of our cancellation token, so we do not need to explicitly cancel each of
2752 0 : // them.
2753 0 :
2754 0 : // Background tasks and reconcilers hold gate guards: this waits for them all
2755 0 : // to complete.
2756 0 : self.gate.close().await;
2757 0 : }
2758 : }
|