Line data Source code
1 : use std::sync::Arc;
2 : use std::{collections::HashMap, time::Duration};
3 :
4 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
5 : use control_plane::local_env::LocalEnv;
6 : use futures::StreamExt;
7 : use hyper::StatusCode;
8 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
9 : use postgres_connection::parse_host_port;
10 : use serde::{Deserialize, Serialize};
11 : use tokio_util::sync::CancellationToken;
12 : use tracing::{info_span, Instrument};
13 : use utils::{
14 : backoff::{self},
15 : id::{NodeId, TenantId},
16 : };
17 :
18 : use crate::service::Config;
19 :
20 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
21 :
22 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
23 :
24 : pub(crate) const API_CONCURRENCY: usize = 32;
25 :
26 : struct UnshardedComputeHookTenant {
27 : // Which node this tenant is attached to.
28 : node_id: NodeId,
29 :
30 : // Must hold this lock to send a notification.
31 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
32 : }
33 : struct ShardedComputeHookTenant {
34 : stripe_size: ShardStripeSize,
35 : shard_count: ShardCount,
36 : shards: Vec<(ShardNumber, NodeId)>,
37 :
38 : // Must hold this lock to send a notification. The contents represent
39 : // the last successfully sent notification, and are used to coalesce multiple
40 : // updates by only sending when there is a change since our last successful send.
41 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
42 : }
43 :
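/// Per-tenant state tracked by the compute hook: the attached pageserver for an
/// unsharded tenant, or the per-shard pageserver mapping for a sharded one.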
44 : enum ComputeHookTenant {
45 : Unsharded(UnshardedComputeHookTenant),
46 : Sharded(ShardedComputeHookTenant),
47 : }
48 :
49 : impl ComputeHookTenant {
50 : /// Construct with at least one shard's information
51 4 : fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
52 4 : if tenant_shard_id.shard_count.count() > 1 {
53 2 : Self::Sharded(ShardedComputeHookTenant {
54 2 : shards: vec![(tenant_shard_id.shard_number, node_id)],
55 2 : stripe_size,
56 2 : shard_count: tenant_shard_id.shard_count,
57 2 : send_lock: Arc::default(),
58 2 : })
59 : } else {
60 2 : Self::Unsharded(UnshardedComputeHookTenant {
61 2 : node_id,
62 2 : send_lock: Arc::default(),
63 2 : })
64 : }
65 4 : }
66 :
67 8 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
68 8 : match self {
69 4 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
70 4 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
71 : }
72 8 : }
73 :
74 : /// Set one shard's location. If the stripe size or shard count has changed, Self is
75 : /// reset, dropping its existing content.
76 4 : fn update(
77 4 : &mut self,
78 4 : tenant_shard_id: TenantShardId,
79 4 : stripe_size: ShardStripeSize,
80 4 : node_id: NodeId,
81 4 : ) {
82 2 : match self {
83 2 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
84 0 : unsharded_tenant.node_id = node_id
85 : }
86 2 : Self::Sharded(sharded_tenant)
87 2 : if sharded_tenant.stripe_size == stripe_size
88 2 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
89 : {
90 2 : if let Some(existing) = sharded_tenant
91 2 : .shards
92 2 : .iter()
93 2 : .position(|s| s.0 == tenant_shard_id.shard_number)
94 0 : {
95 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
96 0 : } else {
97 2 : sharded_tenant
98 2 : .shards
99 2 : .push((tenant_shard_id.shard_number, node_id));
100 4 : sharded_tenant.shards.sort_by_key(|s| s.0)
101 : }
102 : }
103 2 : _ => {
104 2 : // Shard count or stripe size changed: reset the struct.
105 2 : *self = Self::new(tenant_shard_id, stripe_size, node_id);
106 2 : }
107 : }
108 4 : }
109 : }
110 :
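/// One shard's entry in a compute notification: which pageserver node holds which shard number.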
111 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
112 : struct ComputeHookNotifyRequestShard {
113 : node_id: NodeId,
114 : shard_number: ShardNumber,
115 : }
116 :
117 : /// Request body that we send to the control plane to notify it of where a tenant is attached
118 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
119 : struct ComputeHookNotifyRequest {
120 : tenant_id: TenantId,
121 : stripe_size: Option<ShardStripeSize>,
122 : shards: Vec<ComputeHookNotifyRequestShard>,
123 : }
124 :
125 : /// Error type for attempts to call into the control plane compute notification hook
126 0 : #[derive(thiserror::Error, Debug)]
127 : pub(crate) enum NotifyError {
128 : // Request was not sent successfully, e.g. transport error
129 : #[error("Sending request: {0}")]
130 : Request(#[from] reqwest::Error),
131 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
132 : #[error("Control plane tenant busy")]
133 : Busy,
134 : // Explicit 429 response asking us to retry less frequently
135 : #[error("Control plane overloaded")]
136 : SlowDown,
137 : // A 502, 503 or 504 response indicates the control plane can't handle the request right now
138 : #[error("Control plane unavailable (status {0})")]
139 : Unavailable(StatusCode),
140 : // API returned unexpected non-success status. We will retry, but log a warning.
141 : #[error("Control plane returned unexpected status {0}")]
142 : Unexpected(StatusCode),
143 : // We shut down while sending
144 : #[error("Shutting down")]
145 : ShuttingDown,
146 : // A response indicates we will never succeed, such as 400 or 404
147 : #[error("Non-retryable error {0}")]
148 : Fatal(StatusCode),
149 : }
150 :
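/// Outcome of [`ComputeHookTenant::maybe_send`]: transmit a request (while holding the
/// send lock), wait for the current sender and retry, or do nothing.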
151 : enum MaybeSendResult {
152 : // Please send this request while holding the lock, and if you succeed then write
153 : // the request into the lock.
154 : Transmit(
155 : (
156 : ComputeHookNotifyRequest,
157 : tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
158 : ),
159 : ),
160 : // Something requires sending, but you must wait for a current sender then call again
161 : AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
162 : // Nothing requires sending
163 : Noop,
164 : }
165 :
166 : impl ComputeHookTenant {
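/// Decide whether a notification needs to be sent for this tenant. If no lock guard is
/// supplied, only a try_lock on the send lock is attempted, because this is called while
/// holding the [`ComputeHook::state`] lock. Returns Noop if the would-be request matches
/// the last successfully sent one.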
167 8 : fn maybe_send(
168 8 : &self,
169 8 : tenant_id: TenantId,
170 8 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
171 8 : ) -> MaybeSendResult {
172 8 : let locked = match lock {
173 0 : Some(already_locked) => already_locked,
174 : None => {
175 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
176 8 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
177 0 : return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
178 : };
179 8 : locked
180 : }
181 : };
182 :
183 8 : let request = match self {
184 4 : Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
185 4 : tenant_id,
186 4 : shards: vec![ComputeHookNotifyRequestShard {
187 4 : shard_number: ShardNumber(0),
188 4 : node_id: unsharded_tenant.node_id,
189 4 : }],
190 4 : stripe_size: None,
191 4 : }),
192 4 : Self::Sharded(sharded_tenant)
193 4 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
194 2 : {
195 2 : Some(ComputeHookNotifyRequest {
196 2 : tenant_id,
197 2 : shards: sharded_tenant
198 2 : .shards
199 2 : .iter()
200 4 : .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
201 4 : shard_number: *shard_number,
202 4 : node_id: *node_id,
203 4 : })
204 2 : .collect(),
205 2 : stripe_size: Some(sharded_tenant.stripe_size),
206 2 : })
207 : }
208 2 : Self::Sharded(sharded_tenant) => {
209 2 : // Sharded tenant doesn't yet have information for all its shards
210 2 :
211 2 : tracing::info!(
212 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
213 0 : sharded_tenant.shards.len(),
214 0 : sharded_tenant.shard_count.count()
215 : );
216 2 : None
217 : }
218 : };
219 :
220 6 : match request {
221 : None => {
222 : // Not yet ready to emit a notification
223 2 : tracing::info!("Tenant isn't yet ready to emit a notification");
224 2 : MaybeSendResult::Noop
225 : }
226 6 : Some(request) if Some(&request) == locked.as_ref() => {
227 2 : // No change from the last value successfully sent
228 2 : MaybeSendResult::Noop
229 : }
230 4 : Some(request) => MaybeSendResult::Transmit((request, locked)),
231 : }
232 8 : }
233 : }
234 :
235 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
236 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
237 : /// the compute connection string.
238 : pub(super) struct ComputeHook {
239 : config: Config,
240 : state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
241 : authorization_header: Option<String>,
242 :
243 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
244 : // large numbers of tenants (e.g. when failing over after a node failure)
245 : api_concurrency: tokio::sync::Semaphore,
246 :
247 : // This lock is only used in testing environments, to serialize calls into neon_local
248 : neon_local_lock: tokio::sync::Mutex<()>,
249 :
250 : // We share a client across all notifications to enable connection re-use etc when
251 : // sending large numbers of notifications
252 : client: reqwest::Client,
253 : }
254 :
255 : impl ComputeHook {
256 0 : pub(super) fn new(config: Config) -> Self {
257 0 : let authorization_header = config
258 0 : .control_plane_jwt_token
259 0 : .clone()
260 0 : .map(|jwt| format!("Bearer {}", jwt));
261 0 :
262 0 : let client = reqwest::ClientBuilder::new()
263 0 : .timeout(NOTIFY_REQUEST_TIMEOUT)
264 0 : .build()
265 0 : .expect("Failed to construct HTTP client");
266 0 :
267 0 : Self {
268 0 : state: Default::default(),
269 0 : config,
270 0 : authorization_header,
271 0 : neon_local_lock: Default::default(),
272 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
273 0 : client,
274 0 : }
275 0 : }
276 :
277 : /// For test environments: use neon_local's LocalEnv to update compute
278 0 : async fn do_notify_local(
279 0 : &self,
280 0 : reconfigure_request: &ComputeHookNotifyRequest,
281 0 : ) -> anyhow::Result<()> {
282 : // neon_local updates are not safe to call concurrently; use a lock to serialize
283 : // all calls to this function
284 0 : let _locked = self.neon_local_lock.lock().await;
285 :
286 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
287 0 : tracing::warn!(
288 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
289 : );
290 0 : return Ok(());
291 : };
292 0 : let env = match LocalEnv::load_config(repo_dir) {
293 0 : Ok(e) => e,
294 0 : Err(e) => {
295 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
296 0 : return Ok(());
297 : }
298 : };
299 0 : let cplane =
300 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
301 0 : let ComputeHookNotifyRequest {
302 0 : tenant_id,
303 0 : shards,
304 0 : stripe_size,
305 0 : } = reconfigure_request;
306 0 :
307 0 : let compute_pageservers = shards
308 0 : .iter()
309 0 : .map(|shard| {
310 0 : let ps_conf = env
311 0 : .get_pageserver_conf(shard.node_id)
312 0 : .expect("Unknown pageserver");
313 0 : let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
314 0 : .expect("Unable to parse listen_pg_addr");
315 0 : (pg_host, pg_port.unwrap_or(5432))
316 0 : })
317 0 : .collect::<Vec<_>>();
318 :
319 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
320 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
321 0 : tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
322 0 : endpoint
323 0 : .reconfigure(compute_pageservers.clone(), *stripe_size)
324 0 : .await?;
325 0 : }
326 : }
327 :
328 0 : Ok(())
329 0 : }
330 :
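/// Make a single notification request to the control plane, mapping the HTTP response
/// status onto the appropriate [`NotifyError`] variant.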
331 0 : async fn do_notify_iteration(
332 0 : &self,
333 0 : url: &String,
334 0 : reconfigure_request: &ComputeHookNotifyRequest,
335 0 : cancel: &CancellationToken,
336 0 : ) -> Result<(), NotifyError> {
337 0 : let req = self.client.request(reqwest::Method::PUT, url);
338 0 : let req = if let Some(value) = &self.authorization_header {
339 0 : req.header(reqwest::header::AUTHORIZATION, value)
340 : } else {
341 0 : req
342 : };
343 :
344 0 : tracing::info!(
345 0 : "Sending notify request to {} ({:?})",
346 : url,
347 : reconfigure_request
348 : );
349 0 : let send_result = req.json(&reconfigure_request).send().await;
350 0 : let response = match send_result {
351 0 : Ok(r) => r,
352 0 : Err(e) => return Err(e.into()),
353 : };
354 :
355 : // Treat all 2xx responses as success
356 0 : if response.status() >= reqwest::StatusCode::OK
357 0 : && response.status() < reqwest::StatusCode::MULTIPLE_CHOICES
358 : {
359 0 : if response.status() != reqwest::StatusCode::OK {
360 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
361 : // log a warning.
362 0 : tracing::warn!(
363 0 : "Unexpected 2xx response code {} from control plane",
364 0 : response.status()
365 : );
366 0 : }
367 :
368 0 : return Ok(());
369 0 : }
370 0 :
371 0 : // Error response codes
372 0 : match response.status() {
373 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
374 : // TODO: 429 handling should be global: set some state visible to other requests
375 : // so that they will delay before starting, rather than all notifications trying
376 : // once before backing off.
377 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
378 0 : .await
379 0 : .ok();
380 0 : Err(NotifyError::SlowDown)
381 : }
382 : reqwest::StatusCode::LOCKED => {
383 : // We consider this fatal, because it's possible that the operation blocking the control plane is
384 : // also the one that is waiting for this reconcile. We should let the reconciler calling
385 : // this hook fail, to give the control plane a chance to un-lock.
386 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
387 0 : Err(NotifyError::Busy)
388 : }
389 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
390 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
391 : }
392 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
393 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
394 : }
395 : reqwest::StatusCode::BAD_GATEWAY => {
396 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
397 : }
398 :
399 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
400 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
401 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
402 0 : status => Err(NotifyError::Unexpected(
403 0 : hyper::StatusCode::from_u16(status.as_u16())
404 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
405 0 : )),
406 : }
407 0 : }
408 :
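/// Send a notification to the control plane URL, holding a concurrency-limiting semaphore
/// unit across a bounded number of retries with backoff.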
409 0 : async fn do_notify(
410 0 : &self,
411 0 : url: &String,
412 0 : reconfigure_request: &ComputeHookNotifyRequest,
413 0 : cancel: &CancellationToken,
414 0 : ) -> Result<(), NotifyError> {
415 : // We hold these semaphore units across all retries, rather than only across each
416 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
417 : // time out waiting for a semaphore.
418 0 : let _units = self
419 0 : .api_concurrency
420 0 : .acquire()
421 0 : .await
422 : // Interpret closed semaphore as shutdown
423 0 : .map_err(|_| NotifyError::ShuttingDown)?;
424 :
425 0 : backoff::retry(
426 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
427 0 : |e| {
428 0 : matches!(
429 0 : e,
430 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
431 : )
432 0 : },
433 0 : 3,
434 0 : 10,
435 0 : "Send compute notification",
436 0 : cancel,
437 0 : )
438 0 : .await
439 0 : .ok_or_else(|| NotifyError::ShuttingDown)
440 0 : .and_then(|x| x)
441 0 : }
442 :
443 : /// Synchronous phase: update the per-tenant state for the next intended notification
444 0 : fn notify_prepare(
445 0 : &self,
446 0 : tenant_shard_id: TenantShardId,
447 0 : node_id: NodeId,
448 0 : stripe_size: ShardStripeSize,
449 0 : ) -> MaybeSendResult {
450 0 : let mut state_locked = self.state.lock().unwrap();
451 :
452 : use std::collections::hash_map::Entry;
453 0 : let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
454 0 : Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
455 0 : tenant_shard_id,
456 0 : stripe_size,
457 0 : node_id,
458 0 : )),
459 0 : Entry::Occupied(e) => {
460 0 : let tenant = e.into_mut();
461 0 : tenant.update(tenant_shard_id, stripe_size, node_id);
462 0 : tenant
463 : }
464 : };
465 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
466 0 : }
467 :
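/// Asynchronous phase: act on the result of [`Self::notify_prepare`], sending the
/// notification via the control plane hook (or neon_local in test environments) and,
/// on success, recording the sent request so that identical later updates are skipped.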
468 0 : async fn notify_execute(
469 0 : &self,
470 0 : maybe_send_result: MaybeSendResult,
471 0 : tenant_shard_id: TenantShardId,
472 0 : cancel: &CancellationToken,
473 0 : ) -> Result<(), NotifyError> {
474 : // Process result: we may get an update to send, or we may have to wait for a lock
475 : // before trying again.
476 0 : let (request, mut send_lock_guard) = match maybe_send_result {
477 : MaybeSendResult::Noop => {
478 0 : return Ok(());
479 : }
480 0 : MaybeSendResult::AwaitLock(send_lock) => {
481 0 : let send_locked = tokio::select! {
482 : guard = send_lock.lock_owned() => {guard},
483 : _ = cancel.cancelled() => {
484 : return Err(NotifyError::ShuttingDown)
485 : }
486 : };
487 :
488 : // Lock order: maybe_send is called within the [`Self::state`] lock, and takes the send lock, but here
489 : // we have acquired the send lock and take the [`Self::state`] lock. This is safe because maybe_send only uses
490 : // try_lock.
491 0 : let state_locked = self.state.lock().unwrap();
492 0 : let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
493 0 : return Ok(());
494 : };
495 0 : match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
496 : MaybeSendResult::AwaitLock(_) => {
497 0 : unreachable!("We supplied lock guard")
498 : }
499 : MaybeSendResult::Noop => {
500 0 : return Ok(());
501 : }
502 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
503 : }
504 : }
505 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
506 : };
507 :
508 0 : let result = if let Some(notify_url) = &self.config.compute_hook_url {
509 0 : self.do_notify(notify_url, &request, cancel).await
510 : } else {
511 0 : self.do_notify_local(&request).await.map_err(|e| {
512 0 : // This path is for testing only, so munge the error into our prod-style error type.
513 0 : tracing::error!("Local notification hook failed: {e}");
514 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
515 0 : })
516 : };
517 :
518 0 : if result.is_ok() {
519 0 : // Before dropping the send lock, stash the request we just sent so that
520 0 : // subsequent callers can avoid redundantly re-sending the same thing.
521 0 : *send_lock_guard = Some(request);
522 0 : }
523 0 : result
524 0 : }
525 :
526 : /// Infallible synchronous fire-and-forget version of notify(), which sends its results to
527 : /// a channel. Something should consume the channel and arrange to try notifying again
528 : /// if something failed.
529 0 : pub(super) fn notify_background(
530 0 : self: &Arc<Self>,
531 0 : notifications: Vec<(TenantShardId, NodeId, ShardStripeSize)>,
532 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
533 0 : cancel: &CancellationToken,
534 0 : ) {
535 0 : let mut maybe_sends = Vec::new();
536 0 : for (tenant_shard_id, node_id, stripe_size) in notifications {
537 0 : let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
538 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
539 : }
540 :
541 0 : let this = self.clone();
542 0 : let cancel = cancel.clone();
543 0 :
544 0 : tokio::task::spawn(async move {
545 0 : // Construct an async stream of futures to invoke the compute notify function: we do this
546 0 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
547 0 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
548 0 : // would mostly just be waiting on that semaphore.
549 0 : let mut stream = futures::stream::iter(maybe_sends)
550 0 : .map(|(tenant_shard_id, maybe_send_result)| {
551 0 : let this = this.clone();
552 0 : let cancel = cancel.clone();
553 :
554 0 : async move {
555 0 : this
556 0 : .notify_execute(maybe_send_result, tenant_shard_id, &cancel)
557 0 : .await.map_err(|e| (tenant_shard_id, e))
558 0 : }.instrument(info_span!(
559 0 : "notify_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
560 : ))
561 0 : })
562 0 : .buffered(API_CONCURRENCY);
563 :
564 0 : loop {
565 0 : tokio::select! {
566 : next = stream.next() => {
567 : match next {
568 : Some(r) => {
569 : result_tx.send(r).await.ok();
570 : },
571 : None => {
572 : tracing::info!("Finished sending background compute notifications");
573 : break;
574 : }
575 : }
576 : },
577 : _ = cancel.cancelled() => {
578 : tracing::info!("Shutdown while running background compute notifications");
579 : break;
580 : }
581 0 : };
582 0 : }
583 0 : });
584 0 : }
585 :
586 : /// Call this to notify the compute (postgres) tier of new pageservers to use
587 : /// for a tenant. notify() is called by each shard individually, and this function
588 : /// will decide whether an update to the tenant is sent. An update is sent on the
589 : /// condition that:
590 : /// - We know a pageserver for every shard.
591 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
592 : ///
593 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
594 : /// that is cancelled.
595 : ///
596 : /// This function is fallible, including in the case that the control plane is transiently
597 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
598 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
599 : /// ensuring that they eventually call again to ensure that the compute is eventually notified of
600 : /// the proper pageserver nodes for a tenant.
601 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
602 : pub(super) async fn notify(
603 : &self,
604 : tenant_shard_id: TenantShardId,
605 : node_id: NodeId,
606 : stripe_size: ShardStripeSize,
607 : cancel: &CancellationToken,
608 : ) -> Result<(), NotifyError> {
609 : let maybe_send_result = self.notify_prepare(tenant_shard_id, node_id, stripe_size);
610 : self.notify_execute(maybe_send_result, tenant_shard_id, cancel)
611 : .await
612 : }
613 : }
614 :
615 : #[cfg(test)]
616 : pub(crate) mod tests {
617 : use pageserver_api::shard::{ShardCount, ShardNumber};
618 : use utils::id::TenantId;
619 :
620 : use super::*;
621 :
622 : #[test]
623 2 : fn tenant_updates() -> anyhow::Result<()> {
624 2 : let tenant_id = TenantId::generate();
625 2 : let mut tenant_state = ComputeHookTenant::new(
626 2 : TenantShardId {
627 2 : tenant_id,
628 2 : shard_count: ShardCount::new(0),
629 2 : shard_number: ShardNumber(0),
630 2 : },
631 2 : ShardStripeSize(12345),
632 2 : NodeId(1),
633 2 : );
634 2 :
635 2 : // An unsharded tenant is always ready to emit a notification, but won't
636 2 : // send the same one twice
637 2 : let send_result = tenant_state.maybe_send(tenant_id, None);
638 2 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
639 0 : anyhow::bail!("Wrong send result");
640 : };
641 2 : assert_eq!(request.shards.len(), 1);
642 2 : assert!(request.stripe_size.is_none());
643 :
644 : // Simulate successful send
645 2 : *guard = Some(request);
646 2 : drop(guard);
647 2 :
648 2 : // Try asking again: this should be a no-op
649 2 : let send_result = tenant_state.maybe_send(tenant_id, None);
650 2 : assert!(matches!(send_result, MaybeSendResult::Noop));
651 :
652 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
653 : // resets the tenant state and puts it in a non-notifying state (need to
654 : // see all shards)
655 2 : tenant_state.update(
656 2 : TenantShardId {
657 2 : tenant_id,
658 2 : shard_count: ShardCount::new(2),
659 2 : shard_number: ShardNumber(1),
660 2 : },
661 2 : ShardStripeSize(32768),
662 2 : NodeId(1),
663 2 : );
664 2 : assert!(matches!(
665 2 : tenant_state.maybe_send(tenant_id, None),
666 : MaybeSendResult::Noop
667 : ));
668 :
669 : // Writing the second shard makes it ready to notify
670 2 : tenant_state.update(
671 2 : TenantShardId {
672 2 : tenant_id,
673 2 : shard_count: ShardCount::new(2),
674 2 : shard_number: ShardNumber(0),
675 2 : },
676 2 : ShardStripeSize(32768),
677 2 : NodeId(1),
678 2 : );
679 2 :
680 2 : let send_result = tenant_state.maybe_send(tenant_id, None);
681 2 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
682 0 : anyhow::bail!("Wrong send result");
683 : };
684 2 : assert_eq!(request.shards.len(), 2);
685 2 : assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
686 :
687 : // Simulate successful send
688 2 : *guard = Some(request);
689 2 : drop(guard);
690 2 :
691 2 : Ok(())
692 2 : }
693 : }