Line data Source code
1 : use std::cmp;
2 : use std::collections::hash_map::Entry;
3 : use std::collections::{HashMap, HashSet};
4 : use std::sync::Arc;
5 :
6 : use anyhow::{bail, Context};
7 : use tokio::sync::oneshot::error::RecvError;
8 : use tokio::sync::Semaphore;
9 : use tokio_util::sync::CancellationToken;
10 :
11 : use crate::context::RequestContext;
12 : use crate::pgdatadir_mapping::CalculateLogicalSizeError;
13 :
14 : use super::{LogicalSizeCalculationCause, Tenant};
15 : use crate::tenant::Timeline;
16 : use utils::id::TimelineId;
17 : use utils::lsn::Lsn;
18 :
19 : use tracing::*;
20 :
21 : use tenant_size_model::{Segment, StorageModel};
22 :
23 : /// Inputs to the actual tenant sizing model
24 : ///
25 : /// Implements [`serde::Serialize`] but is not meant to be part of the public API, instead meant to
26 : /// be a transferrable format between execution environments and developer.
27 : ///
28 : /// This tracks more information than the actual StorageModel that calculation
29 : /// needs. We will convert this into a StorageModel when it's time to perform
30 : /// the calculation.
31 : ///
32 20 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
33 : pub struct ModelInputs {
34 : pub segments: Vec<SegmentMeta>,
35 : pub timeline_inputs: Vec<TimelineInputs>,
36 : }
37 :
38 : /// A [`Segment`], with some extra information for display purposes
39 196 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
40 : pub struct SegmentMeta {
41 : pub segment: Segment,
42 : pub timeline_id: TimelineId,
43 : pub kind: LsnKind,
44 : }
45 :
46 : impl SegmentMeta {
47 0 : fn size_needed(&self) -> bool {
48 0 : match self.kind {
49 : LsnKind::BranchStart => {
50 : // If we don't have a later GcCutoff point on this branch, and
51 : // no ancestor, calculate size for the branch start point.
52 0 : self.segment.needed && self.segment.parent.is_none()
53 : }
54 0 : LsnKind::BranchPoint => true,
55 0 : LsnKind::GcCutOff => true,
56 0 : LsnKind::BranchEnd => false,
57 : }
58 0 : }
59 : }
60 :
61 : #[derive(
62 56 : Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd, serde::Serialize, serde::Deserialize,
63 : )]
64 : pub enum LsnKind {
65 : /// A timeline starting here
66 : BranchStart,
67 : /// A child timeline branches off from here
68 : BranchPoint,
69 : /// GC cutoff point
70 : GcCutOff,
71 : /// Last record LSN
72 : BranchEnd,
73 : }
74 :
75 : /// Collect all relevant LSNs to the inputs. These will only be helpful in the serialized form as
76 : /// part of [`ModelInputs`] from the HTTP api, explaining the inputs.
77 136 : #[derive(Debug, serde::Serialize, serde::Deserialize)]
78 : pub struct TimelineInputs {
79 : pub timeline_id: TimelineId,
80 :
81 : pub ancestor_id: Option<TimelineId>,
82 :
83 : ancestor_lsn: Lsn,
84 : last_record: Lsn,
85 : latest_gc_cutoff: Lsn,
86 : horizon_cutoff: Lsn,
87 : pitr_cutoff: Lsn,
88 :
89 : /// Cutoff point based on GC settings
90 : next_gc_cutoff: Lsn,
91 :
92 : /// Cutoff point calculated from the user-supplied 'max_retention_period'
93 : retention_param_cutoff: Option<Lsn>,
94 : }
95 :
96 : /// Gathers the inputs for the tenant sizing model.
97 : ///
98 : /// Tenant size does not consider the latest state, but only the state until next_gc_cutoff, which
99 : /// is updated on-demand, during the start of this calculation and separate from the
100 : /// [`TimelineInputs::latest_gc_cutoff`].
101 : ///
102 : /// For timelines in general:
103 : ///
104 : /// ```text
105 : /// 0-----|---------|----|------------| · · · · · |·> lsn
106 : /// initdb_lsn branchpoints* next_gc_cutoff latest
107 : /// ```
108 : ///
109 : /// Until gc_horizon_cutoff > `Timeline::last_record_lsn` for any of the tenant's timelines, the
110 : /// tenant size will be zero.
111 0 : pub(super) async fn gather_inputs(
112 0 : tenant: &Tenant,
113 0 : limit: &Arc<Semaphore>,
114 0 : max_retention_period: Option<u64>,
115 0 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
116 0 : cause: LogicalSizeCalculationCause,
117 0 : cancel: &CancellationToken,
118 0 : ctx: &RequestContext,
119 0 : ) -> anyhow::Result<ModelInputs> {
120 0 : // refresh is needed to update gc related pitr_cutoff and horizon_cutoff
121 0 : tenant
122 0 : .refresh_gc_info(cancel, ctx)
123 0 : .await
124 0 : .context("Failed to refresh gc_info before gathering inputs")?;
125 :
126 : // Collect information about all the timelines
127 0 : let mut timelines = tenant.list_timelines();
128 0 :
129 0 : if timelines.is_empty() {
130 : // perhaps the tenant has just been created, and as such doesn't have any data yet
131 0 : return Ok(ModelInputs {
132 0 : segments: vec![],
133 0 : timeline_inputs: Vec::new(),
134 0 : });
135 0 : }
136 0 :
137 0 : // Filter out timelines that are not active
138 0 : //
139 0 : // There may be a race when a timeline is dropped,
140 0 : // but it is unlikely to cause any issues. In the worst case,
141 0 : // the calculation will error out.
142 0 : timelines.retain(|t| t.is_active());
143 0 :
144 0 : // Build a map of branch points.
145 0 : let mut branchpoints: HashMap<TimelineId, HashSet<Lsn>> = HashMap::new();
146 0 : for timeline in timelines.iter() {
147 0 : if let Some(ancestor_id) = timeline.get_ancestor_timeline_id() {
148 0 : branchpoints
149 0 : .entry(ancestor_id)
150 0 : .or_default()
151 0 : .insert(timeline.get_ancestor_lsn());
152 0 : }
153 : }
154 :
155 : // These become the final result.
156 0 : let mut timeline_inputs = Vec::with_capacity(timelines.len());
157 0 : let mut segments: Vec<SegmentMeta> = Vec::new();
158 0 :
159 0 : //
160 0 : // Build Segments representing each timeline. As we do that, also remember
161 0 : // the branchpoints and branch startpoints in 'branchpoint_segments' and
162 0 : // 'branchstart_segments'
163 0 : //
164 0 :
165 0 : // BranchPoint segments of each timeline
166 0 : // (timeline, branchpoint LSN) -> segment_id
167 0 : let mut branchpoint_segments: HashMap<(TimelineId, Lsn), usize> = HashMap::new();
168 0 :
169 0 : // timeline, Branchpoint seg id, (ancestor, ancestor LSN)
170 0 : type BranchStartSegment = (TimelineId, usize, Option<(TimelineId, Lsn)>);
171 0 : let mut branchstart_segments: Vec<BranchStartSegment> = Vec::new();
172 :
173 0 : for timeline in timelines.iter() {
174 0 : let timeline_id = timeline.timeline_id;
175 0 : let last_record_lsn = timeline.get_last_record_lsn();
176 0 : let ancestor_lsn = timeline.get_ancestor_lsn();
177 0 :
178 0 : // there's a race between the update (holding tenant.gc_lock) and this read but it
179 0 : // might not be an issue, because it's not for Timeline::gc
180 0 : let gc_info = timeline.gc_info.read().unwrap();
181 0 :
182 0 : // similar to gc, but Timeline::get_latest_gc_cutoff_lsn() will not be updated before a
183 0 : // new gc run, which we have no control over. however differently from `Timeline::gc`
184 0 : // we don't consider the `Timeline::disk_consistent_lsn` at all, because we are not
185 0 : // actually removing files.
186 0 : let mut next_gc_cutoff = cmp::min(gc_info.horizon_cutoff, gc_info.pitr_cutoff);
187 :
188 : // If the caller provided a shorter retention period, use that instead of the GC cutoff.
189 0 : let retention_param_cutoff = if let Some(max_retention_period) = max_retention_period {
190 0 : let param_cutoff = Lsn(last_record_lsn.0.saturating_sub(max_retention_period));
191 0 : if next_gc_cutoff < param_cutoff {
192 0 : next_gc_cutoff = param_cutoff;
193 0 : }
194 0 : Some(param_cutoff)
195 : } else {
196 0 : None
197 : };
198 :
199 : // next_gc_cutoff in parent branch are not of interest (right now at least), nor do we
200 : // want to query any logical size before initdb_lsn.
201 0 : let branch_start_lsn = cmp::max(ancestor_lsn, timeline.initdb_lsn);
202 0 :
203 0 : // Build "interesting LSNs" on this timeline
204 0 : let mut lsns: Vec<(Lsn, LsnKind)> = gc_info
205 0 : .retain_lsns
206 0 : .iter()
207 0 : .filter(|&&lsn| lsn > ancestor_lsn)
208 0 : .copied()
209 0 : // this assumes there are no other retain_lsns than the branchpoints
210 0 : .map(|lsn| (lsn, LsnKind::BranchPoint))
211 0 : .collect::<Vec<_>>();
212 :
213 : // Add branch points we collected earlier, just in case there were any that were
214 : // not present in retain_lsns. We will remove any duplicates below later.
215 0 : if let Some(this_branchpoints) = branchpoints.get(&timeline_id) {
216 0 : lsns.extend(
217 0 : this_branchpoints
218 0 : .iter()
219 0 : .map(|lsn| (*lsn, LsnKind::BranchPoint)),
220 0 : )
221 0 : }
222 :
223 : // Add a point for the GC cutoff
224 0 : let branch_start_needed = next_gc_cutoff <= branch_start_lsn;
225 0 : if !branch_start_needed {
226 0 : lsns.push((next_gc_cutoff, LsnKind::GcCutOff));
227 0 : }
228 :
229 0 : lsns.sort_unstable();
230 0 : lsns.dedup();
231 0 :
232 0 : //
233 0 : // Create Segments for the interesting points.
234 0 : //
235 0 :
236 0 : // Timeline start point
237 0 : let ancestor = timeline
238 0 : .get_ancestor_timeline_id()
239 0 : .map(|ancestor_id| (ancestor_id, ancestor_lsn));
240 0 : branchstart_segments.push((timeline_id, segments.len(), ancestor));
241 0 : segments.push(SegmentMeta {
242 0 : segment: Segment {
243 0 : parent: None, // filled in later
244 0 : lsn: branch_start_lsn.0,
245 0 : size: None, // filled in later
246 0 : needed: branch_start_needed,
247 0 : },
248 0 : timeline_id: timeline.timeline_id,
249 0 : kind: LsnKind::BranchStart,
250 0 : });
251 0 :
252 0 : // GC cutoff point, and any branch points, i.e. points where
253 0 : // other timelines branch off from this timeline.
254 0 : let mut parent = segments.len() - 1;
255 0 : for (lsn, kind) in lsns {
256 0 : if kind == LsnKind::BranchPoint {
257 0 : branchpoint_segments.insert((timeline_id, lsn), segments.len());
258 0 : }
259 0 : segments.push(SegmentMeta {
260 0 : segment: Segment {
261 0 : parent: Some(parent),
262 0 : lsn: lsn.0,
263 0 : size: None,
264 0 : needed: lsn > next_gc_cutoff,
265 0 : },
266 0 : timeline_id: timeline.timeline_id,
267 0 : kind,
268 0 : });
269 0 : parent += 1;
270 : }
271 :
272 : // Current end of the timeline
273 0 : segments.push(SegmentMeta {
274 0 : segment: Segment {
275 0 : parent: Some(parent),
276 0 : lsn: last_record_lsn.0,
277 0 : size: None, // Filled in later, if necessary
278 0 : needed: true,
279 0 : },
280 0 : timeline_id: timeline.timeline_id,
281 0 : kind: LsnKind::BranchEnd,
282 0 : });
283 0 :
284 0 : timeline_inputs.push(TimelineInputs {
285 0 : timeline_id: timeline.timeline_id,
286 0 : ancestor_id: timeline.get_ancestor_timeline_id(),
287 0 : ancestor_lsn,
288 0 : last_record: last_record_lsn,
289 0 : // this is not used above, because it might not have updated recently enough
290 0 : latest_gc_cutoff: *timeline.get_latest_gc_cutoff_lsn(),
291 0 : horizon_cutoff: gc_info.horizon_cutoff,
292 0 : pitr_cutoff: gc_info.pitr_cutoff,
293 0 : next_gc_cutoff,
294 0 : retention_param_cutoff,
295 0 : });
296 : }
297 :
298 : // We now have all segments from the timelines in 'segments'. The timelines
299 : // haven't been linked to each other yet, though. Do that.
300 0 : for (_timeline_id, seg_id, ancestor) in branchstart_segments {
301 : // Look up the branch point
302 0 : if let Some(ancestor) = ancestor {
303 0 : let parent_id = *branchpoint_segments.get(&ancestor).unwrap();
304 0 : segments[seg_id].segment.parent = Some(parent_id);
305 0 : }
306 : }
307 :
308 : // We left the 'size' field empty in all of the Segments so far.
309 : // Now find logical sizes for all of the points that might need or benefit from them.
310 0 : fill_logical_sizes(
311 0 : &timelines,
312 0 : &mut segments,
313 0 : limit,
314 0 : logical_size_cache,
315 0 : cause,
316 0 : ctx,
317 0 : )
318 0 : .await?;
319 :
320 0 : Ok(ModelInputs {
321 0 : segments,
322 0 : timeline_inputs,
323 0 : })
324 0 : }
325 :
326 : /// Augment 'segments' with logical sizes
327 : ///
328 : /// this will probably conflict with on-demand downloaded layers, or at least force them all
329 : /// to be downloaded
330 : ///
331 0 : async fn fill_logical_sizes(
332 0 : timelines: &[Arc<Timeline>],
333 0 : segments: &mut [SegmentMeta],
334 0 : limit: &Arc<Semaphore>,
335 0 : logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
336 0 : cause: LogicalSizeCalculationCause,
337 0 : ctx: &RequestContext,
338 0 : ) -> anyhow::Result<()> {
339 0 : let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
340 0 : timelines
341 0 : .iter()
342 0 : .map(|timeline| (timeline.timeline_id, Arc::clone(timeline))),
343 0 : );
344 0 :
345 0 : // record the used/inserted cache keys here, to remove extras not to start leaking
346 0 : // after initial run the cache should be quite stable, but live timelines will eventually
347 0 : // require new lsns to be inspected.
348 0 : let mut sizes_needed = HashMap::<(TimelineId, Lsn), Option<u64>>::new();
349 0 :
350 0 : // with joinset, on drop, all of the tasks will just be de-scheduled, which we can use to
351 0 : // our advantage with `?` error handling.
352 0 : let mut joinset = tokio::task::JoinSet::new();
353 :
354 : // For each point that would benefit from having a logical size available,
355 : // spawn a Task to fetch it, unless we have it cached already.
356 0 : for seg in segments.iter() {
357 0 : if !seg.size_needed() {
358 0 : continue;
359 0 : }
360 0 :
361 0 : let timeline_id = seg.timeline_id;
362 0 : let lsn = Lsn(seg.segment.lsn);
363 :
364 0 : if let Entry::Vacant(e) = sizes_needed.entry((timeline_id, lsn)) {
365 0 : let cached_size = logical_size_cache.get(&(timeline_id, lsn)).cloned();
366 0 : if cached_size.is_none() {
367 0 : let timeline = Arc::clone(timeline_hash.get(&timeline_id).unwrap());
368 0 : let parallel_size_calcs = Arc::clone(limit);
369 0 : let ctx = ctx.attached_child();
370 0 : joinset.spawn(
371 0 : calculate_logical_size(parallel_size_calcs, timeline, lsn, cause, ctx)
372 0 : .in_current_span(),
373 0 : );
374 0 : }
375 0 : e.insert(cached_size);
376 0 : }
377 : }
378 :
379 : // Perform the size lookups
380 0 : let mut have_any_error = false;
381 0 : while let Some(res) = joinset.join_next().await {
382 : // each of these come with Result<anyhow::Result<_>, JoinError>
383 : // because of spawn + spawn_blocking
384 0 : match res {
385 0 : Err(join_error) if join_error.is_cancelled() => {
386 0 : unreachable!("we are not cancelling any of the futures, nor should be");
387 : }
388 0 : Err(join_error) => {
389 : // cannot really do anything, as this panic is likely a bug
390 0 : error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
391 0 : have_any_error = true;
392 : }
393 0 : Ok(Err(recv_result_error)) => {
394 : // cannot really do anything, as this panic is likely a bug
395 0 : error!("failed to receive logical size query result: {recv_result_error:#}");
396 0 : have_any_error = true;
397 : }
398 0 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
399 0 : if !matches!(error, CalculateLogicalSizeError::Cancelled) {
400 0 : warn!(
401 0 : timeline_id=%timeline.timeline_id,
402 0 : "failed to calculate logical size at {lsn}: {error:#}"
403 0 : );
404 0 : }
405 0 : have_any_error = true;
406 : }
407 0 : Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
408 0 : debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
409 :
410 0 : logical_size_cache.insert((timeline.timeline_id, lsn), size);
411 0 : sizes_needed.insert((timeline.timeline_id, lsn), Some(size));
412 : }
413 : }
414 : }
415 :
416 : // prune any keys not needed anymore; we record every used key and added key.
417 0 : logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
418 0 :
419 0 : if have_any_error {
420 : // we cannot complete this round, because we are missing data.
421 : // we have however cached all we were able to request calculation on.
422 0 : anyhow::bail!("failed to calculate some logical_sizes");
423 0 : }
424 :
425 : // Insert the looked up sizes to the Segments
426 0 : for seg in segments.iter_mut() {
427 0 : if !seg.size_needed() {
428 0 : continue;
429 0 : }
430 0 :
431 0 : let timeline_id = seg.timeline_id;
432 0 : let lsn = Lsn(seg.segment.lsn);
433 :
434 0 : if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
435 0 : seg.segment.size = Some(*size);
436 0 : } else {
437 0 : bail!("could not find size at {} in timeline {}", lsn, timeline_id);
438 : }
439 : }
440 0 : Ok(())
441 0 : }
442 :
443 : impl ModelInputs {
444 4 : pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
445 4 : // Convert SegmentMetas into plain Segments
446 4 : let storage = StorageModel {
447 4 : segments: self
448 4 : .segments
449 4 : .iter()
450 28 : .map(|seg| seg.segment.clone())
451 4 : .collect(),
452 4 : };
453 4 :
454 4 : Ok(storage)
455 4 : }
456 :
457 : // calculate total project size
458 2 : pub fn calculate(&self) -> anyhow::Result<u64> {
459 2 : let storage = self.calculate_model()?;
460 2 : let sizes = storage.calculate();
461 2 :
462 2 : Ok(sizes.total_size)
463 2 : }
464 : }
465 :
466 : /// Newtype around the tuple that carries the timeline at lsn logical size calculation.
467 : struct TimelineAtLsnSizeResult(
468 : Arc<crate::tenant::Timeline>,
469 : utils::lsn::Lsn,
470 : Result<u64, CalculateLogicalSizeError>,
471 : );
472 :
473 0 : #[instrument(skip_all, fields(timeline_id=%timeline.timeline_id, lsn=%lsn))]
474 : async fn calculate_logical_size(
475 : limit: Arc<tokio::sync::Semaphore>,
476 : timeline: Arc<crate::tenant::Timeline>,
477 : lsn: utils::lsn::Lsn,
478 : cause: LogicalSizeCalculationCause,
479 : ctx: RequestContext,
480 : ) -> Result<TimelineAtLsnSizeResult, RecvError> {
481 : let _permit = tokio::sync::Semaphore::acquire_owned(limit)
482 : .await
483 : .expect("global semaphore should not had been closed");
484 :
485 : let size_res = timeline
486 : .spawn_ondemand_logical_size_calculation(lsn, cause, ctx)
487 : .instrument(info_span!("spawn_ondemand_logical_size_calculation"))
488 : .await?;
489 : Ok(TimelineAtLsnSizeResult(timeline, lsn, size_res))
490 : }
491 :
492 2 : #[test]
493 2 : fn verify_size_for_multiple_branches() {
494 2 : // this is generated from integration test test_tenant_size_with_multiple_branches, but this way
495 2 : // it has the stable lsn's
496 2 : //
497 2 : // The timeline_inputs don't participate in the size calculation, and are here just to explain
498 2 : // the inputs.
499 2 : let doc = r#"
500 2 : {
501 2 : "segments": [
502 2 : {
503 2 : "segment": {
504 2 : "parent": 9,
505 2 : "lsn": 26033560,
506 2 : "size": null,
507 2 : "needed": false
508 2 : },
509 2 : "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
510 2 : "kind": "BranchStart"
511 2 : },
512 2 : {
513 2 : "segment": {
514 2 : "parent": 0,
515 2 : "lsn": 35720400,
516 2 : "size": 25206784,
517 2 : "needed": false
518 2 : },
519 2 : "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
520 2 : "kind": "GcCutOff"
521 2 : },
522 2 : {
523 2 : "segment": {
524 2 : "parent": 1,
525 2 : "lsn": 35851472,
526 2 : "size": null,
527 2 : "needed": true
528 2 : },
529 2 : "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
530 2 : "kind": "BranchEnd"
531 2 : },
532 2 : {
533 2 : "segment": {
534 2 : "parent": 7,
535 2 : "lsn": 24566168,
536 2 : "size": null,
537 2 : "needed": false
538 2 : },
539 2 : "timeline_id": "454626700469f0a9914949b9d018e876",
540 2 : "kind": "BranchStart"
541 2 : },
542 2 : {
543 2 : "segment": {
544 2 : "parent": 3,
545 2 : "lsn": 25261936,
546 2 : "size": 26050560,
547 2 : "needed": false
548 2 : },
549 2 : "timeline_id": "454626700469f0a9914949b9d018e876",
550 2 : "kind": "GcCutOff"
551 2 : },
552 2 : {
553 2 : "segment": {
554 2 : "parent": 4,
555 2 : "lsn": 25393008,
556 2 : "size": null,
557 2 : "needed": true
558 2 : },
559 2 : "timeline_id": "454626700469f0a9914949b9d018e876",
560 2 : "kind": "BranchEnd"
561 2 : },
562 2 : {
563 2 : "segment": {
564 2 : "parent": null,
565 2 : "lsn": 23694408,
566 2 : "size": null,
567 2 : "needed": false
568 2 : },
569 2 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
570 2 : "kind": "BranchStart"
571 2 : },
572 2 : {
573 2 : "segment": {
574 2 : "parent": 6,
575 2 : "lsn": 24566168,
576 2 : "size": 25739264,
577 2 : "needed": false
578 2 : },
579 2 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
580 2 : "kind": "BranchPoint"
581 2 : },
582 2 : {
583 2 : "segment": {
584 2 : "parent": 7,
585 2 : "lsn": 25902488,
586 2 : "size": 26402816,
587 2 : "needed": false
588 2 : },
589 2 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
590 2 : "kind": "GcCutOff"
591 2 : },
592 2 : {
593 2 : "segment": {
594 2 : "parent": 8,
595 2 : "lsn": 26033560,
596 2 : "size": 26468352,
597 2 : "needed": true
598 2 : },
599 2 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
600 2 : "kind": "BranchPoint"
601 2 : },
602 2 : {
603 2 : "segment": {
604 2 : "parent": 9,
605 2 : "lsn": 26033560,
606 2 : "size": null,
607 2 : "needed": true
608 2 : },
609 2 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
610 2 : "kind": "BranchEnd"
611 2 : }
612 2 : ],
613 2 : "timeline_inputs": [
614 2 : {
615 2 : "timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
616 2 : "ancestor_lsn": "0/18D3D98",
617 2 : "last_record": "0/2230CD0",
618 2 : "latest_gc_cutoff": "0/1698C48",
619 2 : "horizon_cutoff": "0/2210CD0",
620 2 : "pitr_cutoff": "0/2210CD0",
621 2 : "next_gc_cutoff": "0/2210CD0",
622 2 : "retention_param_cutoff": null
623 2 : },
624 2 : {
625 2 : "timeline_id": "454626700469f0a9914949b9d018e876",
626 2 : "ancestor_lsn": "0/176D998",
627 2 : "last_record": "0/1837770",
628 2 : "latest_gc_cutoff": "0/1698C48",
629 2 : "horizon_cutoff": "0/1817770",
630 2 : "pitr_cutoff": "0/1817770",
631 2 : "next_gc_cutoff": "0/1817770",
632 2 : "retention_param_cutoff": null
633 2 : },
634 2 : {
635 2 : "timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
636 2 : "ancestor_lsn": "0/0",
637 2 : "last_record": "0/18D3D98",
638 2 : "latest_gc_cutoff": "0/1698C48",
639 2 : "horizon_cutoff": "0/18B3D98",
640 2 : "pitr_cutoff": "0/18B3D98",
641 2 : "next_gc_cutoff": "0/18B3D98",
642 2 : "retention_param_cutoff": null
643 2 : }
644 2 : ]
645 2 : }
646 2 : "#;
647 2 : let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
648 2 :
649 2 : assert_eq!(inputs.calculate().unwrap(), 37_851_408);
650 2 : }
651 :
652 2 : #[test]
653 2 : fn verify_size_for_one_branch() {
654 2 : let doc = r#"
655 2 : {
656 2 : "segments": [
657 2 : {
658 2 : "segment": {
659 2 : "parent": null,
660 2 : "lsn": 0,
661 2 : "size": null,
662 2 : "needed": false
663 2 : },
664 2 : "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
665 2 : "kind": "BranchStart"
666 2 : },
667 2 : {
668 2 : "segment": {
669 2 : "parent": 0,
670 2 : "lsn": 305547335776,
671 2 : "size": 220054675456,
672 2 : "needed": false
673 2 : },
674 2 : "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
675 2 : "kind": "GcCutOff"
676 2 : },
677 2 : {
678 2 : "segment": {
679 2 : "parent": 1,
680 2 : "lsn": 305614444640,
681 2 : "size": null,
682 2 : "needed": true
683 2 : },
684 2 : "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
685 2 : "kind": "BranchEnd"
686 2 : }
687 2 : ],
688 2 : "timeline_inputs": [
689 2 : {
690 2 : "timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
691 2 : "ancestor_lsn": "0/0",
692 2 : "last_record": "47/280A5860",
693 2 : "latest_gc_cutoff": "47/240A5860",
694 2 : "horizon_cutoff": "47/240A5860",
695 2 : "pitr_cutoff": "47/240A5860",
696 2 : "next_gc_cutoff": "47/240A5860",
697 2 : "retention_param_cutoff": "0/0"
698 2 : }
699 2 : ]
700 2 : }"#;
701 2 :
702 2 : let model: ModelInputs = serde_json::from_str(doc).unwrap();
703 2 :
704 2 : let res = model.calculate_model().unwrap().calculate();
705 2 :
706 2 : println!("calculated synthetic size: {}", res.total_size);
707 2 : println!("result: {:?}", serde_json::to_string(&res.segments));
708 2 :
709 2 : use utils::lsn::Lsn;
710 2 : let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap();
711 2 : let last_lsn: Lsn = "47/280A5860".parse().unwrap();
712 2 : println!(
713 2 : "latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
714 2 : u64::from(latest_gc_cutoff_lsn),
715 2 : u64::from(last_lsn)
716 2 : );
717 2 : assert_eq!(res.total_size, 220121784320);
718 2 : }
|