Line data Source code
1 : use std::sync::Arc;
2 :
3 : use core::net::IpAddr;
4 : use pq_proto::CancelKeyData;
5 : use redis::AsyncCommands;
6 : use tokio::sync::Mutex;
7 : use uuid::Uuid;
8 :
9 : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
10 : use super::notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME};
11 : use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
12 :
13 : pub trait CancellationPublisherMut: Send + Sync + 'static {
14 : #[allow(async_fn_in_trait)]
15 : async fn try_publish(
16 : &mut self,
17 : cancel_key_data: CancelKeyData,
18 : session_id: Uuid,
19 : peer_addr: IpAddr,
20 : ) -> anyhow::Result<()>;
21 : }
22 :
23 : pub trait CancellationPublisher: Send + Sync + 'static {
24 : #[allow(async_fn_in_trait)]
25 : async fn try_publish(
26 : &self,
27 : cancel_key_data: CancelKeyData,
28 : session_id: Uuid,
29 : peer_addr: IpAddr,
30 : ) -> anyhow::Result<()>;
31 : }
32 :
33 : impl CancellationPublisher for () {
34 1 : async fn try_publish(
35 1 : &self,
36 1 : _cancel_key_data: CancelKeyData,
37 1 : _session_id: Uuid,
38 1 : _peer_addr: IpAddr,
39 1 : ) -> anyhow::Result<()> {
40 1 : Ok(())
41 1 : }
42 : }
43 :
44 : impl<P: CancellationPublisher> CancellationPublisherMut for P {
45 0 : async fn try_publish(
46 0 : &mut self,
47 0 : cancel_key_data: CancelKeyData,
48 0 : session_id: Uuid,
49 0 : peer_addr: IpAddr,
50 0 : ) -> anyhow::Result<()> {
51 0 : <P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id, peer_addr)
52 0 : .await
53 0 : }
54 : }
55 :
56 : impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
57 0 : async fn try_publish(
58 0 : &self,
59 0 : cancel_key_data: CancelKeyData,
60 0 : session_id: Uuid,
61 0 : peer_addr: IpAddr,
62 0 : ) -> anyhow::Result<()> {
63 0 : if let Some(p) = self {
64 0 : p.try_publish(cancel_key_data, session_id, peer_addr).await
65 : } else {
66 0 : Ok(())
67 : }
68 0 : }
69 : }
70 :
71 : impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
72 0 : async fn try_publish(
73 0 : &self,
74 0 : cancel_key_data: CancelKeyData,
75 0 : session_id: Uuid,
76 0 : peer_addr: IpAddr,
77 0 : ) -> anyhow::Result<()> {
78 0 : self.lock()
79 0 : .await
80 0 : .try_publish(cancel_key_data, session_id, peer_addr)
81 0 : .await
82 0 : }
83 : }
84 :
85 : pub struct RedisPublisherClient {
86 : client: ConnectionWithCredentialsProvider,
87 : region_id: String,
88 : limiter: GlobalRateLimiter,
89 : }
90 :
91 : impl RedisPublisherClient {
92 0 : pub fn new(
93 0 : client: ConnectionWithCredentialsProvider,
94 0 : region_id: String,
95 0 : info: &'static [RateBucketInfo],
96 0 : ) -> anyhow::Result<Self> {
97 0 : Ok(Self {
98 0 : client,
99 0 : region_id,
100 0 : limiter: GlobalRateLimiter::new(info.into()),
101 0 : })
102 0 : }
103 :
104 0 : async fn publish(
105 0 : &mut self,
106 0 : cancel_key_data: CancelKeyData,
107 0 : session_id: Uuid,
108 0 : peer_addr: IpAddr,
109 0 : ) -> anyhow::Result<()> {
110 0 : let payload = serde_json::to_string(&Notification::Cancel(CancelSession {
111 0 : region_id: Some(self.region_id.clone()),
112 0 : cancel_key_data,
113 0 : session_id,
114 0 : peer_addr: Some(peer_addr),
115 0 : }))?;
116 0 : let _: () = self.client.publish(PROXY_CHANNEL_NAME, payload).await?;
117 0 : Ok(())
118 0 : }
119 0 : pub(crate) async fn try_connect(&mut self) -> anyhow::Result<()> {
120 0 : match self.client.connect().await {
121 0 : Ok(()) => {}
122 0 : Err(e) => {
123 0 : tracing::error!("failed to connect to redis: {e}");
124 0 : return Err(e);
125 : }
126 : }
127 0 : Ok(())
128 0 : }
129 0 : async fn try_publish_internal(
130 0 : &mut self,
131 0 : cancel_key_data: CancelKeyData,
132 0 : session_id: Uuid,
133 0 : peer_addr: IpAddr,
134 0 : ) -> anyhow::Result<()> {
135 0 : // TODO: review redundant error duplication logs.
136 0 : if !self.limiter.check() {
137 0 : tracing::info!("Rate limit exceeded. Skipping cancellation message");
138 0 : return Err(anyhow::anyhow!("Rate limit exceeded"));
139 0 : }
140 0 : match self.publish(cancel_key_data, session_id, peer_addr).await {
141 0 : Ok(()) => return Ok(()),
142 0 : Err(e) => {
143 0 : tracing::error!("failed to publish a message: {e}");
144 : }
145 : }
146 0 : tracing::info!("Publisher is disconnected. Reconnectiong...");
147 0 : self.try_connect().await?;
148 0 : self.publish(cancel_key_data, session_id, peer_addr).await
149 0 : }
150 : }
151 :
152 : impl CancellationPublisherMut for RedisPublisherClient {
153 0 : async fn try_publish(
154 0 : &mut self,
155 0 : cancel_key_data: CancelKeyData,
156 0 : session_id: Uuid,
157 0 : peer_addr: IpAddr,
158 0 : ) -> anyhow::Result<()> {
159 0 : tracing::info!("publishing cancellation key to Redis");
160 0 : match self
161 0 : .try_publish_internal(cancel_key_data, session_id, peer_addr)
162 0 : .await
163 : {
164 : Ok(()) => {
165 0 : tracing::debug!("cancellation key successfuly published to Redis");
166 0 : Ok(())
167 : }
168 0 : Err(e) => {
169 0 : tracing::error!("failed to publish a message: {e}");
170 0 : Err(e)
171 : }
172 : }
173 0 : }
174 : }
|