Line data Source code
1 : use metrics::{
2 : register_histogram_vec, register_int_counter, register_int_counter_vec, Histogram, IntCounter,
3 : };
4 : use once_cell::sync::Lazy;
5 :
6 : pub(super) static BUCKET_METRICS: Lazy<BucketMetrics> = Lazy::new(Default::default);
7 :
/// The request types that metrics are kept for.
///
/// The explicit discriminants are contiguous and start at zero; `as_index` turns them into
/// array slots for [`RequestTyped`], so new variants must continue the sequence.
#[derive(Debug, Clone, Copy)]
pub(crate) enum RequestKind {
    /// Reported with the label `get_object`.
    Get = 0,
    /// Reported with the label `put_object`.
    Put = 1,
    /// Reported with the label `delete_object`.
    Delete = 2,
    /// Reported with the label `list_objects`.
    List = 3,
    /// Reported with the label `copy_object`.
    Copy = 4,
    /// Reported with the label `time_travel_recover`.
    TimeTravel = 5,
    /// Reported with the label `head_object`.
    Head = 6,
}
18 :
19 : use scopeguard::ScopeGuard;
20 : use RequestKind::*;
21 :
22 : impl RequestKind {
23 1260 : const fn as_str(&self) -> &'static str {
24 1260 : match self {
25 180 : Get => "get_object",
26 180 : Put => "put_object",
27 180 : Delete => "delete_object",
28 180 : List => "list_objects",
29 180 : Copy => "copy_object",
30 180 : TimeTravel => "time_travel_recover",
31 180 : Head => "head_object",
32 : }
33 1260 : }
34 2505 : const fn as_index(&self) -> usize {
35 2505 : *self as usize
36 2505 : }
37 : }
38 :
/// Number of [`RequestKind`] variants; must be kept in sync with the enum by hand.
const REQUEST_KIND_COUNT: usize = 7;

/// One value of type `C` per [`RequestKind`], stored at the slot given by the kind's index.
pub(crate) struct RequestTyped<C>([C; REQUEST_KIND_COUNT]);
41 :
42 : impl<C> RequestTyped<C> {
43 1245 : pub(crate) fn get(&self, kind: RequestKind) -> &C {
44 1245 : &self.0[kind.as_index()]
45 1245 : }
46 :
47 180 : fn build_with(mut f: impl FnMut(RequestKind) -> C) -> Self {
48 : use RequestKind::*;
49 180 : let mut it = [Get, Put, Delete, List, Copy, TimeTravel, Head].into_iter();
50 1260 : let arr = std::array::from_fn::<C, REQUEST_KIND_COUNT, _>(|index| {
51 1260 : let next = it.next().unwrap();
52 1260 : assert_eq!(index, next.as_index());
53 1260 : f(next)
54 1260 : });
55 :
56 180 : if let Some(next) = it.next() {
57 0 : panic!("unexpected {next:?}");
58 180 : }
59 180 :
60 180 : RequestTyped(arr)
61 180 : }
62 : }
63 :
64 : impl RequestTyped<Histogram> {
65 503 : pub(crate) fn observe_elapsed(&self, kind: RequestKind, started_at: std::time::Instant) {
66 503 : self.get(kind).observe(started_at.elapsed().as_secs_f64())
67 503 : }
68 : }
69 :
70 : pub(crate) struct PassFailCancelledRequestTyped<C> {
71 : success: RequestTyped<C>,
72 : fail: RequestTyped<C>,
73 : cancelled: RequestTyped<C>,
74 : }
75 :
/// How a single request attempt ended.
#[derive(Clone, Copy, Debug)]
pub(crate) enum AttemptOutcome {
    /// The attempt produced an `Ok` result.
    Ok,
    /// The attempt produced an `Err` result.
    Err,
    /// The attempt was cancelled; never produced by the `From<&Result<_, _>>` conversion.
    Cancelled,
}

/// Classifies a finished attempt by its result; the payloads are not inspected.
impl<T, E> From<&Result<T, E>> for AttemptOutcome {
    fn from(value: &Result<T, E>) -> Self {
        if value.is_ok() {
            Self::Ok
        } else {
            Self::Err
        }
    }
}

impl AttemptOutcome {
    /// Returns the lowercase name of the outcome, used as the `result` metric label value.
    pub(crate) fn as_str(&self) -> &'static str {
        match *self {
            Self::Ok => "ok",
            Self::Err => "err",
            Self::Cancelled => "cancelled",
        }
    }
}
101 :
102 : impl<C> PassFailCancelledRequestTyped<C> {
103 742 : pub(crate) fn get(&self, kind: RequestKind, outcome: AttemptOutcome) -> &C {
104 742 : let target = match outcome {
105 726 : AttemptOutcome::Ok => &self.success,
106 6 : AttemptOutcome::Err => &self.fail,
107 10 : AttemptOutcome::Cancelled => &self.cancelled,
108 : };
109 742 : target.get(kind)
110 742 : }
111 :
112 36 : fn build_with(mut f: impl FnMut(RequestKind, AttemptOutcome) -> C) -> Self {
113 252 : let success = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Ok));
114 252 : let fail = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Err));
115 252 : let cancelled = RequestTyped::build_with(|kind| f(kind, AttemptOutcome::Cancelled));
116 36 :
117 36 : PassFailCancelledRequestTyped {
118 36 : success,
119 36 : fail,
120 36 : cancelled,
121 36 : }
122 36 : }
123 : }
124 :
125 : impl PassFailCancelledRequestTyped<Histogram> {
126 742 : pub(crate) fn observe_elapsed(
127 742 : &self,
128 742 : kind: RequestKind,
129 742 : outcome: impl Into<AttemptOutcome>,
130 742 : started_at: std::time::Instant,
131 742 : ) {
132 742 : self.get(kind, outcome.into())
133 742 : .observe(started_at.elapsed().as_secs_f64())
134 742 : }
135 : }
136 :
137 : /// On drop (cancellation) count towards [`BucketMetrics::cancelled_waits`].
138 503 : pub(crate) fn start_counting_cancelled_wait(
139 503 : kind: RequestKind,
140 503 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
141 503 : scopeguard::guard_on_success(std::time::Instant::now(), move |_| {
142 0 : crate::metrics::BUCKET_METRICS
143 0 : .cancelled_waits
144 0 : .get(kind)
145 0 : .inc()
146 503 : })
147 503 : }
148 :
149 : /// On drop (cancellation) add time to [`BucketMetrics::req_seconds`].
150 740 : pub(crate) fn start_measuring_requests(
151 740 : kind: RequestKind,
152 740 : ) -> ScopeGuard<std::time::Instant, impl FnOnce(std::time::Instant), scopeguard::OnSuccess> {
153 740 : scopeguard::guard_on_success(std::time::Instant::now(), move |started_at| {
154 0 : crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed(
155 0 : kind,
156 0 : AttemptOutcome::Cancelled,
157 0 : started_at,
158 0 : )
159 740 : })
160 740 : }
161 :
162 : pub(crate) struct BucketMetrics {
163 : /// Full request duration until successful completion, error or cancellation.
164 : pub(crate) req_seconds: PassFailCancelledRequestTyped<Histogram>,
165 : /// Total amount of seconds waited on queue.
166 : pub(crate) wait_seconds: RequestTyped<Histogram>,
167 :
168 : /// Track how many semaphore awaits were cancelled per request type.
169 : ///
170 : /// This is in case cancellations are happening more than expected.
171 : pub(crate) cancelled_waits: RequestTyped<IntCounter>,
172 :
173 : /// Total amount of deleted objects in batches or single requests.
174 : pub(crate) deleted_objects_total: IntCounter,
175 : }
176 :
177 : impl Default for BucketMetrics {
178 36 : fn default() -> Self {
179 36 : let buckets = [0.01, 0.10, 0.5, 1.0, 5.0, 10.0, 50.0, 100.0];
180 36 :
181 36 : let req_seconds = register_histogram_vec!(
182 36 : "remote_storage_s3_request_seconds",
183 36 : "Seconds to complete a request",
184 36 : &["request_type", "result"],
185 36 : buckets.to_vec(),
186 36 : )
187 36 : .unwrap();
188 756 : let req_seconds = PassFailCancelledRequestTyped::build_with(|kind, outcome| {
189 756 : req_seconds.with_label_values(&[kind.as_str(), outcome.as_str()])
190 756 : });
191 36 :
192 36 : let wait_seconds = register_histogram_vec!(
193 36 : "remote_storage_s3_wait_seconds",
194 36 : "Seconds rate limited",
195 36 : &["request_type"],
196 36 : buckets.to_vec(),
197 36 : )
198 36 : .unwrap();
199 36 : let wait_seconds =
200 252 : RequestTyped::build_with(|kind| wait_seconds.with_label_values(&[kind.as_str()]));
201 36 :
202 36 : let cancelled_waits = register_int_counter_vec!(
203 36 : "remote_storage_s3_cancelled_waits_total",
204 36 : "Times a semaphore wait has been cancelled per request type",
205 36 : &["request_type"],
206 36 : )
207 36 : .unwrap();
208 36 : let cancelled_waits =
209 252 : RequestTyped::build_with(|kind| cancelled_waits.with_label_values(&[kind.as_str()]));
210 36 :
211 36 : let deleted_objects_total = register_int_counter!(
212 36 : "remote_storage_s3_deleted_objects_total",
213 36 : "Amount of deleted objects in total",
214 36 : )
215 36 : .unwrap();
216 36 :
217 36 : Self {
218 36 : req_seconds,
219 36 : wait_seconds,
220 36 : cancelled_waits,
221 36 : deleted_objects_total,
222 36 : }
223 36 : }
224 : }
|