Line data Source code
1 : use std::sync::Arc;
2 : use std::time::SystemTime;
3 :
4 : use anyhow::Context;
5 : use bytes::buf::Writer;
6 : use bytes::{BufMut, BytesMut};
7 : use chrono::{Datelike, Timelike};
8 : use futures::{Stream, StreamExt};
9 : use parquet::basic::Compression;
10 : use parquet::file::metadata::RowGroupMetaDataPtr;
11 : use parquet::file::properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE};
12 : use parquet::file::writer::SerializedFileWriter;
13 : use parquet::record::RecordWriter;
14 : use pq_proto::StartupMessageParams;
15 : use remote_storage::{GenericRemoteStorage, RemotePath, RemoteStorageConfig, TimeoutOrCancel};
16 : use serde::ser::SerializeMap;
17 : use tokio::sync::mpsc;
18 : use tokio::time;
19 : use tokio_util::sync::CancellationToken;
20 : use tracing::{debug, info, Span};
21 : use utils::backoff;
22 :
23 : use super::{RequestContextInner, LOG_CHAN};
24 : use crate::config::remote_storage_from_toml;
25 : use crate::context::LOG_CHAN_DISCONNECT;
26 : use crate::ext::TaskExt;
27 :
28 : #[derive(clap::Args, Clone, Debug)]
29 : pub struct ParquetUploadArgs {
30 : /// Storage location to upload the parquet files to.
31 : /// Encoded as toml (same format as pageservers), eg
32 : /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
33 : #[clap(long, value_parser = remote_storage_from_toml)]
34 : parquet_upload_remote_storage: Option<RemoteStorageConfig>,
35 :
36 : #[clap(long, value_parser = remote_storage_from_toml)]
37 : parquet_upload_disconnect_events_remote_storage: Option<RemoteStorageConfig>,
38 :
39 : /// How many rows to include in a row group
40 3 : #[clap(long, default_value_t = 8192)]
41 0 : parquet_upload_row_group_size: usize,
42 :
43 : /// How large each column page should be in bytes
44 3 : #[clap(long, default_value_t = DEFAULT_PAGE_SIZE)]
45 0 : parquet_upload_page_size: usize,
46 :
47 : /// How large the total parquet file should be in bytes
48 3 : #[clap(long, default_value_t = 100_000_000)]
49 0 : parquet_upload_size: i64,
50 :
51 : /// How long to wait before forcing a file upload
52 : #[clap(long, default_value = "20m", value_parser = humantime::parse_duration)]
53 0 : parquet_upload_maximum_duration: tokio::time::Duration,
54 :
55 : /// What level of compression to use
56 3 : #[clap(long, default_value_t = Compression::UNCOMPRESSED)]
57 0 : parquet_upload_compression: Compression,
58 : }
59 :
60 : // Occasional network issues and such can cause remote operations to fail, and
61 : // that's expected. If an upload fails, we log it at info-level, and retry.
62 : // But after FAILED_UPLOAD_WARN_THRESHOLD retries, we start to log it at WARN
63 : // level instead, as repeated failures can mean a more serious problem. If it
64 : // fails more than FAILED_UPLOAD_MAX_RETRIES times, we give up.
65 : pub(crate) const FAILED_UPLOAD_WARN_THRESHOLD: u32 = 3;
66 : pub(crate) const FAILED_UPLOAD_MAX_RETRIES: u32 = 10;
67 :
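68 : // the parquet crate leaves a lot to be desired...
69 : // what follows is an attempt to write parquet files with minimal allocs.
70 : // complication: parquet is a columnar format, while we want to write rows.
71 : // design:
72 : // * we batch up to `rows_per_group` rows (8192 by default), then flush them into a 'row group'
73 : // * after each rowgroup write, we check the length of the file and upload to s3 if large enough, or if the maximum duration since the last upload has elapsed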
73 : // * after each rowgroup write, we check the length of the file and upload to s3 if large enough
74 :
// One row of the request log as it is written to the parquet file.
// NOTE(review): the column layout presumably follows the field order below — confirm before reordering fields.
75 1272000 : #[derive(parquet_derive::ParquetRecordWriter)]
76 : pub(crate) struct RequestData {
77 : region: &'static str,
78 : protocol: &'static str,
79 : /// Must be UTC. The derive macro doesn't support timezones.
80 : timestamp: chrono::NaiveDateTime,
81 : session_id: uuid::Uuid,
82 : peer_addr: String,
83 : username: Option<String>,
84 : application_name: Option<String>,
85 : endpoint_id: Option<String>,
86 : database: Option<String>,
87 : project: Option<String>,
88 : branch: Option<String>,
89 : pg_options: Option<String>,
90 : auth_method: Option<&'static str>,
91 : jwt_issuer: Option<String>,
92 :
93 : error: Option<&'static str>,
94 : /// Success is counted if we form an HTTP response with SQL rows inside,
95 : /// or if we make it to proxy_pass.
96 : success: bool,
97 : /// Indicates if the cplane started the new compute node for this request.
98 : cold_start_info: &'static str,
99 : /// Tracks time from session start (HTTP request/libpq TCP handshake)
100 : /// through to success/failure.
101 : duration_us: u64,
102 : /// If the session was successful, one more event is created after the disconnect with `disconnect_timestamp` filled in.
103 : disconnect_timestamp: Option<chrono::NaiveDateTime>,
104 : }
105 :
106 : struct Options<'a> {
107 : options: &'a StartupMessageParams,
108 : }
109 :
110 : impl serde::Serialize for Options<'_> {
111 0 : fn serialize<S>(&self, s: S) -> Result<S::Ok, S::Error>
112 0 : where
113 0 : S: serde::Serializer,
114 0 : {
115 0 : let mut state = s.serialize_map(None)?;
116 0 : for (k, v) in self.options.iter() {
117 0 : state.serialize_entry(k, v)?;
118 : }
119 0 : state.end()
120 0 : }
121 : }
122 :
123 : impl From<&RequestContextInner> for RequestData {
124 0 : fn from(value: &RequestContextInner) -> Self {
125 0 : Self {
126 0 : session_id: value.session_id,
127 0 : peer_addr: value.conn_info.addr.ip().to_string(),
128 0 : timestamp: value.first_packet.naive_utc(),
129 0 : username: value.user.as_deref().map(String::from),
130 0 : application_name: value.application.as_deref().map(String::from),
131 0 : endpoint_id: value.endpoint_id.as_deref().map(String::from),
132 0 : database: value.dbname.as_deref().map(String::from),
133 0 : project: value.project.as_deref().map(String::from),
134 0 : branch: value.branch.as_deref().map(String::from),
135 0 : pg_options: value
136 0 : .pg_options
137 0 : .as_ref()
138 0 : .and_then(|options| serde_json::to_string(&Options { options }).ok()),
139 0 : auth_method: value.auth_method.as_ref().map(|x| match x {
140 0 : super::AuthMethod::ConsoleRedirect => "console_redirect",
141 0 : super::AuthMethod::ScramSha256 => "scram_sha_256",
142 0 : super::AuthMethod::ScramSha256Plus => "scram_sha_256_plus",
143 0 : super::AuthMethod::Cleartext => "cleartext",
144 0 : super::AuthMethod::Jwt => "jwt",
145 0 : }),
146 0 : jwt_issuer: value.jwt_issuer.clone(),
147 0 : protocol: value.protocol.as_str(),
148 0 : region: value.region,
149 0 : error: value.error_kind.as_ref().map(|e| e.to_metric_label()),
150 0 : success: value.success,
151 0 : cold_start_info: value.cold_start_info.as_str(),
152 0 : duration_us: SystemTime::from(value.first_packet)
153 0 : .elapsed()
154 0 : .unwrap_or_default()
155 0 : .as_micros() as u64, // 584 millenia... good enough
156 0 : disconnect_timestamp: value.disconnect_timestamp.map(|x| x.naive_utc()),
157 0 : }
158 0 : }
159 : }
160 :
161 : /// Parquet request context worker
162 : ///
163 : /// It listened on a channel for all completed requests, extracts the data and writes it into a parquet file,
164 : /// then uploads a completed batch to S3
165 0 : pub async fn worker(
166 0 : cancellation_token: CancellationToken,
167 0 : config: ParquetUploadArgs,
168 0 : ) -> anyhow::Result<()> {
169 0 : let Some(remote_storage_config) = config.parquet_upload_remote_storage else {
170 0 : tracing::warn!("parquet request upload: no s3 bucket configured");
171 0 : return Ok(());
172 : };
173 :
174 0 : let (tx, mut rx) = mpsc::unbounded_channel();
175 0 : LOG_CHAN
176 0 : .set(tx.downgrade())
177 0 : .expect("only one worker should set the channel");
178 0 :
179 0 : // setup row stream that will close on cancellation
180 0 : let cancellation_token2 = cancellation_token.clone();
181 0 : tokio::spawn(async move {
182 0 : cancellation_token2.cancelled().await;
183 : // dropping this sender will cause the channel to close only once
184 : // all the remaining inflight requests have been completed.
185 0 : drop(tx);
186 0 : });
187 0 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
188 0 : let rx = rx.map(RequestData::from);
189 0 :
190 0 : let properties = WriterProperties::builder()
191 0 : .set_data_page_size_limit(config.parquet_upload_page_size)
192 0 : .set_compression(config.parquet_upload_compression);
193 0 :
194 0 : let parquet_config = ParquetConfig {
195 0 : propeties: Arc::new(properties.build()),
196 0 : rows_per_group: config.parquet_upload_row_group_size,
197 0 : file_size: config.parquet_upload_size,
198 0 : max_duration: config.parquet_upload_maximum_duration,
199 0 :
200 0 : #[cfg(any(test, feature = "testing"))]
201 0 : test_remote_failures: 0,
202 0 : };
203 :
204 : // TODO(anna): consider moving this to a separate function.
205 0 : if let Some(disconnect_events_storage_config) =
206 0 : config.parquet_upload_disconnect_events_remote_storage
207 : {
208 0 : let (tx_disconnect, mut rx_disconnect) = mpsc::unbounded_channel();
209 0 : LOG_CHAN_DISCONNECT
210 0 : .set(tx_disconnect.downgrade())
211 0 : .expect("only one worker should set the channel");
212 0 :
213 0 : // setup row stream that will close on cancellation
214 0 : tokio::spawn(async move {
215 0 : cancellation_token.cancelled().await;
216 : // dropping this sender will cause the channel to close only once
217 : // all the remaining inflight requests have been completed.
218 0 : drop(tx_disconnect);
219 0 : });
220 0 : let rx_disconnect = futures::stream::poll_fn(move |cx| rx_disconnect.poll_recv(cx));
221 0 : let rx_disconnect = rx_disconnect.map(RequestData::from);
222 0 :
223 0 : let parquet_config_disconnect = parquet_config.clone();
224 0 : tokio::try_join!(
225 0 : worker_inner(remote_storage_config, rx, parquet_config),
226 0 : worker_inner(
227 0 : disconnect_events_storage_config,
228 0 : rx_disconnect,
229 0 : parquet_config_disconnect
230 0 : )
231 0 : )
232 0 : .map(|_| ())
233 : } else {
234 0 : worker_inner(remote_storage_config, rx, parquet_config).await
235 : }
236 0 : }
237 :
238 : #[derive(Clone, Debug)]
239 : struct ParquetConfig {
240 : propeties: WriterPropertiesPtr,
241 : rows_per_group: usize,
242 : file_size: i64,
243 :
244 : max_duration: tokio::time::Duration,
245 :
246 : #[cfg(any(test, feature = "testing"))]
247 : test_remote_failures: u64,
248 : }
249 :
250 : impl ParquetConfig {
251 26 : async fn storage(
252 26 : &self,
253 26 : storage_config: &RemoteStorageConfig,
254 26 : ) -> anyhow::Result<GenericRemoteStorage> {
255 26 : let storage = GenericRemoteStorage::from_config(storage_config)
256 26 : .await
257 26 : .context("remote storage init")?;
258 :
259 : #[cfg(any(test, feature = "testing"))]
260 26 : if self.test_remote_failures > 0 {
261 12 : return Ok(GenericRemoteStorage::unreliable_wrapper(
262 12 : storage,
263 12 : self.test_remote_failures,
264 12 : ));
265 14 : }
266 14 :
267 14 : Ok(storage)
268 26 : }
269 : }
270 :
271 4 : async fn worker_inner(
272 4 : storage_config: RemoteStorageConfig,
273 4 : rx: impl Stream<Item = RequestData>,
274 4 : config: ParquetConfig,
275 4 : ) -> anyhow::Result<()> {
276 4 : let mut rx = std::pin::pin!(rx);
277 4 :
278 4 : let mut rows = Vec::with_capacity(config.rows_per_group);
279 :
280 4 : let schema = rows.as_slice().schema()?;
281 4 : let buffer = BytesMut::new();
282 4 : let w = buffer.writer();
283 4 : let mut w = SerializedFileWriter::new(w, schema.clone(), config.propeties.clone())?;
284 :
285 4 : let mut last_upload = time::Instant::now();
286 4 :
287 4 : let mut len = 0;
288 159004 : while let Some(row) = rx.next().await {
289 159000 : rows.push(row);
290 159000 : let force = last_upload.elapsed() > config.max_duration;
291 159000 : if rows.len() == config.rows_per_group || force {
292 : let rg_meta;
293 80 : (rows, w, rg_meta) = flush_rows(rows, w).await?;
294 80 : len += rg_meta.compressed_size();
295 158920 : }
296 159000 : if len > config.file_size || force {
297 23 : last_upload = time::Instant::now();
298 23 : let file = upload_parquet(w, len, &storage_config, &config).await?;
299 23 : w = SerializedFileWriter::new(file, schema.clone(), config.propeties.clone())?;
300 23 : len = 0;
301 158977 : }
302 : }
303 :
304 4 : if !rows.is_empty() {
305 : let rg_meta;
306 1 : (_, w, rg_meta) = flush_rows(rows, w).await?;
307 1 : len += rg_meta.compressed_size();
308 3 : }
309 :
310 4 : if !w.flushed_row_groups().is_empty() {
311 3 : let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage_config, &config).await?;
312 1 : }
313 :
314 4 : Ok(())
315 4 : }
316 :
317 81 : async fn flush_rows<W>(
318 81 : rows: Vec<RequestData>,
319 81 : mut w: SerializedFileWriter<W>,
320 81 : ) -> anyhow::Result<(
321 81 : Vec<RequestData>,
322 81 : SerializedFileWriter<W>,
323 81 : RowGroupMetaDataPtr,
324 81 : )>
325 81 : where
326 81 : W: std::io::Write + Send + 'static,
327 81 : {
328 81 : let span = Span::current();
329 81 : let (mut rows, w, rg_meta) = tokio::task::spawn_blocking(move || {
330 81 : let _enter = span.enter();
331 :
332 81 : let mut rg = w.next_row_group()?;
333 81 : rows.as_slice().write_to_row_group(&mut rg)?;
334 81 : let rg_meta = rg.close()?;
335 :
336 81 : let size = rg_meta.compressed_size();
337 81 : let compression = rg_meta.compressed_size() as f64 / rg_meta.total_byte_size() as f64;
338 81 :
339 81 : debug!(size, compression, "flushed row group to parquet file");
340 :
341 81 : Ok::<_, parquet::errors::ParquetError>((rows, w, rg_meta))
342 81 : })
343 81 : .await
344 81 : .propagate_task_panic()?;
345 :
346 81 : rows.clear();
347 81 : Ok((rows, w, rg_meta))
348 81 : }
349 :
350 26 : async fn upload_parquet(
351 26 : mut w: SerializedFileWriter<Writer<BytesMut>>,
352 26 : len: i64,
353 26 : storage_config: &RemoteStorageConfig,
354 26 : config: &ParquetConfig,
355 26 : ) -> anyhow::Result<Writer<BytesMut>> {
356 26 : let len_uncompressed = w
357 26 : .flushed_row_groups()
358 26 : .iter()
359 81 : .map(|rg| rg.total_byte_size())
360 26 : .sum::<i64>();
361 :
362 : // I don't know how compute intensive this is, although it probably isn't much... better be safe than sorry.
363 : // finish method only available on the fork: https://github.com/apache/arrow-rs/issues/5253
364 26 : let (mut buffer, metadata) =
365 26 : tokio::task::spawn_blocking(move || -> parquet::errors::Result<_> {
366 26 : let metadata = w.finish()?;
367 26 : let buffer = std::mem::take(w.inner_mut().get_mut());
368 26 : Ok((buffer, metadata))
369 26 : })
370 26 : .await
371 26 : .propagate_task_panic()?;
372 :
373 26 : let data = buffer.split().freeze();
374 26 :
375 26 : let compression = len as f64 / len_uncompressed as f64;
376 26 : let size = data.len();
377 26 : let now = chrono::Utc::now();
378 26 : let id = uuid::Uuid::new_v7(uuid::Timestamp::from_unix(
379 26 : uuid::NoContext,
380 26 : // we won't be running this in 1970. this cast is ok
381 26 : now.timestamp() as u64,
382 26 : now.timestamp_subsec_nanos(),
383 26 : ));
384 26 :
385 26 : info!(
386 : %id,
387 : rows = metadata.num_rows,
388 0 : size, compression, "uploading request parquet file"
389 : );
390 :
391 : // A bug in azure-sdk means that the identity-token-file that expires after
392 : // 1 hour is not refreshed. This identity-token is used to fetch the actual azure storage
393 : // tokens that last for 24 hours. After this 24 hour period, azure-sdk tries to refresh
394 : // the storage token, but the identity token has now expired.
395 : // <https://github.com/Azure/azure-sdk-for-rust/issues/1739>
396 : //
397 : // To work around this, we recreate the storage every time.
398 26 : let storage = config.storage(storage_config).await?;
399 :
400 26 : let year = now.year();
401 26 : let month = now.month();
402 26 : let day = now.day();
403 26 : let hour = now.hour();
404 : // segment files by time for S3 performance
405 26 : let path = RemotePath::from_string(&format!(
406 26 : "{year:04}/{month:02}/{day:02}/{hour:02}/requests_{id}.parquet"
407 26 : ))?;
408 26 : let cancel = CancellationToken::new();
409 26 : let maybe_err = backoff::retry(
410 38 : || async {
411 38 : let stream = futures::stream::once(futures::future::ready(Ok(data.clone())));
412 38 : storage
413 38 : .upload(stream, data.len(), &path, None, &cancel)
414 38 : .await
415 76 : },
416 26 : TimeoutOrCancel::caused_by_cancel,
417 26 : FAILED_UPLOAD_WARN_THRESHOLD,
418 26 : FAILED_UPLOAD_MAX_RETRIES,
419 26 : "request_data_upload",
420 26 : // we don't want cancellation to interrupt here, so we make a dummy cancel token
421 26 : &cancel,
422 26 : )
423 26 : .await
424 26 : .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
425 26 : .and_then(|x| x)
426 26 : .with_context(|| format!("request_data_upload: path={path}"))
427 26 : .err();
428 :
429 26 : if let Some(err) = maybe_err {
430 0 : tracing::error!(%id, %path, error = ?err, "failed to upload request data");
431 26 : }
432 :
433 26 : Ok(buffer.writer())
434 26 : }
435 :
436 : #[cfg(test)]
437 : #[expect(clippy::unwrap_used)]
438 : mod tests {
439 : use std::net::Ipv4Addr;
440 : use std::num::NonZeroUsize;
441 : use std::sync::Arc;
442 :
443 : use camino::Utf8Path;
444 : use clap::Parser;
445 : use futures::{Stream, StreamExt};
446 : use itertools::Itertools;
447 : use parquet::basic::{Compression, ZstdLevel};
448 : use parquet::file::properties::{WriterProperties, DEFAULT_PAGE_SIZE};
449 : use parquet::file::reader::FileReader;
450 : use parquet::file::serialized_reader::SerializedFileReader;
451 : use rand::rngs::StdRng;
452 : use rand::{Rng, SeedableRng};
453 : use remote_storage::{
454 : RemoteStorageConfig, RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
455 : DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
456 : };
457 : use tokio::sync::mpsc;
458 : use tokio::time;
459 : use walkdir::WalkDir;
460 :
461 : use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData};
462 :
// Minimal CLI definition that flattens `ParquetUploadArgs`, so the tests below can parse the flags in isolation.
463 : #[derive(Parser)]
464 : struct ProxyCliArgs {
465 : #[clap(flatten)]
466 : parquet_upload: ParquetUploadArgs,
467 : }
468 :
/// With no flags given, every option takes its documented default and both
/// storage locations are unset.
#[test]
fn default_parser() {
    let ProxyCliArgs { parquet_upload } = ProxyCliArgs::parse_from(["proxy"]);
    assert_eq!(parquet_upload.parquet_upload_remote_storage, None);
    // The disconnect-events storage is optional as well and must default to unset.
    assert_eq!(
        parquet_upload.parquet_upload_disconnect_events_remote_storage,
        None
    );
    assert_eq!(parquet_upload.parquet_upload_row_group_size, 8192);
    assert_eq!(parquet_upload.parquet_upload_page_size, DEFAULT_PAGE_SIZE);
    assert_eq!(parquet_upload.parquet_upload_size, 100_000_000);
    assert_eq!(
        parquet_upload.parquet_upload_maximum_duration,
        time::Duration::from_secs(20 * 60)
    );
    assert_eq!(
        parquet_upload.parquet_upload_compression,
        Compression::UNCOMPRESSED
    );
}
485 :
/// Every flag given explicitly is parsed into the matching field.
#[test]
fn full_parser() {
    let cli = [
        "proxy",
        "--parquet-upload-remote-storage",
        "{bucket_name='default',prefix_in_bucket='proxy/',bucket_region='us-east-1',endpoint='http://minio:9000'}",
        "--parquet-upload-row-group-size",
        "100",
        "--parquet-upload-page-size",
        "10000",
        "--parquet-upload-size",
        "10000000",
        "--parquet-upload-maximum-duration",
        "10m",
        "--parquet-upload-compression",
        "zstd(5)",
    ];
    let ProxyCliArgs {
        parquet_upload: args,
    } = ProxyCliArgs::parse_from(cli);

    // Fields not present in the toml are expected to take the storage defaults.
    let expected_storage = RemoteStorageConfig {
        storage: RemoteStorageKind::AwsS3(S3Config {
            bucket_name: "default".into(),
            bucket_region: "us-east-1".into(),
            prefix_in_bucket: Some("proxy/".into()),
            endpoint: Some("http://minio:9000".into()),
            concurrency_limit: NonZeroUsize::new(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT)
                .unwrap(),
            max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
            upload_storage_class: None,
        }),
        timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
        small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT,
    };
    assert_eq!(args.parquet_upload_remote_storage, Some(expected_storage));

    assert_eq!(args.parquet_upload_row_group_size, 100);
    assert_eq!(args.parquet_upload_page_size, 10000);
    assert_eq!(args.parquet_upload_size, 10_000_000);

    let ten_minutes = time::Duration::from_secs(10 * 60);
    assert_eq!(args.parquet_upload_maximum_duration, ten_minutes);

    let zstd_5 = Compression::ZSTD(ZstdLevel::try_new(5).unwrap());
    assert_eq!(args.parquet_upload_compression, zstd_5);
}
534 :
535 159000 : fn generate_request_data(rng: &mut impl Rng) -> RequestData {
536 159000 : RequestData {
537 159000 : session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(),
538 159000 : peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(),
539 159000 : timestamp: chrono::DateTime::from_timestamp_millis(
540 159000 : rng.gen_range(1703862754..1803862754),
541 159000 : )
542 159000 : .unwrap()
543 159000 : .naive_utc(),
544 159000 : application_name: Some("test".to_owned()),
545 159000 : username: Some(hex::encode(rng.gen::<[u8; 4]>())),
546 159000 : endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())),
547 159000 : database: Some(hex::encode(rng.gen::<[u8; 16]>())),
548 159000 : project: Some(hex::encode(rng.gen::<[u8; 16]>())),
549 159000 : branch: Some(hex::encode(rng.gen::<[u8; 16]>())),
550 159000 : pg_options: None,
551 159000 : auth_method: None,
552 159000 : jwt_issuer: None,
553 159000 : protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)],
554 159000 : region: "us-east-1",
555 159000 : error: None,
556 159000 : success: rng.gen(),
557 159000 : cold_start_info: "no",
558 159000 : duration_us: rng.gen_range(0..30_000_000),
559 159000 : disconnect_timestamp: None,
560 159000 : }
561 159000 : }
562 :
563 6 : fn random_stream(len: usize) -> impl Stream<Item = RequestData> + Unpin {
564 6 : let mut rng = StdRng::from_seed([0x39; 32]);
565 6 : futures::stream::iter(
566 159000 : std::iter::repeat_with(move || generate_request_data(&mut rng)).take(len),
567 6 : )
568 6 : }
569 :
570 4 : async fn run_test(
571 4 : tmpdir: &Utf8Path,
572 4 : config: ParquetConfig,
573 4 : rx: impl Stream<Item = RequestData>,
574 4 : ) -> Vec<(u64, usize, i64)> {
575 4 : let remote_storage_config = RemoteStorageConfig {
576 4 : storage: RemoteStorageKind::LocalFs {
577 4 : local_path: tmpdir.to_path_buf(),
578 4 : },
579 4 : timeout: std::time::Duration::from_secs(120),
580 4 : small_timeout: std::time::Duration::from_secs(30),
581 4 : };
582 4 :
583 4 : worker_inner(remote_storage_config, rx, config)
584 4 : .await
585 4 : .unwrap();
586 4 :
587 4 : let mut files = WalkDir::new(tmpdir.as_std_path())
588 4 : .into_iter()
589 46 : .filter_map(|entry| entry.ok())
590 46 : .filter(|entry| entry.file_type().is_file())
591 26 : .map(|entry| entry.path().to_path_buf())
592 4 : .collect_vec();
593 4 : files.sort();
594 4 :
595 4 : files
596 4 : .into_iter()
597 26 : .map(|path| std::fs::File::open(tmpdir.as_std_path().join(path)).unwrap())
598 26 : .map(|file| {
599 26 : (
600 26 : file.metadata().unwrap(),
601 26 : SerializedFileReader::new(file).unwrap().metadata().clone(),
602 26 : )
603 26 : })
604 26 : .map(|(file_meta, parquet_meta)| {
605 26 : (
606 26 : file_meta.len(),
607 26 : parquet_meta.num_row_groups(),
608 26 : parquet_meta.file_metadata().num_rows(),
609 26 : )
610 26 : })
611 4 : .collect()
612 4 : }
613 :
614 : #[tokio::test]
615 1 : async fn verify_parquet_no_compression() {
616 1 : let tmpdir = camino_tempfile::tempdir().unwrap();
617 1 :
618 1 : let config = ParquetConfig {
619 1 : propeties: Arc::new(WriterProperties::new()),
620 1 : rows_per_group: 2_000,
621 1 : file_size: 1_000_000,
622 1 : max_duration: time::Duration::from_secs(20 * 60),
623 1 : test_remote_failures: 0,
624 1 : };
625 1 :
626 1 : let rx = random_stream(50_000);
627 1 : let file_stats = run_test(tmpdir.path(), config, rx).await;
628 1 :
629 1 : assert_eq!(
630 1 : file_stats,
631 1 : [
632 1 : (1313105, 3, 6000),
633 1 : (1313094, 3, 6000),
634 1 : (1313153, 3, 6000),
635 1 : (1313110, 3, 6000),
636 1 : (1313246, 3, 6000),
637 1 : (1313083, 3, 6000),
638 1 : (1312877, 3, 6000),
639 1 : (1313112, 3, 6000),
640 1 : (438020, 1, 2000)
641 1 : ]
642 1 : );
643 1 :
644 1 : tmpdir.close().unwrap();
645 1 : }
646 :
647 : #[tokio::test]
648 1 : async fn verify_parquet_strong_compression() {
649 1 : let tmpdir = camino_tempfile::tempdir().unwrap();
650 1 :
651 1 : let config = ParquetConfig {
652 1 : propeties: Arc::new(
653 1 : WriterProperties::builder()
654 1 : .set_compression(parquet::basic::Compression::ZSTD(
655 1 : ZstdLevel::try_new(10).unwrap(),
656 1 : ))
657 1 : .build(),
658 1 : ),
659 1 : rows_per_group: 2_000,
660 1 : file_size: 1_000_000,
661 1 : max_duration: time::Duration::from_secs(20 * 60),
662 1 : test_remote_failures: 0,
663 1 : };
664 1 :
665 1 : let rx = random_stream(50_000);
666 1 : let file_stats = run_test(tmpdir.path(), config, rx).await;
667 1 :
668 1 : // with strong compression, the files are smaller
669 1 : assert_eq!(
670 1 : file_stats,
671 1 : [
672 1 : (1204324, 5, 10000),
673 1 : (1204048, 5, 10000),
674 1 : (1204349, 5, 10000),
675 1 : (1204334, 5, 10000),
676 1 : (1204588, 5, 10000)
677 1 : ]
678 1 : );
679 1 :
680 1 : tmpdir.close().unwrap();
681 1 : }
682 :
683 : #[tokio::test]
684 1 : async fn verify_parquet_unreliable_upload() {
685 1 : let tmpdir = camino_tempfile::tempdir().unwrap();
686 1 :
687 1 : let config = ParquetConfig {
688 1 : propeties: Arc::new(WriterProperties::new()),
689 1 : rows_per_group: 2_000,
690 1 : file_size: 1_000_000,
691 1 : max_duration: time::Duration::from_secs(20 * 60),
692 1 : test_remote_failures: 2,
693 1 : };
694 1 :
695 1 : let rx = random_stream(50_000);
696 1 : let file_stats = run_test(tmpdir.path(), config, rx).await;
697 1 :
698 1 : assert_eq!(
699 1 : file_stats,
700 1 : [
701 1 : (1313105, 3, 6000),
702 1 : (1313094, 3, 6000),
703 1 : (1313153, 3, 6000),
704 1 : (1313110, 3, 6000),
705 1 : (1313246, 3, 6000),
706 1 : (1313083, 3, 6000),
707 1 : (1312877, 3, 6000),
708 1 : (1313112, 3, 6000),
709 1 : (438020, 1, 2000)
710 1 : ]
711 1 : );
712 1 :
713 1 : tmpdir.close().unwrap();
714 1 : }
715 :
716 : #[tokio::test(start_paused = true)]
717 1 : async fn verify_parquet_regular_upload() {
718 1 : let tmpdir = camino_tempfile::tempdir().unwrap();
719 1 :
720 1 : let config = ParquetConfig {
721 1 : propeties: Arc::new(WriterProperties::new()),
722 1 : rows_per_group: 2_000,
723 1 : file_size: 1_000_000,
724 1 : max_duration: time::Duration::from_secs(60),
725 1 : test_remote_failures: 2,
726 1 : };
727 1 :
728 1 : let (tx, mut rx) = mpsc::unbounded_channel();
729 1 :
730 1 : tokio::spawn(async move {
731 4 : for _ in 0..3 {
732 3 : let mut s = random_stream(3000);
733 9003 : while let Some(r) = s.next().await {
734 9000 : tx.send(r).unwrap();
735 9000 : }
736 3 : time::sleep(time::Duration::from_secs(70)).await;
737 1 : }
738 1 : });
739 1 :
740 9071 : let rx = futures::stream::poll_fn(move |cx| rx.poll_recv(cx));
741 1 : let file_stats = run_test(tmpdir.path(), config, rx).await;
742 1 :
743 1 : // files are smaller than the size threshold, but they took too long to fill so were flushed early
744 1 : assert_eq!(
745 1 : file_stats,
746 1 : [(658014, 2, 3001), (657728, 2, 3000), (657524, 2, 2999)]
747 1 : );
748 1 :
749 1 : tmpdir.close().unwrap();
750 1 : }
751 : }
|