Line data Source code
1 : use metrics::{
2 : register_histogram_vec, register_int_counter, register_int_counter_vec, Histogram, IntCounter,
3 : };
4 : use once_cell::sync::Lazy;
5 :
6 : pub(super) static BUCKET_METRICS: Lazy<BucketMetrics> = Lazy::new(Default::default);
7 :
/// Kinds of requests that the metrics in this file are broken down by.
///
/// The explicit discriminants are used as array indices (see `as_index`),
/// so they have to stay dense and start at zero.
#[derive(Debug, Clone, Copy)]
pub(crate) enum RequestKind {
    Get = 0,
    Put = 1,
    Delete = 2,
    List = 3,
    Copy = 4,
    TimeTravel = 5,
    Head = 6,
}
18 :
19 : use scopeguard::ScopeGuard;
20 : use RequestKind::*;
21 :
22 : impl RequestKind {
23 840 : const fn as_str(&self) -> &'static str {
24 840 : match self {
25 120 : Get => "get_object",
26 120 : Put => "put_object",
27 120 : Delete => "delete_object",
28 120 : List => "list_objects",
29 120 : Copy => "copy_object",
30 120 : TimeTravel => "time_travel_recover",
31 120 : Head => "head_object",
32 : }
33 840 : }
34 1484 : const fn as_index(&self) -> usize {
35 1484 : *self as usize
36 1484 : }
37 : }
38 :
/// Number of [`RequestKind`] variants; the length of every per-kind table.
const REQUEST_KIND_COUNT: usize = 7;

/// Fixed-size table holding one `C` per [`RequestKind`], indexed by the
/// kind's discriminant.
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
41 :
42 : impl<C> RequestTyped<C> {
43 644 : pub(crate) fn get(&self, kind: RequestKind) -> &C {
44 644 : &self.0[kind.as_index()]
45 644 : }
46 :
47 120 : fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
48 : use RequestKind::*;
49 120 : let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
50 840 : let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
51 840 : let next = it.next().unwrap();
52 840 : assert_eq!(index, next.as_index());
53 840 : f(next)
54 840 : });
55 :
56 120 : if let Some(next) = it.next() {
57 0 : panic!("unexpected {next:?}");
58 120 : }
59 120 :
60 120 : RequestTyped(arr)
61 120 : }
62 : }
63 :
64 : impl RequestTyped<Histogram> {
65 266 : pub(crate) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) {
66 266 : self.get(kind).observe(started_at.elapsed().as_secs_f64())
67 266 : }
68 : }
69 :
70 : pub(crate) struct PassFailCancelledRequestTyped<C> {
71 : success: RequestTyped<C>,
72 : fail: RequestTyped<C>,
73 : cancelled: RequestTyped<C>,
74 : }
75 :
/// How a single request attempt ended.
#[derive(Clone, Copy, Debug)]
pub(crate) enum AttemptOutcome {
    Ok,
    Err,
    Cancelled,
}

/// Classifies a borrowed `Result` by its variant only; the payloads are ignored.
///
/// A `Result` never maps to [`AttemptOutcome::Cancelled`].
impl<T, E> From<&Result<T, E>> for AttemptOutcome {
    fn from(value: &Result<T, E>) -> Self {
        if value.is_ok() {
            AttemptOutcome::Ok
        } else {
            AttemptOutcome::Err
        }
    }
}

impl AttemptOutcome {
    /// Returns the string used as the `result` label value for this outcome.
    pub(crate) fn as_str(&self) -> &'static str {
        match *self {
            Self::Ok => "ok",
            Self::Err => "err",
            Self::Cancelled => "cancelled",
        }
    }
}
101 :
102 : impl<C> PassFailCancelledRequestTyped<C> {
103 378 : pub(crate) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C {
104 378 : let target = match outcome {
105 372 : AttemptOutcome::Ok => &self.success,
106 0 : AttemptOutcome::Err => &self.fail,
107 6 : AttemptOutcome::Cancelled => &self.cancelled,
108 : };
109 378 : target.get(kind)
110 378 : }
111 :
112 24 : fn build_with(mut f: impl FnMut(RequestKind, AttemptOutcome) -> C) -> Self {
113 168 : let success = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Ok));
114 168 : let fail = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Err));
115 168 : let cancelled = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Cancelled));
116 24 :
117 24 : PassFailCancelledRequestTyped {
118 24 : success,
119 24 : fail,
120 24 : cancelled,
121 24 : }
122 24 : }
123 : }
124 :
125 : impl PassFailCancelledRequestTyped<Histogram> {
126 378 : pub(crate) fn observe_elapsed(
127 378 : &self,
128 378 : kind: RequestKind,
129 378 : outcome: impl Into<AttemptOutcome>,
130 378 : started_at: std::time::Instant,
131 378 : ) {
132 378 : self.get(kind, outcome.into())
133 378 : .observe(started_at.elapsed().as_secs_f64())
134 378 : }
135 : }
136 :
137 : /// On drop (cancellation) count towards [`BucketMetrics::cancelled_waits`].
138 266 : pub(crate) fn start_counting_cancelled_wait(
139 266 : kind: RequestKind,
140 266 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
141 266 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
142 0 : crate::metrics::BUCKET_METRICS
143 0 : .cancelled_waits
144 0 : .get(kind)
145 0 : .inc()
146 266 : })
147 266 : }
148 :
149 : /// On drop (cancellation) add time to [`BucketMetrics::req_seconds`].
150 378 : pub(crate) fn start_measuring_requests(
151 378 : kind: RequestKind,
152 378 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
153 378 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
154 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
155 0 : kind,
156 0 : AttemptOutcome::Cancelled,
157 0 : started_at,
158 0 : )
159 378 : })
160 378 : }
161 :
162 : pub(crate) struct BucketMetrics {
163 : /// Full request duration until successful completion, error or cancellation.
164 : pub(crate) req_seconds: PassFailCancelledRequestTyped<Histogram>,
165 : /// Total amount of seconds waited on queue.
166 : pub(crate) wait_seconds: RequestTyped<Histogram>,
167 :
168 : /// Track how many semaphore awaits were cancelled per request type.
169 : ///
170 : /// This is in case cancellations are happening more than expected.
171 : pub(crate) cancelled_waits: RequestTyped<IntCounter>,
172 :
173 : /// Total amount of deleted objects in batches or single requests.
174 : pub(crate) deleted_objects_total: IntCounter,
175 : }
176 :
177 : impl Default for BucketMetrics {
178 24 : fn default() -> Self {
179 24 : let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
180 24 :
181 24 : let req_seconds = register_histogram_vec!(
182 24 : "remote_storage_s3_request_seconds",
183 24 : "Seconds to complete a request",
184 24 : &["request_type", "result"],
185 24 : buckets.to_vec(),
186 24 : )
187 24 : .unwrap();
188 504 : let req_seconds = PassFailCancelledRequestTyped::build_with(|kind, outcome| {
189 504 : req_seconds.with_label_values(&[kind.as_str(), outcome.as_str()])
190 504 : });
191 24 :
192 24 : let wait_seconds = register_histogram_vec!(
193 24 : "remote_storage_s3_wait_seconds",
194 24 : "Seconds rate limited",
195 24 : &["request_type"],
196 24 : buckets.to_vec(),
197 24 : )
198 24 : .unwrap();
199 24 : let wait_seconds =
200 168 : RequestTyped::build_with(|kind| wait_seconds.with_label_values(&[kind.as_str()]));
201 24 :
202 24 : let cancelled_waits = register_int_counter_vec!(
203 24 : "remote_storage_s3_cancelled_waits_total",
204 24 : "Times a semaphore wait has been cancelled per request type",
205 24 : &["request_type"],
206 24 : )
207 24 : .unwrap();
208 24 : let cancelled_waits =
209 168 : RequestTyped::build_with(|kind| cancelled_waits.with_label_values(&[kind.as_str()]));
210 24 :
211 24 : let deleted_objects_total = register_int_counter!(
212 24 : "remote_storage_s3_deleted_objects_total",
213 24 : "Amount of deleted objects in total",
214 24 : )
215 24 : .unwrap();
216 24 :
217 24 : Self {
218 24 : req_seconds,
219 24 : wait_seconds,
220 24 : cancelled_waits,
221 24 : deleted_objects_total,
222 24 : }
223 24 : }
224 : }
|