LCOV - code coverage report
Current view: top level - proxy/src/context - parquet.rs (source / functions)
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info
Test Date: 2024-02-14 18:05:35
Coverage:   Lines: 86.9 % (432 of 497)   Functions: 55.6 % (84 of 151)

            Line data    Source code
       1              : use std::{sync::Arc, time::SystemTime};
       2              : 
       3              : use anyhow::Context;
       4              : use bytes::{buf::Writer, BufMut, BytesMut};
       5              : use chrono::{Datelike, Timelike};
       6              : use futures::{Stream, StreamExt};
       7              : use parquet::{
       8              :     basic::Compression,
       9              :     file::{
      10              :         metadata::RowGroupMetaDataPtr,
      11              :         properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
      12              :         writer::SerializedFileWriter,
      13              :     },
      14              :     record::RecordWriter,
      15              : };
      16              : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig};
      17              : use tokio::{sync::mpsc, time};
      18              : use tokio_util::sync::CancellationToken;
      19              : use tracing::{debug, info, Span};
      20              : use utils::backoff;
      21              : 
      22              : use super::{RequestMonitoring, LOG_CHAN};
      23              : 
      24           62 : #[derive(clap::Args, Clone, Debug)]
      25              : pub struct ParquetUploadArgs {
      26              :     /// Storage location to upload the parquet files to.
      27              :     /// Encoded as TOML (same format as pageservers), e.g.
      28              :     /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
      29              :     #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
      30            0 :     parquet_upload_remote_storage: OptRemoteStorageConfig,
      31              : 
      32              :     /// How many rows to include in a row group
      33           31 :     #[clap(long, default_value_t = 8192)]
      34            0 :     parquet_upload_row_group_size: usize,
      35              : 
      36              :     /// How large each column page should be in bytes
      37           31 :     #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
      38            0 :     parquet_upload_page_size: usize,
      39              : 
      40              :     /// How large each parquet file should be, in bytes, before it is uploaded
      41           31 :     #[clap(long, default_value_t = 100_000_000)]
      42            0 :     parquet_upload_size: i64,
      43              : 
      44              :     /// How long to wait before forcing a file upload
      45              :     #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
      46            0 :     parquet_upload_maximum_duration: tokio::time::Duration,
      47              : 
      48              :     /// What level of compression to use
      49           31 :     #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
      50            0 :     parquet_upload_compression: Compression,
      51              : }
      52              : 
      53              : /// Hack to stop clap being too clever. Without this type alias, clap assumes
      54              : /// more about the optional state, and the value parser we use fails with runtime type errors.
      55              : type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
      56              : 
      57           62 : fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
      58           62 :     RemoteStorageConfig::from_toml(&s.parse()?)
      59           62 : }
      60              : 
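                      : 
                      : // A minimal usage sketch (illustrative, not part of the original file): the
                      : // value parser accepts the inline-TOML format documented on
                      : // `parquet_upload_remote_storage` above; the clap default of "{}" is assumed
                      : // to parse to `None`, i.e. uploads disabled (see `default_parser` in the tests).
                      : #[cfg(test)]
                      : #[test]
                      : fn remote_storage_from_toml_sketch() -> anyhow::Result<()> {
                      :     assert!(remote_storage_from_toml("{}")?.is_none());
                      :     let cfg = remote_storage_from_toml(
                      :         "{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy'}",
                      :     )?;
                      :     assert!(cfg.is_some());
                      :     Ok(())
                      : }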
      61              : // Occasional network issues and such can cause remote operations to fail, and
      62              : // that's expected. If an upload fails, we log it at info level and retry.
      63              : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
      64              : // level instead, as repeated failures can mean a more serious problem. If it
      65              : // fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
      66              : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
      67              : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
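                      : 
                      : // A sketch of the policy those constants encode (hypothetical helper, for
                      : // illustration only; the real implementation is `utils::backoff::retry`,
                      : // called from `upload_parquet` below, which also adds backoff delays and
                      : // cancellation):
                      : #[allow(dead_code)]
                      : async fn retry_policy_sketch<T, E, Fut>(mut op: impl FnMut() -> Fut) -> Result<T, E>
                      : where
                      :     Fut: std::future::Future<Output = Result<T, E>>,
                      :     E: std::fmt::Display,
                      : {
                      :     let mut attempt = 0;
                      :     loop {
                      :         match op().await {
                      :             Ok(v) => return Ok(v),
                      :             Err(e) if attempt < FAILED_UPLOAD_WARN_THRESHOLD => info!("failed, retrying: {e}"),
                      :             Err(e) if attempt < FAILED_UPLOAD_MAX_RETRIES => tracing::warn!("failed, retrying: {e}"),
                      :             Err(e) => return Err(e), // give up
                      :         }
                      :         attempt += 1;
                      :     }
                      : }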
      68              : 
      69              : // the parquet crate leaves a lot to be desired...
      70              : // what follows is an attempt to write parquet files with minimal allocs.
      71              : // complication: parquet is a columnar format, while we want to write row by row.
      72              : // design:
      73              : // * we batch up to `rows_per_group` rows, then flush them into a 'row group'
      74              : // * after each row group write, we check the length of the file and upload to S3 if large enough
      75              : 
      76      6688010 : #[derive(parquet_derive::ParquetRecordWriter)]
      77              : struct RequestData {
      78              :     region: &'static str,
      79              :     protocol: &'static str,
      80              :     /// Must be UTC. The derive macro doesn't support timezone-aware types
      81              :     timestamp: chrono::NaiveDateTime,
      82              :     session_id: uuid::Uuid,
      83              :     peer_addr: String,
      84              :     username: Option<String>,
      85              :     application_name: Option<String>,
      86              :     endpoint_id: Option<String>,
      87              :     database: Option<String>,
      88              :     project: Option<String>,
      89              :     branch: Option<String>,
      90              :     auth_method: Option<&'static str>,
      91              :     error: Option<&'static str>,
      92              :     /// Success is counted if we form an HTTP response with SQL rows inside,
      93              :     /// or if we make it to proxy_pass.
      94              :     success: bool,
      95              :     /// Tracks time from session start (HTTP request/libpq TCP handshake)
      96              :     /// through to success/failure.
      97              :     duration_us: u64,
      98              : }
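                      : 
                      : // A self-contained sketch (illustrative, not part of the original file) of the
                      : // derive in action: `&[T]` gets `schema()` and `write_to_row_group()`, which
                      : // `worker_inner`/`flush_rows` below rely on. The tiny `Row` struct is hypothetical.
                      : #[cfg(test)]
                      : #[test]
                      : fn record_writer_sketch() -> anyhow::Result<()> {
                      :     #[derive(parquet_derive::ParquetRecordWriter)]
                      :     struct Row {
                      :         name: String,
                      :         value: i64,
                      :     }
                      : 
                      :     let rows = vec![
                      :         Row { name: "a".to_owned(), value: 1 },
                      :         Row { name: "b".to_owned(), value: 2 },
                      :     ];
                      : 
                      :     // write one row group into an in-memory buffer, as the worker does
                      :     let schema = rows.as_slice().schema()?;
                      :     let props = Arc::new(WriterProperties::builder().build());
                      :     let mut w = SerializedFileWriter::new(BytesMut::new().writer(), schema, props)?;
                      :     let mut rg = w.next_row_group()?;
                      :     rows.as_slice().write_to_row_group(&mut rg)?;
                      :     rg.close()?;
                      :     let metadata = w.close()?;
                      :     assert_eq!(metadata.num_rows, 2);
                      :     Ok(())
                      : }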
      99              : 
     100              : impl From<RequestMonitoring> for RequestData {
     101            0 :     fn from(value: RequestMonitoring) -> Self {
     102            0 :         Self {
     103            0 :             session_id: value.session_id,
     104            0 :             peer_addr: value.peer_addr.to_string(),
     105            0 :             timestamp: value.first_packet.naive_utc(),
     106            0 :             username: value.user.as_deref().map(String::from),
     107            0 :             application_name: value.application.as_deref().map(String::from),
     108            0 :             endpoint_id: value.endpoint_id.as_deref().map(String::from),
     109            0 :             database: value.dbname.as_deref().map(String::from),
     110            0 :             project: value.project.as_deref().map(String::from),
     111            0 :             branch: value.branch.as_deref().map(String::from),
     112            0 :             auth_method: value.auth_method.as_ref().map(|x| match x {
     113            0 :                 super::AuthMethod::Web => "web",
     114            0 :                 super::AuthMethod::ScramSha256 => "scram_sha_256",
     115            0 :                 super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
     116            0 :                 super::AuthMethod::Cleartext => "cleartext",
     117            0 :             }),
     118            0 :             protocol: value.protocol,
     119            0 :             region: value.region,
     120            0 :             error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
     121            0 :             success: value.success,
     122            0 :             duration_us: SystemTime::from(value.first_packet)
     123            0 :                 .elapsed()
     124            0 :                 .unwrap_or_default()
     125            0 :                 .as_micros() as u64, // 584 millennia... good enough
     126            0 :         }
     127            0 :     }
     128              : }
     129              : 
     130              : /// Parquet request context worker
     131              : ///
     132              : /// It listens on a channel for all completed requests, extracts the data and writes it into a parquet file,
     133              : /// then uploads a completed batch to S3
     134           25 : pub async fn worker(
     135           25 :     cancellation_token: CancellationToken,
     136           25 :     config: ParquetUploadArgs,
     137           25 : ) -> anyhow::Result<()> {
     138           25 :     let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
     139           25 :         tracing::warn!("parquet request upload: no s3 bucket configured");
     140           25 :         return Ok(());
     141              :     };
     142              : 
     143            0 :     let (tx, mut rx) = mpsc::unbounded_channel();
     144            0 :     LOG_CHAN.set(tx.downgrade()).unwrap();
     145            0 : 
     146            0 :     // set up a row stream that will close on cancellation
     147            0 :     tokio::spawn(async move {
     148            0 :         cancellation_token.cancelled().await;
     149              :         // dropping this sender will cause the channel to close only once
     150              :         // all the remaining in-flight requests have been completed.
     151            0 :         drop(tx);
     152            0 :     });
     153            0 :     let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
     154            0 :     let rx = rx.map(RequestData::from);
     155              : 
     156            0 :     let storage =
     157            0 :         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
     158              : 
     159            0 :     let properties = WriterProperties::builder()
     160            0 :         .set_data_page_size_limit(config.parquet_upload_page_size)
     161            0 :         .set_compression(config.parquet_upload_compression);
     162            0 : 
     163            0 :     let parquet_config = ParquetConfig {
     164            0 :         properties: Arc::new(properties.build()),
     165            0 :         rows_per_group: config.parquet_upload_row_group_size,
     166            0 :         file_size: config.parquet_upload_size,
     167            0 :         max_duration: config.parquet_upload_maximum_duration,
     168            0 : 
     169            0 :         #[cfg(any(test, feature = "testing"))]
     170            0 :         test_remote_failures: 0,
     171            0 :     };
     172            0 : 
     173            0 :     worker_inner(storage, rx, parquet_config).await
     174           25 : }
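                      : 
                      : // How a caller might wire the worker up (hypothetical sketch, not part of the
                      : // original file). Cancelling the token closes the log channel, which lets the
                      : // worker flush and upload its final file before returning.
                      : #[allow(dead_code)]
                      : async fn spawn_worker_sketch(config: ParquetUploadArgs) -> anyhow::Result<()> {
                      :     let cancel = CancellationToken::new();
                      :     let handle = tokio::spawn(worker(cancel.clone(), config));
                      :     // ... later, on shutdown:
                      :     cancel.cancel();
                      :     handle.await?
                      : }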
     175              : 
     176              : struct ParquetConfig {
     177              :     properties: WriterPropertiesPtr,
     178              :     rows_per_group: usize,
     179              :     file_size: i64,
     180              : 
     181              :     max_duration: tokio::time::Duration,
     182              : 
     183              :     #[cfg(any(test, feature = "testing"))]
     184              :     test_remote_failures: u64,
     185              : }
     186              : 
     187           10 : async fn worker_inner(
     188           10 :     storage: GenericRemoteStorage,
     189           10 :     rx: impl Stream<Item = RequestData>,
     190           10 :     config: ParquetConfig,
     191           10 : ) -> anyhow::Result<()> {
     192              :     #[cfg(any(test, feature = "testing"))]
     193           10 :     let storage = if config.test_remote_failures > 0 {
     194            4 :         GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
     195              :     } else {
     196            6 :         storage
     197              :     };
     198              : 
     199           10 :     let mut rx = std::pin::pin!(rx);
     200           10 : 
     201           10 :     let mut rows = Vec::with_capacity(config.rows_per_group);
     202              : 
     203           10 :     let schema = rows.as_slice().schema()?;
     204           10 :     let buffer = BytesMut::new();
     205           10 :     let w = buffer.writer();
     206           10 :     let mut w = SerializedFileWriter::new(w, schema.clone(), config.properties.clone())?;
     207              : 
     208           10 :     let mut last_upload = time::Instant::now();
     209           10 : 
     210           10 :     let mut len = 0;
     211       418010 :     while let Some(row) = rx.next().await {
     212       418000 :         rows.push(row);
     213       418000 :         let force = last_upload.elapsed() > config.max_duration;
     214       418000 :         if rows.len() == config.rows_per_group || force {
     215          210 :             let rg_meta;
     216          210 :             (rows, w, rg_meta) = flush_rows(rows, w).await?;
     217          210 :             len += rg_meta.compressed_size();
     218       417790 :         }
     219       418000 :         if len > config.file_size || force {
     220           56 :             last_upload = time::Instant::now();
     221          234 :             let file = upload_parquet(w, len, &storage).await?;
     222           56 :             w = SerializedFileWriter::new(file, schema.clone(), config.properties.clone())?;
     223           56 :             len = 0;
     224       417944 :         }
     225              :     }
     226              : 
     227           10 :     if !rows.is_empty() {
     228            2 :         let rg_meta;
     229            2 :         (_, w, rg_meta) = flush_rows(rows, w).await?;
     230            2 :         len += rg_meta.compressed_size();
     231            8 :     }
     232              : 
     233           10 :     if !w.flushed_row_groups().is_empty() {
     234           24 :         let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
     235            4 :     }
     236              : 
     237           10 :     Ok(())
     238           10 : }
     239              : 
     240          212 : async fn flush_rows<W>(
     241          212 :     rows: Vec<RequestData>,
     242          212 :     mut w: SerializedFileWriter<W>,
     243          212 : ) -> anyhow::Result<(
     244          212 :     Vec<RequestData>,
     245          212 :     SerializedFileWriter<W>,
     246          212 :     RowGroupMetaDataPtr,
     247          212 : )>
     248          212 : where
     249          212 :     W: std::io::Write + Send + 'static,
     250          212 : {
     251          212 :     let span = Span::current();
     252          212 :     let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
     253          212 :         let _enter = span.enter();
     254              : 
     255          212 :         let mut rg = w.next_row_group()?;
     256          212 :         rows.as_slice().write_to_row_group(&mut rg)?;
     257          212 :         let rg_meta = rg.close()?;
     258              : 
     259          212 :         let size = rg_meta.compressed_size();
     260          212 :         let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
     261          212 : 
     262          212 :         debug!(size, compression, "flushed row group to parquet file");
     263              : 
     264          212 :         Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
     265          212 :     })
     266          212 :     .await
     267          212 :     .unwrap()?;
     268              : 
     269          212 :     rows.clear();
     270          212 :     Ok((rows, w, rg_meta))
     271          212 : }
     272              : 
     273           62 : async fn upload_parquet(
     274           62 :     w: SerializedFileWriter<Writer<BytesMut>>,
     275           62 :     len: i64,
     276           62 :     storage: &GenericRemoteStorage,
     277           62 : ) -> anyhow::Result<Writer<BytesMut>> {
     278           62 :     let len_uncompressed = w
     279           62 :         .flushed_row_groups()
     280           62 :         .iter()
     281          212 :         .map(|rg| rg.total_byte_size())
     282           62 :         .sum::<i64>();
     283              : 
     284              :     // I don't know how compute-intensive this is, although it probably isn't much... better safe than sorry.
     285              :     // the `finish` method is only available on the fork: https://github.com/apache/arrow-rs/issues/5253
     286           62 :     let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish())
     287           62 :         .await
     288           62 :         .unwrap()?;
     289              : 
     290           62 :     let mut buffer = writer.into_inner();
     291           62 :     let data = buffer.split().freeze();
     292           62 : 
     293           62 :     let compression = len as f64 / len_uncompressed as f64;
     294           62 :     let size = data.len();
     295           62 :     let now = chrono::Utc::now();
     296           62 :     let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
     297           62 :         uuid::NoContext,
     298           62 :         // we won't be running this in 1970. this cast is ok
     299           62 :         now.timestamp() as u64,
     300           62 :         now.timestamp_subsec_nanos(),
     301           62 :     ));
     302              : 
     303            0 :     info!(
     304            0 :         %id,
     305            0 :         rows = metadata.num_rows,
     306            0 :         size, compression, "uploading request parquet file"
     307            0 :     );
     308              : 
     309           62 :     let year = now.year();
     310           62 :     let month = now.month();
     311           62 :     let day = now.day();
     312           62 :     let hour = now.hour();
     313              :     // segment files by time for S3 performance
     314           62 :     let path = RemotePath::from_string(&format!(
     315           62 :         "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
     316           62 :     ))?;
     317           62 :     backoff::retry(
     318           86 :         || async {
     319           86 :             let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
     320          196 :             storage.upload(stream, data.len(), &path, None).await
     321          172 :         },
     322           62 :         |_e| false,
     323           62 :         FAILED_UPLOAD_WARN_THRESHOLD,
     324           62 :         FAILED_UPLOAD_MAX_RETRIES,
     325           62 :         "request_data_upload",
     326           62 :         // we don't want cancellation to interrupt here, so we make a dummy cancel token
     327           62 :         &CancellationToken::new(),
     328           62 :     )
     329          196 :     .await
     330           62 :     .ok_or_else(|| anyhow::anyhow!("Cancelled"))
     331           62 :     .and_then(|x| x)
     332           62 :     .context("request_data_upload")?;
     333              : 
     334           62 :     Ok(buffer.writer())
     335           62 : }
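                      : 
                      : // The hour-partitioned key layout built above, spelled out (illustrative
                      : // check, not part of the original file): an upload at 2024-02-14 18:05 UTC
                      : // with a nil UUID would land at the path below.
                      : #[cfg(test)]
                      : #[test]
                      : fn path_layout_sketch() {
                      :     let now = chrono::NaiveDate::from_ymd_opt(2024, 2, 14)
                      :         .unwrap()
                      :         .and_hms_opt(18, 5, 35)
                      :         .unwrap();
                      :     let path = format!(
                      :         "{:04}/{:02}/{:02}/{:02}/requests_{}.parquet",
                      :         now.year(),
                      :         now.month(),
                      :         now.day(),
                      :         now.hour(),
                      :         uuid::Uuid::nil(),
                      :     );
                      :     assert_eq!(
                      :         path,
                      :         "2024/02/14/18/requests_00000000-0000-0000-0000-000000000000.parquet"
                      :     );
                      : }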
     336              : 
     337              : #[cfg(test)]
     338              : mod tests {
     339              :     use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
     340              : 
     341              :     use camino::Utf8Path;
     342              :     use clap::Parser;
     343              :     use futures::{Stream, StreamExt};
     344              :     use itertools::Itertools;
     345              :     use parquet::{
     346              :         basic::{Compression, ZstdLevel},
     347              :         file::{
     348              :             properties::{WriterProperties, DEFAULT_PAGE_SIZE},
     349              :             reader::FileReader,
     350              :             serialized_reader::SerializedFileReader,
     351              :         },
     352              :     };
     353              :     use rand::{rngs::StdRng, Rng, SeedableRng};
     354              :     use remote_storage::{
     355              :         GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
     356              :         DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
     357              :     };
     358              :     use tokio::{sync::mpsc, time};
     359              :     use walkdir::WalkDir;
     360              : 
     361              :     use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
     362              : 
     363            4 :     #[derive(Parser)]
     364              :     struct ProxyCliArgs {
     365              :         #[clap(flatten)]
     366              :         parquet_upload: ParquetUploadArgs,
     367              :     }
     368              : 
     369            2 :     #[test]
     370            2 :     fn default_parser() {
     371            2 :         let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
     372            2 :         assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
     373            2 :         assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
     374            2 :         assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
     375            2 :         assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
     376            2 :         assert_eq!(
     377            2 :             parquet_upload.parquet_upload_maximum_duration,
     378            2 :             time::Duration::from_secs(20 * 60)
     379            2 :         );
     380            2 :         assert_eq!(
     381            2 :             parquet_upload.parquet_upload_compression,
     382            2 :             Compression::UNCOMPRESSED
     383            2 :         );
     384            2 :     }
     385              : 
     386            2 :     #[test]
     387            2 :     fn full_parser() {
     388            2 :         let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
     389            2 :             "proxy",
     390            2 :             "--parquet-upload-remote-storage",
     391            2 :             "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
     392            2 :             "--parquet-upload-row-group-size",
     393            2 :             "100",
     394            2 :             "--parquet-upload-page-size",
     395            2 :             "10000",
     396            2 :             "--parquet-upload-size",
     397            2 :             "10000000",
     398            2 :             "--parquet-upload-maximum-duration",
     399            2 :             "10m",
     400            2 :             "--parquet-upload-compression",
     401            2 :             "zstd(5)",
     402            2 :         ]);
     403            2 :         assert_eq!(
     404            2 :             parquet_upload.parquet_upload_remote_storage,
     405            2 :             Some(RemoteStorageConfig {
     406            2 :                 storage: RemoteStorageKind::AwsS3(S3Config {
     407            2 :                     bucket_name: "default".into(),
     408            2 :                     bucket_region: "us-east-1".into(),
     409            2 :                     prefix_in_bucket: Some("proxy/".into()),
     410            2 :                     endpoint: Some("http://minio:9000".into()),
     411            2 :                     concurrency_limit: NonZeroUsize::new(
     412            2 :                         DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
     413            2 :                     )
     414            2 :                     .unwrap(),
     415            2 :                     max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
     416            2 :                 })
     417            2 :             })
     418            2 :         );
     419            2 :         assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
     420            2 :         assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
     421            2 :         assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
     422            2 :         assert_eq!(
     423            2 :             parquet_upload.parquet_upload_maximum_duration,
     424            2 :             time::Duration::from_secs(10 * 60)
     425            2 :         );
     426            2 :         assert_eq!(
     427            2 :             parquet_upload.parquet_upload_compression,
     428            2 :             Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
     429            2 :         );
     430            2 :     }
     431              : 
     432       418000 :     fn generate_request_data(rng: &mut impl Rng) -> RequestData {
     433       418000 :         RequestData {
     434       418000 :             session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
     435       418000 :             peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
     436       418000 :             timestamp: chrono::NaiveDateTime::from_timestamp_millis(
     437       418000 :                 rng.gen_range(1703862754..1803862754),
     438       418000 :             )
     439       418000 :             .unwrap(),
     440       418000 :             application_name: Some("test".to_owned()),
     441       418000 :             username: Some(hex::encode(rng.gen::<[u8; 4]>())),
     442       418000 :             endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
     443       418000 :             database: Some(hex::encode(rng.gen::<[u8; 16]>())),
     444       418000 :             project: Some(hex::encode(rng.gen::<[u8; 16]>())),
     445       418000 :             branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
     446       418000 :             auth_method: None,
     447       418000 :             protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
     448       418000 :             region: "us-east-1",
     449       418000 :             error: None,
     450       418000 :             success: rng.gen(),
     451       418000 :             duration_us: rng.gen_range(0..30_000_000),
     452       418000 :         }
     453       418000 :     }
     454              : 
     455           14 :     fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
     456           14 :         let mut rng = StdRng::from_seed([0x39; 32]);
     457           14 :         futures::stream::iter(
     458       418000 :             std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
     459           14 :         )
     460           14 :     }
     461              : 
     462           10 :     async fn run_test(
     463           10 :         tmpdir: &Utf8Path,
     464           10 :         config: ParquetConfig,
     465           10 :         rx: impl Stream<Item = RequestData>,
     466           10 :     ) -> Vec<(u64, usize, i64)> {
     467           10 :         let remote_storage_config = RemoteStorageConfig {
     468           10 :             storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
     469           10 :         };
     470           10 :         let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
     471           10 : 
     472          610 :         worker_inner(storage, rx, config).await.unwrap();
     473           10 : 
     474           10 :         let mut files = WalkDir::new(tmpdir.as_std_path())
     475           10 :             .into_iter()
     476          112 :             .filter_map(|entry| entry.ok())
     477          112 :             .filter(|entry| entry.file_type().is_file())
     478           62 :             .map(|entry| entry.path().to_path_buf())
     479           10 :             .collect_vec();
     480           10 :         files.sort();
     481           10 : 
     482           10 :         files
     483           10 :             .into_iter()
     484           62 :             .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
     485           62 :             .map(|file| {
     486           62 :                 (
     487           62 :                     file.metadata().unwrap(),
     488           62 :                     SerializedFileReader::new(file).unwrap().metadata().clone(),
     489           62 :                 )
     490           62 :             })
     491           62 :             .map(|(file_meta, parquet_meta)| {
     492           62 :                 (
     493           62 :                     file_meta.len(),
     494           62 :                     parquet_meta.num_row_groups(),
     495           62 :                     parquet_meta.file_metadata().num_rows(),
     496           62 :                 )
     497           62 :             })
     498           10 :             .collect()
     499           10 :     }
     500              : 
     501            2 :     #[tokio::test]
     502            2 :     async fn verify_parquet_no_compression() {
     503            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     504            2 : 
     505            2 :         let config = ParquetConfig {
     506            2 :             propeties: Arc::new(WriterProperties::new()),
     507            2 :             rows_per_group: 2_000,
     508            2 :             file_size: 1_000_000,
     509            2 :             max_duration: time::Duration::from_secs(20 * 60),
     510            2 :             test_remote_failures: 0,
     511            2 :         };
     512            2 : 
     513            2 :         let rx = random_stream(50_000);
     514          124 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     515            2 : 
     516            2 :         assert_eq!(
     517            2 :             file_stats,
     518            2 :             [
     519            2 :                 (1313727, 3, 6000),
     520            2 :                 (1313720, 3, 6000),
     521            2 :                 (1313780, 3, 6000),
     522            2 :                 (1313737, 3, 6000),
     523            2 :                 (1313867, 3, 6000),
     524            2 :                 (1313709, 3, 6000),
     525            2 :                 (1313501, 3, 6000),
     526            2 :                 (1313737, 3, 6000),
     527            2 :                 (438118, 1, 2000)
     528            2 :             ],
     529            2 :         );
     530            2 : 
     531            2 :         tmpdir.close().unwrap();
     532            2 :     }
     533              : 
     534            2 :     #[tokio::test]
     535            2 :     async fn verify_parquet_min_compression() {
     536            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     537            2 : 
     538            2 :         let config = ParquetConfig {
     539            2 :             properties: Arc::new(
     540            2 :                 WriterProperties::builder()
     541            2 :                     .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
     542            2 :                     .build(),
     543            2 :             ),
     544            2 :             rows_per_group: 2_000,
     545            2 :             file_size: 1_000_000,
     546            2 :             max_duration: time::Duration::from_secs(20 * 60),
     547            2 :             test_remote_failures: 0,
     548            2 :         };
     549            2 : 
     550            2 :         let rx = random_stream(50_000);
     551           92 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     552            2 : 
     553            2 :         // with compression, there are fewer files with more rows per file
     554            2 :         assert_eq!(
     555            2 :             file_stats,
     556            2 :             [
     557            2 :                 (1219459, 5, 10000),
     558            2 :                 (1225609, 5, 10000),
     559            2 :                 (1227403, 5, 10000),
     560            2 :                 (1226765, 5, 10000),
     561            2 :                 (1218043, 5, 10000)
     562            2 :             ],
     563            2 :         );
     564            2 : 
     565            2 :         tmpdir.close().unwrap();
     566            2 :     }
     567              : 
     568            2 :     #[tokio::test]
     569            2 :     async fn verify_parquet_strong_compression() {
     570            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     571            2 : 
     572            2 :         let config = ParquetConfig {
     573            2 :             properties: Arc::new(
     574            2 :                 WriterProperties::builder()
     575            2 :                     .set_compression(parquet::basic::Compression::ZSTD(
     576            2 :                         ZstdLevel::try_new(10).unwrap(),
     577            2 :                     ))
     578            2 :                     .build(),
     579            2 :             ),
     580            2 :             rows_per_group: 2_000,
     581            2 :             file_size: 1_000_000,
     582            2 :             max_duration: time::Duration::from_secs(20 * 60),
     583            2 :             test_remote_failures: 0,
     584            2 :         };
     585            2 : 
     586            2 :         let rx = random_stream(50_000);
     587           92 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     588            2 : 
     589            2 :         // with strong compression, the files are smaller
     590            2 :         assert_eq!(
     591            2 :             file_stats,
     592            2 :             [
     593            2 :                 (1205106, 5, 10000),
     594            2 :                 (1204837, 5, 10000),
     595            2 :                 (1205130, 5, 10000),
     596            2 :                 (1205118, 5, 10000),
     597            2 :                 (1205373, 5, 10000)
     598            2 :             ],
     599            2 :         );
     600            2 : 
     601            2 :         tmpdir.close().unwrap();
     602            2 :     }
     603              : 
     604            2 :     #[tokio::test]
     605            2 :     async fn verify_parquet_unreliable_upload() {
     606            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     607            2 : 
     608            2 :         let config = ParquetConfig {
     609            2 :             properties: Arc::new(WriterProperties::new()),
     610            2 :             rows_per_group: 2_000,
     611            2 :             file_size: 1_000_000,
     612            2 :             max_duration: time::Duration::from_secs(20 * 60),
     613            2 :             test_remote_failures: 2,
     614            2 :         };
     615            2 : 
     616            2 :         let rx = random_stream(50_000);
     617          124 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     618            2 : 
     619            2 :         assert_eq!(
     620            2 :             file_stats,
     621            2 :             [
     622            2 :                 (1313727, 3, 6000),
     623            2 :                 (1313720, 3, 6000),
     624            2 :                 (1313780, 3, 6000),
     625            2 :                 (1313737, 3, 6000),
     626            2 :                 (1313867, 3, 6000),
     627            2 :                 (1313709, 3, 6000),
     628            2 :                 (1313501, 3, 6000),
     629            2 :                 (1313737, 3, 6000),
     630            2 :                 (438118, 1, 2000)
     631            2 :             ],
     632            2 :         );
     633            2 : 
     634            2 :         tmpdir.close().unwrap();
     635            2 :     }
     636              : 
     637            2 :     #[tokio::test(start_paused = true)]
     638            2 :     async fn verify_parquet_regular_upload() {
     639            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     640            2 : 
     641            2 :         let config = ParquetConfig {
     642            2 :             properties: Arc::new(WriterProperties::new()),
     643            2 :             rows_per_group: 2_000,
     644            2 :             file_size: 1_000_000,
     645            2 :             max_duration: time::Duration::from_secs(60),
     646            2 :             test_remote_failures: 2,
     647            2 :         };
     648            2 : 
     649            2 :         let (tx, mut rx) = mpsc::unbounded_channel();
     650            2 : 
     651            2 :         tokio::spawn(async move {
     652            8 :             for _ in 0..3 {
     653            6 :                 let mut s = random_stream(3000);
     654        18006 :                 while let Some(r) = s.next().await {
     655        18000 :                     tx.send(r).unwrap();
     656        18000 :                 }
     657            6 :                 time::sleep(time::Duration::from_secs(70)).await
     658            2 :             }
     659            2 :         });
     660            2 : 
     661        18142 :         let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
     662          178 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     663            2 : 
     664            2 :         // files are smaller than the size threshold, but they took too long to fill so were flushed early
     665            2 :         assert_eq!(
     666            2 :             file_stats,
     667            2 :             [(658383, 2, 3001), (658097, 2, 3000), (657893, 2, 2999)],
     668            2 :         );
     669            2 : 
     670            2 :         tmpdir.close().unwrap();
     671            2 :     }
     672              : }
        

Generated by: LCOV version 2.1-beta