LCOV - differential code coverage report
Current view: top level - proxy/src/context - parquet.rs (source / functions)
Current:  cd44433dd675caa99df17a61b18949c8387e2242.info (2024-01-09 02:06:09)
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info (2024-01-08 15:34:46)

            Coverage   Total   Hit   UIC   UBC   GIC   CBC   EUB   ECB
Lines:        88.0 %     449   395          54          395
Functions:    56.9 %     130    74     1    55     1     73     1     1

           TLA  Line data    Source code
       1                 : use std::sync::Arc;
       2                 : 
       3                 : use anyhow::Context;
       4                 : use bytes::BytesMut;
       5                 : use futures::{Stream, StreamExt};
       6                 : use parquet::{
       7                 :     basic::Compression,
       8                 :     file::{
       9                 :         metadata::RowGroupMetaDataPtr,
      10                 :         properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
      11                 :         writer::SerializedFileWriter,
      12                 :     },
      13                 :     record::RecordWriter,
      14                 : };
      15                 : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig};
      16                 : use tokio::{sync::mpsc, time};
      17                 : use tokio_util::sync::CancellationToken;
      18                 : use tracing::{debug, info, Span};
      19                 : use utils::backoff;
      20                 : 
      21                 : use super::{RequestMonitoring, LOG_CHAN};
      22                 : 
      23 CBC          25 : #[derive(clap::Args, Clone, Debug)]
      24                 : pub struct ParquetUploadArgs {
      25                 :     /// Storage location to upload the parquet files to.
      26                 :     /// Encoded as TOML (same format as pageservers), e.g.
      27                 :     /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
      28                 :     #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
      29 UBC           0 :     parquet_upload_remote_storage: OptRemoteStorageConfig,
      30                 : 
      31                 :     /// How many rows to include in a row group
      32 CBC          25 :     #[clap(long, default_value_t = 8192)]
      33 UBC           0 :     parquet_upload_row_group_size: usize,
      34                 : 
      35                 :     /// How large each column page should be in bytes
      36 CBC          25 :     #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
      37 UBC           0 :     parquet_upload_page_size: usize,
      38                 : 
      39                 :     /// How large the total parquet file should be in bytes
      40 CBC          25 :     #[clap(long, default_value_t = 100_000_000)]
      41 UBC           0 :     parquet_upload_size: i64,
      42                 : 
      43                 :     /// How long to wait before forcing a file upload
      44                 :     #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
      45               0 :     parquet_upload_maximum_duration: tokio::time::Duration,
      46                 : 
      47                 :     /// What level of compression to use
      48 CBC          25 :     #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
      49 UBC           0 :     parquet_upload_compression: Compression,
      50                 : }
      51                 : 
      52                 : /// Hack to stop clap from being too smart. Clap special-cases fields spelled literally as `Option<T>` and then expects the
      53                 : /// value parser to return `T` rather than `Option<T>`; without this alias, that mismatch surfaces as a runtime type error.
      54                 : type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
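                         : // A hypothetical contrast of the two field spellings (not code from this module),
                         : // to make the hack above concrete:
                         : //
                         : //   parquet_upload_remote_storage: Option<RemoteStorageConfig>, // clap strips the `Option`
                         : //       // and expects the parser to return `RemoteStorageConfig` -> runtime type error
                         : //   parquet_upload_remote_storage: OptRemoteStorageConfig,      // opaque to clap -> works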
      55                 : 
      56 CBC          50 : fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
      57              50 :     RemoteStorageConfig::from_toml(&s.parse()?)
      58              50 : }
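                         : // A minimal sketch of how this parser behaves, inferred from the "{}" default and the
                         : // doc-comment example above (the bucket values are illustrative):
                         : //
                         : //   let none = remote_storage_from_toml("{}")?;  // the "{}" default parses to Ok(None)
                         : //   let some = remote_storage_from_toml(
                         : //       "{bucket_name='the-bucket',bucket_region='us-east-1'}",
                         : //   )?;                                           // Ok(Some(RemoteStorageConfig { .. }))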
      59                 : 
      60                 : // Occasional network issues and the like can cause remote operations to fail, and
      61                 : // that's expected. If an upload fails, we log it at info level and retry.
      62                 : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
      63                 : // level instead, as repeated failures can indicate a more serious problem. If it
      64                 : // fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
      65                 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
      66                 : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
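                         : // A sketch of the policy these constants encode; the real loop lives in
                         : // `utils::backoff::retry` (see `upload_parquet` below), and the attempt numbering and
                         : // `try_upload` helper here are illustrative only:
                         : //
                         : //   for attempt in 0..=FAILED_UPLOAD_MAX_RETRIES {
                         : //       match try_upload().await {
                         : //           Ok(()) => return Ok(()),
                         : //           Err(e) if attempt < FAILED_UPLOAD_WARN_THRESHOLD => info!("upload failed: {e}"),
                         : //           Err(e) if attempt < FAILED_UPLOAD_MAX_RETRIES => warn!("upload failed: {e}"),
                         : //           Err(e) => return Err(e),
                         : //       }
                         : //   }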
      67                 : 
      68                 : // the parquet crate leaves a lot to be desired...
      69                 : // what follows is an attempt to write parquet files with minimal allocs.
      70                 : // complication: parquet is a columnar format, while we want to write the data as rows.
      71                 : // design (implemented in `worker_inner` below):
      72                 : // * we batch up to `rows_per_group` rows, then flush them into a 'row group'
      73                 : // * after each row group write, we check the length of the file and upload to S3 if it is large enough
      74                 : 
      75         2508005 : #[derive(parquet_derive::ParquetRecordWriter)]
      76                 : struct RequestData {
      77                 :     region: &'static str,
      78                 :     protocol: &'static str,
      79                 :     /// Must be UTC. The derive macro doesn't support timezone-aware timestamps
      80                 :     timestamp: chrono::NaiveDateTime,
      81                 :     session_id: uuid::Uuid,
      82                 :     peer_addr: String,
      83                 :     username: Option<String>,
      84                 :     application_name: Option<String>,
      85                 :     endpoint_id: Option<String>,
      86                 :     project: Option<String>,
      87                 :     branch: Option<String>,
      88                 :     error: Option<&'static str>,
      89                 : }
      90                 : 
      91                 : impl From<RequestMonitoring> for RequestData {
      92 UBC           0 :     fn from(value: RequestMonitoring) -> Self {
      93               0 :         Self {
      94               0 :             session_id: value.session_id,
      95               0 :             peer_addr: value.peer_addr.to_string(),
      96               0 :             timestamp: value.first_packet.naive_utc(),
      97               0 :             username: value.user.as_deref().map(String::from),
      98               0 :             application_name: value.application.as_deref().map(String::from),
      99               0 :             endpoint_id: value.endpoint_id.as_deref().map(String::from),
     100               0 :             project: value.project.as_deref().map(String::from),
     101               0 :             branch: value.branch.as_deref().map(String::from),
     102               0 :             protocol: value.protocol,
     103               0 :             region: value.region,
     104               0 :             error: value.error_kind.as_ref().map(|e| e.to_str()),
     105               0 :         }
     106               0 :     }
     107                 : }
     108                 : 
     109                 : /// Parquet request context worker
     110                 : ///
     111                 : /// It listens on a channel for all completed requests, extracts the data and writes it into a parquet file,
     112                 : /// then uploads each completed batch to S3
     113 CBC          22 : pub async fn worker(
     114              22 :     cancellation_token: CancellationToken,
     115              22 :     config: ParquetUploadArgs,
     116              22 : ) -> anyhow::Result<()> {
     117              22 :     let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
     118              22 :         tracing::warn!("parquet request upload: no s3 bucket configured");
     119              22 :         return Ok(());
     120                 :     };
     121                 : 
     122 UBC           0 :     let (tx, mut rx) = mpsc::unbounded_channel();
     123               0 :     LOG_CHAN.set(tx.downgrade()).unwrap();
     124               0 : 
     125               0 :     // set up a row stream that will close on cancellation
     126               0 :     tokio::spawn(async move {
     127               0 :         cancellation_token.cancelled().await;
     128                 :         // dropping this sender will cause the channel to close only once
     129                 :         // all the remaining inflight requests have been completed.
     130               0 :         drop(tx);
     131               0 :     });
     132               0 :     let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
     133               0 :     let rx = rx.map(RequestData::from);
     134                 : 
     135               0 :     let storage =
     136               0 :         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
     137                 : 
     138               0 :     let properties = WriterProperties::builder()
     139               0 :         .set_data_page_size_limit(config.parquet_upload_page_size)
     140               0 :         .set_compression(config.parquet_upload_compression);
     141               0 : 
     142               0 :     let parquet_config = ParquetConfig {
     143               0 :         propeties: Arc::new(properties.build()),
     144               0 :         rows_per_group: config.parquet_upload_row_group_size,
     145               0 :         file_size: config.parquet_upload_size,
     146               0 :         max_duration: config.parquet_upload_maximum_duration,
     147               0 : 
     148               0 :         #[cfg(any(test, feature = "testing"))]
     149               0 :         test_remote_failures: 0,
     150               0 :     };
     151               0 : 
     152               0 :     worker_inner(storage, rx, parquet_config).await
     153 CBC          22 : }
     154                 : 
     155                 : struct ParquetConfig {
     156                 :     propeties: WriterPropertiesPtr,
     157                 :     rows_per_group: usize,
     158                 :     file_size: i64,
     159                 : 
     160                 :     max_duration: tokio::time::Duration,
     161                 : 
     162                 :     #[cfg(any(test, feature = "testing"))]
     163                 :     test_remote_failures: u64,
     164                 : }
     165                 : 
     166               5 : async fn worker_inner(
     167               5 :     storage: GenericRemoteStorage,
     168               5 :     rx: impl Stream<Item = RequestData>,
     169               5 :     config: ParquetConfig,
     170               5 : ) -> anyhow::Result<()> {
     171                 :     #[cfg(any(test, feature = "testing"))]
     172               5 :     let storage = if config.test_remote_failures > 0 {
     173               2 :         GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
     174                 :     } else {
     175               3 :         storage
     176                 :     };
     177                 : 
     178               5 :     let mut rx = std::pin::pin!(rx);
     179               5 : 
     180               5 :     let mut rows = Vec::with_capacity(config.rows_per_group);
     181                 : 
     182               5 :     let schema = rows.as_slice().schema()?;
     183               5 :     let file = BytesWriter::default();
     184               5 :     let mut w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
     185                 : 
     186               5 :     let mut last_upload = time::Instant::now();
     187               5 : 
     188               5 :     let mut len = 0;
     189          209005 :     while let Some(row) = rx.next().await {
     190          209000 :         rows.push(row);
     191          209000 :         let force = last_upload.elapsed() > config.max_duration;
     192          209000 :         if rows.len() == config.rows_per_group || force {
     193             105 :             let rg_meta;
     194             105 :             (rows, w, rg_meta) = flush_rows(rows, w).await?;
     195             105 :             len += rg_meta.compressed_size();
     196          208895 :         }
     197          209000 :         if len > config.file_size || force {
     198              26 :             last_upload = time::Instant::now();
     199             104 :             let file = upload_parquet(w, len, &storage).await?;
     200              26 :             w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
     201              26 :             len = 0;
     202          208974 :         }
     203                 :     }
     204                 : 
     205               5 :     if !rows.is_empty() {
     206               1 :         let rg_meta;
     207               1 :         (_, w, rg_meta) = flush_rows(rows, w).await?;
     208               1 :         len += rg_meta.compressed_size();
     209               4 :     }
     210                 : 
     211               5 :     if !w.flushed_row_groups().is_empty() {
     212              20 :         let _: BytesWriter = upload_parquet(w, len, &storage).await?;
     213 UBC           0 :     }
     214                 : 
     215 CBC           5 :     Ok(())
     216               5 : }
     217                 : 
     218             106 : async fn flush_rows(
     219             106 :     rows: Vec<RequestData>,
     220             106 :     mut w: SerializedFileWriter<BytesWriter>,
     221             106 : ) -> anyhow::Result<(
     222             106 :     Vec<RequestData>,
     223             106 :     SerializedFileWriter<BytesWriter>,
     224             106 :     RowGroupMetaDataPtr,
     225             106 : )> {
     226             106 :     let span = Span::current();
     227             106 :     let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
     228             106 :         let _enter = span.enter();
     229                 : 
     230             106 :         let mut rg = w.next_row_group()?;
     231             106 :         rows.as_slice().write_to_row_group(&mut rg)?;
     232             106 :         let rg_meta = rg.close()?;
     233                 : 
     234             106 :         let size = rg_meta.compressed_size();
     235             106 :         let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
     236             106 : 
     237             106 :         debug!(size, compression, "flushed row group to parquet file");
     238                 : 
     239             106 :         Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
     240             106 :     })
     241             106 :     .await
     242             106 :     .unwrap()?;
     243                 : 
     244             106 :     rows.clear();
     245             106 :     Ok((rows, w, rg_meta))
     246             106 : }
     247                 : 
     248              31 : async fn upload_parquet(
     249              31 :     w: SerializedFileWriter<BytesWriter>,
     250              31 :     len: i64,
     251              31 :     storage: &GenericRemoteStorage,
     252              31 : ) -> anyhow::Result<BytesWriter> {
     253              31 :     let len_uncompressed = w
     254              31 :         .flushed_row_groups()
     255              31 :         .iter()
     256             106 :         .map(|rg| rg.total_byte_size())
     257              31 :         .sum::<i64>();
     258                 : 
     259                 :     // I don't know how compute-intensive this is, though it probably isn't much... better safe than sorry.
     260                 :     // The `finish` method is only available on the fork: https://github.com/apache/arrow-rs/issues/5253
     261              31 :     let (mut file, metadata) = tokio::task::spawn_blocking(move || w.finish())
     262              31 :         .await
     263              31 :         .unwrap()?;
     264                 : 
     265              31 :     let data = file.buf.split().freeze();
     266              31 : 
     267              31 :     let compression = len as f64 / len_uncompressed as f64;
     268              31 :     let size = data.len();
     269              31 :     let id = uuid::Uuid::now_v7();
     270                 : 
     271 UBC           0 :     info!(
     272               0 :         %id,
     273               0 :         rows = metadata.num_rows,
     274               0 :         size, compression, "uploading request parquet file"
     275               0 :     );
     276                 : 
     277 CBC          31 :     let path = RemotePath::from_string(&format!("requests_{id}.parquet"))?;
     278              31 :     backoff::retry(
     279              43 :         || async {
     280              43 :             let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
     281              93 :             storage.upload(stream, data.len(), &path, None).await
     282              43 :         },
     283              31 :         |_e| false,
     284              31 :         FAILED_UPLOAD_WARN_THRESHOLD,
     285              31 :         FAILED_UPLOAD_MAX_RETRIES,
     286              31 :         "request_data_upload",
     287              31 :         // we don't want cancellation to interrupt here, so we make a dummy cancel token
     288              31 :         backoff::Cancel::new(CancellationToken::new(), || anyhow::anyhow!("Cancelled")),
     289              31 :     )
     290              93 :     .await
     291              31 :     .context("request_data_upload")?;
     292                 : 
     293              31 :     Ok(file)
     294              31 : }
     295                 : 
     296                 : // why doesn't BytesMut impl io::Write?
     297               5 : #[derive(Default)]
     298                 : struct BytesWriter {
     299                 :     buf: BytesMut,
     300                 : }
     301                 : 
     302                 : impl std::io::Write for BytesWriter {
     303          171877 :     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
     304          171877 :         self.buf.extend_from_slice(buf);
     305          171877 :         Ok(buf.len())
     306          171877 :     }
     307                 : 
     308            1166 :     fn flush(&mut self) -> std::io::Result<()> {
     309            1166 :         Ok(())
     310            1166 :     }
     311                 : }
     312                 : 
     313                 : #[cfg(test)]
     314                 : mod tests {
     315                 :     use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
     316                 : 
     317                 :     use camino::Utf8Path;
     318                 :     use clap::Parser;
     319                 :     use futures::{Stream, StreamExt};
     320                 :     use itertools::Itertools;
     321                 :     use parquet::{
     322                 :         basic::{Compression, ZstdLevel},
     323                 :         file::{
     324                 :             properties::{WriterProperties, DEFAULT_PAGE_SIZE},
     325                 :             reader::FileReader,
     326                 :             serialized_reader::SerializedFileReader,
     327                 :         },
     328                 :     };
     329                 :     use rand::{rngs::StdRng, Rng, SeedableRng};
     330                 :     use remote_storage::{
     331                 :         GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
     332                 :         DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
     333                 :     };
     334                 :     use tokio::{sync::mpsc, time};
     335                 : 
     336                 :     use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
     337                 : 
     338               2 :     #[derive(Parser)]
     339                 :     struct ProxyCliArgs {
     340                 :         #[clap(flatten)]
     341                 :         parquet_upload: ParquetUploadArgs,
     342                 :     }
     343                 : 
     344               1 :     #[test]
     345               1 :     fn default_parser() {
     346               1 :         let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
     347               1 :         assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
     348               1 :         assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
     349               1 :         assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
     350               1 :         assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
     351               1 :         assert_eq!(
     352               1 :             parquet_upload.parquet_upload_maximum_duration,
     353               1 :             time::Duration::from_secs(20 * 60)
     354               1 :         );
     355               1 :         assert_eq!(
     356               1 :             parquet_upload.parquet_upload_compression,
     357               1 :             Compression::UNCOMPRESSED
     358               1 :         );
     359               1 :     }
     360                 : 
     361               1 :     #[test]
     362               1 :     fn full_parser() {
     363               1 :         let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
     364               1 :             "proxy",
     365               1 :             "--parquet-upload-remote-storage",
     366               1 :             "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
     367               1 :             "--parquet-upload-row-group-size",
     368               1 :             "100",
     369               1 :             "--parquet-upload-page-size",
     370               1 :             "10000",
     371               1 :             "--parquet-upload-size",
     372               1 :             "10000000",
     373               1 :             "--parquet-upload-maximum-duration",
     374               1 :             "10m",
     375               1 :             "--parquet-upload-compression",
     376               1 :             "zstd(5)",
     377               1 :         ]);
     378               1 :         assert_eq!(
     379               1 :             parquet_upload.parquet_upload_remote_storage,
     380               1 :             Some(RemoteStorageConfig {
     381               1 :                 storage: RemoteStorageKind::AwsS3(S3Config {
     382               1 :                     bucket_name: "default".into(),
     383               1 :                     bucket_region: "us-east-1".into(),
     384               1 :                     prefix_in_bucket: Some("proxy/".into()),
     385               1 :                     endpoint: Some("http://minio:9000".into()),
     386               1 :                     concurrency_limit: NonZeroUsize::new(
     387               1 :                         DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
     388               1 :                     )
     389               1 :                     .unwrap(),
     390               1 :                     max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
     391               1 :                 })
     392               1 :             })
     393               1 :         );
     394               1 :         assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
     395               1 :         assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
     396               1 :         assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
     397               1 :         assert_eq!(
     398               1 :             parquet_upload.parquet_upload_maximum_duration,
     399               1 :             time::Duration::from_secs(10 * 60)
     400               1 :         );
     401               1 :         assert_eq!(
     402               1 :             parquet_upload.parquet_upload_compression,
     403               1 :             Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
     404               1 :         );
     405               1 :     }
     406                 : 
     407          209000 :     fn generate_request_data(rng: &mut impl Rng) -> RequestData {
     408          209000 :         RequestData {
     409          209000 :             session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
     410          209000 :             peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
     411          209000 :             timestamp: chrono::NaiveDateTime::from_timestamp_millis(
     412          209000 :                 rng.gen_range(1703862754..1803862754),
     413          209000 :             )
     414          209000 :             .unwrap(),
     415          209000 :             application_name: Some("test".to_owned()),
     416          209000 :             username: Some(hex::encode(rng.gen::<[u8; 4]>())),
     417          209000 :             endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
     418          209000 :             project: Some(hex::encode(rng.gen::<[u8; 16]>())),
     419          209000 :             branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
     420          209000 :             protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
     421          209000 :             region: "us-east-1",
     422          209000 :             error: None,
     423          209000 :         }
     424          209000 :     }
     425                 : 
     426               7 :     fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
     427               7 :         let mut rng = StdRng::from_seed([0x39; 32]);
     428               7 :         futures::stream::iter(
     429          209000 :             std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
     430               7 :         )
     431               7 :     }
     432                 : 
     433               5 :     async fn run_test(
     434               5 :         tmpdir: &Utf8Path,
     435               5 :         config: ParquetConfig,
     436               5 :         rx: impl Stream<Item = RequestData>,
     437               5 :     ) -> Vec<(u64, usize, i64)> {
     438               5 :         let remote_storage_config = RemoteStorageConfig {
     439               5 :             storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
     440               5 :         };
     441               5 :         let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
     442               5 : 
     443             300 :         worker_inner(storage, rx, config).await.unwrap();
     444               5 : 
     445               5 :         let mut files = std::fs::read_dir(tmpdir.as_std_path())
     446               5 :             .unwrap()
     447              31 :             .map(|entry| entry.unwrap().path())
     448               5 :             .collect_vec();
     449               5 :         files.sort();
     450               5 : 
     451               5 :         files
     452               5 :             .into_iter()
     453              31 :             .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
     454              31 :             .map(|file| {
     455              31 :                 (
     456              31 :                     file.metadata().unwrap(),
     457              31 :                     SerializedFileReader::new(file).unwrap().metadata().clone(),
     458              31 :                 )
     459              31 :             })
     460              31 :             .map(|(file_meta, parquet_meta)| {
     461              31 :                 (
     462              31 :                     file_meta.len(),
     463              31 :                     parquet_meta.num_row_groups(),
     464              31 :                     parquet_meta.file_metadata().num_rows(),
     465              31 :                 )
     466              31 :             })
     467               5 :             .collect()
     468               5 :     }
     469                 : 
     470               1 :     #[tokio::test]
     471               1 :     async fn verify_parquet_no_compression() {
     472               1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     473               1 : 
     474               1 :         let config = ParquetConfig {
     475               1 :             propeties: Arc::new(WriterProperties::new()),
     476               1 :             rows_per_group: 2_000,
     477               1 :             file_size: 1_000_000,
     478               1 :             max_duration: time::Duration::from_secs(20 * 60),
     479               1 :             test_remote_failures: 0,
     480               1 :         };
     481               1 : 
     482               1 :         let rx = random_stream(50_000);
     483              61 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     484                 : 
     485               1 :         assert_eq!(
     486               1 :             file_stats,
     487               1 :             [
     488               1 :                 (1029153, 3, 6000),
     489               1 :                 (1029075, 3, 6000),
     490               1 :                 (1029216, 3, 6000),
     491               1 :                 (1029129, 3, 6000),
     492               1 :                 (1029250, 3, 6000),
     493               1 :                 (1029017, 3, 6000),
     494               1 :                 (1029175, 3, 6000),
     495               1 :                 (1029247, 3, 6000),
     496               1 :                 (343124, 1, 2000)
     497               1 :             ],
     498               1 :         );
     499                 : 
     500               1 :         tmpdir.close().unwrap();
     501                 :     }
     502                 : 
     503               1 :     #[tokio::test]
     504               1 :     async fn verify_parquet_min_compression() {
     505               1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     506               1 : 
     507               1 :         let config = ParquetConfig {
     508               1 :             propeties: Arc::new(
     509               1 :                 WriterProperties::builder()
     510               1 :                     .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
     511               1 :                     .build(),
     512               1 :             ),
     513               1 :             rows_per_group: 2_000,
     514               1 :             file_size: 1_000_000,
     515               1 :             max_duration: time::Duration::from_secs(20 * 60),
     516               1 :             test_remote_failures: 0,
     517               1 :         };
     518               1 : 
     519               1 :         let rx = random_stream(50_000);
     520              45 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     521                 : 
     522                 :         // with compression, there are fewer files with more rows per file
     523               1 :         assert_eq!(
     524               1 :             file_stats,
     525               1 :             [
     526               1 :                 (1166201, 6, 12000),
     527               1 :                 (1163577, 6, 12000),
     528               1 :                 (1164641, 6, 12000),
     529               1 :                 (1168772, 6, 12000),
     530               1 :                 (196761, 1, 2000)
     531               1 :             ],
     532               1 :         );
     533                 : 
     534               1 :         tmpdir.close().unwrap();
     535                 :     }
     536                 : 
     537               1 :     #[tokio::test]
     538               1 :     async fn verify_parquet_strong_compression() {
     539               1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     540               1 : 
     541               1 :         let config = ParquetConfig {
     542               1 :             propeties: Arc::new(
     543               1 :                 WriterProperties::builder()
     544               1 :                     .set_compression(parquet::basic::Compression::ZSTD(
     545               1 :                         ZstdLevel::try_new(10).unwrap(),
     546               1 :                     ))
     547               1 :                     .build(),
     548               1 :             ),
     549               1 :             rows_per_group: 2_000,
     550               1 :             file_size: 1_000_000,
     551               1 :             max_duration: time::Duration::from_secs(20 * 60),
     552               1 :             test_remote_failures: 0,
     553               1 :         };
     554               1 : 
     555               1 :         let rx = random_stream(50_000);
     556              45 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     557                 : 
     558                 :         // with strong compression, the files are smaller
     559               1 :         assert_eq!(
     560               1 :             file_stats,
     561               1 :             [
     562               1 :                 (1144934, 6, 12000),
     563               1 :                 (1144941, 6, 12000),
     564               1 :                 (1144735, 6, 12000),
     565               1 :                 (1144936, 6, 12000),
     566               1 :                 (191035, 1, 2000)
     567               1 :             ],
     568               1 :         );
     569                 : 
     570               1 :         tmpdir.close().unwrap();
     571                 :     }
     572                 : 
     573               1 :     #[tokio::test]
     574               1 :     async fn verify_parquet_unreliable_upload() {
     575               1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     576               1 : 
     577               1 :         let config = ParquetConfig {
     578               1 :             propeties: Arc::new(WriterProperties::new()),
     579               1 :             rows_per_group: 2_000,
     580               1 :             file_size: 1_000_000,
     581               1 :             max_duration: time::Duration::from_secs(20 * 60),
     582               1 :             test_remote_failures: 2,
     583               1 :         };
     584               1 : 
     585               1 :         let rx = random_stream(50_000);
     586              61 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     587                 : 
     588               1 :         assert_eq!(
     589               1 :             file_stats,
     590               1 :             [
     591               1 :                 (1029153, 3, 6000),
     592               1 :                 (1029075, 3, 6000),
     593               1 :                 (1029216, 3, 6000),
     594               1 :                 (1029129, 3, 6000),
     595               1 :                 (1029250, 3, 6000),
     596               1 :                 (1029017, 3, 6000),
     597               1 :                 (1029175, 3, 6000),
     598               1 :                 (1029247, 3, 6000),
     599               1 :                 (343124, 1, 2000)
     600               1 :             ],
     601               1 :         );
     602                 : 
     603               1 :         tmpdir.close().unwrap();
     604                 :     }
     605                 : 
     606               1 :     #[tokio::test(start_paused = true)]
     607               1 :     async fn verify_parquet_regular_upload() {
     608               1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     609               1 : 
     610               1 :         let config = ParquetConfig {
     611               1 :             propeties: Arc::new(WriterProperties::new()),
     612               1 :             rows_per_group: 2_000,
     613               1 :             file_size: 1_000_000,
     614               1 :             max_duration: time::Duration::from_secs(60),
     615               1 :             test_remote_failures: 2,
     616               1 :         };
     617               1 : 
     618               1 :         let (tx, mut rx) = mpsc::unbounded_channel();
     619               1 : 
     620               1 :         tokio::spawn(async move {
     621               4 :             for _ in 0..3 {
     622               3 :                 let mut s = random_stream(3000);
     623            9003 :                 while let Some(r) = s.next().await {
     624            9000 :                     tx.send(r).unwrap();
     625            9000 :                 }
     626               3 :                 time::sleep(time::Duration::from_secs(70)).await
     627                 :             }
     628               1 :         });
     629               1 : 
     630            9071 :         let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
     631              88 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     632                 : 
     633                 :         // files are smaller than the size threshold, but they took too long to fill so were flushed early
     634               1 :         assert_eq!(
     635               1 :             file_stats,
     636               1 :             [(515807, 2, 3001), (515585, 2, 3000), (515425, 2, 2999)],
     637               1 :         );
     638                 : 
     639               1 :         tmpdir.close().unwrap();
     640                 :     }
     641                 : }
        

Generated by: LCOV version 2.1-beta