Line data Source code
1 : use std::{sync::Arc, time::SystemTime};
2 :
3 : use anyhow::Context;
4 : use bytes::{buf::Writer, BufMut, BytesMut};
5 : use chrono::{Datelike, Timelike};
6 : use futures::{Stream, StreamExt};
7 : use parquet::{
8 : basic::Compression,
9 : file::{
10 : metadata::RowGroupMetaDataPtr,
11 : properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE},
12 : writer::SerializedFileWriter,
13 : },
14 : record::RecordWriter,
15 : };
16 : use pq_proto::StartupMessageParams;
17 : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
18 : use serde::ser::SerializeMap;
19 : use tokio::{sync::mpsc, time};
20 : use tokio_util::sync::CancellationToken;
21 : use tracing::{debug, info, Span};
22 : use utils::backoff;
23 :
24 : use crate::{config::remote_storage_from_toml, context::LOG_CHAN_DISCONNECT};
25 :
26 : use super::{RequestMonitoringInner, LOG_CHAN};
27 :
28 36 : #[derive(clap::Args, Clone, Debug)]
29 : pub struct ParquetUploadArgs {
30 : /// Storage location to upload the parquet files to.
31 : /// Encoded as toml (same format as pageservers), e.g.
32 : /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
33 : #[clap(long, value_parser = remote_storage_from_toml)]
34 : parquet_upload_remote_storage: Option<RemoteStorageConfig>,
35 :
36 : #[clap(long, value_parser = remote_storage_from_toml)]
37 : parquet_upload_disconnect_events_remote_storage: Option<RemoteStorageConfig>,
38 :
39 : /// How many rows to include in a row group
40 18 : #[clap(long, default_value_t = 8192)]
41 0 : parquet_upload_row_group_size: usize,
42 :
43 : /// How large each column page should be in bytes
44 18 : #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
45 0 : parquet_upload_page_size: usize,
46 :
47 : /// How large the total parquet file should be in bytes
48 18 : #[clap(long, default_value_t = 100_000_000)]
49 0 : parquet_upload_size: i64,
50 :
51 : /// How long to wait before forcing a file upload
52 : #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
53 0 : parquet_upload_maximum_duration: tokio::time::Duration,
54 :
55 : /// What level of compression to use
56 18 : #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
57 0 : parquet_upload_compression: Compression,
58 : }
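: // Example invocation (these flags mirror the `full_parser` test at the bottom of this file):
: //   proxy \
: //     --parquet-upload-remote-storage "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}" \
: //     --parquet-upload-row-group-size 100 \
: //     --parquet-upload-page-size 10000 \
: //     --parquet-upload-size 10000000 \
: //     --parquet-upload-maximum-duration 10m \
: //     --parquet-upload-compression "zstd(5)"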
59 :
60 : // Occasional network issues and such can cause remote operations to fail, and
61 : // that's expected. If an upload fails, we log it at info level and retry.
62 : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
63 : // level instead, as repeated failures can mean a more serious problem. If it
64 : // fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
65 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
66 : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
67 :
68 : // the parquet crate leaves a lot to be desired...
69 : // what follows is an attempt to write parquet files with minimal allocs.
70 : // complication: parquet is a columnar format, while we want to write it row by row.
71 : // design:
72 : // * we batch up to `rows_per_group` rows (8192 by default), then flush them into a 'row group'
73 : // * after each row group write, we check the length of the file and upload to S3 if large enough
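: // rows arrive over the global LOG_CHAN / LOG_CHAN_DISCONNECT channels that `worker` below hooks up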
74 :
75 25080666 : #[derive(parquet_derive::ParquetRecordWriter)]
76 : pub(crate) struct RequestData {
77 : region: &'static str,
78 : protocol: &'static str,
79 : /// Must be UTC. The derive macro doesn't support timezone-aware timestamps.
80 : timestamp: chrono::NaiveDateTime,
81 : session_id: uuid::Uuid,
82 : peer_addr: String,
83 : username: Option<String>,
84 : application_name: Option<String>,
85 : endpoint_id: Option<String>,
86 : database: Option<String>,
87 : project: Option<String>,
88 : branch: Option<String>,
89 : pg_options: Option<String>,
90 : auth_method: Option<&'static str>,
91 : error: Option<&'static str>,
92 : /// Success is counted if we form an HTTP response with SQL rows inside,
93 : /// or if we make it to proxy_pass.
94 : success: bool,
95 : /// Indicates if the cplane started the new compute node for this request.
96 : cold_start_info: &'static str,
97 : /// Tracks time from session start (HTTP request/libpq TCP handshake)
98 : /// through to success/failure.
99 : duration_us: u64,
100 : /// If the session was successful, one more event is emitted after the disconnect with `disconnect_timestamp` filled in.
101 : disconnect_timestamp: Option<chrono::NaiveDateTime>,
102 : }
103 :
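: // wrapper that serializes the raw startup parameters as a JSON map, so they fit into the
: // single `pg_options` string column (see the `From<&RequestMonitoringInner>` impl below)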
104 : struct Options<'a> {
105 : options: &'a StartupMessageParams,
106 : }
107 :
108 : impl<'a> serde::Serialize for Options<'a> {
109 0 : fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
110 0 : where
111 0 : S: serde::Serializer,
112 0 : {
113 0 : let mut state = s.serialize_map(None)?;
114 0 : for (k, v) in self.options.iter() {
115 0 : state.serialize_entry(k, v)?;
116 : }
117 0 : state.end()
118 0 : }
119 : }
120 :
121 : impl From<&RequestMonitoringInner> for RequestData {
122 0 : fn from(value: &RequestMonitoringInner) -> Self {
123 0 : Self {
124 0 : session_id: value.session_id,
125 0 : peer_addr: value.peer_addr.to_string(),
126 0 : timestamp: value.first_packet.naive_utc(),
127 0 : username: value.user.as_deref().map(String::from),
128 0 : application_name: value.application.as_deref().map(String::from),
129 0 : endpoint_id: value.endpoint_id.as_deref().map(String::from),
130 0 : database: value.dbname.as_deref().map(String::from),
131 0 : project: value.project.as_deref().map(String::from),
132 0 : branch: value.branch.as_deref().map(String::from),
133 0 : pg_options: value
134 0 : .pg_options
135 0 : .as_ref()
136 0 : .and_then(|options| serde_json::to_string(&Options { options }).ok()),
137 0 : auth_method: value.auth_method.as_ref().map(|x| match x {
138 0 : super::AuthMethod::Web => "web",
139 0 : super::AuthMethod::ScramSha256 => "scram_sha_256",
140 0 : super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
141 0 : super::AuthMethod::Cleartext => "cleartext",
142 0 : }),
143 0 : protocol: value.protocol.as_str(),
144 0 : region: value.region,
145 0 : error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
146 0 : success: value.success,
147 0 : cold_start_info: value.cold_start_info.as_str(),
148 0 : duration_us: SystemTime::from(value.first_packet)
149 0 : .elapsed()
150 0 : .unwrap_or_default()
151 0 : .as_micros() as u64, // 584 millennia... good enough
152 0 : disconnect_timestamp: value.disconnect_timestamp.map(|x| x.naive_utc()),
153 0 : }
154 0 : }
155 : }
156 :
157 : /// Parquet request context worker
158 : ///
159 : /// It listens on a channel for all completed requests, extracts the data and writes it into a parquet file,
160 : /// then uploads a completed batch to S3.
161 0 : pub async fn worker(
162 0 : cancellation_token: CancellationToken,
163 0 : config: ParquetUploadArgs,
164 0 : ) -> anyhow::Result<()> {
165 0 : let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
166 0 : tracing::warn!("parquet request upload: no s3 bucket configured");
167 0 : return Ok(());
168 : };
169 :
170 0 : let (tx, mut rx) = mpsc::unbounded_channel();
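: // the global LOG_CHAN only holds a weak sender, so the strong `tx` dropped on
: // cancellation (below) is what ultimately allows the channel, and the stream, to close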
171 0 : LOG_CHAN.set(tx.downgrade()).unwrap();
172 0 :
173 0 : // set up a row stream that will close on cancellation
174 0 : let cancellation_token2 = cancellation_token.clone();
175 0 : tokio::spawn(async move {
176 0 : cancellation_token2.cancelled().await;
177 : // dropping this sender will cause the channel to close only once
178 : // all the remaining inflight requests have been completed.
179 0 : drop(tx);
180 0 : });
181 0 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
182 0 : let rx = rx.map(RequestData::from);
183 :
184 0 : let storage = GenericRemoteStorage::from_config(&remote_storage_config)
185 0 : .await
186 0 : .context("remote storage init")?;
187 :
188 0 : let properties = WriterProperties::builder()
189 0 : .set_data_page_size_limit(config.parquet_upload_page_size)
190 0 : .set_compression(config.parquet_upload_compression);
191 0 :
192 0 : let parquet_config = ParquetConfig {
193 0 : propeties: Arc::new(properties.build()),
194 0 : rows_per_group: config.parquet_upload_row_group_size,
195 0 : file_size: config.parquet_upload_size,
196 0 : max_duration: config.parquet_upload_maximum_duration,
197 0 :
198 0 : #[cfg(any(test, feature = "testing"))]
199 0 : test_remote_failures: 0,
200 0 : };
201 :
202 : // TODO(anna): consider moving this to a separate function.
203 0 : if let Some(disconnect_events_storage_config) =
204 0 : config.parquet_upload_disconnect_events_remote_storage
205 : {
206 0 : let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
207 0 : LOG_CHAN_DISCONNECT.set(tx_disconnect.downgrade()).unwrap();
208 0 :
209 0 : // setup row stream that will close on cancellation
210 0 : tokio::spawn(async move {
211 0 : cancellation_token.cancelled().await;
212 : // dropping this sender will cause the channel to close only once
213 : // all the remaining inflight requests have been completed.
214 0 : drop(tx_disconnect);
215 0 : });
216 0 : let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
217 0 : let rx_disconnect = rx_disconnect.map(RequestData::from);
218 :
219 0 : let storage_disconnect =
220 0 : GenericRemoteStorage::from_config(&disconnect_events_storage_config)
221 0 : .await
222 0 : .context("remote storage for disconnect events init")?;
223 0 : let parquet_config_disconnect = parquet_config.clone();
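: // run the request and disconnect-event pipelines concurrently; try_join! bails out
: // (dropping the other pipeline) as soon as either of them fails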
224 : tokio::try_join!(
225 : worker_inner(storage, rx, parquet_config),
226 : worker_inner(storage_disconnect, rx_disconnect, parquet_config_disconnect)
227 : )
228 0 : .map(|_| ())
229 : } else {
230 0 : worker_inner(storage, rx, parquet_config).await
231 : }
232 0 : }
233 :
234 : #[derive(Clone, Debug)]
235 : struct ParquetConfig {
236 : propeties: WriterPropertiesPtr,
237 : rows_per_group: usize,
238 : file_size: i64,
239 :
240 : max_duration: tokio::time::Duration,
241 :
242 : #[cfg(any(test, feature = "testing"))]
243 : test_remote_failures: u64,
244 : }
245 :
246 30 : async fn worker_inner(
247 30 : storage: GenericRemoteStorage,
248 30 : rx: impl Stream<Item = RequestData>,
249 30 : config: ParquetConfig,
250 30 : ) -> anyhow::Result<()> {
251 : #[cfg(any(test, feature = "testing"))]
252 30 : let storage = if config.test_remote_failures > 0 {
253 12 : GenericRemoteStorage::unreliable_wrapper(storage, config.test_remote_failures)
254 : } else {
255 18 : storage
256 : };
257 :
258 30 : let mut rx = std::pin::pin!(rx);
259 30 :
260 30 : let mut rows = Vec::with_capacity(config.rows_per_group);
261 :
262 30 : let schema = rows.as_slice().schema()?;
263 30 : let buffer = BytesMut::new();
264 30 : let w = buffer.writer();
265 30 : let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
266 :
267 30 : let mut last_upload = time::Instant::now();
268 30 :
269 30 : let mut len = 0;
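: // buffer rows until a full row group has accumulated (or max_duration forces a flush),
: // then flush it; once the compressed length passes file_size (or the flush was forced),
: // finalize and upload the file and start a new writer over the reused buffer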
270 1254030 : while let Some(row) = rx.next().await {
271 1254000 : rows.push(row);
272 1254000 : let force = last_upload.elapsed() > config.max_duration;
273 1254000 : if rows.len() == config.rows_per_group || force {
274 630 : let rg_meta;
275 630 : (rows, w, rg_meta) = flush_rows(rows, w).await?;
276 630 : len += rg_meta.compressed_size();
277 1253370 : }
278 1254000 : if len > config.file_size || force {
279 168 : last_upload = time::Instant::now();
280 697 : let file = upload_parquet(w, len, &storage).await?;
281 168 : w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
282 168 : len = 0;
283 1253832 : }
284 : }
285 :
286 30 : if !rows.is_empty() {
287 6 : let rg_meta;
288 6 : (_, w, rg_meta) = flush_rows(rows, w).await?;
289 6 : len += rg_meta.compressed_size();
290 24 : }
291 :
292 30 : if !w.flushed_row_groups().is_empty() {
293 72 : let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
294 12 : }
295 :
296 30 : Ok(())
297 30 : }
298 :
299 636 : async fn flush_rows<W>(
300 636 : rows: Vec<RequestData>,
301 636 : mut w: SerializedFileWriter<W>,
302 636 : ) -> anyhow::Result<(
303 636 : Vec<RequestData>,
304 636 : SerializedFileWriter<W>,
305 636 : RowGroupMetaDataPtr,
306 636 : )>
307 636 : where
308 636 : W: std::io::Write + Send + 'static,
309 636 : {
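: // column encoding is CPU-bound, so run it on the blocking thread pool; the current
: // span is captured so the debug! log below stays attributed to the caller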
310 636 : let span = Span::current();
311 636 : let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
312 636 : let _enter = span.enter();
313 :
314 636 : let mut rg = w.next_row_group()?;
315 636 : rows.as_slice().write_to_row_group(&mut rg)?;
316 636 : let rg_meta = rg.close()?;
317 :
318 636 : let size = rg_meta.compressed_size();
319 636 : let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
320 636 :
321 636 : debug!(size, compression, "flushed row group to parquet file");
322 :
323 636 : Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
324 636 : })
325 636 : .await
326 636 : .unwrap()?;
327 :
328 636 : rows.clear();
329 636 : Ok((rows, w, rg_meta))
330 636 : }
331 :
332 186 : async fn upload_parquet(
333 186 : mut w: SerializedFileWriter<Writer<BytesMut>>,
334 186 : len: i64,
335 186 : storage: &GenericRemoteStorage,
336 186 : ) -> anyhow::Result<Writer<BytesMut>> {
337 186 : let len_uncompressed = w
338 186 : .flushed_row_groups()
339 186 : .iter()
340 636 : .map(|rg| rg.total_byte_size())
341 186 : .sum::<i64>();
342 :
343 : // I don't know how compute-intensive this is, although it probably isn't much... better safe than sorry.
344 : // the finish method is only available on the fork: https://github.com/apache/arrow-rs/issues/5253
345 186 : let (mut buffer, metadata) =
346 186 : tokio::task::spawn_blocking(move || -> parquet::errors::Result<_> {
347 186 : let metadata = w.finish()?;
348 186 : let buffer = std::mem::take(w.inner_mut().get_mut());
349 186 : Ok((buffer, metadata))
350 186 : })
351 185 : .await
352 186 : .unwrap()?;
353 :
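: // split off the written bytes as a cheaply clonable `Bytes` (each retry below clones it),
: // leaving the emptied `BytesMut` to be handed back to the caller as the next writer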
354 186 : let data = buffer.split().freeze();
355 186 :
356 186 : let compression = len as f64 / len_uncompressed as f64;
357 186 : let size = data.len();
358 186 : let now = chrono::Utc::now();
359 186 : let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
360 186 : uuid::NoContext,
361 186 : // we won't be running this in 1970. this cast is ok
362 186 : now.timestamp() as u64,
363 186 : now.timestamp_subsec_nanos(),
364 186 : ));
365 186 :
366 186 : info!(
367 : %id,
368 : rows = metadata.num_rows,
369 0 : size, compression, "uploading request parquet file"
370 : );
371 :
372 186 : let year = now.year();
373 186 : let month = now.month();
374 186 : let day = now.day();
375 186 : let hour = now.hour();
376 : // segment files by time for S3 performance
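: // e.g. `2024/06/30/13/requests_<uuid>.parquet`; uuid v7 is time-ordered, so keys
: // within an hour also sort by creation time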
377 186 : let path = RemotePath::from_string(&format!(
378 186 : "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
379 186 : ))?;
380 186 : let cancel = CancellationToken::new();
381 186 : let maybe_err = backoff::retry(
382 258 : || async {
383 258 : let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
384 258 : storage
385 258 : .upload(stream, data.len(), &path, None, &cancel)
386 584 : .await
387 258 : },
388 186 : TimeoutOrCancel::caused_by_cancel,
389 186 : FAILED_UPLOAD_WARN_THRESHOLD,
390 186 : FAILED_UPLOAD_MAX_RETRIES,
391 186 : "request_data_upload",
392 186 : // we don't want cancellation to interrupt here, so we make a dummy cancel token
393 186 : &cancel,
394 186 : )
395 584 : .await
396 186 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
397 186 : .and_then(|x| x)
398 186 : .context("request_data_upload")
399 186 : .err();
400 :
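: // upload failures are only logged: after FAILED_UPLOAD_MAX_RETRIES the batch is dropped
: // and the (empty) buffer is still returned, so the worker keeps running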
401 186 : if let Some(err) = maybe_err {
402 0 : tracing::warn!(%id, %err, "failed to upload request data");
403 186 : }
404 :
405 186 : Ok(buffer.writer())
406 186 : }
407 :
408 : #[cfg(test)]
409 : mod tests {
410 : use std::{net::Ipv4Addr, num::NonZeroUsize, sync::Arc};
411 :
412 : use camino::Utf8Path;
413 : use clap::Parser;
414 : use futures::{Stream, StreamExt};
415 : use itertools::Itertools;
416 : use parquet::{
417 : basic::{Compression, ZstdLevel},
418 : file::{
419 : properties::{WriterProperties, DEFAULT_PAGE_SIZE},
420 : reader::FileReader,
421 : serialized_reader::SerializedFileReader,
422 : },
423 : };
424 : use rand::{rngs::StdRng, Rng, SeedableRng};
425 : use remote_storage::{
426 : GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config,
427 : DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
428 : };
429 : use tokio::{sync::mpsc, time};
430 : use walkdir::WalkDir;
431 :
432 : use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
433 :
434 12 : #[derive(Parser)]
435 : struct ProxyCliArgs {
436 : #[clap(flatten)]
437 : parquet_upload: ParquetUploadArgs,
438 : }
439 :
440 : #[test]
441 6 : fn default_parser() {
442 6 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
443 6 : assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
444 6 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
445 6 : assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
446 6 : assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
447 6 : assert_eq!(
448 6 : parquet_upload.parquet_upload_maximum_duration,
449 6 : time::Duration::from_secs(20 * 60)
450 6 : );
451 6 : assert_eq!(
452 6 : parquet_upload.parquet_upload_compression,
453 6 : Compression::UNCOMPRESSED
454 6 : );
455 6 : }
456 :
457 : #[test]
458 6 : fn full_parser() {
459 6 : let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from([
460 6 : "proxy",
461 6 : "--parquet-upload-remote-storage",
462 6 : "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
463 6 : "--parquet-upload-row-group-size",
464 6 : "100",
465 6 : "--parquet-upload-page-size",
466 6 : "10000",
467 6 : "--parquet-upload-size",
468 6 : "10000000",
469 6 : "--parquet-upload-maximum-duration",
470 6 : "10m",
471 6 : "--parquet-upload-compression",
472 6 : "zstd(5)",
473 6 : ]);
474 6 : assert_eq!(
475 6 : parquet_upload.parquet_upload_remote_storage,
476 6 : Some(RemoteStorageConfig {
477 6 : storage: RemoteStorageKind::AwsS3(S3Config {
478 6 : bucket_name: "default".into(),
479 6 : bucket_region: "us-east-1".into(),
480 6 : prefix_in_bucket: Some("proxy/".into()),
481 6 : endpoint: Some("http://minio:9000".into()),
482 6 : concurrency_limit: NonZeroUsize::new(
483 6 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT
484 6 : )
485 6 : .unwrap(),
486 6 : max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
487 6 : upload_storage_class: None,
488 6 : }),
489 6 : timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
490 6 : })
491 6 : );
492 6 : assert_eq!(parquet_upload.parquet_upload_row_group_size, 100);
493 6 : assert_eq!(parquet_upload.parquet_upload_page_size, 10000);
494 6 : assert_eq!(parquet_upload.parquet_upload_size, 10_000_000);
495 6 : assert_eq!(
496 6 : parquet_upload.parquet_upload_maximum_duration,
497 6 : time::Duration::from_secs(10 * 60)
498 6 : );
499 6 : assert_eq!(
500 6 : parquet_upload.parquet_upload_compression,
501 6 : Compression::ZSTD(ZstdLevel::try_new(5).unwrap())
502 6 : );
503 6 : }
504 :
505 1254000 : fn generate_request_data(rng: &mut impl Rng) -> RequestData {
506 1254000 : RequestData {
507 1254000 : session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
508 1254000 : peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
509 1254000 : timestamp: chrono::DateTime::from_timestamp_millis(
510 1254000 : rng.gen_range(1703862754..1803862754),
511 1254000 : )
512 1254000 : .unwrap()
513 1254000 : .naive_utc(),
514 1254000 : application_name: Some("test".to_owned()),
515 1254000 : username: Some(hex::encode(rng.gen::<[u8; 4]>())),
516 1254000 : endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
517 1254000 : database: Some(hex::encode(rng.gen::<[u8; 16]>())),
518 1254000 : project: Some(hex::encode(rng.gen::<[u8; 16]>())),
519 1254000 : branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
520 1254000 : pg_options: None,
521 1254000 : auth_method: None,
522 1254000 : protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
523 1254000 : region: "us-east-1",
524 1254000 : error: None,
525 1254000 : success: rng.gen(),
526 1254000 : cold_start_info: "no",
527 1254000 : duration_us: rng.gen_range(0..30_000_000),
528 1254000 : disconnect_timestamp: None,
529 1254000 : }
530 1254000 : }
531 :
532 42 : fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
533 42 : let mut rng = StdRng::from_seed([0x39; 32]);
534 42 : futures::stream::iter(
535 1254000 : std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
536 42 : )
537 42 : }
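: // the fixed RNG seed above makes the generated rows, and therefore the exact file
: // sizes asserted below, deterministic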
538 :
539 30 : async fn run_test(
540 30 : tmpdir: &Utf8Path,
541 30 : config: ParquetConfig,
542 30 : rx: impl Stream<Item = RequestData>,
543 30 : ) -> Vec<(u64, usize, i64)> {
544 30 : let remote_storage_config = RemoteStorageConfig {
545 30 : storage: RemoteStorageKind::LocalFs {
546 30 : local_path: tmpdir.to_path_buf(),
547 30 : },
548 30 : timeout: std::time::Duration::from_secs(120),
549 30 : };
550 30 : let storage = GenericRemoteStorage::from_config(&remote_storage_config)
551 0 : .await
552 30 : .unwrap();
553 30 :
554 1825 : worker_inner(storage, rx, config).await.unwrap();
555 30 :
556 30 : let mut files = WalkDir::new(tmpdir.as_std_path())
557 30 : .into_iter()
558 336 : .filter_map(|entry| entry.ok())
559 336 : .filter(|entry| entry.file_type().is_file())
560 186 : .map(|entry| entry.path().to_path_buf())
561 30 : .collect_vec();
562 30 : files.sort();
563 30 :
564 30 : files
565 30 : .into_iter()
566 186 : .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
567 186 : .map(|file| {
568 186 : (
569 186 : file.metadata().unwrap(),
570 186 : SerializedFileReader::new(file).unwrap().metadata().clone(),
571 186 : )
572 186 : })
573 186 : .map(|(file_meta, parquet_meta)| {
574 186 : (
575 186 : file_meta.len(),
576 186 : parquet_meta.num_row_groups(),
577 186 : parquet_meta.file_metadata().num_rows(),
578 186 : )
579 186 : })
580 30 : .collect()
581 30 : }
582 :
583 : #[tokio::test]
584 6 : async fn verify_parquet_no_compression() {
585 6 : let tmpdir = camino_tempfile::tempdir().unwrap();
586 6 :
587 6 : let config = ParquetConfig {
588 6 : propeties: Arc::new(WriterProperties::new()),
589 6 : rows_per_group: 2_000,
590 6 : file_size: 1_000_000,
591 6 : max_duration: time::Duration::from_secs(20 * 60),
592 6 : test_remote_failures: 0,
593 6 : };
594 6 :
595 6 : let rx = random_stream(50_000);
596 369 : let file_stats = run_test(tmpdir.path(), config, rx).await;
597 6 :
598 6 : assert_eq!(
599 6 : file_stats,
600 6 : [
601 6 : (1315874, 3, 6000),
602 6 : (1315867, 3, 6000),
603 6 : (1315927, 3, 6000),
604 6 : (1315884, 3, 6000),
605 6 : (1316014, 3, 6000),
606 6 : (1315856, 3, 6000),
607 6 : (1315648, 3, 6000),
608 6 : (1315884, 3, 6000),
609 6 : (438913, 1, 2000)
610 6 : ]
611 6 : );
612 6 :
613 6 : tmpdir.close().unwrap();
614 6 : }
615 :
616 : #[tokio::test]
617 6 : async fn verify_parquet_min_compression() {
618 6 : let tmpdir = camino_tempfile::tempdir().unwrap();
619 6 :
620 6 : let config = ParquetConfig {
621 6 : propeties: Arc::new(
622 6 : WriterProperties::builder()
623 6 : .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::default()))
624 6 : .build(),
625 6 : ),
626 6 : rows_per_group: 2_000,
627 6 : file_size: 1_000_000,
628 6 : max_duration: time::Duration::from_secs(20 * 60),
629 6 : test_remote_failures: 0,
630 6 : };
631 6 :
632 6 : let rx = random_stream(50_000);
633 276 : let file_stats = run_test(tmpdir.path(), config, rx).await;
634 6 :
635 6 : // with compression, there are fewer files with more rows per file
636 6 : assert_eq!(
637 6 : file_stats,
638 6 : [
639 6 : (1223214, 5, 10000),
640 6 : (1229364, 5, 10000),
641 6 : (1231158, 5, 10000),
642 6 : (1230520, 5, 10000),
643 6 : (1221798, 5, 10000)
644 6 : ]
645 6 : );
646 6 :
647 6 : tmpdir.close().unwrap();
648 6 : }
649 :
650 : #[tokio::test]
651 6 : async fn verify_parquet_strong_compression() {
652 6 : let tmpdir = camino_tempfile::tempdir().unwrap();
653 6 :
654 6 : let config = ParquetConfig {
655 6 : propeties: Arc::new(
656 6 : WriterProperties::builder()
657 6 : .set_compression(parquet::basic::Compression::ZSTD(
658 6 : ZstdLevel::try_new(10).unwrap(),
659 6 : ))
660 6 : .build(),
661 6 : ),
662 6 : rows_per_group: 2_000,
663 6 : file_size: 1_000_000,
664 6 : max_duration: time::Duration::from_secs(20 * 60),
665 6 : test_remote_failures: 0,
666 6 : };
667 6 :
668 6 : let rx = random_stream(50_000);
669 276 : let file_stats = run_test(tmpdir.path(), config, rx).await;
670 6 :
671 6 : // with strong compression, the files are smaller
672 6 : assert_eq!(
673 6 : file_stats,
674 6 : [
675 6 : (1208861, 5, 10000),
676 6 : (1208592, 5, 10000),
677 6 : (1208885, 5, 10000),
678 6 : (1208873, 5, 10000),
679 6 : (1209128, 5, 10000)
680 6 : ]
681 6 : );
682 6 :
683 6 : tmpdir.close().unwrap();
684 6 : }
685 :
686 : #[tokio::test]
687 6 : async fn verify_parquet_unreliable_upload() {
688 6 : let tmpdir = camino_tempfile::tempdir().unwrap();
689 6 :
690 6 : let config = ParquetConfig {
691 6 : propeties: Arc::new(WriterProperties::new()),
692 6 : rows_per_group: 2_000,
693 6 : file_size: 1_000_000,
694 6 : max_duration: time::Duration::from_secs(20 * 60),
695 6 : test_remote_failures: 2,
696 6 : };
697 6 :
698 6 : let rx = random_stream(50_000);
699 370 : let file_stats = run_test(tmpdir.path(), config, rx).await;
700 6 :
701 6 : assert_eq!(
702 6 : file_stats,
703 6 : [
704 6 : (1315874, 3, 6000),
705 6 : (1315867, 3, 6000),
706 6 : (1315927, 3, 6000),
707 6 : (1315884, 3, 6000),
708 6 : (1316014, 3, 6000),
709 6 : (1315856, 3, 6000),
710 6 : (1315648, 3, 6000),
711 6 : (1315884, 3, 6000),
712 6 : (438913, 1, 2000)
713 6 : ]
714 6 : );
715 6 :
716 6 : tmpdir.close().unwrap();
717 6 : }
718 :
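: // start_paused: tokio's clock is virtual here, so the 70s sleeps below complete
: // instantly while still tripping the 60s max_duration flush path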
719 : #[tokio::test(start_paused = true)]
720 6 : async fn verify_parquet_regular_upload() {
721 6 : let tmpdir = camino_tempfile::tempdir().unwrap();
722 6 :
723 6 : let config = ParquetConfig {
724 6 : propeties: Arc::new(WriterProperties::new()),
725 6 : rows_per_group: 2_000,
726 6 : file_size: 1_000_000,
727 6 : max_duration: time::Duration::from_secs(60),
728 6 : test_remote_failures: 2,
729 6 : };
730 6 :
731 6 : let (tx, mut rx) = mpsc::unbounded_channel();
732 6 :
733 6 : tokio::spawn(async move {
734 24 : for _ in 0..3 {
735 18 : let mut s = random_stream(3000);
736 54018 : while let Some(r) = s.next().await {
737 54000 : tx.send(r).unwrap();
738 54000 : }
739 18 : time::sleep(time::Duration::from_secs(70)).await;
740 6 : }
741 6 : });
742 6 :
743 54426 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
744 534 : let file_stats = run_test(tmpdir.path(), config, rx).await;
745 6 :
746 6 : // files are smaller than the size threshold, but they took too long to fill so were flushed early
747 6 : assert_eq!(
748 6 : file_stats,
749 6 : [(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)]
750 6 : );
751 6 :
752 6 : tmpdir.close().unwrap();
753 6 : }
754 : }