Line data Source code
1 : use redis::{AsyncCommands, ToRedisArgs};
2 :
3 : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
4 : use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
5 :
6 : pub struct RedisKVClient {
7 : client: ConnectionWithCredentialsProvider,
8 : limiter: GlobalRateLimiter,
9 : }
10 :
11 : impl RedisKVClient {
12 0 : pub fn new(client: ConnectionWithCredentialsProvider, info: &'static [RateBucketInfo]) -> Self {
13 0 : Self {
14 0 : client,
15 0 : limiter: GlobalRateLimiter::new(info.into()),
16 0 : }
17 0 : }
18 :
19 0 : pub async fn try_connect(&mut self) -> anyhow::Result<()> {
20 0 : match self.client.connect().await {
21 0 : Ok(()) => {}
22 0 : Err(e) => {
23 0 : tracing::error!("failed to connect to redis: {e}");
24 0 : return Err(e);
25 : }
26 : }
27 0 : Ok(())
28 0 : }
29 :
30 0 : pub(crate) async fn hset<K, F, V>(&mut self, key: K, field: F, value: V) -> anyhow::Result<()>
31 0 : where
32 0 : K: ToRedisArgs + Send + Sync,
33 0 : F: ToRedisArgs + Send + Sync,
34 0 : V: ToRedisArgs + Send + Sync,
35 0 : {
36 0 : if !self.limiter.check() {
37 0 : tracing::info!("Rate limit exceeded. Skipping hset");
38 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
39 0 : }
40 0 :
41 0 : match self.client.hset(&key, &field, &value).await {
42 0 : Ok(()) => return Ok(()),
43 0 : Err(e) => {
44 0 : tracing::error!("failed to set a key-value pair: {e}");
45 : }
46 : }
47 :
48 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
49 0 : self.try_connect().await?;
50 0 : self.client
51 0 : .hset(key, field, value)
52 0 : .await
53 0 : .map_err(anyhow::Error::new)
54 0 : }
55 :
56 : #[allow(dead_code)]
57 0 : pub(crate) async fn hset_multiple<K, V>(
58 0 : &mut self,
59 0 : key: &str,
60 0 : items: &[(K, V)],
61 0 : ) -> anyhow::Result<()>
62 0 : where
63 0 : K: ToRedisArgs + Send + Sync,
64 0 : V: ToRedisArgs + Send + Sync,
65 0 : {
66 0 : if !self.limiter.check() {
67 0 : tracing::info!("Rate limit exceeded. Skipping hset_multiple");
68 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
69 0 : }
70 0 :
71 0 : match self.client.hset_multiple(key, items).await {
72 0 : Ok(()) => return Ok(()),
73 0 : Err(e) => {
74 0 : tracing::error!("failed to set a key-value pair: {e}");
75 : }
76 : }
77 :
78 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
79 0 : self.try_connect().await?;
80 0 : self.client
81 0 : .hset_multiple(key, items)
82 0 : .await
83 0 : .map_err(anyhow::Error::new)
84 0 : }
85 :
86 : #[allow(dead_code)]
87 0 : pub(crate) async fn expire<K>(&mut self, key: K, seconds: i64) -> anyhow::Result<()>
88 0 : where
89 0 : K: ToRedisArgs + Send + Sync,
90 0 : {
91 0 : if !self.limiter.check() {
92 0 : tracing::info!("Rate limit exceeded. Skipping expire");
93 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
94 0 : }
95 0 :
96 0 : match self.client.expire(&key, seconds).await {
97 0 : Ok(()) => return Ok(()),
98 0 : Err(e) => {
99 0 : tracing::error!("failed to set a key-value pair: {e}");
100 : }
101 : }
102 :
103 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
104 0 : self.try_connect().await?;
105 0 : self.client
106 0 : .expire(key, seconds)
107 0 : .await
108 0 : .map_err(anyhow::Error::new)
109 0 : }
110 :
111 : #[allow(dead_code)]
112 0 : pub(crate) async fn hget<K, F, V>(&mut self, key: K, field: F) -> anyhow::Result<V>
113 0 : where
114 0 : K: ToRedisArgs + Send + Sync,
115 0 : F: ToRedisArgs + Send + Sync,
116 0 : V: redis::FromRedisValue,
117 0 : {
118 0 : if !self.limiter.check() {
119 0 : tracing::info!("Rate limit exceeded. Skipping hget");
120 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
121 0 : }
122 0 :
123 0 : match self.client.hget(&key, &field).await {
124 0 : Ok(value) => return Ok(value),
125 0 : Err(e) => {
126 0 : tracing::error!("failed to get a value: {e}");
127 : }
128 : }
129 :
130 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
131 0 : self.try_connect().await?;
132 0 : self.client
133 0 : .hget(key, field)
134 0 : .await
135 0 : .map_err(anyhow::Error::new)
136 0 : }
137 :
138 0 : pub(crate) async fn hget_all<K, V>(&mut self, key: K) -> anyhow::Result<V>
139 0 : where
140 0 : K: ToRedisArgs + Send + Sync,
141 0 : V: redis::FromRedisValue,
142 0 : {
143 0 : if !self.limiter.check() {
144 0 : tracing::info!("Rate limit exceeded. Skipping hgetall");
145 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
146 0 : }
147 0 :
148 0 : match self.client.hgetall(&key).await {
149 0 : Ok(value) => return Ok(value),
150 0 : Err(e) => {
151 0 : tracing::error!("failed to get a value: {e}");
152 : }
153 : }
154 :
155 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
156 0 : self.try_connect().await?;
157 0 : self.client.hgetall(key).await.map_err(anyhow::Error::new)
158 0 : }
159 :
160 0 : pub(crate) async fn hdel<K, F>(&mut self, key: K, field: F) -> anyhow::Result<()>
161 0 : where
162 0 : K: ToRedisArgs + Send + Sync,
163 0 : F: ToRedisArgs + Send + Sync,
164 0 : {
165 0 : if !self.limiter.check() {
166 0 : tracing::info!("Rate limit exceeded. Skipping hdel");
167 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
168 0 : }
169 0 :
170 0 : match self.client.hdel(&key, &field).await {
171 0 : Ok(()) => return Ok(()),
172 0 : Err(e) => {
173 0 : tracing::error!("failed to delete a key-value pair: {e}");
174 : }
175 : }
176 :
177 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
178 0 : self.try_connect().await?;
179 0 : self.client
180 0 : .hdel(key, field)
181 0 : .await
182 0 : .map_err(anyhow::Error::new)
183 0 : }
184 : }
|