Line data Source code
1 : use anyhow::{Context, Result};
2 : use fail::fail_point;
3 : use tokio_postgres::{Client, Transaction};
4 : use tracing::{error, info};
5 :
6 : use crate::metrics::DB_MIGRATION_FAILED;
7 :
8 : /// Runs a series of migrations on a target database
9 : pub(crate) struct MigrationRunner<'m> {
10 : client: &'m mut Client,
11 : migrations: &'m [&'m str],
12 : }
13 :
14 : impl<'m> MigrationRunner<'m> {
15 : /// Create a new migration runner
16 0 : pub fn new(client: &'m mut Client, migrations: &'m [&'m str]) -> Self {
17 0 : // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
18 0 : assert!(migrations.len() + 1 < i64::MAX as usize);
19 :
20 0 : Self { client, migrations }
21 0 : }
22 :
23 : /// Get the current value neon_migration.migration_id
24 0 : async fn get_migration_id(&mut self) -> Result<i64> {
25 0 : let row = self
26 0 : .client
27 0 : .query_one("SELECT id FROM neon_migration.migration_id", &[])
28 0 : .await?;
29 :
30 0 : Ok(row.get::<&str, i64>("id"))
31 0 : }
32 :
33 : /// Update the neon_migration.migration_id value
34 : ///
35 : /// This function has a fail point called compute-migration, which can be
36 : /// used if you would like to fail the application of a series of migrations
37 : /// at some point.
38 0 : async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
39 0 : // We use this fail point in order to check that failing in the
40 0 : // middle of applying a series of migrations fails in an expected
41 0 : // manner
42 0 : if cfg!(feature = "testing") {
43 0 : let fail = (|| {
44 0 : fail_point!("compute-migration", |fail_migration_id| {
45 0 : migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
46 0 : });
47 :
48 0 : false
49 0 : })();
50 0 :
51 0 : if fail {
52 0 : return Err(anyhow::anyhow!(format!(
53 0 : "migration {} was configured to fail because of a failpoint",
54 0 : migration_id
55 0 : )));
56 0 : }
57 0 : }
58 :
59 0 : txn.query(
60 0 : "UPDATE neon_migration.migration_id SET id = $1",
61 0 : &[&migration_id],
62 0 : )
63 0 : .await
64 0 : .with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
65 :
66 0 : Ok(())
67 0 : }
68 :
69 : /// Prepare the migrations the target database for handling migrations
70 0 : async fn prepare_database(&mut self) -> Result<()> {
71 0 : self.client
72 0 : .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
73 0 : .await?;
74 0 : self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
75 0 : self.client
76 0 : .simple_query(
77 0 : "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
78 0 : )
79 0 : .await?;
80 0 : self.client
81 0 : .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
82 0 : .await?;
83 0 : self.client
84 0 : .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
85 0 : .await?;
86 :
87 0 : Ok(())
88 0 : }
89 :
90 : /// Run an individual migration in a separate transaction block.
91 0 : async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
92 0 : let mut txn = client
93 0 : .transaction()
94 0 : .await
95 0 : .with_context(|| format!("begin transaction for migration {migration_id}"))?;
96 :
97 0 : if migration.starts_with("-- SKIP") {
98 0 : info!("Skipping migration id={}", migration_id);
99 :
100 : // Even though we are skipping the migration, updating the
101 : // migration ID should help keep logic easy to understand when
102 : // trying to understand the state of a cluster.
103 0 : Self::update_migration_id(&mut txn, migration_id).await?;
104 : } else {
105 0 : info!("Running migration id={}:\n{}\n", migration_id, migration);
106 :
107 0 : txn.simple_query(migration)
108 0 : .await
109 0 : .with_context(|| format!("apply migration {migration_id}"))?;
110 :
111 0 : Self::update_migration_id(&mut txn, migration_id).await?;
112 : }
113 :
114 0 : txn.commit()
115 0 : .await
116 0 : .with_context(|| format!("commit transaction for migration {migration_id}"))?;
117 :
118 0 : Ok(())
119 0 : }
120 :
121 : /// Run the configured set of migrations
122 0 : pub async fn run_migrations(mut self) -> Result<()> {
123 0 : self.prepare_database()
124 0 : .await
125 0 : .context("prepare database to handle migrations")?;
126 :
127 0 : let mut current_migration = self.get_migration_id().await? as usize;
128 0 : while current_migration < self.migrations.len() {
129 : // The index lags the migration ID by 1, so the current migration
130 : // ID is also the next index
131 0 : let migration_id = (current_migration + 1) as i64;
132 0 : let migration = self.migrations[current_migration];
133 0 :
134 0 : match Self::run_migration(self.client, migration_id, migration).await {
135 : Ok(_) => {
136 0 : info!("Finished migration id={}", migration_id);
137 : }
138 0 : Err(e) => {
139 0 : error!("Failed to run migration id={}: {:?}", migration_id, e);
140 0 : DB_MIGRATION_FAILED
141 0 : .with_label_values(&[migration_id.to_string().as_str()])
142 0 : .inc();
143 0 : return Err(e);
144 : }
145 : }
146 :
147 0 : current_migration += 1;
148 : }
149 :
150 0 : Ok(())
151 0 : }
152 : }
|