LCOV - code coverage report
Current view: top level - proxy/src - cancellation.rs (source / functions) Coverage Total Hit
Test: fabb29a6339542ee130cd1d32b534fafdc0be240.info Lines: 64.1 % 128 82
Test Date: 2024-06-25 13:20:00 Functions: 32.4 % 34 11

            Line data    Source code
       1              : use dashmap::DashMap;
       2              : use pq_proto::CancelKeyData;
       3              : use std::{net::SocketAddr, sync::Arc};
       4              : use thiserror::Error;
       5              : use tokio::net::TcpStream;
       6              : use tokio::sync::Mutex;
       7              : use tokio_postgres::{CancelToken, NoTls};
       8              : use tracing::info;
       9              : use uuid::Uuid;
      10              : 
      11              : use crate::{
      12              :     error::ReportableError,
      13              :     metrics::{CancellationRequest, CancellationSource, Metrics},
      14              :     redis::cancellation_publisher::{
      15              :         CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
      16              :     },
      17              : };
      18              : 
      19              : pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
      20              : pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;
      21              : pub type CancellationHandlerMainInternal = Option<Arc<Mutex<RedisPublisherClient>>>;
      22              : 
      23              : /// Enables serving `CancelRequest`s.
      24              : ///
      25              : /// If `CancellationPublisher` is available, cancel request will be used to publish the cancellation key to other proxy instances.
      26              : pub struct CancellationHandler<P> {
      27              :     map: CancelMap,
      28              :     client: P,
      29              :     /// This field used for the monitoring purposes.
      30              :     /// Represents the source of the cancellation request.
      31              :     from: CancellationSource,
      32              : }
      33              : 
      34            0 : #[derive(Debug, Error)]
      35              : pub enum CancelError {
      36              :     #[error("{0}")]
      37              :     IO(#[from] std::io::Error),
      38              :     #[error("{0}")]
      39              :     Postgres(#[from] tokio_postgres::Error),
      40              : }
      41              : 
      42              : impl ReportableError for CancelError {
      43            0 :     fn get_error_kind(&self) -> crate::error::ErrorKind {
      44            0 :         match self {
      45            0 :             CancelError::IO(_) => crate::error::ErrorKind::Compute,
      46            0 :             CancelError::Postgres(e) if e.as_db_error().is_some() => {
      47            0 :                 crate::error::ErrorKind::Postgres
      48              :             }
      49            0 :             CancelError::Postgres(_) => crate::error::ErrorKind::Compute,
      50              :         }
      51            0 :     }
      52              : }
      53              : 
      54              : impl<P: CancellationPublisher> CancellationHandler<P> {
      55              :     /// Run async action within an ephemeral session identified by [`CancelKeyData`].
      56            2 :     pub fn get_session(self: Arc<Self>) -> Session<P> {
      57              :         // HACK: We'd rather get the real backend_pid but tokio_postgres doesn't
      58              :         // expose it and we don't want to do another roundtrip to query
      59              :         // for it. The client will be able to notice that this is not the
      60              :         // actual backend_pid, but backend_pid is not used for anything
      61              :         // so it doesn't matter.
      62            2 :         let key = loop {
      63            2 :             let key = rand::random();
      64            2 : 
      65            2 :             // Random key collisions are unlikely to happen here, but they're still possible,
      66            2 :             // which is why we have to take care not to rewrite an existing key.
      67            2 :             match self.map.entry(key) {
      68            0 :                 dashmap::mapref::entry::Entry::Occupied(_) => continue,
      69            2 :                 dashmap::mapref::entry::Entry::Vacant(e) => {
      70            2 :                     e.insert(None);
      71            2 :                 }
      72            2 :             }
      73            2 :             break key;
      74            2 :         };
      75            2 : 
      76            2 :         info!("registered new query cancellation key {key}");
      77            2 :         Session {
      78            2 :             key,
      79            2 :             cancellation_handler: self,
      80            2 :         }
      81            2 :     }
      82              :     /// Try to cancel a running query for the corresponding connection.
      83              :     /// If the cancellation key is not found, it will be published to Redis.
      84            2 :     pub async fn cancel_session(
      85            2 :         &self,
      86            2 :         key: CancelKeyData,
      87            2 :         session_id: Uuid,
      88            2 :     ) -> Result<(), CancelError> {
      89              :         // NB: we should immediately release the lock after cloning the token.
      90            2 :         let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else {
      91            2 :             tracing::warn!("query cancellation key not found: {key}");
      92            2 :             Metrics::get()
      93            2 :                 .proxy
      94            2 :                 .cancellation_requests_total
      95            2 :                 .inc(CancellationRequest {
      96            2 :                     source: self.from,
      97            2 :                     kind: crate::metrics::CancellationOutcome::NotFound,
      98            2 :                 });
      99            2 :             match self.client.try_publish(key, session_id).await {
     100            2 :                 Ok(()) => {} // do nothing
     101            0 :                 Err(e) => {
     102            0 :                     return Err(CancelError::IO(std::io::Error::new(
     103            0 :                         std::io::ErrorKind::Other,
     104            0 :                         e.to_string(),
     105            0 :                     )));
     106              :                 }
     107              :             }
     108            2 :             return Ok(());
     109              :         };
     110            0 :         Metrics::get()
     111            0 :             .proxy
     112            0 :             .cancellation_requests_total
     113            0 :             .inc(CancellationRequest {
     114            0 :                 source: self.from,
     115            0 :                 kind: crate::metrics::CancellationOutcome::Found,
     116            0 :             });
     117            0 :         info!("cancelling query per user's request using key {key}");
     118            0 :         cancel_closure.try_cancel_query().await
     119            2 :     }
     120              : 
     121              :     #[cfg(test)]
     122            2 :     fn contains(&self, session: &Session<P>) -> bool {
     123            2 :         self.map.contains_key(&session.key)
     124            2 :     }
     125              : 
     126              :     #[cfg(test)]
     127            2 :     fn is_empty(&self) -> bool {
     128            2 :         self.map.is_empty()
     129            2 :     }
     130              : }
     131              : 
     132              : impl CancellationHandler<()> {
     133            4 :     pub fn new(map: CancelMap, from: CancellationSource) -> Self {
     134            4 :         Self {
     135            4 :             map,
     136            4 :             client: (),
     137            4 :             from,
     138            4 :         }
     139            4 :     }
     140              : }
     141              : 
     142              : impl<P: CancellationPublisherMut> CancellationHandler<Option<Arc<Mutex<P>>>> {
     143            0 :     pub fn new(map: CancelMap, client: Option<Arc<Mutex<P>>>, from: CancellationSource) -> Self {
     144            0 :         Self { map, client, from }
     145            0 :     }
     146              : }
     147              : 
     148              : /// This should've been a [`std::future::Future`], but
     149              : /// it's impossible to name a type of an unboxed future
     150              : /// (we'd need something like `#![feature(type_alias_impl_trait)]`).
     151              : #[derive(Clone)]
     152              : pub struct CancelClosure {
     153              :     socket_addr: SocketAddr,
     154              :     cancel_token: CancelToken,
     155              : }
     156              : 
     157              : impl CancelClosure {
     158            0 :     pub fn new(socket_addr: SocketAddr, cancel_token: CancelToken) -> Self {
     159            0 :         Self {
     160            0 :             socket_addr,
     161            0 :             cancel_token,
     162            0 :         }
     163            0 :     }
     164              :     /// Cancels the query running on user's compute node.
     165            0 :     pub async fn try_cancel_query(self) -> Result<(), CancelError> {
     166            0 :         let socket = TcpStream::connect(self.socket_addr).await?;
     167            0 :         self.cancel_token.cancel_query_raw(socket, NoTls).await?;
     168            0 :         info!("query was cancelled");
     169            0 :         Ok(())
     170            0 :     }
     171              : }
     172              : 
     173              : /// Helper for registering query cancellation tokens.
     174              : pub struct Session<P> {
     175              :     /// The user-facing key identifying this session.
     176              :     key: CancelKeyData,
     177              :     /// The [`CancelMap`] this session belongs to.
     178              :     cancellation_handler: Arc<CancellationHandler<P>>,
     179              : }
     180              : 
     181              : impl<P> Session<P> {
     182              :     /// Store the cancel token for the given session.
     183              :     /// This enables query cancellation in `crate::proxy::prepare_client_connection`.
     184            0 :     pub fn enable_query_cancellation(&self, cancel_closure: CancelClosure) -> CancelKeyData {
     185            0 :         info!("enabling query cancellation for this session");
     186            0 :         self.cancellation_handler
     187            0 :             .map
     188            0 :             .insert(self.key, Some(cancel_closure));
     189            0 : 
     190            0 :         self.key
     191            0 :     }
     192              : }
     193              : 
     194              : impl<P> Drop for Session<P> {
     195            2 :     fn drop(&mut self) {
     196            2 :         self.cancellation_handler.map.remove(&self.key);
     197            2 :         info!("dropped query cancellation key {}", &self.key);
     198            2 :     }
     199              : }
     200              : 
     201              : #[cfg(test)]
     202              : mod tests {
     203              :     use super::*;
     204              : 
     205              :     #[tokio::test]
     206            2 :     async fn check_session_drop() -> anyhow::Result<()> {
     207            2 :         let cancellation_handler = Arc::new(CancellationHandler::<()>::new(
     208            2 :             CancelMap::default(),
     209            2 :             CancellationSource::FromRedis,
     210            2 :         ));
     211            2 : 
     212            2 :         let session = cancellation_handler.clone().get_session();
     213            2 :         assert!(cancellation_handler.contains(&session));
     214            2 :         drop(session);
     215            2 :         // Check that the session has been dropped.
     216            2 :         assert!(cancellation_handler.is_empty());
     217            2 : 
     218            2 :         Ok(())
     219            2 :     }
     220              : 
     221              :     #[tokio::test]
     222            2 :     async fn cancel_session_noop_regression() {
     223            2 :         let handler = CancellationHandler::<()>::new(Default::default(), CancellationSource::Local);
     224            2 :         handler
     225            2 :             .cancel_session(
     226            2 :                 CancelKeyData {
     227            2 :                     backend_pid: 0,
     228            2 :                     cancel_key: 0,
     229            2 :                 },
     230            2 :                 Uuid::new_v4(),
     231            2 :             )
     232            2 :             .await
     233            2 :             .unwrap();
     234            2 :     }
     235              : }
        

Generated by: LCOV version 2.1-beta