LCOV - code coverage report
Current view: top level - proxy/src/redis - publisher.rs (source / functions) Coverage Total Hit
Test: aca8877be6ceba750c1be359ed71bc1799d52b30.info Lines: 0.0 % 58 0
Test Date: 2024-02-14 18:05:35 Functions: 0.0 % 12 0

            Line data    Source code
       1              : use pq_proto::CancelKeyData;
       2              : use redis::AsyncCommands;
       3              : use uuid::Uuid;
       4              : 
       5              : use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter};
       6              : 
       7              : use super::notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME};
       8              : 
       9              : pub struct RedisPublisherClient {
      10              :     client: redis::Client,
      11              :     publisher: Option<redis::aio::Connection>,
      12              :     region_id: String,
      13              :     limiter: RedisRateLimiter,
      14              : }
      15              : 
      16              : impl RedisPublisherClient {
      17            0 :     pub fn new(
      18            0 :         url: &str,
      19            0 :         region_id: String,
      20            0 :         info: &'static [RateBucketInfo],
      21            0 :     ) -> anyhow::Result<Self> {
      22            0 :         let client = redis::Client::open(url)?;
      23            0 :         Ok(Self {
      24            0 :             client,
      25            0 :             publisher: None,
      26            0 :             region_id,
      27            0 :             limiter: RedisRateLimiter::new(info),
      28            0 :         })
      29            0 :     }
      30            0 :     pub async fn try_publish(
      31            0 :         &mut self,
      32            0 :         cancel_key_data: CancelKeyData,
      33            0 :         session_id: Uuid,
      34            0 :     ) -> anyhow::Result<()> {
      35            0 :         if !self.limiter.check() {
      36            0 :             tracing::info!("Rate limit exceeded. Skipping cancellation message");
      37            0 :             return Err(anyhow::anyhow!("Rate limit exceeded"));
      38            0 :         }
      39            0 :         match self.publish(cancel_key_data, session_id).await {
      40            0 :             Ok(()) => return Ok(()),
      41            0 :             Err(e) => {
      42            0 :                 tracing::error!("failed to publish a message: {e}");
      43            0 :                 self.publisher = None;
      44              :             }
      45              :         }
      46            0 :         tracing::info!("Publisher is disconnected. Reconnectiong...");
      47            0 :         self.try_connect().await?;
      48            0 :         self.publish(cancel_key_data, session_id).await
      49            0 :     }
      50              : 
      51            0 :     async fn publish(
      52            0 :         &mut self,
      53            0 :         cancel_key_data: CancelKeyData,
      54            0 :         session_id: Uuid,
      55            0 :     ) -> anyhow::Result<()> {
      56            0 :         let conn = self
      57            0 :             .publisher
      58            0 :             .as_mut()
      59            0 :             .ok_or_else(|| anyhow::anyhow!("not connected"))?;
      60            0 :         let payload = serde_json::to_string(&Notification::Cancel(CancelSession {
      61            0 :             region_id: Some(self.region_id.clone()),
      62            0 :             cancel_key_data,
      63            0 :             session_id,
      64            0 :         }))?;
      65            0 :         conn.publish(PROXY_CHANNEL_NAME, payload).await?;
      66            0 :         Ok(())
      67            0 :     }
      68            0 :     pub async fn try_connect(&mut self) -> anyhow::Result<()> {
      69            0 :         match self.client.get_async_connection().await {
      70            0 :             Ok(conn) => {
      71            0 :                 self.publisher = Some(conn);
      72            0 :             }
      73            0 :             Err(e) => {
      74            0 :                 tracing::error!("failed to connect to redis: {e}");
      75            0 :                 return Err(e.into());
      76              :             }
      77              :         }
      78            0 :         Ok(())
      79            0 :     }
      80              : }
        

Generated by: LCOV version 2.1-beta