Line data Source code
1 : //! Types representing protocols and actual agent-monitor messages.
2 : //!
3 : //! The pervasive use of serde modifiers throughout this module is to ease
4 : //! serialization on the go side. Because go does not have enums (which model
5 : //! messages well), it is harder to model messages, and we accommodate that with
6 : //! serde.
7 : //!
8 : //! *Note*: the agent sends and receives messages in different ways.
9 : //!
10 : //! The agent serializes messages in the form shown below and then sends them.
11 : //! The use of `#[serde(tag = "type", content = "content")]` allows us to use
12 : //! `Type` to determine how to deserialize `Content`.
13 : //! ```ignore
14 : //! struct {
15 : //! Content any
16 : //! Type string
17 : //! Id uint64
18 : //! }
19 : //! ```
20 : //! and receives messages in the form:
21 : //! ```ignore
22 : //! struct {
23 : //! {fields embedded}
24 : //! Type string
25 : //! Id uint64
26 : //! }
27 : //! ```
28 : //! After reading the type field, the agent will decode the entire message
29 : //! again, this time into the correct type using the embedded fields.
30 : //! Because the agent cannot just extract the json contained in a certain field
31 : //! (it initially deserializes to `map[string]interface{}`), we keep the fields
32 : //! at the top level, so the entire piece of json can be deserialized into a struct,
33 : //! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
34 :
35 : use core::fmt;
36 : use std::cmp;
37 :
38 : use serde::{de::Error, Deserialize, Serialize};
39 :
40 : /// A Message we send to the agent.
41 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
42 : pub struct OutboundMsg {
43 : #[serde(flatten)]
44 : pub(crate) inner: OutboundMsgKind,
45 : pub(crate) id: usize,
46 : }
47 :
48 : impl OutboundMsg {
49 0 : pub fn new(inner: OutboundMsgKind, id: usize) -> Self {
50 0 : Self { inner, id }
51 0 : }
52 : }
53 :
54 : /// The different underlying message types we can send to the agent.
55 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
56 : #[serde(tag = "type")]
57 : pub enum OutboundMsgKind {
58 : /// Indicates that the agent sent an invalid message, i.e, we couldn't
59 : /// properly deserialize it.
60 : InvalidMessage { error: String },
61 : /// Indicates that we experienced an internal error while processing a message.
62 : /// For example, if a cgroup operation fails while trying to handle an upscale,
63 : /// we return `InternalError`.
64 : InternalError { error: String },
65 : /// Returned to the agent once we have finished handling an upscale. If the
66 : /// handling was unsuccessful, an `InternalError` will get returned instead.
67 : /// *Note*: this is a struct variant because of the way go serializes struct{}
68 : UpscaleConfirmation {},
69 : /// Indicates to the monitor that we are urgently requesting resources.
70 : /// *Note*: this is a struct variant because of the way go serializes struct{}
71 : UpscaleRequest {},
72 : /// Returned to the agent once we have finished attempting to downscale. If
73 : /// an error occured trying to do so, an `InternalError` will get returned instead.
74 : /// However, if we are simply unsuccessful (for example, do to needing the resources),
75 : /// that gets included in the `DownscaleResult`.
76 : DownscaleResult {
77 : // FIXME for the future (once the informant is deprecated)
78 : // As of the time of writing, the agent/informant version of this struct is
79 : // called api.DownscaleResult. This struct has uppercase fields which are
80 : // serialized as such. Thus, we serialize using uppercase names so we don't
81 : // have to make a breaking change to the agent<->informant protocol. Once
82 : // the informant has been superseded by the monitor, we can add the correct
83 : // struct tags to api.DownscaleResult without causing a breaking change,
84 : // since we don't need to support the agent<->informant protocol anymore.
85 : #[serde(rename = "Ok")]
86 : ok: bool,
87 : #[serde(rename = "Status")]
88 : status: String,
89 : },
90 : /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
91 : /// agent.
92 : /// *Note*: this is a struct variant because of the way go serializes struct{}
93 : HealthCheck {},
94 : }
95 :
96 : /// A message received form the agent.
97 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
98 : pub struct InboundMsg {
99 : #[serde(flatten)]
100 : pub(crate) inner: InboundMsgKind,
101 : pub(crate) id: usize,
102 : }
103 :
104 : /// The different underlying message types we can receive from the agent.
105 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
106 : #[serde(tag = "type", content = "content")]
107 : pub enum InboundMsgKind {
108 : /// Indicates that the we sent an invalid message, i.e, we couldn't
109 : /// properly deserialize it.
110 : InvalidMessage { error: String },
111 : /// Indicates that the informan experienced an internal error while processing
112 : /// a message. For example, if it failed to request upsacle from the agent, it
113 : /// would return an `InternalError`.
114 : InternalError { error: String },
115 : /// Indicates to us that we have been granted more resources. We should respond
116 : /// with an `UpscaleConfirmation` when done handling the resources (increasins
117 : /// file cache size, cgorup memory limits).
118 : UpscaleNotification { granted: Resources },
119 : /// A request to reduce resource usage. We should response with a `DownscaleResult`,
120 : /// when done.
121 : DownscaleRequest { target: Resources },
122 : /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
123 : /// agent.
124 : /// *Note*: this is a struct variant because of the way go serializes struct{}
125 : HealthCheck {},
126 : }
127 :
128 : /// Represents the resources granted to a VM.
129 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
130 : // Renamed because the agent has multiple resources types:
131 : // `Resources` (milliCPU/memory slots)
132 : // `Allocation` (vCPU/bytes) <- what we correspond to
133 : #[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
134 : pub struct Resources {
135 : /// Number of vCPUs
136 : pub(crate) cpu: f64,
137 : /// Bytes of memory
138 : pub(crate) mem: u64,
139 : }
140 :
141 : impl Resources {
142 0 : pub fn new(cpu: f64, mem: u64) -> Self {
143 0 : Self { cpu, mem }
144 0 : }
145 : }
146 :
147 : pub const PROTOCOL_MIN_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
148 : pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
149 :
150 0 : #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
151 : pub struct ProtocolVersion(u8);
152 :
153 : impl ProtocolVersion {
154 : /// Represents v1.0 of the agent<-> monitor protocol - the initial version
155 : ///
156 : /// Currently the latest version.
157 : const V1_0: ProtocolVersion = ProtocolVersion(1);
158 : }
159 :
160 : impl fmt::Display for ProtocolVersion {
161 0 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
162 0 : match *self {
163 0 : ProtocolVersion(0) => f.write_str("<invalid: zero>"),
164 0 : ProtocolVersion::V1_0 => f.write_str("v1.0"),
165 0 : other => write!(f, "<unknown: {other}>"),
166 : }
167 0 : }
168 : }
169 :
170 : /// A set of protocol bounds that determines what we are speaking.
171 : ///
172 : /// These bounds are inclusive.
173 : #[derive(Debug)]
174 : pub struct ProtocolRange {
175 : pub min: ProtocolVersion,
176 : pub max: ProtocolVersion,
177 : }
178 :
179 : // Use a custom deserialize impl to ensure that `self.min <= self.max`
180 : impl<'de> Deserialize<'de> for ProtocolRange {
181 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
182 0 : where
183 0 : D: serde::Deserializer<'de>,
184 0 : {
185 0 : #[derive(Deserialize)]
186 : struct InnerProtocolRange {
187 : min: ProtocolVersion,
188 : max: ProtocolVersion,
189 : }
190 0 : let InnerProtocolRange { min, max } = InnerProtocolRange::deserialize(deserializer)?;
191 0 : if min > max {
192 0 : Err(D::Error::custom(format!(
193 0 : "min version = {min} is greater than max version = {max}",
194 0 : )))
195 : } else {
196 0 : Ok(ProtocolRange { min, max })
197 : }
198 0 : }
199 : }
200 :
201 : impl fmt::Display for ProtocolRange {
202 0 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 0 : if self.min == self.max {
204 0 : f.write_fmt(format_args!("{}", self.max))
205 : } else {
206 0 : f.write_fmt(format_args!("{} to {}", self.min, self.max))
207 : }
208 0 : }
209 : }
210 :
211 : impl ProtocolRange {
212 : /// Find the highest shared version between two `ProtocolRange`'s
213 0 : pub fn highest_shared_version(&self, other: &Self) -> anyhow::Result<ProtocolVersion> {
214 0 : // We first have to make sure the ranges are overlapping. Once we know
215 0 : // this, we can merge the ranges by taking the max of the mins and the
216 0 : // mins of the maxes.
217 0 : if self.min > other.max {
218 0 : anyhow::bail!(
219 0 : "Non-overlapping bounds: other.max = {} was less than self.min = {}",
220 0 : other.max,
221 0 : self.min,
222 0 : )
223 0 : } else if self.max < other.min {
224 0 : anyhow::bail!(
225 0 : "Non-overlappinng bounds: self.max = {} was less than other.min = {}",
226 0 : self.max,
227 0 : other.min
228 0 : )
229 : } else {
230 0 : Ok(cmp::min(self.max, other.max))
231 : }
232 0 : }
233 : }
234 :
235 : /// We send this to the monitor after negotiating which protocol to use
236 : #[derive(Serialize, Debug)]
237 : #[serde(rename_all = "camelCase")]
238 : pub enum ProtocolResponse {
239 : Error(String),
240 : Version(ProtocolVersion),
241 : }
|