Line data Source code
1 : use std::{sync::Arc, time::SystemTime};
2 :
3 : use anyhow::Context;
4 : use bytes::{buf::Writer, BufMut, BytesMut};
5 : use chrono::{Datelike, Timelike};
6 : use futures::{Stream, StreamExt};
7 : use parquet::{
8 : basic::Compression,
9 : file::{
10 : metadata::RowGroupMetaDataPtr,
11 : properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
12 : writer::SerializedFileWriter,
13 : },
14 : record::RecordWriter,
15 : };
16 : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig};
17 : use tokio::{sync::mpsc, time};
18 : use tokio_util::sync::CancellationToken;
19 : use tracing::{debug, info, Span};
20 : use utils::backoff;
21 :
22 : use super::{RequestMonitoring, LOG_CHAN};
23 :
24 62 : #[derive(clap::Args, Clone, Debug)]
25 : pub struct ParquetUploadArgs {
26 : /// Storage location to upload the parquet files to.
27 : /// Encoded as TOML (same format as the pageservers use), e.g.
28 : /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
29 : #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
30 0 : parquet_upload_remote_storage: OptRemoteStorageConfig,
31 :
32 : /// How many rows to include in a row group
33 31 : #[clap(long, default_value_t = 8192)]
34 0 : parquet_upload_row_group_size: usize,
35 :
36 : /// How large each column page should be in bytes
37 31 : #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
38 0 : parquet_upload_page_size: usize,
39 :
40 : /// How large the total parquet file should be in bytes
41 31 : #[clap(long, default_value_t = 100_000_000)]
42 0 : parquet_upload_size: i64,
43 :
44 : /// How long to wait before forcing a file upload
45 : #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
46 0 : parquet_upload_maximum_duration: tokio::time::Duration,
47 :
48 : /// What level of compression to use
49 31 : #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
50 0 : parquet_upload_compression: Compression,
51 : }
52 :
53 : /// Hack to avoid clap being too clever. Without this type alias, clap makes extra assumptions about the optional state,
54 : /// and the value parser we use then fails with runtime type errors.
55 : type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
56 :
57 62 : fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
58 62 : RemoteStorageConfig::from_toml(&s.parse()?)
59 62 : }
60 :
61 : // Occasional network issues and such can cause remote operations to fail, and
62 : // that's expected. If an upload fails, we log it at info level and retry.
63 : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
64 : // level instead, as repeated failures can mean a more serious problem. If it
65 : // fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up (see the illustrative sketch below).
66 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
67 : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
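As a rough illustration of how these two constants interact. This is only a sketch, not the actual `utils::backoff::retry` helper used by `upload_parquet` further down, which additionally sleeps with exponential backoff between attempts and honours a cancellation token; the function name and `desc` parameter here are made up for the example.

use tracing::{info, warn};

async fn retry_with_escalating_logs<T, E, Fut>(
    mut op: impl FnMut() -> Fut,
    desc: &str,
) -> Result<T, E>
where
    Fut: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let mut attempt = 0u32;
    loop {
        match op().await {
            Ok(v) => return Ok(v),
            // too many failures: give up and surface the last error
            Err(e) if attempt >= FAILED_UPLOAD_MAX_RETRIES => return Err(e),
            // repeated failures start to look like a real problem
            Err(e) if attempt >= FAILED_UPLOAD_WARN_THRESHOLD => {
                warn!("{desc} failed, retrying (attempt {attempt}): {e}");
            }
            // occasional failures are expected
            Err(e) => info!("{desc} failed, retrying (attempt {attempt}): {e}"),
        }
        attempt += 1;
        // (the real helper sleeps here, with exponential backoff)
    }
}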
68 :
69 : // the parquet crate leaves a lot to be desired...
70 : // what follows is an attempt to write parquet files with minimal allocs.
71 : // complication: parquet is a columnar format, while we want to write row by row.
72 : // design (a condensed, standalone sketch follows below):
73 : // * we batch up to `rows_per_group` rows (8192 by default), then flush them into a 'row group'
74 : // * after each row group write, we check the compressed length of the file and upload to S3 once it is large enough
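The same design, reduced to a standalone synchronous sketch. Everything here is illustrative: the `Row` type, the batch size, the output file name, and the use of `anyhow` are placeholders; the real implementation below works on `RequestData`, runs asynchronously, and rolls files over to S3 instead of writing a single local file.

use std::{fs::File, sync::Arc};

use parquet::{
    file::{properties::WriterProperties, writer::SerializedFileWriter},
    record::RecordWriter,
};

// placeholder row type; anything deriving ParquetRecordWriter works the same way
#[derive(parquet_derive::ParquetRecordWriter)]
struct Row {
    name: String,
    value: i64,
}

fn main() -> anyhow::Result<()> {
    let rows: Vec<Row> = (0..10_000i64)
        .map(|i| Row { name: format!("row{i}"), value: i })
        .collect();

    let schema = rows.as_slice().schema()?;
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = SerializedFileWriter::new(File::create("rows.parquet")?, schema, props)?;

    // one row group per fixed-size batch of rows
    for batch in rows.chunks(2_000) {
        let mut rg = writer.next_row_group()?;
        batch.write_to_row_group(&mut rg)?;
        let rg_meta = rg.close()?;
        // the real worker sums `rg_meta.compressed_size()` here and, once the
        // running total crosses the file-size threshold, uploads and starts a new file
        let _ = rg_meta.compressed_size();
    }
    writer.close()?;
    Ok(())
}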
75 :
76 6688010 : #[derive(parquet_derive::ParquetRecordWriter)]
77 : struct RequestData {
78 : region: &'static str,
79 : protocol: &'static str,
80 : /// Must be UTC. The derive macro doesn't like timezone-aware timestamps
81 : timestamp: chrono::NaiveDateTime,
82 : session_id: uuid::Uuid,
83 : peer_addr: String,
84 : username: Option<String>,
85 : application_name: Option<String>,
86 : endpoint_id: Option<String>,
87 : database: Option<String>,
88 : project: Option<String>,
89 : branch: Option<String>,
90 : auth_method: Option<&'static str>,
91 : error: Option<&'static str>,
92 : /// Success is counted if we form an HTTP response with SQL rows inside,
93 : /// or if we make it to proxy_pass
94 : success: bool,
95 : /// Tracks time from session start (HTTP request/libpq TCP handshake)
96 : /// through to success/failure
97 : duration_us: u64,
98 : }
99 :
100 : impl From<RequestMonitoring> for RequestData {
101 0 : fn from(value: RequestMonitoring) -> Self {
102 0 : Self {
103 0 : session_id: value.session_id,
104 0 : peer_addr: value.peer_addr.to_string(),
105 0 : timestamp: value.first_packet.naive_utc(),
106 0 : username: value.user.as_deref().map(String::from),
107 0 : application_name: value.application.as_deref().map(String::from),
108 0 : endpoint_id: value.endpoint_id.as_deref().map(String::from),
109 0 : database: value.dbname.as_deref().map(String::from),
110 0 : project: value.project.as_deref().map(String::from),
111 0 : branch: value.branch.as_deref().map(String::from),
112 0 : auth_method: value.auth_method.as_ref().map(|x| match x {
113 0 : super::AuthMethod::Web => "web",
114 0 : super::AuthMethod::ScramSha256 => "scram_sha_256",
115 0 : super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
116 0 : super::AuthMethod::Cleartext => "cleartext",
117 0 : }),
118 0 : protocol: value.protocol,
119 0 : region: value.region,
120 0 : error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
121 0 : success: value.success,
122 0 : duration_us: SystemTime::from(value.first_packet)
123 0 : .elapsed()
124 0 : .unwrap_or_default()
125 0 : .as_micros() as u64, // 584 millennia... good enough
126 0 : }
127 0 : }
128 : }
129 :
130 : /// Parquet request context worker
131 : ///
132 : /// It listens on a channel for all completed requests, extracts the data and writes it into a parquet file,
133 : /// then uploads the completed batch to S3
134 25 : pub async fn worker(
135 25 : cancellation_token: CancellationToken,
136 25 : config: ParquetUploadArgs,
137 25 : ) -> anyhow::Result<()> {
138 25 : let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
139 25 : tracing::warn!("parquet request upload: no s3 bucket configured");
140 25 : return Ok(());
141 : };
142 :
143 0 : let (tx, mut rx) = mpsc::unbounded_channel();
144 0 : LOG_CHAN.set(tx.downgrade()).unwrap();
145 0 :
146 0 : // set up a row stream that will close on cancellation
147 0 : tokio::spawn(async move {
148 0 : cancellation_token.cancelled().await;
149 : // dropping this sender will cause the channel to close only once
150 : // all the remaining inflight requests have been completed.
151 0 : drop(tx);
152 0 : });
153 0 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
154 0 : let rx = rx.map(RequestData::from);
155 :
156 0 : let storage =
157 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
158 :
159 0 : let properties = WriterProperties::builder()
160 0 : .set_data_page_size_limit(config.parquet_upload_page_size)
161 0 : .set_compression(config.parquet_upload_compression);
162 0 :
163 0 : let parquet_config = ParquetConfig {
164 0 : propeties: Arc::new(properties.build()),
165 0 : rows_per_group: config.parquet_upload_row_group_size,
166 0 : file_size: config.parquet_upload_size,
167 0 : max_duration: config.parquet_upload_maximum_duration,
168 0 :
169 0 : #[cfg(any(test, feature = "testing"))]
170 0 : test_remote_failures: 0,
171 0 : };
172 0 :
173 0 : worker_inner(storage, rx, parquet_config).await
174 25 : }
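A hypothetical sketch of how a binary might wire this worker up. Only `worker`, `ParquetUploadArgs`, and `CancellationToken` come from this codebase; the `ProxyCliArgs` wrapper mirrors the one in the test module below, and the shutdown flow shown here is illustrative rather than the proxy's actual main.

use clap::Parser;
use tokio_util::sync::CancellationToken;

#[derive(Parser)]
struct ProxyCliArgs {
    #[clap(flatten)]
    parquet_upload: ParquetUploadArgs,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = ProxyCliArgs::parse();
    let cancel = CancellationToken::new();

    // run the upload worker in the background; it exits on its own once the
    // token is cancelled and the remaining in-flight requests have drained
    let parquet_worker = tokio::spawn(worker(cancel.clone(), args.parquet_upload));

    // ... run the proxy itself; then, on shutdown:
    cancel.cancel();
    parquet_worker.await??;
    Ok(())
}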
175 :
176 : struct ParquetConfig {
177 : propeties: WriterPropertiesPtr,
178 : rows_per_group: usize,
179 : file_size: i64,
180 :
181 : max_duration: tokio::time::Duration,
182 :
183 : #[cfg(any(test, feature = "testing"))]
184 : test_remote_failures: u64,
185 : }
186 :
187 10 : async fn worker_inner(
188 10 : storage: GenericRemoteStorage,
189 10 : rx: impl Stream<Item = RequestData>,
190 10 : config: ParquetConfig,
191 10 : ) -> anyhow::Result<()> {
192 : #[cfg(any(test, feature = "testing"))]
193 10 : let storage = if config.test_remote_failures > 0 {
194 4 : GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
195 : } else {
196 6 : storage
197 : };
198 :
199 10 : let mut rx = std::pin::pin!(rx);
200 10 :
201 10 : let mut rows = Vec::with_capacity(config.rows_per_group);
202 :
203 10 : let schema = rows.as_slice().schema()?;
204 10 : let buffer = BytesMut::new();
205 10 : let w = buffer.writer();
206 10 : let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
207 :
208 10 : let mut last_upload = time::Instant::now();
209 10 :
210 10 : let mut len = 0;
211 418010 : while let Some(row) = rx.next().await {
212 418000 : rows.push(row);
213 418000 : let force = last_upload.elapsed() > config.max_duration;
214 418000 : if rows.len() == config.rows_per_group || force {
215 210 : let rg_meta;
216 210 : (rows, w, rg_meta) = flush_rows(rows, w).await?;
217 210 : len += rg_meta.compressed_size();
218 417790 : }
219 418000 : if len > config.file_size || force {
220 56 : last_upload = time::Instant::now();
221 234 : let file = upload_parquet(w, len, &storage).await?;
222 56 : w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
223 56 : len = 0;
224 417944 : }
225 : }
226 :
227 10 : if !rows.is_empty() {
228 2 : let rg_meta;
229 2 : (_, w, rg_meta) = flush_rows(rows, w).await?;
230 2 : len += rg_meta.compressed_size();
231 8 : }
232 :
233 10 : if !w.flushed_row_groups().is_empty() {
234 24 : let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
235 4 : }
236 :
237 10 : Ok(())
238 10 : }
239 :
240 212 : async fn flush_rows<W>(
241 212 : rows: Vec<RequestData>,
242 212 : mut w: SerializedFileWriter<W>,
243 212 : ) -> anyhow::Result<(
244 212 : Vec<RequestData>,
245 212 : SerializedFileWriter<W>,
246 212 : RowGroupMetaDataPtr,
247 212 : )>
248 212 : where
249 212 : W: std::io::Write + Send + 'static,
250 212 : {
251 212 : let span = Span::current();
252 212 : let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
253 212 : let _enter = span.enter();
254 :
255 212 : let mut rg = w.next_row_group()?;
256 212 : rows.as_slice().write_to_row_group(&mut rg)?;
257 212 : let rg_meta = rg.close()?;
258 :
259 212 : let size = rg_meta.compressed_size();
260 212 : let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
261 212 :
262 212 : debug!(size, compression, "flushed row group to parquet file");
263 :
264 212 : Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
265 212 : })
266 212 : .await
267 212 : .unwrap()?;
268 :
269 212 : rows.clear();
270 212 : Ok((rows, w, rg_meta))
271 212 : }
272 :
273 62 : async fn upload_parquet(
274 62 : w: SerializedFileWriter<Writer<BytesMut>>,
275 62 : len: i64,
276 62 : storage: &GenericRemoteStorage,
277 62 : ) -> anyhow::Result<Writer<BytesMut>> {
278 62 : let len_uncompressed = w
279 62 : .flushed_row_groups()
280 62 : .iter()
281 212 : .map(|rg| rg.total_byte_size())
282 62 : .sum::<i64>();
283 :
284 : // I don't know how compute-intensive this is, although it probably isn't much... better safe than sorry.
285 : // the `finish` method is only available on the fork: https://github.com/apache/arrow-rs/issues/5253
286 62 : let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish())
287 62 : .await
288 62 : .unwrap()?;
289 :
290 62 : let mut buffer = writer.into_inner();
291 62 : let data = buffer.split().freeze();
292 62 :
293 62 : let compression = len as f64 / len_uncompressed as f64;
294 62 : let size = data.len();
295 62 : let now = chrono::Utc::now();
296 62 : let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
297 62 : uuid::NoContext,
298 62 : // we won't be running this in 1970. this cast is ok
299 62 : now.timestamp() as u64,
300 62 : now.timestamp_subsec_nanos(),
301 62 : ));
302 :
303 0 : info!(
304 0 : %id,
305 0 : rows = metadata.num_rows,
306 0 : size, compression, "uploading request parquet file"
307 0 : );
308 :
309 62 : let year = now.year();
310 62 : let month = now.month();
311 62 : let day = now.day();
312 62 : let hour = now.hour();
313 : // segment files by time for S3 performance
314 62 : let path = RemotePath::from_string(&format!(
315 62 : "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
316 62 : ))?;
317 62 : backoff::retry(
318 86 : || async {
319 86 : let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
320 196 : storage.upload(stream, data.len(), &path, None).await
321 172 : },
322 62 : |_e| false,
323 62 : FAILED_UPLOAD_WARN_THRESHOLD,
324 62 : FAILED_UPLOAD_MAX_RETRIES,
325 62 : "request_data_upload",
326 62 : // we don't want cancellation to interrupt here, so we make a dummy cancel token
327 62 : &CancellationToken::new(),
328 62 : )
329 196 : .await
330 62 : .ok_or_else(|| anyhow::anyhow!("Cancelled"))
331 62 : .and_then(|x| x)
332 62 : .context("request_data_upload")?;
333 :
334 62 : Ok(buffer.writer())
335 62 : }
336 :
337 : #[cfg(test)]
338 : mod tests {
339 : use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
340 :
341 : use camino::Utf8Path;
342 : use clap::Parser;
343 : use futures::{Stream, StreamExt};
344 : use itertools::Itertools;
345 : use parquet::{
346 : basic::{Compression, ZstdLevel},
347 : file::{
348 : properties::{WriterProperties, DEFAULT_PAGE_SIZE},
349 : reader::FileReader,
350 : serialized_reader::SerializedFileReader,
351 : },
352 : };
353 : use rand::{rngs::StdRng, Rng, SeedableRng};
354 : use remote_storage::{
355 : GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
356 : DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
357 : };
358 : use tokio::{sync::mpsc, time};
359 : use walkdir::WalkDir;
360 :
361 : use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
362 :
363 4 : #[derive(Parser)]
364 : struct ProxyCliArgs {
365 : #[clap(flatten)]
366 : parquet_upload: ParquetUploadArgs,
367 : }
368 :
369 2 : #[test]
370 2 : fn default_parser() {
371 2 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
372 2 : assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
373 2 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
374 2 : assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
375 2 : assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
376 2 : assert_eq!(
377 2 : parquet_upload.parquet_upload_maximum_duration,
378 2 : time::Duration::from_secs(20 * 60)
379 2 : );
380 2 : assert_eq!(
381 2 : parquet_upload.parquet_upload_compression,
382 2 : Compression::UNCOMPRESSED
383 2 : );
384 2 : }
385 :
386 2 : #[test]
387 2 : fn full_parser() {
388 2 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
389 2 : "proxy",
390 2 : "--parquet-upload-remote-storage",
391 2 : "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
392 2 : "--parquet-upload-row-group-size",
393 2 : "100",
394 2 : "--parquet-upload-page-size",
395 2 : "10000",
396 2 : "--parquet-upload-size",
397 2 : "10000000",
398 2 : "--parquet-upload-maximum-duration",
399 2 : "10m",
400 2 : "--parquet-upload-compression",
401 2 : "zstd(5)",
402 2 : ]);
403 2 : assert_eq!(
404 2 : parquet_upload.parquet_upload_remote_storage,
405 2 : Some(RemoteStorageConfig {
406 2 : storage: RemoteStorageKind::AwsS3(S3Config {
407 2 : bucket_name: "default".into(),
408 2 : bucket_region: "us-east-1".into(),
409 2 : prefix_in_bucket: Some("proxy/".into()),
410 2 : endpoint: Some("http://minio:9000".into()),
411 2 : concurrency_limit: NonZeroUsize::new(
412 2 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
413 2 : )
414 2 : .unwrap(),
415 2 : max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
416 2 : })
417 2 : })
418 2 : );
419 2 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
420 2 : assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
421 2 : assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
422 2 : assert_eq!(
423 2 : parquet_upload.parquet_upload_maximum_duration,
424 2 : time::Duration::from_secs(10 * 60)
425 2 : );
426 2 : assert_eq!(
427 2 : parquet_upload.parquet_upload_compression,
428 2 : Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
429 2 : );
430 2 : }
431 :
432 418000 : fn generate_request_data(rng: &mut impl Rng) -> RequestData {
433 418000 : RequestData {
434 418000 : session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
435 418000 : peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
436 418000 : timestamp: chrono::NaiveDateTime::from_timestamp_millis(
437 418000 : rng.gen_range(1703862754..1803862754),
438 418000 : )
439 418000 : .unwrap(),
440 418000 : application_name: Some("test".to_owned()),
441 418000 : username: Some(hex::encode(rng.gen::<[u8; 4]>())),
442 418000 : endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
443 418000 : database: Some(hex::encode(rng.gen::<[u8; 16]>())),
444 418000 : project: Some(hex::encode(rng.gen::<[u8; 16]>())),
445 418000 : branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
446 418000 : auth_method: None,
447 418000 : protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
448 418000 : region: "us-east-1",
449 418000 : error: None,
450 418000 : success: rng.gen(),
451 418000 : duration_us: rng.gen_range(0..30_000_000),
452 418000 : }
453 418000 : }
454 :
455 14 : fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
456 14 : let mut rng = StdRng::from_seed([0x39; 32]);
457 14 : futures::stream::iter(
458 418000 : std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
459 14 : )
460 14 : }
461 :
462 10 : async fn run_test(
463 10 : tmpdir: &Utf8Path,
464 10 : config: ParquetConfig,
465 10 : rx: impl Stream<Item = RequestData>,
466 10 : ) -> Vec<(u64, usize, i64)> {
467 10 : let remote_storage_config = RemoteStorageConfig {
468 10 : storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
469 10 : };
470 10 : let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
471 10 :
472 610 : worker_inner(storage, rx, config).await.unwrap();
473 10 :
474 10 : let mut files = WalkDir::new(tmpdir.as_std_path())
475 10 : .into_iter()
476 112 : .filter_map(|entry| entry.ok())
477 112 : .filter(|entry| entry.file_type().is_file())
478 62 : .map(|entry| entry.path().to_path_buf())
479 10 : .collect_vec();
480 10 : files.sort();
481 10 :
482 10 : files
483 10 : .into_iter()
484 62 : .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
485 62 : .map(|file| {
486 62 : (
487 62 : file.metadata().unwrap(),
488 62 : SerializedFileReader::new(file).unwrap().metadata().clone(),
489 62 : )
490 62 : })
491 62 : .map(|(file_meta, parquet_meta)| {
492 62 : (
493 62 : file_meta.len(),
494 62 : parquet_meta.num_row_groups(),
495 62 : parquet_meta.file_metadata().num_rows(),
496 62 : )
497 62 : })
498 10 : .collect()
499 10 : }
500 :
501 2 : #[tokio::test]
502 2 : async fn verify_parquet_no_compression() {
503 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
504 2 :
505 2 : let config = ParquetConfig {
506 2 : propeties: Arc::new(WriterProperties::new()),
507 2 : rows_per_group: 2_000,
508 2 : file_size: 1_000_000,
509 2 : max_duration: time::Duration::from_secs(20 * 60),
510 2 : test_remote_failures: 0,
511 2 : };
512 2 :
513 2 : let rx = random_stream(50_000);
514 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
515 2 :
516 2 : assert_eq!(
517 2 : file_stats,
518 2 : [
519 2 : (1313727, 3, 6000),
520 2 : (1313720, 3, 6000),
521 2 : (1313780, 3, 6000),
522 2 : (1313737, 3, 6000),
523 2 : (1313867, 3, 6000),
524 2 : (1313709, 3, 6000),
525 2 : (1313501, 3, 6000),
526 2 : (1313737, 3, 6000),
527 2 : (438118, 1, 2000)
528 2 : ],
529 2 : );
530 2 :
531 2 : tmpdir.close().unwrap();
532 2 : }
533 :
534 2 : #[tokio::test]
535 2 : async fn verify_parquet_min_compression() {
536 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
537 2 :
538 2 : let config = ParquetConfig {
539 2 : propeties: Arc::new(
540 2 : WriterProperties::builder()
541 2 : .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
542 2 : .build(),
543 2 : ),
544 2 : rows_per_group: 2_000,
545 2 : file_size: 1_000_000,
546 2 : max_duration: time::Duration::from_secs(20 * 60),
547 2 : test_remote_failures: 0,
548 2 : };
549 2 :
550 2 : let rx = random_stream(50_000);
551 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
552 2 :
553 2 : // with compression, there are fewer files with more rows per file
554 2 : assert_eq!(
555 2 : file_stats,
556 2 : [
557 2 : (1219459, 5, 10000),
558 2 : (1225609, 5, 10000),
559 2 : (1227403, 5, 10000),
560 2 : (1226765, 5, 10000),
561 2 : (1218043, 5, 10000)
562 2 : ],
563 2 : );
564 2 :
565 2 : tmpdir.close().unwrap();
566 2 : }
567 :
568 2 : #[tokio::test]
569 2 : async fn verify_parquet_strong_compression() {
570 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
571 2 :
572 2 : let config = ParquetConfig {
573 2 : propeties: Arc::new(
574 2 : WriterProperties::builder()
575 2 : .set_compression(parquet::basic::Compression::ZSTD(
576 2 : ZstdLevel::try_new(10).unwrap(),
577 2 : ))
578 2 : .build(),
579 2 : ),
580 2 : rows_per_group: 2_000,
581 2 : file_size: 1_000_000,
582 2 : max_duration: time::Duration::from_secs(20 * 60),
583 2 : test_remote_failures: 0,
584 2 : };
585 2 :
586 2 : let rx = random_stream(50_000);
587 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
588 2 :
589 2 : // with strong compression, the files are smaller
590 2 : assert_eq!(
591 2 : file_stats,
592 2 : [
593 2 : (1205106, 5, 10000),
594 2 : (1204837, 5, 10000),
595 2 : (1205130, 5, 10000),
596 2 : (1205118, 5, 10000),
597 2 : (1205373, 5, 10000)
598 2 : ],
599 2 : );
600 2 :
601 2 : tmpdir.close().unwrap();
602 2 : }
603 :
604 2 : #[tokio::test]
605 2 : async fn verify_parquet_unreliable_upload() {
606 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
607 2 :
608 2 : let config = ParquetConfig {
609 2 : propeties: Arc::new(WriterProperties::new()),
610 2 : rows_per_group: 2_000,
611 2 : file_size: 1_000_000,
612 2 : max_duration: time::Duration::from_secs(20 * 60),
613 2 : test_remote_failures: 2,
614 2 : };
615 2 :
616 2 : let rx = random_stream(50_000);
617 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
618 2 :
619 2 : assert_eq!(
620 2 : file_stats,
621 2 : [
622 2 : (1313727, 3, 6000),
623 2 : (1313720, 3, 6000),
624 2 : (1313780, 3, 6000),
625 2 : (1313737, 3, 6000),
626 2 : (1313867, 3, 6000),
627 2 : (1313709, 3, 6000),
628 2 : (1313501, 3, 6000),
629 2 : (1313737, 3, 6000),
630 2 : (438118, 1, 2000)
631 2 : ],
632 2 : );
633 2 :
634 2 : tmpdir.close().unwrap();
635 2 : }
636 :
637 2 : #[tokio::test(start_paused = true)]
638 2 : async fn verify_parquet_regular_upload() {
639 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
640 2 :
641 2 : let config = ParquetConfig {
642 2 : propeties: Arc::new(WriterProperties::new()),
643 2 : rows_per_group: 2_000,
644 2 : file_size: 1_000_000,
645 2 : max_duration: time::Duration::from_secs(60),
646 2 : test_remote_failures: 2,
647 2 : };
648 2 :
649 2 : let (tx, mut rx) = mpsc::unbounded_channel();
650 2 :
651 2 : tokio::spawn(async move {
652 8 : for _ in 0..3 {
653 6 : let mut s = random_stream(3000);
654 18006 : while let Some(r) = s.next().await {
655 18000 : tx.send(r).unwrap();
656 18000 : }
657 6 : time::sleep(time::Duration::from_secs(70)).await
658 2 : }
659 2 : });
660 2 :
661 18142 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
662 178 : let file_stats = run_test(tmpdir.path(), config, rx).await;
663 2 :
664 2 : // files are smaller than the size threshold, but they took too long to fill so were flushed early
665 2 : assert_eq!(
666 2 : file_stats,
667 2 : [(658383, 2, 3001), (658097, 2, 3000), (657893, 2, 2999)],
668 2 : );
669 2 :
670 2 : tmpdir.close().unwrap();
671 2 : }
672 : }