LCOV - code coverage report
Current view: top level - compute_tools/src - extension_server.rs (source / functions) Coverage Total Hit
Test: 4f58e98c51285c7fa348e0b410c88a10caf68ad2.info Lines: 23.2 % 155 36
Test Date: 2025-01-07 20:58:07 Functions: 33.3 % 12 4

            Line data    Source code
       1              : // Download extension files from the extension store
       2              : // and put them in the right place in the postgres directory (share / lib)
       3              : /*
       4              : The layout of the S3 bucket is as follows:
       5              : 5615610098 // this is an extension build number
       6              : ├── v14
       7              : │   ├── extensions
       8              : │   │   ├── anon.tar.zst
       9              : │   │   └── embedding.tar.zst
      10              : │   └── ext_index.json
      11              : └── v15
      12              :     ├── extensions
      13              :     │   ├── anon.tar.zst
      14              :     │   └── embedding.tar.zst
      15              :     └── ext_index.json
      16              : 5615261079
      17              : ├── v14
      18              : │   ├── extensions
      19              : │   │   └── anon.tar.zst
      20              : │   └── ext_index.json
      21              : └── v15
      22              :     ├── extensions
      23              :     │   └── anon.tar.zst
      24              :     └── ext_index.json
      25              : 5623261088
      26              : ├── v14
      27              : │   ├── extensions
      28              : │   │   └── embedding.tar.zst
      29              : │   └── ext_index.json
      30              : └── v15
      31              :     ├── extensions
      32              :     │   └── embedding.tar.zst
      33              :     └── ext_index.json
      34              : 
      35              : Note that build number cannot be part of prefix because we might need extensions
      36              : from other build numbers.
      37              : 
      38              : ext_index.json stores the control files and location of extension archives
      39              : It also stores a list of public extensions and a library_index
      40              : 
      41              : We don't need to duplicate extension.tar.zst files.
      42              : We only need to upload a new one if it is updated.
      43              : (Although currently we just upload every time anyways, hopefully will change
      44              : this sometime)
      45              : 
      46              : *access* is controlled by spec
      47              : 
      48              : More specifically, here is an example ext_index.json
      49              : {
      50              :     "public_extensions": [
      51              :         "anon",
      52              :         "pg_buffercache"
      53              :     ],
      54              :     "library_index": {
      55              :         "anon": "anon",
      56              :         "pg_buffercache": "pg_buffercache"
      57              :     },
      58              :     "extension_data": {
      59              :         "pg_buffercache": {
      60              :             "control_data": {
      61              :                 "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
      62              :             },
      63              :             "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
      64              :         },
      65              :         "anon": {
      66              :             "control_data": {
      67              :                 "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
      68              :             },
      69              :             "archive_path": "5670669815/v14/extensions/anon.tar.zst"
      70              :         }
      71              :     }
      72              : }
      73              : */
      74              : use anyhow::Result;
      75              : use anyhow::{bail, Context};
      76              : use bytes::Bytes;
      77              : use compute_api::spec::RemoteExtSpec;
      78              : use regex::Regex;
      79              : use remote_storage::*;
      80              : use reqwest::StatusCode;
      81              : use std::path::Path;
      82              : use std::str;
      83              : use tar::Archive;
      84              : use tracing::info;
      85              : use tracing::log::warn;
      86              : use zstd::stream::read::Decoder;
      87              : 
      /// Runs `pg_config <argument>` and returns what it printed on stdout, with
      /// surrounding whitespace trimmed.
      ///
      /// `argument` is a `pg_config` flag such as `--version` or `--sharedir`.
      /// The `pg_config` path is derived from `pgbin` by removing its trailing
      /// `postgres` component and appending `/pg_config`.
      ///
      /// # Panics
      ///
      /// Panics if `pgbin` does not end in `postgres`, if the `pg_config` process
      /// cannot be run, or if its output is not valid UTF-8.
      fn get_pg_config(argument: &str, pgbin: &str) -> String {
          let bin_dir = pgbin.strip_suffix("postgres").expect("bad pgbin");
          let pg_config_path = format!("{bin_dir}/pg_config");

          let output = std::process::Command::new(pg_config_path)
              .arg(argument)
              .output()
              .expect("pg_config error");

          let stdout = std::str::from_utf8(&output.stdout).expect("pg_config error");
          stdout.trim().to_string()
      }
     105              : 
     106            0 : pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion {
     107            0 :     // pg_config --version returns a (platform specific) human readable string
     108            0 :     // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
     109            0 :     let human_version = get_pg_config("--version", pgbin);
     110            0 :     parse_pg_version(&human_version)
     111            0 : }
     112              : 
     113            0 : pub fn get_pg_version_string(pgbin: &str) -> String {
     114            0 :     match get_pg_version(pgbin) {
     115            0 :         PostgresMajorVersion::V14 => "v14",
     116            0 :         PostgresMajorVersion::V15 => "v15",
     117            0 :         PostgresMajorVersion::V16 => "v16",
     118            0 :         PostgresMajorVersion::V17 => "v17",
     119              :     }
     120            0 :     .to_owned()
     121            0 : }
     122              : 
     /// Major PostgreSQL versions that this module recognises.
     #[derive(Clone, Copy, Debug, Eq, PartialEq)]
     pub enum PostgresMajorVersion {
         /// PostgreSQL 14.x
         V14,
         /// PostgreSQL 15.x
         V15,
         /// PostgreSQL 16.x
         V16,
         /// PostgreSQL 17.x
         V17,
     }
     130              : 
     131           12 : fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
     132              :     use PostgresMajorVersion::*;
     133              :     // Normal releases have version strings like "PostgreSQL 15.4". But there
     134              :     // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
     135              :     // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
     136              :     // configure option, you can tack any string to the version number,
     137              :     // e.g. "PostgreSQL 15.4foobar".
     138           12 :     match Regex::new(r"^PostgreSQL (?<major>\d+).+")
     139           12 :         .unwrap()
     140           12 :         .captures(human_version)
     141              :     {
     142           12 :         Some(captures) if captures.len() == 2 => match &captures["major"] {
     143           12 :             "14" => return V14,
     144            9 :             "15" => return V15,
     145            6 :             "16" => return V16,
     146            2 :             "17" => return V17,
     147            2 :             _ => {}
     148              :         },
     149            0 :         _ => {}
     150              :     }
     151            2 :     panic!("Unsuported postgres version {human_version}");
     152           10 : }
     153              : 
     154              : // download the archive for a given extension,
     155              : // unzip it, and place files in the appropriate locations (share/lib)
     156            0 : pub async fn download_extension(
     157            0 :     ext_name: &str,
     158            0 :     ext_path: &RemotePath,
     159            0 :     ext_remote_storage: &str,
     160            0 :     pgbin: &str,
     161            0 : ) -> Result<u64> {
     162            0 :     info!("Download extension {:?} from {:?}", ext_name, ext_path);
     163              : 
     164              :     // TODO add retry logic
     165            0 :     let download_buffer =
     166            0 :         match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
     167            0 :             Ok(buffer) => buffer,
     168            0 :             Err(error_message) => {
     169            0 :                 return Err(anyhow::anyhow!(
     170            0 :                     "error downloading extension {:?}: {:?}",
     171            0 :                     ext_name,
     172            0 :                     error_message
     173            0 :                 ));
     174              :             }
     175              :         };
     176              : 
     177            0 :     let download_size = download_buffer.len() as u64;
     178            0 :     info!("Download size {:?}", download_size);
     179              :     // it's unclear whether it is more performant to decompress into memory or not
     180              :     // TODO: decompressing into memory can be avoided
     181            0 :     let decoder = Decoder::new(download_buffer.as_ref())?;
     182            0 :     let mut archive = Archive::new(decoder);
     183            0 : 
     184            0 :     let unzip_dest = pgbin
     185            0 :         .strip_suffix("/bin/postgres")
     186            0 :         .expect("bad pgbin")
     187            0 :         .to_string()
     188            0 :         + "/download_extensions";
     189            0 :     archive.unpack(&unzip_dest)?;
     190            0 :     info!("Download + unzip {:?} completed successfully", &ext_path);
     191              : 
     192            0 :     let sharedir_paths = (
     193            0 :         unzip_dest.to_string() + "/share/extension",
     194            0 :         Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
     195            0 :     );
     196            0 :     let libdir_paths = (
     197            0 :         unzip_dest.to_string() + "/lib",
     198            0 :         Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
     199            0 :     );
     200              :     // move contents of the libdir / sharedir in unzipped archive to the correct local paths
     201            0 :     for paths in [sharedir_paths, libdir_paths] {
     202            0 :         let (zip_dir, real_dir) = paths;
     203            0 :         info!("mv {zip_dir:?}/*  {real_dir:?}");
     204            0 :         for file in std::fs::read_dir(zip_dir)? {
     205            0 :             let old_file = file?.path();
     206            0 :             let new_file =
     207            0 :                 Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
     208            0 :             info!("moving {old_file:?} to {new_file:?}");
     209              : 
     210              :             // extension download failed: Directory not empty (os error 39)
     211            0 :             match std::fs::rename(old_file, new_file) {
     212            0 :                 Ok(()) => info!("move succeeded"),
     213            0 :                 Err(e) => {
     214            0 :                     warn!("move failed, probably because the extension already exists: {e}")
     215              :                 }
     216              :             }
     217              :         }
     218              :     }
     219            0 :     info!("done moving extension {ext_name}");
     220            0 :     Ok(download_size)
     221            0 : }
     222              : 
     223              : // Create extension control files from spec
     224            0 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
     225            0 :     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     226            0 :     for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
     227              :         // Check if extension is present in public or custom.
     228              :         // If not, then it is not allowed to be used by this compute.
     229            0 :         if let Some(public_extensions) = &remote_extensions.public_extensions {
     230            0 :             if !public_extensions.contains(ext_name) {
     231            0 :                 if let Some(custom_extensions) = &remote_extensions.custom_extensions {
     232            0 :                     if !custom_extensions.contains(ext_name) {
     233            0 :                         continue; // skip this extension, it is not allowed
     234            0 :                     }
     235            0 :                 }
     236            0 :             }
     237            0 :         }
     238              : 
     239            0 :         for (control_name, control_content) in &ext_data.control_data {
     240            0 :             let control_path = local_sharedir.join(control_name);
     241            0 :             if !control_path.exists() {
     242            0 :                 info!("writing file {:?}{:?}", control_path, control_content);
     243            0 :                 std::fs::write(control_path, control_content).unwrap();
     244              :             } else {
     245            0 :                 warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
     246              :             }
     247              :         }
     248              :     }
     249            0 : }
     250              : 
     251              : // Do request to extension storage proxy, i.e.
     252              : // curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
     253              : // using HHTP GET
     254              : // and return the response body as bytes
     255              : //
     256            0 : async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
     257            0 :     let uri = format!("{}/{}", ext_remote_storage, ext_path);
     258            0 : 
     259            0 :     info!("Download extension {:?} from uri {:?}", ext_path, uri);
     260              : 
     261            0 :     let resp = reqwest::get(uri).await?;
     262              : 
     263            0 :     match resp.status() {
     264            0 :         StatusCode::OK => match resp.bytes().await {
     265            0 :             Ok(resp) => {
     266            0 :                 info!("Download extension {:?} completed successfully", ext_path);
     267            0 :                 Ok(resp)
     268              :             }
     269            0 :             Err(e) => bail!("could not deserialize remote extension response: {}", e),
     270              :         },
     271            0 :         StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
     272            0 :         _ => bail!(
     273            0 :             "unexpected remote extension response status code: {}",
     274            0 :             resp.status()
     275            0 :         ),
     276              :     }
     277            0 : }
     278              : 
     #[cfg(test)]
     mod tests {
         use super::parse_pg_version;
         use super::PostgresMajorVersion::*;

         /// Every supported shape of version string maps to the expected major
         /// version: plain releases, distribution-decorated releases, and
         /// pre-release / extra-version suffixes.
         #[test]
         fn test_parse_pg_version() {
             let cases = [
                 ("PostgreSQL 15.4", V15),
                 ("PostgreSQL 15.14", V15),
                 ("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)", V15),
                 ("PostgreSQL 14.15", V14),
                 ("PostgreSQL 14.0", V14),
                 ("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1", V14),
                 ("PostgreSQL 16devel", V16),
                 ("PostgreSQL 16beta1", V16),
                 ("PostgreSQL 16rc2", V16),
                 ("PostgreSQL 16extra", V16),
             ];

             for &(human_version, expected) in &cases {
                 assert_eq!(parse_pg_version(human_version), expected);
             }
         }

         /// A well-formed string with a major version outside 14..=17 panics.
         #[test]
         #[should_panic]
         fn test_parse_pg_unsupported_version() {
             parse_pg_version("PostgreSQL 13.14");
         }

         /// A version string with nothing after the major number panics.
         #[test]
         #[should_panic]
         fn test_parse_pg_incorrect_version_format() {
             parse_pg_version("PostgreSQL 14");
         }
     }
        

Generated by: LCOV version 2.1-beta