Line data Source code
1 : // Download extension files from the extension store
2 : // and put them in the right place in the postgres directory (share / lib)
3 : /*
4 : The layout of the S3 bucket is as follows:
5 : 5615610098 // this is an extension build number
6 : ├── v14
7 : │   ├── extensions
8 : │   │   ├── anon.tar.zst
9 : │   │   └── embedding.tar.zst
10 : │   └── ext_index.json
11 : └── v15
12 :     ├── extensions
13 :     │   ├── anon.tar.zst
14 :     │   └── embedding.tar.zst
15 :     └── ext_index.json
16 : 5615261079
17 : ├── v14
18 : │   ├── extensions
19 : │   │   └── anon.tar.zst
20 : │   └── ext_index.json
21 : └── v15
22 :     ├── extensions
23 :     │   └── anon.tar.zst
24 :     └── ext_index.json
25 : 5623261088
26 : ├── v14
27 : │   ├── extensions
28 : │   │   └── embedding.tar.zst
29 : │   └── ext_index.json
30 : └── v15
31 :     ├── extensions
32 :     │   └── embedding.tar.zst
33 :     └── ext_index.json
34 :
35 : Note that build number cannot be part of prefix because we might need extensions
36 : from other build numbers.
37 :
38 : ext_index.json stores the control files and the location of the extension archives.
39 : It also stores a list of public extensions and a library_index.
40 :
41 : We don't need to duplicate extension.tar.zst files.
42 : We only need to upload a new one if it is updated.
43 : (Although currently we just upload every time anyway; hopefully this will
44 : change at some point.)
45 :
46 : *access* is controlled by spec
47 :
48 : More specifically, here is an example ext_index.json
49 : {
50 : "public_extensions": [
51 : "anon",
52 : "pg_buffercache"
53 : ],
54 : "library_index": {
55 : "anon": "anon",
56 : "pg_buffercache": "pg_buffercache"
57 : },
58 : "extension_data": {
59 : "pg_buffercache": {
60 : "control_data": {
61 : "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
62 : },
63 : "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
64 : },
65 : "anon": {
66 : "control_data": {
67 : "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
68 : },
69 : "archive_path": "5670669815/v14/extensions/anon.tar.zst"
70 : }
71 : }
72 : }
73 : */
74 : use anyhow::Context;
75 : use anyhow::{self, Result};
76 : use compute_api::spec::RemoteExtSpec;
77 : use remote_storage::*;
78 : use serde_json;
79 : use std::io::Read;
80 : use std::num::{NonZeroU32, NonZeroUsize};
81 : use std::path::Path;
82 : use std::str;
83 : use tar::Archive;
84 : use tokio::io::AsyncReadExt;
85 : use tracing::info;
86 : use tracing::log::warn;
87 : use zstd::stream::read::Decoder;
88 :
// Run `pg_config [argument]` and return its standard output with surrounding
// whitespace trimmed.
//
// `argument` is a flag such as `--version` or `--sharedir`. `pgbin` is the
// path of the postgres binary; the `pg_config` executable is looked up next
// to it by replacing the trailing "postgres" path component.
//
// Panics if `pgbin` does not end in "postgres", if `pg_config` cannot be
// executed, or if its output is not valid UTF-8.
fn get_pg_config(argument: &str, pgbin: &str) -> String {
    let bin_dir = pgbin.strip_suffix("postgres").expect("bad pgbin");
    // `bin_dir` normally still ends in '/', so this yields ".../bin//pg_config";
    // the doubled separator is harmless and matches the path that was always used.
    let pg_config_path = format!("{bin_dir}/pg_config");

    let mut command = std::process::Command::new(pg_config_path);
    command.arg(argument);
    let finished = command.output().expect("pg_config error");

    let stdout_text = std::str::from_utf8(&finished.stdout).expect("pg_config error");
    stdout_text.trim().to_string()
}
106 :
107 663 : pub fn get_pg_version(pgbin: &str) -> String {
108 663 : // pg_config --version returns a (platform specific) human readable string
109 663 : // such as "PostgreSQL 15.4". We parse this to v14/v15
110 663 : let human_version = get_pg_config("--version", pgbin);
111 663 : if human_version.contains("15") {
112 0 : return "v15".to_string();
113 663 : } else if human_version.contains("14") {
114 663 : return "v14".to_string();
115 0 : }
116 0 : panic!("Unsuported postgres version {human_version}");
117 663 : }
118 :
119 : // download the archive for a given extension,
120 : // unzip it, and place files in the appropriate locations (share/lib)
121 0 : pub async fn download_extension(
122 0 : ext_name: &str,
123 0 : ext_path: &RemotePath,
124 0 : remote_storage: &GenericRemoteStorage,
125 0 : pgbin: &str,
126 0 : ) -> Result<u64> {
127 0 : info!("Download extension {:?} from {:?}", ext_name, ext_path);
128 0 : let mut download = remote_storage.download(ext_path).await?;
129 0 : let mut download_buffer = Vec::new();
130 0 : download
131 0 : .download_stream
132 0 : .read_to_end(&mut download_buffer)
133 0 : .await?;
134 0 : let download_size = download_buffer.len() as u64;
135 : // it's unclear whether it is more performant to decompress into memory or not
136 : // TODO: decompressing into memory can be avoided
137 0 : let mut decoder = Decoder::new(download_buffer.as_slice())?;
138 0 : let mut decompress_buffer = Vec::new();
139 0 : decoder.read_to_end(&mut decompress_buffer)?;
140 0 : let mut archive = Archive::new(decompress_buffer.as_slice());
141 0 : let unzip_dest = pgbin
142 0 : .strip_suffix("/bin/postgres")
143 0 : .expect("bad pgbin")
144 0 : .to_string()
145 0 : + "/download_extensions";
146 0 : archive.unpack(&unzip_dest)?;
147 0 : info!("Download + unzip {:?} completed successfully", &ext_path);
148 :
149 0 : let sharedir_paths = (
150 0 : unzip_dest.to_string() + "/share/extension",
151 0 : Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
152 0 : );
153 0 : let libdir_paths = (
154 0 : unzip_dest.to_string() + "/lib",
155 0 : Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
156 0 : );
157 : // move contents of the libdir / sharedir in unzipped archive to the correct local paths
158 0 : for paths in [sharedir_paths, libdir_paths] {
159 0 : let (zip_dir, real_dir) = paths;
160 0 : info!("mv {zip_dir:?}/* {real_dir:?}");
161 0 : for file in std::fs::read_dir(zip_dir)? {
162 0 : let old_file = file?.path();
163 0 : let new_file =
164 0 : Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
165 0 : info!("moving {old_file:?} to {new_file:?}");
166 :
167 : // extension download failed: Directory not empty (os error 39)
168 0 : match std::fs::rename(old_file, new_file) {
169 0 : Ok(()) => info!("move succeeded"),
170 0 : Err(e) => {
171 0 : warn!("move failed, probably because the extension already exists: {e}")
172 : }
173 : }
174 : }
175 : }
176 0 : info!("done moving extension {ext_name}");
177 0 : Ok(download_size)
178 0 : }
179 :
180 : // Create extension control files from spec
181 0 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
182 0 : let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
183 0 : for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
184 : // Check if extension is present in public or custom.
185 : // If not, then it is not allowed to be used by this compute.
186 0 : if let Some(public_extensions) = &remote_extensions.public_extensions {
187 0 : if !public_extensions.contains(ext_name) {
188 0 : if let Some(custom_extensions) = &remote_extensions.custom_extensions {
189 0 : if !custom_extensions.contains(ext_name) {
190 0 : continue; // skip this extension, it is not allowed
191 0 : }
192 0 : }
193 0 : }
194 0 : }
195 :
196 0 : for (control_name, control_content) in &ext_data.control_data {
197 0 : let control_path = local_sharedir.join(control_name);
198 0 : if !control_path.exists() {
199 0 : info!("writing file {:?}{:?}", control_path, control_content);
200 0 : std::fs::write(control_path, control_content).unwrap();
201 : } else {
202 0 : warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
203 : }
204 : }
205 : }
206 0 : }
207 :
208 : // This function initializes the necessary structs to use remote storage
209 0 : pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
210 0 : #[derive(Debug, serde::Deserialize)]
211 : struct RemoteExtJson {
212 : bucket: String,
213 : region: String,
214 : endpoint: Option<String>,
215 : prefix: Option<String>,
216 : }
217 0 : let remote_ext_json = serde_json::from_str::<RemoteExtJson>(remote_ext_config)?;
218 :
219 0 : let config = S3Config {
220 0 : bucket_name: remote_ext_json.bucket,
221 0 : bucket_region: remote_ext_json.region,
222 0 : prefix_in_bucket: remote_ext_json.prefix,
223 0 : endpoint: remote_ext_json.endpoint,
224 0 : concurrency_limit: NonZeroUsize::new(100).expect("100 != 0"),
225 0 : max_keys_per_list_response: None,
226 0 : };
227 0 : let config = RemoteStorageConfig {
228 0 : max_concurrent_syncs: NonZeroUsize::new(100).expect("100 != 0"),
229 0 : max_sync_errors: NonZeroU32::new(100).expect("100 != 0"),
230 0 : storage: RemoteStorageKind::AwsS3(config),
231 0 : };
232 0 : GenericRemoteStorage::from_config(&config)
233 0 : }
|