Line data Source code
1 : use metrics::{
2 : register_histogram_vec, register_int_counter, register_int_counter_vec, Histogram, IntCounter,
3 : };
4 : use once_cell::sync::Lazy;
5 :
6 : pub(super) static BUCKET_METRICS: Lazy<BucketMetrics> = Lazy::new(Default::default);
7 :
/// The request kinds that metrics are tracked for individually.
///
/// The explicit discriminants double as array indices (see `RequestKind::as_index`), so they
/// must stay dense and start at zero.
#[derive(Debug, Clone, Copy)]
pub(crate) enum RequestKind {
    /// Labelled `get_object`.
    Get = 0,
    /// Labelled `put_object`.
    Put = 1,
    /// Labelled `delete_object`.
    Delete = 2,
    /// Labelled `list_objects`.
    List = 3,
    /// Labelled `copy_object`.
    Copy = 4,
    /// Labelled `time_travel_recover`.
    TimeTravel = 5,
}
17 :
18 : use scopeguard::ScopeGuard;
19 : use RequestKind::*;
20 :
21 : impl RequestKind {
22 720 : const fn as_str(&self) -> &'static str {
23 720 : match self {
24 120 : Get => "get_object",
25 120 : Put => "put_object",
26 120 : Delete => "delete_object",
27 120 : List => "list_objects",
28 120 : Copy => "copy_object",
29 120 : TimeTravel => "time_travel_recover",
30 : }
31 720 : }
32 1358 : const fn as_index(&self) -> usize {
33 1358 : *self as usize
34 1358 : }
35 : }
36 :
/// One value of type `C` per [`RequestKind`], stored at the index given by the kind's
/// discriminant (six kinds, hence six slots).
pub(crate) struct RequestTyped<C>([C; 6]);
38 :
39 : impl<C> RequestTyped<C> {
40 638 : pub(crate) fn get(&self, kind: RequestKind) -> &C {
41 638 : &self.0[kind.as_index()]
42 638 : }
43 :
44 120 : fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
45 120 : use RequestKind::*;
46 120 : let mut it = [Get, Put, Delete, List, Copy, TimeTravel].into_iter();
47 720 : let arr = std::array::from_fn::<C, 6, _>(|index| {
48 720 : let next = it.next().unwrap();
49 720 : assert_eq!(index, next.as_index());
50 720 : f(next)
51 720 : });
52 :
53 120 : if let Some(next) = it.next() {
54 0 : panic!("unexpected {next:?}");
55 120 : }
56 120 :
57 120 : RequestTyped(arr)
58 120 : }
59 : }
60 :
61 : impl RequestTyped<Histogram> {
62 265 : pub(crate) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) {
63 265 : self.get(kind).observe(started_at.elapsed().as_secs_f64())
64 265 : }
65 : }
66 :
67 : pub(crate) struct PassFailCancelledRequestTyped<C> {
68 : success: RequestTyped<C>,
69 : fail: RequestTyped<C>,
70 : cancelled: RequestTyped<C>,
71 : }
72 :
/// How a single request attempt ended.
#[derive(Clone, Copy, Debug)]
pub(crate) enum AttemptOutcome {
    /// The attempt completed successfully.
    Ok,
    /// The attempt completed with an error.
    Err,
    /// The attempt was cancelled before completing.
    Cancelled,
}
79 :
80 : impl<T, E> From<&Result<T, E>> for AttemptOutcome {
81 294 : fn from(value: &Result<T, E>) -> Self {
82 294 : match value {
83 294 : Ok(_) => AttemptOutcome::Ok,
84 0 : Err(_) => AttemptOutcome::Err,
85 : }
86 294 : }
87 : }
88 :
89 : impl AttemptOutcome {
90 432 : pub(crate) fn as_str(&self) -> &'static str {
91 432 : match self {
92 144 : AttemptOutcome::Ok => "ok",
93 144 : AttemptOutcome::Err => "err",
94 144 : AttemptOutcome::Cancelled => "cancelled",
95 : }
96 432 : }
97 : }
98 :
99 : impl<C> PassFailCancelledRequestTyped<C> {
100 373 : pub(crate) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C {
101 373 : let target = match outcome {
102 366 : AttemptOutcome::Ok => &self.success,
103 1 : AttemptOutcome::Err => &self.fail,
104 6 : AttemptOutcome::Cancelled => &self.cancelled,
105 : };
106 373 : target.get(kind)
107 373 : }
108 :
109 24 : fn build_with(mut f: impl FnMut(RequestKind, AttemptOutcome) -> C) -> Self {
110 144 : let success = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Ok));
111 144 : let fail = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Err));
112 144 : let cancelled = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Cancelled));
113 24 :
114 24 : PassFailCancelledRequestTyped {
115 24 : success,
116 24 : fail,
117 24 : cancelled,
118 24 : }
119 24 : }
120 : }
121 :
122 : impl PassFailCancelledRequestTyped<Histogram> {
123 373 : pub(crate) fn observe_elapsed(
124 373 : &self,
125 373 : kind: RequestKind,
126 373 : outcome: impl Into<AttemptOutcome>,
127 373 : started_at: std::time::Instant,
128 373 : ) {
129 373 : self.get(kind, outcome.into())
130 373 : .observe(started_at.elapsed().as_secs_f64())
131 373 : }
132 : }
133 :
134 : /// On drop (cancellation) count towards [`BucketMetrics::cancelled_waits`].
135 265 : pub(crate) fn start_counting_cancelled_wait(
136 265 : kind: RequestKind,
137 265 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
138 265 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
139 0 : crate::metrics::BUCKET_METRICS
140 0 : .cancelled_waits
141 0 : .get(kind)
142 0 : .inc()
143 265 : })
144 265 : }
145 :
146 : /// On drop (cancellation) add time to [`BucketMetrics::req_seconds`].
147 373 : pub(crate) fn start_measuring_requests(
148 373 : kind: RequestKind,
149 373 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
150 373 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
151 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
152 0 : kind,
153 0 : AttemptOutcome::Cancelled,
154 0 : started_at,
155 0 : )
156 373 : })
157 373 : }
158 :
159 : pub(crate) struct BucketMetrics {
160 : /// Full request duration until successful completion, error or cancellation.
161 : pub(crate) req_seconds: PassFailCancelledRequestTyped<Histogram>,
162 : /// Total amount of seconds waited on queue.
163 : pub(crate) wait_seconds: RequestTyped<Histogram>,
164 :
165 : /// Track how many semaphore awaits were cancelled per request type.
166 : ///
167 : /// This is in case cancellations are happening more than expected.
168 : pub(crate) cancelled_waits: RequestTyped<IntCounter>,
169 :
170 : /// Total amount of deleted objects in batches or single requests.
171 : pub(crate) deleted_objects_total: IntCounter,
172 : }
173 :
174 : impl Default for BucketMetrics {
175 24 : fn default() -> Self {
176 24 : let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
177 24 :
178 24 : let req_seconds = register_histogram_vec!(
179 : "remote_storage_s3_request_seconds",
180 : "Seconds to complete a request",
181 : &["request_type", "result"],
182 : buckets.to_vec(),
183 : )
184 24 : .unwrap();
185 432 : let req_seconds = PassFailCancelledRequestTyped::build_with(|kind, outcome| {
186 432 : req_seconds.with_label_values(&[kind.as_str(), outcome.as_str()])
187 432 : });
188 24 :
189 24 : let wait_seconds = register_histogram_vec!(
190 : "remote_storage_s3_wait_seconds",
191 : "Seconds rate limited",
192 : &["request_type"],
193 : buckets.to_vec(),
194 : )
195 24 : .unwrap();
196 24 : let wait_seconds =
197 144 : RequestTyped::build_with(|kind| wait_seconds.with_label_values(&[kind.as_str()]));
198 24 :
199 24 : let cancelled_waits = register_int_counter_vec!(
200 : "remote_storage_s3_cancelled_waits_total",
201 : "Times a semaphore wait has been cancelled per request type",
202 : &["request_type"],
203 : )
204 24 : .unwrap();
205 24 : let cancelled_waits =
206 144 : RequestTyped::build_with(|kind| cancelled_waits.with_label_values(&[kind.as_str()]));
207 24 :
208 24 : let deleted_objects_total = register_int_counter!(
209 : "remote_storage_s3_deleted_objects_total",
210 : "Amount of deleted objects in total",
211 : )
212 24 : .unwrap();
213 24 :
214 24 : Self {
215 24 : req_seconds,
216 24 : wait_seconds,
217 24 : cancelled_waits,
218 24 : deleted_objects_total,
219 24 : }
220 24 : }
221 : }
|