Line data Source code
1 : use metrics::{
2 : Histogram, IntCounter, register_histogram_vec, register_int_counter, register_int_counter_vec,
3 : };
4 : use once_cell::sync::Lazy;
5 :
6 : pub(super) static BUCKET_METRICS: Lazy<BucketMetrics> = Lazy::new(Default::default);
7 :
8 : #[derive(Clone, Copy, Debug)]
9 : pub(crate) enum RequestKind {
10 : Get = 0,
11 : Put = 1,
12 : Delete = 2,
13 : List = 3,
14 : Copy = 4,
15 : TimeTravel = 5,
16 : Head = 6,
17 : ListVersions = 7,
18 : }
19 :
20 : use RequestKind::*;
21 : use scopeguard::ScopeGuard;
22 :
23 : impl RequestKind {
24 1440 : const fn as_str(&self) -> &'static str {
25 1440 : match self {
26 180 : Get => "get_object",
27 180 : Put => "put_object",
28 180 : Delete => "delete_object",
29 180 : List => "list_objects",
30 180 : Copy => "copy_object",
31 180 : TimeTravel => "time_travel_recover",
32 180 : Head => "head_object",
33 180 : ListVersions => "list_versions",
34 : }
35 1440 : }
36 2683 : const fn as_index(&self) -> usize {
37 2683 : *self as usize
38 2683 : }
39 : }
40 :
41 : const REQUEST_KIND_LIST: &[RequestKind] =
42 : &[Get, Put, Delete, List, Copy, TimeTravel, Head, ListVersions];
43 :
44 : const REQUEST_KIND_COUNT: usize = REQUEST_KIND_LIST.len();
45 : pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
46 :
47 : impl<C> RequestTyped<C> {
48 1243 : pub(crate) fn get(&self, kind: RequestKind) -> &C {
49 1243 : &self.0[kind.as_index()]
50 1243 : }
51 :
52 180 : fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
53 180 : let mut it = REQUEST_KIND_LIST.iter();
54 1440 : let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
55 1440 : let next = it.next().unwrap();
56 1440 : assert_eq!(index, next.as_index());
57 1440 : f(*next)
58 1440 : });
59 :
60 180 : if let Some(next) = it.next() {
61 0 : panic!("unexpected {next:?}");
62 180 : }
63 180 :
64 180 : RequestTyped(arr)
65 180 : }
66 : }
67 :
68 : impl RequestTyped<Histogram> {
69 502 : pub(crate) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) {
70 502 : self.get(kind).observe(started_at.elapsed().as_secs_f64())
71 502 : }
72 : }
73 :
74 : pub(crate) struct PassFailCancelledRequestTyped<C> {
75 : success: RequestTyped<C>,
76 : fail: RequestTyped<C>,
77 : cancelled: RequestTyped<C>,
78 : }
79 :
80 : #[derive(Debug, Clone, Copy)]
81 : pub(crate) enum AttemptOutcome {
82 : Ok,
83 : Err,
84 : Cancelled,
85 : }
86 :
87 : impl<T, E> From<&Result<T, E>> for AttemptOutcome {
88 603 : fn from(value: &Result<T, E>) -> Self {
89 603 : match value {
90 600 : Ok(_) => AttemptOutcome::Ok,
91 3 : Err(_) => AttemptOutcome::Err,
92 : }
93 405 : }
94 : }
95 :
96 : impl AttemptOutcome {
97 864 : pub(crate) fn as_str(&self) -> &'static str {
98 864 : match self {
99 288 : AttemptOutcome::Ok => "ok",
100 288 : AttemptOutcome::Err => "err",
101 288 : AttemptOutcome::Cancelled => "cancelled",
102 : }
103 864 : }
104 : }
105 :
106 : impl<C> PassFailCancelledRequestTyped<C> {
107 741 : pub(crate) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C {
108 741 : let target = match outcome {
109 728 : AttemptOutcome::Ok => &self.success,
110 3 : AttemptOutcome::Err => &self.fail,
111 10 : AttemptOutcome::Cancelled => &self.cancelled,
112 : };
113 741 : target.get(kind)
114 741 : }
115 :
116 36 : fn build_with(mut f: impl FnMut(RequestKind, AttemptOutcome) -> C) -> Self {
117 288 : let success = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Ok));
118 288 : let fail = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Err));
119 288 : let cancelled = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Cancelled));
120 36 :
121 36 : PassFailCancelledRequestTyped {
122 36 : success,
123 36 : fail,
124 36 : cancelled,
125 36 : }
126 36 : }
127 : }
128 :
129 : impl PassFailCancelledRequestTyped<Histogram> {
130 741 : pub(crate) fn observe_elapsed(
131 741 : &self,
132 741 : kind: RequestKind,
133 741 : outcome: impl Into<AttemptOutcome>,
134 741 : started_at: std::time::Instant,
135 741 : ) {
136 741 : self.get(kind, outcome.into())
137 741 : .observe(started_at.elapsed().as_secs_f64())
138 741 : }
139 : }
140 :
141 : /// On drop (cancellation) count towards [`BucketMetrics::cancelled_waits`].
142 502 : pub(crate) fn start_counting_cancelled_wait(
143 502 : kind: RequestKind,
144 502 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
145 502 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
146 0 : crate::metrics::BUCKET_METRICS
147 0 : .cancelled_waits
148 0 : .get(kind)
149 0 : .inc()
150 502 : })
151 502 : }
152 :
153 : /// On drop (cancellation) add time to [`BucketMetrics::req_seconds`].
154 739 : pub(crate) fn start_measuring_requests(
155 739 : kind: RequestKind,
156 739 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
157 739 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
158 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
159 0 : kind,
160 0 : AttemptOutcome::Cancelled,
161 0 : started_at,
162 0 : )
163 739 : })
164 739 : }
165 :
166 : pub(crate) struct BucketMetrics {
167 : /// Full request duration until successful completion, error or cancellation.
168 : pub(crate) req_seconds: PassFailCancelledRequestTyped<Histogram>,
169 : /// Total amount of seconds waited on queue.
170 : pub(crate) wait_seconds: RequestTyped<Histogram>,
171 :
172 : /// Track how many semaphore awaits were cancelled per request type.
173 : ///
174 : /// This is in case cancellations are happening more than expected.
175 : pub(crate) cancelled_waits: RequestTyped<IntCounter>,
176 :
177 : /// Total amount of deleted objects in batches or single requests.
178 : pub(crate) deleted_objects_total: IntCounter,
179 : }
180 :
181 : impl Default for BucketMetrics {
182 36 : fn default() -> Self {
183 36 : // first bucket 100 microseconds to count requests that do not need to wait at all
184 36 : // and get a permit immediately
185 36 : let buckets = [0.0001, 0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
186 36 :
187 36 : let req_seconds = register_histogram_vec!(
188 36 : "remote_storage_s3_request_seconds",
189 36 : "Seconds to complete a request",
190 36 : &["request_type", "result"],
191 36 : buckets.to_vec(),
192 36 : )
193 36 : .unwrap();
194 864 : let req_seconds = PassFailCancelledRequestTyped::build_with(|kind, outcome| {
195 864 : req_seconds.with_label_values(&[kind.as_str(), outcome.as_str()])
196 864 : });
197 36 :
198 36 : let wait_seconds = register_histogram_vec!(
199 36 : "remote_storage_s3_wait_seconds",
200 36 : "Seconds rate limited",
201 36 : &["request_type"],
202 36 : buckets.to_vec(),
203 36 : )
204 36 : .unwrap();
205 36 : let wait_seconds =
206 288 : RequestTyped::build_with(|kind| wait_seconds.with_label_values(&[kind.as_str()]));
207 36 :
208 36 : let cancelled_waits = register_int_counter_vec!(
209 36 : "remote_storage_s3_cancelled_waits_total",
210 36 : "Times a semaphore wait has been cancelled per request type",
211 36 : &["request_type"],
212 36 : )
213 36 : .unwrap();
214 36 : let cancelled_waits =
215 288 : RequestTyped::build_with(|kind| cancelled_waits.with_label_values(&[kind.as_str()]));
216 36 :
217 36 : let deleted_objects_total = register_int_counter!(
218 36 : "remote_storage_s3_deleted_objects_total",
219 36 : "Amount of deleted objects in total",
220 36 : )
221 36 : .unwrap();
222 36 :
223 36 : Self {
224 36 : req_seconds,
225 36 : wait_seconds,
226 36 : cancelled_waits,
227 36 : deleted_objects_total,
228 36 : }
229 36 : }
230 : }
|