Line data Source code
1 : use std::cmp;
2 : use std::collections::hash_map::Entry;
3 : use std::collections::{HashMap, HashSet};
4 : use std::sync::Arc;
5 :
6 : use tenant_size_model::svg::SvgBranchKind;
7 : use tenant_size_model::{Segment, StorageModel};
8 : use tokio::sync::Semaphore;
9 : use tokio::sync::oneshot::error::RecvError;
10 : use tokio_util::sync::CancellationToken;
11 : use tracing::*;
12 : use utils::id::TimelineId;
13 : use utils::lsn::Lsn;
14 :
15 : use super::{GcError, LogicalSizeCalculationCause, Tenant};
16 : use crate::context::RequestContext;
17 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
18 : use crate::tenant::{MaybeOffloaded, Timeline};
19 :
20 : /// Inputs to the actual tenant sizing model
21 : ///
22 : /// Implements [`serde::Serialize`] but is not meant to be part of the public API; instead, it is
23 : /// meant to be a transferable format between execution environments and developers.
24 : ///
25 : /// This tracks more information than the actual StorageModel that calculation
26 : /// needs. We will convert this into a StorageModel when it's time to perform
27 : /// the calculation.
28 : ///
29 16 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
30 : pub struct ModelInputs {
31 : pub segments: Vec<SegmentMeta>,
32 : pub timeline_inputs: Vec<TimelineInputs>,
33 : }
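// A minimal sketch (not part of the production flow) of the "transferable format" use described
// above: `ModelInputs` round-trips through JSON, so it can be captured from the HTTP API and
// replayed elsewhere. Assumes only the `serde_json` crate already used by the tests at the bottom
// of this file.
#[cfg(test)]
#[test]
fn model_inputs_roundtrip_sketch() {
    let doc = r#"{"segments": [], "timeline_inputs": []}"#;
    let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
    let serialized = serde_json::to_string(&inputs).unwrap();
    let reparsed: ModelInputs = serde_json::from_str(&serialized).unwrap();
    assert!(reparsed.segments.is_empty());
    assert!(reparsed.timeline_inputs.is_empty());
}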
34 :
35 : /// A [`Segment`], with some extra information for display purposes
36 168 : #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
37 : pub struct SegmentMeta {
38 : pub segment: Segment,
39 : pub timeline_id: TimelineId,
40 : pub kind: LsnKind,
41 : }
42 :
43 : #[derive(thiserror::Error, Debug)]
44 : pub(crate) enum CalculateSyntheticSizeError {
45 : /// Something went wrong internally in the calculation of logical size at a particular branch point
46 : #[error("Failed to calculate logical size on timeline {timeline_id} at {lsn}: {error}")]
47 : LogicalSize {
48 : timeline_id: TimelineId,
49 : lsn: Lsn,
50 : error: CalculateLogicalSizeError,
51 : },
52 :
53 : /// Something went wrong internally when calculating GC parameters at the start of size calculation
54 : #[error(transparent)]
55 : GcInfo(GcError),
56 :
57 : /// Totally unexpected errors, such as a panic when joining a task
58 : #[error(transparent)]
59 : Fatal(anyhow::Error),
60 :
61 : /// Tenant shut down while calculating size
62 : #[error("Cancelled")]
63 : Cancelled,
64 : }
65 :
66 : impl From<GcError> for CalculateSyntheticSizeError {
67 0 : fn from(value: GcError) -> Self {
68 0 : match value {
69 : GcError::TenantCancelled | GcError::TimelineCancelled => {
70 0 : CalculateSyntheticSizeError::Cancelled
71 : }
72 0 : other => CalculateSyntheticSizeError::GcInfo(other),
73 : }
74 0 : }
75 : }
76 :
77 : impl SegmentMeta {
78 232 : fn size_needed(&self) -> bool {
79 232 : match self.kind {
80 : LsnKind::BranchStart => {
81 : // If we don't have a later GcCutoff point on this branch, and
82 : // no ancestor, calculate size for the branch start point.
83 64 : self.segment.needed && self.segment.parent.is_none()
84 : }
85 48 : LsnKind::BranchPoint => true,
86 56 : LsnKind::GcCutOff => true,
87 64 : LsnKind::BranchEnd => false,
88 0 : LsnKind::LeasePoint => true,
89 0 : LsnKind::LeaseStart => false,
90 0 : LsnKind::LeaseEnd => false,
91 : }
92 232 : }
93 : }
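// A minimal sketch (hypothetical values) of the rules above: a GcCutOff segment always needs a
// logical size, while a BranchEnd segment never does. The timeline id is deserialized the same way
// as in the tests at the bottom of this file.
#[cfg(test)]
#[test]
fn size_needed_sketch() {
    let timeline_id: TimelineId =
        serde_json::from_str(r#""cb5e3cbe60a4afc00d01880e1a37047f""#).unwrap();

    let gc_cutoff = SegmentMeta {
        segment: Segment {
            parent: Some(0),
            lsn: 0x42,
            size: None,
            needed: false,
        },
        timeline_id,
        kind: LsnKind::GcCutOff,
    };
    let branch_end = SegmentMeta {
        segment: Segment {
            parent: Some(1),
            lsn: 0x43,
            size: None,
            needed: true,
        },
        timeline_id,
        kind: LsnKind::BranchEnd,
    };

    assert!(gc_cutoff.size_needed());
    assert!(!branch_end.size_needed());
}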
94 :
95 : #[derive(
96 56 : Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
97 : )]
98 : pub enum LsnKind {
99 : /// A timeline starting here
100 : BranchStart,
101 : /// A child timeline branches off from here
102 : BranchPoint,
103 : /// GC cutoff point
104 : GcCutOff,
105 : /// Last record LSN
106 : BranchEnd,
107 : /// An LSN lease is granted here.
108 : LeasePoint,
109 : /// A lease starts from here.
110 : LeaseStart,
111 : /// Last record LSN for the lease (should have the same LSN as the previous [`LsnKind::LeaseStart`]).
112 : LeaseEnd,
113 : }
114 :
115 : impl From<LsnKind> for SvgBranchKind {
116 0 : fn from(kind: LsnKind) -> Self {
117 0 : match kind {
118 0 : LsnKind::LeasePoint | LsnKind::LeaseStart | LsnKind::LeaseEnd => SvgBranchKind::Lease,
119 0 : _ => SvgBranchKind::Timeline,
120 : }
121 0 : }
122 : }
123 :
124 : /// Collects all LSNs relevant to the inputs. These are only helpful in the serialized form, as
125 : /// part of [`ModelInputs`] from the HTTP API, explaining the inputs.
126 112 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
127 : pub struct TimelineInputs {
128 : pub timeline_id: TimelineId,
129 :
130 : pub ancestor_id: Option<TimelineId>,
131 :
132 : ancestor_lsn: Lsn,
133 : last_record: Lsn,
134 : latest_gc_cutoff: Lsn,
135 :
136 : /// Cutoff point based on GC settings
137 : next_pitr_cutoff: Lsn,
138 :
139 : /// Cutoff point calculated from the user-supplied 'max_retention_period'
140 : retention_param_cutoff: Option<Lsn>,
141 :
142 : /// Lease points on the timeline
143 : lease_points: Vec<Lsn>,
144 : }
145 :
146 : /// Gathers the inputs for the tenant sizing model.
147 : ///
148 : /// Tenant size does not consider the latest state, but only the state up to next_pitr_cutoff, which
149 : /// is updated on demand at the start of this calculation, separately from the
150 : /// [`TimelineInputs::latest_gc_cutoff`].
151 : ///
152 : /// For timelines in general:
153 : ///
154 : /// ```text
155 : /// 0-----|---------|----|------------| · · · · · |·> lsn
156 : /// initdb_lsn branchpoints* next_pitr_cutoff latest
157 : /// ```
158 8 : pub(super) async fn gather_inputs(
159 8 : tenant: &Tenant,
160 8 : limit: &Arc<Semaphore>,
161 8 : max_retention_period: Option<u64>,
162 8 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
163 8 : cause: LogicalSizeCalculationCause,
164 8 : cancel: &CancellationToken,
165 8 : ctx: &RequestContext,
166 8 : ) -> Result<ModelInputs, CalculateSyntheticSizeError> {
167 8 : // refresh is needed to update [`timeline::GcCutoffs`]
168 8 : tenant.refresh_gc_info(cancel, ctx).await?;
169 :
170 : // Collect information about all the timelines
171 8 : let mut timelines = tenant.list_timelines();
172 8 :
173 8 : if timelines.is_empty() {
174 : // perhaps the tenant has just been created, and as such doesn't have any data yet
175 0 : return Ok(ModelInputs {
176 0 : segments: vec![],
177 0 : timeline_inputs: Vec::new(),
178 0 : });
179 8 : }
180 8 :
181 8 : // Filter out timelines that are not active
182 8 : //
183 8 : // There may be a race when a timeline is dropped,
184 8 : // but it is unlikely to cause any issues. In the worst case,
185 8 : // the calculation will error out.
186 32 : timelines.retain(|t| t.is_active());
187 8 : // Also filter out archived timelines.
188 32 : timelines.retain(|t| t.is_archived() != Some(true));
189 8 :
190 8 : // Build a map of branch points.
191 8 : let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
192 32 : for timeline in timelines.iter() {
193 32 : if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
194 24 : branchpoints
195 24 : .entry(ancestor_id)
196 24 : .or_default()
197 24 : .insert(timeline.get_ancestor_lsn());
198 24 : }
199 : }
200 :
201 : // These become the final result.
202 8 : let mut timeline_inputs = Vec::with_capacity(timelines.len());
203 8 : let mut segments: Vec<SegmentMeta> = Vec::new();
204 8 :
205 8 : //
206 8 : // Build Segments representing each timeline. As we do that, also remember
207 8 : // the branchpoints and branch startpoints in 'branchpoint_segments' and
208 8 : // 'branchstart_segments'
209 8 : //
210 8 :
211 8 : // BranchPoint segments of each timeline
212 8 : // (timeline, branchpoint LSN) -> segment_id
213 8 : let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();
214 :
215 : // timeline, branch start seg id, (ancestor, ancestor LSN)
216 : type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
217 8 : let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();
218 :
219 32 : for timeline in timelines.iter() {
220 32 : let timeline_id = timeline.timeline_id;
221 32 : let last_record_lsn = timeline.get_last_record_lsn();
222 32 : let ancestor_lsn = timeline.get_ancestor_lsn();
223 32 :
224 32 : // there's a race between the update (holding tenant.gc_lock) and this read, but it
225 32 : // might not be an issue, because it's not used for Timeline::gc
226 32 : let gc_info = timeline.gc_info.read().unwrap();
227 32 :
228 32 : // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
229 32 : // new gc run, which we have no control over. However, unlike `Timeline::gc`,
230 32 : // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
231 32 : // actually removing files.
232 32 : //
233 32 : // We only consider [`timeline::GcCutoffs::time`], and not [`timeline::GcCutoffs::space`], because from
234 32 : // a user's perspective they have only requested retention up to the time bound (pitr_cutoff), rather
235 32 : // than our internal space cutoff. This means that if someone drops a database and waits for their
236 32 : // PITR interval, they will see synthetic size decrease, even if we are still storing data inside
237 32 : // the space cutoff.
238 32 : let mut next_pitr_cutoff = gc_info.cutoffs.time;
239 :
240 : // If the caller provided a shorter retention period, use that instead of the GC cutoff.
241 32 : let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
242 0 : let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
243 0 : if next_pitr_cutoff < param_cutoff {
244 0 : next_pitr_cutoff = param_cutoff;
245 0 : }
246 0 : Some(param_cutoff)
247 : } else {
248 32 : None
249 : };
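// Worked example (hypothetical numbers, not taken from any real tenant): with
// last_record_lsn = Lsn(0x1000_0000) and max_retention_period = 0x10_0000,
// param_cutoff = Lsn(0x0FF0_0000). If the GC-derived next_pitr_cutoff is older
// (smaller) than that, it is advanced to param_cutoff, so the user-supplied
// bound caps how much history is retained.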
250 :
251 32 : let branch_is_invisible = timeline.is_invisible() == Some(true);
252 32 :
253 32 : let lease_points = gc_info
254 32 : .leases
255 32 : .keys()
256 32 : .filter(|&&lsn| lsn > ancestor_lsn)
257 32 : .copied()
258 32 : .collect::<Vec<_>>();
259 32 :
260 32 : // next_pitr_cutoff in the parent branch is not of interest (right now at least), nor do we
261 32 : // want to query any logical size before initdb_lsn.
262 32 : let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
263 32 :
264 32 : // Build "interesting LSNs" on this timeline
265 32 : let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
266 32 : .retain_lsns
267 32 : .iter()
268 32 : .filter(|(lsn, _child_id, is_offloaded)| {
269 24 : lsn > &ancestor_lsn && *is_offloaded == MaybeOffloaded::No
270 32 : })
271 32 : .copied()
272 32 : // this assumes there are no retain_lsns other than the branchpoints
273 32 : .map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
274 32 : .collect::<Vec<_>>();
275 32 :
276 32 : if !branch_is_invisible {
277 28 : // Do not count lease points for invisible branches.
278 28 : lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
279 28 : }
280 :
281 32 : drop(gc_info);
282 :
283 : // Add branch points we collected earlier, just in case there were any that were
284 : // not present in retain_lsns. We will remove any duplicates below.
285 32 : if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
286 8 : lsns.extend(
287 8 : this_branchpoints
288 8 : .iter()
289 24 : .map(|lsn| (*lsn, LsnKind::BranchPoint)),
290 8 : )
291 24 : }
292 :
293 : // Add a point for the PITR cutoff
294 32 : let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
295 32 : if !branch_start_needed && !branch_is_invisible {
296 28 : // Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
297 28 : // range from the last branch point to the latest data.
298 28 : lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
299 28 : }
300 :
301 32 : lsns.sort_unstable();
302 32 : lsns.dedup();
303 32 :
304 32 : //
305 32 : // Create Segments for the interesting points.
306 32 : //
307 32 :
308 32 : // Timeline start point
309 32 : let ancestor = timeline
310 32 : .get_ancestor_timeline_id()
311 32 : .map(|ancestor_id| (ancestor_id, ancestor_lsn));
312 32 : branchstart_segments.push((timeline_id, segments.len(), ancestor));
313 32 : segments.push(SegmentMeta {
314 32 : segment: Segment {
315 32 : parent: None, // filled in later
316 32 : lsn: branch_start_lsn.0,
317 32 : size: None, // filled in later
318 32 : needed: branch_start_needed,
319 32 : },
320 32 : timeline_id: timeline.timeline_id,
321 32 : kind: LsnKind::BranchStart,
322 32 : });
323 32 :
324 32 : // GC cutoff point, and any branch points, i.e. points where
325 32 : // other timelines branch off from this timeline.
326 32 : let mut parent = segments.len() - 1;
327 84 : for (lsn, kind) in lsns {
328 52 : if kind == LsnKind::BranchPoint {
329 24 : branchpoint_segments.insert((timeline_id, lsn), segments.len());
330 28 : }
331 :
332 52 : segments.push(SegmentMeta {
333 52 : segment: Segment {
334 52 : parent: Some(parent),
335 52 : lsn: lsn.0,
336 52 : size: None,
337 52 : needed: lsn > next_pitr_cutoff,
338 52 : },
339 52 : timeline_id: timeline.timeline_id,
340 52 : kind,
341 52 : });
342 52 :
343 52 : parent = segments.len() - 1;
344 52 :
345 52 : if kind == LsnKind::LeasePoint {
346 0 : // Needs `LeaseStart` and `LeaseEnd` as well to model the lease as a read-only branch that never writes data
347 0 : // (i.e. its LSN has not advanced from ancestor_lsn), and therefore the three segments have the same LSN
348 0 : // value. Without the other two segments, the calculation code would not count the leased LSN as a point
349 0 : // to be retained.
350 0 : // We did not use `BranchStart` or `BranchEnd` so that we can differentiate branches and leases while debugging.
351 0 : //
352 0 : // Alt Design: rewrite the entire calculation code to be independent of timeline id. Both leases and
353 0 : // branch points can be given a synthetic id so we can unite them.
354 0 : let mut lease_parent = parent;
355 0 :
356 0 : // Start of a lease.
357 0 : segments.push(SegmentMeta {
358 0 : segment: Segment {
359 0 : parent: Some(lease_parent),
360 0 : lsn: lsn.0,
361 0 : size: None, // Filled in later, if necessary
362 0 : needed: lsn > next_pitr_cutoff, // only needed if the point is within retention.
363 0 : },
364 0 : timeline_id: timeline.timeline_id,
365 0 : kind: LsnKind::LeaseStart,
366 0 : });
367 0 : lease_parent += 1;
368 0 :
369 0 : // End of the lease.
370 0 : segments.push(SegmentMeta {
371 0 : segment: Segment {
372 0 : parent: Some(lease_parent),
373 0 : lsn: lsn.0,
374 0 : size: None, // Filled in later, if necessary
375 0 : needed: true, // everything at the lease LSN must be readable => is needed
376 0 : },
377 0 : timeline_id: timeline.timeline_id,
378 0 : kind: LsnKind::LeaseEnd,
379 0 : });
380 52 : }
381 : }
382 :
383 32 : let branch_end_lsn = if branch_is_invisible {
384 : // If the branch is invisible, the branch end is the last requested LSN (likely a branch cutoff point).
385 4 : segments.last().unwrap().segment.lsn
386 : } else {
387 : // Otherwise, the branch end is the last record LSN.
388 28 : last_record_lsn.0
389 : };
390 :
391 : // Current end of the timeline
392 32 : segments.push(SegmentMeta {
393 32 : segment: Segment {
394 32 : parent: Some(parent),
395 32 : lsn: branch_end_lsn,
396 32 : size: None, // Filled in later, if necessary
397 32 : needed: true,
398 32 : },
399 32 : timeline_id: timeline.timeline_id,
400 32 : kind: LsnKind::BranchEnd,
401 32 : });
402 32 :
403 32 : timeline_inputs.push(TimelineInputs {
404 32 : timeline_id: timeline.timeline_id,
405 32 : ancestor_id: timeline.get_ancestor_timeline_id(),
406 32 : ancestor_lsn,
407 32 : last_record: last_record_lsn,
408 32 : // this is not used above, because it might not have updated recently enough
409 32 : latest_gc_cutoff: *timeline.get_applied_gc_cutoff_lsn(),
410 32 : next_pitr_cutoff,
411 32 : retention_param_cutoff,
412 32 : lease_points,
413 32 : });
414 : }
415 :
416 : // We now have all segments from the timelines in 'segments'. The timelines
417 : // haven't been linked to each other yet, though. Do that.
418 40 : for (_timeline_id, seg_id, ancestor) in branchstart_segments {
419 : // Look up the branch point
420 32 : if let Some(ancestor) = ancestor {
421 24 : let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
422 24 : segments[seg_id].segment.parent = Some(parent_id);
423 24 : }
424 : }
425 :
426 : // We left the 'size' field empty in all of the Segments so far.
427 : // Now find logical sizes for all of the points that might need or benefit from them.
428 8 : fill_logical_sizes(
429 8 : &timelines,
430 8 : &mut segments,
431 8 : limit,
432 8 : logical_size_cache,
433 8 : cause,
434 8 : ctx,
435 8 : )
436 8 : .await?;
437 :
438 8 : if tenant.cancel.is_cancelled() {
439 : // If we're shutting down, return an error rather than a sparse result that might include some
440 : // timelines from before we started shutting down
441 0 : return Err(CalculateSyntheticSizeError::Cancelled);
442 8 : }
443 8 :
444 8 : Ok(ModelInputs {
445 8 : segments,
446 8 : timeline_inputs,
447 8 : })
448 8 : }
449 :
450 : /// Augment 'segments' with logical sizes
451 : ///
452 : /// This will leave segments' sizes as None if the Timeline associated with the segment is deleted concurrently
453 : /// (i.e. we cannot read its logical size at a particular LSN).
454 8 : async fn fill_logical_sizes(
455 8 : timelines: &[Arc<Timeline>],
456 8 : segments: &mut [SegmentMeta],
457 8 : limit: &Arc<Semaphore>,
458 8 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
459 8 : cause: LogicalSizeCalculationCause,
460 8 : ctx: &RequestContext,
461 8 : ) -> Result<(), CalculateSyntheticSizeError> {
462 8 : let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
463 8 : timelines
464 8 : .iter()
465 32 : .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
466 8 : );
467 8 :
468 8 : // Record the used/inserted cache keys here, so extras can be removed and we don't start leaking.
469 8 : // After the initial run the cache should be quite stable, but live timelines will eventually
470 8 : // require new LSNs to be inspected.
471 8 : let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();
472 8 :
473 8 : // With a JoinSet, all of the tasks are simply de-scheduled on drop, which we can use to
474 8 : // our advantage with `?` error handling.
475 8 : let mut joinset = tokio::task::JoinSet::new();
476 :
477 : // For each point that would benefit from having a logical size available,
478 : // spawn a Task to fetch it, unless we have it cached already.
479 116 : for seg in segments.iter() {
480 116 : if !seg.size_needed() {
481 64 : continue;
482 52 : }
483 52 :
484 52 : let timeline_id = seg.timeline_id;
485 52 : let lsn = Lsn(seg.segment.lsn);
486 :
487 52 : if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
488 52 : let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
489 52 : if cached_size.is_none() {
490 28 : let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
491 28 : let parallel_size_calcs = Arc::clone(limit);
492 28 : let ctx = ctx.attached_child().with_scope_timeline(&timeline);
493 28 : joinset.spawn(
494 28 : calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
495 28 : .in_current_span(),
496 28 : );
497 28 : }
498 52 : e.insert(cached_size);
499 0 : }
500 : }
501 :
502 : // Perform the size lookups
503 8 : let mut have_any_error = None;
504 36 : while let Some(res) = joinset.join_next().await {
505 : // each of these comes with Result<anyhow::Result<_>, JoinError>
506 : // because of spawn + spawn_blocking
507 28 : match res {
508 0 : Err(join_error) if join_error.is_cancelled() => {
509 0 : unreachable!("we are not cancelling any of the futures, nor should be");
510 : }
511 0 : Err(join_error) => {
512 0 : // cannot really do anything, as this panic is likely a bug
513 0 : error!(
514 0 : "task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}"
515 : );
516 :
517 0 : have_any_error = Some(CalculateSyntheticSizeError::Fatal(
518 0 : anyhow::anyhow!(join_error)
519 0 : .context("task that calls spawn_ondemand_logical_size_calculation"),
520 0 : ));
521 : }
522 0 : Ok(Err(recv_result_error)) => {
523 0 : // cannot really do anything, as this error is likely a bug
524 0 : error!("failed to receive logical size query result: {recv_result_error:#}");
525 0 : have_any_error = Some(CalculateSyntheticSizeError::Fatal(
526 0 : anyhow::anyhow!(recv_result_error)
527 0 : .context("Receiving logical size query result"),
528 0 : ));
529 : }
530 0 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
531 0 : if matches!(error, CalculateLogicalSizeError::Cancelled) {
532 : // Skip this: it's okay if one timeline among many is shutting down while we
533 : // calculate inputs for the overall tenant.
534 0 : continue;
535 : } else {
536 0 : warn!(
537 0 : timeline_id=%timeline.timeline_id,
538 0 : "failed to calculate logical size at {lsn}: {error:#}"
539 : );
540 0 : have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
541 0 : timeline_id: timeline.timeline_id,
542 0 : lsn,
543 0 : error,
544 0 : });
545 : }
546 : }
547 28 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
548 28 : debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
549 :
550 28 : logical_size_cache.insert((timeline.timeline_id, lsn), size);
551 28 : sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
552 : }
553 : }
554 : }
555 :
556 : // prune any keys not needed anymore; we recorded every used and inserted key above.
557 56 : logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
558 :
559 8 : if let Some(error) = have_any_error {
560 : // we cannot complete this round, because we are missing data.
561 : // However, we have cached everything we were able to request calculation for.
562 0 : return Err(error);
563 8 : }
564 :
565 : // Insert the looked up sizes to the Segments
566 116 : for seg in segments.iter_mut() {
567 116 : if !seg.size_needed() {
568 64 : continue;
569 52 : }
570 52 :
571 52 : let timeline_id = seg.timeline_id;
572 52 : let lsn = Lsn(seg.segment.lsn);
573 :
574 52 : if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
575 52 : seg.segment.size = Some(*size);
576 52 : }
577 : }
578 8 : Ok(())
579 8 : }
580 :
581 : impl ModelInputs {
582 8 : pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
583 8 : // Convert SegmentMetas into plain Segments
584 8 : StorageModel {
585 8 : segments: self
586 8 : .segments
587 8 : .iter()
588 56 : .map(|seg| seg.segment.clone())
589 8 : .collect(),
590 8 : }
591 8 : }
592 :
593 : // calculate total project size
594 4 : pub fn calculate(&self) -> u64 {
595 4 : let storage = self.calculate_model();
596 4 : let sizes = storage.calculate();
597 4 : sizes.total_size
598 4 : }
599 : }
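// A minimal usage sketch (mirroring the tests at the bottom of this file):
// `calculate()` is simply `calculate_model()` followed by the storage model's
// own size calculation, so the two paths always agree on the total.
//
//     let inputs: ModelInputs = serde_json::from_str(doc)?;
//     let total = inputs.calculate_model().calculate().total_size;
//     assert_eq!(total, inputs.calculate());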
600 :
601 : /// Newtype around the tuple that carries the result of the timeline-at-LSN logical size calculation.
602 : struct TimelineAtLsnSizeResult(
603 : Arc<crate::tenant::Timeline>,
604 : utils::lsn::Lsn,
605 : Result<u64, CalculateLogicalSizeError>,
606 : );
607 :
608 : #[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
609 : async fn calculate_logical_size(
610 : limit: Arc<tokio::sync::Semaphore>,
611 : timeline: Arc<crate::tenant::Timeline>,
612 : lsn: utils::lsn::Lsn,
613 : cause: LogicalSizeCalculationCause,
614 : ctx: RequestContext,
615 : ) -> Result<TimelineAtLsnSizeResult, RecvError> {
616 : let _permit = tokio::sync::Semaphore::acquire_owned(limit)
617 : .await
618 : .expect("global semaphore should not have been closed");
619 :
620 : let size_res = timeline
621 : .spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
622 : .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
623 : .await?;
624 : Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
625 : }
626 :
627 : #[cfg(test)]
628 : #[test]
629 4 : fn verify_size_for_multiple_branches() {
630 4 : // this is generated from integration test test_tenant_size_with_multiple_branches, but this way
631 4 : // it has stable LSNs
632 4 : //
633 4 : // The timeline_inputs don't participate in the size calculation, and are here just to explain
634 4 : // the inputs.
635 4 : let doc = r#"
636 4 : {
637 4 : "segments": [
638 4 : {
639 4 : "segment": {
640 4 : "parent": 9,
641 4 : "lsn": 26033560,
642 4 : "size": null,
643 4 : "needed": false
644 4 : },
645 4 : "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
646 4 : "kind": "BranchStart"
647 4 : },
648 4 : {
649 4 : "segment": {
650 4 : "parent": 0,
651 4 : "lsn": 35720400,
652 4 : "size": 25206784,
653 4 : "needed": false
654 4 : },
655 4 : "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
656 4 : "kind": "GcCutOff"
657 4 : },
658 4 : {
659 4 : "segment": {
660 4 : "parent": 1,
661 4 : "lsn": 35851472,
662 4 : "size": null,
663 4 : "needed": true
664 4 : },
665 4 : "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
666 4 : "kind": "BranchEnd"
667 4 : },
668 4 : {
669 4 : "segment": {
670 4 : "parent": 7,
671 4 : "lsn": 24566168,
672 4 : "size": null,
673 4 : "needed": false
674 4 : },
675 4 : "timeline_id": "454626700469f0a9914949b9d018e876",
676 4 : "kind": "BranchStart"
677 4 : },
678 4 : {
679 4 : "segment": {
680 4 : "parent": 3,
681 4 : "lsn": 25261936,
682 4 : "size": 26050560,
683 4 : "needed": false
684 4 : },
685 4 : "timeline_id": "454626700469f0a9914949b9d018e876",
686 4 : "kind": "GcCutOff"
687 4 : },
688 4 : {
689 4 : "segment": {
690 4 : "parent": 4,
691 4 : "lsn": 25393008,
692 4 : "size": null,
693 4 : "needed": true
694 4 : },
695 4 : "timeline_id": "454626700469f0a9914949b9d018e876",
696 4 : "kind": "BranchEnd"
697 4 : },
698 4 : {
699 4 : "segment": {
700 4 : "parent": null,
701 4 : "lsn": 23694408,
702 4 : "size": null,
703 4 : "needed": false
704 4 : },
705 4 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
706 4 : "kind": "BranchStart"
707 4 : },
708 4 : {
709 4 : "segment": {
710 4 : "parent": 6,
711 4 : "lsn": 24566168,
712 4 : "size": 25739264,
713 4 : "needed": false
714 4 : },
715 4 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
716 4 : "kind": "BranchPoint"
717 4 : },
718 4 : {
719 4 : "segment": {
720 4 : "parent": 7,
721 4 : "lsn": 25902488,
722 4 : "size": 26402816,
723 4 : "needed": false
724 4 : },
725 4 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
726 4 : "kind": "GcCutOff"
727 4 : },
728 4 : {
729 4 : "segment": {
730 4 : "parent": 8,
731 4 : "lsn": 26033560,
732 4 : "size": 26468352,
733 4 : "needed": true
734 4 : },
735 4 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
736 4 : "kind": "BranchPoint"
737 4 : },
738 4 : {
739 4 : "segment": {
740 4 : "parent": 9,
741 4 : "lsn": 26033560,
742 4 : "size": null,
743 4 : "needed": true
744 4 : },
745 4 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
746 4 : "kind": "BranchEnd"
747 4 : }
748 4 : ],
749 4 : "timeline_inputs": [
750 4 : {
751 4 : "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
752 4 : "ancestor_lsn": "0/18D3D98",
753 4 : "last_record": "0/2230CD0",
754 4 : "latest_gc_cutoff": "0/1698C48",
755 4 : "next_pitr_cutoff": "0/2210CD0",
756 4 : "retention_param_cutoff": null,
757 4 : "lease_points": []
758 4 : },
759 4 : {
760 4 : "timeline_id": "454626700469f0a9914949b9d018e876",
761 4 : "ancestor_lsn": "0/176D998",
762 4 : "last_record": "0/1837770",
763 4 : "latest_gc_cutoff": "0/1698C48",
764 4 : "next_pitr_cutoff": "0/1817770",
765 4 : "retention_param_cutoff": null,
766 4 : "lease_points": []
767 4 : },
768 4 : {
769 4 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
770 4 : "ancestor_lsn": "0/0",
771 4 : "last_record": "0/18D3D98",
772 4 : "latest_gc_cutoff": "0/1698C48",
773 4 : "next_pitr_cutoff": "0/18B3D98",
774 4 : "retention_param_cutoff": null,
775 4 : "lease_points": []
776 4 : }
777 4 : ]
778 4 : }
779 4 : "#;
780 4 : let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
781 4 :
782 4 : assert_eq!(inputs.calculate(), 37_851_408);
783 4 : }
784 :
785 : #[cfg(test)]
786 : #[test]
787 4 : fn verify_size_for_one_branch() {
788 4 : let doc = r#"
789 4 : {
790 4 : "segments": [
791 4 : {
792 4 : "segment": {
793 4 : "parent": null,
794 4 : "lsn": 0,
795 4 : "size": null,
796 4 : "needed": false
797 4 : },
798 4 : "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
799 4 : "kind": "BranchStart"
800 4 : },
801 4 : {
802 4 : "segment": {
803 4 : "parent": 0,
804 4 : "lsn": 305547335776,
805 4 : "size": 220054675456,
806 4 : "needed": false
807 4 : },
808 4 : "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
809 4 : "kind": "GcCutOff"
810 4 : },
811 4 : {
812 4 : "segment": {
813 4 : "parent": 1,
814 4 : "lsn": 305614444640,
815 4 : "size": null,
816 4 : "needed": true
817 4 : },
818 4 : "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
819 4 : "kind": "BranchEnd"
820 4 : }
821 4 : ],
822 4 : "timeline_inputs": [
823 4 : {
824 4 : "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
825 4 : "ancestor_lsn": "0/0",
826 4 : "last_record": "47/280A5860",
827 4 : "latest_gc_cutoff": "47/240A5860",
828 4 : "next_pitr_cutoff": "47/240A5860",
829 4 : "retention_param_cutoff": "0/0",
830 4 : "lease_points": []
831 4 : }
832 4 : ]
833 4 : }"#;
834 4 :
835 4 : let model: ModelInputs = serde_json::from_str(doc).unwrap();
836 4 :
837 4 : let res = model.calculate_model().calculate();
838 4 :
839 4 : println!("calculated synthetic size: {}", res.total_size);
840 4 : println!("result: {:?}", serde_json::to_string(&res.segments));
841 :
842 : use utils::lsn::Lsn;
843 4 : let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap();
844 4 : let last_lsn: Lsn = "47/280A5860".parse().unwrap();
845 4 : println!(
846 4 : "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
847 4 : u64::from(latest_gc_cutoff_lsn),
848 4 : u64::from(last_lsn)
849 4 : );
850 4 : assert_eq!(res.total_size, 220121784320);
851 4 : }