Line data    Source code
1 : //! Periodically collect consumption metrics for all active tenants
2 : //! and push them to an HTTP endpoint.
3 : use crate::context::{DownloadBehavior, RequestContext};
4 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
5 : use crate::tenant::tasks::BackgroundLoopKind;
6 : use crate::tenant::{mgr, LogicalSizeCalculationCause, PageReconstructError, Tenant};
7 : use camino::Utf8PathBuf;
8 : use consumption_metrics::EventType;
9 : use pageserver_api::models::TenantState;
10 : use reqwest::Url;
11 : use std::collections::HashMap;
12 : use std::sync::Arc;
13 : use std::time::{Duration, SystemTime};
14 : use tokio::time::Instant;
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::*;
17 : use utils::id::NodeId;
18 :
19 : mod metrics;
20 : use metrics::MetricsKey;
21 : mod disk_cache;
22 : mod upload;
23 :
24 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
25 :
26 : /// A key-value pair, usually carried around in a `Vec`, except in [`Cache`].
27 : ///
28 : /// This is the internal counterpart of `consumption_metrics::Event`, which is the externally
29 : /// communicated form; the difference is essentially the missing idempotency key, which lives only
30 : /// for the duration of the upload attempts.
31 : type RawMetric = (MetricsKey, (EventType, u64));
32 :
33 : /// Caches the [`RawMetric`]s
34 : ///
35 : /// In practice, during startup the last sent values are loaded here and used when calculating the
36 : /// new ones; after a successful upload, the cache is refreshed with the uploaded values (see the
37 : /// sketch below). This used to be used for deduplication as well, but that is no longer needed.
38 : type Cache = HashMap<MetricsKey, (EventType, u64)>;
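
// Illustrative sketch, not part of the original module: roughly the shape of the cache update
// that happens inside `upload::upload_metrics` (which takes a `&mut Cache`) after a successful
// send. `SketchKey` and `SketchEvent` are simplified stand-ins for `MetricsKey` and `EventType`,
// used here only to keep the example self-contained.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct SketchKey(u32);
#[derive(Clone, Copy)]
struct SketchEvent; // stand-in for the event-type/timestamp half of the cached value

fn refresh_cache(
    cache: &mut HashMap<SketchKey, (SketchEvent, u64)>,
    uploaded: &[(SketchKey, (SketchEvent, u64))],
) {
    // later uploads overwrite any previously cached value for the same key
    cache.extend(uploaded.iter().copied());
}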
39 :
40 : /// Main task that drives metrics collection
41 : #[allow(clippy::too_many_arguments)]
42 6 : pub async fn collect_metrics(
43 6 : metric_collection_endpoint: &Url,
44 6 : metric_collection_interval: Duration,
45 6 : _cached_metric_collection_interval: Duration,
46 6 : synthetic_size_calculation_interval: Duration,
47 6 : node_id: NodeId,
48 6 : local_disk_storage: Utf8PathBuf,
49 6 : cancel: CancellationToken,
50 6 : ctx: RequestContext,
51 6 : ) -> anyhow::Result<()> {
52 6 : if _cached_metric_collection_interval != Duration::ZERO {
53 0 : tracing::warn!(
54 0 : "cached_metric_collection_interval is no longer used, please set it to zero."
55 0 : )
56 6 : }
57 :
58 : // spin up a background worker that calculates tenant sizes
59 6 : let worker_ctx =
60 6 : ctx.detached_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
61 6 : task_mgr::spawn(
62 6 : BACKGROUND_RUNTIME.handle(),
63 6 : TaskKind::CalculateSyntheticSize,
64 6 : None,
65 6 : None,
66 6 : "synthetic size calculation",
67 6 : false,
68 6 : async move {
69 6 : calculate_synthetic_size_worker(
70 6 : synthetic_size_calculation_interval,
71 6 : &cancel,
72 6 : &worker_ctx,
73 6 : )
74 6 : .instrument(info_span!("synthetic_size_worker"))
75 26 : .await?;
76 3 : Ok(())
77 6 : },
78 6 : );
79 6 :
80 6 : let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
81 6 :
82 6 : let cancel = task_mgr::shutdown_token();
83 6 :
84 6 : let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
85 :
86 6 : let mut cached_metrics = tokio::select! {
87 : _ = cancel.cancelled() => return Ok(()),
88 6 : ret = restore_and_reschedule => ret,
89 : };
90 :
91 : // define client here to reuse it for all requests
92 6 : let client = reqwest::ClientBuilder::new()
93 6 : .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
94 6 : .build()
95 6 : .expect("Failed to create http client with timeout");
96 6 :
97 6 : let node_id = node_id.to_string();
98 :
99 22 : loop {
100 22 : let started_at = Instant::now();
101 :
102 : // these are point-in-time values, each collected with its own "now"
103 22 : let metrics = metrics::collect_all_metrics(&cached_metrics, &ctx).await;
104 :
105 22 : let metrics = Arc::new(metrics);
106 22 :
107 22 : // why not race cancellation here? because we are one of the last tasks to shut down, and if
108 22 : // we have already gotten this far, it is better to try to flush the new values.
109 22 :
110 22 : let flush = async {
111 68 : match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
112 : Ok(()) => {
113 21 : tracing::debug!("flushed metrics to disk");
114 : }
115 0 : Err(e) => {
116 0 : // idea here is that if someone creates a directory as our path, then they
117 0 : // might notice it from the logs before shutdown and remove it
118 0 : tracing::error!("failed to persist metrics to {path:?}: {e:#}");
119 : }
120 : }
121 21 : };
122 :
123 22 : let upload = async {
124 22 : let res = upload::upload_metrics(
125 22 : &client,
126 22 : metric_collection_endpoint,
127 22 : &cancel,
128 22 : &node_id,
129 22 : &metrics,
130 22 : &mut cached_metrics,
131 22 : )
132 186 : .await;
133 21 : if let Err(e) = res {
134 : // serialization error which should never happen
135 0 : tracing::error!("failed to upload due to {e:#}");
136 21 : }
137 21 : };
138 :
139 : // let these run concurrently; see the sketch after this function for the overall loop cadence
140 214 : let (_, _) = tokio::join!(flush, upload);
141 :
142 20 : crate::tenant::tasks::warn_when_period_overrun(
143 20 : started_at.elapsed(),
144 20 : metric_collection_interval,
145 20 : BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
146 20 : );
147 :
148 20 : let res = tokio::time::timeout_at(
149 20 : started_at + metric_collection_interval,
150 20 : task_mgr::shutdown_token().cancelled(),
151 20 : )
152 17 : .await;
153 19 : if res.is_ok() {
154 3 : return Ok(());
155 16 : }
156 : }
157 3 : }
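
// Illustrative sketch, not part of the original module: the cadence used by the loop in
// `collect_metrics` above. Each iteration runs the disk flush and the upload concurrently,
// then waits out the remainder of the interval unless the cancellation token fires first,
// in which case the loop exits cleanly. Relies on the imports at the top of this file.
async fn collection_loop_sketch(interval: Duration, cancel: CancellationToken) {
    loop {
        let started_at = Instant::now();

        // stand-ins for `disk_cache::flush_metrics_to_disk` and `upload::upload_metrics`
        let flush = async { /* write the freshly collected metrics to local disk */ };
        let upload = async { /* push the same metrics to the HTTP endpoint */ };
        let (_, _) = tokio::join!(flush, upload);

        // sleep until the next tick, or return as soon as shutdown is requested
        if tokio::time::timeout_at(started_at + interval, cancel.cancelled())
            .await
            .is_ok()
        {
            return;
        }
    }
}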
158 :
159 : /// Called on the first iteration to rejoin the metric uploading schedule of the previous pageserver
160 : /// session; uploads are expected to continue at the configured interval regardless of restarts.
161 : ///
162 : /// Cancellation safe.
163 6 : async fn restore_and_reschedule(
164 6 : path: &Arc<Utf8PathBuf>,
165 6 : metric_collection_interval: Duration,
166 6 : ) -> Cache {
167 6 : let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
168 3 : Ok(found_some) => {
169 3 : // there is no min needed because we write these sequentially in
170 3 : // collect_all_metrics
171 3 : let earlier_metric_at = found_some
172 3 : .iter()
173 3 : .map(|(_, (et, _))| et.recorded_at())
174 3 : .copied()
175 3 : .next();
176 3 :
177 3 : let cached = found_some.into_iter().collect::<Cache>();
178 3 :
179 3 : (cached, earlier_metric_at)
180 : }
181 3 : Err(e) => {
182 3 : use std::io::{Error, ErrorKind};
183 3 :
184 3 : let root = e.root_cause();
185 3 : let maybe_ioerr = root.downcast_ref::<Error>();
186 3 : let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
187 3 :
188 3 : if !is_not_found {
189 0 : tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
190 3 : }
191 :
192 3 : (HashMap::new(), None)
193 : }
194 : };
195 :
196 6 : if let Some(earlier_metric_at) = earlier_metric_at {
197 3 : let earlier_metric_at: SystemTime = earlier_metric_at.into();
198 :
199 3 : let error = reschedule(earlier_metric_at, metric_collection_interval).await;
200 :
201 3 : if let Some(error) = error {
202 3 : if error.as_secs() >= 60 {
203 0 : tracing::info!(
204 0 : error_ms = error.as_millis(),
205 0 : "startup scheduling error due to restart"
206 0 : )
207 3 : }
208 0 : }
209 3 : }
210 :
211 6 : cached
212 6 : }
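
// Illustrative sketch, not part of the original module: the "missing cache file means a clean
// first start" check used above, extracted as a helper. It walks to the root cause of the
// `anyhow::Error` and treats only `ErrorKind::NotFound` I/O errors as expected.
fn is_missing_cache_file(e: &anyhow::Error) -> bool {
    e.root_cause()
        .downcast_ref::<std::io::Error>()
        .is_some_and(|io_err| io_err.kind() == std::io::ErrorKind::NotFound)
}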
213 :
214 3 : async fn reschedule(
215 3 : earlier_metric_at: SystemTime,
216 3 : metric_collection_interval: Duration,
217 3 : ) -> Option<Duration> {
218 3 : let now = SystemTime::now();
219 3 : match now.duration_since(earlier_metric_at) {
220 3 : Ok(from_last_send) if from_last_send < metric_collection_interval => {
221 0 : let sleep_for = metric_collection_interval - from_last_send;
222 0 :
223 0 : let deadline = std::time::Instant::now() + sleep_for;
224 0 :
225 0 : tokio::time::sleep_until(deadline.into()).await;
226 :
227 0 : let now = std::time::Instant::now();
228 0 :
229 0 : // executor threads might be busy, so measure how far the actual wakeup drifted from the deadline
230 0 : Some(if now < deadline {
231 0 : deadline - now
232 : } else {
233 0 : now - deadline
234 : })
235 : }
236 3 : Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
237 : Err(_) => {
238 0 : tracing::warn!(
239 0 : ?now,
240 0 : ?earlier_metric_at,
241 0 : "oldest recorded metric is in future; first values will come out with inconsistent timestamps"
242 0 : );
243 0 : earlier_metric_at.duration_since(now).ok()
244 : }
245 : }
246 3 : }
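
// Illustrative sketch, not part of the original module: the core arithmetic behind `reschedule`
// above. If less than one collection interval has passed since the newest previously persisted
// metric, sleep out the remainder; otherwise continue immediately. The real function also
// measures how late (or early) the wakeup was and reports that as the scheduling error.
fn remaining_sleep(earlier_metric_at: SystemTime, interval: Duration) -> Duration {
    match SystemTime::now().duration_since(earlier_metric_at) {
        Ok(elapsed) if elapsed < interval => interval - elapsed,
        // already past due, or the recorded timestamp is in the future
        _ => Duration::ZERO,
    }
}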
247 :
248 : /// Calculate the synthetic size for each active tenant
249 6 : async fn calculate_synthetic_size_worker(
250 6 : synthetic_size_calculation_interval: Duration,
251 6 : cancel: &CancellationToken,
252 6 : ctx: &RequestContext,
253 6 : ) -> anyhow::Result<()> {
254 6 : info!("starting calculate_synthetic_size_worker");
255 3 : scopeguard::defer! {
256 3 : info!("calculate_synthetic_size_worker stopped");
257 : };
258 :
259 25 : loop {
260 25 : let started_at = Instant::now();
261 :
262 25 : let tenants = match mgr::list_tenants().await {
263 25 : Ok(tenants) => tenants,
264 0 : Err(e) => {
265 0 : warn!("cannot get tenant list: {e:#}");
266 0 : continue;
267 : }
268 : };
269 :
270 46 : for (tenant_shard_id, tenant_state, _gen) in tenants {
271 22 : if tenant_state != TenantState::Active {
272 0 : continue;
273 22 : }
274 22 :
275 22 : if !tenant_shard_id.is_zero() {
276 : // We only send consumption metrics from shard 0, so don't waste time calculating
277 : // synthetic size on other shards.
278 0 : continue;
279 22 : }
280 :
281 22 : let Ok(tenant) = mgr::get_tenant(tenant_shard_id, true) else {
282 0 : continue;
283 : };
284 :
285 : // there is never a reason to exit calculate_synthetic_size_worker because of this call's
286 : // outcome -- we do not need to care about shutdown here, because once the pageserver is
287 : // shutting down, no active tenant will be found above.
288 22 : calculate_and_log(&tenant, cancel, ctx).await;
289 : }
290 :
291 24 : crate::tenant::tasks::warn_when_period_overrun(
292 24 : started_at.elapsed(),
293 24 : synthetic_size_calculation_interval,
294 24 : BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
295 24 : );
296 :
297 24 : let res = tokio::time::timeout_at(
298 24 : started_at + synthetic_size_calculation_interval,
299 24 : cancel.cancelled(),
300 24 : )
301 22 : .await;
302 22 : if res.is_ok() {
303 3 : return Ok(());
304 19 : }
305 : }
306 3 : }
307 :
308 22 : async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
309 : const CAUSE: LogicalSizeCalculationCause =
310 : LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
311 :
312 : // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
313 : // We can put in some prioritization for consumption metrics.
314 : // Same for the loop that fetches computed metrics.
315 : // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
316 : // which turns out to be really handy for understanding the system.
317 22 : let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
318 21 : return;
319 : };
320 :
321 : // this error can be returned if the timeline is shutting down, but that does not mean the
322 : // synthetic size worker should terminate. we do not need any extra shutdown checks in this
323 : // function because `mgr::get_tenant` will error out once shutdown has progressed to shutting
324 : // down tenants. (a sketch of this error-classification pattern follows this function.)
325 0 : let shutting_down = matches!(
326 0 : e.downcast_ref::<PageReconstructError>(),
327 : Some(PageReconstructError::Cancelled | PageReconstructError::AncestorStopping(_))
328 : );
329 :
330 0 : if !shutting_down {
331 0 : let tenant_shard_id = tenant.tenant_shard_id();
332 0 : error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
333 0 : }
334 21 : }
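
// Illustrative sketch, not part of the original module: the pattern `calculate_and_log` uses to
// decide whether an `anyhow::Error` is an expected shutdown-time failure. `SketchError` is a
// stand-in for `PageReconstructError`; only the cancellation-like variant is treated as benign.
#[derive(Debug)]
enum SketchError {
    Cancelled,
    Other,
}

impl std::fmt::Display for SketchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SketchError::Cancelled => write!(f, "cancelled"),
            SketchError::Other => write!(f, "other failure"),
        }
    }
}

impl std::error::Error for SketchError {}

fn is_expected_shutdown_error(e: &anyhow::Error) -> bool {
    matches!(e.downcast_ref::<SketchError>(), Some(SketchError::Cancelled))
}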
|