Line data Source code
1 : use std::cmp;
2 : use std::collections::hash_map::Entry;
3 : use std::collections::{HashMap, HashSet};
4 : use std::sync::Arc;
5 :
6 : use tenant_size_model::svg::SvgBranchKind;
7 : use tokio::sync::oneshot::error::RecvError;
8 : use tokio::sync::Semaphore;
9 : use tokio_util::sync::CancellationToken;
10 :
11 : use crate::context::RequestContext;
12 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
13 :
14 : use super::{GcError, LogicalSizeCalculationCause, Tenant};
15 : use crate::tenant::Timeline;
16 : use utils::id::TimelineId;
17 : use utils::lsn::Lsn;
18 :
19 : use tracing::*;
20 :
21 : use tenant_size_model::{Segment, StorageModel};
22 :
/// Inputs to the actual tenant sizing model
///
/// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to
/// be a transferable format between execution environments and developers.
///
/// This tracks more information than the actual StorageModel that the calculation
/// needs. We will convert this into a StorageModel when it's time to perform
/// the calculation.
///
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
    /// All segments of all timelines. Only these are converted into the
    /// `StorageModel` by `calculate_model`.
    pub segments: Vec<SegmentMeta>,
    /// Per-timeline LSNs that explain how the segments were derived. They do not
    /// participate in the size calculation.
    pub timeline_inputs: Vec<TimelineInputs>,
}
37 :
/// A [`Segment`], with some extra information for display purposes
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
    /// The segment as consumed by the size model.
    pub segment: Segment,
    /// Timeline this segment was created for.
    pub timeline_id: TimelineId,
    /// What the segment's LSN represents on that timeline.
    pub kind: LsnKind,
}
45 :
46 0 : #[derive(thiserror::Error, Debug)]
47 : pub(crate) enum CalculateSyntheticSizeError {
48 : /// Something went wrong internally to the calculation of logical size at a particular branch point
49 : #[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")]
50 : LogicalSize {
51 : timeline_id: TimelineId,
52 : lsn: Lsn,
53 : error: CalculateLogicalSizeError,
54 : },
55 :
56 : /// Something went wrong internally when calculating GC parameters at start of size calculation
57 : #[error(transparent)]
58 : GcInfo(GcError),
59 :
60 : /// Totally unexpected errors, like panics joining a task
61 : #[error(transparent)]
62 : Fatal(anyhow::Error),
63 :
64 : /// Tenant shut down while calculating size
65 : #[error("Cancelled")]
66 : Cancelled,
67 : }
68 :
69 : impl From<GcError> for CalculateSyntheticSizeError {
70 0 : fn from(value: GcError) -> Self {
71 0 : match value {
72 : GcError::TenantCancelled | GcError::TimelineCancelled => {
73 0 : CalculateSyntheticSizeError::Cancelled
74 : }
75 0 : other => CalculateSyntheticSizeError::GcInfo(other),
76 : }
77 0 : }
78 : }
79 :
80 : impl SegmentMeta {
81 0 : fn size_needed(&self) -> bool {
82 0 : match self.kind {
83 : LsnKind::BranchStart => {
84 : // If we don't have a later GcCutoff point on this branch, and
85 : // no ancestor, calculate size for the branch start point.
86 0 : self.segment.needed && self.segment.parent.is_none()
87 : }
88 0 : LsnKind::BranchPoint => true,
89 0 : LsnKind::GcCutOff => true,
90 0 : LsnKind::BranchEnd => false,
91 0 : LsnKind::LeasePoint => true,
92 0 : LsnKind::LeaseStart => false,
93 0 : LsnKind::LeaseEnd => false,
94 : }
95 0 : }
96 : }
97 :
/// What a particular LSN on a timeline represents; recorded on every [`SegmentMeta`].
///
/// `Ord` is derived, so variants compare in declaration order. `gather_inputs` sorts
/// `(Lsn, LsnKind)` pairs, which means the declaration order decides how points sharing
/// the same LSN are ordered relative to each other.
#[derive(
    Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
)]
pub enum LsnKind {
    /// A timeline starting here
    BranchStart,
    /// A child timeline branches off from here
    BranchPoint,
    /// GC cutoff point
    GcCutOff,
    /// Last record LSN
    BranchEnd,
    /// An LSN lease is granted here.
    LeasePoint,
    /// A lease starts from here.
    LeaseStart,
    /// Last record LSN for the lease (should have the same LSN as the previous [`LsnKind::LeaseStart`]).
    LeaseEnd,
}
117 :
118 : impl From<LsnKind> for SvgBranchKind {
119 0 : fn from(kind: LsnKind) -> Self {
120 0 : match kind {
121 0 : LsnKind::LeasePoint | LsnKind::LeaseStart | LsnKind::LeaseEnd => SvgBranchKind::Lease,
122 0 : _ => SvgBranchKind::Timeline,
123 : }
124 0 : }
125 : }
126 :
/// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
/// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TimelineInputs {
    /// The timeline these inputs describe.
    pub timeline_id: TimelineId,

    /// The timeline this one branched from, if any.
    pub ancestor_id: Option<TimelineId>,

    /// LSN on the ancestor at which this timeline branched off.
    ancestor_lsn: Lsn,
    /// Last record LSN of the timeline when the inputs were gathered.
    last_record: Lsn,
    /// Latest GC cutoff reported by the timeline. Informational only: it is not used when
    /// building segments, because it might not have been updated recently enough.
    latest_gc_cutoff: Lsn,

    /// Cutoff point based on GC settings
    next_pitr_cutoff: Lsn,

    /// Cutoff point calculated from the user-supplied 'max_retention_period'
    retention_param_cutoff: Option<Lsn>,

    /// Lease points on the timeline
    lease_points: Vec<Lsn>,
}
148 :
149 : /// Gathers the inputs for the tenant sizing model.
150 : ///
151 : /// Tenant size does not consider the latest state, but only the state until next_pitr_cutoff, which
152 : /// is updated on-demand, during the start of this calculation and separate from the
153 : /// [`TimelineInputs::latest_gc_cutoff`].
154 : ///
155 : /// For timelines in general:
156 : ///
157 : /// ```text
158 : /// 0-----|---------|----|------------| · · · · · |·> lsn
159 : /// initdb_lsn branchpoints* next_pitr_cutoff latest
160 : /// ```
161 0 : pub(super) async fn gather_inputs(
162 0 : tenant: &Tenant,
163 0 : limit: &Arc<Semaphore>,
164 0 : max_retention_period: Option<u64>,
165 0 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
166 0 : cause: LogicalSizeCalculationCause,
167 0 : cancel: &CancellationToken,
168 0 : ctx: &RequestContext,
169 0 : ) -> Result<ModelInputs, CalculateSyntheticSizeError> {
170 0 : // refresh is needed to update [`timeline::GcCutoffs`]
171 0 : tenant.refresh_gc_info(cancel, ctx).await?;
172 :
173 : // Collect information about all the timelines
174 0 : let mut timelines = tenant.list_timelines();
175 0 :
176 0 : if timelines.is_empty() {
177 : // perhaps the tenant has just been created, and as such doesn't have any data yet
178 0 : return Ok(ModelInputs {
179 0 : segments: vec![],
180 0 : timeline_inputs: Vec::new(),
181 0 : });
182 0 : }
183 0 :
184 0 : // Filter out timelines that are not active
185 0 : //
186 0 : // There may be a race when a timeline is dropped,
187 0 : // but it is unlikely to cause any issues. In the worst case,
188 0 : // the calculation will error out.
189 0 : timelines.retain(|t| t.is_active());
190 0 :
191 0 : // Build a map of branch points.
192 0 : let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
193 0 : for timeline in timelines.iter() {
194 0 : if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
195 0 : branchpoints
196 0 : .entry(ancestor_id)
197 0 : .or_default()
198 0 : .insert(timeline.get_ancestor_lsn());
199 0 : }
200 : }
201 :
202 : // These become the final result.
203 0 : let mut timeline_inputs = Vec::with_capacity(timelines.len());
204 0 : let mut segments: Vec<SegmentMeta> = Vec::new();
205 0 :
206 0 : //
207 0 : // Build Segments representing each timeline. As we do that, also remember
208 0 : // the branchpoints and branch startpoints in 'branchpoint_segments' and
209 0 : // 'branchstart_segments'
210 0 : //
211 0 :
212 0 : // BranchPoint segments of each timeline
213 0 : // (timeline, branchpoint LSN) -> segment_id
214 0 : let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();
215 :
216 : // timeline, Branchpoint seg id, (ancestor, ancestor LSN)
217 : type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
218 0 : let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();
219 :
220 0 : for timeline in timelines.iter() {
221 0 : let timeline_id = timeline.timeline_id;
222 0 : let last_record_lsn = timeline.get_last_record_lsn();
223 0 : let ancestor_lsn = timeline.get_ancestor_lsn();
224 0 :
225 0 : // there's a race between the update (holding tenant.gc_lock) and this read but it
226 0 : // might not be an issue, because it's not for Timeline::gc
227 0 : let gc_info = timeline.gc_info.read().unwrap();
228 0 :
229 0 : // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
230 0 : // new gc run, which we have no control over. however differently from `Timeline::gc`
231 0 : // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
232 0 : // actually removing files.
233 0 : //
234 0 : // We only consider [`timeline::GcCutoffs::time`], and not [`timeline::GcCutoffs::space`], because from
235 0 : // a user's perspective they have only requested retention up to the time bound (pitr_cutoff), rather
236 0 : // than our internal space cutoff. This means that if someone drops a database and waits for their
237 0 : // PITR interval, they will see synthetic size decrease, even if we are still storing data inside
238 0 : // the space cutoff.
239 0 : let mut next_pitr_cutoff = gc_info.cutoffs.time;
240 :
241 : // If the caller provided a shorter retention period, use that instead of the GC cutoff.
242 0 : let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
243 0 : let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
244 0 : if next_pitr_cutoff < param_cutoff {
245 0 : next_pitr_cutoff = param_cutoff;
246 0 : }
247 0 : Some(param_cutoff)
248 : } else {
249 0 : None
250 : };
251 :
252 0 : let lease_points = gc_info
253 0 : .leases
254 0 : .keys()
255 0 : .filter(|&&lsn| lsn > ancestor_lsn)
256 0 : .copied()
257 0 : .collect::<Vec<_>>();
258 0 :
259 0 : // next_pitr_cutoff in parent branch are not of interest (right now at least), nor do we
260 0 : // want to query any logical size before initdb_lsn.
261 0 : let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
262 0 :
263 0 : // Build "interesting LSNs" on this timeline
264 0 : let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
265 0 : .retain_lsns
266 0 : .iter()
267 0 : .filter(|(lsn, _child_id)| lsn > &ancestor_lsn)
268 0 : .copied()
269 0 : // this assumes there are no other retain_lsns than the branchpoints
270 0 : .map(|(lsn, _child_id)| (lsn, LsnKind::BranchPoint))
271 0 : .collect::<Vec<_>>();
272 0 :
273 0 : lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
274 0 :
275 0 : drop(gc_info);
276 :
277 : // Add branch points we collected earlier, just in case there were any that were
278 : // not present in retain_lsns. We will remove any duplicates below later.
279 0 : if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
280 0 : lsns.extend(
281 0 : this_branchpoints
282 0 : .iter()
283 0 : .map(|lsn| (*lsn, LsnKind::BranchPoint)),
284 0 : )
285 0 : }
286 :
287 : // Add a point for the PITR cutoff
288 0 : let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
289 0 : if !branch_start_needed {
290 0 : lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
291 0 : }
292 :
293 0 : lsns.sort_unstable();
294 0 : lsns.dedup();
295 0 :
296 0 : //
297 0 : // Create Segments for the interesting points.
298 0 : //
299 0 :
300 0 : // Timeline start point
301 0 : let ancestor = timeline
302 0 : .get_ancestor_timeline_id()
303 0 : .map(|ancestor_id| (ancestor_id, ancestor_lsn));
304 0 : branchstart_segments.push((timeline_id, segments.len(), ancestor));
305 0 : segments.push(SegmentMeta {
306 0 : segment: Segment {
307 0 : parent: None, // filled in later
308 0 : lsn: branch_start_lsn.0,
309 0 : size: None, // filled in later
310 0 : needed: branch_start_needed,
311 0 : },
312 0 : timeline_id: timeline.timeline_id,
313 0 : kind: LsnKind::BranchStart,
314 0 : });
315 0 :
316 0 : // GC cutoff point, and any branch points, i.e. points where
317 0 : // other timelines branch off from this timeline.
318 0 : let mut parent = segments.len() - 1;
319 0 : for (lsn, kind) in lsns {
320 0 : if kind == LsnKind::BranchPoint {
321 0 : branchpoint_segments.insert((timeline_id, lsn), segments.len());
322 0 : }
323 :
324 0 : segments.push(SegmentMeta {
325 0 : segment: Segment {
326 0 : parent: Some(parent),
327 0 : lsn: lsn.0,
328 0 : size: None,
329 0 : needed: lsn > next_pitr_cutoff,
330 0 : },
331 0 : timeline_id: timeline.timeline_id,
332 0 : kind,
333 0 : });
334 0 :
335 0 : parent = segments.len() - 1;
336 0 :
337 0 : if kind == LsnKind::LeasePoint {
338 0 : // Needs `LeaseStart` and `LeaseEnd` as well to model lease as a read-only branch that never writes data
339 0 : // (i.e. it's lsn has not advanced from ancestor_lsn), and therefore the three segments have the same LSN
340 0 : // value. Without the other two segments, the calculation code would not count the leased LSN as a point
341 0 : // to be retained.
342 0 : // Did not use `BranchStart` or `BranchEnd` so we can differentiate branches and leases during debug.
343 0 : //
344 0 : // Alt Design: rewrite the entire calculation code to be independent of timeline id. Both leases and
345 0 : // branch points can be given a synthetic id so we can unite them.
346 0 : let mut lease_parent = parent;
347 0 :
348 0 : // Start of a lease.
349 0 : segments.push(SegmentMeta {
350 0 : segment: Segment {
351 0 : parent: Some(lease_parent),
352 0 : lsn: lsn.0,
353 0 : size: None, // Filled in later, if necessary
354 0 : needed: lsn > next_pitr_cutoff, // only needed if the point is within rentention.
355 0 : },
356 0 : timeline_id: timeline.timeline_id,
357 0 : kind: LsnKind::LeaseStart,
358 0 : });
359 0 : lease_parent += 1;
360 0 :
361 0 : // End of the lease.
362 0 : segments.push(SegmentMeta {
363 0 : segment: Segment {
364 0 : parent: Some(lease_parent),
365 0 : lsn: lsn.0,
366 0 : size: None, // Filled in later, if necessary
367 0 : needed: true, // everything at the lease LSN must be readable => is needed
368 0 : },
369 0 : timeline_id: timeline.timeline_id,
370 0 : kind: LsnKind::LeaseEnd,
371 0 : });
372 0 : }
373 : }
374 :
375 : // Current end of the timeline
376 0 : segments.push(SegmentMeta {
377 0 : segment: Segment {
378 0 : parent: Some(parent),
379 0 : lsn: last_record_lsn.0,
380 0 : size: None, // Filled in later, if necessary
381 0 : needed: true,
382 0 : },
383 0 : timeline_id: timeline.timeline_id,
384 0 : kind: LsnKind::BranchEnd,
385 0 : });
386 0 :
387 0 : timeline_inputs.push(TimelineInputs {
388 0 : timeline_id: timeline.timeline_id,
389 0 : ancestor_id: timeline.get_ancestor_timeline_id(),
390 0 : ancestor_lsn,
391 0 : last_record: last_record_lsn,
392 0 : // this is not used above, because it might not have updated recently enough
393 0 : latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
394 0 : next_pitr_cutoff,
395 0 : retention_param_cutoff,
396 0 : lease_points,
397 0 : });
398 : }
399 :
400 : // We now have all segments from the timelines in 'segments'. The timelines
401 : // haven't been linked to each other yet, though. Do that.
402 0 : for (_timeline_id, seg_id, ancestor) in branchstart_segments {
403 : // Look up the branch point
404 0 : if let Some(ancestor) = ancestor {
405 0 : let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
406 0 : segments[seg_id].segment.parent = Some(parent_id);
407 0 : }
408 : }
409 :
410 : // We left the 'size' field empty in all of the Segments so far.
411 : // Now find logical sizes for all of the points that might need or benefit from them.
412 0 : fill_logical_sizes(
413 0 : &timelines,
414 0 : &mut segments,
415 0 : limit,
416 0 : logical_size_cache,
417 0 : cause,
418 0 : ctx,
419 0 : )
420 0 : .await?;
421 :
422 0 : if tenant.cancel.is_cancelled() {
423 : // If we're shutting down, return an error rather than a sparse result that might include some
424 : // timelines from before we started shutting down
425 0 : return Err(CalculateSyntheticSizeError::Cancelled);
426 0 : }
427 0 :
428 0 : Ok(ModelInputs {
429 0 : segments,
430 0 : timeline_inputs,
431 0 : })
432 0 : }
433 :
434 : /// Augment 'segments' with logical sizes
435 : ///
436 : /// This will leave segments' sizes as None if the Timeline associated with the segment is deleted concurrently
437 : /// (i.e. we cannot read its logical size at a particular LSN).
438 0 : async fn fill_logical_sizes(
439 0 : timelines: &[Arc<Timeline>],
440 0 : segments: &mut [SegmentMeta],
441 0 : limit: &Arc<Semaphore>,
442 0 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
443 0 : cause: LogicalSizeCalculationCause,
444 0 : ctx: &RequestContext,
445 0 : ) -> Result<(), CalculateSyntheticSizeError> {
446 0 : let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
447 0 : timelines
448 0 : .iter()
449 0 : .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
450 0 : );
451 0 :
452 0 : // record the used/inserted cache keys here, to remove extras not to start leaking
453 0 : // after initial run the cache should be quite stable, but live timelines will eventually
454 0 : // require new lsns to be inspected.
455 0 : let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();
456 0 :
457 0 : // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
458 0 : // our advantage with `?` error handling.
459 0 : let mut joinset = tokio::task::JoinSet::new();
460 :
461 : // For each point that would benefit from having a logical size available,
462 : // spawn a Task to fetch it, unless we have it cached already.
463 0 : for seg in segments.iter() {
464 0 : if !seg.size_needed() {
465 0 : continue;
466 0 : }
467 0 :
468 0 : let timeline_id = seg.timeline_id;
469 0 : let lsn = Lsn(seg.segment.lsn);
470 :
471 0 : if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
472 0 : let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
473 0 : if cached_size.is_none() {
474 0 : let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
475 0 : let parallel_size_calcs = Arc::clone(limit);
476 0 : let ctx = ctx.attached_child();
477 0 : joinset.spawn(
478 0 : calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
479 0 : .in_current_span(),
480 0 : );
481 0 : }
482 0 : e.insert(cached_size);
483 0 : }
484 : }
485 :
486 : // Perform the size lookups
487 0 : let mut have_any_error = None;
488 0 : while let Some(res) = joinset.join_next().await {
489 : // each of these come with Result<anyhow::Result<_>, JoinError>
490 : // because of spawn + spawn_blocking
491 0 : match res {
492 0 : Err(join_error) if join_error.is_cancelled() => {
493 0 : unreachable!("we are not cancelling any of the futures, nor should be");
494 : }
495 0 : Err(join_error) => {
496 0 : // cannot really do anything, as this panic is likely a bug
497 0 : error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
498 :
499 0 : have_any_error = Some(CalculateSyntheticSizeError::Fatal(
500 0 : anyhow::anyhow!(join_error)
501 0 : .context("task that calls spawn_ondemand_logical_size_calculation"),
502 0 : ));
503 : }
504 0 : Ok(Err(recv_result_error)) => {
505 0 : // cannot really do anything, as this panic is likely a bug
506 0 : error!("failed to receive logical size query result: {recv_result_error:#}");
507 0 : have_any_error = Some(CalculateSyntheticSizeError::Fatal(
508 0 : anyhow::anyhow!(recv_result_error)
509 0 : .context("Receiving logical size query result"),
510 0 : ));
511 : }
512 0 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
513 0 : if matches!(error, CalculateLogicalSizeError::Cancelled) {
514 : // Skip this: it's okay if one timeline among many is shutting down while we
515 : // calculate inputs for the overall tenant.
516 0 : continue;
517 : } else {
518 0 : warn!(
519 0 : timeline_id=%timeline.timeline_id,
520 0 : "failed to calculate logical size at {lsn}: {error:#}"
521 : );
522 0 : have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
523 0 : timeline_id: timeline.timeline_id,
524 0 : lsn,
525 0 : error,
526 0 : });
527 : }
528 : }
529 0 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
530 0 : debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
531 :
532 0 : logical_size_cache.insert((timeline.timeline_id, lsn), size);
533 0 : sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
534 : }
535 : }
536 : }
537 :
538 : // prune any keys not needed anymore; we record every used key and added key.
539 0 : logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
540 :
541 0 : if let Some(error) = have_any_error {
542 : // we cannot complete this round, because we are missing data.
543 : // we have however cached all we were able to request calculation on.
544 0 : return Err(error);
545 0 : }
546 :
547 : // Insert the looked up sizes to the Segments
548 0 : for seg in segments.iter_mut() {
549 0 : if !seg.size_needed() {
550 0 : continue;
551 0 : }
552 0 :
553 0 : let timeline_id = seg.timeline_id;
554 0 : let lsn = Lsn(seg.segment.lsn);
555 :
556 0 : if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
557 0 : seg.segment.size = Some(*size);
558 0 : }
559 : }
560 0 : Ok(())
561 0 : }
562 :
563 : impl ModelInputs {
564 12 : pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
565 12 : // Convert SegmentMetas into plain Segments
566 12 : StorageModel {
567 12 : segments: self
568 12 : .segments
569 12 : .iter()
570 84 : .map(|seg| seg.segment.clone())
571 12 : .collect(),
572 12 : }
573 12 : }
574 :
575 : // calculate total project size
576 6 : pub fn calculate(&self) -> u64 {
577 6 : let storage = self.calculate_model();
578 6 : let sizes = storage.calculate();
579 6 : sizes.total_size
580 6 : }
581 : }
582 :
/// Newtype around the tuple that carries the timeline at lsn logical size calculation.
///
/// Produced by `calculate_logical_size` and taken apart again in `fill_logical_sizes`.
struct TimelineAtLsnSizeResult(
    // The timeline the size was calculated for.
    Arc<crate::tenant::Timeline>,
    // The LSN the size was requested at.
    utils::lsn::Lsn,
    // The calculated logical size, or the reason it could not be calculated.
    Result<u64, CalculateLogicalSizeError>,
);
589 :
590 0 : #[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
591 : async fn calculate_logical_size(
592 : limit: Arc<tokio::sync::Semaphore>,
593 : timeline: Arc<crate::tenant::Timeline>,
594 : lsn: utils::lsn::Lsn,
595 : cause: LogicalSizeCalculationCause,
596 : ctx: RequestContext,
597 : ) -> Result<TimelineAtLsnSizeResult, RecvError> {
598 : let _permit = tokio::sync::Semaphore::acquire_owned(limit)
599 : .await
600 : .expect("global semaphore should not had been closed");
601 :
602 : let size_res = timeline
603 : .spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
604 : .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
605 : .await?;
606 : Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
607 : }
608 :
#[test]
fn verify_size_for_multiple_branches() {
    // Captured from the integration test test_tenant_size_with_multiple_branches; keeping a
    // copy here gives us stable LSNs.
    //
    // Only "segments" feed the size calculation. The "timeline_inputs" are kept to explain
    // where the segments came from.
    let doc = r#"
{
  "segments": [
    {
      "segment": {
        "parent": 9,
        "lsn": 26033560,
        "size": null,
        "needed": false
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 0,
        "lsn": 35720400,
        "size": 25206784,
        "needed": false
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 1,
        "lsn": 35851472,
        "size": null,
        "needed": true
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "BranchEnd"
    },
    {
      "segment": {
        "parent": 7,
        "lsn": 24566168,
        "size": null,
        "needed": false
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 3,
        "lsn": 25261936,
        "size": 26050560,
        "needed": false
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 4,
        "lsn": 25393008,
        "size": null,
        "needed": true
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "BranchEnd"
    },
    {
      "segment": {
        "parent": null,
        "lsn": 23694408,
        "size": null,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 6,
        "lsn": 24566168,
        "size": 25739264,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchPoint"
    },
    {
      "segment": {
        "parent": 7,
        "lsn": 25902488,
        "size": 26402816,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 8,
        "lsn": 26033560,
        "size": 26468352,
        "needed": true
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchPoint"
    },
    {
      "segment": {
        "parent": 9,
        "lsn": 26033560,
        "size": null,
        "needed": true
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchEnd"
    }
  ],
  "timeline_inputs": [
    {
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "ancestor_lsn": "0/18D3D98",
      "last_record": "0/2230CD0",
      "latest_gc_cutoff": "0/1698C48",
      "next_pitr_cutoff": "0/2210CD0",
      "retention_param_cutoff": null,
      "lease_points": []
    },
    {
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "ancestor_lsn": "0/176D998",
      "last_record": "0/1837770",
      "latest_gc_cutoff": "0/1698C48",
      "next_pitr_cutoff": "0/1817770",
      "retention_param_cutoff": null,
      "lease_points": []
    },
    {
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "ancestor_lsn": "0/0",
      "last_record": "0/18D3D98",
      "latest_gc_cutoff": "0/1698C48",
      "next_pitr_cutoff": "0/18B3D98",
      "retention_param_cutoff": null,
      "lease_points": []
    }
  ]
}
"#;
    let total_size = serde_json::from_str::<ModelInputs>(doc)
        .unwrap()
        .calculate();

    assert_eq!(total_size, 37_851_408);
}
765 :
#[test]
fn verify_size_for_one_branch() {
    use utils::lsn::Lsn;

    // A single timeline without an ancestor: branch start, GC cutoff and branch end.
    let doc = r#"
{
  "segments": [
    {
      "segment": {
        "parent": null,
        "lsn": 0,
        "size": null,
        "needed": false
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 0,
        "lsn": 305547335776,
        "size": 220054675456,
        "needed": false
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 1,
        "lsn": 305614444640,
        "size": null,
        "needed": true
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "BranchEnd"
    }
  ],
  "timeline_inputs": [
    {
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "ancestor_lsn": "0/0",
      "last_record": "47/280A5860",
      "latest_gc_cutoff": "47/240A5860",
      "next_pitr_cutoff": "47/240A5860",
      "retention_param_cutoff": "0/0",
      "lease_points": []
    }
  ]
}"#;

    let model = serde_json::from_str::<ModelInputs>(doc).unwrap();
    let res = model.calculate_model().calculate();

    println!("calculated synthetic size: {}", res.total_size);
    println!("result: {:?}", serde_json::to_string(&res.segments));

    // Print the numeric values of the LSNs used above, to help relate them to the segments.
    let latest_gc_cutoff_lsn = "47/240A5860".parse::<Lsn>().unwrap();
    let last_lsn = "47/280A5860".parse::<Lsn>().unwrap();
    println!(
        "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
        u64::from(latest_gc_cutoff_lsn),
        u64::from(last_lsn)
    );

    assert_eq!(res.total_size, 220121784320);
}
|