Line data Source code
1 : use anyhow::{Context, Result};
2 : use fail::fail_point;
3 : use postgres::{Client, Transaction};
4 : use tracing::{error, info};
5 :
6 : use crate::metrics::DB_MIGRATION_FAILED;
7 :
8 : /// Runs a series of migrations on a target database
9 : pub(crate) struct MigrationRunner<'m> {
10 : client: &'m mut Client,
11 : migrations: &'m [&'m str],
12 : }
13 :
14 : impl<'m> MigrationRunner<'m> {
15 : /// Create a new migration runner
16 0 : pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
17 0 : // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
18 0 : assert!(migrations.len() + 1 < i64::MAX as usize);
19 :
20 0 : Self { client, migrations }
21 0 : }
22 :
23 : /// Get the current value neon_migration.migration_id
24 0 : fn get_migration_id(&mut self) -> Result<i64> {
25 0 : let row = self
26 0 : .client
27 0 : .query_one("SELECT id FROM neon_migration.migration_id", &[])?;
28 :
29 0 : Ok(row.get::<&str, i64>("id"))
30 0 : }
31 :
32 : /// Update the neon_migration.migration_id value
33 : ///
34 : /// This function has a fail point called compute-migration, which can be
35 : /// used if you would like to fail the application of a series of migrations
36 : /// at some point.
37 0 : fn update_migration_id(txn: &mut Transaction, migration_id: i64) -> Result<()> {
38 0 : // We use this fail point in order to check that failing in the
39 0 : // middle of applying a series of migrations fails in an expected
40 0 : // manner
41 0 : if cfg!(feature = "testing") {
42 0 : let fail = (|| {
43 0 : fail_point!("compute-migration", |fail_migration_id| {
44 0 : migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
45 0 : });
46 :
47 0 : false
48 0 : })();
49 0 :
50 0 : if fail {
51 0 : return Err(anyhow::anyhow!(format!(
52 0 : "migration {} was configured to fail because of a failpoint",
53 0 : migration_id
54 0 : )));
55 0 : }
56 0 : }
57 :
58 0 : txn.query(
59 0 : "UPDATE neon_migration.migration_id SET id = $1",
60 0 : &[&migration_id],
61 0 : )
62 0 : .with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
63 :
64 0 : Ok(())
65 0 : }
66 :
67 : /// Prepare the migrations the target database for handling migrations
68 0 : fn prepare_database(&mut self) -> Result<()> {
69 0 : self.client
70 0 : .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")?;
71 0 : self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)")?;
72 0 : self.client.simple_query(
73 0 : "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
74 0 : )?;
75 0 : self.client
76 0 : .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")?;
77 0 : self.client
78 0 : .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")?;
79 :
80 0 : Ok(())
81 0 : }
82 :
83 : /// Run an individual migration in a separate transaction block.
84 0 : fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
85 0 : let mut txn = client
86 0 : .transaction()
87 0 : .with_context(|| format!("begin transaction for migration {migration_id}"))?;
88 :
89 0 : if migration.starts_with("-- SKIP") {
90 0 : info!("Skipping migration id={}", migration_id);
91 :
92 : // Even though we are skipping the migration, updating the
93 : // migration ID should help keep logic easy to understand when
94 : // trying to understand the state of a cluster.
95 0 : Self::update_migration_id(&mut txn, migration_id)?;
96 : } else {
97 0 : info!("Running migration id={}:\n{}\n", migration_id, migration);
98 :
99 0 : txn.simple_query(migration)
100 0 : .with_context(|| format!("apply migration {migration_id}"))?;
101 :
102 0 : Self::update_migration_id(&mut txn, migration_id)?;
103 : }
104 :
105 0 : txn.commit()
106 0 : .with_context(|| format!("commit transaction for migration {migration_id}"))?;
107 :
108 0 : Ok(())
109 0 : }
110 :
111 : /// Run the configured set of migrations
112 0 : pub fn run_migrations(mut self) -> Result<()> {
113 0 : self.prepare_database()
114 0 : .context("prepare database to handle migrations")?;
115 :
116 0 : let mut current_migration = self.get_migration_id()? as usize;
117 0 : while current_migration < self.migrations.len() {
118 : // The index lags the migration ID by 1, so the current migration
119 : // ID is also the next index
120 0 : let migration_id = (current_migration + 1) as i64;
121 0 : let migration = self.migrations[current_migration];
122 0 :
123 0 : match Self::run_migration(self.client, migration_id, migration) {
124 : Ok(_) => {
125 0 : info!("Finished migration id={}", migration_id);
126 : }
127 0 : Err(e) => {
128 0 : error!("Failed to run migration id={}: {}", migration_id, e);
129 0 : DB_MIGRATION_FAILED
130 0 : .with_label_values(&[migration_id.to_string().as_str()])
131 0 : .inc();
132 0 : return Err(e);
133 : }
134 : }
135 :
136 0 : current_migration += 1;
137 : }
138 :
139 0 : Ok(())
140 0 : }
141 : }
|