Line data Source code
1 : use std::{sync::Arc, time::SystemTime};
2 :
3 : use anyhow::Context;
4 : use bytes::{buf::Writer, BufMut, BytesMut};
5 : use chrono::{Datelike, Timelike};
6 : use futures::{Stream, StreamExt};
7 : use parquet::{
8 : basic::Compression,
9 : file::{
10 : metadata::RowGroupMetaDataPtr,
11 : properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
12 : writer::SerializedFileWriter,
13 : },
14 : record::RecordWriter,
15 : };
16 : use pq_proto::StartupMessageParams;
17 : use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
18 : use serde::ser::SerializeMap;
19 : use tokio::{sync::mpsc, time};
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{debug, info, Span};
22 : use utils::backoff;
23 :
24 : use crate::{
25 : config::{remote_storage_from_toml, OptRemoteStorageConfig},
26 : context::LOG_CHAN_DISCONNECT,
27 : };
28 :
29 : use super::{RequestMonitoring, LOG_CHAN};
30 :
31 12 : #[derive(clap::Args, Clone, Debug)]
32 : pub struct ParquetUploadArgs {
33 : /// Storage location to upload the parquet files to.
34 : /// Encoded as toml (same format as pageservers), eg
35 : /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
36 : #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
37 0 : parquet_upload_remote_storage: OptRemoteStorageConfig,
38 :
39 : #[clap(long, default_value = "{}", value_parser = remote_storage_from_toml)]
40 0 : parquet_upload_disconnect_events_remote_storage: OptRemoteStorageConfig,
41 :
42 : /// How many rows to include in a row group
43 6 : #[clap(long, default_value_t = 8192)]
44 0 : parquet_upload_row_group_size: usize,
45 :
46 : /// How large each column page should be in bytes
47 6 : #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
48 0 : parquet_upload_page_size: usize,
49 :
50 : /// How large the total parquet file should be in bytes
51 6 : #[clap(long, default_value_t = 100_000_000)]
52 0 : parquet_upload_size: i64,
53 :
54 : /// How long to wait before forcing a file upload
55 : #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
56 0 : parquet_upload_maximum_duration: tokio::time::Duration,
57 :
58 : /// What level of compression to use
59 6 : #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
60 0 : parquet_upload_compression: Compression,
61 : }
62 :
// Occasional network issues and such can cause remote operations to fail, and
// that's expected. If an upload fails, we log it at info-level, and retry.
// But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
// level instead, as repeated failures can mean a more serious problem. If it
// fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
pub const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
pub const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
70 :
// the parquet crate leaves a lot to be desired...
// what follows is an attempt to write parquet files with minimal allocs.
// complication: parquet is a columnar format, while we want to write rows.
// design:
// * we batch up to `rows_per_group` rows (8192 by default), then flush them into a 'row group'
// * after each rowgroup write, we check the length of the file and upload to s3 if large enough

/// One row of the request log that is written to the parquet files.
#[derive(parquet_derive::ParquetRecordWriter)]
pub struct RequestData {
    region: &'static str,
    protocol: &'static str,
    /// Must be UTC. The derive macro doesn't like the timezones
    timestamp: chrono::NaiveDateTime,
    session_id: uuid::Uuid,
    peer_addr: String,
    username: Option<String>,
    application_name: Option<String>,
    endpoint_id: Option<String>,
    database: Option<String>,
    project: Option<String>,
    branch: Option<String>,
    /// Startup options, serialized as a JSON object.
    pg_options: Option<String>,
    auth_method: Option<&'static str>,
    error: Option<&'static str>,
    /// Success is counted if we form a HTTP response with sql rows inside
    /// Or if we make it to proxy_pass
    success: bool,
    /// Indicates if the cplane started the new compute node for this request.
    cold_start_info: &'static str,
    /// Tracks time from session start (HTTP request/libpq TCP handshake)
    /// Through to success/failure
    duration_us: u64,
    /// If the session was successful, one more event is created after the
    /// disconnect, with `disconnect_timestamp` filled in.
    disconnect_timestamp: Option<chrono::NaiveDateTime>,
}
106 :
107 : struct Options<'a> {
108 : options: &'a StartupMessageParams,
109 : }
110 :
111 : impl<'a> serde::Serialize for Options<'a> {
112 0 : fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
113 0 : where
114 0 : S: serde::Serializer,
115 0 : {
116 0 : let mut state = s.serialize_map(None)?;
117 0 : for (k, v) in self.options.iter() {
118 0 : state.serialize_entry(k, v)?;
119 : }
120 0 : state.end()
121 0 : }
122 : }
123 :
124 : impl From<&RequestMonitoring> for RequestData {
125 0 : fn from(value: &RequestMonitoring) -> Self {
126 0 : Self {
127 0 : session_id: value.session_id,
128 0 : peer_addr: value.peer_addr.to_string(),
129 0 : timestamp: value.first_packet.naive_utc(),
130 0 : username: value.user.as_deref().map(String::from),
131 0 : application_name: value.application.as_deref().map(String::from),
132 0 : endpoint_id: value.endpoint_id.as_deref().map(String::from),
133 0 : database: value.dbname.as_deref().map(String::from),
134 0 : project: value.project.as_deref().map(String::from),
135 0 : branch: value.branch.as_deref().map(String::from),
136 0 : pg_options: value
137 0 : .pg_options
138 0 : .as_ref()
139 0 : .and_then(|options| serde_json::to_string(&Options { options }).ok()),
140 0 : auth_method: value.auth_method.as_ref().map(|x| match x {
141 0 : super::AuthMethod::Web => "web",
142 0 : super::AuthMethod::ScramSha256 => "scram_sha_256",
143 0 : super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
144 0 : super::AuthMethod::Cleartext => "cleartext",
145 0 : }),
146 0 : protocol: value.protocol.as_str(),
147 0 : region: value.region,
148 0 : error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
149 0 : success: value.success,
150 0 : cold_start_info: value.cold_start_info.as_str(),
151 0 : duration_us: SystemTime::from(value.first_packet)
152 0 : .elapsed()
153 0 : .unwrap_or_default()
154 0 : .as_micros() as u64, // 584 millenia... good enough
155 0 : disconnect_timestamp: value.disconnect_timestamp.map(|x| x.naive_utc()),
156 0 : }
157 0 : }
158 : }
159 :
160 : /// Parquet request context worker
161 : ///
162 : /// It listened on a channel for all completed requests, extracts the data and writes it into a parquet file,
163 : /// then uploads a completed batch to S3
164 0 : pub async fn worker(
165 0 : cancellation_token: CancellationToken,
166 0 : config: ParquetUploadArgs,
167 0 : ) -> anyhow::Result<()> {
168 0 : let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
169 0 : tracing::warn!("parquet request upload: no s3 bucket configured");
170 0 : return Ok(());
171 : };
172 :
173 0 : let (tx, mut rx) = mpsc::unbounded_channel();
174 0 : LOG_CHAN.set(tx.downgrade()).unwrap();
175 0 :
176 0 : // setup row stream that will close on cancellation
177 0 : let cancellation_token2 = cancellation_token.clone();
178 0 : tokio::spawn(async move {
179 0 : cancellation_token2.cancelled().await;
180 : // dropping this sender will cause the channel to close only once
181 : // all the remaining inflight requests have been completed.
182 0 : drop(tx);
183 0 : });
184 0 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
185 0 : let rx = rx.map(RequestData::from);
186 :
187 0 : let storage =
188 0 : GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?;
189 :
190 0 : let properties = WriterProperties::builder()
191 0 : .set_data_page_size_limit(config.parquet_upload_page_size)
192 0 : .set_compression(config.parquet_upload_compression);
193 0 :
194 0 : let parquet_config = ParquetConfig {
195 0 : propeties: Arc::new(properties.build()),
196 0 : rows_per_group: config.parquet_upload_row_group_size,
197 0 : file_size: config.parquet_upload_size,
198 0 : max_duration: config.parquet_upload_maximum_duration,
199 0 :
200 0 : #[cfg(any(test, feature = "testing"))]
201 0 : test_remote_failures: 0,
202 0 : };
203 :
204 : // TODO(anna): consider moving this to a separate function.
205 0 : if let Some(disconnect_events_storage_config) =
206 0 : config.parquet_upload_disconnect_events_remote_storage
207 : {
208 0 : let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
209 0 : LOG_CHAN_DISCONNECT.set(tx_disconnect.downgrade()).unwrap();
210 0 :
211 0 : // setup row stream that will close on cancellation
212 0 : tokio::spawn(async move {
213 0 : cancellation_token.cancelled().await;
214 : // dropping this sender will cause the channel to close only once
215 : // all the remaining inflight requests have been completed.
216 0 : drop(tx_disconnect);
217 0 : });
218 0 : let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
219 0 : let rx_disconnect = rx_disconnect.map(RequestData::from);
220 :
221 0 : let storage_disconnect =
222 0 : GenericRemoteStorage::from_config(&disconnect_events_storage_config)
223 0 : .context("remote storage for disconnect events init")?;
224 0 : let parquet_config_disconnect = parquet_config.clone();
225 : tokio::try_join!(
226 : worker_inner(storage, rx, parquet_config),
227 : worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
228 : )
229 0 : .map(|_| ())
230 : } else {
231 0 : worker_inner(storage, rx, parquet_config).await
232 : }
233 0 : }
234 :
/// Settings for one parquet write/upload loop (`worker_inner`).
#[derive(Clone, Debug)]
struct ParquetConfig {
    /// Writer properties handed to every `SerializedFileWriter`.
    /// NOTE(review): the field name looks like a typo of `properties`; renaming it
    /// requires touching every user of the field.
    propeties: WriterPropertiesPtr,
    /// Number of buffered rows that triggers a row-group flush.
    rows_per_group: usize,
    /// Accumulated compressed row-group size (in bytes) above which the file is uploaded.
    file_size: i64,

    /// Time since the last upload after which a flush and upload are forced.
    max_duration: tokio::time::Duration,

    /// When non-zero, the storage is wrapped with `GenericRemoteStorage::unreliable_wrapper`
    /// using this value.
    #[cfg(any(test, feature = "testing"))]
    test_remote_failures: u64,
}
246 :
247 10 : async fn worker_inner(
248 10 : storage: GenericRemoteStorage,
249 10 : rx: impl Stream<Item = RequestData>,
250 10 : config: ParquetConfig,
251 10 : ) -> anyhow::Result<()> {
252 : #[cfg(any(test, feature = "testing"))]
253 10 : let storage = if config.test_remote_failures > 0 {
254 4 : GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
255 : } else {
256 6 : storage
257 : };
258 :
259 10 : let mut rx = std::pin::pin!(rx);
260 10 :
261 10 : let mut rows = Vec::with_capacity(config.rows_per_group);
262 :
263 10 : let schema = rows.as_slice().schema()?;
264 10 : let buffer = BytesMut::new();
265 10 : let w = buffer.writer();
266 10 : let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
267 :
268 10 : let mut last_upload = time::Instant::now();
269 10 :
270 10 : let mut len = 0;
271 418010 : while let Some(row) = rx.next().await {
272 418000 : rows.push(row);
273 418000 : let force = last_upload.elapsed() > config.max_duration;
274 418000 : if rows.len() == config.rows_per_group || force {
275 210 : let rg_meta;
276 210 : (rows, w, rg_meta) = flush_rows(rows, w).await?;
277 210 : len += rg_meta.compressed_size();
278 417790 : }
279 418000 : if len > config.file_size || force {
280 56 : last_upload = time::Instant::now();
281 234 : let file = upload_parquet(w, len, &storage).await?;
282 56 : w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
283 56 : len = 0;
284 417944 : }
285 : }
286 :
287 10 : if !rows.is_empty() {
288 2 : let rg_meta;
289 2 : (_, w, rg_meta) = flush_rows(rows, w).await?;
290 2 : len += rg_meta.compressed_size();
291 8 : }
292 :
293 10 : if !w.flushed_row_groups().is_empty() {
294 24 : let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
295 4 : }
296 :
297 10 : Ok(())
298 10 : }
299 :
300 212 : async fn flush_rows<W>(
301 212 : rows: Vec<RequestData>,
302 212 : mut w: SerializedFileWriter<W>,
303 212 : ) -> anyhow::Result<(
304 212 : Vec<RequestData>,
305 212 : SerializedFileWriter<W>,
306 212 : RowGroupMetaDataPtr,
307 212 : )>
308 212 : where
309 212 : W: std::io::Write + Send + 'static,
310 212 : {
311 212 : let span = Span::current();
312 212 : let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
313 212 : let _enter = span.enter();
314 :
315 212 : let mut rg = w.next_row_group()?;
316 212 : rows.as_slice().write_to_row_group(&mut rg)?;
317 212 : let rg_meta = rg.close()?;
318 :
319 212 : let size = rg_meta.compressed_size();
320 212 : let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
321 212 :
322 212 : debug!(size, compression, "flushed row group to parquet file");
323 :
324 212 : Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
325 212 : })
326 212 : .await
327 212 : .unwrap()?;
328 :
329 212 : rows.clear();
330 212 : Ok((rows, w, rg_meta))
331 212 : }
332 :
333 62 : async fn upload_parquet(
334 62 : mut w: SerializedFileWriter<Writer<BytesMut>>,
335 62 : len: i64,
336 62 : storage: &GenericRemoteStorage,
337 62 : ) -> anyhow::Result<Writer<BytesMut>> {
338 62 : let len_uncompressed = w
339 62 : .flushed_row_groups()
340 62 : .iter()
341 212 : .map(|rg| rg.total_byte_size())
342 62 : .sum::<i64>();
343 :
344 : // I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry.
345 : // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
346 62 : let (mut buffer, metadata) =
347 62 : tokio::task::spawn_blocking(move || -> parquet::errors::Result<_> {
348 62 : let metadata = w.finish()?;
349 62 : let buffer = std::mem::take(w.inner_mut().get_mut());
350 62 : Ok((buffer, metadata))
351 62 : })
352 62 : .await
353 62 : .unwrap()?;
354 :
355 62 : let data = buffer.split().freeze();
356 62 :
357 62 : let compression = len as f64 / len_uncompressed as f64;
358 62 : let size = data.len();
359 62 : let now = chrono::Utc::now();
360 62 : let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
361 62 : uuid::NoContext,
362 62 : // we won't be running this in 1970. this cast is ok
363 62 : now.timestamp() as u64,
364 62 : now.timestamp_subsec_nanos(),
365 62 : ));
366 62 :
367 62 : info!(
368 : %id,
369 : rows = metadata.num_rows,
370 0 : size, compression, "uploading request parquet file"
371 : );
372 :
373 62 : let year = now.year();
374 62 : let month = now.month();
375 62 : let day = now.day();
376 62 : let hour = now.hour();
377 : // segment files by time for S3 performance
378 62 : let path = RemotePath::from_string(&format!(
379 62 : "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
380 62 : ))?;
381 62 : let cancel = CancellationToken::new();
382 62 : let maybe_err = backoff::retry(
383 86 : || async {
384 86 : let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
385 86 : storage
386 86 : .upload(stream, data.len(), &path, None, &cancel)
387 196 : .await
388 86 : },
389 62 : TimeoutOrCancel::caused_by_cancel,
390 62 : FAILED_UPLOAD_WARN_THRESHOLD,
391 62 : FAILED_UPLOAD_MAX_RETRIES,
392 62 : "request_data_upload",
393 62 : // we don't want cancellation to interrupt here, so we make a dummy cancel token
394 62 : &cancel,
395 62 : )
396 196 : .await
397 62 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
398 62 : .and_then(|x| x)
399 62 : .context("request_data_upload")
400 62 : .err();
401 :
402 62 : if let Some(err) = maybe_err {
403 0 : tracing::warn!(%id, %err, "failed to upload request data");
404 62 : }
405 :
406 62 : Ok(buffer.writer())
407 62 : }
408 :
409 : #[cfg(test)]
410 : mod tests {
411 : use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
412 :
413 : use camino::Utf8Path;
414 : use clap::Parser;
415 : use futures::{Stream, StreamExt};
416 : use itertools::Itertools;
417 : use parquet::{
418 : basic::{Compression, ZstdLevel},
419 : file::{
420 : properties::{WriterProperties, DEFAULT_PAGE_SIZE},
421 : reader::FileReader,
422 : serialized_reader::SerializedFileReader,
423 : },
424 : };
425 : use rand::{rngs::StdRng, Rng, SeedableRng};
426 : use remote_storage::{
427 : GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
428 : DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
429 : };
430 : use tokio::{sync::mpsc, time};
431 : use walkdir::WalkDir;
432 :
433 : use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
434 :
// Minimal stand-in for the proxy command line: it only flattens the parquet upload
// arguments so that they can be parsed in isolation by the tests.
// (Plain `//` comments on purpose: clap uses `///` comments as help text.)
#[derive(Parser)]
struct ProxyCliArgs {
#[clap(flatten)]
parquet_upload: ParquetUploadArgs,
}
440 :
/// Without any flags, every parquet option takes its declared default.
#[test]
fn default_parser() {
    let args = ProxyCliArgs::parse_from(["proxy"]).parquet_upload;

    assert_eq!(args.parquet_upload_remote_storage, None);
    assert_eq!(args.parquet_upload_row_group_size, 8192);
    assert_eq!(args.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
    assert_eq!(args.parquet_upload_size, 100_000_000);

    let twenty_minutes = time::Duration::from_secs(20 * 60);
    assert_eq!(args.parquet_upload_maximum_duration, twenty_minutes);
    assert_eq!(args.parquet_upload_compression, Compression::UNCOMPRESSED);
}
457 :
/// Every parquet flag given explicitly is parsed into the matching field.
#[test]
fn full_parser() {
    let cli = [
        "proxy",
        "--parquet-upload-remote-storage",
        "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
        "--parquet-upload-row-group-size",
        "100",
        "--parquet-upload-page-size",
        "10000",
        "--parquet-upload-size",
        "10000000",
        "--parquet-upload-maximum-duration",
        "10m",
        "--parquet-upload-compression",
        "zstd(5)",
    ];
    let args = ProxyCliArgs::parse_from(cli).parquet_upload;

    // Settings not present in the toml fall back to the remote storage defaults.
    let expected_storage = RemoteStorageConfig {
        storage: RemoteStorageKind::AwsS3(S3Config {
            bucket_name: "default".into(),
            bucket_region: "us-east-1".into(),
            prefix_in_bucket: Some("proxy/".into()),
            endpoint: Some("http://minio:9000".into()),
            concurrency_limit: NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT)
                .unwrap(),
            max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
            upload_storage_class: None,
        }),
        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
    };
    assert_eq!(args.parquet_upload_remote_storage, Some(expected_storage));

    assert_eq!(args.parquet_upload_row_group_size, 100);
    assert_eq!(args.parquet_upload_page_size, 10000);
    assert_eq!(args.parquet_upload_size, 10_000_000);

    let ten_minutes = time::Duration::from_secs(10 * 60);
    assert_eq!(args.parquet_upload_maximum_duration, ten_minutes);

    let zstd_5 = Compression::ZSTD(ZstdLevel::try_new(5).unwrap());
    assert_eq!(args.parquet_upload_compression, zstd_5);
}
505 :
/// Builds one pseudo-random `RequestData` row from `rng`.
///
/// The `rng` calls happen in the order of the field initializers below, so reordering the
/// fields changes the generated data, and with it the exact file sizes asserted by the
/// `verify_parquet_*` tests.
fn generate_request_data(rng: &mut impl Rng) -> RequestData {
RequestData {
session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
// NOTE(review): the range looks like unix seconds but is interpreted as
// milliseconds here; confirm whether that is intended.
timestamp: chrono::DateTime::from_timestamp_millis(
rng.gen_range(1703862754..1803862754),
)
.unwrap()
.naive_utc(),
application_name: Some("test".to_owned()),
username: Some(hex::encode(rng.gen::<[u8; 4]>())),
endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
database: Some(hex::encode(rng.gen::<[u8; 16]>())),
project: Some(hex::encode(rng.gen::<[u8; 16]>())),
branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
pg_options: None,
auth_method: None,
protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
region: "us-east-1",
error: None,
success: rng.gen(),
cold_start_info: "no",
duration_us: rng.gen_range(0..30_000_000),
disconnect_timestamp: None,
}
}
532 :
533 14 : fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
534 14 : let mut rng = StdRng::from_seed([0x39; 32]);
535 14 : futures::stream::iter(
536 418000 : std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
537 14 : )
538 14 : }
539 :
540 10 : async fn run_test(
541 10 : tmpdir: &Utf8Path,
542 10 : config: ParquetConfig,
543 10 : rx: impl Stream<Item = RequestData>,
544 10 : ) -> Vec<(u64, usize, i64)> {
545 10 : let remote_storage_config = RemoteStorageConfig {
546 10 : storage: RemoteStorageKind::LocalFs(tmpdir.to_path_buf()),
547 10 : timeout: std::time::Duration::from_secs(120),
548 10 : };
549 10 : let storage = GenericRemoteStorage::from_config(&remote_storage_config).unwrap();
550 10 :
551 610 : worker_inner(storage, rx, config).await.unwrap();
552 10 :
553 10 : let mut files = WalkDir::new(tmpdir.as_std_path())
554 10 : .into_iter()
555 112 : .filter_map(|entry| entry.ok())
556 112 : .filter(|entry| entry.file_type().is_file())
557 62 : .map(|entry| entry.path().to_path_buf())
558 10 : .collect_vec();
559 10 : files.sort();
560 10 :
561 10 : files
562 10 : .into_iter()
563 62 : .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
564 62 : .map(|file| {
565 62 : (
566 62 : file.metadata().unwrap(),
567 62 : SerializedFileReader::new(file).unwrap().metadata().clone(),
568 62 : )
569 62 : })
570 62 : .map(|(file_meta, parquet_meta)| {
571 62 : (
572 62 : file_meta.len(),
573 62 : parquet_meta.num_row_groups(),
574 62 : parquet_meta.file_metadata().num_rows(),
575 62 : )
576 62 : })
577 10 : .collect()
578 10 : }
579 :
580 : #[tokio::test]
581 2 : async fn verify_parquet_no_compression() {
582 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
583 2 :
584 2 : let config = ParquetConfig {
585 2 : propeties: Arc::new(WriterProperties::new()),
586 2 : rows_per_group: 2_000,
587 2 : file_size: 1_000_000,
588 2 : max_duration: time::Duration::from_secs(20 * 60),
589 2 : test_remote_failures: 0,
590 2 : };
591 2 :
592 2 : let rx = random_stream(50_000);
593 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
594 2 :
595 2 : assert_eq!(
596 2 : file_stats,
597 2 : [
598 2 : (1315874, 3, 6000),
599 2 : (1315867, 3, 6000),
600 2 : (1315927, 3, 6000),
601 2 : (1315884, 3, 6000),
602 2 : (1316014, 3, 6000),
603 2 : (1315856, 3, 6000),
604 2 : (1315648, 3, 6000),
605 2 : (1315884, 3, 6000),
606 2 : (438913, 1, 2000)
607 2 : ]
608 2 : );
609 2 :
610 2 : tmpdir.close().unwrap();
611 2 : }
612 :
613 : #[tokio::test]
614 2 : async fn verify_parquet_min_compression() {
615 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
616 2 :
617 2 : let config = ParquetConfig {
618 2 : propeties: Arc::new(
619 2 : WriterProperties::builder()
620 2 : .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
621 2 : .build(),
622 2 : ),
623 2 : rows_per_group: 2_000,
624 2 : file_size: 1_000_000,
625 2 : max_duration: time::Duration::from_secs(20 * 60),
626 2 : test_remote_failures: 0,
627 2 : };
628 2 :
629 2 : let rx = random_stream(50_000);
630 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
631 2 :
632 2 : // with compression, there are fewer files with more rows per file
633 2 : assert_eq!(
634 2 : file_stats,
635 2 : [
636 2 : (1223214, 5, 10000),
637 2 : (1229364, 5, 10000),
638 2 : (1231158, 5, 10000),
639 2 : (1230520, 5, 10000),
640 2 : (1221798, 5, 10000)
641 2 : ]
642 2 : );
643 2 :
644 2 : tmpdir.close().unwrap();
645 2 : }
646 :
647 : #[tokio::test]
648 2 : async fn verify_parquet_strong_compression() {
649 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
650 2 :
651 2 : let config = ParquetConfig {
652 2 : propeties: Arc::new(
653 2 : WriterProperties::builder()
654 2 : .set_compression(parquet::basic::Compression::ZSTD(
655 2 : ZstdLevel::try_new(10).unwrap(),
656 2 : ))
657 2 : .build(),
658 2 : ),
659 2 : rows_per_group: 2_000,
660 2 : file_size: 1_000_000,
661 2 : max_duration: time::Duration::from_secs(20 * 60),
662 2 : test_remote_failures: 0,
663 2 : };
664 2 :
665 2 : let rx = random_stream(50_000);
666 92 : let file_stats = run_test(tmpdir.path(), config, rx).await;
667 2 :
668 2 : // with strong compression, the files are smaller
669 2 : assert_eq!(
670 2 : file_stats,
671 2 : [
672 2 : (1208861, 5, 10000),
673 2 : (1208592, 5, 10000),
674 2 : (1208885, 5, 10000),
675 2 : (1208873, 5, 10000),
676 2 : (1209128, 5, 10000)
677 2 : ]
678 2 : );
679 2 :
680 2 : tmpdir.close().unwrap();
681 2 : }
682 :
683 : #[tokio::test]
684 2 : async fn verify_parquet_unreliable_upload() {
685 2 : let tmpdir = camino_tempfile::tempdir().unwrap();
686 2 :
687 2 : let config = ParquetConfig {
688 2 : propeties: Arc::new(WriterProperties::new()),
689 2 : rows_per_group: 2_000,
690 2 : file_size: 1_000_000,
691 2 : max_duration: time::Duration::from_secs(20 * 60),
692 2 : test_remote_failures: 2,
693 2 : };
694 2 :
695 2 : let rx = random_stream(50_000);
696 124 : let file_stats = run_test(tmpdir.path(), config, rx).await;
697 2 :
698 2 : assert_eq!(
699 2 : file_stats,
700 2 : [
701 2 : (1315874, 3, 6000),
702 2 : (1315867, 3, 6000),
703 2 : (1315927, 3, 6000),
704 2 : (1315884, 3, 6000),
705 2 : (1316014, 3, 6000),
706 2 : (1315856, 3, 6000),
707 2 : (1315648, 3, 6000),
708 2 : (1315884, 3, 6000),
709 2 : (438913, 1, 2000)
710 2 : ]
711 2 : );
712 2 :
713 2 : tmpdir.close().unwrap();
714 2 : }
715 :
/// Checks the time-based upload: rows arrive in three bursts of 3000 with a 70 second pause
/// after each burst, while `max_duration` is 60 seconds.
///
/// The test starts with the tokio clock paused (`start_paused = true`), so the pauses are
/// driven by tokio's test clock rather than by the wall clock.
#[tokio::test(start_paused = true)]
async fn verify_parquet_regular_upload() {
let tmpdir = camino_tempfile::tempdir().unwrap();

let config = ParquetConfig {
propeties: Arc::new(WriterProperties::new()),
rows_per_group: 2_000,
file_size: 1_000_000,
max_duration: time::Duration::from_secs(60),
test_remote_failures: 2,
};

let (tx, mut rx) = mpsc::unbounded_channel();

// Producer: three bursts of rows, each followed by a pause longer than `max_duration`.
// `tx` is dropped when the task finishes, which ends the stream read by the worker.
tokio::spawn(async move {
for _ in 0..3 {
let mut s = random_stream(3000);
while let Some(r) = s.next().await {
tx.send(r).unwrap();
}
time::sleep(time::Duration::from_secs(70)).await
}
});

let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
let file_stats = run_test(tmpdir.path(), config, rx).await;

// files are smaller than the size threshold, but they took too long to fill so were flushed early
// NOTE(review): the time limit is only checked when a row arrives, so the first row after a
// pause presumably lands in the file being closed, which would explain 3001/3000/2999.
assert_eq!(
file_stats,
[(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)]
);

tmpdir.close().unwrap();
}
751 : }
|