Line data Source code
1 : use anyhow::{Context, Result};
2 : use fail::fail_point;
3 : use postgres::{Client, Transaction};
4 : use tracing::info;
5 :
6 : /// Runs a series of migrations on a target database
7 : pub(crate) struct MigrationRunner<'m> {
8 : client: &'m mut Client,
9 : migrations: &'m [&'m str],
10 : }
11 :
12 : impl<'m> MigrationRunner<'m> {
13 : /// Create a new migration runner
14 0 : pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
15 0 : // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
16 0 : assert!(migrations.len() + 1 < i64::MAX as usize);
17 :
18 0 : Self { client, migrations }
19 0 : }
20 :
21 : /// Get the current value neon_migration.migration_id
22 0 : fn get_migration_id(&mut self) -> Result<i64> {
23 0 : let row = self
24 0 : .client
25 0 : .query_one("SELECT id FROM neon_migration.migration_id", &[])?;
26 :
27 0 : Ok(row.get::<&str, i64>("id"))
28 0 : }
29 :
30 : /// Update the neon_migration.migration_id value
31 : ///
32 : /// This function has a fail point called compute-migration, which can be
33 : /// used if you would like to fail the application of a series of migrations
34 : /// at some point.
35 0 : fn update_migration_id(txn: &mut Transaction, migration_id: i64) -> Result<()> {
36 0 : // We use this fail point in order to check that failing in the
37 0 : // middle of applying a series of migrations fails in an expected
38 0 : // manner
39 0 : if cfg!(feature = "testing") {
40 0 : let fail = (|| {
41 0 : fail_point!("compute-migration", |fail_migration_id| {
42 0 : migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
43 0 : });
44 :
45 0 : false
46 0 : })();
47 0 :
48 0 : if fail {
49 0 : return Err(anyhow::anyhow!(format!(
50 0 : "migration {} was configured to fail because of a failpoint",
51 0 : migration_id
52 0 : )));
53 0 : }
54 0 : }
55 :
56 0 : txn.query(
57 0 : "UPDATE neon_migration.migration_id SET id = $1",
58 0 : &[&migration_id],
59 0 : )
60 0 : .with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
61 :
62 0 : Ok(())
63 0 : }
64 :
65 : /// Prepare the migrations the target database for handling migrations
66 0 : fn prepare_database(&mut self) -> Result<()> {
67 0 : self.client
68 0 : .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
69 0 : self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
70 0 : self.client.simple_query(
71 0 : "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
72 0 : )?;
73 0 : self.client
74 0 : .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
75 0 : self.client
76 0 : .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
77 :
78 0 : Ok(())
79 0 : }
80 :
81 : /// Run an individual migration
82 0 : fn run_migration(txn: &mut Transaction, migration_id: i64, migration: &str) -> Result<()> {
83 0 : if migration.starts_with("-- SKIP") {
84 0 : info!("Skipping migration id={}", migration_id);
85 :
86 : // Even though we are skipping the migration, updating the
87 : // migration ID should help keep logic easy to understand when
88 : // trying to understand the state of a cluster.
89 0 : Self::update_migration_id(txn, migration_id)?;
90 : } else {
91 0 : info!("Running migration id={}:\n{}\n", migration_id, migration);
92 :
93 0 : txn.simple_query(migration)
94 0 : .with_context(|| format!("apply migration {migration_id}"))?;
95 :
96 0 : Self::update_migration_id(txn, migration_id)?;
97 : }
98 :
99 0 : Ok(())
100 0 : }
101 :
102 : /// Run the configured set of migrations
103 0 : pub fn run_migrations(mut self) -> Result<()> {
104 0 : self.prepare_database()
105 0 : .context("prepare database to handle migrations")?;
106 :
107 0 : let mut current_migration = self.get_migration_id()? as usize;
108 0 : while current_migration < self.migrations.len() {
109 : // The index lags the migration ID by 1, so the current migration
110 : // ID is also the next index
111 0 : let migration_id = (current_migration + 1) as i64;
112 :
113 0 : let mut txn = self
114 0 : .client
115 0 : .transaction()
116 0 : .with_context(|| format!("begin transaction for migration {migration_id}"))?;
117 :
118 0 : Self::run_migration(&mut txn, migration_id, self.migrations[current_migration])
119 0 : .with_context(|| format!("running migration {migration_id}"))?;
120 :
121 0 : txn.commit()
122 0 : .with_context(|| format!("commit transaction for migration {migration_id}"))?;
123 :
124 0 : info!("Finished migration id={}", migration_id);
125 :
126 0 : current_migration += 1;
127 : }
128 :
129 0 : Ok(())
130 0 : }
131 : }
|