LCOV - code coverage report
Current view: top level - compute_tools/src - extension_server.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 19.1 % 188 36
Test Date: 2025-02-20 13:11:02 Functions: 26.7 % 15 4

            Line data    Source code
       1              : // Download extension files from the extension store
       2              : // and put them in the right place in the postgres directory (share / lib)
       3              : /*
       4              : The layout of the S3 bucket is as follows:
       5              : 5615610098 // this is an extension build number
       6              : ├── v14
       7              : │   ├── extensions
       8              : │   │   ├── anon.tar.zst
       9              : │   │   └── embedding.tar.zst
      10              : │   └── ext_index.json
      11              : └── v15
      12              :     ├── extensions
      13              :     │   ├── anon.tar.zst
      14              :     │   └── embedding.tar.zst
      15              :     └── ext_index.json
      16              : 5615261079
      17              : ├── v14
      18              : │   ├── extensions
      19              : │   │   └── anon.tar.zst
      20              : │   └── ext_index.json
      21              : └── v15
      22              :     ├── extensions
      23              :     │   └── anon.tar.zst
      24              :     └── ext_index.json
      25              : 5623261088
      26              : ├── v14
      27              : │   ├── extensions
      28              : │   │   └── embedding.tar.zst
      29              : │   └── ext_index.json
      30              : └── v15
      31              :     ├── extensions
      32              :     │   └── embedding.tar.zst
      33              :     └── ext_index.json
      34              : 
      35              : Note that build number cannot be part of prefix because we might need extensions
      36              : from other build numbers.
      37              : 
      38              : ext_index.json stores the control files and location of extension archives
      39              : It also stores a list of public extensions and a library_index
      40              : 
      41              : We don't need to duplicate extension.tar.zst files.
      42              : We only need to upload a new one if it is updated.
      43              : (Although currently we just upload every time anyways, hopefully will change
      44              : this sometime)
      45              : 
      46              : *access* is controlled by spec
      47              : 
      48              : More specifically, here is an example ext_index.json
      49              : {
      50              :     "public_extensions": [
      51              :         "anon",
      52              :         "pg_buffercache"
      53              :     ],
      54              :     "library_index": {
      55              :         "anon": "anon",
      56              :         "pg_buffercache": "pg_buffercache"
      57              :     },
      58              :     "extension_data": {
      59              :         "pg_buffercache": {
      60              :             "control_data": {
      61              :                 "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
      62              :             },
      63              :             "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
      64              :         },
      65              :         "anon": {
      66              :             "control_data": {
      67              :                 "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
      68              :             },
      69              :             "archive_path": "5670669815/v14/extensions/anon.tar.zst"
      70              :         }
      71              :     }
      72              : }
      73              : */
      74              : use anyhow::Result;
      75              : use anyhow::{bail, Context};
      76              : use bytes::Bytes;
      77              : use compute_api::spec::RemoteExtSpec;
      78              : use regex::Regex;
      79              : use remote_storage::*;
      80              : use reqwest::StatusCode;
      81              : use std::path::Path;
      82              : use std::str;
      83              : use tar::Archive;
      84              : use tracing::info;
      85              : use tracing::log::warn;
      86              : use zstd::stream::read::Decoder;
      87              : 
      88              : use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
      89              : 
      90            0 : fn get_pg_config(argument: &str, pgbin: &str) -> String {
      91            0 :     // gives the result of `pg_config [argument]`
      92            0 :     // where argument is a flag like `--version` or `--sharedir`
      93            0 :     let pgconfig = pgbin
      94            0 :         .strip_suffix("postgres")
      95            0 :         .expect("bad pgbin")
      96            0 :         .to_owned()
      97            0 :         + "/pg_config";
      98            0 :     let config_output = std::process::Command::new(pgconfig)
      99            0 :         .arg(argument)
     100            0 :         .output()
     101            0 :         .expect("pg_config error");
     102            0 :     std::str::from_utf8(&config_output.stdout)
     103            0 :         .expect("pg_config error")
     104            0 :         .trim()
     105            0 :         .to_string()
     106            0 : }
     107              : 
     108            0 : pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion {
     109            0 :     // pg_config --version returns a (platform specific) human readable string
     110            0 :     // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
     111            0 :     let human_version = get_pg_config("--version", pgbin);
     112            0 :     parse_pg_version(&human_version)
     113            0 : }
     114              : 
     115            0 : pub fn get_pg_version_string(pgbin: &str) -> String {
     116            0 :     match get_pg_version(pgbin) {
     117            0 :         PostgresMajorVersion::V14 => "v14",
     118            0 :         PostgresMajorVersion::V15 => "v15",
     119            0 :         PostgresMajorVersion::V16 => "v16",
     120            0 :         PostgresMajorVersion::V17 => "v17",
     121              :     }
     122            0 :     .to_owned()
     123            0 : }
     124              : 
     125              : #[derive(Copy, Clone, Debug, PartialEq, Eq)]
     126              : pub enum PostgresMajorVersion {
     127              :     V14,
     128              :     V15,
     129              :     V16,
     130              :     V17,
     131              : }
     132              : 
     133           12 : fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
     134              :     use PostgresMajorVersion::*;
     135              :     // Normal releases have version strings like "PostgreSQL 15.4". But there
     136              :     // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
     137              :     // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
     138              :     // configure option, you can tack any string to the version number,
     139              :     // e.g. "PostgreSQL 15.4foobar".
     140           12 :     match Regex::new(r"^PostgreSQL (?<major>\d+).+")
     141           12 :         .unwrap()
     142           12 :         .captures(human_version)
     143              :     {
     144           12 :         Some(captures) if captures.len() == 2 => match &captures["major"] {
     145           12 :             "14" => return V14,
     146            9 :             "15" => return V15,
     147            6 :             "16" => return V16,
     148            2 :             "17" => return V17,
     149            2 :             _ => {}
     150              :         },
     151            0 :         _ => {}
     152              :     }
     153            2 :     panic!("Unsuported postgres version {human_version}");
     154           10 : }
     155              : 
     156              : // download the archive for a given extension,
     157              : // unzip it, and place files in the appropriate locations (share/lib)
     158            0 : pub async fn download_extension(
     159            0 :     ext_name: &str,
     160            0 :     ext_path: &RemotePath,
     161            0 :     ext_remote_storage: &str,
     162            0 :     pgbin: &str,
     163            0 : ) -> Result<u64> {
     164            0 :     info!("Download extension {:?} from {:?}", ext_name, ext_path);
     165              : 
     166              :     // TODO add retry logic
     167            0 :     let download_buffer =
     168            0 :         match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
     169            0 :             Ok(buffer) => buffer,
     170            0 :             Err(error_message) => {
     171            0 :                 return Err(anyhow::anyhow!(
     172            0 :                     "error downloading extension {:?}: {:?}",
     173            0 :                     ext_name,
     174            0 :                     error_message
     175            0 :                 ));
     176              :             }
     177              :         };
     178              : 
     179            0 :     let download_size = download_buffer.len() as u64;
     180            0 :     info!("Download size {:?}", download_size);
     181              :     // it's unclear whether it is more performant to decompress into memory or not
     182              :     // TODO: decompressing into memory can be avoided
     183            0 :     let decoder = Decoder::new(download_buffer.as_ref())?;
     184            0 :     let mut archive = Archive::new(decoder);
     185            0 : 
     186            0 :     let unzip_dest = pgbin
     187            0 :         .strip_suffix("/bin/postgres")
     188            0 :         .expect("bad pgbin")
     189            0 :         .to_string()
     190            0 :         + "/download_extensions";
     191            0 :     archive.unpack(&unzip_dest)?;
     192            0 :     info!("Download + unzip {:?} completed successfully", &ext_path);
     193              : 
     194            0 :     let sharedir_paths = (
     195            0 :         unzip_dest.to_string() + "/share/extension",
     196            0 :         Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
     197            0 :     );
     198            0 :     let libdir_paths = (
     199            0 :         unzip_dest.to_string() + "/lib",
     200            0 :         Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
     201            0 :     );
     202              :     // move contents of the libdir / sharedir in unzipped archive to the correct local paths
     203            0 :     for paths in [sharedir_paths, libdir_paths] {
     204            0 :         let (zip_dir, real_dir) = paths;
     205            0 :         info!("mv {zip_dir:?}/*  {real_dir:?}");
     206            0 :         for file in std::fs::read_dir(zip_dir)? {
     207            0 :             let old_file = file?.path();
     208            0 :             let new_file =
     209            0 :                 Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
     210            0 :             info!("moving {old_file:?} to {new_file:?}");
     211              : 
     212              :             // extension download failed: Directory not empty (os error 39)
     213            0 :             match std::fs::rename(old_file, new_file) {
     214            0 :                 Ok(()) => info!("move succeeded"),
     215            0 :                 Err(e) => {
     216            0 :                     warn!("move failed, probably because the extension already exists: {e}")
     217              :                 }
     218              :             }
     219              :         }
     220              :     }
     221            0 :     info!("done moving extension {ext_name}");
     222            0 :     Ok(download_size)
     223            0 : }
     224              : 
     225              : // Create extension control files from spec
     226            0 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
     227            0 :     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     228            0 :     for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
     229              :         // Check if extension is present in public or custom.
     230              :         // If not, then it is not allowed to be used by this compute.
     231            0 :         if let Some(public_extensions) = &remote_extensions.public_extensions {
     232            0 :             if !public_extensions.contains(ext_name) {
     233            0 :                 if let Some(custom_extensions) = &remote_extensions.custom_extensions {
     234            0 :                     if !custom_extensions.contains(ext_name) {
     235            0 :                         continue; // skip this extension, it is not allowed
     236            0 :                     }
     237            0 :                 }
     238            0 :             }
     239            0 :         }
     240              : 
     241            0 :         for (control_name, control_content) in &ext_data.control_data {
     242            0 :             let control_path = local_sharedir.join(control_name);
     243            0 :             if !control_path.exists() {
     244            0 :                 info!("writing file {:?}{:?}", control_path, control_content);
     245            0 :                 std::fs::write(control_path, control_content).unwrap();
     246              :             } else {
     247            0 :                 warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
     248              :             }
     249              :         }
     250              :     }
     251            0 : }
     252              : 
     253              : // Do request to extension storage proxy, i.e.
     254              : // curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
     255              : // using HHTP GET
     256              : // and return the response body as bytes
     257              : //
     258            0 : async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
     259            0 :     let uri = format!("{}/{}", ext_remote_storage, ext_path);
     260            0 : 
     261            0 :     info!("Download extension {} from uri {}", ext_path, uri);
     262              : 
     263            0 :     match do_extension_server_request(&uri).await {
     264            0 :         Ok(resp) => {
     265            0 :             info!("Successfully downloaded remote extension data {}", ext_path);
     266            0 :             REMOTE_EXT_REQUESTS_TOTAL
     267            0 :                 .with_label_values(&[&StatusCode::OK.to_string()])
     268            0 :                 .inc();
     269            0 :             Ok(resp)
     270              :         }
     271            0 :         Err((msg, status)) => {
     272            0 :             REMOTE_EXT_REQUESTS_TOTAL
     273            0 :                 .with_label_values(&[&status])
     274            0 :                 .inc();
     275            0 :             bail!(msg);
     276              :         }
     277              :     }
     278            0 : }
     279              : 
     280              : // Do a single remote extensions server request.
     281              : // Return result or (error message + stringified status code) in case of any failures.
     282            0 : async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, String)> {
     283            0 :     let resp = reqwest::get(uri).await.map_err(|e| {
     284            0 :         (
     285            0 :             format!(
     286            0 :                 "could not perform remote extensions server request: {:?}",
     287            0 :                 e
     288            0 :             ),
     289            0 :             UNKNOWN_HTTP_STATUS.to_string(),
     290            0 :         )
     291            0 :     })?;
     292            0 :     let status = resp.status();
     293            0 : 
     294            0 :     match status {
     295            0 :         StatusCode::OK => match resp.bytes().await {
     296            0 :             Ok(resp) => Ok(resp),
     297            0 :             Err(e) => Err((
     298            0 :                 format!("could not read remote extensions server response: {:?}", e),
     299            0 :                 // It's fine to return and report error with status as 200 OK,
     300            0 :                 // because we still failed to read the response.
     301            0 :                 status.to_string(),
     302            0 :             )),
     303              :         },
     304            0 :         StatusCode::SERVICE_UNAVAILABLE => Err((
     305            0 :             "remote extensions server is temporarily unavailable".to_string(),
     306            0 :             status.to_string(),
     307            0 :         )),
     308            0 :         _ => Err((
     309            0 :             format!(
     310            0 :                 "unexpected remote extensions server response status code: {}",
     311            0 :                 status
     312            0 :             ),
     313            0 :             status.to_string(),
     314            0 :         )),
     315              :     }
     316            0 : }
     317              : 
     318              : #[cfg(test)]
     319              : mod tests {
     320              :     use super::parse_pg_version;
     321              : 
     322              :     #[test]
     323            1 :     fn test_parse_pg_version() {
     324              :         use super::PostgresMajorVersion::*;
     325            1 :         assert_eq!(parse_pg_version("PostgreSQL 15.4"), V15);
     326            1 :         assert_eq!(parse_pg_version("PostgreSQL 15.14"), V15);
     327            1 :         assert_eq!(
     328            1 :             parse_pg_version("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)"),
     329            1 :             V15
     330            1 :         );
     331              : 
     332            1 :         assert_eq!(parse_pg_version("PostgreSQL 14.15"), V14);
     333            1 :         assert_eq!(parse_pg_version("PostgreSQL 14.0"), V14);
     334            1 :         assert_eq!(
     335            1 :             parse_pg_version("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1"),
     336            1 :             V14
     337            1 :         );
     338              : 
     339            1 :         assert_eq!(parse_pg_version("PostgreSQL 16devel"), V16);
     340            1 :         assert_eq!(parse_pg_version("PostgreSQL 16beta1"), V16);
     341            1 :         assert_eq!(parse_pg_version("PostgreSQL 16rc2"), V16);
     342            1 :         assert_eq!(parse_pg_version("PostgreSQL 16extra"), V16);
     343            1 :     }
     344              : 
     345              :     #[test]
     346              :     #[should_panic]
     347            1 :     fn test_parse_pg_unsupported_version() {
     348            1 :         parse_pg_version("PostgreSQL 13.14");
     349            1 :     }
     350              : 
     351              :     #[test]
     352              :     #[should_panic]
     353            1 :     fn test_parse_pg_incorrect_version_format() {
     354            1 :         parse_pg_version("PostgreSQL 14");
     355            1 :     }
     356              : }
        

Generated by: LCOV version 2.1-beta