use std::ops::Bound;
use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use postgres_ffi::ControlFileData;
use remote_storage::{
    Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
    ListingObject, RemotePath,
};
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
use utils::lsn::Lsn;

use super::index_part_format;
use crate::assert_u64_eq_usize::U64IsUsize;
use crate::config::PageServerConf;

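/// Create a [`RemoteStorageWrapper`] for the import source described by `location`.
///
/// A hypothetical usage sketch (the callers in the import flow supply the real
/// config, location, and cancellation token):
///
/// ```ignore
/// let storage = new(conf, &location, cancel.clone()).await?;
/// let control_file = storage.get_control_file().await?;
/// ```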
pub async fn new(
    conf: &'static PageServerConf,
    location: &index_part_format::Location,
    cancel: CancellationToken,
) -> Result<RemoteStorageWrapper, anyhow::Error> {
    // FIXME: we probably want some timeout, and we might be able to assume the max file
    // size on S3 is 1GiB (the Postgres segment size). But the problem is that the individual
    // downloaders don't know enough about concurrent downloads to make a guess on the
    // expected bandwidth and the resulting best timeout.
    let timeout = std::time::Duration::from_secs(24 * 60 * 60);
    let location_storage = match location {
        #[cfg(feature = "testing")]
        index_part_format::Location::LocalFs { path } => {
            GenericRemoteStorage::LocalFs(remote_storage::LocalFs::new(path.clone(), timeout)?)
        }
        index_part_format::Location::AwsS3 {
            region,
            bucket,
            key,
        } => {
            // TODO: think about the security implications of letting the client specify the bucket & prefix.
            // It's the most flexible right now, but possibly we want to move the bucket name into the
            // pageserver conf and force the timeline_id into the prefix?
            GenericRemoteStorage::AwsS3(Arc::new(
                remote_storage::S3Bucket::new(
                    &remote_storage::S3Config {
                        bucket_name: bucket.clone(),
                        prefix_in_bucket: Some(key.clone()),
                        bucket_region: region.clone(),
                        endpoint: conf
                            .import_pgdata_aws_endpoint_url
                            .clone()
                            .map(|url| url.to_string()), // if this is None, remote_storage/aws-sdk-rust infers the endpoint from the environment
                        concurrency_limit: 100.try_into().unwrap(), // TODO: think about this
                        max_keys_per_list_response: Some(1000),     // TODO: think about this
                        upload_storage_class: None,                 // irrelevant, we only download
                    },
                    timeout,
                )
                .await
                .context("setup s3 bucket")?,
            ))
        }
    };
    let storage_wrapper = RemoteStorageWrapper::new(location_storage, cancel);
    Ok(storage_wrapper)
}

/// Wraps the [`remote_storage`] APIs to make them look a bit more like a filesystem API
/// such as [`tokio::fs`], which was used in the original implementation of the import code.
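///
/// A hypothetical usage sketch (the directory path is illustrative):
///
/// ```ignore
/// let storage = RemoteStorageWrapper::new(generic_storage, cancel);
/// // (path, size in bytes) pairs for each file under pgdata/base/1
/// let files = storage.listfilesindir(&storage.pgdata().join("base/1")).await?;
/// ```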
#[derive(Clone)]
pub struct RemoteStorageWrapper {
    storage: GenericRemoteStorage,
    cancel: CancellationToken,
}

impl RemoteStorageWrapper {
    pub fn new(storage: GenericRemoteStorage, cancel: CancellationToken) -> Self {
        Self { storage, cancel }
    }

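    /// List the files directly under `path` (non-recursive), returning
    /// `(path, size in bytes)` pairs. `path` must be a dirname without a
    /// trailing slash.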
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn listfilesindir(
        &self,
        path: &RemotePath,
    ) -> Result<Vec<(RemotePath, usize)>, DownloadError> {
        assert!(
            path.object_name().is_some(),
            "must specify dirname, without trailing slash"
        );
        let path = path.add_trailing_slash();

        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Listing { keys, prefixes: _ } = self
                    .storage
                    .list(
                        Some(&path),
                        remote_storage::ListingMode::WithDelimiter,
                        None,
                        &self.cancel,
                    )
                    .await?;
                let res = keys
                    .into_iter()
                    .map(|ListingObject { key, size, .. }| (key, size.into_usize()))
                    .collect();
                Ok(res)
            },
            &format!("listfilesindir {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(?res, "returning");
        res
    }

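    /// List the entries (files and subdirectory prefixes) directly under `path`,
    /// non-recursive. `path` must be a dirname without a trailing slash.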
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn listdir(&self, path: &RemotePath) -> Result<Vec<RemotePath>, DownloadError> {
        assert!(
            path.object_name().is_some(),
            "must specify dirname, without trailing slash"
        );
        let path = path.add_trailing_slash();

        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Listing { keys, prefixes } = self
                    .storage
                    .list(
                        Some(&path),
                        remote_storage::ListingMode::WithDelimiter,
                        None,
                        &self.cancel,
                    )
                    .await?;
                let res = keys
                    .into_iter()
                    .map(|ListingObject { key, .. }| key)
                    .chain(prefixes.into_iter())
                    .collect();
                Ok(res)
            },
            &format!("listdir {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(?res, "returning");
        res
    }

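    /// Download the entire object at `path` into memory.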
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get(&self, path: &RemotePath) -> Result<Bytes, DownloadError> {
        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Download {
                    download_stream, ..
                } = self
                    .storage
                    .download(path, &DownloadOpts::default(), &self.cancel)
                    .await?;
                let mut reader = tokio_util::io::StreamReader::new(download_stream);

                // XXX optimize this, can we get the capacity hint from somewhere?
                let mut buf = Vec::new();
                tokio::io::copy_buf(&mut reader, &mut buf).await?;
                Ok(Bytes::from(buf))
            },
            &format!("download {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
        res
    }

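    /// Download the object at `path` and deserialize its contents as JSON.
    /// Returns `Ok(None)` if the object does not exist.
    ///
    /// A hypothetical sketch (`MySpec` is an illustrative type, not part of this module):
    ///
    /// ```ignore
    /// #[derive(serde::Deserialize)]
    /// struct MySpec { pg_version: u32 }
    /// let spec: Option<MySpec> = storage.get_json(&storage.pgdata().join("spec.json")).await?;
    /// ```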
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get_json<T: DeserializeOwned>(
        &self,
        path: &RemotePath,
    ) -> Result<Option<T>, DownloadError> {
        let buf = match self.get(path).await {
            Ok(buf) => buf,
            Err(DownloadError::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };
        let res = serde_json::from_slice(&buf)
            .context("deserialize")
            // TODO: own error type
            .map_err(DownloadError::Other)?;
        Ok(Some(res))
    }

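    /// Download the half-open byte range `[start_inclusive, end_exclusive)` of
    /// the object at `path`.
    ///
    /// A hypothetical sketch, reading the first 8 KiB of a segment file
    /// (the path is illustrative):
    ///
    /// ```ignore
    /// let page = storage.get_range(&storage.pgdata().join("base/1/1234"), 0, 8192).await?;
    /// assert_eq!(page.len(), 8192);
    /// ```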
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get_range(
        &self,
        path: &RemotePath,
        start_inclusive: u64,
        end_exclusive: u64,
    ) -> Result<Vec<u8>, DownloadError> {
        let len = end_exclusive
            .checked_sub(start_inclusive)
            .unwrap()
            .into_usize();
        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Download {
                    download_stream, ..
                } = self
                    .storage
                    .download(
                        path,
                        &DownloadOpts {
                            kind: DownloadKind::Large,
                            etag: None,
                            byte_start: Bound::Included(start_inclusive),
                            byte_end: Bound::Excluded(end_exclusive),
                            version_id: None,
                        },
                        &self.cancel,
                    )
                    .await?;
                let mut reader = tokio_util::io::StreamReader::new(download_stream);

                let mut buf = Vec::with_capacity(len);
                tokio::io::copy_buf(&mut reader, &mut buf).await?;
                Ok(buf)
            },
            &format!("download range len=0x{len:x} [0x{start_inclusive:x},0x{end_exclusive:x}) from {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
        res
    }

    pub fn pgdata(&self) -> RemotePath {
        RemotePath::from_string("pgdata").unwrap()
    }

    pub async fn get_control_file(&self) -> Result<ControlFile, anyhow::Error> {
        let control_file_path = self.pgdata().join("global/pg_control");
        info!("get control file from {control_file_path}");
        let control_file_buf = self.get(&control_file_path).await?;
        ControlFile::new(control_file_buf)
    }
}

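/// A parsed `pg_control` file: the decoded [`ControlFileData`] plus the raw
/// bytes it was decoded from, exposed via [`Self::control_file_buf`].
///
/// A hypothetical sketch of how the accessors are typically consumed:
///
/// ```ignore
/// let control_file = storage.get_control_file().await?;
/// let base_lsn = control_file.base_lsn();     // aligned checkpoint LSN
/// let pg_version = control_file.pg_version(); // e.g. 14..=17
/// ```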
pub struct ControlFile {
    control_file_data: ControlFileData,
    control_file_buf: Bytes,
}

impl ControlFile {
    pub(crate) fn new(control_file_buf: Bytes) -> Result<Self, anyhow::Error> {
        // XXX ControlFileData is version-specific; we always use the v14 layout here. v17 had changes.
        let control_file_data = ControlFileData::decode(&control_file_buf)?;
        let control_file = ControlFile {
            control_file_data,
            control_file_buf,
        };
        control_file.try_pg_version()?; // so that we can offer an infallible pg_version()
        Ok(control_file)
    }

    pub(crate) fn base_lsn(&self) -> Lsn {
        Lsn(self.control_file_data.checkPoint).align()
    }

    pub(crate) fn pg_version(&self) -> u32 {
        self.try_pg_version()
            .expect("new() checks that try_pg_version doesn't error")
    }

    pub(crate) fn control_file_data(&self) -> &ControlFileData {
        &self.control_file_data
    }

    pub(crate) fn control_file_buf(&self) -> &Bytes {
        &self.control_file_buf
    }

    fn try_pg_version(&self) -> anyhow::Result<u32> {
        Ok(match self.control_file_data.catalog_version_no {
            // these are from catversion.h
            202107181 => 14,
            202209061 => 15,
            202307071 => 16,
            202406281 => 17,
            catversion => {
                anyhow::bail!("unrecognized catalog version {catversion}")
            }
        })
    }
}