LCOV - code coverage report
Current view: top level - control_plane/attachment_service/src - reconciler.rs (source / functions)
Test:         aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date:    2024-02-14 18:05:35
Coverage:     Lines: 78.2 % (266 of 340 hit)    Functions: 68.2 % (30 of 44 hit)

            Line data    Source code
       1              : use crate::persistence::Persistence;
       2              : use crate::service;
       3              : use control_plane::attachment_service::NodeAvailability;
       4              : use pageserver_api::models::{
       5              :     LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
       6              : };
       7              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
       8              : use pageserver_client::mgmt_api;
       9              : use std::collections::HashMap;
      10              : use std::sync::Arc;
      11              : use std::time::Duration;
      12              : use tokio_util::sync::CancellationToken;
      13              : use utils::generation::Generation;
      14              : use utils::id::{NodeId, TimelineId};
      15              : use utils::lsn::Lsn;
      16              : 
      17              : use crate::compute_hook::{ComputeHook, NotifyError};
      18              : use crate::node::Node;
      19              : use crate::tenant_state::{IntentState, ObservedState, ObservedStateLocation};
      20              : 
      21              : /// Object with the lifetime of the background reconcile task that is created
      22              : /// for tenants which have a difference between their intent and observed states.
      23              : pub(super) struct Reconciler {
      24              :     /// See [`crate::tenant_state::TenantState`] for the meanings of these fields: they are a snapshot
      25              :     /// of a tenant's state from when we spawned a reconcile task.
      26              :     pub(super) tenant_shard_id: TenantShardId,
      27              :     pub(crate) shard: ShardIdentity,
      28              :     pub(crate) generation: Generation,
      29              :     pub(crate) intent: IntentState,
      30              :     pub(crate) config: TenantConfig,
      31              :     pub(crate) observed: ObservedState,
      32              : 
      33              :     pub(crate) service_config: service::Config,
      34              : 
      35              :     /// A snapshot of the pageservers as they were when we were asked
      36              :     /// to reconcile.
      37              :     pub(crate) pageservers: Arc<HashMap<NodeId, Node>>,
      38              : 
      39              :     /// A hook to notify the running postgres instances when we change the location
      40              :     /// of a tenant.  Use this via [`Self::compute_notify`] to update our failure flag
      41              :     /// and guarantee eventual retries.
      42              :     pub(crate) compute_hook: Arc<ComputeHook>,
      43              : 
      44              :     /// To avoid stalling if the cloud control plane is unavailable, we may proceed
      45              :     /// past failures in [`ComputeHook::notify`], but we _must_ remember that we failed
      46              :     /// so that we can set [`crate::tenant_state::TenantState::pending_compute_notification`] to ensure a later retry.
      47              :     pub(crate) compute_notify_failure: bool,
      48              : 
      49              :     /// A means to abort background reconciliation: it is essential to
      50              :     /// call this when something changes in the original TenantState that
      51              :     /// will make this reconciliation impossible or unnecessary, for
      52              :     /// example when a pageserver node goes offline, or the PlacementPolicy for
      53              :     /// the tenant is changed.
      54              :     pub(crate) cancel: CancellationToken,
      55              : 
      56              :     /// Access to persistent storage for updating generation numbers
      57              :     pub(crate) persistence: Arc<Persistence>,
      58              : }
      59              : 
      60            2 : #[derive(thiserror::Error, Debug)]
      61              : pub(crate) enum ReconcileError {
      62              :     #[error(transparent)]
      63              :     Notify(#[from] NotifyError),
      64              :     #[error(transparent)]
      65              :     Other(#[from] anyhow::Error),
      66              : }
      67              : 
      68              : impl Reconciler {
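                       :     /// Apply a `LocationConfig` on one pageserver, recording the outcome in
                       :     /// `self.observed` (the location is marked uncertain while the call is
                       :     /// in flight).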
      69          524 :     async fn location_config(
      70          524 :         &mut self,
      71          524 :         node_id: NodeId,
      72          524 :         config: LocationConfig,
      73          524 :         flush_ms: Option<Duration>,
      74          524 :     ) -> anyhow::Result<()> {
      75          524 :         let node = self
      76          524 :             .pageservers
      77          524 :             .get(&node_id)
      78          524 :             .expect("Pageserver may not be removed while referenced");
      79          524 : 
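                       :         // Mark the location as uncertain (`conf: None`) before issuing the
                       :         // remote call: if we fail partway, a later reconcile must refresh this
                       :         // location's state rather than trust it.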
      80          524 :         self.observed
      81          524 :             .locations
      82          524 :             .insert(node.id, ObservedStateLocation { conf: None });
      83              : 
      84          522 :         tracing::info!("location_config({}) calling: {:?}", node_id, config);
      85          522 :         let client =
      86          522 :             mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
      87          522 :         client
      88          522 :             .location_config(self.tenant_shard_id, config.clone(), flush_ms)
      89         2060 :             .await?;
      90          521 :         tracing::info!("location_config({}) complete: {:?}", node_id, config);
      91              : 
      92          521 :         self.observed
      93          521 :             .locations
      94          521 :             .insert(node.id, ObservedStateLocation { conf: Some(config) });
      95          521 : 
      96          521 :         Ok(())
      97          522 :     }
      98              : 
      99          503 :     async fn maybe_live_migrate(&mut self) -> Result<(), ReconcileError> {
     100          503 :         let destination = if let Some(node_id) = self.intent.attached {
     101          503 :             match self.observed.locations.get(&node_id) {
     102            0 :                 Some(conf) => {
     103              :                     // We will do a live migration only if the intended destination is not
     104              :                     // currently in an attached state.
     105            0 :                     match &conf.conf {
     106            0 :                         Some(conf) if conf.mode == LocationConfigMode::Secondary => {
     107            0 :                             // Fall through to do a live migration
     108            0 :                             node_id
     109              :                         }
     110              :                         None | Some(_) => {
     111              :                             // Attached or uncertain: don't do a live migration, proceed
     112              :                             // with a general-case reconciliation
     113            0 :                             tracing::info!("maybe_live_migrate: destination is None or attached");
     114            0 :                             return Ok(());
     115              :                         }
     116              :                     }
     117              :                 }
     118              :                 None => {
     119              :                     // Our destination is not attached: maybe live migrate if some other
     120              :                     // node is currently attached.  Fall through.
     121          503 :                     node_id
     122              :                 }
     123              :             }
     124              :         } else {
     125              :             // No intent to be attached
     126            0 :             tracing::info!("maybe_live_migrate: no attached intent");
     127            0 :             return Ok(());
     128              :         };
     129              : 
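                       :         // Search the observed locations for a node that is currently attached
                       :         // and available: if one exists, it is our migration origin.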
     130          503 :         let mut origin = None;
     131          508 :         for (node_id, state) in &self.observed.locations {
     132            9 :             if let Some(observed_conf) = &state.conf {
     133            4 :                 if observed_conf.mode == LocationConfigMode::AttachedSingle {
     134            4 :                     let node = self
     135            4 :                         .pageservers
     136            4 :                         .get(node_id)
     137            4 :                         .expect("Nodes may not be removed while referenced");
     138            4 :                     // We will only attempt live migration if the origin is not offline: this
     139            4 :                     // avoids trying to do it while reconciling after responding to an HA failover.
     140            4 :                     if !matches!(node.availability, NodeAvailability::Offline) {
     141            4 :                         origin = Some(*node_id);
     142            4 :                         break;
     143            0 :                     }
     144            0 :                 }
     145            5 :             }
     146              :         }
     147              : 
     148          503 :         let Some(origin) = origin else {
     149          499 :             tracing::info!("maybe_live_migrate: no origin found");
     150          499 :             return Ok(());
     151              :         };
     152              : 
     153              :         // We have an origin and a destination: proceed to do the live migration
     154            4 :         tracing::info!("Live migrating {}->{}", origin, destination);
     155          128 :         self.live_migrate(origin, destination).await?;
     156              : 
     157            4 :         Ok(())
     158          503 :     }
     159              : 
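                       :     /// Fetch the last-record LSN of every timeline on the given pageserver.
                       :     /// Used during live migration to check that the destination has caught
                       :     /// up with the origin.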
     160           12 :     async fn get_lsns(
     161           12 :         &self,
     162           12 :         tenant_shard_id: TenantShardId,
     163           12 :         node_id: &NodeId,
     164           12 :     ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
     165           12 :         let node = self
     166           12 :             .pageservers
     167           12 :             .get(node_id)
     168           12 :             .expect("Pageserver may not be removed while referenced");
     169           12 : 
     170           12 :         let client =
     171           12 :             mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
     172              : 
     173           48 :         let timelines = client.timeline_list(&tenant_shard_id).await?;
     174            8 :         Ok(timelines
     175            8 :             .into_iter()
     176            8 :             .map(|t| (t.timeline_id, t.last_record_lsn))
     177            8 :             .collect())
     178           12 :     }
     179              : 
     180            0 :     async fn secondary_download(&self, tenant_shard_id: TenantShardId, node_id: &NodeId) {
     181            0 :         let node = self
     182            0 :             .pageservers
     183            0 :             .get(node_id)
     184            0 :             .expect("Pageserver may not be removed while referenced");
     185            0 : 
     186            0 :         let client =
     187            0 :             mgmt_api::Client::new(node.base_url(), self.service_config.jwt_token.as_deref());
     188            0 : 
     189            0 :         match client.tenant_secondary_download(tenant_shard_id).await {
     190            0 :             Ok(()) => {}
     191              :             Err(_) => {
     192            0 :                 tracing::info!("  (skipping, destination wasn't in secondary mode)")
     193              :             }
     194              :         }
     195            0 :     }
     196              : 
     197            4 :     async fn await_lsn(
     198            4 :         &self,
     199            4 :         tenant_shard_id: TenantShardId,
     200            4 :         pageserver_id: &NodeId,
     201            4 :         baseline: HashMap<TimelineId, Lsn>,
     202            4 :     ) -> anyhow::Result<()> {
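                       :         // Poll the destination until, for every timeline in the baseline, it
                       :         // reports an LSN at least as recent as the origin's.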
     203              :         loop {
     204           32 :             let latest = match self.get_lsns(tenant_shard_id, pageserver_id).await {
     205            4 :                 Ok(l) => l,
     206            4 :                 Err(e) => {
     207            4 :                     println!(
     208            4 :                         "🕑 Can't get LSNs on pageserver {} yet, waiting ({e})",
     209            4 :                         pageserver_id
     210            4 :                     );
      211            4 :                     tokio::time::sleep(Duration::from_millis(500)).await;
     212            4 :                     continue;
     213              :                 }
     214              :             };
     215              : 
     216            4 :             let mut any_behind: bool = false;
     217            8 :             for (timeline_id, baseline_lsn) in &baseline {
     218            4 :                 match latest.get(timeline_id) {
     219            4 :                     Some(latest_lsn) => {
     220            4 :                         println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
     221            4 :                         if latest_lsn < baseline_lsn {
     222            0 :                             any_behind = true;
     223            4 :                         }
     224              :                     }
     225            0 :                     None => {
     226            0 :                         // Expected timeline isn't yet visible on migration destination.
     227            0 :                         // (IRL we would have to account for timeline deletion, but this
      228            0 :                         //  is just a test helper)
     229            0 :                         any_behind = true;
     230            0 :                     }
     231              :                 }
     232              :             }
     233              : 
     234            4 :             if !any_behind {
     235            4 :                 println!("✅ LSN caught up.  Proceeding...");
     236            4 :                 break;
     237            0 :             } else {
      238            0 :                 tokio::time::sleep(Duration::from_millis(500)).await;
     239            0 :             }
     240              :         }
     241              : 
     242            4 :         Ok(())
     243            4 :     }
     244              : 
     245            4 :     pub async fn live_migrate(
     246            4 :         &mut self,
     247            4 :         origin_ps_id: NodeId,
     248            4 :         dest_ps_id: NodeId,
     249            4 :     ) -> anyhow::Result<()> {
      250            4 :         // `maybe_live_migrate` is responsible for sanity of inputs
     251            4 :         assert!(origin_ps_id != dest_ps_id);
     252              : 
     253           16 :         fn build_location_config(
     254           16 :             shard: &ShardIdentity,
     255           16 :             config: &TenantConfig,
     256           16 :             mode: LocationConfigMode,
     257           16 :             generation: Option<Generation>,
     258           16 :             secondary_conf: Option<LocationConfigSecondary>,
     259           16 :         ) -> LocationConfig {
     260           16 :             LocationConfig {
     261           16 :                 mode,
     262           16 :                 generation: generation.map(|g| g.into().unwrap()),
     263           16 :                 secondary_conf,
     264           16 :                 tenant_conf: config.clone(),
     265           16 :                 shard_number: shard.number.0,
     266           16 :                 shard_count: shard.count.0,
     267           16 :                 shard_stripe_size: shard.stripe_size.0,
     268           16 :             }
     269           16 :         }
     270              : 
     271            4 :         tracing::info!(
     272            4 :             "🔁 Switching origin pageserver {} to stale mode",
     273            4 :             origin_ps_id
     274            4 :         );
     275              : 
     276              :         // FIXME: it is incorrect to use self.generation here, we should use the generation
     277              :         // from the ObservedState of the origin pageserver (it might be older than self.generation)
     278            4 :         let stale_conf = build_location_config(
     279            4 :             &self.shard,
     280            4 :             &self.config,
     281            4 :             LocationConfigMode::AttachedStale,
     282            4 :             Some(self.generation),
     283            4 :             None,
     284            4 :         );
     285            4 :         self.location_config(origin_ps_id, stale_conf, Some(Duration::from_secs(10)))
     286           16 :             .await?;
     287              : 
     288           16 :         let baseline_lsns = Some(self.get_lsns(self.tenant_shard_id, &origin_ps_id).await?);
     289              : 
     290              :         // If we are migrating to a destination that has a secondary location, warm it up first
     291            4 :         if let Some(destination_conf) = self.observed.locations.get(&dest_ps_id) {
     292            0 :             if let Some(destination_conf) = &destination_conf.conf {
     293            0 :                 if destination_conf.mode == LocationConfigMode::Secondary {
     294            0 :                     tracing::info!(
     295            0 :                         "🔁 Downloading latest layers to destination pageserver {}",
     296            0 :                         dest_ps_id,
     297            0 :                     );
     298            0 :                     self.secondary_download(self.tenant_shard_id, &dest_ps_id)
     299            0 :                         .await;
     300            0 :                 }
     301            0 :             }
     302            4 :         }
     303              : 
     304              :         // Increment generation before attaching to new pageserver
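                       :         // (a new attachment must carry a fresh generation so that it is
                       :         // distinguishable from, and supersedes, the stale origin attachment)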
     305            4 :         self.generation = self
     306            4 :             .persistence
     307            4 :             .increment_generation(self.tenant_shard_id, dest_ps_id)
     308            4 :             .await?;
     309              : 
     310            4 :         let dest_conf = build_location_config(
     311            4 :             &self.shard,
     312            4 :             &self.config,
     313            4 :             LocationConfigMode::AttachedMulti,
     314            4 :             Some(self.generation),
     315            4 :             None,
     316            4 :         );
     317              : 
     318            4 :         tracing::info!("🔁 Attaching to pageserver {}", dest_ps_id);
     319           16 :         self.location_config(dest_ps_id, dest_conf, None).await?;
     320              : 
     321            4 :         if let Some(baseline) = baseline_lsns {
     322            4 :             tracing::info!("🕑 Waiting for LSN to catch up...");
     323            4 :             self.await_lsn(self.tenant_shard_id, &dest_ps_id, baseline)
     324           32 :                 .await?;
     325            0 :         }
     326              : 
     327            4 :         tracing::info!("🔁 Notifying compute to use pageserver {}", dest_ps_id);
     328              : 
     329              :         // During a live migration it is unhelpful to proceed if we couldn't notify compute: if we detach
     330              :         // the origin without notifying compute, we will render the tenant unavailable.
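                       :         // Non-fatal errors (e.g. a temporarily unavailable control plane) are
                       :         // retried indefinitely; only a fatal error aborts the migration.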
     331           12 :         while let Err(e) = self.compute_notify().await {
     332            0 :             match e {
     333            0 :                 NotifyError::Fatal(_) => return Err(anyhow::anyhow!(e)),
     334              :                 _ => {
     335            0 :                     tracing::warn!(
     336            0 :                         "Live migration blocked by compute notification error, retrying: {e}"
     337            0 :                     );
     338              :                 }
     339              :             }
     340              :         }
     341              : 
     342              :         // Downgrade the origin to secondary.  If the tenant's policy is PlacementPolicy::Single, then
     343              :         // this location will be deleted in the general case reconciliation that runs after this.
     344            4 :         let origin_secondary_conf = build_location_config(
     345            4 :             &self.shard,
     346            4 :             &self.config,
     347            4 :             LocationConfigMode::Secondary,
     348            4 :             None,
     349            4 :             Some(LocationConfigSecondary { warm: true }),
     350            4 :         );
     351            4 :         self.location_config(origin_ps_id, origin_secondary_conf.clone(), None)
     352           16 :             .await?;
     353              :         // TODO: we should also be setting the ObservedState on earlier API calls, in case we fail
     354              :         // partway through.  In fact, all location conf API calls should be in a wrapper that sets
     355              :         // the observed state to None, then runs, then sets it to what we wrote.
     356            4 :         self.observed.locations.insert(
      357            4 :         tracing::info!(
      358            4 :             "🔁 Switching to AttachedSingle mode on pageserver {}",
      359            4 :             dest_ps_id
      360            4 :         );
     361            4 :         );
     362            4 : 
     363            4 :         println!(
     364            4 :             "🔁 Switching to AttachedSingle mode on pageserver {}",
     365            4 :             dest_ps_id
     366            4 :         );
     367            4 :         let dest_final_conf = build_location_config(
     368            4 :             &self.shard,
     369            4 :             &self.config,
     370            4 :             LocationConfigMode::AttachedSingle,
     371            4 :             Some(self.generation),
     372            4 :             None,
     373            4 :         );
     374            4 :         self.location_config(dest_ps_id, dest_final_conf.clone(), None)
     375           16 :             .await?;
     376            4 :         self.observed.locations.insert(
     377            4 :             dest_ps_id,
     378            4 :             ObservedStateLocation {
     379            4 :                 conf: Some(dest_final_conf),
     380            4 :             },
     381            4 :         );
     382            4 : 
      383            4 :         tracing::info!("✅ Migration complete");
     384            4 : 
     385            4 :         Ok(())
     386            4 :     }
     387              : 
     388              :     /// Reconciling a tenant makes API calls to pageservers until the observed state
     389              :     /// matches the intended state.
     390              :     ///
     391              :     /// First we apply special case handling (e.g. for live migrations), and then a
     392              :     /// general case reconciliation where we walk through the intent by pageserver
     393              :     /// and call out to the pageserver to apply the desired state.
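                       :     ///
                       :     /// The general case applies changes in a fixed order: the attached
                       :     /// location first, then secondary locations, then detaching any
                       :     /// locations no longer referenced by the intent.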
     394          503 :     pub(crate) async fn reconcile(&mut self) -> Result<(), ReconcileError> {
     395          503 :         // TODO: if any of self.observed is None, call to remote pageservers
     396          503 :         // to learn correct state.
     397          503 : 
     398          503 :         // Special case: live migration
     399          503 :         self.maybe_live_migrate().await?;
     400              : 
      401              :         // If the intended attached pageserver is not already attached, attach it now.
     402          503 :         if let Some(node_id) = self.intent.attached {
     403          503 :             let mut wanted_conf =
     404          503 :                 attached_location_conf(self.generation, &self.shard, &self.config);
     405          503 :             match self.observed.locations.get(&node_id) {
     406            4 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
     407              :                     // Nothing to do
     408            4 :                     tracing::info!("Observed configuration already correct.")
     409              :                 }
     410              :                 _ => {
     411              :                     // In all cases other than a matching observed configuration, we will
     412              :                     // reconcile this location.  This includes locations with different configurations, as well
     413              :                     // as locations with unknown (None) observed state.
     414          499 :                     self.generation = self
     415          499 :                         .persistence
     416          499 :                         .increment_generation(self.tenant_shard_id, node_id)
     417          499 :                         .await?;
     418          499 :                     wanted_conf.generation = self.generation.into();
     419          499 :                     tracing::info!("Observed configuration requires update.");
     420         1960 :                     self.location_config(node_id, wanted_conf, None).await?;
     421          496 :                     self.compute_notify().await?;
     422              :                 }
     423              :             }
     424            0 :         }
     425              : 
      426              :         // Configure secondary locations: if these were previously attached, this
     427              :         // implicitly downgrades them from attached to secondary.
     428          500 :         let mut changes = Vec::new();
     429          500 :         for node_id in &self.intent.secondary {
     430            0 :             let wanted_conf = secondary_location_conf(&self.shard, &self.config);
     431            0 :             match self.observed.locations.get(node_id) {
     432            0 :                 Some(conf) if conf.conf.as_ref() == Some(&wanted_conf) => {
     433              :                     // Nothing to do
     434            0 :                     tracing::info!(%node_id, "Observed configuration already correct.")
     435              :                 }
     436              :                 _ => {
     437              :                     // In all cases other than a matching observed configuration, we will
     438              :                     // reconcile this location.
     439            0 :                     tracing::info!(%node_id, "Observed configuration requires update.");
     440            0 :                     changes.push((*node_id, wanted_conf))
     441              :                 }
     442              :             }
     443              :         }
     444              : 
     445              :         // Detach any extraneous pageservers that are no longer referenced
     446              :         // by our intent.
     447          500 :         let all_pageservers = self.intent.all_pageservers();
     448          509 :         for node_id in self.observed.locations.keys() {
     449          509 :             if all_pageservers.contains(node_id) {
     450              :                 // We are only detaching pageservers that aren't used at all.
     451          500 :                 continue;
     452            9 :             }
     453            9 : 
     454            9 :             changes.push((
     455            9 :                 *node_id,
     456            9 :                 LocationConfig {
     457            9 :                     mode: LocationConfigMode::Detached,
     458            9 :                     generation: None,
     459            9 :                     secondary_conf: None,
     460            9 :                     shard_number: self.shard.number.0,
     461            9 :                     shard_count: self.shard.count.0,
     462            9 :                     shard_stripe_size: self.shard.stripe_size.0,
     463            9 :                     tenant_conf: self.config.clone(),
     464            9 :                 },
     465            9 :             ));
     466              :         }
     467              : 
     468          509 :         for (node_id, conf) in changes {
     469           36 :             self.location_config(node_id, conf, None).await?;
     470              :         }
     471              : 
     472          500 :         Ok(())
     473          501 :     }
     474              : 
     475          500 :     pub(crate) async fn compute_notify(&mut self) -> Result<(), NotifyError> {
     476              :         // Whenever a particular Reconciler emits a notification, it is always notifying for the intended
     477              :         // destination.
     478          500 :         if let Some(node_id) = self.intent.attached {
     479          500 :             let result = self
     480          500 :                 .compute_hook
     481          500 :                 .notify(self.tenant_shard_id, node_id, &self.cancel)
     482           20 :                 .await;
     483          500 :             if let Err(e) = &result {
     484              :                 // It is up to the caller whether they want to drop out on this error, but they don't have to:
     485              :                 // in general we should avoid letting unavailability of the cloud control plane stop us from
     486              :                 // making progress.
     487            0 :                 tracing::warn!("Failed to notify compute of attached pageserver {node_id}: {e}");
     488              :                 // Set this flag so that in our ReconcileResult we will set the flag on the shard that it
     489              :                 // needs to retry at some point.
     490            0 :                 self.compute_notify_failure = true;
     491          500 :             }
     492          500 :             result
     493              :         } else {
     494            0 :             Ok(())
     495              :         }
     496          500 :     }
     497              : }
     498              : 
     499         1858 : pub(crate) fn attached_location_conf(
     500         1858 :     generation: Generation,
     501         1858 :     shard: &ShardIdentity,
     502         1858 :     config: &TenantConfig,
     503         1858 : ) -> LocationConfig {
     504         1858 :     LocationConfig {
     505         1858 :         mode: LocationConfigMode::AttachedSingle,
     506         1858 :         generation: generation.into(),
     507         1858 :         secondary_conf: None,
     508         1858 :         shard_number: shard.number.0,
     509         1858 :         shard_count: shard.count.0,
     510         1858 :         shard_stripe_size: shard.stripe_size.0,
     511         1858 :         tenant_conf: config.clone(),
     512         1858 :     }
     513         1858 : }
     514              : 
     515            0 : pub(crate) fn secondary_location_conf(
     516            0 :     shard: &ShardIdentity,
     517            0 :     config: &TenantConfig,
     518            0 : ) -> LocationConfig {
     519            0 :     LocationConfig {
     520            0 :         mode: LocationConfigMode::Secondary,
     521            0 :         generation: None,
     522            0 :         secondary_conf: Some(LocationConfigSecondary { warm: true }),
     523            0 :         shard_number: shard.number.0,
     524            0 :         shard_count: shard.count.0,
     525            0 :         shard_stripe_size: shard.stripe_size.0,
     526            0 :         tenant_conf: config.clone(),
     527            0 :     }
     528            0 : }

Generated by: LCOV version 2.1-beta