use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use json_structural_diff::JsonDiff;
use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
use pageserver_api::models::{
    LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
};
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_client::mgmt_api;
use reqwest::StatusCode;
use tokio_util::sync::CancellationToken;
use utils::backoff::exponential_backoff;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use utils::sync::gate::GateGuard;

use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;
use crate::pageserver_client::PageserverClient;
use crate::persistence::Persistence;
use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
use crate::{compute_hook, service};

const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);

/// Object with the lifetime of the background reconcile task that is created
/// for tenants which have a difference between their intent and observed states.
pub(super) struct Reconciler {
    /// See [`crate::tenant_shard::TenantShard`] for the meanings of these fields: they are a snapshot
    /// of a tenant's state from when we spawned a reconcile task.
    pub(super) tenant_shard_id: TenantShardId,
    pub(crate) shard: ShardIdentity,
    pub(crate) placement_policy: PlacementPolicy,
    pub(crate) generation: Option<Generation>,
    pub(crate) intent: TargetState,

    /// Nodes not referenced by [`Self::intent`], from which we should try
    /// to detach this tenant shard.
    pub(crate) detach: Vec<Node>,

    /// Configuration specific to this reconciler
    pub(crate) reconciler_config: ReconcilerConfig,

    pub(crate) config: TenantConfig,
    pub(crate) preferred_az: Option<AvailabilityZone>,

    /// Observed state from the point of view of the reconciler.
    /// This gets updated as the reconciliation makes progress.
    pub(crate) observed: ObservedState,

    /// Snapshot of the observed state at the point when the reconciler
    /// was spawned.
    pub(crate) original_observed: ObservedState,

    pub(crate) service_config: service::Config,

    /// A hook to notify the running postgres instances when we change the location
    /// of a tenant. Use this via [`Self::compute_notify`] to update our failure flag
    /// and guarantee eventual retries.
    pub(crate) compute_hook: Arc<ComputeHook>,

    /// To avoid stalling if the cloud control plane is unavailable, we may proceed
    /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
    /// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
    pub(crate) compute_notify_failure: bool,

    /// The reconciler is responsible for keeping alive the semaphore units that limit
    /// how many reconcilers we will spawn concurrently.
    pub(crate) _resource_units: ReconcileUnits,

    /// A means to abort background reconciliation: it is essential to
    /// call this when something changes in the original TenantShard that
    /// will make this reconciliation impossible or unnecessary, for
    /// example when a pageserver node goes offline, or the PlacementPolicy for
    /// the tenant is changed.
    pub(crate) cancel: CancellationToken,

    /// Reconcilers are registered with a Gate so that during a graceful shutdown we
    /// can wait for all the reconcilers to respond to their cancellation tokens.
    pub(crate) _gate_guard: GateGuard,

    /// Access to persistent storage for updating generation numbers
    pub(crate) persistence: Arc<Persistence>,

    /// HTTP client with proper CA certs.
    pub(crate) http_client: reqwest::Client,
}

pub(crate) struct ReconcilerConfigBuilder {
    config: ReconcilerConfig,
}

impl ReconcilerConfigBuilder {
    /// Priority is special: you must pick one thoughtfully, do not just use 'normal' as the default
    pub(crate) fn new(priority: ReconcilerPriority) -> Self {
        Self {
            config: ReconcilerConfig::new(priority),
        }
    }

    pub(crate) fn secondary_warmup_timeout(self, value: Duration) -> Self {
        Self {
            config: ReconcilerConfig {
                secondary_warmup_timeout: Some(value),
                ..self.config
            },
        }
    }

    pub(crate) fn secondary_download_request_timeout(self, value: Duration) -> Self {
        Self {
            config: ReconcilerConfig {
                secondary_download_request_timeout: Some(value),
                ..self.config
            },
        }
    }

    pub(crate) fn tenant_creation_hint(self, hint: bool) -> Self {
        Self {
            config: ReconcilerConfig {
                tenant_creation_hint: hint,
                ..self.config
            },
        }
    }

    pub(crate) fn build(self) -> ReconcilerConfig {
        self.config
    }
}
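
// Illustrative sketch (not called in this file): a caller scheduling a user-facing
// operation would construct a config like this, picking the priority explicitly and
// overriding only the timeouts it cares about:
//
//     let config = ReconcilerConfigBuilder::new(ReconcilerPriority::High)
//         .secondary_warmup_timeout(Duration::from_secs(300))
//         .build();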

// Higher priorities are used for user-facing tasks, so that a long backlog of housekeeping work
// (e.g. reconciling on startup, rescheduling things on node changes) does not starve user-facing tasks.
#[derive(Debug, Copy, Clone)]
pub(crate) enum ReconcilerPriority {
    Normal,
    High,
}

#[derive(Debug, Copy, Clone)]
pub(crate) struct ReconcilerConfig {
    pub(crate) priority: ReconcilerPriority,

    // During live migration, give up on warming up the secondary
    // after this timeout.
    secondary_warmup_timeout: Option<Duration>,

    // During live migration, this is the amount of time that
    // the pageserver will hold our poll.
    secondary_download_request_timeout: Option<Duration>,

    // A hint indicating whether this reconciliation is done on the
    // creation of a new tenant. This only informs logging behaviour.
    tenant_creation_hint: bool,
}

impl ReconcilerConfig {
    /// Configs are always constructed with an explicit priority, to force callers to think about whether
    /// the operation they're scheduling is high-priority or not. Normal priority is not a safe default, because
    /// scheduling something user-facing at normal priority can result in it getting starved out by background work.
    pub(crate) fn new(priority: ReconcilerPriority) -> Self {
        Self {
            priority,
            secondary_warmup_timeout: None,
            secondary_download_request_timeout: None,
            tenant_creation_hint: false,
        }
    }

    pub(crate) fn get_secondary_warmup_timeout(&self) -> Duration {
        const SECONDARY_WARMUP_TIMEOUT_DEFAULT: Duration = Duration::from_secs(300);
        self.secondary_warmup_timeout
            .unwrap_or(SECONDARY_WARMUP_TIMEOUT_DEFAULT)
    }

    pub(crate) fn get_secondary_download_request_timeout(&self) -> Duration {
        const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(20);
        self.secondary_download_request_timeout
            .unwrap_or(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT)
    }

    pub(crate) fn tenant_creation_hint(&self) -> bool {
        self.tenant_creation_hint
    }
}

impl From<&MigrationConfig> for ReconcilerConfig {
    fn from(value: &MigrationConfig) -> Self {
        // Run reconciler at high priority because MigrationConfig comes from human requests that should
        // be presumed urgent.
        let mut builder = ReconcilerConfigBuilder::new(ReconcilerPriority::High);

        if let Some(timeout) = value.secondary_warmup_timeout {
            builder = builder.secondary_warmup_timeout(timeout)
        }

        if let Some(timeout) = value.secondary_download_request_timeout {
            builder = builder.secondary_download_request_timeout(timeout)
        }

        builder.build()
    }
}

/// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
pub(crate) struct ReconcileUnits {
    _sem_units: tokio::sync::OwnedSemaphorePermit,
}

impl ReconcileUnits {
    pub(crate) fn new(sem_units: tokio::sync::OwnedSemaphorePermit) -> Self {
        Self {
            _sem_units: sem_units,
        }
    }
}

/// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
/// reference counting for Scheduler. The IntentState is what the scheduler works with,
/// and the TargetState is just the instruction for a particular Reconciler run.
#[derive(Debug)]
pub(crate) struct TargetState {
    pub(crate) attached: Option<Node>,
    pub(crate) secondary: Vec<Node>,
}

impl TargetState {
    pub(crate) fn from_intent(nodes: &HashMap<NodeId, Node>, intent: &IntentState) -> Self {
        Self {
            attached: intent.get_attached().map(|n| {
                nodes
                    .get(&n)
                    .expect("Intent attached referenced non-existent node")
                    .clone()
            }),
            secondary: intent
                .get_secondary()
                .iter()
                .map(|n| {
                    nodes
                        .get(n)
                        .expect("Intent secondary referenced non-existent node")
                        .clone()
                })
                .collect(),
        }
    }
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum ReconcileError {
    #[error(transparent)]
    Remote(#[from] mgmt_api::Error),
    #[error(transparent)]
    Notify(#[from] NotifyError),
    #[error("Cancelled")]
    Cancel,
    #[error(transparent)]
    Other(#[from] anyhow::Error),
}

impl Reconciler {
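    /// Apply one `LocationConfig` on a node and mirror the result into [`Self::observed`].
    /// The observed entry is set to `None` (uncertain) before the API call, so that a
    /// failure partway through leaves the location in an explicitly unknown state.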
    async fn location_config(
        &mut self,
        node: &Node,
        config: LocationConfig,
        flush_ms: Option<Duration>,
        lazy: bool,
    ) -> Result<(), ReconcileError> {
        if !node.is_available() && config.mode == LocationConfigMode::Detached {
            // [`crate::service::Service::node_activate_reconcile`] will update the observed state
            // when the node comes back online. At that point, the intent and observed states will
            // be mismatched and a background reconciliation will detach.
            tracing::info!(
                "Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
            );
            return Ok(());
        }

        self.observed
            .locations
            .insert(node.get_id(), ObservedStateLocation { conf: None });

        // TODO: amend locations that use long-polling: they will hit this timeout.
        let timeout = Duration::from_secs(25);

        tracing::info!("location_config({node}) calling: {:?}", config);
        let tenant_shard_id = self.tenant_shard_id;
        let config_ref = &config;
        match node
            .with_client_retries(
                |client| async move {
                    let config = config_ref.clone();
                    client
                        .location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
                        .await
                },
                &self.http_client,
                &self.service_config.pageserver_jwt_token,
                1,
                3,
                timeout,
                &self.cancel,
            )
            .await
        {
            Some(Ok(_)) => {}
            Some(Err(e)) => return Err(e.into()),
            None => return Err(ReconcileError::Cancel),
        };
        tracing::info!("location_config({node}) complete: {:?}", config);

        match config.mode {
            LocationConfigMode::Detached => {
                self.observed.locations.remove(&node.get_id());
            }
            _ => {
                self.observed
                    .locations
                    .insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
            }
        }

        Ok(())
    }

    fn get_node(&self, node_id: &NodeId) -> Option<&Node> {
        if let Some(node) = self.intent.attached.as_ref() {
            if node.get_id() == *node_id {
                return Some(node);
            }
        }

        if let Some(node) = self
            .intent
            .secondary
            .iter()
            .find(|n| n.get_id() == *node_id)
        {
            return Some(node);
        }

        if let Some(node) = self.detach.iter().find(|n| n.get_id() == *node_id) {
            return Some(node);
        }

        None
    }

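    /// Decide whether this reconcile should start with a live migration: we need an
    /// intended attachment that is not already attached, plus an available origin node
    /// that currently holds the shard in AttachedSingle mode.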
    async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
        let destination = if let Some(node) = &self.intent.attached {
            match self.observed.locations.get(&node.get_id()) {
                Some(conf) => {
                    // We will do a live migration only if the intended destination is not
                    // currently in an attached state.
                    match &conf.conf {
                        Some(conf) if conf.mode == LocationConfigMode::Secondary => {
                            // Fall through to do a live migration
                            node
                        }
                        None | Some(_) => {
                            // Attached or uncertain: don't do a live migration, proceed
                            // with a general-case reconciliation
                            tracing::info!("maybe_live_migrate: destination is None or attached");
                            return Ok(());
                        }
                    }
                }
                None => {
                    // Our destination is not attached: maybe live migrate if some other
                    // node is currently attached. Fall through.
                    node
                }
            }
        } else {
            // No intent to be attached
            tracing::info!("maybe_live_migrate: no attached intent");
            return Ok(());
        };

        let mut origin = None;
        for (node_id, state) in &self.observed.locations {
            if let Some(observed_conf) = &state.conf {
                if observed_conf.mode == LocationConfigMode::AttachedSingle {
                    // We will only attempt live migration if the origin is not offline: this
                    // avoids trying to do it while reconciling after responding to an HA failover.
                    if let Some(node) = self.get_node(node_id) {
                        if node.is_available() {
                            origin = Some(node.clone());
                            break;
                        }
                    }
                }
            }
        }

        let Some(origin) = origin else {
            tracing::info!("maybe_live_migrate: no origin found");
            return Ok(());
        };

        // We have an origin and a destination: proceed to do the live migration
        tracing::info!("Live migrating {}->{}", origin, destination);
        self.live_migrate(origin, destination.clone()).await?;

        Ok(())
    }

    async fn wait_lsn(
        &self,
        node: &Node,
        tenant_shard_id: TenantShardId,
        timelines: HashMap<TimelineId, Lsn>,
    ) -> Result<StatusCode, ReconcileError> {
        const TIMEOUT: Duration = Duration::from_secs(10);

        let client = PageserverClient::new(
            node.get_id(),
            self.http_client.clone(),
            node.base_url(),
            self.service_config.pageserver_jwt_token.as_deref(),
        );

        client
            .wait_lsn(
                tenant_shard_id,
                TenantWaitLsnRequest {
                    timelines,
                    timeout: TIMEOUT,
                },
            )
            .await
            .map_err(|e| e.into())
    }

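    /// Fetch the last-record LSN of every timeline of this shard on `node`. Used to
    /// capture a baseline on the origin that [`Self::await_lsn`] then waits for the
    /// destination to reach during a live migration.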
    async fn get_lsns(
        &self,
        tenant_shard_id: TenantShardId,
        node: &Node,
    ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
        let client = PageserverClient::new(
            node.get_id(),
            self.http_client.clone(),
            node.base_url(),
            self.service_config.pageserver_jwt_token.as_deref(),
        );

        let timelines = client.timeline_list(&tenant_shard_id).await?;
        Ok(timelines
            .into_iter()
            .map(|t| (t.timeline_id, t.last_record_lsn))
            .collect())
    }

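    /// Warm up a secondary location before attaching to it: long-poll the pageserver's
    /// secondary download endpoint until the heatmap layers are fully downloaded or the
    /// warmup timeout expires. Failures here are logged and tolerated: an incompletely
    /// warmed destination only degrades I/O performance, it does not block migration.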
    async fn secondary_download(
        &self,
        tenant_shard_id: TenantShardId,
        node: &Node,
    ) -> Result<(), ReconcileError> {
        // This is not the timeout for a request, but the total amount of time we're willing to wait
        // for a secondary location to get up to date before we proceed with the migration anyway.
        let total_download_timeout = self.reconciler_config.get_secondary_warmup_timeout();

        // This is the long-polling interval for the secondary download requests we send to the
        // destination pageserver during a migration.
        let request_download_timeout = self
            .reconciler_config
            .get_secondary_download_request_timeout();

        let started_at = Instant::now();

        loop {
            let (status, progress) = match node
                .with_client_retries(
                    |client| async move {
                        client
                            .tenant_secondary_download(
                                tenant_shard_id,
                                Some(request_download_timeout),
                            )
                            .await
                    },
                    &self.http_client,
                    &self.service_config.pageserver_jwt_token,
                    1,
                    3,
                    request_download_timeout * 2,
                    &self.cancel,
                )
                .await
            {
                None => Err(ReconcileError::Cancel),
                Some(Ok(v)) => Ok(v),
                Some(Err(e)) => {
                    // Give up, but proceed: it's unfortunate if we couldn't freshen the destination before
                    // attaching, but we should not let an issue with a secondary location stop us proceeding
                    // with a live migration.
                    tracing::warn!("Failed to prepare by downloading layers on node {node}: {e}");
                    return Ok(());
                }
            }?;

            if status == StatusCode::OK {
                tracing::info!(
                    "Downloads to {} complete: {}/{} layers, {}/{} bytes",
                    node,
                    progress.layers_downloaded,
                    progress.layers_total,
                    progress.bytes_downloaded,
                    progress.bytes_total
                );
                return Ok(());
            } else if status == StatusCode::ACCEPTED {
                let total_runtime = started_at.elapsed();
                if total_runtime > total_download_timeout {
                    tracing::warn!(
                        "Timed out after {}ms downloading layers to {node}. Progress so far: {}/{} layers, {}/{} bytes",
                        total_runtime.as_millis(),
                        progress.layers_downloaded,
                        progress.layers_total,
                        progress.bytes_downloaded,
                        progress.bytes_total
                    );
                    // Give up, but proceed: an incompletely warmed destination doesn't prevent migration working,
                    // it just makes the I/O performance for users less good.
                    return Ok(());
                }

                // Log and proceed around the loop to retry. We don't sleep between requests, because our HTTP call
                // to the pageserver is a long-poll.
                tracing::info!(
                    "Downloads to {} not yet complete: {}/{} layers, {}/{} bytes",
                    node,
                    progress.layers_downloaded,
                    progress.layers_total,
                    progress.bytes_downloaded,
                    progress.bytes_total
                );
            }
        }
    }

    /// This function does _not_ mutate any state, so it is cancellation safe.
    ///
    /// This function does not respect [`Self::cancel`]; callers should handle that.
    async fn await_lsn(
        &self,
        tenant_shard_id: TenantShardId,
        node: &Node,
        baseline: HashMap<TimelineId, Lsn>,
    ) -> anyhow::Result<()> {
        // Signal to the pageserver that it should ingest up to the baseline LSNs.
        loop {
            match self.wait_lsn(node, tenant_shard_id, baseline.clone()).await {
                Ok(StatusCode::OK) => {
                    // Everything is caught up
                    return Ok(());
                }
                Ok(StatusCode::ACCEPTED) => {
                    // Some timelines are not caught up yet.
                    // They'll be polled below.
                    break;
                }
                Ok(StatusCode::NOT_FOUND) => {
                    // None of the timelines are present on the pageserver.
                    // This is correct if they've all been deleted, but
                    // let the polling loop below cross-check.
                    break;
                }
                Ok(status_code) => {
                    tracing::warn!(
                        "Unexpected status code ({status_code}) returned by wait_lsn endpoint"
                    );
                    break;
                }
                Err(e) => {
                    tracing::info!("🕑 Can't trigger LSN wait on {node} yet, waiting ({e})",);
                    tokio::time::sleep(Duration::from_millis(500)).await;
                    continue;
                }
            }
        }

        // Poll the LSNs until they catch up
        loop {
            let latest = match self.get_lsns(tenant_shard_id, node).await {
                Ok(l) => l,
                Err(e) => {
                    tracing::info!("🕑 Can't get LSNs on node {node} yet, waiting ({e})",);
                    tokio::time::sleep(Duration::from_millis(500)).await;
                    continue;
                }
            };

            let mut any_behind: bool = false;
            for (timeline_id, baseline_lsn) in &baseline {
                match latest.get(timeline_id) {
                    Some(latest_lsn) => {
                        tracing::info!(timeline_id = %timeline_id, "🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
                        if latest_lsn < baseline_lsn {
                            any_behind = true;
                        }
                    }
                    None => {
                        // Timeline was deleted in the meantime - ignore it
                    }
                }
            }

            if !any_behind {
                tracing::info!("✅ LSN caught up. Proceeding...");
                break;
            } else {
                tokio::time::sleep(Duration::from_millis(500)).await;
            }
        }

        Ok(())
    }

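    /// Live-migrate this shard from `origin_ps` to `dest_ps`. The sequence below is:
    /// demote the origin to AttachedStale, warm up the destination if it has a
    /// secondary location, increment the generation, attach the destination as
    /// AttachedMulti, wait for its WAL ingestion to catch up with the origin's
    /// baseline LSNs, notify compute, then downgrade the origin to Secondary and
    /// switch the destination to AttachedSingle.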
    pub async fn live_migrate(
        &mut self,
        origin_ps: Node,
        dest_ps: Node,
    ) -> Result<(), ReconcileError> {
        // `maybe_live_migrate` is responsible for sanity of inputs
        assert!(origin_ps.get_id() != dest_ps.get_id());

        fn build_location_config(
            shard: &ShardIdentity,
            config: &TenantConfig,
            mode: LocationConfigMode,
            generation: Option<Generation>,
            secondary_conf: Option<LocationConfigSecondary>,
        ) -> LocationConfig {
            LocationConfig {
                mode,
                generation: generation.map(|g| g.into().unwrap()),
                secondary_conf,
                tenant_conf: config.clone(),
                shard_number: shard.number.0,
                shard_count: shard.count.literal(),
                shard_stripe_size: shard.stripe_size.0,
            }
        }

        tracing::info!("🔁 Switching origin node {origin_ps} to stale mode",);

        // FIXME: it is incorrect to use self.generation here, we should use the generation
        // from the ObservedState of the origin pageserver (it might be older than self.generation)
        let stale_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedStale,
            self.generation,
            None,
        );
        self.location_config(&origin_ps, stale_conf, Some(Duration::from_secs(10)), false)
            .await?;

        let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps).await?);

        // If we are migrating to a destination that has a secondary location, warm it up first
        if let Some(destination_conf) = self.observed.locations.get(&dest_ps.get_id()) {
            if let Some(destination_conf) = &destination_conf.conf {
                if destination_conf.mode == LocationConfigMode::Secondary {
                    tracing::info!("🔁 Downloading latest layers to destination node {dest_ps}",);
                    self.secondary_download(self.tenant_shard_id, &dest_ps)
                        .await?;
                }
            }
        }

        pausable_failpoint!("reconciler-live-migrate-pre-generation-inc");

        // Increment generation before attaching to new pageserver
        self.generation = Some(
            self.persistence
                .increment_generation(self.tenant_shard_id, dest_ps.get_id())
                .await?,
        );

        pausable_failpoint!("reconciler-live-migrate-post-generation-inc");

        let dest_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedMulti,
            self.generation,
            None,
        );

        tracing::info!("🔁 Attaching to pageserver {dest_ps}");
        self.location_config(&dest_ps, dest_conf, None, false)
            .await?;

        pausable_failpoint!("reconciler-live-migrate-pre-await-lsn");

        if let Some(baseline) = baseline_lsns {
            tracing::info!("🕑 Waiting for LSN to catch up...");
            tokio::select! {
                r = self.await_lsn(self.tenant_shard_id, &dest_ps, baseline) => {r?;}
                _ = self.cancel.cancelled() => {return Err(ReconcileError::Cancel)}
            };
        }

        tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");

        // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
        // the origin without notifying compute, we will render the tenant unavailable.
        self.compute_notify_blocking(&origin_ps).await?;
        pausable_failpoint!("reconciler-live-migrate-post-notify");

        // Downgrade the origin to secondary. If the tenant's policy is PlacementPolicy::Attached(0), then
        // this location will be deleted in the general case reconciliation that runs after this.
        let origin_secondary_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::Secondary,
            None,
            Some(LocationConfigSecondary { warm: true }),
        );
        self.location_config(&origin_ps, origin_secondary_conf.clone(), None, false)
            .await?;
        // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
        // partway through. In fact, all location conf API calls should be in a wrapper that sets
        // the observed state to None, then runs, then sets it to what we wrote.
        self.observed.locations.insert(
            origin_ps.get_id(),
            ObservedStateLocation {
                conf: Some(origin_secondary_conf),
            },
        );

        pausable_failpoint!("reconciler-live-migrate-post-detach");

        tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
        let dest_final_conf = build_location_config(
            &self.shard,
            &self.config,
            LocationConfigMode::AttachedSingle,
            self.generation,
            None,
        );
        self.location_config(&dest_ps, dest_final_conf.clone(), None, false)
            .await?;
        self.observed.locations.insert(
            dest_ps.get_id(),
            ObservedStateLocation {
                conf: Some(dest_final_conf),
            },
        );

        tracing::info!("✅ Migration complete");

        Ok(())
    }

    /// Returns true if the observed state of the attached location was refreshed
    /// and false otherwise.
    async fn maybe_refresh_observed(&mut self) -> Result<bool, ReconcileError> {
        // If the attached node has uncertain state, read it from the pageserver before proceeding: this
        // is important to avoid spurious generation increments.
        //
        // We don't need to do this for secondary/detach locations because it's harmless to just PUT their
        // location conf, whereas for attached locations it can interrupt clients if we spuriously destroy/recreate
        // the `Timeline` object in the pageserver.

        let Some(attached_node) = self.intent.attached.as_ref() else {
            // Nothing to do
            return Ok(false);
        };

        if matches!(
            self.observed.locations.get(&attached_node.get_id()),
            Some(ObservedStateLocation { conf: None })
        ) {
            let tenant_shard_id = self.tenant_shard_id;
            let observed_conf = match attached_node
                .with_client_retries(
                    |client| async move { client.get_location_config(tenant_shard_id).await },
                    &self.http_client,
                    &self.service_config.pageserver_jwt_token,
                    1,
                    1,
                    Duration::from_secs(5),
                    &self.cancel,
                )
                .await
            {
                Some(Ok(observed)) => Some(observed),
                Some(Err(mgmt_api::Error::ApiError(status, _msg)))
                    if status == StatusCode::NOT_FOUND =>
                {
                    None
                }
                Some(Err(e)) => return Err(e.into()),
                None => return Err(ReconcileError::Cancel),
            };
            tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
            match observed_conf {
                Some(conf) => {
                    // Pageserver returned a state: update it in observed. This may still be an indeterminate (None) state,
                    // if internally the pageserver's TenantSlot was being mutated (e.g. some long-running API call is still running)
                    self.observed
                        .locations
                        .insert(attached_node.get_id(), ObservedStateLocation { conf });
                }
                None => {
                    // Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
                    self.observed.locations.remove(&attached_node.get_id());
                }
            }
        }

        Ok(true)
    }

    /// Reconciling a tenant makes API calls to pageservers until the observed state
    /// matches the intended state.
    ///
    /// First we apply special case handling (e.g. for live migrations), and then a
    /// general case reconciliation where we walk through the intent by pageserver
    /// and call out to the pageserver to apply the desired state.
    ///
    /// An Ok(()) result indicates that we successfully attached the tenant, but _not_ that
    /// all locations for the tenant are in the expected state. When nodes that are to be detached
    /// or configured as secondary are unavailable, we may return Ok(()) but leave the shard in a
    /// state where it still requires later reconciliation.
    pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
        // Prepare: if we have uncertain `observed` state for our would-be attachment location, then refresh it
        let refreshed = self.maybe_refresh_observed().await?;

        // Special case: live migration
        self.maybe_live_migrate().await?;

        // If the attached pageserver is not attached, do so now.
        if let Some(node) = self.intent.attached.as_ref() {
            // If we are in an attached policy, then generation must have been set (null generations
            // are only present when a tenant is initially loaded with a secondary policy)
            debug_assert!(self.generation.is_some());
            let Some(generation) = self.generation else {
                return Err(ReconcileError::Other(anyhow::anyhow!(
                    "Attempted to attach with NULL generation"
                )));
            };

            let mut wanted_conf = attached_location_conf(
                generation,
                &self.shard,
                &self.config,
                &self.placement_policy,
            );
            match self.observed.locations.get(&node.get_id()) {
                Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                    if refreshed {
                        tracing::info!(
                            node_id=%node.get_id(), "Observed configuration correct after refresh. Notifying compute.");
                        self.compute_notify().await?;
                    } else {
                        // Nothing to do
                        tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.");
                    }
                }
                observed => {
                    // In all cases other than a matching observed configuration, we will
                    // reconcile this location. This includes locations with different configurations, as well
                    // as locations with unknown (None) observed state.

                    // Incrementing generation is the safe general case, but is inefficient for changes that only
                    // modify some details (e.g. the tenant's config).
                    let increment_generation = match observed {
                        None => true,
                        Some(ObservedStateLocation { conf: None }) => true,
                        Some(ObservedStateLocation {
                            conf: Some(observed),
                        }) => {
                            let generations_match = observed.generation == wanted_conf.generation;

                            // We may skip incrementing the generation if the location is already in the expected mode and
                            // generation. In principle it would also be safe to skip from certain other modes (e.g. AttachedStale),
                            // but such states are handled inside `live_migrate`, and if we see that state here we're cleaning up
                            // after a restart/crash, so fall back to the universally safe path of incrementing generation.
                            !generations_match || (observed.mode != wanted_conf.mode)
                        }
                    };

                    if increment_generation {
                        pausable_failpoint!("reconciler-pre-increment-generation");

                        let generation = self
                            .persistence
                            .increment_generation(self.tenant_shard_id, node.get_id())
                            .await?;
                        self.generation = Some(generation);
                        wanted_conf.generation = generation.into();
                    }

                    let diff = match observed {
                        Some(ObservedStateLocation {
                            conf: Some(observed),
                        }) => {
                            let diff = JsonDiff::diff(
                                &serde_json::to_value(observed.clone()).unwrap(),
                                &serde_json::to_value(wanted_conf.clone()).unwrap(),
                                false,
                            );

                            if let Some(json_diff) = diff.diff {
                                serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
                            } else {
                                "unknown".to_string()
                            }
                        }
                        _ => "full".to_string(),
                    };

                    tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");

                    // Because `node` comes from a ref to &self, clone it before calling into a &mut self
                    // function: this could be avoided by refactoring the state mutated by location_config into
                    // a separate type to Self.
                    let node = node.clone();

                    // Use lazy=true, because we may run many of Self concurrently, and do not want to
                    // overload the pageserver with logical size calculations.
                    self.location_config(&node, wanted_conf, None, true).await?;
                    self.compute_notify().await?;
                }
            }
        }

        // Configure secondary locations: if these were previously attached this
        // implicitly downgrades them from attached to secondary.
        let mut changes = Vec::new();
        for node in &self.intent.secondary {
            let wanted_conf = secondary_location_conf(&self.shard, &self.config);
            match self.observed.locations.get(&node.get_id()) {
                Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
                    // Nothing to do
                    tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
                }
                _ => {
                    // Only try and configure secondary locations on nodes that are available. This
                    // allows the reconciler to "succeed" while some secondaries are offline (e.g. after
                    // a node failure, where the failed node will have a secondary intent)
                    if node.is_available() {
                        tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
                        changes.push((node.clone(), wanted_conf))
                    } else {
                        tracing::info!(node_id=%node.get_id(), "Skipping configuration as secondary, node is unavailable");
                        self.observed
                            .locations
                            .insert(node.get_id(), ObservedStateLocation { conf: None });
                    }
                }
            }
        }

        // Detach any extraneous pageservers that are no longer referenced
        // by our intent.
        for node in &self.detach {
            changes.push((
                node.clone(),
                LocationConfig {
                    mode: LocationConfigMode::Detached,
                    generation: None,
                    secondary_conf: None,
                    shard_number: self.shard.number.0,
                    shard_count: self.shard.count.literal(),
                    shard_stripe_size: self.shard.stripe_size.0,
                    tenant_conf: self.config.clone(),
                },
            ));
        }

        for (node, conf) in changes {
            if self.cancel.is_cancelled() {
                return Err(ReconcileError::Cancel);
            }
            // We only apply changes on nodes that are available. This does
            // not stop us succeeding with the reconcile, because our core goal is to make the
            // shard _available_ (the attached location), and configuring secondary locations
            // can be done lazily when the node becomes available (via background reconciliation).
            if node.is_available() {
                self.location_config(&node, conf, None, false).await?;
            } else {
                // If the node is unavailable, we skip and consider the reconciliation successful: this
                // is a common case where a pageserver is marked unavailable: we demote a location on
                // that unavailable pageserver to secondary.
                tracing::info!("Skipping configuring secondary location {node}, it is unavailable");
                self.observed
                    .locations
                    .insert(node.get_id(), ObservedStateLocation { conf: None });
            }
        }

        // The condition below identifies a detach. We must have no attached intent and
        // must have been attached to something previously. Pass this information to
        // the [`ComputeHook`] such that it can update its tenant-wide state.
        if self.intent.attached.is_none() && !self.detach.is_empty() {
            // TODO: Consider notifying control plane about detaches. This would avoid situations
            // where the compute tries to start-up with a stale set of pageservers.
            self.compute_hook
                .handle_detach(self.tenant_shard_id, self.shard.stripe_size);
        }

        pausable_failpoint!("reconciler-epilogue");

        Ok(())
    }

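    /// Notify compute of the shard's intended attached pageserver, if any. On failure,
    /// set [`Self::compute_notify_failure`] so that the notification is retried by a
    /// later reconciliation rather than being lost.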
    pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
        // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
        // destination.
        if let Some(node) = &self.intent.attached {
            let result = self
                .compute_hook
                .notify(
                    compute_hook::ShardUpdate {
                        tenant_shard_id: self.tenant_shard_id,
                        node_id: node.get_id(),
                        stripe_size: self.shard.stripe_size,
                        preferred_az: self.preferred_az.as_ref().map(Cow::Borrowed),
                    },
                    &self.cancel,
                )
                .await;
            if let Err(e) = &result {
                // Set this flag so that in our ReconcileResult we will set the flag on the shard that it
                // needs to retry at some point.
                self.compute_notify_failure = true;

                // It is up to the caller whether they want to drop out on this error, but they don't have to:
                // in general we should avoid letting unavailability of the cloud control plane stop us from
                // making progress.
                match e {
                    // 404s from cplane during tenant creation are expected.
                    // Cplane only persists the shards to the database after
                    // creating the tenant and the timeline. If we notify before
                    // that, we'll get a 404.
                    //
                    // This is fine because tenant creations happen via /location_config
                    // and that returns the list of locations in the response. Hence, we
                    // silence the error and return Ok(()) here. Reconciliation will still
                    // be retried because we set [`Reconciler::compute_notify_failure`] above.
                    NotifyError::Unexpected(hyper::StatusCode::NOT_FOUND)
                        if self.reconciler_config.tenant_creation_hint() =>
                    {
                        return Ok(());
                    }
                    NotifyError::ShuttingDown => {}
                    _ => {
                        tracing::warn!(
                            "Failed to notify compute of attached pageserver {node}: {e}"
                        );
                    }
                }
            }
            result
        } else {
            Ok(())
        }
    }

    /// Compare the observed state snapshot from when the reconcile was created
    /// with the final observed state in order to generate observed state deltas.
    pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
        let mut deltas = Vec::default();

        for (node_id, location) in &self.observed.locations {
            let previous_location = self.original_observed.locations.get(node_id);
            let do_upsert = match previous_location {
                // Location config changed for node
                Some(prev) if location.conf != prev.conf => true,
                // New location config for node
                None => true,
                // Location config has not changed for node
                _ => false,
            };

            if do_upsert {
                deltas.push(ObservedStateDelta::Upsert(Box::new((
                    *node_id,
                    location.clone(),
                ))));
            }
        }

        for node_id in self.original_observed.locations.keys() {
            if !self.observed.locations.contains_key(node_id) {
                deltas.push(ObservedStateDelta::Delete(*node_id));
            }
        }

        deltas
    }

    /// Keep trying to notify the compute indefinitely, only dropping out if:
    /// - the node `origin` becomes unavailable -> Ok(())
    /// - the node `origin` no longer has our tenant shard attached -> Ok(())
    /// - our cancellation token fires -> Err(ReconcileError::Cancel)
    ///
    /// This is used during live migration, where we do not wish to detach
    /// an origin location until the compute definitely knows about the new
    /// location.
    ///
    /// In cases where the origin node becomes unavailable, we return success, indicating
    /// to the caller that they should continue irrespective of whether the compute was notified,
    /// because the origin node is unusable anyway. Notification will be retried later via the
    /// [`Self::compute_notify_failure`] flag.
    async fn compute_notify_blocking(&mut self, origin: &Node) -> Result<(), ReconcileError> {
        let mut notify_attempts = 0;
        while let Err(e) = self.compute_notify().await {
            match e {
                NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
                NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
                _ => {
                    tracing::warn!(
                        "Live migration blocked by compute notification error, retrying: {e}"
                    );
                }
            }

            // Did the origin pageserver become unavailable?
            if !origin.is_available() {
                tracing::info!("Giving up on compute notification because {origin} is unavailable");
                break;
            }

            // Does the origin pageserver still host the shard we are interested in? We should only
            // continue waiting for compute notification to be acked if the old location is still usable.
            let tenant_shard_id = self.tenant_shard_id;
            match origin
                .with_client_retries(
                    |client| async move { client.get_location_config(tenant_shard_id).await },
                    &self.http_client,
                    &self.service_config.pageserver_jwt_token,
                    1,
                    3,
                    Duration::from_secs(5),
                    &self.cancel,
                )
                .await
            {
                Some(Ok(Some(location_conf))) => {
                    if matches!(
                        location_conf.mode,
                        LocationConfigMode::AttachedMulti
                            | LocationConfigMode::AttachedSingle
                            | LocationConfigMode::AttachedStale
                    ) {
                        tracing::debug!(
                            "Still attached to {origin}, will wait & retry compute notification"
                        );
                    } else {
                        tracing::info!(
                            "Giving up on compute notification because {origin} is in state {:?}",
                            location_conf.mode
                        );
                        return Ok(());
                    }
                    // Fall through
                }
                Some(Ok(None)) => {
                    tracing::info!(
                        "No longer attached to {origin}, giving up on compute notification"
                    );
                    return Ok(());
                }
                Some(Err(e)) => {
                    match e {
                        mgmt_api::Error::Cancelled => {
                            tracing::info!(
                                "Giving up on compute notification because {origin} is unavailable"
                            );
                            return Ok(());
                        }
                        mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _) => {
                            tracing::info!(
                                "No longer attached to {origin}, giving up on compute notification"
                            );
                            return Ok(());
                        }
                        e => {
                            // Other API errors are unexpected here.
                            tracing::warn!("Unexpected error checking location on {origin}: {e}");

                            // Fall through, we will retry compute notification.
                        }
                    }
                }
                None => return Err(ReconcileError::Cancel),
            };

            exponential_backoff(
                notify_attempts,
                // Generous waits: control plane operations which might be blocking us usually complete on the order
                // of hundreds to thousands of milliseconds, so there is no point busy-polling.
                1.0,
                10.0,
                &self.cancel,
            )
            .await;
            notify_attempts += 1;
        }

        Ok(())
    }
}

/// We tweak the externally-set TenantConfig while configuring
/// locations, using our awareness of whether secondary locations
/// are in use to automatically enable/disable heatmap uploads.
fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
    let mut config = config.clone();
    if has_secondaries {
        if config.heatmap_period.is_none() {
            config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
        }
    } else {
        config.heatmap_period = None;
    }
    config
}

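/// Build the `LocationConfig` for an attached location. Heatmap uploads are enabled
/// only when the placement policy implies that secondary locations exist, i.e.
/// `PlacementPolicy::Attached(n)` with `n > 0`.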
pub(crate) fn attached_location_conf(
    generation: Generation,
    shard: &ShardIdentity,
    config: &TenantConfig,
    policy: &PlacementPolicy,
) -> LocationConfig {
    let has_secondaries = match policy {
        PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => {
            false
        }
        PlacementPolicy::Attached(_) => true,
    };

    LocationConfig {
        mode: LocationConfigMode::AttachedSingle,
        generation: generation.into(),
        secondary_conf: None,
        shard_number: shard.number.0,
        shard_count: shard.count.literal(),
        shard_stripe_size: shard.stripe_size.0,
        tenant_conf: ha_aware_config(config, has_secondaries),
    }
}

pub(crate) fn secondary_location_conf(
    shard: &ShardIdentity,
    config: &TenantConfig,
) -> LocationConfig {
    LocationConfig {
        mode: LocationConfigMode::Secondary,
        generation: None,
        secondary_conf: Some(LocationConfigSecondary { warm: true }),
        shard_number: shard.number.0,
        shard_count: shard.count.literal(),
        shard_stripe_size: shard.stripe_size.0,
        tenant_conf: ha_aware_config(config, true),
    }
}