Line data Source code
1 : use std::time::Duration;
2 :
3 : use futures::FutureExt;
4 : use redis::aio::ConnectionLike;
5 : use redis::{Cmd, FromRedisValue, Pipeline, RedisError, RedisResult};
6 :
7 : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
8 : use crate::redis::connection_with_credentials_provider::ConnectionProviderError;
9 :
10 : #[derive(thiserror::Error, Debug)]
11 : pub enum RedisKVClientError {
12 : #[error(transparent)]
13 : Redis(#[from] RedisError),
14 : #[error(transparent)]
15 : ConnectionProvider(#[from] ConnectionProviderError),
16 : }
17 :
18 : pub struct RedisKVClient {
19 : client: ConnectionWithCredentialsProvider,
20 : }
21 :
22 : #[allow(async_fn_in_trait)]
23 : pub trait Queryable {
24 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T>;
25 : }
26 :
27 : impl Queryable for Pipeline {
28 0 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
29 0 : self.query_async(conn).await
30 0 : }
31 : }
32 :
33 : impl Queryable for Cmd {
34 0 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
35 0 : self.query_async(conn).await
36 0 : }
37 : }
38 :
39 : impl RedisKVClient {
40 0 : pub fn new(client: ConnectionWithCredentialsProvider) -> Self {
41 0 : Self { client }
42 0 : }
43 :
44 0 : pub async fn try_connect(&mut self) -> Result<(), RedisKVClientError> {
45 0 : self.client
46 0 : .connect()
47 0 : .boxed()
48 0 : .await
49 0 : .inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
50 0 : .map_err(Into::into)
51 0 : }
52 :
53 0 : pub(crate) fn credentials_refreshed(&self) -> bool {
54 0 : self.client.credentials_refreshed()
55 0 : }
56 :
57 0 : pub(crate) async fn query<T: FromRedisValue>(
58 0 : &mut self,
59 0 : q: &impl Queryable,
60 0 : ) -> Result<T, RedisKVClientError> {
61 0 : let e = match q.query(&mut self.client).await {
62 0 : Ok(t) => return Ok(t),
63 0 : Err(e) => e,
64 : };
65 :
66 0 : tracing::debug!("failed to run query: {e}");
67 0 : match e.retry_method() {
68 : redis::RetryMethod::Reconnect => {
69 0 : tracing::info!("Redis client is disconnected. Reconnecting...");
70 0 : self.try_connect().await?;
71 : }
72 0 : redis::RetryMethod::RetryImmediately => {}
73 : redis::RetryMethod::WaitAndRetry => {
74 : // somewhat arbitrary.
75 0 : tokio::time::sleep(Duration::from_millis(100)).await;
76 : }
77 0 : _ => Err(e)?,
78 : }
79 :
80 0 : Ok(q.query(&mut self.client).await?)
81 0 : }
82 : }
|