TLA Line data Source code
1 : use std::cmp;
2 : use std::collections::hash_map::Entry;
3 : use std::collections::{HashMap, HashSet};
4 : use std::sync::Arc;
5 :
6 : use anyhow::{bail, Context};
7 : use tokio::sync::oneshot::error::RecvError;
8 : use tokio::sync::Semaphore;
9 : use tokio_util::sync::CancellationToken;
10 :
11 : use crate::context::RequestContext;
12 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
13 :
14 : use super::{LogicalSizeCalculationCause, Tenant};
15 : use crate::tenant::Timeline;
16 : use utils::id::TimelineId;
17 : use utils::lsn::Lsn;
18 :
19 : use tracing::*;
20 :
21 : use tenant_size_model::{Segment, StorageModel};
22 :
/// Inputs to the actual tenant sizing model
///
/// Implements [`serde::Serialize`] and [`serde::Deserialize`] but is not meant to be part of the
/// public API, instead meant to be a transferrable format between execution environments and
/// developer.
///
/// This tracks more information than the actual StorageModel that calculation
/// needs. We will convert this into a StorageModel when it's time to perform
/// the calculation (see [`ModelInputs::calculate_model`]).
///
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ModelInputs {
    /// Segments of all timelines. The `segment.parent` of each entry, when set, is an index
    /// into this same vector.
    pub segments: Vec<SegmentMeta>,
    /// One entry per timeline, explaining the inputs. These are informational only: the size
    /// calculation reads just `segments`.
    pub timeline_inputs: Vec<TimelineInputs>,
}
38 :
/// A [`Segment`], with some extra information for display purposes.
///
/// The extra fields are also used while gathering inputs: `timeline_id` and `kind` decide
/// whether (and on which timeline) a logical size has to be looked up for the segment.
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SegmentMeta {
    /// The segment as handed to the sizing model.
    pub segment: Segment,
    /// Timeline this segment belongs to; serialized as a string through `DisplayFromStr`.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub timeline_id: TimelineId,
    /// Which kind of point on the timeline this segment represents.
    pub kind: LsnKind,
}
48 :
49 : impl SegmentMeta {
50 614 : fn size_needed(&self) -> bool {
51 614 : match self.kind {
52 : LsnKind::BranchStart => {
53 : // If we don't have a later GcCutoff point on this branch, and
54 : // no ancestor, calculate size for the branch start point.
55 238 : self.segment.needed && self.segment.parent.is_none()
56 : }
57 82 : LsnKind::BranchPoint => true,
58 56 : LsnKind::GcCutOff => true,
59 238 : LsnKind::BranchEnd => false,
60 : }
61 614 : }
62 : }
63 :
/// Kind of an "interesting" LSN on a timeline.
///
/// NOTE: the declaration order of the variants is significant. `Ord` is derived, and the
/// per-timeline list of `(Lsn, LsnKind)` pairs is sorted with it. At equal LSNs the sorted order
/// is BranchStart < BranchPoint < GcCutOff < BranchEnd.
#[derive(
    Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
)]
pub enum LsnKind {
    /// A timeline starting here
    BranchStart,
    /// A child timeline branches off from here
    BranchPoint,
    /// GC cutoff point
    GcCutOff,
    /// Last record LSN
    BranchEnd,
}
77 :
/// The relevant LSNs of one timeline, collected into the inputs. These are only helpful in the
/// serialized form as part of [`ModelInputs`] from the HTTP api, explaining the inputs; the size
/// calculation itself does not read them.
#[serde_with::serde_as]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct TimelineInputs {
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pub timeline_id: TimelineId,

    /// Parent timeline, if this timeline is a branch.
    #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
    pub ancestor_id: Option<TimelineId>,

    /// LSN on the ancestor at which this timeline branched off.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    ancestor_lsn: Lsn,
    /// Last record LSN of the timeline when the inputs were gathered.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    last_record: Lsn,
    /// GC cutoff as last applied by GC; recorded for display only because it may be stale.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    latest_gc_cutoff: Lsn,
    #[serde_as(as = "serde_with::DisplayFromStr")]
    horizon_cutoff: Lsn,
    #[serde_as(as = "serde_with::DisplayFromStr")]
    pitr_cutoff: Lsn,

    /// Cutoff point based on GC settings: the smaller of `horizon_cutoff` and `pitr_cutoff`,
    /// raised to `retention_param_cutoff` when that one is larger.
    #[serde_as(as = "serde_with::DisplayFromStr")]
    next_gc_cutoff: Lsn,

    /// Cutoff point calculated from the user-supplied 'max_retention_period'
    /// (`last_record` minus the period, saturating at zero).
    #[serde_as(as = "Option<serde_with::DisplayFromStr>")]
    retention_param_cutoff: Option<Lsn>,
}
108 :
109 : /// Gathers the inputs for the tenant sizing model.
110 : ///
111 : /// Tenant size does not consider the latest state, but only the state until next_gc_cutoff, which
112 : /// is updated on-demand, during the start of this calculation and separate from the
113 : /// [`TimelineInputs::latest_gc_cutoff`].
114 : ///
115 : /// For timelines in general:
116 : ///
117 : /// ```text
118 : /// 0-----|---------|----|------------| · · · · · |·> lsn
119 : /// initdb_lsn branchpoints* next_gc_cutoff latest
120 : /// ```
121 : ///
122 : /// Until gc_horizon_cutoff > `Timeline::last_record_lsn` for any of the tenant's timelines, the
123 : /// tenant size will be zero.
124 78 : pub(super) async fn gather_inputs(
125 78 : tenant: &Tenant,
126 78 : limit: &Arc<Semaphore>,
127 78 : max_retention_period: Option<u64>,
128 78 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
129 78 : cause: LogicalSizeCalculationCause,
130 78 : ctx: &RequestContext,
131 78 : ) -> anyhow::Result<ModelInputs> {
132 : // refresh is needed to update gc related pitr_cutoff and horizon_cutoff
133 78 : tenant
134 78 : .refresh_gc_info(ctx)
135 20 : .await
136 78 : .context("Failed to refresh gc_info before gathering inputs")?;
137 :
138 : // Collect information about all the timelines
139 78 : let mut timelines = tenant.list_timelines();
140 78 :
141 78 : if timelines.is_empty() {
142 : // perhaps the tenant has just been created, and as such doesn't have any data yet
143 UBC 0 : return Ok(ModelInputs {
144 0 : segments: vec![],
145 0 : timeline_inputs: Vec::new(),
146 0 : });
147 CBC 78 : }
148 78 :
149 78 : // Filter out timelines that are not active
150 78 : //
151 78 : // There may be a race when a timeline is dropped,
152 78 : // but it is unlikely to cause any issues. In the worst case,
153 78 : // the calculation will error out.
154 119 : timelines.retain(|t| t.is_active());
155 78 :
156 78 : // Build a map of branch points.
157 78 : let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
158 119 : for timeline in timelines.iter() {
159 119 : if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
160 41 : branchpoints
161 41 : .entry(ancestor_id)
162 41 : .or_default()
163 41 : .insert(timeline.get_ancestor_lsn());
164 78 : }
165 : }
166 :
167 : // These become the final result.
168 78 : let mut timeline_inputs = Vec::with_capacity(timelines.len());
169 78 : let mut segments: Vec<SegmentMeta> = Vec::new();
170 78 :
171 78 : //
172 78 : // Build Segments representing each timeline. As we do that, also remember
173 78 : // the branchpoints and branch startpoints in 'branchpoint_segments' and
174 78 : // 'branchstart_segments'
175 78 : //
176 78 :
177 78 : // BranchPoint segments of each timeline
178 78 : // (timeline, branchpoint LSN) -> segment_id
179 78 : let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();
180 78 :
181 78 : // timeline, Branchpoint seg id, (ancestor, ancestor LSN)
182 78 : type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
183 78 : let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();
184 :
185 119 : for timeline in timelines.iter() {
186 119 : let timeline_id = timeline.timeline_id;
187 119 : let last_record_lsn = timeline.get_last_record_lsn();
188 119 : let ancestor_lsn = timeline.get_ancestor_lsn();
189 119 :
190 119 : // there's a race between the update (holding tenant.gc_lock) and this read but it
191 119 : // might not be an issue, because it's not for Timeline::gc
192 119 : let gc_info = timeline.gc_info.read().unwrap();
193 119 :
194 119 : // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
195 119 : // new gc run, which we have no control over. however differently from `Timeline::gc`
196 119 : // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
197 119 : // actually removing files.
198 119 : let mut next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff);
199 :
200 : // If the caller provided a shorter retention period, use that instead of the GC cutoff.
201 119 : let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
202 UBC 0 : let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
203 0 : if next_gc_cutoff < param_cutoff {
204 0 : next_gc_cutoff = param_cutoff;
205 0 : }
206 0 : Some(param_cutoff)
207 : } else {
208 CBC 119 : None
209 : };
210 :
211 : // next_gc_cutoff in parent branch are not of interest (right now at least), nor do we
212 : // want to query any logical size before initdb_lsn.
213 119 : let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
214 119 :
215 119 : // Build "interesting LSNs" on this timeline
216 119 : let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
217 119 : .retain_lsns
218 119 : .iter()
219 119 : .filter(|&&lsn| lsn > ancestor_lsn)
220 119 : .copied()
221 119 : // this assumes there are no other retain_lsns than the branchpoints
222 119 : .map(|lsn| (lsn, LsnKind::BranchPoint))
223 119 : .collect::<Vec<_>>();
224 :
225 : // Add branch points we collected earlier, just in case there were any that were
226 : // not present in retain_lsns. We will remove any duplicates below later.
227 119 : if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
228 36 : lsns.extend(
229 36 : this_branchpoints
230 36 : .iter()
231 41 : .map(|lsn| (*lsn, LsnKind::BranchPoint)),
232 36 : )
233 83 : }
234 :
235 : // Add a point for the GC cutoff
236 119 : let branch_start_needed = next_gc_cutoff <= branch_start_lsn;
237 119 : if !branch_start_needed {
238 28 : lsns.push((next_gc_cutoff, LsnKind::GcCutOff));
239 91 : }
240 :
241 119 : lsns.sort_unstable();
242 119 : lsns.dedup();
243 119 :
244 119 : //
245 119 : // Create Segments for the interesting points.
246 119 : //
247 119 :
248 119 : // Timeline start point
249 119 : let ancestor = timeline
250 119 : .get_ancestor_timeline_id()
251 119 : .map(|ancestor_id| (ancestor_id, ancestor_lsn));
252 119 : branchstart_segments.push((timeline_id, segments.len(), ancestor));
253 119 : segments.push(SegmentMeta {
254 119 : segment: Segment {
255 119 : parent: None, // filled in later
256 119 : lsn: branch_start_lsn.0,
257 119 : size: None, // filled in later
258 119 : needed: branch_start_needed,
259 119 : },
260 119 : timeline_id: timeline.timeline_id,
261 119 : kind: LsnKind::BranchStart,
262 119 : });
263 119 :
264 119 : // GC cutoff point, and any branch points, i.e. points where
265 119 : // other timelines branch off from this timeline.
266 119 : let mut parent = segments.len() - 1;
267 188 : for (lsn, kind) in lsns {
268 69 : if kind == LsnKind::BranchPoint {
269 41 : branchpoint_segments.insert((timeline_id, lsn), segments.len());
270 41 : }
271 69 : segments.push(SegmentMeta {
272 69 : segment: Segment {
273 69 : parent: Some(parent),
274 69 : lsn: lsn.0,
275 69 : size: None,
276 69 : needed: lsn > next_gc_cutoff,
277 69 : },
278 69 : timeline_id: timeline.timeline_id,
279 69 : kind,
280 69 : });
281 69 : parent += 1;
282 : }
283 :
284 : // Current end of the timeline
285 119 : segments.push(SegmentMeta {
286 119 : segment: Segment {
287 119 : parent: Some(parent),
288 119 : lsn: last_record_lsn.0,
289 119 : size: None, // Filled in later, if necessary
290 119 : needed: true,
291 119 : },
292 119 : timeline_id: timeline.timeline_id,
293 119 : kind: LsnKind::BranchEnd,
294 119 : });
295 119 :
296 119 : timeline_inputs.push(TimelineInputs {
297 119 : timeline_id: timeline.timeline_id,
298 119 : ancestor_id: timeline.get_ancestor_timeline_id(),
299 119 : ancestor_lsn,
300 119 : last_record: last_record_lsn,
301 119 : // this is not used above, because it might not have updated recently enough
302 119 : latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
303 119 : horizon_cutoff: gc_info.horizon_cutoff,
304 119 : pitr_cutoff: gc_info.pitr_cutoff,
305 119 : next_gc_cutoff,
306 119 : retention_param_cutoff,
307 119 : });
308 : }
309 :
310 : // We now have all segments from the timelines in 'segments'. The timelines
311 : // haven't been linked to each other yet, though. Do that.
312 197 : for (_timeline_id, seg_id, ancestor) in branchstart_segments {
313 : // Look up the branch point
314 119 : if let Some(ancestor) = ancestor {
315 41 : let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
316 41 : segments[seg_id].segment.parent = Some(parent_id);
317 78 : }
318 : }
319 :
320 : // We left the 'size' field empty in all of the Segments so far.
321 : // Now find logical sizes for all of the points that might need or benefit from them.
322 78 : fill_logical_sizes(
323 78 : &timelines,
324 78 : &mut segments,
325 78 : limit,
326 78 : logical_size_cache,
327 78 : cause,
328 78 : ctx,
329 78 : )
330 39 : .await?;
331 :
332 78 : Ok(ModelInputs {
333 78 : segments,
334 78 : timeline_inputs,
335 78 : })
336 78 : }
337 :
338 : /// Augment 'segments' with logical sizes
339 : ///
340 : /// this will probably conflict with on-demand downloaded layers, or at least force them all
341 : /// to be downloaded
342 : ///
343 78 : async fn fill_logical_sizes(
344 78 : timelines: &[Arc<Timeline>],
345 78 : segments: &mut [SegmentMeta],
346 78 : limit: &Arc<Semaphore>,
347 78 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
348 78 : cause: LogicalSizeCalculationCause,
349 78 : ctx: &RequestContext,
350 78 : ) -> anyhow::Result<()> {
351 78 : let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
352 78 : timelines
353 78 : .iter()
354 119 : .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
355 78 : );
356 78 :
357 78 : // record the used/inserted cache keys here, to remove extras not to start leaking
358 78 : // after initial run the cache should be quite stable, but live timelines will eventually
359 78 : // require new lsns to be inspected.
360 78 : let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();
361 78 :
362 78 : // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
363 78 : // our advantage with `?` error handling.
364 78 : let mut joinset = tokio::task::JoinSet::new();
365 78 :
366 78 : let cancel = tokio_util::sync::CancellationToken::new();
367 78 : // be sure to cancel all spawned tasks if we are dropped
368 78 : let _dg = cancel.clone().drop_guard();
369 :
370 : // For each point that would benefit from having a logical size available,
371 : // spawn a Task to fetch it, unless we have it cached already.
372 307 : for seg in segments.iter() {
373 307 : if !seg.size_needed() {
374 176 : continue;
375 131 : }
376 131 :
377 131 : let timeline_id = seg.timeline_id;
378 131 : let lsn = Lsn(seg.segment.lsn);
379 :
380 131 : if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
381 131 : let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
382 131 : if cached_size.is_none() {
383 39 : let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
384 39 : let parallel_size_calcs = Arc::clone(limit);
385 39 : let ctx = ctx.attached_child();
386 39 : joinset.spawn(
387 39 : calculate_logical_size(
388 39 : parallel_size_calcs,
389 39 : timeline,
390 39 : lsn,
391 39 : cause,
392 39 : ctx,
393 39 : cancel.child_token(),
394 39 : )
395 39 : .in_current_span(),
396 39 : );
397 92 : }
398 131 : e.insert(cached_size);
399 UBC 0 : }
400 : }
401 :
402 : // Perform the size lookups
403 CBC 78 : let mut have_any_error = false;
404 117 : while let Some(res) = joinset.join_next().await {
405 : // each of these come with Result<anyhow::Result<_>, JoinError>
406 : // because of spawn + spawn_blocking
407 39 : match res {
408 UBC 0 : Err(join_error) if join_error.is_cancelled() => {
409 0 : unreachable!("we are not cancelling any of the futures, nor should be");
410 : }
411 0 : Err(join_error) => {
412 : // cannot really do anything, as this panic is likely a bug
413 0 : error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
414 0 : have_any_error = true;
415 : }
416 0 : Ok(Err(recv_result_error)) => {
417 : // cannot really do anything, as this panic is likely a bug
418 0 : error!("failed to receive logical size query result: {recv_result_error:#}");
419 0 : have_any_error = true;
420 : }
421 0 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
422 0 : warn!(
423 0 : timeline_id=%timeline.timeline_id,
424 0 : "failed to calculate logical size at {lsn}: {error:#}"
425 0 : );
426 0 : have_any_error = true;
427 : }
428 CBC 39 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
429 UBC 0 : debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
430 :
431 CBC 39 : logical_size_cache.insert((timeline.timeline_id, lsn), size);
432 39 : sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
433 : }
434 : }
435 : }
436 :
437 : // prune any keys not needed anymore; we record every used key and added key.
438 141 : logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
439 78 :
440 78 : if have_any_error {
441 : // we cannot complete this round, because we are missing data.
442 : // we have however cached all we were able to request calculation on.
443 UBC 0 : anyhow::bail!("failed to calculate some logical_sizes");
444 CBC 78 : }
445 :
446 : // Insert the looked up sizes to the Segments
447 307 : for seg in segments.iter_mut() {
448 307 : if !seg.size_needed() {
449 176 : continue;
450 131 : }
451 131 :
452 131 : let timeline_id = seg.timeline_id;
453 131 : let lsn = Lsn(seg.segment.lsn);
454 :
455 131 : if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
456 131 : seg.segment.size = Some(*size);
457 131 : } else {
458 UBC 0 : bail!("could not find size at {} in timeline {}", lsn, timeline_id);
459 : }
460 : }
461 CBC 78 : Ok(())
462 78 : }
463 :
464 : impl ModelInputs {
465 74 : pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
466 74 : // Convert SegmentMetas into plain Segments
467 74 : let storage = StorageModel {
468 74 : segments: self
469 74 : .segments
470 74 : .iter()
471 309 : .map(|seg| seg.segment.clone())
472 74 : .collect(),
473 74 : };
474 74 :
475 74 : Ok(storage)
476 74 : }
477 :
478 : // calculate total project size
479 20 : pub fn calculate(&self) -> anyhow::Result<u64> {
480 20 : let storage = self.calculate_model()?;
481 20 : let sizes = storage.calculate();
482 20 :
483 20 : Ok(sizes.total_size)
484 20 : }
485 : }
486 :
/// Newtype around the tuple that carries the timeline at lsn logical size calculation.
///
/// Fields, in order: the timeline the size was calculated for, the LSN it was calculated at,
/// and the outcome of the calculation. The pair of timeline and LSN lets the collector match a
/// result back to the request it belongs to.
struct TimelineAtLsnSizeResult(
    Arc<crate::tenant::Timeline>,
    utils::lsn::Lsn,
    Result<u64, CalculateLogicalSizeError>,
);
493 :
494 78 : #[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
495 : async fn calculate_logical_size(
496 : limit: Arc<tokio::sync::Semaphore>,
497 : timeline: Arc<crate::tenant::Timeline>,
498 : lsn: utils::lsn::Lsn,
499 : cause: LogicalSizeCalculationCause,
500 : ctx: RequestContext,
501 : cancel: CancellationToken,
502 : ) -> Result<TimelineAtLsnSizeResult, RecvError> {
503 : let _permit = tokio::sync::Semaphore::acquire_owned(limit)
504 : .await
505 : .expect("global semaphore should not had been closed");
506 :
507 : let size_res = timeline
508 : .spawn_ondemand_logical_size_calculation(lsn, cause, ctx, cancel)
509 : .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
510 : .await?;
511 : Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
512 : }
513 :
#[test]
fn verify_size_for_multiple_branches() {
    // this is generated from integration test test_tenant_size_with_multiple_branches, but this way
    // it has the stable lsn's
    //
    // The timeline_inputs don't participate in the size calculation, and are here just to explain
    // the inputs.
    //
    // Topology encoded below (the "parent" values are indices into "segments"):
    // - cb5e3cbe... is the root timeline (parent null). Its segments are 6..=10: start,
    //   BranchPoint at 0/176D998, GcCutOff, BranchPoint at 0/18D3D98, end.
    // - 454626700... branches off the root at 0/176D998. Its BranchStart (3) has parent 7.
    // - 20b129c9... branches off the root at 0/18D3D98. Its BranchStart (0) has parent 9.
    let doc = r#"
{
  "segments": [
    {
      "segment": {
        "parent": 9,
        "lsn": 26033560,
        "size": null,
        "needed": false
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 0,
        "lsn": 35720400,
        "size": 25206784,
        "needed": false
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 1,
        "lsn": 35851472,
        "size": null,
        "needed": true
      },
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "kind": "BranchEnd"
    },
    {
      "segment": {
        "parent": 7,
        "lsn": 24566168,
        "size": null,
        "needed": false
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 3,
        "lsn": 25261936,
        "size": 26050560,
        "needed": false
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 4,
        "lsn": 25393008,
        "size": null,
        "needed": true
      },
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "kind": "BranchEnd"
    },
    {
      "segment": {
        "parent": null,
        "lsn": 23694408,
        "size": null,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 6,
        "lsn": 24566168,
        "size": 25739264,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchPoint"
    },
    {
      "segment": {
        "parent": 7,
        "lsn": 25902488,
        "size": 26402816,
        "needed": false
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 8,
        "lsn": 26033560,
        "size": 26468352,
        "needed": true
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchPoint"
    },
    {
      "segment": {
        "parent": 9,
        "lsn": 26033560,
        "size": null,
        "needed": true
      },
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "kind": "BranchEnd"
    }
  ],
  "timeline_inputs": [
    {
      "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
      "ancestor_lsn": "0/18D3D98",
      "last_record": "0/2230CD0",
      "latest_gc_cutoff": "0/1698C48",
      "horizon_cutoff": "0/2210CD0",
      "pitr_cutoff": "0/2210CD0",
      "next_gc_cutoff": "0/2210CD0",
      "retention_param_cutoff": null
    },
    {
      "timeline_id": "454626700469f0a9914949b9d018e876",
      "ancestor_lsn": "0/176D998",
      "last_record": "0/1837770",
      "latest_gc_cutoff": "0/1698C48",
      "horizon_cutoff": "0/1817770",
      "pitr_cutoff": "0/1817770",
      "next_gc_cutoff": "0/1817770",
      "retention_param_cutoff": null
    },
    {
      "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
      "ancestor_lsn": "0/0",
      "last_record": "0/18D3D98",
      "latest_gc_cutoff": "0/1698C48",
      "horizon_cutoff": "0/18B3D98",
      "pitr_cutoff": "0/18B3D98",
      "next_gc_cutoff": "0/18B3D98",
      "retention_param_cutoff": null
    }
  ]
}
"#;
    let inputs: ModelInputs = serde_json::from_str(doc).unwrap();

    // Expected total for the model above; recorded from a known-good run.
    assert_eq!(inputs.calculate().unwrap(), 37_851_408);
}
673 :
#[test]
fn verify_size_for_one_branch() {
    // A single root timeline with three segments: branch start at LSN 0, a GC cutoff whose
    // logical size is known, and the branch end. The expected total equals the size at the
    // cutoff plus the LSN distance from the cutoff to the end.
    let doc = r#"
{
  "segments": [
    {
      "segment": {
        "parent": null,
        "lsn": 0,
        "size": null,
        "needed": false
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "BranchStart"
    },
    {
      "segment": {
        "parent": 0,
        "lsn": 305547335776,
        "size": 220054675456,
        "needed": false
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "GcCutOff"
    },
    {
      "segment": {
        "parent": 1,
        "lsn": 305614444640,
        "size": null,
        "needed": true
      },
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "kind": "BranchEnd"
    }
  ],
  "timeline_inputs": [
    {
      "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
      "ancestor_lsn": "0/0",
      "last_record": "47/280A5860",
      "latest_gc_cutoff": "47/240A5860",
      "horizon_cutoff": "47/240A5860",
      "pitr_cutoff": "47/240A5860",
      "next_gc_cutoff": "47/240A5860",
      "retention_param_cutoff": "0/0"
    }
  ]
}"#;

    let res = serde_json::from_str::<ModelInputs>(doc)
        .unwrap()
        .calculate_model()
        .unwrap()
        .calculate();

    println!("calculated synthetic size: {}", res.total_size);
    println!("result: {:?}", serde_json::to_string(&res.segments));

    use utils::lsn::Lsn;
    let [latest_gc_cutoff_lsn, last_lsn] =
        ["47/240A5860", "47/280A5860"].map(|text| text.parse::<Lsn>().unwrap());
    println!(
        "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
        u64::from(latest_gc_cutoff_lsn),
        u64::from(last_lsn)
    );
    assert_eq!(res.total_size, 220121784320);
}
|