Line data Source code
1 : use std::error::Error as _;
2 : use std::sync::Arc;
3 : use std::{collections::HashMap, time::Duration};
4 :
5 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
6 : use control_plane::local_env::LocalEnv;
7 : use futures::StreamExt;
8 : use hyper::StatusCode;
9 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
10 : use postgres_connection::parse_host_port;
11 : use serde::{Deserialize, Serialize};
12 : use tokio_util::sync::CancellationToken;
13 : use tracing::{info_span, Instrument};
14 : use utils::{
15 : backoff::{self},
16 : id::{NodeId, TenantId},
17 : };
18 :
19 : use crate::service::Config;
20 :
21 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
22 :
23 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
24 :
25 : pub(crate) const API_CONCURRENCY: usize = 32;
26 :
27 : struct UnshardedComputeHookTenant {
28 : // Which node is this tenant attached to
29 : node_id: NodeId,
30 :
31 : // Must hold this lock to send a notification.
32 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
33 : }
34 : struct ShardedComputeHookTenant {
35 : stripe_size: ShardStripeSize,
36 : shard_count: ShardCount,
37 : shards: Vec<(ShardNumber, NodeId)>,
38 :
39 : // Must hold this lock to send a notification. The contents represent
40 : // the last successfully sent notification, and are used to coalesce multiple
41 : // updates by only sending when there is a change since our last successful send.
42 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>,
43 : }
44 :
45 : /// Represents our knowledge of the compute's state: we can update this when we get a
46 : /// response from a notify API call, which tells us what has been applied.
47 : ///
48 : /// Should be wrapped in an Option<>, as we cannot always know the remote state.
49 : #[derive(PartialEq, Eq, Debug)]
50 : struct ComputeRemoteState {
51 : // The request body which was acked by the compute
52 : request: ComputeHookNotifyRequest,
53 :
54 : // Whether the cplane indicated that the state was applied to running computes, or just
55 : // persisted. In the Neon control plane, this is the difference between a 423 response (meaning
56 : // persisted but not applied), and a 2xx response (both persisted and applied)
57 : applied: bool,
58 : }
59 :
60 : enum ComputeHookTenant {
61 : Unsharded(UnshardedComputeHookTenant),
62 : Sharded(ShardedComputeHookTenant),
63 : }
64 :
65 : impl ComputeHookTenant {
66 : /// Construct with at least one shard's information
67 2 : fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
68 2 : if tenant_shard_id.shard_count.count() > 1 {
69 1 : Self::Sharded(ShardedComputeHookTenant {
70 1 : shards: vec![(tenant_shard_id.shard_number, node_id)],
71 1 : stripe_size,
72 1 : shard_count: tenant_shard_id.shard_count,
73 1 : send_lock: Arc::default(),
74 1 : })
75 : } else {
76 1 : Self::Unsharded(UnshardedComputeHookTenant {
77 1 : node_id,
78 1 : send_lock: Arc::default(),
79 1 : })
80 : }
81 2 : }
82 :
83 4 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>> {
84 4 : match self {
85 2 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
86 2 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
87 : }
88 4 : }
89 :
90 0 : fn is_sharded(&self) -> bool {
91 0 : matches!(self, ComputeHookTenant::Sharded(_))
92 0 : }
93 :
94 : /// Clear compute hook state for the specified shard.
95 : /// Only valid for [`ComputeHookTenant::Sharded`] instances.
96 0 : fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
97 0 : match self {
98 0 : ComputeHookTenant::Sharded(sharded) => {
99 0 : if sharded.stripe_size != stripe_size
100 0 : || sharded.shard_count != tenant_shard_id.shard_count
101 : {
102 0 : tracing::warn!("Shard split detected while handling detach")
103 0 : }
104 :
105 0 : let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
106 0 : *shard_number == tenant_shard_id.shard_number
107 0 : });
108 :
109 0 : if let Some(shard_idx) = shard_idx {
110 0 : sharded.shards.remove(shard_idx);
111 0 : } else {
112 0 : tracing::warn!("Shard not found while handling detach")
113 : }
114 : }
115 : ComputeHookTenant::Unsharded(_) => {
116 0 : unreachable!("Detach of unsharded tenants is handled externally");
117 : }
118 : }
119 0 : }
120 :
121 : /// Set one shard's location. If stripe size or shard count have changed, Self is reset
122 : /// and drops existing content.
123 2 : fn update(
124 2 : &mut self,
125 2 : tenant_shard_id: TenantShardId,
126 2 : stripe_size: ShardStripeSize,
127 2 : node_id: NodeId,
128 2 : ) {
129 1 : match self {
130 1 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
131 0 : unsharded_tenant.node_id = node_id
132 : }
133 1 : Self::Sharded(sharded_tenant)
134 1 : if sharded_tenant.stripe_size == stripe_size
135 1 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
136 : {
137 1 : if let Some(existing) = sharded_tenant
138 1 : .shards
139 1 : .iter()
140 1 : .position(|s| s.0 == tenant_shard_id.shard_number)
141 0 : {
142 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
143 0 : } else {
144 1 : sharded_tenant
145 1 : .shards
146 1 : .push((tenant_shard_id.shard_number, node_id));
147 2 : sharded_tenant.shards.sort_by_key(|s| s.0)
148 : }
149 : }
150 1 : _ => {
151 1 : // Shard count changed: reset struct.
152 1 : *self = Self::new(tenant_shard_id, stripe_size, node_id);
153 1 : }
154 : }
155 2 : }
156 : }
157 :
158 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
159 : struct ComputeHookNotifyRequestShard {
160 : node_id: NodeId,
161 : shard_number: ShardNumber,
162 : }
163 :
164 : /// Request body that we send to the control plane to notify it of where a tenant is attached
165 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
166 : struct ComputeHookNotifyRequest {
167 : tenant_id: TenantId,
168 : stripe_size: Option<ShardStripeSize>,
169 : shards: Vec<ComputeHookNotifyRequestShard>,
170 : }
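
// Illustrative sketch (not part of the original source): roughly what a notify request for a
// two-shard tenant serializes to with the derives above. The concrete IDs and the stripe size
// are made-up example values, and the exact rendering of the ID newtypes is an assumption
// based on their serde derives.
//
//   {
//     "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
//     "stripe_size": 32768,
//     "shards": [
//       { "node_id": 1, "shard_number": 0 },
//       { "node_id": 2, "shard_number": 1 }
//     ]
//   }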
171 :
172 : /// Error type for attempts to call into the control plane compute notification hook
173 : #[derive(thiserror::Error, Debug)]
174 : pub(crate) enum NotifyError {
175 : // Request was not sent successfully, e.g. transport error
176 0 : #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
177 : Request(#[from] reqwest::Error),
178 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
179 : #[error("Control plane tenant busy")]
180 : Busy,
181 : // Explicit 429 response asking us to retry less frequently
182 : #[error("Control plane overloaded")]
183 : SlowDown,
184 : // A 503 response indicates the control plane can't handle the request right now
185 : #[error("Control plane unavailable (status {0})")]
186 : Unavailable(StatusCode),
187 : // API returned unexpected non-success status. We will retry, but log a warning.
188 : #[error("Control plane returned unexpected status {0}")]
189 : Unexpected(StatusCode),
190 : // We shutdown while sending
191 : #[error("Shutting down")]
192 : ShuttingDown,
193 : // A response indicating we will never succeed, such as 400 or 404
194 : #[error("Non-retryable error {0}")]
195 : Fatal(StatusCode),
196 :
197 : #[error("neon_local error: {0}")]
198 : NeonLocal(anyhow::Error),
199 : }
200 :
201 : enum MaybeSendResult {
202 : // Please send this request while holding the lock, and if you succeed then write
203 : // the request into the lock.
204 : Transmit(
205 : (
206 : ComputeHookNotifyRequest,
207 : tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>,
208 : ),
209 : ),
210 : // Something requires sending, but you must wait for a current sender then call again
211 : AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeRemoteState>>>),
212 : // Nothing requires sending
213 : Noop,
214 : }
215 :
216 : impl ComputeHookTenant {
217 4 : fn maybe_send(
218 4 : &self,
219 4 : tenant_id: TenantId,
220 4 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState>>>,
221 4 : ) -> MaybeSendResult {
222 4 : let locked = match lock {
223 0 : Some(already_locked) => already_locked,
224 : None => {
225 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
226 4 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
227 0 : return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
228 : };
229 4 : locked
230 : }
231 : };
232 :
233 4 : let request = match self {
234 2 : Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
235 2 : tenant_id,
236 2 : shards: vec![ComputeHookNotifyRequestShard {
237 2 : shard_number: ShardNumber(0),
238 2 : node_id: unsharded_tenant.node_id,
239 2 : }],
240 2 : stripe_size: None,
241 2 : }),
242 2 : Self::Sharded(sharded_tenant)
243 2 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
244 1 : {
245 1 : Some(ComputeHookNotifyRequest {
246 1 : tenant_id,
247 1 : shards: sharded_tenant
248 1 : .shards
249 1 : .iter()
250 2 : .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
251 2 : shard_number: *shard_number,
252 2 : node_id: *node_id,
253 2 : })
254 1 : .collect(),
255 1 : stripe_size: Some(sharded_tenant.stripe_size),
256 1 : })
257 : }
258 1 : Self::Sharded(sharded_tenant) => {
259 1 : // Sharded tenant doesn't yet have information for all its shards
260 1 :
261 1 : tracing::info!(
262 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
263 0 : sharded_tenant.shards.len(),
264 0 : sharded_tenant.shard_count.count()
265 : );
266 1 : None
267 : }
268 : };
269 :
270 3 : match request {
271 : None => {
272 : // Not yet ready to emit a notification
273 1 : tracing::info!("Tenant isn't yet ready to emit a notification");
274 1 : MaybeSendResult::Noop
275 : }
276 1 : Some(request)
277 3 : if Some(&request) == locked.as_ref().map(|s| &s.request)
278 1 : && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
279 1 : {
280 1 : tracing::info!(
281 0 : "Skipping notification because remote state already matches ({:?})",
282 0 : &request
283 : );
284 : // No change from the last value successfully sent, and our state indicates that the last
285 : // value sent was fully applied on the control plane side.
286 1 : MaybeSendResult::Noop
287 : }
288 2 : Some(request) => {
289 2 : // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
290 2 : MaybeSendResult::Transmit((request, locked))
291 : }
292 : }
293 4 : }
294 : }
295 :
296 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
297 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
298 : /// the compute connection string.
299 : pub(super) struct ComputeHook {
300 : config: Config,
301 : state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
302 : authorization_header: Option<String>,
303 :
304 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
305 : // large numbers of tenants (e.g. when failing over after a node failure)
306 : api_concurrency: tokio::sync::Semaphore,
307 :
308 : // This lock is only used in testing environments, to serialize calls into neon_local
309 : neon_local_lock: tokio::sync::Mutex<()>,
310 :
311 : // We share a client across all notifications to enable connection re-use etc when
312 : // sending large numbers of notifications
313 : client: reqwest::Client,
314 : }
315 :
316 : impl ComputeHook {
317 0 : pub(super) fn new(config: Config) -> Self {
318 0 : let authorization_header = config
319 0 : .control_plane_jwt_token
320 0 : .clone()
321 0 : .map(|jwt| format!("Bearer {}", jwt));
322 0 :
323 0 : let client = reqwest::ClientBuilder::new()
324 0 : .timeout(NOTIFY_REQUEST_TIMEOUT)
325 0 : .build()
326 0 : .expect("Failed to construct HTTP client");
327 0 :
328 0 : Self {
329 0 : state: Default::default(),
330 0 : config,
331 0 : authorization_header,
332 0 : neon_local_lock: Default::default(),
333 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
334 0 : client,
335 0 : }
336 0 : }
337 :
338 : /// For test environments: use neon_local's LocalEnv to update compute
339 0 : async fn do_notify_local(
340 0 : &self,
341 0 : reconfigure_request: &ComputeHookNotifyRequest,
342 0 : ) -> Result<(), NotifyError> {
343 : // neon_local updates are not safe to call concurrently, use a lock to serialize
344 : // all calls to this function
345 0 : let _locked = self.neon_local_lock.lock().await;
346 :
347 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
348 0 : tracing::warn!(
349 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
350 : );
351 0 : return Ok(());
352 : };
353 0 : let env = match LocalEnv::load_config(repo_dir) {
354 0 : Ok(e) => e,
355 0 : Err(e) => {
356 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
357 0 : return Ok(());
358 : }
359 : };
360 0 : let cplane =
361 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
362 0 : let ComputeHookNotifyRequest {
363 0 : tenant_id,
364 0 : shards,
365 0 : stripe_size,
366 0 : } = reconfigure_request;
367 0 :
368 0 : let compute_pageservers = shards
369 0 : .iter()
370 0 : .map(|shard| {
371 0 : let ps_conf = env
372 0 : .get_pageserver_conf(shard.node_id)
373 0 : .expect("Unknown pageserver");
374 0 : let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
375 0 : .expect("Unable to parse listen_pg_addr");
376 0 : (pg_host, pg_port.unwrap_or(5432))
377 0 : })
378 0 : .collect::<Vec<_>>();
379 :
380 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
381 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
382 0 : tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
383 0 : endpoint
384 0 : .reconfigure(compute_pageservers.clone(), *stripe_size, None)
385 0 : .await
386 0 : .map_err(NotifyError::NeonLocal)?;
387 0 : }
388 : }
389 :
390 0 : Ok(())
391 0 : }
392 :
393 0 : async fn do_notify_iteration(
394 0 : &self,
395 0 : url: &String,
396 0 : reconfigure_request: &ComputeHookNotifyRequest,
397 0 : cancel: &CancellationToken,
398 0 : ) -> Result<(), NotifyError> {
399 0 : let req = self.client.request(reqwest::Method::PUT, url);
400 0 : let req = if let Some(value) = &self.authorization_header {
401 0 : req.header(reqwest::header::AUTHORIZATION, value)
402 : } else {
403 0 : req
404 : };
405 :
406 0 : tracing::info!(
407 0 : "Sending notify request to {} ({:?})",
408 : url,
409 : reconfigure_request
410 : );
411 0 : let send_result = req.json(&reconfigure_request).send().await;
412 0 : let response = match send_result {
413 0 : Ok(r) => r,
414 0 : Err(e) => return Err(e.into()),
415 : };
416 :
417 : // Treat all 2xx responses as success
418 0 : if response.status() >= reqwest::StatusCode::OK
419 0 : && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
420 : {
421 0 : if response.status() != reqwest::StatusCode::OK {
422 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
423 : // log a warning.
424 0 : tracing::warn!(
425 0 : "Unexpected 2xx response code {} from control plane",
426 0 : response.status()
427 : );
428 0 : }
429 :
430 0 : return Ok(());
431 0 : }
432 0 :
433 0 : // Error response codes
434 0 : match response.status() {
435 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
436 : // TODO: 429 handling should be global: set some state visible to other requests
437 : // so that they will delay before starting, rather than all notifications trying
438 : // once before backing off.
439 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
440 0 : .await
441 0 : .ok();
442 0 : Err(NotifyError::SlowDown)
443 : }
444 : reqwest::StatusCode::LOCKED => {
445 : // We consider this fatal, because it's possible that the operation holding the lock in the
446 : // control plane is also the one that is waiting for this reconcile. We should let the
447 : // reconciler calling this hook fail, to give the control plane a chance to un-lock.
448 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
449 0 : Err(NotifyError::Busy)
450 : }
451 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
452 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
453 : }
454 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
455 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
456 : }
457 : reqwest::StatusCode::BAD_GATEWAY => {
458 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
459 : }
460 :
461 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
462 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
463 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
464 0 : status => Err(NotifyError::Unexpected(
465 0 : hyper::StatusCode::from_u16(status.as_u16())
466 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
467 0 : )),
468 : }
469 0 : }
470 :
471 0 : async fn do_notify(
472 0 : &self,
473 0 : url: &String,
474 0 : reconfigure_request: &ComputeHookNotifyRequest,
475 0 : cancel: &CancellationToken,
476 0 : ) -> Result<(), NotifyError> {
477 : // We hold these semaphore units across all retries, rather than only across each
478 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
479 : // time out waiting for a semaphore.
480 0 : let _units = self
481 0 : .api_concurrency
482 0 : .acquire()
483 0 : .await
484 : // Interpret closed semaphore as shutdown
485 0 : .map_err(|_| NotifyError::ShuttingDown)?;
486 :
487 0 : backoff::retry(
488 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
489 0 : |e| {
490 0 : matches!(
491 0 : e,
492 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
493 : )
494 0 : },
495 0 : 3,
496 0 : 10,
497 0 : "Send compute notification",
498 0 : cancel,
499 0 : )
500 0 : .await
501 0 : .ok_or_else(|| NotifyError::ShuttingDown)
502 0 : .and_then(|x| x)
503 0 : }
504 :
505 : /// Synchronous phase: update the per-tenant state for the next intended notification
506 0 : fn notify_prepare(
507 0 : &self,
508 0 : tenant_shard_id: TenantShardId,
509 0 : node_id: NodeId,
510 0 : stripe_size: ShardStripeSize,
511 0 : ) -> MaybeSendResult {
512 0 : let mut state_locked = self.state.lock().unwrap();
513 :
514 : use std::collections::hash_map::Entry;
515 0 : let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
516 0 : Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
517 0 : tenant_shard_id,
518 0 : stripe_size,
519 0 : node_id,
520 0 : )),
521 0 : Entry::Occupied(e) => {
522 0 : let tenant = e.into_mut();
523 0 : tenant.update(tenant_shard_id, stripe_size, node_id);
524 0 : tenant
525 : }
526 : };
527 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
528 0 : }
529 :
530 0 : async fn notify_execute(
531 0 : &self,
532 0 : maybe_send_result: MaybeSendResult,
533 0 : tenant_shard_id: TenantShardId,
534 0 : cancel: &CancellationToken,
535 0 : ) -> Result<(), NotifyError> {
536 : // Process result: we may get an update to send, or we may have to wait for a lock
537 : // before trying again.
538 0 : let (request, mut send_lock_guard) = match maybe_send_result {
539 : MaybeSendResult::Noop => {
540 0 : return Ok(());
541 : }
542 0 : MaybeSendResult::AwaitLock(send_lock) => {
543 0 : let send_locked = tokio::select! {
544 0 : guard = send_lock.lock_owned() => {guard},
545 0 : _ = cancel.cancelled() => {
546 0 : return Err(NotifyError::ShuttingDown)
547 : }
548 : };
549 :
550 : // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
551 : // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
552 : // try_lock.
553 0 : let state_locked = self.state.lock().unwrap();
554 0 : let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
555 0 : return Ok(());
556 : };
557 0 : match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
558 : MaybeSendResult::AwaitLock(_) => {
559 0 : unreachable!("We supplied lock guard")
560 : }
561 : MaybeSendResult::Noop => {
562 0 : return Ok(());
563 : }
564 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
565 : }
566 : }
567 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
568 : };
569 :
570 0 : let result = if let Some(notify_url) = &self.config.compute_hook_url {
571 0 : self.do_notify(notify_url, &request, cancel).await
572 : } else {
573 0 : self.do_notify_local(&request).await.map_err(|e| {
574 0 : // This path is for testing only, so munge the error into our prod-style error type.
575 0 : tracing::error!("neon_local notification hook failed: {e}");
576 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
577 0 : })
578 : };
579 :
580 0 : match result {
581 0 : Ok(_) => {
582 0 : // Before dropping the send lock, stash the request we just sent so that
583 0 : // subsequent callers can avoid redundantly re-sending the same thing.
584 0 : *send_lock_guard = Some(ComputeRemoteState {
585 0 : request,
586 0 : applied: true,
587 0 : });
588 0 : }
589 0 : Err(NotifyError::Busy) => {
590 0 : // Busy result means that the server responded and has stored the new configuration,
591 0 : // but was not able to fully apply it to the compute
592 0 : *send_lock_guard = Some(ComputeRemoteState {
593 0 : request,
594 0 : applied: false,
595 0 : });
596 0 : }
597 0 : Err(_) => {
598 0 : // General error case: we can no longer know the remote state, so clear it. This will result in
599 0 : // the logic in maybe_send recognizing that we should call the hook again.
600 0 : *send_lock_guard = None;
601 0 : }
602 : }
603 0 : result
604 0 : }
605 :
606 : /// Infallible synchronous fire-and-forget version of notify() that sends its results to
607 : /// a channel. Something should consume the channel and arrange to try notifying again
608 : /// if something failed.
609 0 : pub(super) fn notify_background(
610 0 : self: &Arc<Self>,
611 0 : notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
612 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
613 0 : cancel: &CancellationToken,
614 0 : ) {
615 0 : let mut maybe_sends = Vec::new();
616 0 : for (tenant_shard_id, node_id, stripe_size) in notifications {
617 0 : let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
618 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
619 : }
620 :
621 0 : let this = self.clone();
622 0 : let cancel = cancel.clone();
623 0 :
624 0 : tokio::task::spawn(async move {
625 0 : // Construct an async stream of futures to invoke the compute notify function: we do this
626 0 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
627 0 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
628 0 : // would mostly just be waiting on that semaphore.
629 0 : let mut stream = futures::stream::iter(maybe_sends)
630 0 : .map(|(tenant_shard_id, maybe_send_result)| {
631 0 : let this = this.clone();
632 0 : let cancel = cancel.clone();
633 :
634 0 : async move {
635 0 : this
636 0 : .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
637 0 : .await.map_err(|e| (tenant_shard_id, e))
638 0 : }.instrument(info_span!(
639 0 : "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
640 : ))
641 0 : })
642 0 : .buffered(API_CONCURRENCY);
643 :
644 : loop {
645 0 : tokio::select! {
646 0 : next = stream.next() => {
647 0 : match next {
648 0 : Some(r) => {
649 0 : result_tx.send(r).await.ok();
650 : },
651 : None => {
652 0 : tracing::info!("Finished sending background compute notifications");
653 0 : break;
654 : }
655 : }
656 : },
657 0 : _ = cancel.cancelled() => {
658 0 : tracing::info!("Shutdown while running background compute notifications");
659 0 : break;
660 : }
661 : };
662 : }
663 0 : });
664 0 : }
665 :
666 : /// Call this to notify the compute (postgres) tier of new pageservers to use
667 : /// for a tenant. notify() is called by each shard individually, and this function
668 : /// will decide whether an update to the tenant is sent. An update is sent on the
669 : /// condition that:
670 : /// - We know a pageserver for every shard.
671 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
672 : ///
673 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
674 : /// that is cancelled.
675 : ///
676 : /// This function is fallible, including in the case that the control plane is transiently
677 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
678 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
679 : /// ensuring that they eventually call again to ensure that the compute is eventually notified of
680 : /// the proper pageserver nodes for a tenant.
681 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
682 : pub(super) async fn notify(
683 : &self,
684 : tenant_shard_id: TenantShardId,
685 : node_id: NodeId,
686 : stripe_size: ShardStripeSize,
687 : cancel: &CancellationToken,
688 : ) -> Result<(), NotifyError> {
689 : let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
690 : self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
691 : .await
692 : }
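
// Illustrative call-site sketch (hypothetical, not from this file): a caller notifying the
// compute tier that an unsharded tenant is attached to pageserver node 1. The shard ID is
// built the same way as in the tests below; handling failures and re-calling notify() is the
// caller's responsibility, as the doc comment above describes.
//
//   let tenant_shard_id = TenantShardId {
//       tenant_id,
//       shard_count: ShardCount::new(0),
//       shard_number: ShardNumber(0),
//   };
//   compute_hook
//       .notify(tenant_shard_id, NodeId(1), ShardStripeSize(32768), &cancel)
//       .await?;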
693 :
694 : /// Reflect a detach for a particular shard in the compute hook state.
695 : ///
696 : /// The goal is to avoid sending compute notifications with stale information (i.e.
697 : /// including detached pageservers).
698 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
699 : pub(super) fn handle_detach(
700 : &self,
701 : tenant_shard_id: TenantShardId,
702 : stripe_size: ShardStripeSize,
703 : ) {
704 : use std::collections::hash_map::Entry;
705 :
706 : let mut state_locked = self.state.lock().unwrap();
707 : match state_locked.entry(tenant_shard_id.tenant_id) {
708 : Entry::Vacant(_) => {
709 : tracing::warn!("Compute hook tenant not found for detach");
710 : }
711 : Entry::Occupied(mut e) => {
712 : let sharded = e.get().is_sharded();
713 : if !sharded {
714 : e.remove();
715 : } else {
716 : e.get_mut().remove_shard(tenant_shard_id, stripe_size);
717 : }
718 :
719 : tracing::debug!("Compute hook handled shard detach");
720 : }
721 : }
722 : }
723 : }
724 :
725 : #[cfg(test)]
726 : pub(crate) mod tests {
727 : use pageserver_api::shard::{ShardCount, ShardNumber};
728 : use utils::id::TenantId;
729 :
730 : use super::*;
731 :
732 : #[test]
733 1 : fn tenant_updates() -> anyhow::Result<()> {
734 1 : let tenant_id = TenantId::generate();
735 1 : let mut tenant_state = ComputeHookTenant::new(
736 1 : TenantShardId {
737 1 : tenant_id,
738 1 : shard_count: ShardCount::new(0),
739 1 : shard_number: ShardNumber(0),
740 1 : },
741 1 : ShardStripeSize(12345),
742 1 : NodeId(1),
743 1 : );
744 1 :
745 1 : // An unsharded tenant is always ready to emit a notification, but won't
746 1 : // send the same one twice
747 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
748 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
749 0 : anyhow::bail!("Wrong send result");
750 : };
751 1 : assert_eq!(request.shards.len(), 1);
752 1 : assert!(request.stripe_size.is_none());
753 :
754 : // Simulate successful send
755 1 : *guard = Some(ComputeRemoteState {
756 1 : request,
757 1 : applied: true,
758 1 : });
759 1 : drop(guard);
760 1 :
761 1 : // Try asking again: this should be a no-op
762 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
763 1 : assert!(matches!(send_result, MaybeSendResult::Noop));
764 :
765 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
766 : // resets the tenant state and puts it in a non-notifying state (need to
767 : // see all shards)
768 1 : tenant_state.update(
769 1 : TenantShardId {
770 1 : tenant_id,
771 1 : shard_count: ShardCount::new(2),
772 1 : shard_number: ShardNumber(1),
773 1 : },
774 1 : ShardStripeSize(32768),
775 1 : NodeId(1),
776 1 : );
777 1 : assert!(matches!(
778 1 : tenant_state.maybe_send(tenant_id, None),
779 : MaybeSendResult::Noop
780 : ));
781 :
782 : // Writing the second shard makes it ready to notify
783 1 : tenant_state.update(
784 1 : TenantShardId {
785 1 : tenant_id,
786 1 : shard_count: ShardCount::new(2),
787 1 : shard_number: ShardNumber(0),
788 1 : },
789 1 : ShardStripeSize(32768),
790 1 : NodeId(1),
791 1 : );
792 1 :
793 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
794 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
795 0 : anyhow::bail!("Wrong send result");
796 : };
797 1 : assert_eq!(request.shards.len(), 2);
798 1 : assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
799 :
800 : // Simulate successful send
801 1 : *guard = Some(ComputeRemoteState {
802 1 : request,
803 1 : applied: true,
804 1 : });
805 1 : drop(guard);
806 1 :
807 1 : Ok(())
808 1 : }
809 : }