LCOV - code coverage report
Current view: top level - compute_tools/src - migration.rs (source / functions) Coverage Total Hit
Test: c8f8d331b83562868d9054d9e0e68f866772aeaa.info Lines: 0.0 % 95 0
Test Date: 2025-07-26 17:20:05 Functions: 0.0 % 17 0

            Line data    Source code
       1              : use anyhow::{Context, Result};
       2              : use fail::fail_point;
       3              : use tokio_postgres::{Client, Transaction};
       4              : use tracing::{error, info};
       5              : 
       6              : use crate::metrics::DB_MIGRATION_FAILED;
       7              : 
       8              : /// Runs a series of migrations on a target database
       9              : pub(crate) struct MigrationRunner<'m> {
      10              :     client: &'m mut Client,
      11              :     migrations: &'m [&'m str],
      12              :     lakebase_mode: bool,
      13              : }
      14              : 
      15              : impl<'m> MigrationRunner<'m> {
      16              :     /// Create a new migration runner
      17            0 :     pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self {
      18              :         // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
      19            0 :         assert!(migrations.len() + 1 < i64::MAX as usize);
      20              : 
      21            0 :         Self {
      22            0 :             client,
      23            0 :             migrations,
      24            0 :             lakebase_mode,
      25            0 :         }
      26            0 :     }
      27              : 
      28              :     /// Get the current value neon_migration.migration_id
      29            0 :     async fn get_migration_id(&mut self) -> Result<i64> {
      30            0 :         let row = self
      31            0 :             .client
      32            0 :             .query_one("SELECT id FROM neon_migration.migration_id", &[])
      33            0 :             .await?;
      34              : 
      35            0 :         Ok(row.get::<&str, i64>("id"))
      36            0 :     }
      37              : 
      38              :     /// Update the neon_migration.migration_id value
      39              :     ///
      40              :     /// This function has a fail point called compute-migration, which can be
      41              :     /// used if you would like to fail the application of a series of migrations
      42              :     /// at some point.
      43            0 :     async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
      44              :         // We use this fail point in order to check that failing in the
      45              :         // middle of applying a series of migrations fails in an expected
      46              :         // manner
      47            0 :         if cfg!(feature = "testing") {
      48            0 :             let fail = (|| {
      49            0 :                 fail_point!("compute-migration", |fail_migration_id| {
      50            0 :                     migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
      51            0 :                 });
      52              : 
      53            0 :                 false
      54              :             })();
      55              : 
      56            0 :             if fail {
      57            0 :                 return Err(anyhow::anyhow!(format!(
      58            0 :                     "migration {} was configured to fail because of a failpoint",
      59            0 :                     migration_id
      60            0 :                 )));
      61            0 :             }
      62            0 :         }
      63              : 
      64            0 :         txn.query(
      65            0 :             "UPDATE neon_migration.migration_id SET id = $1",
      66            0 :             &[&migration_id],
      67            0 :         )
      68            0 :         .await
      69            0 :         .with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
      70              : 
      71            0 :         Ok(())
      72            0 :     }
      73              : 
      74              :     /// Prepare the migrations the target database for handling migrations
      75            0 :     async fn prepare_database(&mut self) -> Result<()> {
      76            0 :         self.client
      77            0 :             .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
      78            0 :             .await?;
      79            0 :         self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
      80            0 :         self.client
      81            0 :             .simple_query(
      82            0 :                 "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
      83            0 :             )
      84            0 :             .await?;
      85            0 :         self.client
      86            0 :             .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
      87            0 :             .await?;
      88            0 :         self.client
      89            0 :             .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
      90            0 :             .await?;
      91              : 
      92            0 :         Ok(())
      93            0 :     }
      94              : 
      95              :     /// Run an individual migration in a separate transaction block.
      96            0 :     async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
      97            0 :         let mut txn = client
      98            0 :             .transaction()
      99            0 :             .await
     100            0 :             .with_context(|| format!("begin transaction for migration {migration_id}"))?;
     101              : 
     102            0 :         if migration.starts_with("-- SKIP") {
     103            0 :             info!("Skipping migration id={}", migration_id);
     104              : 
     105              :             // Even though we are skipping the migration, updating the
     106              :             // migration ID should help keep logic easy to understand when
     107              :             // trying to understand the state of a cluster.
     108            0 :             Self::update_migration_id(&mut txn, migration_id).await?;
     109              :         } else {
     110            0 :             info!("Running migration id={}:\n{}\n", migration_id, migration);
     111              : 
     112            0 :             txn.simple_query(migration)
     113            0 :                 .await
     114            0 :                 .with_context(|| format!("apply migration {migration_id}"))?;
     115              : 
     116            0 :             Self::update_migration_id(&mut txn, migration_id).await?;
     117              :         }
     118              : 
     119            0 :         txn.commit()
     120            0 :             .await
     121            0 :             .with_context(|| format!("commit transaction for migration {migration_id}"))?;
     122              : 
     123            0 :         Ok(())
     124            0 :     }
     125              : 
     126              :     /// Run the configured set of migrations
     127            0 :     pub async fn run_migrations(mut self) -> Result<()> {
     128            0 :         self.prepare_database()
     129            0 :             .await
     130            0 :             .context("prepare database to handle migrations")?;
     131              : 
     132            0 :         let mut current_migration = self.get_migration_id().await? as usize;
     133            0 :         while current_migration < self.migrations.len() {
     134              :             // The index lags the migration ID by 1, so the current migration
     135              :             // ID is also the next index
     136            0 :             let migration_id = (current_migration + 1) as i64;
     137            0 :             let migration = self.migrations[current_migration];
     138            0 :             let migration = if self.lakebase_mode {
     139            0 :                 migration.replace("neon_superuser", "databricks_superuser")
     140              :             } else {
     141            0 :                 migration.to_string()
     142              :             };
     143              : 
     144            0 :             match Self::run_migration(self.client, migration_id, &migration).await {
     145              :                 Ok(_) => {
     146            0 :                     info!("Finished migration id={}", migration_id);
     147              :                 }
     148            0 :                 Err(e) => {
     149            0 :                     error!("Failed to run migration id={}: {:?}", migration_id, e);
     150            0 :                     DB_MIGRATION_FAILED
     151            0 :                         .with_label_values(&[migration_id.to_string().as_str()])
     152            0 :                         .inc();
     153            0 :                     return Err(e);
     154              :                 }
     155              :             }
     156              : 
     157            0 :             current_migration += 1;
     158              :         }
     159              : 
     160            0 :         Ok(())
     161            0 :     }
     162              : }
        

Generated by: LCOV version 2.1-beta