LCOV - code coverage report
Current view: top level - proxy/src/redis - kv_ops.rs (source / functions) Coverage Total Hit
Test: aca806cab4756d7eb6a304846130f4a73a5d5393.info Lines: 0.0 % 37 0
Test Date: 2025-04-24 20:31:15 Functions: 0.0 % 9 0

            Line data    Source code
       1              : use redis::aio::ConnectionLike;
       2              : use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
       3              : 
       4              : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
       5              : use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
       6              : 
       7              : pub struct RedisKVClient {
       8              :     client: ConnectionWithCredentialsProvider,
       9              :     limiter: GlobalRateLimiter,
      10              : }
      11              : 
      12              : #[allow(async_fn_in_trait)]
      13              : pub trait Queryable {
      14              :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T>;
      15              : }
      16              : 
      17              : impl Queryable for Pipeline {
      18            0 :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
      19            0 :         self.query_async(conn).await
      20            0 :     }
      21              : }
      22              : 
      23              : impl Queryable for Cmd {
      24            0 :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
      25            0 :         self.query_async(conn).await
      26            0 :     }
      27              : }
      28              : 
      29              : impl RedisKVClient {
      30            0 :     pub fn new(client: ConnectionWithCredentialsProvider, info: &'static [RateBucketInfo]) -> Self {
      31            0 :         Self {
      32            0 :             client,
      33            0 :             limiter: GlobalRateLimiter::new(info.into()),
      34            0 :         }
      35            0 :     }
      36              : 
      37            0 :     pub async fn try_connect(&mut self) -> anyhow::Result<()> {
      38            0 :         match self.client.connect().await {
      39            0 :             Ok(()) => {}
      40            0 :             Err(e) => {
      41            0 :                 tracing::error!("failed to connect to redis: {e}");
      42            0 :                 return Err(e);
      43              :             }
      44              :         }
      45            0 :         Ok(())
      46            0 :     }
      47              : 
      48            0 :     pub(crate) async fn query<T: FromRedisValue>(
      49            0 :         &mut self,
      50            0 :         q: impl Queryable,
      51            0 :     ) -> anyhow::Result<T> {
      52            0 :         if !self.limiter.check() {
      53            0 :             tracing::info!("Rate limit exceeded. Skipping query");
      54            0 :             return Err(anyhow::anyhow!("Rate limit exceeded"));
      55            0 :         }
      56            0 : 
      57            0 :         match q.query(&mut self.client).await {
      58            0 :             Ok(t) => return Ok(t),
      59            0 :             Err(e) => {
      60            0 :                 tracing::error!("failed to run query: {e}");
      61              :             }
      62              :         }
      63              : 
      64            0 :         tracing::info!("Redis client is disconnected. Reconnecting...");
      65            0 :         self.try_connect().await?;
      66            0 :         Ok(q.query(&mut self.client).await?)
      67            0 :     }
      68              : }
        

Generated by: LCOV version 2.1-beta