Line data Source code
1 : use std::sync::Arc;
2 : use std::{collections::HashMap, time::Duration};
3 :
4 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
5 : use control_plane::local_env::LocalEnv;
6 : use futures::StreamExt;
7 : use hyper::StatusCode;
8 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
9 : use postgres_connection::parse_host_port;
10 : use serde::{Deserialize, Serialize};
11 : use tokio_util::sync::CancellationToken;
12 : use tracing::{info_span, Instrument};
13 : use utils::{
14 : backoff::{self},
15 : id::{NodeId, TenantId},
16 : };
17 :
18 : use crate::service::Config;
19 :
20 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
21 :
22 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
23 :
24 : pub(crate) const API_CONCURRENCY: usize = 32;
25 :
26 : struct UnshardedComputeHookTenant {
27 : // Which node is this tenant attached to
28 : node_id: NodeId,
29 :
30 : // Must hold this lock to send a notification.
31 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
32 : }
33 : struct ShardedComputeHookTenant {
34 : stripe_size: ShardStripeSize,
35 : shard_count: ShardCount,
36 : shards: Vec<(ShardNumber, NodeId)>,
37 :
38 : // Must hold this lock to send a notification. The contents represent
39 : // the last successfully sent notification, and are used to coalesce multiple
40 : // updates by only sending when there is a change since our last successful send.
41 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
42 : }
43 :
44 : enum ComputeHookTenant {
45 : Unsharded(UnshardedComputeHookTenant),
46 : Sharded(ShardedComputeHookTenant),
47 : }
48 :
49 : impl ComputeHookTenant {
50 : /// Construct with at least one shard's information
51 2 : fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
52 2 : if tenant_shard_id.shard_count.count() > 1 {
53 1 : Self::Sharded(ShardedComputeHookTenant {
54 1 : shards: vec![(tenant_shard_id.shard_number, node_id)],
55 1 : stripe_size,
56 1 : shard_count: tenant_shard_id.shard_count,
57 1 : send_lock: Arc::default(),
58 1 : })
59 : } else {
60 1 : Self::Unsharded(UnshardedComputeHookTenant {
61 1 : node_id,
62 1 : send_lock: Arc::default(),
63 1 : })
64 : }
65 2 : }
66 :
67 4 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
68 4 : match self {
69 2 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
70 2 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
71 : }
72 4 : }
73 :
74 0 : fn is_sharded(&self) -> bool {
75 0 : matches!(self, ComputeHookTenant::Sharded(_))
76 0 : }
77 :
78 : /// Clear compute hook state for the specified shard.
79 : /// Only valid for [`ComputeHookTenant::Sharded`] instances.
80 0 : fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
81 0 : match self {
82 0 : ComputeHookTenant::Sharded(sharded) => {
83 0 : if sharded.stripe_size != stripe_size
84 0 : || sharded.shard_count != tenant_shard_id.shard_count
85 : {
86 0 : tracing::warn!("Shard split detected while handling detach")
87 0 : }
88 :
89 0 : let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
90 0 : *shard_number == tenant_shard_id.shard_number
91 0 : });
92 :
93 0 : if let Some(shard_idx) = shard_idx {
94 0 : sharded.shards.remove(shard_idx);
95 0 : } else {
96 0 : tracing::warn!("Shard not found while handling detach")
97 : }
98 : }
99 : ComputeHookTenant::Unsharded(_) => {
100 0 : unreachable!("Detach of unsharded tenants is handled externally");
101 : }
102 : }
103 0 : }
104 :
105 : /// Set one shard's location. If stripe size or shard count have changed, Self is reset
106 : /// and drops existing content.
107 2 : fn update(
108 2 : &mut self,
109 2 : tenant_shard_id: TenantShardId,
110 2 : stripe_size: ShardStripeSize,
111 2 : node_id: NodeId,
112 2 : ) {
113 1 : match self {
114 1 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
115 0 : unsharded_tenant.node_id = node_id
116 : }
117 1 : Self::Sharded(sharded_tenant)
118 1 : if sharded_tenant.stripe_size == stripe_size
119 1 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
120 : {
121 1 : if let Some(existing) = sharded_tenant
122 1 : .shards
123 1 : .iter()
124 1 : .position(|s| s.0 == tenant_shard_id.shard_number)
125 0 : {
126 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
127 0 : } else {
128 1 : sharded_tenant
129 1 : .shards
130 1 : .push((tenant_shard_id.shard_number, node_id));
131 2 : sharded_tenant.shards.sort_by_key(|s| s.0)
132 : }
133 : }
134 1 : _ => {
135 1 : // Shard count changed: reset struct.
136 1 : *self = Self::new(tenant_shard_id, stripe_size, node_id);
137 1 : }
138 : }
139 2 : }
140 : }
141 :
142 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
143 : struct ComputeHookNotifyRequestShard {
144 : node_id: NodeId,
145 : shard_number: ShardNumber,
146 : }
147 :
148 : /// Request body that we send to the control plane to notify it of where a tenant is attached
149 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
150 : struct ComputeHookNotifyRequest {
151 : tenant_id: TenantId,
152 : stripe_size: Option<ShardStripeSize>,
153 : shards: Vec<ComputeHookNotifyRequestShard>,
154 : }
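// A rough illustration (not from the source) of the wire format: assuming the newtype
// wrappers (TenantId, NodeId, ShardNumber, ShardStripeSize) serialize as their inner
// values, a request for a two-shard tenant encodes to JSON roughly like:
//
//   {
//     "tenant_id": "<hex tenant id>",
//     "stripe_size": 32768,
//     "shards": [
//       { "node_id": 1, "shard_number": 0 },
//       { "node_id": 2, "shard_number": 1 }
//     ]
//   }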
155 :
156 : /// Error type for attempts to call into the control plane compute notification hook
157 0 : #[derive(thiserror::Error, Debug)]
158 : pub(crate) enum NotifyError {
159 : // Request was not sent successfully, e.g. transport error
160 : #[error("Sending request: {0}")]
161 : Request(#[from] reqwest::Error),
162 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
163 : #[error("Control plane tenant busy")]
164 : Busy,
165 : // Explicit 429 response asking us to retry less frequently
166 : #[error("Control plane overloaded")]
167 : SlowDown,
168 : // A 503 response indicates the control plane can't handle the request right now
169 : #[error("Control plane unavailable (status {0})")]
170 : Unavailable(StatusCode),
171 : // API returned unexpected non-success status. We will retry, but log a warning.
172 : #[error("Control plane returned unexpected status {0}")]
173 : Unexpected(StatusCode),
174 : // We shut down while sending
175 : #[error("Shutting down")]
176 : ShuttingDown,
177 : // A response indicates we will never succeed, such as 400 or 404
178 : #[error("Non-retryable error {0}")]
179 : Fatal(StatusCode),
180 :
181 : #[error("neon_local error: {0}")]
182 : NeonLocal(anyhow::Error),
183 : }
184 :
185 : enum MaybeSendResult {
186 : // Please send this request while holding the lock, and if you succeed then write
187 : // the request into the lock.
188 : Transmit(
189 : (
190 : ComputeHookNotifyRequest,
191 : tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
192 : ),
193 : ),
194 : // Something requires sending, but you must wait for a current sender then call again
195 : AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
196 : // Nothing requires sending
197 : Noop,
198 : }
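// A rough sketch of the intended calling protocol (see notify_prepare/notify_execute
// below); the names are from this file, the control flow is illustrative only:
//
//     match tenant.maybe_send(tenant_id, None) {
//         MaybeSendResult::Noop => { /* nothing new to send */ }
//         MaybeSendResult::AwaitLock(lock) => {
//             // Another notification is in flight: await lock.lock_owned(), then call
//             // maybe_send again with the acquired guard.
//         }
//         MaybeSendResult::Transmit((request, mut guard)) => {
//             // Send `request` to the control plane; on success store it in the guard
//             // (*guard = Some(request)) so that identical follow-ups coalesce to Noop.
//         }
//     }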
199 :
200 : impl ComputeHookTenant {
201 4 : fn maybe_send(
202 4 : &self,
203 4 : tenant_id: TenantId,
204 4 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
205 4 : ) -> MaybeSendResult {
206 4 : let locked = match lock {
207 0 : Some(already_locked) => already_locked,
208 : None => {
209 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
210 4 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
211 0 : return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
212 : };
213 4 : locked
214 : }
215 : };
216 :
217 4 : let request = match self {
218 2 : Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
219 2 : tenant_id,
220 2 : shards: vec![ComputeHookNotifyRequestShard {
221 2 : shard_number: ShardNumber(0),
222 2 : node_id: unsharded_tenant.node_id,
223 2 : }],
224 2 : stripe_size: None,
225 2 : }),
226 2 : Self::Sharded(sharded_tenant)
227 2 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
228 1 : {
229 1 : Some(ComputeHookNotifyRequest {
230 1 : tenant_id,
231 1 : shards: sharded_tenant
232 1 : .shards
233 1 : .iter()
234 2 : .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
235 2 : shard_number: *shard_number,
236 2 : node_id: *node_id,
237 2 : })
238 1 : .collect(),
239 1 : stripe_size: Some(sharded_tenant.stripe_size),
240 1 : })
241 : }
242 1 : Self::Sharded(sharded_tenant) => {
243 1 : // Sharded tenant doesn't yet have information for all its shards
244 1 :
245 1 : tracing::info!(
246 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
247 0 : sharded_tenant.shards.len(),
248 0 : sharded_tenant.shard_count.count()
249 : );
250 1 : None
251 : }
252 : };
253 :
254 3 : match request {
255 : None => {
256 : // Not yet ready to emit a notification
257 1 : tracing::info!("Tenant isn't yet ready to emit a notification");
258 1 : MaybeSendResult::Noop
259 : }
260 3 : Some(request) if Some(&request) == locked.as_ref() => {
261 1 : // No change from the last value successfully sent
262 1 : MaybeSendResult::Noop
263 : }
264 2 : Some(request) => MaybeSendResult::Transmit((request, locked)),
265 : }
266 4 : }
267 : }
268 :
269 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
270 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
271 : /// the compute connection string.
272 : pub(super) struct ComputeHook {
273 : config: Config,
274 : state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
275 : authorization_header: Option<String>,
276 :
277 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
278 : // large numbers of tenants (e.g. when failing over after a node failure)
279 : api_concurrency: tokio::sync::Semaphore,
280 :
281 : // This lock is only used in testing environments, to serialize calls into neon_local
282 : neon_local_lock: tokio::sync::Mutex<()>,
283 :
284 : // We share a client across all notifications to enable connection re-use etc when
285 : // sending large numbers of notifications
286 : client: reqwest::Client,
287 : }
288 :
289 : impl ComputeHook {
290 0 : pub(super) fn new(config: Config) -> Self {
291 0 : let authorization_header = config
292 0 : .control_plane_jwt_token
293 0 : .clone()
294 0 : .map(|jwt| format!("Bearer {}", jwt));
295 0 :
296 0 : let client = reqwest::ClientBuilder::new()
297 0 : .timeout(NOTIFY_REQUEST_TIMEOUT)
298 0 : .build()
299 0 : .expect("Failed to construct HTTP client");
300 0 :
301 0 : Self {
302 0 : state: Default::default(),
303 0 : config,
304 0 : authorization_header,
305 0 : neon_local_lock: Default::default(),
306 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
307 0 : client,
308 0 : }
309 0 : }
310 :
311 : /// For test environments: use neon_local's LocalEnv to update compute
312 0 : async fn do_notify_local(
313 0 : &self,
314 0 : reconfigure_request: &ComputeHookNotifyRequest,
315 0 : ) -> Result<(), NotifyError> {
316 : // neon_local updates are not safe to call concurrently, use a lock to serialize
317 : // all calls to this function
318 0 : let _locked = self.neon_local_lock.lock().await;
319 :
320 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
321 0 : tracing::warn!(
322 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
323 : );
324 0 : return Ok(());
325 : };
326 0 : let env = match LocalEnv::load_config(repo_dir) {
327 0 : Ok(e) => e,
328 0 : Err(e) => {
329 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
330 0 : return Ok(());
331 : }
332 : };
333 0 : let cplane =
334 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
335 0 : let ComputeHookNotifyRequest {
336 0 : tenant_id,
337 0 : shards,
338 0 : stripe_size,
339 0 : } = reconfigure_request;
340 0 :
341 0 : let compute_pageservers = shards
342 0 : .iter()
343 0 : .map(|shard| {
344 0 : let ps_conf = env
345 0 : .get_pageserver_conf(shard.node_id)
346 0 : .expect("Unknown pageserver");
347 0 : let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
348 0 : .expect("Unable to parse listen_pg_addr");
349 0 : (pg_host, pg_port.unwrap_or(5432))
350 0 : })
351 0 : .collect::<Vec<_>>();
352 :
353 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
354 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
355 0 : tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
356 0 : endpoint
357 0 : .reconfigure(compute_pageservers.clone(), *stripe_size, None)
358 0 : .await
359 0 : .map_err(NotifyError::NeonLocal)?;
360 0 : }
361 : }
362 :
363 0 : Ok(())
364 0 : }
365 :
366 0 : async fn do_notify_iteration(
367 0 : &self,
368 0 : url: &String,
369 0 : reconfigure_request: &ComputeHookNotifyRequest,
370 0 : cancel: &CancellationToken,
371 0 : ) -> Result<(), NotifyError> {
372 0 : let req = self.client.request(reqwest::Method::PUT, url);
373 0 : let req = if let Some(value) = &self.authorization_header {
374 0 : req.header(reqwest::header::AUTHORIZATION, value)
375 : } else {
376 0 : req
377 : };
378 :
379 0 : tracing::info!(
380 0 : "Sending notify request to {} ({:?})",
381 : url,
382 : reconfigure_request
383 : );
384 0 : let send_result = req.json(&reconfigure_request).send().await;
385 0 : let response = match send_result {
386 0 : Ok(r) => r,
387 0 : Err(e) => return Err(e.into()),
388 : };
389 :
390 : // Treat all 2xx responses as success
391 0 : if response.status() >= reqwest::StatusCode::OK
392 0 : && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
393 : {
394 0 : if response.status() != reqwest::StatusCode::OK {
395 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
396 : // log a warning.
397 0 : tracing::warn!(
398 0 : "Unexpected 2xx response code {} from control plane",
399 0 : response.status()
400 : );
401 0 : }
402 :
403 0 : return Ok(());
404 0 : }
405 0 :
406 0 : // Error response codes
407 0 : match response.status() {
408 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
409 : // TODO: 429 handling should be global: set some state visible to other requests
410 : // so that they will delay before starting, rather than all notifications trying
411 : // once before backing off.
412 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
413 0 : .await
414 0 : .ok();
415 0 : Err(NotifyError::SlowDown)
416 : }
417 : reqwest::StatusCode::LOCKED => {
418 : // We consider this fatal, because it's possible that the operation blocking the control plane is
419 : // also the one that is waiting for this reconcile. We should let the reconciler calling
420 : // this hook fail, to give the control plane a chance to unlock.
421 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
422 0 : Err(NotifyError::Busy)
423 : }
424 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
425 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
426 : }
427 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
428 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
429 : }
430 : reqwest::StatusCode::BAD_GATEWAY => {
431 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
432 : }
433 :
434 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
435 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
436 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
437 0 : status => Err(NotifyError::Unexpected(
438 0 : hyper::StatusCode::from_u16(status.as_u16())
439 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
440 0 : )),
441 : }
442 0 : }
443 :
444 0 : async fn do_notify(
445 0 : &self,
446 0 : url: &String,
447 0 : reconfigure_request: &ComputeHookNotifyRequest,
448 0 : cancel: &CancellationToken,
449 0 : ) -> Result<(), NotifyError> {
450 : // We hold these semaphore units across all retries, rather than only across each
451 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
452 : // time out waiting for a semaphore.
453 0 : let _units = self
454 0 : .api_concurrency
455 0 : .acquire()
456 0 : .await
457 : // Interpret closed semaphore as shutdown
458 0 : .map_err(|_| NotifyError::ShuttingDown)?;
459 :
460 0 : backoff::retry(
461 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
462 0 : |e| {
463 0 : matches!(
464 0 : e,
465 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
466 : )
467 0 : },
468 0 : 3,
469 0 : 10,
470 0 : "Send compute notification",
471 0 : cancel,
472 0 : )
473 0 : .await
474 0 : .ok_or_else(|| NotifyError::ShuttingDown)
475 0 : .and_then(|x| x)
476 0 : }
477 :
478 : /// Synchronous phase: update the per-tenant state for the next intended notification
479 0 : fn notify_prepare(
480 0 : &self,
481 0 : tenant_shard_id: TenantShardId,
482 0 : node_id: NodeId,
483 0 : stripe_size: ShardStripeSize,
484 0 : ) -> MaybeSendResult {
485 0 : let mut state_locked = self.state.lock().unwrap();
486 :
487 : use std::collections::hash_map::Entry;
488 0 : let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
489 0 : Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
490 0 : tenant_shard_id,
491 0 : stripe_size,
492 0 : node_id,
493 0 : )),
494 0 : Entry::Occupied(e) => {
495 0 : let tenant = e.into_mut();
496 0 : tenant.update(tenant_shard_id, stripe_size, node_id);
497 0 : tenant
498 : }
499 : };
500 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
501 0 : }
502 :
503 0 : async fn notify_execute(
504 0 : &self,
505 0 : maybe_send_result: MaybeSendResult,
506 0 : tenant_shard_id: TenantShardId,
507 0 : cancel: &CancellationToken,
508 0 : ) -> Result<(), NotifyError> {
509 : // Process result: we may get an update to send, or we may have to wait for a lock
510 : // before trying again.
511 0 : let (request, mut send_lock_guard) = match maybe_send_result {
512 : MaybeSendResult::Noop => {
513 0 : return Ok(());
514 : }
515 0 : MaybeSendResult::AwaitLock(send_lock) => {
516 0 : let send_locked = tokio::select! {
517 0 : guard = send_lock.lock_owned() => {guard},
518 0 : _ = cancel.cancelled() => {
519 0 : return Err(NotifyError::ShuttingDown)
520 : }
521 : };
522 :
523 : // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
524 : // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
525 : // try_lock.
526 0 : let state_locked = self.state.lock().unwrap();
527 0 : let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
528 0 : return Ok(());
529 : };
530 0 : match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
531 : MaybeSendResult::AwaitLock(_) => {
532 0 : unreachable!("We supplied lock guard")
533 : }
534 : MaybeSendResult::Noop => {
535 0 : return Ok(());
536 : }
537 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
538 : }
539 : }
540 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
541 : };
542 :
543 0 : let result = if let Some(notify_url) = &self.config.compute_hook_url {
544 0 : self.do_notify(notify_url, &request, cancel).await
545 : } else {
546 0 : self.do_notify_local(&request).await.map_err(|e| {
547 0 : // This path is for testing only, so munge the error into our prod-style error type.
548 0 : tracing::error!("neon_local notification hook failed: {e}");
549 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
550 0 : })
551 : };
552 :
553 0 : if result.is_ok() {
554 0 : // Before dropping the send lock, stash the request we just sent so that
555 0 : // subsequent callers can avoid redundantly re-sending the same thing.
556 0 : *send_lock_guard = Some(request);
557 0 : }
558 0 : result
559 0 : }
560 :
561 : /// Infallible synchronous fire-and-forget version of notify(), that sends its results to
562 : /// a channel. The caller should consume the channel and arrange to retry any notifications
563 : /// that failed.
564 0 : pub(super) fn notify_background(
565 0 : self: &Arc<Self>,
566 0 : notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
567 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
568 0 : cancel: &CancellationToken,
569 0 : ) {
570 0 : let mut maybe_sends = Vec::new();
571 0 : for (tenant_shard_id, node_id, stripe_size) in notifications {
572 0 : let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
573 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
574 : }
575 :
576 0 : let this = self.clone();
577 0 : let cancel = cancel.clone();
578 0 :
579 0 : tokio::task::spawn(async move {
580 0 : // Construct an async stream of futures to invoke the compute notify function: we do this
581 0 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
582 0 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
583 0 : // would mostly just be waiting on that semaphore.
584 0 : let mut stream = futures::stream::iter(maybe_sends)
585 0 : .map(|(tenant_shard_id, maybe_send_result)| {
586 0 : let this = this.clone();
587 0 : let cancel = cancel.clone();
588 :
589 0 : async move {
590 0 : this
591 0 : .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
592 0 : .await.map_err(|e| (tenant_shard_id, e))
593 0 : }.instrument(info_span!(
594 0 : "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
595 : ))
596 0 : })
597 0 : .buffered(API_CONCURRENCY);
598 :
599 : loop {
600 0 : tokio::select! {
601 0 : next = stream.next() => {
602 0 : match next {
603 0 : Some(r) => {
604 0 : result_tx.send(r).await.ok();
605 : },
606 : None => {
607 0 : tracing::info!("Finished sending background compute notifications");
608 0 : break;
609 : }
610 : }
611 : },
612 0 : _ = cancel.cancelled() => {
613 0 : tracing::info!("Shutdown while running background compute notifications");
614 0 : break;
615 : }
616 : };
617 : }
618 0 : });
619 0 : }
620 :
621 : /// Call this to notify the compute (postgres) tier of new pageservers to use
622 : /// for a tenant. notify() is called by each shard individually, and this function
623 : /// will decide whether an update to the tenant is sent. An update is sent on the
624 : /// condition that:
625 : /// - We know a pageserver for every shard.
626 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
627 : ///
628 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
629 : /// that is cancelled.
630 : ///
631 : /// This function is fallible, including in the case that the control plane is transiently
632 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
633 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
634 : /// ensuring that they eventually call again to ensure that the compute is eventually notified of
635 : /// the proper pageserver nodes for a tenant.
636 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
637 : pub(super) async fn notify(
638 : &self,
639 : tenant_shard_id: TenantShardId,
640 : node_id: NodeId,
641 : stripe_size: ShardStripeSize,
642 : cancel: &CancellationToken,
643 : ) -> Result<(), NotifyError> {
644 : let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
645 : self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
646 : .await
647 : }
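    // A rough usage sketch from a hypothetical caller (illustrative, not from this crate):
    // a reconciler attaches a shard, calls notify(), and treats failure as "retry this
    // reconcile later" rather than looping here forever.
    //
    //     match compute_hook.notify(tenant_shard_id, node_id, stripe_size, &cancel).await {
    //         Ok(()) => {}
    //         Err(NotifyError::ShuttingDown) => { /* give up quietly during shutdown */ }
    //         Err(e) => {
    //             // Keep the shard marked as "compute notification pending" so a later
    //             // reconcile retries this hook.
    //             tracing::warn!("Compute notification failed: {e}");
    //         }
    //     }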
648 :
649 : /// Reflect a detach for a particular shard in the compute hook state.
650 : ///
651 : /// The goal is to avoid sending compute notifications with stale information (i.e.
652 : /// including detach pageservers).
653 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
654 : pub(super) fn handle_detach(
655 : &self,
656 : tenant_shard_id: TenantShardId,
657 : stripe_size: ShardStripeSize,
658 : ) {
659 : use std::collections::hash_map::Entry;
660 :
661 : let mut state_locked = self.state.lock().unwrap();
662 : match state_locked.entry(tenant_shard_id.tenant_id) {
663 : Entry::Vacant(_) => {
664 : tracing::warn!("Compute hook tenant not found for detach");
665 : }
666 : Entry::Occupied(mut e) => {
667 : let sharded = e.get().is_sharded();
668 : if !sharded {
669 : e.remove();
670 : } else {
671 : e.get_mut().remove_shard(tenant_shard_id, stripe_size);
672 : }
673 :
674 : tracing::debug!("Compute hook handled shard detach");
675 : }
676 : }
677 : }
678 : }
679 :
680 : #[cfg(test)]
681 : pub(crate) mod tests {
682 : use pageserver_api::shard::{ShardCount, ShardNumber};
683 : use utils::id::TenantId;
684 :
685 : use super::*;
686 :
687 : #[test]
688 1 : fn tenant_updates() -> anyhow::Result<()> {
689 1 : let tenant_id = TenantId::generate();
690 1 : let mut tenant_state = ComputeHookTenant::new(
691 1 : TenantShardId {
692 1 : tenant_id,
693 1 : shard_count: ShardCount::new(0),
694 1 : shard_number: ShardNumber(0),
695 1 : },
696 1 : ShardStripeSize(12345),
697 1 : NodeId(1),
698 1 : );
699 1 :
700 1 : // An unsharded tenant is always ready to emit a notification, but won't
701 1 : // send the same one twice
702 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
703 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
704 0 : anyhow::bail!("Wrong send result");
705 : };
706 1 : assert_eq!(request.shards.len(), 1);
707 1 : assert!(request.stripe_size.is_none());
708 :
709 : // Simulate successful send
710 1 : *guard = Some(request);
711 1 : drop(guard);
712 1 :
713 1 : // Try asking again: this should be a no-op
714 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
715 1 : assert!(matches!(send_result, MaybeSendResult::Noop));
716 :
717 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
718 : // resets the tenant state and puts it in an non-notifying state (need to
719 : // see all shards)
720 1 : tenant_state.update(
721 1 : TenantShardId {
722 1 : tenant_id,
723 1 : shard_count: ShardCount::new(2),
724 1 : shard_number: ShardNumber(1),
725 1 : },
726 1 : ShardStripeSize(32768),
727 1 : NodeId(1),
728 1 : );
729 1 : assert!(matches!(
730 1 : tenant_state.maybe_send(tenant_id, None),
731 : MaybeSendResult::Noop
732 : ));
733 :
734 : // Writing the second shard makes it ready to notify
735 1 : tenant_state.update(
736 1 : TenantShardId {
737 1 : tenant_id,
738 1 : shard_count: ShardCount::new(2),
739 1 : shard_number: ShardNumber(0),
740 1 : },
741 1 : ShardStripeSize(32768),
742 1 : NodeId(1),
743 1 : );
744 1 :
745 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
746 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
747 0 : anyhow::bail!("Wrong send result");
748 : };
749 1 : assert_eq!(request.shards.len(), 2);
750 1 : assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
751 :
752 : // Simulate successful send
753 1 : *guard = Some(request);
754 1 : drop(guard);
755 1 :
756 1 : Ok(())
757 1 : }
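    /// A minimal additional sketch, assuming `serde_json` is available to this crate:
    /// round-trip the notify request body through JSON, the encoding that
    /// `do_notify_iteration` sends to the control plane.
    #[test]
    fn notify_request_roundtrip() -> anyhow::Result<()> {
        let request = ComputeHookNotifyRequest {
            tenant_id: TenantId::generate(),
            stripe_size: Some(ShardStripeSize(32768)),
            shards: vec![
                ComputeHookNotifyRequestShard {
                    node_id: NodeId(1),
                    shard_number: ShardNumber(0),
                },
                ComputeHookNotifyRequestShard {
                    node_id: NodeId(2),
                    shard_number: ShardNumber(1),
                },
            ],
        };

        // Serialize and deserialize; equality relies on the PartialEq derive above.
        let encoded = serde_json::to_string(&request)?;
        let decoded: ComputeHookNotifyRequest = serde_json::from_str(&encoded)?;
        assert_eq!(request, decoded);
        Ok(())
    }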
758 : }