Line data Source code
1 : use std::collections::HashMap;
2 :
3 : use anyhow::Result;
4 : use compute_api::responses::{InstalledExtension, InstalledExtensions};
5 : use once_cell::sync::Lazy;
6 : use tokio_postgres::error::Error as PostgresError;
7 : use tokio_postgres::{Client, Config, NoTls};
8 :
9 : use crate::metrics::INSTALLED_EXTENSIONS;
10 :
11 : /// We don't reuse get_existing_dbs() just for code clarity
12 : /// and to make database listing query here more explicit.
13 : ///
14 : /// Limit the number of databases to 500 to avoid excessive load.
15 0 : async fn list_dbs(client: &mut Client) -> Result<Vec<String>, PostgresError> {
16 : // `pg_database.datconnlimit = -2` means that the database is in the
17 : // invalid state
18 0 : let databases = client
19 0 : .query(
20 0 : "SELECT datname FROM pg_catalog.pg_database
21 0 : WHERE datallowconn
22 0 : AND datconnlimit <> - 2
23 0 : LIMIT 500",
24 0 : &[],
25 0 : )
26 0 : .await?
27 0 : .iter()
28 0 : .map(|row| {
29 0 : let db: String = row.get("datname");
30 0 : db
31 0 : })
32 0 : .collect();
33 :
34 0 : Ok(databases)
35 0 : }
36 :
37 : /// Connect to every database (see list_dbs above) and get the list of installed extensions.
38 : ///
39 : /// Same extension can be installed in multiple databases with different versions,
40 : /// so we report a separate metric (number of databases where it is installed)
41 : /// for each extension version.
42 0 : pub async fn get_installed_extensions(
43 0 : mut conf: Config,
44 0 : ) -> Result<InstalledExtensions, PostgresError> {
45 0 : conf.application_name("compute_ctl:get_installed_extensions");
46 0 : let databases: Vec<String> = {
47 0 : let (mut client, connection) = conf.connect(NoTls).await?;
48 0 : tokio::spawn(async move {
49 0 : if let Err(e) = connection.await {
50 0 : eprintln!("connection error: {e}");
51 0 : }
52 0 : });
53 :
54 0 : list_dbs(&mut client).await?
55 : };
56 :
57 0 : let mut extensions_map: HashMap<(String, String, String), InstalledExtension> = HashMap::new();
58 0 : for db in databases.iter() {
59 0 : conf.dbname(db);
60 :
61 0 : let (client, connection) = conf.connect(NoTls).await?;
62 0 : tokio::spawn(async move {
63 0 : if let Err(e) = connection.await {
64 0 : eprintln!("connection error: {e}");
65 0 : }
66 0 : });
67 :
68 0 : let extensions: Vec<(String, String, i32)> = client
69 0 : .query(
70 0 : "SELECT extname, extversion, extowner::integer FROM pg_catalog.pg_extension",
71 0 : &[],
72 0 : )
73 0 : .await?
74 0 : .iter()
75 0 : .map(|row| {
76 0 : (
77 0 : row.get("extname"),
78 0 : row.get("extversion"),
79 0 : row.get("extowner"),
80 0 : )
81 0 : })
82 0 : .collect();
83 :
84 0 : for (extname, v, extowner) in extensions.iter() {
85 0 : let version = v.to_string();
86 :
87 : // check if the extension is owned by superuser
88 : // 10 is the oid of superuser
89 0 : let owned_by_superuser = if *extowner == 10 { "1" } else { "0" };
90 :
91 0 : extensions_map
92 0 : .entry((
93 0 : extname.to_string(),
94 0 : version.clone(),
95 0 : owned_by_superuser.to_string(),
96 0 : ))
97 0 : .and_modify(|e| {
98 : // count the number of databases where the extension is installed
99 0 : e.n_databases += 1;
100 0 : })
101 0 : .or_insert(InstalledExtension {
102 0 : extname: extname.to_string(),
103 0 : version: version.clone(),
104 0 : n_databases: 1,
105 0 : owned_by_superuser: owned_by_superuser.to_string(),
106 0 : });
107 : }
108 : }
109 :
110 0 : for (key, ext) in extensions_map.iter() {
111 0 : let (extname, version, owned_by_superuser) = key;
112 0 : let n_databases = ext.n_databases as u64;
113 0 :
114 0 : INSTALLED_EXTENSIONS
115 0 : .with_label_values(&[extname, version, owned_by_superuser])
116 0 : .set(n_databases);
117 0 : }
118 :
119 0 : Ok(InstalledExtensions {
120 0 : extensions: extensions_map.into_values().collect(),
121 0 : })
122 0 : }
123 :
124 0 : pub fn initialize_metrics() {
125 0 : Lazy::force(&INSTALLED_EXTENSIONS);
126 0 : }
|