|             Line data    Source code 
       1              : use std::{sync::Arc, time::SystemTime};
       2              : 
       3              : use anyhow::Context;
       4              : use bytes::{buf::Writer, BufMut, BytesMut};
       5              : use chrono::{Datelike, Timelike};
       6              : use futures::{Stream, StreamExt};
       7              : use parquet::{
       8              :     basic::Compression,
       9              :     file::{
      10              :         metadata::RowGroupMetaDataPtr,
      11              :         properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
      12              :         writer::SerializedFileWriter,
      13              :     },
      14              :     record::RecordWriter,
      15              : };
      16              : use pq_proto::StartupMessageParams;
      17              : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
      18              : use serde::ser::SerializeMap;
      19              : use tokio::{sync::mpsc, time};
      20              : use tokio_util::sync::CancellationToken;
      21              : use tracing::{debug, info, Span};
      22              : use utils::backoff;
      23              : 
      24              : use crate::{
      25              :     config::{remote_storage_from_toml, OptRemoteStorageConfig},
      26              :     context::LOG_CHAN_DISCONNECT,
      27              : };
      28              : 
      29              : use super::{RequestMonitoring, LOG_CHAN};
      30              : 
      31           12 : #[derive(clap::Args, Clone, Debug)]
      32              : pub struct ParquetUploadArgs {
      33              :     /// Storage location to upload the parquet files to.
      34              :     /// Encoded as toml (same format as pageservers), eg
      35              :     /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
      36              :     #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
      37            0 :     parquet_upload_remote_storage: OptRemoteStorageConfig,
      38              : 
      39              :     #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
      40            0 :     parquet_upload_disconnect_events_remote_storage: OptRemoteStorageConfig,
      41              : 
      42              :     /// How many rows to include in a row group
      43            6 :     #[clap(long, default_value_t = 8192)]
      44            0 :     parquet_upload_row_group_size: usize,
      45              : 
      46              :     /// How large each column page should be in bytes
      47            6 :     #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
      48            0 :     parquet_upload_page_size: usize,
      49              : 
      50              :     /// How large the total parquet file should be in bytes
      51            6 :     #[clap(long, default_value_t = 100_000_000)]
      52            0 :     parquet_upload_size: i64,
      53              : 
      54              :     /// How long to wait before forcing a file upload
      55              :     #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
      56            0 :     parquet_upload_maximum_duration: tokio::time::Duration,
      57              : 
      58              :     /// What level of compression to use
      59            6 :     #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
      60            0 :     parquet_upload_compression: Compression,
      61              : }
      62              : 
// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If an upload fails, we log it at info-level, and retry.
// But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
// level instead, as repeated failures can mean a more serious problem. If it
// fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
pub const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
pub const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
      70              : 
// the parquet crate leaves a lot to be desired...
// what follows is an attempt to write parquet files with minimal allocs.
// complication: parquet is a columnar format, while we want to write the data in as rows.
// design:
// * we batch up to `rows_per_group` rows (8192 by default), then flush them into a 'row group'
// * after each row group write, we check the length of the file and upload to s3 if large enough
// * a flush and an upload are also forced once `max_duration` has passed since the previous upload
      77              : 
/// One row of the request log that is written to the parquet files.
///
/// NOTE(review): the derived record writer presumably emits columns in field
/// declaration order - confirm before reordering or inserting fields.
#[derive(parquet_derive::ParquetRecordWriter)]
pub struct RequestData {
    region: &'static str,
    protocol: &'static str,
    /// Must be UTC. The derive macro doesn't like the timezones
    timestamp: chrono::NaiveDateTime,
    session_id: uuid::Uuid,
    peer_addr: String,
    username: Option<String>,
    application_name: Option<String>,
    endpoint_id: Option<String>,
    database: Option<String>,
    project: Option<String>,
    branch: Option<String>,
    /// Startup options of the connection, serialized as a JSON object.
    pg_options: Option<String>,
    auth_method: Option<&'static str>,
    error: Option<&'static str>,
    /// Success is counted if we form a HTTP response with sql rows inside
    /// Or if we make it to proxy_pass
    success: bool,
    /// Indicates if the cplane started the new compute node for this request.
    cold_start_info: &'static str,
    /// Tracks time from session start (HTTP request/libpq TCP handshake)
    /// Through to success/failure
    duration_us: u64,
    /// When a successful session disconnects, one more event is created for it
    /// with `disconnect_timestamp` filled in.
    disconnect_timestamp: Option<chrono::NaiveDateTime>,
}
     106              : 
     107              : struct Options<'a> {
     108              :     options: &'a StartupMessageParams,
     109              : }
     110              : 
     111              : impl<'a> serde::Serialize for Options<'a> {
     112            0 :     fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
     113            0 :     where
     114            0 :         S: serde::Serializer,
     115            0 :     {
     116            0 :         let mut state = s.serialize_map(None)?;
     117            0 :         for (k, v) in self.options.iter() {
     118            0 :             state.serialize_entry(k, v)?;
     119              :         }
     120            0 :         state.end()
     121            0 :     }
     122              : }
     123              : 
     124              : impl From<&RequestMonitoring> for RequestData {
     125            0 :     fn from(value: &RequestMonitoring) -> Self {
     126            0 :         Self {
     127            0 :             session_id: value.session_id,
     128            0 :             peer_addr: value.peer_addr.to_string(),
     129            0 :             timestamp: value.first_packet.naive_utc(),
     130            0 :             username: value.user.as_deref().map(String::from),
     131            0 :             application_name: value.application.as_deref().map(String::from),
     132            0 :             endpoint_id: value.endpoint_id.as_deref().map(String::from),
     133            0 :             database: value.dbname.as_deref().map(String::from),
     134            0 :             project: value.project.as_deref().map(String::from),
     135            0 :             branch: value.branch.as_deref().map(String::from),
     136            0 :             pg_options: value
     137            0 :                 .pg_options
     138            0 :                 .as_ref()
     139            0 :                 .and_then(|options| serde_json::to_string(&Options { options }).ok()),
     140            0 :             auth_method: value.auth_method.as_ref().map(|x| match x {
     141            0 :                 super::AuthMethod::Web => "web",
     142            0 :                 super::AuthMethod::ScramSha256 => "scram_sha_256",
     143            0 :                 super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
     144            0 :                 super::AuthMethod::Cleartext => "cleartext",
     145            0 :             }),
     146            0 :             protocol: value.protocol.as_str(),
     147            0 :             region: value.region,
     148            0 :             error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
     149            0 :             success: value.success,
     150            0 :             cold_start_info: value.cold_start_info.as_str(),
     151            0 :             duration_us: SystemTime::from(value.first_packet)
     152            0 :                 .elapsed()
     153            0 :                 .unwrap_or_default()
     154            0 :                 .as_micros() as u64, // 584 millenia... good enough
     155            0 :             disconnect_timestamp: value.disconnect_timestamp.map(|x| x.naive_utc()),
     156            0 :         }
     157            0 :     }
     158              : }
     159              : 
     160              : /// Parquet request context worker
     161              : ///
     162              : /// It listened on a channel for all completed requests, extracts the data and writes it into a parquet file,
     163              : /// then uploads a completed batch to S3
     164            0 : pub async fn worker(
     165            0 :     cancellation_token: CancellationToken,
     166            0 :     config: ParquetUploadArgs,
     167            0 : ) -> anyhow::Result<()> {
     168            0 :     let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
     169            0 :         tracing::warn!("parquet request upload: no s3 bucket configured");
     170            0 :         return Ok(());
     171              :     };
     172              : 
     173            0 :     let (tx, mut rx) = mpsc::unbounded_channel();
     174            0 :     LOG_CHAN.set(tx.downgrade()).unwrap();
     175            0 : 
     176            0 :     // setup row stream that will close on cancellation
     177            0 :     let cancellation_token2 = cancellation_token.clone();
     178            0 :     tokio::spawn(async move {
     179            0 :         cancellation_token2.cancelled().await;
     180              :         // dropping this sender will cause the channel to close only once
     181              :         // all the remaining inflight requests have been completed.
     182            0 :         drop(tx);
     183            0 :     });
     184            0 :     let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
     185            0 :     let rx = rx.map(RequestData::from);
     186              : 
     187            0 :     let storage =
     188            0 :         GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
     189              : 
     190            0 :     let properties = WriterProperties::builder()
     191            0 :         .set_data_page_size_limit(config.parquet_upload_page_size)
     192            0 :         .set_compression(config.parquet_upload_compression);
     193            0 : 
     194            0 :     let parquet_config = ParquetConfig {
     195            0 :         propeties: Arc::new(properties.build()),
     196            0 :         rows_per_group: config.parquet_upload_row_group_size,
     197            0 :         file_size: config.parquet_upload_size,
     198            0 :         max_duration: config.parquet_upload_maximum_duration,
     199            0 : 
     200            0 :         #[cfg(any(test, feature = "testing"))]
     201            0 :         test_remote_failures: 0,
     202            0 :     };
     203              : 
     204              :     // TODO(anna): consider moving this to a separate function.
     205            0 :     if let Some(disconnect_events_storage_config) =
     206            0 :         config.parquet_upload_disconnect_events_remote_storage
     207              :     {
     208            0 :         let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
     209            0 :         LOG_CHAN_DISCONNECT.set(tx_disconnect.downgrade()).unwrap();
     210            0 : 
     211            0 :         // setup row stream that will close on cancellation
     212            0 :         tokio::spawn(async move {
     213            0 :             cancellation_token.cancelled().await;
     214              :             // dropping this sender will cause the channel to close only once
     215              :             // all the remaining inflight requests have been completed.
     216            0 :             drop(tx_disconnect);
     217            0 :         });
     218            0 :         let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
     219            0 :         let rx_disconnect = rx_disconnect.map(RequestData::from);
     220              : 
     221            0 :         let storage_disconnect =
     222            0 :             GenericRemoteStorage::from_config(&disconnect_events_storage_config)
     223            0 :                 .context("remote storage for disconnect events init")?;
     224            0 :         let parquet_config_disconnect = parquet_config.clone();
     225              :         tokio::try_join!(
     226              :             worker_inner(storage, rx, parquet_config),
     227              :             worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
     228              :         )
     229            0 :         .map(|_| ())
     230              :     } else {
     231            0 :         worker_inner(storage, rx, parquet_config).await
     232              :     }
     233            0 : }
     234              : 
/// Settings of a single parquet writing/upload pipeline.
#[derive(Clone, Debug)]
struct ParquetConfig {
    /// Writer properties used for every parquet file of the pipeline.
    /// (The name is a misspelling of "properties"; other code refers to the field by this name.)
    propeties: WriterPropertiesPtr,
    /// Number of buffered rows at which a row group is flushed.
    rows_per_group: usize,
    /// Accumulated compressed size of the flushed row groups above which the file is uploaded.
    file_size: i64,

    /// Time since the previous upload after which a flush and an upload are forced.
    max_duration: tokio::time::Duration,

    /// When greater than zero, the storage is wrapped with
    /// `GenericRemoteStorage::unreliable_wrapper`, which receives this value.
    #[cfg(any(test, feature = "testing"))]
    test_remote_failures: u64,
}
     246              : 
     247           10 : async fn worker_inner(
     248           10 :     storage: GenericRemoteStorage,
     249           10 :     rx: impl Stream<Item = RequestData>,
     250           10 :     config: ParquetConfig,
     251           10 : ) -> anyhow::Result<()> {
     252              :     #[cfg(any(test, feature = "testing"))]
     253           10 :     let storage = if config.test_remote_failures > 0 {
     254            4 :         GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
     255              :     } else {
     256            6 :         storage
     257              :     };
     258              : 
     259           10 :     let mut rx = std::pin::pin!(rx);
     260           10 : 
     261           10 :     let mut rows = Vec::with_capacity(config.rows_per_group);
     262              : 
     263           10 :     let schema = rows.as_slice().schema()?;
     264           10 :     let buffer = BytesMut::new();
     265           10 :     let w = buffer.writer();
     266           10 :     let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
     267              : 
     268           10 :     let mut last_upload = time::Instant::now();
     269           10 : 
     270           10 :     let mut len = 0;
     271       418010 :     while let Some(row) = rx.next().await {
     272       418000 :         rows.push(row);
     273       418000 :         let force = last_upload.elapsed() > config.max_duration;
     274       418000 :         if rows.len() == config.rows_per_group || force {
     275          210 :             let rg_meta;
     276          210 :             (rows, w, rg_meta) = flush_rows(rows, w).await?;
     277          210 :             len += rg_meta.compressed_size();
     278       417790 :         }
     279       418000 :         if len > config.file_size || force {
     280           56 :             last_upload = time::Instant::now();
     281          234 :             let file = upload_parquet(w, len, &storage).await?;
     282           56 :             w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
     283           56 :             len = 0;
     284       417944 :         }
     285              :     }
     286              : 
     287           10 :     if !rows.is_empty() {
     288            2 :         let rg_meta;
     289            2 :         (_, w, rg_meta) = flush_rows(rows, w).await?;
     290            2 :         len += rg_meta.compressed_size();
     291            8 :     }
     292              : 
     293           10 :     if !w.flushed_row_groups().is_empty() {
     294           24 :         let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
     295            4 :     }
     296              : 
     297           10 :     Ok(())
     298           10 : }
     299              : 
     300          212 : async fn flush_rows<W>(
     301          212 :     rows: Vec<RequestData>,
     302          212 :     mut w: SerializedFileWriter<W>,
     303          212 : ) -> anyhow::Result<(
     304          212 :     Vec<RequestData>,
     305          212 :     SerializedFileWriter<W>,
     306          212 :     RowGroupMetaDataPtr,
     307          212 : )>
     308          212 : where
     309          212 :     W: std::io::Write + Send + 'static,
     310          212 : {
     311          212 :     let span = Span::current();
     312          212 :     let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
     313          212 :         let _enter = span.enter();
     314              : 
     315          212 :         let mut rg = w.next_row_group()?;
     316          212 :         rows.as_slice().write_to_row_group(&mut rg)?;
     317          212 :         let rg_meta = rg.close()?;
     318              : 
     319          212 :         let size = rg_meta.compressed_size();
     320          212 :         let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
     321          212 : 
     322          212 :         debug!(size, compression, "flushed row group to parquet file");
     323              : 
     324          212 :         Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
     325          212 :     })
     326          212 :     .await
     327          212 :     .unwrap()?;
     328              : 
     329          212 :     rows.clear();
     330          212 :     Ok((rows, w, rg_meta))
     331          212 : }
     332              : 
     333           62 : async fn upload_parquet(
     334           62 :     mut w: SerializedFileWriter<Writer<BytesMut>>,
     335           62 :     len: i64,
     336           62 :     storage: &GenericRemoteStorage,
     337           62 : ) -> anyhow::Result<Writer<BytesMut>> {
     338           62 :     let len_uncompressed = w
     339           62 :         .flushed_row_groups()
     340           62 :         .iter()
     341          212 :         .map(|rg| rg.total_byte_size())
     342           62 :         .sum::<i64>();
     343              : 
     344              :     // I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry.
     345              :     // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
     346           62 :     let (mut buffer, metadata) =
     347           62 :         tokio::task::spawn_blocking(move || -> parquet::errors::Result<_> {
     348           62 :             let metadata = w.finish()?;
     349           62 :             let buffer = std::mem::take(w.inner_mut().get_mut());
     350           62 :             Ok((buffer, metadata))
     351           62 :         })
     352           62 :         .await
     353           62 :         .unwrap()?;
     354              : 
     355           62 :     let data = buffer.split().freeze();
     356           62 : 
     357           62 :     let compression = len as f64 / len_uncompressed as f64;
     358           62 :     let size = data.len();
     359           62 :     let now = chrono::Utc::now();
     360           62 :     let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
     361           62 :         uuid::NoContext,
     362           62 :         // we won't be running this in 1970. this cast is ok
     363           62 :         now.timestamp() as u64,
     364           62 :         now.timestamp_subsec_nanos(),
     365           62 :     ));
     366           62 : 
     367           62 :     info!(
     368              :         %id,
     369              :         rows = metadata.num_rows,
     370            0 :         size, compression, "uploading request parquet file"
     371              :     );
     372              : 
     373           62 :     let year = now.year();
     374           62 :     let month = now.month();
     375           62 :     let day = now.day();
     376           62 :     let hour = now.hour();
     377              :     // segment files by time for S3 performance
     378           62 :     let path = RemotePath::from_string(&format!(
     379           62 :         "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
     380           62 :     ))?;
     381           62 :     let cancel = CancellationToken::new();
     382           62 :     let maybe_err = backoff::retry(
     383           86 :         || async {
     384           86 :             let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
     385           86 :             storage
     386           86 :                 .upload(stream, data.len(), &path, None, &cancel)
     387          196 :                 .await
     388           86 :         },
     389           62 :         TimeoutOrCancel::caused_by_cancel,
     390           62 :         FAILED_UPLOAD_WARN_THRESHOLD,
     391           62 :         FAILED_UPLOAD_MAX_RETRIES,
     392           62 :         "request_data_upload",
     393           62 :         // we don't want cancellation to interrupt here, so we make a dummy cancel token
     394           62 :         &cancel,
     395           62 :     )
     396          196 :     .await
     397           62 :     .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
     398           62 :     .and_then(|x| x)
     399           62 :     .context("request_data_upload")
     400           62 :     .err();
     401              : 
     402           62 :     if let Some(err) = maybe_err {
     403            0 :         tracing::warn!(%id, %err, "failed to upload request data");
     404           62 :     }
     405              : 
     406           62 :     Ok(buffer.writer())
     407           62 : }
     408              : 
     409              : #[cfg(test)]
     410              : mod tests {
     411              :     use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
     412              : 
     413              :     use camino::Utf8Path;
     414              :     use clap::Parser;
     415              :     use futures::{Stream, StreamExt};
     416              :     use itertools::Itertools;
     417              :     use parquet::{
     418              :         basic::{Compression, ZstdLevel},
     419              :         file::{
     420              :             properties::{WriterProperties, DEFAULT_PAGE_SIZE},
     421              :             reader::FileReader,
     422              :             serialized_reader::SerializedFileReader,
     423              :         },
     424              :     };
     425              :     use rand::{rngs::StdRng, Rng, SeedableRng};
     426              :     use remote_storage::{
     427              :         GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
     428              :         DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
     429              :     };
     430              :     use tokio::{sync::mpsc, time};
     431              :     use walkdir::WalkDir;
     432              : 
     433              :     use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
     434              : 
    // Minimal CLI wrapper so that `ParquetUploadArgs` can be parsed on its own in the tests.
    #[derive(Parser)]
    struct ProxyCliArgs {
        #[clap(flatten)]
        parquet_upload: ParquetUploadArgs,
    }
     440              : 
     441              :     #[test]
     442            2 :     fn default_parser() {
     443            2 :         let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
     444            2 :         assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
     445            2 :         assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
     446            2 :         assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
     447            2 :         assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
     448            2 :         assert_eq!(
     449            2 :             parquet_upload.parquet_upload_maximum_duration,
     450            2 :             time::Duration::from_secs(20 * 60)
     451            2 :         );
     452            2 :         assert_eq!(
     453            2 :             parquet_upload.parquet_upload_compression,
     454            2 :             Compression::UNCOMPRESSED
     455            2 :         );
     456            2 :     }
     457              : 
     458              :     #[test]
     459            2 :     fn full_parser() {
     460            2 :         let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
     461            2 :             "proxy",
     462            2 :             "--parquet-upload-remote-storage",
     463            2 :             "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
     464            2 :             "--parquet-upload-row-group-size",
     465            2 :             "100",
     466            2 :             "--parquet-upload-page-size",
     467            2 :             "10000",
     468            2 :             "--parquet-upload-size",
     469            2 :             "10000000",
     470            2 :             "--parquet-upload-maximum-duration",
     471            2 :             "10m",
     472            2 :             "--parquet-upload-compression",
     473            2 :             "zstd(5)",
     474            2 :         ]);
     475            2 :         assert_eq!(
     476            2 :             parquet_upload.parquet_upload_remote_storage,
     477            2 :             Some(RemoteStorageConfig {
     478            2 :                 storage: RemoteStorageKind::AwsS3(S3Config {
     479            2 :                     bucket_name: "default".into(),
     480            2 :                     bucket_region: "us-east-1".into(),
     481            2 :                     prefix_in_bucket: Some("proxy/".into()),
     482            2 :                     endpoint: Some("http://minio:9000".into()),
     483            2 :                     concurrency_limit: NonZeroUsize::new(
     484            2 :                         DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
     485            2 :                     )
     486            2 :                     .unwrap(),
     487            2 :                     max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
     488            2 :                     upload_storage_class: None,
     489            2 :                 }),
     490            2 :                 timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
     491            2 :             })
     492            2 :         );
     493            2 :         assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
     494            2 :         assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
     495            2 :         assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
     496            2 :         assert_eq!(
     497            2 :             parquet_upload.parquet_upload_maximum_duration,
     498            2 :             time::Duration::from_secs(10 * 60)
     499            2 :         );
     500            2 :         assert_eq!(
     501            2 :             parquet_upload.parquet_upload_compression,
     502            2 :             Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
     503            2 :         );
     504            2 :     }
     505              : 
     506       418000 :     fn generate_request_data(rng: &mut impl Rng) -> RequestData { // Builds one RequestData whose variable fields are drawn from `rng`.
     507       418000 :         RequestData {
     508       418000 :             session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(), // UUID built from random bytes
     509       418000 :             peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(), // random IPv4 address, stored as text
     510       418000 :             timestamp: chrono::DateTime::from_timestamp_millis(
     511       418000 :                 rng.gen_range(1703862754..1803862754), // NOTE(review): bounds look like epoch seconds (late 2023) but are read as milliseconds, i.e. January 1970 - confirm intent
     512       418000 :             )
     513       418000 :             .unwrap()
     514       418000 :             .naive_utc(),
     515       418000 :             application_name: Some("test".to_owned()),
     516       418000 :             username: Some(hex::encode(rng.gen::<[u8; 4]>())), // hex of 4 random bytes
     517       418000 :             endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())), // hex of 16 random bytes (same for the three fields below)
     518       418000 :             database: Some(hex::encode(rng.gen::<[u8; 16]>())),
     519       418000 :             project: Some(hex::encode(rng.gen::<[u8; 16]>())),
     520       418000 :             branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
     521       418000 :             pg_options: None,
     522       418000 :             auth_method: None,
     523       418000 :             protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)], // one of the three literals, chosen by a random index
     524       418000 :             region: "us-east-1",
     525       418000 :             error: None,
     526       418000 :             success: rng.gen(), // random value
     527       418000 :             cold_start_info: "no",
     528       418000 :             duration_us: rng.gen_range(0..30_000_000), // random value in 0..30_000_000
     529       418000 :             disconnect_timestamp: None,
     530       418000 :         }
     531       418000 :     }
     532              : 
     533           14 :     fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin { // Returns a stream of `len` generated RequestData records.
     534           14 :         let mut rng = StdRng::from_seed([0x39; 32]); // constant seed: every call starts from the same RNG state
     535           14 :         futures::stream::iter(
     536       418000 :             std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len), // records are generated lazily as the stream is polled
     537           14 :         )
     538           14 :     }
     539              : 
     540           10 :     async fn run_test( // Runs worker_inner over `rx` with LocalFs storage rooted at `tmpdir`, then summarizes the files found there.
     541           10 :         tmpdir: &Utf8Path, // storage root for the run; walked afterwards to collect results
     542           10 :         config: ParquetConfig, // passed to worker_inner unchanged
     543           10 :         rx: impl Stream<Item = RequestData>, // records handed to worker_inner
     544           10 :     ) -> Vec<(u64, usize, i64)> { // per file: (size in bytes, number of row groups, number of rows), in sorted path order
     545           10 :         let remote_storage_config = RemoteStorageConfig {
     546           10 :             storage: RemoteStorageKind::LocalFs {
     547           10 :                 local_path: tmpdir.to_path_buf(),
     548           10 :             },
     549           10 :             timeout: std::time::Duration::from_secs(120),
     550           10 :         };
     551           10 :         let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
     552           10 : 
     553          610 :         worker_inner(storage, rx, config).await.unwrap(); // panics if the worker returns an error
     554           10 : 
     555           10 :         let mut files = WalkDir::new(tmpdir.as_std_path()) // walk tmpdir and gather the paths of the files in it
     556           10 :             .into_iter()
     557          112 :             .filter_map(|entry| entry.ok()) // entries that could not be read are silently dropped
     558          112 :             .filter(|entry| entry.file_type().is_file()) // keep files only, skip directories
     559           62 :             .map(|entry| entry.path().to_path_buf())
     560           10 :             .collect_vec();
     561           10 :         files.sort(); // sort by path so the returned order does not depend on directory walk order
     562           10 : 
     563           10 :         files
     564           10 :             .into_iter()
     565           62 :             .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap()) // NOTE(review): `path` already contains tmpdir; join presumably only works because an absolute path replaces the base - confirm
     566           62 :             .map(|file| {
     567           62 :                 (
     568           62 :                     file.metadata().unwrap(), // filesystem metadata, used for the byte size
     569           62 :                     SerializedFileReader::new(file).unwrap().metadata().clone(), // parquet metadata, used for row group and row counts
     570           62 :                 )
     571           62 :             })
     572           62 :             .map(|(file_meta, parquet_meta)| {
     573           62 :                 (
     574           62 :                     file_meta.len(),
     575           62 :                     parquet_meta.num_row_groups(),
     576           62 :                     parquet_meta.file_metadata().num_rows(),
     577           62 :                 )
     578           62 :             })
     579           10 :             .collect()
     580           10 :     }
     581              : 
     582              :     #[tokio::test]
     583            2 :     async fn verify_parquet_no_compression() { // Pins the exact file sizes, row-group counts and row counts produced with default WriterProperties.
     584            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     585            2 : 
     586            2 :         let config = ParquetConfig {
     587            2 :             propeties: Arc::new(WriterProperties::new()), // default properties; the field name is spelled "propeties" in ParquetConfig
     588            2 :             rows_per_group: 2_000, // every row group in the expected output below holds 2000 rows
     589            2 :             file_size: 1_000_000, // NOTE(review): expected files are ~1.3 MB, so this is presumably a threshold checked after a row group is written - confirm
     590            2 :             max_duration: time::Duration::from_secs(20 * 60),
     591            2 :             test_remote_failures: 0,
     592            2 :         };
     593            2 : 
     594            2 :         let rx = random_stream(50_000);
     595          124 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     596            2 : 
     597            2 :         assert_eq!(
     598            2 :             file_stats,
     599            2 :             [ // (bytes, row groups, rows): 8 files x 6000 rows + 1 x 2000 rows = the 50_000 generated rows
     600            2 :                 (1315874, 3, 6000),
     601            2 :                 (1315867, 3, 6000),
     602            2 :                 (1315927, 3, 6000),
     603            2 :                 (1315884, 3, 6000),
     604            2 :                 (1316014, 3, 6000),
     605            2 :                 (1315856, 3, 6000),
     606            2 :                 (1315648, 3, 6000),
     607            2 :                 (1315884, 3, 6000),
     608            2 :                 (438913, 1, 2000)
     609            2 :             ]
     610            2 :         );
     611            2 : 
     612            2 :         tmpdir.close().unwrap(); // remove the temp dir explicitly so a cleanup failure fails the test
     613            2 :     }
     614              : 
     615              :     #[tokio::test]
     616            2 :     async fn verify_parquet_min_compression() { // Same input as the uncompressed test, written with ZSTD at its default level.
     617            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     618            2 : 
     619            2 :         let config = ParquetConfig {
     620            2 :             propeties: Arc::new(
     621            2 :                 WriterProperties::builder()
     622            2 :                     .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default())) // ZSTD with the crate's default level
     623            2 :                     .build(),
     624            2 :             ),
     625            2 :             rows_per_group: 2_000, // every row group in the expected output below holds 2000 rows
     626            2 :             file_size: 1_000_000,
     627            2 :             max_duration: time::Duration::from_secs(20 * 60),
     628            2 :             test_remote_failures: 0,
     629            2 :         };
     630            2 : 
     631            2 :         let rx = random_stream(50_000);
     632           92 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     633            2 : 
     634            2 :         // with compression, there are fewer files with more rows per file
     635            2 :         assert_eq!(
     636            2 :             file_stats,
     637            2 :             [ // (bytes, row groups, rows): 5 files x 10000 rows = the 50_000 generated rows
     638            2 :                 (1223214, 5, 10000),
     639            2 :                 (1229364, 5, 10000),
     640            2 :                 (1231158, 5, 10000),
     641            2 :                 (1230520, 5, 10000),
     642            2 :                 (1221798, 5, 10000)
     643            2 :             ]
     644            2 :         );
     645            2 : 
     646            2 :         tmpdir.close().unwrap();
     647            2 :     }
     648              : 
     649              :     #[tokio::test]
     650            2 :     async fn verify_parquet_strong_compression() { // Same input again, written with ZSTD level 10.
     651            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     652            2 : 
     653            2 :         let config = ParquetConfig {
     654            2 :             propeties: Arc::new(
     655            2 :                 WriterProperties::builder()
     656            2 :                     .set_compression(parquet::basic::Compression::ZSTD(
     657            2 :                         ZstdLevel::try_new(10).unwrap(), // level 10; unwrap panics if the level is rejected
     658            2 :                     ))
     659            2 :                     .build(),
     660            2 :             ),
     661            2 :             rows_per_group: 2_000,
     662            2 :             file_size: 1_000_000,
     663            2 :             max_duration: time::Duration::from_secs(20 * 60),
     664            2 :             test_remote_failures: 0,
     665            2 :         };
     666            2 : 
     667            2 :         let rx = random_stream(50_000);
     668           92 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     669            2 : 
     670            2 :         // with strong compression, the files are smaller
     671            2 :         assert_eq!(
     672            2 :             file_stats,
     673            2 :             [ // same 5 x 10000 row layout as the default-level test; only the byte sizes differ
     674            2 :                 (1208861, 5, 10000),
     675            2 :                 (1208592, 5, 10000),
     676            2 :                 (1208885, 5, 10000),
     677            2 :                 (1208873, 5, 10000),
     678            2 :                 (1209128, 5, 10000)
     679            2 :             ]
     680            2 :         );
     681            2 : 
     682            2 :         tmpdir.close().unwrap();
     683            2 :     }
     684              : 
     685              :     #[tokio::test]
     686            2 :     async fn verify_parquet_unreliable_upload() { // Same config as the uncompressed test except test_remote_failures; the expected files must be identical.
     687            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     688            2 : 
     689            2 :         let config = ParquetConfig {
     690            2 :             propeties: Arc::new(WriterProperties::new()),
     691            2 :             rows_per_group: 2_000,
     692            2 :             file_size: 1_000_000,
     693            2 :             max_duration: time::Duration::from_secs(20 * 60),
     694            2 :             test_remote_failures: 2, // NOTE(review): presumably makes the storage layer fail a fixed number of attempts before succeeding - confirm against worker_inner
     695            2 :         };
     696            2 : 
     697            2 :         let rx = random_stream(50_000);
     698          124 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     699            2 : 
     700            2 :         assert_eq!(
     701            2 :             file_stats,
     702            2 :             [ // identical to the values asserted in verify_parquet_no_compression
     703            2 :                 (1315874, 3, 6000),
     704            2 :                 (1315867, 3, 6000),
     705            2 :                 (1315927, 3, 6000),
     706            2 :                 (1315884, 3, 6000),
     707            2 :                 (1316014, 3, 6000),
     708            2 :                 (1315856, 3, 6000),
     709            2 :                 (1315648, 3, 6000),
     710            2 :                 (1315884, 3, 6000),
     711            2 :                 (438913, 1, 2000)
     712            2 :             ]
     713            2 :         );
     714            2 : 
     715            2 :         tmpdir.close().unwrap();
     716            2 :     }
     717              : 
     718              :     #[tokio::test(start_paused = true)]
     719            2 :     async fn verify_parquet_regular_upload() { // Feeds records in bursts separated by sleeps longer than max_duration and pins the resulting files.
     720            2 :         let tmpdir = camino_tempfile::tempdir().unwrap();
     721            2 : 
     722            2 :         let config = ParquetConfig {
     723            2 :             propeties: Arc::new(WriterProperties::new()),
     724            2 :             rows_per_group: 2_000,
     725            2 :             file_size: 1_000_000,
     726            2 :             max_duration: time::Duration::from_secs(60), // shorter than the 70 s pause between bursts below
     727            2 :             test_remote_failures: 2,
     728            2 :         };
     729            2 : 
     730            2 :         let (tx, mut rx) = mpsc::unbounded_channel();
     731            2 : 
     732            2 :         tokio::spawn(async move { // producer task: 3 bursts of 3000 records, sleeping 70 s after each burst
     733            8 :             for _ in 0..3 {
     734            6 :                 let mut s = random_stream(3000); // fresh stream per burst, so each burst carries the same 3000 records
     735        18006 :                 while let Some(r) = s.next().await {
     736        18000 :                     tx.send(r).unwrap();
     737        18000 :                 }
     738            6 :                 time::sleep(time::Duration::from_secs(70)).await
     739            2 :             }
     740            2 :         });
     741            2 : 
     742        18142 :         let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx)); // expose the channel receiver as a Stream for run_test
     743          178 :         let file_stats = run_test(tmpdir.path(), config, rx).await;
     744            2 : 
     745            2 :         // files are smaller than the size threshold, but they took too long to fill so were flushed early
     746            2 :         assert_eq!(
     747            2 :             file_stats,
     748            2 :             [(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)] // 3001 + 3000 + 2999 = the 9000 rows sent; NOTE(review): the uneven split presumably depends on when the flush timer fires relative to the bursts - confirm
     749            2 :         );
     750            2 : 
     751            2 :         tmpdir.close().unwrap();
     752            2 :     }
     753              : }
         |