LCOV - code coverage report
Current view: top level - compute_tools/src - extension_server.rs (source / functions) Coverage Total Hit
Test: fcf55189004bd3119eed75e2873a97da8078700c.info Lines: 26.5 % 151 40
Test Date: 2024-06-25 12:07:31 Functions: 36.4 % 11 4

            Line data    Source code
       1              : // Download extension files from the extension store
       2              : // and put them in the right place in the postgres directory (share / lib)
       3              : /*
       4              : The layout of the S3 bucket is as follows:
       5              : 5615610098 // this is an extension build number
       6              : ├── v14
       7              : │   ├── extensions
       8              : │   │   ├── anon.tar.zst
       9              : │   │   └── embedding.tar.zst
      10              : │   └── ext_index.json
      11              : └── v15
      12              :     ├── extensions
      13              :     │   ├── anon.tar.zst
      14              :     │   └── embedding.tar.zst
      15              :     └── ext_index.json
      16              : 5615261079
      17              : ├── v14
      18              : │   ├── extensions
      19              : │   │   └── anon.tar.zst
      20              : │   └── ext_index.json
      21              : └── v15
      22              :     ├── extensions
      23              :     │   └── anon.tar.zst
      24              :     └── ext_index.json
      25              : 5623261088
      26              : ├── v14
      27              : │   ├── extensions
      28              : │   │   └── embedding.tar.zst
      29              : │   └── ext_index.json
      30              : └── v15
      31              :     ├── extensions
      32              :     │   └── embedding.tar.zst
      33              :     └── ext_index.json
      34              : 
      35              : Note that build number cannot be part of prefix because we might need extensions
      36              : from other build numbers.
      37              : 
      38              : ext_index.json stores the control files and the location of extension archives.
      39              : It also stores a list of public extensions and a library_index.
      40              : 
      41              : We don't need to duplicate extension.tar.zst files.
      42              : We only need to upload a new one if it is updated.
      43              : (Although currently we just upload every time anyway; hopefully this will
      44              : change at some point.)
      45              : 
      46              : *access* is controlled by spec
      47              : 
      48              : More specifically, here is an example ext_index.json
      49              : {
      50              :     "public_extensions": [
      51              :         "anon",
      52              :         "pg_buffercache"
      53              :     ],
      54              :     "library_index": {
      55              :         "anon": "anon",
      56              :         "pg_buffercache": "pg_buffercache"
      57              :     },
      58              :     "extension_data": {
      59              :         "pg_buffercache": {
      60              :             "control_data": {
      61              :                 "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
      62              :             },
      63              :             "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
      64              :         },
      65              :         "anon": {
      66              :             "control_data": {
      67              :                 "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
      68              :             },
      69              :             "archive_path": "5670669815/v14/extensions/anon.tar.zst"
      70              :         }
      71              :     }
      72              : }
      73              : */
      74              : use anyhow::Result;
      75              : use anyhow::{bail, Context};
      76              : use bytes::Bytes;
      77              : use compute_api::spec::RemoteExtSpec;
      78              : use regex::Regex;
      79              : use remote_storage::*;
      80              : use reqwest::StatusCode;
      81              : use std::path::Path;
      82              : use std::str;
      83              : use tar::Archive;
      84              : use tracing::info;
      85              : use tracing::log::warn;
      86              : use zstd::stream::read::Decoder;
      87              : 
      88            0 : fn get_pg_config(argument: &str, pgbin: &str) -> String {
      89            0 :     // gives the result of `pg_config [argument]`
      90            0 :     // where argument is a flag like `--version` or `--sharedir`
      91            0 :     let pgconfig = pgbin
      92            0 :         .strip_suffix("postgres")
      93            0 :         .expect("bad pgbin")
      94            0 :         .to_owned()
      95            0 :         + "/pg_config";
      96            0 :     let config_output = std::process::Command::new(pgconfig)
      97            0 :         .arg(argument)
      98            0 :         .output()
      99            0 :         .expect("pg_config error");
     100            0 :     std::str::from_utf8(&config_output.stdout)
     101            0 :         .expect("pg_config error")
     102            0 :         .trim()
     103            0 :         .to_string()
     104            0 : }
     105              : 
     106            0 : pub fn get_pg_version(pgbin: &str) -> String {
     107            0 :     // pg_config --version returns a (platform-specific) human-readable string
     108            0 :     // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
     109            0 :     let human_version = get_pg_config("--version", pgbin);
     110            0 :     return parse_pg_version(&human_version).to_string();
     111            0 : }
     112              : 
     113           24 : fn parse_pg_version(human_version: &str) -> &str {
     114           24 :     // Normal releases have version strings like "PostgreSQL 15.4". But there
     115           24 :     // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
     116           24 :     // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
     117           24 :     // configure option, you can tack any string to the version number,
     118           24 :     // e.g. "PostgreSQL 15.4foobar".
     119           24 :     match Regex::new(r"^PostgreSQL (?<major>\d+).+")
     120           24 :         .unwrap()
     121           24 :         .captures(human_version)
     122              :     {
     123           24 :         Some(captures) if captures.len() == 2 => match &captures["major"] {
     124           24 :             "14" => return "v14",
     125           18 :             "15" => return "v15",
     126           12 :             "16" => return "v16",
     127            4 :             _ => {}
     128              :         },
     129            0 :         _ => {}
     130              :     }
     131            4 :     panic!("Unsuported postgres version {human_version}");
     132           20 : }
     133              : 
     134              : // download the archive for a given extension,
     135              : // unzip it, and place files in the appropriate locations (share/lib)
     136            0 : pub async fn download_extension(
     137            0 :     ext_name: &str,
     138            0 :     ext_path: &RemotePath,
     139            0 :     ext_remote_storage: &str,
     140            0 :     pgbin: &str,
     141            0 : ) -> Result<u64> {
     142            0 :     info!("Download extension {:?} from {:?}", ext_name, ext_path);
     143              : 
     144              :     // TODO add retry logic
     145            0 :     let download_buffer =
     146            0 :         match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
     147            0 :             Ok(buffer) => buffer,
     148            0 :             Err(error_message) => {
     149            0 :                 return Err(anyhow::anyhow!(
     150            0 :                     "error downloading extension {:?}: {:?}",
     151            0 :                     ext_name,
     152            0 :                     error_message
     153            0 :                 ));
     154              :             }
     155              :         };
     156              : 
     157            0 :     let download_size = download_buffer.len() as u64;
     158            0 :     info!("Download size {:?}", download_size);
     159              :     // it's unclear whether it is more performant to decompress into memory or not
     160              :     // TODO: decompressing into memory can be avoided
     161            0 :     let decoder = Decoder::new(download_buffer.as_ref())?;
     162            0 :     let mut archive = Archive::new(decoder);
     163            0 : 
     164            0 :     let unzip_dest = pgbin
     165            0 :         .strip_suffix("/bin/postgres")
     166            0 :         .expect("bad pgbin")
     167            0 :         .to_string()
     168            0 :         + "/download_extensions";
     169            0 :     archive.unpack(&unzip_dest)?;
     170            0 :     info!("Download + unzip {:?} completed successfully", &ext_path);
     171              : 
     172            0 :     let sharedir_paths = (
     173            0 :         unzip_dest.to_string() + "/share/extension",
     174            0 :         Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
     175            0 :     );
     176            0 :     let libdir_paths = (
     177            0 :         unzip_dest.to_string() + "/lib",
     178            0 :         Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
     179            0 :     );
     180              :     // move contents of the libdir / sharedir in unzipped archive to the correct local paths
     181            0 :     for paths in [sharedir_paths, libdir_paths] {
     182            0 :         let (zip_dir, real_dir) = paths;
     183            0 :         info!("mv {zip_dir:?}/*  {real_dir:?}");
     184            0 :         for file in std::fs::read_dir(zip_dir)? {
     185            0 :             let old_file = file?.path();
     186            0 :             let new_file =
     187            0 :                 Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
     188            0 :             info!("moving {old_file:?} to {new_file:?}");
     189              : 
     190              :             // extension download failed: Directory not empty (os error 39)
     191            0 :             match std::fs::rename(old_file, new_file) {
     192            0 :                 Ok(()) => info!("move succeeded"),
     193            0 :                 Err(e) => {
     194            0 :                     warn!("move failed, probably because the extension already exists: {e}")
     195              :                 }
     196              :             }
     197              :         }
     198              :     }
     199            0 :     info!("done moving extension {ext_name}");
     200            0 :     Ok(download_size)
     201            0 : }
     202              : 
     203              : // Create extension control files from spec
     204            0 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
     205            0 :     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     206            0 :     for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
     207              :         // Check if extension is present in public or custom.
     208              :         // If not, then it is not allowed to be used by this compute.
     209            0 :         if let Some(public_extensions) = &remote_extensions.public_extensions {
     210            0 :             if !public_extensions.contains(ext_name) {
     211            0 :                 if let Some(custom_extensions) = &remote_extensions.custom_extensions {
     212            0 :                     if !custom_extensions.contains(ext_name) {
     213            0 :                         continue; // skip this extension, it is not allowed
     214            0 :                     }
     215            0 :                 }
     216            0 :             }
     217            0 :         }
     218              : 
     219            0 :         for (control_name, control_content) in &ext_data.control_data {
     220            0 :             let control_path = local_sharedir.join(control_name);
     221            0 :             if !control_path.exists() {
     222            0 :                 info!("writing file {:?}{:?}", control_path, control_content);
     223            0 :                 std::fs::write(control_path, control_content).unwrap();
     224              :             } else {
     225            0 :                 warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
     226              :             }
     227              :         }
     228              :     }
     229            0 : }
     230              : 
     231              : // Send a request to the extension storage proxy, e.g.
     232              : // curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
     233              : // using HTTP GET,
     234              : // and return the response body as bytes.
     235              : //
     236            0 : async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
     237            0 :     let uri = format!("{}/{}", ext_remote_storage, ext_path);
     238            0 : 
     239            0 :     info!("Download extension {:?} from uri {:?}", ext_path, uri);
     240              : 
     241            0 :     let resp = reqwest::get(uri).await?;
     242              : 
     243            0 :     match resp.status() {
     244            0 :         StatusCode::OK => match resp.bytes().await {
     245            0 :             Ok(resp) => {
     246            0 :                 info!("Download extension {:?} completed successfully", ext_path);
     247            0 :                 Ok(resp)
     248              :             }
     249            0 :             Err(e) => bail!("could not deserialize remote extension response: {}", e),
     250              :         },
     251            0 :         StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
     252            0 :         _ => bail!(
     253            0 :             "unexpected remote extension response status code: {}",
     254            0 :             resp.status()
     255            0 :         ),
     256              :     }
     257            0 : }
     258              : 
     259              : #[cfg(test)]
     260              : mod tests {
     261              :     use super::parse_pg_version;
     262              : 
     263              :     #[test]
     264            2 :     fn test_parse_pg_version() {
     265            2 :         assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
     266            2 :         assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
     267            2 :         assert_eq!(
     268            2 :             parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
     269            2 :             "v15"
     270            2 :         );
     271              : 
     272            2 :         assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
     273            2 :         assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
     274            2 :         assert_eq!(
     275            2 :             parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
     276            2 :             "v14"
     277            2 :         );
     278              : 
     279            2 :         assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
     280            2 :         assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
     281            2 :         assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
     282            2 :         assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
     283            2 :     }
     284              : 
     285              :     #[test]
     286              :     #[should_panic]
     287            2 :     fn test_parse_pg_unsupported_version() {
     288            2 :         parse_pg_version("PostgreSQL 13.14");
     289            2 :     }
     290              : 
     291              :     #[test]
     292              :     #[should_panic]
     293            2 :     fn test_parse_pg_incorrect_version_format() {
     294            2 :         parse_pg_version("PostgreSQL 14");
     295            2 :     }
     296              : }
        

Generated by: LCOV version 2.1-beta