use std::sync::Arc;
use std::{collections::HashMap, time::Duration};

use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
use control_plane::local_env::LocalEnv;
use hyper::{Method, StatusCode};
use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
use postgres_connection::parse_host_port;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use utils::{
    backoff::{self},
    id::{NodeId, TenantId},
};

use crate::service::Config;

const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);

const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

pub(crate) const API_CONCURRENCY: usize = 32;

struct UnshardedComputeHookTenant {
    // Which node is this tenant attached to
    node_id: NodeId,

    // Must hold this lock to send a notification.
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
}

struct ShardedComputeHookTenant {
    stripe_size: ShardStripeSize,
    shard_count: ShardCount,
    shards: Vec<(ShardNumber, NodeId)>,

    // Must hold this lock to send a notification. The contents represent
    // the last successfully sent notification, and are used to coalesce multiple
    // updates by only sending when there is a change since our last successful send.
    send_lock: Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>,
}

enum ComputeHookTenant {
    Unsharded(UnshardedComputeHookTenant),
    Sharded(ShardedComputeHookTenant),
}

impl ComputeHookTenant {
    /// Construct with at least one shard's information
    fn new(tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize, node_id: NodeId) -> Self {
        if tenant_shard_id.shard_count.count() > 1 {
            Self::Sharded(ShardedComputeHookTenant {
                shards: vec![(tenant_shard_id.shard_number, node_id)],
                stripe_size,
                shard_count: tenant_shard_id.shard_count,
                send_lock: Arc::default(),
            })
        } else {
            Self::Unsharded(UnshardedComputeHookTenant {
                node_id,
                send_lock: Arc::default(),
            })
        }
    }

    fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>> {
        match self {
            Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
            Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
        }
    }

    /// Set one shard's location. If stripe size or shard count have changed, Self is reset
    /// and drops existing content.
    fn update(
        &mut self,
        tenant_shard_id: TenantShardId,
        stripe_size: ShardStripeSize,
        node_id: NodeId,
    ) {
        match self {
            Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
                unsharded_tenant.node_id = node_id
            }
            Self::Sharded(sharded_tenant)
                if sharded_tenant.stripe_size == stripe_size
                    && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
            {
                if let Some(existing) = sharded_tenant
                    .shards
                    .iter()
                    .position(|s| s.0 == tenant_shard_id.shard_number)
                {
                    sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
                } else {
                    sharded_tenant
                        .shards
                        .push((tenant_shard_id.shard_number, node_id));
                    sharded_tenant.shards.sort_by_key(|s| s.0)
                }
            }
            _ => {
                // Shard count changed: reset struct.
                *self = Self::new(tenant_shard_id, stripe_size, node_id);
            }
        }
    }
}

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequestShard {
    node_id: NodeId,
    shard_number: ShardNumber,
}

/// Request body that we send to the control plane to notify it of where a tenant is attached
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
struct ComputeHookNotifyRequest {
    tenant_id: TenantId,
    stripe_size: Option<ShardStripeSize>,
    shards: Vec<ComputeHookNotifyRequestShard>,
}
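//
// For orientation, a hypothetical wire body for a two-shard tenant might look like the
// sketch below. It assumes serde's default field naming for the struct above and that the
// ID newtypes serialize to their inner values; the tenant ID shown is made up.
//
//   {
//     "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
//     "stripe_size": 32768,
//     "shards": [
//       { "node_id": 1, "shard_number": 0 },
//       { "node_id": 2, "shard_number": 1 }
//     ]
//   }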

/// Error type for attempts to call into the control plane compute notification hook
#[derive(thiserror::Error, Debug)]
pub(crate) enum NotifyError {
    // Request was not sent successfully, e.g. transport error
    #[error("Sending request: {0}")]
    Request(#[from] reqwest::Error),
    // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
    #[error("Control plane tenant busy")]
    Busy,
    // Explicit 429 response asking us to retry less frequently
    #[error("Control plane overloaded")]
    SlowDown,
    // A 503 response indicates the control plane can't handle the request right now
    #[error("Control plane unavailable (status {0})")]
    Unavailable(StatusCode),
    // API returned unexpected non-success status. We will retry, but log a warning.
    #[error("Control plane returned unexpected status {0}")]
    Unexpected(StatusCode),
    // We shut down while sending
    #[error("Shutting down")]
    ShuttingDown,
    // A response indicates we will never succeed, such as 400 or 404
    #[error("Non-retryable error {0}")]
    Fatal(StatusCode),
}

enum MaybeSendResult {
    // Please send this request while holding the lock, and if you succeed then write
    // the request into the lock.
    Transmit(
        (
            ComputeHookNotifyRequest,
            tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>,
        ),
    ),
    // Something requires sending, but you must wait for the current sender to finish, then call again
    AwaitLock(Arc<tokio::sync::Mutex<Option<ComputeHookNotifyRequest>>>),
    // Nothing requires sending
    Noop,
}

impl ComputeHookTenant {
    fn maybe_send(
        &self,
        tenant_id: TenantId,
        lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeHookNotifyRequest>>>,
    ) -> MaybeSendResult {
        let locked = match lock {
            Some(already_locked) => already_locked,
            None => {
                // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::state`] lock.
                let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
                    return MaybeSendResult::AwaitLock(self.get_send_lock().clone());
                };
                locked
            }
        };

        let request = match self {
            Self::Unsharded(unsharded_tenant) => Some(ComputeHookNotifyRequest {
                tenant_id,
                shards: vec![ComputeHookNotifyRequestShard {
                    shard_number: ShardNumber(0),
                    node_id: unsharded_tenant.node_id,
                }],
                stripe_size: None,
            }),
            Self::Sharded(sharded_tenant)
                if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
            {
                Some(ComputeHookNotifyRequest {
                    tenant_id,
                    shards: sharded_tenant
                        .shards
                        .iter()
                        .map(|(shard_number, node_id)| ComputeHookNotifyRequestShard {
                            shard_number: *shard_number,
                            node_id: *node_id,
                        })
                        .collect(),
                    stripe_size: Some(sharded_tenant.stripe_size),
                })
            }
            Self::Sharded(sharded_tenant) => {
                // Sharded tenant doesn't yet have information for all its shards

                tracing::info!(
                    "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
                    sharded_tenant.shards.len(),
                    sharded_tenant.shard_count.count()
                );
                None
            }
        };

        match request {
            None => {
                // Not yet ready to emit a notification
                tracing::info!("Tenant isn't yet ready to emit a notification");
                MaybeSendResult::Noop
            }
            Some(request) if Some(&request) == locked.as_ref() => {
                // No change from the last value successfully sent
                MaybeSendResult::Noop
            }
            Some(request) => MaybeSendResult::Transmit((request, locked)),
        }
    }
}

/// The compute hook is a destination for notifications about changes to tenant:pageserver
/// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
/// the compute connection string.
pub(super) struct ComputeHook {
    config: Config,
    state: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
    authorization_header: Option<String>,

    // Concurrency limiter, so that we do not overload the cloud control plane when updating
    // large numbers of tenants (e.g. when failing over after a node failure)
    api_concurrency: tokio::sync::Semaphore,

    // This lock is only used in testing environments, to serialize calls into neon_local
    neon_local_lock: tokio::sync::Mutex<()>,

    // We share a client across all notifications to enable connection re-use etc. when
    // sending large numbers of notifications
    client: reqwest::Client,
}

impl ComputeHook {
    pub(super) fn new(config: Config) -> Self {
        let authorization_header = config
            .control_plane_jwt_token
            .clone()
            .map(|jwt| format!("Bearer {}", jwt));

        let client = reqwest::ClientBuilder::new()
            .timeout(NOTIFY_REQUEST_TIMEOUT)
            .build()
            .expect("Failed to construct HTTP client");

        Self {
            state: Default::default(),
            config,
            authorization_header,
            neon_local_lock: Default::default(),
            api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
            client,
        }
    }

    /// For test environments: use neon_local's LocalEnv to update compute
    async fn do_notify_local(
        &self,
        reconfigure_request: &ComputeHookNotifyRequest,
    ) -> anyhow::Result<()> {
        // neon_local updates are not safe to call concurrently, use a lock to serialize
        // all calls to this function
        let _locked = self.neon_local_lock.lock().await;

        let env = match LocalEnv::load_config() {
            Ok(e) => e,
            Err(e) => {
                tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
                return Ok(());
            }
        };
        let cplane =
            ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
        let ComputeHookNotifyRequest {
            tenant_id,
            shards,
            stripe_size,
        } = reconfigure_request;

        let compute_pageservers = shards
            .iter()
            .map(|shard| {
                let ps_conf = env
                    .get_pageserver_conf(shard.node_id)
                    .expect("Unknown pageserver");
                let (pg_host, pg_port) = parse_host_port(&ps_conf.listen_pg_addr)
                    .expect("Unable to parse listen_pg_addr");
                (pg_host, pg_port.unwrap_or(5432))
            })
            .collect::<Vec<_>>();

        for (endpoint_name, endpoint) in &cplane.endpoints {
            if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
                tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
                endpoint
                    .reconfigure(compute_pageservers.clone(), *stripe_size)
                    .await?;
            }
        }

        Ok(())
    }

    async fn do_notify_iteration(
        &self,
        url: &String,
        reconfigure_request: &ComputeHookNotifyRequest,
        cancel: &CancellationToken,
    ) -> Result<(), NotifyError> {
        let req = self.client.request(Method::PUT, url);
        let req = if let Some(value) = &self.authorization_header {
            req.header(reqwest::header::AUTHORIZATION, value)
        } else {
            req
        };

        tracing::info!(
            "Sending notify request to {} ({:?})",
            url,
            reconfigure_request
        );
        let send_result = req.json(&reconfigure_request).send().await;
        let response = match send_result {
            Ok(r) => r,
            Err(e) => return Err(e.into()),
        };

        // Treat all 2xx responses as success
        if response.status() >= StatusCode::OK && response.status() < StatusCode::MULTIPLE_CHOICES {
            if response.status() != StatusCode::OK {
                // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
                // log a warning.
                tracing::warn!(
                    "Unexpected 2xx response code {} from control plane",
                    response.status()
                );
            }

            return Ok(());
        }

        // Error response codes
        match response.status() {
            StatusCode::TOO_MANY_REQUESTS => {
                // TODO: 429 handling should be global: set some state visible to other requests
                // so that they will delay before starting, rather than all notifications trying
                // once before backing off.
                tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
                    .await
                    .ok();
                Err(NotifyError::SlowDown)
            }
            StatusCode::LOCKED => {
                // We consider this fatal, because it's possible that the operation blocking the control plane
                // is also the one that is waiting for this reconcile. We should let the reconciler calling
                // this hook fail, to give the control plane a chance to un-lock.
                tracing::info!("Control plane reports tenant is locked, dropping out of notify");
                Err(NotifyError::Busy)
            }
            StatusCode::SERVICE_UNAVAILABLE
            | StatusCode::GATEWAY_TIMEOUT
            | StatusCode::BAD_GATEWAY => Err(NotifyError::Unavailable(response.status())),
            StatusCode::BAD_REQUEST | StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
                Err(NotifyError::Fatal(response.status()))
            }
            _ => Err(NotifyError::Unexpected(response.status())),
        }
    }

    async fn do_notify(
        &self,
        url: &String,
        reconfigure_request: &ComputeHookNotifyRequest,
        cancel: &CancellationToken,
    ) -> Result<(), NotifyError> {
        // We hold these semaphore units across all retries, rather than only across each
        // HTTP request: this is to preserve fairness and avoid a situation where a retry might
        // time out waiting for a semaphore.
        let _units = self
            .api_concurrency
            .acquire()
            .await
            // Interpret closed semaphore as shutdown
            .map_err(|_| NotifyError::ShuttingDown)?;

        backoff::retry(
            || self.do_notify_iteration(url, reconfigure_request, cancel),
            |e| {
                matches!(
                    e,
                    NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
                )
            },
            3,
            10,
            "Send compute notification",
            cancel,
        )
        .await
        .ok_or_else(|| NotifyError::ShuttingDown)
        .and_then(|x| x)
    }

    /// Call this to notify the compute (postgres) tier of new pageservers to use
    /// for a tenant. notify() is called by each shard individually, and this function
    /// will decide whether an update to the tenant is sent. An update is sent on the
    /// condition that:
    /// - We know a pageserver for every shard.
    /// - All the shards have the same shard_count (i.e. we are not mid-split)
    ///
    /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
    /// that is cancelled.
    ///
    /// This function is fallible, including in the case that the control plane is transiently
    /// unavailable. A limited number of retries are done internally to hide short unavailability
    /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
    /// calling again until the compute is notified of the proper pageserver nodes for the tenant.
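    ///
    /// A hypothetical call site, as an illustrative sketch only (the surrounding names are
    /// assumed, not taken from a real caller):
    ///
    /// ```ignore
    /// // Inside a reconciler, after attaching a shard to `node_id`:
    /// if let Err(e) = compute_hook
    ///     .notify(tenant_shard_id, node_id, stripe_size, &cancel)
    ///     .await
    /// {
    ///     // On failure, the caller must arrange to call notify() again later.
    ///     tracing::warn!("Compute notification failed: {e}");
    /// }
    /// ```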
    #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
    pub(super) async fn notify(
        &self,
        tenant_shard_id: TenantShardId,
        node_id: NodeId,
        stripe_size: ShardStripeSize,
        cancel: &CancellationToken,
    ) -> Result<(), NotifyError> {
        let maybe_send_result = {
            let mut state_locked = self.state.lock().unwrap();

            use std::collections::hash_map::Entry;
            let tenant = match state_locked.entry(tenant_shard_id.tenant_id) {
                Entry::Vacant(e) => e.insert(ComputeHookTenant::new(
                    tenant_shard_id,
                    stripe_size,
                    node_id,
                )),
                Entry::Occupied(e) => {
                    let tenant = e.into_mut();
                    tenant.update(tenant_shard_id, stripe_size, node_id);
                    tenant
                }
            };
            tenant.maybe_send(tenant_shard_id.tenant_id, None)
        };

        // Process result: we may get an update to send, or we may have to wait for a lock
        // before trying again.
        let (request, mut send_lock_guard) = match maybe_send_result {
            MaybeSendResult::Noop => {
                return Ok(());
            }
            MaybeSendResult::AwaitLock(send_lock) => {
                let send_locked = send_lock.lock_owned().await;

                // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
                // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
                // try_lock.
                let state_locked = self.state.lock().unwrap();
                let Some(tenant) = state_locked.get(&tenant_shard_id.tenant_id) else {
                    return Ok(());
                };
                match tenant.maybe_send(tenant_shard_id.tenant_id, Some(send_locked)) {
                    MaybeSendResult::AwaitLock(_) => {
                        unreachable!("We supplied lock guard")
                    }
                    MaybeSendResult::Noop => {
                        return Ok(());
                    }
                    MaybeSendResult::Transmit((request, lock)) => (request, lock),
                }
            }
            MaybeSendResult::Transmit((request, lock)) => (request, lock),
        };

        let result = if let Some(notify_url) = &self.config.compute_hook_url {
            self.do_notify(notify_url, &request, cancel).await
        } else {
            self.do_notify_local(&request).await.map_err(|e| {
                // This path is for testing only, so munge the error into our prod-style error type.
                tracing::error!("Local notification hook failed: {e}");
                NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
            })
        };

        if result.is_ok() {
            // Before dropping the send lock, stash the request we just sent so that
            // subsequent callers can avoid redundantly re-sending the same thing.
            *send_lock_guard = Some(request);
        }
        result
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use pageserver_api::shard::{ShardCount, ShardNumber};
    use utils::id::TenantId;

    use super::*;

    #[test]
    fn tenant_updates() -> anyhow::Result<()> {
        let tenant_id = TenantId::generate();
        let mut tenant_state = ComputeHookTenant::new(
            TenantShardId {
                tenant_id,
                shard_count: ShardCount::new(0),
                shard_number: ShardNumber(0),
            },
            ShardStripeSize(12345),
            NodeId(1),
        );

        // An unsharded tenant is always ready to emit a notification, but won't
        // send the same one twice
        let send_result = tenant_state.maybe_send(tenant_id, None);
        let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
            anyhow::bail!("Wrong send result");
        };
        assert_eq!(request.shards.len(), 1);
        assert!(request.stripe_size.is_none());

        // Simulate successful send
        *guard = Some(request);
        drop(guard);

        // Try asking again: this should be a no-op
        let send_result = tenant_state.maybe_send(tenant_id, None);
        assert!(matches!(send_result, MaybeSendResult::Noop));

        // Writing the first shard of a multi-sharded situation (i.e. in a split)
        // resets the tenant state and puts it in a non-notifying state (need to
        // see all shards)
        tenant_state.update(
            TenantShardId {
                tenant_id,
                shard_count: ShardCount::new(2),
                shard_number: ShardNumber(1),
            },
            ShardStripeSize(32768),
            NodeId(1),
        );
        assert!(matches!(
            tenant_state.maybe_send(tenant_id, None),
            MaybeSendResult::Noop
        ));

        // Writing the second shard makes it ready to notify
        tenant_state.update(
            TenantShardId {
                tenant_id,
                shard_count: ShardCount::new(2),
                shard_number: ShardNumber(0),
            },
            ShardStripeSize(32768),
            NodeId(1),
        );

        let send_result = tenant_state.maybe_send(tenant_id, None);
        let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
            anyhow::bail!("Wrong send result");
        };
        assert_eq!(request.shards.len(), 2);
        assert_eq!(request.stripe_size, Some(ShardStripeSize(32768)));

        // Simulate successful send
        *guard = Some(request);
        drop(guard);

        Ok(())
    }
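
    // Additional illustrative test: a minimal sketch added alongside the original
    // `tenant_updates` test (not part of the original suite). It exercises the coalescing
    // behaviour for the unsharded case: after a successful send, moving the tenant to a
    // different node should re-arm the notification rather than being treated as a no-op.
    #[test]
    fn unsharded_update_renotifies() -> anyhow::Result<()> {
        let tenant_id = TenantId::generate();
        let mut tenant_state = ComputeHookTenant::new(
            TenantShardId {
                tenant_id,
                shard_count: ShardCount::new(0),
                shard_number: ShardNumber(0),
            },
            ShardStripeSize(12345),
            NodeId(1),
        );

        // The first notification is ready immediately; simulate a successful send.
        let MaybeSendResult::Transmit((request, mut guard)) =
            tenant_state.maybe_send(tenant_id, None)
        else {
            anyhow::bail!("Wrong send result");
        };
        *guard = Some(request);
        drop(guard);

        // Re-attach the tenant to a different node: a fresh notification is due.
        tenant_state.update(
            TenantShardId {
                tenant_id,
                shard_count: ShardCount::new(0),
                shard_number: ShardNumber(0),
            },
            ShardStripeSize(12345),
            NodeId(2),
        );
        assert!(matches!(
            tenant_state.maybe_send(tenant_id, None),
            MaybeSendResult::Transmit(_)
        ));

        Ok(())
    }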
}