Line data Source code
1 : use std::cmp;
2 : use std::collections::hash_map::Entry;
3 : use std::collections::{HashMap, HashSet};
4 : use std::sync::Arc;
5 :
6 : use anyhow::{bail, Context};
7 : use tokio::sync::oneshot::error::RecvError;
8 : use tokio::sync::Semaphore;
9 : use tokio_util::sync::CancellationToken;
10 :
11 : use crate::context::RequestContext;
12 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
13 :
14 : use super::{LogicalSizeCalculationCause, Tenant};
15 : use crate::tenant::Timeline;
16 : use utils::id::TimelineId;
17 : use utils::lsn::Lsn;
18 :
19 : use tracing::*;
20 :
21 : use tenant_size_model::{Segment, StorageModel};
22 :
/// Inputs to the actual tenant sizing model
///
/// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to
/// be a transferable format between execution environments and developer.
///
/// This tracks more information than the actual StorageModel that calculation
/// needs. We will convert this into a StorageModel when it's time to perform
/// the calculation.
///
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
    /// Segments of all timelines. A segment's `segment.parent` refers to another segment by its
    /// index in this vector.
    pub segments: Vec<SegmentMeta>,
    /// Per-timeline inputs. These are not used by the size calculation itself; they are kept to
    /// explain how the segments were derived.
    pub timeline_inputs: Vec<TimelineInputs>,
}
38 :
/// A [`Segment`], with some extra information for display purposes
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
    /// The segment as handed to the size model.
    pub segment: Segment,
    /// Timeline this LSN point belongs to; (de)serialized through its string representation.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub timeline_id: TimelineId,
    /// Why this LSN point was included in the model.
    pub kind: LsnKind,
}
48 :
49 : impl SegmentMeta {
50 594 : fn size_needed(&self) -> bool {
51 594 : match self.kind {
52 : LsnKind::BranchStart => {
53 : // If we don't have a later GcCutoff point on this branch, and
54 : // no ancestor, calculate size for the branch start point.
55 228 : self.segment.needed && self.segment.parent.is_none()
56 : }
57 82 : LsnKind::BranchPoint => true,
58 56 : LsnKind::GcCutOff => true,
59 228 : LsnKind::BranchEnd => false,
60 : }
61 594 : }
62 : }
63 :
/// The reason an LSN was included as a point in the size model.
///
/// NOTE: the derived `Ord` follows declaration order. `gather_inputs` sorts `(Lsn, LsnKind)`
/// pairs, so the relative order of `BranchPoint` and `GcCutOff` segments at the same LSN depends
/// on the order of the variants below; do not reorder them casually. Variants are
/// (de)serialized by name.
#[derive(
    Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
)]
pub enum LsnKind {
    /// A timeline starting here
    BranchStart,
    /// A child timeline branches off from here
    BranchPoint,
    /// GC cutoff point
    GcCutOff,
    /// Last record LSN
    BranchEnd,
}
77 :
/// All relevant LSNs of one timeline that went into the inputs. These are only helpful in the
/// serialized form, as part of [`ModelInputs`] from the HTTP api, to explain the inputs.
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TimelineInputs {
    /// The timeline these LSNs belong to.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub timeline_id: TimelineId,

    /// The ancestor timeline, if this timeline was branched off another one.
    #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
    pub ancestor_id: Option<TimelineId>,

    /// LSN on the ancestor at which this timeline branches off.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    ancestor_lsn: Lsn,
    /// Last record LSN of the timeline at the time the inputs were gathered.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    last_record: Lsn,
    /// The timeline's latest GC cutoff LSN; informational only, not used for the calculation.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    latest_gc_cutoff: Lsn,
    /// `horizon_cutoff` from the timeline's GC info.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    horizon_cutoff: Lsn,
    /// `pitr_cutoff` from the timeline's GC info.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pitr_cutoff: Lsn,

    /// Cutoff point based on GC settings
    /// (the smaller of `horizon_cutoff` and `pitr_cutoff`, possibly raised to
    /// `retention_param_cutoff`).
    #[serde_as(as = "serde_with::DisplayFromStr")]
    next_gc_cutoff: Lsn,

    /// Cutoff point calculated from the user-supplied 'max_retention_period'
    #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
    retention_param_cutoff: Option<Lsn>,
}
108 :
109 : /// Gathers the inputs for the tenant sizing model.
110 : ///
111 : /// Tenant size does not consider the latest state, but only the state until next_gc_cutoff, which
112 : /// is updated on-demand, during the start of this calculation and separate from the
113 : /// [`TimelineInputs::latest_gc_cutoff`].
114 : ///
115 : /// For timelines in general:
116 : ///
117 : /// ```text
118 : /// 0-----|---------|----|------------| · · · · · |·> lsn
119 : /// initdb_lsn branchpoints* next_gc_cutoff latest
120 : /// ```
121 : ///
122 : /// Until gc_horizon_cutoff > `Timeline::last_record_lsn` for any of the tenant's timelines, the
123 : /// tenant size will be zero.
124 73 : pub(super) async fn gather_inputs(
125 73 : tenant: &Tenant,
126 73 : limit: &Arc<Semaphore>,
127 73 : max_retention_period: Option<u64>,
128 73 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
129 73 : cause: LogicalSizeCalculationCause,
130 73 : ctx: &RequestContext,
131 73 : ) -> anyhow::Result<ModelInputs> {
132 : // refresh is needed to update gc related pitr_cutoff and horizon_cutoff
133 73 : tenant
134 73 : .refresh_gc_info(ctx)
135 12 : .await
136 73 : .context("Failed to refresh gc_info before gathering inputs")?;
137 :
138 : // Collect information about all the timelines
139 73 : let mut timelines = tenant.list_timelines();
140 73 :
141 73 : if timelines.is_empty() {
142 : // perhaps the tenant has just been created, and as such doesn't have any data yet
143 0 : return Ok(ModelInputs {
144 0 : segments: vec![],
145 0 : timeline_inputs: Vec::new(),
146 0 : });
147 73 : }
148 73 :
149 73 : // Filter out timelines that are not active
150 73 : //
151 73 : // There may be a race when a timeline is dropped,
152 73 : // but it is unlikely to cause any issues. In the worst case,
153 73 : // the calculation will error out.
154 114 : timelines.retain(|t| t.is_active());
155 73 :
156 73 : // Build a map of branch points.
157 73 : let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
158 114 : for timeline in timelines.iter() {
159 114 : if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
160 41 : branchpoints
161 41 : .entry(ancestor_id)
162 41 : .or_default()
163 41 : .insert(timeline.get_ancestor_lsn());
164 73 : }
165 : }
166 :
167 : // These become the final result.
168 73 : let mut timeline_inputs = Vec::with_capacity(timelines.len());
169 73 : let mut segments: Vec<SegmentMeta> = Vec::new();
170 73 :
171 73 : //
172 73 : // Build Segments representing each timeline. As we do that, also remember
173 73 : // the branchpoints and branch startpoints in 'branchpoint_segments' and
174 73 : // 'branchstart_segments'
175 73 : //
176 73 :
177 73 : // BranchPoint segments of each timeline
178 73 : // (timeline, branchpoint LSN) -> segment_id
179 73 : let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();
180 73 :
181 73 : // timeline, Branchpoint seg id, (ancestor, ancestor LSN)
182 73 : type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
183 73 : let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();
184 :
185 114 : for timeline in timelines.iter() {
186 114 : let timeline_id = timeline.timeline_id;
187 114 : let last_record_lsn = timeline.get_last_record_lsn();
188 114 : let ancestor_lsn = timeline.get_ancestor_lsn();
189 114 :
190 114 : // there's a race between the update (holding tenant.gc_lock) and this read but it
191 114 : // might not be an issue, because it's not for Timeline::gc
192 114 : let gc_info = timeline.gc_info.read().unwrap();
193 114 :
194 114 : // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
195 114 : // new gc run, which we have no control over. however differently from `Timeline::gc`
196 114 : // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
197 114 : // actually removing files.
198 114 : let mut next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff);
199 :
200 : // If the caller provided a shorter retention period, use that instead of the GC cutoff.
201 114 : let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
202 0 : let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
203 0 : if next_gc_cutoff < param_cutoff {
204 0 : next_gc_cutoff = param_cutoff;
205 0 : }
206 0 : Some(param_cutoff)
207 : } else {
208 114 : None
209 : };
210 :
211 : // next_gc_cutoff in parent branch are not of interest (right now at least), nor do we
212 : // want to query any logical size before initdb_lsn.
213 114 : let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
214 114 :
215 114 : // Build "interesting LSNs" on this timeline
216 114 : let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
217 114 : .retain_lsns
218 114 : .iter()
219 114 : .filter(|&&lsn| lsn > ancestor_lsn)
220 114 : .copied()
221 114 : // this assumes there are no other retain_lsns than the branchpoints
222 114 : .map(|lsn| (lsn, LsnKind::BranchPoint))
223 114 : .collect::<Vec<_>>();
224 :
225 : // Add branch points we collected earlier, just in case there were any that were
226 : // not present in retain_lsns. We will remove any duplicates below later.
227 114 : if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
228 36 : lsns.extend(
229 36 : this_branchpoints
230 36 : .iter()
231 41 : .map(|lsn| (*lsn, LsnKind::BranchPoint)),
232 36 : )
233 78 : }
234 :
235 : // Add a point for the GC cutoff
236 114 : let branch_start_needed = next_gc_cutoff <= branch_start_lsn;
237 114 : if !branch_start_needed {
238 28 : lsns.push((next_gc_cutoff, LsnKind::GcCutOff));
239 86 : }
240 :
241 114 : lsns.sort_unstable();
242 114 : lsns.dedup();
243 114 :
244 114 : //
245 114 : // Create Segments for the interesting points.
246 114 : //
247 114 :
248 114 : // Timeline start point
249 114 : let ancestor = timeline
250 114 : .get_ancestor_timeline_id()
251 114 : .map(|ancestor_id| (ancestor_id, ancestor_lsn));
252 114 : branchstart_segments.push((timeline_id, segments.len(), ancestor));
253 114 : segments.push(SegmentMeta {
254 114 : segment: Segment {
255 114 : parent: None, // filled in later
256 114 : lsn: branch_start_lsn.0,
257 114 : size: None, // filled in later
258 114 : needed: branch_start_needed,
259 114 : },
260 114 : timeline_id: timeline.timeline_id,
261 114 : kind: LsnKind::BranchStart,
262 114 : });
263 114 :
264 114 : // GC cutoff point, and any branch points, i.e. points where
265 114 : // other timelines branch off from this timeline.
266 114 : let mut parent = segments.len() - 1;
267 183 : for (lsn, kind) in lsns {
268 69 : if kind == LsnKind::BranchPoint {
269 41 : branchpoint_segments.insert((timeline_id, lsn), segments.len());
270 41 : }
271 69 : segments.push(SegmentMeta {
272 69 : segment: Segment {
273 69 : parent: Some(parent),
274 69 : lsn: lsn.0,
275 69 : size: None,
276 69 : needed: lsn > next_gc_cutoff,
277 69 : },
278 69 : timeline_id: timeline.timeline_id,
279 69 : kind,
280 69 : });
281 69 : parent += 1;
282 : }
283 :
284 : // Current end of the timeline
285 114 : segments.push(SegmentMeta {
286 114 : segment: Segment {
287 114 : parent: Some(parent),
288 114 : lsn: last_record_lsn.0,
289 114 : size: None, // Filled in later, if necessary
290 114 : needed: true,
291 114 : },
292 114 : timeline_id: timeline.timeline_id,
293 114 : kind: LsnKind::BranchEnd,
294 114 : });
295 114 :
296 114 : timeline_inputs.push(TimelineInputs {
297 114 : timeline_id: timeline.timeline_id,
298 114 : ancestor_id: timeline.get_ancestor_timeline_id(),
299 114 : ancestor_lsn,
300 114 : last_record: last_record_lsn,
301 114 : // this is not used above, because it might not have updated recently enough
302 114 : latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
303 114 : horizon_cutoff: gc_info.horizon_cutoff,
304 114 : pitr_cutoff: gc_info.pitr_cutoff,
305 114 : next_gc_cutoff,
306 114 : retention_param_cutoff,
307 114 : });
308 : }
309 :
310 : // We now have all segments from the timelines in 'segments'. The timelines
311 : // haven't been linked to each other yet, though. Do that.
312 187 : for (_timeline_id, seg_id, ancestor) in branchstart_segments {
313 : // Look up the branch point
314 114 : if let Some(ancestor) = ancestor {
315 41 : let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
316 41 : segments[seg_id].segment.parent = Some(parent_id);
317 73 : }
318 : }
319 :
320 : // We left the 'size' field empty in all of the Segments so far.
321 : // Now find logical sizes for all of the points that might need or benefit from them.
322 73 : fill_logical_sizes(
323 73 : &timelines,
324 73 : &mut segments,
325 73 : limit,
326 73 : logical_size_cache,
327 73 : cause,
328 73 : ctx,
329 73 : )
330 32 : .await?;
331 :
332 73 : Ok(ModelInputs {
333 73 : segments,
334 73 : timeline_inputs,
335 73 : })
336 73 : }
337 :
338 : /// Augment 'segments' with logical sizes
339 : ///
340 : /// this will probably conflict with on-demand downloaded layers, or at least force them all
341 : /// to be downloaded
342 : ///
343 73 : async fn fill_logical_sizes(
344 73 : timelines: &[Arc<Timeline>],
345 73 : segments: &mut [SegmentMeta],
346 73 : limit: &Arc<Semaphore>,
347 73 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
348 73 : cause: LogicalSizeCalculationCause,
349 73 : ctx: &RequestContext,
350 73 : ) -> anyhow::Result<()> {
351 73 : let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
352 73 : timelines
353 73 : .iter()
354 114 : .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
355 73 : );
356 73 :
357 73 : // record the used/inserted cache keys here, to remove extras not to start leaking
358 73 : // after initial run the cache should be quite stable, but live timelines will eventually
359 73 : // require new lsns to be inspected.
360 73 : let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();
361 73 :
362 73 : // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
363 73 : // our advantage with `?` error handling.
364 73 : let mut joinset = tokio::task::JoinSet::new();
365 73 :
366 73 : let cancel = tokio_util::sync::CancellationToken::new();
367 73 : // be sure to cancel all spawned tasks if we are dropped
368 73 : let _dg = cancel.clone().drop_guard();
369 :
370 : // For each point that would benefit from having a logical size available,
371 : // spawn a Task to fetch it, unless we have it cached already.
372 297 : for seg in segments.iter() {
373 297 : if !seg.size_needed() {
374 171 : continue;
375 126 : }
376 126 :
377 126 : let timeline_id = seg.timeline_id;
378 126 : let lsn = Lsn(seg.segment.lsn);
379 :
380 126 : if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
381 126 : let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
382 126 : if cached_size.is_none() {
383 32 : let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
384 32 : let parallel_size_calcs = Arc::clone(limit);
385 32 : let ctx = ctx.attached_child();
386 32 : joinset.spawn(
387 32 : calculate_logical_size(
388 32 : parallel_size_calcs,
389 32 : timeline,
390 32 : lsn,
391 32 : cause,
392 32 : ctx,
393 32 : cancel.child_token(),
394 32 : )
395 32 : .in_current_span(),
396 32 : );
397 94 : }
398 126 : e.insert(cached_size);
399 0 : }
400 : }
401 :
402 : // Perform the size lookups
403 73 : let mut have_any_error = false;
404 105 : while let Some(res) = joinset.join_next().await {
405 : // each of these come with Result<anyhow::Result<_>, JoinError>
406 : // because of spawn + spawn_blocking
407 32 : match res {
408 0 : Err(join_error) if join_error.is_cancelled() => {
409 0 : unreachable!("we are not cancelling any of the futures, nor should be");
410 : }
411 0 : Err(join_error) => {
412 : // cannot really do anything, as this panic is likely a bug
413 0 : error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
414 0 : have_any_error = true;
415 : }
416 0 : Ok(Err(recv_result_error)) => {
417 : // cannot really do anything, as this panic is likely a bug
418 0 : error!("failed to receive logical size query result: {recv_result_error:#}");
419 0 : have_any_error = true;
420 : }
421 0 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
422 0 : warn!(
423 0 : timeline_id=%timeline.timeline_id,
424 0 : "failed to calculate logical size at {lsn}: {error:#}"
425 0 : );
426 0 : have_any_error = true;
427 : }
428 32 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
429 0 : debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
430 :
431 32 : logical_size_cache.insert((timeline.timeline_id, lsn), size);
432 32 : sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
433 : }
434 : }
435 : }
436 :
437 : // prune any keys not needed anymore; we record every used key and added key.
438 134 : logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
439 73 :
440 73 : if have_any_error {
441 : // we cannot complete this round, because we are missing data.
442 : // we have however cached all we were able to request calculation on.
443 0 : anyhow::bail!("failed to calculate some logical_sizes");
444 73 : }
445 :
446 : // Insert the looked up sizes to the Segments
447 297 : for seg in segments.iter_mut() {
448 297 : if !seg.size_needed() {
449 171 : continue;
450 126 : }
451 126 :
452 126 : let timeline_id = seg.timeline_id;
453 126 : let lsn = Lsn(seg.segment.lsn);
454 :
455 126 : if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
456 126 : seg.segment.size = Some(*size);
457 126 : } else {
458 0 : bail!("could not find size at {} in timeline {}", lsn, timeline_id);
459 : }
460 : }
461 73 : Ok(())
462 73 : }
463 :
464 : impl ModelInputs {
465 69 : pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
466 69 : // Convert SegmentMetas into plain Segments
467 69 : let storage = StorageModel {
468 69 : segments: self
469 69 : .segments
470 69 : .iter()
471 299 : .map(|seg| seg.segment.clone())
472 69 : .collect(),
473 69 : };
474 69 :
475 69 : Ok(storage)
476 69 : }
477 :
478 : // calculate total project size
479 15 : pub fn calculate(&self) -> anyhow::Result<u64> {
480 15 : let storage = self.calculate_model()?;
481 15 : let sizes = storage.calculate();
482 15 :
483 15 : Ok(sizes.total_size)
484 15 : }
485 : }
486 :
487 : /// Newtype around the tuple that carries the timeline at lsn logical size calculation.
488 : struct TimelineAtLsnSizeResult(
489 : Arc<crate::tenant::Timeline>,
490 : utils::lsn::Lsn,
491 : Result<u64, CalculateLogicalSizeError>,
492 : );
493 :
494 64 : #[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
495 : async fn calculate_logical_size(
496 : limit: Arc<tokio::sync::Semaphore>,
497 : timeline: Arc<crate::tenant::Timeline>,
498 : lsn: utils::lsn::Lsn,
499 : cause: LogicalSizeCalculationCause,
500 : ctx: RequestContext,
501 : cancel: CancellationToken,
502 : ) -> Result<TimelineAtLsnSizeResult, RecvError> {
503 : let _permit = tokio::sync::Semaphore::acquire_owned(limit)
504 : .await
505 : .expect("global semaphore should not had been closed");
506 :
507 : let size_res = timeline
508 : .spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
509 : .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
510 : .await?;
511 : Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
512 : }
513 :
/// Checks the total size for a fixed set of inputs with three timelines:
/// `cb5e3cbe…` is the root, `454626…` branches off it at lsn 24566168 (segment 3 -> segment 7),
/// and `20b129…` branches off it at lsn 26033560 (segment 0 -> segment 9).
#[test]
fn verify_size_for_multiple_branches() {
    // this is generated from integration test test_tenant_size_with_multiple_branches, but this way
    // it has the stable lsn's
    //
    // The timeline_inputs don't participate in the size calculation, and are here just to explain
    // the inputs.
    let doc = r#"
{
  "segments": [
    {
      "segment": {
        "parent": 9,
        "lsn": 26033560,
        "size": null,
        "needed": false
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 0,
        "lsn": 35720400,
        "size": 25206784,
        "needed": false
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 1,
        "lsn": 35851472,
        "size": null,
        "needed": true
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "BranchEnd"
    },
    {
      "segment": {
        "parent": 7,
        "lsn": 24566168,
        "size": null,
        "needed": false
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 3,
        "lsn": 25261936,
        "size": 26050560,
        "needed": false
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 4,
        "lsn": 25393008,
        "size": null,
        "needed": true
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "BranchEnd"
    },
    {
      "segment": {
        "parent": null,
        "lsn": 23694408,
        "size": null,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 6,
        "lsn": 24566168,
        "size": 25739264,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchPoint"
    },
    {
      "segment": {
        "parent": 7,
        "lsn": 25902488,
        "size": 26402816,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 8,
        "lsn": 26033560,
        "size": 26468352,
        "needed": true
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchPoint"
    },
    {
      "segment": {
        "parent": 9,
        "lsn": 26033560,
        "size": null,
        "needed": true
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchEnd"
    }
  ],
  "timeline_inputs": [
    {
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "ancestor_lsn": "0/18D3D98",
      "last_record": "0/2230CD0",
      "latest_gc_cutoff": "0/1698C48",
      "horizon_cutoff": "0/2210CD0",
      "pitr_cutoff": "0/2210CD0",
      "next_gc_cutoff": "0/2210CD0",
      "retention_param_cutoff": null
    },
    {
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "ancestor_lsn": "0/176D998",
      "last_record": "0/1837770",
      "latest_gc_cutoff": "0/1698C48",
      "horizon_cutoff": "0/1817770",
      "pitr_cutoff": "0/1817770",
      "next_gc_cutoff": "0/1817770",
      "retention_param_cutoff": null
    },
    {
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "ancestor_lsn": "0/0",
      "last_record": "0/18D3D98",
      "latest_gc_cutoff": "0/1698C48",
      "horizon_cutoff": "0/18B3D98",
      "pitr_cutoff": "0/18B3D98",
      "next_gc_cutoff": "0/18B3D98",
      "retention_param_cutoff": null
    }
  ]
}
"#;
    let inputs: ModelInputs = serde_json::from_str(doc).unwrap();

    // Expected total for exactly these inputs.
    assert_eq!(inputs.calculate().unwrap(), 37_851_408);
}
673 :
/// Checks the total size for a single timeline with only a GC cutoff point.
///
/// The expected total equals the logical size at the GC cutoff (220054675456) plus the
/// LSN distance from the cutoff to the branch end (305614444640 - 305547335776 = 67108864).
#[test]
fn verify_size_for_one_branch() {
    let doc = r#"
{
  "segments": [
    {
      "segment": {
        "parent": null,
        "lsn": 0,
        "size": null,
        "needed": false
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 0,
        "lsn": 305547335776,
        "size": 220054675456,
        "needed": false
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 1,
        "lsn": 305614444640,
        "size": null,
        "needed": true
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "BranchEnd"
    }
  ],
  "timeline_inputs": [
    {
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "ancestor_lsn": "0/0",
      "last_record": "47/280A5860",
      "latest_gc_cutoff": "47/240A5860",
      "horizon_cutoff": "47/240A5860",
      "pitr_cutoff": "47/240A5860",
      "next_gc_cutoff": "47/240A5860",
      "retention_param_cutoff": "0/0"
    }
  ]
}"#;

    let model: ModelInputs = serde_json::from_str(doc).unwrap();

    let res = model.calculate_model().unwrap().calculate();

    println!("calculated synthetic size: {}", res.total_size);
    println!("result: {:?}", serde_json::to_string(&res.segments));

    // Diagnostic output only: shows the numeric values of the LSNs used in the JSON above.
    use utils::lsn::Lsn;
    let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap();
    let last_lsn: Lsn = "47/280A5860".parse().unwrap();
    println!(
        "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
        u64::from(latest_gc_cutoff_lsn),
        u64::from(last_lsn)
    );
    assert_eq!(res.total_size, 220121784320);
}
|