Line data Source code
1 : use std::{sync::Arc, time::SystemTime};
2 :
3 : use anyhow::Context;
4 : use bytes::{buf::Writer, BufMut, BytesMut};
5 : use chrono::{Datelike, Timelike};
6 : use futures::{Stream, StreamExt};
7 : use parquet::{
8 : basic::Compression,
9 : file::{
10 : metadata::RowGroupMetaDataPtr,
11 : properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
12 : writer::SerializedFileWriter,
13 : },
14 : record::RecordWriter,
15 : };
16 : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
17 : use tokio::{sync::mpsc, time};
18 : use tokio_util::sync::CancellationToken;
19 : use tracing::{debug, info, Span};
20 : use utils::backoff;
21 :
22 : use super::{RequestMonitoring, LOG_CHAN};
23 :
24 12 : #[derive(clap::Args, Clone, Debug)]
25 : pub struct ParquetUploadArgs {
26 : /// Storage location to upload the parquet files to.
27 : /// Encoded as TOML (same format as pageservers), e.g.
28 : /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
29 : #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
30 0 : parquet_upload_remote_storage: OptRemoteStorageConfig,
31 :
32 : /// How many rows to include in a row group
33 6 : #[clap(long, default_value_t = 8192)]
34 0 : parquet_upload_row_group_size: usize,
35 :
36 : /// How large each column page should be in bytes
37 6 : #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
38 0 : parquet_upload_page_size: usize,
39 :
40 : /// How large the total parquet file should be in bytes
41 6 : #[clap(long, default_value_t = 100_000_000)]
42 0 : parquet_upload_size: i64,
43 :
44 : /// How long to wait before forcing a file upload
45 : #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
46 0 : parquet_upload_maximum_duration: tokio::time::Duration,
47 :
48 : /// What level of compression to use
49 6 : #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
50 0 : parquet_upload_compression: Compression,
51 : }
52 :
53 : /// Hack to stop clap from being too clever. If you don't use this type alias, clap makes extra assumptions about the
54 : /// optional state and you get runtime type errors from the value parser we use.
55 : type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
56 :
57 12 : fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
58 12 : RemoteStorageConfig::from_toml(&s.parse()?)
59 12 : }
60 :
61 : // Occasional network issues and such can cause remote operations to fail, and
62 : // that's expected. If a upload fails, we log it at info-level, and retry.
63 : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
64 : // level instead, as repeated failures can mean a more serious problem. If it
65 : // fails more than FAILED_UPLOAD_RETRIES times, we give up
66 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
67 : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
68 :
69 : // the parquet crate leaves a lot to be desired...
70 : // what follows is an attempt to write parquet files with minimal allocs.
71 : // complication: parquet is a columnar format, while we want to write it as rows.
72 : // design:
73 : // * we batch up to `rows_per_group` rows (8192 by default), then flush them into a 'row group'
74 : // * after each row group write, we check the length of the file and upload to s3 if it's large enough
75 :
76 7524010 : #[derive(parquet_derive::ParquetRecordWriter)]
77 : struct RequestData {
78 : region: &'static str,
79 : protocol: &'static str,
80 : /// Must be UTC. The derive macro doesn't support timezone-aware types.
81 : timestamp: chrono::NaiveDateTime,
82 : session_id: uuid::Uuid,
83 : peer_addr: String,
84 : username: Option<String>,
85 : application_name: Option<String>,
86 : endpoint_id: Option<String>,
87 : database: Option<String>,
88 : project: Option<String>,
89 : branch: Option<String>,
90 : auth_method: Option<&'static str>,
91 : error: Option<&'static str>,
92 : /// Success is counted if we form an HTTP response with SQL rows inside,
93 : /// or if we make it to proxy_pass.
94 : success: bool,
95 : /// Indicates if the cplane started the new compute node for this request.
96 : is_cold_start: Option<bool>,
97 : /// Tracks time from session start (HTTP request/libpq TCP handshake)
98 : /// through to success or failure.
99 : duration_us: u64,
100 : }
101 :
102 : impl From<RequestMonitoring> for RequestData {
103 0 : fn from(value: RequestMonitoring) -> Self {
104 0 : Self {
105 0 : session_id: value.session_id,
106 0 : peer_addr: value.peer_addr.to_string(),
107 0 : timestamp: value.first_packet.naive_utc(),
108 0 : username: value.user.as_deref().map(String::from),
109 0 : application_name: value.application.as_deref().map(String::from),
110 0 : endpoint_id: value.endpoint_id.as_deref().map(String::from),
111 0 : database: value.dbname.as_deref().map(String::from),
112 0 : project: value.project.as_deref().map(String::from),
113 0 : branch: value.branch.as_deref().map(String::from),
114 0 : auth_method: value.auth_method.as_ref().map(|x| match x {
115 0 : super::AuthMethod::Web => "web",
116 0 : super::AuthMethod::ScramSha256 => "scram_sha_256",
117 0 : super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
118 0 : super::AuthMethod::Cleartext => "cleartext",
119 0 : }),
120 0 : protocol: value.protocol,
121 0 : region: value.region,
122 0 : error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
123 0 : success: value.success,
124 0 : is_cold_start: value.is_cold_start,
125 0 : duration_us: SystemTime::from(value.first_packet)
126 0 : .elapsed()
127 0 : .unwrap_or_default()
128 0 : .as_micros() as u64, // 584 millennia... good enough
129 0 : }
130 0 : }
131 : }
132 :
133 : /// Parquet request context worker
134 : ///
135 : /// It listens on a channel for all completed requests, extracts the data, and writes it into a parquet file,
136 : /// then uploads each completed batch to S3.
137 0 : pub async fn worker(
138 0 : cancellation_token: CancellationToken,
139 0 : config: ParquetUploadArgs,
140 0 : ) -> anyhow::Result<()> {
141 0 : let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
142 0 : tracing::warn!("parquet request upload: no s3 bucket configured");
143 0 : return Ok(());
144 : };
145 :
146 0 : let (tx, mut rx) = mpsc::unbounded_channel();
147 0 : LOG_CHAN.set(tx.downgrade()).unwrap();
148 0 :
149 0 : // set up a row stream that will close on cancellation
150 0 : tokio::spawn(async move {
151 0 : cancellation_token.cancelled().await;
152 : // dropping this sender will cause the channel to close only once
153 : // all the remaining inflight requests have been completed.
154 0 : drop(tx);
155 0 : });
156 0 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
157 0 : let rx = rx.map(RequestData::from);
158 :
159 0 : let storage =
160 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
161 :
162 0 : let properties = WriterProperties::builder()
163 0 : .set_data_page_size_limit(config.parquet_upload_page_size)
164 0 : .set_compression(config.parquet_upload_compression);
165 0 :
166 0 : let parquet_config = ParquetConfig {
167 0 : propeties: Arc::new(properties.build()),
168 0 : rows_per_group: config.parquet_upload_row_group_size,
169 0 : file_size: config.parquet_upload_size,
170 0 : max_duration: config.parquet_upload_maximum_duration,
171 0 :
172 0 : #[cfg(any(test, feature = "testing"))]
173 0 : test_remote_failures: 0,
174 0 : };
175 0 :
176 0 : worker_inner(storage, rx, parquet_config).await
177 0 : }
178 :
179 : struct ParquetConfig {
180 : propeties: WriterPropertiesPtr,
181 : rows_per_group: usize,
182 : file_size: i64,
183 :
184 : max_duration: tokio::time::Duration,
185 :
186 : #[cfg(any(test, feature = "testing"))]
187 : test_remote_failures: u64,
188 : }
189 :
190 10 : async fn worker_inner(
191 10 : storage: GenericRemoteStorage,
192 10 : rx: impl Stream<Item = RequestData>,
193 10 : config: ParquetConfig,
194 10 : ) -> anyhow::Result<()> {
195 : #[cfg(any(test, feature = "testing"))]
196 10 : let storage = if config.test_remote_failures > 0 {
197 4 : GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
198 : } else {
199 6 : storage
200 : };
201 :
202 10 : let mut rx = std::pin::pin!(rx);
203 10 :
204 10 : let mut rows = Vec::with_capacity(config.rows_per_group);
205 :
206 10 : let schema = rows.as_slice().schema()?;
207 10 : let buffer = BytesMut::new();
208 10 : let w = buffer.writer();
209 10 : let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
210 :
211 10 : let mut last_upload = time::Instant::now();
212 10 :
213 10 : let mut len = 0;
214 418010 : while let Some(row) = rx.next().await {
215 418000 : rows.push(row);
216 418000 : let force = last_upload.elapsed() > config.max_duration;
217 418000 : if rows.len() == config.rows_per_group || force {
218 210 : let rg_meta;
219 210 : (rows, w, rg_meta) = flush_rows(rows, w).await?;
220 210 : len += rg_meta.compressed_size();
221 417790 : }
222 418000 : if len > config.file_size || force {
223 56 : last_upload = time::Instant::now();
224 233 : let file = upload_parquet(w, len, &storage).await?;
225 56 : w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
226 56 : len = 0;
227 417944 : }
228 : }
229 :
230 10 : if !rows.is_empty() {
231 2 : let rg_meta;
232 2 : (_, w, rg_meta) = flush_rows(rows, w).await?;
233 2 : len += rg_meta.compressed_size();
234 8 : }
235 :
236 10 : if !w.flushed_row_groups().is_empty() {
237 24 : let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
238 4 : }
239 :
240 10 : Ok(())
241 10 : }
242 :
243 212 : async fn flush_rows<W>(
244 212 : rows: Vec<RequestData>,
245 212 : mut w: SerializedFileWriter<W>,
246 212 : ) -> anyhow::Result<(
247 212 : Vec<RequestData>,
248 212 : SerializedFileWriter<W>,
249 212 : RowGroupMetaDataPtr,
250 212 : )>
251 212 : where
252 212 : W: std::io::Write + Send + 'static,
253 212 : {
254 212 : let span = Span::current();
255 212 : let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
256 212 : let _enter = span.enter();
257 :
258 212 : let mut rg = w.next_row_group()?;
259 212 : rows.as_slice().write_to_row_group(&mut rg)?;
260 212 : let rg_meta = rg.close()?;
261 :
262 212 : let size = rg_meta.compressed_size();
263 212 : let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
264 212 :
265 212 : debug!(size, compression, "flushed row group to parquet file");
266 :
267 212 : Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
268 212 : })
269 212 : .await
270 212 : .unwrap()?;
271 :
272 212 : rows.clear();
273 212 : Ok((rows, w, rg_meta))
274 212 : }
275 :
276 62 : async fn upload_parquet(
277 62 : w: SerializedFileWriter<Writer<BytesMut>>,
278 62 : len: i64,
279 62 : storage: &GenericRemoteStorage,
280 62 : ) -> anyhow::Result<Writer<BytesMut>> {
281 62 : let len_uncompressed = w
282 62 : .flushed_row_groups()
283 62 : .iter()
284 212 : .map(|rg| rg.total_byte_size())
285 62 : .sum::<i64>();
286 :
287 : // I don't know how compute-intensive this is, although it probably isn't much... better safe than sorry.
288 : // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
289 62 : let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish())
290 62 : .await
291 62 : .unwrap()?;
292 :
293 62 : let mut buffer = writer.into_inner();
294 62 : let data = buffer.split().freeze();
295 62 :
296 62 : let compression = len as f64 / len_uncompressed as f64;
297 62 : let size = data.len();
298 62 : let now = chrono::Utc::now();
299 62 : let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
300 62 : uuid::NoContext,
301 62 : // we won't be running this in 1970. this cast is ok
302 62 : now.timestamp() as u64,
303 62 : now.timestamp_subsec_nanos(),
304 62 : ));
305 :
306 0 : info!(
307 0 : %id,
308 0 : rows = metadata.num_rows,
309 0 : size, compression, "uploading request parquet file"
310 0 : );
311 :
312 62 : let year = now.year();
313 62 : let month = now.month();
314 62 : let day = now.day();
315 62 : let hour = now.hour();
316 : // segment files by time for S3 performance
317 62 : let path = RemotePath::from_string(&format!(
318 62 : "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
319 62 : ))?;
320 62 : let cancel = CancellationToken::new();
321 62 : backoff::retry(
322 86 : || async {
323 86 : let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
324 86 : storage
325 86 : .upload(stream, data.len(), &path, None, &cancel)
326 195 : .await
327 172 : },
328 62 : TimeoutOrCancel::caused_by_cancel,
329 62 : FAILED_UPLOAD_WARN_THRESHOLD,
330 62 : FAILED_UPLOAD_MAX_RETRIES,
331 62 : "request_data_upload",
332 62 : // we don't want cancellation to interrupt here, so we make a dummy cancel token
333 62 : &cancel,
334 62 : )
335 195 : .await
336 62 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
337 62 : .and_then(|x| x)
338 62 : .context("request_data_upload")?;
339 :
340 62 : Ok(buffer.writer())
341 62 : }
342 :
343 : #[cfg(test)]
344 : mod tests {
345 : use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
346 :
347 : use camino::Utf8Path;
348 : use clap::Parser;
349 : use futures::{Stream, StreamExt};
350 : use itertools::Itertools;
351 : use parquet::{
352 : basic::{Compression, ZstdLevel},
353 : file::{
354 : properties::{WriterProperties, DEFAULT_PAGE_SIZE},
355 : reader::FileReader,
356 : serialized_reader::SerializedFileReader,
357 : },
358 : };
359 : use rand::{rngs::StdRng, Rng, SeedableRng};
360 : use remote_storage::{
361 : GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
362 : DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
363 : };
364 : use tokio::{sync::mpsc, time};
365 : use walkdir::WalkDir;
366 :
367 : use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
368 :
369 4 : #[derive(Parser)]
370 : struct ProxyCliArgs {
371 : #[clap(flatten)]
372 : parquet_upload: ParquetUploadArgs,
373 : }
374 :
375 2 : #[test]
376 2 : fn default_parser() {
377 2 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
378 2 : assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
379 2 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
380 2 : assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
381 2 : assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
382 2 : assert_eq!(
383 2 : parquet_upload.parquet_upload_maximum_duration,
384 2 : time::Duration::from_secs(20 * 60)
385 2 : );
386 2 : assert_eq!(
387 2 : parquet_upload.parquet_upload_compression,
388 2 : Compression::UNCOMPRESSED
389 2 : );
390 2 : }
391 :
392 2 : #[test]
393 2 : fn full_parser() {
394 2 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
395 2 : "proxy",
396 2 : "--parquet-upload-remote-storage",
397 2 : "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
398 2 : "--parquet-upload-row-group-size",
399 2 : "100",
400 2 : "--parquet-upload-page-size",
401 2 : "10000",
402 2 : "--parquet-upload-size",
403 2 : "10000000",
404 2 : "--parquet-upload-maximum-duration",
405 2 : "10m",
406 2 : "--parquet-upload-compression",
407 2 : "zstd(5)",
408 2 : ]);
409 2 : assert_eq!(
410 2 : parquet_upload.parquet_upload_remote_storage,
411 2 : Some(RemoteStorageConfig {
412 2 : storage: RemoteStorageKind::AwsS3(S3Config {
413 2 : bucket_name: "default".into(),
414 2 : bucket_region: "us-east-1".into(),
415 2 : prefix_in_bucket: Some("proxy/".into()),
416 2 : endpoint: Some("http://minio:9000".into()),
417 2 : concurrency_limit: NonZeroUsize::new(
418 2 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
419 2 : )
420 2 : .unwrap(),
421 2 : max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
422 2 : }),
423 2 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
424 2 : })
425 2 : );
426 2 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
427 2 : assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
428 2 : assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
429 2 : assert_eq!(
430 2 : parquet_upload.parquet_upload_maximum_duration,
431 2 : time::Duration::from_secs(10 * 60)
432 2 : );
433 2 : assert_eq!(
434 2 : parquet_upload.parquet_upload_compression,
435 2 : Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
436 2 : );
437 2 : }
438 :
439 418000 : fn generate_request_data(rng: &mut impl Rng) -> RequestData {
440 418000 : RequestData {
441 418000 : session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
442 418000 : peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
443 418000 : timestamp: chrono::NaiveDateTime::from_timestamp_millis(
444 418000 : rng.gen_range(1703862754..1803862754),
445 418000 : )
446 418000 : .unwrap(),
447 418000 : application_name: Some("test".to_owned()),
448 418000 : username: Some(hex::encode(rng.gen::<[u8; 4]>())),
449 418000 : endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
450 418000 : database: Some(hex::encode(rng.gen::<[u8; 16]>())),
451 418000 : project: Some(hex::encode(rng.gen::<[u8; 16]>())),
452 418000 : branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
453 418000 : auth_method: None,
454 418000 : protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
455 418000 : region: "us-east-1",
456 418000 : error: None,
457 418000 : success: rng.gen(),
458 418000 : is_cold_start: Some(true),
459 418000 : duration_us: rng.gen_range(0..30_000_000),
460 418000 : }
461 418000 : }
462 :
463 14 : fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
464 14 : let mut rng = StdRng::from_seed([0x39; 32]);
465 14 : futures::stream::iter(
466 418000 : std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
467 14 : )
468 14 : }
469 :
470 10 : async fn run_test(
471 10 : tmpdir: &Utf8Path,
472 10 : config: ParquetConfig,
473 10 : rx: impl Stream<Item = RequestData>,
474 10 : ) -> Vec<(u64, usize, i64)> {
475 10 : let remote_storage_config = RemoteStorageConfig {
476 10 : storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
477 10 : timeout: std::time::Duration::from_secs(120),
478 10 : };
479 10 : let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
480 10 :
481 609 : worker_inner(storage, rx, config).await.unwrap();
482 10 :
483 10 : let mut files = WalkDir::new(tmpdir.as_std_path())
484 10 : .into_iter()
485 112 : .filter_map(|entry| entry.ok())
486 112 : .filter(|entry| entry.file_type().is_file())
487 62 : .map(|entry| entry.path().to_path_buf())
488 10 : .collect_vec();
489 10 : files.sort();
490 10 :
491 10 : files
492 10 : .into_iter()
493 62 : .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
494 62 : .map(|file| {
495 62 : (
496 62 : file.metadata().unwrap(),
497 62 : SerializedFileReader::new(file).unwrap().metadata().clone(),
498 62 : )
499 62 : })
500 62 : .map(|(file_meta, parquet_meta)| {
501 62 : (
502 62 : file_meta.len(),
503 62 : parquet_meta.num_row_groups(),
504 62 : parquet_meta.file_metadata().num_rows(),
505 62 : )
506 62 : })
507 10 : .collect()
508 10 : }
509 :
510 2 : #[tokio::test]
511 2 : async fn verify_parquet_no_compression() {
512 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
513 2 :
514 2 : let config = ParquetConfig {
515 2 : propeties: Arc::new(WriterProperties::new()),
516 2 : rows_per_group: 2_000,
517 2 : file_size: 1_000_000,
518 2 : max_duration: time::Duration::from_secs(20 * 60),
519 2 : test_remote_failures: 0,
520 2 : };
521 2 :
522 2 : let rx = random_stream(50_000);
523 123 : let file_stats = run_test(tmpdir.path(), config, rx).await;
524 2 :
525 2 : assert_eq!(
526 2 : file_stats,
527 2 : [
528 2 : (1315032, 3, 6000),
529 2 : (1315025, 3, 6000),
530 2 : (1315085, 3, 6000),
531 2 : (1315042, 3, 6000),
532 2 : (1315172, 3, 6000),
533 2 : (1315014, 3, 6000),
534 2 : (1314806, 3, 6000),
535 2 : (1315042, 3, 6000),
536 2 : (438563, 1, 2000)
537 2 : ],
538 2 : );
539 2 :
540 2 : tmpdir.close().unwrap();
541 2 : }
542 :
543 2 : #[tokio::test]
544 2 : async fn verify_parquet_min_compression() {
545 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
546 2 :
547 2 : let config = ParquetConfig {
548 2 : propeties: Arc::new(
549 2 : WriterProperties::builder()
550 2 : .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
551 2 : .build(),
552 2 : ),
553 2 : rows_per_group: 2_000,
554 2 : file_size: 1_000_000,
555 2 : max_duration: time::Duration::from_secs(20 * 60),
556 2 : test_remote_failures: 0,
557 2 : };
558 2 :
559 2 : let rx = random_stream(50_000);
560 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
561 2 :
562 2 : // with compression, there are fewer files with more rows per file
563 2 : assert_eq!(
564 2 : file_stats,
565 2 : [
566 2 : (1220433, 5, 10000),
567 2 : (1226583, 5, 10000),
568 2 : (1228377, 5, 10000),
569 2 : (1227739, 5, 10000),
570 2 : (1219017, 5, 10000)
571 2 : ],
572 2 : );
573 2 :
574 2 : tmpdir.close().unwrap();
575 2 : }
576 :
577 2 : #[tokio::test]
578 2 : async fn verify_parquet_strong_compression() {
579 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
580 2 :
581 2 : let config = ParquetConfig {
582 2 : propeties: Arc::new(
583 2 : WriterProperties::builder()
584 2 : .set_compression(parquet::basic::Compression::ZSTD(
585 2 : ZstdLevel::try_new(10).unwrap(),
586 2 : ))
587 2 : .build(),
588 2 : ),
589 2 : rows_per_group: 2_000,
590 2 : file_size: 1_000_000,
591 2 : max_duration: time::Duration::from_secs(20 * 60),
592 2 : test_remote_failures: 0,
593 2 : };
594 2 :
595 2 : let rx = random_stream(50_000);
596 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
597 2 :
598 2 : // with strong compression, the files are smaller
599 2 : assert_eq!(
600 2 : file_stats,
601 2 : [
602 2 : (1206080, 5, 10000),
603 2 : (1205811, 5, 10000),
604 2 : (1206104, 5, 10000),
605 2 : (1206092, 5, 10000),
606 2 : (1206347, 5, 10000)
607 2 : ],
608 2 : );
609 2 :
610 2 : tmpdir.close().unwrap();
611 2 : }
612 :
613 2 : #[tokio::test]
614 2 : async fn verify_parquet_unreliable_upload() {
615 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
616 2 :
617 2 : let config = ParquetConfig {
618 2 : propeties: Arc::new(WriterProperties::new()),
619 2 : rows_per_group: 2_000,
620 2 : file_size: 1_000_000,
621 2 : max_duration: time::Duration::from_secs(20 * 60),
622 2 : test_remote_failures: 2,
623 2 : };
624 2 :
625 2 : let rx = random_stream(50_000);
626 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
627 2 :
628 2 : assert_eq!(
629 2 : file_stats,
630 2 : [
631 2 : (1315032, 3, 6000),
632 2 : (1315025, 3, 6000),
633 2 : (1315085, 3, 6000),
634 2 : (1315042, 3, 6000),
635 2 : (1315172, 3, 6000),
636 2 : (1315014, 3, 6000),
637 2 : (1314806, 3, 6000),
638 2 : (1315042, 3, 6000),
639 2 : (438563, 1, 2000)
640 2 : ],
641 2 : );
642 2 :
643 2 : tmpdir.close().unwrap();
644 2 : }
645 :
646 2 : #[tokio::test(start_paused = true)]
647 2 : async fn verify_parquet_regular_upload() {
648 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
649 2 :
650 2 : let config = ParquetConfig {
651 2 : propeties: Arc::new(WriterProperties::new()),
652 2 : rows_per_group: 2_000,
653 2 : file_size: 1_000_000,
654 2 : max_duration: time::Duration::from_secs(60),
655 2 : test_remote_failures: 2,
656 2 : };
657 2 :
658 2 : let (tx, mut rx) = mpsc::unbounded_channel();
659 2 :
660 2 : tokio::spawn(async move {
661 8 : for _ in 0..3 {
662 6 : let mut s = random_stream(3000);
663 18006 : while let Some(r) = s.next().await {
664 18000 : tx.send(r).unwrap();
665 18000 : }
666 6 : time::sleep(time::Duration::from_secs(70)).await
667 2 : }
668 2 : });
669 2 :
670 18142 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
671 178 : let file_stats = run_test(tmpdir.path(), config, rx).await;
672 2 :
673 2 : // files are smaller than the size threshold, but they took too long to fill so were flushed early
674 2 : assert_eq!(
675 2 : file_stats,
676 2 : [(659129, 2, 3001), (658842, 2, 3000), (658638, 2, 2999)],
677 2 : );
678 2 :
679 2 : tmpdir.close().unwrap();
680 2 : }
681 : }