TLA Line data Source code
1 : //! Periodically collect consumption metrics for all active tenants
2 : //! and push them to an HTTP endpoint.
3 : use crate::context::{DownloadBehavior, RequestContext};
4 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
5 : use crate::tenant::tasks::BackgroundLoopKind;
6 : use crate::tenant::{mgr, LogicalSizeCalculationCause};
7 : use camino::Utf8PathBuf;
8 : use consumption_metrics::EventType;
9 : use pageserver_api::models::TenantState;
10 : use reqwest::Url;
11 : use std::collections::HashMap;
12 : use std::sync::Arc;
13 : use std::time::{Duration, SystemTime};
14 : use tracing::*;
15 : use utils::id::NodeId;
16 :
17 : mod metrics;
18 : use metrics::MetricsKey;
19 : mod disk_cache;
20 : mod upload;
21 :
22 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
23 :
24 : /// Basically a key-value pair, but usually in a Vec except for [`Cache`].
25 : ///
26 : /// This is as opposed to `consumption_metrics::Event`, which is the externally communicated form.
27 : /// The difference is the missing idempotency key, which exists only for the duration of
28 : /// upload attempts.
29 : type RawMetric = (MetricsKey, (EventType, u64));
30 :
31 : /// Caches the [`RawMetric`]s
32 : ///
33 : /// In practice, the last sent values are restored here during startup and used when calculating
34 : /// new ones. After a successful upload, the uploaded values are written back into the cache. This
35 : /// used to be used for deduplication, but that is no longer needed.
36 : type Cache = HashMap<MetricsKey, (EventType, u64)>;
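
// ----- editorial sketch, not part of the original source --------------------
// A minimal illustration of how `Cache` and `RawMetric` fit together: the
// cache holds the last value sent for each `MetricsKey`, so a freshly
// collected metric can be compared against what was uploaded previously.
// The helper name is hypothetical and only meant to clarify the two aliases.
fn _previously_sent_sketch(cache: &Cache, metric: &RawMetric) -> Option<u64> {
    let (key, (_event_type, _new_value)) = metric;
    cache.get(key).map(|(_last_event_type, last_value)| *last_value)
}
// -----------------------------------------------------------------------------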
37 :
38 : /// Main task that drives metrics collection and upload
39 CBC 6 : pub async fn collect_metrics(
40 6 : metric_collection_endpoint: &Url,
41 6 : metric_collection_interval: Duration,
42 6 : _cached_metric_collection_interval: Duration,
43 6 : synthetic_size_calculation_interval: Duration,
44 6 : node_id: NodeId,
45 6 : local_disk_storage: Utf8PathBuf,
46 6 : ctx: RequestContext,
47 6 : ) -> anyhow::Result<()> {
48 6 : if _cached_metric_collection_interval != Duration::ZERO {
49 UBC 0 : tracing::warn!(
50 0 : "cached_metric_collection_interval is no longer used, please set it to zero."
51 0 : )
52 CBC 6 : }
53 :
54 : // spin up background worker that calculates tenant sizes
55 6 : let worker_ctx =
56 6 : ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
57 6 : task_mgr::spawn(
58 6 : BACKGROUND_RUNTIME.handle(),
59 6 : TaskKind::CalculateSyntheticSize,
60 6 : None,
61 6 : None,
62 6 : "synthetic size calculation",
63 6 : false,
64 6 : async move {
65 6 : calculate_synthetic_size_worker(synthetic_size_calculation_interval, &worker_ctx)
66 6 : .instrument(info_span!("synthetic_size_worker"))
67 31 : .await?;
68 3 : Ok(())
69 6 : },
70 6 : );
71 6 :
72 6 : let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
73 6 :
74 6 : let cancel = task_mgr::shutdown_token();
75 6 :
76 6 : let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
77 :
78 6 : let mut cached_metrics = tokio::select! {
79 : _ = cancel.cancelled() => return Ok(()),
80 6 : ret = restore_and_reschedule => ret,
81 : };
82 :
83 : // define client here to reuse it for all requests
84 6 : let client = reqwest::ClientBuilder::new()
85 6 : .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
86 6 : .build()
87 6 : .expect("Failed to create http client with timeout");
88 6 :
89 6 : let node_id = node_id.to_string();
90 6 :
91 6 : // reminder: ticker is ready immediately (see the sketch after this function)
92 6 : let mut ticker = tokio::time::interval(metric_collection_interval);
93 :
94 : loop {
95 22 : let tick_at = tokio::select! {
96 : _ = cancel.cancelled() => return Ok(()),
97 18 : tick_at = ticker.tick() => tick_at,
98 : };
99 :
100 : // these are point-in-time values; "now" varies between iterations
101 18 : let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
102 :
103 18 : if metrics.is_empty() {
104 3 : continue;
105 15 : }
106 15 :
107 15 : let metrics = Arc::new(metrics);
108 15 :
109 15 : // why not race cancellation here? because we are one of the last tasks, and if we are
110 15 : // already here, better to try to flush the new values.
111 15 :
112 15 : let flush = async {
113 69 : match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
114 : Ok(()) => {
115 14 : tracing::debug!("flushed metrics to disk");
116 : }
117 UBC 0 : Err(e) => {
118 0 : // the idea here is that if someone has created a directory at our path, they
119 0 : // might notice it from the logs before shutdown and remove it
120 0 : tracing::error!("failed to persist metrics to {path:?}: {e:#}");
121 : }
122 : }
123 CBC 14 : };
124 :
125 15 : let upload = async {
126 15 : let res = upload::upload_metrics(
127 15 : &client,
128 15 : metric_collection_endpoint,
129 15 : &cancel,
130 15 : &node_id,
131 15 : &metrics,
132 15 : &mut cached_metrics,
133 15 : )
134 163 : .await;
135 14 : if let Err(e) = res {
136 : // serialization error which should never happen
137 UBC 0 : tracing::error!("failed to upload due to {e:#}");
138 CBC 14 : }
139 14 : };
140 :
141 : // let these run concurrently
142 15 : let (_, _) = tokio::join!(flush, upload);
143 :
144 13 : crate::tenant::tasks::warn_when_period_overrun(
145 13 : tick_at.elapsed(),
146 13 : metric_collection_interval,
147 13 : BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
148 13 : );
149 : }
150 3 : }
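
// ----- editorial sketch, not part of the original source --------------------
// The "ticker is ready immediately" reminder above refers to the semantics of
// `tokio::time::interval`: its first tick completes at once, so the first
// collection runs right after startup instead of one full interval later.
// The function name is hypothetical.
async fn _first_tick_is_immediate_sketch() {
    let mut ticker = tokio::time::interval(Duration::from_secs(60));
    let started = std::time::Instant::now();
    ticker.tick().await; // returns immediately on the first call
    assert!(started.elapsed() < Duration::from_secs(1));
}
// -----------------------------------------------------------------------------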
151 :
152 : /// Called on the first iteration in an attempt to rejoin the metric upload schedule from the
153 : /// previous pageserver session. The pageserver is supposed to upload at regular intervals regardless of restarts.
154 : ///
155 : /// Cancellation safe.
156 6 : async fn restore_and_reschedule(
157 6 : path: &Arc<Utf8PathBuf>,
158 6 : metric_collection_interval: Duration,
159 6 : ) -> Cache {
160 6 : let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
161 3 : Ok(found_some) => {
162 3 : // there is no min needed because we write these sequentially in
163 3 : // collect_all_metrics
164 3 : let earlier_metric_at = found_some
165 3 : .iter()
166 3 : .map(|(_, (et, _))| et.recorded_at())
167 3 : .copied()
168 3 : .next();
169 3 :
170 3 : let cached = found_some.into_iter().collect::<Cache>();
171 3 :
172 3 : (cached, earlier_metric_at)
173 : }
174 3 : Err(e) => {
175 3 : use std::io::{Error, ErrorKind};
176 3 :
177 3 : let root = e.root_cause();
178 3 : let maybe_ioerr = root.downcast_ref::<Error>();
179 3 : let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
180 3 :
181 3 : if !is_not_found {
182 UBC 0 : tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
183 CBC 3 : }
184 :
185 3 : (HashMap::new(), None)
186 : }
187 : };
188 :
189 6 : if let Some(earlier_metric_at) = earlier_metric_at {
190 3 : let earlier_metric_at: SystemTime = earlier_metric_at.into();
191 :
192 3 : let error = reschedule(earlier_metric_at, metric_collection_interval).await;
193 :
194 3 : if let Some(error) = error {
195 3 : if error.as_secs() >= 60 {
196 UBC 0 : tracing::info!(
197 0 : error_ms = error.as_millis(),
198 0 : "startup scheduling error due to restart"
199 0 : )
200 CBC 3 : }
201 UBC 0 : }
202 CBC 3 : }
203 :
204 6 : cached
205 6 : }
206 :
207 3 : async fn reschedule(
208 3 : earlier_metric_at: SystemTime,
209 3 : metric_collection_interval: Duration,
210 3 : ) -> Option<Duration> {
211 3 : let now = SystemTime::now();
212 3 : match now.duration_since(earlier_metric_at) {
213 3 : Ok(from_last_send) if from_last_send < metric_collection_interval => {
214 UBC 0 : let sleep_for = metric_collection_interval - from_last_send;
215 0 :
216 0 : let deadline = std::time::Instant::now() + sleep_for;
217 0 :
218 0 : tokio::time::sleep_until(deadline.into()).await;
219 :
220 0 : let now = std::time::Instant::now();
221 0 :
222 0 : // executor threads might be busy, so measure how far off the deadline we actually woke up
223 0 : Some(if now < deadline {
224 0 : deadline - now
225 : } else {
226 0 : now - deadline
227 : })
228 : }
229 CBC 3 : Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
230 : Err(_) => {
231 UBC 0 : tracing::warn!(
232 0 : ?now,
233 0 : ?earlier_metric_at,
234 0 : "oldest recorded metric is in future; first values will come out with inconsistent timestamps"
235 0 : );
236 0 : earlier_metric_at.duration_since(now).ok()
237 : }
238 : }
239 CBC 3 : }
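
// ----- editorial sketch, not part of the original source --------------------
// Worked example of the arithmetic in `reschedule`, assuming a 60s collection
// interval: a metric recorded 45s ago keeps us on the old schedule with a
// ~15s sleep, while one recorded 90s ago is already late and reports a
// scheduling error of 90s - 60s = 30s. The function name is hypothetical.
fn _reschedule_arithmetic_sketch() {
    let interval = Duration::from_secs(60);

    // Case 1: still within the interval, so we would sleep for the remainder.
    let from_last_send = Duration::from_secs(45);
    assert_eq!(interval - from_last_send, Duration::from_secs(15));

    // Case 2: past the interval, so the reported error is the overshoot.
    let from_last_send = Duration::from_secs(90);
    assert_eq!(
        from_last_send.saturating_sub(interval),
        Duration::from_secs(30)
    );
}
// -----------------------------------------------------------------------------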
240 :
241 : /// Calculate the synthetic size for each active tenant
242 6 : async fn calculate_synthetic_size_worker(
243 6 : synthetic_size_calculation_interval: Duration,
244 6 : ctx: &RequestContext,
245 6 : ) -> anyhow::Result<()> {
246 6 : info!("starting calculate_synthetic_size_worker");
247 :
248 : // reminder: ticker is ready immediately
249 6 : let mut ticker = tokio::time::interval(synthetic_size_calculation_interval);
250 6 : let cause = LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
251 :
252 : loop {
253 28 : let tick_at = tokio::select! {
254 : _ = task_mgr::shutdown_watcher() => return Ok(()),
255 22 : tick_at = ticker.tick() => tick_at,
256 : };
257 :
258 22 : let tenants = match mgr::list_tenants().await {
259 22 : Ok(tenants) => tenants,
260 UBC 0 : Err(e) => {
261 0 : warn!("cannot get tenant list: {e:#}");
262 0 : continue;
263 : }
264 : };
265 :
266 CBC 41 : for (tenant_id, tenant_state) in tenants {
267 19 : if tenant_state != TenantState::Active {
268 UBC 0 : continue;
269 CBC 19 : }
270 :
271 19 : if let Ok(tenant) = mgr::get_tenant(tenant_id, true).await {
272 : // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
273 : // We can put in some prioritization for consumption metrics.
274 : // Same for the loop that fetches computed metrics.
275 : // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
276 : // which turns out is really handy to understand the system.
277 19 : if let Err(e) = tenant.calculate_synthetic_size(cause, ctx).await {
278 UBC 0 : error!("failed to calculate synthetic size for tenant {tenant_id}: {e:#}");
279 CBC 19 : }
280 UBC 0 : }
281 : }
282 :
283 CBC 22 : crate::tenant::tasks::warn_when_period_overrun(
284 22 : tick_at.elapsed(),
285 22 : synthetic_size_calculation_interval,
286 22 : BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
287 22 : );
288 : }
289 3 : }