Line data Source code
1 : use pq_proto::CancelKeyData;
2 : use redis::AsyncCommands;
3 : use uuid::Uuid;
4 :
5 : use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter};
6 :
7 : use super::notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME};
8 :
9 : pub struct RedisPublisherClient {
10 : client: redis::Client,
11 : publisher: Option<redis::aio::Connection>,
12 : region_id: String,
13 : limiter: RedisRateLimiter,
14 : }
15 :
16 : impl RedisPublisherClient {
17 0 : pub fn new(
18 0 : url: &str,
19 0 : region_id: String,
20 0 : info: &'static [RateBucketInfo],
21 0 : ) -> anyhow::Result<Self> {
22 0 : let client = redis::Client::open(url)?;
23 0 : Ok(Self {
24 0 : client,
25 0 : publisher: None,
26 0 : region_id,
27 0 : limiter: RedisRateLimiter::new(info),
28 0 : })
29 0 : }
30 0 : pub async fn try_publish(
31 0 : &mut self,
32 0 : cancel_key_data: CancelKeyData,
33 0 : session_id: Uuid,
34 0 : ) -> anyhow::Result<()> {
35 0 : if !self.limiter.check() {
36 0 : tracing::info!("Rate limit exceeded. Skipping cancellation message");
37 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
38 0 : }
39 0 : match self.publish(cancel_key_data, session_id).await {
40 0 : Ok(()) => return Ok(()),
41 0 : Err(e) => {
42 0 : tracing::error!("failed to publish a message: {e}");
43 0 : self.publisher = None;
44 : }
45 : }
46 0 : tracing::info!("Publisher is disconnected. Reconnectiong...");
47 0 : self.try_connect().await?;
48 0 : self.publish(cancel_key_data, session_id).await
49 0 : }
50 :
51 0 : async fn publish(
52 0 : &mut self,
53 0 : cancel_key_data: CancelKeyData,
54 0 : session_id: Uuid,
55 0 : ) -> anyhow::Result<()> {
56 0 : let conn = self
57 0 : .publisher
58 0 : .as_mut()
59 0 : .ok_or_else(|| anyhow::anyhow!("not connected"))?;
60 0 : let payload = serde_json::to_string(&Notification::Cancel(CancelSession {
61 0 : region_id: Some(self.region_id.clone()),
62 0 : cancel_key_data,
63 0 : session_id,
64 0 : }))?;
65 0 : conn.publish(PROXY_CHANNEL_NAME, payload).await?;
66 0 : Ok(())
67 0 : }
68 0 : pub async fn try_connect(&mut self) -> anyhow::Result<()> {
69 0 : match self.client.get_async_connection().await {
70 0 : Ok(conn) => {
71 0 : self.publisher = Some(conn);
72 0 : }
73 0 : Err(e) => {
74 0 : tracing::error!("failed to connect to redis: {e}");
75 0 : return Err(e.into());
76 : }
77 : }
78 0 : Ok(())
79 0 : }
80 : }
|