Line data Source code
1 : //! Types representing protocols and actual agent-monitor messages.
2 : //!
3 : //! The pervasive use of serde modifiers throughout this module is to ease
4 : //! serialization on the go side. Because go does not have enums (which model
5 : //! messages well), it is harder to model messages, and we accomodate that with
6 : //! serde.
7 : //!
8 : //! *Note*: the agent sends and receives messages in different ways.
9 : //!
10 : //! The agent serializes messages in the form and then sends them. The use
11 : //! of `#[serde(tag = "type", content = "content")]` allows us to use `Type`
12 : //! to determine how to deserialize `Content`.
13 : //! ```ignore
14 : //! struct {
15 : //! Content any
16 : //! Type string
17 : //! Id uint64
18 : //! }
19 : //! ```
20 : //! and receives messages in the form:
21 : //! ```ignore
22 : //! struct {
23 : //! {fields embedded}
24 : //! Type string
25 : //! Id uint64
26 : //! }
27 : //! ```
28 : //! After reading the type field, the agent will decode the entire message
29 : //! again, this time into the correct type using the embedded fields.
30 : //! Because the agent cannot just extract the json contained in a certain field
31 : //! (it initially deserializes to `map[string]interface{}`), we keep the fields
32 : //! at the top level, so the entire piece of json can be deserialized into a struct,
33 : //! such as a `DownscaleResult`, with the `Type` and `Id` fields ignored.
34 :
35 : use core::fmt;
36 : use std::cmp;
37 :
38 : use serde::de::Error;
39 : use serde::{Deserialize, Serialize};
40 :
41 : /// A Message we send to the agent.
42 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
43 : pub struct OutboundMsg {
44 : #[serde(flatten)]
45 : pub(crate) inner: OutboundMsgKind,
46 : pub(crate) id: usize,
47 : }
48 :
49 : impl OutboundMsg {
50 0 : pub fn new(inner: OutboundMsgKind, id: usize) -> Self {
51 0 : Self { inner, id }
52 0 : }
53 : }
54 :
55 : /// The different underlying message types we can send to the agent.
56 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
57 : #[serde(tag = "type")]
58 : pub enum OutboundMsgKind {
59 : /// Indicates that the agent sent an invalid message, i.e, we couldn't
60 : /// properly deserialize it.
61 : InvalidMessage { error: String },
62 : /// Indicates that we experienced an internal error while processing a message.
63 : /// For example, if a cgroup operation fails while trying to handle an upscale,
64 : /// we return `InternalError`.
65 : InternalError { error: String },
66 : /// Returned to the agent once we have finished handling an upscale. If the
67 : /// handling was unsuccessful, an `InternalError` will get returned instead.
68 : /// *Note*: this is a struct variant because of the way go serializes struct{}
69 : UpscaleConfirmation {},
70 : /// Indicates to the monitor that we are urgently requesting resources.
71 : /// *Note*: this is a struct variant because of the way go serializes struct{}
72 : UpscaleRequest {},
73 : /// Returned to the agent once we have finished attempting to downscale. If
74 : /// an error occured trying to do so, an `InternalError` will get returned instead.
75 : /// However, if we are simply unsuccessful (for example, do to needing the resources),
76 : /// that gets included in the `DownscaleResult`.
77 : DownscaleResult {
78 : // FIXME for the future (once the informant is deprecated)
79 : // As of the time of writing, the agent/informant version of this struct is
80 : // called api.DownscaleResult. This struct has uppercase fields which are
81 : // serialized as such. Thus, we serialize using uppercase names so we don't
82 : // have to make a breaking change to the agent<->informant protocol. Once
83 : // the informant has been superseded by the monitor, we can add the correct
84 : // struct tags to api.DownscaleResult without causing a breaking change,
85 : // since we don't need to support the agent<->informant protocol anymore.
86 : #[serde(rename = "Ok")]
87 : ok: bool,
88 : #[serde(rename = "Status")]
89 : status: String,
90 : },
91 : /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
92 : /// agent.
93 : /// *Note*: this is a struct variant because of the way go serializes struct{}
94 : HealthCheck {},
95 : }
96 :
97 : /// A message received form the agent.
98 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
99 : pub struct InboundMsg {
100 : #[serde(flatten)]
101 : pub(crate) inner: InboundMsgKind,
102 : pub(crate) id: usize,
103 : }
104 :
105 : /// The different underlying message types we can receive from the agent.
106 0 : #[derive(Serialize, Deserialize, Debug, Clone)]
107 : #[serde(tag = "type", content = "content")]
108 : pub enum InboundMsgKind {
109 : /// Indicates that the we sent an invalid message, i.e, we couldn't
110 : /// properly deserialize it.
111 : InvalidMessage { error: String },
112 : /// Indicates that the informan experienced an internal error while processing
113 : /// a message. For example, if it failed to request upsacle from the agent, it
114 : /// would return an `InternalError`.
115 : InternalError { error: String },
116 : /// Indicates to us that we have been granted more resources. We should respond
117 : /// with an `UpscaleConfirmation` when done handling the resources (increasins
118 : /// file cache size, cgorup memory limits).
119 : UpscaleNotification { granted: Resources },
120 : /// A request to reduce resource usage. We should response with a `DownscaleResult`,
121 : /// when done.
122 : DownscaleRequest { target: Resources },
123 : /// Part of the bidirectional heartbeat. The heartbeat is initiated by the
124 : /// agent.
125 : /// *Note*: this is a struct variant because of the way go serializes struct{}
126 : HealthCheck {},
127 : }
128 :
129 : /// Represents the resources granted to a VM.
130 0 : #[derive(Serialize, Deserialize, Debug, Clone, Copy)]
131 : // Renamed because the agent has multiple resources types:
132 : // `Resources` (milliCPU/memory slots)
133 : // `Allocation` (vCPU/bytes) <- what we correspond to
134 : #[serde(rename(serialize = "Allocation", deserialize = "Allocation"))]
135 : pub struct Resources {
136 : /// Number of vCPUs
137 : pub(crate) cpu: f64,
138 : /// Bytes of memory
139 : pub(crate) mem: u64,
140 : }
141 :
142 : impl Resources {
143 0 : pub fn new(cpu: f64, mem: u64) -> Self {
144 0 : Self { cpu, mem }
145 0 : }
146 : }
147 :
148 : pub const PROTOCOL_MIN_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
149 : pub const PROTOCOL_MAX_VERSION: ProtocolVersion = ProtocolVersion::V1_0;
150 :
151 0 : #[derive(Debug, Clone, Copy, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
152 : pub struct ProtocolVersion(u8);
153 :
154 : impl ProtocolVersion {
155 : /// Represents v1.0 of the agent<-> monitor protocol - the initial version
156 : ///
157 : /// Currently the latest version.
158 : const V1_0: ProtocolVersion = ProtocolVersion(1);
159 : }
160 :
161 : impl fmt::Display for ProtocolVersion {
162 0 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
163 0 : match *self {
164 0 : ProtocolVersion(0) => f.write_str("<invalid: zero>"),
165 0 : ProtocolVersion::V1_0 => f.write_str("v1.0"),
166 0 : other => write!(f, "<unknown: {other}>"),
167 : }
168 0 : }
169 : }
170 :
171 : /// A set of protocol bounds that determines what we are speaking.
172 : ///
173 : /// These bounds are inclusive.
174 : #[derive(Debug)]
175 : pub struct ProtocolRange {
176 : pub min: ProtocolVersion,
177 : pub max: ProtocolVersion,
178 : }
179 :
180 : // Use a custom deserialize impl to ensure that `self.min <= self.max`
181 : impl<'de> Deserialize<'de> for ProtocolRange {
182 0 : fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
183 0 : where
184 0 : D: serde::Deserializer<'de>,
185 0 : {
186 0 : #[derive(Deserialize)]
187 : struct InnerProtocolRange {
188 : min: ProtocolVersion,
189 : max: ProtocolVersion,
190 : }
191 0 : let InnerProtocolRange { min, max } = InnerProtocolRange::deserialize(deserializer)?;
192 0 : if min > max {
193 0 : Err(D::Error::custom(format!(
194 0 : "min version = {min} is greater than max version = {max}",
195 0 : )))
196 : } else {
197 0 : Ok(ProtocolRange { min, max })
198 : }
199 0 : }
200 : }
201 :
202 : impl fmt::Display for ProtocolRange {
203 0 : fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
204 0 : if self.min == self.max {
205 0 : f.write_fmt(format_args!("{}", self.max))
206 : } else {
207 0 : f.write_fmt(format_args!("{} to {}", self.min, self.max))
208 : }
209 0 : }
210 : }
211 :
212 : impl ProtocolRange {
213 : /// Find the highest shared version between two `ProtocolRange`'s
214 0 : pub fn highest_shared_version(&self, other: &Self) -> anyhow::Result<ProtocolVersion> {
215 0 : // We first have to make sure the ranges are overlapping. Once we know
216 0 : // this, we can merge the ranges by taking the max of the mins and the
217 0 : // mins of the maxes.
218 0 : if self.min > other.max {
219 0 : anyhow::bail!(
220 0 : "Non-overlapping bounds: other.max = {} was less than self.min = {}",
221 0 : other.max,
222 0 : self.min,
223 0 : )
224 0 : } else if self.max < other.min {
225 0 : anyhow::bail!(
226 0 : "Non-overlappinng bounds: self.max = {} was less than other.min = {}",
227 0 : self.max,
228 0 : other.min
229 0 : )
230 : } else {
231 0 : Ok(cmp::min(self.max, other.max))
232 : }
233 0 : }
234 : }
235 :
236 : /// We send this to the monitor after negotiating which protocol to use
237 : #[derive(Serialize, Debug)]
238 : #[serde(rename_all = "camelCase")]
239 : pub enum ProtocolResponse {
240 : Error(String),
241 : Version(ProtocolVersion),
242 : }
|