Line data Source code
1 : // Download extension files from the extension store
2 : // and put them in the right place in the postgres directory (share / lib)
3 : /*
4 : The layout of the S3 bucket is as follows:
5 : 5615610098 // this is an extension build number
6 : ├── v14
7 : │ ├── extensions
8 : │ │ ├── anon.tar.zst
9 : │ │ └── embedding.tar.zst
10 : │ └── ext_index.json
11 : └── v15
12 : ├── extensions
13 : │ ├── anon.tar.zst
14 : │ └── embedding.tar.zst
15 : └── ext_index.json
16 : 5615261079
17 : ├── v14
18 : │ ├── extensions
19 : │ │ └── anon.tar.zst
20 : │ └── ext_index.json
21 : └── v15
22 : ├── extensions
23 : │ └── anon.tar.zst
24 : └── ext_index.json
25 : 5623261088
26 : ├── v14
27 : │ ├── extensions
28 : │ │ └── embedding.tar.zst
29 : │ └── ext_index.json
30 : └── v15
31 : ├── extensions
32 : │ └── embedding.tar.zst
33 : └── ext_index.json
34 :
35 : Note that build number cannot be part of prefix because we might need extensions
36 : from other build numbers.
37 :
38 : ext_index.json stores the control files and location of extension archives
39 : It also stores a list of public extensions and a library_index
40 :
41 : We don't need to duplicate extension.tar.zst files.
42 : We only need to upload a new one if it is updated.
43 : (Currently we still upload every time anyway; hopefully this will change
44 : at some point.)
45 :
46 : *access* is controlled by spec
47 :
48 : More specifically, here is an example ext_index.json
49 : {
50 : "public_extensions": [
51 : "anon",
52 : "pg_buffercache"
53 : ],
54 : "library_index": {
55 : "anon": "anon",
56 : "pg_buffercache": "pg_buffercache"
57 : },
58 : "extension_data": {
59 : "pg_buffercache": {
60 : "control_data": {
61 : "pg_buffercache.control": "# pg_buffercache extension \ncomment = 'examine the shared buffer cache' \ndefault_version = '1.3' \nmodule_pathname = '$libdir/pg_buffercache' \nrelocatable = true \ntrusted=true"
62 : },
63 : "archive_path": "5670669815/v14/extensions/pg_buffercache.tar.zst"
64 : },
65 : "anon": {
66 : "control_data": {
67 : "anon.control": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n"
68 : },
69 : "archive_path": "5670669815/v14/extensions/anon.tar.zst"
70 : }
71 : }
72 : }
73 : */
74 : use std::path::Path;
75 : use std::str;
76 :
77 : use anyhow::{Context, Result, bail};
78 : use bytes::Bytes;
79 : use compute_api::spec::RemoteExtSpec;
80 : use regex::Regex;
81 : use remote_storage::*;
82 : use reqwest::StatusCode;
83 : use tar::Archive;
84 : use tracing::info;
85 : use tracing::log::warn;
86 : use url::Url;
87 : use zstd::stream::read::Decoder;
88 :
89 : use crate::metrics::{REMOTE_EXT_REQUESTS_TOTAL, UNKNOWN_HTTP_STATUS};
90 :
/// Runs `pg_config <argument>` and returns its trimmed standard output.
///
/// `argument` is a `pg_config` flag such as `--version` or `--sharedir`.
/// The `pg_config` binary is looked up under the path that remains after
/// removing the trailing `postgres` from `pgbin`.
///
/// # Panics
///
/// Panics if `pgbin` does not end in `postgres`, if `pg_config` cannot be
/// executed, or if its standard output is not valid UTF-8.
fn get_pg_config(argument: &str, pgbin: &str) -> String {
    let bin_dir = pgbin.strip_suffix("postgres").expect("bad pgbin");
    let pg_config_path = format!("{bin_dir}/pg_config");

    let output = std::process::Command::new(pg_config_path)
        .arg(argument)
        .output()
        .expect("pg_config error");

    // Only stdout is inspected; the exit status is not checked here.
    let stdout = std::str::from_utf8(&output.stdout).expect("pg_config error");
    stdout.trim().to_string()
}
108 :
109 0 : pub fn get_pg_version(pgbin: &str) -> PostgresMajorVersion {
110 0 : // pg_config --version returns a (platform specific) human readable string
111 0 : // such as "PostgreSQL 15.4". We parse this to v14/v15/v16 etc.
112 0 : let human_version = get_pg_config("--version", pgbin);
113 0 : parse_pg_version(&human_version)
114 0 : }
115 :
116 0 : pub fn get_pg_version_string(pgbin: &str) -> String {
117 0 : match get_pg_version(pgbin) {
118 0 : PostgresMajorVersion::V14 => "v14",
119 0 : PostgresMajorVersion::V15 => "v15",
120 0 : PostgresMajorVersion::V16 => "v16",
121 0 : PostgresMajorVersion::V17 => "v17",
122 : }
123 0 : .to_owned()
124 0 : }
125 :
/// Major PostgreSQL versions recognised by this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostgresMajorVersion {
    /// PostgreSQL 14.x
    V14,
    /// PostgreSQL 15.x
    V15,
    /// PostgreSQL 16.x
    V16,
    /// PostgreSQL 17.x
    V17,
}
133 :
134 12 : fn parse_pg_version(human_version: &str) -> PostgresMajorVersion {
135 : use PostgresMajorVersion::*;
136 : // Normal releases have version strings like "PostgreSQL 15.4". But there
137 : // are also pre-release versions like "PostgreSQL 17devel" or "PostgreSQL
138 : // 16beta2" or "PostgreSQL 17rc1". And with the --with-extra-version
139 : // configure option, you can tack any string to the version number,
140 : // e.g. "PostgreSQL 15.4foobar".
141 12 : match Regex::new(r"^PostgreSQL (?<major>\d+).+")
142 12 : .unwrap()
143 12 : .captures(human_version)
144 : {
145 12 : Some(captures) if captures.len() == 2 => match &captures["major"] {
146 12 : "14" => return V14,
147 9 : "15" => return V15,
148 6 : "16" => return V16,
149 2 : "17" => return V17,
150 2 : _ => {}
151 : },
152 0 : _ => {}
153 : }
154 2 : panic!("Unsuported postgres version {human_version}");
155 10 : }
156 :
157 : // download the archive for a given extension,
158 : // unzip it, and place files in the appropriate locations (share/lib)
159 0 : pub async fn download_extension(
160 0 : ext_name: &str,
161 0 : ext_path: &RemotePath,
162 0 : remote_ext_base_url: &Url,
163 0 : pgbin: &str,
164 0 : ) -> Result<u64> {
165 0 : info!("Download extension {:?} from {:?}", ext_name, ext_path);
166 :
167 : // TODO add retry logic
168 0 : let download_buffer =
169 0 : match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
170 0 : Ok(buffer) => buffer,
171 0 : Err(error_message) => {
172 0 : return Err(anyhow::anyhow!(
173 0 : "error downloading extension {:?}: {:?}",
174 0 : ext_name,
175 0 : error_message
176 0 : ));
177 : }
178 : };
179 :
180 0 : let download_size = download_buffer.len() as u64;
181 0 : info!("Download size {:?}", download_size);
182 : // it's unclear whether it is more performant to decompress into memory or not
183 : // TODO: decompressing into memory can be avoided
184 0 : let decoder = Decoder::new(download_buffer.as_ref())?;
185 0 : let mut archive = Archive::new(decoder);
186 0 :
187 0 : let unzip_dest = pgbin
188 0 : .strip_suffix("/bin/postgres")
189 0 : .expect("bad pgbin")
190 0 : .to_string()
191 0 : + "/download_extensions";
192 0 : archive.unpack(&unzip_dest)?;
193 0 : info!("Download + unzip {:?} completed successfully", &ext_path);
194 :
195 0 : let sharedir_paths = (
196 0 : unzip_dest.to_string() + "/share/extension",
197 0 : Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
198 0 : );
199 0 : let libdir_paths = (
200 0 : unzip_dest.to_string() + "/lib",
201 0 : Path::new(&get_pg_config("--pkglibdir", pgbin)).to_path_buf(),
202 0 : );
203 : // move contents of the libdir / sharedir in unzipped archive to the correct local paths
204 0 : for paths in [sharedir_paths, libdir_paths] {
205 0 : let (zip_dir, real_dir) = paths;
206 :
207 0 : let dir = match std::fs::read_dir(&zip_dir) {
208 0 : Ok(dir) => dir,
209 0 : Err(e) => match e.kind() {
210 : // In the event of a SQL-only extension, there would be nothing
211 : // to move from the lib/ directory, so note that in the log and
212 : // move on.
213 : std::io::ErrorKind::NotFound => {
214 0 : info!("nothing to move from {}", zip_dir);
215 0 : continue;
216 : }
217 0 : _ => return Err(anyhow::anyhow!(e)),
218 : },
219 : };
220 :
221 0 : info!("mv {zip_dir:?}/* {real_dir:?}");
222 :
223 0 : for file in dir {
224 0 : let old_file = file?.path();
225 0 : let new_file =
226 0 : Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
227 0 : info!("moving {old_file:?} to {new_file:?}");
228 :
229 : // extension download failed: Directory not empty (os error 39)
230 0 : match std::fs::rename(old_file, new_file) {
231 0 : Ok(()) => info!("move succeeded"),
232 0 : Err(e) => {
233 0 : warn!("move failed, probably because the extension already exists: {e}")
234 : }
235 : }
236 : }
237 : }
238 0 : info!("done moving extension {ext_name}");
239 0 : Ok(download_size)
240 0 : }
241 :
242 : // Create extension control files from spec
243 0 : pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
244 0 : let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
245 0 : for (ext_name, ext_data) in remote_extensions.extension_data.iter() {
246 : // Check if extension is present in public or custom.
247 : // If not, then it is not allowed to be used by this compute.
248 0 : if let Some(public_extensions) = &remote_extensions.public_extensions {
249 0 : if !public_extensions.contains(ext_name) {
250 0 : if let Some(custom_extensions) = &remote_extensions.custom_extensions {
251 0 : if !custom_extensions.contains(ext_name) {
252 0 : continue; // skip this extension, it is not allowed
253 0 : }
254 0 : }
255 0 : }
256 0 : }
257 :
258 0 : for (control_name, control_content) in &ext_data.control_data {
259 0 : let control_path = local_sharedir.join(control_name);
260 0 : if !control_path.exists() {
261 0 : info!("writing file {:?}{:?}", control_path, control_content);
262 0 : std::fs::write(control_path, control_content).unwrap();
263 : } else {
264 0 : warn!(
265 0 : "control file {:?} exists both locally and remotely. ignoring the remote version.",
266 : control_path
267 : );
268 : }
269 : }
270 : }
271 0 : }
272 :
273 : // Do request to extension storage proxy, e.g.,
274 : // curl http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local/latest/v15/extensions/anon.tar.zst
275 : // using HTTP GET and return the response body as bytes.
276 0 : async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Result<Bytes> {
277 0 : let uri = remote_ext_base_url.join(ext_path).with_context(|| {
278 0 : format!(
279 0 : "failed to create the remote extension URI for {ext_path} using {remote_ext_base_url}"
280 0 : )
281 0 : })?;
282 0 : let filename = Path::new(ext_path)
283 0 : .file_name()
284 0 : .unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
285 0 : .to_str()
286 0 : .unwrap_or("unknown")
287 0 : .to_string();
288 0 :
289 0 : info!("Downloading extension file '{}' from uri {}", filename, uri);
290 :
291 0 : match do_extension_server_request(uri).await {
292 0 : Ok(resp) => {
293 0 : info!("Successfully downloaded remote extension data {}", ext_path);
294 0 : REMOTE_EXT_REQUESTS_TOTAL
295 0 : .with_label_values(&[&StatusCode::OK.to_string(), &filename])
296 0 : .inc();
297 0 : Ok(resp)
298 : }
299 0 : Err((msg, status)) => {
300 0 : REMOTE_EXT_REQUESTS_TOTAL
301 0 : .with_label_values(&[&status, &filename])
302 0 : .inc();
303 0 : bail!(msg);
304 : }
305 : }
306 0 : }
307 :
308 : // Do a single remote extensions server request.
309 : // Return result or (error message + stringified status code) in case of any failures.
310 0 : async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)> {
311 0 : let resp = reqwest::get(uri).await.map_err(|e| {
312 0 : (
313 0 : format!(
314 0 : "could not perform remote extensions server request: {:?}",
315 0 : e
316 0 : ),
317 0 : UNKNOWN_HTTP_STATUS.to_string(),
318 0 : )
319 0 : })?;
320 0 : let status = resp.status();
321 0 :
322 0 : match status {
323 0 : StatusCode::OK => match resp.bytes().await {
324 0 : Ok(resp) => Ok(resp),
325 0 : Err(e) => Err((
326 0 : format!("could not read remote extensions server response: {:?}", e),
327 0 : // It's fine to return and report error with status as 200 OK,
328 0 : // because we still failed to read the response.
329 0 : status.to_string(),
330 0 : )),
331 : },
332 0 : StatusCode::SERVICE_UNAVAILABLE => Err((
333 0 : "remote extensions server is temporarily unavailable".to_string(),
334 0 : status.to_string(),
335 0 : )),
336 0 : _ => Err((
337 0 : format!(
338 0 : "unexpected remote extensions server response status code: {}",
339 0 : status
340 0 : ),
341 0 : status.to_string(),
342 0 : )),
343 : }
344 0 : }
345 :
#[cfg(test)]
mod tests {
    use super::PostgresMajorVersion::*;
    use super::parse_pg_version;

    /// Release, distro-suffixed and pre-release version strings all map to
    /// the expected major version.
    #[test]
    fn test_parse_pg_version() {
        let cases = [
            ("PostgreSQL 15.4", V15),
            ("PostgreSQL 15.14", V15),
            ("PostgreSQL 15.4 (Ubuntu 15.4-0ubuntu0.23.04.1)", V15),
            ("PostgreSQL 14.15", V14),
            ("PostgreSQL 14.0", V14),
            ("PostgreSQL 14.9 (Debian 14.9-1.pgdg120+1", V14),
            ("PostgreSQL 16devel", V16),
            ("PostgreSQL 16beta1", V16),
            ("PostgreSQL 16rc2", V16),
            ("PostgreSQL 16extra", V16),
        ];

        for (human_version, expected) in cases {
            assert_eq!(parse_pg_version(human_version), expected);
        }
    }

    /// A well-formed string naming a major version outside the supported set
    /// must panic.
    #[test]
    #[should_panic]
    fn test_parse_pg_unsupported_version() {
        parse_pg_version("PostgreSQL 13.14");
    }

    /// A version string with nothing after the major number must panic.
    #[test]
    #[should_panic]
    fn test_parse_pg_incorrect_version_format() {
        parse_pg_version("PostgreSQL 14");
    }
}
|