Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use anyhow::Result;
4 : use compute_api::responses::{InstalledExtension, InstalledExtensions};
5 : use tokio_postgres::{Client, Config, NoTls};
6 :
7 : use crate::metrics::INSTALLED_EXTENSIONS;
8 :
9 : /// We don't reuse get_existing_dbs() just for code clarity
10 : /// and to make database listing query here more explicit.
11 : ///
12 : /// Limit the number of databases to 500 to avoid excessive load.
13 0 : async fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
14 : // `pg_database.datconnlimit = -2` means that the database is in the
15 : // invalid state
16 0 : let databases = client
17 0 : .query(
18 0 : "SELECT datname FROM pg_catalog.pg_database
19 0 : WHERE datallowconn
20 0 : AND datconnlimit <> - 2
21 0 : LIMIT 500",
22 0 : &[],
23 0 : )
24 0 : .await?
25 0 : .iter()
26 0 : .map(|row| {
27 0 : let db: String = row.get("datname");
28 0 : db
29 0 : })
30 0 : .collect();
31 0 :
32 0 : Ok(databases)
33 0 : }
34 :
35 : /// Connect to every database (see list_dbs above) and get the list of installed extensions.
36 : ///
37 : /// Same extension can be installed in multiple databases with different versions,
38 : /// so we report a separate metric (number of databases where it is installed)
39 : /// for each extension version.
40 0 : pub async fn get_installed_extensions(mut conf: Config) -> Result<InstalledExtensions> {
41 0 : conf.application_name("compute_ctl:get_installed_extensions");
42 0 : let databases: Vec<String> = {
43 0 : let (mut client, connection) = conf.connect(NoTls).await?;
44 0 : tokio::spawn(async move {
45 0 : if let Err(e) = connection.await {
46 0 : eprintln!("connection error: {}", e);
47 0 : }
48 0 : });
49 0 :
50 0 : list_dbs(&mut client).await?
51 : };
52 :
53 0 : let mut extensions_map: HashMap<(String, String, String), InstalledExtension> = HashMap::new();
54 0 : for db in databases.iter() {
55 0 : conf.dbname(db);
56 :
57 0 : let (client, connection) = conf.connect(NoTls).await?;
58 0 : tokio::spawn(async move {
59 0 : if let Err(e) = connection.await {
60 0 : eprintln!("connection error: {}", e);
61 0 : }
62 0 : });
63 :
64 0 : let extensions: Vec<(String, String, i32)> = client
65 0 : .query(
66 0 : "SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
67 0 : &[],
68 0 : )
69 0 : .await?
70 0 : .iter()
71 0 : .map(|row| {
72 0 : (
73 0 : row.get("extname"),
74 0 : row.get("extversion"),
75 0 : row.get("extowner"),
76 0 : )
77 0 : })
78 0 : .collect();
79 :
80 0 : for (extname, v, extowner) in extensions.iter() {
81 0 : let version = v.to_string();
82 :
83 : // check if the extension is owned by superuser
84 : // 10 is the oid of superuser
85 0 : let owned_by_superuser = if *extowner == 10 { "1" } else { "0" };
86 :
87 0 : extensions_map
88 0 : .entry((
89 0 : extname.to_string(),
90 0 : version.clone(),
91 0 : owned_by_superuser.to_string(),
92 0 : ))
93 0 : .and_modify(|e| {
94 0 : // count the number of databases where the extension is installed
95 0 : e.n_databases += 1;
96 0 : })
97 0 : .or_insert(InstalledExtension {
98 0 : extname: extname.to_string(),
99 0 : version: version.clone(),
100 0 : n_databases: 1,
101 0 : owned_by_superuser: owned_by_superuser.to_string(),
102 0 : });
103 0 : }
104 : }
105 :
106 0 : for (key, ext) in extensions_map.iter() {
107 0 : let (extname, version, owned_by_superuser) = key;
108 0 : let n_databases = ext.n_databases as u64;
109 0 :
110 0 : INSTALLED_EXTENSIONS
111 0 : .with_label_values(&[extname, version, owned_by_superuser])
112 0 : .set(n_databases);
113 0 : }
114 :
115 0 : Ok(InstalledExtensions {
116 0 : extensions: extensions_map.into_values().collect(),
117 0 : })
118 0 : }
|