LCOV - differential code coverage report
Current view: top level - libs/vm_monitor/src - protocol.rs (source / functions) Coverage Total Hit UBC
Current: f6946e90941b557c917ac98cd5a7e9506d180f3e.info Lines: 0.0 % 55 0 55
Current Date: 2023-10-19 02:04:12 Functions: 0.0 % 173 0 173
Baseline: c8637f37369098875162f194f92736355783b050.info
Baseline Date: 2023-10-18 20:25:20

           TLA  Line data    Source code
       1                 : //! Types representing protocols and actual agent-monitor messages.
       2                 : //!
       3                 : //! The pervasive use of serde modifiers throughout this module is to ease
       4                 : //! serialization on the go side. Because go does not have enums (which model
       5                 : //! messages well), it is harder to model messages, and we accommodate that with
       6                 : //! serde.
       7                 : //!
       8                 : //! *Note*: the agent sends and receives messages in different ways.
       9                 : //!
      10                 : //! The agent serializes messages in the form shown below and then sends them. The use
      11                 : //! of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
      12                 : //! to determine how to deserialize `Content`.
      13                 : //! ```ignore
      14                 : //! struct {
      15                 : //!     Content any
      16                 : //!     Type    string
      17                 : //!     Id      uint64
      18                 : //! }
      19                 : //! ```
      20                 : //! and receives messages in the form:
      21                 : //! ```ignore
      22                 : //! struct {
      23                 : //!     {fields embedded}
      24                 : //!     Type string
      25                 : //!     Id   uint64
      26                 : //! }
      27                 : //! ```
      28                 : //! After reading the type field, the agent will decode the entire message
      29                 : //! again, this time into the correct type using the embedded fields.
      30                 : //! Because the agent cannot just extract the json contained in a certain field
      31                 : //! (it initially deserializes to `map[string]interface{}`), we keep the fields
      32                 : //! at the top level, so the entire piece of json can be deserialized into a struct,
      33                 : //! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
      34                 : 
      35                 : use core::fmt;
      36                 : use std::cmp;
      37                 : 
      38                 : use serde::{de::Error, Deserialize, Serialize};
      39                 : 
      40                 : /// A Message we send to the agent.
      41 UBC           0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      42                 : pub struct OutboundMsg {
      43                 :     #[serde(flatten)]
      44                 :     pub(crate) inner: OutboundMsgKind,
      45                 :     pub(crate) id: usize,
      46                 : }
      47                 : 
      48                 : impl OutboundMsg {
      49               0 :     pub fn new(inner: OutboundMsgKind, id: usize) -> Self {
      50               0 :         Self { inner, id }
      51               0 :     }
      52                 : }
      53                 : 
      54                 : /// The different underlying message types we can send to the agent.
      55               0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      56                 : #[serde(tag = "type")]
      57                 : pub enum OutboundMsgKind {
      58                 :     /// Indicates that the agent sent an invalid message, i.e, we couldn't
      59                 :     /// properly deserialize it.
      60                 :     InvalidMessage { error: String },
      61                 :     /// Indicates that we experienced an internal error while processing a message.
      62                 :     /// For example, if a cgroup operation fails while trying to handle an upscale,
      63                 :     /// we return `InternalError`.
      64                 :     InternalError { error: String },
      65                 :     /// Returned to the agent once we have finished handling an upscale. If the
      66                 :     /// handling was unsuccessful, an `InternalError` will get returned instead.
      67                 :     /// *Note*: this is a struct variant because of the way go serializes struct{}
      68                 :     UpscaleConfirmation {},
      69                 :     /// Indicates to the agent that we are urgently requesting resources.
      70                 :     /// *Note*: this is a struct variant because of the way go serializes struct{}
      71                 :     UpscaleRequest {},
      72                 :     /// Returned to the agent once we have finished attempting to downscale. If
      73                 :     /// an error occurred trying to do so, an `InternalError` will get returned instead.
      74                 :     /// However, if we are simply unsuccessful (for example, due to needing the resources),
      75                 :     /// that gets included in the `DownscaleResult`.
      76                 :     DownscaleResult {
      77                 :         // FIXME for the future (once the informant is deprecated)
      78                 :         // As of the time of writing, the agent/informant version of this struct is
      79                 :         // called api.DownscaleResult. This struct has uppercase fields which are
      80                 :         // serialized as such. Thus, we serialize using uppercase names so we don't
      81                 :         // have to make a breaking change to the agent<->informant protocol. Once
      82                 :         // the informant has been superseded by the monitor, we can add the correct
      83                 :         // struct tags to api.DownscaleResult without causing a breaking change,
      84                 :         // since we don't need to support the agent<->informant protocol anymore.
      85                 :         #[serde(rename = "Ok")]
      86                 :         ok: bool,
      87                 :         #[serde(rename = "Status")]
      88                 :         status: String,
      89                 :     },
      90                 :     /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
      91                 :     /// agent.
      92                 :     /// *Note*: this is a struct variant because of the way go serializes struct{}
      93                 :     HealthCheck {},
      94                 : }
      95                 : 
      96                 : /// A message received from the agent.
      97               0 : #[derive(Serialize, Deserialize, Debug, Clone)]
      98                 : pub struct InboundMsg {
      99                 :     #[serde(flatten)]
     100                 :     pub(crate) inner: InboundMsgKind,
     101                 :     pub(crate) id: usize,
     102                 : }
     103                 : 
     104                 : /// The different underlying message types we can receive from the agent.
     105               0 : #[derive(Serialize, Deserialize, Debug, Clone)]
     106                 : #[serde(tag = "type", content = "content")]
     107                 : pub enum InboundMsgKind {
     108                 :     /// Indicates that we sent an invalid message, i.e, we couldn't
     109                 :     /// properly deserialize it.
     110                 :     InvalidMessage { error: String },
     111                 :     /// Indicates that the informant experienced an internal error while processing
     112                 :     /// a message. For example, if it failed to request upscale from the agent, it
     113                 :     /// would return an `InternalError`.
     114                 :     InternalError { error: String },
     115                 :     /// Indicates to us that we have been granted more resources. We should respond
     116                 :     /// with an `UpscaleConfirmation` when done handling the resources (increasing
     117                 :     /// file cache size, cgroup memory limits).
     118                 :     UpscaleNotification { granted: Resources },
     119                 :     /// A request to reduce resource usage. We should respond with a `DownscaleResult`,
     120                 :     /// when done.
     121                 :     DownscaleRequest { target: Resources },
     122                 :     /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
     123                 :     /// agent.
     124                 :     /// *Note*: this is a struct variant because of the way go serializes struct{}
     125                 :     HealthCheck {},
     126                 : }
     127                 : 
     128                 : /// Represents the resources granted to a VM.
     129               0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
     130                 : // Renamed because the agent has multiple resources types:
     131                 : // `Resources` (milliCPU/memory slots)
     132                 : // `Allocation` (vCPU/bytes) <- what we correspond to
     133                 : #[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
     134                 : pub struct Resources {
     135                 :     /// Number of vCPUs
     136                 :     pub(crate) cpu: f64,
     137                 :     /// Bytes of memory
     138                 :     pub(crate) mem: u64,
     139                 : }
     140                 : 
     141                 : impl Resources {
     142               0 :     pub fn new(cpu: f64, mem: u64) -> Self {
     143               0 :         Self { cpu, mem }
     144               0 :     }
     145                 : }
     146                 : 
     147                 : pub const PROTOCOL_MIN_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
     148                 : pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
     149                 : 
     150               0 : #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
     151                 : pub struct ProtocolVersion(u8);
     152                 : 
     153                 : impl ProtocolVersion {
     154                 :     /// Represents v1.0 of the agent<->monitor protocol - the initial version
     155                 :     ///
     156                 :     /// Currently the latest version.
     157                 :     const V1_0: ProtocolVersion = ProtocolVersion(1);
     158                 : }
     159                 : 
     160                 : impl fmt::Display for ProtocolVersion {
     161               0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     162               0 :         match *self {
     163               0 :             ProtocolVersion(0) => f.write_str("<invalid: zero>"),
     164               0 :             ProtocolVersion::V1_0 => f.write_str("v1.0"),
     165               0 :             other => write!(f, "<unknown: {other}>"),
     166                 :         }
     167               0 :     }
     168                 : }
     169                 : 
     170                 : /// A set of protocol bounds that determines what we are speaking.
     171                 : ///
     172                 : /// These bounds are inclusive.
     173               0 : #[derive(Debug)]
     174                 : pub struct ProtocolRange {
     175                 :     pub min: ProtocolVersion,
     176                 :     pub max: ProtocolVersion,
     177                 : }
     178                 : 
     179                 : // Use a custom deserialize impl to ensure that `self.min <= self.max`
     180                 : impl<'de> Deserialize<'de> for ProtocolRange {
     181               0 :     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     182               0 :     where
     183               0 :         D: serde::Deserializer<'de>,
     184               0 :     {
     185               0 :         #[derive(Deserialize)]
     186                 :         struct InnerProtocolRange {
     187                 :             min: ProtocolVersion,
     188                 :             max: ProtocolVersion,
     189                 :         }
     190               0 :         let InnerProtocolRange { min, max } = InnerProtocolRange::deserialize(deserializer)?;
     191               0 :         if min > max {
     192               0 :             Err(D::Error::custom(format!(
     193               0 :                 "min version = {min} is greater than max version = {max}",
     194               0 :             )))
     195                 :         } else {
     196               0 :             Ok(ProtocolRange { min, max })
     197                 :         }
     198               0 :     }
     199                 : }
     200                 : 
     201                 : impl fmt::Display for ProtocolRange {
     202               0 :     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     203               0 :         if self.min == self.max {
     204               0 :             f.write_fmt(format_args!("{}", self.max))
     205                 :         } else {
     206               0 :             f.write_fmt(format_args!("{} to {}", self.min, self.max))
     207                 :         }
     208               0 :     }
     209                 : }
     210                 : 
     211                 : impl ProtocolRange {
     212                 :     /// Find the highest shared version between two `ProtocolRange`'s
     213               0 :     pub fn highest_shared_version(&self, other: &Self) -> anyhow::Result<ProtocolVersion> {
     214               0 :         // We first have to make sure the ranges are overlapping. Once we know
     215               0 :         // this, we can merge the ranges by taking the max of the mins and the
     216               0 :         // min of the maxes.
     217               0 :         if self.min > other.max {
     218               0 :             anyhow::bail!(
     219               0 :                 "Non-overlapping bounds: other.max = {} was less than self.min = {}",
     220               0 :                 other.max,
     221               0 :                 self.min,
     222               0 :             )
     223               0 :         } else if self.max < other.min {
     224               0 :             anyhow::bail!(
     225               0 :                 "Non-overlappinng bounds: self.max = {} was less than other.min = {}",
     226               0 :                 self.max,
     227               0 :                 other.min
     228               0 :             )
     229                 :         } else {
     230               0 :             Ok(cmp::min(self.max, other.max))
     231                 :         }
     232               0 :     }
     233                 : }
     234                 : 
     235                 : /// We send this to the agent after negotiating which protocol to use
     236               0 : #[derive(Serialize, Debug)]
     237                 : #[serde(rename_all = "camelCase")]
     238                 : pub enum ProtocolResponse {
     239                 :     Error(String),
     240                 :     Version(ProtocolVersion),
     241                 : }
        

Generated by: LCOV version 2.1-beta