LCOV - code coverage report
Current view: top level - proxy/src/redis - kv_ops.rs (source / functions) Coverage Total Hit
Test: ac1e0b9bf1b4ead74961174b01ba016322d3f9a6.info Lines: 0.0 % 35 0
Test Date: 2025-07-08 09:16:10 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use std::time::Duration;
       2              : 
       3              : use futures::FutureExt;
       4              : use redis::aio::ConnectionLike;
       5              : use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
       6              : 
       7              : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
       8              : 
       9              : pub struct RedisKVClient {
      10              :     client: ConnectionWithCredentialsProvider,
      11              : }
      12              : 
      13              : #[allow(async_fn_in_trait)]
      14              : pub trait Queryable {
      15              :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T>;
      16              : }
      17              : 
      18              : impl Queryable for Pipeline {
      19            0 :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
      20            0 :         self.query_async(conn).await
      21            0 :     }
      22              : }
      23              : 
      24              : impl Queryable for Cmd {
      25            0 :     async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
      26            0 :         self.query_async(conn).await
      27            0 :     }
      28              : }
      29              : 
      30              : impl RedisKVClient {
      31            0 :     pub fn new(client: ConnectionWithCredentialsProvider) -> Self {
      32            0 :         Self { client }
      33            0 :     }
      34              : 
      35            0 :     pub async fn try_connect(&mut self) -> anyhow::Result<()> {
      36            0 :         self.client
      37            0 :             .connect()
      38            0 :             .boxed()
      39            0 :             .await
      40            0 :             .inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
      41            0 :     }
      42              : 
      43            0 :     pub(crate) fn credentials_refreshed(&self) -> bool {
      44            0 :         self.client.credentials_refreshed()
      45            0 :     }
      46              : 
      47            0 :     pub(crate) async fn query<T: FromRedisValue>(
      48            0 :         &mut self,
      49            0 :         q: &impl Queryable,
      50            0 :     ) -> anyhow::Result<T> {
      51            0 :         let e = match q.query(&mut self.client).await {
      52            0 :             Ok(t) => return Ok(t),
      53            0 :             Err(e) => e,
      54              :         };
      55              : 
      56            0 :         tracing::debug!("failed to run query: {e}");
      57            0 :         match e.retry_method() {
      58              :             redis::RetryMethod::Reconnect => {
      59            0 :                 tracing::info!("Redis client is disconnected. Reconnecting...");
      60            0 :                 self.try_connect().await?;
      61              :             }
      62            0 :             redis::RetryMethod::RetryImmediately => {}
      63              :             redis::RetryMethod::WaitAndRetry => {
      64              :                 // somewhat arbitrary.
      65            0 :                 tokio::time::sleep(Duration::from_millis(100)).await;
      66              :             }
      67            0 :             _ => Err(e)?,
      68              :         }
      69              : 
      70            0 :         Ok(q.query(&mut self.client).await?)
      71            0 :     }
      72              : }
        

Generated by: LCOV version 2.1-beta