Line data Source code
1 : use anyhow::{Context, Result};
2 : use fail::fail_point;
3 : use tokio_postgres::{Client, Transaction};
4 : use tracing::{error, info};
5 :
6 : use crate::metrics::DB_MIGRATION_FAILED;
7 :
8 : /// Runs a series of migrations on a target database
9 : pub(crate) struct MigrationRunner<'m> {
10 : client: &'m mut Client,
11 : migrations: &'m [&'m str],
12 : lakebase_mode: bool,
13 : }
14 :
15 : impl<'m> MigrationRunner<'m> {
16 : /// Create a new migration runner
17 0 : pub fn new(client: &'m mut Client, migrations: &'m [&'m str], lakebase_mode: bool) -> Self {
18 : // The neon_migration.migration_id::id column is a bigint, which is equivalent to an i64
19 0 : assert!(migrations.len() + 1 < i64::MAX as usize);
20 :
21 0 : Self {
22 0 : client,
23 0 : migrations,
24 0 : lakebase_mode,
25 0 : }
26 0 : }
27 :
28 : /// Get the current value neon_migration.migration_id
29 0 : async fn get_migration_id(&mut self) -> Result<i64> {
30 0 : let row = self
31 0 : .client
32 0 : .query_one("SELECT id FROM neon_migration.migration_id", &[])
33 0 : .await?;
34 :
35 0 : Ok(row.get::<&str, i64>("id"))
36 0 : }
37 :
38 : /// Update the neon_migration.migration_id value
39 : ///
40 : /// This function has a fail point called compute-migration, which can be
41 : /// used if you would like to fail the application of a series of migrations
42 : /// at some point.
43 0 : async fn update_migration_id(txn: &mut Transaction<'_>, migration_id: i64) -> Result<()> {
44 : // We use this fail point in order to check that failing in the
45 : // middle of applying a series of migrations fails in an expected
46 : // manner
47 0 : if cfg!(feature = "testing") {
48 0 : let fail = (|| {
49 0 : fail_point!("compute-migration", |fail_migration_id| {
50 0 : migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
51 0 : });
52 :
53 0 : false
54 : })();
55 :
56 0 : if fail {
57 0 : return Err(anyhow::anyhow!(format!(
58 0 : "migration {} was configured to fail because of a failpoint",
59 0 : migration_id
60 0 : )));
61 0 : }
62 0 : }
63 :
64 0 : txn.query(
65 0 : "UPDATE neon_migration.migration_id SET id = $1",
66 0 : &[&migration_id],
67 0 : )
68 0 : .await
69 0 : .with_context(|| format!("update neon_migration.migration_id to {migration_id}"))?;
70 :
71 0 : Ok(())
72 0 : }
73 :
74 : /// Prepare the migrations the target database for handling migrations
75 0 : async fn prepare_database(&mut self) -> Result<()> {
76 0 : self.client
77 0 : .simple_query("CREATE SCHEMA IF NOT EXISTS neon_migration")
78 0 : .await?;
79 0 : self.client.simple_query("CREATE TABLE IF NOT EXISTS neon_migration.migration_id (key INT NOT NULL PRIMARY KEY, id bigint NOT NULL DEFAULT 0)").await?;
80 0 : self.client
81 0 : .simple_query(
82 0 : "INSERT INTO neon_migration.migration_id VALUES (0, 0) ON CONFLICT DO NOTHING",
83 0 : )
84 0 : .await?;
85 0 : self.client
86 0 : .simple_query("ALTER SCHEMA neon_migration OWNER TO cloud_admin")
87 0 : .await?;
88 0 : self.client
89 0 : .simple_query("REVOKE ALL ON SCHEMA neon_migration FROM PUBLIC")
90 0 : .await?;
91 :
92 0 : Ok(())
93 0 : }
94 :
95 : /// Run an individual migration in a separate transaction block.
96 0 : async fn run_migration(client: &mut Client, migration_id: i64, migration: &str) -> Result<()> {
97 0 : let mut txn = client
98 0 : .transaction()
99 0 : .await
100 0 : .with_context(|| format!("begin transaction for migration {migration_id}"))?;
101 :
102 0 : if migration.starts_with("-- SKIP") {
103 0 : info!("Skipping migration id={}", migration_id);
104 :
105 : // Even though we are skipping the migration, updating the
106 : // migration ID should help keep logic easy to understand when
107 : // trying to understand the state of a cluster.
108 0 : Self::update_migration_id(&mut txn, migration_id).await?;
109 : } else {
110 0 : info!("Running migration id={}:\n{}\n", migration_id, migration);
111 :
112 0 : txn.simple_query(migration)
113 0 : .await
114 0 : .with_context(|| format!("apply migration {migration_id}"))?;
115 :
116 0 : Self::update_migration_id(&mut txn, migration_id).await?;
117 : }
118 :
119 0 : txn.commit()
120 0 : .await
121 0 : .with_context(|| format!("commit transaction for migration {migration_id}"))?;
122 :
123 0 : Ok(())
124 0 : }
125 :
126 : /// Run the configured set of migrations
127 0 : pub async fn run_migrations(mut self) -> Result<()> {
128 0 : self.prepare_database()
129 0 : .await
130 0 : .context("prepare database to handle migrations")?;
131 :
132 0 : let mut current_migration = self.get_migration_id().await? as usize;
133 0 : while current_migration < self.migrations.len() {
134 : // The index lags the migration ID by 1, so the current migration
135 : // ID is also the next index
136 0 : let migration_id = (current_migration + 1) as i64;
137 0 : let migration = self.migrations[current_migration];
138 0 : let migration = if self.lakebase_mode {
139 0 : migration.replace("neon_superuser", "databricks_superuser")
140 : } else {
141 0 : migration.to_string()
142 : };
143 :
144 0 : match Self::run_migration(self.client, migration_id, &migration).await {
145 : Ok(_) => {
146 0 : info!("Finished migration id={}", migration_id);
147 : }
148 0 : Err(e) => {
149 0 : error!("Failed to run migration id={}: {:?}", migration_id, e);
150 0 : DB_MIGRATION_FAILED
151 0 : .with_label_values(&[migration_id.to_string().as_str()])
152 0 : .inc();
153 0 : return Err(e);
154 : }
155 : }
156 :
157 0 : current_migration += 1;
158 : }
159 :
160 0 : Ok(())
161 0 : }
162 : }
|