LCOV - code coverage report
Current view: top level - compute_tools/src - migration.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 0.0 % 95 0
Test Date: 2025-03-12 00:01:28 Functions: 0.0 % 17 0

            Line data    Source code
       1              : use anyhow::{Context, Result};
       2              : use fail::fail_point;
       3              : use tokio_postgres::{Client, Transaction};
       4              : use tracing::{error, info};
       5              : 
       6              : use crate::metrics::DB_MIGRATION_FAILED;
       7              : 
       8              : /// Runs a series of migrations on a target database
       9              : pub(crate) struct MigrationRunner<'m> {
      10              :     client: &'m mut Client,
      11              :     migrations: &'m [&'m str],
      12              : }
      13              : 
      14              : impl<'m> MigrationRunner<'m> {
      15              :     /// Create a new migration runner
      16            0 :     pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
      17            0 :         // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
      18            0 :         assert!(migrations.len() + 1 < i64::MAX as usize);
      19              : 
      20            0 :         Self { client, migrations }
      21            0 :     }
      22              : 
      23              :     /// Get the current value neon_migration.migration_id
      24            0 :     async fn get_migration_id(&mut self) -> Result<i64> {
      25            0 :         let row = self
      26            0 :             .client
      27            0 :             .query_one("SELECT id FROM neon_migration.migration_id", &[])
      28            0 :             .await?;
      29              : 
      30            0 :         Ok(row.get::<&str, i64>("id"))
      31            0 :     }
      32              : 
      33              :     /// Update the neon_migration.migration_id value
      34              :     ///
      35              :     /// This function has a fail point called compute-migration, which can be
      36              :     /// used if you would like to fail the application of a series of migrations
      37              :     /// at some point.
      38            0 :     async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
      39            0 :         // We use this fail point in order to check that failing in the
      40            0 :         // middle of applying a series of migrations fails in an expected
      41            0 :         // manner
      42            0 :         if cfg!(feature = "testing") {
      43            0 :             let fail = (|| {
      44            0 :                 fail_point!("compute-migration", |fail_migration_id| {
      45            0 :                     migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
      46            0 :                 });
      47              : 
      48            0 :                 false
      49            0 :             })();
      50            0 : 
      51            0 :             if fail {
      52            0 :                 return Err(anyhow::anyhow!(format!(
      53            0 :                     "migration {} was configured to fail because of a failpoint",
      54            0 :                     migration_id
      55            0 :                 )));
      56            0 :             }
      57            0 :         }
      58              : 
      59            0 :         txn.query(
      60            0 :             "UPDATE neon_migration.migration_id SET id = $1",
      61            0 :             &[&migration_id],
      62            0 :         )
      63            0 :         .await
      64            0 :         .with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
      65              : 
      66            0 :         Ok(())
      67            0 :     }
      68              : 
      69              :     /// Prepare the migrations the target database for handling migrations
      70            0 :     async fn prepare_database(&mut self) -> Result<()> {
      71            0 :         self.client
      72            0 :             .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
      73            0 :             .await?;
      74            0 :         self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
      75            0 :         self.client
      76            0 :             .simple_query(
      77            0 :                 "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
      78            0 :             )
      79            0 :             .await?;
      80            0 :         self.client
      81            0 :             .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
      82            0 :             .await?;
      83            0 :         self.client
      84            0 :             .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
      85            0 :             .await?;
      86              : 
      87            0 :         Ok(())
      88            0 :     }
      89              : 
      90              :     /// Run an individual migration in a separate transaction block.
      91            0 :     async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
      92            0 :         let mut txn = client
      93            0 :             .transaction()
      94            0 :             .await
      95            0 :             .with_context(|| format!("begin transaction for migration {migration_id}"))?;
      96              : 
      97            0 :         if migration.starts_with("-- SKIP") {
      98            0 :             info!("Skipping migration id={}", migration_id);
      99              : 
     100              :             // Even though we are skipping the migration, updating the
     101              :             // migration ID should help keep logic easy to understand when
     102              :             // trying to understand the state of a cluster.
     103            0 :             Self::update_migration_id(&mut txn, migration_id).await?;
     104              :         } else {
     105            0 :             info!("Running migration id={}:\n{}\n", migration_id, migration);
     106              : 
     107            0 :             txn.simple_query(migration)
     108            0 :                 .await
     109            0 :                 .with_context(|| format!("apply migration {migration_id}"))?;
     110              : 
     111            0 :             Self::update_migration_id(&mut txn, migration_id).await?;
     112              :         }
     113              : 
     114            0 :         txn.commit()
     115            0 :             .await
     116            0 :             .with_context(|| format!("commit transaction for migration {migration_id}"))?;
     117              : 
     118            0 :         Ok(())
     119            0 :     }
     120              : 
     121              :     /// Run the configured set of migrations
     122            0 :     pub async fn run_migrations(mut self) -> Result<()> {
     123            0 :         self.prepare_database()
     124            0 :             .await
     125            0 :             .context("prepare database to handle migrations")?;
     126              : 
     127            0 :         let mut current_migration = self.get_migration_id().await? as usize;
     128            0 :         while current_migration < self.migrations.len() {
     129              :             // The index lags the migration ID by 1, so the current migration
     130              :             // ID is also the next index
     131            0 :             let migration_id = (current_migration + 1) as i64;
     132            0 :             let migration = self.migrations[current_migration];
     133            0 : 
     134            0 :             match Self::run_migration(self.client, migration_id, migration).await {
     135              :                 Ok(_) => {
     136            0 :                     info!("Finished migration id={}", migration_id);
     137              :                 }
     138            0 :                 Err(e) => {
     139            0 :                     error!("Failed to run migration id={}: {:?}", migration_id, e);
     140            0 :                     DB_MIGRATION_FAILED
     141            0 :                         .with_label_values(&[migration_id.to_string().as_str()])
     142            0 :                         .inc();
     143            0 :                     return Err(e);
     144              :                 }
     145              :             }
     146              : 
     147            0 :             current_migration += 1;
     148              :         }
     149              : 
     150            0 :         Ok(())
     151            0 :     }
     152              : }
        

Generated by: LCOV version 2.1-beta