LCOV - code coverage report
Current view: top level - compute_tools/src - migration.rs (source / functions) Coverage Total Hit
Test: f2bfe5dc5ab550768e936d6bc7b94d9b2e2d4cc9.info Lines: 0.0 % 78 0
Test Date: 2025-01-27 20:39:28 Functions: 0.0 % 13 0

            Line data    Source code
       1              : use anyhow::{Context, Result};
       2              : use fail::fail_point;
       3              : use postgres::{Client, Transaction};
       4              : use tracing::info;
       5              : 
       6              : /// Runs a series of migrations on a target database
       7              : pub(crate) struct MigrationRunner<'m> {
       8              :     client: &'m mut Client,
       9              :     migrations: &'m [&'m str],
      10              : }
      11              : 
      12              : impl<'m> MigrationRunner<'m> {
      13              :     /// Create a new migration runner
      14            0 :     pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
      15            0 :         // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
      16            0 :         assert!(migrations.len() + 1 < i64::MAX as usize);
      17              : 
      18            0 :         Self { client, migrations }
      19            0 :     }
      20              : 
      21              :     /// Get the current value neon_migration.migration_id
      22            0 :     fn get_migration_id(&mut self) -> Result<i64> {
      23            0 :         let row = self
      24            0 :             .client
      25            0 :             .query_one("SELECT id FROM neon_migration.migration_id", &[])?;
      26              : 
      27            0 :         Ok(row.get::<&str, i64>("id"))
      28            0 :     }
      29              : 
      30              :     /// Update the neon_migration.migration_id value
      31              :     ///
      32              :     /// This function has a fail point called compute-migration, which can be
      33              :     /// used if you would like to fail the application of a series of migrations
      34              :     /// at some point.
      35            0 :     fn update_migration_id(txn: &mut Transaction, migration_id: i64) -> Result<()> {
      36            0 :         // We use this fail point in order to check that failing in the
      37            0 :         // middle of applying a series of migrations fails in an expected
      38            0 :         // manner
      39            0 :         if cfg!(feature = "testing") {
      40            0 :             let fail = (|| {
      41            0 :                 fail_point!("compute-migration", |fail_migration_id| {
      42            0 :                     migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
      43            0 :                 });
      44              : 
      45            0 :                 false
      46            0 :             })();
      47            0 : 
      48            0 :             if fail {
      49            0 :                 return Err(anyhow::anyhow!(format!(
      50            0 :                     "migration {} was configured to fail because of a failpoint",
      51            0 :                     migration_id
      52            0 :                 )));
      53            0 :             }
      54            0 :         }
      55              : 
      56            0 :         txn.query(
      57            0 :             "UPDATE neon_migration.migration_id SET id = $1",
      58            0 :             &[&migration_id],
      59            0 :         )
      60            0 :         .with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
      61              : 
      62            0 :         Ok(())
      63            0 :     }
      64              : 
      65              :     /// Prepare the migrations the target database for handling migrations
      66            0 :     fn prepare_database(&mut self) -> Result<()> {
      67            0 :         self.client
      68            0 :             .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
      69            0 :         self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
      70            0 :         self.client.simple_query(
      71            0 :             "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
      72            0 :         )?;
      73            0 :         self.client
      74            0 :             .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
      75            0 :         self.client
      76            0 :             .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
      77              : 
      78            0 :         Ok(())
      79            0 :     }
      80              : 
      81              :     /// Run an individual migration
      82            0 :     fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> {
      83            0 :         if migration.starts_with("-- SKIP") {
      84            0 :             info!("Skipping migration id={}", migration_id);
      85              : 
      86              :             // Even though we are skipping the migration, updating the
      87              :             // migration ID should help keep logic easy to understand when
      88              :             // trying to understand the state of a cluster.
      89            0 :             Self::update_migration_id(txn, migration_id)?;
      90              :         } else {
      91            0 :             info!("Running migration id={}:\n{}\n", migration_id, migration);
      92              : 
      93            0 :             txn.simple_query(migration)
      94            0 :                 .with_context(|| format!("apply migration {migration_id}"))?;
      95              : 
      96            0 :             Self::update_migration_id(txn, migration_id)?;
      97              :         }
      98              : 
      99            0 :         Ok(())
     100            0 :     }
     101              : 
     102              :     /// Run the configured set of migrations
     103            0 :     pub fn run_migrations(mut self) -> Result<()> {
     104            0 :         self.prepare_database()
     105            0 :             .context("prepare database to handle migrations")?;
     106              : 
     107            0 :         let mut current_migration = self.get_migration_id()? as usize;
     108            0 :         while current_migration < self.migrations.len() {
     109              :             // The index lags the migration ID by 1, so the current migration
     110              :             // ID is also the next index
     111            0 :             let migration_id = (current_migration + 1) as i64;
     112              : 
     113            0 :             let mut txn = self
     114            0 :                 .client
     115            0 :                 .transaction()
     116            0 :                 .with_context(|| format!("begin transaction for migration {migration_id}"))?;
     117              : 
     118            0 :             Self::run_migration(&mut txn, migration_id, self.migrations[current_migration])
     119            0 :                 .with_context(|| format!("running migration {migration_id}"))?;
     120              : 
     121            0 :             txn.commit()
     122            0 :                 .with_context(|| format!("commit transaction for migration {migration_id}"))?;
     123              : 
     124            0 :             info!("Finished migration id={}", migration_id);
     125              : 
     126            0 :             current_migration += 1;
     127              :         }
     128              : 
     129            0 :         Ok(())
     130            0 :     }
     131              : }
        

Generated by: LCOV version 2.1-beta