Line data Source code
1 : use metrics::{
2 : register_histogram_vec, register_int_counter, register_int_counter_vec, Histogram, IntCounter,
3 : };
4 : use once_cell::sync::Lazy;
5 :
6 : pub(super) static BUCKET_METRICS: Lazy<BucketMetrics> = Lazy::new(Default::default);
7 :
/// The kinds of request that metrics are tracked for.
///
/// The explicit discriminants are used as array indices by `RequestTyped`,
/// so they must stay dense and start at zero.
#[derive(Debug, Clone, Copy)]
pub(crate) enum RequestKind {
    Get = 0,
    Put = 1,
    Delete = 2,
    List = 3,
    Copy = 4,
    TimeTravel = 5,
}
17 :
18 : use scopeguard::ScopeGuard;
19 : use RequestKind::*;
20 :
21 : impl RequestKind {
22 750 : const fn as_str(&self) -> &'static str {
23 750 : match self {
24 125 : Get => "get_object",
25 125 : Put => "put_object",
26 125 : Delete => "delete_object",
27 125 : List => "list_objects",
28 125 : Copy => "copy_object",
29 125 : TimeTravel => "time_travel_recover",
30 : }
31 750 : }
32 1390 : const fn as_index(&self) -> usize {
33 1390 : *self as usize
34 1390 : }
35 : }
36 :
37 : pub(crate) struct RequestTyped<C>([C; 6]);
38 :
39 : impl<C> RequestTyped<C> {
40 640 : pub(crate) fn get(&self, kind: RequestKind) -> &C {
41 640 : &self.0[kind.as_index()]
42 640 : }
43 :
44 125 : fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
45 125 : use RequestKind::*;
46 125 : let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
47 750 : let arr = std::array::from_fn::<C, 6, _>(|index| {
48 750 : let next = it.next().unwrap();
49 750 : assert_eq!(index, next.as_index());
50 750 : f(next)
51 750 : });
52 :
53 125 : if let Some(next) = it.next() {
54 0 : panic!("unexpected {next:?}");
55 125 : }
56 125 :
57 125 : RequestTyped(arr)
58 125 : }
59 : }
60 :
61 : impl RequestTyped<Histogram> {
62 266 : pub(crate) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) {
63 266 : self.get(kind).observe(started_at.elapsed().as_secs_f64())
64 266 : }
65 : }
66 :
67 : pub(crate) struct PassFailCancelledRequestTyped<C> {
68 : success: RequestTyped<C>,
69 : fail: RequestTyped<C>,
70 : cancelled: RequestTyped<C>,
71 : }
72 :
/// How a single request attempt ended.
#[derive(Clone, Copy, Debug)]
pub(crate) enum AttemptOutcome {
    /// The attempt completed successfully.
    Ok,
    /// The attempt completed with an error.
    Err,
    /// The attempt was cancelled.
    Cancelled,
}
79 :
80 : impl<T, E> From<&Result<T, E>> for AttemptOutcome {
81 296 : fn from(value: &Result<T, E>) -> Self {
82 296 : match value {
83 294 : Ok(_) => AttemptOutcome::Ok,
84 2 : Err(_) => AttemptOutcome::Err,
85 : }
86 296 : }
87 : }
88 :
89 : impl AttemptOutcome {
90 450 : pub(crate) fn as_str(&self) -> &'static str {
91 450 : match self {
92 150 : AttemptOutcome::Ok => "ok",
93 150 : AttemptOutcome::Err => "err",
94 150 : AttemptOutcome::Cancelled => "cancelled",
95 : }
96 450 : }
97 : }
98 :
99 : impl<C> PassFailCancelledRequestTyped<C> {
100 374 : pub(crate) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C {
101 374 : let target = match outcome {
102 366 : AttemptOutcome::Ok => &self.success,
103 2 : AttemptOutcome::Err => &self.fail,
104 6 : AttemptOutcome::Cancelled => &self.cancelled,
105 : };
106 374 : target.get(kind)
107 374 : }
108 :
109 25 : fn build_with(mut f: impl FnMut(RequestKind, AttemptOutcome) -> C) -> Self {
110 150 : let success = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Ok));
111 150 : let fail = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Err));
112 150 : let cancelled = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Cancelled));
113 25 :
114 25 : PassFailCancelledRequestTyped {
115 25 : success,
116 25 : fail,
117 25 : cancelled,
118 25 : }
119 25 : }
120 : }
121 :
122 : impl PassFailCancelledRequestTyped<Histogram> {
123 374 : pub(crate) fn observe_elapsed(
124 374 : &self,
125 374 : kind: RequestKind,
126 374 : outcome: impl Into<AttemptOutcome>,
127 374 : started_at: std::time::Instant,
128 374 : ) {
129 374 : self.get(kind, outcome.into())
130 374 : .observe(started_at.elapsed().as_secs_f64())
131 374 : }
132 : }
133 :
134 : /// On drop (cancellation) count towards [`BucketMetrics::cancelled_waits`].
135 266 : pub(crate) fn start_counting_cancelled_wait(
136 266 : kind: RequestKind,
137 266 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
138 266 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
139 0 : crate::metrics::BUCKET_METRICS
140 0 : .cancelled_waits
141 0 : .get(kind)
142 0 : .inc()
143 266 : })
144 266 : }
145 :
146 : /// On drop (cancellation) add time to [`BucketMetrics::req_seconds`].
147 374 : pub(crate) fn start_measuring_requests(
148 374 : kind: RequestKind,
149 374 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
150 374 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
151 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
152 0 : kind,
153 0 : AttemptOutcome::Cancelled,
154 0 : started_at,
155 0 : )
156 374 : })
157 374 : }
158 :
159 : pub(crate) struct BucketMetrics {
160 : /// Full request duration until successful completion, error or cancellation.
161 : pub(crate) req_seconds: PassFailCancelledRequestTyped<Histogram>,
162 : /// Total amount of seconds waited on queue.
163 : pub(crate) wait_seconds: RequestTyped<Histogram>,
164 :
165 : /// Track how many semaphore awaits were cancelled per request type.
166 : ///
167 : /// This is in case cancellations are happening more than expected.
168 : pub(crate) cancelled_waits: RequestTyped<IntCounter>,
169 :
170 : /// Total amount of deleted objects in batches or single requests.
171 : pub(crate) deleted_objects_total: IntCounter,
172 : }
173 :
174 : impl Default for BucketMetrics {
175 25 : fn default() -> Self {
176 25 : let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
177 25 :
178 25 : let req_seconds = register_histogram_vec!(
179 : "remote_storage_s3_request_seconds",
180 : "Seconds to complete a request",
181 : &["request_type", "result"],
182 : buckets.to_vec(),
183 : )
184 25 : .unwrap();
185 450 : let req_seconds = PassFailCancelledRequestTyped::build_with(|kind, outcome| {
186 450 : req_seconds.with_label_values(&[kind.as_str(), outcome.as_str()])
187 450 : });
188 25 :
189 25 : let wait_seconds = register_histogram_vec!(
190 : "remote_storage_s3_wait_seconds",
191 : "Seconds rate limited",
192 : &["request_type"],
193 : buckets.to_vec(),
194 : )
195 25 : .unwrap();
196 25 : let wait_seconds =
197 150 : RequestTyped::build_with(|kind| wait_seconds.with_label_values(&[kind.as_str()]));
198 25 :
199 25 : let cancelled_waits = register_int_counter_vec!(
200 : "remote_storage_s3_cancelled_waits_total",
201 : "Times a semaphore wait has been cancelled per request type",
202 : &["request_type"],
203 : )
204 25 : .unwrap();
205 25 : let cancelled_waits =
206 150 : RequestTyped::build_with(|kind| cancelled_waits.with_label_values(&[kind.as_str()]));
207 25 :
208 25 : let deleted_objects_total = register_int_counter!(
209 : "remote_storage_s3_deleted_objects_total",
210 : "Amount of deleted objects in total",
211 : )
212 25 : .unwrap();
213 25 :
214 25 : Self {
215 25 : req_seconds,
216 25 : wait_seconds,
217 25 : cancelled_waits,
218 25 : deleted_objects_total,
219 25 : }
220 25 : }
221 : }
|