Line data Source code
1 : use anyhow::{Context, Result};
2 : use fail::fail_point;
3 : use postgres::Client;
4 : use tracing::info;
5 :
6 : /// Runs a series of migrations on a target database
7 : pub(crate) struct MigrationRunner<'m> {
8 : client: &'m mut Client,
9 : migrations: &'m [&'m str],
10 : }
11 :
12 : impl<'m> MigrationRunner<'m> {
13 : /// Create a new migration runner
14 0 : pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
15 0 : // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
16 0 : assert!(migrations.len() + 1 < i64::MAX as usize);
17 :
18 0 : Self { client, migrations }
19 0 : }
20 :
21 : /// Get the current value neon_migration.migration_id
22 0 : fn get_migration_id(&mut self) -> Result<i64> {
23 0 : let query = "SELECT id FROM neon_migration.migration_id";
24 0 : let row = self
25 0 : .client
26 0 : .query_one(query, &[])
27 0 : .context("run_migrations get migration_id")?;
28 :
29 0 : Ok(row.get::<&str, i64>("id"))
30 0 : }
31 :
32 : /// Update the neon_migration.migration_id value
33 : ///
34 : /// This function has a fail point called compute-migration, which can be
35 : /// used if you would like to fail the application of a series of migrations
36 : /// at some point.
37 0 : fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
38 0 : // We use this fail point in order to check that failing in the
39 0 : // middle of applying a series of migrations fails in an expected
40 0 : // manner
41 0 : if cfg!(feature = "testing") {
42 0 : let fail = (|| {
43 0 : fail_point!("compute-migration", |fail_migration_id| {
44 0 : migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
45 0 : });
46 :
47 0 : false
48 0 : })();
49 0 :
50 0 : if fail {
51 0 : return Err(anyhow::anyhow!(format!(
52 0 : "migration {} was configured to fail because of a failpoint",
53 0 : migration_id
54 0 : )));
55 0 : }
56 0 : }
57 :
58 0 : self.client
59 0 : .query(
60 0 : "UPDATE neon_migration.migration_id SET id = $1",
61 0 : &[&migration_id],
62 0 : )
63 0 : .context("run_migrations update id")?;
64 :
65 0 : Ok(())
66 0 : }
67 :
68 : /// Prepare the migrations the target database for handling migrations
69 0 : fn prepare_database(&mut self) -> Result<()> {
70 0 : self.client
71 0 : .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
72 0 : self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
73 0 : self.client.simple_query(
74 0 : "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
75 0 : )?;
76 0 : self.client
77 0 : .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
78 0 : self.client
79 0 : .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
80 :
81 0 : Ok(())
82 0 : }
83 :
84 : /// Run the configrured set of migrations
85 0 : pub fn run_migrations(mut self) -> Result<()> {
86 0 : self.prepare_database()?;
87 :
88 0 : let mut current_migration = self.get_migration_id()? as usize;
89 0 : while current_migration < self.migrations.len() {
90 0 : macro_rules! migration_id {
91 : ($cm:expr) => {
92 : ($cm + 1) as i64
93 : };
94 : }
95 :
96 0 : let migration = self.migrations[current_migration];
97 0 :
98 0 : if migration.starts_with("-- SKIP") {
99 0 : info!("Skipping migration id={}", migration_id!(current_migration));
100 :
101 : // Even though we are skipping the migration, updating the
102 : // migration ID should help keep logic easy to understand when
103 : // trying to understand the state of a cluster.
104 0 : self.update_migration_id(migration_id!(current_migration))?;
105 : } else {
106 0 : info!(
107 0 : "Running migration id={}:\n{}\n",
108 0 : migration_id!(current_migration),
109 : migration
110 : );
111 :
112 0 : self.client
113 0 : .simple_query("BEGIN")
114 0 : .context("begin migration")?;
115 :
116 0 : self.client.simple_query(migration).with_context(|| {
117 0 : format!(
118 0 : "run_migrations migration id={}",
119 0 : migration_id!(current_migration)
120 0 : )
121 0 : })?;
122 :
123 0 : self.update_migration_id(migration_id!(current_migration))?;
124 :
125 0 : self.client
126 0 : .simple_query("COMMIT")
127 0 : .context("commit migration")?;
128 :
129 0 : info!("Finished migration id={}", migration_id!(current_migration));
130 : }
131 :
132 0 : current_migration += 1;
133 : }
134 :
135 0 : Ok(())
136 0 : }
137 : }
|