Line data Source code
1 : use std::borrow::Cow;
2 : use std::collections::HashMap;
3 : use std::error::Error as _;
4 : use std::sync::Arc;
5 : use std::time::Duration;
6 :
7 : use anyhow::Context;
8 : use compute_api::spec::PageserverProtocol;
9 : use compute_api::spec::PageserverShardInfo;
10 : use control_plane::endpoint::{
11 : ComputeControlPlane, EndpointStatus, PageserverConnectionInfo, PageserverShardConnectionInfo,
12 : };
13 : use control_plane::local_env::LocalEnv;
14 : use futures::StreamExt;
15 : use hyper::StatusCode;
16 : use pageserver_api::config::DEFAULT_GRPC_LISTEN_PORT;
17 : use pageserver_api::controller_api::AvailabilityZone;
18 : use pageserver_api::shard::{ShardCount, ShardIndex, ShardNumber, ShardStripeSize, TenantShardId};
19 : use postgres_connection::parse_host_port;
20 : use safekeeper_api::membership::SafekeeperGeneration;
21 : use serde::{Deserialize, Serialize};
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::{Instrument, info_span};
24 : use utils::backoff::{self};
25 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
26 :
27 : use crate::service::Config;
28 :
/// Pause applied (unless cancelled first) after the control plane answers with a 429.
29 : const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
30 :
/// Timeout configured on the shared HTTP client used for notification requests.
31 : const NOTIFY_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
32 :
/// Number of permits in the semaphore that limits concurrent notification calls.
33 : pub(crate) const API_CONCURRENCY: usize = 32;
34 :
/// Compute hook state for a tenant that is not sharded.
35 : struct UnshardedComputeHookTenant {
36 : // Which node this tenant is attached to
37 : node_id: NodeId,
38 :
39 : // The tenant's preferred AZ, so that we may pass this on to the control plane
40 : preferred_az: Option<AvailabilityZone>,
41 :
42 : // Must hold this lock to send a notification. The guarded value is the remote
// state recorded for this tenant, or None when nothing has been recorded yet.
43 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
44 : }
/// Compute hook state for a sharded tenant, aggregating the locations of its shards.
45 : struct ShardedComputeHookTenant {
// Stripe size and shard count that the entries in `shards` were reported with
46 : stripe_size: ShardStripeSize,
47 : shard_count: ShardCount,
// Attached node for each shard we have been told about so far
48 : shards: Vec<(ShardNumber, NodeId)>,
49 :
50 : // The tenant's preferred AZ, so that we may pass this on to the control plane
51 : preferred_az: Option<AvailabilityZone>,
52 :
53 : // Must hold this lock to send a notification. The contents represent
54 : // the last successfully sent notification, and are used to coalesce multiple
55 : // updates by only sending when there is a change since our last successful send.
56 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>>,
57 : }
58 :
59 : /// Represents our knowledge of the compute's state: we can update this when we get a
60 : /// response from a notify API call, which tells us what has been applied.
61 : ///
62 : /// Should be wrapped in an Option<>, as we cannot always know the remote state.
/// `R` is the request body type of the notification API in question.
63 : #[derive(PartialEq, Eq, Debug)]
64 : struct ComputeRemoteState<R> {
65 : // The request body which was acked by the compute
66 : request: R,
67 :
68 : // Whether the control plane indicated that the state was applied to running computes, or just
69 : // persisted. In the Neon control plane, this is the difference between a 423 response (meaning
70 : // persisted but not applied), and a 2xx response (both persisted and applied)
71 : applied: bool,
72 : }
73 :
// Remote state specialised for tenant attach requests and for safekeeper requests respectively.
74 : type ComputeRemoteTenantState = ComputeRemoteState<NotifyAttachRequest>;
75 : type ComputeRemoteTimelineState = ComputeRemoteState<NotifySafekeepersRequest>;
76 :
77 : /// The trait which defines the handler-specific types and methods.
78 : /// We have two implementations of this trait so far:
79 : /// - [`ComputeHookTenant`] for tenant attach notifications ("/notify-attach")
80 : /// - [`ComputeHookTimeline`] for safekeeper change notifications ("/notify-safekeepers")
81 : trait ApiMethod {
82 : /// Type of the key which identifies the resource.
83 : /// It's either TenantId for tenant attach notifications,
84 : /// or TenantTimelineId for safekeeper change notifications.
85 : type Key: std::cmp::Eq + std::hash::Hash + Clone;
86 :
/// Body of the notification request; it is serialized when sent to the control plane.
87 : type Request: serde::Serialize + std::fmt::Debug;
88 :
/// Path segment appended to the control plane URL to form the notification endpoint.
89 : const API_PATH: &'static str;
90 :
/// Decide what to do about notifying for `key`, given the current state in `self`.
///
/// `lock` is an already-held guard on the resource's send lock, or `None` if the
/// implementation should try to take the lock itself.
91 : fn maybe_send(
92 : &self,
93 : key: Self::Key,
94 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<Self::Request>>>>,
95 : ) -> MaybeSendResult<Self::Request, Self::Key>;
96 :
/// Deliver `req` through neon_local (test environments) rather than over HTTP.
97 : async fn notify_local(
98 : env: &LocalEnv,
99 : cplane: &ComputeControlPlane,
100 : req: &Self::Request,
101 : ) -> Result<(), NotifyError>;
102 : }
103 :
/// Per-tenant compute hook state; the variant reflects whether the tenant is sharded.
104 : enum ComputeHookTenant {
105 : Unsharded(UnshardedComputeHookTenant),
106 : Sharded(ShardedComputeHookTenant),
107 : }
108 :
109 : impl ComputeHookTenant {
110 : /// Construct with at least one shard's information
111 2 : fn new(
112 2 : tenant_shard_id: TenantShardId,
113 2 : stripe_size: ShardStripeSize,
114 2 : preferred_az: Option<AvailabilityZone>,
115 2 : node_id: NodeId,
116 2 : ) -> Self {
117 2 : if tenant_shard_id.shard_count.count() > 1 {
118 1 : Self::Sharded(ShardedComputeHookTenant {
119 1 : shards: vec![(tenant_shard_id.shard_number, node_id)],
120 1 : stripe_size,
121 1 : shard_count: tenant_shard_id.shard_count,
122 1 : preferred_az,
123 1 : send_lock: Arc::default(),
124 1 : })
125 : } else {
126 1 : Self::Unsharded(UnshardedComputeHookTenant {
127 1 : node_id,
128 1 : preferred_az,
129 1 : send_lock: Arc::default(),
130 1 : })
131 : }
132 2 : }
133 :
134 4 : fn get_send_lock(&self) -> &Arc<tokio::sync::Mutex<Option<ComputeRemoteTenantState>>> {
135 4 : match self {
136 2 : Self::Unsharded(unsharded_tenant) => &unsharded_tenant.send_lock,
137 2 : Self::Sharded(sharded_tenant) => &sharded_tenant.send_lock,
138 : }
139 4 : }
140 :
141 0 : fn is_sharded(&self) -> bool {
142 0 : matches!(self, ComputeHookTenant::Sharded(_))
143 0 : }
144 :
145 : /// Clear compute hook state for the specified shard.
146 : /// Only valid for [`ComputeHookTenant::Sharded`] instances.
147 0 : fn remove_shard(&mut self, tenant_shard_id: TenantShardId, stripe_size: ShardStripeSize) {
148 0 : match self {
149 0 : ComputeHookTenant::Sharded(sharded) => {
150 0 : if sharded.stripe_size != stripe_size
151 0 : || sharded.shard_count != tenant_shard_id.shard_count
152 : {
153 0 : tracing::warn!("Shard split detected while handling detach")
154 0 : }
155 :
156 0 : let shard_idx = sharded.shards.iter().position(|(shard_number, _node_id)| {
157 0 : *shard_number == tenant_shard_id.shard_number
158 0 : });
159 :
160 0 : if let Some(shard_idx) = shard_idx {
161 0 : sharded.shards.remove(shard_idx);
162 0 : } else {
163 : // This is a valid but niche case, where the tenant was previously attached
164 : // as a Secondary location and then detached, so has no previously notified
165 : // state.
166 0 : tracing::info!("Shard not found while handling detach")
167 : }
168 : }
169 : ComputeHookTenant::Unsharded(_) => {
170 0 : unreachable!("Detach of unsharded tenants is handled externally");
171 : }
172 : }
173 0 : }
174 :
175 : /// Set one shard's location. If stripe size or shard count have changed, Self is reset
176 : /// and drops existing content.
177 2 : fn update(&mut self, shard_update: ShardUpdate) {
178 2 : let tenant_shard_id = shard_update.tenant_shard_id;
179 2 : let node_id = shard_update.node_id;
180 2 : let stripe_size = shard_update.stripe_size;
181 2 : let preferred_az = shard_update.preferred_az;
182 :
183 1 : match self {
184 1 : Self::Unsharded(unsharded_tenant) if tenant_shard_id.shard_count.count() == 1 => {
185 0 : unsharded_tenant.node_id = node_id;
186 0 : if unsharded_tenant.preferred_az.as_ref()
187 0 : != preferred_az.as_ref().map(|az| az.as_ref())
188 : {
189 0 : unsharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
190 0 : }
191 : }
192 1 : Self::Sharded(sharded_tenant)
193 1 : if sharded_tenant.stripe_size == stripe_size
194 1 : && sharded_tenant.shard_count == tenant_shard_id.shard_count =>
195 : {
196 1 : if let Some(existing) = sharded_tenant
197 1 : .shards
198 1 : .iter()
199 1 : .position(|s| s.0 == tenant_shard_id.shard_number)
200 0 : {
201 0 : sharded_tenant.shards.get_mut(existing).unwrap().1 = node_id;
202 0 : } else {
203 1 : sharded_tenant
204 1 : .shards
205 1 : .push((tenant_shard_id.shard_number, node_id));
206 1 : sharded_tenant.shards.sort_by_key(|s| s.0)
207 : }
208 :
209 1 : if sharded_tenant.preferred_az.as_ref()
210 1 : != preferred_az.as_ref().map(|az| az.as_ref())
211 : {
212 0 : sharded_tenant.preferred_az = preferred_az.map(|az| az.as_ref().clone());
213 1 : }
214 : }
215 : _ => {
216 : // Shard count changed: reset struct.
217 1 : *self = Self::new(
218 1 : tenant_shard_id,
219 1 : stripe_size,
220 1 : preferred_az.map(|az| az.into_owned()),
221 1 : node_id,
222 : );
223 : }
224 : }
225 2 : }
226 : }
227 :
228 : /// The state of a timeline we need to notify the compute about.
229 : struct ComputeHookTimeline {
// Latest generation recorded; it is replaced together with `safekeepers`.
230 : generation: SafekeeperGeneration,
231 : safekeepers: Vec<SafekeeperInfo>,
232 :
// Must hold this lock to send a notification; it guards the recorded remote state.
233 : send_lock: Arc<tokio::sync::Mutex<Option<ComputeRemoteTimelineState>>>,
234 : }
235 :
236 : impl ComputeHookTimeline {
237 : /// Construct a new ComputeHookTimeline with the given safekeepers and generation.
238 0 : fn new(generation: SafekeeperGeneration, safekeepers: Vec<SafekeeperInfo>) -> Self {
239 0 : Self {
240 0 : generation,
241 0 : safekeepers,
242 0 : send_lock: Arc::default(),
243 0 : }
244 0 : }
245 :
246 : /// Update the state with a new SafekeepersUpdate.
247 : /// Noop if the update generation is not greater than the current generation.
248 0 : fn update(&mut self, sk_update: SafekeepersUpdate) {
249 0 : if sk_update.generation > self.generation {
250 0 : self.generation = sk_update.generation;
251 0 : self.safekeepers = sk_update.safekeepers;
252 0 : }
253 0 : }
254 : }
255 :
256 : impl ApiMethod for ComputeHookTimeline {
257 : type Key = TenantTimelineId;
258 : type Request = NotifySafekeepersRequest;
259 :
260 : const API_PATH: &'static str = "notify-safekeepers";
261 :
262 0 : fn maybe_send(
263 0 : &self,
264 0 : ttid: TenantTimelineId,
265 0 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTimelineState>>>,
266 0 : ) -> MaybeSendNotifySafekeepersResult {
267 0 : let locked = match lock {
268 0 : Some(already_locked) => already_locked,
269 : None => {
270 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::timelines`] lock.
271 0 : let Ok(locked) = self.send_lock.clone().try_lock_owned() else {
272 0 : return MaybeSendResult::AwaitLock((ttid, self.send_lock.clone()));
273 : };
274 0 : locked
275 : }
276 : };
277 :
278 0 : if locked
279 0 : .as_ref()
280 0 : .is_some_and(|s| s.request.generation >= self.generation)
281 : {
282 0 : return MaybeSendResult::Noop;
283 0 : }
284 :
285 0 : MaybeSendResult::Transmit((
286 0 : NotifySafekeepersRequest {
287 0 : tenant_id: ttid.tenant_id,
288 0 : timeline_id: ttid.timeline_id,
289 0 : generation: self.generation,
290 0 : safekeepers: self.safekeepers.clone(),
291 0 : },
292 0 : locked,
293 0 : ))
294 0 : }
295 :
296 0 : async fn notify_local(
297 0 : _env: &LocalEnv,
298 0 : cplane: &ComputeControlPlane,
299 0 : req: &NotifySafekeepersRequest,
300 0 : ) -> Result<(), NotifyError> {
301 : let NotifySafekeepersRequest {
302 0 : tenant_id,
303 0 : timeline_id,
304 0 : generation,
305 0 : safekeepers,
306 0 : } = req;
307 :
308 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
309 0 : if endpoint.tenant_id == *tenant_id
310 0 : && endpoint.timeline_id == *timeline_id
311 0 : && endpoint.status() == EndpointStatus::Running
312 : {
313 0 : tracing::info!("Reconfiguring safekeepers for endpoint {endpoint_name}");
314 :
315 0 : let safekeepers = safekeepers.iter().map(|sk| sk.id).collect::<Vec<_>>();
316 :
317 0 : endpoint
318 0 : .reconfigure_safekeepers(safekeepers, *generation)
319 0 : .await
320 0 : .map_err(NotifyError::NeonLocal)?;
321 0 : }
322 : }
323 :
324 0 : Ok(())
325 0 : }
326 : }
327 :
/// One shard's entry in a [`NotifyAttachRequest`]: a shard number and the node recorded for it.
328 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
329 : struct NotifyAttachRequestShard {
330 : node_id: NodeId,
331 : shard_number: ShardNumber,
332 : }
333 :
334 : /// Request body that we send to the control plane to notify it of where a tenant is attached
335 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
336 : struct NotifyAttachRequest {
337 : tenant_id: TenantId,
// Inner string of the tenant's preferred AvailabilityZone, when one is known
338 : preferred_az: Option<String>,
// None for unsharded tenants, Some(stripe size) for sharded tenants
339 : stripe_size: Option<ShardStripeSize>,
340 : shards: Vec<NotifyAttachRequestShard>,
341 : }
342 :
/// A safekeeper as listed in a [`NotifySafekeepersRequest`].
343 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
344 : pub(crate) struct SafekeeperInfo {
345 : pub id: NodeId,
346 : /// Hostname of the safekeeper.
347 : /// It exists for better debuggability. Might be missing.
348 : /// Should not be used for anything else.
349 : pub hostname: Option<String>,
350 : }
351 :
/// Request body for the "notify-safekeepers" API: a timeline's safekeepers at a given generation.
352 0 : #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
353 : struct NotifySafekeepersRequest {
354 : tenant_id: TenantId,
355 : timeline_id: TimelineId,
356 : generation: SafekeeperGeneration,
357 : safekeepers: Vec<SafekeeperInfo>,
358 : }
359 :
360 : /// Error type for attempts to call into the control plane compute notification hook
361 : #[derive(thiserror::Error, Debug)]
362 : pub(crate) enum NotifyError {
363 : // Request was not sent successfully, e.g. transport error
364 0 : #[error("Sending request: {0}{}", .0.source().map(|e| format!(": {e}")).unwrap_or_default())]
365 : Request(#[from] reqwest::Error),
366 : // Request could not be serviced right now due to ongoing Operation in control plane, but should be possible soon.
367 : #[error("Control plane tenant busy")]
368 : Busy,
369 : // Explicit 429 response asking us to retry less frequently
370 : #[error("Control plane overloaded")]
371 : SlowDown,
372 : // A 502, 503 or 504 response indicates the control plane can't handle the request right now
373 : #[error("Control plane unavailable (status {0})")]
374 : Unavailable(StatusCode),
375 : // API returned unexpected non-success status. We will retry, but log a warning.
376 : #[error("Control plane returned unexpected status {0}")]
377 : Unexpected(StatusCode),
378 : // We shutdown while sending
379 : #[error("Shutting down")]
380 : ShuttingDown,
381 : // A response indicates we will never succeed, such as 400 or 403
382 : #[error("Non-retryable error {0}")]
383 : Fatal(StatusCode),
384 :
// Failure reported by a neon_local endpoint reconfiguration call
385 : #[error("neon_local error: {0}")]
386 : NeonLocal(anyhow::Error),
387 : }
388 :
/// Outcome of a `maybe_send` call. `R` is the request body type and `K` the key type
/// identifying the resource.
389 : enum MaybeSendResult<R, K> {
390 : // Please send this request while holding the lock, and if you succeed then write
391 : // the request into the lock.
392 : Transmit(
393 : (
394 : R,
395 : tokio::sync::OwnedMutexGuard<Option<ComputeRemoteState<R>>>,
396 : ),
397 : ),
398 : // Something requires sending, but you must wait for a current sender then call again
// with the guard obtained from the returned lock
399 : AwaitLock((K, Arc<tokio::sync::Mutex<Option<ComputeRemoteState<R>>>>)),
400 : // Nothing requires sending
401 : Noop,
402 : }
403 :
// `MaybeSendResult` specialised for tenant attach and for safekeeper notifications respectively.
404 : type MaybeSendNotifyAttachResult = MaybeSendResult<NotifyAttachRequest, TenantId>;
405 : type MaybeSendNotifySafekeepersResult = MaybeSendResult<NotifySafekeepersRequest, TenantTimelineId>;
406 :
407 : impl ApiMethod for ComputeHookTenant {
408 : type Key = TenantId;
409 : type Request = NotifyAttachRequest;
410 :
411 : const API_PATH: &'static str = "notify-attach";
412 :
413 4 : fn maybe_send(
414 4 : &self,
415 4 : tenant_id: TenantId,
416 4 : lock: Option<tokio::sync::OwnedMutexGuard<Option<ComputeRemoteTenantState>>>,
417 4 : ) -> MaybeSendNotifyAttachResult {
418 4 : let locked = match lock {
419 0 : Some(already_locked) => already_locked,
420 : None => {
421 : // Lock order: this _must_ be only a try_lock, because we are called inside of the [`ComputeHook::tenants`] lock.
422 4 : let Ok(locked) = self.get_send_lock().clone().try_lock_owned() else {
423 0 : return MaybeSendResult::AwaitLock((tenant_id, self.get_send_lock().clone()));
424 : };
425 4 : locked
426 : }
427 : };
428 :
429 4 : let request = match self {
430 2 : Self::Unsharded(unsharded_tenant) => Some(NotifyAttachRequest {
431 2 : tenant_id,
432 2 : shards: vec![NotifyAttachRequestShard {
433 2 : shard_number: ShardNumber(0),
434 2 : node_id: unsharded_tenant.node_id,
435 2 : }],
436 2 : stripe_size: None,
437 2 : preferred_az: unsharded_tenant
438 2 : .preferred_az
439 2 : .as_ref()
440 2 : .map(|az| az.0.clone()),
441 : }),
442 2 : Self::Sharded(sharded_tenant)
443 2 : if sharded_tenant.shards.len() == sharded_tenant.shard_count.count() as usize =>
444 : {
445 : Some(NotifyAttachRequest {
446 1 : tenant_id,
447 1 : shards: sharded_tenant
448 1 : .shards
449 1 : .iter()
450 1 : .map(|(shard_number, node_id)| NotifyAttachRequestShard {
451 2 : shard_number: *shard_number,
452 2 : node_id: *node_id,
453 2 : })
454 1 : .collect(),
455 1 : stripe_size: Some(sharded_tenant.stripe_size),
456 1 : preferred_az: sharded_tenant.preferred_az.as_ref().map(|az| az.0.clone()),
457 : })
458 : }
459 1 : Self::Sharded(sharded_tenant) => {
460 : // Sharded tenant doesn't yet have information for all its shards
461 :
462 1 : tracing::info!(
463 0 : "ComputeHookTenant::maybe_send: not enough shards ({}/{})",
464 0 : sharded_tenant.shards.len(),
465 0 : sharded_tenant.shard_count.count()
466 : );
467 1 : None
468 : }
469 : };
470 :
471 3 : match request {
472 : None => {
473 : // Not yet ready to emit a notification
474 1 : tracing::info!("Tenant isn't yet ready to emit a notification");
475 1 : MaybeSendResult::Noop
476 : }
477 1 : Some(request)
478 3 : if Some(&request) == locked.as_ref().map(|s| &s.request)
479 1 : && locked.as_ref().map(|s| s.applied).unwrap_or(false) =>
480 : {
481 1 : tracing::info!(
482 0 : "Skipping notification because remote state already matches ({:?})",
483 0 : &request
484 : );
485 : // No change from the last value successfully sent, and our state indicates that the last
486 : // value sent was fully applied on the control plane side.
487 1 : MaybeSendResult::Noop
488 : }
489 2 : Some(request) => {
490 : // Our request differs from the last one sent, or the last one sent was not fully applied on the compute side
491 2 : MaybeSendResult::Transmit((request, locked))
492 : }
493 : }
494 4 : }
495 :
496 0 : async fn notify_local(
497 0 : env: &LocalEnv,
498 0 : cplane: &ComputeControlPlane,
499 0 : req: &NotifyAttachRequest,
500 0 : ) -> Result<(), NotifyError> {
501 : let NotifyAttachRequest {
502 0 : tenant_id,
503 0 : shards,
504 0 : stripe_size,
505 0 : preferred_az: _preferred_az,
506 0 : } = req;
507 :
508 0 : for (endpoint_name, endpoint) in &cplane.endpoints {
509 0 : if endpoint.tenant_id == *tenant_id && endpoint.status() == EndpointStatus::Running {
510 0 : tracing::info!("Reconfiguring pageservers for endpoint {endpoint_name}");
511 :
512 0 : let shard_count = match shards.len() {
513 0 : 1 => ShardCount::unsharded(),
514 0 : n => ShardCount(n.try_into().expect("too many shards")),
515 : };
516 :
517 0 : let mut shard_infos: HashMap<ShardIndex, PageserverShardInfo> = HashMap::new();
518 :
519 0 : let prefer_protocol = if endpoint.grpc {
520 0 : PageserverProtocol::Grpc
521 : } else {
522 0 : PageserverProtocol::Libpq
523 : };
524 :
525 0 : for shard in shards.iter() {
526 0 : let ps_conf = env
527 0 : .get_pageserver_conf(shard.node_id)
528 0 : .expect("Unknown pageserver");
529 :
530 0 : let libpq_url = Some({
531 0 : let (host, port) = parse_host_port(&ps_conf.listen_pg_addr)
532 0 : .expect("Unable to parse listen_pg_addr");
533 0 : let port = port.unwrap_or(5432);
534 0 : format!("postgres://no_user@{host}:{port}")
535 0 : });
536 0 : let grpc_url = if let Some(grpc_addr) = &ps_conf.listen_grpc_addr {
537 0 : let (host, port) =
538 0 : parse_host_port(grpc_addr).expect("invalid gRPC address");
539 0 : let port = port.unwrap_or(DEFAULT_GRPC_LISTEN_PORT);
540 0 : Some(format!("grpc://no_user@{host}:{port}"))
541 : } else {
542 0 : None
543 : };
544 0 : let pageserver = PageserverShardConnectionInfo {
545 0 : id: Some(shard.node_id),
546 0 : libpq_url,
547 0 : grpc_url,
548 0 : };
549 0 : let shard_info = PageserverShardInfo {
550 0 : pageservers: vec![pageserver],
551 0 : };
552 0 : shard_infos.insert(
553 0 : ShardIndex {
554 0 : shard_number: shard.shard_number,
555 0 : shard_count,
556 0 : },
557 0 : shard_info,
558 : );
559 : }
560 :
561 0 : let pageserver_conninfo = PageserverConnectionInfo {
562 0 : shard_count,
563 0 : stripe_size: stripe_size.map(|val| ShardStripeSize(val.0)),
564 0 : shards: shard_infos,
565 0 : prefer_protocol,
566 : };
567 :
568 0 : endpoint
569 0 : .reconfigure_pageservers(&pageserver_conninfo)
570 0 : .await
571 0 : .map_err(NotifyError::NeonLocal)?;
572 0 : }
573 : }
574 :
575 0 : Ok(())
576 0 : }
577 : }
578 :
579 : /// The compute hook is a destination for notifications about changes to tenant:pageserver
580 : /// mapping. It aggregates updates for the shards in a tenant, and when appropriate reconfigures
581 : /// the compute connection string.
582 : pub(super) struct ComputeHook {
583 : config: Config,
// Per-tenant and per-timeline notification state, each behind a synchronous mutex
584 : tenants: std::sync::Mutex<HashMap<TenantId, ComputeHookTenant>>,
585 : timelines: std::sync::Mutex<HashMap<TenantTimelineId, ComputeHookTimeline>>,
// Pre-formatted "Bearer <jwt>" value, present when the config carries a control plane JWT
586 : authorization_header: Option<String>,
587 :
588 : // Concurrency limiter, so that we do not overload the cloud control plane when updating
589 : // large numbers of tenants (e.g. when failing over after a node failure)
590 : api_concurrency: tokio::sync::Semaphore,
591 :
592 : // This lock is only used in testing environments, to serialize calls into neon_local
593 : neon_local_lock: tokio::sync::Mutex<()>,
594 :
595 : // We share a client across all notifications to enable connection re-use etc when
596 : // sending large numbers of notifications
597 : client: reqwest::Client,
598 : }
599 :
600 : /// Callers may give us a list of these when asking us to send a bulk batch
601 : /// of notifications in the background. This is a 'notification' in the sense of
602 : /// other code notifying us of a shard's status, rather than being the final notification
603 : /// that we send upwards to the control plane for the whole tenant.
604 : pub(crate) struct ShardUpdate<'a> {
605 : pub(crate) tenant_shard_id: TenantShardId,
// Node to record for this shard
606 : pub(crate) node_id: NodeId,
607 : pub(crate) stripe_size: ShardStripeSize,
// Borrowed or owned; converted to an owned value only when it has to be stored
608 : pub(crate) preferred_az: Option<Cow<'a, AvailabilityZone>>,
609 : }
610 :
/// A timeline's safekeeper set at a given generation, as handed to the compute hook.
/// Updates whose generation is not greater than the stored one are ignored.
611 : pub(crate) struct SafekeepersUpdate {
612 : pub(crate) tenant_id: TenantId,
613 : pub(crate) timeline_id: TimelineId,
614 : pub(crate) generation: SafekeeperGeneration,
615 : pub(crate) safekeepers: Vec<SafekeeperInfo>,
616 : }
617 :
618 : impl ComputeHook {
619 0 : pub(super) fn new(config: Config) -> anyhow::Result<Self> {
620 0 : let authorization_header = config
621 0 : .control_plane_jwt_token
622 0 : .clone()
623 0 : .map(|jwt| format!("Bearer {jwt}"));
624 :
625 0 : let mut client = reqwest::ClientBuilder::new().timeout(NOTIFY_REQUEST_TIMEOUT);
626 0 : for cert in &config.ssl_ca_certs {
627 0 : client = client.add_root_certificate(cert.clone());
628 0 : }
629 0 : let client = client
630 0 : .build()
631 0 : .context("Failed to build http client for compute hook")?;
632 :
633 0 : Ok(Self {
634 0 : tenants: Default::default(),
635 0 : timelines: Default::default(),
636 0 : config,
637 0 : authorization_header,
638 0 : neon_local_lock: Default::default(),
639 0 : api_concurrency: tokio::sync::Semaphore::new(API_CONCURRENCY),
640 0 : client,
641 0 : })
642 0 : }
643 :
644 : /// For test environments: use neon_local's LocalEnv to update compute
645 0 : async fn do_notify_local<M: ApiMethod>(&self, req: &M::Request) -> Result<(), NotifyError> {
646 : // neon_local updates are not safe to call concurrently, use a lock to serialize
647 : // all calls to this function
648 0 : let _locked = self.neon_local_lock.lock().await;
649 :
650 0 : let Some(repo_dir) = self.config.neon_local_repo_dir.as_deref() else {
651 0 : tracing::warn!(
652 0 : "neon_local_repo_dir not set, likely a bug in neon_local; skipping compute update"
653 : );
654 0 : return Ok(());
655 : };
656 0 : let env = match LocalEnv::load_config(repo_dir) {
657 0 : Ok(e) => e,
658 0 : Err(e) => {
659 0 : tracing::warn!("Couldn't load neon_local config, skipping compute update ({e})");
660 0 : return Ok(());
661 : }
662 : };
663 0 : let cplane =
664 0 : ComputeControlPlane::load(env.clone()).expect("Error loading compute control plane");
665 :
666 0 : M::notify_local(&env, &cplane, req).await
667 0 : }
668 :
669 0 : async fn do_notify_iteration<Req: serde::Serialize + std::fmt::Debug>(
670 0 : &self,
671 0 : url: &String,
672 0 : reconfigure_request: &Req,
673 0 : cancel: &CancellationToken,
674 0 : ) -> Result<(), NotifyError> {
675 0 : let req = self.client.request(reqwest::Method::PUT, url);
676 0 : let req = if let Some(value) = &self.authorization_header {
677 0 : req.header(reqwest::header::AUTHORIZATION, value)
678 : } else {
679 0 : req
680 : };
681 :
682 0 : tracing::info!(
683 0 : "Sending notify request to {} ({:?})",
684 : url,
685 : reconfigure_request
686 : );
687 0 : let send_result = req.json(&reconfigure_request).send().await;
688 0 : let response = match send_result {
689 0 : Ok(r) => r,
690 0 : Err(e) => return Err(e.into()),
691 : };
692 :
693 : // Treat all 2xx responses as success
694 0 : if response.status().is_success() {
695 0 : if response.status() != reqwest::StatusCode::OK {
696 : // Non-200 2xx response: it doesn't make sense to retry, but this is unexpected, so
697 : // log a warning.
698 0 : tracing::warn!(
699 0 : "Unexpected 2xx response code {} from control plane",
700 0 : response.status()
701 : );
702 0 : }
703 :
704 0 : return Ok(());
705 0 : }
706 :
707 : // Error response codes
708 0 : match response.status() {
709 : reqwest::StatusCode::TOO_MANY_REQUESTS => {
710 : // TODO: 429 handling should be global: set some state visible to other requests
711 : // so that they will delay before starting, rather than all notifications trying
712 : // once before backing off.
713 0 : tokio::time::timeout(SLOWDOWN_DELAY, cancel.cancelled())
714 0 : .await
715 0 : .ok();
716 0 : Err(NotifyError::SlowDown)
717 : }
718 : reqwest::StatusCode::LOCKED => {
719 : // We consider this fatal, because it's possible that the operation blocking the control one is
720 : // also the one that is waiting for this reconcile. We should let the reconciler calling
721 : // this hook fail, to give control plane a chance to un-lock.
722 0 : tracing::info!("Control plane reports tenant is locked, dropping out of notify");
723 0 : Err(NotifyError::Busy)
724 : }
725 : reqwest::StatusCode::SERVICE_UNAVAILABLE => {
726 0 : Err(NotifyError::Unavailable(StatusCode::SERVICE_UNAVAILABLE))
727 : }
728 : reqwest::StatusCode::GATEWAY_TIMEOUT => {
729 0 : Err(NotifyError::Unavailable(StatusCode::GATEWAY_TIMEOUT))
730 : }
731 : reqwest::StatusCode::BAD_GATEWAY => {
732 0 : Err(NotifyError::Unavailable(StatusCode::BAD_GATEWAY))
733 : }
734 :
735 0 : reqwest::StatusCode::BAD_REQUEST => Err(NotifyError::Fatal(StatusCode::BAD_REQUEST)),
736 0 : reqwest::StatusCode::UNAUTHORIZED => Err(NotifyError::Fatal(StatusCode::UNAUTHORIZED)),
737 0 : reqwest::StatusCode::FORBIDDEN => Err(NotifyError::Fatal(StatusCode::FORBIDDEN)),
738 0 : status => Err(NotifyError::Unexpected(
739 0 : hyper::StatusCode::from_u16(status.as_u16())
740 0 : .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
741 0 : )),
742 : }
743 0 : }
744 :
745 0 : async fn do_notify<R: serde::Serialize + std::fmt::Debug>(
746 0 : &self,
747 0 : url: &String,
748 0 : reconfigure_request: &R,
749 0 : cancel: &CancellationToken,
750 0 : ) -> Result<(), NotifyError> {
751 : // We hold these semaphore units across all retries, rather than only across each
752 : // HTTP request: this is to preserve fairness and avoid a situation where a retry might
753 : // time out waiting for a semaphore.
754 0 : let _units = self
755 0 : .api_concurrency
756 0 : .acquire()
757 0 : .await
758 : // Interpret closed semaphore as shutdown
759 0 : .map_err(|_| NotifyError::ShuttingDown)?;
760 :
761 0 : backoff::retry(
762 0 : || self.do_notify_iteration(url, reconfigure_request, cancel),
763 0 : |e| {
764 0 : matches!(
765 0 : e,
766 : NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
767 : )
768 0 : },
769 : 3,
770 : 10,
771 0 : "Send compute notification",
772 0 : cancel,
773 : )
774 0 : .await
775 0 : .ok_or_else(|| NotifyError::ShuttingDown)
776 0 : .and_then(|x| x)
777 0 : }
778 :
779 : /// Synchronous phase: update the per-tenant state for the next intended notification
780 0 : fn notify_attach_prepare(&self, shard_update: ShardUpdate) -> MaybeSendNotifyAttachResult {
781 0 : let mut tenants_locked = self.tenants.lock().unwrap();
782 :
783 : use std::collections::hash_map::Entry;
784 0 : let tenant_shard_id = shard_update.tenant_shard_id;
785 :
786 0 : let tenant = match tenants_locked.entry(tenant_shard_id.tenant_id) {
787 0 : Entry::Vacant(e) => {
788 : let ShardUpdate {
789 0 : tenant_shard_id,
790 0 : node_id,
791 0 : stripe_size,
792 0 : preferred_az,
793 0 : } = shard_update;
794 0 : e.insert(ComputeHookTenant::new(
795 0 : tenant_shard_id,
796 0 : stripe_size,
797 0 : preferred_az.map(|az| az.into_owned()),
798 0 : node_id,
799 : ))
800 : }
801 0 : Entry::Occupied(e) => {
802 0 : let tenant = e.into_mut();
803 0 : tenant.update(shard_update);
804 0 : tenant
805 : }
806 : };
807 0 : tenant.maybe_send(tenant_shard_id.tenant_id, None)
808 0 : }
809 :
810 0 : fn notify_safekeepers_prepare(
811 0 : &self,
812 0 : safekeepers_update: SafekeepersUpdate,
813 0 : ) -> MaybeSendNotifySafekeepersResult {
814 0 : let mut timelines_locked = self.timelines.lock().unwrap();
815 :
816 0 : let ttid = TenantTimelineId {
817 0 : tenant_id: safekeepers_update.tenant_id,
818 0 : timeline_id: safekeepers_update.timeline_id,
819 0 : };
820 :
821 : use std::collections::hash_map::Entry;
822 0 : let timeline = match timelines_locked.entry(ttid) {
823 0 : Entry::Vacant(e) => e.insert(ComputeHookTimeline::new(
824 0 : safekeepers_update.generation,
825 0 : safekeepers_update.safekeepers,
826 : )),
827 0 : Entry::Occupied(e) => {
828 0 : let timeline = e.into_mut();
829 0 : timeline.update(safekeepers_update);
830 0 : timeline
831 : }
832 : };
833 :
834 0 : timeline.maybe_send(ttid, None)
835 0 : }
836 :
837 0 : async fn notify_execute<M: ApiMethod>(
838 0 : &self,
839 0 : state: &std::sync::Mutex<HashMap<M::Key, M>>,
840 0 : maybe_send_result: MaybeSendResult<M::Request, M::Key>,
841 0 : cancel: &CancellationToken,
842 0 : ) -> Result<(), NotifyError> {
843 : // Process result: we may get an update to send, or we may have to wait for a lock
844 : // before trying again.
845 0 : let (request, mut send_lock_guard) = match maybe_send_result {
846 : MaybeSendResult::Noop => {
847 0 : return Ok(());
848 : }
849 0 : MaybeSendResult::AwaitLock((key, send_lock)) => {
850 0 : let send_locked = tokio::select! {
851 0 : guard = send_lock.lock_owned() => {guard},
852 0 : _ = cancel.cancelled() => {
853 0 : tracing::info!("Notification cancelled while waiting for lock");
854 0 : return Err(NotifyError::ShuttingDown)
855 : }
856 : };
857 :
858 : // Lock order: maybe_send is called within the `[Self::state]` lock, and takes the send lock, but here
859 : // we have acquired the send lock and take `[Self::state]` lock. This is safe because maybe_send only uses
860 : // try_lock.
861 0 : let state_locked = state.lock().unwrap();
862 0 : let Some(resource_state) = state_locked.get(&key) else {
863 0 : return Ok(());
864 : };
865 0 : match resource_state.maybe_send(key, Some(send_locked)) {
866 : MaybeSendResult::AwaitLock(_) => {
867 0 : unreachable!("We supplied lock guard")
868 : }
869 : MaybeSendResult::Noop => {
870 0 : return Ok(());
871 : }
872 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
873 : }
874 : }
875 0 : MaybeSendResult::Transmit((request, lock)) => (request, lock),
876 : };
877 :
878 0 : let result = if !self.config.use_local_compute_notifications {
879 0 : let compute_hook_url =
880 0 : self.config
881 0 : .control_plane_url
882 0 : .as_ref()
883 0 : .map(|control_plane_url| {
884 0 : format!(
885 0 : "{}/{}",
886 0 : control_plane_url.trim_end_matches('/'),
887 : M::API_PATH
888 : )
889 0 : });
890 :
891 : // We validate this at startup
892 0 : let notify_url = compute_hook_url.as_ref().unwrap();
893 0 : self.do_notify(notify_url, &request, cancel).await
894 : } else {
895 0 : match self.do_notify_local::<M>(&request).await.map_err(|e| {
896 : // This path is for testing only, so munge the error into our prod-style error type.
897 0 : if e.to_string().contains("refresh-configuration-pending") {
898 : // If the error message mentions "refresh-configuration-pending", it means the compute node
899 : // rejected our notification request because it is already trying to reconfigure itself. We
900 : // can proceed with the rest of the reconciliation process as the compute node has already
901 : // discovered the need to reconfigure and will eventually update its configuration once
902 : // we update the pageserver mappings. In fact, it is important that we continue with
903 : // reconciliation to make sure we update the pageserver mappings to unblock the compute node.
904 0 : tracing::info!("neon_local notification hook failed: {e}");
905 0 : tracing::info!("Notification failed likely due to compute node self-reconfiguration, will retry.");
906 0 : Ok(())
907 : } else {
908 0 : tracing::error!("neon_local notification hook failed: {e}");
909 0 : Err(NotifyError::Fatal(StatusCode::INTERNAL_SERVER_ERROR))
910 : }
911 0 : }) {
912 : // Compute node accepted the notification request. Ok to proceed.
913 0 : Ok(_) => Ok(()),
914 : // Compute node rejected our request but it is already self-reconfiguring. Ok to proceed.
915 0 : Err(Ok(_)) => Ok(()),
916 : // Fail the reconciliation attempt in all other cases. Recall that this whole code path involving
917 : // neon_local is for testing only. In production we always retry failed reconciliations so we
918 : // don't have any dead ends here.
919 0 : Err(Err(e)) => Err(e),
920 : }
921 : };
922 :
923 0 : match result {
924 0 : Ok(_) => {
925 0 : // Before dropping the send lock, stash the request we just sent so that
926 0 : // subsequent callers can avoid redundantly re-sending the same thing.
927 0 : *send_lock_guard = Some(ComputeRemoteState {
928 0 : request,
929 0 : applied: true,
930 0 : });
931 0 : }
932 0 : Err(NotifyError::Busy) => {
933 0 : // Busy result means that the server responded and has stored the new configuration,
934 0 : // but was not able to fully apply it to the compute
935 0 : *send_lock_guard = Some(ComputeRemoteState {
936 0 : request,
937 0 : applied: false,
938 0 : });
939 0 : }
940 0 : Err(_) => {
941 0 : // General error case: we can no longer know the remote state, so clear it. This will result in
942 0 : // the logic in maybe_send recognizing that we should call the hook again.
943 0 : *send_lock_guard = None;
944 0 : }
945 : }
946 0 : result
947 0 : }
948 :
949 : /// Infallible synchronous fire-and-forget version of notify(), that sends its results to
950 : /// a channel. Something should consume the channel and arrange to try notifying again
951 : /// if something failed.
952 0 : pub(super) fn notify_attach_background(
953 0 : self: &Arc<Self>,
954 0 : notifications: Vec<ShardUpdate>,
955 0 : result_tx: tokio::sync::mpsc::Sender<Result<(), (TenantShardId, NotifyError)>>,
956 0 : cancel: &CancellationToken,
957 0 : ) {
958 0 : let mut maybe_sends = Vec::new();
959 0 : for shard_update in notifications {
960 0 : let tenant_shard_id = shard_update.tenant_shard_id;
961 0 : let maybe_send_result = self.notify_attach_prepare(shard_update);
962 0 : maybe_sends.push((tenant_shard_id, maybe_send_result))
963 : }
964 :
965 0 : let this = self.clone();
966 0 : let cancel = cancel.clone();
967 :
968 0 : tokio::task::spawn(async move {
969 : // Construct an async stream of futures to invoke the compute notify function: we do this
970 : // in order to subsequently use .buffered() on the stream to execute with bounded parallelism. The
971 : // ComputeHook semaphore already limits concurrency, but this way we avoid constructing+polling lots of futures which
972 : // would mostly just be waiting on that semaphore.
973 0 : let mut stream = futures::stream::iter(maybe_sends)
974 0 : .map(|(tenant_shard_id, maybe_send_result)| {
975 0 : let this = this.clone();
976 0 : let cancel = cancel.clone();
977 :
978 0 : async move {
979 0 : this
980 0 : .notify_execute(&this.tenants, maybe_send_result, &cancel)
981 0 : .await.map_err(|e| (tenant_shard_id, e))
982 0 : }.instrument(info_span!(
983 0 : "notify_attach_background", tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()
984 : ))
985 0 : })
986 0 : .buffered(API_CONCURRENCY);
987 :
988 : loop {
989 0 : tokio::select! {
990 0 : next = stream.next() => {
991 0 : match next {
992 0 : Some(r) => {
993 0 : result_tx.send(r).await.ok();
994 : },
995 : None => {
996 0 : tracing::info!("Finished sending background compute notifications");
997 0 : break;
998 : }
999 : }
1000 : },
1001 0 : _ = cancel.cancelled() => {
1002 0 : tracing::info!("Shutdown while running background compute notifications");
1003 0 : break;
1004 : }
1005 : };
1006 : }
1007 0 : });
1008 0 : }
1009 :
1010 : /// Call this to notify the compute (postgres) tier of new pageservers to use
1011 : /// for a tenant. notify() is called by each shard individually, and this function
1012 : /// will decide whether an update to the tenant is sent. An update is sent on the
1013 : /// condition that:
1014 : /// - We know a pageserver for every shard.
1015 : /// - All the shards have the same shard_count (i.e. we are not mid-split)
1016 : ///
1017 : /// Cancellation token enables callers to drop out, e.g. if calling from a Reconciler
1018 : /// that is cancelled.
1019 : ///
1020 : /// This function is fallible, including in the case that the control plane is transiently
1021 : /// unavailable. A limited number of retries are done internally to efficiently hide short unavailability
1022 : /// periods, but we don't retry forever. The **caller** is responsible for handling failures and
1023 : /// ensuring that they eventually call again to ensure that the compute is eventually notified of
1024 : /// the proper pageserver nodes for a tenant.
1025 : #[tracing::instrument(skip_all, fields(tenant_id=%shard_update.tenant_shard_id.tenant_id, shard_id=%shard_update.tenant_shard_id.shard_slug(), node_id))]
1026 : pub(super) async fn notify_attach<'a>(
1027 : &self,
1028 : shard_update: ShardUpdate<'a>,
1029 : cancel: &CancellationToken,
1030 : ) -> Result<(), NotifyError> {
1031 : let maybe_send_result = self.notify_attach_prepare(shard_update);
1032 : self.notify_execute(&self.tenants, maybe_send_result, cancel)
1033 : .await
1034 : }
1035 :
1036 0 : pub(super) async fn notify_safekeepers(
1037 0 : &self,
1038 0 : safekeepers_update: SafekeepersUpdate,
1039 0 : cancel: &CancellationToken,
1040 0 : ) -> Result<(), NotifyError> {
1041 0 : let maybe_send_result = self.notify_safekeepers_prepare(safekeepers_update);
1042 0 : self.notify_execute(&self.timelines, maybe_send_result, cancel)
1043 0 : .await
1044 0 : }
1045 :
1046 : /// Reflect a detach for a particular shard in the compute hook state.
1047 : ///
1048 : /// The goal is to avoid sending compute notifications with stale information (i.e.
1049 : /// including detach pageservers).
1050 : #[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug()))]
1051 : pub(super) fn handle_detach(
1052 : &self,
1053 : tenant_shard_id: TenantShardId,
1054 : stripe_size: ShardStripeSize,
1055 : ) {
1056 : use std::collections::hash_map::Entry;
1057 :
1058 : let mut tenants_locked = self.tenants.lock().unwrap();
1059 : match tenants_locked.entry(tenant_shard_id.tenant_id) {
1060 : Entry::Vacant(_) => {
1061 : // This is a valid but niche case, where the tenant was previously attached
1062 : // as a Secondary location and then detached, so has no previously notified
1063 : // state.
1064 : tracing::info!("Compute hook tenant not found for detach");
1065 : }
1066 : Entry::Occupied(mut e) => {
1067 : let sharded = e.get().is_sharded();
1068 : if !sharded {
1069 : e.remove();
1070 : } else {
1071 : e.get_mut().remove_shard(tenant_shard_id, stripe_size);
1072 : }
1073 :
1074 : tracing::debug!("Compute hook handled shard detach");
1075 : }
1076 : }
1077 : }
1078 : }
1079 :
#[cfg(test)]
pub(crate) mod tests {
    use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardNumber};
    use utils::id::TenantId;

    use super::*;

    /// Builds the id of shard `shard_number` out of `shard_count` for `tenant_id`.
    fn shard_of(
        tenant_id: TenantId,
        shard_count: ShardCount,
        shard_number: ShardNumber,
    ) -> TenantShardId {
        TenantShardId {
            tenant_id,
            shard_count,
            shard_number,
        }
    }

    /// Drives one tenant's hook state from unsharded, through a split into two shards,
    /// checking at each step whether `maybe_send` wants to transmit or stay quiet.
    #[test]
    fn tenant_updates() -> anyhow::Result<()> {
        let tenant_id = TenantId::generate();
        let stripe_size = DEFAULT_STRIPE_SIZE;
        let mut tenant_state = ComputeHookTenant::new(
            shard_of(tenant_id, ShardCount::new(0), ShardNumber(0)),
            ShardStripeSize(12345),
            None,
            NodeId(1),
        );

        // An unsharded tenant can always emit a notification, but must not emit the
        // same one twice.
        let (request, mut guard) = match tenant_state.maybe_send(tenant_id, None) {
            MaybeSendResult::Transmit(pair) => pair,
            _ => anyhow::bail!("Wrong send result"),
        };
        assert_eq!(request.shards.len(), 1);
        assert!(request.stripe_size.is_none());

        // Pretend the send succeeded
        *guard = Some(ComputeRemoteState {
            request,
            applied: true,
        });
        drop(guard);

        // Asking a second time must be a no-op
        let repeat = tenant_state.maybe_send(tenant_id, None);
        assert!(matches!(repeat, MaybeSendResult::Noop));

        // Writing the first shard of a multi-sharded situation (i.e. in a split)
        // resets the tenant state and puts it in a non-notifying state (need to
        // see all shards)
        tenant_state.update(ShardUpdate {
            tenant_shard_id: shard_of(tenant_id, ShardCount::new(2), ShardNumber(1)),
            stripe_size,
            preferred_az: None,
            node_id: NodeId(1),
        });
        let mid_split = tenant_state.maybe_send(tenant_id, None);
        assert!(matches!(mid_split, MaybeSendResult::Noop));

        // Once the second shard is written, the tenant is ready to notify
        tenant_state.update(ShardUpdate {
            tenant_shard_id: shard_of(tenant_id, ShardCount::new(2), ShardNumber(0)),
            stripe_size,
            preferred_az: None,
            node_id: NodeId(1),
        });

        let (request, mut guard) = match tenant_state.maybe_send(tenant_id, None) {
            MaybeSendResult::Transmit(pair) => pair,
            _ => anyhow::bail!("Wrong send result"),
        };
        assert_eq!(request.shards.len(), 2);
        assert_eq!(request.stripe_size, Some(stripe_size));

        // Pretend the send succeeded
        *guard = Some(ComputeRemoteState {
            request,
            applied: true,
        });
        drop(guard);

        Ok(())
    }
}
|