LCOV - code coverage report
Current view: top level - compute_tools/src - extension_server.rs (source / functions) Coverage Total Hit
Test: 8ac049b474321fdc72ddcb56d7165153a1a900e8.info Lines: 22.1 % 113 25
Test Date: 2023-09-06 10:18:01 Functions: 8.7 % 23 2

            Line data    Source code
       1              : // Download extension files from the extension store
       2              : // and put them in the right place in the postgres directory (share / lib)
       3              : /*
       4              : The layout of the S3 bucket is as follows:
       5              : 5615610098 // this is an extension build number
       6              : ├── v14
       7              : │   ├── extensions
       8              : │   │   ├── anon.tar.zst
       9              : │   │   └── embedding.tar.zst
      10              : │   └── ext_index.json
      11              : └── v15
      12              :     ├── extensions
      13              :     │   ├── anon.tar.zst
      14              :     │   └── embedding.tar.zst
      15              :     └── ext_index.json
      16              : 5615261079
      17              : ├── v14
      18              : │   ├── extensions
      19              : │   │   └── anon.tar.zst
      20              : │   └── ext_index.json
      21              : └── v15
      22              :     ├── extensions
      23              :     │   └── anon.tar.zst
      24              :     └── ext_index.json
      25              : 5623261088
      26              : ├── v14
      27              : │   ├── extensions
      28              : │   │   └── embedding.tar.zst
      29              : │   └── ext_index.json
      30              : └── v15
      31              :     ├── extensions
      32              :     │   └── embedding.tar.zst
      33              :     └── ext_index.json
      34              : 
      35              : Note that build number cannot be part of prefix because we might need extensions
      36              : from other build numbers.
      37              : 
      38              : ext_index.json stores the control files and location of extension archives
      39              : It also stores a list of public extensions and a library_index
      40              : 
      41              : We don't need to duplicate extension.tar.zst files.
      42              : We only need to upload a new one if it is updated.
      43              : (Although currently we just upload every time anyways, hopefully will change
      44              : this sometime)
      45              : 
      46              : *access* is controlled by spec
      47              : 
      48              : More specifically, here is an example ext_index.json
      49              : {
      50              :     "public_extensions": [
      51              :         "anon",
      52              :         "pg_buffercache"
      53              :     ],
      54              :     "library_index": {
      55              :         "anon": "anon",
      56              :         "pg_buffercache": "pg_buffercache"
      57              :     },
      58              :     "extension_data": {
      59              :         "pg_buffercache": {
      60              :             "control_data": {
      61              :                 "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
      62              :             },
      63              :             "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
      64              :         },
      65              :         "anon": {
      66              :             "control_data": {
      67              :                 "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
      68              :             },
      69              :             "archive_path": "5670669815/v14/extensions/anon.tar.zst"
      70              :         }
      71              :     }
      72              : }
      73              : */
      74              : use anyhow::Context;
      75              : use anyhow::{self, Result};
      76              : use compute_api::spec::RemoteExtSpec;
      77              : use remote_storage::*;
      78              : use serde_json;
      79              : use std::io::Read;
      80              : use std::num::{NonZeroU32, NonZeroUsize};
      81              : use std::path::Path;
      82              : use std::str;
      83              : use tar::Archive;
      84              : use tokio::io::AsyncReadExt;
      85              : use tracing::info;
      86              : use tracing::log::warn;
      87              : use zstd::stream::read::Decoder;
      88              : 
      89          663 : fn get_pg_config(argument: &str, pgbin: &str) -> String {
      90          663 :     // gives the result of `pg_config [argument]`
      91          663 :     // where argument is a flag like `--version` or `--sharedir`
      92          663 :     let pgconfig = pgbin
      93          663 :         .strip_suffix("postgres")
      94          663 :         .expect("bad pgbin")
      95          663 :         .to_owned()
      96          663 :         + "/pg_config";
      97          663 :     let config_output = std::process::Command::new(pgconfig)
      98          663 :         .arg(argument)
      99          663 :         .output()
     100          663 :         .expect("pg_config error");
     101          663 :     std::str::from_utf8(&config_output.stdout)
     102          663 :         .expect("pg_config error")
     103          663 :         .trim()
     104          663 :         .to_string()
     105          663 : }
     106              : 
     107          663 : pub fn get_pg_version(pgbin: &str) -> String {
     108          663 :     // pg_config --version returns a (platform specific) human readable string
     109          663 :     // such as "PostgreSQL 15.4". We parse this to v14/v15
     110          663 :     let human_version = get_pg_config("--version", pgbin);
     111          663 :     if human_version.contains("15") {
     112            0 :         return "v15".to_string();
     113          663 :     } else if human_version.contains("14") {
     114          663 :         return "v14".to_string();
     115            0 :     }
     116            0 :     panic!("Unsuported postgres version {human_version}");
     117          663 : }
     118              : 
     119              : // download the archive for a given extension,
     120              : // unzip it, and place files in the appropriate locations (share/lib)
     121            0 : pub async fn download_extension(
     122            0 :     ext_name: &str,
     123            0 :     ext_path: &RemotePath,
     124            0 :     remote_storage: &GenericRemoteStorage,
     125            0 :     pgbin: &str,
     126            0 : ) -> Result<u64> {
     127            0 :     info!("Download extension {:?} from {:?}", ext_name, ext_path);
     128            0 :     let mut download = remote_storage.download(ext_path).await?;
     129            0 :     let mut download_buffer = Vec::new();
     130            0 :     download
     131            0 :         .download_stream
     132            0 :         .read_to_end(&mut download_buffer)
     133            0 :         .await?;
     134            0 :     let download_size = download_buffer.len() as u64;
     135              :     // it's unclear whether it is more performant to decompress into memory or not
     136              :     // TODO: decompressing into memory can be avoided
     137            0 :     let mut decoder = Decoder::new(download_buffer.as_slice())?;
     138            0 :     let mut decompress_buffer = Vec::new();
     139            0 :     decoder.read_to_end(&mut decompress_buffer)?;
     140            0 :     let mut archive = Archive::new(decompress_buffer.as_slice());
     141            0 :     let unzip_dest = pgbin
     142            0 :         .strip_suffix("/bin/postgres")
     143            0 :         .expect("bad pgbin")
     144            0 :         .to_string()
     145            0 :         + "/download_extensions";
     146            0 :     archive.unpack(&unzip_dest)?;
     147            0 :     info!("Download + unzip {:?} completed successfully", &ext_path);
     148              : 
     149            0 :     let sharedir_paths = (
     150            0 :         unzip_dest.to_string() + "/share/extension",
     151            0 :         Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
     152            0 :     );
     153            0 :     let libdir_paths = (
     154            0 :         unzip_dest.to_string() + "/lib",
     155            0 :         Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
     156            0 :     );
     157              :     // move contents of the libdir / sharedir in unzipped archive to the correct local paths
     158            0 :     for paths in [sharedir_paths, libdir_paths] {
     159            0 :         let (zip_dir, real_dir) = paths;
     160            0 :         info!("mv {zip_dir:?}/*  {real_dir:?}");
     161            0 :         for file in std::fs::read_dir(zip_dir)? {
     162            0 :             let old_file = file?.path();
     163            0 :             let new_file =
     164            0 :                 Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
     165            0 :             info!("moving {old_file:?} to {new_file:?}");
     166              : 
     167              :             // extension download failed: Directory not empty (os error 39)
     168            0 :             match std::fs::rename(old_file, new_file) {
     169            0 :                 Ok(()) => info!("move succeeded"),
     170            0 :                 Err(e) => {
     171            0 :                     warn!("move failed, probably because the extension already exists: {e}")
     172              :                 }
     173              :             }
     174              :         }
     175              :     }
     176            0 :     info!("done moving extension {ext_name}");
     177            0 :     Ok(download_size)
     178            0 : }
     179              : 
     180              : // Create extension control files from spec
     181            0 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
     182            0 :     let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
     183            0 :     for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
     184              :         // Check if extension is present in public or custom.
     185              :         // If not, then it is not allowed to be used by this compute.
     186            0 :         if let Some(public_extensions) = &remote_extensions.public_extensions {
     187            0 :             if !public_extensions.contains(ext_name) {
     188            0 :                 if let Some(custom_extensions) = &remote_extensions.custom_extensions {
     189            0 :                     if !custom_extensions.contains(ext_name) {
     190            0 :                         continue; // skip this extension, it is not allowed
     191            0 :                     }
     192            0 :                 }
     193            0 :             }
     194            0 :         }
     195              : 
     196            0 :         for (control_name, control_content) in &ext_data.control_data {
     197            0 :             let control_path = local_sharedir.join(control_name);
     198            0 :             if !control_path.exists() {
     199            0 :                 info!("writing file {:?}{:?}", control_path, control_content);
     200            0 :                 std::fs::write(control_path, control_content).unwrap();
     201              :             } else {
     202            0 :                 warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
     203              :             }
     204              :         }
     205              :     }
     206            0 : }
     207              : 
     208              : // This function initializes the necessary structs to use remote storage
     209            0 : pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
     210            0 :     #[derive(Debug, serde::Deserialize)]
     211              :     struct RemoteExtJson {
     212              :         bucket: String,
     213              :         region: String,
     214              :         endpoint: Option<String>,
     215              :         prefix: Option<String>,
     216              :     }
     217            0 :     let remote_ext_json = serde_json::from_str::<RemoteExtJson>(remote_ext_config)?;
     218              : 
     219            0 :     let config = S3Config {
     220            0 :         bucket_name: remote_ext_json.bucket,
     221            0 :         bucket_region: remote_ext_json.region,
     222            0 :         prefix_in_bucket: remote_ext_json.prefix,
     223            0 :         endpoint: remote_ext_json.endpoint,
     224            0 :         concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
     225            0 :         max_keys_per_list_response: None,
     226            0 :     };
     227            0 :     let config = RemoteStorageConfig {
     228            0 :         max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
     229            0 :         max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
     230            0 :         storage: RemoteStorageKind::AwsS3(config),
     231            0 :     };
     232            0 :     GenericRemoteStorage::from_config(&config)
     233            0 : }
        

Generated by: LCOV version 2.1-beta