LCOV - code coverage report
Current view: top level - libs/vm_monitor/src - protocol.rs (source / functions) Coverage Total Hit
Test: a43a77853355b937a79c57b07a8f05607cf29e6c.info Lines: 0.0 % 53 0
Test Date: 2024-09-19 12:04:32 Functions: 0.0 % 131 0

            Line data    Source code
       1              : //! Types representing protocols and actual agent-monitor messages.
       2              : //!
       3              : //! The pervasive use of serde modifiers throughout this module is to ease
       4              : //! serialization on the Go side. Because Go does not have enums (which model
       5              : //! messages well), it is harder to model messages, and we accommodate that with
       6              : //! serde.
       7              : //!
       8              : //! *Note*: the agent sends and receives messages in different ways.
       9              : //!
      10              : //! The agent serializes messages in the form shown below and then sends them. The
      11              : //! use of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
      12              : //! to determine how to deserialize `Content`.
      13              : //! ```ignore
      14              : //! struct {
      15              : //!     Content any
      16              : //!     Type    string
      17              : //!     Id      uint64
      18              : //! }
      19              : //! ```
      20              : //! and receives messages in the form:
      21              : //! ```ignore
      22              : //! struct {
      23              : //!     {fields embedded}
      24              : //!     Type string
      25              : //!     Id   uint64
      26              : //! }
      27              : //! ```
      28              : //! After reading the type field, the agent will decode the entire message
      29              : //! again, this time into the correct type using the embedded fields.
      30              : //! Because the agent cannot just extract the json contained in a certain field
      31              : //! (it initially deserializes to `map[string]interface{}`), we keep the fields
      32              : //! at the top level, so the entire piece of json can be deserialized into a struct,
      33              : //! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
      34              : 
      35              : use core::fmt;
      36              : use std::cmp;
      37              : 
      38              : use serde::{de::Error, Deserialize, Serialize};
      39              : 
      40              : /// A message we send to the agent: a message kind plus a numeric `id`.
      41            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      42              : pub struct OutboundMsg {
      43              :     #[serde(flatten)] // the kind's fields (including its "type" tag) are inlined at the same level as `id`
      44              :     pub(crate) inner: OutboundMsgKind,
      45              :     pub(crate) id: usize,
      46              : }
      47              : 
      48              : impl OutboundMsg { // constructor only
      49            0 :     pub fn new(inner: OutboundMsgKind, id: usize) -> Self { // stores both arguments unchanged
      50            0 :         Self { inner, id }
      51            0 :     }
      52              : }
      53              : 
      54              : /// The different underlying message types we can send to the agent.
      55            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      56              : #[serde(tag = "type")] // internally tagged: the variant name goes in a "type" field beside the variant's own fields
      57              : pub enum OutboundMsgKind {
      58              :     /// Indicates that the agent sent an invalid message, i.e., we couldn't
      59              :     /// properly deserialize it.
      60              :     InvalidMessage { error: String },
      61              :     /// Indicates that we experienced an internal error while processing a message.
      62              :     /// For example, if a cgroup operation fails while trying to handle an upscale,
      63              :     /// we return `InternalError`.
      64              :     InternalError { error: String },
      65              :     /// Returned to the agent once we have finished handling an upscale. If the
      66              :     /// handling was unsuccessful, an `InternalError` will get returned instead.
      67              :     /// *Note*: this is a struct variant because of the way Go serializes struct{}
      68              :     UpscaleConfirmation {},
      69              :     /// Indicates to the agent that we are urgently requesting resources.
      70              :     /// *Note*: this is a struct variant because of the way Go serializes struct{}
      71              :     UpscaleRequest {},
      72              :     /// Returned to the agent once we have finished attempting to downscale. If
      73              :     /// an error occurred trying to do so, an `InternalError` will get returned instead.
      74              :     /// However, if we are simply unsuccessful (for example, due to needing the resources),
      75              :     /// that gets included in the `DownscaleResult`.
      76              :     DownscaleResult {
      77              :         // FIXME for the future (once the informant is deprecated)
      78              :         // As of the time of writing, the agent/informant version of this struct is
      79              :         // called api.DownscaleResult. This struct has uppercase fields which are
      80              :         // serialized as such. Thus, we serialize using uppercase names so we don't
      81              :         // have to make a breaking change to the agent<->informant protocol. Once
      82              :         // the informant has been superseded by the monitor, we can add the correct
      83              :         // struct tags to api.DownscaleResult without causing a breaking change,
      84              :         // since we don't need to support the agent<->informant protocol anymore.
      85              :         #[serde(rename = "Ok")]
      86              :         ok: bool,
      87              :         #[serde(rename = "Status")]
      88              :         status: String,
      89              :     },
      90              :     /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
      91              :     /// agent.
      92              :     /// *Note*: this is a struct variant because of the way Go serializes struct{}
      93              :     HealthCheck {},
      94              : }
      95              : 
      96              : /// A message received from the agent: a message kind plus a numeric `id`.
      97            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      98              : pub struct InboundMsg {
      99              :     #[serde(flatten)] // the kind's "type"/"content" entries sit at the same level as `id`
     100              :     pub(crate) inner: InboundMsgKind,
     101              :     pub(crate) id: usize,
     102              : }
     103              : 
     104              : /// The different underlying message types we can receive from the agent.
     105            0 : #[derive(Serialize, Deserialize, Debug, Clone)]
     106              : #[serde(tag = "type", content = "content")] // adjacently tagged: variant name under "type", variant fields under "content"
     107              : pub enum InboundMsgKind {
     108              :     /// Indicates that we sent an invalid message, i.e., it couldn't be
     109              :     /// properly deserialized.
     110              :     InvalidMessage { error: String },
     111              :     /// Indicates that the informant experienced an internal error while processing
     112              :     /// a message. For example, if it failed to request upscale from the agent, it
     113              :     /// would return an `InternalError`.
     114              :     InternalError { error: String },
     115              :     /// Indicates to us that we have been granted more resources. We should respond
     116              :     /// with an `UpscaleConfirmation` when done handling the resources (increasing
     117              :     /// file cache size, cgroup memory limits).
     118              :     UpscaleNotification { granted: Resources },
     119              :     /// A request to reduce resource usage. We should respond with a `DownscaleResult`
     120              :     /// when done.
     121              :     DownscaleRequest { target: Resources },
     122              :     /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
     123              :     /// agent.
     124              :     /// *Note*: this is a struct variant because of the way Go serializes struct{}
     125              :     HealthCheck {},
     126              : }
     127              : 
     128              : /// Represents the resources granted to a VM: a vCPU count and a memory size in bytes.
     129            0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
     130              : // Renamed because the agent has multiple resource types:
     131              : // `Resources` (milliCPU/memory slots)
     132              : // `Allocation` (vCPU/bytes) <- what we correspond to
     133              : #[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
     134              : pub struct Resources {
     135              :     /// Number of vCPUs (an `f64`, so fractional values are representable)
     136              :     pub(crate) cpu: f64,
     137              :     /// Bytes of memory
     138              :     pub(crate) mem: u64,
     139              : }
     140              : 
     141              : impl Resources { // constructor only
     142            0 :     pub fn new(cpu: f64, mem: u64) -> Self { // stores both values unchanged: `cpu` in vCPUs, `mem` in bytes (per the field docs)
     143            0 :         Self { cpu, mem }
     144            0 :     }
     145              : }
     146              : 
     147              : pub const PROTOCOL_MIN_VERSION: ProtocolVersion = ProtocolVersion::V1_0; // NOTE(review): presumably the lowest version this side accepts - confirm at use sites
     148              : pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0; // same value as the minimum, so the two constants currently name a single version
     149              : 
     150            0 : #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
     151              : pub struct ProtocolVersion(u8); // newtype over the raw version number; the derived ordering compares that number
     152              : 
     153              : impl ProtocolVersion {
     154              :     /// Represents v1.0 of the agent<->monitor protocol - the initial version
     155              :     ///
     156              :     /// Currently the latest version. Its raw value is 1.
     157              :     const V1_0: ProtocolVersion = ProtocolVersion(1);
     158              : }
     159              : 
     160              : impl fmt::Display for ProtocolVersion { // renders a version as text: "v1.0", or a placeholder for 0 and for unknown numbers
     161            0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     162            0 :         match *self {
     163            0 :             ProtocolVersion(0) => f.write_str("<invalid: zero>"),
     164            0 :             ProtocolVersion::V1_0 => f.write_str("v1.0"),
     165            0 :             ProtocolVersion(raw) => write!(f, "<unknown: {raw}>"), // print the inner u8: formatting the whole value here would re-enter this `fmt` and never terminate
     166              :         }
     167            0 :     }
     168              : }
     169              : 
     170              : /// A pair of protocol version bounds that determines what we are speaking.
     171              : ///
     172              : /// These bounds are inclusive.
     173              : #[derive(Debug)]
     174              : pub struct ProtocolRange {
     175              :     pub min: ProtocolVersion, // lowest version in the range (inclusive)
     176              :     pub max: ProtocolVersion, // highest version in the range (inclusive)
     177              : }
     178              : 
     179              : // Use a custom deserialize impl (instead of the derive) to ensure that `self.min <= self.max`
     180              : impl<'de> Deserialize<'de> for ProtocolRange {
     181            0 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     182            0 :     where
     183            0 :         D: serde::Deserializer<'de>,
     184            0 :     {
     185            0 :         #[derive(Deserialize)] // helper with the same two fields; its derived impl does the actual parsing
     186              :         struct InnerProtocolRange {
     187              :             min: ProtocolVersion,
     188              :             max: ProtocolVersion,
     189              :         }
     190            0 :         let InnerProtocolRange { min, max } = InnerProtocolRange::deserialize(deserializer)?;
     191            0 :         if min > max { // inverted bounds are rejected with a serde custom error
     192            0 :             Err(D::Error::custom(format!(
     193            0 :                 "min version = {min} is greater than max version = {max}",
     194            0 :             )))
     195              :         } else {
     196            0 :             Ok(ProtocolRange { min, max })
     197              :         }
     198            0 :     }
     199              : }
     200              : 
     201              : impl fmt::Display for ProtocolRange { // renders the range as one version, or as "<min> to <max>"
     202            0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     203            0 :         if self.min == self.max { // single-version range: print that version once
     204            0 :             f.write_fmt(format_args!("{}", self.max))
     205              :         } else { // otherwise print both bounds
     206            0 :             f.write_fmt(format_args!("{} to {}", self.min, self.max))
     207              :         }
     208            0 :     }
     209              : }
     210              : 
     211              : impl ProtocolRange {
     212              :     /// Find the highest version contained in both ranges; returns an error if the ranges do not overlap.
     213            0 :     pub fn highest_shared_version(&self, other: &Self) -> anyhow::Result<ProtocolVersion> {
     214            0 :         // We first have to make sure the ranges are overlapping. Once we know
     215            0 :         // this, the shared range runs from the max of the mins to the min of the
     216            0 :         // maxes, so the highest shared version is the min of the maxes.
     217            0 :         if self.min > other.max {
     218            0 :             anyhow::bail!(
     219            0 :                 "Non-overlapping bounds: other.max = {} was less than self.min = {}",
     220            0 :                 other.max,
     221            0 :                 self.min,
     222            0 :             )
     223            0 :         } else if self.max < other.min {
     224            0 :             anyhow::bail!(
     225            0 :                 "Non-overlapping bounds: self.max = {} was less than other.min = {}",
     226            0 :                 self.max,
     227            0 :                 other.min
     228            0 :             )
     229              :         } else {
     230            0 :             Ok(cmp::min(self.max, other.max))
     231              :         }
     232            0 :     }
     233              : }
     234              : 
     235              : /// We send this to the agent after negotiating which protocol to use
     236              : #[derive(Serialize, Debug)]
     237              : #[serde(rename_all = "camelCase")] // variants are serialized under the names "error" and "version"
     238              : pub enum ProtocolResponse {
     239              :     Error(String),
     240              :     Version(ProtocolVersion),
     241              : }
        

Generated by: LCOV version 2.1-beta