Line data Source code
1 : use core::net::IpAddr;
2 : use std::sync::Arc;
3 :
4 : use pq_proto::CancelKeyData;
5 : use tokio::sync::Mutex;
6 : use uuid::Uuid;
7 :
8 : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
9 : use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
10 :
11 : pub trait CancellationPublisherMut: Send + Sync + 'static {
12 : #[allow(async_fn_in_trait)]
13 : async fn try_publish(
14 : &mut self,
15 : cancel_key_data: CancelKeyData,
16 : session_id: Uuid,
17 : peer_addr: IpAddr,
18 : ) -> anyhow::Result<()>;
19 : }
20 :
21 : pub trait CancellationPublisher: Send + Sync + 'static {
22 : #[allow(async_fn_in_trait)]
23 : async fn try_publish(
24 : &self,
25 : cancel_key_data: CancelKeyData,
26 : session_id: Uuid,
27 : peer_addr: IpAddr,
28 : ) -> anyhow::Result<()>;
29 : }
30 :
31 : impl CancellationPublisher for () {
32 0 : async fn try_publish(
33 0 : &self,
34 0 : _cancel_key_data: CancelKeyData,
35 0 : _session_id: Uuid,
36 0 : _peer_addr: IpAddr,
37 0 : ) -> anyhow::Result<()> {
38 0 : Ok(())
39 0 : }
40 : }
41 :
42 : impl<P: CancellationPublisher> CancellationPublisherMut for P {
43 0 : async fn try_publish(
44 0 : &mut self,
45 0 : cancel_key_data: CancelKeyData,
46 0 : session_id: Uuid,
47 0 : peer_addr: IpAddr,
48 0 : ) -> anyhow::Result<()> {
49 0 : <P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id, peer_addr)
50 0 : .await
51 0 : }
52 : }
53 :
54 : impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
55 0 : async fn try_publish(
56 0 : &self,
57 0 : cancel_key_data: CancelKeyData,
58 0 : session_id: Uuid,
59 0 : peer_addr: IpAddr,
60 0 : ) -> anyhow::Result<()> {
61 0 : if let Some(p) = self {
62 0 : p.try_publish(cancel_key_data, session_id, peer_addr).await
63 : } else {
64 0 : Ok(())
65 : }
66 0 : }
67 : }
68 :
69 : impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
70 0 : async fn try_publish(
71 0 : &self,
72 0 : cancel_key_data: CancelKeyData,
73 0 : session_id: Uuid,
74 0 : peer_addr: IpAddr,
75 0 : ) -> anyhow::Result<()> {
76 0 : self.lock()
77 0 : .await
78 0 : .try_publish(cancel_key_data, session_id, peer_addr)
79 0 : .await
80 0 : }
81 : }
82 :
83 : pub struct RedisPublisherClient {
84 : #[allow(dead_code)]
85 : client: ConnectionWithCredentialsProvider,
86 : _region_id: String,
87 : _limiter: GlobalRateLimiter,
88 : }
89 :
90 : impl RedisPublisherClient {
91 0 : pub fn new(
92 0 : client: ConnectionWithCredentialsProvider,
93 0 : region_id: String,
94 0 : info: &'static [RateBucketInfo],
95 0 : ) -> anyhow::Result<Self> {
96 0 : Ok(Self {
97 0 : client,
98 0 : _region_id: region_id,
99 0 : _limiter: GlobalRateLimiter::new(info.into()),
100 0 : })
101 0 : }
102 :
103 : #[allow(dead_code)]
104 0 : pub(crate) async fn try_connect(&mut self) -> anyhow::Result<()> {
105 0 : match self.client.connect().await {
106 0 : Ok(()) => {}
107 0 : Err(e) => {
108 0 : tracing::error!("failed to connect to redis: {e}");
109 0 : return Err(e);
110 : }
111 : }
112 0 : Ok(())
113 0 : }
114 : }
|