LCOV - code coverage report
Current view: top level - pageserver/src/tenant/timeline/import_pgdata - importbucket_client.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 0.0 % 170 0
Test Date: 2025-03-12 00:01:28 Functions: 0.0 % 39 0

            Line data    Source code
       1              : use std::ops::Bound;
       2              : use std::sync::Arc;
       3              : 
       4              : use anyhow::Context;
       5              : use bytes::Bytes;
       6              : use postgres_ffi::ControlFileData;
       7              : use remote_storage::{
       8              :     Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
       9              :     ListingObject, RemotePath,
      10              : };
      11              : use serde::de::DeserializeOwned;
      12              : use tokio_util::sync::CancellationToken;
      13              : use tracing::{debug, info, instrument};
      14              : use utils::lsn::Lsn;
      15              : 
      16              : use super::{importbucket_format, index_part_format};
      17              : use crate::assert_u64_eq_usize::U64IsUsize;
      18              : use crate::config::PageServerConf;
      19              : 
      20            0 : pub async fn new(
      21            0 :     conf: &'static PageServerConf,
      22            0 :     location: &index_part_format::Location,
      23            0 :     cancel: CancellationToken,
      24            0 : ) -> Result<RemoteStorageWrapper, anyhow::Error> {
      25            0 :     // FIXME: we probably want some timeout, and we might be able to assume the max file
      26            0 :     // size on S3 is 1GiB (postgres segment size). But the problem is that the individual
      27            0 :     // downloaders don't know enough about concurrent downloads to make a guess on the
      28            0 :     // expected bandwidth and resulting best timeout.
      29            0 :     let timeout = std::time::Duration::from_secs(24 * 60 * 60);
      30            0 :     let location_storage = match location {
      31              :         #[cfg(feature = "testing")]
      32            0 :         index_part_format::Location::LocalFs { path } => {
      33            0 :             GenericRemoteStorage::LocalFs(remote_storage::LocalFs::new(path.clone(), timeout)?)
      34              :         }
      35              :         index_part_format::Location::AwsS3 {
      36            0 :             region,
      37            0 :             bucket,
      38            0 :             key,
      39            0 :         } => {
      40            0 :             // TODO: think about security implications of letting the client specify the bucket & prefix.
      41            0 :             // It's the most flexible right now, but, possibly we want to move bucket name into PS conf
      42            0 :             // and force the timeline_id into the prefix?
      43            0 :             GenericRemoteStorage::AwsS3(Arc::new(
      44            0 :                 remote_storage::S3Bucket::new(
      45            0 :                     &remote_storage::S3Config {
      46            0 :                         bucket_name: bucket.clone(),
      47            0 :                         prefix_in_bucket: Some(key.clone()),
      48            0 :                         bucket_region: region.clone(),
      49            0 :                         endpoint: conf
      50            0 :                             .import_pgdata_aws_endpoint_url
      51            0 :                             .clone()
      52            0 :                             .map(|url| url.to_string()), //  by specifying None here, remote_storage/aws-sdk-rust will infer from env
      53            0 :                         concurrency_limit: 100.try_into().unwrap(), // TODO: think about this
      54            0 :                         max_keys_per_list_response: Some(1000),     // TODO: think about this
      55            0 :                         upload_storage_class: None,                 // irrelevant
      56            0 :                     },
      57            0 :                     timeout,
      58            0 :                 )
      59            0 :                 .await
      60            0 :                 .context("setup s3 bucket")?,
      61              :             ))
      62              :         }
      63              :     };
      64            0 :     let storage_wrapper = RemoteStorageWrapper::new(location_storage, cancel);
      65            0 :     Ok(storage_wrapper)
      66            0 : }
      67              : 
      68              : /// Wrap [`remote_storage`] APIs to make it look a bit more like a filesystem API
      69              : /// such as [`tokio::fs`], which was used in the original implementation of the import code.
      70              : #[derive(Clone)]
      71              : pub struct RemoteStorageWrapper {
      72              :     storage: GenericRemoteStorage,
      73              :     cancel: CancellationToken,
      74              : }
      75              : 
      76              : impl RemoteStorageWrapper {
      77            0 :     pub fn new(storage: GenericRemoteStorage, cancel: CancellationToken) -> Self {
      78            0 :         Self { storage, cancel }
      79            0 :     }
      80              : 
      81            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
      82              :     pub async fn listfilesindir(
      83              :         &self,
      84              :         path: &RemotePath,
      85              :     ) -> Result<Vec<(RemotePath, usize)>, DownloadError> {
      86              :         assert!(
      87              :             path.object_name().is_some(),
      88              :             "must specify dirname, without trailing slash"
      89              :         );
      90              :         let path = path.add_trailing_slash();
      91              : 
      92              :         let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
      93            0 :             || async {
      94            0 :                 let Listing { keys, prefixes: _ } = self
      95            0 :                     .storage
      96            0 :                     .list(
      97            0 :                         Some(&path),
      98            0 :                         remote_storage::ListingMode::WithDelimiter,
      99            0 :                         None,
     100            0 :                         &self.cancel,
     101            0 :                     )
     102            0 :                     .await?;
     103            0 :                 let res = keys
     104            0 :                     .into_iter()
     105            0 :                     .map(|ListingObject { key, size, .. }| (key, size.into_usize()))
     106            0 :                     .collect();
     107            0 :                 Ok(res)
     108            0 :             },
     109              :             &format!("listfilesindir {path:?}"),
     110              :             &self.cancel,
     111              :         )
     112              :         .await;
     113              :         debug!(?res, "returning");
     114              :         res
     115              :     }
     116              : 
     117            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
     118              :     pub async fn listdir(&self, path: &RemotePath) -> Result<Vec<RemotePath>, DownloadError> {
     119              :         assert!(
     120              :             path.object_name().is_some(),
     121              :             "must specify dirname, without trailing slash"
     122              :         );
     123              :         let path = path.add_trailing_slash();
     124              : 
     125              :         let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
     126            0 :             || async {
     127            0 :                 let Listing { keys, prefixes } = self
     128            0 :                     .storage
     129            0 :                     .list(
     130            0 :                         Some(&path),
     131            0 :                         remote_storage::ListingMode::WithDelimiter,
     132            0 :                         None,
     133            0 :                         &self.cancel,
     134            0 :                     )
     135            0 :                     .await?;
     136            0 :                 let res = keys
     137            0 :                     .into_iter()
     138            0 :                     .map(|ListingObject { key, .. }| key)
     139            0 :                     .chain(prefixes.into_iter())
     140            0 :                     .collect();
     141            0 :                 Ok(res)
     142            0 :             },
     143              :             &format!("listdir {path:?}"),
     144              :             &self.cancel,
     145              :         )
     146              :         .await;
     147              :         debug!(?res, "returning");
     148              :         res
     149              :     }
     150              : 
     151            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
     152              :     pub async fn get(&self, path: &RemotePath) -> Result<Bytes, DownloadError> {
     153              :         let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
     154            0 :             || async {
     155              :                 let Download {
     156            0 :                     download_stream, ..
     157            0 :                 } = self
     158            0 :                     .storage
     159            0 :                     .download(path, &DownloadOpts::default(), &self.cancel)
     160            0 :                     .await?;
     161            0 :                 let mut reader = tokio_util::io::StreamReader::new(download_stream);
     162            0 : 
     163            0 :                 // XXX optimize this, can we get the capacity hint from somewhere?
     164            0 :                 let mut buf = Vec::new();
     165            0 :                 tokio::io::copy_buf(&mut reader, &mut buf).await?;
     166            0 :                 Ok(Bytes::from(buf))
     167            0 :             },
     168              :             &format!("download {path:?}"),
     169              :             &self.cancel,
     170              :         )
     171              :         .await;
     172            0 :         debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
     173              :         res
     174              :     }
     175              : 
     176            0 :     pub async fn get_spec(&self) -> Result<Option<importbucket_format::Spec>, anyhow::Error> {
     177            0 :         self.get_json(&RemotePath::from_string("spec.json").unwrap())
     178            0 :             .await
     179            0 :             .context("get spec")
     180            0 :     }
     181              : 
     182            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
     183              :     pub async fn get_json<T: DeserializeOwned>(
     184              :         &self,
     185              :         path: &RemotePath,
     186              :     ) -> Result<Option<T>, DownloadError> {
     187              :         let buf = match self.get(path).await {
     188              :             Ok(buf) => buf,
     189              :             Err(DownloadError::NotFound) => return Ok(None),
     190              :             Err(err) => return Err(err),
     191              :         };
     192              :         let res = serde_json::from_slice(&buf)
     193              :             .context("serialize")
     194              :             // TODO: own error type
     195              :             .map_err(DownloadError::Other)?;
     196              :         Ok(Some(res))
     197              :     }
     198              : 
     199            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
     200              :     pub async fn put_json<T>(&self, path: &RemotePath, value: &T) -> anyhow::Result<()>
     201              :     where
     202              :         T: serde::Serialize,
     203              :     {
     204              :         let buf = serde_json::to_vec(value)?;
     205              :         let bytes = Bytes::from(buf);
     206              :         utils::backoff::retry(
     207            0 :             || async {
     208            0 :                 let size = bytes.len();
     209            0 :                 let bytes = futures::stream::once(futures::future::ready(Ok(bytes.clone())));
     210            0 :                 self.storage
     211            0 :                     .upload_storage_object(bytes, size, path, &self.cancel)
     212            0 :                     .await
     213            0 :             },
     214              :             remote_storage::TimeoutOrCancel::caused_by_cancel,
     215              :             1,
     216              :             u32::MAX,
     217              :             &format!("put json {path}"),
     218              :             &self.cancel,
     219              :         )
     220              :         .await
     221              :         .expect("practically infinite retries")
     222              :     }
     223              : 
     224            0 :     #[instrument(level = tracing::Level::DEBUG, skip_all, fields(%path))]
     225              :     pub async fn get_range(
     226              :         &self,
     227              :         path: &RemotePath,
     228              :         start_inclusive: u64,
     229              :         end_exclusive: u64,
     230              :     ) -> Result<Vec<u8>, DownloadError> {
     231              :         let len = end_exclusive
     232              :             .checked_sub(start_inclusive)
     233              :             .unwrap()
     234              :             .into_usize();
     235              :         let res = crate::tenant::remote_timeline_client::download::download_retry_forever(
     236            0 :             || async {
     237              :                 let Download {
     238            0 :                     download_stream, ..
     239            0 :                 } = self
     240            0 :                     .storage
     241            0 :                     .download(
     242            0 :                         path,
     243            0 :                         &DownloadOpts {
     244            0 :                             kind: DownloadKind::Large,
     245            0 :                             etag: None,
     246            0 :                             byte_start: Bound::Included(start_inclusive),
     247            0 :                             byte_end: Bound::Excluded(end_exclusive)
     248            0 :                         },
     249            0 :                         &self.cancel)
     250            0 :                     .await?;
     251            0 :                 let mut reader = tokio_util::io::StreamReader::new(download_stream);
     252            0 : 
     253            0 :                 let mut buf = Vec::with_capacity(len);
     254            0 :                 tokio::io::copy_buf(&mut reader, &mut buf).await?;
     255            0 :                 Ok(buf)
     256            0 :             },
     257              :             &format!("download range len=0x{len:x} [0x{start_inclusive:x},0x{end_exclusive:x}) from {path:?}"),
     258              :             &self.cancel,
     259              :         )
     260              :         .await;
     261            0 :         debug!(len = res.as_ref().ok().map(|buf| buf.len()), "done");
     262              :         res
     263              :     }
     264              : 
     265            0 :     pub fn pgdata(&self) -> RemotePath {
     266            0 :         RemotePath::from_string("pgdata").unwrap()
     267            0 :     }
     268              : 
     269            0 :     pub async fn get_control_file(&self) -> Result<ControlFile, anyhow::Error> {
     270            0 :         let control_file_path = self.pgdata().join("global/pg_control");
     271            0 :         info!("get control file from {control_file_path}");
     272            0 :         let control_file_buf = self.get(&control_file_path).await?;
     273            0 :         ControlFile::new(control_file_buf)
     274            0 :     }
     275              : }
     276              : 
     277              : pub struct ControlFile {
     278              :     control_file_data: ControlFileData,
     279              :     control_file_buf: Bytes,
     280              : }
     281              : 
     282              : impl ControlFile {
     283            0 :     pub(crate) fn new(control_file_buf: Bytes) -> Result<Self, anyhow::Error> {
     284              :         // XXX ControlFileData is version-specific, we're always using v14 here. v17 had changes.
     285            0 :         let control_file_data = ControlFileData::decode(&control_file_buf)?;
     286            0 :         let control_file = ControlFile {
     287            0 :             control_file_data,
     288            0 :             control_file_buf,
     289            0 :         };
     290            0 :         control_file.try_pg_version()?; // so that we can offer infallible pg_version()
     291            0 :         Ok(control_file)
     292            0 :     }
     293            0 :     pub(crate) fn base_lsn(&self) -> Lsn {
     294            0 :         Lsn(self.control_file_data.checkPoint).align()
     295            0 :     }
     296            0 :     pub(crate) fn pg_version(&self) -> u32 {
     297            0 :         self.try_pg_version()
     298            0 :             .expect("prepare() checks that try_pg_version doesn't error")
     299            0 :     }
     300            0 :     pub(crate) fn control_file_data(&self) -> &ControlFileData {
     301            0 :         &self.control_file_data
     302            0 :     }
     303            0 :     pub(crate) fn control_file_buf(&self) -> &Bytes {
     304            0 :         &self.control_file_buf
     305            0 :     }
     306            0 :     fn try_pg_version(&self) -> anyhow::Result<u32> {
     307            0 :         Ok(match self.control_file_data.catalog_version_no {
     308              :             // thesea are from catversion.h
     309            0 :             202107181 => 14,
     310            0 :             202209061 => 15,
     311            0 :             202307071 => 16,
     312            0 :             202406281 => 17,
     313            0 :             catversion => {
     314            0 :                 anyhow::bail!("unrecognized catalog version {catversion}")
     315              :             }
     316              :         })
     317            0 :     }
     318              : }
        

Generated by: LCOV version 2.1-beta