Line data Source code
1 : use std::sync::Arc;
2 : use std::{collections::HashMap, time::Duration};
3 :
4 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
5 : use control_plane::local_env::LocalEnv;
6 : use hyper::{Method, StatusCode};
7 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
8 : use postgres_connection::parse_host_port;
9 : use serde::{Deserialize, Serialize};
10 : use tokio_util::sync::CancellationToken;
11 : use utils::{
12 : backoff::{self},
13 : id::{NodeId, TenantId},
14 : };
15 :
16 : use crate::service::Config;
17 :
18 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
19 :
20 : pub(crate) const API_CONCURRENCY: usize = 32;
21 :
22 : struct UnshardedComputeHookTenant {
23 : // Which node is this tenant attached to
24 : node_id: NodeId,
25 :
26 : // Must hold this lock to send a notification.
27 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
28 : }
29 : struct ShardedComputeHookTenant {
30 : stripe_size: ShardStripeSize,
31 : shard_count: ShardCount,
32 : shards: Vec<(ShardNumber, NodeId)>,
33 :
34 : // Must hold this lock to send a notification. The contents represent
35 : // the last successfully sent notification, and are used to coalesce multiple
36 : // updates by only sending when there is a change since our last successful send.
37 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
38 : }
39 :
40 : enum ComputeHookTenant {
41 : Unsharded(UnshardedComputeHookTenant),
42 : Sharded(ShardedComputeHookTenant),
43 : }
44 :
45 : impl ComputeHookTenant {
46 : /// Construct with at least one shard's information
47 4 : fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
48 4 : if tenant_shard_id.shard_count.count() > 1 {
49 2 : Self::Sharded(ShardedComputeHookTenant {
50 2 : shards: vec![(tenant_shard_id.shard_number, node_id)],
51 2 : stripe_size,
52 2 : shard_count: tenant_shard_id.shard_count,
53 2 : send_lock: Arc::default(),
54 2 : })
55 : } else {
56 2 : Self::Unsharded(UnshardedComputeHookTenant {
57 2 : node_id,
58 2 : send_lock: Arc::default(),
59 2 : })
60 : }
61 4 : }
62 :
63 8 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
64 8 : match self {
65 4 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
66 4 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
67 : }
68 8 : }
69 :
70 : /// Set one shard's location. If the stripe size or shard count has changed, Self is reset
71 : /// and drops its existing content.
72 4 : fn update(
73 4 : &mut self,
74 4 : tenant_shard_id: TenantShardId,
75 4 : stripe_size: ShardStripeSize,
76 4 : node_id: NodeId,
77 4 : ) {
78 2 : match self {
79 2 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
80 0 : unsharded_tenant.node_id = node_id
81 : }
82 2 : Self::Sharded(sharded_tenant)
83 2 : if sharded_tenant.stripe_size == stripe_size
84 2 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
85 : {
86 2 : if let Some(existing) = sharded_tenant
87 2 : .shards
88 2 : .iter()
89 2 : .position(|s| s.0 == tenant_shard_id.shard_number)
90 0 : {
91 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
92 0 : } else {
93 2 : sharded_tenant
94 2 : .shards
95 2 : .push((tenant_shard_id.shard_number, node_id));
96 4 : sharded_tenant.shards.sort_by_key(|s| s.0)
97 : }
98 : }
99 2 : _ => {
100 2 : // Shard count or stripe size changed: reset the struct.
101 2 : *self = Self::new(tenant_shard_id, stripe_size, node_id);
102 2 : }
103 : }
104 4 : }
105 : }
106 :
107 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
108 : struct ComputeHookNotifyRequestShard {
109 : node_id: NodeId,
110 : shard_number: ShardNumber,
111 : }
112 :
113 : /// Request body that we send to the control plane to notify it of where a tenant is attached
114 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
115 : struct ComputeHookNotifyRequest {
116 : tenant_id: TenantId,
117 : stripe_size: Option<ShardStripeSize>,
118 : shards: Vec<ComputeHookNotifyRequestShard>,
119 : }
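//
// Illustrative only (not part of the original source): assuming the default serde derive
// behavior for these types (TenantId as a hex string; NodeId, ShardNumber and
// ShardStripeSize as plain integers), the serialized request body for a two-shard tenant
// would look roughly like the following, with hypothetical values:
//
//   {
//     "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
//     "stripe_size": 32768,
//     "shards": [
//       { "node_id": 1, "shard_number": 0 },
//       { "node_id": 2, "shard_number": 1 }
//     ]
//   }
//
// For an unsharded tenant, `shards` has a single entry and `stripe_size` is null.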
120 :
121 : /// Error type for attempts to call into the control plane compute notification hook
122 0 : #[derive(thiserror::Error, Debug)]
123 : pub(crate) enum NotifyError {
124 : // Request was not sent successfully, e.g. a transport error
125 : #[error("Sending request: {0}")]
126 : Request(#[from] reqwest::Error),
127 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
128 : #[error("Control plane tenant busy")]
129 : Busy,
130 : // Explicit 429 response asking us to retry less frequently
131 : #[error("Control plane overloaded")]
132 : SlowDown,
133 : // A 502, 503 or 504 response indicates the control plane can't handle the request right now
134 : #[error("Control plane unavailable (status {0})")]
135 : Unavailable(StatusCode),
136 : // API returned unexpected non-success status. We will retry, but log a warning.
137 : #[error("Control plane returned unexpected status {0}")]
138 : Unexpected(StatusCode),
139 : // We shut down while sending
140 : #[error("Shutting down")]
141 : ShuttingDown,
142 : // A response indicates we will never succeed, such as 400 or 404
143 : #[error("Non-retryable error {0}")]
144 : Fatal(StatusCode),
145 : }
146 :
147 : enum MaybeSendResult {
148 : // Please send this request while holding the lock, and if you succeed then write
149 : // the request into the lock.
150 : Transmit(
151 : (
152 : ComputeHookNotifyRequest,
153 : tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
154 : ),
155 : ),
156 : // Something requires sending, but you must wait for the current sender to finish, then call again
157 : AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
158 : // Nothing requires sending
159 : Noop,
160 : }
161 :
162 : impl ComputeHookTenant {
163 8 : fn maybe_send(
164 8 : &self,
165 8 : tenant_id: TenantId,
166 8 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
167 8 : ) -> MaybeSendResult {
168 8 : let locked = match lock {
169 0 : Some(already_locked) => already_locked,
170 : None => {
171 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
172 8 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
173 0 : return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
174 : };
175 8 : locked
176 : }
177 : };
178 :
179 8 : let request = match self {
180 4 : Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
181 4 : tenant_id,
182 4 : shards: vec![ComputeHookNotifyRequestShard {
183 4 : shard_number: ShardNumber(0),
184 4 : node_id: unsharded_tenant.node_id,
185 4 : }],
186 4 : stripe_size: None,
187 4 : }),
188 4 : Self::Sharded(sharded_tenant)
189 4 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
190 2 : {
191 2 : Some(ComputeHookNotifyRequest {
192 2 : tenant_id,
193 2 : shards: sharded_tenant
194 2 : .shards
195 2 : .iter()
196 4 : .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
197 4 : shard_number: *shard_number,
198 4 : node_id: *node_id,
199 4 : })
200 2 : .collect(),
201 2 : stripe_size: Some(sharded_tenant.stripe_size),
202 2 : })
203 : }
204 2 : Self::Sharded(sharded_tenant) => {
205 2 : // Sharded tenant doesn't yet have information for all its shards
206 2 :
207 2 : tracing::info!(
208 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
209 0 : sharded_tenant.shards.len(),
210 0 : sharded_tenant.shard_count.count()
211 0 : );
212 2 : None
213 : }
214 : };
215 :
216 6 : match request {
217 : None => {
218 : // Not yet ready to emit a notification
219 2 : tracing::info!("Tenant isn't yet ready to emit a notification");
220 2 : MaybeSendResult::Noop
221 : }
222 6 : Some(request) if Some(&request) == locked.as_ref() => {
223 2 : // No change from the last value successfully sent
224 2 : MaybeSendResult::Noop
225 : }
226 4 : Some(request) => MaybeSendResult::Transmit((request, locked)),
227 : }
228 8 : }
229 : }
230 :
231 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
232 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
233 : /// the compute connection string.
234 : pub(super) struct ComputeHook {
235 : config: Config,
236 : state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
237 : authorization_header: Option<String>,
238 :
239 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
240 : // large numbers of tenants (e.g. when failing over after a node failure)
241 : api_concurrency: tokio::sync::Semaphore,
242 :
243 : // This lock is only used in testing environments, to serialize calls into neon_local
244 : neon_local_lock: tokio::sync::Mutex<()>,
245 : }
246 :
247 : impl ComputeHook {
248 0 : pub(super) fn new(config: Config) -> Self {
249 0 : let authorization_header = config
250 0 : .control_plane_jwt_token
251 0 : .clone()
252 0 : .map(|jwt| format!("Bearer {}", jwt));
253 0 :
254 0 : Self {
255 0 : state: Default::default(),
256 0 : config,
257 0 : authorization_header,
258 0 : neon_local_lock: Default::default(),
259 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
260 0 : }
261 0 : }
262 :
263 : /// For test environments: use neon_local's LocalEnv to update compute
264 0 : async fn do_notify_local(
265 0 : &self,
266 0 : reconfigure_request: &ComputeHookNotifyRequest,
267 0 : ) -> anyhow::Result<()> {
268 : // neon_local updates are not safe to run concurrently; use a lock to serialize
269 : // all calls to this function
270 0 : let _locked = self.neon_local_lock.lock().await;
271 :
272 0 : let env = match LocalEnv::load_config() {
273 0 : Ok(e) => e,
274 0 : Err(e) => {
275 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
276 0 : return Ok(());
277 : }
278 : };
279 0 : let cplane =
280 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
281 0 : let ComputeHookNotifyRequest {
282 0 : tenant_id,
283 0 : shards,
284 0 : stripe_size,
285 0 : } = reconfigure_request;
286 0 :
287 0 : let compute_pageservers = shards
288 0 : .iter()
289 0 : .map(|shard| {
290 0 : let ps_conf = env
291 0 : .get_pageserver_conf(shard.node_id)
292 0 : .expect("Unknown pageserver");
293 0 : let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
294 0 : .expect("Unable to parse listen_pg_addr");
295 0 : (pg_host, pg_port.unwrap_or(5432))
296 0 : })
297 0 : .collect::<Vec<_>>();
298 :
299 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
300 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
301 0 : tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
302 0 : endpoint
303 0 : .reconfigure(compute_pageservers.clone(), *stripe_size)
304 0 : .await?;
305 0 : }
306 : }
307 :
308 0 : Ok(())
309 0 : }
310 :
311 0 : async fn do_notify_iteration(
312 0 : &self,
313 0 : client: &reqwest::Client,
314 0 : url: &String,
315 0 : reconfigure_request: &ComputeHookNotifyRequest,
316 0 : cancel: &CancellationToken,
317 0 : ) -> Result<(), NotifyError> {
318 0 : let req = client.request(Method::PUT, url);
319 0 : let req = if let Some(value) = &self.authorization_header {
320 0 : req.header(reqwest::header::AUTHORIZATION, value)
321 : } else {
322 0 : req
323 : };
324 :
325 0 : tracing::info!(
326 0 : "Sending notify request to {} ({:?})",
327 0 : url,
328 0 : reconfigure_request
329 0 : );
330 0 : let send_result = req.json(&reconfigure_request).send().await;
331 0 : let response = match send_result {
332 0 : Ok(r) => r,
333 0 : Err(e) => return Err(e.into()),
334 : };
335 :
336 : // Treat all 2xx responses as success
337 0 : if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES {
338 0 : if response.status() != StatusCode::OK {
339 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
340 : // log a warning.
341 0 : tracing::warn!(
342 0 : "Unexpected 2xx response code {} from control plane",
343 0 : response.status()
344 0 : );
345 0 : }
346 :
347 0 : return Ok(());
348 0 : }
349 0 :
350 0 : // Error response codes
351 0 : match response.status() {
352 : StatusCode::TOO_MANY_REQUESTS => {
353 : // TODO: 429 handling should be global: set some state visible to other requests
354 : // so that they will delay before starting, rather than all notifications trying
355 : // once before backing off.
356 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
357 0 : .await
358 0 : .ok();
359 0 : Err(NotifyError::SlowDown)
360 : }
361 : StatusCode::LOCKED => {
362 : // We consider this fatal, because it's possible that the operation blocking the control plane is
363 : // also the one that is waiting for this reconcile. We should let the reconciler calling
364 : // this hook fail, to give the control plane a chance to un-lock.
365 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
366 0 : Err(NotifyError::Busy)
367 : }
368 : StatusCode::SERVICE_UNAVAILABLE
369 : | StatusCode::GATEWAY_TIMEOUT
370 0 : | StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())),
371 : StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
372 0 : Err(NotifyError::Fatal(response.status()))
373 : }
374 0 : _ => Err(NotifyError::Unexpected(response.status())),
375 : }
376 0 : }
377 :
378 0 : async fn do_notify(
379 0 : &self,
380 0 : url: &String,
381 0 : reconfigure_request: &ComputeHookNotifyRequest,
382 0 : cancel: &CancellationToken,
383 0 : ) -> Result<(), NotifyError> {
384 0 : let client = reqwest::Client::new();
385 :
386 : // We hold these semaphore units across all retries, rather than only across each
387 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
388 : // time out waiting for a semaphore.
389 0 : let _units = self
390 0 : .api_concurrency
391 0 : .acquire()
392 0 : .await
393 : // Interpret closed semaphore as shutdown
394 0 : .map_err(|_| NotifyError::ShuttingDown)?;
395 :
396 0 : backoff::retry(
397 0 : || self.do_notify_iteration(&client, url, reconfigure_request, cancel),
398 0 : |e| {
399 0 : matches!(
400 0 : e,
401 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
402 : )
403 0 : },
404 0 : 3,
405 0 : 10,
406 0 : "Send compute notification",
407 0 : cancel,
408 0 : )
409 0 : .await
410 0 : .ok_or_else(|| NotifyError::ShuttingDown)
411 0 : .and_then(|x| x)
412 0 : }
413 :
414 : /// Call this to notify the compute (postgres) tier of new pageservers to use
415 : /// for a tenant. notify() is called by each shard individually, and this function
416 : /// will decide whether an update to the tenant is sent. An update is sent on the
417 : /// condition that:
418 : /// - We know a pageserver for every shard.
419 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
420 : ///
421 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
422 : /// that is cancelled.
423 : ///
424 : /// This function is fallible, including in the case that the control plane is transiently
425 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
426 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
427 : /// for calling again later, to ensure that the compute is eventually notified of the correct
428 : /// pageserver nodes for a tenant.
429 0 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
430 : pub(super) async fn notify(
431 : &self,
432 : tenant_shard_id: TenantShardId,
433 : node_id: NodeId,
434 : stripe_size: ShardStripeSize,
435 : cancel: &CancellationToken,
436 : ) -> Result<(), NotifyError> {
437 : let maybe_send_result = {
438 : let mut state_locked = self.state.lock().unwrap();
439 :
440 : use std::collections::hash_map::Entry;
441 : let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
442 : Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
443 : tenant_shard_id,
444 : stripe_size,
445 : node_id,
446 : )),
447 : Entry::Occupied(e) => {
448 : let tenant = e.into_mut();
449 : tenant.update(tenant_shard_id, stripe_size, node_id);
450 : tenant
451 : }
452 : };
453 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
454 : };
455 :
456 : // Process result: we may get an update to send, or we may have to wait for a lock
457 : // before trying again.
458 : let (request, mut send_lock_guard) = match maybe_send_result {
459 : MaybeSendResult::Noop => {
460 : return Ok(());
461 : }
462 : MaybeSendResult::AwaitLock(send_lock) => {
463 : let send_locked = send_lock.lock_owned().await;
464 :
465 : // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
466 : // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
467 : // try_lock.
468 : let state_locked = self.state.lock().unwrap();
469 : let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
470 : return Ok(());
471 : };
472 : match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
473 : MaybeSendResult::AwaitLock(_) => {
474 : unreachable!("We supplied lock guard")
475 : }
476 : MaybeSendResult::Noop => {
477 : return Ok(());
478 : }
479 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
480 : }
481 : }
482 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
483 : };
484 :
485 : let result = if let Some(notify_url) = &self.config.compute_hook_url {
486 : self.do_notify(notify_url, &request, cancel).await
487 : } else {
488 0 : self.do_notify_local(&request).await.map_err(|e| {
489 0 : // This path is for testing only, so munge the error into our prod-style error type.
490 0 : tracing::error!("Local notification hook failed: {e}");
491 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
492 0 : })
493 : };
494 :
495 : if result.is_ok() {
496 : // Before dropping the send lock, stash the request we just sent so that
497 : // subsequent callers can avoid redundantly re-sending the same thing.
498 : *send_lock_guard = Some(request);
499 : }
500 : result
501 : }
502 : }
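// Illustrative usage sketch (hypothetical caller, not part of this file): a reconciler that
// has just attached a tenant shard to a pageserver would call notify() and handle the
// fallible result itself, e.g.:
//
//     compute_hook
//         .notify(tenant_shard_id, node_id, stripe_size, &cancel)
//         .await?;
//
// The variable names above are assumptions for illustration; per the doc comment on
// notify(), the caller owns retrying until the compute is eventually notified.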
503 :
504 : #[cfg(test)]
505 : pub(crate) mod tests {
506 : use pageserver_api::shard::{ShardCount, ShardNumber};
507 : use utils::id::TenantId;
508 :
509 : use super::*;
510 :
511 : #[test]
512 2 : fn tenant_updates() -> anyhow::Result<()> {
513 2 : let tenant_id = TenantId::generate();
514 2 : let mut tenant_state = ComputeHookTenant::new(
515 2 : TenantShardId {
516 2 : tenant_id,
517 2 : shard_count: ShardCount::new(0),
518 2 : shard_number: ShardNumber(0),
519 2 : },
520 2 : ShardStripeSize(12345),
521 2 : NodeId(1),
522 2 : );
523 2 :
524 2 : // An unsharded tenant is always ready to emit a notification, but won't
525 2 : // send the same one twice
526 2 : let send_result = tenant_state.maybe_send(tenant_id, None);
527 2 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
528 0 : anyhow::bail!("Wrong send result");
529 : };
530 2 : assert_eq!(request.shards.len(), 1);
531 2 : assert!(request.stripe_size.is_none());
532 :
533 : // Simulate successful send
534 2 : *guard = Some(request);
535 2 : drop(guard);
536 2 :
537 2 : // Try asking again: this should be a no-op
538 2 : let send_result = tenant_state.maybe_send(tenant_id, None);
539 2 : assert!(matches!(send_result, MaybeSendResult::Noop));
540 :
541 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
542 : // resets the tenant state and puts it in a non-notifying state (need to
543 : // see all shards)
544 2 : tenant_state.update(
545 2 : TenantShardId {
546 2 : tenant_id,
547 2 : shard_count: ShardCount::new(2),
548 2 : shard_number: ShardNumber(1),
549 2 : },
550 2 : ShardStripeSize(32768),
551 2 : NodeId(1),
552 2 : );
553 2 : assert!(matches!(
554 2 : tenant_state.maybe_send(tenant_id, None),
555 : MaybeSendResult::Noop
556 : ));
557 :
558 : // Writing the second shard makes it ready to notify
559 2 : tenant_state.update(
560 2 : TenantShardId {
561 2 : tenant_id,
562 2 : shard_count: ShardCount::new(2),
563 2 : shard_number: ShardNumber(0),
564 2 : },
565 2 : ShardStripeSize(32768),
566 2 : NodeId(1),
567 2 : );
568 2 :
569 2 : let send_result = tenant_state.maybe_send(tenant_id, None);
570 2 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
571 0 : anyhow::bail!("Wrong send result");
572 : };
573 2 : assert_eq!(request.shards.len(), 2);
574 2 : assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));
575 :
576 : // Simulate successful send
577 2 : *guard = Some(request);
578 2 : drop(guard);
579 2 :
580 2 : Ok(())
581 2 : }
582 : }