LCOV - code coverage report
Current view: top level - compute_tools/src - migration.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 0.0 % 84 0
Test Date: 2025-01-30 15:18:43 Functions: 0.0 % 12 0

            Line data    Source code
       1              : use anyhow::{Context, Result};
       2              : use fail::fail_point;
       3              : use postgres::{Client, Transaction};
       4              : use tracing::{error, info};
       5              : 
       6              : use crate::metrics::DB_MIGRATION_FAILED;
       7              : 
       8              : /// Runs a series of migrations on a target database
       9              : pub(crate) struct MigrationRunner<'m> {
      10              :     client: &'m mut Client,
      11              :     migrations: &'m [&'m str],
      12              : }
      13              : 
      14              : impl<'m> MigrationRunner<'m> {
      15              :     /// Create a new migration runner
      16            0 :     pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
      17            0 :         // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
      18            0 :         assert!(migrations.len() + 1 < i64::MAX as usize);
      19              : 
      20            0 :         Self { client, migrations }
      21            0 :     }
      22              : 
      23              :     /// Get the current value neon_migration.migration_id
      24            0 :     fn get_migration_id(&mut self) -> Result<i64> {
      25            0 :         let row = self
      26            0 :             .client
      27            0 :             .query_one("SELECT id FROM neon_migration.migration_id", &[])?;
      28              : 
      29            0 :         Ok(row.get::<&str, i64>("id"))
      30            0 :     }
      31              : 
      32              :     /// Update the neon_migration.migration_id value
      33              :     ///
      34              :     /// This function has a fail point called compute-migration, which can be
      35              :     /// used if you would like to fail the application of a series of migrations
      36              :     /// at some point.
      37            0 :     fn update_migration_id(txn: &mut Transaction, migration_id: i64) -> Result<()> {
      38            0 :         // We use this fail point in order to check that failing in the
      39            0 :         // middle of applying a series of migrations fails in an expected
      40            0 :         // manner
      41            0 :         if cfg!(feature = "testing") {
      42            0 :             let fail = (|| {
      43            0 :                 fail_point!("compute-migration", |fail_migration_id| {
      44            0 :                     migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
      45            0 :                 });
      46              : 
      47            0 :                 false
      48            0 :             })();
      49            0 : 
      50            0 :             if fail {
      51            0 :                 return Err(anyhow::anyhow!(format!(
      52            0 :                     "migration {} was configured to fail because of a failpoint",
      53            0 :                     migration_id
      54            0 :                 )));
      55            0 :             }
      56            0 :         }
      57              : 
      58            0 :         txn.query(
      59            0 :             "UPDATE neon_migration.migration_id SET id = $1",
      60            0 :             &[&migration_id],
      61            0 :         )
      62            0 :         .with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
      63              : 
      64            0 :         Ok(())
      65            0 :     }
      66              : 
      67              :     /// Prepare the migrations the target database for handling migrations
      68            0 :     fn prepare_database(&mut self) -> Result<()> {
      69            0 :         self.client
      70            0 :             .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
      71            0 :         self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
      72            0 :         self.client.simple_query(
      73            0 :             "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
      74            0 :         )?;
      75            0 :         self.client
      76            0 :             .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
      77            0 :         self.client
      78            0 :             .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
      79              : 
      80            0 :         Ok(())
      81            0 :     }
      82              : 
      83              :     /// Run an individual migration in a separate transaction block.
      84            0 :     fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
      85            0 :         let mut txn = client
      86            0 :             .transaction()
      87            0 :             .with_context(|| format!("begin transaction for migration {migration_id}"))?;
      88              : 
      89            0 :         if migration.starts_with("-- SKIP") {
      90            0 :             info!("Skipping migration id={}", migration_id);
      91              : 
      92              :             // Even though we are skipping the migration, updating the
      93              :             // migration ID should help keep logic easy to understand when
      94              :             // trying to understand the state of a cluster.
      95            0 :             Self::update_migration_id(&mut txn, migration_id)?;
      96              :         } else {
      97            0 :             info!("Running migration id={}:\n{}\n", migration_id, migration);
      98              : 
      99            0 :             txn.simple_query(migration)
     100            0 :                 .with_context(|| format!("apply migration {migration_id}"))?;
     101              : 
     102            0 :             Self::update_migration_id(&mut txn, migration_id)?;
     103              :         }
     104              : 
     105            0 :         txn.commit()
     106            0 :             .with_context(|| format!("commit transaction for migration {migration_id}"))?;
     107              : 
     108            0 :         Ok(())
     109            0 :     }
     110              : 
     111              :     /// Run the configured set of migrations
     112            0 :     pub fn run_migrations(mut self) -> Result<()> {
     113            0 :         self.prepare_database()
     114            0 :             .context("prepare database to handle migrations")?;
     115              : 
     116            0 :         let mut current_migration = self.get_migration_id()? as usize;
     117            0 :         while current_migration < self.migrations.len() {
     118              :             // The index lags the migration ID by 1, so the current migration
     119              :             // ID is also the next index
     120            0 :             let migration_id = (current_migration + 1) as i64;
     121            0 :             let migration = self.migrations[current_migration];
     122            0 : 
     123            0 :             match Self::run_migration(self.client, migration_id, migration) {
     124              :                 Ok(_) => {
     125            0 :                     info!("Finished migration id={}", migration_id);
     126              :                 }
     127            0 :                 Err(e) => {
     128            0 :                     error!("Failed to run migration id={}: {}", migration_id, e);
     129            0 :                     DB_MIGRATION_FAILED
     130            0 :                         .with_label_values(&[migration_id.to_string().as_str()])
     131            0 :                         .inc();
     132            0 :                     return Err(e);
     133              :                 }
     134              :             }
     135              : 
     136            0 :             current_migration += 1;
     137              :         }
     138              : 
     139            0 :         Ok(())
     140            0 :     }
     141              : }
        

Generated by: LCOV version 2.1-beta