Line data Source code
1 : //! Periodically collect consumption metrics for all active tenants
2 : //! and push them to an HTTP endpoint.
3 : use crate::config::PageServerConf;
4 : use crate::context::{DownloadBehavior, RequestContext};
5 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
6 : use crate::tenant::size::CalculateSyntheticSizeError;
7 : use crate::tenant::tasks::BackgroundLoopKind;
8 : use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
9 : use camino::Utf8PathBuf;
10 : use consumption_metrics::EventType;
11 : use pageserver_api::models::TenantState;
12 : use remote_storage::{GenericRemoteStorage, RemoteStorageConfig};
13 : use reqwest::Url;
14 : use std::collections::HashMap;
15 : use std::sync::Arc;
16 : use std::time::{Duration, SystemTime};
17 : use tokio::time::Instant;
18 : use tokio_util::sync::CancellationToken;
19 : use tracing::*;
20 : use utils::id::NodeId;
21 :
22 : mod metrics;
23 : use crate::consumption_metrics::metrics::MetricsKey;
24 : mod disk_cache;
25 : mod upload;
26 :
27 : const DEFAULT_HTTP_REPORTING_TIMEOUT: Duration = Duration::from_secs(60);
28 :
29 : /// A key-value pair, usually carried in a `Vec`, except in the [`Cache`].
30 : ///
31 : /// This is in contrast to `consumption_metrics::Event`, which is the externally communicated form.
32 : /// The difference is essentially the missing idempotency key, which exists only for the duration of
33 : /// upload attempts.
34 : type RawMetric = (MetricsKey, (EventType, u64));
35 :
36 : /// Caches the [`RawMetric`]s
37 : ///
38 : /// In practice, the last sent values are loaded here during startup and used when calculating new
39 : /// ones. After a successful upload, the uploaded values are written back to the cache. This used to
40 : /// be used for deduplication, but that is no longer needed.
41 : type Cache = HashMap<MetricsKey, (EventType, u64)>;
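
// A hypothetical, self-contained sketch (not part of this module) of the cache's
// role described above: the previously uploaded value for a key is consulted while
// computing the next sample, and after a successful upload the new values are
// written back. `IllustrativeKey`/`IllustrativeValue` and `update_cache_after_upload`
// are stand-ins invented for this example; the real `MetricsKey` and `(EventType, u64)`
// carry tenant/timeline ids and absolute/incremental event metadata.
type IllustrativeKey = (&'static str, &'static str); // (tenant id, metric name)
type IllustrativeValue = (std::time::SystemTime, u64); // (recorded at, value)

fn update_cache_after_upload(
    cache: &mut std::collections::HashMap<IllustrativeKey, IllustrativeValue>,
    uploaded: &[(IllustrativeKey, IllustrativeValue)],
) {
    // mirror the successfully uploaded batch into the cache so the next collection
    // round can base its calculations on the last sent values
    for (key, value) in uploaded {
        cache.insert(*key, *value);
    }
}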
42 :
43 0 : pub async fn run(
44 0 : conf: &'static PageServerConf,
45 0 : tenant_manager: Arc<TenantManager>,
46 0 : cancel: CancellationToken,
47 0 : ) {
48 0 : let Some(metric_collection_endpoint) = conf.metric_collection_endpoint.as_ref() else {
49 0 : return;
50 : };
51 :
52 0 : let local_disk_storage = conf.workdir.join("last_consumption_metrics.json");
53 0 :
54 0 : let metrics_ctx = RequestContext::todo_child(
55 0 : TaskKind::MetricsCollection,
56 0 : // This task itself shouldn't download anything.
57 0 : // The actual size calculation does need downloads, and
58 0 : // creates a child context with the right DownloadBehavior.
59 0 : DownloadBehavior::Error,
60 0 : );
61 0 : let collect_metrics = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
62 0 : "consumption metrics collection",
63 0 : collect_metrics(
64 0 : tenant_manager.clone(),
65 0 : metric_collection_endpoint,
66 0 : &conf.metric_collection_bucket,
67 0 : conf.metric_collection_interval,
68 0 : conf.id,
69 0 : local_disk_storage,
70 0 : cancel.clone(),
71 0 : metrics_ctx,
72 0 : )
73 0 : .instrument(info_span!("metrics_collection")),
74 : ));
75 :
76 0 : let worker_ctx =
77 0 : RequestContext::todo_child(TaskKind::CalculateSyntheticSize, DownloadBehavior::Download);
78 0 : let synthetic_size_worker = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
79 0 : "synthetic size calculation",
80 0 : calculate_synthetic_size_worker(
81 0 : tenant_manager.clone(),
82 0 : conf.synthetic_size_calculation_interval,
83 0 : cancel.clone(),
84 0 : worker_ctx,
85 0 : )
86 0 : .instrument(info_span!("synthetic_size_worker")),
87 : ));
88 :
89 0 : let (collect_metrics, synthetic_size_worker) =
90 0 : futures::future::join(collect_metrics, synthetic_size_worker).await;
91 0 : collect_metrics
92 0 : .expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
93 0 : synthetic_size_worker
94 0 : .expect("unreachable: exit_on_panic_or_error would catch the panic and exit the process");
95 0 : }
96 :
97 : /// Main task that drives metrics collection
98 : #[allow(clippy::too_many_arguments)]
99 0 : async fn collect_metrics(
100 0 : tenant_manager: Arc<TenantManager>,
101 0 : metric_collection_endpoint: &Url,
102 0 : metric_collection_bucket: &Option<RemoteStorageConfig>,
103 0 : metric_collection_interval: Duration,
104 0 : node_id: NodeId,
105 0 : local_disk_storage: Utf8PathBuf,
106 0 : cancel: CancellationToken,
107 0 : ctx: RequestContext,
108 0 : ) -> anyhow::Result<()> {
109 0 : let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
110 0 :
111 0 : let restore_and_reschedule = restore_and_reschedule(&path, metric_collection_interval);
112 :
113 0 : let mut cached_metrics = tokio::select! {
114 : _ = cancel.cancelled() => return Ok(()),
115 : ret = restore_and_reschedule => ret,
116 : };
117 :
118 : // define client here to reuse it for all requests
119 0 : let client = reqwest::ClientBuilder::new()
120 0 : .timeout(DEFAULT_HTTP_REPORTING_TIMEOUT)
121 0 : .build()
122 0 : .expect("Failed to create http client with timeout");
123 :
124 0 : let bucket_client = if let Some(bucket_config) = metric_collection_bucket {
125 0 : match GenericRemoteStorage::from_config(bucket_config).await {
126 0 : Ok(client) => Some(client),
127 0 : Err(e) => {
128 0 : // Non-fatal error: if we were given an invalid config, we will proceed
129 0 : // with sending metrics over the network, but not to S3.
130 0 : tracing::warn!("Invalid configuration for metric_collection_bucket: {e}");
131 0 : None
132 : }
133 : }
134 : } else {
135 0 : None
136 : };
137 :
138 0 : let node_id = node_id.to_string();
139 :
140 0 : loop {
141 0 : let started_at = Instant::now();
142 :
143 : // these are point-in-time samples, each with its own "now"
144 0 : let metrics = metrics::collect_all_metrics(&tenant_manager, &cached_metrics, &ctx).await;
145 :
146 0 : let metrics = Arc::new(metrics);
147 0 :
148 0 : // Why not race against cancellation here? Because we are one of the last tasks to shut down,
149 0 : // and if we are already here, it is better to try to flush the new values.
150 0 :
151 0 : let flush = async {
152 0 : match disk_cache::flush_metrics_to_disk(&metrics, &path).await {
153 : Ok(()) => {
154 0 : tracing::debug!("flushed metrics to disk");
155 : }
156 0 : Err(e) => {
157 0 : // the idea here is that if someone creates a directory at our path, then they
158 0 : // might notice it in the logs before shutdown and remove it
159 0 : tracing::error!("failed to persist metrics to {path:?}: {e:#}");
160 : }
161 : }
162 :
163 0 : if let Some(bucket_client) = &bucket_client {
164 0 : let res =
165 0 : upload::upload_metrics_bucket(bucket_client, &cancel, &node_id, &metrics).await;
166 0 : if let Err(e) = res {
167 0 : tracing::error!("failed to upload to S3: {e:#}");
168 0 : }
169 0 : }
170 0 : };
171 :
172 0 : let upload = async {
173 0 : let res = upload::upload_metrics_http(
174 0 : &client,
175 0 : metric_collection_endpoint,
176 0 : &cancel,
177 0 : &node_id,
178 0 : &metrics,
179 0 : &mut cached_metrics,
180 0 : )
181 0 : .await;
182 0 : if let Err(e) = res {
183 : // serialization error which should never happen
184 0 : tracing::error!("failed to upload via HTTP due to {e:#}");
185 0 : }
186 0 : };
187 0 :
188 0 : // let these run concurrently
189 0 : let (_, _) = tokio::join!(flush, upload);
190 :
191 0 : crate::tenant::tasks::warn_when_period_overrun(
192 0 : started_at.elapsed(),
193 0 : metric_collection_interval,
194 0 : BackgroundLoopKind::ConsumptionMetricsCollectMetrics,
195 0 : );
196 :
197 0 : let res =
198 0 : tokio::time::timeout_at(started_at + metric_collection_interval, cancel.cancelled())
199 0 : .await;
200 0 : if res.is_ok() {
201 0 : return Ok(());
202 0 : }
203 : }
204 0 : }
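
// A minimal sketch (not part of this module) of the pacing pattern used by the loop
// above: sleep until `started_at + interval`, but wake immediately if the cancellation
// token fires. `Ok` from `timeout_at` means the token was cancelled before the deadline,
// so the loop exits; `Err` (elapsed) simply means it is time for the next iteration.
// The name `run_every` and the `work` closure are illustrative only.
async fn run_every(
    interval: std::time::Duration,
    cancel: tokio_util::sync::CancellationToken,
    mut work: impl FnMut(),
) {
    loop {
        let started_at = tokio::time::Instant::now();
        work();
        // cancellation-aware sleep: races the token against the interval deadline
        if tokio::time::timeout_at(started_at + interval, cancel.cancelled())
            .await
            .is_ok()
        {
            return;
        }
    }
}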
205 :
206 : /// Called on the first iteration in an attempt to join the metric uploading schedule from the previous
207 : /// pageserver session. The pageserver is supposed to upload at regular intervals regardless of restarts.
208 : ///
209 : /// Cancellation safe.
210 0 : async fn restore_and_reschedule(
211 0 : path: &Arc<Utf8PathBuf>,
212 0 : metric_collection_interval: Duration,
213 0 : ) -> Cache {
214 0 : let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {
215 0 : Ok(found_some) => {
216 0 : // there is no min needed because we write these sequentially in
217 0 : // collect_all_metrics
218 0 : let earlier_metric_at = found_some
219 0 : .iter()
220 0 : .map(|(_, (et, _))| et.recorded_at())
221 0 : .copied()
222 0 : .next();
223 0 :
224 0 : let cached = found_some.into_iter().collect::<Cache>();
225 0 :
226 0 : (cached, earlier_metric_at)
227 : }
228 0 : Err(e) => {
229 0 : use std::io::{Error, ErrorKind};
230 0 :
231 0 : let root = e.root_cause();
232 0 : let maybe_ioerr = root.downcast_ref::<Error>();
233 0 : let is_not_found = maybe_ioerr.is_some_and(|e| e.kind() == ErrorKind::NotFound);
234 0 :
235 0 : if !is_not_found {
236 0 : tracing::info!("failed to read any previous metrics from {path:?}: {e:#}");
237 0 : }
238 :
239 0 : (HashMap::new(), None)
240 : }
241 : };
242 :
243 0 : if let Some(earlier_metric_at) = earlier_metric_at {
244 0 : let earlier_metric_at: SystemTime = earlier_metric_at.into();
245 :
246 0 : let error = reschedule(earlier_metric_at, metric_collection_interval).await;
247 :
248 0 : if let Some(error) = error {
249 0 : if error.as_secs() >= 60 {
250 0 : tracing::info!(
251 0 : error_ms = error.as_millis(),
252 0 : "startup scheduling error due to restart"
253 : )
254 0 : }
255 0 : }
256 0 : }
257 :
258 0 : cached
259 0 : }
260 :
261 0 : async fn reschedule(
262 0 : earlier_metric_at: SystemTime,
263 0 : metric_collection_interval: Duration,
264 0 : ) -> Option<Duration> {
265 0 : let now = SystemTime::now();
266 0 : match now.duration_since(earlier_metric_at) {
267 0 : Ok(from_last_send) if from_last_send < metric_collection_interval => {
268 0 : let sleep_for = metric_collection_interval - from_last_send;
269 0 :
270 0 : let deadline = std::time::Instant::now() + sleep_for;
271 0 :
272 0 : tokio::time::sleep_until(deadline.into()).await;
273 :
274 0 : let now = std::time::Instant::now();
275 0 :
276 0 : // executor threads might be busy, so measure how far off the deadline we actually woke up
277 0 : Some(if now < deadline {
278 0 : deadline - now
279 : } else {
280 0 : now - deadline
281 : })
282 : }
283 0 : Ok(from_last_send) => Some(from_last_send.saturating_sub(metric_collection_interval)),
284 : Err(_) => {
285 0 : tracing::warn!(
286 : ?now,
287 : ?earlier_metric_at,
288 0 : "oldest recorded metric is in future; first values will come out with inconsistent timestamps"
289 : );
290 0 : earlier_metric_at.duration_since(now).ok()
291 : }
292 : }
293 0 : }
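
// Worked example of the rescheduling arithmetic above, with hypothetical numbers:
// given a 10 minute collection interval and a restart 3 minutes after the last
// recorded metric, the first iteration should wait the remaining 7 minutes so that
// uploads keep their pre-restart cadence. `remaining_sleep` is an illustrative helper
// mirroring only the `Ok(from_last_send) if from_last_send < metric_collection_interval`
// arm; it is not part of this module.
fn remaining_sleep(
    interval: std::time::Duration,
    since_last_send: std::time::Duration,
) -> Option<std::time::Duration> {
    (since_last_send < interval).then(|| interval - since_last_send)
}

#[test]
fn keeps_cadence_across_restart() {
    use std::time::Duration;
    let interval = Duration::from_secs(600); // 10 minute collection interval
    let since_last_send = Duration::from_secs(180); // restarted 3 minutes after last send
    assert_eq!(
        remaining_sleep(interval, since_last_send),
        Some(Duration::from_secs(420)) // sleep out the remaining 7 minutes
    );
}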
294 :
295 : /// Calculate synthetic size for each active tenant
296 0 : async fn calculate_synthetic_size_worker(
297 0 : tenant_manager: Arc<TenantManager>,
298 0 : synthetic_size_calculation_interval: Duration,
299 0 : cancel: CancellationToken,
300 0 : ctx: RequestContext,
301 0 : ) -> anyhow::Result<()> {
302 0 : info!("starting calculate_synthetic_size_worker");
303 : scopeguard::defer! {
304 : info!("calculate_synthetic_size_worker stopped");
305 : };
306 :
307 0 : loop {
308 0 : let started_at = Instant::now();
309 :
310 0 : let tenants = match tenant_manager.list_tenants() {
311 0 : Ok(tenants) => tenants,
312 0 : Err(e) => {
313 0 : warn!("cannot get tenant list: {e:#}");
314 0 : continue;
315 : }
316 : };
317 :
318 0 : for (tenant_shard_id, tenant_state, _gen) in tenants {
319 0 : if tenant_state != TenantState::Active {
320 0 : continue;
321 0 : }
322 0 :
323 0 : if !tenant_shard_id.is_shard_zero() {
324 : // We only send consumption metrics from shard 0, so don't waste time calculating
325 : // synthetic size on other shards.
326 0 : continue;
327 0 : }
328 :
329 0 : let Ok(tenant) = tenant_manager.get_attached_tenant_shard(tenant_shard_id) else {
330 0 : continue;
331 : };
332 :
333 0 : if !tenant.is_active() {
334 0 : continue;
335 0 : }
336 0 :
337 0 : // There is never a reason to exit calculate_synthetic_size_worker based on any
338 0 : // return value -- we don't need to care about shutdown here because no tenants are found when
339 0 : // the pageserver is shutting down.
340 0 : calculate_and_log(&tenant, &cancel, &ctx).await;
341 : }
342 :
343 0 : crate::tenant::tasks::warn_when_period_overrun(
344 0 : started_at.elapsed(),
345 0 : synthetic_size_calculation_interval,
346 0 : BackgroundLoopKind::ConsumptionMetricsSyntheticSizeWorker,
347 0 : );
348 :
349 0 : let res = tokio::time::timeout_at(
350 0 : started_at + synthetic_size_calculation_interval,
351 0 : cancel.cancelled(),
352 0 : )
353 0 : .await;
354 0 : if res.is_ok() {
355 0 : return Ok(());
356 0 : }
357 : }
358 0 : }
359 :
360 0 : async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &RequestContext) {
361 0 : const CAUSE: LogicalSizeCalculationCause =
362 0 : LogicalSizeCalculationCause::ConsumptionMetricsSyntheticSize;
363 0 :
364 0 : // TODO should we use concurrent_background_tasks_rate_limit() here, like the other background tasks?
365 0 : // We can put in some prioritization for consumption metrics.
366 0 : // Same for the loop that fetches computed metrics.
367 0 : // By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
368 0 : // which turns out to be really handy for understanding the system.
369 0 : match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
370 0 : Ok(_) => {}
371 0 : Err(CalculateSyntheticSizeError::Cancelled) => {}
372 0 : Err(e) => {
373 0 : let tenant_shard_id = tenant.tenant_shard_id();
374 0 : error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
375 : }
376 : }
377 0 : }