Line data Source code
1 : use redis::{AsyncCommands, ToRedisArgs};
2 :
3 : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
4 :
5 : use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
6 :
7 : pub struct RedisKVClient {
8 : client: ConnectionWithCredentialsProvider,
9 : limiter: GlobalRateLimiter,
10 : }
11 :
12 : impl RedisKVClient {
13 0 : pub fn new(client: ConnectionWithCredentialsProvider, info: &'static [RateBucketInfo]) -> Self {
14 0 : Self {
15 0 : client,
16 0 : limiter: GlobalRateLimiter::new(info.into()),
17 0 : }
18 0 : }
19 :
20 0 : pub async fn try_connect(&mut self) -> anyhow::Result<()> {
21 0 : match self.client.connect().await {
22 0 : Ok(()) => {}
23 0 : Err(e) => {
24 0 : tracing::error!("failed to connect to redis: {e}");
25 0 : return Err(e);
26 : }
27 : }
28 0 : Ok(())
29 0 : }
30 :
31 0 : pub(crate) async fn hset<K, F, V>(&mut self, key: K, field: F, value: V) -> anyhow::Result<()>
32 0 : where
33 0 : K: ToRedisArgs + Send + Sync,
34 0 : F: ToRedisArgs + Send + Sync,
35 0 : V: ToRedisArgs + Send + Sync,
36 0 : {
37 0 : if !self.limiter.check() {
38 0 : tracing::info!("Rate limit exceeded. Skipping hset");
39 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
40 0 : }
41 0 :
42 0 : match self.client.hset(&key, &field, &value).await {
43 0 : Ok(()) => return Ok(()),
44 0 : Err(e) => {
45 0 : tracing::error!("failed to set a key-value pair: {e}");
46 : }
47 : }
48 :
49 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
50 0 : self.try_connect().await?;
51 0 : self.client
52 0 : .hset(key, field, value)
53 0 : .await
54 0 : .map_err(anyhow::Error::new)
55 0 : }
56 :
57 : #[allow(dead_code)]
58 0 : pub(crate) async fn hset_multiple<K, V>(
59 0 : &mut self,
60 0 : key: &str,
61 0 : items: &[(K, V)],
62 0 : ) -> anyhow::Result<()>
63 0 : where
64 0 : K: ToRedisArgs + Send + Sync,
65 0 : V: ToRedisArgs + Send + Sync,
66 0 : {
67 0 : if !self.limiter.check() {
68 0 : tracing::info!("Rate limit exceeded. Skipping hset_multiple");
69 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
70 0 : }
71 0 :
72 0 : match self.client.hset_multiple(key, items).await {
73 0 : Ok(()) => return Ok(()),
74 0 : Err(e) => {
75 0 : tracing::error!("failed to set a key-value pair: {e}");
76 : }
77 : }
78 :
79 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
80 0 : self.try_connect().await?;
81 0 : self.client
82 0 : .hset_multiple(key, items)
83 0 : .await
84 0 : .map_err(anyhow::Error::new)
85 0 : }
86 :
87 : #[allow(dead_code)]
88 0 : pub(crate) async fn expire<K>(&mut self, key: K, seconds: i64) -> anyhow::Result<()>
89 0 : where
90 0 : K: ToRedisArgs + Send + Sync,
91 0 : {
92 0 : if !self.limiter.check() {
93 0 : tracing::info!("Rate limit exceeded. Skipping expire");
94 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
95 0 : }
96 0 :
97 0 : match self.client.expire(&key, seconds).await {
98 0 : Ok(()) => return Ok(()),
99 0 : Err(e) => {
100 0 : tracing::error!("failed to set a key-value pair: {e}");
101 : }
102 : }
103 :
104 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
105 0 : self.try_connect().await?;
106 0 : self.client
107 0 : .expire(key, seconds)
108 0 : .await
109 0 : .map_err(anyhow::Error::new)
110 0 : }
111 :
112 : #[allow(dead_code)]
113 0 : pub(crate) async fn hget<K, F, V>(&mut self, key: K, field: F) -> anyhow::Result<V>
114 0 : where
115 0 : K: ToRedisArgs + Send + Sync,
116 0 : F: ToRedisArgs + Send + Sync,
117 0 : V: redis::FromRedisValue,
118 0 : {
119 0 : if !self.limiter.check() {
120 0 : tracing::info!("Rate limit exceeded. Skipping hget");
121 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
122 0 : }
123 0 :
124 0 : match self.client.hget(&key, &field).await {
125 0 : Ok(value) => return Ok(value),
126 0 : Err(e) => {
127 0 : tracing::error!("failed to get a value: {e}");
128 : }
129 : }
130 :
131 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
132 0 : self.try_connect().await?;
133 0 : self.client
134 0 : .hget(key, field)
135 0 : .await
136 0 : .map_err(anyhow::Error::new)
137 0 : }
138 :
139 0 : pub(crate) async fn hget_all<K, V>(&mut self, key: K) -> anyhow::Result<V>
140 0 : where
141 0 : K: ToRedisArgs + Send + Sync,
142 0 : V: redis::FromRedisValue,
143 0 : {
144 0 : if !self.limiter.check() {
145 0 : tracing::info!("Rate limit exceeded. Skipping hgetall");
146 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
147 0 : }
148 0 :
149 0 : match self.client.hgetall(&key).await {
150 0 : Ok(value) => return Ok(value),
151 0 : Err(e) => {
152 0 : tracing::error!("failed to get a value: {e}");
153 : }
154 : }
155 :
156 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
157 0 : self.try_connect().await?;
158 0 : self.client.hgetall(key).await.map_err(anyhow::Error::new)
159 0 : }
160 :
161 0 : pub(crate) async fn hdel<K, F>(&mut self, key: K, field: F) -> anyhow::Result<()>
162 0 : where
163 0 : K: ToRedisArgs + Send + Sync,
164 0 : F: ToRedisArgs + Send + Sync,
165 0 : {
166 0 : if !self.limiter.check() {
167 0 : tracing::info!("Rate limit exceeded. Skipping hdel");
168 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
169 0 : }
170 0 :
171 0 : match self.client.hdel(&key, &field).await {
172 0 : Ok(()) => return Ok(()),
173 0 : Err(e) => {
174 0 : tracing::error!("failed to delete a key-value pair: {e}");
175 : }
176 : }
177 :
178 0 : tracing::info!("Redis client is disconnected. Reconnectiong...");
179 0 : self.try_connect().await?;
180 0 : self.client
181 0 : .hdel(key, field)
182 0 : .await
183 0 : .map_err(anyhow::Error::new)
184 0 : }
185 : }
|