Line data Source code
1 : use std::time::Duration;
2 :
3 : use futures::FutureExt;
4 : use redis::aio::ConnectionLike;
5 : use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
6 :
7 : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
8 :
9 : pub struct RedisKVClient {
10 : client: ConnectionWithCredentialsProvider,
11 : }
12 :
13 : #[allow(async_fn_in_trait)]
14 : pub trait Queryable {
15 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T>;
16 : }
17 :
18 : impl Queryable for Pipeline {
19 0 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
20 0 : self.query_async(conn).await
21 0 : }
22 : }
23 :
24 : impl Queryable for Cmd {
25 0 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
26 0 : self.query_async(conn).await
27 0 : }
28 : }
29 :
30 : impl RedisKVClient {
31 0 : pub fn new(client: ConnectionWithCredentialsProvider) -> Self {
32 0 : Self { client }
33 0 : }
34 :
35 0 : pub async fn try_connect(&mut self) -> anyhow::Result<()> {
36 0 : self.client
37 0 : .connect()
38 0 : .boxed()
39 0 : .await
40 0 : .inspect_err(|e| tracing::error!("failed to connect to redis: {e}"))
41 0 : }
42 :
43 0 : pub(crate) fn credentials_refreshed(&self) -> bool {
44 0 : self.client.credentials_refreshed()
45 0 : }
46 :
47 0 : pub(crate) async fn query<T: FromRedisValue>(
48 0 : &mut self,
49 0 : q: &impl Queryable,
50 0 : ) -> anyhow::Result<T> {
51 0 : let e = match q.query(&mut self.client).await {
52 0 : Ok(t) => return Ok(t),
53 0 : Err(e) => e,
54 : };
55 :
56 0 : tracing::debug!("failed to run query: {e}");
57 0 : match e.retry_method() {
58 : redis::RetryMethod::Reconnect => {
59 0 : tracing::info!("Redis client is disconnected. Reconnecting...");
60 0 : self.try_connect().await?;
61 : }
62 0 : redis::RetryMethod::RetryImmediately => {}
63 : redis::RetryMethod::WaitAndRetry => {
64 : // somewhat arbitrary.
65 0 : tokio::time::sleep(Duration::from_millis(100)).await;
66 : }
67 0 : _ => Err(e)?,
68 : }
69 :
70 0 : Ok(q.query(&mut self.client).await?)
71 0 : }
72 : }
|