Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::bail;
4 : use futures::pin_mut;
5 : use futures::StreamExt;
6 : use hyper::body::HttpBody;
7 : use hyper::header;
8 : use hyper::http::HeaderName;
9 : use hyper::http::HeaderValue;
10 : use hyper::Response;
11 : use hyper::StatusCode;
12 : use hyper::{Body, HeaderMap, Request};
13 : use serde_json::json;
14 : use serde_json::Value;
15 : use tokio::try_join;
16 : use tokio_postgres::error::DbError;
17 : use tokio_postgres::error::ErrorPosition;
18 : use tokio_postgres::GenericClient;
19 : use tokio_postgres::IsolationLevel;
20 : use tokio_postgres::ReadyForQueryStatus;
21 : use tokio_postgres::Transaction;
22 : use tracing::error;
23 : use tracing::info;
24 : use url::Url;
25 : use utils::http::error::ApiError;
26 : use utils::http::json::json_response;
27 :
28 : use crate::auth::backend::ComputeUserInfo;
29 : use crate::auth::endpoint_sni;
30 : use crate::auth::ComputeUserInfoParseError;
31 : use crate::config::ProxyConfig;
32 : use crate::config::TlsConfig;
33 : use crate::context::RequestMonitoring;
34 : use crate::metrics::HTTP_CONTENT_LENGTH;
35 : use crate::metrics::NUM_CONNECTION_REQUESTS_GAUGE;
36 : use crate::proxy::NeonOptions;
37 : use crate::DbName;
38 : use crate::RoleName;
39 :
40 : use super::backend::PoolingBackend;
41 : use super::conn_pool::ConnInfo;
42 : use super::json::json_to_pg_text;
43 : use super::json::pg_text_row_to_json;
44 :
45 0 : #[derive(serde::Deserialize)]
46 : #[serde(rename_all = "camelCase")]
47 : struct QueryData {
48 : query: String,
49 : #[serde(deserialize_with = "bytes_to_pg_text")]
50 : params: Vec<Option<String>>,
51 : #[serde(default)]
52 : array_mode: Option<bool>,
53 : }
54 :
55 0 : #[derive(serde::Deserialize)]
56 : struct BatchQueryData {
57 : queries: Vec<QueryData>,
58 : }
59 :
60 0 : #[derive(serde::Deserialize)]
61 : #[serde(untagged)]
62 : enum Payload {
63 : Single(QueryData),
64 : Batch(BatchQueryData),
65 : }
66 :
67 : const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
68 : const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
69 :
70 : static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
71 : static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
72 : static ALLOW_POOL: HeaderName = HeaderName::from_static("neon-pool-opt-in");
73 : static TXN_ISOLATION_LEVEL: HeaderName = HeaderName::from_static("neon-batch-isolation-level");
74 : static TXN_READ_ONLY: HeaderName = HeaderName::from_static("neon-batch-read-only");
75 : static TXN_DEFERRABLE: HeaderName = HeaderName::from_static("neon-batch-deferrable");
76 :
77 : static HEADER_VALUE_TRUE: HeaderValue = HeaderValue::from_static("true");
78 :
79 0 : fn bytes_to_pg_text<'de, D>(deserializer: D) -> Result<Vec<Option<String>>, D::Error>
80 0 : where
81 0 : D: serde::de::Deserializer<'de>,
82 0 : {
83 : // TODO: consider avoiding the allocation here.
84 0 : let json: Vec<Value> = serde::de::Deserialize::deserialize(deserializer)?;
85 0 : Ok(json_to_pg_text(json))
86 0 : }
87 :
88 0 : #[derive(Debug, thiserror::Error)]
89 : pub enum ConnInfoError {
90 : #[error("invalid header: {0}")]
91 : InvalidHeader(&'static str),
92 : #[error("invalid connection string: {0}")]
93 : UrlParseError(#[from] url::ParseError),
94 : #[error("incorrect scheme")]
95 : IncorrectScheme,
96 : #[error("missing database name")]
97 : MissingDbName,
98 : #[error("invalid database name")]
99 : InvalidDbName,
100 : #[error("missing username")]
101 : MissingUsername,
102 : #[error("invalid username: {0}")]
103 : InvalidUsername(#[from] std::string::FromUtf8Error),
104 : #[error("missing password")]
105 : MissingPassword,
106 : #[error("missing hostname")]
107 : MissingHostname,
108 : #[error("invalid hostname: {0}")]
109 : InvalidEndpoint(#[from] ComputeUserInfoParseError),
110 : #[error("malformed endpoint")]
111 : MalformedEndpoint,
112 : }
113 :
114 0 : fn get_conn_info(
115 0 : ctx: &mut RequestMonitoring,
116 0 : headers: &HeaderMap,
117 0 : tls: &TlsConfig,
118 0 : ) -> Result<ConnInfo, ConnInfoError> {
119 0 : // HTTP only uses cleartext (for now and likely always)
120 0 : ctx.set_auth_method(crate::context::AuthMethod::Cleartext);
121 :
122 0 : let connection_string = headers
123 0 : .get("Neon-Connection-String")
124 0 : .ok_or(ConnInfoError::InvalidHeader("Neon-Connection-String"))?
125 0 : .to_str()
126 0 : .map_err(|_| ConnInfoError::InvalidHeader("Neon-Connection-String"))?;
127 :
128 0 : let connection_url = Url::parse(connection_string)?;
129 :
130 0 : let protocol = connection_url.scheme();
131 0 : if protocol != "postgres" && protocol != "postgresql" {
132 0 : return Err(ConnInfoError::IncorrectScheme);
133 0 : }
134 :
135 0 : let mut url_path = connection_url
136 0 : .path_segments()
137 0 : .ok_or(ConnInfoError::MissingDbName)?;
138 :
139 0 : let dbname: DbName = url_path.next().ok_or(ConnInfoError::InvalidDbName)?.into();
140 0 : ctx.set_dbname(dbname.clone());
141 :
142 0 : let username = RoleName::from(urlencoding::decode(connection_url.username())?);
143 0 : if username.is_empty() {
144 0 : return Err(ConnInfoError::MissingUsername);
145 0 : }
146 0 : ctx.set_user(username.clone());
147 :
148 0 : let password = connection_url
149 0 : .password()
150 0 : .ok_or(ConnInfoError::MissingPassword)?;
151 0 : let password = urlencoding::decode_binary(password.as_bytes());
152 :
153 0 : let hostname = connection_url
154 0 : .host_str()
155 0 : .ok_or(ConnInfoError::MissingHostname)?;
156 :
157 0 : let endpoint =
158 0 : endpoint_sni(hostname, &tls.common_names)?.ok_or(ConnInfoError::MalformedEndpoint)?;
159 0 : ctx.set_endpoint_id(endpoint.clone());
160 0 :
161 0 : let pairs = connection_url.query_pairs();
162 0 :
163 0 : let mut options = Option::None;
164 :
165 0 : for (key, value) in pairs {
166 0 : match &*key {
167 0 : "options" => {
168 0 : options = Some(NeonOptions::parse_options_raw(&value));
169 0 : }
170 0 : "application_name" => ctx.set_application(Some(value.into())),
171 0 : _ => {}
172 : }
173 : }
174 :
175 0 : let user_info = ComputeUserInfo {
176 0 : endpoint,
177 0 : user: username,
178 0 : options: options.unwrap_or_default(),
179 0 : };
180 0 :
181 0 : Ok(ConnInfo {
182 0 : user_info,
183 0 : dbname,
184 0 : password: match password {
185 0 : std::borrow::Cow::Borrowed(b) => b.into(),
186 0 : std::borrow::Cow::Owned(b) => b.into(),
187 : },
188 : })
189 0 : }
190 :
191 : // TODO: return different http error codes
192 0 : pub async fn handle(
193 0 : config: &'static ProxyConfig,
194 0 : mut ctx: RequestMonitoring,
195 0 : request: Request<Body>,
196 0 : backend: Arc<PoolingBackend>,
197 0 : ) -> Result<Response<Body>, ApiError> {
198 0 : let result = tokio::time::timeout(
199 0 : config.http_config.request_timeout,
200 0 : handle_inner(config, &mut ctx, request, backend),
201 0 : )
202 0 : .await;
203 0 : let mut response = match result {
204 0 : Ok(r) => match r {
205 0 : Ok(r) => {
206 0 : ctx.set_success();
207 0 : r
208 : }
209 0 : Err(e) => {
210 0 : // TODO: ctx.set_error_kind(e.get_error_type());
211 0 :
212 0 : let mut message = format!("{:?}", e);
213 0 : let db_error = e
214 0 : .downcast_ref::<tokio_postgres::Error>()
215 0 : .and_then(|e| e.as_db_error());
216 0 : fn get<'a, T: serde::Serialize>(
217 0 : db: Option<&'a DbError>,
218 0 : x: impl FnOnce(&'a DbError) -> T,
219 0 : ) -> Value {
220 0 : db.map(x)
221 0 : .and_then(|t| serde_json::to_value(t).ok())
222 0 : .unwrap_or_default()
223 0 : }
224 :
225 0 : if let Some(db_error) = db_error {
226 0 : db_error.message().clone_into(&mut message);
227 0 : }
228 :
229 0 : let position = db_error.and_then(|db| db.position());
230 0 : let (position, internal_position, internal_query) = match position {
231 0 : Some(ErrorPosition::Original(position)) => (
232 0 : Value::String(position.to_string()),
233 0 : Value::Null,
234 0 : Value::Null,
235 0 : ),
236 0 : Some(ErrorPosition::Internal { position, query }) => (
237 0 : Value::Null,
238 0 : Value::String(position.to_string()),
239 0 : Value::String(query.clone()),
240 0 : ),
241 0 : None => (Value::Null, Value::Null, Value::Null),
242 : };
243 :
244 0 : let code = get(db_error, |db| db.code().code());
245 0 : let severity = get(db_error, |db| db.severity());
246 0 : let detail = get(db_error, |db| db.detail());
247 0 : let hint = get(db_error, |db| db.hint());
248 0 : let where_ = get(db_error, |db| db.where_());
249 0 : let table = get(db_error, |db| db.table());
250 0 : let column = get(db_error, |db| db.column());
251 0 : let schema = get(db_error, |db| db.schema());
252 0 : let datatype = get(db_error, |db| db.datatype());
253 0 : let constraint = get(db_error, |db| db.constraint());
254 0 : let file = get(db_error, |db| db.file());
255 0 : let line = get(db_error, |db| db.line().map(|l| l.to_string()));
256 0 : let routine = get(db_error, |db| db.routine());
257 :
258 0 : error!(
259 0 : ?code,
260 0 : "sql-over-http per-client task finished with an error: {e:#}"
261 0 : );
262 : // TODO: this shouldn't always be bad request.
263 0 : json_response(
264 0 : StatusCode::BAD_REQUEST,
265 0 : json!({
266 0 : "message": message,
267 0 : "code": code,
268 0 : "detail": detail,
269 0 : "hint": hint,
270 0 : "position": position,
271 0 : "internalPosition": internal_position,
272 0 : "internalQuery": internal_query,
273 0 : "severity": severity,
274 0 : "where": where_,
275 0 : "table": table,
276 0 : "column": column,
277 0 : "schema": schema,
278 0 : "dataType": datatype,
279 0 : "constraint": constraint,
280 0 : "file": file,
281 0 : "line": line,
282 0 : "routine": routine,
283 0 : }),
284 0 : )?
285 : }
286 : },
287 : Err(_) => {
288 : // TODO: when http error classification is done, distinguish between
289 : // timeout on sql vs timeout in proxy/cplane
290 : // ctx.set_error_kind(crate::error::ErrorKind::RateLimit);
291 :
292 0 : let message = format!(
293 0 : "HTTP-Connection timed out, execution time exceeded {} seconds",
294 0 : config.http_config.request_timeout.as_secs()
295 0 : );
296 0 : error!(message);
297 0 : json_response(
298 0 : StatusCode::GATEWAY_TIMEOUT,
299 0 : json!({ "message": message, "code": StatusCode::GATEWAY_TIMEOUT.as_u16() }),
300 0 : )?
301 : }
302 : };
303 :
304 0 : response.headers_mut().insert(
305 0 : "Access-Control-Allow-Origin",
306 0 : hyper::http::HeaderValue::from_static("*"),
307 0 : );
308 0 : Ok(response)
309 0 : }
310 :
311 0 : async fn handle_inner(
312 0 : config: &'static ProxyConfig,
313 0 : ctx: &mut RequestMonitoring,
314 0 : request: Request<Body>,
315 0 : backend: Arc<PoolingBackend>,
316 0 : ) -> anyhow::Result<Response<Body>> {
317 0 : let _request_gauge = NUM_CONNECTION_REQUESTS_GAUGE
318 0 : .with_label_values(&[ctx.protocol])
319 0 : .guard();
320 0 : info!("handling interactive connection from client");
321 :
322 : //
323 : // Determine the destination and connection params
324 : //
325 0 : let headers = request.headers();
326 : // TLS config should be there.
327 0 : let conn_info = get_conn_info(ctx, headers, config.tls_config.as_ref().unwrap())?;
328 0 : info!(user = conn_info.user_info.user.as_str(), "credentials");
329 :
330 : // Determine the output options. Default behaviour is 'false'. Anything that is not
331 : // strictly 'true' assumed to be false.
332 0 : let raw_output = headers.get(&RAW_TEXT_OUTPUT) == Some(&HEADER_VALUE_TRUE);
333 0 : let default_array_mode = headers.get(&ARRAY_MODE) == Some(&HEADER_VALUE_TRUE);
334 :
335 : // Allow connection pooling only if explicitly requested
336 : // or if we have decided that http pool is no longer opt-in
337 0 : let allow_pool = !config.http_config.pool_options.opt_in
338 0 : || headers.get(&ALLOW_POOL) == Some(&HEADER_VALUE_TRUE);
339 :
340 : // isolation level, read only and deferrable
341 :
342 0 : let txn_isolation_level_raw = headers.get(&TXN_ISOLATION_LEVEL).cloned();
343 0 : let txn_isolation_level = match txn_isolation_level_raw {
344 0 : Some(ref x) => Some(match x.as_bytes() {
345 0 : b"Serializable" => IsolationLevel::Serializable,
346 0 : b"ReadUncommitted" => IsolationLevel::ReadUncommitted,
347 0 : b"ReadCommitted" => IsolationLevel::ReadCommitted,
348 0 : b"RepeatableRead" => IsolationLevel::RepeatableRead,
349 0 : _ => bail!("invalid isolation level"),
350 : }),
351 0 : None => None,
352 : };
353 :
354 0 : let txn_read_only = headers.get(&TXN_READ_ONLY) == Some(&HEADER_VALUE_TRUE);
355 0 : let txn_deferrable = headers.get(&TXN_DEFERRABLE) == Some(&HEADER_VALUE_TRUE);
356 :
357 0 : let request_content_length = match request.body().size_hint().upper() {
358 0 : Some(v) => v,
359 0 : None => MAX_REQUEST_SIZE + 1,
360 : };
361 0 : info!(request_content_length, "request size in bytes");
362 0 : HTTP_CONTENT_LENGTH.observe(request_content_length as f64);
363 0 :
364 0 : // we don't have a streaming request support yet so this is to prevent OOM
365 0 : // from a malicious user sending an extremely large request body
366 0 : if request_content_length > MAX_REQUEST_SIZE {
367 0 : return Err(anyhow::anyhow!(
368 0 : "request is too large (max is {MAX_REQUEST_SIZE} bytes)"
369 0 : ));
370 0 : }
371 0 :
372 0 : let fetch_and_process_request = async {
373 0 : let body = hyper::body::to_bytes(request.into_body())
374 0 : .await
375 0 : .map_err(anyhow::Error::from)?;
376 0 : info!(length = body.len(), "request payload read");
377 0 : let payload: Payload = serde_json::from_slice(&body)?;
378 0 : Ok::<Payload, anyhow::Error>(payload) // Adjust error type accordingly
379 0 : };
380 :
381 0 : let authenticate_and_connect = async {
382 0 : let keys = backend.authenticate(ctx, &conn_info).await?;
383 0 : let client = backend
384 0 : .connect_to_compute(ctx, conn_info, keys, !allow_pool)
385 0 : .await?;
386 : // not strictly necessary to mark success here,
387 : // but it's just insurance for if we forget it somewhere else
388 0 : ctx.latency_timer.success();
389 0 : Ok::<_, anyhow::Error>(client)
390 0 : };
391 :
392 : // Run both operations in parallel
393 0 : let (payload, mut client) = try_join!(fetch_and_process_request, authenticate_and_connect)?;
394 :
395 0 : let mut response = Response::builder()
396 0 : .status(StatusCode::OK)
397 0 : .header(header::CONTENT_TYPE, "application/json");
398 0 :
399 0 : //
400 0 : // Now execute the query and return the result
401 0 : //
402 0 : let mut size = 0;
403 0 : let result = match payload {
404 0 : Payload::Single(stmt) => {
405 0 : let (status, results) =
406 0 : query_to_json(&*client, stmt, &mut 0, raw_output, default_array_mode)
407 0 : .await
408 0 : .map_err(|e| {
409 0 : client.discard();
410 0 : e
411 0 : })?;
412 0 : client.check_idle(status);
413 0 : results
414 : }
415 0 : Payload::Batch(statements) => {
416 0 : info!("starting transaction");
417 0 : let (inner, mut discard) = client.inner();
418 0 : let mut builder = inner.build_transaction();
419 0 : if let Some(isolation_level) = txn_isolation_level {
420 0 : builder = builder.isolation_level(isolation_level);
421 0 : }
422 0 : if txn_read_only {
423 0 : builder = builder.read_only(true);
424 0 : }
425 0 : if txn_deferrable {
426 0 : builder = builder.deferrable(true);
427 0 : }
428 :
429 0 : let transaction = builder.start().await.map_err(|e| {
430 0 : // if we cannot start a transaction, we should return immediately
431 0 : // and not return to the pool. connection is clearly broken
432 0 : discard.discard();
433 0 : e
434 0 : })?;
435 :
436 0 : let results = match query_batch(
437 0 : &transaction,
438 0 : statements,
439 0 : &mut size,
440 0 : raw_output,
441 0 : default_array_mode,
442 0 : )
443 0 : .await
444 : {
445 0 : Ok(results) => {
446 0 : info!("commit");
447 0 : let status = transaction.commit().await.map_err(|e| {
448 0 : // if we cannot commit - for now don't return connection to pool
449 0 : // TODO: get a query status from the error
450 0 : discard.discard();
451 0 : e
452 0 : })?;
453 0 : discard.check_idle(status);
454 0 : results
455 : }
456 0 : Err(err) => {
457 0 : info!("rollback");
458 0 : let status = transaction.rollback().await.map_err(|e| {
459 0 : // if we cannot rollback - for now don't return connection to pool
460 0 : // TODO: get a query status from the error
461 0 : discard.discard();
462 0 : e
463 0 : })?;
464 0 : discard.check_idle(status);
465 0 : return Err(err);
466 : }
467 : };
468 :
469 0 : if txn_read_only {
470 0 : response = response.header(
471 0 : TXN_READ_ONLY.clone(),
472 0 : HeaderValue::try_from(txn_read_only.to_string())?,
473 : );
474 0 : }
475 0 : if txn_deferrable {
476 0 : response = response.header(
477 0 : TXN_DEFERRABLE.clone(),
478 0 : HeaderValue::try_from(txn_deferrable.to_string())?,
479 : );
480 0 : }
481 0 : if let Some(txn_isolation_level) = txn_isolation_level_raw {
482 0 : response = response.header(TXN_ISOLATION_LEVEL.clone(), txn_isolation_level);
483 0 : }
484 0 : json!({ "results": results })
485 : }
486 : };
487 :
488 0 : let metrics = client.metrics();
489 0 :
490 0 : // how could this possibly fail
491 0 : let body = serde_json::to_string(&result).expect("json serialization should not fail");
492 0 : let len = body.len();
493 0 : let response = response
494 0 : .body(Body::from(body))
495 0 : // only fails if invalid status code or invalid header/values are given.
496 0 : // these are not user configurable so it cannot fail dynamically
497 0 : .expect("building response payload should not fail");
498 0 :
499 0 : // count the egress bytes - we miss the TLS and header overhead but oh well...
500 0 : // moving this later in the stack is going to be a lot of effort and ehhhh
501 0 : metrics.record_egress(len as u64);
502 0 :
503 0 : Ok(response)
504 0 : }
505 :
506 0 : async fn query_batch(
507 0 : transaction: &Transaction<'_>,
508 0 : queries: BatchQueryData,
509 0 : total_size: &mut usize,
510 0 : raw_output: bool,
511 0 : array_mode: bool,
512 0 : ) -> anyhow::Result<Vec<Value>> {
513 0 : let mut results = Vec::with_capacity(queries.queries.len());
514 0 : let mut current_size = 0;
515 0 : for stmt in queries.queries {
516 0 : // TODO: maybe we should check that the transaction bit is set here
517 0 : let (_, values) =
518 0 : query_to_json(transaction, stmt, &mut current_size, raw_output, array_mode).await?;
519 0 : results.push(values);
520 : }
521 0 : *total_size += current_size;
522 0 : Ok(results)
523 0 : }
524 :
525 0 : async fn query_to_json<T: GenericClient>(
526 0 : client: &T,
527 0 : data: QueryData,
528 0 : current_size: &mut usize,
529 0 : raw_output: bool,
530 0 : default_array_mode: bool,
531 0 : ) -> anyhow::Result<(ReadyForQueryStatus, Value)> {
532 0 : info!("executing query");
533 0 : let query_params = data.params;
534 0 : let row_stream = client.query_raw_txt(&data.query, query_params).await?;
535 0 : info!("finished executing query");
536 :
537 : // Manually drain the stream into a vector to leave row_stream hanging
538 : // around to get a command tag. Also check that the response is not too
539 : // big.
540 0 : pin_mut!(row_stream);
541 0 : let mut rows: Vec<tokio_postgres::Row> = Vec::new();
542 0 : while let Some(row) = row_stream.next().await {
543 0 : let row = row?;
544 0 : *current_size += row.body_len();
545 0 : rows.push(row);
546 0 : // we don't have a streaming response support yet so this is to prevent OOM
547 0 : // from a malicious query (eg a cross join)
548 0 : if *current_size > MAX_RESPONSE_SIZE {
549 0 : return Err(anyhow::anyhow!(
550 0 : "response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
551 0 : ));
552 0 : }
553 : }
554 :
555 0 : let ready = row_stream.ready_status();
556 0 :
557 0 : // grab the command tag and number of rows affected
558 0 : let command_tag = row_stream.command_tag().unwrap_or_default();
559 0 : let mut command_tag_split = command_tag.split(' ');
560 0 : let command_tag_name = command_tag_split.next().unwrap_or_default();
561 0 : let command_tag_count = if command_tag_name == "INSERT" {
562 : // INSERT returns OID first and then number of rows
563 0 : command_tag_split.nth(1)
564 : } else {
565 : // other commands return number of rows (if any)
566 0 : command_tag_split.next()
567 : }
568 0 : .and_then(|s| s.parse::<i64>().ok());
569 :
570 0 : info!(
571 0 : rows = rows.len(),
572 0 : ?ready,
573 0 : command_tag,
574 0 : "finished reading rows"
575 0 : );
576 :
577 0 : let mut fields = vec![];
578 0 : let mut columns = vec![];
579 :
580 0 : for c in row_stream.columns() {
581 0 : fields.push(json!({
582 0 : "name": Value::String(c.name().to_owned()),
583 0 : "dataTypeID": Value::Number(c.type_().oid().into()),
584 0 : "tableID": c.table_oid(),
585 0 : "columnID": c.column_id(),
586 0 : "dataTypeSize": c.type_size(),
587 0 : "dataTypeModifier": c.type_modifier(),
588 0 : "format": "text",
589 0 : }));
590 0 : columns.push(client.get_type(c.type_oid()).await?);
591 : }
592 :
593 0 : let array_mode = data.array_mode.unwrap_or(default_array_mode);
594 :
595 : // convert rows to JSON
596 0 : let rows = rows
597 0 : .iter()
598 0 : .map(|row| pg_text_row_to_json(row, &columns, raw_output, array_mode))
599 0 : .collect::<Result<Vec<_>, _>>()?;
600 :
601 : // resulting JSON format is based on the format of node-postgres result
602 0 : Ok((
603 0 : ready,
604 0 : json!({
605 0 : "command": command_tag_name,
606 0 : "rowCount": command_tag_count,
607 0 : "rows": rows,
608 0 : "fields": fields,
609 0 : "rowAsArray": array_mode,
610 0 : }),
611 0 : ))
612 0 : }
|