LCOV - code coverage report
Current view: top level - proxy/src/redis - kv_ops.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 0.0 % 36 0
Test Date: 2025-07-16 12:29:03 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : 
       3              : use futures::FutureExt;
       4              : use redis::aio::ConnectionLike;
       5              : use redis::{Cmd, FromRedisValue, Pipeline, RedisError, RedisResult};
       6              : 
       7              : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
       8              : use crate::redis::connection_with_credentials_provider::ConnectionProviderError;
       9              : 
      10              : #[derive(thiserror::Error, Debug)]
      11              : pub enum RedisKVClientError {
      12              :     #[error(transparent)]
      13              :     Redis(#[from] RedisError),
      14              :     #[error(transparent)]
      15              :     ConnectionProvider(#[from] ConnectionProviderError),
      16              : }
      17              : 
      18              : pub struct RedisKVClient {
      19              :     client: ConnectionWithCredentialsProvider,
      20              : }
      21              : 
      22              : #[allow(async_fn_in_trait)]
      23              : pub trait Queryable {
      24              :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T>;
      25              : }
      26              : 
      27              : impl Queryable for Pipeline {
      28            0 :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
      29            0 :         self.query_async(conn).await
      30            0 :     }
      31              : }
      32              : 
      33              : impl Queryable for Cmd {
      34            0 :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
      35            0 :         self.query_async(conn).await
      36            0 :     }
      37              : }
      38              : 
      39              : impl RedisKVClient {
      40            0 :     pub fn new(client: ConnectionWithCredentialsProvider) -> Self {
      41            0 :         Self { client }
      42            0 :     }
      43              : 
      44            0 :     pub async fn try_connect(&mut self) -> Result<(), RedisKVClientError> {
      45            0 :         self.client
      46            0 :             .connect()
      47            0 :             .boxed()
      48            0 :             .await
      49            0 :             .inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
      50            0 :             .map_err(Into::into)
      51            0 :     }
      52              : 
      53            0 :     pub(crate) fn credentials_refreshed(&self) -> bool {
      54            0 :         self.client.credentials_refreshed()
      55            0 :     }
      56              : 
      57            0 :     pub(crate) async fn query<T: FromRedisValue>(
      58            0 :         &mut self,
      59            0 :         q: &impl Queryable,
      60            0 :     ) -> Result<T, RedisKVClientError> {
      61            0 :         let e = match q.query(&mut self.client).await {
      62            0 :             Ok(t) => return Ok(t),
      63            0 :             Err(e) => e,
      64              :         };
      65              : 
      66            0 :         tracing::debug!("failed to run query: {e}");
      67            0 :         match e.retry_method() {
      68              :             redis::RetryMethod::Reconnect => {
      69            0 :                 tracing::info!("Redis client is disconnected. Reconnecting...");
      70            0 :                 self.try_connect().await?;
      71              :             }
      72            0 :             redis::RetryMethod::RetryImmediately => {}
      73              :             redis::RetryMethod::WaitAndRetry => {
      74              :                 // somewhat arbitrary.
      75            0 :                 tokio::time::sleep(Duration::from_millis(100)).await;
      76              :             }
      77            0 :             _ => Err(e)?,
      78              :         }
      79              : 
      80            0 :         Ok(q.query(&mut self.client).await?)
      81            0 :     }
      82              : }
        

Generated by: LCOV version 2.1-beta