LCOV - code coverage report
Current view:  top level - storage_controller/src - reconciler.rs (source / functions)
Test:          c28d23d327d4ca6acc894004f1432d7b7eea829c.info
Test Date:     2025-03-21 14:50:36
Coverage:      Lines: 0.0 % (0 of 759 hit)    Functions: 0.0 % (0 of 55 hit)

            Line data    Source code
       1              : use std::borrow::Cow;
       2              : use std::collections::HashMap;
       3              : use std::sync::Arc;
       4              : use std::time::{Duration, Instant};
       5              : 
       6              : use json_structural_diff::JsonDiff;
       7              : use pageserver_api::controller_api::{AvailabilityZone, MigrationConfig, PlacementPolicy};
       8              : use pageserver_api::models::{
       9              :     LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig, TenantWaitLsnRequest,
      10              : };
      11              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      12              : use pageserver_client::mgmt_api;
      13              : use reqwest::StatusCode;
      14              : use tokio_util::sync::CancellationToken;
      15              : use utils::backoff::exponential_backoff;
      16              : use utils::generation::Generation;
      17              : use utils::id::{NodeId, TimelineId};
      18              : use utils::lsn::Lsn;
      19              : use utils::pausable_failpoint;
      20              : use utils::sync::gate::GateGuard;
      21              : 
      22              : use crate::compute_hook::{ComputeHook, NotifyError};
      23              : use crate::node::Node;
      24              : use crate::pageserver_client::PageserverClient;
      25              : use crate::persistence::Persistence;
      26              : use crate::tenant_shard::{IntentState, ObservedState, ObservedStateDelta, ObservedStateLocation};
      27              : use crate::{compute_hook, service};
      28              : 
      29              : const DEFAULT_HEATMAP_PERIOD: Duration = Duration::from_secs(60);
      30              : 
      31              : /// Object with the lifetime of the background reconcile task that is created
      32              : /// for tenants which have a difference between their intent and observed states.
      33              : pub(super) struct Reconciler {
      34              :     /// See [`crate::tenant_shard::TenantShard`] for the meanings of these fields: they are a snapshot
      35              :     /// of a tenant's state from when we spawned a reconcile task.
      36              :     pub(super) tenant_shard_id: TenantShardId,
      37              :     pub(crate) shard: ShardIdentity,
      38              :     pub(crate) placement_policy: PlacementPolicy,
      39              :     pub(crate) generation: Option<Generation>,
      40              :     pub(crate) intent: TargetState,
      41              : 
      42              :     /// Nodes not referenced by [`Self::intent`], from which we should try
      43              :     /// to detach this tenant shard.
      44              :     pub(crate) detach: Vec<Node>,
      45              : 
      46              :     /// Configuration specific to this reconciler
      47              :     pub(crate) reconciler_config: ReconcilerConfig,
      48              : 
      49              :     pub(crate) config: TenantConfig,
      50              :     pub(crate) preferred_az: Option<AvailabilityZone>,
      51              : 
      52              :     /// Observed state from the point of view of the reconciler.
      53              :     /// This gets updated as the reconciliation makes progress.
      54              :     pub(crate) observed: ObservedState,
      55              : 
      56              :     /// Snapshot of the observed state at the point when the reconciler
      57              :     /// was spawned.
      58              :     pub(crate) original_observed: ObservedState,
      59              : 
      60              :     pub(crate) service_config: service::Config,
      61              : 
      62              :     /// A hook to notify the running postgres instances when we change the location
      63              :     /// of a tenant.  Use this via [`Self::compute_notify`] to update our failure flag
      64              :     /// and guarantee eventual retries.
      65              :     pub(crate) compute_hook: Arc<ComputeHook>,
      66              : 
      67              :     /// To avoid stalling if the cloud control plane is unavailable, we may proceed
      68              :     /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
      69              :     /// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
      70              :     pub(crate) compute_notify_failure: bool,
      71              : 
       72              :     /// The Reconciler is responsible for keeping alive the semaphore units that limit
       73              :     /// how many reconcilers may run concurrently.
      74              :     pub(crate) _resource_units: ReconcileUnits,
      75              : 
      76              :     /// A means to abort background reconciliation: it is essential to
      77              :     /// call this when something changes in the original TenantShard that
      78              :     /// will make this reconciliation impossible or unnecessary, for
      79              :     /// example when a pageserver node goes offline, or the PlacementPolicy for
      80              :     /// the tenant is changed.
      81              :     pub(crate) cancel: CancellationToken,
      82              : 
      83              :     /// Reconcilers are registered with a Gate so that during a graceful shutdown we
      84              :     /// can wait for all the reconcilers to respond to their cancellation tokens.
      85              :     pub(crate) _gate_guard: GateGuard,
      86              : 
      87              :     /// Access to persistent storage for updating generation numbers
      88              :     pub(crate) persistence: Arc<Persistence>,
      89              : 
      90              :     /// HTTP client with proper CA certs.
      91              :     pub(crate) http_client: reqwest::Client,
      92              : }
      93              : 
      94              : pub(crate) struct ReconcilerConfigBuilder {
      95              :     config: ReconcilerConfig,
      96              : }
      97              : 
      98              : impl ReconcilerConfigBuilder {
      99              :     /// Priority is special: you must pick one thoughtfully, do not just use 'normal' as the default
     100            0 :     pub(crate) fn new(priority: ReconcilerPriority) -> Self {
     101            0 :         Self {
     102            0 :             config: ReconcilerConfig::new(priority),
     103            0 :         }
     104            0 :     }
     105              : 
     106            0 :     pub(crate) fn secondary_warmup_timeout(self, value: Duration) -> Self {
     107            0 :         Self {
     108            0 :             config: ReconcilerConfig {
     109            0 :                 secondary_warmup_timeout: Some(value),
     110            0 :                 ..self.config
     111            0 :             },
     112            0 :         }
     113            0 :     }
     114              : 
     115            0 :     pub(crate) fn secondary_download_request_timeout(self, value: Duration) -> Self {
     116            0 :         Self {
     117            0 :             config: ReconcilerConfig {
     118            0 :                 secondary_download_request_timeout: Some(value),
     119            0 :                 ..self.config
     120            0 :             },
     121            0 :         }
     122            0 :     }
     123              : 
     124            0 :     pub(crate) fn tenant_creation_hint(self, hint: bool) -> Self {
     125            0 :         Self {
     126            0 :             config: ReconcilerConfig {
     127            0 :                 tenant_creation_hint: hint,
     128            0 :                 ..self.config
     129            0 :             },
     130            0 :         }
     131            0 :     }
     132              : 
     133            0 :     pub(crate) fn build(self) -> ReconcilerConfig {
     134            0 :         self.config
     135            0 :     }
     136              : }
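As a usage sketch (not taken from this file), a caller with a user-facing, high-priority operation might override just the warmup timeout and keep the remaining defaults; the 120-second value below is illustrative only:

    use std::time::Duration;

    // Hypothetical call site: high priority because the work is user-facing.
    let config = ReconcilerConfigBuilder::new(ReconcilerPriority::High)
        .secondary_warmup_timeout(Duration::from_secs(120))
        .build();
    assert!(matches!(config.priority, ReconcilerPriority::High));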
     137              : 
     138              : // Higher priorities are used for user-facing tasks, so that a long backlog of housekeeping work (e.g. reconciling on startup, rescheduling
     139              : // things on node changes) does not starve user-facing tasks.
     140              : #[derive(Debug, Copy, Clone)]
     141              : pub(crate) enum ReconcilerPriority {
     142              :     Normal,
     143              :     High,
     144              : }
     145              : 
     146              : #[derive(Debug, Copy, Clone)]
     147              : pub(crate) struct ReconcilerConfig {
     148              :     pub(crate) priority: ReconcilerPriority,
     149              : 
      150              :     // During a live migration, give up on warming up the secondary
     151              :     // after this timeout.
     152              :     secondary_warmup_timeout: Option<Duration>,
     153              : 
     154              :     // During live migrations this is the amount of time that
      155              :     // the pageserver will hold our poll.
     156              :     secondary_download_request_timeout: Option<Duration>,
     157              : 
     158              :     // A hint indicating whether this reconciliation is done on the
     159              :     // creation of a new tenant. This only informs logging behaviour.
     160              :     tenant_creation_hint: bool,
     161              : }
     162              : 
     163              : impl ReconcilerConfig {
     164              :     /// Configs are always constructed with an explicit priority, to force callers to think about whether
     165              :     /// the operation they're scheduling is high-priority or not. Normal priority is not a safe default, because
     166              :     /// scheduling something user-facing at normal priority can result in it getting starved out by background work.
     167            0 :     pub(crate) fn new(priority: ReconcilerPriority) -> Self {
     168            0 :         Self {
     169            0 :             priority,
     170            0 :             secondary_warmup_timeout: None,
     171            0 :             secondary_download_request_timeout: None,
     172            0 :             tenant_creation_hint: false,
     173            0 :         }
     174            0 :     }
     175              : 
     176            0 :     pub(crate) fn get_secondary_warmup_timeout(&self) -> Duration {
     177              :         const SECONDARY_WARMUP_TIMEOUT_DEFAULT: Duration = Duration::from_secs(300);
     178            0 :         self.secondary_warmup_timeout
     179            0 :             .unwrap_or(SECONDARY_WARMUP_TIMEOUT_DEFAULT)
     180            0 :     }
     181              : 
     182            0 :     pub(crate) fn get_secondary_download_request_timeout(&self) -> Duration {
     183              :         const SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT: Duration = Duration::from_secs(20);
     184            0 :         self.secondary_download_request_timeout
     185            0 :             .unwrap_or(SECONDARY_DOWNLOAD_REQUEST_TIMEOUT_DEFAULT)
     186            0 :     }
     187              : 
     188            0 :     pub(crate) fn tenant_creation_hint(&self) -> bool {
     189            0 :         self.tenant_creation_hint
     190            0 :     }
     191              : }
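A small sketch of the defaults implied by the getters above: a config built without overrides reports the 300-second warmup timeout, the 20-second download-request timeout, and no tenant-creation hint.

    use std::time::Duration;

    let config = ReconcilerConfig::new(ReconcilerPriority::Normal);
    assert_eq!(config.get_secondary_warmup_timeout(), Duration::from_secs(300));
    assert_eq!(config.get_secondary_download_request_timeout(), Duration::from_secs(20));
    assert!(!config.tenant_creation_hint());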
     192              : 
     193              : impl From<&MigrationConfig> for ReconcilerConfig {
     194            0 :     fn from(value: &MigrationConfig) -> Self {
     195            0 :         // Run reconciler at high priority because MigrationConfig comes from human requests that should
     196            0 :         // be presumed urgent.
     197            0 :         let mut builder = ReconcilerConfigBuilder::new(ReconcilerPriority::High);
     198              : 
     199            0 :         if let Some(timeout) = value.secondary_warmup_timeout {
     200            0 :             builder = builder.secondary_warmup_timeout(timeout)
     201            0 :         }
     202              : 
     203            0 :         if let Some(timeout) = value.secondary_download_request_timeout {
     204            0 :             builder = builder.secondary_download_request_timeout(timeout)
     205            0 :         }
     206              : 
     207            0 :         builder.build()
     208            0 :     }
     209              : }
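A sketch of how this conversion might be used at a migration call site (the `migration` value is hypothetical; only the two timeout fields read above are assumed to be set):

    // Timeouts present on the request are carried over; anything unset keeps the
    // ReconcilerConfig defaults. Priority is always High for migrations.
    let reconciler_config = ReconcilerConfig::from(&migration);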
     210              : 
     211              : /// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
     212              : pub(crate) struct ReconcileUnits {
     213              :     _sem_units: tokio::sync::OwnedSemaphorePermit,
     214              : }
     215              : 
     216              : impl ReconcileUnits {
     217            0 :     pub(crate) fn new(sem_units: tokio::sync::OwnedSemaphorePermit) -> Self {
     218            0 :         Self {
     219            0 :             _sem_units: sem_units,
     220            0 :         }
     221            0 :     }
     222              : }
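A minimal sketch, assuming the units come from a shared tokio semaphore that caps concurrent reconcilers (the semaphore and its limit here are illustrative, not defined in this file):

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    // Hypothetical cap on concurrently running reconcilers.
    let limiter = Arc::new(Semaphore::new(16));

    // Inside an async context: hold the owned permit for the reconciler's lifetime.
    // Dropping ReconcileUnits releases the permit so another reconciler can start.
    let permit = limiter.clone().acquire_owned().await.expect("semaphore closed");
    let _resource_units = ReconcileUnits::new(permit);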
     223              : 
     224              : /// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
     225              : /// reference counting for Scheduler.  The IntentState is what the scheduler works with,
     226              : /// and the TargetState is just the instruction for a particular Reconciler run.
     227              : #[derive(Debug)]
     228              : pub(crate) struct TargetState {
     229              :     pub(crate) attached: Option<Node>,
     230              :     pub(crate) secondary: Vec<Node>,
     231              : }
     232              : 
     233              : impl TargetState {
     234            0 :     pub(crate) fn from_intent(nodes: &HashMap<NodeId, Node>, intent: &IntentState) -> Self {
     235            0 :         Self {
     236            0 :             attached: intent.get_attached().map(|n| {
     237            0 :                 nodes
     238            0 :                     .get(&n)
     239            0 :                     .expect("Intent attached referenced non-existent node")
     240            0 :                     .clone()
     241            0 :             }),
     242            0 :             secondary: intent
     243            0 :                 .get_secondary()
     244            0 :                 .iter()
     245            0 :                 .map(|n| {
     246            0 :                     nodes
     247            0 :                         .get(n)
     248            0 :                         .expect("Intent secondary referenced non-existent node")
     249            0 :                         .clone()
     250            0 :                 })
     251            0 :                 .collect(),
     252            0 :         }
     253            0 :     }
     254              : }
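The `expect` calls above mean `from_intent` assumes every node referenced by the intent is still present in the node map; a hypothetical call site:

    // `nodes` is the controller's current NodeId -> Node map and `intent` is the
    // shard's IntentState; a stale reference to a removed node would panic here.
    let target = TargetState::from_intent(&nodes, &intent);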
     255              : 
     256              : #[derive(thiserror::Error, Debug)]
     257              : pub(crate) enum ReconcileError {
     258              :     #[error(transparent)]
     259              :     Remote(#[from] mgmt_api::Error),
     260              :     #[error(transparent)]
     261              :     Notify(#[from] NotifyError),
     262              :     #[error("Cancelled")]
     263              :     Cancel,
     264              :     #[error(transparent)]
     265              :     Other(#[from] anyhow::Error),
     266              : }
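The `#[from]` conversions let pageserver client errors and compute-hook notification errors propagate with `?`; at the call site, cancellation is typically benign. A sketch (the surrounding handler is hypothetical):

    match reconciler.reconcile().await {
        Ok(()) => tracing::info!("reconcile complete"),
        // Expected during shutdown or when the intent changes under us.
        Err(ReconcileError::Cancel) => tracing::info!("reconcile cancelled"),
        Err(e) => tracing::warn!("reconcile failed: {e}"),
    }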
     267              : 
     268              : impl Reconciler {
     269            0 :     async fn location_config(
     270            0 :         &mut self,
     271            0 :         node: &Node,
     272            0 :         config: LocationConfig,
     273            0 :         flush_ms: Option<Duration>,
     274            0 :         lazy: bool,
     275            0 :     ) -> Result<(), ReconcileError> {
     276            0 :         if !node.is_available() && config.mode == LocationConfigMode::Detached {
     277              :             // [`crate::service::Service::node_activate_reconcile`] will update the observed state
     278              :             // when the node comes back online. At that point, the intent and observed states will
     279              :             // be mismatched and a background reconciliation will detach.
     280            0 :             tracing::info!(
     281            0 :                 "Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
     282              :             );
     283            0 :             return Ok(());
     284            0 :         }
     285            0 : 
     286            0 :         self.observed
     287            0 :             .locations
     288            0 :             .insert(node.get_id(), ObservedStateLocation { conf: None });
     289            0 : 
     290            0 :         // TODO: amend locations that use long-polling: they will hit this timeout.
     291            0 :         let timeout = Duration::from_secs(25);
     292            0 : 
     293            0 :         tracing::info!("location_config({node}) calling: {:?}", config);
     294            0 :         let tenant_shard_id = self.tenant_shard_id;
     295            0 :         let config_ref = &config;
     296            0 :         match node
     297            0 :             .with_client_retries(
     298            0 :                 |client| async move {
     299            0 :                     let config = config_ref.clone();
     300            0 :                     client
     301            0 :                         .location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
     302            0 :                         .await
     303            0 :                 },
     304            0 :                 &self.http_client,
     305            0 :                 &self.service_config.pageserver_jwt_token,
     306            0 :                 1,
     307            0 :                 3,
     308            0 :                 timeout,
     309            0 :                 &self.cancel,
     310            0 :             )
     311            0 :             .await
     312              :         {
     313            0 :             Some(Ok(_)) => {}
     314            0 :             Some(Err(e)) => return Err(e.into()),
     315            0 :             None => return Err(ReconcileError::Cancel),
     316              :         };
     317            0 :         tracing::info!("location_config({node}) complete: {:?}", config);
     318              : 
     319            0 :         match config.mode {
     320            0 :             LocationConfigMode::Detached => {
     321            0 :                 self.observed.locations.remove(&node.get_id());
     322            0 :             }
     323            0 :             _ => {
     324            0 :                 self.observed
     325            0 :                     .locations
     326            0 :                     .insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
     327            0 :             }
     328              :         }
     329              : 
     330            0 :         Ok(())
     331            0 :     }
     332              : 
     333            0 :     fn get_node(&self, node_id: &NodeId) -> Option<&Node> {
     334            0 :         if let Some(node) = self.intent.attached.as_ref() {
     335            0 :             if node.get_id() == *node_id {
     336            0 :                 return Some(node);
     337            0 :             }
     338            0 :         }
     339              : 
     340            0 :         if let Some(node) = self
     341            0 :             .intent
     342            0 :             .secondary
     343            0 :             .iter()
     344            0 :             .find(|n| n.get_id() == *node_id)
     345              :         {
     346            0 :             return Some(node);
     347            0 :         }
     348              : 
     349            0 :         if let Some(node) = self.detach.iter().find(|n| n.get_id() == *node_id) {
     350            0 :             return Some(node);
     351            0 :         }
     352            0 : 
     353            0 :         None
     354            0 :     }
     355              : 
     356            0 :     async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
     357            0 :         let destination = if let Some(node) = &self.intent.attached {
     358            0 :             match self.observed.locations.get(&node.get_id()) {
     359            0 :                 Some(conf) => {
     360              :                     // We will do a live migration only if the intended destination is not
     361              :                     // currently in an attached state.
     362            0 :                     match &conf.conf {
     363            0 :                         Some(conf) if conf.mode == LocationConfigMode::Secondary => {
     364            0 :                             // Fall through to do a live migration
     365            0 :                             node
     366              :                         }
     367              :                         None | Some(_) => {
     368              :                             // Attached or uncertain: don't do a live migration, proceed
     369              :                             // with a general-case reconciliation
     370            0 :                             tracing::info!("maybe_live_migrate: destination is None or attached");
     371            0 :                             return Ok(());
     372              :                         }
     373              :                     }
     374              :                 }
     375              :                 None => {
     376              :                     // Our destination is not attached: maybe live migrate if some other
     377              :                     // node is currently attached.  Fall through.
     378            0 :                     node
     379              :                 }
     380              :             }
     381              :         } else {
     382              :             // No intent to be attached
     383            0 :             tracing::info!("maybe_live_migrate: no attached intent");
     384            0 :             return Ok(());
     385              :         };
     386              : 
     387            0 :         let mut origin = None;
     388            0 :         for (node_id, state) in &self.observed.locations {
     389            0 :             if let Some(observed_conf) = &state.conf {
     390            0 :                 if observed_conf.mode == LocationConfigMode::AttachedSingle {
     391              :                     // We will only attempt live migration if the origin is not offline: this
     392              :                     // avoids trying to do it while reconciling after responding to an HA failover.
     393            0 :                     if let Some(node) = self.get_node(node_id) {
     394            0 :                         if node.is_available() {
     395            0 :                             origin = Some(node.clone());
     396            0 :                             break;
     397            0 :                         }
     398            0 :                     }
     399            0 :                 }
     400            0 :             }
     401              :         }
     402              : 
     403            0 :         let Some(origin) = origin else {
     404            0 :             tracing::info!("maybe_live_migrate: no origin found");
     405            0 :             return Ok(());
     406              :         };
     407              : 
     408              :         // We have an origin and a destination: proceed to do the live migration
     409            0 :         tracing::info!("Live migrating {}->{}", origin, destination);
     410            0 :         self.live_migrate(origin, destination.clone()).await?;
     411              : 
     412            0 :         Ok(())
     413            0 :     }
     414              : 
     415            0 :     async fn wait_lsn(
     416            0 :         &self,
     417            0 :         node: &Node,
     418            0 :         tenant_shard_id: TenantShardId,
     419            0 :         timelines: HashMap<TimelineId, Lsn>,
     420            0 :     ) -> Result<StatusCode, ReconcileError> {
     421              :         const TIMEOUT: Duration = Duration::from_secs(10);
     422              : 
     423            0 :         let client = PageserverClient::new(
     424            0 :             node.get_id(),
     425            0 :             self.http_client.clone(),
     426            0 :             node.base_url(),
     427            0 :             self.service_config.pageserver_jwt_token.as_deref(),
     428            0 :         );
     429            0 : 
     430            0 :         client
     431            0 :             .wait_lsn(
     432            0 :                 tenant_shard_id,
     433            0 :                 TenantWaitLsnRequest {
     434            0 :                     timelines,
     435            0 :                     timeout: TIMEOUT,
     436            0 :                 },
     437            0 :             )
     438            0 :             .await
     439            0 :             .map_err(|e| e.into())
     440            0 :     }
     441              : 
     442            0 :     async fn get_lsns(
     443            0 :         &self,
     444            0 :         tenant_shard_id: TenantShardId,
     445            0 :         node: &Node,
     446            0 :     ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
     447            0 :         let client = PageserverClient::new(
     448            0 :             node.get_id(),
     449            0 :             self.http_client.clone(),
     450            0 :             node.base_url(),
     451            0 :             self.service_config.pageserver_jwt_token.as_deref(),
     452            0 :         );
     453              : 
     454            0 :         let timelines = client.timeline_list(&tenant_shard_id).await?;
     455            0 :         Ok(timelines
     456            0 :             .into_iter()
     457            0 :             .map(|t| (t.timeline_id, t.last_record_lsn))
     458            0 :             .collect())
     459            0 :     }
     460              : 
     461            0 :     async fn secondary_download(
     462            0 :         &self,
     463            0 :         tenant_shard_id: TenantShardId,
     464            0 :         node: &Node,
     465            0 :     ) -> Result<(), ReconcileError> {
     466            0 :         // This is not the timeout for a request, but the total amount of time we're willing to wait
      467            0 :         // for a secondary location to get up to date before we give up and proceed with the migration anyway.
     468            0 :         let total_download_timeout = self.reconciler_config.get_secondary_warmup_timeout();
     469            0 : 
      470            0 :         // This is the long-polling interval for the secondary download requests we send to the destination pageserver
     471            0 :         // during a migration.
     472            0 :         let request_download_timeout = self
     473            0 :             .reconciler_config
     474            0 :             .get_secondary_download_request_timeout();
     475            0 : 
     476            0 :         let started_at = Instant::now();
     477              : 
     478              :         loop {
     479            0 :             let (status, progress) = match node
     480            0 :                 .with_client_retries(
     481            0 :                     |client| async move {
     482            0 :                         client
     483            0 :                             .tenant_secondary_download(
     484            0 :                                 tenant_shard_id,
     485            0 :                                 Some(request_download_timeout),
     486            0 :                             )
     487            0 :                             .await
     488            0 :                     },
     489            0 :                     &self.http_client,
     490            0 :                     &self.service_config.pageserver_jwt_token,
     491            0 :                     1,
     492            0 :                     3,
     493            0 :                     request_download_timeout * 2,
     494            0 :                     &self.cancel,
     495            0 :                 )
     496            0 :                 .await
     497              :             {
     498            0 :                 None => Err(ReconcileError::Cancel),
     499            0 :                 Some(Ok(v)) => Ok(v),
     500            0 :                 Some(Err(e)) => {
     501            0 :                     // Give up, but proceed: it's unfortunate if we couldn't freshen the destination before
     502            0 :                     // attaching, but we should not let an issue with a secondary location stop us proceeding
     503            0 :                     // with a live migration.
      504            0 :                     tracing::warn!("Failed to prepare by downloading layers on node {node}: {e}");
     505            0 :                     return Ok(());
     506              :                 }
     507            0 :             }?;
     508              : 
     509            0 :             if status == StatusCode::OK {
     510            0 :                 tracing::info!(
     511            0 :                     "Downloads to {} complete: {}/{} layers, {}/{} bytes",
     512              :                     node,
     513              :                     progress.layers_downloaded,
     514              :                     progress.layers_total,
     515              :                     progress.bytes_downloaded,
     516              :                     progress.bytes_total
     517              :                 );
     518            0 :                 return Ok(());
     519            0 :             } else if status == StatusCode::ACCEPTED {
     520            0 :                 let total_runtime = started_at.elapsed();
     521            0 :                 if total_runtime > total_download_timeout {
     522            0 :                     tracing::warn!(
     523            0 :                         "Timed out after {}ms downloading layers to {node}.  Progress so far: {}/{} layers, {}/{} bytes",
     524            0 :                         total_runtime.as_millis(),
     525              :                         progress.layers_downloaded,
     526              :                         progress.layers_total,
     527              :                         progress.bytes_downloaded,
     528              :                         progress.bytes_total
     529              :                     );
     530              :                     // Give up, but proceed: an incompletely warmed destination doesn't prevent migration working,
     531              :                     // it just makes the I/O performance for users less good.
     532            0 :                     return Ok(());
     533            0 :                 }
     534            0 : 
     535            0 :                 // Log and proceed around the loop to retry.  We don't sleep between requests, because our HTTP call
     536            0 :                 // to the pageserver is a long-poll.
     537            0 :                 tracing::info!(
     538            0 :                     "Downloads to {} not yet complete: {}/{} layers, {}/{} bytes",
     539              :                     node,
     540              :                     progress.layers_downloaded,
     541              :                     progress.layers_total,
     542              :                     progress.bytes_downloaded,
     543              :                     progress.bytes_total
     544              :                 );
     545            0 :             }
     546              :         }
     547            0 :     }
     548              : 
     549              :     /// This function does _not_ mutate any state, so it is cancellation safe.
     550              :     ///
     551              :     /// This function does not respect [`Self::cancel`], callers should handle that.
     552            0 :     async fn await_lsn(
     553            0 :         &self,
     554            0 :         tenant_shard_id: TenantShardId,
     555            0 :         node: &Node,
     556            0 :         baseline: HashMap<TimelineId, Lsn>,
     557            0 :     ) -> anyhow::Result<()> {
     558              :         // Signal to the pageserver that it should ingest up to the baseline LSNs.
     559              :         loop {
     560            0 :             match self.wait_lsn(node, tenant_shard_id, baseline.clone()).await {
     561              :                 Ok(StatusCode::OK) => {
     562              :                     // Everything is caught up
     563            0 :                     return Ok(());
     564              :                 }
     565              :                 Ok(StatusCode::ACCEPTED) => {
     566              :                     // Some timelines are not caught up yet.
     567              :                     // They'll be polled below.
     568            0 :                     break;
     569              :                 }
     570              :                 Ok(StatusCode::NOT_FOUND) => {
     571              :                     // None of the timelines are present on the pageserver.
     572              :                     // This is correct if they've all been deleted, but
      573              :                     // let the polling loop below cross-check.
     574            0 :                     break;
     575              :                 }
     576            0 :                 Ok(status_code) => {
     577            0 :                     tracing::warn!(
     578            0 :                         "Unexpected status code ({status_code}) returned by wait_lsn endpoint"
     579              :                     );
     580            0 :                     break;
     581              :                 }
     582            0 :                 Err(e) => {
     583            0 :                     tracing::info!("🕑 Can't trigger LSN wait on {node} yet, waiting ({e})",);
     584            0 :                     tokio::time::sleep(Duration::from_millis(500)).await;
     585            0 :                     continue;
     586              :                 }
     587              :             }
     588              :         }
     589              : 
     590              :         // Poll the LSNs until they catch up
     591              :         loop {
     592            0 :             let latest = match self.get_lsns(tenant_shard_id, node).await {
     593            0 :                 Ok(l) => l,
     594            0 :                 Err(e) => {
     595            0 :                     tracing::info!("🕑 Can't get LSNs on node {node} yet, waiting ({e})",);
     596            0 :                     tokio::time::sleep(Duration::from_millis(500)).await;
     597            0 :                     continue;
     598              :                 }
     599              :             };
     600              : 
     601            0 :             let mut any_behind: bool = false;
     602            0 :             for (timeline_id, baseline_lsn) in &baseline {
     603            0 :                 match latest.get(timeline_id) {
     604            0 :                     Some(latest_lsn) => {
     605            0 :                         tracing::info!(timeline_id = %timeline_id, "🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
     606            0 :                         if latest_lsn < baseline_lsn {
     607            0 :                             any_behind = true;
     608            0 :                         }
     609              :                     }
     610            0 :                     None => {
     611            0 :                         // Timeline was deleted in the meantime - ignore it
     612            0 :                     }
     613              :                 }
     614              :             }
     615              : 
     616            0 :             if !any_behind {
     617            0 :                 tracing::info!("✅ LSN caught up.  Proceeding...");
     618            0 :                 break;
     619              :             } else {
     620            0 :                 tokio::time::sleep(Duration::from_millis(500)).await;
     621              :             }
     622              :         }
     623              : 
     624            0 :         Ok(())
     625            0 :     }
     626              : 
     627            0 :     pub async fn live_migrate(
     628            0 :         &mut self,
     629            0 :         origin_ps: Node,
     630            0 :         dest_ps: Node,
     631            0 :     ) -> Result<(), ReconcileError> {
      632            0 :         // `maybe_live_migrate` is responsible for sanity-checking the inputs
     633            0 :         assert!(origin_ps.get_id() != dest_ps.get_id());
     634              : 
     635            0 :         fn build_location_config(
     636            0 :             shard: &ShardIdentity,
     637            0 :             config: &TenantConfig,
     638            0 :             mode: LocationConfigMode,
     639            0 :             generation: Option<Generation>,
     640            0 :             secondary_conf: Option<LocationConfigSecondary>,
     641            0 :         ) -> LocationConfig {
     642            0 :             LocationConfig {
     643            0 :                 mode,
     644            0 :                 generation: generation.map(|g| g.into().unwrap()),
     645            0 :                 secondary_conf,
     646            0 :                 tenant_conf: config.clone(),
     647            0 :                 shard_number: shard.number.0,
     648            0 :                 shard_count: shard.count.literal(),
     649            0 :                 shard_stripe_size: shard.stripe_size.0,
     650            0 :             }
     651            0 :         }
     652              : 
     653            0 :         tracing::info!("🔁 Switching origin node {origin_ps} to stale mode",);
     654              : 
     655              :         // FIXME: it is incorrect to use self.generation here, we should use the generation
     656              :         // from the ObservedState of the origin pageserver (it might be older than self.generation)
     657            0 :         let stale_conf = build_location_config(
     658            0 :             &self.shard,
     659            0 :             &self.config,
     660            0 :             LocationConfigMode::AttachedStale,
     661            0 :             self.generation,
     662            0 :             None,
     663            0 :         );
     664            0 :         self.location_config(&origin_ps, stale_conf, Some(Duration::from_secs(10)), false)
     665            0 :             .await?;
     666              : 
     667            0 :         let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps).await?);
     668              : 
     669              :         // If we are migrating to a destination that has a secondary location, warm it up first
     670            0 :         if let Some(destination_conf) = self.observed.locations.get(&dest_ps.get_id()) {
     671            0 :             if let Some(destination_conf) = &destination_conf.conf {
     672            0 :                 if destination_conf.mode == LocationConfigMode::Secondary {
     673            0 :                     tracing::info!("🔁 Downloading latest layers to destination node {dest_ps}",);
     674            0 :                     self.secondary_download(self.tenant_shard_id, &dest_ps)
     675            0 :                         .await?;
     676            0 :                 }
     677            0 :             }
     678            0 :         }
     679              : 
     680            0 :         pausable_failpoint!("reconciler-live-migrate-pre-generation-inc");
     681              : 
     682              :         // Increment generation before attaching to new pageserver
     683              :         self.generation = Some(
     684            0 :             self.persistence
     685            0 :                 .increment_generation(self.tenant_shard_id, dest_ps.get_id())
     686            0 :                 .await?,
     687              :         );
     688              : 
     689            0 :         let dest_conf = build_location_config(
     690            0 :             &self.shard,
     691            0 :             &self.config,
     692            0 :             LocationConfigMode::AttachedMulti,
     693            0 :             self.generation,
     694            0 :             None,
     695            0 :         );
     696            0 : 
     697            0 :         tracing::info!("🔁 Attaching to pageserver {dest_ps}");
     698            0 :         self.location_config(&dest_ps, dest_conf, None, false)
     699            0 :             .await?;
     700              : 
     701            0 :         pausable_failpoint!("reconciler-live-migrate-pre-await-lsn");
     702              : 
     703            0 :         if let Some(baseline) = baseline_lsns {
     704            0 :             tracing::info!("🕑 Waiting for LSN to catch up...");
     705            0 :             tokio::select! {
     706            0 :                 r = self.await_lsn(self.tenant_shard_id, &dest_ps, baseline) => {r?;}
     707            0 :                 _ = self.cancel.cancelled() => {return Err(ReconcileError::Cancel)}
     708              :             };
     709            0 :         }
     710              : 
     711            0 :         tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");
     712              : 
     713              :         // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
     714              :         // the origin without notifying compute, we will render the tenant unavailable.
     715            0 :         self.compute_notify_blocking(&origin_ps).await?;
     716            0 :         pausable_failpoint!("reconciler-live-migrate-post-notify");
     717              : 
     718              :         // Downgrade the origin to secondary.  If the tenant's policy is PlacementPolicy::Attached(0), then
     719              :         // this location will be deleted in the general case reconciliation that runs after this.
     720            0 :         let origin_secondary_conf = build_location_config(
     721            0 :             &self.shard,
     722            0 :             &self.config,
     723            0 :             LocationConfigMode::Secondary,
     724            0 :             None,
     725            0 :             Some(LocationConfigSecondary { warm: true }),
     726            0 :         );
     727            0 :         self.location_config(&origin_ps, origin_secondary_conf.clone(), None, false)
     728            0 :             .await?;
     729              :         // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
     730              :         // partway through.  In fact, all location conf API calls should be in a wrapper that sets
     731              :         // the observed state to None, then runs, then sets it to what we wrote.
     732            0 :         self.observed.locations.insert(
     733            0 :             origin_ps.get_id(),
     734            0 :             ObservedStateLocation {
     735            0 :                 conf: Some(origin_secondary_conf),
     736            0 :             },
     737            0 :         );
     738            0 : 
     739            0 :         pausable_failpoint!("reconciler-live-migrate-post-detach");
     740              : 
     741            0 :         tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
     742            0 :         let dest_final_conf = build_location_config(
     743            0 :             &self.shard,
     744            0 :             &self.config,
     745            0 :             LocationConfigMode::AttachedSingle,
     746            0 :             self.generation,
     747            0 :             None,
     748            0 :         );
     749            0 :         self.location_config(&dest_ps, dest_final_conf.clone(), None, false)
     750            0 :             .await?;
     751            0 :         self.observed.locations.insert(
     752            0 :             dest_ps.get_id(),
     753            0 :             ObservedStateLocation {
     754            0 :                 conf: Some(dest_final_conf),
     755            0 :             },
     756            0 :         );
     757            0 : 
     758            0 :         tracing::info!("✅ Migration complete");
     759              : 
     760            0 :         Ok(())
     761            0 :     }
     762              : 
     763            0 :     async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
     764              :         // If the attached node has uncertain state, read it from the pageserver before proceeding: this
     765              :         // is important to avoid spurious generation increments.
     766              :         //
     767              :         // We don't need to do this for secondary/detach locations because it's harmless to just PUT their
     768              :         // location conf, whereas for attached locations it can interrupt clients if we spuriously destroy/recreate
     769              :         // the `Timeline` object in the pageserver.
     770              : 
     771            0 :         let Some(attached_node) = self.intent.attached.as_ref() else {
     772              :             // Nothing to do
     773            0 :             return Ok(());
     774              :         };
     775              : 
     776            0 :         if matches!(
     777            0 :             self.observed.locations.get(&attached_node.get_id()),
     778              :             Some(ObservedStateLocation { conf: None })
     779              :         ) {
     780            0 :             let tenant_shard_id = self.tenant_shard_id;
     781            0 :             let observed_conf = match attached_node
     782            0 :                 .with_client_retries(
     783            0 :                     |client| async move { client.get_location_config(tenant_shard_id).await },
     784            0 :                     &self.http_client,
     785            0 :                     &self.service_config.pageserver_jwt_token,
     786            0 :                     1,
     787            0 :                     1,
     788            0 :                     Duration::from_secs(5),
     789            0 :                     &self.cancel,
     790            0 :                 )
     791            0 :                 .await
     792              :             {
     793            0 :                 Some(Ok(observed)) => Some(observed),
     794            0 :                 Some(Err(mgmt_api::Error::ApiError(status, _msg)))
     795            0 :                     if status == StatusCode::NOT_FOUND =>
     796            0 :                 {
     797            0 :                     None
     798              :                 }
     799            0 :                 Some(Err(e)) => return Err(e.into()),
     800            0 :                 None => return Err(ReconcileError::Cancel),
     801              :             };
     802            0 :             tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
     803            0 :             match observed_conf {
     804            0 :                 Some(conf) => {
     805            0 :                     // Pageserver returned a state: update it in observed.  This may still be an indeterminate (None) state,
     806            0 :                     // if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
     807            0 :                     self.observed
     808            0 :                         .locations
     809            0 :                         .insert(attached_node.get_id(), ObservedStateLocation { conf });
     810            0 :                 }
     811            0 :                 None => {
     812            0 :                     // Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
     813            0 :                     self.observed.locations.remove(&attached_node.get_id());
     814            0 :                 }
     815              :             }
     816            0 :         }
     817              : 
     818            0 :         Ok(())
     819            0 :     }
     820              : 
     821              :     /// Reconciling a tenant makes API calls to pageservers until the observed state
     822              :     /// matches the intended state.
     823              :     ///
     824              :     /// First we apply special case handling (e.g. for live migrations), and then a
     825              :     /// general case reconciliation where we walk through the intent by pageserver
     826              :     /// and call out to the pageserver to apply the desired state.
     827              :     ///
     828              :     /// An Ok(()) result indicates that we successfully attached the tenant, but _not_ that
     829              :     /// all locations for the tenant are in the expected state. When nodes that are to be detached
     830              :     /// or configured as secondary are unavailable, we may return Ok(()) but leave the shard in a
     831              :     /// state where it still requires later reconciliation.
     832            0 :     pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
      833            0 :         // Prepare: if we have uncertain `observed` state for our would-be attachment location, then refresh it
     834            0 :         self.maybe_refresh_observed().await?;
     835              : 
     836              :         // Special case: live migration
     837            0 :         self.maybe_live_migrate().await?;
     838              : 
     839              :         // If the attached pageserver is not attached, do so now.
     840            0 :         if let Some(node) = self.intent.attached.as_ref() {
     841              :             // If we are in an attached policy, then generation must have been set (null generations
     842              :             // are only present when a tenant is initially loaded with a secondary policy)
     843            0 :             debug_assert!(self.generation.is_some());
     844            0 :             let Some(generation) = self.generation else {
     845            0 :                 return Err(ReconcileError::Other(anyhow::anyhow!(
     846            0 :                     "Attempted to attach with NULL generation"
     847            0 :                 )));
     848              :             };
     849              : 
     850            0 :             let mut wanted_conf = attached_location_conf(
     851            0 :                 generation,
     852            0 :                 &self.shard,
     853            0 :                 &self.config,
     854            0 :                 &self.placement_policy,
     855            0 :             );
     856            0 :             match self.observed.locations.get(&node.get_id()) {
     857            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
     858            0 :                     // Nothing to do
     859            0 :                     tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
     860              :                 }
     861            0 :                 observed => {
     862              :                     // In all cases other than a matching observed configuration, we will
     863              :                     // reconcile this location.  This includes locations with different configurations, as well
     864              :                     // as locations with unknown (None) observed state.
     865              : 
     866              :                     // Incrementing generation is the safe general case, but is inefficient for changes that only
     867              :                     // modify some details (e.g. the tenant's config).
     868            0 :                     let increment_generation = match observed {
     869            0 :                         None => true,
     870            0 :                         Some(ObservedStateLocation { conf: None }) => true,
     871              :                         Some(ObservedStateLocation {
     872            0 :                             conf: Some(observed),
     873            0 :                         }) => {
     874            0 :                             let generations_match = observed.generation == wanted_conf.generation;
     875            0 : 
     876            0 :                             // We may skip incrementing the generation if the location is already in the expected mode and
     877            0 :                             // generation.  In principle it would also be safe to skip from certain other modes (e.g. AttachedStale),
     878            0 :                             // but such states are handled inside `live_migrate`, and if we see that state here we're cleaning up
     879            0 :                             // after a restart/crash, so fall back to the universally safe path of incrementing generation.
     880            0 :                             !generations_match || (observed.mode != wanted_conf.mode)
     881              :                         }
     882              :                     };
     883              : 
     884            0 :                     if increment_generation {
     885            0 :                         pausable_failpoint!("reconciler-pre-increment-generation");
     886              : 
     887            0 :                         let generation = self
     888            0 :                             .persistence
     889            0 :                             .increment_generation(self.tenant_shard_id, node.get_id())
     890            0 :                             .await?;
     891            0 :                         self.generation = Some(generation);
     892            0 :                         wanted_conf.generation = generation.into();
     893            0 :                     }
     894              : 
     895            0 :                     let diff = match observed {
     896              :                         Some(ObservedStateLocation {
     897            0 :                             conf: Some(observed),
     898            0 :                         }) => {
     899            0 :                             let diff = JsonDiff::diff(
     900            0 :                                 &serde_json::to_value(observed.clone()).unwrap(),
     901            0 :                                 &serde_json::to_value(wanted_conf.clone()).unwrap(),
     902            0 :                                 false,
     903            0 :                             );
     904              : 
     905            0 :                             if let Some(json_diff) = diff.diff {
     906            0 :                                 serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
     907              :                             } else {
     908            0 :                                 "unknown".to_string()
     909              :                             }
     910              :                         }
     911            0 :                         _ => "full".to_string(),
     912              :                     };
     913              : 
     914            0 :                     tracing::info!(node_id=%node.get_id(), "Observed configuration requires update: {diff}");
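                    // A minimal, standalone sketch (not called by the reconciler) of the structural
                    // diff logged above. It reuses only the `JsonDiff::diff` call and `.diff` field
                    // already exercised in this function; the two JSON values are made-up stand-ins
                    // rather than real serialized LocationConfigs.
                    fn _sketch_json_diff_logging() {
                        let observed = serde_json::json!({ "mode": "AttachedSingle", "generation": 3 });
                        let wanted = serde_json::json!({ "mode": "AttachedSingle", "generation": 4 });
                        // Same boolean flag as the call above.
                        let diff = JsonDiff::diff(&observed, &wanted, false);
                        if let Some(json_diff) = diff.diff {
                            // For these inputs, only the changed `generation` property should appear.
                            println!(
                                "{}",
                                serde_json::to_string(&json_diff).unwrap_or("diff err".to_string())
                            );
                        }
                    }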
     915              : 
     916              :                     // Because `node` is borrowed from &self, clone it before calling into a &mut self
     917              :                     // function: this could be avoided by refactoring the state mutated by location_config
     918              :                     // into a separate type from Self.
     919            0 :                     let node = node.clone();
     920            0 : 
     921            0 :                     // Use lazy=true, because we may run many of Self concurrently, and do not want to
     922            0 :                     // overload the pageserver with logical size calculations.
     923            0 :                     self.location_config(&node, wanted_conf, None, true).await?;
     924            0 :                     self.compute_notify().await?;
     925              :                 }
     926              :             }
     927            0 :         }
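        // A standalone sketch (hypothetical helper, never called) of the "already correct?" test
        // used above for the attached location and below for the secondary locations: a location
        // is left untouched only when its observed config is known and exactly equals the wanted
        // config; anything else, including unknown (None) observed state, gets reconciled.
        fn _sketch_needs_reconcile(
            observed: Option<&ObservedStateLocation>,
            wanted: &LocationConfig,
        ) -> bool {
            match observed {
                // Observed config known and identical: nothing to do.
                Some(loc) if loc.conf.as_ref() == Some(wanted) => false,
                // Unknown or differing observed state: reconcile this location.
                _ => true,
            }
        }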
     928              : 
     929              :         // Configure secondary locations: if these were previously attached this
     930              :         // implicitly downgrades them from attached to secondary.
     931            0 :         let mut changes = Vec::new();
     932            0 :         for node in &self.intent.secondary {
     933            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     934            0 :             match self.observed.locations.get(&node.get_id()) {
     935            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
     936            0 :                     // Nothing to do
     937            0 :                     tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
     938              :                 }
     939              :                 _ => {
     940              :                     // Only try to configure secondary locations on nodes that are available.  This
     941              :                     // allows the reconciler to "succeed" while some secondaries are offline (e.g. after
     942              :                     // a node failure, where the failed node will have a secondary intent).
     943            0 :                     if node.is_available() {
     944            0 :                         tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
     945            0 :                         changes.push((node.clone(), wanted_conf))
     946              :                     } else {
     947            0 :                         tracing::info!(node_id=%node.get_id(), "Skipping configuration as secondary, node is unavailable");
     948            0 :                         self.observed
     949            0 :                             .locations
     950            0 :                             .insert(node.get_id(), ObservedStateLocation { conf: None });
     951              :                     }
     952              :                 }
     953              :             }
     954              :         }
     955              : 
     956              :         // Detach any extraneous pageservers that are no longer referenced
     957              :         // by our intent.
     958            0 :         for node in &self.detach {
     959            0 :             changes.push((
     960            0 :                 node.clone(),
     961            0 :                 LocationConfig {
     962            0 :                     mode: LocationConfigMode::Detached,
     963            0 :                     generation: None,
     964            0 :                     secondary_conf: None,
     965            0 :                     shard_number: self.shard.number.0,
     966            0 :                     shard_count: self.shard.count.literal(),
     967            0 :                     shard_stripe_size: self.shard.stripe_size.0,
     968            0 :                     tenant_conf: self.config.clone(),
     969            0 :                 },
     970            0 :             ));
     971            0 :         }
     972              : 
     973            0 :         for (node, conf) in changes {
     974            0 :             if self.cancel.is_cancelled() {
     975            0 :                 return Err(ReconcileError::Cancel);
     976            0 :             }
     977            0 :             // We only apply a change if the target node is available.  This does not stop the
     978            0 :             // reconcile from succeeding, because our core goal is to make the shard _available_
     979            0 :             // (the attached location); configuring secondary locations can be done lazily once
     980            0 :             // the node becomes available again (via background reconciliation).
     981            0 :             if node.is_available() {
     982            0 :                 self.location_config(&node, conf, None, false).await?;
     983              :             } else {
     984              :                 // If the node is unavailable, we skip it and still consider the reconciliation
     985              :                 // successful: this commonly happens when a pageserver is marked unavailable and we
     986              :                 // demote a location on that unavailable pageserver to secondary.
     987            0 :                 tracing::info!("Skipping configuring secondary location {node}, it is unavailable");
     988            0 :                 self.observed
     989            0 :                     .locations
     990            0 :                     .insert(node.get_id(), ObservedStateLocation { conf: None });
     991              :             }
     992              :         }
     993              : 
     994              :         // The condition below identifies a detach. We must have no attached intent and
     995              :         // must have been attached to something previously. Pass this information to
     996              :         // the [`ComputeHook`] such that it can update its tenant-wide state.
     997            0 :         if self.intent.attached.is_none() && !self.detach.is_empty() {
     998            0 :             // TODO: Consider notifying the control plane about detaches. This would avoid situations
     999            0 :             // where the compute tries to start up with a stale set of pageservers.
    1000            0 :             self.compute_hook
    1001            0 :                 .handle_detach(self.tenant_shard_id, self.shard.stripe_size);
    1002            0 :         }
    1003              : 
    1004            0 :         pausable_failpoint!("reconciler-epilogue");
    1005              : 
    1006            0 :         Ok(())
    1007            0 :     }
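    // A standalone sketch (hypothetical helper, never called) of the generation rule applied in
    // the function above: the existing generation is reused only when the observed location
    // already has the wanted mode and generation; every other case, including unknown observed
    // state, falls back to incrementing the generation.
    fn _sketch_must_increment_generation(
        observed: Option<&ObservedStateLocation>,
        wanted: &LocationConfig,
    ) -> bool {
        match observed {
            // Nothing observed, or the location is known but its config is unknown: increment.
            None | Some(ObservedStateLocation { conf: None }) => true,
            Some(ObservedStateLocation { conf: Some(observed) }) => {
                observed.generation != wanted.generation || observed.mode != wanted.mode
            }
        }
    }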
    1008              : 
    1009            0 :     pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
    1010              :         // Whenever a particular Reconciler emits a notification, it always notifies for the
    1011              :         // intended (attached) destination.
    1012            0 :         if let Some(node) = &self.intent.attached {
    1013            0 :             let result = self
    1014            0 :                 .compute_hook
    1015            0 :                 .notify(
    1016            0 :                     compute_hook::ShardUpdate {
    1017            0 :                         tenant_shard_id: self.tenant_shard_id,
    1018            0 :                         node_id: node.get_id(),
    1019            0 :                         stripe_size: self.shard.stripe_size,
    1020            0 :                         preferred_az: self.preferred_az.as_ref().map(Cow::Borrowed),
    1021            0 :                     },
    1022            0 :                     &self.cancel,
    1023            0 :                 )
    1024            0 :                 .await;
    1025            0 :             if let Err(e) = &result {
    1026              :                 // Set this flag so that our ReconcileResult marks the shard as needing to retry
    1027              :                 // the compute notification at some point.
    1028            0 :                 self.compute_notify_failure = true;
    1029              : 
    1030              :                 // It is up to the caller whether they want to drop out on this error, but they don't have to:
    1031              :                 // in general we should avoid letting unavailability of the cloud control plane stop us from
    1032              :                 // making progress.
    1033            0 :                 match e {
    1034            0 :                     // 404s from cplane during tenant creation are expected.
    1035            0 :                     // Cplane only persists the shards to the database after
    1036            0 :                     // creating the tenant and the timeline. If we notify before
    1037            0 :                     // that, we'll get a 404.
    1038            0 :                     //
    1039            0 :                     // This is fine because tenant creations happen via /location_config
    1040            0 :                     // and that returns the list of locations in the response. Hence, we
    1041            0 :                     // silence the error and return Ok(()) here. Reconciliation will still
    1042            0 :                     // be retried because we set [`Reconciler::compute_notify_failure`] above.
    1043            0 :                     NotifyError::Unexpected(hyper::StatusCode::NOT_FOUND)
    1044            0 :                         if self.reconciler_config.tenant_creation_hint() =>
    1045            0 :                     {
    1046            0 :                         return Ok(());
    1047              :                     }
    1048            0 :                     NotifyError::ShuttingDown => {}
    1049              :                     _ => {
    1050            0 :                         tracing::warn!(
    1051            0 :                             "Failed to notify compute of attached pageserver {node}: {e}"
    1052              :                         );
    1053              :                     }
    1054              :                 }
    1055            0 :             }
    1056            0 :             result
    1057              :         } else {
    1058            0 :             Ok(())
    1059              :         }
    1060            0 :     }
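    // A minimal sketch (hypothetical helper, never called) of the 404 rule in `compute_notify`
    // above: a NOT_FOUND from the control plane is swallowed only while the tenant-creation hint
    // is set, because the control plane has not persisted the shards yet; any other combination
    // is surfaced (and retried later via the `compute_notify_failure` flag).
    fn _sketch_silence_notify_not_found(status: hyper::StatusCode, tenant_creation_hint: bool) -> bool {
        status == hyper::StatusCode::NOT_FOUND && tenant_creation_hint
    }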
    1061              : 
    1062              :     /// Compare the observed state snapshot from when the reconcile was created
    1063              :     /// with the final observed state in order to generate observed state deltas.
    1064            0 :     pub(crate) fn observed_deltas(&self) -> Vec<ObservedStateDelta> {
    1065            0 :         let mut deltas = Vec::default();
    1066              : 
    1067            0 :         for (node_id, location) in &self.observed.locations {
    1068            0 :             let previous_location = self.original_observed.locations.get(node_id);
    1069            0 :             let do_upsert = match previous_location {
    1070              :                 // Location config changed for node
    1071            0 :                 Some(prev) if location.conf != prev.conf => true,
    1072              :                 // New location config for node
    1073            0 :                 None => true,
    1074              :                 // Location config has not changed for node
    1075            0 :                 _ => false,
    1076              :             };
    1077              : 
    1078            0 :             if do_upsert {
    1079            0 :                 deltas.push(ObservedStateDelta::Upsert(Box::new((
    1080            0 :                     *node_id,
    1081            0 :                     location.clone(),
    1082            0 :                 ))));
    1083            0 :             }
    1084              :         }
    1085              : 
    1086            0 :         for node_id in self.original_observed.locations.keys() {
    1087            0 :             if !self.observed.locations.contains_key(node_id) {
    1088            0 :                 deltas.push(ObservedStateDelta::Delete(*node_id));
    1089            0 :             }
    1090              :         }
    1091              : 
    1092            0 :         deltas
    1093            0 :     }
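    // A standalone sketch of the delta rules in `observed_deltas` above, over made-up stand-in
    // maps (node id -> optional config string) instead of the real observed-state types: changed
    // or new entries become upserts, entries that disappeared become deletes, and unchanged
    // entries emit nothing.
    fn _sketch_observed_delta_rules() {
        let original: HashMap<u64, Option<&str>> =
            HashMap::from([(1, Some("a")), (2, Some("b")), (3, None)]);
        let current: HashMap<u64, Option<&str>> =
            HashMap::from([(1, Some("a")), (2, Some("c"))]);

        let mut upserts = Vec::new();
        let mut deletes = Vec::new();
        for (node_id, conf) in &current {
            // Upsert when the entry is new or changed (node 2: "b" -> "c"); node 1 is unchanged.
            if original.get(node_id) != Some(conf) {
                upserts.push(*node_id);
            }
        }
        for node_id in original.keys() {
            // Delete when a previously observed entry is gone (node 3).
            if !current.contains_key(node_id) {
                deletes.push(*node_id);
            }
        }
        assert_eq!(upserts, vec![2u64]);
        assert_eq!(deletes, vec![3u64]);
    }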
    1094              : 
    1095              :     /// Keep trying to notify the compute indefinitely, only dropping out if:
    1096              :     /// - the node `origin` becomes unavailable -> Ok(())
    1097              :     /// - the node `origin` no longer has our tenant shard attached -> Ok(())
    1098              :     /// - our cancellation token fires -> Err(ReconcileError::Cancel)
    1099              :     ///
    1100              :     /// This is used during live migration, where we do not wish to detach
    1101              :     /// an origin location until the compute definitely knows about the new
    1102              :     /// location.
    1103              :     ///
    1104              :     /// In cases where the origin node becomes unavailable, we return success, indicating
    1105              :     /// to the caller that they should continue irrespective of whether the compute was notified,
    1106              :     /// because the origin node is unusable anyway.  Notification will be retried later via the
    1107              :     /// [`Self::compute_notify_failure`] flag.
    1108            0 :     async fn compute_notify_blocking(&mut self, origin: &Node) -> Result<(), ReconcileError> {
    1109            0 :         let mut notify_attempts = 0;
    1110            0 :         while let Err(e) = self.compute_notify().await {
    1111            0 :             match e {
    1112            0 :                 NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
    1113            0 :                 NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
    1114              :                 _ => {
    1115            0 :                     tracing::warn!(
    1116            0 :                         "Live migration blocked by compute notification error, retrying: {e}"
    1117              :                     );
    1118              :                 }
    1119              :             }
    1120              : 
    1121              :             // Did the origin pageserver become unavailable?
    1122            0 :             if !origin.is_available() {
    1123            0 :                 tracing::info!("Giving up on compute notification because {origin} is unavailable");
    1124            0 :                 break;
    1125            0 :             }
    1126            0 : 
    1127            0 :             // Does the origin pageserver still host the shard we are interested in?  We should only
    1128            0 :             // continue waiting for compute notification to be acked if the old location is still usable.
    1129            0 :             let tenant_shard_id = self.tenant_shard_id;
    1130            0 :             match origin
    1131            0 :                 .with_client_retries(
    1132            0 :                     |client| async move { client.get_location_config(tenant_shard_id).await },
    1133            0 :                     &self.http_client,
    1134            0 :                     &self.service_config.pageserver_jwt_token,
    1135            0 :                     1,
    1136            0 :                     3,
    1137            0 :                     Duration::from_secs(5),
    1138            0 :                     &self.cancel,
    1139            0 :                 )
    1140            0 :                 .await
    1141              :             {
    1142            0 :                 Some(Ok(Some(location_conf))) => {
    1143            0 :                     if matches!(
    1144            0 :                         location_conf.mode,
    1145              :                         LocationConfigMode::AttachedMulti
    1146              :                             | LocationConfigMode::AttachedSingle
    1147              :                             | LocationConfigMode::AttachedStale
    1148              :                     ) {
    1149            0 :                         tracing::debug!(
    1150            0 :                             "Still attached to {origin}, will wait & retry compute notification"
    1151              :                         );
    1152              :                     } else {
    1153            0 :                         tracing::info!(
    1154            0 :                             "Giving up on compute notification because {origin} is in state {:?}",
    1155              :                             location_conf.mode
    1156              :                         );
    1157            0 :                         return Ok(());
    1158              :                     }
    1159              :                     // Fall through
    1160              :                 }
    1161              :                 Some(Ok(None)) => {
    1162            0 :                     tracing::info!(
    1163            0 :                         "No longer attached to {origin}, giving up on compute notification"
    1164              :                     );
    1165            0 :                     return Ok(());
    1166              :                 }
    1167            0 :                 Some(Err(e)) => {
    1168            0 :                     match e {
    1169              :                         mgmt_api::Error::Cancelled => {
    1170            0 :                             tracing::info!(
    1171            0 :                                 "Giving up on compute notification because {origin} is unavailable"
    1172              :                             );
    1173            0 :                             return Ok(());
    1174              :                         }
    1175              :                         mgmt_api::Error::ApiError(StatusCode::NOT_FOUND, _) => {
    1176            0 :                             tracing::info!(
    1177            0 :                                 "No longer attached to {origin}, giving up on compute notification"
    1178              :                             );
    1179            0 :                             return Ok(());
    1180              :                         }
    1181            0 :                         e => {
    1182            0 :                             // Other API errors are unexpected here.
    1183            0 :                             tracing::warn!("Unexpected error checking location on {origin}: {e}");
    1184              : 
    1185              :                             // Fall through, we will retry compute notification.
    1186              :                         }
    1187              :                     }
    1188              :                 }
    1189            0 :                 None => return Err(ReconcileError::Cancel),
    1190              :             };
    1191              : 
    1192            0 :             exponential_backoff(
    1193            0 :                 notify_attempts,
    1194            0 :                 // Generous waits: control plane operations which might be blocking us usually complete
    1195            0 :                 // on the order of hundreds to thousands of milliseconds, so there is no point in busy-polling.
    1196            0 :                 1.0,
    1197            0 :                 10.0,
    1198            0 :                 &self.cancel,
    1199            0 :             )
    1200            0 :             .await;
    1201            0 :             notify_attempts += 1;
    1202              :         }
    1203              : 
    1204            0 :         Ok(())
    1205            0 :     }
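    // A sketch of the wait pattern used in `compute_notify_blocking` above, with a hypothetical
    // delay function: the real `utils::backoff::exponential_backoff` is cancellation-aware and its
    // exact schedule may differ, but the intent is a per-attempt delay growing from a base value
    // up to a cap, so retries back off instead of busy-polling the control plane.
    fn _sketch_backoff_delay(attempt: u32, base_secs: f64, cap_secs: f64) -> Duration {
        // Assumed shape only: double the base for each attempt, never exceeding the cap.
        // With base 1.0 and cap 10.0 this waits 1, 2, 4, 8, then 10 seconds for attempts 0..=4.
        let secs = (base_secs * 2f64.powi(attempt as i32)).min(cap_secs);
        Duration::from_secs_f64(secs)
    }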
    1206              : }
    1207              : 
    1208              : /// We tweak the externally-set TenantConfig while configuring
    1209              : /// locations, using our awareness of whether secondary locations
    1210              : /// are in use to automatically enable/disable heatmap uploads.
    1211            0 : fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
    1212            0 :     let mut config = config.clone();
    1213            0 :     if has_secondaries {
    1214            0 :         if config.heatmap_period.is_none() {
    1215            0 :             config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD);
    1216            0 :         }
    1217            0 :     } else {
    1218            0 :         config.heatmap_period = None;
    1219            0 :     }
    1220            0 :     config
    1221            0 : }
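// A small usage sketch for `ha_aware_config` (assuming `TenantConfig` provides a `Default`
// implementation, which is not shown in this file): with secondaries present, an unset heatmap
// period is filled in with the default so heatmaps get uploaded; with no secondaries, any
// configured period is cleared so the attached location stops uploading heatmaps.
fn _sketch_ha_aware_config_toggle() {
    // Assumption: an all-defaults config with no heatmap period set.
    let base = TenantConfig::default();

    let with_secondaries = ha_aware_config(&base, true);
    assert_eq!(with_secondaries.heatmap_period, Some(DEFAULT_HEATMAP_PERIOD));

    let without_secondaries = ha_aware_config(&with_secondaries, false);
    assert_eq!(without_secondaries.heatmap_period, None);
}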
    1222              : 
    1223            0 : pub(crate) fn attached_location_conf(
    1224            0 :     generation: Generation,
    1225            0 :     shard: &ShardIdentity,
    1226            0 :     config: &TenantConfig,
    1227            0 :     policy: &PlacementPolicy,
    1228            0 : ) -> LocationConfig {
    1229            0 :     let has_secondaries = match policy {
    1230              :         PlacementPolicy::Attached(0) | PlacementPolicy::Detached | PlacementPolicy::Secondary => {
    1231            0 :             false
    1232              :         }
    1233            0 :         PlacementPolicy::Attached(_) => true,
    1234              :     };
    1235              : 
    1236            0 :     LocationConfig {
    1237            0 :         mode: LocationConfigMode::AttachedSingle,
    1238            0 :         generation: generation.into(),
    1239            0 :         secondary_conf: None,
    1240            0 :         shard_number: shard.number.0,
    1241            0 :         shard_count: shard.count.literal(),
    1242            0 :         shard_stripe_size: shard.stripe_size.0,
    1243            0 :         tenant_conf: ha_aware_config(config, has_secondaries),
    1244            0 :     }
    1245            0 : }
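// A standalone restatement (hypothetical helper, never called) of the policy test inside
// `attached_location_conf` above: only an Attached policy with a non-zero secondary count opts
// the attached location into the secondary-aware (heatmap-uploading) configuration.
fn _sketch_policy_wants_secondaries(policy: &PlacementPolicy) -> bool {
    match policy {
        PlacementPolicy::Attached(secondary_count) => *secondary_count > 0,
        PlacementPolicy::Detached | PlacementPolicy::Secondary => false,
    }
}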
    1246              : 
    1247            0 : pub(crate) fn secondary_location_conf(
    1248            0 :     shard: &ShardIdentity,
    1249            0 :     config: &TenantConfig,
    1250            0 : ) -> LocationConfig {
    1251            0 :     LocationConfig {
    1252            0 :         mode: LocationConfigMode::Secondary,
    1253            0 :         generation: None,
    1254            0 :         secondary_conf: Some(LocationConfigSecondary { warm: true }),
    1255            0 :         shard_number: shard.number.0,
    1256            0 :         shard_count: shard.count.literal(),
    1257            0 :         shard_stripe_size: shard.stripe_size.0,
    1258            0 :         tenant_conf: ha_aware_config(config, true),
    1259            0 :     }
    1260            0 : }
        

Generated by: LCOV version 2.1-beta