LCOV - code coverage report
Current view: top level - pageserver/src/tenant - size.rs (source / functions) Coverage Total Hit
Test: 98683a8629f0f7f0031d02e04512998d589d76ea.info Lines: 85.5 % 598 511
Test Date: 2025-04-11 16:58:57 Functions: 41.5 % 53 22

            Line data    Source code
       1              : use std::cmp;
       2              : use std::collections::hash_map::Entry;
       3              : use std::collections::{HashMap, HashSet};
       4              : use std::sync::Arc;
       5              : 
       6              : use tenant_size_model::svg::SvgBranchKind;
       7              : use tenant_size_model::{Segment, StorageModel};
       8              : use tokio::sync::Semaphore;
       9              : use tokio::sync::oneshot::error::RecvError;
      10              : use tokio_util::sync::CancellationToken;
      11              : use tracing::*;
      12              : use utils::id::TimelineId;
      13              : use utils::lsn::Lsn;
      14              : 
      15              : use super::{GcError, LogicalSizeCalculationCause, Tenant};
      16              : use crate::context::RequestContext;
      17              : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
      18              : use crate::tenant::{MaybeOffloaded, Timeline};
      19              : 
      20              : /// Inputs to the actual tenant sizing model.
      21              : ///
      22              : /// Implements [`serde::Serialize`] and [`serde::Deserialize`] but is not meant to be part of the public API; it is instead meant to
      23              : /// be a transferable format between execution environments and developers.
      24              : ///
      25              : /// This tracks more information than the actual StorageModel that calculation
      26              : /// needs. We will convert this into a StorageModel when it's time to perform
      27              : /// the calculation.
      28              : ///
      29           16 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
      30              : pub struct ModelInputs {
      31              :     pub segments: Vec<SegmentMeta>, // flat list for all timelines; `Segment::parent` values are indices into this vector
      32              :     pub timeline_inputs: Vec<TimelineInputs>, // one entry per timeline that contributed segments
      33              : }
      34              : 
      35              : /// A [`Segment`], with some extra information for display purposes; `kind` also decides whether a logical size is looked up (see `size_needed`)
      36          168 : #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
      37              : pub struct SegmentMeta {
      38              :     pub segment: Segment, // the model segment itself (parent index, lsn, size, needed)
      39              :     pub timeline_id: TimelineId, // timeline this point lies on
      40              :     pub kind: LsnKind, // why this LSN is an interesting point
      41              : }
      42              : 
      43              : #[derive(thiserror::Error, Debug)]
      44              : pub(crate) enum CalculateSyntheticSizeError { // error type returned by `gather_inputs` and `fill_logical_sizes`
      45              :     /// Something went wrong internally in the calculation of logical size at a particular branch point. NOTE(review): the message below says "calculated"; probably meant "calculate".
      46              :     #[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")]
      47              :     LogicalSize {
      48              :         timeline_id: TimelineId,
      49              :         lsn: Lsn,
      50              :         error: CalculateLogicalSizeError,
      51              :     },
      52              : 
      53              :     /// Something went wrong internally when calculating GC parameters at start of size calculation
      54              :     #[error(transparent)]
      55              :     GcInfo(GcError),
      56              : 
      57              :     /// Totally unexpected errors, like panics when joining a task
      58              :     #[error(transparent)]
      59              :     Fatal(anyhow::Error),
      60              : 
      61              :     /// Tenant shut down while calculating size
      62              :     #[error("Cancelled")]
      63              :     Cancelled,
      64              : }
      65              : 
      66              : impl From<GcError> for CalculateSyntheticSizeError { // lets `?` convert GC errors, e.g. from `refresh_gc_info`
      67            0 :     fn from(value: GcError) -> Self {
      68            0 :         match value {
      69              :             GcError::TenantCancelled | GcError::TimelineCancelled => { // both cancellation flavours collapse into `Cancelled`
      70            0 :                 CalculateSyntheticSizeError::Cancelled
      71              :             }
      72            0 :             other => CalculateSyntheticSizeError::GcInfo(other), // every other GC error is wrapped unchanged
      73              :         }
      74            0 :     }
      75              : }
      76              : 
      77              : impl SegmentMeta {
      78          232 :     fn size_needed(&self) -> bool { // whether a logical size should be looked up for this segment's (timeline, lsn)
      79          232 :         match self.kind {
      80              :             LsnKind::BranchStart => {
      81              :                 // If we don't have a later GcCutoff point on this branch, and
      82              :                 // no ancestor, calculate size for the branch start point.
      83           64 :                 self.segment.needed && self.segment.parent.is_none()
      84              :             }
      85           48 :             LsnKind::BranchPoint => true,
      86           56 :             LsnKind::GcCutOff => true,
      87           64 :             LsnKind::BranchEnd => false,
      88            0 :             LsnKind::LeasePoint => true,
      89            0 :             LsnKind::LeaseStart => false, // same LSN as its LeasePoint, which already requests the size
      90            0 :             LsnKind::LeaseEnd => false, // same LSN as its LeasePoint, which already requests the size
      91              :         }
      92          232 :     }
      93              : }
      94              : 
      95              : #[derive(
      96           56 :     Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
      97              : )]
      98              : pub enum LsnKind { // derived `Ord` follows declaration order; `gather_inputs` sorts `(Lsn, LsnKind)` pairs, so this order breaks ties at equal LSNs
      99              :     /// A timeline starting here
     100              :     BranchStart,
     101              :     /// A child timeline branches off from here
     102              :     BranchPoint,
     103              :     /// GC cutoff point
     104              :     GcCutOff,
     105              :     /// Last record LSN
     106              :     BranchEnd,
     107              :     /// An LSN lease is granted here.
     108              :     LeasePoint,
     109              :     /// A lease starts from here.
     110              :     LeaseStart,
     111              :     /// Last record LSN for the lease (should have the same LSN as the previous [`LsnKind::LeaseStart`]).
     112              :     LeaseEnd,
     113              : }
     114              : 
     115              : impl From<LsnKind> for SvgBranchKind { // maps a segment kind to the branch kind used by the SVG rendering
     116            0 :     fn from(kind: LsnKind) -> Self {
     117            0 :         match kind {
     118            0 :             LsnKind::LeasePoint | LsnKind::LeaseStart | LsnKind::LeaseEnd => SvgBranchKind::Lease,
     119            0 :             _ => SvgBranchKind::Timeline, // all non-lease kinds belong to a regular timeline
     120              :         }
     121            0 :     }
     122              : }
     123              : 
     124              : /// Collects all relevant LSNs of one timeline that went into the inputs. These will only be helpful in the serialized form as
     125              : /// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
     126          112 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
     127              : pub struct TimelineInputs {
     128              :     pub timeline_id: TimelineId,
     129              : 
     130              :     pub ancestor_id: Option<TimelineId>,
     131              : 
     132              :     ancestor_lsn: Lsn, // value of `Timeline::get_ancestor_lsn()` when the inputs were gathered
     133              :     last_record: Lsn, // value of `Timeline::get_last_record_lsn()` when the inputs were gathered
     134              :     latest_gc_cutoff: Lsn, // applied GC cutoff reported by the timeline; recorded for reference, not used to build segments
     135              : 
     136              :     /// Cutoff point based on GC settings (the time-based GC cutoff, raised to `retention_param_cutoff` when that is later)
     137              :     next_pitr_cutoff: Lsn,
     138              : 
     139              :     /// Cutoff point calculated from the user-supplied 'max_retention_period' (`last_record` minus the period, saturating at zero)
     140              :     retention_param_cutoff: Option<Lsn>,
     141              : 
     142              :     /// Lease points on the timeline (only leases above `ancestor_lsn` are recorded)
     143              :     lease_points: Vec<Lsn>,
     144              : }
     145              : 
     146              : /// Gathers the inputs for the tenant sizing model.
     147              : ///
     148              : /// Tenant size does not consider the latest state, but only the state until next_pitr_cutoff, which
     149              : /// is updated on-demand, during the start of this calculation and separate from the
     150              : /// [`TimelineInputs::latest_gc_cutoff`].
     151              : ///
     152              : /// For timelines in general:
     153              : ///
     154              : /// ```text
     155              : /// 0-----|---------|----|------------| · · · · · |·> lsn
     156              : ///   initdb_lsn  branchpoints*  next_pitr_cutoff  latest
     157              : /// ```
     158            8 : pub(super) async fn gather_inputs(
     159            8 :     tenant: &Tenant,
     160            8 :     limit: &Arc<Semaphore>,
     161            8 :     max_retention_period: Option<u64>,
     162            8 :     logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
     163            8 :     cause: LogicalSizeCalculationCause,
     164            8 :     cancel: &CancellationToken,
     165            8 :     ctx: &RequestContext,
     166            8 : ) -> Result<ModelInputs, CalculateSyntheticSizeError> {
     167            8 :     // refresh is needed to update [`timeline::GcCutoffs`]
     168            8 :     tenant.refresh_gc_info(cancel, ctx).await?;
     169              : 
     170              :     // Collect information about all the timelines
     171            8 :     let mut timelines = tenant.list_timelines();
     172            8 : 
     173            8 :     if timelines.is_empty() {
     174              :         // perhaps the tenant has just been created, and as such doesn't have any data yet
     175            0 :         return Ok(ModelInputs {
     176            0 :             segments: vec![],
     177            0 :             timeline_inputs: Vec::new(),
     178            0 :         });
     179            8 :     }
     180            8 : 
     181            8 :     // Filter out timelines that are not active
     182            8 :     //
     183            8 :     // There may be a race when a timeline is dropped,
     184            8 :     // but it is unlikely to cause any issues. In the worst case,
     185            8 :     // the calculation will error out.
     186           32 :     timelines.retain(|t| t.is_active());
     187            8 :     // Also filter out archived timelines.
     188           32 :     timelines.retain(|t| t.is_archived() != Some(true));
     189            8 : 
     190            8 :     // Build a map of branch points: ancestor timeline -> set of LSNs at which children branch off.
     191            8 :     let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
     192           32 :     for timeline in timelines.iter() {
     193           32 :         if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
     194           24 :             branchpoints
     195           24 :                 .entry(ancestor_id)
     196           24 :                 .or_default()
     197           24 :                 .insert(timeline.get_ancestor_lsn());
     198           24 :         }
     199              :     }
     200              : 
     201              :     // These become the final result.
     202            8 :     let mut timeline_inputs = Vec::with_capacity(timelines.len());
     203            8 :     let mut segments: Vec<SegmentMeta> = Vec::new();
     204            8 : 
     205            8 :     //
     206            8 :     // Build Segments representing each timeline. As we do that, also remember
     207            8 :     // the branchpoints and branch startpoints in 'branchpoint_segments' and
     208            8 :     // 'branchstart_segments'
     209            8 :     //
     210            8 : 
     211            8 :     // BranchPoint segments of each timeline
     212            8 :     // (timeline, branchpoint LSN) -> segment_id
     213            8 :     let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();
     214              : 
     215              :     // timeline, BranchStart seg id, (ancestor, ancestor LSN)
     216              :     type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
     217            8 :     let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();
     218              : 
     219           32 :     for timeline in timelines.iter() {
     220           32 :         let timeline_id = timeline.timeline_id;
     221           32 :         let last_record_lsn = timeline.get_last_record_lsn();
     222           32 :         let ancestor_lsn = timeline.get_ancestor_lsn();
     223           32 : 
     224           32 :         // there's a race between the update (holding tenant.gc_lock) and this read but it
     225           32 :         // might not be an issue, because it's not for Timeline::gc
     226           32 :         let gc_info = timeline.gc_info.read().unwrap();
     227           32 : 
     228           32 :         // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
     229           32 :         // new gc run, which we have no control over. However, unlike `Timeline::gc`,
     230           32 :         // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
     231           32 :         // actually removing files.
     232           32 :         //
     233           32 :         // We only consider [`timeline::GcCutoffs::time`], and not [`timeline::GcCutoffs::space`], because from
     234           32 :         // a user's perspective they have only requested retention up to the time bound (pitr_cutoff), rather
     235           32 :         // than our internal space cutoff.  This means that if someone drops a database and waits for their
     236           32 :         // PITR interval, they will see synthetic size decrease, even if we are still storing data inside
     237           32 :         // the space cutoff.
     238           32 :         let mut next_pitr_cutoff = gc_info.cutoffs.time;
     239              : 
     240              :         // If the caller provided a max retention period, derive a cutoff from it and use it when it is later than the GC cutoff.
     241           32 :         let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
     242            0 :             let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
     243            0 :             if next_pitr_cutoff < param_cutoff {
     244            0 :                 next_pitr_cutoff = param_cutoff;
     245            0 :             }
     246            0 :             Some(param_cutoff)
     247              :         } else {
     248           32 :             None
     249              :         };
     250              : 
     251           32 :         let branch_is_invisible = timeline.is_invisible() == Some(true);
     252           32 : 
     253           32 :         let lease_points = gc_info
     254           32 :             .leases
     255           32 :             .keys()
     256           32 :             .filter(|&&lsn| lsn > ancestor_lsn)
     257           32 :             .copied()
     258           32 :             .collect::<Vec<_>>();
     259           32 : 
     260           32 :         // next_pitr_cutoff in the parent branch is not of interest (right now at least), nor do we
     261           32 :         // want to query any logical size before initdb_lsn.
     262           32 :         let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
     263           32 : 
     264           32 :         // Build "interesting LSNs" on this timeline
     265           32 :         let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
     266           32 :             .retain_lsns
     267           32 :             .iter()
     268           32 :             .filter(|(lsn, _child_id, is_offloaded)| {
     269           24 :                 lsn > &ancestor_lsn && *is_offloaded == MaybeOffloaded::No
     270           32 :             })
     271           32 :             .copied()
     272           32 :             // this assumes there are no other retain_lsns than the branchpoints
     273           32 :             .map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
     274           32 :             .collect::<Vec<_>>();
     275           32 : 
     276           32 :         if !branch_is_invisible {
     277           28 :             // Do not count lease points for invisible branches.
     278           28 :             lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
     279           28 :         }
     280              : 
     281           32 :         drop(gc_info);
     282              : 
     283              :         // Add branch points we collected earlier, just in case there were any that were
     284              :         // not present in retain_lsns. We will remove any duplicates later, below.
     285           32 :         if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
     286            8 :             lsns.extend(
     287            8 :                 this_branchpoints
     288            8 :                     .iter()
     289           24 :                     .map(|lsn| (*lsn, LsnKind::BranchPoint)),
     290            8 :             )
     291           24 :         }
     292              : 
     293              :         // Add a point for the PITR cutoff
     294           32 :         let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
     295           32 :         if !branch_start_needed && !branch_is_invisible {
     296           28 :             // Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
     297           28 :             // range from the last branch point to the latest data.
     298           28 :             lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
     299           28 :         }
     300              : 
     301           32 :         lsns.sort_unstable();
     302           32 :         lsns.dedup();
     303           32 : 
     304           32 :         //
     305           32 :         // Create Segments for the interesting points.
     306           32 :         //
     307           32 : 
     308           32 :         // Timeline start point
     309           32 :         let ancestor = timeline
     310           32 :             .get_ancestor_timeline_id()
     311           32 :             .map(|ancestor_id| (ancestor_id, ancestor_lsn));
     312           32 :         branchstart_segments.push((timeline_id, segments.len(), ancestor));
     313           32 :         segments.push(SegmentMeta {
     314           32 :             segment: Segment {
     315           32 :                 parent: None, // filled in later
     316           32 :                 lsn: branch_start_lsn.0,
     317           32 :                 size: None, // filled in later
     318           32 :                 needed: branch_start_needed,
     319           32 :             },
     320           32 :             timeline_id: timeline.timeline_id,
     321           32 :             kind: LsnKind::BranchStart,
     322           32 :         });
     323           32 : 
     324           32 :         // GC cutoff point, and any branch points, i.e. points where
     325           32 :         // other timelines branch off from this timeline.
     326           32 :         let mut parent = segments.len() - 1;
     327           84 :         for (lsn, kind) in lsns {
     328           52 :             if kind == LsnKind::BranchPoint {
     329           24 :                 branchpoint_segments.insert((timeline_id, lsn), segments.len());
     330           28 :             }
     331              : 
     332           52 :             segments.push(SegmentMeta {
     333           52 :                 segment: Segment {
     334           52 :                     parent: Some(parent),
     335           52 :                     lsn: lsn.0,
     336           52 :                     size: None,
     337           52 :                     needed: lsn > next_pitr_cutoff,
     338           52 :                 },
     339           52 :                 timeline_id: timeline.timeline_id,
     340           52 :                 kind,
     341           52 :             });
     342           52 : 
     343           52 :             parent = segments.len() - 1;
     344           52 : 
     345           52 :             if kind == LsnKind::LeasePoint {
     346            0 :                 // Needs `LeaseStart` and `LeaseEnd` as well to model lease as a read-only branch that never writes data
     347            0 :                 // (i.e. its LSN has not advanced from ancestor_lsn), and therefore the three segments have the same LSN
     348            0 :                 // value. Without the other two segments, the calculation code would not count the leased LSN as a point
     349            0 :                 // to be retained.
     350            0 :                 // Did not use `BranchStart` or `BranchEnd` so we can differentiate branches and leases during debug.
     351            0 :                 //
     352            0 :                 // Alt Design: rewrite the entire calculation code to be independent of timeline id. Both leases and
     353            0 :                 // branch points can be given a synthetic id so we can unite them.
     354            0 :                 let mut lease_parent = parent;
     355            0 : 
     356            0 :                 // Start of a lease.
     357            0 :                 segments.push(SegmentMeta {
     358            0 :                     segment: Segment {
     359            0 :                         parent: Some(lease_parent),
     360            0 :                         lsn: lsn.0,
     361            0 :                         size: None,                     // Filled in later, if necessary
     362            0 :                         needed: lsn > next_pitr_cutoff, // only needed if the point is within retention.
     363            0 :                     },
     364            0 :                     timeline_id: timeline.timeline_id,
     365            0 :                     kind: LsnKind::LeaseStart,
     366            0 :                 });
     367            0 :                 lease_parent += 1;
     368            0 : 
     369            0 :                 // End of the lease.
     370            0 :                 segments.push(SegmentMeta {
     371            0 :                     segment: Segment {
     372            0 :                         parent: Some(lease_parent),
     373            0 :                         lsn: lsn.0,
     374            0 :                         size: None,   // Filled in later, if necessary
     375            0 :                         needed: true, // everything at the lease LSN must be readable => is needed
     376            0 :                     },
     377            0 :                     timeline_id: timeline.timeline_id,
     378            0 :                     kind: LsnKind::LeaseEnd,
     379            0 :                 });
     380           52 :             }
     381              :         }
     382              : 
     383           32 :         let branch_end_lsn = if branch_is_invisible {
     384              :             // If the branch is invisible, the branch end is the LSN of the last segment pushed above (likely a branch cutoff point).
     385            4 :             segments.last().unwrap().segment.lsn
     386              :         } else {
     387              :             // Otherwise, the branch end is the last record LSN.
     388           28 :             last_record_lsn.0
     389              :         };
     390              : 
     391              :         // Current end of the timeline
     392           32 :         segments.push(SegmentMeta {
     393           32 :             segment: Segment {
     394           32 :                 parent: Some(parent),
     395           32 :                 lsn: branch_end_lsn,
     396           32 :                 size: None, // Filled in later, if necessary
     397           32 :                 needed: true,
     398           32 :             },
     399           32 :             timeline_id: timeline.timeline_id,
     400           32 :             kind: LsnKind::BranchEnd,
     401           32 :         });
     402           32 : 
     403           32 :         timeline_inputs.push(TimelineInputs {
     404           32 :             timeline_id: timeline.timeline_id,
     405           32 :             ancestor_id: timeline.get_ancestor_timeline_id(),
     406           32 :             ancestor_lsn,
     407           32 :             last_record: last_record_lsn,
     408           32 :             // this is not used above, because it might not have updated recently enough
     409           32 :             latest_gc_cutoff: *timeline.get_applied_gc_cutoff_lsn(),
     410           32 :             next_pitr_cutoff,
     411           32 :             retention_param_cutoff,
     412           32 :             lease_points,
     413           32 :         });
     414              :     }
     415              : 
     416              :     // We now have all segments from the timelines in 'segments'. The timelines
     417              :     // haven't been linked to each other yet, though. Do that.
     418           40 :     for (_timeline_id, seg_id, ancestor) in branchstart_segments {
     419              :         // Look up the branch point. NOTE(review): the unwrap below panics if the ancestor was filtered out of `timelines` above (inactive/archived) — confirm that cannot happen.
     420           32 :         if let Some(ancestor) = ancestor {
     421           24 :             let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
     422           24 :             segments[seg_id].segment.parent = Some(parent_id);
     423           24 :         }
     424              :     }
     425              : 
     426              :     // We left the 'size' field empty in all of the Segments so far.
     427              :     // Now find logical sizes for all of the points that might need or benefit from them.
     428            8 :     fill_logical_sizes(
     429            8 :         &timelines,
     430            8 :         &mut segments,
     431            8 :         limit,
     432            8 :         logical_size_cache,
     433            8 :         cause,
     434            8 :         ctx,
     435            8 :     )
     436            8 :     .await?;
     437              : 
     438            8 :     if tenant.cancel.is_cancelled() {
     439              :         // If we're shutting down, return an error rather than a sparse result that might include some
     440              :         // timelines from before we started shutting down
     441            0 :         return Err(CalculateSyntheticSizeError::Cancelled);
     442            8 :     }
     443            8 : 
     444            8 :     Ok(ModelInputs {
     445            8 :         segments,
     446            8 :         timeline_inputs,
     447            8 :     })
     448            8 : }
     449              : 
     450              : /// Augment 'segments' with logical sizes
     451              : ///
     452              : /// This will leave segments' sizes as None if the Timeline associated with the segment is deleted concurrently
     453              : /// (i.e. we cannot read its logical size at a particular LSN).
     454            8 : async fn fill_logical_sizes(
     455            8 :     timelines: &[Arc<Timeline>],
     456            8 :     segments: &mut [SegmentMeta],
     457            8 :     limit: &Arc<Semaphore>,
     458            8 :     logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
     459            8 :     cause: LogicalSizeCalculationCause,
     460            8 :     ctx: &RequestContext,
     461            8 : ) -> Result<(), CalculateSyntheticSizeError> {
     462            8 :     let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
     463            8 :         timelines
     464            8 :             .iter()
     465           32 :             .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
     466            8 :     );
     467            8 : 
     468            8 :     // record the used/inserted cache keys here, to remove extras not to start leaking
     469            8 :     // after initial run the cache should be quite stable, but live timelines will eventually
     470            8 :     // require new lsns to be inspected.
     471            8 :     let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();
     472            8 : 
     473            8 :     // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
     474            8 :     // our advantage with `?` error handling.
     475            8 :     let mut joinset = tokio::task::JoinSet::new();
     476              : 
     477              :     // For each point that would benefit from having a logical size available,
     478              :     // spawn a Task to fetch it, unless we have it cached already.
     479          116 :     for seg in segments.iter() {
     480          116 :         if !seg.size_needed() {
     481           64 :             continue;
     482           52 :         }
     483           52 : 
     484           52 :         let timeline_id = seg.timeline_id;
     485           52 :         let lsn = Lsn(seg.segment.lsn);
     486              : 
     487           52 :         if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
     488           52 :             let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
     489           52 :             if cached_size.is_none() {
     490           28 :                 let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
     491           28 :                 let parallel_size_calcs = Arc::clone(limit);
     492           28 :                 let ctx = ctx.attached_child().with_scope_timeline(&timeline);
     493           28 :                 joinset.spawn(
     494           28 :                     calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
     495           28 :                         .in_current_span(),
     496           28 :                 );
     497           28 :             }
     498           52 :             e.insert(cached_size);
     499            0 :         }
     500              :     }
     501              : 
     502              :     // Perform the size lookups
     503            8 :     let mut have_any_error = None;
     504           36 :     while let Some(res) = joinset.join_next().await {
     505              :         // each of these come with Result<anyhow::Result<_>, JoinError>
     506              :         // because of spawn + spawn_blocking
     507           28 :         match res {
     508            0 :             Err(join_error) if join_error.is_cancelled() => {
     509            0 :                 unreachable!("we are not cancelling any of the futures, nor should be");
     510              :             }
     511            0 :             Err(join_error) => {
     512            0 :                 // cannot really do anything, as this panic is likely a bug
     513            0 :                 error!(
     514            0 :                     "task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}"
     515              :                 );
     516              : 
     517            0 :                 have_any_error = Some(CalculateSyntheticSizeError::Fatal(
     518            0 :                     anyhow::anyhow!(join_error)
     519            0 :                         .context("task that calls spawn_ondemand_logical_size_calculation"),
     520            0 :                 ));
     521              :             }
     522            0 :             Ok(Err(recv_result_error)) => {
     523            0 :                 // cannot really do anything, as this panic is likely a bug
     524            0 :                 error!("failed to receive logical size query result: {recv_result_error:#}");
     525            0 :                 have_any_error = Some(CalculateSyntheticSizeError::Fatal(
     526            0 :                     anyhow::anyhow!(recv_result_error)
     527            0 :                         .context("Receiving logical size query result"),
     528            0 :                 ));
     529              :             }
     530            0 :             Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
     531            0 :                 if matches!(error, CalculateLogicalSizeError::Cancelled) {
     532              :                     // Skip this: it's okay if one timeline among many is shutting down while we
     533              :                     // calculate inputs for the overall tenant.
     534            0 :                     continue;
     535              :                 } else {
     536            0 :                     warn!(
     537            0 :                         timeline_id=%timeline.timeline_id,
     538            0 :                         "failed to calculate logical size at {lsn}: {error:#}"
     539              :                     );
     540            0 :                     have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
     541            0 :                         timeline_id: timeline.timeline_id,
     542            0 :                         lsn,
     543            0 :                         error,
     544            0 :                     });
     545              :                 }
     546              :             }
     547           28 :             Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
     548           28 :                 debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
     549              : 
     550           28 :                 logical_size_cache.insert((timeline.timeline_id, lsn), size);
     551           28 :                 sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
     552              :             }
     553              :         }
     554              :     }
     555              : 
     556              :     // prune any keys not needed anymore; we record every used key and added key.
     557           56 :     logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
     558              : 
     559            8 :     if let Some(error) = have_any_error {
     560              :         // we cannot complete this round, because we are missing data.
     561              :         // we have however cached all we were able to request calculation on.
     562            0 :         return Err(error);
     563            8 :     }
     564              : 
     565              :     // Insert the looked up sizes to the Segments
     566          116 :     for seg in segments.iter_mut() {
     567          116 :         if !seg.size_needed() {
     568           64 :             continue;
     569           52 :         }
     570           52 : 
     571           52 :         let timeline_id = seg.timeline_id;
     572           52 :         let lsn = Lsn(seg.segment.lsn);
     573              : 
     574           52 :         if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
     575           52 :             seg.segment.size = Some(*size);
     576           52 :         }
     577              :     }
     578            8 :     Ok(())
     579            8 : }
     580              : 
     581              : impl ModelInputs {
     582            8 :     pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
     583            8 :         // Convert SegmentMetas into plain Segments
     584            8 :         StorageModel {
     585            8 :             segments: self
     586            8 :                 .segments
     587            8 :                 .iter()
     588           56 :                 .map(|seg| seg.segment.clone())
     589            8 :                 .collect(),
     590            8 :         }
     591            8 :     }
     592              : 
     593              :     // calculate total project size
     594            4 :     pub fn calculate(&self) -> u64 {
     595            4 :         let storage = self.calculate_model();
     596            4 :         let sizes = storage.calculate();
     597            4 :         sizes.total_size
     598            4 :     }
     599              : }
     600              : 
     601              : /// Newtype around the tuple that carries the timeline at lsn logical size calculation.
     602              : struct TimelineAtLsnSizeResult(
     603              :     Arc<crate::tenant::Timeline>,
     604              :     utils::lsn::Lsn,
     605              :     Result<u64, CalculateLogicalSizeError>,
     606              : );
     607              : 
     608              : #[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
     609              : async fn calculate_logical_size(
     610              :     limit: Arc<tokio::sync::Semaphore>,
     611              :     timeline: Arc<crate::tenant::Timeline>,
     612              :     lsn: utils::lsn::Lsn,
     613              :     cause: LogicalSizeCalculationCause,
     614              :     ctx: RequestContext,
     615              : ) -> Result<TimelineAtLsnSizeResult, RecvError> {
     616              :     let _permit = tokio::sync::Semaphore::acquire_owned(limit)
     617              :         .await
     618              :         .expect("global semaphore should not had been closed");
     619              : 
     620              :     let size_res = timeline
     621              :         .spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
     622              :         .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
     623              :         .await?;
     624              :     Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
     625              : }
     626              : 
     627              : #[cfg(test)]
     628              : #[test]
     629            4 : fn verify_size_for_multiple_branches() {
     630            4 :     // this is generated from integration test test_tenant_size_with_multiple_branches, but this way
     631            4 :     // it has the stable lsn's
     632            4 :     //
     633            4 :     // The timeline_inputs don't participate in the size calculation, and are here just to explain
     634            4 :     // the inputs.
     635            4 :     let doc = r#"
     636            4 : {
     637            4 :   "segments": [
     638            4 :     {
     639            4 :       "segment": {
     640            4 :         "parent": 9,
     641            4 :         "lsn": 26033560,
     642            4 :         "size": null,
     643            4 :         "needed": false
     644            4 :       },
     645            4 :       "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
     646            4 :       "kind": "BranchStart"
     647            4 :     },
     648            4 :     {
     649            4 :       "segment": {
     650            4 :         "parent": 0,
     651            4 :         "lsn": 35720400,
     652            4 :         "size": 25206784,
     653            4 :         "needed": false
     654            4 :       },
     655            4 :       "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
     656            4 :       "kind": "GcCutOff"
     657            4 :     },
     658            4 :     {
     659            4 :       "segment": {
     660            4 :         "parent": 1,
     661            4 :         "lsn": 35851472,
     662            4 :         "size": null,
     663            4 :         "needed": true
     664            4 :       },
     665            4 :       "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
     666            4 :       "kind": "BranchEnd"
     667            4 :     },
     668            4 :     {
     669            4 :       "segment": {
     670            4 :         "parent": 7,
     671            4 :         "lsn": 24566168,
     672            4 :         "size": null,
     673            4 :         "needed": false
     674            4 :       },
     675            4 :       "timeline_id": "454626700469f0a9914949b9d018e876",
     676            4 :       "kind": "BranchStart"
     677            4 :     },
     678            4 :     {
     679            4 :       "segment": {
     680            4 :         "parent": 3,
     681            4 :         "lsn": 25261936,
     682            4 :         "size": 26050560,
     683            4 :         "needed": false
     684            4 :       },
     685            4 :       "timeline_id": "454626700469f0a9914949b9d018e876",
     686            4 :       "kind": "GcCutOff"
     687            4 :     },
     688            4 :     {
     689            4 :       "segment": {
     690            4 :         "parent": 4,
     691            4 :         "lsn": 25393008,
     692            4 :         "size": null,
     693            4 :         "needed": true
     694            4 :       },
     695            4 :       "timeline_id": "454626700469f0a9914949b9d018e876",
     696            4 :       "kind": "BranchEnd"
     697            4 :     },
     698            4 :     {
     699            4 :       "segment": {
     700            4 :         "parent": null,
     701            4 :         "lsn": 23694408,
     702            4 :         "size": null,
     703            4 :         "needed": false
     704            4 :       },
     705            4 :       "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
     706            4 :       "kind": "BranchStart"
     707            4 :     },
     708            4 :     {
     709            4 :       "segment": {
     710            4 :         "parent": 6,
     711            4 :         "lsn": 24566168,
     712            4 :         "size": 25739264,
     713            4 :         "needed": false
     714            4 :       },
     715            4 :       "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
     716            4 :       "kind": "BranchPoint"
     717            4 :     },
     718            4 :     {
     719            4 :       "segment": {
     720            4 :         "parent": 7,
     721            4 :         "lsn": 25902488,
     722            4 :         "size": 26402816,
     723            4 :         "needed": false
     724            4 :       },
     725            4 :       "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
     726            4 :       "kind": "GcCutOff"
     727            4 :     },
     728            4 :     {
     729            4 :       "segment": {
     730            4 :         "parent": 8,
     731            4 :         "lsn": 26033560,
     732            4 :         "size": 26468352,
     733            4 :         "needed": true
     734            4 :       },
     735            4 :       "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
     736            4 :       "kind": "BranchPoint"
     737            4 :     },
     738            4 :     {
     739            4 :       "segment": {
     740            4 :         "parent": 9,
     741            4 :         "lsn": 26033560,
     742            4 :         "size": null,
     743            4 :         "needed": true
     744            4 :       },
     745            4 :       "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
     746            4 :       "kind": "BranchEnd"
     747            4 :     }
     748            4 :   ],
     749            4 :   "timeline_inputs": [
     750            4 :     {
     751            4 :       "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
     752            4 :       "ancestor_lsn": "0/18D3D98",
     753            4 :       "last_record": "0/2230CD0",
     754            4 :       "latest_gc_cutoff": "0/1698C48",
     755            4 :       "next_pitr_cutoff": "0/2210CD0",
     756            4 :       "retention_param_cutoff": null,
     757            4 :       "lease_points": []
     758            4 :     },
     759            4 :     {
     760            4 :       "timeline_id": "454626700469f0a9914949b9d018e876",
     761            4 :       "ancestor_lsn": "0/176D998",
     762            4 :       "last_record": "0/1837770",
     763            4 :       "latest_gc_cutoff": "0/1698C48",
     764            4 :       "next_pitr_cutoff": "0/1817770",
     765            4 :       "retention_param_cutoff": null,
     766            4 :       "lease_points": []
     767            4 :     },
     768            4 :     {
     769            4 :       "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
     770            4 :       "ancestor_lsn": "0/0",
     771            4 :       "last_record": "0/18D3D98",
     772            4 :       "latest_gc_cutoff": "0/1698C48",
     773            4 :       "next_pitr_cutoff": "0/18B3D98",
     774            4 :       "retention_param_cutoff": null,
     775            4 :       "lease_points": []
     776            4 :     }
     777            4 :   ]
     778            4 : }
     779            4 : "#;
     780            4 :     let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
     781            4 : 
     782            4 :     assert_eq!(inputs.calculate(), 37_851_408);
     783            4 : }
     784              : 
     785              : #[cfg(test)]
     786              : #[test]
     787            4 : fn verify_size_for_one_branch() {
     788            4 :     let doc = r#"
     789            4 : {
     790            4 :   "segments": [
     791            4 :     {
     792            4 :       "segment": {
     793            4 :         "parent": null,
     794            4 :         "lsn": 0,
     795            4 :         "size": null,
     796            4 :         "needed": false
     797            4 :       },
     798            4 :       "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
     799            4 :       "kind": "BranchStart"
     800            4 :     },
     801            4 :     {
     802            4 :       "segment": {
     803            4 :         "parent": 0,
     804            4 :         "lsn": 305547335776,
     805            4 :         "size": 220054675456,
     806            4 :         "needed": false
     807            4 :       },
     808            4 :       "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
     809            4 :       "kind": "GcCutOff"
     810            4 :     },
     811            4 :     {
     812            4 :       "segment": {
     813            4 :         "parent": 1,
     814            4 :         "lsn": 305614444640,
     815            4 :         "size": null,
     816            4 :         "needed": true
     817            4 :       },
     818            4 :       "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
     819            4 :       "kind": "BranchEnd"
     820            4 :     }
     821            4 :   ],
     822            4 :   "timeline_inputs": [
     823            4 :     {
     824            4 :       "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
     825            4 :       "ancestor_lsn": "0/0",
     826            4 :       "last_record": "47/280A5860",
     827            4 :       "latest_gc_cutoff": "47/240A5860",
     828            4 :       "next_pitr_cutoff": "47/240A5860",
     829            4 :       "retention_param_cutoff": "0/0",
     830            4 :       "lease_points": []
     831            4 :     }
     832            4 :   ]
     833            4 : }"#;
     834            4 : 
     835            4 :     let model: ModelInputs = serde_json::from_str(doc).unwrap();
     836            4 : 
     837            4 :     let res = model.calculate_model().calculate();
     838            4 : 
     839            4 :     println!("calculated synthetic size: {}", res.total_size);
     840            4 :     println!("result: {:?}", serde_json::to_string(&res.segments));
     841              : 
     842              :     use utils::lsn::Lsn;
     843            4 :     let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap();
     844            4 :     let last_lsn: Lsn = "47/280A5860".parse().unwrap();
     845            4 :     println!(
     846            4 :         "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
     847            4 :         u64::from(latest_gc_cutoff_lsn),
     848            4 :         u64::from(last_lsn)
     849            4 :     );
     850            4 :     assert_eq!(res.total_size, 220121784320);
     851            4 : }
        

Generated by: LCOV version 2.1-beta