Line data Source code
1 : //! Periodically collect consumption metrics for all active tenants
2 : //! and push them to an HTTP endpoint.
3 : use std::collections::HashMap;
4 : use std::sync::Arc;
5 : use std::time::{Duration, SystemTime};
6 :
7 : use camino::Utf8PathBuf;
8 : use consumption_metrics::EventType;
9 : use itertools::Itertools as _;
10 : use pageserver_api::models::TenantState;
11 : use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
12 : use reqwest::Url;
13 : use serde::{Deserialize, Serialize};
14 : use tokio::time::Instant;
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::*;
17 : use utils::id::NodeId;
18 :
19 : use crate::config::PageServerConf;
20 : use crate::consumption_metrics::metrics::MetricsKey;
21 : use crate::consumption_metrics::upload::KeyGen as _;
22 : use crate::context::{DownloadBehavior, RequestContext};
23 : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};
24 : use crate::tenant::mgr::TenantManager;
25 : use crate::tenant::size::CalculateSyntheticSizeError;
26 : use crate::tenant::tasks::BackgroundLoopKind;
27 : use crate::tenant::{LogicalSizeCalculationCause, Tenant};
28 :
29 : mod disk_cache;
30 : mod metrics;
31 : mod upload;
32 :
33 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
34 :
35 : /// Basically a key-value pair, usually carried in a `Vec`, except in [`Cache`].
36 : ///
37 : /// This is in contrast to `consumption_metrics::Event`, which is the externally communicated form.
38 : /// The difference is essentially the missing idempotency key, which exists only for the duration of
39 : /// upload attempts.
40 : type RawMetric = (MetricsKey, (EventType, u64));
41 :
42 : /// The root of the new (v2) serializable metrics format.
43 8 : #[derive(Serialize, Deserialize)]
44 : struct NewMetricsRoot {
45 : version: usize,
46 : metrics: Vec<NewRawMetric>,
47 : }
48 :
49 : impl NewMetricsRoot {
50 8 : pub fn is_v2_metrics(json_value: &serde_json::Value) -> bool {
51 8 : if let Some(ver) = json_value.get("version") {
52 4 : if let Some(2) = ver.as_u64() {
53 4 : return true;
54 0 : }
55 4 : }
56 4 : false
57 8 : }
58 : }
59 :
60 : /// The new (v2) serializable metrics format; a borrowing variant of [`NewMetricsRoot`] used for serialization.
61 : #[derive(Serialize)]
62 : struct NewMetricsRefRoot<'a> {
63 : version: usize,
64 : metrics: &'a [NewRawMetric],
65 : }
66 :
67 : impl<'a> NewMetricsRefRoot<'a> {
68 4 : fn new(metrics: &'a [NewRawMetric]) -> Self {
69 4 : Self {
70 4 : version: 2,
71 4 : metrics,
72 4 : }
73 4 : }
74 : }
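// Illustrative sketch (test-only, not part of the production flow): the writer-side
// `NewMetricsRefRoot` always stamps `version: 2`, which is exactly what the reader-side
// `NewMetricsRoot::is_v2_metrics` check looks for. The pre-v2 file is assumed here, for
// illustration only, to have been a bare JSON array with no `version` field.
#[cfg(test)]
mod v2_format_detection_example {
    use super::{NewMetricsRefRoot, NewMetricsRoot};

    #[test]
    fn ref_root_serializes_as_v2() {
        // An empty metrics slice is enough to exercise the envelope.
        let root = NewMetricsRefRoot::new(&[]);
        let value = serde_json::to_value(&root).expect("serialize envelope");
        assert!(NewMetricsRoot::is_v2_metrics(&value));

        // A value without a `version` field (e.g. a bare array) is not recognized as v2.
        let old = serde_json::json!([]);
        assert!(!NewMetricsRoot::is_v2_metrics(&old));
    }
}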
75 :
76 : /// A single metric entry in the new (v2) serializable metrics format.
77 72 : #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
78 : struct NewRawMetric {
79 : key: MetricsKey,
80 : kind: EventType,
81 : value: u64,
82 : // TODO: add generation field and check against generations
83 : }
84 :
85 : impl NewRawMetric {
86 : #[cfg(test)]
87 36 : fn to_kv_pair(&self) -> (MetricsKey, NewRawMetric) {
88 36 : (self.key, self.clone())
89 36 : }
90 : }
91 :
92 : /// Caches the [`NewRawMetric`]s
93 : ///
94 : /// In practice, during startup the last sent values are restored into this cache and used when
95 : /// calculating new ones. After a successful upload, the cache is refreshed with the values just
96 : /// sent. This used to be used for deduplication as well, but that is no longer needed.
97 : type Cache = HashMap<MetricsKey, NewRawMetric>;
98 :
99 0 : pub async fn run(
100 0 : conf: &'static PageServerConf,
101 0 : tenant_manager: Arc<TenantManager>,
102 0 : cancel: CancellationToken,
103 0 : ) {
104 0 : let Some(metric_collection_endpoint) = conf.metric_collection_endpoint.as_ref() else {
105 0 : return;
106 : };
107 :
108 0 : let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
109 0 :
110 0 : let metrics_ctx = RequestContext::todo_child(
111 0 : TaskKind::MetricsCollection,
112 0 : // This task itself shouldn't download anything.
113 0 : // The actual size calculation does need downloads, and
114 0 : // creates a child context with the right DownloadBehavior.
115 0 : DownloadBehavior::Error,
116 0 : );
117 0 : let collect_metrics = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
118 0 : "consumption metrics collection",
119 0 : collect_metrics(
120 0 : tenant_manager.clone(),
121 0 : metric_collection_endpoint,
122 0 : &conf.metric_collection_bucket,
123 0 : conf.metric_collection_interval,
124 0 : conf.id,
125 0 : local_disk_storage,
126 0 : cancel.clone(),
127 0 : metrics_ctx,
128 0 : )
129 0 : .instrument(info_span!("metrics_collection")),
130 : ));
131 :
132 0 : let worker_ctx =
133 0 : RequestContext::todo_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
134 0 : let synthetic_size_worker = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
135 0 : "synthetic size calculation",
136 0 : calculate_synthetic_size_worker(
137 0 : tenant_manager.clone(),
138 0 : conf.synthetic_size_calculation_interval,
139 0 : cancel.clone(),
140 0 : worker_ctx,
141 0 : )
142 0 : .instrument(info_span!("synthetic_size_worker")),
143 : ));
144 :
145 0 : let (collect_metrics, synthetic_size_worker) =
146 0 : futures::future::join(collect_metrics, synthetic_size_worker).await;
147 0 : collect_metrics
148 0 : .expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
149 0 : synthetic_size_worker
150 0 : .expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
151 0 : }
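// Illustrative sketch of the shutdown join above: `run` waits for both spawned loops with
// `futures::future::join`, and each `JoinHandle` resolves to a `Result`, so a panic in
// either background task surfaces at the subsequent `expect` calls. The task bodies and
// values here are made up for the example; assumes tokio's test macro is available.
#[cfg(test)]
mod join_background_tasks_example {
    #[tokio::test]
    async fn join_waits_for_both_tasks() {
        let a = tokio::spawn(async { 1u32 });
        let b = tokio::spawn(async { 2u32 });

        // Both handles are awaited to completion; each yields a Result because the
        // spawned task could have panicked.
        let (a, b) = futures::future::join(a, b).await;
        assert_eq!(a.expect("task a") + b.expect("task b"), 3);
    }
}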
152 :
153 : /// Main task that drives metrics collection and upload.
154 : #[allow(clippy::too_many_arguments)]
155 0 : async fn collect_metrics(
156 0 : tenant_manager: Arc<TenantManager>,
157 0 : metric_collection_endpoint: &Url,
158 0 : metric_collection_bucket: &Option<RemoteStorageConfig>,
159 0 : metric_collection_interval: Duration,
160 0 : node_id: NodeId,
161 0 : local_disk_storage: Utf8PathBuf,
162 0 : cancel: CancellationToken,
163 0 : ctx: RequestContext,
164 0 : ) -> anyhow::Result<()> {
165 0 : let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
166 0 :
167 0 : let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
168 :
169 0 : let mut cached_metrics = tokio::select! {
170 0 : _ = cancel.cancelled() => return Ok(()),
171 0 : ret = restore_and_reschedule => ret,
172 0 : };
173 0 :
174 0 : // define client here to reuse it for all requests
175 0 : let client = reqwest::ClientBuilder::new()
176 0 : .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
177 0 : .build()
178 0 : .expect("Failed to create http client with timeout");
179 :
180 0 : let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
181 0 : match GenericRemoteStorage::from_config(bucket_config).await {
182 0 : Ok(client) => Some(client),
183 0 : Err(e) => {
184 0 : // Non-fatal error: if we were given an invalid config, we will proceed
185 0 : // with sending metrics over the network, but not to S3.
186 0 : tracing::warn!("Invalid configuration for metric_collection_bucket: {e}");
187 0 : None
188 : }
189 : }
190 : } else {
191 0 : None
192 : };
193 :
194 0 : let node_id = node_id.to_string();
195 :
196 : loop {
197 0 : let started_at = Instant::now();
198 :
199 : // these are point in time, each collected with its own "now"
200 0 : let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
201 :
202 : // Pre-generate event idempotency keys, to reuse them across the bucket
203 : // and HTTP sinks.
204 0 : let idempotency_keys = std::iter::repeat_with(|| node_id.as_str().generate())
205 0 : .take(metrics.len())
206 0 : .collect_vec();
207 0 :
208 0 : let metrics = Arc::new(metrics);
209 0 :
210 0 : // Why not race cancellation here? Because this is one of the last tasks to shut down, and
211 0 : // once we have gotten this far, it is better to try to flush the new values.
212 0 :
213 0 : let flush = async {
214 0 : match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
215 : Ok(()) => {
216 0 : tracing::debug!("flushed metrics to disk");
217 : }
218 0 : Err(e) => {
219 0 : // The idea is that if someone has created a directory at our path, they might
220 0 : // notice it from these logs before shutdown and remove it.
221 0 : tracing::error!("failed to persist metrics to {path:?}: {e:#}");
222 : }
223 : }
224 :
225 0 : if let Some(bucket_client) = &bucket_client {
226 0 : let res = upload::upload_metrics_bucket(
227 0 : bucket_client,
228 0 : &cancel,
229 0 : &node_id,
230 0 : &metrics,
231 0 : &idempotency_keys,
232 0 : )
233 0 : .await;
234 0 : if let Err(e) = res {
235 0 : tracing::error!("failed to upload to remote storage: {e:#}");
236 0 : }
237 0 : }
238 0 : };
239 :
240 0 : let upload = async {
241 0 : let res = upload::upload_metrics_http(
242 0 : &client,
243 0 : metric_collection_endpoint,
244 0 : &cancel,
245 0 : &metrics,
246 0 : &mut cached_metrics,
247 0 : &idempotency_keys,
248 0 : )
249 0 : .await;
250 0 : if let Err(e) = res {
251 : // serialization error which should never happen
252 0 : tracing::error!("failed to upload via HTTP due to {e:#}");
253 0 : }
254 0 : };
255 :
256 : // let these run concurrently
257 0 : let (_, _) = tokio::join!(flush, upload);
258 :
259 0 : crate::tenant::tasks::warn_when_period_overrun(
260 0 : started_at.elapsed(),
261 0 : metric_collection_interval,
262 0 : BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
263 0 : );
264 :
265 0 : let res =
266 0 : tokio::time::timeout_at(started_at + metric_collection_interval, cancel.cancelled())
267 0 : .await;
268 0 : if res.is_ok() {
269 0 : return Ok(());
270 0 : }
271 : }
272 0 : }
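// Illustrative sketch of the sleep/shutdown pattern used by the loop above: a single
// `tokio::time::timeout_at(deadline, cancel.cancelled())` acts both as the inter-iteration
// sleep and as the shutdown check. `Ok(..)` means the token was cancelled before the
// deadline (exit the loop), `Err(Elapsed)` means the full interval passed (run again).
// Assumes tokio's `test-util` feature for the paused clock; not part of the worker itself.
#[cfg(test)]
mod interval_or_cancel_example {
    use std::time::Duration;

    use tokio_util::sync::CancellationToken;

    #[tokio::test(start_paused = true)]
    async fn elapsed_means_next_iteration() {
        let cancel = CancellationToken::new();
        let started_at = tokio::time::Instant::now();
        let interval = Duration::from_secs(60);

        // Not cancelled: the timeout elapses, i.e. "run another iteration".
        let res = tokio::time::timeout_at(started_at + interval, cancel.cancelled()).await;
        assert!(res.is_err());

        // Cancelled: the wrapped future completes first, i.e. "exit the loop".
        cancel.cancel();
        let res = tokio::time::timeout_at(started_at + 2 * interval, cancel.cancelled()).await;
        assert!(res.is_ok());
    }
}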
273 :
274 : /// Called on the first iteration in an attempt to join the metric uploading schedule of the previous
275 : /// pageserver session. The pageserver is supposed to upload at a fixed interval regardless of restarts.
276 : ///
277 : /// Cancellation safe.
278 0 : async fn restore_and_reschedule(
279 0 : path: &Arc<Utf8PathBuf>,
280 0 : metric_collection_interval: Duration,
281 0 : ) -> Cache {
282 0 : let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
283 0 : Ok(found_some) => {
284 0 : // no `min` needed: collect_all_metrics writes these out sequentially, so the
285 0 : // first entry carries the earliest timestamp
286 0 : let earlier_metric_at = found_some
287 0 : .iter()
288 0 : .map(|item| item.kind.recorded_at())
289 0 : .copied()
290 0 : .next();
291 0 :
292 0 : let cached = found_some
293 0 : .into_iter()
294 0 : .map(|item| (item.key, item))
295 0 : .collect::<Cache>();
296 0 :
297 0 : (cached, earlier_metric_at)
298 : }
299 0 : Err(e) => {
300 : use std::io::{Error, ErrorKind};
301 :
302 0 : let root = e.root_cause();
303 0 : let maybe_ioerr = root.downcast_ref::<Error>();
304 0 : let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
305 0 :
306 0 : if !is_not_found {
307 0 : tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
308 0 : }
309 :
310 0 : (HashMap::new(), None)
311 : }
312 : };
313 :
314 0 : if let Some(earlier_metric_at) = earlier_metric_at {
315 0 : let earlier_metric_at: SystemTime = earlier_metric_at.into();
316 :
317 0 : let error = reschedule(earlier_metric_at, metric_collection_interval).await;
318 :
319 0 : if let Some(error) = error {
320 0 : if error.as_secs() >= 60 {
321 0 : tracing::info!(
322 0 : error_ms = error.as_millis(),
323 0 : "startup scheduling error due to restart"
324 : )
325 0 : }
326 0 : }
327 0 : }
328 :
329 0 : cached
330 0 : }
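// Illustrative sketch of the error handling above: a missing metrics file is expected on
// first start, so the anyhow chain is walked down to its root cause and checked for
// io::ErrorKind::NotFound; anything else gets logged. The error values here are made up.
#[cfg(test)]
mod not_found_detection_example {
    use std::io::{Error, ErrorKind};

    #[test]
    fn not_found_is_detected_through_the_context_chain() {
        let e = anyhow::Error::new(Error::new(ErrorKind::NotFound, "no such file"))
            .context("read metrics file");

        let is_not_found = e
            .root_cause()
            .downcast_ref::<Error>()
            .is_some_and(|e| e.kind() == ErrorKind::NotFound);

        assert!(is_not_found);
    }
}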
331 :
332 0 : async fn reschedule(
333 0 : earlier_metric_at: SystemTime,
334 0 : metric_collection_interval: Duration,
335 0 : ) -> Option<Duration> {
336 0 : let now = SystemTime::now();
337 0 : match now.duration_since(earlier_metric_at) {
338 0 : Ok(from_last_send) if from_last_send < metric_collection_interval => {
339 0 : let sleep_for = metric_collection_interval - from_last_send;
340 0 :
341 0 : let deadline = std::time::Instant::now() + sleep_for;
342 0 :
343 0 : tokio::time::sleep_until(deadline.into()).await;
344 :
345 0 : let now = std::time::Instant::now();
346 0 :
347 0 : // executor threads might be busy, so measure how far off the deadline we actually woke up
348 0 : Some(if now < deadline {
349 0 : deadline - now
350 : } else {
351 0 : now - deadline
352 : })
353 : }
354 0 : Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
355 : Err(_) => {
356 0 : tracing::warn!(
357 : ?now,
358 : ?earlier_metric_at,
359 0 : "oldest recorded metric is in the future; first values will come out with inconsistent timestamps"
360 : );
361 0 : earlier_metric_at.duration_since(now).ok()
362 : }
363 : }
364 0 : }
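// Illustrative sketch of the arithmetic in `reschedule`, in plain std::time terms: if the
// previous upload happened less than one interval ago, sleep out the remainder; otherwise
// report by how much the schedule was overshot. The durations here are made up.
#[cfg(test)]
mod reschedule_arithmetic_example {
    use std::time::Duration;

    #[test]
    fn remainder_or_overshoot() {
        let interval = Duration::from_secs(600);

        // Restarted 2 minutes after the last upload: wait out the remaining 8 minutes.
        let from_last_send = Duration::from_secs(120);
        assert_eq!(interval - from_last_send, Duration::from_secs(480));

        // Restarted 11 minutes after the last upload: the schedule was missed by 1 minute.
        let from_last_send = Duration::from_secs(660);
        assert_eq!(
            from_last_send.saturating_sub(interval),
            Duration::from_secs(60)
        );
    }
}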
365 :
366 : /// Calculate the synthetic size for each active tenant
367 0 : async fn calculate_synthetic_size_worker(
368 0 : tenant_manager: Arc<TenantManager>,
369 0 : synthetic_size_calculation_interval: Duration,
370 0 : cancel: CancellationToken,
371 0 : ctx: RequestContext,
372 0 : ) -> anyhow::Result<()> {
373 0 : info!("starting calculate_synthetic_size_worker");
374 0 : scopeguard::defer! {
375 0 : info!("calculate_synthetic_size_worker stopped");
376 0 : };
377 :
378 : loop {
379 0 : let started_at = Instant::now();
380 :
381 0 : let tenants = match tenant_manager.list_tenants() {
382 0 : Ok(tenants) => tenants,
383 0 : Err(e) => {
384 0 : warn!("cannot get tenant list: {e:#}");
385 0 : continue;
386 : }
387 : };
388 :
389 0 : for (tenant_shard_id, tenant_state, _gen) in tenants {
390 0 : if tenant_state != TenantState::Active {
391 0 : continue;
392 0 : }
393 0 :
394 0 : if !tenant_shard_id.is_shard_zero() {
395 : // We only send consumption metrics from shard 0, so don't waste time calculating
396 : // synthetic size on other shards.
397 0 : continue;
398 0 : }
399 :
400 0 : let Ok(tenant) = tenant_manager.get_attached_tenant_shard(tenant_shard_id) else {
401 0 : continue;
402 : };
403 :
404 0 : if !tenant.is_active() {
405 0 : continue;
406 0 : }
407 0 :
408 0 : // there is never any reason to exit calculate_synthetic_size_worker based on the outcome
409 0 : // of this call; we also do not need a separate shutdown check here, because no tenants
410 0 : // are found once the pageserver is shutting down.
411 0 : calculate_and_log(&tenant, &cancel, &ctx).await;
412 : }
413 :
414 0 : crate::tenant::tasks::warn_when_period_overrun(
415 0 : started_at.elapsed(),
416 0 : synthetic_size_calculation_interval,
417 0 : BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
418 0 : );
419 :
420 0 : let res = tokio::time::timeout_at(
421 0 : started_at + synthetic_size_calculation_interval,
422 0 : cancel.cancelled(),
423 0 : )
424 0 : .await;
425 0 : if res.is_ok() {
426 0 : return Ok(());
427 0 : }
428 : }
429 0 : }
430 :
431 0 : async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
432 : const CAUSE: LogicalSizeCalculationCause =
433 : LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
434 :
435 : // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
436 : // We can put in some prioritization for consumption metrics.
437 : // Same for the loop that fetches computed metrics.
438 : // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
439 : // which turns out to be really handy for understanding the system.
440 0 : match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
441 0 : Ok(_) => {}
442 0 : Err(CalculateSyntheticSizeError::Cancelled) => {}
443 0 : Err(e) => {
444 0 : let tenant_shard_id = tenant.tenant_shard_id();
445 0 : error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
446 : }
447 : }
448 0 : }