Line data Source code
1 : // Download extension files from the extension store
2 : // and put them in the right place in the postgres directory (share / lib)
3 : /*
4 : The layout of the S3 bucket is as follows:
5 : 5615610098 // this is an extension build number
6 : ├── v14
7 : │ ├── extensions
8 : │ │ ├── anon.tar.zst
9 : │ │ └── embedding.tar.zst
10 : │ └── ext_index.json
11 : └── v15
12 : ├── extensions
13 : │ ├── anon.tar.zst
14 : │ └── embedding.tar.zst
15 : └── ext_index.json
16 : 5615261079
17 : ├── v14
18 : │ ├── extensions
19 : │ │ └── anon.tar.zst
20 : │ └── ext_index.json
21 : └── v15
22 : ├── extensions
23 : │ └── anon.tar.zst
24 : └── ext_index.json
25 : 5623261088
26 : ├── v14
27 : │ ├── extensions
28 : │ │ └── embedding.tar.zst
29 : │ └── ext_index.json
30 : └── v15
31 : ├── extensions
32 : │ └── embedding.tar.zst
33 : └── ext_index.json
34 :
35 : Note that build number cannot be part of prefix because we might need extensions
36 : from other build numbers.
37 :
38 : ext_index.json stores the control files and location of extension archives
39 : It also stores a list of public extensions and a library_index
40 :
41 : We don't need to duplicate extension.tar.zst files.
42 : We only need to upload a new one if it is updated.
43 : (Although currently we just upload every time anyways, hopefully will change
44 : this sometime)
45 :
46 : *access* is controlled by spec
47 :
48 : More specifically, here is an example ext_index.json
49 : {
50 : "public_extensions": [
51 : "anon",
52 : "pg_buffercache"
53 : ],
54 : "library_index": {
55 : "anon": "anon",
56 : "pg_buffercache": "pg_buffercache"
57 : },
58 : "extension_data": {
59 : "pg_buffercache": {
60 : "control_data": {
61 : "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
62 : },
63 : "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
64 : },
65 : "anon": {
66 : "control_data": {
67 : "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
68 : },
69 : "archive_path": "5670669815/v14/extensions/anon.tar.zst"
70 : }
71 : }
72 : }
73 : */
74 : use anyhow::{self, Result};
75 : use anyhow::{bail, Context};
76 : use bytes::Bytes;
77 : use compute_api::spec::RemoteExtSpec;
78 : use regex::Regex;
79 : use remote_storage::*;
80 : use reqwest::StatusCode;
81 : use std::path::Path;
82 : use std::str;
83 : use tar::Archive;
84 : use tracing::info;
85 : use tracing::log::warn;
86 : use zstd::stream::read::Decoder;
87 :
/// Runs `pg_config <argument>` and returns its standard output, trimmed.
///
/// `argument` is a single `pg_config` flag such as `--version` or
/// `--sharedir`. The `pg_config` executable is located by removing the
/// trailing `postgres` from `pgbin` and appending `/pg_config`.
///
/// # Panics
///
/// * with "bad pgbin" if `pgbin` does not end in `postgres`;
/// * with "pg_config error" if the process cannot be run or if its standard
///   output is not valid UTF-8.
fn get_pg_config(argument: &str, pgbin: &str) -> String {
    let bin_dir = pgbin.strip_suffix("postgres").expect("bad pgbin");
    let pg_config_path = format!("{bin_dir}/pg_config");

    let output = std::process::Command::new(pg_config_path)
        .arg(argument)
        .output()
        .expect("pg_config error");

    let stdout = std::str::from_utf8(&output.stdout).expect("pg_config error");
    stdout.trim().to_owned()
}
105 :
106 575 : pub fn get_pg_version(pgbin: &str) -> String {
107 575 : // pg_config --version returns a (platform specific) human readable string
108 575 : // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
109 575 : let human_version = get_pg_config("--version", pgbin);
110 575 : return parse_pg_version(&human_version).to_string();
111 575 : }
112 :
113 599 : fn parse_pg_version(human_version: &str) -> &str {
114 599 : // Normal releases have version strings like "PostgreSQL 15.4". But there
115 599 : // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
116 599 : // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
117 599 : // configure option, you can tack any string to the version number,
118 599 : // e.g. "PostgreSQL 15.4foobar".
119 599 : match Regex::new(r"^PostgreSQL (?<major>\d+).+")
120 599 : .unwrap()
121 599 : .captures(human_version)
122 : {
123 599 : Some(captures) if captures.len() == 2 => match &captures["major"] {
124 599 : "14" => return "v14",
125 18 : "15" => return "v15",
126 12 : "16" => return "v16",
127 4 : _ => {}
128 : },
129 0 : _ => {}
130 : }
131 4 : panic!("Unsuported postgres version {human_version}");
132 595 : }
133 :
134 : // download the archive for a given extension,
135 : // unzip it, and place files in the appropriate locations (share/lib)
136 1 : pub async fn download_extension(
137 1 : ext_name: &str,
138 1 : ext_path: &RemotePath,
139 1 : ext_remote_storage: &str,
140 1 : pgbin: &str,
141 1 : ) -> Result<u64> {
142 1 : info!("Download extension {:?} from {:?}", ext_name, ext_path);
143 :
144 : // TODO add retry logic
145 1 : let download_buffer =
146 10 : match download_extension_tar(ext_remote_storage, &ext_path.to_string()).await {
147 1 : Ok(buffer) => buffer,
148 0 : Err(error_message) => {
149 0 : return Err(anyhow::anyhow!(
150 0 : "error downloading extension {:?}: {:?}",
151 0 : ext_name,
152 0 : error_message
153 0 : ));
154 : }
155 : };
156 :
157 1 : let download_size = download_buffer.len() as u64;
158 1 : info!("Download size {:?}", download_size);
159 : // it's unclear whether it is more performant to decompress into memory or not
160 : // TODO: decompressing into memory can be avoided
161 1 : let decoder = Decoder::new(download_buffer.as_ref())?;
162 1 : let mut archive = Archive::new(decoder);
163 1 :
164 1 : let unzip_dest = pgbin
165 1 : .strip_suffix("/bin/postgres")
166 1 : .expect("bad pgbin")
167 1 : .to_string()
168 1 : + "/download_extensions";
169 1 : archive.unpack(&unzip_dest)?;
170 1 : info!("Download + unzip {:?} completed successfully", &ext_path);
171 :
172 1 : let sharedir_paths = (
173 1 : unzip_dest.to_string() + "/share/extension",
174 1 : Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
175 1 : );
176 1 : let libdir_paths = (
177 1 : unzip_dest.to_string() + "/lib",
178 1 : Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
179 1 : );
180 : // move contents of the libdir / sharedir in unzipped archive to the correct local paths
181 2 : for paths in [sharedir_paths, libdir_paths] {
182 2 : let (zip_dir, real_dir) = paths;
183 2 : info!("mv {zip_dir:?}/* {real_dir:?}");
184 3 : for file in std::fs::read_dir(zip_dir)? {
185 3 : let old_file = file?.path();
186 3 : let new_file =
187 3 : Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
188 3 : info!("moving {old_file:?} to {new_file:?}");
189 :
190 : // extension download failed: Directory not empty (os error 39)
191 3 : match std::fs::rename(old_file, new_file) {
192 3 : Ok(()) => info!("move succeeded"),
193 0 : Err(e) => {
194 0 : warn!("move failed, probably because the extension already exists: {e}")
195 : }
196 : }
197 : }
198 : }
199 1 : info!("done moving extension {ext_name}");
200 1 : Ok(download_size)
201 1 : }
202 :
203 : // Create extension control files from spec
204 1 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
205 1 : let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
206 1 : for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
207 : // Check if extension is present in public or custom.
208 : // If not, then it is not allowed to be used by this compute.
209 1 : if let Some(public_extensions) = &remote_extensions.public_extensions {
210 0 : if !public_extensions.contains(ext_name) {
211 0 : if let Some(custom_extensions) = &remote_extensions.custom_extensions {
212 0 : if !custom_extensions.contains(ext_name) {
213 0 : continue; // skip this extension, it is not allowed
214 0 : }
215 0 : }
216 0 : }
217 1 : }
218 :
219 2 : for (control_name, control_content) in &ext_data.control_data {
220 1 : let control_path = local_sharedir.join(control_name);
221 1 : if !control_path.exists() {
222 1 : info!("writing file {:?}{:?}", control_path, control_content);
223 1 : std::fs::write(control_path, control_content).unwrap();
224 : } else {
225 0 : warn!("control file {:?} exists both locally and remotely. ignoring the remote version.", control_path);
226 : }
227 : }
228 : }
229 1 : }
230 :
231 : // Do request to extension storage proxy, i.e.
232 : // curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
233 : // using HHTP GET
234 : // and return the response body as bytes
235 : //
236 1 : async fn download_extension_tar(ext_remote_storage: &str, ext_path: &str) -> Result<Bytes> {
237 1 : let uri = format!("{}/{}", ext_remote_storage, ext_path);
238 :
239 1 : info!("Download extension {:?} from uri {:?}", ext_path, uri);
240 :
241 4 : let resp = reqwest::get(uri).await?;
242 :
243 1 : match resp.status() {
244 6 : StatusCode::OK => match resp.bytes().await {
245 1 : Ok(resp) => {
246 1 : info!("Download extension {:?} completed successfully", ext_path);
247 1 : Ok(resp)
248 : }
249 0 : Err(e) => bail!("could not deserialize remote extension response: {}", e),
250 : },
251 0 : StatusCode::SERVICE_UNAVAILABLE => bail!("remote extension is temporarily unavailable"),
252 0 : _ => bail!(
253 0 : "unexpected remote extension response status code: {}",
254 0 : resp.status()
255 0 : ),
256 : }
257 1 : }
258 :
#[cfg(test)]
mod tests {
    use super::parse_pg_version;

    /// Release, distribution-suffixed, pre-release and extra-version strings
    /// all map to their major-version tag.
    #[test]
    fn test_parse_pg_version() {
        let cases = [
            ("PostgreSQL 15.4", "v15"),
            ("PostgreSQL 15.14", "v15"),
            ("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)", "v15"),
            ("PostgreSQL 14.15", "v14"),
            ("PostgreSQL 14.0", "v14"),
            ("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1", "v14"),
            ("PostgreSQL 16devel", "v16"),
            ("PostgreSQL 16beta1", "v16"),
            ("PostgreSQL 16rc2", "v16"),
            ("PostgreSQL 16extra", "v16"),
        ];

        for (human_version, expected) in cases {
            assert_eq!(
                parse_pg_version(human_version),
                expected,
                "input: {human_version:?}"
            );
        }
    }

    /// A well-formed string with an unsupported major version must panic.
    #[test]
    #[should_panic]
    fn test_parse_pg_unsupported_version() {
        parse_pg_version("PostgreSQL 13.14");
    }

    /// A version string with nothing after the major number must panic.
    #[test]
    #[should_panic]
    fn test_parse_pg_incorrect_version_format() {
        parse_pg_version("PostgreSQL 14");
    }
}
|