Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use anyhow::Result;
4 : use compute_api::responses::{InstalledExtension, InstalledExtensions};
5 : use tokio_postgres::error::Error as PostgresError;
6 : use tokio_postgres::{Client, Config, NoTls};
7 :
8 : use crate::metrics::INSTALLED_EXTENSIONS;
9 :
10 : /// We don't reuse get_existing_dbs() just for code clarity
11 : /// and to make database listing query here more explicit.
12 : ///
13 : /// Limit the number of databases to 500 to avoid excessive load.
14 0 : async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
15 : // `pg_database.datconnlimit = -2` means that the database is in the
16 : // invalid state
17 0 : let databases = client
18 0 : .query(
19 0 : "SELECT datname FROM pg_catalog.pg_database
20 0 : WHERE datallowconn
21 0 : AND datconnlimit <> - 2
22 0 : LIMIT 500",
23 0 : &[],
24 0 : )
25 0 : .await?
26 0 : .iter()
27 0 : .map(|row| {
28 0 : let db: String = row.get("datname");
29 0 : db
30 0 : })
31 0 : .collect();
32 :
33 0 : Ok(databases)
34 0 : }
35 :
36 : /// Connect to every database (see list_dbs above) and get the list of installed extensions.
37 : ///
38 : /// Same extension can be installed in multiple databases with different versions,
39 : /// so we report a separate metric (number of databases where it is installed)
40 : /// for each extension version.
41 0 : pub async fn get_installed_extensions(
42 0 : mut conf: Config,
43 0 : ) -> Result<InstalledExtensions, PostgresError> {
44 0 : conf.application_name("compute_ctl:get_installed_extensions");
45 0 : let databases: Vec<String> = {
46 0 : let (mut client, connection) = conf.connect(NoTls).await?;
47 0 : tokio::spawn(async move {
48 0 : if let Err(e) = connection.await {
49 0 : eprintln!("connection error: {e}");
50 0 : }
51 0 : });
52 :
53 0 : list_dbs(&mut client).await?
54 : };
55 :
56 0 : let mut extensions_map: HashMap<(String, String, String), InstalledExtension> = HashMap::new();
57 0 : for db in databases.iter() {
58 0 : conf.dbname(db);
59 :
60 0 : let (client, connection) = conf.connect(NoTls).await?;
61 0 : tokio::spawn(async move {
62 0 : if let Err(e) = connection.await {
63 0 : eprintln!("connection error: {e}");
64 0 : }
65 0 : });
66 :
67 0 : let extensions: Vec<(String, String, i32)> = client
68 0 : .query(
69 0 : "SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
70 0 : &[],
71 0 : )
72 0 : .await?
73 0 : .iter()
74 0 : .map(|row| {
75 0 : (
76 0 : row.get("extname"),
77 0 : row.get("extversion"),
78 0 : row.get("extowner"),
79 0 : )
80 0 : })
81 0 : .collect();
82 :
83 0 : for (extname, v, extowner) in extensions.iter() {
84 0 : let version = v.to_string();
85 :
86 : // check if the extension is owned by superuser
87 : // 10 is the oid of superuser
88 0 : let owned_by_superuser = if *extowner == 10 { "1" } else { "0" };
89 :
90 0 : extensions_map
91 0 : .entry((
92 0 : extname.to_string(),
93 0 : version.clone(),
94 0 : owned_by_superuser.to_string(),
95 0 : ))
96 0 : .and_modify(|e| {
97 : // count the number of databases where the extension is installed
98 0 : e.n_databases += 1;
99 0 : })
100 0 : .or_insert(InstalledExtension {
101 0 : extname: extname.to_string(),
102 0 : version: version.clone(),
103 0 : n_databases: 1,
104 0 : owned_by_superuser: owned_by_superuser.to_string(),
105 0 : });
106 : }
107 : }
108 :
109 0 : for (key, ext) in extensions_map.iter() {
110 0 : let (extname, version, owned_by_superuser) = key;
111 0 : let n_databases = ext.n_databases as u64;
112 0 :
113 0 : INSTALLED_EXTENSIONS
114 0 : .with_label_values(&[extname, version, owned_by_superuser])
115 0 : .set(n_databases);
116 0 : }
117 :
118 0 : Ok(InstalledExtensions {
119 0 : extensions: extensions_map.into_values().collect(),
120 0 : })
121 0 : }
|