LCOV - code coverage report
Current view: top level - compute_tools/src - installed_extensions.rs (source / functions) Coverage Total Hit
Test: 8ff8efadb0253cf618c612650348666c0c564111.info Lines: 0.0 % 92 0
Test Date: 2024-11-20 17:53:50 Functions: 0.0 % 10 0

            Line data    Source code
       1              : use compute_api::responses::{InstalledExtension, InstalledExtensions};
       2              : use metrics::proto::MetricFamily;
       3              : use std::collections::HashMap;
       4              : use std::collections::HashSet;
       5              : use tracing::info;
       6              : use url::Url;
       7              : 
       8              : use anyhow::Result;
       9              : use postgres::{Client, NoTls};
      10              : use tokio::task;
      11              : 
      12              : use metrics::core::Collector;
      13              : use metrics::{register_uint_gauge_vec, UIntGaugeVec};
      14              : use once_cell::sync::Lazy;
      15              : 
      16              : /// We don't reuse get_existing_dbs() just for code clarity
      17              : /// and to make database listing query here more explicit.
      18              : ///
      19              : /// Limit the number of databases to 500 to avoid excessive load.
      20            0 : fn list_dbs(client: &mut Client) -> Result<Vec<String>> {
      21              :     // `pg_database.datconnlimit = -2` means that the database is in the
      22              :     // invalid state
      23            0 :     let databases = client
      24            0 :         .query(
      25            0 :             "SELECT datname FROM pg_catalog.pg_database
      26            0 :                 WHERE datallowconn
      27            0 :                 AND datconnlimit <> - 2
      28            0 :                 LIMIT 500",
      29            0 :             &[],
      30            0 :         )?
      31            0 :         .iter()
      32            0 :         .map(|row| {
      33            0 :             let db: String = row.get("datname");
      34            0 :             db
      35            0 :         })
      36            0 :         .collect();
      37            0 : 
      38            0 :     Ok(databases)
      39            0 : }
      40              : 
      41              : /// Connect to every database (see list_dbs above) and get the list of installed extensions.
      42              : ///
      43              : /// Same extension can be installed in multiple databases with different versions,
      44              : /// we only keep the highest and lowest version across all databases.
      45            0 : pub async fn get_installed_extensions(connstr: Url) -> Result<InstalledExtensions> {
      46            0 :     let mut connstr = connstr.clone();
      47            0 : 
      48            0 :     task::spawn_blocking(move || {
      49            0 :         let mut client = Client::connect(connstr.as_str(), NoTls)?;
      50            0 :         let databases: Vec<String> = list_dbs(&mut client)?;
      51              : 
      52            0 :         let mut extensions_map: HashMap<String, InstalledExtension> = HashMap::new();
      53            0 :         for db in databases.iter() {
      54            0 :             connstr.set_path(db);
      55            0 :             let mut db_client = Client::connect(connstr.as_str(), NoTls)?;
      56            0 :             let extensions: Vec<(String, String)> = db_client
      57            0 :                 .query(
      58            0 :                     "SELECT extname, extversion FROM pg_catalog.pg_extension;",
      59            0 :                     &[],
      60            0 :                 )?
      61            0 :                 .iter()
      62            0 :                 .map(|row| (row.get("extname"), row.get("extversion")))
      63            0 :                 .collect();
      64              : 
      65            0 :             for (extname, v) in extensions.iter() {
      66            0 :                 let version = v.to_string();
      67            0 : 
      68            0 :                 // increment the number of databases where the version of extension is installed
      69            0 :                 INSTALLED_EXTENSIONS
      70            0 :                     .with_label_values(&[extname, &version])
      71            0 :                     .inc();
      72            0 : 
      73            0 :                 extensions_map
      74            0 :                     .entry(extname.to_string())
      75            0 :                     .and_modify(|e| {
      76            0 :                         e.versions.insert(version.clone());
      77            0 :                         // count the number of databases where the extension is installed
      78            0 :                         e.n_databases += 1;
      79            0 :                     })
      80            0 :                     .or_insert(InstalledExtension {
      81            0 :                         extname: extname.to_string(),
      82            0 :                         versions: HashSet::from([version.clone()]),
      83            0 :                         n_databases: 1,
      84            0 :                     });
      85            0 :             }
      86              :         }
      87              : 
      88            0 :         let res = InstalledExtensions {
      89            0 :             extensions: extensions_map.values().cloned().collect(),
      90            0 :         };
      91            0 : 
      92            0 :         Ok(res)
      93            0 :     })
      94            0 :     .await?
      95            0 : }
      96              : 
      97              : // Gather info about installed extensions
      98            0 : pub fn get_installed_extensions_sync(connstr: Url) -> Result<()> {
      99            0 :     let rt = tokio::runtime::Builder::new_current_thread()
     100            0 :         .enable_all()
     101            0 :         .build()
     102            0 :         .expect("failed to create runtime");
     103            0 :     let result = rt
     104            0 :         .block_on(crate::installed_extensions::get_installed_extensions(
     105            0 :             connstr,
     106            0 :         ))
     107            0 :         .expect("failed to get installed extensions");
     108            0 : 
     109            0 :     info!(
     110            0 :         "[NEON_EXT_STAT] {}",
     111            0 :         serde_json::to_string(&result).expect("failed to serialize extensions list")
     112              :     );
     113            0 :     Ok(())
     114            0 : }
     115              : 
     116            0 : static INSTALLED_EXTENSIONS: Lazy<UIntGaugeVec> = Lazy::new(|| {
     117            0 :     register_uint_gauge_vec!(
     118            0 :         "installed_extensions",
     119            0 :         "Number of databases where the version of extension is installed",
     120            0 :         &["extension_name", "version"]
     121            0 :     )
     122            0 :     .expect("failed to define a metric")
     123            0 : });
     124              : 
     125            0 : pub fn collect() -> Vec<MetricFamily> {
     126            0 :     INSTALLED_EXTENSIONS.collect()
     127            0 : }
        

Generated by: LCOV version 2.1-beta