LCOV - code coverage report
Current view: top level - proxy/src/redis - cancellation_publisher.rs (source / functions) Coverage Total Hit
Test: 20b6afc7b7f34578dcaab2b3acdaecfe91cd8bf1.info Lines: 7.5 % 107 8
Test Date: 2024-11-25 17:48:16 Functions: 9.5 % 21 2

            Line data    Source code
       1              : use std::sync::Arc;
       2              : 
       3              : use core::net::IpAddr;
       4              : use pq_proto::CancelKeyData;
       5              : use redis::AsyncCommands;
       6              : use tokio::sync::Mutex;
       7              : use uuid::Uuid;
       8              : 
       9              : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
      10              : use super::notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME};
      11              : use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
      12              : 
      13              : pub trait CancellationPublisherMut: Send + Sync + 'static {
      14              :     #[allow(async_fn_in_trait)]
      15              :     async fn try_publish(
      16              :         &mut self,
      17              :         cancel_key_data: CancelKeyData,
      18              :         session_id: Uuid,
      19              :         peer_addr: IpAddr,
      20              :     ) -> anyhow::Result<()>;
      21              : }
      22              : 
      23              : pub trait CancellationPublisher: Send + Sync + 'static {
      24              :     #[allow(async_fn_in_trait)]
      25              :     async fn try_publish(
      26              :         &self,
      27              :         cancel_key_data: CancelKeyData,
      28              :         session_id: Uuid,
      29              :         peer_addr: IpAddr,
      30              :     ) -> anyhow::Result<()>;
      31              : }
      32              : 
      33              : impl CancellationPublisher for () {
      34            1 :     async fn try_publish(
      35            1 :         &self,
      36            1 :         _cancel_key_data: CancelKeyData,
      37            1 :         _session_id: Uuid,
      38            1 :         _peer_addr: IpAddr,
      39            1 :     ) -> anyhow::Result<()> {
      40            1 :         Ok(())
      41            1 :     }
      42              : }
      43              : 
      44              : impl<P: CancellationPublisher> CancellationPublisherMut for P {
      45            0 :     async fn try_publish(
      46            0 :         &mut self,
      47            0 :         cancel_key_data: CancelKeyData,
      48            0 :         session_id: Uuid,
      49            0 :         peer_addr: IpAddr,
      50            0 :     ) -> anyhow::Result<()> {
      51            0 :         <P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id, peer_addr)
      52            0 :             .await
      53            0 :     }
      54              : }
      55              : 
      56              : impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
      57            0 :     async fn try_publish(
      58            0 :         &self,
      59            0 :         cancel_key_data: CancelKeyData,
      60            0 :         session_id: Uuid,
      61            0 :         peer_addr: IpAddr,
      62            0 :     ) -> anyhow::Result<()> {
      63            0 :         if let Some(p) = self {
      64            0 :             p.try_publish(cancel_key_data, session_id, peer_addr).await
      65              :         } else {
      66            0 :             Ok(())
      67              :         }
      68            0 :     }
      69              : }
      70              : 
      71              : impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
      72            0 :     async fn try_publish(
      73            0 :         &self,
      74            0 :         cancel_key_data: CancelKeyData,
      75            0 :         session_id: Uuid,
      76            0 :         peer_addr: IpAddr,
      77            0 :     ) -> anyhow::Result<()> {
      78            0 :         self.lock()
      79            0 :             .await
      80            0 :             .try_publish(cancel_key_data, session_id, peer_addr)
      81            0 :             .await
      82            0 :     }
      83              : }
      84              : 
      85              : pub struct RedisPublisherClient {
      86              :     client: ConnectionWithCredentialsProvider,
      87              :     region_id: String,
      88              :     limiter: GlobalRateLimiter,
      89              : }
      90              : 
      91              : impl RedisPublisherClient {
      92            0 :     pub fn new(
      93            0 :         client: ConnectionWithCredentialsProvider,
      94            0 :         region_id: String,
      95            0 :         info: &'static [RateBucketInfo],
      96            0 :     ) -> anyhow::Result<Self> {
      97            0 :         Ok(Self {
      98            0 :             client,
      99            0 :             region_id,
     100            0 :             limiter: GlobalRateLimiter::new(info.into()),
     101            0 :         })
     102            0 :     }
     103              : 
     104            0 :     async fn publish(
     105            0 :         &mut self,
     106            0 :         cancel_key_data: CancelKeyData,
     107            0 :         session_id: Uuid,
     108            0 :         peer_addr: IpAddr,
     109            0 :     ) -> anyhow::Result<()> {
     110            0 :         let payload = serde_json::to_string(&Notification::Cancel(CancelSession {
     111            0 :             region_id: Some(self.region_id.clone()),
     112            0 :             cancel_key_data,
     113            0 :             session_id,
     114            0 :             peer_addr: Some(peer_addr),
     115            0 :         }))?;
     116            0 :         let _: () = self.client.publish(PROXY_CHANNEL_NAME, payload).await?;
     117            0 :         Ok(())
     118            0 :     }
     119            0 :     pub(crate) async fn try_connect(&mut self) -> anyhow::Result<()> {
     120            0 :         match self.client.connect().await {
     121            0 :             Ok(()) => {}
     122            0 :             Err(e) => {
     123            0 :                 tracing::error!("failed to connect to redis: {e}");
     124            0 :                 return Err(e);
     125              :             }
     126              :         }
     127            0 :         Ok(())
     128            0 :     }
     129            0 :     async fn try_publish_internal(
     130            0 :         &mut self,
     131            0 :         cancel_key_data: CancelKeyData,
     132            0 :         session_id: Uuid,
     133            0 :         peer_addr: IpAddr,
     134            0 :     ) -> anyhow::Result<()> {
     135            0 :         // TODO: review redundant error duplication logs.
     136            0 :         if !self.limiter.check() {
     137            0 :             tracing::info!("Rate limit exceeded. Skipping cancellation message");
     138            0 :             return Err(anyhow::anyhow!("Rate limit exceeded"));
     139            0 :         }
     140            0 :         match self.publish(cancel_key_data, session_id, peer_addr).await {
     141            0 :             Ok(()) => return Ok(()),
     142            0 :             Err(e) => {
     143            0 :                 tracing::error!("failed to publish a message: {e}");
     144              :             }
     145              :         }
     146            0 :         tracing::info!("Publisher is disconnected. Reconnectiong...");
     147            0 :         self.try_connect().await?;
     148            0 :         self.publish(cancel_key_data, session_id, peer_addr).await
     149            0 :     }
     150              : }
     151              : 
     152              : impl CancellationPublisherMut for RedisPublisherClient {
     153            0 :     async fn try_publish(
     154            0 :         &mut self,
     155            0 :         cancel_key_data: CancelKeyData,
     156            0 :         session_id: Uuid,
     157            0 :         peer_addr: IpAddr,
     158            0 :     ) -> anyhow::Result<()> {
     159            0 :         tracing::info!("publishing cancellation key to Redis");
     160            0 :         match self
     161            0 :             .try_publish_internal(cancel_key_data, session_id, peer_addr)
     162            0 :             .await
     163              :         {
     164              :             Ok(()) => {
     165            0 :                 tracing::debug!("cancellation key successfuly published to Redis");
     166            0 :                 Ok(())
     167              :             }
     168            0 :             Err(e) => {
     169            0 :                 tracing::error!("failed to publish a message: {e}");
     170            0 :                 Err(e)
     171              :             }
     172              :         }
     173            0 :     }
     174              : }
        

Generated by: LCOV version 2.1-beta