LCOV - differential code coverage report
Current view: top level - control_plane/src - tenant_migration.rs (source / functions) Coverage Total Hit UBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 0.0 % 160 0 160
Current Date: 2024-01-09 02:06:09 Functions: 0.0 % 10 0 10
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : //!
       2                 : //! Functionality for migrating tenants across pageservers: unlike most of neon_local, this code
       3                 : //! isn't scoped to a particular physical service, as it needs to update compute endpoints to
       4                 : //! point to the new pageserver.
       5                 : //!
       6                 : use crate::local_env::LocalEnv;
       7                 : use crate::{
       8                 :     attachment_service::AttachmentService, endpoint::ComputeControlPlane,
       9                 :     pageserver::PageServerNode,
      10                 : };
      11                 : use pageserver_api::models::{
      12                 :     LocationConfig, LocationConfigMode, LocationConfigSecondary, TenantConfig,
      13                 : };
      14                 : use pageserver_api::shard::TenantShardId;
      15                 : use std::collections::HashMap;
      16                 : use std::time::Duration;
      17                 : use utils::{
      18                 :     id::{TenantId, TimelineId},
      19                 :     lsn::Lsn,
      20                 : };
      21                 : 
      22                 : /// Given an attached pageserver, retrieve the LSN for all timelines
      23 UBC           0 : async fn get_lsns(
      24               0 :     tenant_id: TenantId,
      25               0 :     pageserver: &PageServerNode,
      26               0 : ) -> anyhow::Result<HashMap<TimelineId, Lsn>> {
      27               0 :     let timelines = pageserver.timeline_list(&tenant_id).await?;
      28               0 :     Ok(timelines
      29               0 :         .into_iter()
      30               0 :         .map(|t| (t.timeline_id, t.last_record_lsn))
      31               0 :         .collect())
      32               0 : }
      33                 : 
      34                 : /// Wait for the timeline LSNs on `pageserver` to catch up with or overtake
      35                 : /// `baseline`.
      36               0 : async fn await_lsn(
      37               0 :     tenant_id: TenantId,
      38               0 :     pageserver: &PageServerNode,
      39               0 :     baseline: HashMap<TimelineId, Lsn>,
      40               0 : ) -> anyhow::Result<()> {
      41                 :     loop {
      42               0 :         let latest = match get_lsns(tenant_id, pageserver).await {
      43               0 :             Ok(l) => l,
      44               0 :             Err(_e) => {
      45               0 :                 println!(
      46               0 :                     "🕑 Waiting for pageserver {} to activate...",
      47               0 :                     pageserver.conf.id
      48               0 :                 );
      49               0 :                 std::thread::sleep(Duration::from_millis(500));
      50               0 :                 continue;
      51                 :             }
      52                 :         };
      53                 : 
      54               0 :         let mut any_behind: bool = false;
      55               0 :         for (timeline_id, baseline_lsn) in &baseline {
      56               0 :             match latest.get(timeline_id) {
      57               0 :                 Some(latest_lsn) => {
      58               0 :                     println!("🕑 LSN origin {baseline_lsn} vs destination {latest_lsn}");
      59               0 :                     if latest_lsn < baseline_lsn {
      60               0 :                         any_behind = true;
      61               0 :                     }
      62                 :                 }
      63               0 :                 None => {
      64               0 :                     // Expected timeline isn't yet visible on migration destination.
      65               0 :                     // (IRL we would have to account for timeline deletion, but this
      66               0 :                     //  is just test helper)
      67               0 :                     any_behind = true;
      68               0 :                 }
      69                 :             }
      70                 :         }
      71                 : 
      72               0 :         if !any_behind {
      73               0 :             println!("✅ LSN caught up.  Proceeding...");
      74               0 :             break;
      75               0 :         } else {
      76               0 :             std::thread::sleep(Duration::from_millis(500));
      77               0 :         }
      78                 :     }
      79                 : 
      80               0 :     Ok(())
      81               0 : }
      82                 : 
      83                 : /// This function spans multiple services, to demonstrate live migration of a tenant
      84                 : /// between pageservers:
      85                 : ///  - Coordinate attach/secondary/detach on pageservers
      86                 : ///  - call into attachment_service for generations
      87                 : ///  - reconfigure compute endpoints to point to new attached pageserver
      88               0 : pub async fn migrate_tenant(
      89               0 :     env: &LocalEnv,
      90               0 :     tenant_id: TenantId,
      91               0 :     dest_ps: PageServerNode,
      92               0 : ) -> anyhow::Result<()> {
      93               0 :     println!("🤔 Checking existing status...");
      94               0 :     let attachment_service = AttachmentService::from_env(env);
      95                 : 
      96               0 :     fn build_location_config(
      97               0 :         mode: LocationConfigMode,
      98               0 :         generation: Option<u32>,
      99               0 :         secondary_conf: Option<LocationConfigSecondary>,
     100               0 :     ) -> LocationConfig {
     101               0 :         LocationConfig {
     102               0 :             mode,
     103               0 :             generation,
     104               0 :             secondary_conf,
     105               0 :             tenant_conf: TenantConfig::default(),
     106               0 :             shard_number: 0,
     107               0 :             shard_count: 0,
     108               0 :             shard_stripe_size: 0,
     109               0 :         }
     110               0 :     }
     111                 : 
     112               0 :     let previous = attachment_service.inspect(tenant_id).await?;
     113               0 :     let mut baseline_lsns = None;
     114               0 :     if let Some((generation, origin_ps_id)) = &previous {
     115               0 :         let origin_ps = PageServerNode::from_env(env, env.get_pageserver_conf(*origin_ps_id)?);
     116                 : 
     117               0 :         if origin_ps_id == &dest_ps.conf.id {
     118               0 :             println!("🔁 Already attached to {origin_ps_id}, freshening...");
     119               0 :             let gen = attachment_service
     120               0 :                 .attach_hook(tenant_id, dest_ps.conf.id)
     121               0 :                 .await?;
     122               0 :             let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
     123               0 :             dest_ps.location_config(tenant_id, dest_conf, None).await?;
     124               0 :             println!("✅ Migration complete");
     125               0 :             return Ok(());
     126               0 :         }
     127               0 : 
     128               0 :         println!("🔁 Switching origin pageserver {origin_ps_id} to stale mode");
     129               0 : 
     130               0 :         let stale_conf =
     131               0 :             build_location_config(LocationConfigMode::AttachedStale, Some(*generation), None);
     132               0 :         origin_ps
     133               0 :             .location_config(tenant_id, stale_conf, Some(Duration::from_secs(10)))
     134               0 :             .await?;
     135                 : 
     136               0 :         baseline_lsns = Some(get_lsns(tenant_id, &origin_ps).await?);
     137               0 :     }
     138                 : 
     139               0 :     println!(
     140               0 :         "🔁 Downloading latest layers to destination pageserver {}",
     141               0 :         dest_ps.conf.id
     142               0 :     );
     143               0 :     match dest_ps
     144               0 :         .tenant_secondary_download(&TenantShardId::unsharded(tenant_id))
     145               0 :         .await
     146                 :     {
     147               0 :         Ok(()) => {}
     148                 :         Err(_) => {
     149               0 :             println!("  (skipping, destination wasn't in secondary mode)")
     150                 :         }
     151                 :     }
     152                 : 
     153               0 :     let gen = attachment_service
     154               0 :         .attach_hook(tenant_id, dest_ps.conf.id)
     155               0 :         .await?;
     156               0 :     let dest_conf = build_location_config(LocationConfigMode::AttachedMulti, gen, None);
     157               0 : 
     158               0 :     println!("🔁 Attaching to pageserver {}", dest_ps.conf.id);
     159               0 :     dest_ps.location_config(tenant_id, dest_conf, None).await?;
     160                 : 
     161               0 :     if let Some(baseline) = baseline_lsns {
     162               0 :         println!("🕑 Waiting for LSN to catch up...");
     163               0 :         await_lsn(tenant_id, &dest_ps, baseline).await?;
     164               0 :     }
     165                 : 
     166               0 :     let cplane = ComputeControlPlane::load(env.clone())?;
     167               0 :     for (endpoint_name, endpoint) in &cplane.endpoints {
     168               0 :         if endpoint.tenant_id == tenant_id {
     169               0 :             println!(
     170               0 :                 "🔁 Reconfiguring endpoint {} to use pageserver {}",
     171               0 :                 endpoint_name, dest_ps.conf.id
     172               0 :             );
     173               0 :             endpoint.reconfigure(Some(dest_ps.conf.id)).await?;
     174               0 :         }
     175                 :     }
     176                 : 
     177               0 :     for other_ps_conf in &env.pageservers {
     178               0 :         if other_ps_conf.id == dest_ps.conf.id {
     179               0 :             continue;
     180               0 :         }
     181               0 : 
     182               0 :         let other_ps = PageServerNode::from_env(env, other_ps_conf);
     183               0 :         let other_ps_tenants = other_ps.tenant_list().await?;
     184                 : 
     185                 :         // Check if this tenant is attached
     186               0 :         let found = other_ps_tenants
     187               0 :             .into_iter()
     188               0 :             .map(|t| t.id)
     189               0 :             .any(|i| i.tenant_id == tenant_id);
     190               0 :         if !found {
     191               0 :             continue;
     192               0 :         }
     193               0 : 
     194               0 :         // Downgrade to a secondary location
     195               0 :         let secondary_conf = build_location_config(
     196               0 :             LocationConfigMode::Secondary,
     197               0 :             None,
     198               0 :             Some(LocationConfigSecondary { warm: true }),
     199               0 :         );
     200               0 : 
     201               0 :         println!(
     202               0 :             "💤 Switching to secondary mode on pageserver {}",
     203               0 :             other_ps.conf.id
     204               0 :         );
     205               0 :         other_ps
     206               0 :             .location_config(tenant_id, secondary_conf, None)
     207               0 :             .await?;
     208                 :     }
     209                 : 
     210               0 :     println!(
     211               0 :         "🔁 Switching to AttachedSingle mode on pageserver {}",
     212               0 :         dest_ps.conf.id
     213               0 :     );
     214               0 :     let dest_conf = build_location_config(LocationConfigMode::AttachedSingle, gen, None);
     215               0 :     dest_ps.location_config(tenant_id, dest_conf, None).await?;
     216                 : 
     217               0 :     println!("✅ Migration complete");
     218               0 : 
     219               0 :     Ok(())
     220               0 : }
        

Generated by: LCOV version 2.1-beta