use std::ops::Bound;
use std::sync::Arc;

use anyhow::Context;
use bytes::Bytes;
use postgres_ffi::{ControlFileData, PgMajorVersion};
use remote_storage::{
    Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
    ListingObject, RemotePath, RemoteStorageConfig,
};
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
use utils::lsn::Lsn;

use super::index_part_format;
use crate::assert_u64_eq_usize::U64IsUsize;
use crate::config::PageServerConf;

19 :
20 0 : pub async fn new(
21 0 : conf: &'static PageServerConf,
22 0 : location: &index_part_format::Location,
23 0 : cancel: CancellationToken,
24 0 : ) -> Result<RemoteStorageWrapper, anyhow::Error> {
25 : // Downloads should be reasonably sized. We do ranged reads for relblock raw data
26 : // and full reads for SLRU segments which are bounded by Postgres.
27 0 : let timeout = RemoteStorageConfig::DEFAULT_TIMEOUT;
28 0 : let location_storage = match location {
29 : #[cfg(feature = "testing")]
30 0 : index_part_format::Location::LocalFs { path } => {
31 0 : GenericRemoteStorage::LocalFs(remote_storage::LocalFs::new(path.clone(), timeout)?)
32 : }
33 : index_part_format::Location::AwsS3 {
34 0 : region,
35 0 : bucket,
36 0 : key,
37 : } => {
38 : // TODO: think about security implications of letting the client specify the bucket & prefix.
39 : // It's the most flexible right now, but, possibly we want to move bucket name into PS conf
40 : // and force the timeline_id into the prefix?
41 0 : GenericRemoteStorage::AwsS3(Arc::new(
42 0 : remote_storage::S3Bucket::new(
43 : &remote_storage::S3Config {
44 0 : bucket_name: bucket.clone(),
45 0 : prefix_in_bucket: Some(key.clone()),
46 0 : bucket_region: region.clone(),
47 0 : endpoint: conf
48 0 : .import_pgdata_aws_endpoint_url
49 0 : .clone()
50 0 : .map(|url| url.to_string()), // by specifying None here, remote_storage/aws-sdk-rust will infer from env
51 : // This matches the default import job concurrency. This is managed
52 : // separately from the usual S3 client, but the concern here is bandwidth
53 : // usage.
54 0 : concurrency_limit: 128.try_into().unwrap(),
55 0 : max_keys_per_list_response: Some(1000),
56 0 : upload_storage_class: None, // irrelevant
57 : },
58 0 : timeout,
59 : )
60 0 : .await
61 0 : .context("setup s3 bucket")?,
62 : ))
63 : }
64 : };
65 0 : let storage_wrapper = RemoteStorageWrapper::new(location_storage, cancel);
66 0 : Ok(storage_wrapper)
67 0 : }

/// Wrap [`remote_storage`] APIs to make them look a bit more like a filesystem API
/// such as [`tokio::fs`], which was used in the original implementation of the import code.
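///
/// A minimal usage sketch; `location_storage` and `cancel` are assumed to be set up
/// by the caller (in this module they come from [`new`]):
///
/// ```ignore
/// let storage = RemoteStorageWrapper::new(location_storage, cancel);
/// let entries = storage.listdir(&storage.pgdata()).await?;
/// let control_file = storage.get_control_file().await?;
/// let base_lsn = control_file.base_lsn();
/// ```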
#[derive(Clone)]
pub struct RemoteStorageWrapper {
    storage: GenericRemoteStorage,
    cancel: CancellationToken,
}

impl RemoteStorageWrapper {
    pub fn new(storage: GenericRemoteStorage, cancel: CancellationToken) -> Self {
        Self { storage, cancel }
    }

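    /// List the objects directly under `path`, returning each key together with its
    /// size in bytes. `path` must name a directory and must not have a trailing slash.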
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn listfilesindir(
        &self,
        path: &RemotePath,
    ) -> Result<Vec<(RemotePath, usize)>, DownloadError> {
        assert!(
            path.object_name().is_some(),
            "must specify dirname, without trailing slash"
        );
        let path = path.add_trailing_slash();

        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Listing { keys, prefixes: _ } = self
                    .storage
                    .list(
                        Some(&path),
                        remote_storage::ListingMode::WithDelimiter,
                        None,
                        &self.cancel,
                    )
                    .await?;
                let res = keys
                    .into_iter()
                    .map(|ListingObject { key, size, .. }| (key, size.into_usize()))
                    .collect();
                Ok(res)
            },
            &format!("listfilesindir {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(?res, "returning");
        res
    }

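    /// List the entries directly under `path`: object keys as well as common
    /// "subdirectory" prefixes. `path` must name a directory and must not have a
    /// trailing slash.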
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn listdir(&self, path: &RemotePath) -> Result<Vec<RemotePath>, DownloadError> {
        assert!(
            path.object_name().is_some(),
            "must specify dirname, without trailing slash"
        );
        let path = path.add_trailing_slash();

        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Listing { keys, prefixes } = self
                    .storage
                    .list(
                        Some(&path),
                        remote_storage::ListingMode::WithDelimiter,
                        None,
                        &self.cancel,
                    )
                    .await?;
                let res = keys
                    .into_iter()
                    .map(|ListingObject { key, .. }| key)
                    .chain(prefixes.into_iter())
                    .collect();
                Ok(res)
            },
            &format!("listdir {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(?res, "returning");
        res
    }

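    /// Download the object at `path` in full, retrying until success or cancellation.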
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get(&self, path: &RemotePath) -> Result<Bytes, DownloadError> {
        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Download {
                    download_stream, ..
                } = self
                    .storage
                    .download(path, &DownloadOpts::default(), &self.cancel)
                    .await?;
                let mut reader = tokio_util::io::StreamReader::new(download_stream);

                // XXX optimize this, can we get the capacity hint from somewhere?
                let mut buf = Vec::new();
                tokio::io::copy_buf(&mut reader, &mut buf).await?;
                Ok(Bytes::from(buf))
            },
            &format!("download {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
        res
    }

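    /// Download the object at `path` and deserialize it from JSON.
    /// Returns `Ok(None)` if the object does not exist.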
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get_json<T: DeserializeOwned>(
        &self,
        path: &RemotePath,
    ) -> Result<Option<T>, DownloadError> {
        let buf = match self.get(path).await {
            Ok(buf) => buf,
            Err(DownloadError::NotFound) => return Ok(None),
            Err(err) => return Err(err),
        };
        let res = serde_json::from_slice(&buf)
            .context("deserialize")
            // TODO: own error type
            .map_err(DownloadError::Other)?;
        Ok(Some(res))
    }

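    /// Download the byte range `[start_inclusive, end_exclusive)` of the object at
    /// `path`. Panics if `end_exclusive` is less than `start_inclusive`.
    ///
    /// A sketch of a ranged read; the path and offsets are hypothetical (8192 being
    /// the default Postgres block size):
    ///
    /// ```ignore
    /// let path = storage.pgdata().join("base/1/1234");
    /// let first_page = storage.get_range(&path, 0, 8192).await?;
    /// ```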
    #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
    pub async fn get_range(
        &self,
        path: &RemotePath,
        start_inclusive: u64,
        end_exclusive: u64,
    ) -> Result<Vec<u8>, DownloadError> {
        let len = end_exclusive
            .checked_sub(start_inclusive)
            .unwrap()
            .into_usize();
        let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
            || async {
                let Download {
                    download_stream, ..
                } = self
                    .storage
                    .download(
                        path,
                        &DownloadOpts {
                            kind: DownloadKind::Large,
                            etag: None,
                            byte_start: Bound::Included(start_inclusive),
                            byte_end: Bound::Excluded(end_exclusive),
                            version_id: None,
                        },
                        &self.cancel,
                    )
                    .await?;
                let mut reader = tokio_util::io::StreamReader::new(download_stream);

                let mut buf = Vec::with_capacity(len);
                tokio::io::copy_buf(&mut reader, &mut buf).await?;
                Ok(buf)
            },
            &format!("download range len=0x{len:x} [0x{start_inclusive:x},0x{end_exclusive:x}) from {path:?}"),
            &self.cancel,
        )
        .await;
        debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
        res
    }

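    /// The remote path under which the imported `PGDATA` contents live.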
    pub fn pgdata(&self) -> RemotePath {
        RemotePath::from_string("pgdata").unwrap()
    }

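    /// Download and decode the `global/pg_control` file from the imported `PGDATA`.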
    pub async fn get_control_file(&self) -> Result<ControlFile, anyhow::Error> {
        let control_file_path = self.pgdata().join("global/pg_control");
        info!("get control file from {control_file_path}");
        let control_file_buf = self.get(&control_file_path).await?;
        ControlFile::new(control_file_buf)
    }
}

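/// A decoded `pg_control` file, alongside the raw bytes it was decoded from.
///
/// A minimal sketch, assuming `buf` holds the raw contents of `global/pg_control`:
///
/// ```ignore
/// let control_file = ControlFile::new(buf)?;
/// let base_lsn = control_file.base_lsn();
/// let pg_version = control_file.pg_version();
/// ```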
pub struct ControlFile {
    control_file_data: ControlFileData,
    control_file_buf: Bytes,
}

impl ControlFile {
    pub(crate) fn new(control_file_buf: Bytes) -> Result<Self, anyhow::Error> {
        // XXX ControlFileData is version-specific; we always decode with the v14 layout here,
        // but v17 made changes.
        let control_file_data = ControlFileData::decode(&control_file_buf)?;
        let control_file = ControlFile {
            control_file_data,
            control_file_buf,
        };
        control_file.try_pg_version()?; // so that we can offer an infallible pg_version()
        Ok(control_file)
    }
    pub(crate) fn base_lsn(&self) -> Lsn {
        Lsn(self.control_file_data.checkPoint).align()
    }
    pub(crate) fn pg_version(&self) -> PgMajorVersion {
        self.try_pg_version()
            .expect("new() checks that try_pg_version doesn't error")
    }
    pub(crate) fn control_file_data(&self) -> &ControlFileData {
        &self.control_file_data
    }
    pub(crate) fn control_file_buf(&self) -> &Bytes {
        &self.control_file_buf
    }

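    /// Map the control file's `catalog_version_no` to a Postgres major version,
    /// erroring on catalog versions we don't recognize.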
    fn try_pg_version(&self) -> anyhow::Result<PgMajorVersion> {
        Ok(match self.control_file_data.catalog_version_no {
            // these are from catversion.h
            202107181 => PgMajorVersion::PG14,
            202209061 => PgMajorVersion::PG15,
            202307071 => PgMajorVersion::PG16,
            202406281 => PgMajorVersion::PG17,
            catversion => {
                anyhow::bail!("unrecognized catalog version {catversion}")
            }
        })
    }
}