LCOV - code coverage report
Current view: top level - compute_tools/src - extension_server.rs (source / functions) Coverage Total Hit
Test: 8b13a09a5c233d98abd4a0d3e59157e7db16d6fd.info Lines: 27.0 % 152 41
Test Date: 2024-11-21 10:53:51 Functions: 36.4 % 11 4

            Line data    Source code
       1              : // Download extension files from the extension store
       2              : // and put them in the right place in the postgres directory (share / lib)
       3              : /*
       4              : The layout of the S3 bucket is as follows:
       5              : 5615610098 // this is an extension build number
       6              : ├── v14
       7              : │   ├── extensions
       8              : │   │   ├── anon.tar.zst
       9              : │   │   └── embedding.tar.zst
      10              : │   └── ext_index.json
      11              : └── v15
      12              :     ├── extensions
      13              :     │   ├── anon.tar.zst
      14              :     │   └── embedding.tar.zst
      15              :     └── ext_index.json
      16              : 5615261079
      17              : ├── v14
      18              : │   ├── extensions
      19              : │   │   └── anon.tar.zst
      20              : │   └── ext_index.json
      21              : └── v15
      22              :     ├── extensions
      23              :     │   └── anon.tar.zst
      24              :     └── ext_index.json
      25              : 5623261088
      26              : ├── v14
      27              : │   ├── extensions
      28              : │   │   └── embedding.tar.zst
      29              : │   └── ext_index.json
      30              : └── v15
      31              :     ├── extensions
      32              :     │   └── embedding.tar.zst
      33              :     └── ext_index.json
      34              : 
      35              : Note that build number cannot be part of prefix because we might need extensions
      36              : from other build numbers.
      37              : 
      38              : ext_index.json stores the control files and the location of extension archives.
      39              : It also stores a list of public extensions and a library_index.
      40              : 
      41              : We don't need to duplicate extension.tar.zst files.
      42              : We only need to upload a new one if it is updated.
      43              : (Although currently we just upload every time anyways, hopefully will change
      44              : this sometime)
      45              : 
      46              : *access* is controlled by spec
      47              : 
      48              : More specifically, here is an example ext_index.json
      49              : {
      50              :     "public_extensions": [
      51              :         "anon",
      52              :         "pg_buffercache"
      53              :     ],
      54              :     "library_index": {
      55              :         "anon": "anon",
      56              :         "pg_buffercache": "pg_buffercache"
      57              :     },
      58              :     "extension_data": {
      59              :         "pg_buffercache": {
      60              :             "control_data": {
      61              :                 "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
      62              :             },
      63              :             "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
      64              :         },
      65              :         "anon": {
      66              :             "control_data": {
      67              :                 "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
      68              :             },
      69              :             "archive_path": "5670669815/v14/extensions/anon.tar.zst"
      70              :         }
      71              :     }
      72              : }
      73              : */
      74              : use anyhow::Result;
      75              : use anyhow::{bail, Context};
      76              : use bytes::Bytes;
      77              : use compute_api::spec::RemoteExtSpec;
      78              : use regex::Regex;
      79              : use remote_storage::*;
      80              : use reqwest::StatusCode;
      81              : use std::path::Path;
      82              : use std::str;
      83              : use tar::Archive;
      84              : use tracing::info;
      85              : use tracing::log::warn;
      86              : use zstd::stream::read::Decoder;
      87              : 
      88            0 : fn get_pg_config(argument: &str, pgbin: &str) -> String {
      89            0 :     // gives the result of `pg_config [argument]`
      90            0 :     // where argument is a flag like `--version` or `--sharedir`
      91            0 :     let pgconfig = pgbin
      92            0 :         .strip_suffix("postgres")
      93            0 :         .expect("bad pgbin")
      94            0 :         .to_owned()
      95            0 :         + "/pg_config";
      96            0 :     let config_output = std::process::Command::new(pgconfig)
      97            0 :         .arg(argument)
      98            0 :         .output()
      99            0 :         .expect("pg_config error");
     100            0 :     std::str::from_utf8(&config_output.stdout)
     101            0 :         .expect("pg_config error")
     102            0 :         .trim()
     103            0 :         .to_string()
     104            0 : }
     105              : 
     106            0 : pub fn get_pg_version(pgbin: &str) -> String {
     107            0 :     // pg_config --version returns a (platform specific) human readable string
     108            0 :     // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
     109            0 :     let human_version = get_pg_config("--version", pgbin);
     110            0 :     parse_pg_version(&human_version).to_string()
     111            0 : }
     112              : 
     113           12 : fn parse_pg_version(human_version: &str) -> &str {
     114           12 :     // Normal releases have version strings like "PostgreSQL 15.4". But there
     115           12 :     // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
     116           12 :     // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
     117           12 :     // configure option, you can tack any string to the version number,
     118           12 :     // e.g. "PostgreSQL 15.4foobar".
     119           12 :     match Regex::new(r"^PostgreSQL (?<major>\d+).+")
     120           12 :         .unwrap()
     121           12 :         .captures(human_version)
     122              :     {
     123           12 :         Some(captures) if captures.len() == 2 => match &captures["major"] {
     124           12 :             "14" => return "v14",
     125            9 :             "15" => return "v15",
     126            6 :             "16" => return "v16",
     127            2 :             "17" => return "v17",
     128            2 :             _ => {}
     129              :         },
     130            0 :         _ => {}
     131              :     }
     132            2 :     panic!("Unsuported postgres version {human_version}");
     133           10 : }
     134              : 
     135              : // download the archive for a given extension,
     136              : // unzip it, and place files in the appropriate locations (share/lib)
     137            0 : pub async fn download_extension(
     138            0 :     ext_name: &str,
     139            0 :     ext_path: &RemotePath,
     140            0 :     ext_remote_storage: &str,
     141            0 :     pgbin: &str,
     142            0 : ) -> Result<u64> {
     143            0 :     info!("Download extension {:?} from {:?}", ext_name, ext_path);
     144              : 
     145              :     // TODO add retry logic
     146            0 :     let download_buffer =
     147            0 :         match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
     148            0 :             Ok(buffer) => buffer,
     149            0 :             Err(error_message) => {
     150            0 :                 return Err(anyhow::anyhow!(
     151            0 :                     "error downloading extension {:?}: {:?}",
     152            0 :                     ext_name,
     153            0 :                     error_message
     154            0 :                 ));
     155              :             }
     156              :         };
     157              : 
     158            0 :     let download_size = download_buffer.len() as u64;
     159            0 :     info!("Download size {:?}", download_size);
     160              :     // it's unclear whether it is more performant to decompress into memory or not
     161              :     // TODO: decompressing into memory can be avoided
     162            0 :     let decoder = Decoder::new(download_buffer.as_ref())?;
     163            0 :     let mut archive = Archive::new(decoder);
     164            0 : 
     165            0 :     let unzip_dest = pgbin
     166            0 :         .strip_suffix("/bin/postgres")
     167            0 :         .expect("bad pgbin")
     168            0 :         .to_string()
     169            0 :         + "/download_extensions";
     170            0 :     archive.unpack(&unzip_dest)?;
     171            0 :     info!("Download + unzip {:?} completed successfully", &ext_path);
     172              : 
     173            0 :     let sharedir_paths = (
     174            0 :         unzip_dest.to_string() + "/share/extension",
     175            0 :         Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
     176            0 :     );
     177            0 :     let libdir_paths = (
     178            0 :         unzip_dest.to_string() + "/lib",
     179            0 :         Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
     180            0 :     );
     181              :     // move contents of the libdir / sharedir in unzipped archive to the correct local paths
     182            0 :     for paths in [sharedir_paths, libdir_paths] {
     183            0 :         let (zip_dir, real_dir) = paths;
     184            0 :         info!("mv {zip_dir:?}/*  {real_dir:?}");
     185            0 :         for file in std::fs::read_dir(zip_dir)? {
     186            0 :             let old_file = file?.path();
     187            0 :             let new_file =
     188            0 :                 Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
     189            0 :             info!("moving {old_file:?} to {new_file:?}");
     190              : 
     191              :             // extension download failed: Directory not empty (os error 39)
     192            0 :             match std::fs::rename(old_file, new_file) {
     193            0 :                 Ok(()) => info!("move succeeded"),
     194            0 :                 Err(e) => {
     195            0 :                     warn!("move failed, probably because the extension already exists: {e}")
     196              :                 }
     197              :             }
     198              :         }
     199              :     }
     200            0 :     info!("done moving extension {ext_name}");
     201            0 :     Ok(download_size)
     202            0 : }
     203              : 
     204              : // Create extension control files from spec
     205            0 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
     206            0 :     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     207            0 :     for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
     208              :         // Check if extension is present in public or custom.
     209              :         // If not, then it is not allowed to be used by this compute.
     210            0 :         if let Some(public_extensions) = &remote_extensions.public_extensions {
     211            0 :             if !public_extensions.contains(ext_name) {
     212            0 :                 if let Some(custom_extensions) = &remote_extensions.custom_extensions {
     213            0 :                     if !custom_extensions.contains(ext_name) {
     214            0 :                         continue; // skip this extension, it is not allowed
     215            0 :                     }
     216            0 :                 }
     217            0 :             }
     218            0 :         }
     219              : 
     220            0 :         for (control_name, control_content) in &ext_data.control_data {
     221            0 :             let control_path = local_sharedir.join(control_name);
     222            0 :             if !control_path.exists() {
     223            0 :                 info!("writing file {:?}{:?}", control_path, control_content);
     224            0 :                 std::fs::write(control_path, control_content).unwrap();
     225              :             } else {
     226            0 :                 warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
     227              :             }
     228              :         }
     229              :     }
     230            0 : }
     231              : 
     232              : // Do request to extension storage proxy, i.e.
     233              : // curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
     234              : // using HHTP GET
     235              : // and return the response body as bytes
     236              : //
     237            0 : async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
     238            0 :     let uri = format!("{}/{}", ext_remote_storage, ext_path);
     239            0 : 
     240            0 :     info!("Download extension {:?} from uri {:?}", ext_path, uri);
     241              : 
     242            0 :     let resp = reqwest::get(uri).await?;
     243              : 
     244            0 :     match resp.status() {
     245            0 :         StatusCode::OK => match resp.bytes().await {
     246            0 :             Ok(resp) => {
     247            0 :                 info!("Download extension {:?} completed successfully", ext_path);
     248            0 :                 Ok(resp)
     249              :             }
     250            0 :             Err(e) => bail!("could not deserialize remote extension response: {}", e),
     251              :         },
     252            0 :         StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
     253            0 :         _ => bail!(
     254            0 :             "unexpected remote extension response status code: {}",
     255            0 :             resp.status()
     256            0 :         ),
     257              :     }
     258            0 : }
     259              : 
     260              : #[cfg(test)]
     261              : mod tests {
     262              :     use super::parse_pg_version;
     263              : 
     264              :     #[test]
     265            1 :     fn test_parse_pg_version() {
     266            1 :         assert_eq!(parse_pg_version("PostgreSQL 15.4"), "v15");
     267            1 :         assert_eq!(parse_pg_version("PostgreSQL 15.14"), "v15");
     268            1 :         assert_eq!(
     269            1 :             parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
     270            1 :             "v15"
     271            1 :         );
     272              : 
     273            1 :         assert_eq!(parse_pg_version("PostgreSQL 14.15"), "v14");
     274            1 :         assert_eq!(parse_pg_version("PostgreSQL 14.0"), "v14");
     275            1 :         assert_eq!(
     276            1 :             parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
     277            1 :             "v14"
     278            1 :         );
     279              : 
     280            1 :         assert_eq!(parse_pg_version("PostgreSQL 16devel"), "v16");
     281            1 :         assert_eq!(parse_pg_version("PostgreSQL 16beta1"), "v16");
     282            1 :         assert_eq!(parse_pg_version("PostgreSQL 16rc2"), "v16");
     283            1 :         assert_eq!(parse_pg_version("PostgreSQL 16extra"), "v16");
     284            1 :     }
     285              : 
     286              :     #[test]
     287              :     #[should_panic]
     288            1 :     fn test_parse_pg_unsupported_version() {
     289            1 :         parse_pg_version("PostgreSQL 13.14");
     290            1 :     }
     291              : 
     292              :     #[test]
     293              :     #[should_panic]
     294            1 :     fn test_parse_pg_incorrect_version_format() {
     295            1 :         parse_pg_version("PostgreSQL 14");
     296            1 :     }
     297              : }
        

Generated by: LCOV version 2.1-beta