use std::{ops::Bound, sync::Arc};

use anyhow::Context;
use bytes::Bytes;
use postgres_ffi::ControlFileData;
use remote_storage::{
    Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
    ListingObject, RemotePath,
};
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
use utils::lsn::Lsn;

use crate::{assert_u64_eq_usize::U64IsUsize, config::PageServerConf};

use super::{importbucket_format, index_part_format};

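/// Create a [`RemoteStorageWrapper`] for the given import bucket location.
///
/// A hypothetical call site (the caller-side names are assumptions, not from this module):
/// ```ignore
/// let storage = new(conf, &index_part.location, cancel.clone()).await?;
/// let spec = storage.get_spec().await?;
/// ```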
pub async fn new(
    conf: &'static PageServerConf,
    location: &index_part_format::Location,
    cancel: CancellationToken,
) -> Result<RemoteStorageWrapper, anyhow::Error> {
    // FIXME: we probably want some timeout, and we might be able to assume the max file
    // size on S3 is 1GiB (postgres segment size). But the problem is that the individual
    // downloaders don't know enough about concurrent downloads to make a guess on the
    // expected bandwidth and resulting best timeout.
    let timeout = std::time::Duration::from_secs(24 * 60 * 60);
    let location_storage = match location {
        #[cfg(feature = "testing")]
        index_part_format::Location::LocalFs { path } => {
            GenericRemoteStorage::LocalFs(remote_storage::LocalFs::new(path.clone(), timeout)?)
        }
        index_part_format::Location::AwsS3 {
            region,
            bucket,
            key,
        } => {
            // TODO: think about the security implications of letting the client specify the bucket & prefix.
            // It's the most flexible option right now, but possibly we want to move the bucket name into PS conf
            // and force the timeline_id into the prefix?
            GenericRemoteStorage::AwsS3(Arc::new(
                remote_storage::S3Bucket::new(
                    &remote_storage::S3Config {
                        bucket_name: bucket.clone(),
                        prefix_in_bucket: Some(key.clone()),
                        bucket_region: region.clone(),
                        endpoint: conf
                            .import_pgdata_aws_endpoint_url
                            .clone()
                            .map(|url| url.to_string()), // if this is None, remote_storage/aws-sdk-rust will infer the endpoint from the env
                        concurrency_limit: 100.try_into().unwrap(), // TODO: think about this
                        max_keys_per_list_response: Some(1000), // TODO: think about this
                        upload_storage_class: None, // irrelevant
                    },
                    timeout,
                )
                .await
                .context("setup s3 bucket")?,
            ))
        }
    };
    let storage_wrapper = RemoteStorageWrapper::new(location_storage, cancel);
    Ok(storage_wrapper)
}

/// Wrap [`remote_storage`] APIs to make it look a bit more like a filesystem API
/// such as [`tokio::fs`], which was used in the original implementation of the import code.
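///
/// For example, walking a "directory" reads like filesystem code
/// (a sketch with an assumed path, not taken from this module):
/// ```ignore
/// let storage = RemoteStorageWrapper::new(generic_storage, cancel);
/// for entry in storage.listdir(&RemotePath::from_string("pgdata")?).await? {
///     println!("{entry}");
/// }
/// ```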
#[derive(Clone)]
pub struct RemoteStorageWrapper {
    storage: GenericRemoteStorage,
    cancel: CancellationToken,
}

impl RemoteStorageWrapper {
    pub fn new(storage: GenericRemoteStorage, cancel: CancellationToken) -> Self {
        Self { storage, cancel }
    }

    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn listfilesindir(
        &self,
        path: &RemotePath,
    ) -> Result<Vec<(RemotePath, usize)>, DownloadError> {
        assert!(
            path.object_name().is_some(),
            "must specify dirname, without trailing slash"
        );
        let path = path.add_trailing_slash();

        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Listing { keys, prefixes: _ } = self
                    .storage
                    .list(
                        Some(&path),
                        remote_storage::ListingMode::WithDelimiter,
                        None,
                        &self.cancel,
                    )
                    .await?;
                let res = keys
                    .into_iter()
                    .map(|ListingObject { key, size, .. }| (key, size.into_usize()))
                    .collect();
                Ok(res)
            },
            &format!("listfilesindir {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(?res, "returning");
        res
    }

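    /// List the immediate children of `path`: both objects and common prefixes
    /// ("subdirectories"), without sizes. See [`Self::listfilesindir`] for a
    /// variant that also returns object sizes.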
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn listdir(&self, path: &RemotePath) -> Result<Vec<RemotePath>, DownloadError> {
        assert!(
            path.object_name().is_some(),
            "must specify dirname, without trailing slash"
        );
        let path = path.add_trailing_slash();

        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Listing { keys, prefixes } = self
                    .storage
                    .list(
                        Some(&path),
                        remote_storage::ListingMode::WithDelimiter,
                        None,
                        &self.cancel,
                    )
                    .await?;
                let res = keys
                    .into_iter()
                    .map(|ListingObject { key, .. }| key)
                    .chain(prefixes.into_iter())
                    .collect();
                Ok(res)
            },
            &format!("listdir {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(?res, "returning");
        res
    }

    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get(&self, path: &RemotePath) -> Result<Bytes, DownloadError> {
        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Download {
                    download_stream, ..
                } = self
                    .storage
                    .download(path, &DownloadOpts::default(), &self.cancel)
                    .await?;
                let mut reader = tokio_util::io::StreamReader::new(download_stream);

                // XXX optimize this, can we get the capacity hint from somewhere?
                let mut buf = Vec::new();
                tokio::io::copy_buf(&mut reader, &mut buf).await?;
                Ok(Bytes::from(buf))
            },
            &format!("download {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
        res
    }

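    /// Fetch and deserialize the import's `spec.json` from the bucket root.
    /// Returns `Ok(None)` if the object does not exist.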
    pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
        self.get_json(&RemotePath::from_string("spec.json").unwrap())
            .await
            .context("get spec")
    }

    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get_json<T: DeserializeOwned>(
        &self,
        path: &RemotePath,
    ) -> Result<Option<T>, DownloadError> {
        let buf = match self.get(path).await {
            Ok(buf) => buf,
            Err(DownloadError::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };
        let res = serde_json::from_slice(&buf)
            .context("deserialize")
            // TODO: own error type
            .map_err(DownloadError::Other)?;
        Ok(Some(res))
    }

    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn put_json<T>(&self, path: &RemotePath, value: &T) -> anyhow::Result<()>
    where
        T: serde::Serialize,
    {
        let buf = serde_json::to_vec(value)?;
        let bytes = Bytes::from(buf);
        utils::backoff::retry(
            || async {
                let size = bytes.len();
                let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
                self.storage
                    .upload_storage_object(bytes, size, path, &self.cancel)
                    .await
            },
            remote_storage::TimeoutOrCancel::caused_by_cancel,
            1,
            u32::MAX,
            &format!("put json {path}"),
            &self.cancel,
        )
        .await
        .expect("practically infinite retries")
    }

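    /// Download the byte range `[start_inclusive, end_exclusive)` of the object at `path`.
    ///
    /// A hypothetical use, reading the first 8 KiB page of a relation segment
    /// (the path is an assumption for illustration):
    /// ```ignore
    /// let page = storage
    ///     .get_range(&storage.pgdata().join("base/1/1259"), 0, 8192)
    ///     .await?;
    /// assert_eq!(page.len(), 8192);
    /// ```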
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get_range(
        &self,
        path: &RemotePath,
        start_inclusive: u64,
        end_exclusive: u64,
    ) -> Result<Vec<u8>, DownloadError> {
        let len = end_exclusive
            .checked_sub(start_inclusive)
            .unwrap()
            .into_usize();
        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Download {
                    download_stream, ..
                } = self
                    .storage
                    .download(
                        path,
                        &DownloadOpts {
                            kind: DownloadKind::Large,
                            etag: None,
                            byte_start: Bound::Included(start_inclusive),
                            byte_end: Bound::Excluded(end_exclusive),
                        },
                        &self.cancel,
                    )
                    .await?;
                let mut reader = tokio_util::io::StreamReader::new(download_stream);

                let mut buf = Vec::with_capacity(len);
                tokio::io::copy_buf(&mut reader, &mut buf).await?;
                Ok(buf)
            },
            &format!("download range len=0x{len:x} [0x{start_inclusive:x},0x{end_exclusive:x}) from {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
        res
    }

    pub fn pgdata(&self) -> RemotePath {
        RemotePath::from_string("pgdata").unwrap()
    }

    pub async fn get_control_file(&self) -> Result<ControlFile, anyhow::Error> {
        let control_file_path = self.pgdata().join("global/pg_control");
        info!("get control file from {control_file_path}");
        let control_file_buf = self.get(&control_file_path).await?;
        ControlFile::new(control_file_buf)
    }
}

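/// The parsed contents of `global/pg_control`, along with the raw bytes
/// they were decoded from.
///
/// Hypothetical usage sketch:
/// ```ignore
/// let control_file = storage.get_control_file().await?;
/// let base_lsn = control_file.base_lsn();
/// let pg_version = control_file.pg_version();
/// ```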
pub struct ControlFile {
    control_file_data: ControlFileData,
    control_file_buf: Bytes,
}

impl ControlFile {
    pub(crate) fn new(control_file_buf: Bytes) -> Result<Self, anyhow::Error> {
        // XXX ControlFileData is version-specific, we're always using v14 here. v17 had changes.
        let control_file_data = ControlFileData::decode(&control_file_buf)?;
        let control_file = ControlFile {
            control_file_data,
            control_file_buf,
        };
        control_file.try_pg_version()?; // so that we can offer infallible pg_version()
        Ok(control_file)
    }
    pub(crate) fn base_lsn(&self) -> Lsn {
        Lsn(self.control_file_data.checkPoint).align()
    }
    pub(crate) fn pg_version(&self) -> u32 {
        self.try_pg_version()
            .expect("new() checks that try_pg_version doesn't error")
    }
    pub(crate) fn control_file_data(&self) -> &ControlFileData {
        &self.control_file_data
    }
    pub(crate) fn control_file_buf(&self) -> &Bytes {
        &self.control_file_buf
    }
    fn try_pg_version(&self) -> anyhow::Result<u32> {
        Ok(match self.control_file_data.catalog_version_no {
            // these are from catversion.h
            202107181 => 14,
            202209061 => 15,
            202307071 => 16,
            202406281 => 17,
            catversion => {
                anyhow::bail!("unrecognized catalog version {catversion}")
            }
        })
    }
}