Line data Source code
1 : use redis::aio::ConnectionLike;
2 : use redis::{Cmd, FromRedisValue, Pipeline, RedisResult};
3 :
4 : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
5 : use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
6 :
7 : pub struct RedisKVClient {
8 : client: ConnectionWithCredentialsProvider,
9 : limiter: GlobalRateLimiter,
10 : }
11 :
12 : #[allow(async_fn_in_trait)]
13 : pub trait Queryable {
14 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T>;
15 : }
16 :
17 : impl Queryable for Pipeline {
18 0 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
19 0 : self.query_async(conn).await
20 0 : }
21 : }
22 :
23 : impl Queryable for Cmd {
24 0 : async fn query<T: FromRedisValue>(&self, conn: &mut impl ConnectionLike) -> RedisResult<T> {
25 0 : self.query_async(conn).await
26 0 : }
27 : }
28 :
29 : impl RedisKVClient {
30 0 : pub fn new(client: ConnectionWithCredentialsProvider, info: &'static [RateBucketInfo]) -> Self {
31 0 : Self {
32 0 : client,
33 0 : limiter: GlobalRateLimiter::new(info.into()),
34 0 : }
35 0 : }
36 :
37 0 : pub async fn try_connect(&mut self) -> anyhow::Result<()> {
38 0 : match self.client.connect().await {
39 0 : Ok(()) => {}
40 0 : Err(e) => {
41 0 : tracing::error!("failed to connect to redis: {e}");
42 0 : return Err(e);
43 : }
44 : }
45 0 : Ok(())
46 0 : }
47 :
48 0 : pub(crate) async fn query<T: FromRedisValue>(
49 0 : &mut self,
50 0 : q: impl Queryable,
51 0 : ) -> anyhow::Result<T> {
52 0 : if !self.limiter.check() {
53 0 : tracing::info!("Rate limit exceeded. Skipping query");
54 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
55 0 : }
56 0 :
57 0 : match q.query(&mut self.client).await {
58 0 : Ok(t) => return Ok(t),
59 0 : Err(e) => {
60 0 : tracing::error!("failed to run query: {e}");
61 : }
62 : }
63 :
64 0 : tracing::info!("Redis client is disconnected. Reconnecting...");
65 0 : self.try_connect().await?;
66 0 : Ok(q.query(&mut self.client).await?)
67 0 : }
68 : }
|