Line data Source code
1 : //! Periodically collect consumption metrics for all active tenants
2 : //! and push them to an HTTP endpoint.
3 : use crate::config::PageServerConf;
4 : use crate::consumption_metrics::metrics::MetricsKey;
5 : use crate::consumption_metrics::upload::KeyGen as _;
6 : use crate::context::{DownloadBehavior, RequestContext};
7 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
8 : use crate::tenant::size::CalculateSyntheticSizeError;
9 : use crate::tenant::tasks::BackgroundLoopKind;
10 : use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
11 : use camino::Utf8PathBuf;
12 : use consumption_metrics::EventType;
13 : use itertools::Itertools as _;
14 : use pageserver_api::models::TenantState;
15 : use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
16 : use reqwest::Url;
17 : use serde::{Deserialize, Serialize};
18 : use std::collections::HashMap;
19 : use std::sync::Arc;
20 : use std::time::{Duration, SystemTime};
21 : use tokio::time::Instant;
22 : use tokio_util::sync::CancellationToken;
23 : use tracing::*;
24 : use utils::id::NodeId;
25 :
26 : mod disk_cache;
27 : mod metrics;
28 : mod upload;
29 :
30 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
31 :
32 : /// Essentially a key-value pair, usually carried in a `Vec`, except in [`Cache`].
33 : ///
34 : /// This is the internal form, as opposed to `consumption_metrics::Event`, which is the externally
35 : /// communicated form. The difference is the missing idempotency key, which exists only for the
36 : /// duration of upload attempts.
37 : type RawMetric = (MetricsKey, (EventType, u64));
38 :
39 : /// The root of the new (v2) serializable metrics format
40 6 : #[derive(Serialize, Deserialize)]
41 : struct NewMetricsRoot {
42 : version: usize,
43 : metrics: Vec<NewRawMetric>,
44 : }
45 :
46 : impl NewMetricsRoot {
47 4 : pub fn is_v2_metrics(json_value: &serde_json::Value) -> bool {
48 4 : if let Some(ver) = json_value.get("version") {
49 2 : if let Some(2) = ver.as_u64() {
50 2 : return true;
51 0 : }
52 2 : }
53 2 : false
54 4 : }
55 : }
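
// A minimal sketch, not part of the original test suite, of how the two on-disk formats are told
// apart: a v2 document carries a top-level `version` field equal to 2, while the legacy format was
// a bare JSON array with no `version` key. The empty `metrics` list and the module name are
// illustrative only.
#[cfg(test)]
mod format_version_sketch {
    use super::{NewMetricsRefRoot, NewMetricsRoot};

    #[test]
    fn version_field_distinguishes_v2_from_legacy() {
        // Write path: the borrowed root always serializes as `{ "version": 2, "metrics": [...] }`.
        let v2 = serde_json::to_value(NewMetricsRefRoot::new(&[])).expect("serialization succeeds");
        assert!(NewMetricsRoot::is_v2_metrics(&v2));

        // Legacy format: a bare array, so there is no `version` key to find.
        let legacy = serde_json::json!([]);
        assert!(!NewMetricsRoot::is_v2_metrics(&legacy));
    }
}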
56 :
57 : /// Borrowed counterpart of [`NewMetricsRoot`], used when serializing metrics without cloning them
58 : #[derive(Serialize)]
59 : struct NewMetricsRefRoot<'a> {
60 : version: usize,
61 : metrics: &'a [NewRawMetric],
62 : }
63 :
64 : impl<'a> NewMetricsRefRoot<'a> {
65 2 : fn new(metrics: &'a [NewRawMetric]) -> Self {
66 2 : Self {
67 2 : version: 2,
68 2 : metrics,
69 2 : }
70 2 : }
71 : }
72 :
73 : /// A single metric entry in the new (v2) serializable metrics format
74 48 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
75 : struct NewRawMetric {
76 : key: MetricsKey,
77 : kind: EventType,
78 : value: u64,
79 : // TODO: add generation field and check against generations
80 : }
81 :
82 : impl NewRawMetric {
83 : #[cfg(test)]
84 18 : fn to_kv_pair(&self) -> (MetricsKey, NewRawMetric) {
85 18 : (self.key, self.clone())
86 18 : }
87 : }
88 :
89 : /// Caches the [`NewRawMetric`]s
90 : ///
91 : /// During startup, the last values sent are loaded here and used when calculating new ones. After
92 : /// a successful upload, the cache is updated with the values that were just sent. This used to be
93 : /// used for deduplication, but that is no longer needed.
94 : type Cache = HashMap<MetricsKey, NewRawMetric>;
95 :
96 0 : pub async fn run(
97 0 : conf: &'static PageServerConf,
98 0 : tenant_manager: Arc<TenantManager>,
99 0 : cancel: CancellationToken,
100 0 : ) {
101 0 : let Some(metric_collection_endpoint) = conf.metric_collection_endpoint.as_ref() else {
102 0 : return;
103 : };
104 :
105 0 : let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
106 0 :
107 0 : let metrics_ctx = RequestContext::todo_child(
108 0 : TaskKind::MetricsCollection,
109 0 : // This task itself shouldn't download anything.
110 0 : // The actual size calculation does need downloads, and
111 0 : // creates a child context with the right DownloadBehavior.
112 0 : DownloadBehavior::Error,
113 0 : );
114 0 : let collect_metrics = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
115 0 : "consumption metrics collection",
116 0 : collect_metrics(
117 0 : tenant_manager.clone(),
118 0 : metric_collection_endpoint,
119 0 : &conf.metric_collection_bucket,
120 0 : conf.metric_collection_interval,
121 0 : conf.id,
122 0 : local_disk_storage,
123 0 : cancel.clone(),
124 0 : metrics_ctx,
125 0 : )
126 0 : .instrument(info_span!("metrics_collection")),
127 : ));
128 :
129 0 : let worker_ctx =
130 0 : RequestContext::todo_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
131 0 : let synthetic_size_worker = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
132 0 : "synthetic size calculation",
133 0 : calculate_synthetic_size_worker(
134 0 : tenant_manager.clone(),
135 0 : conf.synthetic_size_calculation_interval,
136 0 : cancel.clone(),
137 0 : worker_ctx,
138 0 : )
139 0 : .instrument(info_span!("synthetic_size_worker")),
140 : ));
141 :
142 0 : let (collect_metrics, synthetic_size_worker) =
143 0 : futures::future::join(collect_metrics, synthetic_size_worker).await;
144 0 : collect_metrics
145 0 : .expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
146 0 : synthetic_size_worker
147 0 : .expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
148 0 : }
149 :
150 : /// Main task that drives metrics collection
151 : #[allow(clippy::too_many_arguments)]
152 0 : async fn collect_metrics(
153 0 : tenant_manager: Arc<TenantManager>,
154 0 : metric_collection_endpoint: &Url,
155 0 : metric_collection_bucket: &Option<RemoteStorageConfig>,
156 0 : metric_collection_interval: Duration,
157 0 : node_id: NodeId,
158 0 : local_disk_storage: Utf8PathBuf,
159 0 : cancel: CancellationToken,
160 0 : ctx: RequestContext,
161 0 : ) -> anyhow::Result<()> {
162 0 : let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
163 0 :
164 0 : let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
165 :
166 0 : let mut cached_metrics = tokio::select! {
167 0 : _ = cancel.cancelled() => return Ok(()),
168 0 : ret = restore_and_reschedule => ret,
169 0 : };
170 0 :
171 0 : // define client here to reuse it for all requests
172 0 : let client = reqwest::ClientBuilder::new()
173 0 : .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
174 0 : .build()
175 0 : .expect("Failed to create http client with timeout");
176 :
177 0 : let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
178 0 : match GenericRemoteStorage::from_config(bucket_config).await {
179 0 : Ok(client) => Some(client),
180 0 : Err(e) => {
181 0 : // Non-fatal error: if we were given an invalid config, we will proceed
182 0 : // with sending metrics over the network, but not to S3.
183 0 : tracing::warn!("Invalid configuration for metric_collection_bucket: {e}");
184 0 : None
185 : }
186 : }
187 : } else {
188 0 : None
189 : };
190 :
191 0 : let node_id = node_id.to_string();
192 :
193 : loop {
194 0 : let started_at = Instant::now();
195 :
196 : // These are point-in-time values; "now" varies between the metrics as they are collected.
197 0 : let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
198 :
199 : // Pre-generate event idempotency keys, to reuse them across the bucket
200 : // and HTTP sinks.
201 0 : let idempotency_keys = std::iter::repeat_with(|| node_id.as_str().generate())
202 0 : .take(metrics.len())
203 0 : .collect_vec();
204 0 :
205 0 : let metrics = Arc::new(metrics);
206 0 :
207 0 : // why not race cancellation here? because we are one of the last tasks, and if we are
208 0 : // already here, better to try to flush the new values.
209 0 :
210 0 : let flush = async {
211 0 : match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
212 : Ok(()) => {
213 0 : tracing::debug!("flushed metrics to disk");
214 : }
215 0 : Err(e) => {
216 0 : // The idea is that if someone has created a directory at our path, they might notice it
217 0 : // in the logs before shutdown and remove it.
218 0 : tracing::error!("failed to persist metrics to {path:?}: {e:#}");
219 : }
220 : }
221 :
222 0 : if let Some(bucket_client) = &bucket_client {
223 0 : let res = upload::upload_metrics_bucket(
224 0 : bucket_client,
225 0 : &cancel,
226 0 : &node_id,
227 0 : &metrics,
228 0 : &idempotency_keys,
229 0 : )
230 0 : .await;
231 0 : if let Err(e) = res {
232 0 : tracing::error!("failed to upload to remote storage: {e:#}");
233 0 : }
234 0 : }
235 0 : };
236 :
237 0 : let upload = async {
238 0 : let res = upload::upload_metrics_http(
239 0 : &client,
240 0 : metric_collection_endpoint,
241 0 : &cancel,
242 0 : &metrics,
243 0 : &mut cached_metrics,
244 0 : &idempotency_keys,
245 0 : )
246 0 : .await;
247 0 : if let Err(e) = res {
248 : // serialization error which should never happen
249 0 : tracing::error!("failed to upload via HTTP due to {e:#}");
250 0 : }
251 0 : };
252 :
253 : // let these run concurrently
254 0 : let (_, _) = tokio::join!(flush, upload);
255 :
256 0 : crate::tenant::tasks::warn_when_period_overrun(
257 0 : started_at.elapsed(),
258 0 : metric_collection_interval,
259 0 : BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
260 0 : );
261 :
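// `timeout_at` completes with `Ok(())` only if the cancellation token fired before the next
// collection period began; the `Err(Elapsed)` case simply means it is time for the next
// iteration, so falling through to the top of the loop is intentional.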
262 0 : let res =
263 0 : tokio::time::timeout_at(started_at + metric_collection_interval, cancel.cancelled())
264 0 : .await;
265 0 : if res.is_ok() {
266 0 : return Ok(());
267 0 : }
268 : }
269 0 : }
270 :
271 : /// Called on the first iteration in an attempt to join the metric uploading schedule of the previous
272 : /// pageserver session. The pageserver is supposed to upload at regular intervals regardless of restarts.
273 : ///
274 : /// Cancellation safe.
275 0 : async fn restore_and_reschedule(
276 0 : path: &Arc<Utf8PathBuf>,
277 0 : metric_collection_interval: Duration,
278 0 : ) -> Cache {
279 0 : let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
280 0 : Ok(found_some) => {
281 0 : // there is no min needed because we write these sequentially in
282 0 : // collect_all_metrics
283 0 : let earlier_metric_at = found_some
284 0 : .iter()
285 0 : .map(|item| item.kind.recorded_at())
286 0 : .copied()
287 0 : .next();
288 0 :
289 0 : let cached = found_some
290 0 : .into_iter()
291 0 : .map(|item| (item.key, item))
292 0 : .collect::<Cache>();
293 0 :
294 0 : (cached, earlier_metric_at)
295 : }
296 0 : Err(e) => {
297 : use std::io::{Error, ErrorKind};
298 :
299 0 : let root = e.root_cause();
300 0 : let maybe_ioerr = root.downcast_ref::<Error>();
301 0 : let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
302 0 :
303 0 : if !is_not_found {
304 0 : tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
305 0 : }
306 :
307 0 : (HashMap::new(), None)
308 : }
309 : };
310 :
311 0 : if let Some(earlier_metric_at) = earlier_metric_at {
312 0 : let earlier_metric_at: SystemTime = earlier_metric_at.into();
313 :
314 0 : let error = reschedule(earlier_metric_at, metric_collection_interval).await;
315 :
316 0 : if let Some(error) = error {
317 0 : if error.as_secs() >= 60 {
318 0 : tracing::info!(
319 0 : error_ms = error.as_millis(),
320 0 : "startup scheduling error due to restart"
321 : )
322 0 : }
323 0 : }
324 0 : }
325 :
326 0 : cached
327 0 : }
328 :
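/// Sleep until one `metric_collection_interval` has passed since `earlier_metric_at`, so that a
/// restarted pageserver rejoins the upload cadence of its previous incarnation instead of
/// uploading immediately.
///
/// Returns the observed scheduling error, which the caller logs if it exceeds a minute. As an
/// illustration (numbers are made up): with a 10 minute interval and a last metric recorded
/// 4 minutes ago, this sleeps roughly 6 minutes before the first collection.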
329 0 : async fn reschedule(
330 0 : earlier_metric_at: SystemTime,
331 0 : metric_collection_interval: Duration,
332 0 : ) -> Option<Duration> {
333 0 : let now = SystemTime::now();
334 0 : match now.duration_since(earlier_metric_at) {
335 0 : Ok(from_last_send) if from_last_send < metric_collection_interval => {
336 0 : let sleep_for = metric_collection_interval - from_last_send;
337 0 :
338 0 : let deadline = std::time::Instant::now() + sleep_for;
339 0 :
340 0 : tokio::time::sleep_until(deadline.into()).await;
341 :
342 0 : let now = std::time::Instant::now();
343 0 :
344 0 : // Executor threads might be busy, so measure how far the actual wakeup ended up from the deadline.
345 0 : Some(if now < deadline {
346 0 : deadline - now
347 : } else {
348 0 : now - deadline
349 : })
350 : }
351 0 : Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
352 : Err(_) => {
353 0 : tracing::warn!(
354 : ?now,
355 : ?earlier_metric_at,
356 0 : "oldest recorded metric is in future; first values will come out with inconsistent timestamps"
357 : );
358 0 : earlier_metric_at.duration_since(now).ok()
359 : }
360 : }
361 0 : }
362 :
363 : /// Calculate the synthetic size for each active tenant
364 0 : async fn calculate_synthetic_size_worker(
365 0 : tenant_manager: Arc<TenantManager>,
366 0 : synthetic_size_calculation_interval: Duration,
367 0 : cancel: CancellationToken,
368 0 : ctx: RequestContext,
369 0 : ) -> anyhow::Result<()> {
370 0 : info!("starting calculate_synthetic_size_worker");
371 0 : scopeguard::defer! {
372 0 : info!("calculate_synthetic_size_worker stopped");
373 0 : };
374 :
375 : loop {
376 0 : let started_at = Instant::now();
377 :
378 0 : let tenants = match tenant_manager.list_tenants() {
379 0 : Ok(tenants) => tenants,
380 0 : Err(e) => {
381 0 : warn!("cannot get tenant list: {e:#}");
382 0 : continue;
383 : }
384 : };
385 :
386 0 : for (tenant_shard_id, tenant_state, _gen) in tenants {
387 0 : if tenant_state != TenantState::Active {
388 0 : continue;
389 0 : }
390 0 :
391 0 : if !tenant_shard_id.is_shard_zero() {
392 : // We only send consumption metrics from shard 0, so don't waste time calculating
393 : // synthetic size on other shards.
394 0 : continue;
395 0 : }
396 :
397 0 : let Ok(tenant) = tenant_manager.get_attached_tenant_shard(tenant_shard_id) else {
398 0 : continue;
399 : };
400 :
401 0 : if !tenant.is_active() {
402 0 : continue;
403 0 : }
404 0 :
405 0 : // there is never any reason to exit calculate_synthetic_size_worker following any
406 0 : // return value -- we don't need to care about shutdown because no tenant is found when
407 0 : // pageserver is shut down.
408 0 : calculate_and_log(&tenant, &cancel, &ctx).await;
409 : }
410 :
411 0 : crate::tenant::tasks::warn_when_period_overrun(
412 0 : started_at.elapsed(),
413 0 : synthetic_size_calculation_interval,
414 0 : BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
415 0 : );
416 :
417 0 : let res = tokio::time::timeout_at(
418 0 : started_at + synthetic_size_calculation_interval,
419 0 : cancel.cancelled(),
420 0 : )
421 0 : .await;
422 0 : if res.is_ok() {
423 0 : return Ok(());
424 0 : }
425 : }
426 0 : }
427 :
428 0 : async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
429 : const CAUSE: LogicalSizeCalculationCause =
430 : LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
431 :
432 : // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
433 : // We can put in some prioritization for consumption metrics.
434 : // Same for the loop that fetches computed metrics.
435 : // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
436 : // which turns out to be really handy for understanding the system.
437 0 : match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
438 0 : Ok(_) => {}
439 0 : Err(CalculateSyntheticSizeError::Cancelled) => {}
440 0 : Err(e) => {
441 0 : let tenant_shard_id = tenant.tenant_shard_id();
442 0 : error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
443 : }
444 : }
445 0 : }
|