LCOV - differential code coverage report
Current view: top level - libs/vm_monitor/src - runner.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 108 0 108
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 48 0 48
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Exposes the `Runner`, which handles messages received from agent and
       2                 : //! sends upscale requests.
       3                 : //!
       4                 : //! This is the "Monitor" part of the monitor binary and is the main entrypoint for
       5                 : //! all functionality.
       6                 : 
       7                 : use std::fmt::Debug;
       8                 : use std::time::{Duration, Instant};
       9                 : 
      10                 : use anyhow::{bail, Context};
      11                 : use axum::extract::ws::{Message, WebSocket};
      12                 : use futures::StreamExt;
      13                 : use tokio::sync::{broadcast, watch};
      14                 : use tokio_util::sync::CancellationToken;
      15                 : use tracing::{error, info, warn};
      16                 : 
      17                 : use crate::cgroup::{self, CgroupWatcher};
      18                 : use crate::dispatcher::Dispatcher;
      19                 : use crate::filecache::{FileCacheConfig, FileCacheState};
      20                 : use crate::protocol::{InboundMsg, InboundMsgKind, OutboundMsg, OutboundMsgKind, Resources};
      21                 : use crate::{bytes_to_mebibytes, get_total_system_memory, spawn_with_cancel, Args, MiB};
      22                 : 
      23                 : /// Central struct that interacts with agent, dispatcher, and cgroup to handle
      24                 : /// signals from the agent.
      25 UBC           0 : #[derive(Debug)]
      26                 : pub struct Runner {
      27                 :     config: Config,
      28                 :     filecache: Option<FileCacheState>,
      29                 :     cgroup: Option<CgroupState>,
      30                 :     dispatcher: Dispatcher,
      31                 : 
      32                 :     /// We "mint" new message ids by incrementing this counter and taking the value.
      33                 :     ///
      34                 :     /// **Note**: This counter is always odd, so that we avoid collisions between the IDs generated
      35                 :     /// by us vs the autoscaler-agent.
      36                 :     counter: usize,
      37                 : 
      38                 :     last_upscale_request_at: Option<Instant>,
      39                 : 
      40                 :     /// A signal to kill the main thread produced by `self.run()`. This is triggered
      41                 :     /// when the server receives a new connection. When the thread receives the
      42                 :     /// signal off this channel, it will gracefully shutdown.
      43                 :     kill: broadcast::Receiver<()>,
      44                 : }
      45                 : 
      46               0 : #[derive(Debug)]
      47                 : struct CgroupState {
      48                 :     watcher: watch::Receiver<(Instant, cgroup::MemoryHistory)>,
      49                 :     /// If [`cgroup::MemoryHistory::avg_non_reclaimable`] exceeds `threshold`, we send upscale
      50                 :     /// requests.
      51                 :     threshold: u64,
      52                 : }
      53                 : 
      54                 : /// Configuration for a `Runner`
      55               0 : #[derive(Debug)]
      56                 : pub struct Config {
      57                 :     /// `sys_buffer_bytes` gives the estimated amount of memory, in bytes, that the kernel uses before
      58                 :     /// handing out the rest to userspace. This value is the estimated difference between the
      59                 :     /// *actual* physical memory and the amount reported by `grep MemTotal /proc/meminfo`.
      60                 :     ///
      61                 :     /// For more information, refer to `man 5 proc`, which defines MemTotal as "Total usable RAM
      62                 :     /// (i.e., physical RAM minus a few reserved bits and the kernel binary code)".
      63                 :     ///
      64                 :     /// We only use `sys_buffer_bytes` when calculating the system memory from the *external* memory
      65                 :     /// size, rather than the self-reported memory size, according to the kernel.
      66                 :     ///
      67                 :     /// TODO: this field is only necessary while we still have to trust the autoscaler-agent's
      68                 :     /// upscale resource amounts (because we might not *actually* have been upscaled yet). This field
      69                 :     /// should be removed once we have a better solution there.
      70                 :     sys_buffer_bytes: u64,
      71                 : 
      72                 :     /// Minimum fraction of total system memory reserved *before* the cgroup threshold; in
      73                 :     /// other words, providing a ceiling for the highest value of the threshold by enforcing that
      74                 :     /// there's at least `cgroup_min_overhead_fraction` of the total memory remaining beyond the
      75                 :     /// threshold.
      76                 :     ///
      77                 :     /// For example, a value of `0.1` means that 10% of total memory must remain after exceeding
      78                 :     /// the threshold, so the value of the cgroup threshold would always be capped at 90% of total
      79                 :     /// memory.
      80                 :     ///
      81                 :     /// The default value of `0.15` means that we *guarantee* sending upscale requests if the
      82                 :     /// cgroup is using more than 85% of total memory (even if we're *not* separately reserving
      83                 :     /// memory for the file cache).
      84                 :     cgroup_min_overhead_fraction: f64,
      85                 : 
      86                 :     cgroup_downscale_threshold_buffer_bytes: u64,
      87                 : }
      88                 : 
      89                 : impl Default for Config {
      90               0 :     fn default() -> Self {
      91               0 :         Self {
      92               0 :             sys_buffer_bytes: 100 * MiB,
      93               0 :             cgroup_min_overhead_fraction: 0.15,
      94               0 :             cgroup_downscale_threshold_buffer_bytes: 100 * MiB,
      95               0 :         }
      96               0 :     }
      97                 : }
      98                 : 
      99                 : impl Config {
     100               0 :     fn cgroup_threshold(&self, total_mem: u64, file_cache_disk_size: u64) -> u64 {
     101               0 :         // If the file cache is in tmpfs, then it will count towards shmem usage of the cgroup,
     102               0 :         // and thus be non-reclaimable, so we should allow for additional memory usage.
     103               0 :         //
     104               0 :         // If the file cache sits on disk, our desired stable system state is for it to be fully
     105               0 :         // page cached (its contents should only be paged to/from disk in situations where we can't
     106               0 :         // upscale fast enough). Page-cached memory is reclaimable, so we need to lower the
     107               0 :         // threshold for non-reclaimable memory so we scale up *before* the kernel starts paging
     108               0 :         // out the file cache.
     109               0 :         let memory_remaining_for_cgroup = total_mem.saturating_sub(file_cache_disk_size);
     110               0 : 
     111               0 :         // Even if we're not separately making room for the file cache (if it's in tmpfs), we still
     112               0 :         // want our threshold to be met gracefully instead of letting postgres get OOM-killed.
     113               0 :         // So we guarantee that there's at least `cgroup_min_overhead_fraction` of total memory
     114               0 :         // remaining above the threshold.
     115               0 :         let max_threshold = (total_mem as f64 * (1.0 - self.cgroup_min_overhead_fraction)) as u64;
     116               0 : 
     117               0 :         memory_remaining_for_cgroup.min(max_threshold)
     118               0 :     }
     119                 : }
     120                 : 
     121                 : impl Runner {
     122                 :     /// Create a new monitor.
     123               0 :     #[tracing::instrument(skip_all, fields(?config, ?args))]
     124                 :     pub async fn new(
     125                 :         config: Config,
     126                 :         args: &Args,
     127                 :         ws: WebSocket,
     128                 :         kill: broadcast::Receiver<()>,
     129                 :         token: CancellationToken,
     130                 :     ) -> anyhow::Result<Runner> {
     131                 :         anyhow::ensure!(
     132                 :             config.sys_buffer_bytes != 0,
     133                 :             "invalid monitor Config: sys_buffer_bytes cannot be 0"
     134                 :         );
     135                 : 
     136                 :         let dispatcher = Dispatcher::new(ws)
     137                 :             .await
     138                 :             .context("error creating new dispatcher")?;
     139                 : 
     140                 :         let mut state = Runner {
     141                 :             config,
     142                 :             filecache: None,
     143                 :             cgroup: None,
     144                 :             dispatcher,
     145                 :             counter: 1, // NB: must be odd, see the comment about the field for more.
     146                 :             last_upscale_request_at: None,
     147                 :             kill,
     148                 :         };
     149                 : 
     150                 :         let mem = get_total_system_memory();
     151                 : 
     152                 :         let mut file_cache_disk_size = 0;
     153                 : 
     154                 :         // We need to process file cache initialization before cgroup initialization, so that the memory
     155                 :         // allocated to the file cache is appropriately taken into account when we decide the cgroup's
     156                 :         // memory limits.
     157                 :         if let Some(connstr) = &args.pgconnstr {
     158               0 :             info!("initializing file cache");
     159                 :             let config = match args.file_cache_on_disk {
     160                 :                 true => FileCacheConfig::default_on_disk(),
     161                 :                 false => FileCacheConfig::default_in_memory(),
     162                 :             };
     163                 : 
     164                 :             let mut file_cache = FileCacheState::new(connstr, config, token.clone())
     165                 :                 .await
     166                 :                 .context("failed to create file cache")?;
     167                 : 
     168                 :             let size = file_cache
     169                 :                 .get_file_cache_size()
     170                 :                 .await
     171                 :                 .context("error getting file cache size")?;
     172                 : 
     173                 :             let new_size = file_cache.config.calculate_cache_size(mem);
     174               0 :             info!(
     175               0 :                 initial = bytes_to_mebibytes(size),
     176               0 :                 new = bytes_to_mebibytes(new_size),
     177               0 :                 "setting initial file cache size",
     178               0 :             );
     179                 : 
     180                 :             // note: even if size == new_size, we want to explicitly set it, just
     181                 :             // to make sure that we have the permissions to do so
     182                 :             let actual_size = file_cache
     183                 :                 .set_file_cache_size(new_size)
     184                 :                 .await
     185                 :                 .context("failed to set file cache size, possibly due to inadequate permissions")?;
     186                 :             if actual_size != new_size {
     187               0 :                 info!("file cache size actually got set to {actual_size}")
     188                 :             }
     189                 : 
     190                 :             if args.file_cache_on_disk {
     191                 :                 file_cache_disk_size = actual_size;
     192                 :             }
     193                 : 
     194                 :             state.filecache = Some(file_cache);
     195                 :         }
     196                 : 
     197                 :         if let Some(name) = &args.cgroup {
     198                 :             // Best not to set up cgroup stuff more than once, so we'll initialize cgroup state
     199                 :             // now, and then set limits later.
     200               0 :             info!("initializing cgroup");
     201                 : 
     202                 :             let cgroup =
     203                 :                 CgroupWatcher::new(name.clone()).context("failed to create cgroup manager")?;
     204                 : 
     205                 :             let init_value = cgroup::MemoryHistory {
     206                 :                 avg_non_reclaimable: 0,
     207                 :                 samples_count: 0,
     208                 :                 samples_span: Duration::ZERO,
     209                 :             };
     210                 :             let (hist_tx, hist_rx) = watch::channel((Instant::now(), init_value));
     211                 : 
     212               0 :             spawn_with_cancel(token, |_| error!("cgroup watcher terminated"), async move {
     213               0 :                 cgroup.watch(hist_tx).await
     214               0 :             });
     215                 : 
     216                 :             let threshold = state.config.cgroup_threshold(mem, file_cache_disk_size);
     217               0 :             info!(threshold, "set initial cgroup threshold",);
     218                 : 
     219                 :             state.cgroup = Some(CgroupState {
     220                 :                 watcher: hist_rx,
     221                 :                 threshold,
     222                 :             });
     223                 :         }
     224                 : 
     225                 :         Ok(state)
     226                 :     }
     227                 : 
     228                 :     /// Attempt to downscale filecache + cgroup
     229               0 :     #[tracing::instrument(skip_all, fields(?target))]
     230                 :     pub async fn try_downscale(&mut self, target: Resources) -> anyhow::Result<(bool, String)> {
     231                 :         // Nothing to adjust
     232                 :         if self.cgroup.is_none() && self.filecache.is_none() {
     233               0 :             info!("no action needed for downscale (no cgroup or file cache enabled)");
     234                 :             return Ok((
     235                 :                 true,
     236                 :                 "monitor is not managing cgroup or file cache".to_string(),
     237                 :             ));
     238                 :         }
     239                 : 
     240                 :         let requested_mem = target.mem;
     241                 :         let usable_system_memory = requested_mem.saturating_sub(self.config.sys_buffer_bytes);
     242                 :         let (expected_file_cache_size, expected_file_cache_disk_size) = self
     243                 :             .filecache
     244                 :             .as_ref()
     245               0 :             .map(|file_cache| {
     246               0 :                 let size = file_cache.config.calculate_cache_size(usable_system_memory);
     247               0 :                 match file_cache.config.in_memory {
     248               0 :                     true => (size, 0),
     249               0 :                     false => (size, size),
     250                 :                 }
     251               0 :             })
     252                 :             .unwrap_or((0, 0));
     253                 :         if let Some(cgroup) = &self.cgroup {
     254                 :             let (last_time, last_history) = *cgroup.watcher.borrow();
     255                 : 
     256                 :             // TODO: make the duration here configurable.
     257                 :             if last_time.elapsed() > Duration::from_secs(5) {
     258                 :                 bail!("haven't gotten cgroup memory stats recently enough to determine downscaling information");
     259                 :             } else if last_history.samples_count <= 1 {
     260                 :                 bail!("haven't received enough cgroup memory stats yet");
     261                 :             }
     262                 : 
     263                 :             let new_threshold = self
     264                 :                 .config
     265                 :                 .cgroup_threshold(usable_system_memory, expected_file_cache_disk_size);
     266                 : 
     267                 :             let current = last_history.avg_non_reclaimable;
     268                 : 
     269                 :             if new_threshold < current + self.config.cgroup_downscale_threshold_buffer_bytes {
     270                 :                 let status = format!(
     271                 :                     "{}: {} MiB (new threshold) < {} (current usage) + {} (downscale buffer)",
     272                 :                     "calculated memory threshold too low",
     273                 :                     bytes_to_mebibytes(new_threshold),
     274                 :                     bytes_to_mebibytes(current),
     275                 :                     bytes_to_mebibytes(self.config.cgroup_downscale_threshold_buffer_bytes)
     276                 :                 );
     277                 : 
     278               0 :                 info!(status, "discontinuing downscale");
     279                 : 
     280                 :                 return Ok((false, status));
     281                 :             }
     282                 :         }
     283                 : 
     284                 :         // The downscaling has been approved. Downscale the file cache, then the cgroup.
     285                 :         let mut status = vec![];
     286                 :         let mut file_cache_disk_size = 0;
     287                 :         if let Some(file_cache) = &mut self.filecache {
     288                 :             let actual_usage = file_cache
     289                 :                 .set_file_cache_size(expected_file_cache_size)
     290                 :                 .await
     291                 :                 .context("failed to set file cache size")?;
     292                 :             if !file_cache.config.in_memory {
     293                 :                 file_cache_disk_size = actual_usage;
     294                 :             }
     295                 :             let message = format!(
     296                 :                 "set file cache size to {} MiB (in memory = {})",
     297                 :                 bytes_to_mebibytes(actual_usage),
     298                 :                 file_cache.config.in_memory,
     299                 :             );
     300               0 :             info!("downscale: {message}");
     301                 :             status.push(message);
     302                 :         }
     303                 : 
     304                 :         if let Some(cgroup) = &mut self.cgroup {
     305                 :             let new_threshold = self
     306                 :                 .config
     307                 :                 .cgroup_threshold(usable_system_memory, file_cache_disk_size);
     308                 : 
     309                 :             let message = format!(
     310                 :                 "set cgroup memory threshold from {} MiB to {} MiB, of new total {} MiB",
     311                 :                 bytes_to_mebibytes(cgroup.threshold),
     312                 :                 bytes_to_mebibytes(new_threshold),
     313                 :                 bytes_to_mebibytes(usable_system_memory)
     314                 :             );
     315                 :             cgroup.threshold = new_threshold;
     316               0 :             info!("downscale: {message}");
     317                 :             status.push(message);
     318                 :         }
     319                 : 
     320                 :         // TODO: make this status thing less jank
     321                 :         let status = status.join("; ");
     322                 :         Ok((true, status))
     323                 :     }
     324                 : 
     325                 :     /// Handle new resources
     326               0 :     #[tracing::instrument(skip_all, fields(?resources))]
     327                 :     pub async fn handle_upscale(&mut self, resources: Resources) -> anyhow::Result<()> {
     328                 :         if self.filecache.is_none() && self.cgroup.is_none() {
     329               0 :             info!("no action needed for upscale (no cgroup or file cache enabled)");
     330                 :             return Ok(());
     331                 :         }
     332                 : 
     333                 :         let new_mem = resources.mem;
     334                 :         let usable_system_memory = new_mem.saturating_sub(self.config.sys_buffer_bytes);
     335                 : 
     336                 :         let mut file_cache_disk_size = 0;
     337                 :         if let Some(file_cache) = &mut self.filecache {
     338                 :             let expected_usage = file_cache.config.calculate_cache_size(usable_system_memory);
     339               0 :             info!(
     340               0 :                 target = bytes_to_mebibytes(expected_usage),
     341               0 :                 total = bytes_to_mebibytes(new_mem),
     342               0 :                 "updating file cache size",
     343               0 :             );
     344                 : 
     345                 :             let actual_usage = file_cache
     346                 :                 .set_file_cache_size(expected_usage)
     347                 :                 .await
     348                 :                 .context("failed to set file cache size")?;
     349                 :             if !file_cache.config.in_memory {
     350                 :                 file_cache_disk_size = actual_usage;
     351                 :             }
     352                 : 
     353                 :             if actual_usage != expected_usage {
     354               0 :                 warn!(
     355               0 :                     "file cache was set to a different size that we wanted: target = {} Mib, actual= {} Mib",
     356               0 :                     bytes_to_mebibytes(expected_usage),
     357               0 :                     bytes_to_mebibytes(actual_usage)
     358               0 :                 )
     359                 :             }
     360                 :         }
     361                 : 
     362                 :         if let Some(cgroup) = &mut self.cgroup {
     363                 :             let new_threshold = self
     364                 :                 .config
     365                 :                 .cgroup_threshold(usable_system_memory, file_cache_disk_size);
     366                 : 
     367               0 :             info!(
     368               0 :                 "set cgroup memory threshold from {} MiB to {} MiB of new total {} MiB",
     369               0 :                 bytes_to_mebibytes(cgroup.threshold),
     370               0 :                 bytes_to_mebibytes(new_threshold),
     371               0 :                 bytes_to_mebibytes(usable_system_memory)
     372               0 :             );
     373                 :             cgroup.threshold = new_threshold;
     374                 :         }
     375                 : 
     376                 :         Ok(())
     377                 :     }
     378                 : 
     379                 :     /// Take in a message and perform some action, such as downscaling or upscaling,
     380                 :     /// and return a message to be sent back.
     381               0 :     #[tracing::instrument(skip_all, fields(%id, message = ?inner))]
     382                 :     pub async fn process_message(
     383                 :         &mut self,
     384                 :         InboundMsg { inner, id }: InboundMsg,
     385                 :     ) -> anyhow::Result<Option<OutboundMsg>> {
     386                 :         match inner {
     387                 :             InboundMsgKind::UpscaleNotification { granted } => {
     388                 :                 self.handle_upscale(granted)
     389                 :                     .await
     390                 :                     .context("failed to handle upscale")?;
     391                 :                 Ok(Some(OutboundMsg::new(
     392                 :                     OutboundMsgKind::UpscaleConfirmation {},
     393                 :                     id,
     394                 :                 )))
     395                 :             }
     396                 :             InboundMsgKind::DownscaleRequest { target } => self
     397                 :                 .try_downscale(target)
     398                 :                 .await
     399                 :                 .context("failed to downscale")
     400               0 :                 .map(|(ok, status)| {
     401               0 :                     Some(OutboundMsg::new(
     402               0 :                         OutboundMsgKind::DownscaleResult { ok, status },
     403               0 :                         id,
     404               0 :                     ))
     405               0 :                 }),
     406                 :             InboundMsgKind::InvalidMessage { error } => {
     407               0 :                 warn!(
     408               0 :                     %error, id, "received notification of an invalid message we sent"
     409               0 :                 );
     410                 :                 Ok(None)
     411                 :             }
     412                 :             InboundMsgKind::InternalError { error } => {
     413               0 :                 warn!(error, id, "agent experienced an internal error");
     414                 :                 Ok(None)
     415                 :             }
     416                 :             InboundMsgKind::HealthCheck {} => {
     417                 :                 Ok(Some(OutboundMsg::new(OutboundMsgKind::HealthCheck {}, id)))
     418                 :             }
     419                 :         }
     420                 :     }
     421                 : 
     422                 :     // TODO: don't propagate errors, probably just warn!?
     423               0 :     #[tracing::instrument(skip_all)]
     424                 :     pub async fn run(&mut self) -> anyhow::Result<()> {
     425               0 :         info!("starting dispatcher");
     426                 :         loop {
     427               0 :             tokio::select! {
     428               0 :                 signal = self.kill.recv() => {
     429                 :                     match signal {
     430                 :                         Ok(()) => return Ok(()),
     431                 :                         Err(e) => bail!("failed to receive kill signal: {e}")
     432                 :                     }
     433                 :                 }
     434                 : 
     435                 :                 // New memory stats from the cgroup, *may* need to request upscaling, if we've
     436                 :                 // exceeded the threshold
     437               0 :                 result = self.cgroup.as_mut().unwrap().watcher.changed(), if self.cgroup.is_some() => {
     438                 :                     result.context("failed to receive from cgroup memory stats watcher")?;
     439                 : 
     440                 :                     let cgroup = self.cgroup.as_ref().unwrap();
     441                 : 
     442                 :                     let (_time, cgroup_mem_stat) = *cgroup.watcher.borrow();
     443                 : 
     444                 :                     // If we haven't exceeded the threshold, then we're all ok
     445                 :                     if cgroup_mem_stat.avg_non_reclaimable < cgroup.threshold {
     446                 :                         continue;
     447                 :                     }
     448                 : 
     449                 :                     // Otherwise, we generally want upscaling. But, if it's been less than 1 second
     450                 :                     // since the last time we requested upscaling, ignore the event, to avoid
     451                 :                     // spamming the agent.
     452                 :                     if let Some(t) = self.last_upscale_request_at {
     453                 :                         let elapsed = t.elapsed();
     454                 :                         if elapsed < Duration::from_secs(1) {
     455               0 :                             info!(
     456               0 :                                 elapsed_millis = elapsed.as_millis(),
     457               0 :                                 avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
     458               0 :                                 threshold = bytes_to_mebibytes(cgroup.threshold),
     459               0 :                                 "cgroup memory stats are high enough to upscale but too soon to forward the request, ignoring",
     460               0 :                             );
     461                 :                             continue;
     462                 :                         }
     463                 :                     }
     464                 : 
     465                 :                     self.last_upscale_request_at = Some(Instant::now());
     466                 : 
     467               0 :                     info!(
     468               0 :                         avg_non_reclaimable = bytes_to_mebibytes(cgroup_mem_stat.avg_non_reclaimable),
     469               0 :                         threshold = bytes_to_mebibytes(cgroup.threshold),
     470               0 :                         "cgroup memory stats are high enough to upscale, requesting upscale",
     471               0 :                     );
     472                 : 
     473                 :                     self.counter += 2; // Increment, preserving parity (i.e. keep the
     474                 :                                        // counter odd). See the field comment for more.
     475                 :                     self.dispatcher
     476                 :                         .send(OutboundMsg::new(OutboundMsgKind::UpscaleRequest {}, self.counter))
     477                 :                         .await
     478                 :                         .context("failed to send message")?;
     479                 :                 },
     480                 : 
     481                 :                 // there is a message from the agent
     482               0 :                 msg = self.dispatcher.source.next() => {
     483                 :                     if let Some(msg) = msg {
     484                 :                         // Don't use 'message' as a key as the string also uses
     485                 :                         // that for its key
     486               0 :                         info!(?msg, "received message");
     487                 :                         match msg {
     488                 :                             Ok(msg) => {
     489                 :                                 let message: InboundMsg = match msg {
     490                 :                                     Message::Text(text) => {
     491                 :                                         serde_json::from_str(&text).context("failed to deserialize text message")?
     492                 :                                     }
     493                 :                                     other => {
     494               0 :                                         warn!(
     495               0 :                                             // Don't use 'message' as a key as the
     496               0 :                                             // string also uses that for its key
     497               0 :                                             msg = ?other,
     498               0 :                                             "agent should only send text messages but received different type"
     499               0 :                                         );
     500                 :                                         continue
     501                 :                                     },
     502                 :                                 };
     503                 : 
     504                 :                                 let out = match self.process_message(message.clone()).await {
     505                 :                                     Ok(Some(out)) => out,
     506                 :                                     Ok(None) => continue,
     507                 :                                     Err(e) => {
     508                 :                                         let error = e.to_string();
     509               0 :                                         warn!(?error, "error handling message");
     510                 :                                         OutboundMsg::new(
     511                 :                                             OutboundMsgKind::InternalError {
     512                 :                                                 error
     513                 :                                             },
     514                 :                                             message.id
     515                 :                                         )
     516                 :                                     }
     517                 :                                 };
     518                 : 
     519                 :                                 self.dispatcher
     520                 :                                     .send(out)
     521                 :                                     .await
     522                 :                                     .context("failed to send message")?;
     523                 :                             }
     524               0 :                             Err(e) => warn!("{e}"),
     525                 :                         }
     526                 :                     } else {
     527                 :                         anyhow::bail!("dispatcher connection closed")
     528                 :                     }
     529                 :                 }
     530                 :             }
     531                 :         }
     532                 :     }
     533                 : }
        

Generated by: LCOV version 2.1-beta