LCOV - code coverage report
Current view: top level - proxy/src/context - parquet.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 78.9 % 530 418
Test Date: 2025-01-30 15:18:43 Functions: 56.2 % 105 59

            Line data    Source code
       1              : use std::sync::Arc;
       2              : use std::time::SystemTime;
       3              : 
       4              : use anyhow::Context;
       5              : use bytes::buf::Writer;
       6              : use bytes::{BufMut, BytesMut};
       7              : use chrono::{Datelike, Timelike};
       8              : use futures::{Stream, StreamExt};
       9              : use parquet::basic::Compression;
      10              : use parquet::file::metadata::RowGroupMetaDataPtr;
      11              : use parquet::file::properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE};
      12              : use parquet::file::writer::SerializedFileWriter;
      13              : use parquet::record::RecordWriter;
      14              : use pq_proto::StartupMessageParams;
      15              : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
      16              : use serde::ser::SerializeMap;
      17              : use tokio::sync::mpsc;
      18              : use tokio::time;
      19              : use tokio_util::sync::CancellationToken;
      20              : use tracing::{debug, info, Span};
      21              : use utils::backoff;
      22              : 
      23              : use super::{RequestContextInner, LOG_CHAN};
      24              : use crate::config::remote_storage_from_toml;
      25              : use crate::context::LOG_CHAN_DISCONNECT;
      26              : use crate::ext::TaskExt;
      27              : 
      28              : #[derive(clap::Args, Clone, Debug)]
      29              : pub struct ParquetUploadArgs {
      30              :     /// Storage location to upload the parquet files to.
      31              :     /// Encoded as toml (same format as pageservers), eg
      32              :     /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
      33              :     #[clap(long, value_parser = remote_storage_from_toml)]
      34              :     parquet_upload_remote_storage: Option<RemoteStorageConfig>,
      35              : 
      36              :     #[clap(long, value_parser = remote_storage_from_toml)]
      37              :     parquet_upload_disconnect_events_remote_storage: Option<RemoteStorageConfig>,
      38              : 
      39              :     /// How many rows to include in a row group
      40            3 :     #[clap(long, default_value_t = 8192)]
      41            0 :     parquet_upload_row_group_size: usize,
      42              : 
      43              :     /// How large each column page should be in bytes
      44            3 :     #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
      45            0 :     parquet_upload_page_size: usize,
      46              : 
      47              :     /// How large the total parquet file should be in bytes
      48            3 :     #[clap(long, default_value_t = 100_000_000)]
      49            0 :     parquet_upload_size: i64,
      50              : 
      51              :     /// How long to wait before forcing a file upload
      52              :     #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
      53            0 :     parquet_upload_maximum_duration: tokio::time::Duration,
      54              : 
      55              :     /// What level of compression to use
      56            3 :     #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
      57            0 :     parquet_upload_compression: Compression,
      58              : }
      59              : 
      60              : // Occasional network issues and such can cause remote operations to fail, and
      61              : // that's expected. If a upload fails, we log it at info-level, and retry.
      62              : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
      63              : // level instead, as repeated failures can mean a more serious problem. If it
      64              : // fails more than FAILED_UPLOAD_RETRIES times, we give up
      65              : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
      66              : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
      67              : 
      68              : // the parquet crate leaves a lot to be desired...
      69              : // what follows is an attempt to write parquet files with minimal allocs.
      70              : // complication: parquet is a columnar format, while we want to write in as rows.
      71              : // design:
      72              : // * we batch up to 1024 rows, then flush them into a 'row group'
      73              : // * after each rowgroup write, we check the length of the file and upload to s3 if large enough
      74              : 
      75      1272000 : #[derive(parquet_derive::ParquetRecordWriter)]
      76              : pub(crate) struct RequestData {
      77              :     region: &'static str,
      78              :     protocol: &'static str,
      79              :     /// Must be UTC. The derive macro doesn't like the timezones
      80              :     timestamp: chrono::NaiveDateTime,
      81              :     session_id: uuid::Uuid,
      82              :     peer_addr: String,
      83              :     username: Option<String>,
      84              :     application_name: Option<String>,
      85              :     endpoint_id: Option<String>,
      86              :     database: Option<String>,
      87              :     project: Option<String>,
      88              :     branch: Option<String>,
      89              :     pg_options: Option<String>,
      90              :     auth_method: Option<&'static str>,
      91              :     jwt_issuer: Option<String>,
      92              : 
      93              :     error: Option<&'static str>,
      94              :     /// Success is counted if we form a HTTP response with sql rows inside
      95              :     /// Or if we make it to proxy_pass
      96              :     success: bool,
      97              :     /// Indicates if the cplane started the new compute node for this request.
      98              :     cold_start_info: &'static str,
      99              :     /// Tracks time from session start (HTTP request/libpq TCP handshake)
     100              :     /// Through to success/failure
     101              :     duration_us: u64,
     102              :     /// If the session was successful after the disconnect, will be created one more event with filled `disconnect_timestamp`.
     103              :     disconnect_timestamp: Option<chrono::NaiveDateTime>,
     104              : }
     105              : 
     106              : struct Options<'a> {
     107              :     options: &'a StartupMessageParams,
     108              : }
     109              : 
     110              : impl serde::Serialize for Options<'_> {
     111            0 :     fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
     112            0 :     where
     113            0 :         S: serde::Serializer,
     114            0 :     {
     115            0 :         let mut state = s.serialize_map(None)?;
     116            0 :         for (k, v) in self.options.iter() {
     117            0 :             state.serialize_entry(k, v)?;
     118              :         }
     119            0 :         state.end()
     120            0 :     }
     121              : }
     122              : 
     123              : impl From<&RequestContextInner> for RequestData {
     124            0 :     fn from(value: &RequestContextInner) -> Self {
     125            0 :         Self {
     126            0 :             session_id: value.session_id,
     127            0 :             peer_addr: value.conn_info.addr.ip().to_string(),
     128            0 :             timestamp: value.first_packet.naive_utc(),
     129            0 :             username: value.user.as_deref().map(String::from),
     130            0 :             application_name: value.application.as_deref().map(String::from),
     131            0 :             endpoint_id: value.endpoint_id.as_deref().map(String::from),
     132            0 :             database: value.dbname.as_deref().map(String::from),
     133            0 :             project: value.project.as_deref().map(String::from),
     134            0 :             branch: value.branch.as_deref().map(String::from),
     135            0 :             pg_options: value
     136            0 :                 .pg_options
     137            0 :                 .as_ref()
     138            0 :                 .and_then(|options| serde_json::to_string(&Options { options }).ok()),
     139            0 :             auth_method: value.auth_method.as_ref().map(|x| match x {
     140            0 :                 super::AuthMethod::ConsoleRedirect => "console_redirect",
     141            0 :                 super::AuthMethod::ScramSha256 => "scram_sha_256",
     142            0 :                 super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
     143            0 :                 super::AuthMethod::Cleartext => "cleartext",
     144            0 :                 super::AuthMethod::Jwt => "jwt",
     145            0 :             }),
     146            0 :             jwt_issuer: value.jwt_issuer.clone(),
     147            0 :             protocol: value.protocol.as_str(),
     148            0 :             region: value.region,
     149            0 :             error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
     150            0 :             success: value.success,
     151            0 :             cold_start_info: value.cold_start_info.as_str(),
     152            0 :             duration_us: SystemTime::from(value.first_packet)
     153            0 :                 .elapsed()
     154            0 :                 .unwrap_or_default()
     155            0 :                 .as_micros() as u64, // 584 millenia... good enough
     156            0 :             disconnect_timestamp: value.disconnect_timestamp.map(|x| x.naive_utc()),
     157            0 :         }
     158            0 :     }
     159              : }
     160              : 
     161              : /// Parquet request context worker
     162              : ///
     163              : /// It listened on a channel for all completed requests, extracts the data and writes it into a parquet file,
     164              : /// then uploads a completed batch to S3
     165            0 : pub async fn worker(
     166            0 :     cancellation_token: CancellationToken,
     167            0 :     config: ParquetUploadArgs,
     168            0 : ) -> anyhow::Result<()> {
     169            0 :     let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
     170            0 :         tracing::warn!("parquet request upload: no s3 bucket configured");
     171            0 :         return Ok(());
     172              :     };
     173              : 
     174            0 :     let (tx, mut rx) = mpsc::unbounded_channel();
     175            0 :     LOG_CHAN
     176            0 :         .set(tx.downgrade())
     177            0 :         .expect("only one worker should set the channel");
     178            0 : 
     179            0 :     // setup row stream that will close on cancellation
     180            0 :     let cancellation_token2 = cancellation_token.clone();
     181            0 :     tokio::spawn(async move {
     182            0 :         cancellation_token2.cancelled().await;
     183              :         // dropping this sender will cause the channel to close only once
     184              :         // all the remaining inflight requests have been completed.
     185            0 :         drop(tx);
     186            0 :     });
     187            0 :     let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
     188            0 :     let rx = rx.map(RequestData::from);
     189            0 : 
     190            0 :     let properties = WriterProperties::builder()
     191            0 :         .set_data_page_size_limit(config.parquet_upload_page_size)
     192            0 :         .set_compression(config.parquet_upload_compression);
     193            0 : 
     194            0 :     let parquet_config = ParquetConfig {
     195            0 :         propeties: Arc::new(properties.build()),
     196            0 :         rows_per_group: config.parquet_upload_row_group_size,
     197            0 :         file_size: config.parquet_upload_size,
     198            0 :         max_duration: config.parquet_upload_maximum_duration,
     199            0 : 
     200            0 :         #[cfg(any(test, feature = "testing"))]
     201            0 :         test_remote_failures: 0,
     202            0 :     };
     203              : 
     204              :     // TODO(anna): consider moving this to a separate function.
     205            0 :     if let Some(disconnect_events_storage_config) =
     206            0 :         config.parquet_upload_disconnect_events_remote_storage
     207              :     {
     208            0 :         let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
     209            0 :         LOG_CHAN_DISCONNECT
     210            0 :             .set(tx_disconnect.downgrade())
     211            0 :             .expect("only one worker should set the channel");
     212            0 : 
     213            0 :         // setup row stream that will close on cancellation
     214            0 :         tokio::spawn(async move {
     215            0 :             cancellation_token.cancelled().await;
     216              :             // dropping this sender will cause the channel to close only once
     217              :             // all the remaining inflight requests have been completed.
     218            0 :             drop(tx_disconnect);
     219            0 :         });
     220            0 :         let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
     221            0 :         let rx_disconnect = rx_disconnect.map(RequestData::from);
     222            0 : 
     223            0 :         let parquet_config_disconnect = parquet_config.clone();
     224            0 :         tokio::try_join!(
     225            0 :             worker_inner(remote_storage_config, rx, parquet_config),
     226            0 :             worker_inner(
     227            0 :                 disconnect_events_storage_config,
     228            0 :                 rx_disconnect,
     229            0 :                 parquet_config_disconnect
     230            0 :             )
     231            0 :         )
     232            0 :         .map(|_| ())
     233              :     } else {
     234            0 :         worker_inner(remote_storage_config, rx, parquet_config).await
     235              :     }
     236            0 : }
     237              : 
     238              : #[derive(Clone, Debug)]
     239              : struct ParquetConfig {
     240              :     propeties: WriterPropertiesPtr,
     241              :     rows_per_group: usize,
     242              :     file_size: i64,
     243              : 
     244              :     max_duration: tokio::time::Duration,
     245              : 
     246              :     #[cfg(any(test, feature = "testing"))]
     247              :     test_remote_failures: u64,
     248              : }
     249              : 
     250              : impl ParquetConfig {
     251           26 :     async fn storage(
     252           26 :         &self,
     253           26 :         storage_config: &RemoteStorageConfig,
     254           26 :     ) -> anyhow::Result<GenericRemoteStorage> {
     255           26 :         let storage = GenericRemoteStorage::from_config(storage_config)
     256           26 :             .await
     257           26 :             .context("remote storage init")?;
     258              : 
     259              :         #[cfg(any(test, feature = "testing"))]
     260           26 :         if self.test_remote_failures > 0 {
     261           12 :             return Ok(GenericRemoteStorage::unreliable_wrapper(
     262           12 :                 storage,
     263           12 :                 self.test_remote_failures,
     264           12 :             ));
     265           14 :         }
     266           14 : 
     267           14 :         Ok(storage)
     268           26 :     }
     269              : }
     270              : 
     271            4 : async fn worker_inner(
     272            4 :     storage_config: RemoteStorageConfig,
     273            4 :     rx: impl Stream<Item = RequestData>,
     274            4 :     config: ParquetConfig,
     275            4 : ) -> anyhow::Result<()> {
     276            4 :     let mut rx = std::pin::pin!(rx);
     277            4 : 
     278            4 :     let mut rows = Vec::with_capacity(config.rows_per_group);
     279              : 
     280            4 :     let schema = rows.as_slice().schema()?;
     281            4 :     let buffer = BytesMut::new();
     282            4 :     let w = buffer.writer();
     283            4 :     let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
     284              : 
     285            4 :     let mut last_upload = time::Instant::now();
     286            4 : 
     287            4 :     let mut len = 0;
     288       159004 :     while let Some(row) = rx.next().await {
     289       159000 :         rows.push(row);
     290       159000 :         let force = last_upload.elapsed() > config.max_duration;
     291       159000 :         if rows.len() == config.rows_per_group || force {
     292              :             let rg_meta;
     293           80 :             (rows, w, rg_meta) = flush_rows(rows, w).await?;
     294           80 :             len += rg_meta.compressed_size();
     295       158920 :         }
     296       159000 :         if len > config.file_size || force {
     297           23 :             last_upload = time::Instant::now();
     298           23 :             let file = upload_parquet(w, len, &storage_config, &config).await?;
     299           23 :             w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
     300           23 :             len = 0;
     301       158977 :         }
     302              :     }
     303              : 
     304            4 :     if !rows.is_empty() {
     305              :         let rg_meta;
     306            1 :         (_, w, rg_meta) = flush_rows(rows, w).await?;
     307            1 :         len += rg_meta.compressed_size();
     308            3 :     }
     309              : 
     310            4 :     if !w.flushed_row_groups().is_empty() {
     311            3 :         let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage_config, &config).await?;
     312            1 :     }
     313              : 
     314            4 :     Ok(())
     315            4 : }
     316              : 
     317           81 : async fn flush_rows<W>(
     318           81 :     rows: Vec<RequestData>,
     319           81 :     mut w: SerializedFileWriter<W>,
     320           81 : ) -> anyhow::Result<(
     321           81 :     Vec<RequestData>,
     322           81 :     SerializedFileWriter<W>,
     323           81 :     RowGroupMetaDataPtr,
     324           81 : )>
     325           81 : where
     326           81 :     W: std::io::Write + Send + 'static,
     327           81 : {
     328           81 :     let span = Span::current();
     329           81 :     let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
     330           81 :         let _enter = span.enter();
     331              : 
     332           81 :         let mut rg = w.next_row_group()?;
     333           81 :         rows.as_slice().write_to_row_group(&mut rg)?;
     334           81 :         let rg_meta = rg.close()?;
     335              : 
     336           81 :         let size = rg_meta.compressed_size();
     337           81 :         let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
     338           81 : 
     339           81 :         debug!(size, compression, "flushed row group to parquet file");
     340              : 
     341           81 :         Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
     342           81 :     })
     343           81 :     .await
     344           81 :     .propagate_task_panic()?;
     345              : 
     346           81 :     rows.clear();
     347           81 :     Ok((rows, w, rg_meta))
     348           81 : }
     349              : 
     350           26 : async fn upload_parquet(
     351           26 :     mut w: SerializedFileWriter<Writer<BytesMut>>,
     352           26 :     len: i64,
     353           26 :     storage_config: &RemoteStorageConfig,
     354           26 :     config: &ParquetConfig,
     355           26 : ) -> anyhow::Result<Writer<BytesMut>> {
     356           26 :     let len_uncompressed = w
     357           26 :         .flushed_row_groups()
     358           26 :         .iter()
     359           81 :         .map(|rg| rg.total_byte_size())
     360           26 :         .sum::<i64>();
     361              : 
     362              :     // I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry.
     363              :     // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
     364           26 :     let (mut buffer, metadata) =
     365           26 :         tokio::task::spawn_blocking(move || -> parquet::errors::Result<_> {
     366           26 :             let metadata = w.finish()?;
     367           26 :             let buffer = std::mem::take(w.inner_mut().get_mut());
     368           26 :             Ok((buffer, metadata))
     369           26 :         })
     370           26 :         .await
     371           26 :         .propagate_task_panic()?;
     372              : 
     373           26 :     let data = buffer.split().freeze();
     374           26 : 
     375           26 :     let compression = len as f64 / len_uncompressed as f64;
     376           26 :     let size = data.len();
     377           26 :     let now = chrono::Utc::now();
     378           26 :     let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
     379           26 :         uuid::NoContext,
     380           26 :         // we won't be running this in 1970. this cast is ok
     381           26 :         now.timestamp() as u64,
     382           26 :         now.timestamp_subsec_nanos(),
     383           26 :     ));
     384           26 : 
     385           26 :     info!(
     386              :         %id,
     387              :         rows = metadata.num_rows,
     388            0 :         size, compression, "uploading request parquet file"
     389              :     );
     390              : 
     391              :     // A bug in azure-sdk means that the identity-token-file that expires after
     392              :     // 1 hour is not refreshed. This identity-token is used to fetch the actual azure storage
     393              :     // tokens that last for 24 hours. After this 24 hour period, azure-sdk tries to refresh
     394              :     // the storage token, but the identity token has now expired.
     395              :     // <https://github.com/Azure/azure-sdk-for-rust/issues/1739>
     396              :     //
     397              :     // To work around this, we recreate the storage every time.
     398           26 :     let storage = config.storage(storage_config).await?;
     399              : 
     400           26 :     let year = now.year();
     401           26 :     let month = now.month();
     402           26 :     let day = now.day();
     403           26 :     let hour = now.hour();
     404              :     // segment files by time for S3 performance
     405           26 :     let path = RemotePath::from_string(&format!(
     406           26 :         "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
     407           26 :     ))?;
     408           26 :     let cancel = CancellationToken::new();
     409           26 :     let maybe_err = backoff::retry(
     410           38 :         || async {
     411           38 :             let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
     412           38 :             storage
     413           38 :                 .upload(stream, data.len(), &path, None, &cancel)
     414           38 :                 .await
     415           76 :         },
     416           26 :         TimeoutOrCancel::caused_by_cancel,
     417           26 :         FAILED_UPLOAD_WARN_THRESHOLD,
     418           26 :         FAILED_UPLOAD_MAX_RETRIES,
     419           26 :         "request_data_upload",
     420           26 :         // we don't want cancellation to interrupt here, so we make a dummy cancel token
     421           26 :         &cancel,
     422           26 :     )
     423           26 :     .await
     424           26 :     .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
     425           26 :     .and_then(|x| x)
     426           26 :     .with_context(|| format!("request_data_upload: path={path}"))
     427           26 :     .err();
     428              : 
     429           26 :     if let Some(err) = maybe_err {
     430            0 :         tracing::error!(%id, %path, error = ?err, "failed to upload request data");
     431           26 :     }
     432              : 
     433           26 :     Ok(buffer.writer())
     434           26 : }
     435              : 
     436              : #[cfg(test)]
     437              : #[expect(clippy::unwrap_used)]
     438              : mod tests {
     439              :     use std::net::Ipv4Addr;
     440              :     use std::num::NonZeroUsize;
     441              :     use std::sync::Arc;
     442              : 
     443              :     use camino::Utf8Path;
     444              :     use clap::Parser;
     445              :     use futures::{Stream, StreamExt};
     446              :     use itertools::Itertools;
     447              :     use parquet::basic::{Compression, ZstdLevel};
     448              :     use parquet::file::properties::{WriterProperties, DEFAULT_PAGE_SIZE};
     449              :     use parquet::file::reader::FileReader;
     450              :     use parquet::file::serialized_reader::SerializedFileReader;
     451              :     use rand::rngs::StdRng;
     452              :     use rand::{Rng, SeedableRng};
     453              :     use remote_storage::{
     454              :         RemoteStorageConfig, RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
     455              :         DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
     456              :     };
     457              :     use tokio::sync::mpsc;
     458              :     use tokio::time;
     459              :     use walkdir::WalkDir;
     460              : 
     461              :     use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
     462              : 
     463              :     #[derive(Parser)]
     464              :     struct ProxyCliArgs {
     465              :         #[clap(flatten)]
     466              :         parquet_upload: ParquetUploadArgs,
     467              :     }
     468              : 
     469              :     #[test]
     470            1 :     fn default_parser() {
     471            1 :         let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
     472            1 :         assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
     473            1 :         assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
     474            1 :         assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
     475            1 :         assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
     476            1 :         assert_eq!(
     477            1 :             parquet_upload.parquet_upload_maximum_duration,
     478            1 :             time::Duration::from_secs(20 * 60)
     479            1 :         );
     480            1 :         assert_eq!(
     481            1 :             parquet_upload.parquet_upload_compression,
     482            1 :             Compression::UNCOMPRESSED
     483            1 :         );
     484            1 :     }
     485              : 
     486              :     #[test]
     487            1 :     fn full_parser() {
     488            1 :         let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
     489            1 :             "proxy",
     490            1 :             "--parquet-upload-remote-storage",
     491            1 :             "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
     492            1 :             "--parquet-upload-row-group-size",
     493            1 :             "100",
     494            1 :             "--parquet-upload-page-size",
     495            1 :             "10000",
     496            1 :             "--parquet-upload-size",
     497            1 :             "10000000",
     498            1 :             "--parquet-upload-maximum-duration",
     499            1 :             "10m",
     500            1 :             "--parquet-upload-compression",
     501            1 :             "zstd(5)",
     502            1 :         ]);
     503            1 :         assert_eq!(
     504            1 :             parquet_upload.parquet_upload_remote_storage,
     505            1 :             Some(RemoteStorageConfig {
     506            1 :                 storage: RemoteStorageKind::AwsS3(S3Config {
     507            1 :                     bucket_name: "default".into(),
     508            1 :                     bucket_region: "us-east-1".into(),
     509            1 :                     prefix_in_bucket: Some("proxy/".into()),
     510            1 :                     endpoint: Some("http://minio:9000".into()),
     511            1 :                     concurrency_limit: NonZeroUsize::new(
     512            1 :                         DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
     513            1 :                     )
     514            1 :                     .unwrap(),
     515            1 :                     max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
     516            1 :                     upload_storage_class: None,
     517            1 :                 }),
     518            1 :                 timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     519            1 :                 small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
     520            1 :             })
     521            1 :         );
     522            1 :         assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
     523            1 :         assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
     524            1 :         assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
     525            1 :         assert_eq!(
     526            1 :             parquet_upload.parquet_upload_maximum_duration,
     527            1 :             time::Duration::from_secs(10 * 60)
     528            1 :         );
     529            1 :         assert_eq!(
     530            1 :             parquet_upload.parquet_upload_compression,
     531            1 :             Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
     532            1 :         );
     533            1 :     }
     534              : 
     535       159000 :     fn generate_request_data(rng: &mut impl Rng) -> RequestData {
     536       159000 :         RequestData {
     537       159000 :             session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
     538       159000 :             peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
     539       159000 :             timestamp: chrono::DateTime::from_timestamp_millis(
     540       159000 :                 rng.gen_range(1703862754..1803862754),
     541       159000 :             )
     542       159000 :             .unwrap()
     543       159000 :             .naive_utc(),
     544       159000 :             application_name: Some("test".to_owned()),
     545       159000 :             username: Some(hex::encode(rng.gen::<[u8; 4]>())),
     546       159000 :             endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
     547       159000 :             database: Some(hex::encode(rng.gen::<[u8; 16]>())),
     548       159000 :             project: Some(hex::encode(rng.gen::<[u8; 16]>())),
     549       159000 :             branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
     550       159000 :             pg_options: None,
     551       159000 :             auth_method: None,
     552       159000 :             jwt_issuer: None,
     553       159000 :             protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
     554       159000 :             region: "us-east-1",
     555       159000 :             error: None,
     556       159000 :             success: rng.gen(),
     557       159000 :             cold_start_info: "no",
     558       159000 :             duration_us: rng.gen_range(0..30_000_000),
     559       159000 :             disconnect_timestamp: None,
     560       159000 :         }
     561       159000 :     }
     562              : 
     563            6 :     fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
     564            6 :         let mut rng = StdRng::from_seed([0x39; 32]);
     565            6 :         futures::stream::iter(
     566       159000 :             std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
     567            6 :         )
     568            6 :     }
     569              : 
     570            4 :     async fn run_test(
     571            4 :         tmpdir: &Utf8Path,
     572            4 :         config: ParquetConfig,
     573            4 :         rx: impl Stream<Item = RequestData>,
     574            4 :     ) -> Vec<(u64, usize, i64)> {
     575            4 :         let remote_storage_config = RemoteStorageConfig {
     576            4 :             storage: RemoteStorageKind::LocalFs {
     577            4 :                 local_path: tmpdir.to_path_buf(),
     578            4 :             },
     579            4 :             timeout: std::time::Duration::from_secs(120),
     580            4 :             small_timeout: std::time::Duration::from_secs(30),
     581            4 :         };
     582            4 : 
     583            4 :         worker_inner(remote_storage_config, rx, config)
     584            4 :             .await
     585            4 :             .unwrap();
     586            4 : 
     587            4 :         let mut files = WalkDir::new(tmpdir.as_std_path())
     588            4 :             .into_iter()
     589           46 :             .filter_map(|entry| entry.ok())
     590           46 :             .filter(|entry| entry.file_type().is_file())
     591           26 :             .map(|entry| entry.path().to_path_buf())
     592            4 :             .collect_vec();
     593            4 :         files.sort();
     594            4 : 
     595            4 :         files
     596            4 :             .into_iter()
     597           26 :             .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
     598           26 :             .map(|file| {
     599           26 :                 (
     600           26 :                     file.metadata().unwrap(),
     601           26 :                     SerializedFileReader::new(file).unwrap().metadata().clone(),
     602           26 :                 )
     603           26 :             })
     604           26 :             .map(|(file_meta, parquet_meta)| {
     605           26 :                 (
     606           26 :                     file_meta.len(),
     607           26 :                     parquet_meta.num_row_groups(),
     608           26 :                     parquet_meta.file_metadata().num_rows(),
     609           26 :                 )
     610           26 :             })
     611            4 :             .collect()
     612            4 :     }
     613              : 
     614              :     #[tokio::test]
     615            1 :     async fn verify_parquet_no_compression() {
     616            1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     617            1 : 
     618            1 :         let config = ParquetConfig {
     619            1 :             propeties: Arc::new(WriterProperties::new()),
     620            1 :             rows_per_group: 2_000,
     621            1 :             file_size: 1_000_000,
     622            1 :             max_duration: time::Duration::from_secs(20 * 60),
     623            1 :             test_remote_failures: 0,
     624            1 :         };
     625            1 : 
     626            1 :         let rx = random_stream(50_000);
     627            1 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     628            1 : 
     629            1 :         assert_eq!(
     630            1 :             file_stats,
     631            1 :             [
     632            1 :                 (1313105, 3, 6000),
     633            1 :                 (1313094, 3, 6000),
     634            1 :                 (1313153, 3, 6000),
     635            1 :                 (1313110, 3, 6000),
     636            1 :                 (1313246, 3, 6000),
     637            1 :                 (1313083, 3, 6000),
     638            1 :                 (1312877, 3, 6000),
     639            1 :                 (1313112, 3, 6000),
     640            1 :                 (438020, 1, 2000)
     641            1 :             ]
     642            1 :         );
     643            1 : 
     644            1 :         tmpdir.close().unwrap();
     645            1 :     }
     646              : 
     647              :     #[tokio::test]
     648            1 :     async fn verify_parquet_strong_compression() {
     649            1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     650            1 : 
     651            1 :         let config = ParquetConfig {
     652            1 :             propeties: Arc::new(
     653            1 :                 WriterProperties::builder()
     654            1 :                     .set_compression(parquet::basic::Compression::ZSTD(
     655            1 :                         ZstdLevel::try_new(10).unwrap(),
     656            1 :                     ))
     657            1 :                     .build(),
     658            1 :             ),
     659            1 :             rows_per_group: 2_000,
     660            1 :             file_size: 1_000_000,
     661            1 :             max_duration: time::Duration::from_secs(20 * 60),
     662            1 :             test_remote_failures: 0,
     663            1 :         };
     664            1 : 
     665            1 :         let rx = random_stream(50_000);
     666            1 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     667            1 : 
     668            1 :         // with strong compression, the files are smaller
     669            1 :         assert_eq!(
     670            1 :             file_stats,
     671            1 :             [
     672            1 :                 (1204324, 5, 10000),
     673            1 :                 (1204048, 5, 10000),
     674            1 :                 (1204349, 5, 10000),
     675            1 :                 (1204334, 5, 10000),
     676            1 :                 (1204588, 5, 10000)
     677            1 :             ]
     678            1 :         );
     679            1 : 
     680            1 :         tmpdir.close().unwrap();
     681            1 :     }
     682              : 
     683              :     #[tokio::test]
     684            1 :     async fn verify_parquet_unreliable_upload() {
     685            1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     686            1 : 
     687            1 :         let config = ParquetConfig {
     688            1 :             propeties: Arc::new(WriterProperties::new()),
     689            1 :             rows_per_group: 2_000,
     690            1 :             file_size: 1_000_000,
     691            1 :             max_duration: time::Duration::from_secs(20 * 60),
     692            1 :             test_remote_failures: 2,
     693            1 :         };
     694            1 : 
     695            1 :         let rx = random_stream(50_000);
     696            1 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     697            1 : 
     698            1 :         assert_eq!(
     699            1 :             file_stats,
     700            1 :             [
     701            1 :                 (1313105, 3, 6000),
     702            1 :                 (1313094, 3, 6000),
     703            1 :                 (1313153, 3, 6000),
     704            1 :                 (1313110, 3, 6000),
     705            1 :                 (1313246, 3, 6000),
     706            1 :                 (1313083, 3, 6000),
     707            1 :                 (1312877, 3, 6000),
     708            1 :                 (1313112, 3, 6000),
     709            1 :                 (438020, 1, 2000)
     710            1 :             ]
     711            1 :         );
     712            1 : 
     713            1 :         tmpdir.close().unwrap();
     714            1 :     }
     715              : 
     716              :     #[tokio::test(start_paused = true)]
     717            1 :     async fn verify_parquet_regular_upload() {
     718            1 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     719            1 : 
     720            1 :         let config = ParquetConfig {
     721            1 :             propeties: Arc::new(WriterProperties::new()),
     722            1 :             rows_per_group: 2_000,
     723            1 :             file_size: 1_000_000,
     724            1 :             max_duration: time::Duration::from_secs(60),
     725            1 :             test_remote_failures: 2,
     726            1 :         };
     727            1 : 
     728            1 :         let (tx, mut rx) = mpsc::unbounded_channel();
     729            1 : 
     730            1 :         tokio::spawn(async move {
     731            4 :             for _ in 0..3 {
     732            3 :                 let mut s = random_stream(3000);
     733         9003 :                 while let Some(r) = s.next().await {
     734         9000 :                     tx.send(r).unwrap();
     735         9000 :                 }
     736            3 :                 time::sleep(time::Duration::from_secs(70)).await;
     737            1 :             }
     738            1 :         });
     739            1 : 
     740         9071 :         let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
     741            1 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     742            1 : 
     743            1 :         // files are smaller than the size threshold, but they took too long to fill so were flushed early
     744            1 :         assert_eq!(
     745            1 :             file_stats,
     746            1 :             [(658014, 2, 3001), (657728, 2, 3000), (657524, 2, 2999)]
     747            1 :         );
     748            1 : 
     749            1 :         tmpdir.close().unwrap();
     750            1 :     }
     751              : }
        

Generated by: LCOV version 2.1-beta