LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - protocol.rs (source / functions) Coverage Total Hit
Test: 1b0a6a0c05cee5a7de360813c8034804e105ce1c.info Lines: 0.0 % 53 0
Test Date: 2025-03-12 00:01:28 Functions: 0.0 % 101 0

            Line data    Source code
       1              : //! Types representing protocols and actual agent-monitor messages.
       2              : //!
       3              : //! The pervasive use of serde modifiers throughout this module is to ease
       4              : //! serialization on the go side. Because go does not have enums (which model
       5              : //! messages well), it is harder to model messages, and we accomodate that with
       6              : //! serde.
       7              : //!
       8              : //! *Note*: the agent sends and receives messages in different ways.
       9              : //!
      10              : //! The agent serializes messages in the form and then sends them. The use
      11              : //! of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
      12              : //! to determine how to deserialize `Content`.
      13              : //! ```ignore
      14              : //! struct {
      15              : //!     Content any
      16              : //!     Type    string
      17              : //!     Id      uint64
      18              : //! }
      19              : //! ```
      20              : //! and receives messages in the form:
      21              : //! ```ignore
      22              : //! struct {
      23              : //!     {fields embedded}
      24              : //!     Type string
      25              : //!     Id   uint64
      26              : //! }
      27              : //! ```
      28              : //! After reading the type field, the agent will decode the entire message
      29              : //! again, this time into the correct type using the embedded fields.
      30              : //! Because the agent cannot just extract the json contained in a certain field
      31              : //! (it initially deserializes to `map[string]interface{}`), we keep the fields
      32              : //! at the top level, so the entire piece of json can be deserialized into a struct,
      33              : //! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
      34              : 
      35              : use core::fmt;
      36              : use std::cmp;
      37              : 
      38              : use serde::de::Error;
      39              : use serde::{Deserialize, Serialize};
      40              : 
      41              : /// A Message we send to the agent.
      42            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      43              : pub struct OutboundMsg {
      44              :     #[serde(flatten)]
      45              :     pub(crate) inner: OutboundMsgKind,
      46              :     pub(crate) id: usize,
      47              : }
      48              : 
      49              : impl OutboundMsg {
      50            0 :     pub fn new(inner: OutboundMsgKind, id: usize) -> Self {
      51            0 :         Self { inner, id }
      52            0 :     }
      53              : }
      54              : 
      55              : /// The different underlying message types we can send to the agent.
      56            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      57              : #[serde(tag = "type")]
      58              : pub enum OutboundMsgKind {
      59              :     /// Indicates that the agent sent an invalid message, i.e, we couldn't
      60              :     /// properly deserialize it.
      61              :     InvalidMessage { error: String },
      62              :     /// Indicates that we experienced an internal error while processing a message.
      63              :     /// For example, if a cgroup operation fails while trying to handle an upscale,
      64              :     /// we return `InternalError`.
      65              :     InternalError { error: String },
      66              :     /// Returned to the agent once we have finished handling an upscale. If the
      67              :     /// handling was unsuccessful, an `InternalError` will get returned instead.
      68              :     /// *Note*: this is a struct variant because of the way go serializes struct{}
      69              :     UpscaleConfirmation {},
      70              :     /// Indicates to the monitor that we are urgently requesting resources.
      71              :     /// *Note*: this is a struct variant because of the way go serializes struct{}
      72              :     UpscaleRequest {},
      73              :     /// Returned to the agent once we have finished attempting to downscale. If
      74              :     /// an error occured trying to do so, an `InternalError` will get returned instead.
      75              :     /// However, if we are simply unsuccessful (for example, do to needing the resources),
      76              :     /// that gets included in the `DownscaleResult`.
      77              :     DownscaleResult {
      78              :         // FIXME for the future (once the informant is deprecated)
      79              :         // As of the time of writing, the agent/informant version of this struct is
      80              :         // called api.DownscaleResult. This struct has uppercase fields which are
      81              :         // serialized as such. Thus, we serialize using uppercase names so we don't
      82              :         // have to make a breaking change to the agent<->informant protocol. Once
      83              :         // the informant has been superseded by the monitor, we can add the correct
      84              :         // struct tags to api.DownscaleResult without causing a breaking change,
      85              :         // since we don't need to support the agent<->informant protocol anymore.
      86              :         #[serde(rename = "Ok")]
      87              :         ok: bool,
      88              :         #[serde(rename = "Status")]
      89              :         status: String,
      90              :     },
      91              :     /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
      92              :     /// agent.
      93              :     /// *Note*: this is a struct variant because of the way go serializes struct{}
      94              :     HealthCheck {},
      95              : }
      96              : 
      97              : /// A message received form the agent.
      98            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      99              : pub struct InboundMsg {
     100              :     #[serde(flatten)]
     101              :     pub(crate) inner: InboundMsgKind,
     102              :     pub(crate) id: usize,
     103              : }
     104              : 
     105              : /// The different underlying message types we can receive from the agent.
     106            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
     107              : #[serde(tag = "type", content = "content")]
     108              : pub enum InboundMsgKind {
     109              :     /// Indicates that the we sent an invalid message, i.e, we couldn't
     110              :     /// properly deserialize it.
     111              :     InvalidMessage { error: String },
     112              :     /// Indicates that the informan experienced an internal error while processing
     113              :     /// a message. For example, if it failed to request upsacle from the agent, it
     114              :     /// would return an `InternalError`.
     115              :     InternalError { error: String },
     116              :     /// Indicates to us that we have been granted more resources. We should respond
     117              :     /// with an `UpscaleConfirmation` when done handling the resources (increasins
     118              :     /// file cache size, cgorup memory limits).
     119              :     UpscaleNotification { granted: Resources },
     120              :     /// A request to reduce resource usage. We should response with a `DownscaleResult`,
     121              :     /// when done.
     122              :     DownscaleRequest { target: Resources },
     123              :     /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
     124              :     /// agent.
     125              :     /// *Note*: this is a struct variant because of the way go serializes struct{}
     126              :     HealthCheck {},
     127              : }
     128              : 
     129              : /// Represents the resources granted to a VM.
     130            0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
     131              : // Renamed because the agent has multiple resources types:
     132              : // `Resources` (milliCPU/memory slots)
     133              : // `Allocation` (vCPU/bytes) <- what we correspond to
     134              : #[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
     135              : pub struct Resources {
     136              :     /// Number of vCPUs
     137              :     pub(crate) cpu: f64,
     138              :     /// Bytes of memory
     139              :     pub(crate) mem: u64,
     140              : }
     141              : 
     142              : impl Resources {
     143            0 :     pub fn new(cpu: f64, mem: u64) -> Self {
     144            0 :         Self { cpu, mem }
     145            0 :     }
     146              : }
     147              : 
     148              : pub const PROTOCOL_MIN_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
     149              : pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
     150              : 
     151            0 : #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
     152              : pub struct ProtocolVersion(u8);
     153              : 
     154              : impl ProtocolVersion {
     155              :     /// Represents v1.0 of the agent<-> monitor protocol - the initial version
     156              :     ///
     157              :     /// Currently the latest version.
     158              :     const V1_0: ProtocolVersion = ProtocolVersion(1);
     159              : }
     160              : 
     161              : impl fmt::Display for ProtocolVersion {
     162            0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     163            0 :         match *self {
     164            0 :             ProtocolVersion(0) => f.write_str("<invalid: zero>"),
     165            0 :             ProtocolVersion::V1_0 => f.write_str("v1.0"),
     166            0 :             other => write!(f, "<unknown: {other}>"),
     167              :         }
     168            0 :     }
     169              : }
     170              : 
     171              : /// A set of protocol bounds that determines what we are speaking.
     172              : ///
     173              : /// These bounds are inclusive.
     174              : #[derive(Debug)]
     175              : pub struct ProtocolRange {
     176              :     pub min: ProtocolVersion,
     177              :     pub max: ProtocolVersion,
     178              : }
     179              : 
     180              : // Use a custom deserialize impl to ensure that `self.min <= self.max`
     181              : impl<'de> Deserialize<'de> for ProtocolRange {
     182            0 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     183            0 :     where
     184            0 :         D: serde::Deserializer<'de>,
     185            0 :     {
     186            0 :         #[derive(Deserialize)]
     187              :         struct InnerProtocolRange {
     188              :             min: ProtocolVersion,
     189              :             max: ProtocolVersion,
     190              :         }
     191            0 :         let InnerProtocolRange { min, max } = InnerProtocolRange::deserialize(deserializer)?;
     192            0 :         if min > max {
     193            0 :             Err(D::Error::custom(format!(
     194            0 :                 "min version = {min} is greater than max version = {max}",
     195            0 :             )))
     196              :         } else {
     197            0 :             Ok(ProtocolRange { min, max })
     198              :         }
     199            0 :     }
     200              : }
     201              : 
     202              : impl fmt::Display for ProtocolRange {
     203            0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     204            0 :         if self.min == self.max {
     205            0 :             f.write_fmt(format_args!("{}", self.max))
     206              :         } else {
     207            0 :             f.write_fmt(format_args!("{} to {}", self.min, self.max))
     208              :         }
     209            0 :     }
     210              : }
     211              : 
     212              : impl ProtocolRange {
     213              :     /// Find the highest shared version between two `ProtocolRange`'s
     214            0 :     pub fn highest_shared_version(&self, other: &Self) -> anyhow::Result<ProtocolVersion> {
     215            0 :         // We first have to make sure the ranges are overlapping. Once we know
     216            0 :         // this, we can merge the ranges by taking the max of the mins and the
     217            0 :         // mins of the maxes.
     218            0 :         if self.min > other.max {
     219            0 :             anyhow::bail!(
     220            0 :                 "Non-overlapping bounds: other.max = {} was less than self.min = {}",
     221            0 :                 other.max,
     222            0 :                 self.min,
     223            0 :             )
     224            0 :         } else if self.max < other.min {
     225            0 :             anyhow::bail!(
     226            0 :                 "Non-overlappinng bounds: self.max = {} was less than other.min = {}",
     227            0 :                 self.max,
     228            0 :                 other.min
     229            0 :             )
     230              :         } else {
     231            0 :             Ok(cmp::min(self.max, other.max))
     232              :         }
     233            0 :     }
     234              : }
     235              : 
     236              : /// We send this to the monitor after negotiating which protocol to use
     237              : #[derive(Serialize, Debug)]
     238              : #[serde(rename_all = "camelCase")]
     239              : pub enum ProtocolResponse {
     240              :     Error(String),
     241              :     Version(ProtocolVersion),
     242              : }
        

Generated by: LCOV version 2.1-beta