Line data Source code
1 : use std::borrow::Cow;
2 : use std::collections::HashMap;
3 : use std::error::Error as _;
4 : use std::sync::Arc;
5 : use std::time::Duration;
6 :
7 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
8 : use control_plane::local_env::LocalEnv;
9 : use futures::StreamExt;
10 : use hyper::StatusCode;
11 : use pageserver_api::controller_api::AvailabilityZone;
12 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
13 : use postgres_connection::parse_host_port;
14 : use serde::{Deserialize, Serialize};
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::{Instrument, info_span};
17 : use utils::backoff::{self};
18 : use utils::id::{NodeId, TenantId};
19 :
20 : use crate::service::Config;
21 :
22 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
23 :
24 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
25 :
26 : pub(crate) const API_CONCURRENCY: usize = 32;
27 :
28 : struct UnshardedComputeHookTenant {
29 : // Which node is this tenant attached to
30 : node_id: NodeId,
31 :
32 : // The tenant's preferred AZ, so that we may pass this on to the control plane
33 : preferred_az: Option<AvailabilityZone>,
34 :
35 : // Must hold this lock to send a notification.
36 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
37 : }
38 : struct ShardedComputeHookTenant {
39 : stripe_size: ShardStripeSize,
40 : shard_count: ShardCount,
41 : shards: Vec<(ShardNumber, NodeId)>,
42 :
43 : // The tenant's preferred AZ, so that we may pass this on to the control plane
44 : preferred_az: Option<AvailabilityZone>,
45 :
46 : // Must hold this lock to send a notification. The contents represent
47 : // the last successfully sent notification, and are used to coalesce multiple
48 : // updates by only sending when there is a change since our last successful send.
49 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
50 : }
51 :
52 : /// Represents our knowledge of the compute's state: we can update this when we get a
53 : /// response from a notify API call, which tells us what has been applied.
54 : ///
55 : /// Should be wrapped in an Option<>, as we cannot always know the remote state.
56 : #[derive(PartialEq, Eq, Debug)]
57 : struct ComputeRemoteState {
58 : // The request body which was acked by the compute
59 : request: ComputeHookNotifyRequest,
60 :
61 : // Whether the cplane indicated that the state was applied to running computes, or just
62 : // persisted. In the Neon control plane, this is the difference between a 423 response (meaning
63 : // persisted but not applied), and a 2xx response (both persisted and applied)
64 : applied: bool,
65 : }
66 :
67 : enum ComputeHookTenant {
68 : Unsharded(UnshardedComputeHookTenant),
69 : Sharded(ShardedComputeHookTenant),
70 : }
71 :
72 : impl ComputeHookTenant {
73 : /// Construct with at least one shard's information
74 2 : fn new(
75 2 : tenant_shard_id: TenantShardId,
76 2 : stripe_size: ShardStripeSize,
77 2 : preferred_az: Option<AvailabilityZone>,
78 2 : node_id: NodeId,
79 2 : ) -> Self {
80 2 : if tenant_shard_id.shard_count.count() > 1 {
81 1 : Self::Sharded(ShardedComputeHookTenant {
82 1 : shards: vec![(tenant_shard_id.shard_number, node_id)],
83 1 : stripe_size,
84 1 : shard_count: tenant_shard_id.shard_count,
85 1 : preferred_az,
86 1 : send_lock: Arc::default(),
87 1 : })
88 : } else {
89 1 : Self::Unsharded(UnshardedComputeHookTenant {
90 1 : node_id,
91 1 : preferred_az,
92 1 : send_lock: Arc::default(),
93 1 : })
94 : }
95 2 : }
96 :
97 4 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
98 4 : match self {
99 2 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
100 2 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
101 : }
102 4 : }
103 :
104 0 : fn is_sharded(&self) -> bool {
105 0 : matches!(self, ComputeHookTenant::Sharded(_))
106 0 : }
107 :
108 : /// Clear compute hook state for the specified shard.
109 : /// Only valid for [`ComputeHookTenant::Sharded`] instances.
110 0 : fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
111 0 : match self {
112 0 : ComputeHookTenant::Sharded(sharded) => {
113 0 : if sharded.stripe_size != stripe_size
114 0 : || sharded.shard_count != tenant_shard_id.shard_count
115 : {
116 0 : tracing::warn!("Shard split detected while handling detach")
117 0 : }
118 :
119 0 : let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
120 0 : *shard_number == tenant_shard_id.shard_number
121 0 : });
122 :
123 0 : if let Some(shard_idx) = shard_idx {
124 0 : sharded.shards.remove(shard_idx);
125 0 : } else {
126 : // This is a valid but niche case, where the tenant was previously attached
127 : // as a Secondary location and then detached, so has no previously notified
128 : // state.
129 0 : tracing::info!("Shard not found while handling detach")
130 : }
131 : }
132 : ComputeHookTenant::Unsharded(_) => {
133 0 : unreachable!("Detach of unsharded tenants is handled externally");
134 : }
135 : }
136 0 : }
137 :
138 : /// Set one shard's location. If stripe size or shard count have changed, Self is reset
139 : /// and drops existing content.
140 2 : fn update(&mut self, shard_update: ShardUpdate) {
141 2 : let tenant_shard_id = shard_update.tenant_shard_id;
142 2 : let node_id = shard_update.node_id;
143 2 : let stripe_size = shard_update.stripe_size;
144 2 : let preferred_az = shard_update.preferred_az;
145 :
146 1 : match self {
147 1 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
148 0 : unsharded_tenant.node_id = node_id;
149 0 : if unsharded_tenant.preferred_az.as_ref()
150 0 : != preferred_az.as_ref().map(|az| az.as_ref())
151 0 : {
152 0 : unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
153 0 : }
154 : }
155 1 : Self::Sharded(sharded_tenant)
156 1 : if sharded_tenant.stripe_size == stripe_size
157 1 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
158 : {
159 1 : if let Some(existing) = sharded_tenant
160 1 : .shards
161 1 : .iter()
162 1 : .position(|s| s.0 == tenant_shard_id.shard_number)
163 0 : {
164 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
165 0 : } else {
166 1 : sharded_tenant
167 1 : .shards
168 1 : .push((tenant_shard_id.shard_number, node_id));
169 2 : sharded_tenant.shards.sort_by_key(|s| s.0)
170 : }
171 :
172 1 : if sharded_tenant.preferred_az.as_ref()
173 1 : != preferred_az.as_ref().map(|az| az.as_ref())
174 0 : {
175 0 : sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
176 1 : }
177 : }
178 1 : _ => {
179 1 : // Shard count changed: reset struct.
180 1 : *self = Self::new(
181 1 : tenant_shard_id,
182 1 : stripe_size,
183 1 : preferred_az.map(|az| az.into_owned()),
184 1 : node_id,
185 1 : );
186 1 : }
187 : }
188 2 : }
189 : }
190 :
191 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
192 : struct ComputeHookNotifyRequestShard {
193 : node_id: NodeId,
194 : shard_number: ShardNumber,
195 : }
196 :
197 : /// Request body that we send to the control plane to notify it of where a tenant is attached
198 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
199 : struct ComputeHookNotifyRequest {
200 : tenant_id: TenantId,
201 : preferred_az: Option<String>,
202 : stripe_size: Option<ShardStripeSize>,
203 : shards: Vec<ComputeHookNotifyRequestShard>,
204 : }
205 :
206 : /// Error type for attempts to call into the control plane compute notification hook
207 : #[derive(thiserror::Error, Debug)]
208 : pub(crate) enum NotifyError {
209 : // Request was not sent successfully, e.g. transport error
210 0 : #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
211 : Request(#[from] reqwest::Error),
212 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
213 : #[error("Control plane tenant busy")]
214 : Busy,
215 : // Explicit 429 response asking us to retry less frequently
216 : #[error("Control plane overloaded")]
217 : SlowDown,
218 : // A 503 response indicates the control plane can't handle the request right now
219 : #[error("Control plane unavailable (status {0})")]
220 : Unavailable(StatusCode),
221 : // API returned unexpected non-success status. We will retry, but log a warning.
222 : #[error("Control plane returned unexpected status {0}")]
223 : Unexpected(StatusCode),
224 : // We shut down while sending
225 : #[error("Shutting down")]
226 : ShuttingDown,
227 : // A response indicating we will never succeed, such as 400 or 403
228 : #[error("Non-retryable error {0}")]
229 : Fatal(StatusCode),
230 :
231 : #[error("neon_local error: {0}")]
232 : NeonLocal(anyhow::Error),
233 : }
234 :
235 : enum MaybeSendResult {
236 : // Please send this request while holding the lock, and if you succeed then write
237 : // the request into the lock.
238 : Transmit(
239 : (
240 : ComputeHookNotifyRequest,
241 : tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
242 : ),
243 : ),
244 : // Something requires sending, but you must wait for the current sender to finish, then call again
245 : AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
246 : // Nothing requires sending
247 : Noop,
248 : }
249 :
250 : impl ComputeHookTenant {
251 4 : fn maybe_send(
252 4 : &self,
253 4 : tenant_id: TenantId,
254 4 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
255 4 : ) -> MaybeSendResult {
256 4 : let locked = match lock {
257 0 : Some(already_locked) => already_locked,
258 : None => {
259 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
260 4 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
261 0 : return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
262 : };
263 4 : locked
264 : }
265 : };
266 :
267 4 : let request = match self {
268 2 : Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
269 2 : tenant_id,
270 2 : shards: vec![ComputeHookNotifyRequestShard {
271 2 : shard_number: ShardNumber(0),
272 2 : node_id: unsharded_tenant.node_id,
273 2 : }],
274 2 : stripe_size: None,
275 2 : preferred_az: unsharded_tenant
276 2 : .preferred_az
277 2 : .as_ref()
278 2 : .map(|az| az.0.clone()),
279 2 : }),
280 2 : Self::Sharded(sharded_tenant)
281 2 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
282 1 : {
283 1 : Some(ComputeHookNotifyRequest {
284 1 : tenant_id,
285 1 : shards: sharded_tenant
286 1 : .shards
287 1 : .iter()
288 2 : .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
289 2 : shard_number: *shard_number,
290 2 : node_id: *node_id,
291 2 : })
292 1 : .collect(),
293 1 : stripe_size: Some(sharded_tenant.stripe_size),
294 1 : preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
295 1 : })
296 : }
297 1 : Self::Sharded(sharded_tenant) => {
298 1 : // Sharded tenant doesn't yet have information for all its shards
299 1 :
300 1 : tracing::info!(
301 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
302 0 : sharded_tenant.shards.len(),
303 0 : sharded_tenant.shard_count.count()
304 : );
305 1 : None
306 : }
307 : };
308 :
309 3 : match request {
310 : None => {
311 : // Not yet ready to emit a notification
312 1 : tracing::info!("Tenant isn't yet ready to emit a notification");
313 1 : MaybeSendResult::Noop
314 : }
315 1 : Some(request)
316 3 : if Some(&request) == locked.as_ref().map(|s| &s.request)
317 1 : && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
318 1 : {
319 1 : tracing::info!(
320 0 : "Skipping notification because remote state already matches ({:?})",
321 0 : &request
322 : );
323 : // No change from the last value successfully sent, and our state indicates that the last
324 : // value sent was fully applied on the control plane side.
325 1 : MaybeSendResult::Noop
326 : }
327 2 : Some(request) => {
328 2 : // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
329 2 : MaybeSendResult::Transmit((request, locked))
330 : }
331 : }
332 4 : }
333 : }
334 :
335 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
336 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
337 : /// the compute connection string.
338 : pub(super) struct ComputeHook {
339 : config: Config,
340 : state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
341 : authorization_header: Option<String>,
342 :
343 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
344 : // large numbers of tenants (e.g. when failing over after a node failure)
345 : api_concurrency: tokio::sync::Semaphore,
346 :
347 : // This lock is only used in testing environments, to serialize calls into neon_local
348 : neon_local_lock: tokio::sync::Mutex<()>,
349 :
350 : // We share a client across all notifications to enable connection re-use etc. when
351 : // sending large numbers of notifications
352 : client: reqwest::Client,
353 : }
354 :
355 : /// Callers may give us a list of these when asking us to send a bulk batch
356 : /// of notifications in the background. This is a 'notification' in the sense of
357 : /// other code notifying us of a shard's status, rather than being the final notification
358 : /// that we send upwards to the control plane for the whole tenant.
359 : pub(crate) struct ShardUpdate<'a> {
360 : pub(crate) tenant_shard_id: TenantShardId,
361 : pub(crate) node_id: NodeId,
362 : pub(crate) stripe_size: ShardStripeSize,
363 : pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
364 : }
365 :
366 : impl ComputeHook {
367 0 : pub(super) fn new(config: Config) -> Self {
368 0 : let authorization_header = config
369 0 : .control_plane_jwt_token
370 0 : .clone()
371 0 : .map(|jwt| format!("Bearer {}", jwt));
372 0 :
373 0 : let client = reqwest::ClientBuilder::new()
374 0 : .timeout(NOTIFY_REQUEST_TIMEOUT)
375 0 : .build()
376 0 : .expect("Failed to construct HTTP client");
377 0 :
378 0 : Self {
379 0 : state: Default::default(),
380 0 : config,
381 0 : authorization_header,
382 0 : neon_local_lock: Default::default(),
383 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
384 0 : client,
385 0 : }
386 0 : }
387 :
388 : /// For test environments: use neon_local's LocalEnv to update compute
389 0 : async fn do_notify_local(
390 0 : &self,
391 0 : reconfigure_request: &ComputeHookNotifyRequest,
392 0 : ) -> Result<(), NotifyError> {
393 : // neon_local updates are not safe to call concurrently; use a lock to serialize
394 : // all calls to this function
395 0 : let _locked = self.neon_local_lock.lock().await;
396 :
397 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
398 0 : tracing::warn!(
399 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
400 : );
401 0 : return Ok(());
402 : };
403 0 : let env = match LocalEnv::load_config(repo_dir) {
404 0 : Ok(e) => e,
405 0 : Err(e) => {
406 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
407 0 : return Ok(());
408 : }
409 : };
410 0 : let cplane =
411 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
412 0 : let ComputeHookNotifyRequest {
413 0 : tenant_id,
414 0 : shards,
415 0 : stripe_size,
416 0 : preferred_az: _preferred_az,
417 0 : } = reconfigure_request;
418 0 :
419 0 : let compute_pageservers = shards
420 0 : .iter()
421 0 : .map(|shard| {
422 0 : let ps_conf = env
423 0 : .get_pageserver_conf(shard.node_id)
424 0 : .expect("Unknown pageserver");
425 0 : let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
426 0 : .expect("Unable to parse listen_pg_addr");
427 0 : (pg_host, pg_port.unwrap_or(5432))
428 0 : })
429 0 : .collect::<Vec<_>>();
430 :
431 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
432 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
433 0 : tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
434 0 : endpoint
435 0 : .reconfigure(compute_pageservers.clone(), *stripe_size, None)
436 0 : .await
437 0 : .map_err(NotifyError::NeonLocal)?;
438 0 : }
439 : }
440 :
441 0 : Ok(())
442 0 : }
443 :
444 0 : async fn do_notify_iteration(
445 0 : &self,
446 0 : url: &String,
447 0 : reconfigure_request: &ComputeHookNotifyRequest,
448 0 : cancel: &CancellationToken,
449 0 : ) -> Result<(), NotifyError> {
450 0 : let req = self.client.request(reqwest::Method::PUT, url);
451 0 : let req = if let Some(value) = &self.authorization_header {
452 0 : req.header(reqwest::header::AUTHORIZATION, value)
453 : } else {
454 0 : req
455 : };
456 :
457 0 : tracing::info!(
458 0 : "Sending notify request to {} ({:?})",
459 : url,
460 : reconfigure_request
461 : );
462 0 : let send_result = req.json(&reconfigure_request).send().await;
463 0 : let response = match send_result {
464 0 : Ok(r) => r,
465 0 : Err(e) => return Err(e.into()),
466 : };
467 :
468 : // Treat all 2xx responses as success
469 0 : if response.status() >= reqwest::StatusCode::OK
470 0 : && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
471 : {
472 0 : if response.status() != reqwest::StatusCode::OK {
473 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
474 : // log a warning.
475 0 : tracing::warn!(
476 0 : "Unexpected 2xx response code {} from control plane",
477 0 : response.status()
478 : );
479 0 : }
480 :
481 0 : return Ok(());
482 0 : }
483 0 :
484 0 : // Error response codes
485 0 : match response.status() {
486 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
487 : // TODO: 429 handling should be global: set some state visible to other requests
488 : // so that they will delay before starting, rather than all notifications trying
489 : // once before backing off.
490 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
491 0 : .await
492 0 : .ok();
493 0 : Err(NotifyError::SlowDown)
494 : }
495 : reqwest::StatusCode::LOCKED => {
496 : // We consider this fatal, because it's possible that the operation blocking the control plane is
497 : // also the one that is waiting for this reconcile. We should let the reconciler calling
498 : // this hook fail, to give the control plane a chance to un-lock.
499 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
500 0 : Err(NotifyError::Busy)
501 : }
502 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
503 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
504 : }
505 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
506 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
507 : }
508 : reqwest::StatusCode::BAD_GATEWAY => {
509 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
510 : }
511 :
512 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
513 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
514 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
515 0 : status => Err(NotifyError::Unexpected(
516 0 : hyper::StatusCode::from_u16(status.as_u16())
517 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
518 0 : )),
519 : }
520 0 : }
521 :
522 0 : async fn do_notify(
523 0 : &self,
524 0 : url: &String,
525 0 : reconfigure_request: &ComputeHookNotifyRequest,
526 0 : cancel: &CancellationToken,
527 0 : ) -> Result<(), NotifyError> {
528 : // We hold these semaphore units across all retries, rather than only across each
529 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
530 : // time out waiting for a semaphore.
531 0 : let _units = self
532 0 : .api_concurrency
533 0 : .acquire()
534 0 : .await
535 : // Interpret closed semaphore as shutdown
536 0 : .map_err(|_| NotifyError::ShuttingDown)?;
537 :
538 0 : backoff::retry(
539 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
540 0 : |e| {
541 0 : matches!(
542 0 : e,
543 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
544 : )
545 0 : },
546 0 : 3,
547 0 : 10,
548 0 : "Send compute notification",
549 0 : cancel,
550 0 : )
551 0 : .await
552 0 : .ok_or_else(|| NotifyError::ShuttingDown)
553 0 : .and_then(|x| x)
554 0 : }
555 :
556 : /// Synchronous phase: update the per-tenant state for the next intended notification
557 0 : fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
558 0 : let mut state_locked = self.state.lock().unwrap();
559 :
560 : use std::collections::hash_map::Entry;
561 0 : let tenant_shard_id = shard_update.tenant_shard_id;
562 :
563 0 : let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
564 0 : Entry::Vacant(e) => {
565 0 : let ShardUpdate {
566 0 : tenant_shard_id,
567 0 : node_id,
568 0 : stripe_size,
569 0 : preferred_az,
570 0 : } = shard_update;
571 0 : e.insert(ComputeHookTenant::new(
572 0 : tenant_shard_id,
573 0 : stripe_size,
574 0 : preferred_az.map(|az| az.into_owned()),
575 0 : node_id,
576 0 : ))
577 : }
578 0 : Entry::Occupied(e) => {
579 0 : let tenant = e.into_mut();
580 0 : tenant.update(shard_update);
581 0 : tenant
582 : }
583 : };
584 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
585 0 : }
586 :
587 0 : async fn notify_execute(
588 0 : &self,
589 0 : maybe_send_result: MaybeSendResult,
590 0 : tenant_shard_id: TenantShardId,
591 0 : cancel: &CancellationToken,
592 0 : ) -> Result<(), NotifyError> {
593 : // Process result: we may get an update to send, or we may have to wait for a lock
594 : // before trying again.
595 0 : let (request, mut send_lock_guard) = match maybe_send_result {
596 : MaybeSendResult::Noop => {
597 0 : return Ok(());
598 : }
599 0 : MaybeSendResult::AwaitLock(send_lock) => {
600 0 : let send_locked = tokio::select! {
601 0 : guard = send_lock.lock_owned() => {guard},
602 0 : _ = cancel.cancelled() => {
603 0 : return Err(NotifyError::ShuttingDown)
604 : }
605 : };
606 :
607 : // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
608 : // we have acquired the send lock and take the `[Self::state]` lock. This is safe because maybe_send only uses
609 : // try_lock.
610 0 : let state_locked = self.state.lock().unwrap();
611 0 : let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
612 0 : return Ok(());
613 : };
614 0 : match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
615 : MaybeSendResult::AwaitLock(_) => {
616 0 : unreachable!("We supplied lock guard")
617 : }
618 : MaybeSendResult::Noop => {
619 0 : return Ok(());
620 : }
621 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
622 : }
623 : }
624 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
625 : };
626 :
627 0 : let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
628 0 : Some(if control_plane_url.ends_with('/') {
629 0 : format!("{control_plane_url}notify-attach")
630 : } else {
631 0 : format!("{control_plane_url}/notify-attach")
632 : })
633 : } else {
634 0 : self.config.compute_hook_url.clone()
635 : };
636 0 : let result = if let Some(notify_url) = &compute_hook_url {
637 0 : self.do_notify(notify_url, &request, cancel).await
638 : } else {
639 0 : self.do_notify_local(&request).await.map_err(|e| {
640 0 : // This path is for testing only, so munge the error into our prod-style error type.
641 0 : tracing::error!("neon_local notification hook failed: {e}");
642 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
643 0 : })
644 : };
645 :
646 0 : match result {
647 0 : Ok(_) => {
648 0 : // Before dropping the send lock, stash the request we just sent so that
649 0 : // subsequent callers can avoid redundantly re-sending the same thing.
650 0 : *send_lock_guard = Some(ComputeRemoteState {
651 0 : request,
652 0 : applied: true,
653 0 : });
654 0 : }
655 0 : Err(NotifyError::Busy) => {
656 0 : // Busy result means that the server responded and has stored the new configuration,
657 0 : // but was not able to fully apply it to the compute
658 0 : *send_lock_guard = Some(ComputeRemoteState {
659 0 : request,
660 0 : applied: false,
661 0 : });
662 0 : }
663 0 : Err(_) => {
664 0 : // General error case: we can no longer know the remote state, so clear it. This will result in
665 0 : // the logic in maybe_send recognizing that we should call the hook again.
666 0 : *send_lock_guard = None;
667 0 : }
668 : }
669 0 : result
670 0 : }
671 :
672 : /// Infallible synchronous fire-and-forget version of notify(), that sends its results to
673 : /// a channel. Something should consume the channel and arrange to try notifying again
674 : /// if something failed.
675 0 : pub(super) fn notify_background(
676 0 : self: &Arc<Self>,
677 0 : notifications: Vec<ShardUpdate>,
678 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
679 0 : cancel: &CancellationToken,
680 0 : ) {
681 0 : let mut maybe_sends = Vec::new();
682 0 : for shard_update in notifications {
683 0 : let tenant_shard_id = shard_update.tenant_shard_id;
684 0 : let maybe_send_result = self.notify_prepare(shard_update);
685 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
686 : }
687 :
688 0 : let this = self.clone();
689 0 : let cancel = cancel.clone();
690 0 :
691 0 : tokio::task::spawn(async move {
692 0 : // Construct an async stream of futures to invoke the compute notify function: we do this
693 0 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
694 0 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
695 0 : // would mostly just be waiting on that semaphore.
696 0 : let mut stream = futures::stream::iter(maybe_sends)
697 0 : .map(|(tenant_shard_id, maybe_send_result)| {
698 0 : let this = this.clone();
699 0 : let cancel = cancel.clone();
700 :
701 0 : async move {
702 0 : this
703 0 : .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
704 0 : .await.map_err(|e| (tenant_shard_id, e))
705 0 : }.instrument(info_span!(
706 0 : "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
707 : ))
708 0 : })
709 0 : .buffered(API_CONCURRENCY);
710 :
711 : loop {
712 0 : tokio::select! {
713 0 : next = stream.next() => {
714 0 : match next {
715 0 : Some(r) => {
716 0 : result_tx.send(r).await.ok();
717 : },
718 : None => {
719 0 : tracing::info!("Finished sending background compute notifications");
720 0 : break;
721 : }
722 : }
723 : },
724 0 : _ = cancel.cancelled() => {
725 0 : tracing::info!("Shutdown while running background compute notifications");
726 0 : break;
727 : }
728 : };
729 : }
730 0 : });
731 0 : }
732 :
733 : /// Call this to notify the compute (postgres) tier of new pageservers to use
734 : /// for a tenant. notify() is called by each shard individually, and this function
735 : /// will decide whether an update to the tenant is sent. An update is sent on the
736 : /// condition that:
737 : /// - We know a pageserver for every shard.
738 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
739 : ///
740 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
741 : /// that is cancelled.
742 : ///
743 : /// This function is fallible, including in the case that the control plane is transiently
744 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
745 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
746 : /// ensuring that they eventually call again to ensure that the compute is eventually notified of
747 : /// the proper pageserver nodes for a tenant.
748 : #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
749 : pub(super) async fn notify<'a>(
750 : &self,
751 : shard_update: ShardUpdate<'a>,
752 : cancel: &CancellationToken,
753 : ) -> Result<(), NotifyError> {
754 : let tenant_shard_id = shard_update.tenant_shard_id;
755 : let maybe_send_result = self.notify_prepare(shard_update);
756 : self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
757 : .await
758 : }
759 :
760 : /// Reflect a detach for a particular shard in the compute hook state.
761 : ///
762 : /// The goal is to avoid sending compute notifications with stale information (i.e.
763 : /// including detached pageservers).
764 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
765 : pub(super) fn handle_detach(
766 : &self,
767 : tenant_shard_id: TenantShardId,
768 : stripe_size: ShardStripeSize,
769 : ) {
770 : use std::collections::hash_map::Entry;
771 :
772 : let mut state_locked = self.state.lock().unwrap();
773 : match state_locked.entry(tenant_shard_id.tenant_id) {
774 : Entry::Vacant(_) => {
775 : // This is a valid but niche case, where the tenant was previously attached
776 : // as a Secondary location and then detached, so has no previously notified
777 : // state.
778 : tracing::info!("Compute hook tenant not found for detach");
779 : }
780 : Entry::Occupied(mut e) => {
781 : let sharded = e.get().is_sharded();
782 : if !sharded {
783 : e.remove();
784 : } else {
785 : e.get_mut().remove_shard(tenant_shard_id, stripe_size);
786 : }
787 :
788 : tracing::debug!("Compute hook handled shard detach");
789 : }
790 : }
791 : }
792 : }
793 :
794 : #[cfg(test)]
795 : pub(crate) mod tests {
796 : use pageserver_api::shard::{ShardCount, ShardNumber};
797 : use utils::id::TenantId;
798 :
799 : use super::*;
800 :
801 : #[test]
802 1 : fn tenant_updates() -> anyhow::Result<()> {
803 1 : let tenant_id = TenantId::generate();
804 1 : let mut tenant_state = ComputeHookTenant::new(
805 1 : TenantShardId {
806 1 : tenant_id,
807 1 : shard_count: ShardCount::new(0),
808 1 : shard_number: ShardNumber(0),
809 1 : },
810 1 : ShardStripeSize(12345),
811 1 : None,
812 1 : NodeId(1),
813 1 : );
814 1 :
815 1 : // An unsharded tenant is always ready to emit a notification, but won't
816 1 : // send the same one twice
817 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
818 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
819 0 : anyhow::bail!("Wrong send result");
820 : };
821 1 : assert_eq!(request.shards.len(), 1);
822 1 : assert!(request.stripe_size.is_none());
823 :
824 : // Simulate successful send
825 1 : *guard = Some(ComputeRemoteState {
826 1 : request,
827 1 : applied: true,
828 1 : });
829 1 : drop(guard);
830 1 :
831 1 : // Try asking again: this should be a no-op
832 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
833 1 : assert!(matches!(send_result, MaybeSendResult::Noop));
834 :
835 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
836 : // resets the tenant state and puts it in a non-notifying state (need to
837 : // see all shards)
838 1 : tenant_state.update(ShardUpdate {
839 1 : tenant_shard_id: TenantShardId {
840 1 : tenant_id,
841 1 : shard_count: ShardCount::new(2),
842 1 : shard_number: ShardNumber(1),
843 1 : },
844 1 : stripe_size: ShardStripeSize(32768),
845 1 : preferred_az: None,
846 1 : node_id: NodeId(1),
847 1 : });
848 1 : assert!(matches!(
849 1 : tenant_state.maybe_send(tenant_id, None),
850 : MaybeSendResult::Noop
851 : ));
852 :
853 : // Writing the second shard makes it ready to notify
854 1 : tenant_state.update(ShardUpdate {
855 1 : tenant_shard_id: TenantShardId {
856 1 : tenant_id,
857 1 : shard_count: ShardCount::new(2),
858 1 : shard_number: ShardNumber(0),
859 1 : },
860 1 : stripe_size: ShardStripeSize(32768),
861 1 : preferred_az: None,
862 1 : node_id: NodeId(1),
863 1 : });
864 1 :
865 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
866 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
867 0 : anyhow::bail!("Wrong send result");
868 : };
869 1 : assert_eq!(request.shards.len(), 2);
870 1 : assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
871 :
872 : // Simulate successful send
873 1 : *guard = Some(ComputeRemoteState {
874 1 : request,
875 1 : applied: true,
876 1 : });
877 1 : drop(guard);
878 1 :
879 1 : Ok(())
880 1 : }
881 : }