Line data Source code
1 : use compute_api::responses::{InstalledExtension, InstalledExtensions};
2 : use metrics::proto::MetricFamily;
3 : use std::collections::HashMap;
4 : use std::collections::HashSet;
5 : use tracing::info;
6 : use url::Url;
7 :
8 : use anyhow::Result;
9 : use postgres::{Client, NoTls};
10 : use tokio::task;
11 :
12 : use metrics::core::Collector;
13 : use metrics::{register_uint_gauge_vec, UIntGaugeVec};
14 : use once_cell::sync::Lazy;
15 :
16 : /// We don't reuse get_existing_dbs() just for code clarity
17 : /// and to make database listing query here more explicit.
18 : ///
19 : /// Limit the number of databases to 500 to avoid excessive load.
20 0 : fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
21 : // `pg_database.datconnlimit = -2` means that the database is in the
22 : // invalid state
23 0 : let databases = client
24 0 : .query(
25 0 : "SELECT datname FROM pg_catalog.pg_database
26 0 : WHERE datallowconn
27 0 : AND datconnlimit <> - 2
28 0 : LIMIT 500",
29 0 : &[],
30 0 : )?
31 0 : .iter()
32 0 : .map(|row| {
33 0 : let db: String = row.get("datname");
34 0 : db
35 0 : })
36 0 : .collect();
37 0 :
38 0 : Ok(databases)
39 0 : }
40 :
41 : /// Connect to every database (see list_dbs above) and get the list of installed extensions.
42 : ///
43 : /// Same extension can be installed in multiple databases with different versions,
44 : /// we only keep the highest and lowest version across all databases.
45 0 : pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtensions> {
46 0 : let mut connstr = connstr.clone();
47 0 :
48 0 : task::spawn_blocking(move || {
49 0 : let mut client = Client::connect(connstr.as_str(), NoTls)?;
50 0 : let databases: Vec<String> = list_dbs(&mut client)?;
51 :
52 0 : let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
53 0 : for db in databases.iter() {
54 0 : connstr.set_path(db);
55 0 : let mut db_client = Client::connect(connstr.as_str(), NoTls)?;
56 0 : let extensions: Vec<(String, String)> = db_client
57 0 : .query(
58 0 : "SELECT extname, extversion FROM pg_catalog.pg_extension;",
59 0 : &[],
60 0 : )?
61 0 : .iter()
62 0 : .map(|row| (row.get("extname"), row.get("extversion")))
63 0 : .collect();
64 :
65 0 : for (extname, v) in extensions.iter() {
66 0 : let version = v.to_string();
67 0 :
68 0 : // increment the number of databases where the version of extension is installed
69 0 : INSTALLED_EXTENSIONS
70 0 : .with_label_values(&[extname, &version])
71 0 : .inc();
72 0 :
73 0 : extensions_map
74 0 : .entry(extname.to_string())
75 0 : .and_modify(|e| {
76 0 : e.versions.insert(version.clone());
77 0 : // count the number of databases where the extension is installed
78 0 : e.n_databases += 1;
79 0 : })
80 0 : .or_insert(InstalledExtension {
81 0 : extname: extname.to_string(),
82 0 : versions: HashSet::from([version.clone()]),
83 0 : n_databases: 1,
84 0 : });
85 0 : }
86 : }
87 :
88 0 : let res = InstalledExtensions {
89 0 : extensions: extensions_map.values().cloned().collect(),
90 0 : };
91 0 :
92 0 : Ok(res)
93 0 : })
94 0 : .await?
95 0 : }
96 :
97 : // Gather info about installed extensions
98 0 : pub fn get_installed_extensions_sync(connstr: Url) -> Result<()> {
99 0 : let rt = tokio::runtime::Builder::new_current_thread()
100 0 : .enable_all()
101 0 : .build()
102 0 : .expect("failed to create runtime");
103 0 : let result = rt
104 0 : .block_on(crate::installed_extensions::get_installed_extensions(
105 0 : connstr,
106 0 : ))
107 0 : .expect("failed to get installed extensions");
108 0 :
109 0 : info!(
110 0 : "[NEON_EXT_STAT] {}",
111 0 : serde_json::to_string(&result).expect("failed to serialize extensions list")
112 : );
113 0 : Ok(())
114 0 : }
115 :
116 0 : static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
117 0 : register_uint_gauge_vec!(
118 0 : "compute_installed_extensions",
119 0 : "Number of databases where the version of extension is installed",
120 0 : &["extension_name", "version"]
121 0 : )
122 0 : .expect("failed to define a metric")
123 0 : });
124 :
125 0 : pub fn collect() -> Vec<MetricFamily> {
126 0 : INSTALLED_EXTENSIONS.collect()
127 0 : }
|