LCOV - code coverage report
Current view: top level - compute_tools/src - extension_server.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 17.6 % 182 32
Test Date: 2025-07-16 12:29:03 Functions: 23.5 % 17 4

            Line data    Source code
       1              : // Download extension files from the extension store
       2              : // and put them in the right place in the postgres directory (share / lib)
       3              : /*
       4              : The layout of the S3 bucket is as follows:
       5              : 5615610098 // this is an extension build number
       6              : ├── v14
       7              : │   ├── extensions
       8              : │   │   ├── anon.tar.zst
       9              : │   │   └── embedding.tar.zst
      10              : │   └── ext_index.json
      11              : └── v15
      12              :     ├── extensions
      13              :     │   ├── anon.tar.zst
      14              :     │   └── embedding.tar.zst
      15              :     └── ext_index.json
      16              : 5615261079
      17              : ├── v14
      18              : │   ├── extensions
      19              : │   │   └── anon.tar.zst
      20              : │   └── ext_index.json
      21              : └── v15
      22              :     ├── extensions
      23              :     │   └── anon.tar.zst
      24              :     └── ext_index.json
      25              : 5623261088
      26              : ├── v14
      27              : │   ├── extensions
      28              : │   │   └── embedding.tar.zst
      29              : │   └── ext_index.json
      30              : └── v15
      31              :     ├── extensions
      32              :     │   └── embedding.tar.zst
      33              :     └── ext_index.json
      34              : 
      35              : Note that the build number cannot be part of the prefix because we might need
      36              : extensions from other build numbers.
      37              : 
      38              : ext_index.json stores the control files and location of extension archives
      39              : It also stores a list of public extensions and a library_index
      40              : 
      41              : We don't need to duplicate extension.tar.zst files.
      42              : We only need to upload a new one if it is updated.
      43              : (Currently, though, we still upload them every time; hopefully this will change
      44              : at some point.)
      45              : 
      46              : *access* is controlled by spec
      47              : 
      48              : More specifically, here is an example ext_index.json
      49              : {
      50              :     "public_extensions": [
      51              :         "anon",
      52              :         "pg_buffercache"
      53              :     ],
      54              :     "library_index": {
      55              :         "anon": "anon",
      56              :         "pg_buffercache": "pg_buffercache"
      57              :     },
      58              :     "extension_data": {
      59              :         "pg_buffercache": {
      60              :             "control_data": {
      61              :                 "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
      62              :             },
      63              :             "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
      64              :         },
      65              :         "anon": {
      66              :             "control_data": {
      67              :                 "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
      68              :             },
      69              :             "archive_path": "5670669815/v14/extensions/anon.tar.zst"
      70              :         }
      71              :     }
      72              : }
      73              : */
      74              : use std::path::Path;
      75              : use std::str;
      76              : 
      77              : use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
      78              : use anyhow::{Context, Result, bail};
      79              : use bytes::Bytes;
      80              : use compute_api::spec::RemoteExtSpec;
      81              : use postgres_versioninfo::PgMajorVersion;
      82              : use regex::Regex;
      83              : use remote_storage::*;
      84              : use reqwest::StatusCode;
      85              : use tar::Archive;
      86              : use tracing::info;
      87              : use tracing::log::warn;
      88              : use url::Url;
      89              : use zstd::stream::read::Decoder;
      90              : 
      91            0 : fn get_pg_config(argument: &str, pgbin: &str) -> String {
      92              :     // gives the result of `pg_config [argument]`
      93              :     // where argument is a flag like `--version` or `--sharedir`
      94            0 :     let pgconfig = pgbin
      95            0 :         .strip_suffix("postgres")
      96            0 :         .expect("bad pgbin")
      97            0 :         .to_owned()
      98            0 :         + "/pg_config";
      99            0 :     let config_output = std::process::Command::new(pgconfig)
     100            0 :         .arg(argument)
     101            0 :         .output()
     102            0 :         .expect("pg_config error");
     103            0 :     std::str::from_utf8(&config_output.stdout)
     104            0 :         .expect("pg_config error")
     105            0 :         .trim()
     106            0 :         .to_string()
     107            0 : }
     108              : 
     109            0 : pub fn get_pg_version(pgbin: &str) -> PgMajorVersion {
     110              :     // pg_config --version returns a (platform specific) human readable string
     111              :     // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
     112            0 :     let human_version = get_pg_config("--version", pgbin);
     113            0 :     parse_pg_version(&human_version)
     114            0 : }
     115              : 
     116            0 : pub fn get_pg_version_string(pgbin: &str) -> String {
     117            0 :     get_pg_version(pgbin).v_str()
     118            0 : }
     119              : 
     120           12 : fn parse_pg_version(human_version: &str) -> PgMajorVersion {
     121              :     use PgMajorVersion::*;
     122              :     // Normal releases have version strings like "PostgreSQL 15.4". But there
     123              :     // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
     124              :     // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
     125              :     // configure option, you can tack any string to the version number,
     126              :     // e.g. "PostgreSQL 15.4foobar".
     127           12 :     match Regex::new(r"^PostgreSQL (?<major>\d+).+")
     128           12 :         .unwrap()
     129           12 :         .captures(human_version)
     130              :     {
     131           12 :         Some(captures) if captures.len() == 2 => match &captures["major"] {
     132           12 :             "14" => return PG14,
     133            9 :             "15" => return PG15,
     134            6 :             "16" => return PG16,
     135            2 :             "17" => return PG17,
     136            2 :             _ => {}
     137              :         },
     138            0 :         _ => {}
     139              :     }
     140            2 :     panic!("Unsuported postgres version {human_version}");
     141           10 : }
     142              : 
     143              : // Download the archive for a given extension, decompress and unpack it
     144              : // (zstd-compressed tar), and place the files in the appropriate locations (share/lib).
     145            0 : pub async fn download_extension(
     146            0 :     ext_name: &str,
     147            0 :     ext_path: &RemotePath,
     148            0 :     remote_ext_base_url: &Url,
     149            0 :     pgbin: &str,
     150            0 : ) -> Result<u64> {
     151            0 :     info!("Download extension {:?} from {:?}", ext_name, ext_path);
     152              : 
     153              :     // TODO add retry logic
     154            0 :     let download_buffer =
     155            0 :         match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
     156            0 :             Ok(buffer) => buffer,
     157            0 :             Err(error_message) => {
     158            0 :                 return Err(anyhow::anyhow!(
     159            0 :                     "error downloading extension {:?}: {:?}",
     160            0 :                     ext_name,
     161            0 :                     error_message
     162            0 :                 ));
     163              :             }
     164              :         };
     165              : 
     166            0 :     let download_size = download_buffer.len() as u64;
     167            0 :     info!("Download size {:?}", download_size);
     168              :     // it's unclear whether it is more performant to decompress into memory or not
     169              :     // TODO: decompressing into memory can be avoided
     170            0 :     let decoder = Decoder::new(download_buffer.as_ref())?;
     171            0 :     let mut archive = Archive::new(decoder);
     172              : 
     173            0 :     let unzip_dest = pgbin
     174            0 :         .strip_suffix("/bin/postgres")
     175            0 :         .expect("bad pgbin")
     176            0 :         .to_string()
     177            0 :         + "/download_extensions";
     178            0 :     archive.unpack(&unzip_dest)?;
     179            0 :     info!("Download + unzip {:?} completed successfully", &ext_path);
     180              : 
     181            0 :     let sharedir_paths = (
     182            0 :         unzip_dest.to_string() + "/share/extension",
     183            0 :         Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
     184            0 :     );
     185            0 :     let libdir_paths = (
     186            0 :         unzip_dest.to_string() + "/lib",
     187            0 :         Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
     188            0 :     );
     189              :     // move contents of the libdir / sharedir in unzipped archive to the correct local paths
     190            0 :     for paths in [sharedir_paths, libdir_paths] {
     191            0 :         let (zip_dir, real_dir) = paths;
     192              : 
     193            0 :         let dir = match std::fs::read_dir(&zip_dir) {
     194            0 :             Ok(dir) => dir,
     195            0 :             Err(e) => match e.kind() {
     196              :                 // In the event of a SQL-only extension, there would be nothing
     197              :                 // to move from the lib/ directory, so note that in the log and
     198              :                 // move on.
     199              :                 std::io::ErrorKind::NotFound => {
     200            0 :                     info!("nothing to move from {}", zip_dir);
     201            0 :                     continue;
     202              :                 }
     203            0 :                 _ => return Err(anyhow::anyhow!(e)),
     204              :             },
     205              :         };
     206              : 
     207            0 :         info!("mv {zip_dir:?}/*  {real_dir:?}");
     208              : 
     209            0 :         for file in dir {
     210            0 :             let old_file = file?.path();
     211            0 :             let new_file =
     212            0 :                 Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
     213            0 :             info!("moving {old_file:?} to {new_file:?}");
     214              : 
     215              :             // A failed rename (e.g. "Directory not empty (os error 39)") is only logged, not treated as a download failure.
     216            0 :             match std::fs::rename(old_file, new_file) {
     217            0 :                 Ok(()) => info!("move succeeded"),
     218            0 :                 Err(e) => {
     219            0 :                     warn!("move failed, probably because the extension already exists: {e}")
     220              :                 }
     221              :             }
     222              :         }
     223              :     }
     224            0 :     info!("done moving extension {ext_name}");
     225            0 :     Ok(download_size)
     226            0 : }
     227              : 
     228              : // Create extension control files from spec
     229            0 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
     230            0 :     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     231            0 :     for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
     232              :         // Skip the extension only if both the public and the custom lists are present
     233              :         // and neither contains it; in that case this compute is not allowed to use it.
     234            0 :         if let Some(public_extensions) = &remote_extensions.public_extensions {
     235            0 :             if !public_extensions.contains(ext_name) {
     236            0 :                 if let Some(custom_extensions) = &remote_extensions.custom_extensions {
     237            0 :                     if !custom_extensions.contains(ext_name) {
     238            0 :                         continue; // skip this extension, it is not allowed
     239            0 :                     }
     240            0 :                 }
     241            0 :             }
     242            0 :         }
     243              : 
     244            0 :         for (control_name, control_content) in &ext_data.control_data {
     245            0 :             let control_path = local_sharedir.join(control_name);
     246            0 :             if !control_path.exists() {
     247            0 :                 info!("writing file {:?}{:?}", control_path, control_content);
     248            0 :                 std::fs::write(control_path, control_content).unwrap();
     249              :             } else {
     250            0 :                 warn!(
     251            0 :                     "control file {:?} exists both locally and remotely. ignoring the remote version.",
     252              :                     control_path
     253              :                 );
     254              :             }
     255              :         }
     256              :     }
     257            0 : }
     258              : 
     259              : // Do request to extension storage proxy, e.g.,
     260              : // curl http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local/latest/v15/extensions/anon.tar.zst
     261              : // using HTTP GET and return the response body as bytes.
     262            0 : async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Result<Bytes> {
     263            0 :     let uri = remote_ext_base_url.join(ext_path).with_context(|| {
     264            0 :         format!(
     265            0 :             "failed to create the remote extension URI for {ext_path} using {remote_ext_base_url}"
     266              :         )
     267            0 :     })?;
     268            0 :     let filename = Path::new(ext_path)
     269            0 :         .file_name()
     270            0 :         .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
     271            0 :         .to_str()
     272            0 :         .unwrap_or("unknown")
     273            0 :         .to_string();
     274              : 
     275            0 :     info!("Downloading extension file '{}' from uri {}", filename, uri);
     276              : 
     277            0 :     match do_extension_server_request(uri).await {
     278            0 :         Ok(resp) => {
     279            0 :             info!("Successfully downloaded remote extension data {}", ext_path);
     280            0 :             REMOTE_EXT_REQUESTS_TOTAL
     281            0 :                 .with_label_values(&[&StatusCode::OK.to_string(), &filename])
     282            0 :                 .inc();
     283            0 :             Ok(resp)
     284              :         }
     285            0 :         Err((msg, status)) => {
     286            0 :             REMOTE_EXT_REQUESTS_TOTAL
     287            0 :                 .with_label_values(&[&status, &filename])
     288            0 :                 .inc();
     289            0 :             bail!(msg);
     290              :         }
     291              :     }
     292            0 : }
     293              : 
     294              : // Do a single remote extensions server request.
     295              : // Return result or (error message + stringified status code) in case of any failures.
     296            0 : async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)> {
     297            0 :     let resp = reqwest::get(uri).await.map_err(|e| {
     298            0 :         (
     299            0 :             format!("could not perform remote extensions server request: {e:?}"),
     300            0 :             UNKNOWN_HTTP_STATUS.to_string(),
     301            0 :         )
     302            0 :     })?;
     303            0 :     let status = resp.status();
     304              : 
     305            0 :     match status {
     306            0 :         StatusCode::OK => match resp.bytes().await {
     307            0 :             Ok(resp) => Ok(resp),
     308            0 :             Err(e) => Err((
     309            0 :                 format!("could not read remote extensions server response: {e:?}"),
     310            0 :                 // It's fine to return and report error with status as 200 OK,
     311            0 :                 // because we still failed to read the response.
     312            0 :                 status.to_string(),
     313            0 :             )),
     314              :         },
     315            0 :         StatusCode::SERVICE_UNAVAILABLE => Err((
     316            0 :             "remote extensions server is temporarily unavailable".to_string(),
     317            0 :             status.to_string(),
     318            0 :         )),
     319            0 :         _ => Err((
     320            0 :             format!("unexpected remote extensions server response status code: {status}"),
     321            0 :             status.to_string(),
     322            0 :         )),
     323              :     }
     324            0 : }
     325              : 
     326              : #[cfg(test)]
     327              : mod tests {
     328              :     use super::parse_pg_version;
     329              : 
     330              :     #[test]
     331            1 :     fn test_parse_pg_version() {
     332              :         use postgres_versioninfo::PgMajorVersion::*;
     333            1 :         assert_eq!(parse_pg_version("PostgreSQL 15.4"), PG15);
     334            1 :         assert_eq!(parse_pg_version("PostgreSQL 15.14"), PG15);
     335            1 :         assert_eq!(
     336            1 :             parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
     337              :             PG15
     338              :         );
     339              : 
     340            1 :         assert_eq!(parse_pg_version("PostgreSQL 14.15"), PG14);
     341            1 :         assert_eq!(parse_pg_version("PostgreSQL 14.0"), PG14);
     342            1 :         assert_eq!(
     343            1 :             parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
     344              :             PG14
     345              :         );
     346              : 
     347            1 :         assert_eq!(parse_pg_version("PostgreSQL 16devel"), PG16);
     348            1 :         assert_eq!(parse_pg_version("PostgreSQL 16beta1"), PG16);
     349            1 :         assert_eq!(parse_pg_version("PostgreSQL 16rc2"), PG16);
     350            1 :         assert_eq!(parse_pg_version("PostgreSQL 16extra"), PG16);
     351            1 :     }
     352              : 
     353              :     #[test]
     354              :     #[should_panic]
     355            1 :     fn test_parse_pg_unsupported_version() {
     356            1 :         parse_pg_version("PostgreSQL 13.14");
     357            1 :     }
     358              : 
     359              :     #[test]
     360              :     #[should_panic]
     361            1 :     fn test_parse_pg_incorrect_version_format() {
     362            1 :         parse_pg_version("PostgreSQL 14");
     363            1 :     }
     364              : }
        

Generated by: LCOV version 2.1-beta