LCOV - code coverage report
Current view: top level - storage_controller/src - reconciler.rs (source / functions)
Test:       fabb29a6339542ee130cd1d32b534fafdc0be240.info
Test Date:  2024-06-25 13:20:00
Coverage:   Lines: 0.0 % (0 of 523 hit)    Functions: 0.0 % (0 of 42 hit)

            Line data    Source code
       1              : use crate::pageserver_client::PageserverClient;
       2              : use crate::persistence::Persistence;
       3              : use crate::service;
       4              : use pageserver_api::models::{
       5              :     LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
       6              : };
       7              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
       8              : use pageserver_client::mgmt_api;
       9              : use reqwest::StatusCode;
      10              : use std::collections::HashMap;
      11              : use std::sync::Arc;
      12              : use std::time::{Duration, Instant};
      13              : use tokio_util::sync::CancellationToken;
      14              : use utils::generation::Generation;
      15              : use utils::id::{NodeId, TimelineId};
      16              : use utils::lsn::Lsn;
      17              : use utils::sync::gate::GateGuard;
      18              : 
      19              : use crate::compute_hook::{ComputeHook, NotifyError};
      20              : use crate::node::Node;
      21              : use crate::tenant_shard::{IntentState, ObservedState, ObservedStateLocation};
      22              : 
      23              : const DEFAULT_HEATMAP_PERIOD: &str = "60s";
      24              : 
      25              : /// Object with the lifetime of the background reconcile task that is created
      26              : /// for tenants which have a difference between their intent and observed states.
      27              : pub(super) struct Reconciler {
      28              :     /// See [`crate::tenant_shard::TenantShard`] for the meanings of these fields: they are a snapshot
      29              :     /// of a tenant's state from when we spawned a reconcile task.
      30              :     pub(super) tenant_shard_id: TenantShardId,
      31              :     pub(crate) shard: ShardIdentity,
      32              :     pub(crate) generation: Option<Generation>,
      33              :     pub(crate) intent: TargetState,
      34              : 
      35              :     /// Nodes not referenced by [`Self::intent`], from which we should try
      36              :     /// to detach this tenant shard.
      37              :     pub(crate) detach: Vec<Node>,
      38              : 
      39              :     pub(crate) config: TenantConfig,
      40              :     pub(crate) observed: ObservedState,
      41              : 
      42              :     pub(crate) service_config: service::Config,
      43              : 
      44              :     /// A hook to notify the running postgres instances when we change the location
      45              :     /// of a tenant.  Use this via [`Self::compute_notify`] to update our failure flag
      46              :     /// and guarantee eventual retries.
      47              :     pub(crate) compute_hook: Arc<ComputeHook>,
      48              : 
      49              :     /// To avoid stalling if the cloud control plane is unavailable, we may proceed
      50              :     /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
      51              :     /// so that we can set [`crate::tenant_shard::TenantShard::pending_compute_notification`] to ensure a later retry.
      52              :     pub(crate) compute_notify_failure: bool,
      53              : 
       54              :     /// The Reconciler is responsible for keeping alive the semaphore units that limit
       55              :     /// how many reconcilers may run concurrently.
      56              :     pub(crate) _resource_units: ReconcileUnits,
      57              : 
       58              :     /// A means to abort background reconciliation: it is essential to
       59              :     /// cancel this token when something changes in the original TenantShard that
      60              :     /// will make this reconciliation impossible or unnecessary, for
      61              :     /// example when a pageserver node goes offline, or the PlacementPolicy for
      62              :     /// the tenant is changed.
      63              :     pub(crate) cancel: CancellationToken,
      64              : 
      65              :     /// Reconcilers are registered with a Gate so that during a graceful shutdown we
      66              :     /// can wait for all the reconcilers to respond to their cancellation tokens.
      67              :     pub(crate) _gate_guard: GateGuard,
      68              : 
      69              :     /// Access to persistent storage for updating generation numbers
      70              :     pub(crate) persistence: Arc<Persistence>,
      71              : }
      72              : 
      73              : /// RAII resource units granted to a Reconciler, which it should keep alive until it finishes doing I/O
      74              : pub(crate) struct ReconcileUnits {
      75              :     _sem_units: tokio::sync::OwnedSemaphorePermit,
      76              : }
      77              : 
      78              : impl ReconcileUnits {
      79            0 :     pub(crate) fn new(sem_units: tokio::sync::OwnedSemaphorePermit) -> Self {
      80            0 :         Self {
      81            0 :             _sem_units: sem_units,
      82            0 :         }
      83            0 :     }
      84              : }
      85              : 
      86              : /// This is a snapshot of [`crate::tenant_shard::IntentState`], but it does not do any
      87              : /// reference counting for Scheduler.  The IntentState is what the scheduler works with,
      88              : /// and the TargetState is just the instruction for a particular Reconciler run.
      89              : #[derive(Debug)]
      90              : pub(crate) struct TargetState {
      91              :     pub(crate) attached: Option<Node>,
      92              :     pub(crate) secondary: Vec<Node>,
      93              : }
      94              : 
      95              : impl TargetState {
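                       :     /// Build a TargetState by resolving the node IDs in an [`IntentState`] into concrete
                       :     /// [`Node`] handles.  Panics if the intent references a node not present in `nodes`.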
      96            0 :     pub(crate) fn from_intent(nodes: &HashMap<NodeId, Node>, intent: &IntentState) -> Self {
      97            0 :         Self {
      98            0 :             attached: intent.get_attached().map(|n| {
      99            0 :                 nodes
     100            0 :                     .get(&n)
     101            0 :                     .expect("Intent attached referenced non-existent node")
     102            0 :                     .clone()
     103            0 :             }),
     104            0 :             secondary: intent
     105            0 :                 .get_secondary()
     106            0 :                 .iter()
     107            0 :                 .map(|n| {
     108            0 :                     nodes
     109            0 :                         .get(n)
     110            0 :                         .expect("Intent secondary referenced non-existent node")
     111            0 :                         .clone()
     112            0 :                 })
     113            0 :                 .collect(),
     114            0 :         }
     115            0 :     }
     116              : }
     117              : 
     118            0 : #[derive(thiserror::Error, Debug)]
     119              : pub(crate) enum ReconcileError {
     120              :     #[error(transparent)]
     121              :     Remote(#[from] mgmt_api::Error),
     122              :     #[error(transparent)]
     123              :     Notify(#[from] NotifyError),
     124              :     #[error("Cancelled")]
     125              :     Cancel,
     126              :     #[error(transparent)]
     127              :     Other(#[from] anyhow::Error),
     128              : }
     129              : 
     130              : impl Reconciler {
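                       :     /// Apply `config` to this tenant shard's location on `node` via the pageserver API,
                       :     /// retrying on transient failures, and record the outcome in `self.observed`.
                       :     ///
                       :     /// Detaching from an unavailable node is completed without I/O: such a node will be
                       :     /// fully reconciled against the shard's intent when it is next activated.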
     131            0 :     async fn location_config(
     132            0 :         &mut self,
     133            0 :         node: &Node,
     134            0 :         config: LocationConfig,
     135            0 :         flush_ms: Option<Duration>,
     136            0 :         lazy: bool,
     137            0 :     ) -> Result<(), ReconcileError> {
     138            0 :         if !node.is_available() && config.mode == LocationConfigMode::Detached {
     139              :             // Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
     140              :             // will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
     141              :             // what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
     142            0 :             tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
     143            0 :             self.observed.locations.remove(&node.get_id());
     144            0 :             return Ok(());
     145            0 :         }
     146            0 : 
     147            0 :         self.observed
     148            0 :             .locations
     149            0 :             .insert(node.get_id(), ObservedStateLocation { conf: None });
     150            0 : 
     151            0 :         // TODO: amend locations that use long-polling: they will hit this timeout.
     152            0 :         let timeout = Duration::from_secs(25);
     153            0 : 
     154            0 :         tracing::info!("location_config({node}) calling: {:?}", config);
     155            0 :         let tenant_shard_id = self.tenant_shard_id;
     156            0 :         let config_ref = &config;
     157            0 :         match node
     158            0 :             .with_client_retries(
     159            0 :                 |client| async move {
     160            0 :                     let config = config_ref.clone();
     161            0 :                     client
     162            0 :                         .location_config(tenant_shard_id, config.clone(), flush_ms, lazy)
     163            0 :                         .await
     164            0 :                 },
     165            0 :                 &self.service_config.jwt_token,
     166            0 :                 1,
     167            0 :                 3,
     168            0 :                 timeout,
     169            0 :                 &self.cancel,
     170            0 :             )
     171            0 :             .await
     172              :         {
     173            0 :             Some(Ok(_)) => {}
     174            0 :             Some(Err(e)) => return Err(e.into()),
     175            0 :             None => return Err(ReconcileError::Cancel),
     176              :         };
     177            0 :         tracing::info!("location_config({node}) complete: {:?}", config);
     178              : 
     179            0 :         match config.mode {
     180            0 :             LocationConfigMode::Detached => {
     181            0 :                 self.observed.locations.remove(&node.get_id());
     182            0 :             }
     183            0 :             _ => {
     184            0 :                 self.observed
     185            0 :                     .locations
     186            0 :                     .insert(node.get_id(), ObservedStateLocation { conf: Some(config) });
     187            0 :             }
     188              :         }
     189              : 
     190            0 :         Ok(())
     191            0 :     }
     192              : 
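                       :     /// Look up a node by ID among the nodes this reconciler knows about: the intended
                       :     /// attached location, the intended secondaries, and the nodes scheduled for detach.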
     193            0 :     fn get_node(&self, node_id: &NodeId) -> Option<&Node> {
     194            0 :         if let Some(node) = self.intent.attached.as_ref() {
     195            0 :             if node.get_id() == *node_id {
     196            0 :                 return Some(node);
     197            0 :             }
     198            0 :         }
     199              : 
     200            0 :         if let Some(node) = self
     201            0 :             .intent
     202            0 :             .secondary
     203            0 :             .iter()
     204            0 :             .find(|n| n.get_id() == *node_id)
     205              :         {
     206            0 :             return Some(node);
     207            0 :         }
     208              : 
     209            0 :         if let Some(node) = self.detach.iter().find(|n| n.get_id() == *node_id) {
     210            0 :             return Some(node);
     211            0 :         }
     212            0 : 
     213            0 :         None
     214            0 :     }
     215              : 
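                       :     /// If the intended attached location is not currently attached, but some other
                       :     /// available node holds the attachment, live-migrate from that node to the intended
                       :     /// one.  Otherwise do nothing and let the general-case reconciliation handle the shard.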
     216            0 :     async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
     217            0 :         let destination = if let Some(node) = &self.intent.attached {
     218            0 :             match self.observed.locations.get(&node.get_id()) {
     219            0 :                 Some(conf) => {
     220              :                     // We will do a live migration only if the intended destination is not
     221              :                     // currently in an attached state.
     222            0 :                     match &conf.conf {
     223            0 :                         Some(conf) if conf.mode == LocationConfigMode::Secondary => {
     224            0 :                             // Fall through to do a live migration
     225            0 :                             node
     226              :                         }
     227              :                         None | Some(_) => {
     228              :                             // Attached or uncertain: don't do a live migration, proceed
     229              :                             // with a general-case reconciliation
     230            0 :                             tracing::info!("maybe_live_migrate: destination is None or attached");
     231            0 :                             return Ok(());
     232              :                         }
     233              :                     }
     234              :                 }
     235              :                 None => {
     236              :                     // Our destination is not attached: maybe live migrate if some other
     237              :                     // node is currently attached.  Fall through.
     238            0 :                     node
     239              :                 }
     240              :             }
     241              :         } else {
     242              :             // No intent to be attached
     243            0 :             tracing::info!("maybe_live_migrate: no attached intent");
     244            0 :             return Ok(());
     245              :         };
     246              : 
     247            0 :         let mut origin = None;
     248            0 :         for (node_id, state) in &self.observed.locations {
     249            0 :             if let Some(observed_conf) = &state.conf {
     250            0 :                 if observed_conf.mode == LocationConfigMode::AttachedSingle {
     251              :                     // We will only attempt live migration if the origin is not offline: this
     252              :                     // avoids trying to do it while reconciling after responding to an HA failover.
     253            0 :                     if let Some(node) = self.get_node(node_id) {
     254            0 :                         if node.is_available() {
     255            0 :                             origin = Some(node.clone());
     256            0 :                             break;
     257            0 :                         }
     258            0 :                     }
     259            0 :                 }
     260            0 :             }
     261              :         }
     262              : 
     263            0 :         let Some(origin) = origin else {
     264            0 :             tracing::info!("maybe_live_migrate: no origin found");
     265            0 :             return Ok(());
     266              :         };
     267              : 
     268              :         // We have an origin and a destination: proceed to do the live migration
     269            0 :         tracing::info!("Live migrating {}->{}", origin, destination);
     270            0 :         self.live_migrate(origin, destination.clone()).await?;
     271              : 
     272            0 :         Ok(())
     273            0 :     }
     274              : 
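                       :     /// List this tenant shard's timelines on `node` and return each timeline's
                       :     /// last-record LSN.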
     275            0 :     async fn get_lsns(
     276            0 :         &self,
     277            0 :         tenant_shard_id: TenantShardId,
     278            0 :         node: &Node,
     279            0 :     ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
     280            0 :         let client = PageserverClient::new(
     281            0 :             node.get_id(),
     282            0 :             node.base_url(),
     283            0 :             self.service_config.jwt_token.as_deref(),
     284            0 :         );
     285              : 
     286            0 :         let timelines = client.timeline_list(&tenant_shard_id).await?;
     287            0 :         Ok(timelines
     288            0 :             .into_iter()
     289            0 :             .map(|t| (t.timeline_id, t.last_record_lsn))
     290            0 :             .collect())
     291            0 :     }
     292              : 
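                       :     /// Warm up the secondary location on `node` by long-polling its secondary download
                       :     /// API until downloads complete, the total timeout elapses, or a request fails.
                       :     /// Errors and timeouts are logged but tolerated: an unwarmed destination only
                       :     /// degrades I/O performance, it does not block the migration.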
     293            0 :     async fn secondary_download(
     294            0 :         &self,
     295            0 :         tenant_shard_id: TenantShardId,
     296            0 :         node: &Node,
     297            0 :     ) -> Result<(), ReconcileError> {
     298            0 :         // This is not the timeout for a request, but the total amount of time we're willing to wait
      299            0 :         // for a secondary location to get up to date before we give up and proceed with the migration anyway.
     300            0 :         const TOTAL_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(300);
     301            0 : 
      302            0 :         // This is the long-polling interval for the secondary download requests we send to the destination pageserver
     303            0 :         // during a migration.
     304            0 :         const REQUEST_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);
     305            0 : 
     306            0 :         let started_at = Instant::now();
     307              : 
     308              :         loop {
     309            0 :             let (status, progress) = match node
     310            0 :                 .with_client_retries(
     311            0 :                     |client| async move {
     312            0 :                         client
     313            0 :                             .tenant_secondary_download(
     314            0 :                                 tenant_shard_id,
     315            0 :                                 Some(REQUEST_DOWNLOAD_TIMEOUT),
     316            0 :                             )
     317            0 :                             .await
     318            0 :                     },
     319            0 :                     &self.service_config.jwt_token,
     320            0 :                     1,
     321            0 :                     3,
     322            0 :                     REQUEST_DOWNLOAD_TIMEOUT * 2,
     323            0 :                     &self.cancel,
     324            0 :                 )
     325            0 :                 .await
     326              :             {
     327            0 :                 None => Err(ReconcileError::Cancel),
     328            0 :                 Some(Ok(v)) => Ok(v),
     329            0 :                 Some(Err(e)) => {
     330            0 :                     // Give up, but proceed: it's unfortunate if we couldn't freshen the destination before
     331            0 :                     // attaching, but we should not let an issue with a secondary location stop us proceeding
     332            0 :                     // with a live migration.
      333            0 :                     tracing::warn!("Failed to prepare by downloading layers on node {node}: {e}");
     334            0 :                     return Ok(());
     335              :                 }
     336            0 :             }?;
     337              : 
     338            0 :             if status == StatusCode::OK {
     339            0 :                 tracing::info!(
     340            0 :                     "Downloads to {} complete: {}/{} layers, {}/{} bytes",
     341              :                     node,
     342              :                     progress.layers_downloaded,
     343              :                     progress.layers_total,
     344              :                     progress.bytes_downloaded,
     345              :                     progress.bytes_total
     346              :                 );
     347            0 :                 return Ok(());
     348            0 :             } else if status == StatusCode::ACCEPTED {
     349            0 :                 let total_runtime = started_at.elapsed();
     350            0 :                 if total_runtime > TOTAL_DOWNLOAD_TIMEOUT {
     351            0 :                     tracing::warn!("Timed out after {}ms downloading layers to {node}.  Progress so far: {}/{} layers, {}/{} bytes",
     352            0 :                         total_runtime.as_millis(),
     353              :                         progress.layers_downloaded,
     354              :                         progress.layers_total,
     355              :                         progress.bytes_downloaded,
     356              :                         progress.bytes_total
     357              :                     );
      358              :                     // Give up, but proceed: an incompletely warmed destination doesn't prevent the migration from working,
      359              :                     // it just makes I/O performance worse for users until the destination has warmed up.
     360            0 :                     return Ok(());
     361            0 :                 }
     362            0 : 
     363            0 :                 // Log and proceed around the loop to retry.  We don't sleep between requests, because our HTTP call
     364            0 :                 // to the pageserver is a long-poll.
     365            0 :                 tracing::info!(
     366            0 :                     "Downloads to {} not yet complete: {}/{} layers, {}/{} bytes",
     367              :                     node,
     368              :                     progress.layers_downloaded,
     369              :                     progress.layers_total,
     370              :                     progress.bytes_downloaded,
     371              :                     progress.bytes_total
     372              :                 );
     373            0 :             }
     374              :         }
     375            0 :     }
     376              : 
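                       :     /// Poll `node` until every timeline in `baseline` is visible there and has caught up
                       :     /// to its baseline LSN.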
     377            0 :     async fn await_lsn(
     378            0 :         &self,
     379            0 :         tenant_shard_id: TenantShardId,
     380            0 :         node: &Node,
     381            0 :         baseline: HashMap<TimelineId, Lsn>,
     382            0 :     ) -> anyhow::Result<()> {
     383              :         loop {
     384            0 :             let latest = match self.get_lsns(tenant_shard_id, node).await {
     385            0 :                 Ok(l) => l,
     386            0 :                 Err(e) => {
     387            0 :                     tracing::info!("🕑 Can't get LSNs on node {node} yet, waiting ({e})",);
      388            0 :                     tokio::time::sleep(Duration::from_millis(500)).await;
     389            0 :                     continue;
     390              :                 }
     391              :             };
     392              : 
     393            0 :             let mut any_behind: bool = false;
     394            0 :             for (timeline_id, baseline_lsn) in &baseline {
     395            0 :                 match latest.get(timeline_id) {
     396            0 :                     Some(latest_lsn) => {
     397            0 :                         tracing::info!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
     398            0 :                         if latest_lsn < baseline_lsn {
     399            0 :                             any_behind = true;
     400            0 :                         }
     401              :                     }
     402            0 :                     None => {
      403            0 :                         // Expected timeline isn't yet visible on the migration destination.
      404            0 :                         // (IRL we would have to account for timeline deletion, but this
      405            0 :                         //  is just a test helper)
     406            0 :                         any_behind = true;
     407            0 :                     }
     408              :                 }
     409              :             }
     410              : 
     411            0 :             if !any_behind {
     412            0 :                 tracing::info!("✅ LSN caught up.  Proceeding...");
     413            0 :                 break;
     414            0 :             } else {
      415            0 :                 tokio::time::sleep(Duration::from_millis(500)).await;
     416            0 :             }
     417              :         }
     418              : 
     419            0 :         Ok(())
     420            0 :     }
     421              : 
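                       :     /// Live-migrate this tenant shard from `origin_ps` to `dest_ps`: mark the origin
                       :     /// AttachedStale, warm up the destination if it has a secondary location, attach the
                       :     /// destination under an incremented generation, wait for WAL to catch up, notify
                       :     /// compute, then downgrade the origin to a secondary and switch the destination to
                       :     /// AttachedSingle.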
     422            0 :     pub async fn live_migrate(
     423            0 :         &mut self,
     424            0 :         origin_ps: Node,
     425            0 :         dest_ps: Node,
     426            0 :     ) -> Result<(), ReconcileError> {
      427            0 :         // `maybe_live_migrate` is responsible for the sanity of the inputs
     428            0 :         assert!(origin_ps.get_id() != dest_ps.get_id());
     429              : 
     430            0 :         fn build_location_config(
     431            0 :             shard: &ShardIdentity,
     432            0 :             config: &TenantConfig,
     433            0 :             mode: LocationConfigMode,
     434            0 :             generation: Option<Generation>,
     435            0 :             secondary_conf: Option<LocationConfigSecondary>,
     436            0 :         ) -> LocationConfig {
     437            0 :             LocationConfig {
     438            0 :                 mode,
     439            0 :                 generation: generation.map(|g| g.into().unwrap()),
     440            0 :                 secondary_conf,
     441            0 :                 tenant_conf: config.clone(),
     442            0 :                 shard_number: shard.number.0,
     443            0 :                 shard_count: shard.count.literal(),
     444            0 :                 shard_stripe_size: shard.stripe_size.0,
     445            0 :             }
     446            0 :         }
     447              : 
     448            0 :         tracing::info!("🔁 Switching origin node {origin_ps} to stale mode",);
     449              : 
     450              :         // FIXME: it is incorrect to use self.generation here, we should use the generation
     451              :         // from the ObservedState of the origin pageserver (it might be older than self.generation)
     452            0 :         let stale_conf = build_location_config(
     453            0 :             &self.shard,
     454            0 :             &self.config,
     455            0 :             LocationConfigMode::AttachedStale,
     456            0 :             self.generation,
     457            0 :             None,
     458            0 :         );
     459            0 :         self.location_config(&origin_ps, stale_conf, Some(Duration::from_secs(10)), false)
     460            0 :             .await?;
     461              : 
     462            0 :         let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps).await?);
     463              : 
     464              :         // If we are migrating to a destination that has a secondary location, warm it up first
     465            0 :         if let Some(destination_conf) = self.observed.locations.get(&dest_ps.get_id()) {
     466            0 :             if let Some(destination_conf) = &destination_conf.conf {
     467            0 :                 if destination_conf.mode == LocationConfigMode::Secondary {
     468            0 :                     tracing::info!("🔁 Downloading latest layers to destination node {dest_ps}",);
     469            0 :                     self.secondary_download(self.tenant_shard_id, &dest_ps)
     470            0 :                         .await?;
     471            0 :                 }
     472            0 :             }
     473            0 :         }
     474              : 
     475              :         // Increment generation before attaching to new pageserver
     476              :         self.generation = Some(
     477            0 :             self.persistence
     478            0 :                 .increment_generation(self.tenant_shard_id, dest_ps.get_id())
     479            0 :                 .await?,
     480              :         );
     481              : 
     482            0 :         let dest_conf = build_location_config(
     483            0 :             &self.shard,
     484            0 :             &self.config,
     485            0 :             LocationConfigMode::AttachedMulti,
     486            0 :             self.generation,
     487            0 :             None,
     488            0 :         );
     489            0 : 
     490            0 :         tracing::info!("🔁 Attaching to pageserver {dest_ps}");
     491            0 :         self.location_config(&dest_ps, dest_conf, None, false)
     492            0 :             .await?;
     493              : 
     494            0 :         if let Some(baseline) = baseline_lsns {
     495            0 :             tracing::info!("🕑 Waiting for LSN to catch up...");
     496            0 :             self.await_lsn(self.tenant_shard_id, &dest_ps, baseline)
     497            0 :                 .await?;
     498            0 :         }
     499              : 
     500            0 :         tracing::info!("🔁 Notifying compute to use pageserver {dest_ps}");
     501              : 
     502              :         // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
     503              :         // the origin without notifying compute, we will render the tenant unavailable.
     504            0 :         while let Err(e) = self.compute_notify().await {
     505            0 :             match e {
     506            0 :                 NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
     507            0 :                 NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
     508              :                 _ => {
     509            0 :                     tracing::warn!(
     510            0 :                         "Live migration blocked by compute notification error, retrying: {e}"
     511              :                     );
     512              :                 }
     513              :             }
     514              :         }
     515              : 
     516              :         // Downgrade the origin to secondary.  If the tenant's policy is PlacementPolicy::Attached(0), then
     517              :         // this location will be deleted in the general case reconciliation that runs after this.
     518            0 :         let origin_secondary_conf = build_location_config(
     519            0 :             &self.shard,
     520            0 :             &self.config,
     521            0 :             LocationConfigMode::Secondary,
     522            0 :             None,
     523            0 :             Some(LocationConfigSecondary { warm: true }),
     524            0 :         );
     525            0 :         self.location_config(&origin_ps, origin_secondary_conf.clone(), None, false)
     526            0 :             .await?;
     527              :         // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
     528              :         // partway through.  In fact, all location conf API calls should be in a wrapper that sets
     529              :         // the observed state to None, then runs, then sets it to what we wrote.
     530            0 :         self.observed.locations.insert(
     531            0 :             origin_ps.get_id(),
     532            0 :             ObservedStateLocation {
     533            0 :                 conf: Some(origin_secondary_conf),
     534            0 :             },
     535            0 :         );
     536            0 : 
     537            0 :         tracing::info!("🔁 Switching to AttachedSingle mode on node {dest_ps}",);
     538            0 :         let dest_final_conf = build_location_config(
     539            0 :             &self.shard,
     540            0 :             &self.config,
     541            0 :             LocationConfigMode::AttachedSingle,
     542            0 :             self.generation,
     543            0 :             None,
     544            0 :         );
     545            0 :         self.location_config(&dest_ps, dest_final_conf.clone(), None, false)
     546            0 :             .await?;
     547            0 :         self.observed.locations.insert(
     548            0 :             dest_ps.get_id(),
     549            0 :             ObservedStateLocation {
     550            0 :                 conf: Some(dest_final_conf),
     551            0 :             },
     552            0 :         );
     553            0 : 
     554            0 :         tracing::info!("✅ Migration complete");
     555              : 
     556            0 :         Ok(())
     557            0 :     }
     558              : 
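                       :     /// If the observed state of the intended attached location is uncertain (an entry is
                       :     /// present with `conf: None`), query that pageserver for its current location config
                       :     /// before reconciling, to avoid spurious generation increments.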
     559            0 :     async fn maybe_refresh_observed(&mut self) -> Result<(), ReconcileError> {
     560              :         // If the attached node has uncertain state, read it from the pageserver before proceeding: this
     561              :         // is important to avoid spurious generation increments.
     562              :         //
     563              :         // We don't need to do this for secondary/detach locations because it's harmless to just PUT their
     564              :         // location conf, whereas for attached locations it can interrupt clients if we spuriously destroy/recreate
     565              :         // the `Timeline` object in the pageserver.
     566              : 
     567            0 :         let Some(attached_node) = self.intent.attached.as_ref() else {
     568              :             // Nothing to do
     569            0 :             return Ok(());
     570              :         };
     571              : 
     572            0 :         if matches!(
     573            0 :             self.observed.locations.get(&attached_node.get_id()),
     574              :             Some(ObservedStateLocation { conf: None })
     575              :         ) {
     576            0 :             let tenant_shard_id = self.tenant_shard_id;
     577            0 :             let observed_conf = match attached_node
     578            0 :                 .with_client_retries(
     579            0 :                     |client| async move { client.get_location_config(tenant_shard_id).await },
     580            0 :                     &self.service_config.jwt_token,
     581            0 :                     1,
     582            0 :                     1,
     583            0 :                     Duration::from_secs(5),
     584            0 :                     &self.cancel,
     585            0 :                 )
     586            0 :                 .await
     587              :             {
     588            0 :                 Some(Ok(observed)) => Some(observed),
     589            0 :                 Some(Err(mgmt_api::Error::ApiError(status, _msg)))
     590            0 :                     if status == StatusCode::NOT_FOUND =>
     591            0 :                 {
     592            0 :                     None
     593              :                 }
     594            0 :                 Some(Err(e)) => return Err(e.into()),
     595            0 :                 None => return Err(ReconcileError::Cancel),
     596              :             };
     597            0 :             tracing::info!("Scanned location configuration on {attached_node}: {observed_conf:?}");
     598            0 :             match observed_conf {
     599            0 :                 Some(conf) => {
     600            0 :                     // Pageserver returned a state: update it in observed.  This may still be an indeterminate (None) state,
     601            0 :                     // if internally the pageserver's TenantSlot was being mutated (e.g. some long running API call is still running)
     602            0 :                     self.observed
     603            0 :                         .locations
     604            0 :                         .insert(attached_node.get_id(), ObservedStateLocation { conf });
     605            0 :                 }
     606            0 :                 None => {
     607            0 :                     // Pageserver returned 404: we have confirmation that there is no state for this shard on that pageserver.
     608            0 :                     self.observed.locations.remove(&attached_node.get_id());
     609            0 :                 }
     610              :             }
     611            0 :         }
     612              : 
     613            0 :         Ok(())
     614            0 :     }
     615              : 
     616              :     /// Reconciling a tenant makes API calls to pageservers until the observed state
     617              :     /// matches the intended state.
     618              :     ///
     619              :     /// First we apply special case handling (e.g. for live migrations), and then a
     620              :     /// general case reconciliation where we walk through the intent by pageserver
     621              :     /// and call out to the pageserver to apply the desired state.
     622            0 :     pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
      623            0 :         // Prepare: if we have uncertain `observed` state for our would-be attachment location, then refresh it
     624            0 :         self.maybe_refresh_observed().await?;
     625              : 
     626              :         // Special case: live migration
     627            0 :         self.maybe_live_migrate().await?;
     628              : 
      629              :         // If the intended attached pageserver is not yet attached, attach it now.
     630            0 :         if let Some(node) = self.intent.attached.as_ref() {
     631              :             // If we are in an attached policy, then generation must have been set (null generations
     632              :             // are only present when a tenant is initially loaded with a secondary policy)
     633            0 :             debug_assert!(self.generation.is_some());
     634            0 :             let Some(generation) = self.generation else {
     635            0 :                 return Err(ReconcileError::Other(anyhow::anyhow!(
     636            0 :                     "Attempted to attach with NULL generation"
     637            0 :                 )));
     638              :             };
     639              : 
     640            0 :             let mut wanted_conf = attached_location_conf(
     641            0 :                 generation,
     642            0 :                 &self.shard,
     643            0 :                 &self.config,
     644            0 :                 !self.intent.secondary.is_empty(),
     645            0 :             );
     646            0 :             match self.observed.locations.get(&node.get_id()) {
     647            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
     648            0 :                     // Nothing to do
     649            0 :                     tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
     650              :                 }
     651            0 :                 observed => {
     652              :                     // In all cases other than a matching observed configuration, we will
     653              :                     // reconcile this location.  This includes locations with different configurations, as well
     654              :                     // as locations with unknown (None) observed state.
     655              : 
     656              :                     // The general case is to increment the generation.  However, there are cases
     657              :                     // where this is not necessary:
     658              :                     // - if we are only updating the TenantConf part of the location
      659              :                     // - if we are only changing the attachment mode (e.g. going to AttachedMulti or AttachedStale)
     660              :                     //   and the location was already in the correct generation
     661            0 :                     let increment_generation = match observed {
     662            0 :                         None => true,
     663            0 :                         Some(ObservedStateLocation { conf: None }) => true,
     664              :                         Some(ObservedStateLocation {
     665            0 :                             conf: Some(observed),
     666            0 :                         }) => {
     667            0 :                             let generations_match = observed.generation == wanted_conf.generation;
     668              : 
     669              :                             use LocationConfigMode::*;
     670            0 :                             let mode_transition_requires_gen_inc =
     671            0 :                                 match (observed.mode, wanted_conf.mode) {
     672              :                                     // Usually the short-lived attachment modes (multi and stale) are only used
     673              :                                     // in the case of [`Self::live_migrate`], but it is simple to handle them correctly
     674              :                                     // here too.  Locations are allowed to go Single->Stale and Multi->Single within the same generation.
     675            0 :                                     (AttachedSingle, AttachedStale) => false,
     676            0 :                                     (AttachedMulti, AttachedSingle) => false,
     677            0 :                                     (lhs, rhs) => lhs != rhs,
     678              :                                 };
     679              : 
     680            0 :                             !generations_match || mode_transition_requires_gen_inc
     681              :                         }
     682              :                     };
     683              : 
     684            0 :                     if increment_generation {
     685            0 :                         let generation = self
     686            0 :                             .persistence
     687            0 :                             .increment_generation(self.tenant_shard_id, node.get_id())
     688            0 :                             .await?;
     689            0 :                         self.generation = Some(generation);
     690            0 :                         wanted_conf.generation = generation.into();
     691            0 :                     }
     692            0 :                     tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
     693              : 
     694              :                     // Because `node` comes from a ref to &self, clone it before calling into a &mut self
     695              :                     // function: this could be avoided by refactoring the state mutated by location_config into
      696              :                     // a separate type from Self.
     697            0 :                     let node = node.clone();
     698            0 : 
      699            0 :                     // Use lazy=true, because we may run many reconcilers concurrently, and do not want to
     700            0 :                     // overload the pageserver with logical size calculations.
     701            0 :                     self.location_config(&node, wanted_conf, None, true).await?;
     702            0 :                     self.compute_notify().await?;
     703              :                 }
     704              :             }
     705            0 :         }
     706              : 
     707              :         // Configure secondary locations: if these were previously attached this
     708              :         // implicitly downgrades them from attached to secondary.
     709            0 :         let mut changes = Vec::new();
     710            0 :         for node in &self.intent.secondary {
     711            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     712            0 :             match self.observed.locations.get(&node.get_id()) {
     713            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
     714            0 :                     // Nothing to do
     715            0 :                     tracing::info!(node_id=%node.get_id(), "Observed configuration already correct.")
     716              :                 }
     717              :                 _ => {
     718              :                     // In all cases other than a matching observed configuration, we will
     719              :                     // reconcile this location.
     720            0 :                     tracing::info!(node_id=%node.get_id(), "Observed configuration requires update.");
     721            0 :                     changes.push((node.clone(), wanted_conf))
     722              :                 }
     723              :             }
     724              :         }
     725              : 
     726              :         // Detach any extraneous pageservers that are no longer referenced
     727              :         // by our intent.
     728            0 :         for node in &self.detach {
     729            0 :             changes.push((
     730            0 :                 node.clone(),
     731            0 :                 LocationConfig {
     732            0 :                     mode: LocationConfigMode::Detached,
     733            0 :                     generation: None,
     734            0 :                     secondary_conf: None,
     735            0 :                     shard_number: self.shard.number.0,
     736            0 :                     shard_count: self.shard.count.literal(),
     737            0 :                     shard_stripe_size: self.shard.stripe_size.0,
     738            0 :                     tenant_conf: self.config.clone(),
     739            0 :                 },
     740            0 :             ));
     741            0 :         }
     742              : 
     743            0 :         for (node, conf) in changes {
     744            0 :             if self.cancel.is_cancelled() {
     745            0 :                 return Err(ReconcileError::Cancel);
     746            0 :             }
     747            0 :             self.location_config(&node, conf, None, false).await?;
     748              :         }
     749              : 
     750            0 :         Ok(())
     751            0 :     }
     752              : 
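                       :     /// Notify compute (via the compute hook) of the intended attached pageserver.  On
                       :     /// failure, set `self.compute_notify_failure` so that the tenant shard schedules a
                       :     /// retry later.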
     753            0 :     pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
     754              :         // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
     755              :         // destination.
     756            0 :         if let Some(node) = &self.intent.attached {
     757            0 :             let result = self
     758            0 :                 .compute_hook
     759            0 :                 .notify(
     760            0 :                     self.tenant_shard_id,
     761            0 :                     node.get_id(),
     762            0 :                     self.shard.stripe_size,
     763            0 :                     &self.cancel,
     764            0 :                 )
     765            0 :                 .await;
     766            0 :             if let Err(e) = &result {
     767              :                 // It is up to the caller whether they want to drop out on this error, but they don't have to:
     768              :                 // in general we should avoid letting unavailability of the cloud control plane stop us from
     769              :                 // making progress.
     770            0 :                 if !matches!(e, NotifyError::ShuttingDown) {
     771            0 :                     tracing::warn!("Failed to notify compute of attached pageserver {node}: {e}");
     772            0 :                 }
     773              : 
     774              :                 // Set this flag so that in our ReconcileResult we will set the flag on the shard that it
     775              :                 // needs to retry at some point.
     776            0 :                 self.compute_notify_failure = true;
     777            0 :             }
     778            0 :             result
     779              :         } else {
     780            0 :             Ok(())
     781              :         }
     782            0 :     }
     783              : }
     784              : 
     785              : /// We tweak the externally-set TenantConfig while configuring
     786              : /// locations, using our awareness of whether secondary locations
     787              : /// are in use to automatically enable/disable heatmap uploads.
     788            0 : fn ha_aware_config(config: &TenantConfig, has_secondaries: bool) -> TenantConfig {
     789            0 :     let mut config = config.clone();
     790            0 :     if has_secondaries {
     791            0 :         if config.heatmap_period.is_none() {
     792            0 :             config.heatmap_period = Some(DEFAULT_HEATMAP_PERIOD.to_string());
     793            0 :         }
     794            0 :     } else {
     795            0 :         config.heatmap_period = None;
     796            0 :     }
     797            0 :     config
     798            0 : }
     799              : 
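                       : /// Build the [`LocationConfig`] for an attached (AttachedSingle) location, enabling
                       : /// heatmap uploads via [`ha_aware_config`] when secondary locations exist.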
     800            0 : pub(crate) fn attached_location_conf(
     801            0 :     generation: Generation,
     802            0 :     shard: &ShardIdentity,
     803            0 :     config: &TenantConfig,
     804            0 :     has_secondaries: bool,
     805            0 : ) -> LocationConfig {
     806            0 :     LocationConfig {
     807            0 :         mode: LocationConfigMode::AttachedSingle,
     808            0 :         generation: generation.into(),
     809            0 :         secondary_conf: None,
     810            0 :         shard_number: shard.number.0,
     811            0 :         shard_count: shard.count.literal(),
     812            0 :         shard_stripe_size: shard.stripe_size.0,
     813            0 :         tenant_conf: ha_aware_config(config, has_secondaries),
     814            0 :     }
     815            0 : }
     816              : 
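                       : /// Build the [`LocationConfig`] for a warm secondary location.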
     817            0 : pub(crate) fn secondary_location_conf(
     818            0 :     shard: &ShardIdentity,
     819            0 :     config: &TenantConfig,
     820            0 : ) -> LocationConfig {
     821            0 :     LocationConfig {
     822            0 :         mode: LocationConfigMode::Secondary,
     823            0 :         generation: None,
     824            0 :         secondary_conf: Some(LocationConfigSecondary { warm: true }),
     825            0 :         shard_number: shard.number.0,
     826            0 :         shard_count: shard.count.literal(),
     827            0 :         shard_stripe_size: shard.stripe_size.0,
     828            0 :         tenant_conf: ha_aware_config(config, true),
     829            0 :     }
     830            0 : }
        

Generated by: LCOV version 2.1-beta