Line data Source code
1 : use anyhow::{Context, Result};
2 : use fail::fail_point;
3 : use postgres::Client;
4 : use tracing::info;
5 :
6 : pub(crate) struct MigrationRunner<'m> {
7 : client: &'m mut Client,
8 : migrations: &'m [&'m str],
9 : }
10 :
11 : impl<'m> MigrationRunner<'m> {
12 0 : pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
13 0 : // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
14 0 : assert!(migrations.len() + 1 < i64::MAX as usize);
15 :
16 0 : Self { client, migrations }
17 0 : }
18 :
19 0 : fn get_migration_id(&mut self) -> Result<i64> {
20 0 : let query = "SELECT id FROM neon_migration.migration_id";
21 0 : let row = self
22 0 : .client
23 0 : .query_one(query, &[])
24 0 : .context("run_migrations get migration_id")?;
25 :
26 0 : Ok(row.get::<&str, i64>("id"))
27 0 : }
28 :
29 0 : fn update_migration_id(&mut self, migration_id: i64) -> Result<()> {
30 0 : let setval = format!("UPDATE neon_migration.migration_id SET id={}", migration_id);
31 0 :
32 0 : self.client
33 0 : .simple_query(&setval)
34 0 : .context("run_migrations update id")?;
35 :
36 0 : Ok(())
37 0 : }
38 :
39 0 : fn prepare_migrations(&mut self) -> Result<()> {
40 0 : let query = "CREATE SCHEMA IF NOT EXISTS neon_migration";
41 0 : self.client.simple_query(query)?;
42 :
43 0 : let query = "CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)";
44 0 : self.client.simple_query(query)?;
45 :
46 0 : let query = "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING";
47 0 : self.client.simple_query(query)?;
48 :
49 0 : let query = "ALTER SCHEMA neon_migration OWNER TO cloud_admin";
50 0 : self.client.simple_query(query)?;
51 :
52 0 : let query = "REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC";
53 0 : self.client.simple_query(query)?;
54 :
55 0 : Ok(())
56 0 : }
57 :
58 0 : pub fn run_migrations(mut self) -> Result<()> {
59 0 : self.prepare_migrations()?;
60 :
61 0 : let mut current_migration = self.get_migration_id()? as usize;
62 0 : while current_migration < self.migrations.len() {
63 0 : macro_rules! migration_id {
64 : ($cm:expr) => {
65 : ($cm + 1) as i64
66 : };
67 : }
68 :
69 0 : let migration = self.migrations[current_migration];
70 0 :
71 0 : if migration.starts_with("-- SKIP") {
72 0 : info!("Skipping migration id={}", migration_id!(current_migration));
73 : } else {
74 0 : info!(
75 0 : "Running migration id={}:\n{}\n",
76 0 : migration_id!(current_migration),
77 : migration
78 : );
79 :
80 : // We use this fail point in order to check that failing in
81 : // the middle of applying a series of migrations fails in
82 : // an expected manner, including
83 : // neon_migration.migration_id reporting as 1 and that we
84 : // start applying migrations on the next restart at 2.
85 : {
86 0 : fail_point!(
87 0 : "the-second-compute-migration",
88 0 : migration_id!(current_migration) == 2,
89 0 : |_| {
90 0 : Err(anyhow::anyhow!(
91 0 : "Failing the second migration for that restarted endpoints start at the correct migration"
92 0 : ))
93 0 : }
94 0 : );
95 : }
96 :
97 0 : self.client
98 0 : .simple_query("BEGIN")
99 0 : .context("begin migration")?;
100 :
101 0 : self.client.simple_query(migration).with_context(|| {
102 0 : format!(
103 0 : "run_migrations migration id={}",
104 0 : migration_id!(current_migration)
105 0 : )
106 0 : })?;
107 :
108 : // Migration IDs start at 1
109 0 : self.update_migration_id(migration_id!(current_migration))?;
110 :
111 0 : self.client
112 0 : .simple_query("COMMIT")
113 0 : .context("commit migration")?;
114 :
115 0 : info!("Finished migration id={}", migration_id!(current_migration));
116 : }
117 :
118 0 : current_migration += 1;
119 : }
120 :
121 0 : Ok(())
122 0 : }
123 : }
|