Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::bail;
4 : use futures::pin_mut;
5 : use futures::StreamExt;
6 : use hyper::body::HttpBody;
7 : use hyper::header;
8 : use hyper::http::HeaderName;
9 : use hyper::http::HeaderValue;
10 : use hyper::Response;
11 : use hyper::StatusCode;
12 : use hyper::{Body, HeaderMap, Request};
13 : use serde_json::json;
14 : use serde_json::Value;
15 : use tokio::try_join;
16 : use tokio_postgres::error::DbError;
17 : use tokio_postgres::error::ErrorPosition;
18 : use tokio_postgres::GenericClient;
19 : use tokio_postgres::IsolationLevel;
20 : use tokio_postgres::ReadyForQueryStatus;
21 : use tokio_postgres::Transaction;
22 : use tracing::error;
23 : use tracing::info;
24 : use tracing::instrument;
25 : use url::Url;
26 : use utils::http::error::ApiError;
27 : use utils::http::json::json_response;
28 :
29 : use crate::auth::backend::ComputeUserInfo;
30 : use crate::auth::endpoint_sni;
31 : use crate::auth::ComputeUserInfoParseError;
32 : use crate::config::ProxyConfig;
33 : use crate::config::TlsConfig;
34 : use crate::context::RequestMonitoring;
35 : use crate::metrics::HTTP_CONTENT_LENGTH;
36 : use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
37 : use crate::proxy::NeonOptions;
38 : use crate::DbName;
39 : use crate::RoleName;
40 :
41 : use super::backend::PoolingBackend;
42 : use super::conn_pool::ConnInfo;
43 : use super::json::json_to_pg_text;
44 : use super::json::pg_text_row_to_json;
45 :
46 0 : #[derive(serde::Deserialize)]
47 : #[serde(rename_all = "camelCase")]
48 : struct QueryData {
49 : query: String,
50 : #[serde(deserialize_with = "bytes_to_pg_text")]
51 : params: Vec<Option<String>>,
52 : #[serde(default)]
53 : array_mode: Option<bool>,
54 : }
55 :
56 0 : #[derive(serde::Deserialize)]
57 : struct BatchQueryData {
58 : queries: Vec<QueryData>,
59 : }
60 :
61 0 : #[derive(serde::Deserialize)]
62 : #[serde(untagged)]
63 : enum Payload {
64 : Single(QueryData),
65 : Batch(BatchQueryData),
66 : }
67 :
68 : const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
69 : const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
70 :
71 : static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
72 : static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
73 : static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in");
74 : static TXN_ISOLATION_LEVEL: HeaderName = HeaderName::from_static("neon-batch-isolation-level");
75 : static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only");
76 : static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrable");
77 :
78 : static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
79 :
80 0 : fn bytes_to_pg_text<'de, D>(deserializer: D) -> Result<Vec<Option<String>>, D::Error>
81 0 : where
82 0 : D: serde::de::Deserializer<'de>,
83 0 : {
84 : // TODO: consider avoiding the allocation here.
85 0 : let json: Vec<Value> = serde::de::Deserialize::deserialize(deserializer)?;
86 0 : Ok(json_to_pg_text(json))
87 0 : }
88 :
89 0 : #[derive(Debug, thiserror::Error)]
90 : pub enum ConnInfoError {
91 : #[error("invalid header: {0}")]
92 : InvalidHeader(&'static str),
93 : #[error("invalid connection string: {0}")]
94 : UrlParseError(#[from] url::ParseError),
95 : #[error("incorrect scheme")]
96 : IncorrectScheme,
97 : #[error("missing database name")]
98 : MissingDbName,
99 : #[error("invalid database name")]
100 : InvalidDbName,
101 : #[error("missing username")]
102 : MissingUsername,
103 : #[error("invalid username: {0}")]
104 : InvalidUsername(#[from] std::string::FromUtf8Error),
105 : #[error("missing password")]
106 : MissingPassword,
107 : #[error("missing hostname")]
108 : MissingHostname,
109 : #[error("invalid hostname: {0}")]
110 : InvalidEndpoint(#[from] ComputeUserInfoParseError),
111 : #[error("malformed endpoint")]
112 : MalformedEndpoint,
113 : }
114 :
115 0 : fn get_conn_info(
116 0 : ctx: &mut RequestMonitoring,
117 0 : headers: &HeaderMap,
118 0 : tls: &TlsConfig,
119 0 : ) -> Result<ConnInfo, ConnInfoError> {
120 0 : // HTTP only uses cleartext (for now and likely always)
121 0 : ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
122 :
123 0 : let connection_string = headers
124 0 : .get("Neon-Connection-String")
125 0 : .ok_or(ConnInfoError::InvalidHeader("Neon-Connection-String"))?
126 0 : .to_str()
127 0 : .map_err(|_| ConnInfoError::InvalidHeader("Neon-Connection-String"))?;
128 :
129 0 : let connection_url = Url::parse(connection_string)?;
130 :
131 0 : let protocol = connection_url.scheme();
132 0 : if protocol != "postgres" && protocol != "postgresql" {
133 0 : return Err(ConnInfoError::IncorrectScheme);
134 0 : }
135 :
136 0 : let mut url_path = connection_url
137 0 : .path_segments()
138 0 : .ok_or(ConnInfoError::MissingDbName)?;
139 :
140 0 : let dbname: DbName = url_path.next().ok_or(ConnInfoError::InvalidDbName)?.into();
141 0 : ctx.set_dbname(dbname.clone());
142 :
143 0 : let username = RoleName::from(urlencoding::decode(connection_url.username())?);
144 0 : if username.is_empty() {
145 0 : return Err(ConnInfoError::MissingUsername);
146 0 : }
147 0 : ctx.set_user(username.clone());
148 :
149 0 : let password = connection_url
150 0 : .password()
151 0 : .ok_or(ConnInfoError::MissingPassword)?;
152 0 : let password = urlencoding::decode_binary(password.as_bytes());
153 :
154 0 : let hostname = connection_url
155 0 : .host_str()
156 0 : .ok_or(ConnInfoError::MissingHostname)?;
157 :
158 0 : let endpoint =
159 0 : endpoint_sni(hostname, &tls.common_names)?.ok_or(ConnInfoError::MalformedEndpoint)?;
160 0 : ctx.set_endpoint_id(endpoint.clone());
161 0 :
162 0 : let pairs = connection_url.query_pairs();
163 0 :
164 0 : let mut options = Option::None;
165 :
166 0 : for (key, value) in pairs {
167 0 : match &*key {
168 0 : "options" => {
169 0 : options = Some(NeonOptions::parse_options_raw(&value));
170 0 : }
171 0 : "application_name" => ctx.set_application(Some(value.into())),
172 0 : _ => {}
173 : }
174 : }
175 :
176 0 : let user_info = ComputeUserInfo {
177 0 : endpoint,
178 0 : user: username,
179 0 : options: options.unwrap_or_default(),
180 0 : };
181 0 :
182 0 : Ok(ConnInfo {
183 0 : user_info,
184 0 : dbname,
185 0 : password: match password {
186 0 : std::borrow::Cow::Borrowed(b) => b.into(),
187 0 : std::borrow::Cow::Owned(b) => b.into(),
188 : },
189 : })
190 0 : }
191 :
192 : // TODO: return different http error codes
193 0 : pub async fn handle(
194 0 : config: &'static ProxyConfig,
195 0 : mut ctx: RequestMonitoring,
196 0 : request: Request<Body>,
197 0 : backend: Arc<PoolingBackend>,
198 0 : ) -> Result<Response<Body>, ApiError> {
199 0 : let result = tokio::time::timeout(
200 0 : config.http_config.request_timeout,
201 0 : handle_inner(config, &mut ctx, request, backend),
202 0 : )
203 0 : .await;
204 0 : let mut response = match result {
205 0 : Ok(r) => match r {
206 0 : Ok(r) => {
207 0 : ctx.set_success();
208 0 : r
209 : }
210 0 : Err(e) => {
211 0 : // TODO: ctx.set_error_kind(e.get_error_type());
212 0 :
213 0 : let mut message = format!("{:?}", e);
214 0 : let db_error = e
215 0 : .downcast_ref::<tokio_postgres::Error>()
216 0 : .and_then(|e| e.as_db_error());
217 0 : fn get<'a, T: serde::Serialize>(
218 0 : db: Option<&'a DbError>,
219 0 : x: impl FnOnce(&'a DbError) -> T,
220 0 : ) -> Value {
221 0 : db.map(x)
222 0 : .and_then(|t| serde_json::to_value(t).ok())
223 0 : .unwrap_or_default()
224 0 : }
225 :
226 0 : if let Some(db_error) = db_error {
227 0 : db_error.message().clone_into(&mut message);
228 0 : }
229 :
230 0 : let position = db_error.and_then(|db| db.position());
231 0 : let (position, internal_position, internal_query) = match position {
232 0 : Some(ErrorPosition::Original(position)) => (
233 0 : Value::String(position.to_string()),
234 0 : Value::Null,
235 0 : Value::Null,
236 0 : ),
237 0 : Some(ErrorPosition::Internal { position, query }) => (
238 0 : Value::Null,
239 0 : Value::String(position.to_string()),
240 0 : Value::String(query.clone()),
241 0 : ),
242 0 : None => (Value::Null, Value::Null, Value::Null),
243 : };
244 :
245 0 : let code = get(db_error, |db| db.code().code());
246 0 : let severity = get(db_error, |db| db.severity());
247 0 : let detail = get(db_error, |db| db.detail());
248 0 : let hint = get(db_error, |db| db.hint());
249 0 : let where_ = get(db_error, |db| db.where_());
250 0 : let table = get(db_error, |db| db.table());
251 0 : let column = get(db_error, |db| db.column());
252 0 : let schema = get(db_error, |db| db.schema());
253 0 : let datatype = get(db_error, |db| db.datatype());
254 0 : let constraint = get(db_error, |db| db.constraint());
255 0 : let file = get(db_error, |db| db.file());
256 0 : let line = get(db_error, |db| db.line().map(|l| l.to_string()));
257 0 : let routine = get(db_error, |db| db.routine());
258 :
259 0 : error!(
260 0 : ?code,
261 0 : "sql-over-http per-client task finished with an error: {e:#}"
262 0 : );
263 : // TODO: this shouldn't always be bad request.
264 0 : json_response(
265 0 : StatusCode::BAD_REQUEST,
266 0 : json!({
267 0 : "message": message,
268 0 : "code": code,
269 0 : "detail": detail,
270 0 : "hint": hint,
271 0 : "position": position,
272 0 : "internalPosition": internal_position,
273 0 : "internalQuery": internal_query,
274 0 : "severity": severity,
275 0 : "where": where_,
276 0 : "table": table,
277 0 : "column": column,
278 0 : "schema": schema,
279 0 : "dataType": datatype,
280 0 : "constraint": constraint,
281 0 : "file": file,
282 0 : "line": line,
283 0 : "routine": routine,
284 0 : }),
285 0 : )?
286 : }
287 : },
288 : Err(_) => {
289 : // TODO: when http error classification is done, distinguish between
290 : // timeout on sql vs timeout in proxy/cplane
291 : // ctx.set_error_kind(crate::error::ErrorKind::RateLimit);
292 :
293 0 : let message = format!(
294 0 : "HTTP-Connection timed out, execution time exeeded {} seconds",
295 0 : config.http_config.request_timeout.as_secs()
296 0 : );
297 0 : error!(message);
298 0 : json_response(
299 0 : StatusCode::GATEWAY_TIMEOUT,
300 0 : json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
301 0 : )?
302 : }
303 : };
304 :
305 0 : response.headers_mut().insert(
306 0 : "Access-Control-Allow-Origin",
307 0 : hyper::http::HeaderValue::from_static("*"),
308 0 : );
309 0 : Ok(response)
310 0 : }
311 :
312 0 : #[instrument(
313 : name = "sql-over-http",
314 : skip_all,
315 : fields(
316 : pid = tracing::field::Empty,
317 : conn_id = tracing::field::Empty
318 : )
319 : )]
320 : async fn handle_inner(
321 : config: &'static ProxyConfig,
322 : ctx: &mut RequestMonitoring,
323 : request: Request<Body>,
324 : backend: Arc<PoolingBackend>,
325 : ) -> anyhow::Result<Response<Body>> {
326 : let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
327 : .with_label_values(&[ctx.protocol])
328 : .guard();
329 0 : info!(
330 0 : protocol = ctx.protocol,
331 0 : "handling interactive connection from client"
332 0 : );
333 :
334 : //
335 : // Determine the destination and connection params
336 : //
337 : let headers = request.headers();
338 : // TLS config should be there.
339 : let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref().unwrap())?;
340 0 : info!(
341 0 : user = conn_info.user_info.user.as_str(),
342 0 : project = conn_info.user_info.endpoint.as_str(),
343 0 : "credentials"
344 0 : );
345 :
346 : // Determine the output options. Default behaviour is 'false'. Anything that is not
347 : // strictly 'true' assumed to be false.
348 : let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
349 : let default_array_mode = headers.get(&ARRAY_MODE) == Some(&HEADER_VALUE_TRUE);
350 :
351 : // Allow connection pooling only if explicitly requested
352 : // or if we have decided that http pool is no longer opt-in
353 : let allow_pool = !config.http_config.pool_options.opt_in
354 : || headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
355 :
356 : // isolation level, read only and deferrable
357 :
358 : let txn_isolation_level_raw = headers.get(&TXN_ISOLATION_LEVEL).cloned();
359 : let txn_isolation_level = match txn_isolation_level_raw {
360 : Some(ref x) => Some(match x.as_bytes() {
361 : b"Serializable" => IsolationLevel::Serializable,
362 : b"ReadUncommitted" => IsolationLevel::ReadUncommitted,
363 : b"ReadCommitted" => IsolationLevel::ReadCommitted,
364 : b"RepeatableRead" => IsolationLevel::RepeatableRead,
365 : _ => bail!("invalid isolation level"),
366 : }),
367 : None => None,
368 : };
369 :
370 : let txn_read_only = headers.get(&TXN_READ_ONLY) == Some(&HEADER_VALUE_TRUE);
371 : let txn_deferrable = headers.get(&TXN_DEFERRABLE) == Some(&HEADER_VALUE_TRUE);
372 :
373 : let request_content_length = match request.body().size_hint().upper() {
374 : Some(v) => v,
375 : None => MAX_REQUEST_SIZE + 1,
376 : };
377 0 : info!(request_content_length, "request size in bytes");
378 : HTTP_CONTENT_LENGTH.observe(request_content_length as f64);
379 :
380 : // we don't have a streaming request support yet so this is to prevent OOM
381 : // from a malicious user sending an extremely large request body
382 : if request_content_length > MAX_REQUEST_SIZE {
383 : return Err(anyhow::anyhow!(
384 : "request is too large (max is {MAX_REQUEST_SIZE} bytes)"
385 : ));
386 : }
387 :
388 0 : let fetch_and_process_request = async {
389 0 : let body = hyper::body::to_bytes(request.into_body())
390 0 : .await
391 0 : .map_err(anyhow::Error::from)?;
392 0 : info!(length = body.len(), "request payload read");
393 0 : let payload: Payload = serde_json::from_slice(&body)?;
394 0 : Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
395 0 : };
396 :
397 0 : let authenticate_and_connect = async {
398 0 : let keys = backend.authenticate(ctx, &conn_info).await?;
399 0 : let client = backend
400 0 : .connect_to_compute(ctx, conn_info, keys, !allow_pool)
401 0 : .await?;
402 : // not strictly necessary to mark success here,
403 : // but it's just insurance for if we forget it somewhere else
404 0 : ctx.latency_timer.success();
405 0 : Ok::<_, anyhow::Error>(client)
406 0 : };
407 :
408 : // Run both operations in parallel
409 0 : let (payload, mut client) = try_join!(fetch_and_process_request, authenticate_and_connect)?;
410 :
411 : let mut response = Response::builder()
412 : .status(StatusCode::OK)
413 : .header(header::CONTENT_TYPE, "application/json");
414 :
415 : //
416 : // Now execute the query and return the result
417 : //
418 : let mut size = 0;
419 : let result = match payload {
420 : Payload::Single(stmt) => {
421 : let (status, results) =
422 : query_to_json(&*client, stmt, &mut 0, raw_output, default_array_mode)
423 : .await
424 0 : .map_err(|e| {
425 0 : client.discard();
426 0 : e
427 0 : })?;
428 : client.check_idle(status);
429 : results
430 : }
431 : Payload::Batch(statements) => {
432 0 : info!("starting transaction");
433 : let (inner, mut discard) = client.inner();
434 : let mut builder = inner.build_transaction();
435 : if let Some(isolation_level) = txn_isolation_level {
436 : builder = builder.isolation_level(isolation_level);
437 : }
438 : if txn_read_only {
439 : builder = builder.read_only(true);
440 : }
441 : if txn_deferrable {
442 : builder = builder.deferrable(true);
443 : }
444 :
445 0 : let transaction = builder.start().await.map_err(|e| {
446 0 : // if we cannot start a transaction, we should return immediately
447 0 : // and not return to the pool. connection is clearly broken
448 0 : discard.discard();
449 0 : e
450 0 : })?;
451 :
452 : let results = match query_batch(
453 : &transaction,
454 : statements,
455 : &mut size,
456 : raw_output,
457 : default_array_mode,
458 : )
459 : .await
460 : {
461 : Ok(results) => {
462 0 : info!("commit");
463 0 : let status = transaction.commit().await.map_err(|e| {
464 0 : // if we cannot commit - for now don't return connection to pool
465 0 : // TODO: get a query status from the error
466 0 : discard.discard();
467 0 : e
468 0 : })?;
469 : discard.check_idle(status);
470 : results
471 : }
472 : Err(err) => {
473 0 : info!("rollback");
474 0 : let status = transaction.rollback().await.map_err(|e| {
475 0 : // if we cannot rollback - for now don't return connection to pool
476 0 : // TODO: get a query status from the error
477 0 : discard.discard();
478 0 : e
479 0 : })?;
480 : discard.check_idle(status);
481 : return Err(err);
482 : }
483 : };
484 :
485 : if txn_read_only {
486 : response = response.header(
487 : TXN_READ_ONLY.clone(),
488 : HeaderValue::try_from(txn_read_only.to_string())?,
489 : );
490 : }
491 : if txn_deferrable {
492 : response = response.header(
493 : TXN_DEFERRABLE.clone(),
494 : HeaderValue::try_from(txn_deferrable.to_string())?,
495 : );
496 : }
497 : if let Some(txn_isolation_level) = txn_isolation_level_raw {
498 : response = response.header(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
499 : }
500 : json!({ "results": results })
501 : }
502 : };
503 :
504 : let metrics = client.metrics();
505 :
506 : // how could this possibly fail
507 : let body = serde_json::to_string(&result).expect("json serialization should not fail");
508 : let len = body.len();
509 : let response = response
510 : .body(Body::from(body))
511 : // only fails if invalid status code or invalid header/values are given.
512 : // these are not user configurable so it cannot fail dynamically
513 : .expect("building response payload should not fail");
514 :
515 : // count the egress bytes - we miss the TLS and header overhead but oh well...
516 : // moving this later in the stack is going to be a lot of effort and ehhhh
517 : metrics.record_egress(len as u64);
518 :
519 : Ok(response)
520 : }
521 :
522 0 : async fn query_batch(
523 0 : transaction: &Transaction<'_>,
524 0 : queries: BatchQueryData,
525 0 : total_size: &mut usize,
526 0 : raw_output: bool,
527 0 : array_mode: bool,
528 0 : ) -> anyhow::Result<Vec<Value>> {
529 0 : let mut results = Vec::with_capacity(queries.queries.len());
530 0 : let mut current_size = 0;
531 0 : for stmt in queries.queries {
532 0 : // TODO: maybe we should check that the transaction bit is set here
533 0 : let (_, values) =
534 0 : query_to_json(transaction, stmt, &mut current_size, raw_output, array_mode).await?;
535 0 : results.push(values);
536 : }
537 0 : *total_size += current_size;
538 0 : Ok(results)
539 0 : }
540 :
541 0 : async fn query_to_json<T: GenericClient>(
542 0 : client: &T,
543 0 : data: QueryData,
544 0 : current_size: &mut usize,
545 0 : raw_output: bool,
546 0 : default_array_mode: bool,
547 0 : ) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
548 0 : info!("executing query");
549 0 : let query_params = data.params;
550 0 : let row_stream = client.query_raw_txt(&data.query, query_params).await?;
551 0 : info!("finished executing query");
552 :
553 : // Manually drain the stream into a vector to leave row_stream hanging
554 : // around to get a command tag. Also check that the response is not too
555 : // big.
556 0 : pin_mut!(row_stream);
557 0 : let mut rows: Vec<tokio_postgres::Row> = Vec::new();
558 0 : while let Some(row) = row_stream.next().await {
559 0 : let row = row?;
560 0 : *current_size += row.body_len();
561 0 : rows.push(row);
562 0 : // we don't have a streaming response support yet so this is to prevent OOM
563 0 : // from a malicious query (eg a cross join)
564 0 : if *current_size > MAX_RESPONSE_SIZE {
565 0 : return Err(anyhow::anyhow!(
566 0 : "response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
567 0 : ));
568 0 : }
569 : }
570 :
571 0 : let ready = row_stream.ready_status();
572 0 :
573 0 : // grab the command tag and number of rows affected
574 0 : let command_tag = row_stream.command_tag().unwrap_or_default();
575 0 : let mut command_tag_split = command_tag.split(' ');
576 0 : let command_tag_name = command_tag_split.next().unwrap_or_default();
577 0 : let command_tag_count = if command_tag_name == "INSERT" {
578 : // INSERT returns OID first and then number of rows
579 0 : command_tag_split.nth(1)
580 : } else {
581 : // other commands return number of rows (if any)
582 0 : command_tag_split.next()
583 : }
584 0 : .and_then(|s| s.parse::<i64>().ok());
585 :
586 0 : info!(
587 0 : rows = rows.len(),
588 0 : ?ready,
589 0 : command_tag,
590 0 : "finished reading rows"
591 0 : );
592 :
593 0 : let mut fields = vec![];
594 0 : let mut columns = vec![];
595 :
596 0 : for c in row_stream.columns() {
597 0 : fields.push(json!({
598 0 : "name": Value::String(c.name().to_owned()),
599 0 : "dataTypeID": Value::Number(c.type_().oid().into()),
600 0 : "tableID": c.table_oid(),
601 0 : "columnID": c.column_id(),
602 0 : "dataTypeSize": c.type_size(),
603 0 : "dataTypeModifier": c.type_modifier(),
604 0 : "format": "text",
605 0 : }));
606 0 : columns.push(client.get_type(c.type_oid()).await?);
607 : }
608 :
609 0 : let array_mode = data.array_mode.unwrap_or(default_array_mode);
610 :
611 : // convert rows to JSON
612 0 : let rows = rows
613 0 : .iter()
614 0 : .map(|row| pg_text_row_to_json(row, &columns, raw_output, array_mode))
615 0 : .collect::<Result<Vec<_>, _>>()?;
616 :
617 : // resulting JSON format is based on the format of node-postgres result
618 0 : Ok((
619 0 : ready,
620 0 : json!({
621 0 : "command": command_tag_name,
622 0 : "rowCount": command_tag_count,
623 0 : "rows": rows,
624 0 : "fields": fields,
625 0 : "rowAsArray": array_mode,
626 0 : }),
627 0 : ))
628 0 : }
|