Line data Source code
1 : use std::cmp;
2 : use std::collections::hash_map::Entry;
3 : use std::collections::{HashMap, HashSet};
4 : use std::sync::Arc;
5 :
6 : use tenant_size_model::svg::SvgBranchKind;
7 : use tenant_size_model::{Segment, StorageModel};
8 : use tokio::sync::Semaphore;
9 : use tokio::sync::oneshot::error::RecvError;
10 : use tokio_util::sync::CancellationToken;
11 : use tracing::*;
12 : use utils::id::TimelineId;
13 : use utils::lsn::Lsn;
14 :
15 : use super::{GcError, LogicalSizeCalculationCause, TenantShard};
16 : use crate::context::RequestContext;
17 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
18 : use crate::tenant::{MaybeOffloaded, Timeline};
19 :
20 : /// Inputs to the actual tenant sizing model
21 : ///
22 : /// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to
23 : /// be a transferrable format between execution environments and developer.
24 : ///
25 : /// This tracks more information than the actual StorageModel that calculation
26 : /// needs. We will convert this into a StorageModel when it's time to perform
27 : /// the calculation.
28 : ///
29 48 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
30 : pub struct ModelInputs {
31 : pub segments: Vec<SegmentMeta>,
32 : pub timeline_inputs: Vec<TimelineInputs>,
33 : }
34 :
35 : /// A [`Segment`], with some extra information for display purposes
36 504 : #[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
37 : pub struct SegmentMeta {
38 : pub segment: Segment,
39 : pub timeline_id: TimelineId,
40 : pub kind: LsnKind,
41 : }
42 :
43 : #[derive(thiserror::Error, Debug)]
44 : pub(crate) enum CalculateSyntheticSizeError {
45 : /// Something went wrong internally to the calculation of logical size at a particular branch point
46 : #[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")]
47 : LogicalSize {
48 : timeline_id: TimelineId,
49 : lsn: Lsn,
50 : error: CalculateLogicalSizeError,
51 : },
52 :
53 : /// Something went wrong internally when calculating GC parameters at start of size calculation
54 : #[error(transparent)]
55 : GcInfo(GcError),
56 :
57 : /// Totally unexpected errors, like panics joining a task
58 : #[error(transparent)]
59 : Fatal(anyhow::Error),
60 :
61 : /// Tenant shut down while calculating size
62 : #[error("Cancelled")]
63 : Cancelled,
64 : }
65 :
66 : impl From<GcError> for CalculateSyntheticSizeError {
67 0 : fn from(value: GcError) -> Self {
68 0 : match value {
69 : GcError::TenantCancelled | GcError::TimelineCancelled => {
70 0 : CalculateSyntheticSizeError::Cancelled
71 : }
72 0 : other => CalculateSyntheticSizeError::GcInfo(other),
73 : }
74 0 : }
75 : }
76 :
77 : impl SegmentMeta {
78 696 : fn size_needed(&self) -> bool {
79 696 : match self.kind {
80 : LsnKind::BranchStart => {
81 : // If we don't have a later GcCutoff point on this branch, and
82 : // no ancestor, calculate size for the branch start point.
83 192 : self.segment.needed && self.segment.parent.is_none()
84 : }
85 144 : LsnKind::BranchPoint => true,
86 168 : LsnKind::GcCutOff => true,
87 192 : LsnKind::BranchEnd => false,
88 0 : LsnKind::LeasePoint => true,
89 0 : LsnKind::LeaseStart => false,
90 0 : LsnKind::LeaseEnd => false,
91 : }
92 696 : }
93 : }
94 :
95 : #[derive(
96 168 : Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
97 : )]
98 : pub enum LsnKind {
99 : /// A timeline starting here
100 : BranchStart,
101 : /// A child timeline branches off from here
102 : BranchPoint,
103 : /// GC cutoff point
104 : GcCutOff,
105 : /// Last record LSN
106 : BranchEnd,
107 : /// A LSN lease is granted here.
108 : LeasePoint,
109 : /// A lease starts from here.
110 : LeaseStart,
111 : /// Last record LSN for the lease (should have the same LSN as the previous [`LsnKind::LeaseStart`]).
112 : LeaseEnd,
113 : }
114 :
115 : impl From<LsnKind> for SvgBranchKind {
116 0 : fn from(kind: LsnKind) -> Self {
117 0 : match kind {
118 0 : LsnKind::LeasePoint | LsnKind::LeaseStart | LsnKind::LeaseEnd => SvgBranchKind::Lease,
119 0 : _ => SvgBranchKind::Timeline,
120 : }
121 0 : }
122 : }
123 :
124 : /// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
125 : /// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
126 336 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
127 : pub struct TimelineInputs {
128 : pub timeline_id: TimelineId,
129 :
130 : pub ancestor_id: Option<TimelineId>,
131 :
132 : ancestor_lsn: Lsn,
133 : last_record: Lsn,
134 : latest_gc_cutoff: Lsn,
135 :
136 : /// Cutoff point based on GC settings
137 : next_pitr_cutoff: Lsn,
138 :
139 : /// Cutoff point calculated from the user-supplied 'max_retention_period'
140 : retention_param_cutoff: Option<Lsn>,
141 :
142 : /// Lease points on the timeline
143 : lease_points: Vec<Lsn>,
144 : }
145 :
146 : /// Gathers the inputs for the tenant sizing model.
147 : ///
148 : /// Tenant size does not consider the latest state, but only the state until next_pitr_cutoff, which
149 : /// is updated on-demand, during the start of this calculation and separate from the
150 : /// [`TimelineInputs::latest_gc_cutoff`].
151 : ///
152 : /// For timelines in general:
153 : ///
154 : /// ```text
155 : /// 0-----|---------|----|------------| · · · · · |·> lsn
156 : /// initdb_lsn branchpoints* next_pitr_cutoff latest
157 : /// ```
158 24 : pub(super) async fn gather_inputs(
159 24 : tenant: &TenantShard,
160 24 : limit: &Arc<Semaphore>,
161 24 : max_retention_period: Option<u64>,
162 24 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
163 24 : cause: LogicalSizeCalculationCause,
164 24 : cancel: &CancellationToken,
165 24 : ctx: &RequestContext,
166 24 : ) -> Result<ModelInputs, CalculateSyntheticSizeError> {
167 24 : // refresh is needed to update [`timeline::GcCutoffs`]
168 24 : tenant.refresh_gc_info(cancel, ctx).await?;
169 :
170 : // Collect information about all the timelines
171 24 : let mut timelines = tenant.list_timelines();
172 24 :
173 24 : if timelines.is_empty() {
174 : // perhaps the tenant has just been created, and as such doesn't have any data yet
175 0 : return Ok(ModelInputs {
176 0 : segments: vec![],
177 0 : timeline_inputs: Vec::new(),
178 0 : });
179 24 : }
180 24 :
181 24 : // Filter out timelines that are not active
182 24 : //
183 24 : // There may be a race when a timeline is dropped,
184 24 : // but it is unlikely to cause any issues. In the worst case,
185 24 : // the calculation will error out.
186 96 : timelines.retain(|t| t.is_active());
187 24 : // Also filter out archived timelines.
188 96 : timelines.retain(|t| t.is_archived() != Some(true));
189 24 :
190 24 : // Build a map of branch points.
191 24 : let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
192 96 : for timeline in timelines.iter() {
193 96 : if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
194 72 : branchpoints
195 72 : .entry(ancestor_id)
196 72 : .or_default()
197 72 : .insert(timeline.get_ancestor_lsn());
198 72 : }
199 : }
200 :
201 : // These become the final result.
202 24 : let mut timeline_inputs = Vec::with_capacity(timelines.len());
203 24 : let mut segments: Vec<SegmentMeta> = Vec::new();
204 24 :
205 24 : //
206 24 : // Build Segments representing each timeline. As we do that, also remember
207 24 : // the branchpoints and branch startpoints in 'branchpoint_segments' and
208 24 : // 'branchstart_segments'
209 24 : //
210 24 :
211 24 : // BranchPoint segments of each timeline
212 24 : // (timeline, branchpoint LSN) -> segment_id
213 24 : let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();
214 :
215 : // timeline, Branchpoint seg id, (ancestor, ancestor LSN)
216 : type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
217 24 : let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();
218 :
219 96 : for timeline in timelines.iter() {
220 96 : let timeline_id = timeline.timeline_id;
221 96 : let last_record_lsn = timeline.get_last_record_lsn();
222 96 : let ancestor_lsn = timeline.get_ancestor_lsn();
223 96 :
224 96 : // there's a race between the update (holding tenant.gc_lock) and this read but it
225 96 : // might not be an issue, because it's not for Timeline::gc
226 96 : let gc_info = timeline.gc_info.read().unwrap();
227 96 :
228 96 : // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
229 96 : // new gc run, which we have no control over. however differently from `Timeline::gc`
230 96 : // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
231 96 : // actually removing files.
232 96 : //
233 96 : // We only consider [`timeline::GcCutoffs::time`], and not [`timeline::GcCutoffs::space`], because from
234 96 : // a user's perspective they have only requested retention up to the time bound (pitr_cutoff), rather
235 96 : // than our internal space cutoff. This means that if someone drops a database and waits for their
236 96 : // PITR interval, they will see synthetic size decrease, even if we are still storing data inside
237 96 : // the space cutoff.
238 96 : let mut next_pitr_cutoff = gc_info.cutoffs.time;
239 :
240 : // If the caller provided a shorter retention period, use that instead of the GC cutoff.
241 96 : let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
242 0 : let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
243 0 : if next_pitr_cutoff < param_cutoff {
244 0 : next_pitr_cutoff = param_cutoff;
245 0 : }
246 0 : Some(param_cutoff)
247 : } else {
248 96 : None
249 : };
250 :
251 96 : let branch_is_invisible = timeline.is_invisible() == Some(true);
252 96 :
253 96 : let lease_points = gc_info
254 96 : .leases
255 96 : .keys()
256 96 : .filter(|&&lsn| lsn > ancestor_lsn)
257 96 : .copied()
258 96 : .collect::<Vec<_>>();
259 96 :
260 96 : // next_pitr_cutoff in parent branch are not of interest (right now at least), nor do we
261 96 : // want to query any logical size before initdb_lsn.
262 96 : let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
263 96 :
264 96 : // Build "interesting LSNs" on this timeline
265 96 : let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
266 96 : .retain_lsns
267 96 : .iter()
268 96 : .filter(|(lsn, _child_id, is_offloaded)| {
269 72 : lsn > &ancestor_lsn && *is_offloaded == MaybeOffloaded::No
270 96 : })
271 96 : .copied()
272 96 : // this assumes there are no other retain_lsns than the branchpoints
273 96 : .map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
274 96 : .collect::<Vec<_>>();
275 96 :
276 96 : if !branch_is_invisible {
277 84 : // Do not count lease points for invisible branches.
278 84 : lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));
279 84 : }
280 :
281 96 : drop(gc_info);
282 :
283 : // Add branch points we collected earlier, just in case there were any that were
284 : // not present in retain_lsns. We will remove any duplicates below later.
285 96 : if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
286 24 : lsns.extend(
287 24 : this_branchpoints
288 24 : .iter()
289 72 : .map(|lsn| (*lsn, LsnKind::BranchPoint)),
290 24 : )
291 72 : }
292 :
293 : // Add a point for the PITR cutoff
294 96 : let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
295 96 : if !branch_start_needed && !branch_is_invisible {
296 84 : // Only add the GcCutOff point when the timeline is visible; otherwise, do not compute the size for the LSN
297 84 : // range from the last branch point to the latest data.
298 84 : lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
299 84 : }
300 :
301 96 : lsns.sort_unstable();
302 96 : lsns.dedup();
303 96 :
304 96 : //
305 96 : // Create Segments for the interesting points.
306 96 : //
307 96 :
308 96 : // Timeline start point
309 96 : let ancestor = timeline
310 96 : .get_ancestor_timeline_id()
311 96 : .map(|ancestor_id| (ancestor_id, ancestor_lsn));
312 96 : branchstart_segments.push((timeline_id, segments.len(), ancestor));
313 96 : segments.push(SegmentMeta {
314 96 : segment: Segment {
315 96 : parent: None, // filled in later
316 96 : lsn: branch_start_lsn.0,
317 96 : size: None, // filled in later
318 96 : needed: branch_start_needed,
319 96 : },
320 96 : timeline_id: timeline.timeline_id,
321 96 : kind: LsnKind::BranchStart,
322 96 : });
323 96 :
324 96 : // GC cutoff point, and any branch points, i.e. points where
325 96 : // other timelines branch off from this timeline.
326 96 : let mut parent = segments.len() - 1;
327 252 : for (lsn, kind) in lsns {
328 156 : if kind == LsnKind::BranchPoint {
329 72 : branchpoint_segments.insert((timeline_id, lsn), segments.len());
330 84 : }
331 :
332 156 : segments.push(SegmentMeta {
333 156 : segment: Segment {
334 156 : parent: Some(parent),
335 156 : lsn: lsn.0,
336 156 : size: None,
337 156 : needed: lsn > next_pitr_cutoff,
338 156 : },
339 156 : timeline_id: timeline.timeline_id,
340 156 : kind,
341 156 : });
342 156 :
343 156 : parent = segments.len() - 1;
344 156 :
345 156 : if kind == LsnKind::LeasePoint {
346 0 : // Needs `LeaseStart` and `LeaseEnd` as well to model lease as a read-only branch that never writes data
347 0 : // (i.e. it's lsn has not advanced from ancestor_lsn), and therefore the three segments have the same LSN
348 0 : // value. Without the other two segments, the calculation code would not count the leased LSN as a point
349 0 : // to be retained.
350 0 : // Did not use `BranchStart` or `BranchEnd` so we can differentiate branches and leases during debug.
351 0 : //
352 0 : // Alt Design: rewrite the entire calculation code to be independent of timeline id. Both leases and
353 0 : // branch points can be given a synthetic id so we can unite them.
354 0 : let mut lease_parent = parent;
355 0 :
356 0 : // Start of a lease.
357 0 : segments.push(SegmentMeta {
358 0 : segment: Segment {
359 0 : parent: Some(lease_parent),
360 0 : lsn: lsn.0,
361 0 : size: None, // Filled in later, if necessary
362 0 : needed: lsn > next_pitr_cutoff, // only needed if the point is within rentention.
363 0 : },
364 0 : timeline_id: timeline.timeline_id,
365 0 : kind: LsnKind::LeaseStart,
366 0 : });
367 0 : lease_parent += 1;
368 0 :
369 0 : // End of the lease.
370 0 : segments.push(SegmentMeta {
371 0 : segment: Segment {
372 0 : parent: Some(lease_parent),
373 0 : lsn: lsn.0,
374 0 : size: None, // Filled in later, if necessary
375 0 : needed: true, // everything at the lease LSN must be readable => is needed
376 0 : },
377 0 : timeline_id: timeline.timeline_id,
378 0 : kind: LsnKind::LeaseEnd,
379 0 : });
380 156 : }
381 : }
382 :
383 96 : let branch_end_lsn = if branch_is_invisible {
384 : // If the branch is invisible, the branch end is the last requested LSN (likely a branch cutoff point).
385 12 : segments.last().unwrap().segment.lsn
386 : } else {
387 : // Otherwise, the branch end is the last record LSN.
388 84 : last_record_lsn.0
389 : };
390 :
391 : // Current end of the timeline
392 96 : segments.push(SegmentMeta {
393 96 : segment: Segment {
394 96 : parent: Some(parent),
395 96 : lsn: branch_end_lsn,
396 96 : size: None, // Filled in later, if necessary
397 96 : needed: true,
398 96 : },
399 96 : timeline_id: timeline.timeline_id,
400 96 : kind: LsnKind::BranchEnd,
401 96 : });
402 96 :
403 96 : timeline_inputs.push(TimelineInputs {
404 96 : timeline_id: timeline.timeline_id,
405 96 : ancestor_id: timeline.get_ancestor_timeline_id(),
406 96 : ancestor_lsn,
407 96 : last_record: last_record_lsn,
408 96 : // this is not used above, because it might not have updated recently enough
409 96 : latest_gc_cutoff: *timeline.get_applied_gc_cutoff_lsn(),
410 96 : next_pitr_cutoff,
411 96 : retention_param_cutoff,
412 96 : lease_points,
413 96 : });
414 : }
415 :
416 : // We now have all segments from the timelines in 'segments'. The timelines
417 : // haven't been linked to each other yet, though. Do that.
418 120 : for (_timeline_id, seg_id, ancestor) in branchstart_segments {
419 : // Look up the branch point
420 96 : if let Some(ancestor) = ancestor {
421 72 : let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
422 72 : segments[seg_id].segment.parent = Some(parent_id);
423 72 : }
424 : }
425 :
426 : // We left the 'size' field empty in all of the Segments so far.
427 : // Now find logical sizes for all of the points that might need or benefit from them.
428 24 : fill_logical_sizes(
429 24 : &timelines,
430 24 : &mut segments,
431 24 : limit,
432 24 : logical_size_cache,
433 24 : cause,
434 24 : ctx,
435 24 : )
436 24 : .await?;
437 :
438 24 : if tenant.cancel.is_cancelled() {
439 : // If we're shutting down, return an error rather than a sparse result that might include some
440 : // timelines from before we started shutting down
441 0 : return Err(CalculateSyntheticSizeError::Cancelled);
442 24 : }
443 24 :
444 24 : Ok(ModelInputs {
445 24 : segments,
446 24 : timeline_inputs,
447 24 : })
448 24 : }
449 :
450 : /// Augment 'segments' with logical sizes
451 : ///
452 : /// This will leave segments' sizes as None if the Timeline associated with the segment is deleted concurrently
453 : /// (i.e. we cannot read its logical size at a particular LSN).
454 24 : async fn fill_logical_sizes(
455 24 : timelines: &[Arc<Timeline>],
456 24 : segments: &mut [SegmentMeta],
457 24 : limit: &Arc<Semaphore>,
458 24 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
459 24 : cause: LogicalSizeCalculationCause,
460 24 : ctx: &RequestContext,
461 24 : ) -> Result<(), CalculateSyntheticSizeError> {
462 24 : let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
463 24 : timelines
464 24 : .iter()
465 96 : .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
466 24 : );
467 24 :
468 24 : // record the used/inserted cache keys here, to remove extras not to start leaking
469 24 : // after initial run the cache should be quite stable, but live timelines will eventually
470 24 : // require new lsns to be inspected.
471 24 : let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();
472 24 :
473 24 : // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
474 24 : // our advantage with `?` error handling.
475 24 : let mut joinset = tokio::task::JoinSet::new();
476 :
477 : // For each point that would benefit from having a logical size available,
478 : // spawn a Task to fetch it, unless we have it cached already.
479 348 : for seg in segments.iter() {
480 348 : if !seg.size_needed() {
481 192 : continue;
482 156 : }
483 156 :
484 156 : let timeline_id = seg.timeline_id;
485 156 : let lsn = Lsn(seg.segment.lsn);
486 :
487 156 : if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
488 156 : let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
489 156 : if cached_size.is_none() {
490 84 : let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
491 84 : let parallel_size_calcs = Arc::clone(limit);
492 84 : let ctx = ctx.attached_child().with_scope_timeline(&timeline);
493 84 : joinset.spawn(
494 84 : calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
495 84 : .in_current_span(),
496 84 : );
497 84 : }
498 156 : e.insert(cached_size);
499 0 : }
500 : }
501 :
502 : // Perform the size lookups
503 24 : let mut have_any_error = None;
504 108 : while let Some(res) = joinset.join_next().await {
505 : // each of these come with Result<anyhow::Result<_>, JoinError>
506 : // because of spawn + spawn_blocking
507 84 : match res {
508 0 : Err(join_error) if join_error.is_cancelled() => {
509 0 : unreachable!("we are not cancelling any of the futures, nor should be");
510 : }
511 0 : Err(join_error) => {
512 0 : // cannot really do anything, as this panic is likely a bug
513 0 : error!(
514 0 : "task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}"
515 : );
516 :
517 0 : have_any_error = Some(CalculateSyntheticSizeError::Fatal(
518 0 : anyhow::anyhow!(join_error)
519 0 : .context("task that calls spawn_ondemand_logical_size_calculation"),
520 0 : ));
521 : }
522 0 : Ok(Err(recv_result_error)) => {
523 0 : // cannot really do anything, as this panic is likely a bug
524 0 : error!("failed to receive logical size query result: {recv_result_error:#}");
525 0 : have_any_error = Some(CalculateSyntheticSizeError::Fatal(
526 0 : anyhow::anyhow!(recv_result_error)
527 0 : .context("Receiving logical size query result"),
528 0 : ));
529 : }
530 0 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
531 0 : if matches!(error, CalculateLogicalSizeError::Cancelled) {
532 : // Skip this: it's okay if one timeline among many is shutting down while we
533 : // calculate inputs for the overall tenant.
534 0 : continue;
535 : } else {
536 0 : warn!(
537 0 : timeline_id=%timeline.timeline_id,
538 0 : "failed to calculate logical size at {lsn}: {error:#}"
539 : );
540 0 : have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
541 0 : timeline_id: timeline.timeline_id,
542 0 : lsn,
543 0 : error,
544 0 : });
545 : }
546 : }
547 84 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
548 84 : debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
549 :
550 84 : logical_size_cache.insert((timeline.timeline_id, lsn), size);
551 84 : sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
552 : }
553 : }
554 : }
555 :
556 : // prune any keys not needed anymore; we record every used key and added key.
557 168 : logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
558 :
559 24 : if let Some(error) = have_any_error {
560 : // we cannot complete this round, because we are missing data.
561 : // we have however cached all we were able to request calculation on.
562 0 : return Err(error);
563 24 : }
564 :
565 : // Insert the looked up sizes to the Segments
566 348 : for seg in segments.iter_mut() {
567 348 : if !seg.size_needed() {
568 192 : continue;
569 156 : }
570 156 :
571 156 : let timeline_id = seg.timeline_id;
572 156 : let lsn = Lsn(seg.segment.lsn);
573 :
574 156 : if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
575 156 : seg.segment.size = Some(*size);
576 156 : }
577 : }
578 24 : Ok(())
579 24 : }
580 :
581 : impl ModelInputs {
582 24 : pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
583 24 : // Convert SegmentMetas into plain Segments
584 24 : StorageModel {
585 24 : segments: self
586 24 : .segments
587 24 : .iter()
588 168 : .map(|seg| seg.segment.clone())
589 24 : .collect(),
590 24 : }
591 24 : }
592 :
593 : // calculate total project size
594 12 : pub fn calculate(&self) -> u64 {
595 12 : let storage = self.calculate_model();
596 12 : let sizes = storage.calculate();
597 12 : sizes.total_size
598 12 : }
599 : }
600 :
601 : /// Newtype around the tuple that carries the timeline at lsn logical size calculation.
602 : struct TimelineAtLsnSizeResult(
603 : Arc<crate::tenant::Timeline>,
604 : utils::lsn::Lsn,
605 : Result<u64, CalculateLogicalSizeError>,
606 : );
607 :
608 : #[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
609 : async fn calculate_logical_size(
610 : limit: Arc<tokio::sync::Semaphore>,
611 : timeline: Arc<crate::tenant::Timeline>,
612 : lsn: utils::lsn::Lsn,
613 : cause: LogicalSizeCalculationCause,
614 : ctx: RequestContext,
615 : ) -> Result<TimelineAtLsnSizeResult, RecvError> {
616 : let _permit = tokio::sync::Semaphore::acquire_owned(limit)
617 : .await
618 : .expect("global semaphore should not had been closed");
619 :
620 : let size_res = timeline
621 : .spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
622 : .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
623 : .await?;
624 : Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
625 : }
626 :
#[cfg(test)]
#[test]
fn verify_size_for_multiple_branches() {
    // The document below was captured from the integration test
    // test_tenant_size_with_multiple_branches; having it inline keeps the lsn's stable.
    //
    // Only "segments" take part in the size calculation. The "timeline_inputs" are kept
    // purely to explain where the segments came from.
    let doc = r#"
{
  "segments": [
    {
      "segment": {
        "parent": 9,
        "lsn": 26033560,
        "size": null,
        "needed": false
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 0,
        "lsn": 35720400,
        "size": 25206784,
        "needed": false
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 1,
        "lsn": 35851472,
        "size": null,
        "needed": true
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "BranchEnd"
    },
    {
      "segment": {
        "parent": 7,
        "lsn": 24566168,
        "size": null,
        "needed": false
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 3,
        "lsn": 25261936,
        "size": 26050560,
        "needed": false
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 4,
        "lsn": 25393008,
        "size": null,
        "needed": true
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "BranchEnd"
    },
    {
      "segment": {
        "parent": null,
        "lsn": 23694408,
        "size": null,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 6,
        "lsn": 24566168,
        "size": 25739264,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchPoint"
    },
    {
      "segment": {
        "parent": 7,
        "lsn": 25902488,
        "size": 26402816,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 8,
        "lsn": 26033560,
        "size": 26468352,
        "needed": true
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchPoint"
    },
    {
      "segment": {
        "parent": 9,
        "lsn": 26033560,
        "size": null,
        "needed": true
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchEnd"
    }
  ],
  "timeline_inputs": [
    {
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "ancestor_lsn": "0/18D3D98",
      "last_record": "0/2230CD0",
      "latest_gc_cutoff": "0/1698C48",
      "next_pitr_cutoff": "0/2210CD0",
      "retention_param_cutoff": null,
      "lease_points": []
    },
    {
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "ancestor_lsn": "0/176D998",
      "last_record": "0/1837770",
      "latest_gc_cutoff": "0/1698C48",
      "next_pitr_cutoff": "0/1817770",
      "retention_param_cutoff": null,
      "lease_points": []
    },
    {
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "ancestor_lsn": "0/0",
      "last_record": "0/18D3D98",
      "latest_gc_cutoff": "0/1698C48",
      "next_pitr_cutoff": "0/18B3D98",
      "retention_param_cutoff": null,
      "lease_points": []
    }
  ]
}
"#;
    let parsed = serde_json::from_str::<ModelInputs>(doc).unwrap();
    let total = parsed.calculate();

    assert_eq!(total, 37_851_408);
}
784 :
#[cfg(test)]
#[test]
fn verify_size_for_one_branch() {
    // A single timeline without ancestor: branch start, one GC cutoff point and the branch end.
    let doc = r#"
{
  "segments": [
    {
      "segment": {
        "parent": null,
        "lsn": 0,
        "size": null,
        "needed": false
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 0,
        "lsn": 305547335776,
        "size": 220054675456,
        "needed": false
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 1,
        "lsn": 305614444640,
        "size": null,
        "needed": true
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "BranchEnd"
    }
  ],
  "timeline_inputs": [
    {
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "ancestor_lsn": "0/0",
      "last_record": "47/280A5860",
      "latest_gc_cutoff": "47/240A5860",
      "next_pitr_cutoff": "47/240A5860",
      "retention_param_cutoff": "0/0",
      "lease_points": []
    }
  ]
}"#;

    let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
    let sizes = inputs.calculate_model().calculate();

    println!("calculated synthetic size: {}", sizes.total_size);
    println!("result: {:?}", serde_json::to_string(&sizes.segments));

    // Print the numeric values of the two LSNs to make the segment LSNs above easier to read.
    use utils::lsn::Lsn;
    let gc_cutoff: Lsn = "47/240A5860".parse().unwrap();
    let branch_end: Lsn = "47/280A5860".parse().unwrap();
    println!(
        "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
        u64::from(gc_cutoff),
        u64::from(branch_end)
    );
    assert_eq!(sizes.total_size, 220121784320);
}
|