Line data Source code
1 : use anyhow::{Context, Result};
2 : use postgres::Client;
3 : use tracing::info;
4 :
5 : pub(crate) struct MigrationRunner<'m> {
6 : client: &'m mut Client,
7 : migrations: &'m [&'m str],
8 : }
9 :
10 : impl<'m> MigrationRunner<'m> {
11 0 : pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
12 0 : // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
13 0 : assert!(migrations.len() + 1 < i64::MAX as usize);
14 :
15 0 : Self { client, migrations }
16 0 : }
17 :
18 0 : fn get_migration_id(&mut self) -> Result<i64> {
19 0 : let query = "SELECT id FROM neon_migration.migration_id";
20 0 : let row = self
21 0 : .client
22 0 : .query_one(query, &[])
23 0 : .context("run_migrations get migration_id")?;
24 :
25 0 : Ok(row.get::<&str, i64>("id"))
26 0 : }
27 :
28 0 : fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
29 0 : let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);
30 0 :
31 0 : self.client
32 0 : .simple_query(&setval)
33 0 : .context("run_migrations update id")?;
34 :
35 0 : Ok(())
36 0 : }
37 :
38 0 : fn prepare_migrations(&mut self) -> Result<()> {
39 0 : let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
40 0 : self.client.simple_query(query)?;
41 :
42 0 : let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
43 0 : self.client.simple_query(query)?;
44 :
45 0 : let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
46 0 : self.client.simple_query(query)?;
47 :
48 0 : let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
49 0 : self.client.simple_query(query)?;
50 :
51 0 : let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
52 0 : self.client.simple_query(query)?;
53 :
54 0 : Ok(())
55 0 : }
56 :
57 0 : pub fn run_migrations(mut self) -> Result<()> {
58 0 : self.prepare_migrations()?;
59 :
60 0 : let mut current_migration = self.get_migration_id()? as usize;
61 0 : while current_migration < self.migrations.len() {
62 0 : macro_rules! migration_id {
63 : ($cm:expr) => {
64 : ($cm + 1) as i64
65 : };
66 : }
67 :
68 0 : let migration = self.migrations[current_migration];
69 0 :
70 0 : if migration.starts_with("-- SKIP") {
71 0 : info!("Skipping migration id={}", migration_id!(current_migration));
72 : } else {
73 0 : info!(
74 0 : "Running migration id={}:\n{}\n",
75 0 : migration_id!(current_migration),
76 : migration
77 : );
78 :
79 0 : self.client
80 0 : .simple_query("BEGIN")
81 0 : .context("begin migration")?;
82 :
83 0 : self.client.simple_query(migration).with_context(|| {
84 0 : format!(
85 0 : "run_migrations migration id={}",
86 0 : migration_id!(current_migration)
87 0 : )
88 0 : })?;
89 :
90 : // Migration IDs start at 1
91 0 : self.update_migration_id(migration_id!(current_migration))?;
92 :
93 0 : self.client
94 0 : .simple_query("COMMIT")
95 0 : .context("commit migration")?;
96 :
97 0 : info!("Finished migration id={}", migration_id!(current_migration));
98 : }
99 :
100 0 : current_migration += 1;
101 : }
102 :
103 0 : Ok(())
104 0 : }
105 : }
|