Line data Source code
1 : use std::sync::Arc;
2 : use std::{collections::HashMap, time::Duration};
3 :
4 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
5 : use control_plane::local_env::LocalEnv;
6 : use futures::StreamExt;
7 : use hyper::StatusCode;
8 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
9 : use postgres_connection::parse_host_port;
10 : use serde::{Deserialize, Serialize};
11 : use tokio_util::sync::CancellationToken;
12 : use tracing::{info_span, Instrument};
13 : use utils::{
14 : backoff::{self},
15 : id::{NodeId, TenantId},
16 : };
17 :
18 : use crate::service::Config;
19 :
20 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
21 :
22 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
23 :
24 : pub(crate) const API_CONCURRENCY: usize = 32;
25 :
26 : struct UnshardedComputeHookTenant {
27 : // Which node is this tenant attached to
28 : node_id: NodeId,
29 :
30 : // Must hold this lock to send a notification.
31 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
32 : }
33 : struct ShardedComputeHookTenant {
34 : stripe_size: ShardStripeSize,
35 : shard_count: ShardCount,
36 : shards: Vec<(ShardNumber, NodeId)>,
37 :
38 : // Must hold this lock to send a notification. The contents represent
39 : // the last successfully sent notification, and are used to coalesce multiple
40 : // updates by only sending when there is a change since our last successful send.
41 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
42 : }
43 :
44 : /// Represents our knowledge of the compute's state: we can update this when we get a
45 : /// response from a notify API call, which tells us what has been applied.
46 : ///
47 : /// Should be wrapped in an Option<>, as we cannot always know the remote state.
48 : #[derive(PartialEq, Eq, Debug)]
49 : struct ComputeRemoteState {
50 : // The request body which was acked by the compute
51 : request: ComputeHookNotifyRequest,
52 :
53 : // Whether the cplane indicated that the state was applied to running computes, or just
54 : // persisted. In the Neon control plane, this is the difference between a 423 response (meaning
55 : // persisted but not applied), and a 2xx response (both persisted and applied)
56 : applied: bool,
57 : }
58 :
59 : enum ComputeHookTenant {
60 : Unsharded(UnshardedComputeHookTenant),
61 : Sharded(ShardedComputeHookTenant),
62 : }
63 :
64 : impl ComputeHookTenant {
65 : /// Construct with at least one shard's information
66 2 : fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
67 2 : if tenant_shard_id.shard_count.count() > 1 {
68 1 : Self::Sharded(ShardedComputeHookTenant {
69 1 : shards: vec![(tenant_shard_id.shard_number, node_id)],
70 1 : stripe_size,
71 1 : shard_count: tenant_shard_id.shard_count,
72 1 : send_lock: Arc::default(),
73 1 : })
74 : } else {
75 1 : Self::Unsharded(UnshardedComputeHookTenant {
76 1 : node_id,
77 1 : send_lock: Arc::default(),
78 1 : })
79 : }
80 2 : }
81 :
82 4 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
83 4 : match self {
84 2 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
85 2 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
86 : }
87 4 : }
88 :
89 0 : fn is_sharded(&self) -> bool {
90 0 : matches!(self, ComputeHookTenant::Sharded(_))
91 0 : }
92 :
93 : /// Clear compute hook state for the specified shard.
94 : /// Only valid for [`ComputeHookTenant::Sharded`] instances.
95 0 : fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
96 0 : match self {
97 0 : ComputeHookTenant::Sharded(sharded) => {
98 0 : if sharded.stripe_size != stripe_size
99 0 : || sharded.shard_count != tenant_shard_id.shard_count
100 : {
101 0 : tracing::warn!("Shard split detected while handling detach")
102 0 : }
103 :
104 0 : let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
105 0 : *shard_number == tenant_shard_id.shard_number
106 0 : });
107 :
108 0 : if let Some(shard_idx) = shard_idx {
109 0 : sharded.shards.remove(shard_idx);
110 0 : } else {
111 0 : tracing::warn!("Shard not found while handling detach")
112 : }
113 : }
114 : ComputeHookTenant::Unsharded(_) => {
115 0 : unreachable!("Detach of unsharded tenants is handled externally");
116 : }
117 : }
118 0 : }
119 :
120 : /// Set one shard's location. If stripe size or shard count have changed, Self is reset
121 : /// and drops existing content.
122 2 : fn update(
123 2 : &mut self,
124 2 : tenant_shard_id: TenantShardId,
125 2 : stripe_size: ShardStripeSize,
126 2 : node_id: NodeId,
127 2 : ) {
128 1 : match self {
129 1 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
130 0 : unsharded_tenant.node_id = node_id
131 : }
132 1 : Self::Sharded(sharded_tenant)
133 1 : if sharded_tenant.stripe_size == stripe_size
134 1 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
135 : {
136 1 : if let Some(existing) = sharded_tenant
137 1 : .shards
138 1 : .iter()
139 1 : .position(|s| s.0 == tenant_shard_id.shard_number)
140 0 : {
141 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
142 0 : } else {
143 1 : sharded_tenant
144 1 : .shards
145 1 : .push((tenant_shard_id.shard_number, node_id));
146 2 : sharded_tenant.shards.sort_by_key(|s| s.0)
147 : }
148 : }
149 1 : _ => {
150 1 : // Shard count or stripe size changed: reset struct.
151 1 : *self = Self::new(tenant_shard_id, stripe_size, node_id);
152 1 : }
153 : }
154 2 : }
155 : }
156 :
157 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
158 : struct ComputeHookNotifyRequestShard {
159 : node_id: NodeId,
160 : shard_number: ShardNumber,
161 : }
162 :
163 : /// Request body that we send to the control plane to notify it of where a tenant is attached
164 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
165 : struct ComputeHookNotifyRequest {
166 : tenant_id: TenantId,
167 : stripe_size: Option<ShardStripeSize>,
168 : shards: Vec<ComputeHookNotifyRequestShard>,
169 : }
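For orientation, here is a minimal in-module sketch of building the request body that this module PUTs to the control plane notify endpoint (see `do_notify_iteration` below). It is illustrative only: the JSON shown in the comment assumes serde_json is available and that `TenantId` serializes as a hex string while `NodeId`, `ShardNumber` and `ShardStripeSize` serialize as plain integers.

```rust
// Illustrative sketch: a two-shard notification like the ones maybe_send() assembles.
// Only usable inside this module, since the request structs are private.
let tenant_id = TenantId::generate();
let request = ComputeHookNotifyRequest {
    tenant_id,
    stripe_size: Some(ShardStripeSize(32768)),
    shards: vec![
        ComputeHookNotifyRequestShard {
            node_id: NodeId(1),
            shard_number: ShardNumber(0),
        },
        ComputeHookNotifyRequestShard {
            node_id: NodeId(2),
            shard_number: ShardNumber(1),
        },
    ],
};
// Serialized with serde_json, this is roughly (field encodings assumed):
// {"tenant_id":"<hex tenant id>","stripe_size":32768,
//  "shards":[{"node_id":1,"shard_number":0},{"node_id":2,"shard_number":1}]}
let body = serde_json::to_string(&request).unwrap();
```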
170 :
171 : /// Error type for attempts to call into the control plane compute notification hook
172 0 : #[derive(thiserror::Error, Debug)]
173 : pub(crate) enum NotifyError {
174 : // Request was not sent successfully, e.g. transport error
175 : #[error("Sending request: {0}")]
176 : Request(#[from] reqwest::Error),
177 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
178 : #[error("Control plane tenant busy")]
179 : Busy,
180 : // Explicit 429 response asking us to retry less frequently
181 : #[error("Control plane overloaded")]
182 : SlowDown,
183 : // A 503 response indicates the control plane can't handle the request right now
184 : #[error("Control plane unavailable (status {0})")]
185 : Unavailable(StatusCode),
186 : // API returned unexpected non-success status. We will retry, but log a warning.
187 : #[error("Control plane returned unexpected status {0}")]
188 : Unexpected(StatusCode),
189 : // We shutdown while sending
190 : #[error("Shutting down")]
191 : ShuttingDown,
192 : // A response indicates we will never succeed, such as 400 or 404
193 : #[error("Non-retryable error {0}")]
194 : Fatal(StatusCode),
195 :
196 : #[error("neon_local error: {0}")]
197 : NeonLocal(anyhow::Error),
198 : }
199 :
200 : enum MaybeSendResult {
201 : // Please send this request while holding the lock, and if you succeed then write
202 : // the request into the lock.
203 : Transmit(
204 : (
205 : ComputeHookNotifyRequest,
206 : tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
207 : ),
208 : ),
209 : // Something requires sending, but you must wait for a current sender then call again
210 : AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
211 : // Nothing requires sending
212 : Noop,
213 : }
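A rough sketch of how a caller is expected to drive `MaybeSendResult` follows; the authoritative implementation is `notify_execute` further down. Assume an async context with `hook_tenant: &ComputeHookTenant` and `tenant_id` in scope, and that error and cancellation handling are omitted.

```rust
// Sketch of the MaybeSendResult protocol; see ComputeHook::notify_execute for the real logic.
match hook_tenant.maybe_send(tenant_id, None) {
    MaybeSendResult::Noop => {
        // Remote state already matches what we would send: nothing to do.
    }
    MaybeSendResult::AwaitLock(send_lock) => {
        // Another sender is in flight: wait for the lock, then re-evaluate holding it.
        let guard = send_lock.lock_owned().await;
        let _ = hook_tenant.maybe_send(tenant_id, Some(guard));
    }
    MaybeSendResult::Transmit((request, mut guard)) => {
        // Send `request` to the control plane; on success, record it so that later
        // calls can coalesce against the last successfully applied state.
        *guard = Some(ComputeRemoteState { request, applied: true });
    }
}
```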
214 :
215 : impl ComputeHookTenant {
216 4 : fn maybe_send(
217 4 : &self,
218 4 : tenant_id: TenantId,
219 4 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
220 4 : ) -> MaybeSendResult {
221 4 : let locked = match lock {
222 0 : Some(already_locked) => already_locked,
223 : None => {
224 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
225 4 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
226 0 : return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
227 : };
228 4 : locked
229 : }
230 : };
231 :
232 4 : let request = match self {
233 2 : Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
234 2 : tenant_id,
235 2 : shards: vec![ComputeHookNotifyRequestShard {
236 2 : shard_number: ShardNumber(0),
237 2 : node_id: unsharded_tenant.node_id,
238 2 : }],
239 2 : stripe_size: None,
240 2 : }),
241 2 : Self::Sharded(sharded_tenant)
242 2 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
243 1 : {
244 1 : Some(ComputeHookNotifyRequest {
245 1 : tenant_id,
246 1 : shards: sharded_tenant
247 1 : .shards
248 1 : .iter()
249 2 : .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
250 2 : shard_number: *shard_number,
251 2 : node_id: *node_id,
252 2 : })
253 1 : .collect(),
254 1 : stripe_size: Some(sharded_tenant.stripe_size),
255 1 : })
256 : }
257 1 : Self::Sharded(sharded_tenant) => {
258 1 : // Sharded tenant doesn't yet have information for all its shards
259 1 :
260 1 : tracing::info!(
261 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
262 0 : sharded_tenant.shards.len(),
263 0 : sharded_tenant.shard_count.count()
264 : );
265 1 : None
266 : }
267 : };
268 :
269 3 : match request {
270 : None => {
271 : // Not yet ready to emit a notification
272 1 : tracing::info!("Tenant isn't yet ready to emit a notification");
273 1 : MaybeSendResult::Noop
274 : }
275 1 : Some(request)
276 3 : if Some(&request) == locked.as_ref().map(|s| &s.request)
277 1 : && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
278 1 : {
279 1 : tracing::info!(
280 0 : "Skipping notification because remote state already matches ({:?})",
281 0 : &request
282 : );
283 : // No change from the last value successfully sent, and our state indicates that the last
284 : // value sent was fully applied on the control plane side.
285 1 : MaybeSendResult::Noop
286 : }
287 2 : Some(request) => {
288 2 : // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
289 2 : MaybeSendResult::Transmit((request, locked))
290 : }
291 : }
292 4 : }
293 : }
294 :
295 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
296 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
297 : /// the compute connection string.
298 : pub(super) struct ComputeHook {
299 : config: Config,
300 : state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
301 : authorization_header: Option<String>,
302 :
303 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
304 : // large numbers of tenants (e.g. when failing over after a node failure)
305 : api_concurrency: tokio::sync::Semaphore,
306 :
307 : // This lock is only used in testing environments, to serialize calls into neon_local
308 : neon_local_lock: tokio::sync::Mutex<()>,
309 :
310 : // We share a client across all notifications to enable connection re-use etc. when
311 : // sending large numbers of notifications
312 : client: reqwest::Client,
313 : }
314 :
315 : impl ComputeHook {
316 0 : pub(super) fn new(config: Config) -> Self {
317 0 : let authorization_header = config
318 0 : .control_plane_jwt_token
319 0 : .clone()
320 0 : .map(|jwt| format!("Bearer {}", jwt));
321 0 :
322 0 : let client = reqwest::ClientBuilder::new()
323 0 : .timeout(NOTIFY_REQUEST_TIMEOUT)
324 0 : .build()
325 0 : .expect("Failed to construct HTTP client");
326 0 :
327 0 : Self {
328 0 : state: Default::default(),
329 0 : config,
330 0 : authorization_header,
331 0 : neon_local_lock: Default::default(),
332 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
333 0 : client,
334 0 : }
335 0 : }
336 :
337 : /// For test environments: use neon_local's LocalEnv to update compute
338 0 : async fn do_notify_local(
339 0 : &self,
340 0 : reconfigure_request: &ComputeHookNotifyRequest,
341 0 : ) -> Result<(), NotifyError> {
342 : // neon_local updates are not safe to call concurrently, use a lock to serialize
343 : // all calls to this function
344 0 : let _locked = self.neon_local_lock.lock().await;
345 :
346 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
347 0 : tracing::warn!(
348 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
349 : );
350 0 : return Ok(());
351 : };
352 0 : let env = match LocalEnv::load_config(repo_dir) {
353 0 : Ok(e) => e,
354 0 : Err(e) => {
355 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
356 0 : return Ok(());
357 : }
358 : };
359 0 : let cplane =
360 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
361 0 : let ComputeHookNotifyRequest {
362 0 : tenant_id,
363 0 : shards,
364 0 : stripe_size,
365 0 : } = reconfigure_request;
366 0 :
367 0 : let compute_pageservers = shards
368 0 : .iter()
369 0 : .map(|shard| {
370 0 : let ps_conf = env
371 0 : .get_pageserver_conf(shard.node_id)
372 0 : .expect("Unknown pageserver");
373 0 : let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
374 0 : .expect("Unable to parse listen_pg_addr");
375 0 : (pg_host, pg_port.unwrap_or(5432))
376 0 : })
377 0 : .collect::<Vec<_>>();
378 :
379 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
380 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
381 0 : tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
382 0 : endpoint
383 0 : .reconfigure(compute_pageservers.clone(), *stripe_size, None)
384 0 : .await
385 0 : .map_err(NotifyError::NeonLocal)?;
386 0 : }
387 : }
388 :
389 0 : Ok(())
390 0 : }
391 :
392 0 : async fn do_notify_iteration(
393 0 : &self,
394 0 : url: &String,
395 0 : reconfigure_request: &ComputeHookNotifyRequest,
396 0 : cancel: &CancellationToken,
397 0 : ) -> Result<(), NotifyError> {
398 0 : let req = self.client.request(reqwest::Method::PUT, url);
399 0 : let req = if let Some(value) = &self.authorization_header {
400 0 : req.header(reqwest::header::AUTHORIZATION, value)
401 : } else {
402 0 : req
403 : };
404 :
405 0 : tracing::info!(
406 0 : "Sending notify request to {} ({:?})",
407 : url,
408 : reconfigure_request
409 : );
410 0 : let send_result = req.json(&reconfigure_request).send().await;
411 0 : let response = match send_result {
412 0 : Ok(r) => r,
413 0 : Err(e) => return Err(e.into()),
414 : };
415 :
416 : // Treat all 2xx responses as success
417 0 : if response.status() >= reqwest::StatusCode::OK
418 0 : && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
419 : {
420 0 : if response.status() != reqwest::StatusCode::OK {
421 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
422 : // log a warning.
423 0 : tracing::warn!(
424 0 : "Unexpected 2xx response code {} from control plane",
425 0 : response.status()
426 : );
427 0 : }
428 :
429 0 : return Ok(());
430 0 : }
431 0 :
432 0 : // Error response codes
433 0 : match response.status() {
434 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
435 : // TODO: 429 handling should be global: set some state visible to other requests
436 : // so that they will delay before starting, rather than all notifications trying
437 : // once before backing off.
438 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
439 0 : .await
440 0 : .ok();
441 0 : Err(NotifyError::SlowDown)
442 : }
443 : reqwest::StatusCode::LOCKED => {
444 : // We consider this fatal, because it's possible that the operation blocking the control plane is
445 : // also the one that is waiting for this reconcile. We should let the reconciler calling
446 : // this hook fail, to give the control plane a chance to un-lock.
447 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
448 0 : Err(NotifyError::Busy)
449 : }
450 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
451 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
452 : }
453 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
454 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
455 : }
456 : reqwest::StatusCode::BAD_GATEWAY => {
457 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
458 : }
459 :
460 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
461 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
462 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
463 0 : status => Err(NotifyError::Unexpected(
464 0 : hyper::StatusCode::from_u16(status.as_u16())
465 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
466 0 : )),
467 : }
468 0 : }
469 :
470 0 : async fn do_notify(
471 0 : &self,
472 0 : url: &String,
473 0 : reconfigure_request: &ComputeHookNotifyRequest,
474 0 : cancel: &CancellationToken,
475 0 : ) -> Result<(), NotifyError> {
476 : // We hold these semaphore units across all retries, rather than only across each
477 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
478 : // time out waiting for a semaphore.
479 0 : let _units = self
480 0 : .api_concurrency
481 0 : .acquire()
482 0 : .await
483 : // Interpret closed semaphore as shutdown
484 0 : .map_err(|_| NotifyError::ShuttingDown)?;
485 :
486 0 : backoff::retry(
487 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
488 0 : |e| {
489 0 : matches!(
490 0 : e,
491 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
492 : )
493 0 : },
494 0 : 3,
495 0 : 10,
496 0 : "Send compute notification",
497 0 : cancel,
498 0 : )
499 0 : .await
500 0 : .ok_or_else(|| NotifyError::ShuttingDown)
501 0 : .and_then(|x| x)
502 0 : }
503 :
504 : /// Synchronous phase: update the per-tenant state for the next intended notification
505 0 : fn notify_prepare(
506 0 : &self,
507 0 : tenant_shard_id: TenantShardId,
508 0 : node_id: NodeId,
509 0 : stripe_size: ShardStripeSize,
510 0 : ) -> MaybeSendResult {
511 0 : let mut state_locked = self.state.lock().unwrap();
512 :
513 : use std::collections::hash_map::Entry;
514 0 : let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
515 0 : Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
516 0 : tenant_shard_id,
517 0 : stripe_size,
518 0 : node_id,
519 0 : )),
520 0 : Entry::Occupied(e) => {
521 0 : let tenant = e.into_mut();
522 0 : tenant.update(tenant_shard_id, stripe_size, node_id);
523 0 : tenant
524 : }
525 : };
526 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
527 0 : }
528 :
529 0 : async fn notify_execute(
530 0 : &self,
531 0 : maybe_send_result: MaybeSendResult,
532 0 : tenant_shard_id: TenantShardId,
533 0 : cancel: &CancellationToken,
534 0 : ) -> Result<(), NotifyError> {
535 : // Process result: we may get an update to send, or we may have to wait for a lock
536 : // before trying again.
537 0 : let (request, mut send_lock_guard) = match maybe_send_result {
538 : MaybeSendResult::Noop => {
539 0 : return Ok(());
540 : }
541 0 : MaybeSendResult::AwaitLock(send_lock) => {
542 0 : let send_locked = tokio::select! {
543 0 : guard = send_lock.lock_owned() => {guard},
544 0 : _ = cancel.cancelled() => {
545 0 : return Err(NotifyError::ShuttingDown)
546 : }
547 : };
548 :
549 : // Lock order: maybe_send is called within the [`Self::state`] lock, and takes the send lock, but here
550 : // we have acquired the send lock and take the [`Self::state`] lock. This is safe because maybe_send only uses
551 : // try_lock.
552 0 : let state_locked = self.state.lock().unwrap();
553 0 : let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
554 0 : return Ok(());
555 : };
556 0 : match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
557 : MaybeSendResult::AwaitLock(_) => {
558 0 : unreachable!("We supplied lock guard")
559 : }
560 : MaybeSendResult::Noop => {
561 0 : return Ok(());
562 : }
563 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
564 : }
565 : }
566 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
567 : };
568 :
569 0 : let result = if let Some(notify_url) = &self.config.compute_hook_url {
570 0 : self.do_notify(notify_url, &request, cancel).await
571 : } else {
572 0 : self.do_notify_local(&request).await.map_err(|e| {
573 0 : // This path is for testing only, so munge the error into our prod-style error type.
574 0 : tracing::error!("neon_local notification hook failed: {e}");
575 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
576 0 : })
577 : };
578 :
579 0 : match result {
580 0 : Ok(_) => {
581 0 : // Before dropping the send lock, stash the request we just sent so that
582 0 : // subsequent callers can avoid redundantly re-sending the same thing.
583 0 : *send_lock_guard = Some(ComputeRemoteState {
584 0 : request,
585 0 : applied: true,
586 0 : });
587 0 : }
588 0 : Err(NotifyError::Busy) => {
589 0 : // Busy result means that the server responded and has stored the new configuration,
590 0 : // but was not able to fully apply it to the compute
591 0 : *send_lock_guard = Some(ComputeRemoteState {
592 0 : request,
593 0 : applied: false,
594 0 : });
595 0 : }
596 0 : Err(_) => {
597 0 : // General error case: we can no longer know the remote state, so clear it. This will result in
598 0 : // the logic in maybe_send recognizing that we should call the hook again.
599 0 : *send_lock_guard = None;
600 0 : }
601 : }
602 0 : result
603 0 : }
604 :
605 : /// Infallible synchronous fire-and-forget version of notify(), that sends its results to
606 : /// a channel. Something should consume the channel and arrange to try notifying again
607 : /// if something failed.
608 0 : pub(super) fn notify_background(
609 0 : self: &Arc<Self>,
610 0 : notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
611 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
612 0 : cancel: &CancellationToken,
613 0 : ) {
614 0 : let mut maybe_sends = Vec::new();
615 0 : for (tenant_shard_id, node_id, stripe_size) in notifications {
616 0 : let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
617 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
618 : }
619 :
620 0 : let this = self.clone();
621 0 : let cancel = cancel.clone();
622 0 :
623 0 : tokio::task::spawn(async move {
624 0 : // Construct an async stream of futures to invoke the compute notify function: we do this
625 0 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
626 0 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
627 0 : // would mostly just be waiting on that semaphore.
628 0 : let mut stream = futures::stream::iter(maybe_sends)
629 0 : .map(|(tenant_shard_id, maybe_send_result)| {
630 0 : let this = this.clone();
631 0 : let cancel = cancel.clone();
632 :
633 0 : async move {
634 0 : this
635 0 : .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
636 0 : .await.map_err(|e| (tenant_shard_id, e))
637 0 : }.instrument(info_span!(
638 0 : "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
639 : ))
640 0 : })
641 0 : .buffered(API_CONCURRENCY);
642 :
643 : loop {
644 0 : tokio::select! {
645 0 : next = stream.next() => {
646 0 : match next {
647 0 : Some(r) => {
648 0 : result_tx.send(r).await.ok();
649 : },
650 : None => {
651 0 : tracing::info!("Finished sending background compute notifications");
652 0 : break;
653 : }
654 : }
655 : },
656 0 : _ = cancel.cancelled() => {
657 0 : tracing::info!("Shutdown while running background compute notifications");
658 0 : break;
659 : }
660 : };
661 : }
662 0 : });
663 0 : }
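A sketch of the receiving side that `notify_background` expects: the caller keeps the `tokio::sync::mpsc::Receiver` half of `result_tx` (named `result_rx` here, hypothetically) and arranges retries for shards whose notification failed.

```rust
// Sketch of a consumer for the result channel passed to notify_background().
// result_rx: tokio::sync::mpsc::Receiver<Result<(), (TenantShardId, NotifyError)>>
while let Some(result) = result_rx.recv().await {
    match result {
        Ok(()) => {
            // Notification delivered; nothing further to do for this batch item.
        }
        Err((tenant_shard_id, err)) => {
            // The hook only retries a few times internally: remember this shard
            // and schedule another notify attempt later.
            tracing::warn!("Deferred compute notification for {}: {}", tenant_shard_id, err);
        }
    }
}
```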
664 :
665 : /// Call this to notify the compute (postgres) tier of new pageservers to use
666 : /// for a tenant. notify() is called by each shard individually, and this function
667 : /// will decide whether an update to the tenant is sent. An update is sent on the
668 : /// condition that:
669 : /// - We know a pageserver for every shard.
670 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
671 : ///
672 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
673 : /// that is cancelled.
674 : ///
675 : /// This function is fallible, including in the case that the control plane is transiently
676 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
677 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
678 : /// ensuring that they eventually call again to ensure that the compute is eventually notified of
679 : /// the proper pageserver nodes for a tenant.
680 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
681 : pub(super) async fn notify(
682 : &self,
683 : tenant_shard_id: TenantShardId,
684 : node_id: NodeId,
685 : stripe_size: ShardStripeSize,
686 : cancel: &CancellationToken,
687 : ) -> Result<(), NotifyError> {
688 : let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
689 : self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
690 : .await
691 : }
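A sketch of the per-shard call site described in the doc comment above, e.g. from a reconciler. `compute_hook`, `tenant_shard_id`, `node_id`, `stripe_size` and `cancel` are assumed to be in scope, and the retry bookkeeping is left to the caller as required.

```rust
// Sketch: notifying the compute layer after attaching a shard to a pageserver.
match compute_hook
    .notify(tenant_shard_id, node_id, stripe_size, &cancel)
    .await
{
    Ok(()) => {
        // The control plane (or neon_local in tests) now knows the new pageserver.
    }
    Err(NotifyError::ShuttingDown) => {
        // We are shutting down: drop out without recording a failure.
    }
    Err(e) => {
        // Transient or fatal failure: the caller must arrange to call notify() again.
        tracing::warn!("Compute notification failed, will retry later: {}", e);
    }
}
```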
692 :
693 : /// Reflect a detach for a particular shard in the compute hook state.
694 : ///
695 : /// The goal is to avoid sending compute notifications with stale information (i.e.
696 : /// including detached pageservers).
697 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
698 : pub(super) fn handle_detach(
699 : &self,
700 : tenant_shard_id: TenantShardId,
701 : stripe_size: ShardStripeSize,
702 : ) {
703 : use std::collections::hash_map::Entry;
704 :
705 : let mut state_locked = self.state.lock().unwrap();
706 : match state_locked.entry(tenant_shard_id.tenant_id) {
707 : Entry::Vacant(_) => {
708 : tracing::warn!("Compute hook tenant not found for detach");
709 : }
710 : Entry::Occupied(mut e) => {
711 : let sharded = e.get().is_sharded();
712 : if !sharded {
713 : e.remove();
714 : } else {
715 : e.get_mut().remove_shard(tenant_shard_id, stripe_size);
716 : }
717 :
718 : tracing::debug!("Compute hook handled shard detach");
719 : }
720 : }
721 : }
722 : }
723 :
724 : #[cfg(test)]
725 : pub(crate) mod tests {
726 : use pageserver_api::shard::{ShardCount, ShardNumber};
727 : use utils::id::TenantId;
728 :
729 : use super::*;
730 :
731 : #[test]
732 1 : fn tenant_updates() -> anyhow::Result<()> {
733 1 : let tenant_id = TenantId::generate();
734 1 : let mut tenant_state = ComputeHookTenant::new(
735 1 : TenantShardId {
736 1 : tenant_id,
737 1 : shard_count: ShardCount::new(0),
738 1 : shard_number: ShardNumber(0),
739 1 : },
740 1 : ShardStripeSize(12345),
741 1 : NodeId(1),
742 1 : );
743 1 :
744 1 : // An unsharded tenant is always ready to emit a notification, but won't
745 1 : // send the same one twice
746 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
747 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
748 0 : anyhow::bail!("Wrong send result");
749 : };
750 1 : assert_eq!(request.shards.len(), 1);
751 1 : assert!(request.stripe_size.is_none());
752 :
753 : // Simulate successful send
754 1 : *guard = Some(ComputeRemoteState {
755 1 : request,
756 1 : applied: true,
757 1 : });
758 1 : drop(guard);
759 1 :
760 1 : // Try asking again: this should be a no-op
761 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
762 1 : assert!(matches!(send_result, MaybeSendResult::Noop));
763 :
764 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
765 : // resets the tenant state and puts it in a non-notifying state (need to
766 : // see all shards)
767 1 : tenant_state.update(
768 1 : TenantShardId {
769 1 : tenant_id,
770 1 : shard_count: ShardCount::new(2),
771 1 : shard_number: ShardNumber(1),
772 1 : },
773 1 : ShardStripeSize(32768),
774 1 : NodeId(1),
775 1 : );
776 1 : assert!(matches!(
777 1 : tenant_state.maybe_send(tenant_id, None),
778 : MaybeSendResult::Noop
779 : ));
780 :
781 : // Writing the second shard makes it ready to notify
782 1 : tenant_state.update(
783 1 : TenantShardId {
784 1 : tenant_id,
785 1 : shard_count: ShardCount::new(2),
786 1 : shard_number: ShardNumber(0),
787 1 : },
788 1 : ShardStripeSize(32768),
789 1 : NodeId(1),
790 1 : );
791 1 :
792 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
793 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
794 0 : anyhow::bail!("Wrong send result");
795 : };
796 1 : assert_eq!(request.shards.len(), 2);
797 1 : assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
798 :
799 : // Simulate successful send
800 1 : *guard = Some(ComputeRemoteState {
801 1 : request,
802 1 : applied: true,
803 1 : });
804 1 : drop(guard);
805 1 :
806 1 : Ok(())
807 1 : }
808 : }