use std::sync::Arc;

use anyhow::Context;
use bytes::BytesMut;
use futures::{Stream, StreamExt};
use parquet::{
    basic::Compression,
    file::{
        metadata::RowGroupMetaDataPtr,
        properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
        writer::SerializedFileWriter,
    },
    record::RecordWriter,
};
use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig};
use tokio::{sync::mpsc, time};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, Span};
use utils::backoff;

use super::{RequestMonitoring, LOG_CHAN};

#[derive(clap::Args, Clone, Debug)]
pub struct ParquetUploadArgs {
    /// Storage location to upload the parquet files to.
    /// Encoded as toml (same format as pageservers), eg
    /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
    #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
    parquet_upload_remote_storage: OptRemoteStorageConfig,

    /// How many rows to include in a row group
    #[clap(long, default_value_t = 8192)]
    parquet_upload_row_group_size: usize,

    /// How large each column page should be in bytes
    #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
    parquet_upload_page_size: usize,

    /// How large the total parquet file should be in bytes
    #[clap(long, default_value_t = 100_000_000)]
    parquet_upload_size: i64,

    /// How long to wait before forcing a file upload
    #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
    parquet_upload_maximum_duration: tokio::time::Duration,

    /// What level of compression to use
    #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
    parquet_upload_compression: Compression,
}

/// Hack to stop clap from being too clever. Without this type alias, clap makes extra
/// assumptions about the optional state, and the value parser we use fails with runtime type errors.
type OptRemoteStorageConfig = Option<RemoteStorageConfig>;

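// An empty inline table (`{}`, the clap default above) parses to `None`, which disables uploads.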
fn remote_storage_from_toml(s: &str) -> anyhow::Result<OptRemoteStorageConfig> {
    RemoteStorageConfig::from_toml(&s.parse()?)
}

// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If an upload fails, we log it at info level and retry.
// But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
// level instead, as repeated failures can mean a more serious problem. If it
// fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;

// the parquet crate leaves a lot to be desired...
// what follows is an attempt to write parquet files with minimal allocs.
// complication: parquet is a columnar format, while we want to write row by row.
// design:
// * we batch up to `rows_per_group` rows (8192 by default), then flush them into a 'row group'
// * after each row group write, we check the length of the file and upload to s3 if it is large enough

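// The derive below implements parquet's `RecordWriter` for `&[RequestData]`, which is what
// provides the `schema()` and `write_to_row_group()` calls used in `worker_inner` and `flush_rows`.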
#[derive(parquet_derive::ParquetRecordWriter)]
struct RequestData {
    region: &'static str,
    protocol: &'static str,
    /// Must be UTC. The derive macro doesn't like timezones
    timestamp: chrono::NaiveDateTime,
    session_id: uuid::Uuid,
    peer_addr: String,
    username: Option<String>,
    application_name: Option<String>,
    endpoint_id: Option<String>,
    project: Option<String>,
    branch: Option<String>,
    error: Option<&'static str>,
}

impl From<RequestMonitoring> for RequestData {
    fn from(value: RequestMonitoring) -> Self {
        Self {
            session_id: value.session_id,
            peer_addr: value.peer_addr.to_string(),
            timestamp: value.first_packet.naive_utc(),
            username: value.user.as_deref().map(String::from),
            application_name: value.application.as_deref().map(String::from),
            endpoint_id: value.endpoint_id.as_deref().map(String::from),
            project: value.project.as_deref().map(String::from),
            branch: value.branch.as_deref().map(String::from),
            protocol: value.protocol,
            region: value.region,
            error: value.error_kind.as_ref().map(|e| e.to_str()),
        }
    }
}

/// Parquet request context worker
///
/// It listens on a channel for all completed requests, extracts the data and writes it into a parquet file,
/// then uploads a completed batch to S3.
pub async fn worker(
    cancellation_token: CancellationToken,
    config: ParquetUploadArgs,
) -> anyhow::Result<()> {
    let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
        tracing::warn!("parquet request upload: no s3 bucket configured");
        return Ok(());
    };

    let (tx, mut rx) = mpsc::unbounded_channel();
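    // LOG_CHAN holds only a weak sender (upgraded wherever requests are logged), so it does not
    // keep the channel alive by itself; dropping the strong `tx` below is what closes the stream.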
    LOG_CHAN.set(tx.downgrade()).unwrap();

    // setup row stream that will close on cancellation
    tokio::spawn(async move {
        cancellation_token.cancelled().await;
        // dropping this sender will cause the channel to close only once
        // all the remaining inflight requests have been completed.
        drop(tx);
    });
    let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
    let rx = rx.map(RequestData::from);

    let storage =
        GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;

    let properties = WriterProperties::builder()
        .set_data_page_size_limit(config.parquet_upload_page_size)
        .set_compression(config.parquet_upload_compression);

    let parquet_config = ParquetConfig {
        propeties: Arc::new(properties.build()),
        rows_per_group: config.parquet_upload_row_group_size,
        file_size: config.parquet_upload_size,
        max_duration: config.parquet_upload_maximum_duration,

        #[cfg(any(test, feature = "testing"))]
        test_remote_failures: 0,
    };

    worker_inner(storage, rx, parquet_config).await
}

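/// Runtime knobs for `worker_inner`: the writer properties plus the thresholds derived from the
/// CLI args above, and a test-only counter for injecting remote-storage failures.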
struct ParquetConfig {
    propeties: WriterPropertiesPtr,
    rows_per_group: usize,
    file_size: i64,

    max_duration: tokio::time::Duration,

    #[cfg(any(test, feature = "testing"))]
    test_remote_failures: u64,
}

async fn worker_inner(
    storage: GenericRemoteStorage,
    rx: impl Stream<Item = RequestData>,
    config: ParquetConfig,
) -> anyhow::Result<()> {
    #[cfg(any(test, feature = "testing"))]
    let storage = if config.test_remote_failures > 0 {
        GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
    } else {
        storage
    };

    let mut rx = std::pin::pin!(rx);

    let mut rows = Vec::with_capacity(config.rows_per_group);

    let schema = rows.as_slice().schema()?;
    let file = BytesWriter::default();
    let mut w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;

    let mut last_upload = time::Instant::now();

    let mut len = 0;
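    // Main loop: buffer incoming rows, flush them into a row group once `rows_per_group` is
    // reached (or the max duration has elapsed), and upload the file once it has grown past
    // `file_size` (or a time-based flush was forced), starting a fresh writer afterwards.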
    while let Some(row) = rx.next().await {
        rows.push(row);
        let force = last_upload.elapsed() > config.max_duration;
        if rows.len() == config.rows_per_group || force {
            let rg_meta;
            (rows, w, rg_meta) = flush_rows(rows, w).await?;
            len += rg_meta.compressed_size();
        }
        if len > config.file_size || force {
            last_upload = time::Instant::now();
            let file = upload_parquet(w, len, &storage).await?;
            w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
            len = 0;
        }
    }

    if !rows.is_empty() {
        let rg_meta;
        (_, w, rg_meta) = flush_rows(rows, w).await?;
        len += rg_meta.compressed_size();
    }

    if !w.flushed_row_groups().is_empty() {
        let _: BytesWriter = upload_parquet(w, len, &storage).await?;
    }

    Ok(())
}

async fn flush_rows(
    rows: Vec<RequestData>,
    mut w: SerializedFileWriter<BytesWriter>,
) -> anyhow::Result<(
    Vec<RequestData>,
    SerializedFileWriter<BytesWriter>,
    RowGroupMetaDataPtr,
)> {
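    // Writing out a row group is synchronous, potentially CPU-heavy work, so hand it to a
    // blocking thread; the current span is captured so logs from there keep the request context.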
    let span = Span::current();
    let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
        let _enter = span.enter();

        let mut rg = w.next_row_group()?;
        rows.as_slice().write_to_row_group(&mut rg)?;
        let rg_meta = rg.close()?;

        let size = rg_meta.compressed_size();
        let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;

        debug!(size, compression, "flushed row group to parquet file");

        Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
    })
    .await
    .unwrap()?;

    rows.clear();
    Ok((rows, w, rg_meta))
}

async fn upload_parquet(
    w: SerializedFileWriter<BytesWriter>,
    len: i64,
    storage: &GenericRemoteStorage,
) -> anyhow::Result<BytesWriter> {
    let len_uncompressed = w
        .flushed_row_groups()
        .iter()
        .map(|rg| rg.total_byte_size())
        .sum::<i64>();

    // I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry.
    // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
    let (mut file, metadata) = tokio::task::spawn_blocking(move || w.finish())
        .await
        .unwrap()?;

    let data = file.buf.split().freeze();

    let compression = len as f64 / len_uncompressed as f64;
    let size = data.len();
    let id = uuid::Uuid::now_v7();

    info!(
        %id,
        rows = metadata.num_rows,
        size, compression, "uploading request parquet file"
    );

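    // NB: UUIDv7 is time-ordered, so the object keys sort roughly by upload time.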
    let path = RemotePath::from_string(&format!("requests_{id}.parquet"))?;
    backoff::retry(
        || async {
            let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
            storage.upload(stream, data.len(), &path, None).await
        },
        |_e| false,
        FAILED_UPLOAD_WARN_THRESHOLD,
        FAILED_UPLOAD_MAX_RETRIES,
        "request_data_upload",
        // we don't want cancellation to interrupt here, so we make a dummy cancel token
        backoff::Cancel::new(CancellationToken::new(), || anyhow::anyhow!("Cancelled")),
    )
    .await
    .context("request_data_upload")?;

    Ok(file)
}

// why doesn't BytesMut impl io::Write?
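// Minimal adapter so SerializedFileWriter (which needs io::Write) can write straight into a
// BytesMut; upload_parquet then takes the accumulated bytes out with `buf.split().freeze()`
// without copying, leaving the writer reusable for the next file.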
#[derive(Default)]
struct BytesWriter {
    buf: BytesMut,
}

impl std::io::Write for BytesWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.buf.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};

    use camino::Utf8Path;
    use clap::Parser;
    use futures::{Stream, StreamExt};
    use itertools::Itertools;
    use parquet::{
        basic::{Compression, ZstdLevel},
        file::{
            properties::{WriterProperties, DEFAULT_PAGE_SIZE},
            reader::FileReader,
            serialized_reader::SerializedFileReader,
        },
    };
    use rand::{rngs::StdRng, Rng, SeedableRng};
    use remote_storage::{
        GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
        DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
    };
    use tokio::{sync::mpsc, time};

    use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};

    #[derive(Parser)]
    struct ProxyCliArgs {
        #[clap(flatten)]
        parquet_upload: ParquetUploadArgs,
    }

    #[test]
    fn default_parser() {
        let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
        assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
        assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
        assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
        assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
        assert_eq!(
            parquet_upload.parquet_upload_maximum_duration,
            time::Duration::from_secs(20 * 60)
        );
        assert_eq!(
            parquet_upload.parquet_upload_compression,
            Compression::UNCOMPRESSED
        );
    }

    #[test]
    fn full_parser() {
        let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
            "proxy",
            "--parquet-upload-remote-storage",
            "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
            "--parquet-upload-row-group-size",
            "100",
            "--parquet-upload-page-size",
            "10000",
            "--parquet-upload-size",
            "10000000",
            "--parquet-upload-maximum-duration",
            "10m",
            "--parquet-upload-compression",
            "zstd(5)",
        ]);
        assert_eq!(
            parquet_upload.parquet_upload_remote_storage,
            Some(RemoteStorageConfig {
                storage: RemoteStorageKind::AwsS3(S3Config {
                    bucket_name: "default".into(),
                    bucket_region: "us-east-1".into(),
                    prefix_in_bucket: Some("proxy/".into()),
                    endpoint: Some("http://minio:9000".into()),
                    concurrency_limit: NonZeroUsize::new(
                        DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
                    )
                    .unwrap(),
                    max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
                })
            })
        );
        assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
        assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
        assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
        assert_eq!(
            parquet_upload.parquet_upload_maximum_duration,
            time::Duration::from_secs(10 * 60)
        );
        assert_eq!(
            parquet_upload.parquet_upload_compression,
            Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
        );
    }

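    // Build one synthetic RequestData row; field shapes roughly mimic real traffic
    // (hex identifiers, random peer address, timestamp within a fixed range).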
    fn generate_request_data(rng: &mut impl Rng) -> RequestData {
        RequestData {
            session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
            peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
            timestamp: chrono::NaiveDateTime::from_timestamp_millis(
                rng.gen_range(1703862754..1803862754),
            )
            .unwrap(),
            application_name: Some("test".to_owned()),
            username: Some(hex::encode(rng.gen::<[u8; 4]>())),
            endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
            project: Some(hex::encode(rng.gen::<[u8; 16]>())),
            branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
            protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
            region: "us-east-1",
            error: None,
        }
    }

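    // A fixed RNG seed keeps the generated rows (and therefore the exact file sizes asserted
    // in the tests below) deterministic.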
    fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
        let mut rng = StdRng::from_seed([0x39; 32]);
        futures::stream::iter(
            std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
        )
    }

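    // Drive worker_inner against local-filesystem "remote" storage in `tmpdir`, then report
    // (file size in bytes, row group count, row count) for every uploaded parquet file.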
    async fn run_test(
        tmpdir: &Utf8Path,
        config: ParquetConfig,
        rx: impl Stream<Item = RequestData>,
    ) -> Vec<(u64, usize, i64)> {
        let remote_storage_config = RemoteStorageConfig {
            storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
        };
        let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();

        worker_inner(storage, rx, config).await.unwrap();

        let mut files = std::fs::read_dir(tmpdir.as_std_path())
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .collect_vec();
        files.sort();

        files
            .into_iter()
            .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
            .map(|file| {
                (
                    file.metadata().unwrap(),
                    SerializedFileReader::new(file).unwrap().metadata().clone(),
                )
            })
            .map(|(file_meta, parquet_meta)| {
                (
                    file_meta.len(),
                    parquet_meta.num_row_groups(),
                    parquet_meta.file_metadata().num_rows(),
                )
            })
            .collect()
    }

    #[tokio::test]
    async fn verify_parquet_no_compression() {
        let tmpdir = camino_tempfile::tempdir().unwrap();

        let config = ParquetConfig {
            propeties: Arc::new(WriterProperties::new()),
            rows_per_group: 2_000,
            file_size: 1_000_000,
            max_duration: time::Duration::from_secs(20 * 60),
            test_remote_failures: 0,
        };

        let rx = random_stream(50_000);
        let file_stats = run_test(tmpdir.path(), config, rx).await;

        assert_eq!(
            file_stats,
            [
                (1029153, 3, 6000),
                (1029075, 3, 6000),
                (1029216, 3, 6000),
                (1029129, 3, 6000),
                (1029250, 3, 6000),
                (1029017, 3, 6000),
                (1029175, 3, 6000),
                (1029247, 3, 6000),
                (343124, 1, 2000)
            ],
        );

        tmpdir.close().unwrap();
    }

    #[tokio::test]
    async fn verify_parquet_min_compression() {
        let tmpdir = camino_tempfile::tempdir().unwrap();

        let config = ParquetConfig {
            propeties: Arc::new(
                WriterProperties::builder()
                    .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
                    .build(),
            ),
            rows_per_group: 2_000,
            file_size: 1_000_000,
            max_duration: time::Duration::from_secs(20 * 60),
            test_remote_failures: 0,
        };

        let rx = random_stream(50_000);
        let file_stats = run_test(tmpdir.path(), config, rx).await;

        // with compression, there are fewer files with more rows per file
        assert_eq!(
            file_stats,
            [
                (1166201, 6, 12000),
                (1163577, 6, 12000),
                (1164641, 6, 12000),
                (1168772, 6, 12000),
                (196761, 1, 2000)
            ],
        );

        tmpdir.close().unwrap();
    }

    #[tokio::test]
    async fn verify_parquet_strong_compression() {
        let tmpdir = camino_tempfile::tempdir().unwrap();

        let config = ParquetConfig {
            propeties: Arc::new(
                WriterProperties::builder()
                    .set_compression(parquet::basic::Compression::ZSTD(
                        ZstdLevel::try_new(10).unwrap(),
                    ))
                    .build(),
            ),
            rows_per_group: 2_000,
            file_size: 1_000_000,
            max_duration: time::Duration::from_secs(20 * 60),
            test_remote_failures: 0,
        };

        let rx = random_stream(50_000);
        let file_stats = run_test(tmpdir.path(), config, rx).await;

        // with strong compression, the files are smaller
        assert_eq!(
            file_stats,
            [
                (1144934, 6, 12000),
                (1144941, 6, 12000),
                (1144735, 6, 12000),
                (1144936, 6, 12000),
                (191035, 1, 2000)
            ],
        );

        tmpdir.close().unwrap();
    }

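    // Same data and thresholds as `verify_parquet_no_compression`, but with injected
    // remote-storage failures (`test_remote_failures: 2`); the retry loop should absorb them
    // and produce identical file stats.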
    #[tokio::test]
    async fn verify_parquet_unreliable_upload() {
        let tmpdir = camino_tempfile::tempdir().unwrap();

        let config = ParquetConfig {
            propeties: Arc::new(WriterProperties::new()),
            rows_per_group: 2_000,
            file_size: 1_000_000,
            max_duration: time::Duration::from_secs(20 * 60),
            test_remote_failures: 2,
        };

        let rx = random_stream(50_000);
        let file_stats = run_test(tmpdir.path(), config, rx).await;

        assert_eq!(
            file_stats,
            [
                (1029153, 3, 6000),
                (1029075, 3, 6000),
                (1029216, 3, 6000),
                (1029129, 3, 6000),
                (1029250, 3, 6000),
                (1029017, 3, 6000),
                (1029175, 3, 6000),
                (1029247, 3, 6000),
                (343124, 1, 2000)
            ],
        );

        tmpdir.close().unwrap();
    }

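    // `start_paused` pauses tokio's clock and auto-advances it whenever all tasks are idle, so
    // the 70-second sleeps below cross the 60-second max_duration and force time-based flushes
    // without the test actually waiting.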
    #[tokio::test(start_paused = true)]
    async fn verify_parquet_regular_upload() {
        let tmpdir = camino_tempfile::tempdir().unwrap();

        let config = ParquetConfig {
            propeties: Arc::new(WriterProperties::new()),
            rows_per_group: 2_000,
            file_size: 1_000_000,
            max_duration: time::Duration::from_secs(60),
            test_remote_failures: 2,
        };

        let (tx, mut rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            for _ in 0..3 {
                let mut s = random_stream(3000);
                while let Some(r) = s.next().await {
                    tx.send(r).unwrap();
                }
                time::sleep(time::Duration::from_secs(70)).await
            }
        });

        let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
        let file_stats = run_test(tmpdir.path(), config, rx).await;

        // files are smaller than the size threshold, but they took too long to fill so were flushed early
        assert_eq!(
            file_stats,
            [(515807, 2, 3001), (515585, 2, 3000), (515425, 2, 2999)],
        );

        tmpdir.close().unwrap();
    }
}