Line data Source code
1 : use std::{sync::Arc, time::SystemTime};
2 :
3 : use anyhow::Context;
4 : use bytes::{buf::Writer, BufMut, BytesMut};
5 : use chrono::{Datelike, Timelike};
6 : use futures::{Stream, StreamExt};
7 : use parquet::{
8 : basic::Compression,
9 : file::{
10 : metadata::RowGroupMetaDataPtr,
11 : properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
12 : writer::SerializedFileWriter,
13 : },
14 : record::RecordWriter,
15 : };
16 : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig};
17 : use tokio::{sync::mpsc, time};
18 : use tokio_util::sync::CancellationToken;
19 : use tracing::{debug, info, Span};
20 : use utils::backoff;
21 :
22 : use super::{RequestMonitoring, LOG_CHAN};
23 :
24 29 : #[derive(clap::Args, Clone, Debug)]
25 : pub struct ParquetUploadArgs {
26 : /// Storage location to upload the parquet files to.
27 : /// Encoded as TOML (same format as pageservers), e.g.
28 : /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
29 : #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
30 0 : parquet_upload_remote_storage: OptRemoteStorageConfig,
31 :
32 : /// How many rows to include in a row group
33 29 : #[clap(long, default_value_t = 8192)]
34 0 : parquet_upload_row_group_size: usize,
35 :
36 : /// How large each column page should be in bytes
37 29 : #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
38 0 : parquet_upload_page_size: usize,
39 :
40 : /// How large the total parquet file should be in bytes
41 29 : #[clap(long, default_value_t = 100_000_000)]
42 0 : parquet_upload_size: i64,
43 :
44 : /// How long to wait before forcing a file upload
45 : #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
46 0 : parquet_upload_maximum_duration: tokio::time::Duration,
47 :
48 : /// What level of compression to use
49 29 : #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
50 0 : parquet_upload_compression: Compression,
51 : }
52 :
53 : /// Hack to keep clap from being too smart. If you don't use this type alias, clap assumes more about the optional state and you get
54 : /// runtime type errors from the value parser we use.
55 : type OptRemoteStorageConfig = Option<RemoteStorageConfig>;
56 :
57 58 : fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
58 58 : RemoteStorageConfig::from_toml(&s.parse()?)
59 58 : }
60 :
61 : // Occasional network issues and such can cause remote operations to fail, and
62 : // that's expected. If an upload fails, we log it at info-level, and retry.
63 : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
64 : // level instead, as repeated failures can mean a more serious problem. If it
65 : // fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
66 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
67 : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
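// Roughly how these constants are consumed (mirroring the `backoff::retry`
// call in `upload_parquet` below; see `utils::backoff` for the exact signature):
//
//     backoff::retry(
//         || async { /* one S3 upload attempt */ },
//         |_e| false,                    // no error is considered permanent
//         FAILED_UPLOAD_WARN_THRESHOLD,  // attempts before we escalate logs to WARN
//         FAILED_UPLOAD_MAX_RETRIES,     // attempts before we give up entirely
//         "request_data_upload",
//         &CancellationToken::new(),     // dummy token: the upload is never cancelled
//     )
//     .await;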
68 :
69 : // the parquet crate leaves a lot to be desired...
70 : // what follows is an attempt to write parquet files with minimal allocs.
71 : // complication: parquet is a columnar format, while we want to write one row at a time.
72 : // design:
73 : // * we batch up to `rows_per_group` rows (8192 by default), then flush them into a 'row group'
74 : // * after each row group write, we check the length of the file and upload to S3 if it is large enough
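// In pseudocode, the loop in `worker_inner` below is roughly:
//
//     while let Some(row) = rx.next().await {
//         rows.push(row);
//         if rows.len() == rows_per_group || timed_out { /* flush rows into a row group */ }
//         if flushed_bytes > file_size || timed_out { /* upload the file to S3, start a new one */ }
//     }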
75 :
76 5016010 : #[derive(parquet_derive::ParquetRecordWriter)]
77 : struct RequestData {
78 : region: &'static str,
79 : protocol: &'static str,
80 : /// Must be UTC. The derive macro doesn't like timezones
81 : timestamp: chrono::NaiveDateTime,
82 : session_id: uuid::Uuid,
83 : peer_addr: String,
84 : username: Option<String>,
85 : application_name: Option<String>,
86 : endpoint_id: Option<String>,
87 : project: Option<String>,
88 : branch: Option<String>,
89 : error: Option<&'static str>,
90 : /// Success is counted if we form an HTTP response with SQL rows inside,
91 : /// or if we make it to proxy_pass
92 : success: bool,
93 : /// Tracks time from session start (HTTP request/libpq TCP handshake)
94 : /// through to success/failure
95 : duration_us: u64,
96 : }
97 :
98 : impl From<RequestMonitoring> for RequestData {
99 0 : fn from(value: RequestMonitoring) -> Self {
100 0 : Self {
101 0 : session_id: value.session_id,
102 0 : peer_addr: value.peer_addr.to_string(),
103 0 : timestamp: value.first_packet.naive_utc(),
104 0 : username: value.user.as_deref().map(String::from),
105 0 : application_name: value.application.as_deref().map(String::from),
106 0 : endpoint_id: value.endpoint_id.as_deref().map(String::from),
107 0 : project: value.project.as_deref().map(String::from),
108 0 : branch: value.branch.as_deref().map(String::from),
109 0 : protocol: value.protocol,
110 0 : region: value.region,
111 0 : error: value.error_kind.as_ref().map(|e| e.to_str()),
112 0 : success: value.success,
113 0 : duration_us: SystemTime::from(value.first_packet)
114 0 : .elapsed()
115 0 : .unwrap_or_default()
116 0 : .as_micros() as u64, // 584 millennia... good enough
117 0 : }
118 0 : }
119 : }
120 :
121 : /// Parquet request context worker
122 : ///
123 : /// It listens on a channel for all completed requests, extracts the data and writes it into a parquet file,
124 : /// then uploads a completed batch to S3.
125 23 : pub async fn worker(
126 23 : cancellation_token: CancellationToken,
127 23 : config: ParquetUploadArgs,
128 23 : ) -> anyhow::Result<()> {
129 23 : let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
130 23 : tracing::warn!("parquet request upload: no s3 bucket configured");
131 23 : return Ok(());
132 : };
133 :
134 0 : let (tx, mut rx) = mpsc::unbounded_channel();
135 0 : LOG_CHAN.set(tx.downgrade()).unwrap();
136 0 :
137 0 : // set up a row stream that will close on cancellation
138 0 : tokio::spawn(async move {
139 0 : cancellation_token.cancelled().await;
140 : // dropping this sender will cause the channel to close only once
141 : // all the remaining inflight requests have been completed.
142 0 : drop(tx);
143 0 : });
144 0 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
145 0 : let rx = rx.map(RequestData::from);
146 :
147 0 : let storage =
148 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
149 :
150 0 : let properties = WriterProperties::builder()
151 0 : .set_data_page_size_limit(config.parquet_upload_page_size)
152 0 : .set_compression(config.parquet_upload_compression);
153 0 :
154 0 : let parquet_config = ParquetConfig {
155 0 : propeties: Arc::new(properties.build()),
156 0 : rows_per_group: config.parquet_upload_row_group_size,
157 0 : file_size: config.parquet_upload_size,
158 0 : max_duration: config.parquet_upload_maximum_duration,
159 0 :
160 0 : #[cfg(any(test, feature = "testing"))]
161 0 : test_remote_failures: 0,
162 0 : };
163 0 :
164 0 : worker_inner(storage, rx, parquet_config).await
165 23 : }
166 :
167 : struct ParquetConfig {
168 : propeties: WriterPropertiesPtr,
169 : rows_per_group: usize,
170 : file_size: i64,
171 :
172 : max_duration: tokio::time::Duration,
173 :
174 : #[cfg(any(test, feature = "testing"))]
175 : test_remote_failures: u64,
176 : }
177 :
178 10 : async fn worker_inner(
179 10 : storage: GenericRemoteStorage,
180 10 : rx: impl Stream<Item = RequestData>,
181 10 : config: ParquetConfig,
182 10 : ) -> anyhow::Result<()> {
183 : #[cfg(any(test, feature = "testing"))]
184 10 : let storage = if config.test_remote_failures > 0 {
185 4 : GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
186 : } else {
187 6 : storage
188 : };
189 :
190 10 : let mut rx = std::pin::pin!(rx);
191 10 :
192 10 : let mut rows = Vec::with_capacity(config.rows_per_group);
193 :
194 10 : let schema = rows.as_slice().schema()?;
195 10 : let buffer = BytesMut::new();
196 10 : let w = buffer.writer();
197 10 : let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
198 :
199 10 : let mut last_upload = time::Instant::now();
200 10 :
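// `len` tracks the compressed bytes flushed into the current file's row groups;
// it determines when the file is considered large enough to upload.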
201 10 : let mut len = 0;
202 418010 : while let Some(row) = rx.next().await {
203 418000 : rows.push(row);
204 418000 : let force = last_upload.elapsed() > config.max_duration;
205 418000 : if rows.len() == config.rows_per_group || force {
206 210 : let rg_meta;
207 210 : (rows, w, rg_meta) = flush_rows(rows, w).await?;
208 210 : len += rg_meta.compressed_size();
209 417790 : }
210 418000 : if len > config.file_size || force {
211 54 : last_upload = time::Instant::now();
212 225 : let file = upload_parquet(w, len, &storage).await?;
213 54 : w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
214 54 : len = 0;
215 417946 : }
216 : }
217 :
218 10 : if !rows.is_empty() {
219 2 : let rg_meta;
220 2 : (_, w, rg_meta) = flush_rows(rows, w).await?;
221 2 : len += rg_meta.compressed_size();
222 8 : }
223 :
224 10 : if !w.flushed_row_groups().is_empty() {
225 32 : let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
226 2 : }
227 :
228 10 : Ok(())
229 10 : }
230 :
231 212 : async fn flush_rows<W>(
232 212 : rows: Vec<RequestData>,
233 212 : mut w: SerializedFileWriter<W>,
234 212 : ) -> anyhow::Result<(
235 212 : Vec<RequestData>,
236 212 : SerializedFileWriter<W>,
237 212 : RowGroupMetaDataPtr,
238 212 : )>
239 212 : where
240 212 : W: std::io::Write + Send + 'static,
241 212 : {
242 212 : let span = Span::current();
243 212 : let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
244 212 : let _enter = span.enter();
245 :
246 212 : let mut rg = w.next_row_group()?;
247 212 : rows.as_slice().write_to_row_group(&mut rg)?;
248 212 : let rg_meta = rg.close()?;
249 :
250 212 : let size = rg_meta.compressed_size();
251 212 : let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
252 212 :
253 212 : debug!(size, compression, "flushed row group to parquet file");
254 :
255 212 : Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
256 212 : })
257 212 : .await
258 212 : .unwrap()?;
259 :
260 212 : rows.clear();
261 212 : Ok((rows, w, rg_meta))
262 212 : }
263 :
264 62 : async fn upload_parquet(
265 62 : w: SerializedFileWriter<Writer<BytesMut>>,
266 62 : len: i64,
267 62 : storage: &GenericRemoteStorage,
268 62 : ) -> anyhow::Result<Writer<BytesMut>> {
269 62 : let len_uncompressed = w
270 62 : .flushed_row_groups()
271 62 : .iter()
272 212 : .map(|rg| rg.total_byte_size())
273 62 : .sum::<i64>();
274 :
275 : // I don't know how compute-intensive this is, although it probably isn't much... better safe than sorry.
276 : // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
277 62 : let (writer, metadata) = tokio::task::spawn_blocking(move || w.finish())
278 62 : .await
279 62 : .unwrap()?;
280 :
281 62 : let mut buffer = writer.into_inner();
282 62 : let data = buffer.split().freeze();
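// `Bytes` is reference-counted, so the retry loop below can clone `data` on
// every attempt without copying the serialized file.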
283 62 :
284 62 : let compression = len as f64 / len_uncompressed as f64;
285 62 : let size = data.len();
286 62 : let now = chrono::Utc::now();
287 62 : let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
288 62 : uuid::NoContext,
289 62 : // we won't be running this in 1970. this cast is ok
290 62 : now.timestamp() as u64,
291 62 : now.timestamp_subsec_nanos(),
292 62 : ));
293 :
294 0 : info!(
295 0 : %id,
296 0 : rows = metadata.num_rows,
297 0 : size, compression, "uploading request parquet file"
298 0 : );
299 :
300 62 : let year = now.year();
301 62 : let month = now.month();
302 62 : let day = now.day();
303 62 : let hour = now.hour();
304 : // segment files by time for S3 performance
305 62 : let path = RemotePath::from_string(&format!(
306 62 : "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
307 62 : ))?;
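// e.g. a file finished at 2024-01-02 03:15 UTC ends up under a key like
// `2024/01/02/03/requests_<uuid-v7>.parquet`, partitioning the objects by hour.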
308 62 : backoff::retry(
309 86 : || async {
310 86 : let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
311 195 : storage.upload(stream, data.len(), &path, None).await
312 86 : },
313 62 : |_e| false,
314 62 : FAILED_UPLOAD_WARN_THRESHOLD,
315 62 : FAILED_UPLOAD_MAX_RETRIES,
316 62 : "request_data_upload",
317 62 : // we don't want cancellation to interrupt here, so we make a dummy cancel token
318 62 : &CancellationToken::new(),
319 62 : )
320 195 : .await
321 62 : .ok_or_else(|| anyhow::anyhow!("Cancelled"))
322 62 : .and_then(|x| x)
323 62 : .context("request_data_upload")?;
324 :
325 62 : Ok(buffer.writer())
326 62 : }
327 :
328 : #[cfg(test)]
329 : mod tests {
330 : use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
331 :
332 : use camino::Utf8Path;
333 : use clap::Parser;
334 : use futures::{Stream, StreamExt};
335 : use itertools::Itertools;
336 : use parquet::{
337 : basic::{Compression, ZstdLevel},
338 : file::{
339 : properties::{WriterProperties, DEFAULT_PAGE_SIZE},
340 : reader::FileReader,
341 : serialized_reader::SerializedFileReader,
342 : },
343 : };
344 : use rand::{rngs::StdRng, Rng, SeedableRng};
345 : use remote_storage::{
346 : GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
347 : DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
348 : };
349 : use tokio::{sync::mpsc, time};
350 : use walkdir::WalkDir;
351 :
352 : use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
353 :
354 4 : #[derive(Parser)]
355 : struct ProxyCliArgs {
356 : #[clap(flatten)]
357 : parquet_upload: ParquetUploadArgs,
358 : }
359 :
360 2 : #[test]
361 2 : fn default_parser() {
362 2 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
363 2 : assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
364 2 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
365 2 : assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
366 2 : assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
367 2 : assert_eq!(
368 2 : parquet_upload.parquet_upload_maximum_duration,
369 2 : time::Duration::from_secs(20 * 60)
370 2 : );
371 2 : assert_eq!(
372 2 : parquet_upload.parquet_upload_compression,
373 2 : Compression::UNCOMPRESSED
374 2 : );
375 2 : }
376 :
377 2 : #[test]
378 2 : fn full_parser() {
379 2 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
380 2 : "proxy",
381 2 : "--parquet-upload-remote-storage",
382 2 : "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
383 2 : "--parquet-upload-row-group-size",
384 2 : "100",
385 2 : "--parquet-upload-page-size",
386 2 : "10000",
387 2 : "--parquet-upload-size",
388 2 : "10000000",
389 2 : "--parquet-upload-maximum-duration",
390 2 : "10m",
391 2 : "--parquet-upload-compression",
392 2 : "zstd(5)",
393 2 : ]);
394 2 : assert_eq!(
395 2 : parquet_upload.parquet_upload_remote_storage,
396 2 : Some(RemoteStorageConfig {
397 2 : storage: RemoteStorageKind::AwsS3(S3Config {
398 2 : bucket_name: "default".into(),
399 2 : bucket_region: "us-east-1".into(),
400 2 : prefix_in_bucket: Some("proxy/".into()),
401 2 : endpoint: Some("http://minio:9000".into()),
402 2 : concurrency_limit: NonZeroUsize::new(
403 2 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
404 2 : )
405 2 : .unwrap(),
406 2 : max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
407 2 : })
408 2 : })
409 2 : );
410 2 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
411 2 : assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
412 2 : assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
413 2 : assert_eq!(
414 2 : parquet_upload.parquet_upload_maximum_duration,
415 2 : time::Duration::from_secs(10 * 60)
416 2 : );
417 2 : assert_eq!(
418 2 : parquet_upload.parquet_upload_compression,
419 2 : Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
420 2 : );
421 2 : }
422 :
423 418000 : fn generate_request_data(rng: &mut impl Rng) -> RequestData {
424 418000 : RequestData {
425 418000 : session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
426 418000 : peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
427 418000 : timestamp: chrono::NaiveDateTime::from_timestamp_millis(
428 418000 : rng.gen_range(1703862754..1803862754),
429 418000 : )
430 418000 : .unwrap(),
431 418000 : application_name: Some("test".to_owned()),
432 418000 : username: Some(hex::encode(rng.gen::<[u8; 4]>())),
433 418000 : endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
434 418000 : project: Some(hex::encode(rng.gen::<[u8; 16]>())),
435 418000 : branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
436 418000 : protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
437 418000 : region: "us-east-1",
438 418000 : error: None,
439 418000 : success: rng.gen(),
440 418000 : duration_us: rng.gen_range(0..30_000_000),
441 418000 : }
442 418000 : }
443 :
444 14 : fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
445 14 : let mut rng = StdRng::from_seed([0x39; 32]);
446 14 : futures::stream::iter(
447 418000 : std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
448 14 : )
449 14 : }
450 :
451 10 : async fn run_test(
452 10 : tmpdir: &Utf8Path,
453 10 : config: ParquetConfig,
454 10 : rx: impl Stream<Item = RequestData>,
455 10 : ) -> Vec<(u64, usize, i64)> {
456 10 : let remote_storage_config = RemoteStorageConfig {
457 10 : storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
458 10 : };
459 10 : let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
460 10 :
461 609 : worker_inner(storage, rx, config).await.unwrap();
462 10 :
463 10 : let mut files = WalkDir::new(tmpdir.as_std_path())
464 10 : .into_iter()
465 112 : .filter_map(|entry| entry.ok())
466 112 : .filter(|entry| entry.file_type().is_file())
467 62 : .map(|entry| entry.path().to_path_buf())
468 10 : .collect_vec();
469 10 : files.sort();
470 10 :
471 10 : files
472 10 : .into_iter()
473 62 : .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
474 62 : .map(|file| {
475 62 : (
476 62 : file.metadata().unwrap(),
477 62 : SerializedFileReader::new(file).unwrap().metadata().clone(),
478 62 : )
479 62 : })
480 62 : .map(|(file_meta, parquet_meta)| {
481 62 : (
482 62 : file_meta.len(),
483 62 : parquet_meta.num_row_groups(),
484 62 : parquet_meta.file_metadata().num_rows(),
485 62 : )
486 62 : })
487 10 : .collect()
488 10 : }
489 :
490 2 : #[tokio::test]
491 2 : async fn verify_parquet_no_compression() {
492 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
493 2 :
494 2 : let config = ParquetConfig {
495 2 : propeties: Arc::new(WriterProperties::new()),
496 2 : rows_per_group: 2_000,
497 2 : file_size: 1_000_000,
498 2 : max_duration: time::Duration::from_secs(20 * 60),
499 2 : test_remote_failures: 0,
500 2 : };
501 2 :
502 2 : let rx = random_stream(50_000);
503 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
504 :
505 2 : assert_eq!(
506 2 : file_stats,
507 2 : [
508 2 : (1087635, 3, 6000),
509 2 : (1087288, 3, 6000),
510 2 : (1087444, 3, 6000),
511 2 : (1087572, 3, 6000),
512 2 : (1087468, 3, 6000),
513 2 : (1087500, 3, 6000),
514 2 : (1087533, 3, 6000),
515 2 : (1087566, 3, 6000),
516 2 : (362671, 1, 2000)
517 2 : ],
518 2 : );
519 :
520 2 : tmpdir.close().unwrap();
521 : }
522 :
523 2 : #[tokio::test]
524 2 : async fn verify_parquet_min_compression() {
525 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
526 2 :
527 2 : let config = ParquetConfig {
528 2 : propeties: Arc::new(
529 2 : WriterProperties::builder()
530 2 : .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
531 2 : .build(),
532 2 : ),
533 2 : rows_per_group: 2_000,
534 2 : file_size: 1_000_000,
535 2 : max_duration: time::Duration::from_secs(20 * 60),
536 2 : test_remote_failures: 0,
537 2 : };
538 2 :
539 2 : let rx = random_stream(50_000);
540 91 : let file_stats = run_test(tmpdir.path(), config, rx).await;
541 :
542 : // with compression, there are fewer files with more rows per file
543 2 : assert_eq!(
544 2 : file_stats,
545 2 : [
546 2 : (1028637, 5, 10000),
547 2 : (1031969, 5, 10000),
548 2 : (1019900, 5, 10000),
549 2 : (1020365, 5, 10000),
550 2 : (1025010, 5, 10000)
551 2 : ],
552 2 : );
553 :
554 2 : tmpdir.close().unwrap();
555 : }
556 :
557 2 : #[tokio::test]
558 2 : async fn verify_parquet_strong_compression() {
559 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
560 2 :
561 2 : let config = ParquetConfig {
562 2 : propeties: Arc::new(
563 2 : WriterProperties::builder()
564 2 : .set_compression(parquet::basic::Compression::ZSTD(
565 2 : ZstdLevel::try_new(10).unwrap(),
566 2 : ))
567 2 : .build(),
568 2 : ),
569 2 : rows_per_group: 2_000,
570 2 : file_size: 1_000_000,
571 2 : max_duration: time::Duration::from_secs(20 * 60),
572 2 : test_remote_failures: 0,
573 2 : };
574 2 :
575 2 : let rx = random_stream(50_000);
576 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
577 :
578 : // with strong compression, the files are smaller
579 2 : assert_eq!(
580 2 : file_stats,
581 2 : [
582 2 : (1210770, 6, 12000),
583 2 : (1211036, 6, 12000),
584 2 : (1210990, 6, 12000),
585 2 : (1210861, 6, 12000),
586 2 : (202073, 1, 2000)
587 2 : ],
588 2 : );
589 :
590 2 : tmpdir.close().unwrap();
591 : }
592 :
593 2 : #[tokio::test]
594 2 : async fn verify_parquet_unreliable_upload() {
595 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
596 2 :
597 2 : let config = ParquetConfig {
598 2 : propeties: Arc::new(WriterProperties::new()),
599 2 : rows_per_group: 2_000,
600 2 : file_size: 1_000_000,
601 2 : max_duration: time::Duration::from_secs(20 * 60),
602 2 : test_remote_failures: 2,
603 2 : };
604 2 :
605 2 : let rx = random_stream(50_000);
606 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
607 :
608 2 : assert_eq!(
609 2 : file_stats,
610 2 : [
611 2 : (1087635, 3, 6000),
612 2 : (1087288, 3, 6000),
613 2 : (1087444, 3, 6000),
614 2 : (1087572, 3, 6000),
615 2 : (1087468, 3, 6000),
616 2 : (1087500, 3, 6000),
617 2 : (1087533, 3, 6000),
618 2 : (1087566, 3, 6000),
619 2 : (362671, 1, 2000)
620 2 : ],
621 2 : );
622 :
623 2 : tmpdir.close().unwrap();
624 : }
625 :
626 2 : #[tokio::test(start_paused = true)]
627 2 : async fn verify_parquet_regular_upload() {
628 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
629 2 :
630 2 : let config = ParquetConfig {
631 2 : propeties: Arc::new(WriterProperties::new()),
632 2 : rows_per_group: 2_000,
633 2 : file_size: 1_000_000,
634 2 : max_duration: time::Duration::from_secs(60),
635 2 : test_remote_failures: 2,
636 2 : };
637 2 :
638 2 : let (tx, mut rx) = mpsc::unbounded_channel();
639 2 :
640 2 : tokio::spawn(async move {
641 8 : for _ in 0..3 {
642 6 : let mut s = random_stream(3000);
643 18006 : while let Some(r) = s.next().await {
644 18000 : tx.send(r).unwrap();
645 18000 : }
646 6 : time::sleep(time::Duration::from_secs(70)).await
647 : }
648 2 : });
649 2 :
650 18142 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
651 178 : let file_stats = run_test(tmpdir.path(), config, rx).await;
652 :
653 : // files are smaller than the size threshold, but they took too long to fill so were flushed early
654 2 : assert_eq!(
655 2 : file_stats,
656 2 : [(545264, 2, 3001), (545025, 2, 3000), (544857, 2, 2999)],
657 2 : );
658 :
659 2 : tmpdir.close().unwrap();
660 : }
661 : }