LCOV - code coverage report
Current view: top level - proxy/src/redis - cancellation_publisher.rs (source / functions) Coverage Total Hit
Test: 727bdccc1d7d53837da843959afb612f56da4e79.info Lines: 0.0 % 57 0
Test Date: 2025-01-30 15:18:43 Functions: 0.0 % 11 0

            Line data    Source code
       1              : use core::net::IpAddr;
       2              : use std::sync::Arc;
       3              : 
       4              : use pq_proto::CancelKeyData;
       5              : use tokio::sync::Mutex;
       6              : use uuid::Uuid;
       7              : 
       8              : use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
       9              : use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo};
      10              : 
      11              : pub trait CancellationPublisherMut: Send + Sync + 'static {
      12              :     #[allow(async_fn_in_trait)]
      13              :     async fn try_publish(
      14              :         &mut self,
      15              :         cancel_key_data: CancelKeyData,
      16              :         session_id: Uuid,
      17              :         peer_addr: IpAddr,
      18              :     ) -> anyhow::Result<()>;
      19              : }
      20              : 
      21              : pub trait CancellationPublisher: Send + Sync + 'static {
      22              :     #[allow(async_fn_in_trait)]
      23              :     async fn try_publish(
      24              :         &self,
      25              :         cancel_key_data: CancelKeyData,
      26              :         session_id: Uuid,
      27              :         peer_addr: IpAddr,
      28              :     ) -> anyhow::Result<()>;
      29              : }
      30              : 
      31              : impl CancellationPublisher for () {
      32            0 :     async fn try_publish(
      33            0 :         &self,
      34            0 :         _cancel_key_data: CancelKeyData,
      35            0 :         _session_id: Uuid,
      36            0 :         _peer_addr: IpAddr,
      37            0 :     ) -> anyhow::Result<()> {
      38            0 :         Ok(())
      39            0 :     }
      40              : }
      41              : 
      42              : impl<P: CancellationPublisher> CancellationPublisherMut for P {
      43            0 :     async fn try_publish(
      44            0 :         &mut self,
      45            0 :         cancel_key_data: CancelKeyData,
      46            0 :         session_id: Uuid,
      47            0 :         peer_addr: IpAddr,
      48            0 :     ) -> anyhow::Result<()> {
      49            0 :         <P as CancellationPublisher>::try_publish(self, cancel_key_data, session_id, peer_addr)
      50            0 :             .await
      51            0 :     }
      52              : }
      53              : 
      54              : impl<P: CancellationPublisher> CancellationPublisher for Option<P> {
      55            0 :     async fn try_publish(
      56            0 :         &self,
      57            0 :         cancel_key_data: CancelKeyData,
      58            0 :         session_id: Uuid,
      59            0 :         peer_addr: IpAddr,
      60            0 :     ) -> anyhow::Result<()> {
      61            0 :         if let Some(p) = self {
      62            0 :             p.try_publish(cancel_key_data, session_id, peer_addr).await
      63              :         } else {
      64            0 :             Ok(())
      65              :         }
      66            0 :     }
      67              : }
      68              : 
      69              : impl<P: CancellationPublisherMut> CancellationPublisher for Arc<Mutex<P>> {
      70            0 :     async fn try_publish(
      71            0 :         &self,
      72            0 :         cancel_key_data: CancelKeyData,
      73            0 :         session_id: Uuid,
      74            0 :         peer_addr: IpAddr,
      75            0 :     ) -> anyhow::Result<()> {
      76            0 :         self.lock()
      77            0 :             .await
      78            0 :             .try_publish(cancel_key_data, session_id, peer_addr)
      79            0 :             .await
      80            0 :     }
      81              : }
      82              : 
      83              : pub struct RedisPublisherClient {
      84              :     #[allow(dead_code)]
      85              :     client: ConnectionWithCredentialsProvider,
      86              :     _region_id: String,
      87              :     _limiter: GlobalRateLimiter,
      88              : }
      89              : 
      90              : impl RedisPublisherClient {
      91            0 :     pub fn new(
      92            0 :         client: ConnectionWithCredentialsProvider,
      93            0 :         region_id: String,
      94            0 :         info: &'static [RateBucketInfo],
      95            0 :     ) -> anyhow::Result<Self> {
      96            0 :         Ok(Self {
      97            0 :             client,
      98            0 :             _region_id: region_id,
      99            0 :             _limiter: GlobalRateLimiter::new(info.into()),
     100            0 :         })
     101            0 :     }
     102              : 
     103              :     #[allow(dead_code)]
     104            0 :     pub(crate) async fn try_connect(&mut self) -> anyhow::Result<()> {
     105            0 :         match self.client.connect().await {
     106            0 :             Ok(()) => {}
     107            0 :             Err(e) => {
     108            0 :                 tracing::error!("failed to connect to redis: {e}");
     109            0 :                 return Err(e);
     110              :             }
     111              :         }
     112            0 :         Ok(())
     113            0 :     }
     114              : }
        

Generated by: LCOV version 2.1-beta