Line data Source code
1 : use std::borrow::Cow;
2 : use std::collections::HashMap;
3 : use std::error::Error as _;
4 : use std::sync::Arc;
5 : use std::time::Duration;
6 :
7 : use anyhow::Context;
8 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
9 : use control_plane::local_env::LocalEnv;
10 : use futures::StreamExt;
11 : use hyper::StatusCode;
12 : use pageserver_api::controller_api::AvailabilityZone;
13 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
14 : use postgres_connection::parse_host_port;
15 : use serde::{Deserialize, Serialize};
16 : use tokio_util::sync::CancellationToken;
17 : use tracing::{Instrument, info_span};
18 : use utils::backoff::{self};
19 : use utils::id::{NodeId, TenantId};
20 :
21 : use crate::service::Config;
22 :
23 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
24 :
25 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
26 :
27 : pub(crate) const API_CONCURRENCY: usize = 32;
28 :
29 : struct UnshardedComputeHookTenant {
30 : // Which node this tenant is attached to
31 : node_id: NodeId,
32 :
33 : // The tenant's preferred AZ, so that we may pass this on to the control plane
34 : preferred_az: Option<AvailabilityZone>,
35 :
36 : // Must hold this lock to send a notification.
37 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
38 : }
39 : struct ShardedComputeHookTenant {
40 : stripe_size: ShardStripeSize,
41 : shard_count: ShardCount,
42 : shards: Vec<(ShardNumber, NodeId)>,
43 :
44 : // The tenant's preferred AZ, so that we may pass this on to the control plane
45 : preferred_az: Option<AvailabilityZone>,
46 :
47 : // Must hold this lock to send a notification. The contents represent
48 : // the last successfully sent notification, and are used to coalesce multiple
49 : // updates by only sending when there is a change since our last successful send.
50 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
51 : }
52 :
53 : /// Represents our knowledge of the compute's state: we can update this when we get a
54 : /// response from a notify API call, which tells us what has been applied.
55 : ///
56 : /// Should be wrapped in an Option<>, as we cannot always know the remote state.
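///
/// The three states we track, as maintained in `notify_execute` below:
/// - `None`: remote state unknown (e.g. after an error); the next `maybe_send` will re-transmit.
/// - `Some` with `applied: false`: the control plane persisted the request (423) but has not applied it yet.
/// - `Some` with `applied: true`: persisted and applied (2xx); an identical request will be skipped.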
57 : #[derive(PartialEq, Eq, Debug)]
58 : struct ComputeRemoteState {
59 : // The request body which was acked by the compute
60 : request: ComputeHookNotifyRequest,
61 :
62 : // Whether the cplane indicated that the state was applied to running computes, or just
63 : // persisted. In the Neon control plane, this is the difference between a 423 response (meaning
64 : // persisted but not applied), and a 2xx response (both persisted and applied)
65 : applied: bool,
66 : }
67 :
68 : enum ComputeHookTenant {
69 : Unsharded(UnshardedComputeHookTenant),
70 : Sharded(ShardedComputeHookTenant),
71 : }
72 :
73 : impl ComputeHookTenant {
74 : /// Construct with at least one shard's information
75 2 : fn new(
76 2 : tenant_shard_id: TenantShardId,
77 2 : stripe_size: ShardStripeSize,
78 2 : preferred_az: Option<AvailabilityZone>,
79 2 : node_id: NodeId,
80 2 : ) -> Self {
81 2 : if tenant_shard_id.shard_count.count() > 1 {
82 1 : Self::Sharded(ShardedComputeHookTenant {
83 1 : shards: vec![(tenant_shard_id.shard_number, node_id)],
84 1 : stripe_size,
85 1 : shard_count: tenant_shard_id.shard_count,
86 1 : preferred_az,
87 1 : send_lock: Arc::default(),
88 1 : })
89 : } else {
90 1 : Self::Unsharded(UnshardedComputeHookTenant {
91 1 : node_id,
92 1 : preferred_az,
93 1 : send_lock: Arc::default(),
94 1 : })
95 : }
96 2 : }
97 :
98 4 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
99 4 : match self {
100 2 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
101 2 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
102 : }
103 4 : }
104 :
105 0 : fn is_sharded(&self) -> bool {
106 0 : matches!(self, ComputeHookTenant::Sharded(_))
107 0 : }
108 :
109 : /// Clear compute hook state for the specified shard.
110 : /// Only valid for [`ComputeHookTenant::Sharded`] instances.
111 0 : fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
112 0 : match self {
113 0 : ComputeHookTenant::Sharded(sharded) => {
114 0 : if sharded.stripe_size != stripe_size
115 0 : || sharded.shard_count != tenant_shard_id.shard_count
116 : {
117 0 : tracing::warn!("Shard split detected while handling detach")
118 0 : }
119 :
120 0 : let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
121 0 : *shard_number == tenant_shard_id.shard_number
122 0 : });
123 :
124 0 : if let Some(shard_idx) = shard_idx {
125 0 : sharded.shards.remove(shard_idx);
126 0 : } else {
127 : // This is a valid but niche case, where the tenant was previously attached
128 : // as a Secondary location and then detached, so has no previously notified
129 : // state.
130 0 : tracing::info!("Shard not found while handling detach")
131 : }
132 : }
133 : ComputeHookTenant::Unsharded(_) => {
134 0 : unreachable!("Detach of unsharded tenants is handled externally");
135 : }
136 : }
137 0 : }
138 :
139 : /// Set one shard's location. If the stripe size or shard count has changed, Self is reset
140 : /// and any existing content is dropped.
141 2 : fn update(&mut self, shard_update: ShardUpdate) {
142 2 : let tenant_shard_id = shard_update.tenant_shard_id;
143 2 : let node_id = shard_update.node_id;
144 2 : let stripe_size = shard_update.stripe_size;
145 2 : let preferred_az = shard_update.preferred_az;
146 :
147 1 : match self {
148 1 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
149 0 : unsharded_tenant.node_id = node_id;
150 0 : if unsharded_tenant.preferred_az.as_ref()
151 0 : != preferred_az.as_ref().map(|az| az.as_ref())
152 0 : {
153 0 : unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
154 0 : }
155 : }
156 1 : Self::Sharded(sharded_tenant)
157 1 : if sharded_tenant.stripe_size == stripe_size
158 1 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
159 : {
160 1 : if let Some(existing) = sharded_tenant
161 1 : .shards
162 1 : .iter()
163 1 : .position(|s| s.0 == tenant_shard_id.shard_number)
164 0 : {
165 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
166 0 : } else {
167 1 : sharded_tenant
168 1 : .shards
169 1 : .push((tenant_shard_id.shard_number, node_id));
170 2 : sharded_tenant.shards.sort_by_key(|s| s.0)
171 : }
172 :
173 1 : if sharded_tenant.preferred_az.as_ref()
174 1 : != preferred_az.as_ref().map(|az| az.as_ref())
175 0 : {
176 0 : sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
177 1 : }
178 : }
179 1 : _ => {
180 1 : // Shard count changed: reset struct.
181 1 : *self = Self::new(
182 1 : tenant_shard_id,
183 1 : stripe_size,
184 1 : preferred_az.map(|az| az.into_owned()),
185 1 : node_id,
186 1 : );
187 1 : }
188 : }
189 2 : }
190 : }
191 :
192 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
193 : struct ComputeHookNotifyRequestShard {
194 : node_id: NodeId,
195 : shard_number: ShardNumber,
196 : }
197 :
198 : /// Request body that we send to the control plane to notify it of where a tenant is attached
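///
/// A rough sketch of how this body is built and used (illustrative only: `tenant_id`
/// and the node/shard/AZ values are placeholders; the real body is assembled in
/// `maybe_send` below):
///
/// ```ignore
/// let request = ComputeHookNotifyRequest {
///     tenant_id,
///     preferred_az: Some("us-east-2b".to_string()),
///     stripe_size: Some(ShardStripeSize(32768)),
///     shards: vec![
///         ComputeHookNotifyRequestShard { node_id: NodeId(1), shard_number: ShardNumber(0) },
///         ComputeHookNotifyRequestShard { node_id: NodeId(2), shard_number: ShardNumber(1) },
///     ],
/// };
/// // `do_notify` serializes this as JSON and PUTs it to the control plane's
/// // notify-attach endpoint (or hands it to neon_local in test environments).
/// ```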
199 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
200 : struct ComputeHookNotifyRequest {
201 : tenant_id: TenantId,
202 : preferred_az: Option<String>,
203 : stripe_size: Option<ShardStripeSize>,
204 : shards: Vec<ComputeHookNotifyRequestShard>,
205 : }
206 :
207 : /// Error type for attempts to call into the control plane compute notification hook
208 : #[derive(thiserror::Error, Debug)]
209 : pub(crate) enum NotifyError {
210 : // Request was not sent successfully, e.g. transport error
211 0 : #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
212 : Request(#[from] reqwest::Error),
213 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
214 : #[error("Control plane tenant busy")]
215 : Busy,
216 : // Explicit 429 response asking us to retry less frequently
217 : #[error("Control plane overloaded")]
218 : SlowDown,
219 : // A 503 response indicates the control plane can't handle the request right now
220 : #[error("Control plane unavailable (status {0})")]
221 : Unavailable(StatusCode),
222 : // API returned unexpected non-success status. We will retry, but log a warning.
223 : #[error("Control plane returned unexpected status {0}")]
224 : Unexpected(StatusCode),
225 : // We shut down while sending
226 : #[error("Shutting down")]
227 : ShuttingDown,
228 : // A response indicates we will never succeed, such as 400 or 403
229 : #[error("Non-retryable error {0}")]
230 : Fatal(StatusCode),
231 :
232 : #[error("neon_local error: {0}")]
233 : NeonLocal(anyhow::Error),
234 : }
235 :
236 : enum MaybeSendResult {
237 : // Please send this request while holding the lock, and if you succeed then write
238 : // the request into the lock.
239 : Transmit(
240 : (
241 : ComputeHookNotifyRequest,
242 : tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
243 : ),
244 : ),
245 : // Something requires sending, but you must wait for a current sender then call again
246 : AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
247 : // Nothing requires sending
248 : Noop,
249 : }
250 :
251 : impl ComputeHookTenant {
252 4 : fn maybe_send(
253 4 : &self,
254 4 : tenant_id: TenantId,
255 4 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
256 4 : ) -> MaybeSendResult {
257 4 : let locked = match lock {
258 0 : Some(already_locked) => already_locked,
259 : None => {
260 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
261 4 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
262 0 : return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
263 : };
264 4 : locked
265 : }
266 : };
267 :
268 4 : let request = match self {
269 2 : Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
270 2 : tenant_id,
271 2 : shards: vec![ComputeHookNotifyRequestShard {
272 2 : shard_number: ShardNumber(0),
273 2 : node_id: unsharded_tenant.node_id,
274 2 : }],
275 2 : stripe_size: None,
276 2 : preferred_az: unsharded_tenant
277 2 : .preferred_az
278 2 : .as_ref()
279 2 : .map(|az| az.0.clone()),
280 2 : }),
281 2 : Self::Sharded(sharded_tenant)
282 2 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
283 1 : {
284 1 : Some(ComputeHookNotifyRequest {
285 1 : tenant_id,
286 1 : shards: sharded_tenant
287 1 : .shards
288 1 : .iter()
289 2 : .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
290 2 : shard_number: *shard_number,
291 2 : node_id: *node_id,
292 2 : })
293 1 : .collect(),
294 1 : stripe_size: Some(sharded_tenant.stripe_size),
295 1 : preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
296 1 : })
297 : }
298 1 : Self::Sharded(sharded_tenant) => {
299 1 : // Sharded tenant doesn't yet have information for all its shards
300 1 :
301 1 : tracing::info!(
302 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
303 0 : sharded_tenant.shards.len(),
304 0 : sharded_tenant.shard_count.count()
305 : );
306 1 : None
307 : }
308 : };
309 :
310 3 : match request {
311 : None => {
312 : // Not yet ready to emit a notification
313 1 : tracing::info!("Tenant isn't yet ready to emit a notification");
314 1 : MaybeSendResult::Noop
315 : }
316 1 : Some(request)
317 3 : if Some(&request) == locked.as_ref().map(|s| &s.request)
318 1 : && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
319 1 : {
320 1 : tracing::info!(
321 0 : "Skipping notification because remote state already matches ({:?})",
322 0 : &request
323 : );
324 : // No change from the last value successfully sent, and our state indicates that the last
325 : // value sent was fully applied on the control plane side.
326 1 : MaybeSendResult::Noop
327 : }
328 2 : Some(request) => {
329 2 : // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
330 2 : MaybeSendResult::Transmit((request, locked))
331 : }
332 : }
333 4 : }
334 : }
335 :
336 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
337 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
338 : /// the compute connection string.
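///
/// Typical flow, as implemented below: `notify` (or `notify_background`) calls
/// `notify_prepare` to fold a shard update into the per-tenant state under the
/// `state` lock, then `notify_execute` to send the aggregated request upstream
/// while holding the per-tenant send lock.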
339 : pub(super) struct ComputeHook {
340 : config: Config,
341 : state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
342 : authorization_header: Option<String>,
343 :
344 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
345 : // large numbers of tenants (e.g. when failing over after a node failure)
346 : api_concurrency: tokio::sync::Semaphore,
347 :
348 : // This lock is only used in testing environments, to serialize calls into neon_local
349 : neon_local_lock: tokio::sync::Mutex<()>,
350 :
351 : // We share a client across all notifications to enable connection re-use etc when
352 : // sending large numbers of notifications
353 : client: reqwest::Client,
354 : }
355 :
356 : /// Callers may give us a list of these when asking us to send a bulk batch
357 : /// of notifications in the background. This is a 'notification' in the sense of
358 : /// other code notifying us of a shard's status, rather than being the final notification
359 : /// that we send upwards to the control plane for the whole tenant.
360 : pub(crate) struct ShardUpdate<'a> {
361 : pub(crate) tenant_shard_id: TenantShardId,
362 : pub(crate) node_id: NodeId,
363 : pub(crate) stripe_size: ShardStripeSize,
364 : pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
365 : }
366 :
367 : impl ComputeHook {
368 0 : pub(super) fn new(config: Config) -> anyhow::Result<Self> {
369 0 : let authorization_header = config
370 0 : .control_plane_jwt_token
371 0 : .clone()
372 0 : .map(|jwt| format!("Bearer {}", jwt));
373 0 :
374 0 : let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
375 0 : for cert in &config.ssl_ca_certs {
376 0 : client = client.add_root_certificate(cert.clone());
377 0 : }
378 0 : let client = client
379 0 : .build()
380 0 : .context("Failed to build http client for compute hook")?;
381 :
382 0 : Ok(Self {
383 0 : state: Default::default(),
384 0 : config,
385 0 : authorization_header,
386 0 : neon_local_lock: Default::default(),
387 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
388 0 : client,
389 0 : })
390 0 : }
391 :
392 : /// For test environments: use neon_local's LocalEnv to update compute
393 0 : async fn do_notify_local(
394 0 : &self,
395 0 : reconfigure_request: &ComputeHookNotifyRequest,
396 0 : ) -> Result<(), NotifyError> {
397 : // neon_local updates are not safe to call concurrently, use a lock to serialize
398 : // all calls to this function
399 0 : let _locked = self.neon_local_lock.lock().await;
400 :
401 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
402 0 : tracing::warn!(
403 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
404 : );
405 0 : return Ok(());
406 : };
407 0 : let env = match LocalEnv::load_config(repo_dir) {
408 0 : Ok(e) => e,
409 0 : Err(e) => {
410 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
411 0 : return Ok(());
412 : }
413 : };
414 0 : let cplane =
415 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
416 0 : let ComputeHookNotifyRequest {
417 0 : tenant_id,
418 0 : shards,
419 0 : stripe_size,
420 0 : preferred_az: _preferred_az,
421 0 : } = reconfigure_request;
422 0 :
423 0 : let compute_pageservers = shards
424 0 : .iter()
425 0 : .map(|shard| {
426 0 : let ps_conf = env
427 0 : .get_pageserver_conf(shard.node_id)
428 0 : .expect("Unknown pageserver");
429 0 : let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
430 0 : .expect("Unable to parse listen_pg_addr");
431 0 : (pg_host, pg_port.unwrap_or(5432))
432 0 : })
433 0 : .collect::<Vec<_>>();
434 :
435 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
436 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
437 0 : tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
438 0 : endpoint
439 0 : .reconfigure(compute_pageservers.clone(), *stripe_size, None)
440 0 : .await
441 0 : .map_err(NotifyError::NeonLocal)?;
442 0 : }
443 : }
444 :
445 0 : Ok(())
446 0 : }
447 :
448 0 : async fn do_notify_iteration(
449 0 : &self,
450 0 : url: &String,
451 0 : reconfigure_request: &ComputeHookNotifyRequest,
452 0 : cancel: &CancellationToken,
453 0 : ) -> Result<(), NotifyError> {
454 0 : let req = self.client.request(reqwest::Method::PUT, url);
455 0 : let req = if let Some(value) = &self.authorization_header {
456 0 : req.header(reqwest::header::AUTHORIZATION, value)
457 : } else {
458 0 : req
459 : };
460 :
461 0 : tracing::info!(
462 0 : "Sending notify request to {} ({:?})",
463 : url,
464 : reconfigure_request
465 : );
466 0 : let send_result = req.json(&reconfigure_request).send().await;
467 0 : let response = match send_result {
468 0 : Ok(r) => r,
469 0 : Err(e) => return Err(e.into()),
470 : };
471 :
472 : // Treat all 2xx responses as success
473 0 : if response.status() >= reqwest::StatusCode::OK
474 0 : && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
475 : {
476 0 : if response.status() != reqwest::StatusCode::OK {
477 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
478 : // log a warning.
479 0 : tracing::warn!(
480 0 : "Unexpected 2xx response code {} from control plane",
481 0 : response.status()
482 : );
483 0 : }
484 :
485 0 : return Ok(());
486 0 : }
487 0 :
488 0 : // Error response codes
489 0 : match response.status() {
490 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
491 : // TODO: 429 handling should be global: set some state visible to other requests
492 : // so that they will delay before starting, rather than all notifications trying
493 : // once before backing off.
494 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
495 0 : .await
496 0 : .ok();
497 0 : Err(NotifyError::SlowDown)
498 : }
499 : reqwest::StatusCode::LOCKED => {
500 : // We consider this fatal, because it's possible that the operation blocking the control plane is
501 : // also the one that is waiting for this reconcile. We should let the reconciler calling
502 : // this hook fail, to give the control plane a chance to un-lock.
503 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
504 0 : Err(NotifyError::Busy)
505 : }
506 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
507 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
508 : }
509 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
510 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
511 : }
512 : reqwest::StatusCode::BAD_GATEWAY => {
513 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
514 : }
515 :
516 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
517 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
518 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
519 0 : status => Err(NotifyError::Unexpected(
520 0 : hyper::StatusCode::from_u16(status.as_u16())
521 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
522 0 : )),
523 : }
524 0 : }
525 :
526 0 : async fn do_notify(
527 0 : &self,
528 0 : url: &String,
529 0 : reconfigure_request: &ComputeHookNotifyRequest,
530 0 : cancel: &CancellationToken,
531 0 : ) -> Result<(), NotifyError> {
532 : // We hold these semaphore units across all retries, rather than only across each
533 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
534 : // time out waiting for a semaphore.
535 0 : let _units = self
536 0 : .api_concurrency
537 0 : .acquire()
538 0 : .await
539 : // Interpret closed semaphore as shutdown
540 0 : .map_err(|_| NotifyError::ShuttingDown)?;
541 :
542 0 : backoff::retry(
543 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
544 0 : |e| {
545 0 : matches!(
546 0 : e,
547 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
548 : )
549 0 : },
550 0 : 3,
551 0 : 10,
552 0 : "Send compute notification",
553 0 : cancel,
554 0 : )
555 0 : .await
556 0 : .ok_or_else(|| NotifyError::ShuttingDown)
557 0 : .and_then(|x| x)
558 0 : }
559 :
560 : /// Synchronous phase: update the per-tenant state for the next intended notification
561 0 : fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
562 0 : let mut state_locked = self.state.lock().unwrap();
563 :
564 : use std::collections::hash_map::Entry;
565 0 : let tenant_shard_id = shard_update.tenant_shard_id;
566 :
567 0 : let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
568 0 : Entry::Vacant(e) => {
569 0 : let ShardUpdate {
570 0 : tenant_shard_id,
571 0 : node_id,
572 0 : stripe_size,
573 0 : preferred_az,
574 0 : } = shard_update;
575 0 : e.insert(ComputeHookTenant::new(
576 0 : tenant_shard_id,
577 0 : stripe_size,
578 0 : preferred_az.map(|az| az.into_owned()),
579 0 : node_id,
580 0 : ))
581 : }
582 0 : Entry::Occupied(e) => {
583 0 : let tenant = e.into_mut();
584 0 : tenant.update(shard_update);
585 0 : tenant
586 : }
587 : };
588 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
589 0 : }
590 :
591 0 : async fn notify_execute(
592 0 : &self,
593 0 : maybe_send_result: MaybeSendResult,
594 0 : tenant_shard_id: TenantShardId,
595 0 : cancel: &CancellationToken,
596 0 : ) -> Result<(), NotifyError> {
597 : // Process result: we may get an update to send, or we may have to wait for a lock
598 : // before trying again.
599 0 : let (request, mut send_lock_guard) = match maybe_send_result {
600 : MaybeSendResult::Noop => {
601 0 : return Ok(());
602 : }
603 0 : MaybeSendResult::AwaitLock(send_lock) => {
604 0 : let send_locked = tokio::select! {
605 0 : guard = send_lock.lock_owned() => {guard},
606 0 : _ = cancel.cancelled() => {
607 0 : return Err(NotifyError::ShuttingDown)
608 : }
609 : };
610 :
611 : // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
612 : // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
613 : // try_lock.
614 0 : let state_locked = self.state.lock().unwrap();
615 0 : let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
616 0 : return Ok(());
617 : };
618 0 : match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
619 : MaybeSendResult::AwaitLock(_) => {
620 0 : unreachable!("We supplied lock guard")
621 : }
622 : MaybeSendResult::Noop => {
623 0 : return Ok(());
624 : }
625 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
626 : }
627 : }
628 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
629 : };
630 :
631 0 : let result = if !self.config.use_local_compute_notifications {
632 0 : let compute_hook_url = if let Some(control_plane_url) = &self.config.control_plane_url {
633 0 : Some(if control_plane_url.ends_with('/') {
634 0 : format!("{control_plane_url}notify-attach")
635 : } else {
636 0 : format!("{control_plane_url}/notify-attach")
637 : })
638 : } else {
639 0 : self.config.compute_hook_url.clone()
640 : };
641 :
642 : // We validate this at startup
643 0 : let notify_url = compute_hook_url.as_ref().unwrap();
644 0 : self.do_notify(notify_url, &request, cancel).await
645 : } else {
646 0 : self.do_notify_local(&request).await.map_err(|e| {
647 0 : // This path is for testing only, so munge the error into our prod-style error type.
648 0 : tracing::error!("neon_local notification hook failed: {e}");
649 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
650 0 : })
651 : };
652 :
653 0 : match result {
654 0 : Ok(_) => {
655 0 : // Before dropping the send lock, stash the request we just sent so that
656 0 : // subsequent callers can avoid redundantly re-sending the same thing.
657 0 : *send_lock_guard = Some(ComputeRemoteState {
658 0 : request,
659 0 : applied: true,
660 0 : });
661 0 : }
662 0 : Err(NotifyError::Busy) => {
663 0 : // Busy result means that the server responded and has stored the new configuration,
664 0 : // but was not able to fully apply it to the compute
665 0 : *send_lock_guard = Some(ComputeRemoteState {
666 0 : request,
667 0 : applied: false,
668 0 : });
669 0 : }
670 0 : Err(_) => {
671 0 : // General error case: we can no longer know the remote state, so clear it. This will result in
672 0 : // the logic in maybe_send recognizing that we should call the hook again.
673 0 : *send_lock_guard = None;
674 0 : }
675 : }
676 0 : result
677 0 : }
678 :
679 : /// Infallible synchronous fire-and-forget version of notify() that sends its results to
680 : /// a channel. Something should consume the channel and arrange to try notifying again
681 : /// if something failed.
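///
/// A rough usage sketch (illustrative only: `compute_hook`, `updates`, `cancel` and the
/// channel capacity are assumptions, not taken from this file):
///
/// ```ignore
/// let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(API_CONCURRENCY);
/// compute_hook.notify_background(updates, result_tx, &cancel);
/// while let Some(result) = result_rx.recv().await {
///     if let Err((tenant_shard_id, err)) = result {
///         // Arrange to retry this shard's notification later.
///         tracing::warn!("Notification for {tenant_shard_id} failed: {err}");
///     }
/// }
/// ```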
682 0 : pub(super) fn notify_background(
683 0 : self: &Arc<Self>,
684 0 : notifications: Vec<ShardUpdate>,
685 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
686 0 : cancel: &CancellationToken,
687 0 : ) {
688 0 : let mut maybe_sends = Vec::new();
689 0 : for shard_update in notifications {
690 0 : let tenant_shard_id = shard_update.tenant_shard_id;
691 0 : let maybe_send_result = self.notify_prepare(shard_update);
692 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
693 : }
694 :
695 0 : let this = self.clone();
696 0 : let cancel = cancel.clone();
697 0 :
698 0 : tokio::task::spawn(async move {
699 0 : // Construct an async stream of futures to invoke the compute notify function: we do this
700 0 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
701 0 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
702 0 : // would mostly just be waiting on that semaphore.
703 0 : let mut stream = futures::stream::iter(maybe_sends)
704 0 : .map(|(tenant_shard_id, maybe_send_result)| {
705 0 : let this = this.clone();
706 0 : let cancel = cancel.clone();
707 :
708 0 : async move {
709 0 : this
710 0 : .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
711 0 : .await.map_err(|e| (tenant_shard_id, e))
712 0 : }.instrument(info_span!(
713 0 : "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
714 : ))
715 0 : })
716 0 : .buffered(API_CONCURRENCY);
717 :
718 : loop {
719 0 : tokio::select! {
720 0 : next = stream.next() => {
721 0 : match next {
722 0 : Some(r) => {
723 0 : result_tx.send(r).await.ok();
724 : },
725 : None => {
726 0 : tracing::info!("Finished sending background compute notifications");
727 0 : break;
728 : }
729 : }
730 : },
731 0 : _ = cancel.cancelled() => {
732 0 : tracing::info!("Shutdown while running background compute notifications");
733 0 : break;
734 : }
735 : };
736 : }
737 0 : });
738 0 : }
739 :
740 : /// Call this to notify the compute (postgres) tier of new pageservers to use
741 : /// for a tenant. notify() is called by each shard individually, and this function
742 : /// will decide whether an update to the tenant is sent. An update is sent on the
743 : /// condition that:
744 : /// - We know a pageserver for every shard.
745 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
746 : ///
747 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
748 : /// that is cancelled.
749 : ///
750 : /// This function is fallible, including in the case that the control plane is transiently
751 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
752 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
753 : /// calling again until the compute has been notified of
754 : /// the proper pageserver nodes for the tenant.
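///
/// A minimal usage sketch (illustrative only: `compute_hook`, `tenant_shard_id`,
/// `target_pageserver`, `stripe_size` and `cancel` are assumed to be in scope):
///
/// ```ignore
/// let update = ShardUpdate {
///     tenant_shard_id,
///     node_id: target_pageserver,
///     stripe_size,
///     preferred_az: None,
/// };
/// // Retries a few times internally; on Err the caller should re-drive the
/// // notification later, e.g. from its reconciliation loop.
/// compute_hook.notify(update, &cancel).await?;
/// ```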
755 : #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
756 : pub(super) async fn notify<'a>(
757 : &self,
758 : shard_update: ShardUpdate<'a>,
759 : cancel: &CancellationToken,
760 : ) -> Result<(), NotifyError> {
761 : let tenant_shard_id = shard_update.tenant_shard_id;
762 : let maybe_send_result = self.notify_prepare(shard_update);
763 : self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
764 : .await
765 : }
766 :
767 : /// Reflect a detach for a particular shard in the compute hook state.
768 : ///
769 : /// The goal is to avoid sending compute notifications with stale information (i.e.
770 : /// including detached pageservers).
771 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
772 : pub(super) fn handle_detach(
773 : &self,
774 : tenant_shard_id: TenantShardId,
775 : stripe_size: ShardStripeSize,
776 : ) {
777 : use std::collections::hash_map::Entry;
778 :
779 : let mut state_locked = self.state.lock().unwrap();
780 : match state_locked.entry(tenant_shard_id.tenant_id) {
781 : Entry::Vacant(_) => {
782 : // This is a valid but niche case, where the tenant was previously attached
783 : // as a Secondary location and then detached, so has no previously notified
784 : // state.
785 : tracing::info!("Compute hook tenant not found for detach");
786 : }
787 : Entry::Occupied(mut e) => {
788 : let sharded = e.get().is_sharded();
789 : if !sharded {
790 : e.remove();
791 : } else {
792 : e.get_mut().remove_shard(tenant_shard_id, stripe_size);
793 : }
794 :
795 : tracing::debug!("Compute hook handled shard detach");
796 : }
797 : }
798 : }
799 : }
800 :
801 : #[cfg(test)]
802 : pub(crate) mod tests {
803 : use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
804 : use utils::id::TenantId;
805 :
806 : use super::*;
807 :
808 : #[test]
809 1 : fn tenant_updates() -> anyhow::Result<()> {
810 1 : let tenant_id = TenantId::generate();
811 1 : let stripe_size = DEFAULT_STRIPE_SIZE;
812 1 : let mut tenant_state = ComputeHookTenant::new(
813 1 : TenantShardId {
814 1 : tenant_id,
815 1 : shard_count: ShardCount::new(0),
816 1 : shard_number: ShardNumber(0),
817 1 : },
818 1 : ShardStripeSize(12345),
819 1 : None,
820 1 : NodeId(1),
821 1 : );
822 1 :
823 1 : // An unsharded tenant is always ready to emit a notification, but won't
824 1 : // send the same one twice
825 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
826 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
827 0 : anyhow::bail!("Wrong send result");
828 : };
829 1 : assert_eq!(request.shards.len(), 1);
830 1 : assert!(request.stripe_size.is_none());
831 :
832 : // Simulate successful send
833 1 : *guard = Some(ComputeRemoteState {
834 1 : request,
835 1 : applied: true,
836 1 : });
837 1 : drop(guard);
838 1 :
839 1 : // Try asking again: this should be a no-op
840 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
841 1 : assert!(matches!(send_result, MaybeSendResult::Noop));
842 :
843 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
844 : // resets the tenant state and puts it in a non-notifying state (need to
845 : // see all shards)
846 1 : tenant_state.update(ShardUpdate {
847 1 : tenant_shard_id: TenantShardId {
848 1 : tenant_id,
849 1 : shard_count: ShardCount::new(2),
850 1 : shard_number: ShardNumber(1),
851 1 : },
852 1 : stripe_size,
853 1 : preferred_az: None,
854 1 : node_id: NodeId(1),
855 1 : });
856 1 : assert!(matches!(
857 1 : tenant_state.maybe_send(tenant_id, None),
858 : MaybeSendResult::Noop
859 : ));
860 :
861 : // Writing the second shard makes it ready to notify
862 1 : tenant_state.update(ShardUpdate {
863 1 : tenant_shard_id: TenantShardId {
864 1 : tenant_id,
865 1 : shard_count: ShardCount::new(2),
866 1 : shard_number: ShardNumber(0),
867 1 : },
868 1 : stripe_size,
869 1 : preferred_az: None,
870 1 : node_id: NodeId(1),
871 1 : });
872 1 :
873 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
874 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
875 0 : anyhow::bail!("Wrong send result");
876 : };
877 1 : assert_eq!(request.shards.len(), 2);
878 1 : assert_eq!(request.stripe_size, Some(stripe_size));
879 :
880 : // Simulate successful send
881 1 : *guard = Some(ComputeRemoteState {
882 1 : request,
883 1 : applied: true,
884 1 : });
885 1 : drop(guard);
886 1 :
887 1 : Ok(())
888 1 : }
889 : }