Line data Source code
1 : use std::borrow::Cow;
2 : use std::collections::HashMap;
3 : use std::error::Error as _;
4 : use std::sync::Arc;
5 : use std::time::Duration;
6 :
7 : use anyhow::Context;
8 : use compute_api::spec::PageserverProtocol;
9 : use control_plane::endpoint::{ComputeControlPlane, EndpointStatus};
10 : use control_plane::local_env::LocalEnv;
11 : use futures::StreamExt;
12 : use hyper::StatusCode;
13 : use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
14 : use pageserver_api::controller_api::AvailabilityZone;
15 : use pageserver_api::shard::{ShardCount, ShardNumber, ShardStripeSize, TenantShardId};
16 : use postgres_connection::parse_host_port;
17 : use safekeeper_api::membership::SafekeeperGeneration;
18 : use serde::{Deserialize, Serialize};
19 : use tokio_util::sync::CancellationToken;
20 : use tracing::{Instrument, info_span};
21 : use utils::backoff::{self};
22 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
23 :
24 : use crate::service::Config;
25 :
26 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
27 :
28 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
29 :
30 : pub(crate) const API_CONCURRENCY: usize = 32;
31 :
32 : struct UnshardedComputeHookTenant {
33 : // Which node is this tenant attached to
34 : node_id: NodeId,
35 :
36 : // The tenant's preferred AZ, so that we may pass this on to the control plane
37 : preferred_az: Option<AvailabilityZone>,
38 :
39 : // Must hold this lock to send a notification.
40 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
41 : }
42 : struct ShardedComputeHookTenant {
43 : stripe_size: ShardStripeSize,
44 : shard_count: ShardCount,
45 : shards: Vec<(ShardNumber, NodeId)>,
46 :
47 : // The tenant's preferred AZ, so that we may pass this on to the control plane
48 : preferred_az: Option<AvailabilityZone>,
49 :
50 : // Must hold this lock to send a notification. The contents represent
51 : // the last successfully sent notification, and are used to coalesce multiple
52 : // updates by only sending when there is a change since our last successful send.
53 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
54 : }
55 :
56 : /// Represents our knowledge of the compute's state: we can update this when we get a
57 : /// response from a notify API call, which tells us what has been applied.
58 : ///
59 : /// Should be wrapped in an Option<>, as we cannot always know the remote state.
60 : #[derive(PartialEq, Eq, Debug)]
61 : struct ComputeRemoteState<R> {
62 : // The request body which was acked by the compute
63 : request: R,
64 :
65 : // Whether the cplane indicated that the state was applied to running computes, or just
66 : // persisted. In the Neon control plane, this is the difference between a 423 response (meaning
67 : // persisted but not applied), and a 2xx response (both persisted and applied)
68 : applied: bool,
69 : }
70 :
71 : type ComputeRemoteTenantState = ComputeRemoteState<NotifyAttachRequest>;
72 : type ComputeRemoteTimelineState = ComputeRemoteState<NotifySafekeepersRequest>;
73 :
74 : /// The trait which defines the handler-specific types and methods.
75 : /// We have two implementations of this trait so far:
76 : /// - [`ComputeHookTenant`] for tenant attach notifications ("/notify-attach")
77 : /// - [`ComputeHookTimeline`] for safekeeper change notifications ("/notify-safekeepers")
78 : trait ApiMethod {
79 : /// Type of the key which identifies the resource.
80 : /// It's either TenantId for tenant attach notifications,
81 : /// or TenantTimelineId for safekeeper change notifications.
82 : type Key: std::cmp::Eq + std::hash::Hash + Clone;
83 :
84 : type Request: serde::Serialize + std::fmt::Debug;
85 :
86 : const API_PATH: &'static str;
87 :
88 : fn maybe_send(
89 : &self,
90 : key: Self::Key,
91 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<Self::Request>>>>,
92 : ) -> MaybeSendResult<Self::Request, Self::Key>;
93 :
94 : async fn notify_local(
95 : env: &LocalEnv,
96 : cplane: &ComputeControlPlane,
97 : req: &Self::Request,
98 : ) -> Result<(), NotifyError>;
99 : }
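//
// Both implementations are driven the same way (see `ComputeHook` further below):
//  1. a synchronous "prepare" step updates the in-memory state under the `tenants`/`timelines`
//     map lock and calls `maybe_send`, which may only `try_lock` the send lock;
//  2. `notify_execute` awaits the send lock if needed, re-checks `maybe_send`, and transmits the
//     request to `{control_plane_url}/{API_PATH}`, or via neon_local in test environments;
//  3. the outcome is recorded as a `ComputeRemoteState` behind the send lock, so that identical
//     follow-up updates coalesce into `MaybeSendResult::Noop`.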
100 :
101 : enum ComputeHookTenant {
102 : Unsharded(UnshardedComputeHookTenant),
103 : Sharded(ShardedComputeHookTenant),
104 : }
105 :
106 : impl ComputeHookTenant {
107 : /// Construct with at least one shard's information
108 2 : fn new(
109 2 : tenant_shard_id: TenantShardId,
110 2 : stripe_size: ShardStripeSize,
111 2 : preferred_az: Option<AvailabilityZone>,
112 2 : node_id: NodeId,
113 2 : ) -> Self {
114 2 : if tenant_shard_id.shard_count.count() > 1 {
115 1 : Self::Sharded(ShardedComputeHookTenant {
116 1 : shards: vec![(tenant_shard_id.shard_number, node_id)],
117 1 : stripe_size,
118 1 : shard_count: tenant_shard_id.shard_count,
119 1 : preferred_az,
120 1 : send_lock: Arc::default(),
121 1 : })
122 : } else {
123 1 : Self::Unsharded(UnshardedComputeHookTenant {
124 1 : node_id,
125 1 : preferred_az,
126 1 : send_lock: Arc::default(),
127 1 : })
128 : }
129 2 : }
130 :
131 4 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>> {
132 4 : match self {
133 2 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
134 2 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
135 : }
136 4 : }
137 :
138 0 : fn is_sharded(&self) -> bool {
139 0 : matches!(self, ComputeHookTenant::Sharded(_))
140 0 : }
141 :
142 : /// Clear compute hook state for the specified shard.
143 : /// Only valid for [`ComputeHookTenant::Sharded`] instances.
144 0 : fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
145 0 : match self {
146 0 : ComputeHookTenant::Sharded(sharded) => {
147 0 : if sharded.stripe_size != stripe_size
148 0 : || sharded.shard_count != tenant_shard_id.shard_count
149 : {
150 0 : tracing::warn!("Shard split detected while handling detach")
151 0 : }
152 :
153 0 : let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
154 0 : *shard_number == tenant_shard_id.shard_number
155 0 : });
156 :
157 0 : if let Some(shard_idx) = shard_idx {
158 0 : sharded.shards.remove(shard_idx);
159 0 : } else {
160 : // This is a valid but niche case, where the tenant was previously attached
161 : // as a Secondary location and then detached, so has no previously notified
162 : // state.
163 0 : tracing::info!("Shard not found while handling detach")
164 : }
165 : }
166 : ComputeHookTenant::Unsharded(_) => {
167 0 : unreachable!("Detach of unsharded tenants is handled externally");
168 : }
169 : }
170 0 : }
171 :
172 : /// Set one shard's location. If stripe size or shard count have changed, Self is reset
173 : /// and any existing content is dropped.
174 2 : fn update(&mut self, shard_update: ShardUpdate) {
175 2 : let tenant_shard_id = shard_update.tenant_shard_id;
176 2 : let node_id = shard_update.node_id;
177 2 : let stripe_size = shard_update.stripe_size;
178 2 : let preferred_az = shard_update.preferred_az;
179 :
180 1 : match self {
181 1 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
182 0 : unsharded_tenant.node_id = node_id;
183 0 : if unsharded_tenant.preferred_az.as_ref()
184 0 : != preferred_az.as_ref().map(|az| az.as_ref())
185 : {
186 0 : unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
187 0 : }
188 : }
189 1 : Self::Sharded(sharded_tenant)
190 1 : if sharded_tenant.stripe_size == stripe_size
191 1 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
192 : {
193 1 : if let Some(existing) = sharded_tenant
194 1 : .shards
195 1 : .iter()
196 1 : .position(|s| s.0 == tenant_shard_id.shard_number)
197 0 : {
198 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
199 0 : } else {
200 1 : sharded_tenant
201 1 : .shards
202 1 : .push((tenant_shard_id.shard_number, node_id));
203 1 : sharded_tenant.shards.sort_by_key(|s| s.0)
204 : }
205 :
206 1 : if sharded_tenant.preferred_az.as_ref()
207 1 : != preferred_az.as_ref().map(|az| az.as_ref())
208 : {
209 0 : sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
210 1 : }
211 : }
212 : _ => {
213 : // Shard count changed: reset struct.
214 1 : *self = Self::new(
215 1 : tenant_shard_id,
216 1 : stripe_size,
217 1 : preferred_az.map(|az| az.into_owned()),
218 1 : node_id,
219 : );
220 : }
221 : }
222 2 : }
223 : }
224 :
225 : /// The state of a timeline we need to notify the compute about.
226 : struct ComputeHookTimeline {
227 : generation: SafekeeperGeneration,
228 : safekeepers: Vec<SafekeeperInfo>,
229 :
230 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTimelineState>>>,
231 : }
232 :
233 : impl ComputeHookTimeline {
234 : /// Construct a new ComputeHookTimeline with the given safekeepers and generation.
235 0 : fn new(generation: SafekeeperGeneration, safekeepers: Vec<SafekeeperInfo>) -> Self {
236 0 : Self {
237 0 : generation,
238 0 : safekeepers,
239 0 : send_lock: Arc::default(),
240 0 : }
241 0 : }
242 :
243 : /// Update the state with a new SafekeepersUpdate.
244 : /// Noop if the update generation is not greater than the current generation.
245 0 : fn update(&mut self, sk_update: SafekeepersUpdate) {
246 0 : if sk_update.generation > self.generation {
247 0 : self.generation = sk_update.generation;
248 0 : self.safekeepers = sk_update.safekeepers;
249 0 : }
250 0 : }
251 : }
252 :
253 : impl ApiMethod for ComputeHookTimeline {
254 : type Key = TenantTimelineId;
255 : type Request = NotifySafekeepersRequest;
256 :
257 : const API_PATH: &'static str = "notify-safekeepers";
258 :
259 0 : fn maybe_send(
260 0 : &self,
261 0 : ttid: TenantTimelineId,
262 0 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTimelineState>>>,
263 0 : ) -> MaybeSendNotifySafekeepersResult {
264 0 : let locked = match lock {
265 0 : Some(already_locked) => already_locked,
266 : None => {
267 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::timelines`] lock.
268 0 : let Ok(locked) = self.send_lock.clone().try_lock_owned() else {
269 0 : return MaybeSendResult::AwaitLock((ttid, self.send_lock.clone()));
270 : };
271 0 : locked
272 : }
273 : };
274 :
275 0 : if locked
276 0 : .as_ref()
277 0 : .is_some_and(|s| s.request.generation >= self.generation)
278 : {
279 0 : return MaybeSendResult::Noop;
280 0 : }
281 :
282 0 : MaybeSendResult::Transmit((
283 0 : NotifySafekeepersRequest {
284 0 : tenant_id: ttid.tenant_id,
285 0 : timeline_id: ttid.timeline_id,
286 0 : generation: self.generation,
287 0 : safekeepers: self.safekeepers.clone(),
288 0 : },
289 0 : locked,
290 0 : ))
291 0 : }
292 :
293 0 : async fn notify_local(
294 0 : _env: &LocalEnv,
295 0 : cplane: &ComputeControlPlane,
296 0 : req: &NotifySafekeepersRequest,
297 0 : ) -> Result<(), NotifyError> {
298 : let NotifySafekeepersRequest {
299 0 : tenant_id,
300 0 : timeline_id,
301 0 : generation,
302 0 : safekeepers,
303 0 : } = req;
304 :
305 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
306 0 : if endpoint.tenant_id == *tenant_id
307 0 : && endpoint.timeline_id == *timeline_id
308 0 : && endpoint.status() == EndpointStatus::Running
309 : {
310 0 : tracing::info!("Reconfiguring safekeepers for endpoint {endpoint_name}");
311 :
312 0 : let safekeepers = safekeepers.iter().map(|sk| sk.id).collect::<Vec<_>>();
313 :
314 0 : endpoint
315 0 : .reconfigure_safekeepers(safekeepers, *generation)
316 0 : .await
317 0 : .map_err(NotifyError::NeonLocal)?;
318 0 : }
319 : }
320 :
321 0 : Ok(())
322 0 : }
323 : }
324 :
325 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
326 : struct NotifyAttachRequestShard {
327 : node_id: NodeId,
328 : shard_number: ShardNumber,
329 : }
330 :
331 : /// Request body that we send to the control plane to notify it of where a tenant is attached
332 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
333 : struct NotifyAttachRequest {
334 : tenant_id: TenantId,
335 : preferred_az: Option<String>,
336 : stripe_size: Option<ShardStripeSize>,
337 : shards: Vec<NotifyAttachRequestShard>,
338 : }
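//
// For illustration only: a notify-attach body for a two-shard tenant would serialize roughly as
// follows (all values are made up, and the exact field encodings depend on the serde impls of
// `TenantId`, `NodeId`, `ShardNumber` and `ShardStripeSize`):
//
// {
//   "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
//   "preferred_az": "us-east-2b",
//   "stripe_size": 2048,
//   "shards": [
//     { "node_id": 1, "shard_number": 0 },
//     { "node_id": 2, "shard_number": 1 }
//   ]
// }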
339 :
340 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
341 : pub(crate) struct SafekeeperInfo {
342 : pub id: NodeId,
343 : /// Hostname of the safekeeper.
344 : /// It exists for better debuggability. Might be missing.
345 : /// Should not be used for anything else.
346 : pub hostname: Option<String>,
347 : }
348 :
349 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
350 : struct NotifySafekeepersRequest {
351 : tenant_id: TenantId,
352 : timeline_id: TimelineId,
353 : generation: SafekeeperGeneration,
354 : safekeepers: Vec<SafekeeperInfo>,
355 : }
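//
// For illustration only: a notify-safekeepers body would serialize roughly as follows (values are
// made up; exact encodings depend on the serde impls of the ID and generation types):
//
// {
//   "tenant_id": "1f359dd625e519a1a4e8d7509690f6fc",
//   "timeline_id": "2b4d7a9c1e8f35061a2c4e6f8a0b1c2d",
//   "generation": 3,
//   "safekeepers": [
//     { "id": 11, "hostname": "sk-11.example.internal" },
//     { "id": 12, "hostname": null }
//   ]
// }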
356 :
357 : /// Error type for attempts to call into the control plane compute notification hook
358 : #[derive(thiserror::Error, Debug)]
359 : pub(crate) enum NotifyError {
360 : // Request was not sent successfully, e.g. transport error
361 0 : #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
362 : Request(#[from] reqwest::Error),
363 : // Request could not be serviced right now due to an ongoing operation in the control plane, but should be possible soon.
364 : #[error("Control plane tenant busy")]
365 : Busy,
366 : // Explicit 429 response asking us to retry less frequently
367 : #[error("Control plane overloaded")]
368 : SlowDown,
369 : // A 503 response indicates the control plane can't handle the request right now
370 : #[error("Control plane unavailable (status {0})")]
371 : Unavailable(StatusCode),
372 : // API returned unexpected non-success status. We will retry, but log a warning.
373 : #[error("Control plane returned unexpected status {0}")]
374 : Unexpected(StatusCode),
375 : // We shut down while sending
376 : #[error("Shutting down")]
377 : ShuttingDown,
378 : // A response that indicates we will never succeed, such as 400 or 403
379 : #[error("Non-retryable error {0}")]
380 : Fatal(StatusCode),
381 :
382 : #[error("neon_local error: {0}")]
383 : NeonLocal(anyhow::Error),
384 : }
385 :
386 : enum MaybeSendResult<R, K> {
387 : // Please send this request while holding the lock, and if you succeed then write
388 : // the request into the lock.
389 : Transmit(
390 : (
391 : R,
392 : tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<R>>>,
393 : ),
394 : ),
395 : // Something requires sending, but you must wait for the current sender to finish, then call again
396 : AwaitLock((K, Arc<tokio::sync::Mutex<Option<ComputeRemoteState<R>>>>)),
397 : // Nothing requires sending
398 : Noop,
399 : }
400 :
401 : type MaybeSendNotifyAttachResult = MaybeSendResult<NotifyAttachRequest, TenantId>;
402 : type MaybeSendNotifySafekeepersResult = MaybeSendResult<NotifySafekeepersRequest, TenantTimelineId>;
403 :
404 : impl ApiMethod for ComputeHookTenant {
405 : type Key = TenantId;
406 : type Request = NotifyAttachRequest;
407 :
408 : const API_PATH: &'static str = "notify-attach";
409 :
410 4 : fn maybe_send(
411 4 : &self,
412 4 : tenant_id: TenantId,
413 4 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTenantState>>>,
414 4 : ) -> MaybeSendNotifyAttachResult {
415 4 : let locked = match lock {
416 0 : Some(already_locked) => already_locked,
417 : None => {
418 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::tenants`] lock.
419 4 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
420 0 : return MaybeSendResult::AwaitLock((tenant_id, self.get_send_lock().clone()));
421 : };
422 4 : locked
423 : }
424 : };
425 :
426 4 : let request = match self {
427 2 : Self::Unsharded(unsharded_tenant) => Some(NotifyAttachRequest {
428 2 : tenant_id,
429 2 : shards: vec![NotifyAttachRequestShard {
430 2 : shard_number: ShardNumber(0),
431 2 : node_id: unsharded_tenant.node_id,
432 2 : }],
433 2 : stripe_size: None,
434 2 : preferred_az: unsharded_tenant
435 2 : .preferred_az
436 2 : .as_ref()
437 2 : .map(|az| az.0.clone()),
438 : }),
439 2 : Self::Sharded(sharded_tenant)
440 2 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
441 : {
442 : Some(NotifyAttachRequest {
443 1 : tenant_id,
444 1 : shards: sharded_tenant
445 1 : .shards
446 1 : .iter()
447 1 : .map(|(shard_number, node_id)| NotifyAttachRequestShard {
448 2 : shard_number: *shard_number,
449 2 : node_id: *node_id,
450 2 : })
451 1 : .collect(),
452 1 : stripe_size: Some(sharded_tenant.stripe_size),
453 1 : preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
454 : })
455 : }
456 1 : Self::Sharded(sharded_tenant) => {
457 : // Sharded tenant doesn't yet have information for all its shards
458 :
459 1 : tracing::info!(
460 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
461 0 : sharded_tenant.shards.len(),
462 0 : sharded_tenant.shard_count.count()
463 : );
464 1 : None
465 : }
466 : };
467 :
468 3 : match request {
469 : None => {
470 : // Not yet ready to emit a notification
471 1 : tracing::info!("Tenant isn't yet ready to emit a notification");
472 1 : MaybeSendResult::Noop
473 : }
474 1 : Some(request)
475 3 : if Some(&request) == locked.as_ref().map(|s| &s.request)
476 1 : && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
477 : {
478 1 : tracing::info!(
479 0 : "Skipping notification because remote state already matches ({:?})",
480 0 : &request
481 : );
482 : // No change from the last value successfully sent, and our state indicates that the last
483 : // value sent was fully applied on the control plane side.
484 1 : MaybeSendResult::Noop
485 : }
486 2 : Some(request) => {
487 : // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
488 2 : MaybeSendResult::Transmit((request, locked))
489 : }
490 : }
491 4 : }
492 :
493 0 : async fn notify_local(
494 0 : env: &LocalEnv,
495 0 : cplane: &ComputeControlPlane,
496 0 : req: &NotifyAttachRequest,
497 0 : ) -> Result<(), NotifyError> {
498 : let NotifyAttachRequest {
499 0 : tenant_id,
500 0 : shards,
501 0 : stripe_size,
502 0 : preferred_az: _preferred_az,
503 0 : } = req;
504 :
505 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
506 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
507 0 : tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}");
508 :
509 0 : let pageservers = shards
510 0 : .iter()
511 0 : .map(|shard| {
512 0 : let ps_conf = env
513 0 : .get_pageserver_conf(shard.node_id)
514 0 : .expect("Unknown pageserver");
515 0 : if endpoint.grpc {
516 0 : let addr = ps_conf.listen_grpc_addr.as_ref().expect("no gRPC address");
517 0 : let (host, port) = parse_host_port(addr).expect("invalid gRPC address");
518 0 : let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
519 0 : (PageserverProtocol::Grpc, host, port)
520 : } else {
521 0 : let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
522 0 : .expect("Unable to parse listen_pg_addr");
523 0 : (PageserverProtocol::Libpq, host, port.unwrap_or(5432))
524 : }
525 0 : })
526 0 : .collect::<Vec<_>>();
527 :
528 0 : endpoint
529 0 : .reconfigure_pageservers(pageservers, *stripe_size)
530 0 : .await
531 0 : .map_err(NotifyError::NeonLocal)?;
532 0 : }
533 : }
534 :
535 0 : Ok(())
536 0 : }
537 : }
538 :
539 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
540 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
541 : /// the compute connection string.
542 : pub(super) struct ComputeHook {
543 : config: Config,
544 : tenants: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
545 : timelines: std::sync::Mutex<HashMap<TenantTimelineId, ComputeHookTimeline>>,
546 : authorization_header: Option<String>,
547 :
548 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
549 : // large numbers of tenants (e.g. when failing over after a node failure)
550 : api_concurrency: tokio::sync::Semaphore,
551 :
552 : // This lock is only used in testing environments, to serialize calls into neon_local
553 : neon_local_lock: tokio::sync::Mutex<()>,
554 :
555 : // We share a client across all notifications to enable connection re-use, etc., when
556 : // sending large numbers of notifications
557 : client: reqwest::Client,
558 : }
559 :
560 : /// Callers may give us a list of these when asking us to send a bulk batch
561 : /// of notifications in the background. This is a 'notification' in the sense of
562 : /// other code notifying us of a shard's status, rather than being the final notification
563 : /// that we send upwards to the control plane for the whole tenant.
564 : pub(crate) struct ShardUpdate<'a> {
565 : pub(crate) tenant_shard_id: TenantShardId,
566 : pub(crate) node_id: NodeId,
567 : pub(crate) stripe_size: ShardStripeSize,
568 : pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
569 : }
570 :
571 : pub(crate) struct SafekeepersUpdate {
572 : pub(crate) tenant_id: TenantId,
573 : pub(crate) timeline_id: TimelineId,
574 : pub(crate) generation: SafekeeperGeneration,
575 : pub(crate) safekeepers: Vec<SafekeeperInfo>,
576 : }
577 :
578 : impl ComputeHook {
579 0 : pub(super) fn new(config: Config) -> anyhow::Result<Self> {
580 0 : let authorization_header = config
581 0 : .control_plane_jwt_token
582 0 : .clone()
583 0 : .map(|jwt| format!("Bearer {jwt}"));
584 :
585 0 : let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
586 0 : for cert in &config.ssl_ca_certs {
587 0 : client = client.add_root_certificate(cert.clone());
588 0 : }
589 0 : let client = client
590 0 : .build()
591 0 : .context("Failed to build http client for compute hook")?;
592 :
593 0 : Ok(Self {
594 0 : tenants: Default::default(),
595 0 : timelines: Default::default(),
596 0 : config,
597 0 : authorization_header,
598 0 : neon_local_lock: Default::default(),
599 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
600 0 : client,
601 0 : })
602 0 : }
603 :
604 : /// For test environments: use neon_local's LocalEnv to update compute
605 0 : async fn do_notify_local<M: ApiMethod>(&self, req: &M::Request) -> Result<(), NotifyError> {
606 : // neon_local updates are not safe to call concurrently, use a lock to serialize
607 : // all calls to this function
608 0 : let _locked = self.neon_local_lock.lock().await;
609 :
610 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
611 0 : tracing::warn!(
612 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
613 : );
614 0 : return Ok(());
615 : };
616 0 : let env = match LocalEnv::load_config(repo_dir) {
617 0 : Ok(e) => e,
618 0 : Err(e) => {
619 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
620 0 : return Ok(());
621 : }
622 : };
623 0 : let cplane =
624 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
625 :
626 0 : M::notify_local(&env, &cplane, req).await
627 0 : }
628 :
629 0 : async fn do_notify_iteration<Req: serde::Serialize + std::fmt::Debug>(
630 0 : &self,
631 0 : url: &String,
632 0 : reconfigure_request: &Req,
633 0 : cancel: &CancellationToken,
634 0 : ) -> Result<(), NotifyError> {
635 0 : let req = self.client.request(reqwest::Method::PUT, url);
636 0 : let req = if let Some(value) = &self.authorization_header {
637 0 : req.header(reqwest::header::AUTHORIZATION, value)
638 : } else {
639 0 : req
640 : };
641 :
642 0 : tracing::info!(
643 0 : "Sending notify request to {} ({:?})",
644 : url,
645 : reconfigure_request
646 : );
647 0 : let send_result = req.json(&reconfigure_request).send().await;
648 0 : let response = match send_result {
649 0 : Ok(r) => r,
650 0 : Err(e) => return Err(e.into()),
651 : };
652 :
653 : // Treat all 2xx responses as success
654 0 : if response.status().is_success() {
655 0 : if response.status() != reqwest::StatusCode::OK {
656 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
657 : // log a warning.
658 0 : tracing::warn!(
659 0 : "Unexpected 2xx response code {} from control plane",
660 0 : response.status()
661 : );
662 0 : }
663 :
664 0 : return Ok(());
665 0 : }
666 :
667 : // Error response codes
668 0 : match response.status() {
669 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
670 : // TODO: 429 handling should be global: set some state visible to other requests
671 : // so that they will delay before starting, rather than all notifications trying
672 : // once before backing off.
673 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
674 0 : .await
675 0 : .ok();
676 0 : Err(NotifyError::SlowDown)
677 : }
678 : reqwest::StatusCode::LOCKED => {
679 : // We consider this fatal, because it's possible that the operation that has the tenant locked in
680 : // the control plane is also the one that is waiting for this reconcile. We should let the reconciler
681 : // calling this hook fail, to give the control plane a chance to un-lock.
682 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
683 0 : Err(NotifyError::Busy)
684 : }
685 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
686 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
687 : }
688 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
689 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
690 : }
691 : reqwest::StatusCode::BAD_GATEWAY => {
692 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
693 : }
694 :
695 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
696 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
697 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
698 0 : status => Err(NotifyError::Unexpected(
699 0 : hyper::StatusCode::from_u16(status.as_u16())
700 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
701 0 : )),
702 : }
703 0 : }
704 :
705 0 : async fn do_notify<R: serde::Serialize + std::fmt::Debug>(
706 0 : &self,
707 0 : url: &String,
708 0 : reconfigure_request: &R,
709 0 : cancel: &CancellationToken,
710 0 : ) -> Result<(), NotifyError> {
711 : // We hold these semaphore units across all retries, rather than only across each
712 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
713 : // time out waiting for a semaphore.
714 0 : let _units = self
715 0 : .api_concurrency
716 0 : .acquire()
717 0 : .await
718 : // Interpret closed semaphore as shutdown
719 0 : .map_err(|_| NotifyError::ShuttingDown)?;
720 :
721 0 : backoff::retry(
722 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
723 0 : |e| {
724 0 : matches!(
725 0 : e,
726 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
727 : )
728 0 : },
729 : 3,
730 : 10,
731 0 : "Send compute notification",
732 0 : cancel,
733 : )
734 0 : .await
735 0 : .ok_or_else(|| NotifyError::ShuttingDown)
736 0 : .and_then(|x| x)
737 0 : }
738 :
739 : /// Synchronous phase: update the per-tenant state for the next intended notification
740 0 : fn notify_attach_prepare(&self, shard_update: ShardUpdate) -> MaybeSendNotifyAttachResult {
741 0 : let mut tenants_locked = self.tenants.lock().unwrap();
742 :
743 : use std::collections::hash_map::Entry;
744 0 : let tenant_shard_id = shard_update.tenant_shard_id;
745 :
746 0 : let tenant = match tenants_locked.entry(tenant_shard_id.tenant_id) {
747 0 : Entry::Vacant(e) => {
748 : let ShardUpdate {
749 0 : tenant_shard_id,
750 0 : node_id,
751 0 : stripe_size,
752 0 : preferred_az,
753 0 : } = shard_update;
754 0 : e.insert(ComputeHookTenant::new(
755 0 : tenant_shard_id,
756 0 : stripe_size,
757 0 : preferred_az.map(|az| az.into_owned()),
758 0 : node_id,
759 : ))
760 : }
761 0 : Entry::Occupied(e) => {
762 0 : let tenant = e.into_mut();
763 0 : tenant.update(shard_update);
764 0 : tenant
765 : }
766 : };
767 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
768 0 : }
769 :
770 0 : fn notify_safekeepers_prepare(
771 0 : &self,
772 0 : safekeepers_update: SafekeepersUpdate,
773 0 : ) -> MaybeSendNotifySafekeepersResult {
774 0 : let mut timelines_locked = self.timelines.lock().unwrap();
775 :
776 0 : let ttid = TenantTimelineId {
777 0 : tenant_id: safekeepers_update.tenant_id,
778 0 : timeline_id: safekeepers_update.timeline_id,
779 0 : };
780 :
781 : use std::collections::hash_map::Entry;
782 0 : let timeline = match timelines_locked.entry(ttid) {
783 0 : Entry::Vacant(e) => e.insert(ComputeHookTimeline::new(
784 0 : safekeepers_update.generation,
785 0 : safekeepers_update.safekeepers,
786 : )),
787 0 : Entry::Occupied(e) => {
788 0 : let timeline = e.into_mut();
789 0 : timeline.update(safekeepers_update);
790 0 : timeline
791 : }
792 : };
793 :
794 0 : timeline.maybe_send(ttid, None)
795 0 : }
796 :
797 0 : async fn notify_execute<M: ApiMethod>(
798 0 : &self,
799 0 : state: &std::sync::Mutex<HashMap<M::Key, M>>,
800 0 : maybe_send_result: MaybeSendResult<M::Request, M::Key>,
801 0 : cancel: &CancellationToken,
802 0 : ) -> Result<(), NotifyError> {
803 : // Process result: we may get an update to send, or we may have to wait for a lock
804 : // before trying again.
805 0 : let (request, mut send_lock_guard) = match maybe_send_result {
806 : MaybeSendResult::Noop => {
807 0 : return Ok(());
808 : }
809 0 : MaybeSendResult::AwaitLock((key, send_lock)) => {
810 0 : let send_locked = tokio::select! {
811 0 : guard = send_lock.lock_owned() => {guard},
812 0 : _ = cancel.cancelled() => {
813 0 : return Err(NotifyError::ShuttingDown)
814 : }
815 : };
816 :
817 : // Lock order: maybe_send is called within the state map lock (`ComputeHook::tenants`/`timelines`), and takes the send lock, but here
818 : // we have acquired the send lock and take the state map lock. This is safe because maybe_send only uses
819 : // try_lock.
820 0 : let state_locked = state.lock().unwrap();
821 0 : let Some(resource_state) = state_locked.get(&key) else {
822 0 : return Ok(());
823 : };
824 0 : match resource_state.maybe_send(key, Some(send_locked)) {
825 : MaybeSendResult::AwaitLock(_) => {
826 0 : unreachable!("We supplied lock guard")
827 : }
828 : MaybeSendResult::Noop => {
829 0 : return Ok(());
830 : }
831 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
832 : }
833 : }
834 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
835 : };
836 :
837 0 : let result = if !self.config.use_local_compute_notifications {
838 0 : let compute_hook_url =
839 0 : self.config
840 0 : .control_plane_url
841 0 : .as_ref()
842 0 : .map(|control_plane_url| {
843 0 : format!(
844 0 : "{}/{}",
845 0 : control_plane_url.trim_end_matches('/'),
846 : M::API_PATH
847 : )
848 0 : });
849 :
850 : // We validate this at startup
851 0 : let notify_url = compute_hook_url.as_ref().unwrap();
852 0 : self.do_notify(notify_url, &request, cancel).await
853 : } else {
854 0 : self.do_notify_local::<M>(&request).await.map_err(|e| {
855 : // This path is for testing only, so munge the error into our prod-style error type.
856 0 : tracing::error!("neon_local notification hook failed: {e}");
857 0 : NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR)
858 0 : })
859 : };
860 :
861 0 : match result {
862 0 : Ok(_) => {
863 0 : // Before dropping the send lock, stash the request we just sent so that
864 0 : // subsequent callers can avoid redundantly re-sending the same thing.
865 0 : *send_lock_guard = Some(ComputeRemoteState {
866 0 : request,
867 0 : applied: true,
868 0 : });
869 0 : }
870 0 : Err(NotifyError::Busy) => {
871 0 : // Busy result means that the server responded and has stored the new configuration,
872 0 : // but was not able to fully apply it to the compute
873 0 : *send_lock_guard = Some(ComputeRemoteState {
874 0 : request,
875 0 : applied: false,
876 0 : });
877 0 : }
878 0 : Err(_) => {
879 0 : // General error case: we can no longer know the remote state, so clear it. This will result in
880 0 : // the logic in maybe_send recognizing that we should call the hook again.
881 0 : *send_lock_guard = None;
882 0 : }
883 : }
884 0 : result
885 0 : }
886 :
887 : /// Infallible, synchronous, fire-and-forget version of [`Self::notify_attach`] that sends its results to
888 : /// a channel. Something should consume the channel and arrange to try notifying again
889 : /// if something failed.
890 0 : pub(super) fn notify_attach_background(
891 0 : self: &Arc<Self>,
892 0 : notifications: Vec<ShardUpdate>,
893 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
894 0 : cancel: &CancellationToken,
895 0 : ) {
896 0 : let mut maybe_sends = Vec::new();
897 0 : for shard_update in notifications {
898 0 : let tenant_shard_id = shard_update.tenant_shard_id;
899 0 : let maybe_send_result = self.notify_attach_prepare(shard_update);
900 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
901 : }
902 :
903 0 : let this = self.clone();
904 0 : let cancel = cancel.clone();
905 :
906 0 : tokio::task::spawn(async move {
907 : // Construct an async stream of futures to invoke the compute notify function: we do this
908 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
909 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
910 : // would mostly just be waiting on that semaphore.
911 0 : let mut stream = futures::stream::iter(maybe_sends)
912 0 : .map(|(tenant_shard_id, maybe_send_result)| {
913 0 : let this = this.clone();
914 0 : let cancel = cancel.clone();
915 :
916 0 : async move {
917 0 : this
918 0 : .notify_execute(&this.tenants, maybe_send_result, &cancel)
919 0 : .await.map_err(|e| (tenant_shard_id, e))
920 0 : }.instrument(info_span!(
921 0 : "notify_attach_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
922 : ))
923 0 : })
924 0 : .buffered(API_CONCURRENCY);
925 :
926 : loop {
927 0 : tokio::select! {
928 0 : next = stream.next() => {
929 0 : match next {
930 0 : Some(r) => {
931 0 : result_tx.send(r).await.ok();
932 : },
933 : None => {
934 0 : tracing::info!("Finished sending background compute notifications");
935 0 : break;
936 : }
937 : }
938 : },
939 0 : _ = cancel.cancelled() => {
940 0 : tracing::info!("Shutdown while running background compute notifications");
941 0 : break;
942 : }
943 : };
944 : }
945 0 : });
946 0 : }
947 :
948 : /// Call this to notify the compute (postgres) tier of new pageservers to use
949 : /// for a tenant. It is called for each shard individually, and this function
950 : /// will decide whether an update to the tenant is sent. An update is sent on the
951 : /// condition that:
952 : /// - We know a pageserver for every shard.
953 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
954 : ///
955 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
956 : /// that is cancelled.
957 : ///
958 : /// This function is fallible, including in the case that the control plane is transiently
959 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
960 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
961 : /// ensuring that they eventually call again to ensure that the compute is eventually notified of
962 : /// the proper pageserver nodes for a tenant.
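///
/// A minimal sketch of a caller, assuming a `compute_hook: Arc<ComputeHook>` and a
/// `CancellationToken` named `cancel` (illustrative only, not taken from a real call site):
///
/// ```ignore
/// let update = ShardUpdate {
///     tenant_shard_id,
///     node_id: NodeId(1),
///     stripe_size: ShardStripeSize(2048),
///     preferred_az: None,
/// };
/// match compute_hook.notify_attach(update, &cancel).await {
///     Ok(()) => {} // Compute notified; remote state recorded for coalescing.
///     Err(NotifyError::ShuttingDown) => return,
///     Err(e) => {
///         // Transient or fatal failure: the caller must schedule another notify attempt later.
///         tracing::warn!("Compute notification failed: {e}");
///     }
/// }
/// ```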
963 : #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
964 : pub(super) async fn notify_attach<'a>(
965 : &self,
966 : shard_update: ShardUpdate<'a>,
967 : cancel: &CancellationToken,
968 : ) -> Result<(), NotifyError> {
969 : let maybe_send_result = self.notify_attach_prepare(shard_update);
970 : self.notify_execute(&self.tenants, maybe_send_result, cancel)
971 : .await
972 : }
973 :
974 0 : pub(super) async fn notify_safekeepers(
975 0 : &self,
976 0 : safekeepers_update: SafekeepersUpdate,
977 0 : cancel: &CancellationToken,
978 0 : ) -> Result<(), NotifyError> {
979 0 : let maybe_send_result = self.notify_safekeepers_prepare(safekeepers_update);
980 0 : self.notify_execute(&self.timelines, maybe_send_result, cancel)
981 0 : .await
982 0 : }
983 :
984 : /// Reflect a detach for a particular shard in the compute hook state.
985 : ///
986 : /// The goal is to avoid sending compute notifications with stale information (i.e.
987 : /// including detached pageservers).
988 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
989 : pub(super) fn handle_detach(
990 : &self,
991 : tenant_shard_id: TenantShardId,
992 : stripe_size: ShardStripeSize,
993 : ) {
994 : use std::collections::hash_map::Entry;
995 :
996 : let mut tenants_locked = self.tenants.lock().unwrap();
997 : match tenants_locked.entry(tenant_shard_id.tenant_id) {
998 : Entry::Vacant(_) => {
999 : // This is a valid but niche case, where the tenant was previously attached
1000 : // as a Secondary location and then detached, so has no previously notified
1001 : // state.
1002 : tracing::info!("Compute hook tenant not found for detach");
1003 : }
1004 : Entry::Occupied(mut e) => {
1005 : let sharded = e.get().is_sharded();
1006 : if !sharded {
1007 : e.remove();
1008 : } else {
1009 : e.get_mut().remove_shard(tenant_shard_id, stripe_size);
1010 : }
1011 :
1012 : tracing::debug!("Compute hook handled shard detach");
1013 : }
1014 : }
1015 : }
1016 : }
1017 :
1018 : #[cfg(test)]
1019 : pub(crate) mod tests {
1020 : use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
1021 : use utils::id::TenantId;
1022 :
1023 : use super::*;
1024 :
1025 : #[test]
1026 1 : fn tenant_updates() -> anyhow::Result<()> {
1027 1 : let tenant_id = TenantId::generate();
1028 1 : let stripe_size = DEFAULT_STRIPE_SIZE;
1029 1 : let mut tenant_state = ComputeHookTenant::new(
1030 1 : TenantShardId {
1031 1 : tenant_id,
1032 1 : shard_count: ShardCount::new(0),
1033 1 : shard_number: ShardNumber(0),
1034 1 : },
1035 1 : ShardStripeSize(12345),
1036 1 : None,
1037 1 : NodeId(1),
1038 : );
1039 :
1040 : // An unsharded tenant is always ready to emit a notification, but won't
1041 : // send the same one twice
1042 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
1043 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
1044 0 : anyhow::bail!("Wrong send result");
1045 : };
1046 1 : assert_eq!(request.shards.len(), 1);
1047 1 : assert!(request.stripe_size.is_none());
1048 :
1049 : // Simulate successful send
1050 1 : *guard = Some(ComputeRemoteState {
1051 1 : request,
1052 1 : applied: true,
1053 1 : });
1054 1 : drop(guard);
1055 :
1056 : // Try asking again: this should be a no-op
1057 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
1058 1 : assert!(matches!(send_result, MaybeSendResult::Noop));
1059 :
1060 : // Writing the first shard of a multi-sharded situation (i.e. in a split)
1061 : // resets the tenant state and puts it in an non-notifying state (need to
1062 : // see all shards)
1063 1 : tenant_state.update(ShardUpdate {
1064 1 : tenant_shard_id: TenantShardId {
1065 1 : tenant_id,
1066 1 : shard_count: ShardCount::new(2),
1067 1 : shard_number: ShardNumber(1),
1068 1 : },
1069 1 : stripe_size,
1070 1 : preferred_az: None,
1071 1 : node_id: NodeId(1),
1072 1 : });
1073 1 : assert!(matches!(
1074 1 : tenant_state.maybe_send(tenant_id, None),
1075 : MaybeSendResult::Noop
1076 : ));
1077 :
1078 : // Writing the second shard makes it ready to notify
1079 1 : tenant_state.update(ShardUpdate {
1080 1 : tenant_shard_id: TenantShardId {
1081 1 : tenant_id,
1082 1 : shard_count: ShardCount::new(2),
1083 1 : shard_number: ShardNumber(0),
1084 1 : },
1085 1 : stripe_size,
1086 1 : preferred_az: None,
1087 1 : node_id: NodeId(1),
1088 1 : });
1089 :
1090 1 : let send_result = tenant_state.maybe_send(tenant_id, None);
1091 1 : let MaybeSendResult::Transmit((request, mut guard)) = send_result else {
1092 0 : anyhow::bail!("Wrong send result");
1093 : };
1094 1 : assert_eq!(request.shards.len(), 2);
1095 1 : assert_eq!(request.stripe_size, Some(stripe_size));
1096 :
1097 : // Simulate successful send
1098 1 : *guard = Some(ComputeRemoteState {
1099 1 : request,
1100 1 : applied: true,
1101 1 : });
1102 1 : drop(guard);
1103 :
1104 1 : Ok(())
1105 1 : }
1106 : }