LCOV - differential code coverage report
Current view: top level - compute_tools/src - extension_server.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 85.0 % 153 130 23 130
Current Date: 2024-01-09 02:06:09 Functions: 100.0 % 24 24 24
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : // Download extension files from the extension store
       2                 : // and put them in the right place in the postgres directory (share / lib)
       3                 : /*
       4                 : The layout of the S3 bucket is as follows:
       5                 : 5615610098 // this is an extension build number
       6                 : ├── v14
       7                 : │   ├── extensions
       8                 : │   │   ├── anon.tar.zst
       9                 : │   │   └── embedding.tar.zst
      10                 : │   └── ext_index.json
      11                 : └── v15
      12                 :     ├── extensions
      13                 :     │   ├── anon.tar.zst
      14                 :     │   └── embedding.tar.zst
      15                 :     └── ext_index.json
      16                 : 5615261079
      17                 : ├── v14
      18                 : │   ├── extensions
      19                 : │   │   └── anon.tar.zst
      20                 : │   └── ext_index.json
      21                 : └── v15
      22                 :     ├── extensions
      23                 :     │   └── anon.tar.zst
      24                 :     └── ext_index.json
      25                 : 5623261088
      26                 : ├── v14
      27                 : │   ├── extensions
      28                 : │   │   └── embedding.tar.zst
      29                 : │   └── ext_index.json
      30                 : └── v15
      31                 :     ├── extensions
      32                 :     │   └── embedding.tar.zst
      33                 :     └── ext_index.json
      34                 : 
      35                 : Note that build number cannot be part of prefix because we might need extensions
      36                 : from other build numbers.
      37                 : 
      38                 : ext_index.json stores the control files and location of extension archives
      39                 : It also stores a list of public extensions and a library_index
      40                 : 
      41                 : We don't need to duplicate extension.tar.zst files.
      42                 : We only need to upload a new one if it is updated.
      43                 : (Currently we still upload every archive on every build; we hope to change
      44                 : this eventually.)
      45                 : 
      46                 : *access* is controlled by spec
      47                 : 
      48                 : More specifically, here is an example ext_index.json
      49                 : {
      50                 :     "public_extensions": [
      51                 :         "anon",
      52                 :         "pg_buffercache"
      53                 :     ],
      54                 :     "library_index": {
      55                 :         "anon": "anon",
      56                 :         "pg_buffercache": "pg_buffercache"
      57                 :     },
      58                 :     "extension_data": {
      59                 :         "pg_buffercache": {
      60                 :             "control_data": {
      61                 :                 "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
      62                 :             },
      63                 :             "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
      64                 :         },
      65                 :         "anon": {
      66                 :             "control_data": {
      67                 :                 "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
      68                 :             },
      69                 :             "archive_path": "5670669815/v14/extensions/anon.tar.zst"
      70                 :         }
      71                 :     }
      72                 : }
      73                 : */
      74                 : use anyhow::{self, Result};
      75                 : use anyhow::{bail, Context};
      76                 : use bytes::Bytes;
      77                 : use compute_api::spec::RemoteExtSpec;
      78                 : use regex::Regex;
      79                 : use remote_storage::*;
      80                 : use reqwest::StatusCode;
      81                 : use std::path::Path;
      82                 : use std::str;
      83                 : use tar::Archive;
      84                 : use tracing::info;
      85                 : use tracing::log::warn;
      86                 : use zstd::stream::read::Decoder;
      87                 : 
      /// Runs `pg_config <argument>` and returns its standard output, trimmed.
      ///
      /// `argument` is a single flag such as `--version` or `--sharedir`.
      /// The `pg_config` path is derived from `pgbin` by dropping the trailing
      /// `postgres` and appending `/pg_config`.
      ///
      /// The exit status of `pg_config` is not inspected; only stdout is read.
      ///
      /// # Panics
      ///
      /// Panics with "bad pgbin" if `pgbin` does not end in `postgres`, and with
      /// "pg_config error" if the process cannot be run or its output is not
      /// valid UTF-8.
      fn get_pg_config(argument: &str, pgbin: &str) -> String {
          let bin_dir = pgbin.strip_suffix("postgres").expect("bad pgbin");
          // `bin_dir` keeps its trailing separator, so the result usually contains
          // a doubled `/`.
          let pg_config_path = format!("{bin_dir}/pg_config");

          let mut command = std::process::Command::new(pg_config_path);
          command.arg(argument);
          let output = command.output().expect("pg_config error");

          let stdout_text = std::str::from_utf8(&output.stdout).expect("pg_config error");
          stdout_text.trim().to_string()
      }
     105                 : 
     106             544 : pub fn get_pg_version(pgbin: &str) -> String {
     107             544 :     // pg_config --version returns a (platform specific) human readable string
     108             544 :     // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
     109             544 :     let human_version = get_pg_config("--version", pgbin);
     110             544 :     return parse_pg_version(&human_version).to_string();
     111             544 : }
     112                 : 
     /// Maps the human-readable output of `pg_config --version` to a major-version
     /// tag: `"v14"`, `"v15"` or `"v16"`.
     ///
     /// Normal releases have version strings like "PostgreSQL 15.4". But there
     /// are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
     /// 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
     /// configure option, you can tack any string to the version number,
     /// e.g. "PostgreSQL 15.4foobar".
     ///
     /// The accepted shape is `PostgreSQL <major><something>`: the major version
     /// is the complete run of ASCII digits after the prefix, and at least one
     /// further (non-newline) character must follow it. The whole digit run is
     /// used, so "PostgreSQL 145" is rejected rather than being read as major
     /// version 14.
     ///
     /// # Panics
     ///
     /// Panics if the string does not have the shape above (including a bare
     /// "PostgreSQL 14") or if the major version is not 14, 15 or 16.
     fn parse_pg_version(human_version: &str) -> &str {
         let major = human_version
             .strip_prefix("PostgreSQL ")
             .and_then(|rest| {
                 // `find` yields `None` when the digits run to the end of the string,
                 // i.e. when nothing follows the major version.
                 let digits_end = rest.find(|c: char| !c.is_ascii_digit())?;
                 let (major, tail) = rest.split_at(digits_end);
                 (!tail.starts_with('\n')).then_some(major)
             });

         match major {
             Some("14") => "v14",
             Some("15") => "v15",
             Some("16") => "v16",
             _ => panic!("Unsupported postgres version {human_version}"),
         }
     }
     133                 : 
     134                 : // download the archive for a given extension,
     135                 : // unzip it, and place files in the appropriate locations (share/lib)
     136               1 : pub async fn download_extension(
     137               1 :     ext_name: &str,
     138               1 :     ext_path: &RemotePath,
     139               1 :     ext_remote_storage: &str,
     140               1 :     pgbin: &str,
     141               1 : ) -> Result<u64> {
     142               1 :     info!("Download extension {:?} from {:?}", ext_name, ext_path);
     143                 : 
     144                 :     // TODO add retry logic
     145               1 :     let download_buffer =
     146              58 :         match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
     147               1 :             Ok(buffer) => buffer,
     148 UBC           0 :             Err(error_message) => {
     149               0 :                 return Err(anyhow::anyhow!(
     150               0 :                     "error downloading extension {:?}: {:?}",
     151               0 :                     ext_name,
     152               0 :                     error_message
     153               0 :                 ));
     154                 :             }
     155                 :         };
     156                 : 
     157 CBC           1 :     let download_size = download_buffer.len() as u64;
     158               1 :     info!("Download size {:?}", download_size);
     159                 :     // it's unclear whether it is more performant to decompress into memory or not
     160                 :     // TODO: decompressing into memory can be avoided
     161               1 :     let decoder = Decoder::new(download_buffer.as_ref())?;
     162               1 :     let mut archive = Archive::new(decoder);
     163               1 : 
     164               1 :     let unzip_dest = pgbin
     165               1 :         .strip_suffix("/bin/postgres")
     166               1 :         .expect("bad pgbin")
     167               1 :         .to_string()
     168               1 :         + "/download_extensions";
     169               1 :     archive.unpack(&unzip_dest)?;
     170               1 :     info!("Download + unzip {:?} completed successfully", &ext_path);
     171                 : 
     172               1 :     let sharedir_paths = (
     173               1 :         unzip_dest.to_string() + "/share/extension",
     174               1 :         Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
     175               1 :     );
     176               1 :     let libdir_paths = (
     177               1 :         unzip_dest.to_string() + "/lib",
     178               1 :         Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
     179               1 :     );
     180                 :     // move contents of the libdir / sharedir in unzipped archive to the correct local paths
     181               2 :     for paths in [sharedir_paths, libdir_paths] {
     182               2 :         let (zip_dir, real_dir) = paths;
     183               2 :         info!("mv {zip_dir:?}/*  {real_dir:?}");
     184               3 :         for file in std::fs::read_dir(zip_dir)? {
     185               3 :             let old_file = file?.path();
     186               3 :             let new_file =
     187               3 :                 Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
     188               3 :             info!("moving {old_file:?} to {new_file:?}");
     189                 : 
     190                 :             // extension download failed: Directory not empty (os error 39)
     191               3 :             match std::fs::rename(old_file, new_file) {
     192               3 :                 Ok(()) => info!("move succeeded"),
     193 UBC           0 :                 Err(e) => {
     194               0 :                     warn!("move failed, probably because the extension already exists: {e}")
     195                 :                 }
     196                 :             }
     197                 :         }
     198                 :     }
     199 CBC           1 :     info!("done moving extension {ext_name}");
     200               1 :     Ok(download_size)
     201               1 : }
     202                 : 
     203                 : // Create extension control files from spec
     204               1 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
     205               1 :     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     206               1 :     for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
     207                 :         // Check if extension is present in public or custom.
     208                 :         // If not, then it is not allowed to be used by this compute.
     209               1 :         if let Some(public_extensions) = &remote_extensions.public_extensions {
     210 UBC           0 :             if !public_extensions.contains(ext_name) {
     211               0 :                 if let Some(custom_extensions) = &remote_extensions.custom_extensions {
     212               0 :                     if !custom_extensions.contains(ext_name) {
     213               0 :                         continue; // skip this extension, it is not allowed
     214               0 :                     }
     215               0 :                 }
     216               0 :             }
     217 CBC           1 :         }
     218                 : 
     219               2 :         for (control_name, control_content) in &ext_data.control_data {
     220               1 :             let control_path = local_sharedir.join(control_name);
     221               1 :             if !control_path.exists() {
     222               1 :                 info!("writing file {:?}{:?}", control_path, control_content);
     223               1 :                 std::fs::write(control_path, control_content).unwrap();
     224                 :             } else {
     225 UBC           0 :                 warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
     226                 :             }
     227                 :         }
     228                 :     }
     229 CBC           1 : }
     230                 : 
     231                 : // Do request to extension storage proxy, i.e.
     232                 : // curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
     233                 : // using HHTP GET
     234                 : // and return the response body as bytes
     235                 : //
     236               1 : async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
     237               1 :     let uri = format!("{}/{}", ext_remote_storage, ext_path);
     238                 : 
     239               1 :     info!("Download extension {:?} from uri {:?}", ext_path, uri);
     240                 : 
     241               4 :     let resp = reqwest::get(uri).await?;
     242                 : 
     243               1 :     match resp.status() {
     244              54 :         StatusCode::OK => match resp.bytes().await {
     245               1 :             Ok(resp) => {
     246               1 :                 info!("Download extension {:?} completed successfully", ext_path);
     247               1 :                 Ok(resp)
     248                 :             }
     249 UBC           0 :             Err(e) => bail!("could not deserialize remote extension response: {}", e),
     250                 :         },
     251               0 :         StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
     252               0 :         _ => bail!(
     253               0 :             "unexpected remote extension response status code: {}",
     254               0 :             resp.status()
     255               0 :         ),
     256                 :     }
     257 CBC           1 : }
     258                 : 
     #[cfg(test)]
     mod tests {
         use super::parse_pg_version;

         /// Supported major versions are recognised for release, distribution,
         /// pre-release and `--with-extra-version` style version strings.
         #[test]
         fn test_parse_pg_version() {
             let cases = [
                 ("PostgreSQL 15.4", "v15"),
                 ("PostgreSQL 15.14", "v15"),
                 ("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)", "v15"),
                 ("PostgreSQL 14.15", "v14"),
                 ("PostgreSQL 14.0", "v14"),
                 ("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1", "v14"),
                 ("PostgreSQL 16devel", "v16"),
                 ("PostgreSQL 16beta1", "v16"),
                 ("PostgreSQL 16rc2", "v16"),
                 ("PostgreSQL 16extra", "v16"),
             ];

             for (human_version, expected) in cases {
                 assert_eq!(parse_pg_version(human_version), expected);
             }
         }

         /// A well-formed version string with an unsupported major version panics.
         #[test]
         #[should_panic]
         fn test_parse_pg_unsupported_version() {
             parse_pg_version("PostgreSQL 13.14");
         }

         /// A bare major version with nothing after it is treated as malformed.
         #[test]
         #[should_panic]
         fn test_parse_pg_incorrect_version_format() {
             parse_pg_version("PostgreSQL 14");
         }
     }
        

Generated by: LCOV version 2.1-beta