use std::cmp;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use tenant_size_model::svg::SvgBranchKind;
use tenant_size_model::{Segment, StorageModel};
use tokio::sync::Semaphore;
use tokio::sync::oneshot::error::RecvError;
use tokio_util::sync::CancellationToken;
use tracing::*;
use utils::id::TimelineId;
use utils::lsn::Lsn;

use super::{GcError, LogicalSizeCalculationCause, Tenant};
use crate::context::RequestContext;
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
use crate::tenant::{MaybeOffloaded, Timeline};

/// Inputs to the actual tenant sizing model
///
/// Implements [`serde::Serialize`] but is not meant to be part of the public API; instead, it is
/// meant to be a transferable format between execution environments and developers.
///
/// This tracks more information than the actual StorageModel that the calculation
/// needs. We will convert this into a StorageModel when it's time to perform
/// the calculation.
///
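/// The JSON documents in the tests at the bottom of this file show the serialized form.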
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
    pub segments: Vec<SegmentMeta>,
    pub timeline_inputs: Vec<TimelineInputs>,
}

/// A [`Segment`], with some extra information for display purposes
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
    pub segment: Segment,
    pub timeline_id: TimelineId,
    pub kind: LsnKind,
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum CalculateSyntheticSizeError {
    /// Something went wrong internally in the calculation of logical size at a particular branch point
    #[error("Failed to calculate logical size on timeline {timeline_id} at {lsn}: {error}")]
    LogicalSize {
        timeline_id: TimelineId,
        lsn: Lsn,
        error: CalculateLogicalSizeError,
    },

    /// Something went wrong internally when calculating GC parameters at the start of size calculation
    #[error(transparent)]
    GcInfo(GcError),

    /// Totally unexpected errors, like panics while joining a task
    #[error(transparent)]
    Fatal(anyhow::Error),

    /// Tenant shut down while calculating size
    #[error("Cancelled")]
    Cancelled,
}

impl From<GcError> for CalculateSyntheticSizeError {
    fn from(value: GcError) -> Self {
        match value {
            GcError::TenantCancelled | GcError::TimelineCancelled => {
                CalculateSyntheticSizeError::Cancelled
            }
            other => CalculateSyntheticSizeError::GcInfo(other),
        }
    }
}

impl SegmentMeta {
    fn size_needed(&self) -> bool {
        match self.kind {
            LsnKind::BranchStart => {
                // If we don't have a later GcCutoff point on this branch, and
                // no ancestor, calculate size for the branch start point.
                self.segment.needed && self.segment.parent.is_none()
            }
            LsnKind::BranchPoint => true,
            LsnKind::GcCutOff => true,
            LsnKind::BranchEnd => false,
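            // The lease triple below shares a single LSN: only the LeasePoint
            // itself needs a logical size; the LeaseStart/LeaseEnd markers do not.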
            LsnKind::LeasePoint => true,
            LsnKind::LeaseStart => false,
            LsnKind::LeaseEnd => false,
        }
    }
}

#[derive(
    Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
)]
pub enum LsnKind {
    /// A timeline starting here
    BranchStart,
    /// A child timeline branches off from here
    BranchPoint,
    /// GC cutoff point
    GcCutOff,
    /// Last record LSN
    BranchEnd,
    /// An LSN lease is granted here.
    LeasePoint,
    /// A lease starts from here.
    LeaseStart,
    /// Last record LSN for the lease (should have the same LSN as the previous [`LsnKind::LeaseStart`]).
    LeaseEnd,
}

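// Within one timeline the segments are emitted in LSN order: BranchStart always
// comes first and BranchEnd last, with any BranchPoint, GcCutOff and
// LeasePoint/LeaseStart/LeaseEnd entries sorted in between (see `gather_inputs`
// below).
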
impl From<LsnKind> for SvgBranchKind {
    fn from(kind: LsnKind) -> Self {
        match kind {
            LsnKind::LeasePoint | LsnKind::LeaseStart | LsnKind::LeaseEnd => SvgBranchKind::Lease,
            _ => SvgBranchKind::Timeline,
        }
    }
}

/// Collects all LSNs relevant to the inputs. These are only helpful in the serialized form, as
/// part of [`ModelInputs`] from the HTTP API, for explaining the inputs.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TimelineInputs {
    pub timeline_id: TimelineId,

    pub ancestor_id: Option<TimelineId>,

    ancestor_lsn: Lsn,
    last_record: Lsn,
    latest_gc_cutoff: Lsn,

    /// Cutoff point based on GC settings
    next_pitr_cutoff: Lsn,

    /// Cutoff point calculated from the user-supplied 'max_retention_period'
    retention_param_cutoff: Option<Lsn>,

    /// Lease points on the timeline
    lease_points: Vec<Lsn>,
}

/// Gathers the inputs for the tenant sizing model.
///
/// Tenant size does not consider the latest state, but only the state up to next_pitr_cutoff,
/// which is updated on demand at the start of this calculation, separately from
/// [`TimelineInputs::latest_gc_cutoff`].
///
/// For timelines in general:
///
/// ```text
/// 0-----|---------|----|------------| · · · · · |·> lsn
///   initdb_lsn  branchpoints*  next_pitr_cutoff  latest
/// ```
pub(super) async fn gather_inputs(
    tenant: &Tenant,
    limit: &Arc<Semaphore>,
    max_retention_period: Option<u64>,
    logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
    cause: LogicalSizeCalculationCause,
    cancel: &CancellationToken,
    ctx: &RequestContext,
) -> Result<ModelInputs, CalculateSyntheticSizeError> {
    // refresh is needed to update [`timeline::GcCutoffs`]
    tenant.refresh_gc_info(cancel, ctx).await?;

    // Collect information about all the timelines
    let mut timelines = tenant.list_timelines();

    if timelines.is_empty() {
        // perhaps the tenant has just been created, and as such doesn't have any data yet
        return Ok(ModelInputs {
            segments: vec![],
            timeline_inputs: Vec::new(),
        });
    }

    // Filter out timelines that are not active
    //
    // There may be a race when a timeline is dropped,
    // but it is unlikely to cause any issues. In the worst case,
    // the calculation will error out.
    timelines.retain(|t| t.is_active());
    // Also filter out archived timelines.
    timelines.retain(|t| t.is_archived() != Some(true));

    // Build a map of branch points.
    let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
    for timeline in timelines.iter() {
        if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
            branchpoints
                .entry(ancestor_id)
                .or_default()
                .insert(timeline.get_ancestor_lsn());
        }
    }

    // These become the final result.
    let mut timeline_inputs = Vec::with_capacity(timelines.len());
    let mut segments: Vec<SegmentMeta> = Vec::new();

    //
    // Build Segments representing each timeline. As we do that, also remember
    // the branchpoints and branch startpoints in 'branchpoint_segments' and
    // 'branchstart_segments'
    //

    // BranchPoint segments of each timeline
    // (timeline, branchpoint LSN) -> segment_id
    let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();

    // timeline, BranchStart seg id, (ancestor, ancestor LSN)
    type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
    let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();

    for timeline in timelines.iter() {
        let timeline_id = timeline.timeline_id;
        let last_record_lsn = timeline.get_last_record_lsn();
        let ancestor_lsn = timeline.get_ancestor_lsn();

        // There's a race between the gc_info update (done while holding tenant.gc_lock) and this
        // read, but it might not be an issue, because this data is not used for Timeline::gc.
        let gc_info = timeline.gc_info.read().unwrap();

        // Similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
        // new gc run, which we have no control over. Unlike `Timeline::gc`, however,
        // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
        // actually removing files.
        //
        // We only consider [`timeline::GcCutoffs::time`], and not [`timeline::GcCutoffs::space`], because from
        // a user's perspective they have only requested retention up to the time bound (pitr_cutoff), rather
        // than our internal space cutoff. This means that if someone drops a database and waits for their
        // PITR interval, they will see synthetic size decrease, even if we are still storing data inside
        // the space cutoff.
        let mut next_pitr_cutoff = gc_info.cutoffs.time;

        // If the caller provided a shorter retention period, use that instead of the GC cutoff.
        let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
            let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
            if next_pitr_cutoff < param_cutoff {
                next_pitr_cutoff = param_cutoff;
            }
            Some(param_cutoff)
        } else {
            None
        };

        let lease_points = gc_info
            .leases
            .keys()
            .filter(|&&lsn| lsn > ancestor_lsn)
            .copied()
            .collect::<Vec<_>>();

        // The next_pitr_cutoff of the parent branch is not of interest (right now, at least), nor
        // do we want to query any logical size before initdb_lsn.
        let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);

        // Build "interesting LSNs" on this timeline
        let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
            .retain_lsns
            .iter()
            .filter(|(lsn, _child_id, is_offloaded)| {
                lsn > &ancestor_lsn && *is_offloaded == MaybeOffloaded::No
            })
            .copied()
            // this assumes there are no other retain_lsns than the branchpoints
            .map(|(lsn, _child_id, _is_offloaded)| (lsn, LsnKind::BranchPoint))
            .collect::<Vec<_>>();

        lsns.extend(lease_points.iter().map(|&lsn| (lsn, LsnKind::LeasePoint)));

        drop(gc_info);

        // Add branch points we collected earlier, just in case there were any that were
        // not present in retain_lsns. We will remove any duplicates below.
        if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
            lsns.extend(
                this_branchpoints
                    .iter()
                    .map(|lsn| (*lsn, LsnKind::BranchPoint)),
            )
        }

        // Add a point for the PITR cutoff
        let branch_start_needed = next_pitr_cutoff <= branch_start_lsn;
        if !branch_start_needed {
            lsns.push((next_pitr_cutoff, LsnKind::GcCutOff));
        }

        lsns.sort_unstable();
        lsns.dedup();

        //
        // Create Segments for the interesting points.
        //

        // Timeline start point
        let ancestor = timeline
            .get_ancestor_timeline_id()
            .map(|ancestor_id| (ancestor_id, ancestor_lsn));
        branchstart_segments.push((timeline_id, segments.len(), ancestor));
        segments.push(SegmentMeta {
            segment: Segment {
                parent: None, // filled in later
                lsn: branch_start_lsn.0,
                size: None, // filled in later
                needed: branch_start_needed,
            },
            timeline_id: timeline.timeline_id,
            kind: LsnKind::BranchStart,
        });

        // GC cutoff point, and any branch points, i.e. points where
        // other timelines branch off from this timeline.
        let mut parent = segments.len() - 1;
        for (lsn, kind) in lsns {
            if kind == LsnKind::BranchPoint {
                branchpoint_segments.insert((timeline_id, lsn), segments.len());
            }

            segments.push(SegmentMeta {
                segment: Segment {
                    parent: Some(parent),
                    lsn: lsn.0,
                    size: None,
                    needed: lsn > next_pitr_cutoff,
                },
                timeline_id: timeline.timeline_id,
                kind,
            });

            parent = segments.len() - 1;

            if kind == LsnKind::LeasePoint {
                // `LeaseStart` and `LeaseEnd` are needed as well, to model a lease as a read-only
                // branch that never writes data (i.e. its LSN has not advanced from the
                // ancestor_lsn): all three segments therefore have the same LSN value. Without the
                // other two segments, the calculation code would not count the leased LSN as a
                // point to be retained.
                // We did not reuse `BranchStart` or `BranchEnd`, so that branches and leases can
                // be differentiated while debugging.
                //
                // Alt Design: rewrite the entire calculation code to be independent of timeline id.
                // Both leases and branch points can be given a synthetic id so we can unite them.
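                //
                // Sketch of the resulting segment chain at a leased LSN `L`
                // (all three segments share `lsn == L`):
                //
                //   ... -- LeasePoint(L) -- LeaseStart(L) -- LeaseEnd(L)
                //
                // `LeaseEnd` is always `needed`, which forces the model to
                // retain everything required to read at `L`.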
                let mut lease_parent = parent;

                // Start of a lease.
                segments.push(SegmentMeta {
                    segment: Segment {
                        parent: Some(lease_parent),
                        lsn: lsn.0,
                        size: None, // Filled in later, if necessary
                        needed: lsn > next_pitr_cutoff, // only needed if the point is within retention.
                    },
                    timeline_id: timeline.timeline_id,
                    kind: LsnKind::LeaseStart,
                });
                lease_parent += 1;

                // End of the lease.
                segments.push(SegmentMeta {
                    segment: Segment {
                        parent: Some(lease_parent),
                        lsn: lsn.0,
                        size: None, // Filled in later, if necessary
                        needed: true, // everything at the lease LSN must be readable => is needed
                    },
                    timeline_id: timeline.timeline_id,
                    kind: LsnKind::LeaseEnd,
                });
            }
        }

        // Current end of the timeline
        segments.push(SegmentMeta {
            segment: Segment {
                parent: Some(parent),
                lsn: last_record_lsn.0,
                size: None, // Filled in later, if necessary
                needed: true,
            },
            timeline_id: timeline.timeline_id,
            kind: LsnKind::BranchEnd,
        });

        timeline_inputs.push(TimelineInputs {
            timeline_id: timeline.timeline_id,
            ancestor_id: timeline.get_ancestor_timeline_id(),
            ancestor_lsn,
            last_record: last_record_lsn,
            // this is not used above, because it might not have been updated recently enough
            latest_gc_cutoff: *timeline.get_applied_gc_cutoff_lsn(),
            next_pitr_cutoff,
            retention_param_cutoff,
            lease_points,
        });
    }

    // We now have all segments from the timelines in 'segments'. The timelines
    // haven't been linked to each other yet, though. Do that.
    for (_timeline_id, seg_id, ancestor) in branchstart_segments {
        // Look up the branch point
        if let Some(ancestor) = ancestor {
            let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
            segments[seg_id].segment.parent = Some(parent_id);
        }
    }

    // We left the 'size' field empty in all of the Segments so far.
    // Now find logical sizes for all of the points that might need or benefit from them.
    fill_logical_sizes(
        &timelines,
        &mut segments,
        limit,
        logical_size_cache,
        cause,
        ctx,
    )
    .await?;

    if tenant.cancel.is_cancelled() {
        // If we're shutting down, return an error rather than a sparse result that might include
        // some timelines from before we started shutting down
        return Err(CalculateSyntheticSizeError::Cancelled);
    }

    Ok(ModelInputs {
        segments,
        timeline_inputs,
    })
}

/// Augment 'segments' with logical sizes
///
/// This will leave segments' sizes as None if the Timeline associated with the segment is deleted concurrently
/// (i.e. we cannot read its logical size at a particular LSN).
async fn fill_logical_sizes(
    timelines: &[Arc<Timeline>],
    segments: &mut [SegmentMeta],
    limit: &Arc<Semaphore>,
    logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
    cause: LogicalSizeCalculationCause,
    ctx: &RequestContext,
) -> Result<(), CalculateSyntheticSizeError> {
    let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
        timelines
            .iter()
            .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
    );

    // Record the used/inserted cache keys here, so that extras can be removed and we don't start
    // leaking. After the initial run the cache should be quite stable, but live timelines will
    // eventually require new LSNs to be inspected.
    let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();

    // With a JoinSet, all of the tasks are de-scheduled on drop, which we can use to
    // our advantage with `?` error handling.
    let mut joinset = tokio::task::JoinSet::new();

    // For each point that would benefit from having a logical size available,
    // spawn a Task to fetch it, unless we have it cached already.
    for seg in segments.iter() {
        if !seg.size_needed() {
            continue;
        }

        let timeline_id = seg.timeline_id;
        let lsn = Lsn(seg.segment.lsn);

        if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
            let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
            if cached_size.is_none() {
                let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
                let parallel_size_calcs = Arc::clone(limit);
                let ctx = ctx.attached_child().with_scope_timeline(&timeline);
                joinset.spawn(
                    calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
                        .in_current_span(),
                );
            }
            e.insert(cached_size);
        }
    }

    // Perform the size lookups
    let mut have_any_error = None;
    while let Some(res) = joinset.join_next().await {
        // each of these comes as a Result<anyhow::Result<_>, JoinError>
        // because of spawn + spawn_blocking
        match res {
            Err(join_error) if join_error.is_cancelled() => {
                unreachable!("we are not cancelling any of the futures, nor should we be");
            }
            Err(join_error) => {
                // cannot really do anything, as this panic is likely a bug
                error!(
                    "task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}"
                );

                have_any_error = Some(CalculateSyntheticSizeError::Fatal(
                    anyhow::anyhow!(join_error)
                        .context("task that calls spawn_ondemand_logical_size_calculation"),
                ));
            }
            Ok(Err(recv_result_error)) => {
                // cannot really do anything, as this error is likely a bug
                error!("failed to receive logical size query result: {recv_result_error:#}");
                have_any_error = Some(CalculateSyntheticSizeError::Fatal(
                    anyhow::anyhow!(recv_result_error)
                        .context("Receiving logical size query result"),
                ));
            }
            Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
                if matches!(error, CalculateLogicalSizeError::Cancelled) {
                    // Skip this: it's okay if one timeline among many is shutting down while we
                    // calculate inputs for the overall tenant.
                    continue;
                } else {
                    warn!(
                        timeline_id=%timeline.timeline_id,
                        "failed to calculate logical size at {lsn}: {error:#}"
                    );
                    have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
                        timeline_id: timeline.timeline_id,
                        lsn,
                        error,
                    });
                }
            }
            Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
                debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");

                logical_size_cache.insert((timeline.timeline_id, lsn), size);
                sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
            }
        }
    }

    // Prune any keys not needed anymore; we record every used and inserted key.
    logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));

    if let Some(error) = have_any_error {
        // We cannot complete this round because we are missing data;
        // we have, however, cached everything we were able to request calculation on.
        return Err(error);
    }

    // Insert the looked-up sizes into the Segments
    for seg in segments.iter_mut() {
        if !seg.size_needed() {
            continue;
        }

        let timeline_id = seg.timeline_id;
        let lsn = Lsn(seg.segment.lsn);

        if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
            seg.segment.size = Some(*size);
        }
    }
    Ok(())
}

impl ModelInputs {
    pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
        // Convert SegmentMetas into plain Segments
        StorageModel {
            segments: self
                .segments
                .iter()
                .map(|seg| seg.segment.clone())
                .collect(),
        }
    }

    // calculate total project size
    pub fn calculate(&self) -> u64 {
        let storage = self.calculate_model();
        let sizes = storage.calculate();
        sizes.total_size
    }
}

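// Minimal usage sketch (mirroring the tests at the bottom of this file):
// deserialize previously gathered inputs and compute the synthetic size.
//
//     let inputs: ModelInputs = serde_json::from_str(doc)?;
//     let total_size: u64 = inputs.calculate();
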
/// Newtype around the tuple that carries the result of a logical size calculation for a timeline
/// at a given LSN.
struct TimelineAtLsnSizeResult(
    Arc<crate::tenant::Timeline>,
    utils::lsn::Lsn,
    Result<u64, CalculateLogicalSizeError>,
);

#[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
async fn calculate_logical_size(
    limit: Arc<tokio::sync::Semaphore>,
    timeline: Arc<crate::tenant::Timeline>,
    lsn: utils::lsn::Lsn,
    cause: LogicalSizeCalculationCause,
    ctx: RequestContext,
) -> Result<TimelineAtLsnSizeResult, RecvError> {
    let _permit = tokio::sync::Semaphore::acquire_owned(limit)
        .await
        .expect("global semaphore should not have been closed");

    let size_res = timeline
        .spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
        .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
        .await?;
    Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
}

#[test]
fn verify_size_for_multiple_branches() {
    // this is generated from the integration test test_tenant_size_with_multiple_branches, but
    // this way it has stable LSNs
    //
    // The timeline_inputs don't participate in the size calculation, and are here just to explain
    // the inputs.
    let doc = r#"
{
    "segments": [
        {
            "segment": {
                "parent": 9,
                "lsn": 26033560,
                "size": null,
                "needed": false
            },
            "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
            "kind": "BranchStart"
        },
        {
            "segment": {
                "parent": 0,
                "lsn": 35720400,
                "size": 25206784,
                "needed": false
            },
            "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
            "kind": "GcCutOff"
        },
        {
            "segment": {
                "parent": 1,
                "lsn": 35851472,
                "size": null,
                "needed": true
            },
            "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
            "kind": "BranchEnd"
        },
        {
            "segment": {
                "parent": 7,
                "lsn": 24566168,
                "size": null,
                "needed": false
            },
            "timeline_id": "454626700469f0a9914949b9d018e876",
            "kind": "BranchStart"
        },
        {
            "segment": {
                "parent": 3,
                "lsn": 25261936,
                "size": 26050560,
                "needed": false
            },
            "timeline_id": "454626700469f0a9914949b9d018e876",
            "kind": "GcCutOff"
        },
        {
            "segment": {
                "parent": 4,
                "lsn": 25393008,
                "size": null,
                "needed": true
            },
            "timeline_id": "454626700469f0a9914949b9d018e876",
            "kind": "BranchEnd"
        },
        {
            "segment": {
                "parent": null,
                "lsn": 23694408,
                "size": null,
                "needed": false
            },
            "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
            "kind": "BranchStart"
        },
        {
            "segment": {
                "parent": 6,
                "lsn": 24566168,
                "size": 25739264,
                "needed": false
            },
            "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
            "kind": "BranchPoint"
        },
        {
            "segment": {
                "parent": 7,
                "lsn": 25902488,
                "size": 26402816,
                "needed": false
            },
            "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
            "kind": "GcCutOff"
        },
        {
            "segment": {
                "parent": 8,
                "lsn": 26033560,
                "size": 26468352,
                "needed": true
            },
            "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
            "kind": "BranchPoint"
        },
        {
            "segment": {
                "parent": 9,
                "lsn": 26033560,
                "size": null,
                "needed": true
            },
            "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
            "kind": "BranchEnd"
        }
    ],
    "timeline_inputs": [
        {
            "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
            "ancestor_lsn": "0/18D3D98",
            "last_record": "0/2230CD0",
            "latest_gc_cutoff": "0/1698C48",
            "next_pitr_cutoff": "0/2210CD0",
            "retention_param_cutoff": null,
            "lease_points": []
        },
        {
            "timeline_id": "454626700469f0a9914949b9d018e876",
            "ancestor_lsn": "0/176D998",
            "last_record": "0/1837770",
            "latest_gc_cutoff": "0/1698C48",
            "next_pitr_cutoff": "0/1817770",
            "retention_param_cutoff": null,
            "lease_points": []
        },
        {
            "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
            "ancestor_lsn": "0/0",
            "last_record": "0/18D3D98",
            "latest_gc_cutoff": "0/1698C48",
            "next_pitr_cutoff": "0/18B3D98",
            "retention_param_cutoff": null,
            "lease_points": []
        }
    ]
}
"#;
    let inputs: ModelInputs = serde_json::from_str(doc).unwrap();

    assert_eq!(inputs.calculate(), 37_851_408);
}

#[test]
fn verify_size_for_one_branch() {
    let doc = r#"
{
    "segments": [
        {
            "segment": {
                "parent": null,
                "lsn": 0,
                "size": null,
                "needed": false
            },
            "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
            "kind": "BranchStart"
        },
        {
            "segment": {
                "parent": 0,
                "lsn": 305547335776,
                "size": 220054675456,
                "needed": false
            },
            "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
            "kind": "GcCutOff"
        },
        {
            "segment": {
                "parent": 1,
                "lsn": 305614444640,
                "size": null,
                "needed": true
            },
            "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
            "kind": "BranchEnd"
        }
    ],
    "timeline_inputs": [
        {
            "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
            "ancestor_lsn": "0/0",
            "last_record": "47/280A5860",
            "latest_gc_cutoff": "47/240A5860",
            "next_pitr_cutoff": "47/240A5860",
            "retention_param_cutoff": "0/0",
            "lease_points": []
        }
    ]
}"#;

    let model: ModelInputs = serde_json::from_str(doc).unwrap();

    let res = model.calculate_model().calculate();

    println!("calculated synthetic size: {}", res.total_size);
    println!("result: {:?}", serde_json::to_string(&res.segments));

    use utils::lsn::Lsn;
    let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap();
    let last_lsn: Lsn = "47/280A5860".parse().unwrap();
    println!(
        "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
        u64::from(latest_gc_cutoff_lsn),
        u64::from(last_lsn)
    );
    assert_eq!(res.total_size, 220121784320);
}