Line data Source code
1 : use std::{sync::Arc, time::SystemTime};
2 :
3 : use anyhow::Context;
4 : use bytes::{buf::Writer, BufMut, BytesMut};
5 : use chrono::{Datelike, Timelike};
6 : use futures::{Stream, StreamExt};
7 : use parquet::{
8 : basic::Compression,
9 : file::{
10 : metadata::RowGroupMetaDataPtr,
11 : properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
12 : writer::SerializedFileWriter,
13 : },
14 : record::RecordWriter,
15 : };
16 : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
17 : use tokio::{sync::mpsc, time};
18 : use tokio_util::sync::CancellationToken;
19 : use tracing::{debug, info, Span};
20 : use utils::backoff;
21 :
22 : use super::{RequestMonitoring, LOG_CHAN};
23 :
24 12 : #[derive(clap::Args, Clone, Debug)]
25 : pub struct ParquetUploadArgs {
26 : /// Storage location to upload the parquet files to.
27 : /// Encoded as TOML (same format as pageservers), e.g.
28 : /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
29 : #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
30 0 : parquet_upload_remote_storage: OptRemoteStorageConfig,
31 :
32 : /// How many rows to include in a row group
33 6 : #[clap(long, default_value_t = 8192)]
34 0 : parquet_upload_row_group_size: usize,
35 :
36 : /// How large each column page should be in bytes
37 6 : #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
38 0 : parquet_upload_page_size: usize,
39 :
40 : /// How large the total parquet file should be in bytes
41 6 : #[clap(long, default_value_t = 100_000_000)]
42 0 : parquet_upload_size: i64,
43 :
44 : /// How long to wait before forcing a file upload
45 : #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
46 0 : parquet_upload_maximum_duration: tokio::time::Duration,
47 :
48 : /// What level of compression to use
49 6 : #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
50 0 : parquet_upload_compression: Compression,
51 : }
52 :
53 : /// Hack to stop clap from being too clever. Without this type alias, clap makes extra assumptions
54 : /// about the optional state, and the value parser we use fails with runtime type errors.
55 : type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
56 :
57 12 : fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
58 12 : RemoteStorageConfig::from_toml(&s.parse()?)
59 12 : }
60 :
61 : // Occasional network issues and such can cause remote operations to fail, and
62 : // that's expected. If an upload fails, we log it at info level and retry.
63 : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
64 : // level instead, as repeated failures can mean a more serious problem. If it
65 : // fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
66 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
67 : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
68 :
69 : // the parquet crate leaves a lot to be desired...
70 : // what follows is an attempt to write parquet files with minimal allocs.
71 : // complication: parquet is a columnar format, while we want to write rows as they arrive.
72 : // design (rough data flow sketched below):
73 : // * we batch rows up to `rows_per_group` (8192 by default), then flush them into a 'row group'
74 : // * after each row group write, we check the length of the file and upload to s3 if large enough
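 : // rough data flow (descriptive only; the real code follows):
 : //   RequestMonitoring -> RequestData -> Vec<RequestData> (one batch per row group)
 : //     -> SerializedFileWriter<Writer<BytesMut>> row group
 : //     -> upload_parquet to S3 once `file_size` bytes are flushed or `max_duration` elapses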
75 :
76 6688010 : #[derive(parquet_derive::ParquetRecordWriter)]
77 : struct RequestData {
78 : region: &'static str,
79 : protocol: &'static str,
80 : /// Must be UTC. The derive macro doesn't like timezones
81 : timestamp: chrono::NaiveDateTime,
82 : session_id: uuid::Uuid,
83 : peer_addr: String,
84 : username: Option<String>,
85 : application_name: Option<String>,
86 : endpoint_id: Option<String>,
87 : database: Option<String>,
88 : project: Option<String>,
89 : branch: Option<String>,
90 : auth_method: Option<&'static str>,
91 : error: Option<&'static str>,
92 : /// Success is counted if we form an HTTP response with SQL rows inside,
93 : /// or if we make it to proxy_pass
94 : success: bool,
95 : /// Tracks time from session start (HTTP request/libpq TCP handshake)
96 : /// through to success/failure
97 : duration_us: u64,
98 : }
99 :
100 : impl From<RequestMonitoring> for RequestData {
101 0 : fn from(value: RequestMonitoring) -> Self {
102 0 : Self {
103 0 : session_id: value.session_id,
104 0 : peer_addr: value.peer_addr.to_string(),
105 0 : timestamp: value.first_packet.naive_utc(),
106 0 : username: value.user.as_deref().map(String::from),
107 0 : application_name: value.application.as_deref().map(String::from),
108 0 : endpoint_id: value.endpoint_id.as_deref().map(String::from),
109 0 : database: value.dbname.as_deref().map(String::from),
110 0 : project: value.project.as_deref().map(String::from),
111 0 : branch: value.branch.as_deref().map(String::from),
112 0 : auth_method: value.auth_method.as_ref().map(|x| match x {
113 0 : super::AuthMethod::Web => "web",
114 0 : super::AuthMethod::ScramSha256 => "scram_sha_256",
115 0 : super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
116 0 : super::AuthMethod::Cleartext => "cleartext",
117 0 : }),
118 0 : protocol: value.protocol,
119 0 : region: value.region,
120 0 : error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
121 0 : success: value.success,
122 0 : duration_us: SystemTime::from(value.first_packet)
123 0 : .elapsed()
124 0 : .unwrap_or_default()
125 0 : .as_micros() as u64, // 584 millennia... good enough
126 0 : }
127 0 : }
128 : }
129 :
130 : /// Parquet request context worker
131 : ///
132 : /// It listens on a channel for all completed requests, extracts the data, writes it into a parquet file,
133 : /// and then uploads the completed batch to S3
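 : ///
 : /// On cancellation, the sending half of the channel is dropped, so the stream drains any
 : /// remaining in-flight requests before closing and the final batch is still uploaded.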
134 0 : pub async fn worker(
135 0 : cancellation_token: CancellationToken,
136 0 : config: ParquetUploadArgs,
137 0 : ) -> anyhow::Result<()> {
138 0 : let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
139 0 : tracing::warn!("parquet request upload: no s3 bucket configured");
140 0 : return Ok(());
141 : };
142 :
143 0 : let (tx, mut rx) = mpsc::unbounded_channel();
144 0 : LOG_CHAN.set(tx.downgrade()).unwrap();
145 0 :
146 0 : // set up a row stream that will close on cancellation
147 0 : tokio::spawn(async move {
148 0 : cancellation_token.cancelled().await;
149 : // dropping this sender will cause the channel to close only once
150 : // all the remaining inflight requests have been completed.
151 0 : drop(tx);
152 0 : });
153 0 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
154 0 : let rx = rx.map(RequestData::from);
155 :
156 0 : let storage =
157 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
158 :
159 0 : let properties = WriterProperties::builder()
160 0 : .set_data_page_size_limit(config.parquet_upload_page_size)
161 0 : .set_compression(config.parquet_upload_compression);
162 0 :
163 0 : let parquet_config = ParquetConfig {
164 0 : propeties: Arc::new(properties.build()),
165 0 : rows_per_group: config.parquet_upload_row_group_size,
166 0 : file_size: config.parquet_upload_size,
167 0 : max_duration: config.parquet_upload_maximum_duration,
168 0 :
169 0 : #[cfg(any(test, feature = "testing"))]
170 0 : test_remote_failures: 0,
171 0 : };
172 0 :
173 0 : worker_inner(storage, rx, parquet_config).await
174 0 : }
175 :
176 : struct ParquetConfig {
177 : propeties: WriterPropertiesPtr,
178 : rows_per_group: usize,
179 : file_size: i64,
180 :
181 : max_duration: tokio::time::Duration,
182 :
183 : #[cfg(any(test, feature = "testing"))]
184 : test_remote_failures: u64,
185 : }
186 :
187 10 : async fn worker_inner(
188 10 : storage: GenericRemoteStorage,
189 10 : rx: impl Stream<Item = RequestData>,
190 10 : config: ParquetConfig,
191 10 : ) -> anyhow::Result<()> {
192 : #[cfg(any(test, feature = "testing"))]
193 10 : let storage = if config.test_remote_failures > 0 {
194 4 : GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
195 : } else {
196 6 : storage
197 : };
198 :
199 10 : let mut rx = std::pin::pin!(rx);
200 10 :
201 10 : let mut rows = Vec::with_capacity(config.rows_per_group);
202 :
203 10 : let schema = rows.as_slice().schema()?;
204 10 : let buffer = BytesMut::new();
205 10 : let w = buffer.writer();
206 10 : let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
207 :
208 10 : let mut last_upload = time::Instant::now();
209 10 :
210 10 : let mut len = 0;
211 418010 : while let Some(row) = rx.next().await {
212 418000 : rows.push(row);
213 418000 : let force = last_upload.elapsed() > config.max_duration;
214 418000 : if rows.len() == config.rows_per_group || force {
215 210 : let rg_meta;
216 210 : (rows, w, rg_meta) = flush_rows(rows, w).await?;
217 210 : len += rg_meta.compressed_size();
218 417790 : }
219 418000 : if len > config.file_size || force {
220 56 : last_upload = time::Instant::now();
221 234 : let file = upload_parquet(w, len, &storage).await?;
222 56 : w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
223 56 : len = 0;
224 417944 : }
225 : }
226 :
227 10 : if !rows.is_empty() {
228 2 : let rg_meta;
229 2 : (_, w, rg_meta) = flush_rows(rows, w).await?;
230 2 : len += rg_meta.compressed_size();
231 8 : }
232 :
233 10 : if !w.flushed_row_groups().is_empty() {
234 24 : let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
235 4 : }
236 :
237 10 : Ok(())
238 10 : }
239 :
240 212 : async fn flush_rows<W>(
241 212 : rows: Vec<RequestData>,
242 212 : mut w: SerializedFileWriter<W>,
243 212 : ) -> anyhow::Result<(
244 212 : Vec<RequestData>,
245 212 : SerializedFileWriter<W>,
246 212 : RowGroupMetaDataPtr,
247 212 : )>
248 212 : where
249 212 : W: std::io::Write + Send + 'static,
250 212 : {
251 212 : let span = Span::current();
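 : // writing a row group is synchronous, CPU-bound work (encoding + compression), so it runs
 : // on the blocking thread pool; the captured span keeps tracing context inside the closure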
252 212 : let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
253 212 : let _enter = span.enter();
254 :
255 212 : let mut rg = w.next_row_group()?;
256 212 : rows.as_slice().write_to_row_group(&mut rg)?;
257 212 : let rg_meta = rg.close()?;
258 :
259 212 : let size = rg_meta.compressed_size();
260 212 : let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
261 212 :
262 212 : debug!(size, compression, "flushed row group to parquet file");
263 :
264 212 : Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
265 212 : })
266 212 : .await
267 212 : .unwrap()?;
268 :
269 212 : rows.clear();
270 212 : Ok((rows, w, rg_meta))
271 212 : }
272 :
273 62 : async fn upload_parquet(
274 62 : w: SerializedFileWriter<Writer<BytesMut>>,
275 62 : len: i64,
276 62 : storage: &GenericRemoteStorage,
277 62 : ) -> anyhow::Result<Writer<BytesMut>> {
278 62 : let len_uncompressed = w
279 62 : .flushed_row_groups()
280 62 : .iter()
281 212 : .map(|rg| rg.total_byte_size())
282 62 : .sum::<i64>();
283 :
284 : // I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry.
285 : // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
286 62 : let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish())
287 62 : .await
288 62 : .unwrap()?;
289 :
290 62 : let mut buffer = writer.into_inner();
291 62 : let data = buffer.split().freeze();
292 62 :
293 62 : let compression = len as f64 / len_uncompressed as f64;
294 62 : let size = data.len();
295 62 : let now = chrono::Utc::now();
296 62 : let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
297 62 : uuid::NoContext,
298 62 : // we won't be running this in 1970. this cast is ok
299 62 : now.timestamp() as u64,
300 62 : now.timestamp_subsec_nanos(),
301 62 : ));
302 :
303 0 : info!(
304 0 : %id,
305 0 : rows = metadata.num_rows,
306 0 : size, compression, "uploading request parquet file"
307 0 : );
308 :
309 62 : let year = now.year();
310 62 : let month = now.month();
311 62 : let day = now.day();
312 62 : let hour = now.hour();
313 : // segment files by time for S3 performance
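 : // e.g. "2024/01/31/17/requests_<uuid>.parquet" (hypothetical key; layout per the format string below)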
314 62 : let path = RemotePath::from_string(&format!(
315 62 : "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
316 62 : ))?;
317 62 : let cancel = CancellationToken::new();
318 62 : backoff::retry(
319 86 : || async {
320 86 : let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
321 86 : storage
322 86 : .upload(stream, data.len(), &path, None, &cancel)
323 196 : .await
324 172 : },
325 62 : TimeoutOrCancel::caused_by_cancel,
326 62 : FAILED_UPLOAD_WARN_THRESHOLD,
327 62 : FAILED_UPLOAD_MAX_RETRIES,
328 62 : "request_data_upload",
329 62 : // we don't want cancellation to interrupt here, so we make a dummy cancel token
330 62 : &cancel,
331 62 : )
332 196 : .await
333 62 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
334 62 : .and_then(|x| x)
335 62 : .context("request_data_upload")?;
336 :
337 62 : Ok(buffer.writer())
338 62 : }
339 :
340 : #[cfg(test)]
341 : mod tests {
342 : use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
343 :
344 : use camino::Utf8Path;
345 : use clap::Parser;
346 : use futures::{Stream, StreamExt};
347 : use itertools::Itertools;
348 : use parquet::{
349 : basic::{Compression, ZstdLevel},
350 : file::{
351 : properties::{WriterProperties, DEFAULT_PAGE_SIZE},
352 : reader::FileReader,
353 : serialized_reader::SerializedFileReader,
354 : },
355 : };
356 : use rand::{rngs::StdRng, Rng, SeedableRng};
357 : use remote_storage::{
358 : GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
359 : DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
360 : };
361 : use tokio::{sync::mpsc, time};
362 : use walkdir::WalkDir;
363 :
364 : use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
365 :
366 4 : #[derive(Parser)]
367 : struct ProxyCliArgs {
368 : #[clap(flatten)]
369 : parquet_upload: ParquetUploadArgs,
370 : }
371 :
372 2 : #[test]
373 2 : fn default_parser() {
374 2 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
375 2 : assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
376 2 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
377 2 : assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
378 2 : assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
379 2 : assert_eq!(
380 2 : parquet_upload.parquet_upload_maximum_duration,
381 2 : time::Duration::from_secs(20 * 60)
382 2 : );
383 2 : assert_eq!(
384 2 : parquet_upload.parquet_upload_compression,
385 2 : Compression::UNCOMPRESSED
386 2 : );
387 2 : }
388 :
389 2 : #[test]
390 2 : fn full_parser() {
391 2 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
392 2 : "proxy",
393 2 : "--parquet-upload-remote-storage",
394 2 : "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
395 2 : "--parquet-upload-row-group-size",
396 2 : "100",
397 2 : "--parquet-upload-page-size",
398 2 : "10000",
399 2 : "--parquet-upload-size",
400 2 : "10000000",
401 2 : "--parquet-upload-maximum-duration",
402 2 : "10m",
403 2 : "--parquet-upload-compression",
404 2 : "zstd(5)",
405 2 : ]);
406 2 : assert_eq!(
407 2 : parquet_upload.parquet_upload_remote_storage,
408 2 : Some(RemoteStorageConfig {
409 2 : storage: RemoteStorageKind::AwsS3(S3Config {
410 2 : bucket_name: "default".into(),
411 2 : bucket_region: "us-east-1".into(),
412 2 : prefix_in_bucket: Some("proxy/".into()),
413 2 : endpoint: Some("http://minio:9000".into()),
414 2 : concurrency_limit: NonZeroUsize::new(
415 2 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
416 2 : )
417 2 : .unwrap(),
418 2 : max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
419 2 : }),
420 2 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
421 2 : })
422 2 : );
423 2 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
424 2 : assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
425 2 : assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
426 2 : assert_eq!(
427 2 : parquet_upload.parquet_upload_maximum_duration,
428 2 : time::Duration::from_secs(10 * 60)
429 2 : );
430 2 : assert_eq!(
431 2 : parquet_upload.parquet_upload_compression,
432 2 : Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
433 2 : );
434 2 : }
435 :
436 418000 : fn generate_request_data(rng: &mut impl Rng) -> RequestData {
437 418000 : RequestData {
438 418000 : session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
439 418000 : peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
440 418000 : timestamp: chrono::NaiveDateTime::from_timestamp_millis(
441 418000 : rng.gen_range(1703862754..1803862754),
442 418000 : )
443 418000 : .unwrap(),
444 418000 : application_name: Some("test".to_owned()),
445 418000 : username: Some(hex::encode(rng.gen::<[u8; 4]>())),
446 418000 : endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
447 418000 : database: Some(hex::encode(rng.gen::<[u8; 16]>())),
448 418000 : project: Some(hex::encode(rng.gen::<[u8; 16]>())),
449 418000 : branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
450 418000 : auth_method: None,
451 418000 : protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
452 418000 : region: "us-east-1",
453 418000 : error: None,
454 418000 : success: rng.gen(),
455 418000 : duration_us: rng.gen_range(0..30_000_000),
456 418000 : }
457 418000 : }
458 :
459 14 : fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
460 14 : let mut rng = StdRng::from_seed([0x39; 32]);
461 14 : futures::stream::iter(
462 418000 : std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
463 14 : )
464 14 : }
465 :
466 10 : async fn run_test(
467 10 : tmpdir: &Utf8Path,
468 10 : config: ParquetConfig,
469 10 : rx: impl Stream<Item = RequestData>,
470 10 : ) -> Vec<(u64, usize, i64)> {
471 10 : let remote_storage_config = RemoteStorageConfig {
472 10 : storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
473 10 : timeout: std::time::Duration::from_secs(120),
474 10 : };
475 10 : let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
476 10 :
477 610 : worker_inner(storage, rx, config).await.unwrap();
478 10 :
479 10 : let mut files = WalkDir::new(tmpdir.as_std_path())
480 10 : .into_iter()
481 112 : .filter_map(|entry| entry.ok())
482 112 : .filter(|entry| entry.file_type().is_file())
483 62 : .map(|entry| entry.path().to_path_buf())
484 10 : .collect_vec();
485 10 : files.sort();
486 10 :
487 10 : files
488 10 : .into_iter()
489 62 : .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
490 62 : .map(|file| {
491 62 : (
492 62 : file.metadata().unwrap(),
493 62 : SerializedFileReader::new(file).unwrap().metadata().clone(),
494 62 : )
495 62 : })
496 62 : .map(|(file_meta, parquet_meta)| {
497 62 : (
498 62 : file_meta.len(),
499 62 : parquet_meta.num_row_groups(),
500 62 : parquet_meta.file_metadata().num_rows(),
501 62 : )
502 62 : })
503 10 : .collect()
504 10 : }
505 :
506 2 : #[tokio::test]
507 2 : async fn verify_parquet_no_compression() {
508 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
509 2 :
510 2 : let config = ParquetConfig {
511 2 : propeties: Arc::new(WriterProperties::new()),
512 2 : rows_per_group: 2_000,
513 2 : file_size: 1_000_000,
514 2 : max_duration: time::Duration::from_secs(20 * 60),
515 2 : test_remote_failures: 0,
516 2 : };
517 2 :
518 2 : let rx = random_stream(50_000);
519 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
520 2 :
521 2 : assert_eq!(
522 2 : file_stats,
523 2 : [
524 2 : (1313727, 3, 6000),
525 2 : (1313720, 3, 6000),
526 2 : (1313780, 3, 6000),
527 2 : (1313737, 3, 6000),
528 2 : (1313867, 3, 6000),
529 2 : (1313709, 3, 6000),
530 2 : (1313501, 3, 6000),
531 2 : (1313737, 3, 6000),
532 2 : (438118, 1, 2000)
533 2 : ],
534 2 : );
535 2 :
536 2 : tmpdir.close().unwrap();
537 2 : }
538 :
539 2 : #[tokio::test]
540 2 : async fn verify_parquet_min_compression() {
541 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
542 2 :
543 2 : let config = ParquetConfig {
544 2 : propeties: Arc::new(
545 2 : WriterProperties::builder()
546 2 : .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
547 2 : .build(),
548 2 : ),
549 2 : rows_per_group: 2_000,
550 2 : file_size: 1_000_000,
551 2 : max_duration: time::Duration::from_secs(20 * 60),
552 2 : test_remote_failures: 0,
553 2 : };
554 2 :
555 2 : let rx = random_stream(50_000);
556 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
557 2 :
558 2 : // with compression, there are fewer files with more rows per file
559 2 : assert_eq!(
560 2 : file_stats,
561 2 : [
562 2 : (1219459, 5, 10000),
563 2 : (1225609, 5, 10000),
564 2 : (1227403, 5, 10000),
565 2 : (1226765, 5, 10000),
566 2 : (1218043, 5, 10000)
567 2 : ],
568 2 : );
569 2 :
570 2 : tmpdir.close().unwrap();
571 2 : }
572 :
573 2 : #[tokio::test]
574 2 : async fn verify_parquet_strong_compression() {
575 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
576 2 :
577 2 : let config = ParquetConfig {
578 2 : propeties: Arc::new(
579 2 : WriterProperties::builder()
580 2 : .set_compression(parquet::basic::Compression::ZSTD(
581 2 : ZstdLevel::try_new(10).unwrap(),
582 2 : ))
583 2 : .build(),
584 2 : ),
585 2 : rows_per_group: 2_000,
586 2 : file_size: 1_000_000,
587 2 : max_duration: time::Duration::from_secs(20 * 60),
588 2 : test_remote_failures: 0,
589 2 : };
590 2 :
591 2 : let rx = random_stream(50_000);
592 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
593 2 :
594 2 : // with strong compression, the files are smaller
595 2 : assert_eq!(
596 2 : file_stats,
597 2 : [
598 2 : (1205106, 5, 10000),
599 2 : (1204837, 5, 10000),
600 2 : (1205130, 5, 10000),
601 2 : (1205118, 5, 10000),
602 2 : (1205373, 5, 10000)
603 2 : ],
604 2 : );
605 2 :
606 2 : tmpdir.close().unwrap();
607 2 : }
608 :
609 2 : #[tokio::test]
610 2 : async fn verify_parquet_unreliable_upload() {
611 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
612 2 :
613 2 : let config = ParquetConfig {
614 2 : propeties: Arc::new(WriterProperties::new()),
615 2 : rows_per_group: 2_000,
616 2 : file_size: 1_000_000,
617 2 : max_duration: time::Duration::from_secs(20 * 60),
618 2 : test_remote_failures: 2,
619 2 : };
620 2 :
621 2 : let rx = random_stream(50_000);
622 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
623 2 :
624 2 : assert_eq!(
625 2 : file_stats,
626 2 : [
627 2 : (1313727, 3, 6000),
628 2 : (1313720, 3, 6000),
629 2 : (1313780, 3, 6000),
630 2 : (1313737, 3, 6000),
631 2 : (1313867, 3, 6000),
632 2 : (1313709, 3, 6000),
633 2 : (1313501, 3, 6000),
634 2 : (1313737, 3, 6000),
635 2 : (438118, 1, 2000)
636 2 : ],
637 2 : );
638 2 :
639 2 : tmpdir.close().unwrap();
640 2 : }
641 :
642 2 : #[tokio::test(start_paused = true)]
643 2 : async fn verify_parquet_regular_upload() {
644 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
645 2 :
646 2 : let config = ParquetConfig {
647 2 : propeties: Arc::new(WriterProperties::new()),
648 2 : rows_per_group: 2_000,
649 2 : file_size: 1_000_000,
650 2 : max_duration: time::Duration::from_secs(60),
651 2 : test_remote_failures: 2,
652 2 : };
653 2 :
654 2 : let (tx, mut rx) = mpsc::unbounded_channel();
655 2 :
656 2 : tokio::spawn(async move {
657 8 : for _ in 0..3 {
658 6 : let mut s = random_stream(3000);
659 18006 : while let Some(r) = s.next().await {
660 18000 : tx.send(r).unwrap();
661 18000 : }
662 6 : time::sleep(time::Duration::from_secs(70)).await
663 2 : }
664 2 : });
665 2 :
666 18142 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
667 178 : let file_stats = run_test(tmpdir.path(), config, rx).await;
668 2 :
669 2 : // files are smaller than the size threshold, but they took too long to fill so were flushed early
670 2 : assert_eq!(
671 2 : file_stats,
672 2 : [(658383, 2, 3001), (658097, 2, 3000), (657893, 2, 2999)],
673 2 : );
674 2 :
675 2 : tmpdir.close().unwrap();
676 2 : }
677 : }