Line data Source code
1 : use std::borrow::Cow;
2 : use std::error::Error as _;
3 : use std::sync::Arc;
4 : use std::{collections::HashMap, time::Duration};
5 :
6 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
7 : use control_plane::local_env::LocalEnv;
8 : use futures::StreamExt;
9 : use hyper::StatusCode;
10 : use pageserver_api::controller_api::AvailabilityZone;
11 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
12 : use postgres_connection::parse_host_port;
13 : use serde::{Deserialize, Serialize};
14 : use tokio_util::sync::CancellationToken;
15 : use tracing::{info_span, Instrument};
16 : use utils::{
17 : backoff::{self},
18 : id::{NodeId, TenantId},
19 : };
20 :
21 : use crate::service::Config;
22 :
23 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
24 :
25 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
26 :
27 : pub(crate) const API_CONCURRENCY: usize = 32;
28 :
29 : struct UnshardedComputeHookTenant {
30 : // Which node is this tenant attached to
31 : node_id: NodeId,
32 :
33 : // The tenant's preferred AZ, so that we may pass this on to the control plane
34 : preferred_az: Option<AvailabilityZone>,
35 :
36 : // Must hold this lock to send a notification.
37 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
38 : }
39 : struct ShardedComputeHookTenant {
40 : stripe_size: ShardStripeSize,
41 : shard_count: ShardCount,
42 : shards: Vec<(ShardNumber, NodeId)>,
43 :
44 : // The tenant's preferred AZ, so that we may pass this on to the control plane
45 : preferred_az: Option<AvailabilityZone>,
46 :
47 : // Must hold this lock to send a notification. The contents represent
48 : // the last successfully sent notification, and are used to coalesce multiple
49 : // updates by only sending when there is a change since our last successful send.
50 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
51 : }
52 :
53 : /// Represents our knowledge of the compute's state: we can update this when we get a
54 : /// response from a notify API call, which tells us what has been applied.
55 : ///
56 : /// Should be wrapped in an Option<>, as we cannot always know the remote state.
57 : #[derive(PartialEq, Eq, Debug)]
58 : struct ComputeRemoteState {
59 : // The request body which was acked by the compute
60 : request: ComputeHookNotifyRequest,
61 :
62 : // Whether the cplane indicated that the state was applied to running computes, or just
63 : // persisted. In the Neon control plane, this is the difference between a 423 response (meaning
64 : // persisted but not applied), and a 2xx response (both persisted and applied)
65 : applied: bool,
66 : }
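// Illustrative summary (a sketch of how notify_execute below maintains this state;
// not additional logic in this module):
//   2xx from the control plane   -> Some(ComputeRemoteState { request, applied: true })
//   423 (NotifyError::Busy)      -> Some(ComputeRemoteState { request, applied: false })
//   any other error              -> None (remote state unknown, forces a re-send)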
67 :
68 : enum ComputeHookTenant {
69 : Unsharded(UnshardedComputeHookTenant),
70 : Sharded(ShardedComputeHookTenant),
71 : }
72 :
73 : impl ComputeHookTenant {
74 : /// Construct with at least one shard's information
75 2 : fn new(
76 2 : tenant_shard_id: TenantShardId,
77 2 : stripe_size: ShardStripeSize,
78 2 : preferred_az: Option<AvailabilityZone>,
79 2 : node_id: NodeId,
80 2 : ) -> Self {
81 2 : if tenant_shard_id.shard_count.count() > 1 {
82 1 : Self::Sharded(ShardedComputeHookTenant {
83 1 : shards: vec![(tenant_shard_id.shard_number, node_id)],
84 1 : stripe_size,
85 1 : shard_count: tenant_shard_id.shard_count,
86 1 : preferred_az,
87 1 : send_lock: Arc::default(),
88 1 : })
89 : } else {
90 1 : Self::Unsharded(UnshardedComputeHookTenant {
91 1 : node_id,
92 1 : preferred_az,
93 1 : send_lock: Arc::default(),
94 1 : })
95 : }
96 2 : }
97 :
98 4 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
99 4 : match self {
100 2 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
101 2 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
102 : }
103 4 : }
104 :
105 0 : fn is_sharded(&self) -> bool {
106 0 : matches!(self, ComputeHookTenant::Sharded(_))
107 0 : }
108 :
109 : /// Clear compute hook state for the specified shard.
110 : /// Only valid for [`ComputeHookTenant::Sharded`] instances.
111 0 : fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
112 0 : match self {
113 0 : ComputeHookTenant::Sharded(sharded) => {
114 0 : if sharded.stripe_size != stripe_size
115 0 : || sharded.shard_count != tenant_shard_id.shard_count
116 : {
117 0 : tracing::warn!("Shard split detected while handling detach")
118 0 : }
119 :
120 0 : let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
121 0 : *shard_number == tenant_shard_id.shard_number
122 0 : });
123 :
124 0 : if let Some(shard_idx) = shard_idx {
125 0 : sharded.shards.remove(shard_idx);
126 0 : } else {
127 : // This is a valid but niche case, where the tenant was previously attached
128 : // as a Secondary location and then detached, so has no previously notified
129 : // state.
130 0 : tracing::info!("Shard not found while handling detach")
131 : }
132 : }
133 : ComputeHookTenant::Unsharded(_) => {
134 0 : unreachable!("Detach of unsharded tenants is handled externally");
135 : }
136 : }
137 0 : }
138 :
139 : /// Set one shard's location. If the stripe size or shard count has changed, Self is reset
140 : /// and existing content is dropped.
141 2 : fn update(&mut self, shard_update: ShardUpdate) {
142 2 : let tenant_shard_id = shard_update.tenant_shard_id;
143 2 : let node_id = shard_update.node_id;
144 2 : let stripe_size = shard_update.stripe_size;
145 2 : let preferred_az = shard_update.preferred_az;
146 :
147 1 : match self {
148 1 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
149 0 : unsharded_tenant.node_id = node_id;
150 0 : if unsharded_tenant.preferred_az.as_ref()
151 0 : != preferred_az.as_ref().map(|az| az.as_ref())
152 0 : {
153 0 : unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
154 0 : }
155 : }
156 1 : Self::Sharded(sharded_tenant)
157 1 : if sharded_tenant.stripe_size == stripe_size
158 1 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
159 : {
160 1 : if let Some(existing) = sharded_tenant
161 1 : .shards
162 1 : .iter()
163 1 : .position(|s| s.0 == tenant_shard_id.shard_number)
164 0 : {
165 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
166 0 : } else {
167 1 : sharded_tenant
168 1 : .shards
169 1 : .push((tenant_shard_id.shard_number, node_id));
170 2 : sharded_tenant.shards.sort_by_key(|s| s.0)
171 : }
172 :
173 1 : if sharded_tenant.preferred_az.as_ref()
174 1 : != preferred_az.as_ref().map(|az| az.as_ref())
175 0 : {
176 0 : sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
177 1 : }
178 : }
179 1 : _ => {
180 1 : // Shard count changed: reset struct.
181 1 : *self = Self::new(
182 1 : tenant_shard_id,
183 1 : stripe_size,
184 1 : preferred_az.map(|az| az.into_owned()),
185 1 : node_id,
186 1 : );
187 1 : }
188 : }
189 2 : }
190 : }
191 :
192 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
193 : struct ComputeHookNotifyRequestShard {
194 : node_id: NodeId,
195 : shard_number: ShardNumber,
196 : }
197 :
198 : /// Request body that we send to the control plane to notify it of where a tenant is attached
199 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
200 : struct ComputeHookNotifyRequest {
201 : tenant_id: TenantId,
202 : preferred_az: Option<String>,
203 : stripe_size: Option<ShardStripeSize>,
204 : shards: Vec<ComputeHookNotifyRequestShard>,
205 : }
206 :
207 : /// Error type for attempts to call into the control plane compute notification hook
208 : #[derive(thiserror::Error, Debug)]
209 : pub(crate) enum NotifyError {
209 : // Request was not sent successfully, e.g. transport error
211 0 : #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
212 : Request(#[from] reqwest::Error),
213 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
214 : #[error("Control plane tenant busy")]
215 : Busy,
216 : // Explicit 429 response asking us to retry less frequently
217 : #[error("Control plane overloaded")]
218 : SlowDown,
219 : // A 503 response indicates the control plane can't handle the request right now
220 : #[error("Control plane unavailable (status {0})")]
221 : Unavailable(StatusCode),
222 : // API returned unexpected non-success status. We will retry, but log a warning.
223 : #[error("Control plane returned unexpected status {0}")]
224 : Unexpected(StatusCode),
225 : // We shut down while sending
226 : #[error("Shutting down")]
227 : ShuttingDown,
228 : // A response indicates we will never succeed, such as 400 or 403
229 : #[error("Non-retryable error {0}")]
230 : Fatal(StatusCode),
231 :
232 : #[error("neon_local error: {0}")]
233 : NeonLocal(anyhow::Error),
234 : }
235 :
236 : enum MaybeSendResult {
237 : // Please send this request while holding the lock, and if you succeed then write
238 : // the request into the lock.
239 : Transmit(
240 : (
241 : ComputeHookNotifyRequest,
242 : tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
243 : ),
244 : ),
245 : // Something requires sending, but you must wait for a current sender then call again
246 : AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
247 : // Nothing requires sending
248 : Noop,
249 : }
250 :
251 : impl ComputeHookTenant {
252 4 : fn maybe_send(
253 4 : &self,
254 4 : tenant_id: TenantId,
255 4 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
256 4 : ) -> MaybeSendResult {
257 4 : let locked = match lock {
258 0 : Some(already_locked) => already_locked,
259 : None => {
260 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
261 4 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
262 0 : return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
263 : };
264 4 : locked
265 : }
266 : };
267 :
268 4 : let request = match self {
269 2 : Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
270 2 : tenant_id,
271 2 : shards: vec![ComputeHookNotifyRequestShard {
272 2 : shard_number: ShardNumber(0),
273 2 : node_id: unsharded_tenant.node_id,
274 2 : }],
275 2 : stripe_size: None,
276 2 : preferred_az: unsharded_tenant
277 2 : .preferred_az
278 2 : .as_ref()
279 2 : .map(|az| az.0.clone()),
280 2 : }),
281 2 : Self::Sharded(sharded_tenant)
282 2 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
283 1 : {
284 1 : Some(ComputeHookNotifyRequest {
285 1 : tenant_id,
286 1 : shards: sharded_tenant
287 1 : .shards
288 1 : .iter()
289 2 : .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
290 2 : shard_number: *shard_number,
291 2 : node_id: *node_id,
292 2 : })
293 1 : .collect(),
294 1 : stripe_size: Some(sharded_tenant.stripe_size),
295 1 : preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
296 1 : })
297 : }
298 1 : Self::Sharded(sharded_tenant) => {
299 1 : // Sharded tenant doesn't yet have information for all its shards
300 1 :
301 1 : tracing::info!(
302 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
303 0 : sharded_tenant.shards.len(),
304 0 : sharded_tenant.shard_count.count()
305 : );
306 1 : None
307 : }
308 : };
309 :
310 3 : match request {
311 : None => {
312 : // Not yet ready to emit a notification
313 1 : tracing::info!("Tenant isn't yet ready to emit a notification");
314 1 : MaybeSendResult::Noop
315 : }
316 1 : Some(request)
317 3 : if Some(&request) == locked.as_ref().map(|s| &s.request)
318 1 : && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
319 1 : {
320 1 : tracing::info!(
321 0 : "Skipping notification because remote state already matches ({:?})",
322 0 : &request
323 : );
324 : // No change from the last value successfully sent, and our state indicates that the last
325 : // value sent was fully applied on the control plane side.
326 1 : MaybeSendResult::Noop
327 : }
328 2 : Some(request) => {
329 2 : // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
330 2 : MaybeSendResult::Transmit((request, locked))
331 : }
332 : }
333 4 : }
334 : }
335 :
336 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
337 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
338 : /// the compute connection string.
339 : pub(super) struct ComputeHook {
340 : config: Config,
341 : state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
342 : authorization_header: Option<String>,
343 :
344 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
345 : // large numbers of tenants (e.g. when failing over after a node failure)
346 : api_concurrency: tokio::sync::Semaphore,
347 :
348 : // This lock is only used in testing environments, to serialize calls into neon_local
349 : neon_local_lock: tokio::sync::Mutex<()>,
350 :
351 : // We share a client across all notifications to enable connection re-use etc when
352 : // sending large numbers of notifications
353 : client: reqwest::Client,
354 : }
355 :
356 : /// Callers may give us a list of these when asking us to send a bulk batch
357 : /// of notifications in the background. This is a 'notification' in the sense of
358 : /// other code notifying us of a shard's status, rather than being the final notification
359 : /// that we send upwards to the control plane for the whole tenant.
360 : pub(crate) struct ShardUpdate<'a> {
361 : pub(crate) tenant_shard_id: TenantShardId,
362 : pub(crate) node_id: NodeId,
363 : pub(crate) stripe_size: ShardStripeSize,
364 : pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
365 : }
366 :
367 : impl ComputeHook {
368 0 : pub(super) fn new(config: Config) -> Self {
369 0 : let authorization_header = config
370 0 : .control_plane_jwt_token
371 0 : .clone()
372 0 : .map(|jwt| format!("Bearer {}", jwt));
373 0 :
374 0 : let client = reqwest::ClientBuilder::new()
375 0 : .timeout(NOTIFY_REQUEST_TIMEOUT)
376 0 : .build()
377 0 : .expect("Failed to construct HTTP client");
378 0 :
379 0 : Self {
380 0 : state: Default::default(),
381 0 : config,
382 0 : authorization_header,
383 0 : neon_local_lock: Default::default(),
384 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
385 0 : client,
386 0 : }
387 0 : }
388 :
389 : /// For test environments: use neon_local's LocalEnv to update compute
390 0 : async fn do_notify_local(
391 0 : &self,
392 0 : reconfigure_request: &ComputeHookNotifyRequest,
393 0 : ) -> Result<(), NotifyError> {
394 : // neon_local updates are not safe to call concurrently, use a lock to serialize
395 : // all calls to this function
396 0 : let _locked = self.neon_local_lock.lock().await;
397 :
398 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
399 0 : tracing::warn!(
400 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
401 : );
402 0 : return Ok(());
403 : };
404 0 : let env = match LocalEnv::load_config(repo_dir) {
405 0 : Ok(e) => e,
406 0 : Err(e) => {
407 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
408 0 : return Ok(());
409 : }
410 : };
411 0 : let cplane =
412 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
413 0 : let ComputeHookNotifyRequest {
414 0 : tenant_id,
415 0 : shards,
416 0 : stripe_size,
417 0 : preferred_az: _preferred_az,
418 0 : } = reconfigure_request;
419 0 :
420 0 : let compute_pageservers = shards
421 0 : .iter()
422 0 : .map(|shard| {
423 0 : let ps_conf = env
424 0 : .get_pageserver_conf(shard.node_id)
425 0 : .expect("Unknown pageserver");
426 0 : let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
427 0 : .expect("Unable to parse listen_pg_addr");
428 0 : (pg_host, pg_port.unwrap_or(5432))
429 0 : })
430 0 : .collect::<Vec<_>>();
431 :
432 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
433 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
434 0 : tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
435 0 : endpoint
436 0 : .reconfigure(compute_pageservers.clone(), *stripe_size, None)
437 0 : .await
438 0 : .map_err(NotifyError::NeonLocal)?;
439 0 : }
440 : }
441 :
442 0 : Ok(())
443 0 : }
444 :
445 0 : async fn do_notify_iteration(
446 0 : &self,
447 0 : url: &String,
448 0 : reconfigure_request: &ComputeHookNotifyRequest,
449 0 : cancel: &CancellationToken,
450 0 : ) -> Result<(), NotifyError> {
451 0 : let req = self.client.request(reqwest::Method::PUT, url);
452 0 : let req = if let Some(value) = &self.authorization_header {
453 0 : req.header(reqwest::header::AUTHORIZATION, value)
454 : } else {
455 0 : req
456 : };
457 :
458 0 : tracing::info!(
459 0 : "Sending notify request to {} ({:?})",
460 : url,
461 : reconfigure_request
462 : );
463 0 : let send_result = req.json(&reconfigure_request).send().await;
464 0 : let response = match send_result {
465 0 : Ok(r) => r,
466 0 : Err(e) => return Err(e.into()),
467 : };
468 :
469 : // Treat all 2xx responses as success
470 0 : if response.status() >= reqwest::StatusCode::OK
471 0 : && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
472 : {
473 0 : if response.status() != reqwest::StatusCode::OK {
474 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
475 : // log a warning.
476 0 : tracing::warn!(
477 0 : "Unexpected 2xx response code {} from control plane",
478 0 : response.status()
479 : );
480 0 : }
481 :
482 0 : return Ok(());
483 0 : }
484 0 :
485 0 : // Error response codes
486 0 : match response.status() {
487 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
488 : // TODO: 429 handling should be global: set some state visible to other requests
489 : // so that they will delay before starting, rather than all notifications trying
490 : // once before backing off.
491 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
492 0 : .await
493 0 : .ok();
494 0 : Err(NotifyError::SlowDown)
495 : }
496 : reqwest::StatusCode::LOCKED => {
497 : // We consider this fatal, because it's possible that the operation blocking the control plane is
498 : // also the one that is waiting for this reconcile. We should let the reconciler calling
499 : // this hook fail, to give control plane a chance to un-lock.
500 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
501 0 : Err(NotifyError::Busy)
502 : }
503 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
504 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
505 : }
506 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
507 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
508 : }
509 : reqwest::StatusCode::BAD_GATEWAY => {
510 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
511 : }
512 :
513 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
514 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
515 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
516 0 : status => Err(NotifyError::Unexpected(
517 0 : hyper::StatusCode::from_u16(status.as_u16())
518 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
519 0 : )),
520 : }
521 0 : }
522 :
523 0 : async fn do_notify(
524 0 : &self,
525 0 : url: &String,
526 0 : reconfigure_request: &ComputeHookNotifyRequest,
527 0 : cancel: &CancellationToken,
528 0 : ) -> Result<(), NotifyError> {
529 : // We hold these semaphore units across all retries, rather than only across each
530 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
531 : // time out waiting for a semaphore.
532 0 : let _units = self
533 0 : .api_concurrency
534 0 : .acquire()
535 0 : .await
536 : // Interpret closed semaphore as shutdown
537 0 : .map_err(|_| NotifyError::ShuttingDown)?;
538 :
539 0 : backoff::retry(
540 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
541 0 : |e| {
542 0 : matches!(
543 0 : e,
544 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
545 : )
546 0 : },
547 0 : 3,
548 0 : 10,
549 0 : "Send compute notification",
550 0 : cancel,
551 0 : )
552 0 : .await
553 0 : .ok_or_else(|| NotifyError::ShuttingDown)
554 0 : .and_then(|x| x)
555 0 : }
556 :
557 : /// Synchronous phase: update the per-tenant state for the next intended notification
558 0 : fn notify_prepare(&self, shard_update: ShardUpdate) -> MaybeSendResult {
559 0 : let mut state_locked = self.state.lock().unwrap();
560 :
561 : use std::collections::hash_map::Entry;
562 0 : let tenant_shard_id = shard_update.tenant_shard_id;
563 :
564 0 : let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
565 0 : Entry::Vacant(e) => {
566 0 : let ShardUpdate {
567 0 : tenant_shard_id,
568 0 : node_id,
569 0 : stripe_size,
570 0 : preferred_az,
571 0 : } = shard_update;
572 0 : e.insert(ComputeHookTenant::new(
573 0 : tenant_shard_id,
574 0 : stripe_size,
575 0 : preferred_az.map(|az| az.into_owned()),
576 0 : node_id,
577 0 : ))
578 : }
579 0 : Entry::Occupied(e) => {
580 0 : let tenant = e.into_mut();
581 0 : tenant.update(shard_update);
582 0 : tenant
583 : }
584 : };
585 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
586 0 : }
587 :
588 0 : async fn notify_execute(
589 0 : &self,
590 0 : maybe_send_result: MaybeSendResult,
591 0 : tenant_shard_id: TenantShardId,
592 0 : cancel: &CancellationToken,
593 0 : ) -> Result<(), NotifyError> {
594 : // Process result: we may get an update to send, or we may have to wait for a lock
595 : // before trying again.
596 0 : let (request, mut send_lock_guard) = match maybe_send_result {
597 : MaybeSendResult::Noop => {
598 0 : return Ok(());
599 : }
600 0 : MaybeSendResult::AwaitLock(send_lock) => {
601 0 : let send_locked = tokio::select! {
602 0 : guard = send_lock.lock_owned() => {guard},
603 0 : _ = cancel.cancelled() => {
604 0 : return Err(NotifyError::ShuttingDown)
605 : }
606 : };
607 :
608 : // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
609 : // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
610 : // try_lock.
611 0 : let state_locked = self.state.lock().unwrap();
612 0 : let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
613 0 : return Ok(());
614 : };
615 0 : match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
616 : MaybeSendResult::AwaitLock(_) => {
617 0 : unreachable!("We supplied lock guard")
618 : }
619 : MaybeSendResult::Noop => {
620 0 : return Ok(());
621 : }
622 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
623 : }
624 : }
625 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
626 : };
627 :
628 0 : let result = if let Some(notify_url) = &self.config.compute_hook_url {
629 0 : self.do_notify(notify_url, &request, cancel).await
630 : } else {
631 0 : self.do_notify_local(&request).await.map_err(|e| {
632 0 : // This path is for testing only, so munge the error into our prod-style error type.
633 0 : tracing::error!("neon_local notification hook failed: {e}");
634 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
635 0 : })
636 : };
637 :
638 0 : match result {
639 0 : Ok(_) => {
640 0 : // Before dropping the send lock, stash the request we just sent so that
641 0 : // subsequent callers can avoid redundantly re-sending the same thing.
642 0 : *send_lock_guard = Some(ComputeRemoteState {
643 0 : request,
644 0 : applied: true,
645 0 : });
646 0 : }
647 0 : Err(NotifyError::Busy) => {
648 0 : // Busy result means that the server responded and has stored the new configuration,
649 0 : // but was not able to fully apply it to the compute
650 0 : *send_lock_guard = Some(ComputeRemoteState {
651 0 : request,
652 0 : applied: false,
653 0 : });
654 0 : }
655 0 : Err(_) => {
656 0 : // General error case: we can no longer know the remote state, so clear it. This will result in
657 0 : // the logic in maybe_send recognizing that we should call the hook again.
658 0 : *send_lock_guard = None;
659 0 : }
660 : }
661 0 : result
662 0 : }
663 :
664 : /// Infallible synchronous fire-and-forget version of notify(), that sends its results to
665 : /// a channel. Something should consume the channel and arrange to try notifying again
666 : /// if something failed.
667 0 : pub(super) fn notify_background(
668 0 : self: &Arc<Self>,
669 0 : notifications: Vec<ShardUpdate>,
670 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
671 0 : cancel: &CancellationToken,
672 0 : ) {
673 0 : let mut maybe_sends = Vec::new();
674 0 : for shard_update in notifications {
675 0 : let tenant_shard_id = shard_update.tenant_shard_id;
676 0 : let maybe_send_result = self.notify_prepare(shard_update);
677 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
678 : }
679 :
680 0 : let this = self.clone();
681 0 : let cancel = cancel.clone();
682 0 :
683 0 : tokio::task::spawn(async move {
684 0 : // Construct an async stream of futures to invoke the compute notify function: we do this
685 0 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
686 0 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
687 0 : // would mostly just be waiting on that semaphore.
688 0 : let mut stream = futures::stream::iter(maybe_sends)
689 0 : .map(|(tenant_shard_id, maybe_send_result)| {
690 0 : let this = this.clone();
691 0 : let cancel = cancel.clone();
692 :
693 0 : async move {
694 0 : this
695 0 : .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
696 0 : .await.map_err(|e| (tenant_shard_id, e))
697 0 : }.instrument(info_span!(
698 0 : "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
699 : ))
700 0 : })
701 0 : .buffered(API_CONCURRENCY);
702 :
703 : loop {
704 0 : tokio::select! {
705 0 : next = stream.next() => {
706 0 : match next {
707 0 : Some(r) => {
708 0 : result_tx.send(r).await.ok();
709 : },
710 : None => {
711 0 : tracing::info!("Finished sending background compute notifications");
712 0 : break;
713 : }
714 : }
715 : },
716 0 : _ = cancel.cancelled() => {
717 0 : tracing::info!("Shutdown while running background compute notifications");
718 0 : break;
719 : }
720 : };
721 : }
722 0 : });
723 0 : }
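// Hypothetical consumer sketch (caller-side, not part of this module): drain the result
// channel passed to notify_background and re-queue failures for another notification
// pass. `result_rx` and `retry_queue` are assumed names.
//
//   while let Some(result) = result_rx.recv().await {
//       if let Err((tenant_shard_id, e)) = result {
//           tracing::warn!("Background compute notification for {tenant_shard_id} failed: {e}");
//           retry_queue.push(tenant_shard_id);
//       }
//   }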
724 :
725 : /// Call this to notify the compute (postgres) tier of new pageservers to use
726 : /// for a tenant. notify() is called by each shard individually, and this function
727 : /// will decide whether an update to the tenant is sent. An update is sent on the
728 : /// condition that:
729 : /// - We know a pageserver for every shard.
730 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
731 : ///
732 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
733 : /// that is cancelled.
734 : ///
735 : /// This function is fallible, including in the case that the control plane is transiently
736 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
737 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
738 : /// ensuring that they eventually call again to ensure that the compute is eventually notified of
739 : /// the proper pageserver nodes for a tenant.
740 : #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
741 : pub(super) async fn notify<'a>(
742 : &self,
743 : shard_update: ShardUpdate<'a>,
744 : cancel: &CancellationToken,
745 : ) -> Result<(), NotifyError> {
746 : let tenant_shard_id = shard_update.tenant_shard_id;
747 : let maybe_send_result = self.notify_prepare(shard_update);
748 : self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
749 : .await
750 : }
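// Minimal caller-side sketch (hypothetical reconciler code, not part of this module),
// assuming the caller re-drives notify() until it succeeds or shuts down, as the doc
// comment above requires:
//
//   loop {
//       let update = ShardUpdate {
//           tenant_shard_id,
//           node_id,
//           stripe_size,
//           preferred_az: preferred_az.as_ref().map(Cow::Borrowed),
//       };
//       match compute_hook.notify(update, &cancel).await {
//           Ok(()) => break,
//           Err(NotifyError::ShuttingDown) => break,
//           Err(e) => {
//               tracing::warn!("Compute notification failed, will retry later: {e}");
//               tokio::time::sleep(SLOWDOWN_DELAY).await;
//           }
//       }
//   }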
751 :
752 : /// Reflect a detach for a particular shard in the compute hook state.
753 : ///
754 : /// The goal is to avoid sending compute notifications with stale information (i.e.
755 : /// including detached pageservers).
756 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
757 : pub(super) fn handle_detach(
758 : &self,
759 : tenant_shard_id: TenantShardId,
760 : stripe_size: ShardStripeSize,
761 : ) {
762 : use std::collections::hash_map::Entry;
763 :
764 : let mut state_locked = self.state.lock().unwrap();
765 : match state_locked.entry(tenant_shard_id.tenant_id) {
766 : Entry::Vacant(_) => {
767 : // This is a valid but niche case, where the tenant was previously attached
768 : // as a Secondary location and then detached, so has no previously notified
769 : // state.
770 : tracing::info!("Compute hook tenant not found for detach");
771 : }
772 : Entry::Occupied(mut e) => {
773 : let sharded = e.get().is_sharded();
774 : if !sharded {
775 : e.remove();
776 : } else {
777 : e.get_mut().remove_shard(tenant_shard_id, stripe_size);
778 : }
779 :
780 : tracing::debug!("Compute hook handled shard detach");
781 : }
782 : }
783 : }
784 : }
785 :
786 : #[cfg(test)]
787 : pub(crate) mod tests {
788 : use pageserver_api::shard::{ShardCount, ShardNumber};
789 : use utils::id::TenantId;
790 :
791 : use super::*;
792 :
793 : #[test]
794 1 : fn tenant_updates() -> anyhow::Result<()> {
795 1 : let tenant_id = TenantId::generate();
796 1 : let mut tenant_state = ComputeHookTenant::new(
797 1 : TenantShardId {
798 1 : tenant_id,
799 1 : shard_count: ShardCount::new(0),
800 1 : shard_number: ShardNumber(0),
801 1 : },
802 1 : ShardStripeSize(12345),
803 1 : None,
804 1 : NodeId(1),
805 1 : );
806 1 :
807 1 : // An unsharded tenant is always ready to emit a notification, but won't
808 1 : // send the same one twice
809 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
810 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
811 0 : anyhow::bail!("Wrong send result");
812 : };
813 1 : assert_eq!(request.shards.len(), 1);
814 1 : assert!(request.stripe_size.is_none());
815 :
816 : // Simulate successful send
817 1 : *guard = Some(ComputeRemoteState {
818 1 : request,
819 1 : applied: true,
820 1 : });
821 1 : drop(guard);
822 1 :
823 1 : // Try asking again: this should be a no-op
824 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
825 1 : assert!(matches!(send_result, MaybeSendResult::Noop));
826 :
827 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
828 : // resets the tenant state and puts it in a non-notifying state (need to
829 : // see all shards)
830 1 : tenant_state.update(ShardUpdate {
831 1 : tenant_shard_id: TenantShardId {
832 1 : tenant_id,
833 1 : shard_count: ShardCount::new(2),
834 1 : shard_number: ShardNumber(1),
835 1 : },
836 1 : stripe_size: ShardStripeSize(32768),
837 1 : preferred_az: None,
838 1 : node_id: NodeId(1),
839 1 : });
840 1 : assert!(matches!(
841 1 : tenant_state.maybe_send(tenant_id, None),
842 : MaybeSendResult::Noop
843 : ));
844 :
845 : // Writing the second shard makes it ready to notify
846 1 : tenant_state.update(ShardUpdate {
847 1 : tenant_shard_id: TenantShardId {
848 1 : tenant_id,
849 1 : shard_count: ShardCount::new(2),
850 1 : shard_number: ShardNumber(0),
851 1 : },
852 1 : stripe_size: ShardStripeSize(32768),
853 1 : preferred_az: None,
854 1 : node_id: NodeId(1),
855 1 : });
856 1 :
857 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
858 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
859 0 : anyhow::bail!("Wrong send result");
860 : };
861 1 : assert_eq!(request.shards.len(), 2);
862 1 : assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
863 :
864 : // Simulate successful send
865 1 : *guard = Some(ComputeRemoteState {
866 1 : request,
867 1 : applied: true,
868 1 : });
869 1 : drop(guard);
870 1 :
871 1 : Ok(())
872 1 : }
873 : }