Line data Source code
1 : mod draw;
2 :
3 : use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
4 :
5 : use futures::StreamExt;
6 : use pageserver_api::shard::ShardIdentity;
7 : use rand::Rng;
8 : use tracing::info;
9 :
10 : use utils::lsn::Lsn;
11 :
12 : use std::fmt::Write;
13 : use std::ops::Range;
14 : use std::sync::Arc;
15 : use std::sync::Mutex;
16 :
17 : use crate::helpers::PAGE_SZ;
18 : use crate::helpers::{merge_delta_keys, overlaps_with};
19 :
20 : use crate::interface;
21 : use crate::interface::CompactionLayer;
22 :
23 : //
24 : // Implementation for the CompactionExecutor interface
25 : //
26 : pub struct MockTimeline {
27 : // Parameters for the compaction algorithm
28 : pub target_file_size: u64,
29 : tiers_per_level: u64,
30 :
31 : num_l0_flushes: u64,
32 : last_compact_at_flush: u64,
33 : last_flush_lsn: Lsn,
34 :
35 : // In-memory layer
36 : records: Vec<MockRecord>,
37 : total_len: u64,
38 : start_lsn: Lsn,
39 : end_lsn: Lsn,
40 :
41 : // Current keyspace at `end_lsn`. This is updated on every ingested record.
42 : keyspace: KeySpace,
43 :
44 : // historic keyspaces
45 : old_keyspaces: Vec<(Lsn, KeySpace)>,
46 :
47 : // "on-disk" layers
48 : pub live_layers: Vec<MockLayer>,
49 :
50 : num_deleted_layers: u64,
51 :
52 : // Statistics
53 : wal_ingested: u64,
54 : bytes_written: u64,
55 : bytes_deleted: u64,
56 : layers_created: u64,
57 : layers_deleted: u64,
58 :
59 : // All the events - creation and deletion of files - are collected
60 : // in 'history'. It is used to draw the SVG animation at the end.
61 : time: u64,
62 : history: Vec<draw::LayerTraceEvent>,
63 : }
64 :
65 : type KeySpace = interface::CompactionKeySpace<Key>;
66 :
67 : pub struct MockRequestContext {}
68 : impl interface::CompactionRequestContext for MockRequestContext {}
69 :
70 : pub type Key = u64;
71 :
72 : impl interface::CompactionKey for Key {
73 : const MIN: Self = u64::MIN;
74 : const MAX: Self = u64::MAX;
75 :
76 6 : fn key_range_size(key_range: &Range<Self>, _shard_identity: &ShardIdentity) -> u32 {
77 6 : std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32
78 6 : }
79 :
80 2988 : fn next(&self) -> Self {
81 2988 : self + 1
82 2988 : }
83 0 : fn skip_some(&self) -> Self {
84 0 : // round up to next xx
85 0 : self + 100
86 0 : }
87 : }
88 :
89 : #[derive(Clone)]
90 : pub struct MockRecord {
91 : lsn: Lsn,
92 : key: Key,
93 : len: u64,
94 : }
95 :
96 : impl interface::CompactionDeltaEntry<'_, Key> for MockRecord {
97 79711249 : fn key(&self) -> Key {
98 79711249 : self.key
99 79711249 : }
100 77846694 : fn lsn(&self) -> Lsn {
101 77846694 : self.lsn
102 77846694 : }
103 1678094 : fn size(&self) -> u64 {
104 1678094 : self.len
105 1678094 : }
106 : }
107 :
108 : pub struct MockDeltaLayer {
109 : pub key_range: Range<Key>,
110 : pub lsn_range: Range<Lsn>,
111 :
112 : pub file_size: u64,
113 :
114 : pub deleted: Mutex<bool>,
115 :
116 : pub records: Vec<MockRecord>,
117 : }
118 :
119 : impl interface::CompactionLayer<Key> for Arc<MockDeltaLayer> {
120 14632764 : fn key_range(&self) -> &Range<Key> {
121 14632764 : &self.key_range
122 14632764 : }
123 14687602 : fn lsn_range(&self) -> &Range<Lsn> {
124 14687602 : &self.lsn_range
125 14687602 : }
126 :
127 0 : fn file_size(&self) -> u64 {
128 0 : self.file_size
129 0 : }
130 :
131 762 : fn short_id(&self) -> String {
132 762 : format!(
133 762 : "{:016X}-{:016X}__{:08X}-{:08X}",
134 762 : self.key_range.start, self.key_range.end, self.lsn_range.start.0, self.lsn_range.end.0
135 762 : )
136 762 : }
137 :
138 0 : fn is_delta(&self) -> bool {
139 0 : true
140 0 : }
141 : }
142 :
143 : impl interface::CompactionDeltaLayer<MockTimeline> for Arc<MockDeltaLayer> {
144 : type DeltaEntry<'a> = MockRecord;
145 :
146 3216 : async fn load_keys<'a>(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
147 3216 : Ok(self.records.clone())
148 3216 : }
149 : }
150 :
151 : pub struct MockImageLayer {
152 : pub key_range: Range<Key>,
153 : pub lsn_range: Range<Lsn>,
154 :
155 : pub file_size: u64,
156 :
157 : pub deleted: Mutex<bool>,
158 : }
159 :
160 : impl interface::CompactionImageLayer<MockTimeline> for Arc<MockImageLayer> {}
161 :
162 : impl interface::CompactionLayer<Key> for Arc<MockImageLayer> {
163 6 : fn key_range(&self) -> &Range<Key> {
164 6 : &self.key_range
165 6 : }
166 14 : fn lsn_range(&self) -> &Range<Lsn> {
167 14 : &self.lsn_range
168 14 : }
169 :
170 0 : fn file_size(&self) -> u64 {
171 0 : self.file_size
172 0 : }
173 :
174 0 : fn short_id(&self) -> String {
175 0 : format!(
176 0 : "{:016X}-{:016X}__{:08X}",
177 0 : self.key_range.start, self.key_range.end, self.lsn_range.start.0,
178 0 : )
179 0 : }
180 :
181 0 : fn is_delta(&self) -> bool {
182 0 : false
183 0 : }
184 : }
185 :
186 : impl MockTimeline {
187 4 : pub fn new() -> Self {
188 4 : MockTimeline {
189 4 : target_file_size: 256 * 1024 * 1024,
190 4 : tiers_per_level: 4,
191 4 :
192 4 : num_l0_flushes: 0,
193 4 : last_compact_at_flush: 0,
194 4 : last_flush_lsn: Lsn(0),
195 4 :
196 4 : records: Vec::new(),
197 4 : total_len: 0,
198 4 : start_lsn: Lsn(1000),
199 4 : end_lsn: Lsn(1000),
200 4 : keyspace: KeySpace::new(),
201 4 :
202 4 : old_keyspaces: vec![],
203 4 :
204 4 : live_layers: vec![],
205 4 :
206 4 : num_deleted_layers: 0,
207 4 :
208 4 : wal_ingested: 0,
209 4 : bytes_written: 0,
210 4 : bytes_deleted: 0,
211 4 : layers_created: 0,
212 4 : layers_deleted: 0,
213 4 :
214 4 : time: 0,
215 4 : history: Vec::new(),
216 4 : }
217 4 : }
218 :
219 2000 : pub async fn compact(&mut self) -> anyhow::Result<()> {
220 2000 : let ctx = MockRequestContext {};
221 2000 :
222 2000 : crate::compact_tiered::compact_tiered(
223 2000 : self,
224 2000 : self.last_flush_lsn,
225 2000 : self.target_file_size,
226 2000 : self.tiers_per_level,
227 2000 : &ctx,
228 2000 : )
229 0 : .await?;
230 :
231 2000 : Ok(())
232 2000 : }
233 :
234 : // Ingest one record to the timeline
235 2277600 : pub fn ingest_record(&mut self, key: Key, len: u64) {
236 2277600 : self.records.push(MockRecord {
237 2277600 : lsn: self.end_lsn,
238 2277600 : key,
239 2277600 : len,
240 2277600 : });
241 2277600 : self.total_len += len;
242 2277600 : self.end_lsn += len;
243 2277600 :
244 2277600 : if self.total_len > self.target_file_size {
245 98 : self.flush_l0();
246 2277502 : }
247 2277600 : }
248 :
249 0 : pub async fn compact_if_needed(&mut self) -> anyhow::Result<()> {
250 0 : if self.num_l0_flushes - self.last_compact_at_flush >= self.tiers_per_level {
251 0 : self.compact().await?;
252 0 : self.last_compact_at_flush = self.num_l0_flushes;
253 0 : }
254 0 : Ok(())
255 0 : }
256 :
257 98 : pub fn flush_l0(&mut self) {
258 98 : if self.records.is_empty() {
259 0 : return;
260 98 : }
261 98 :
262 98 : let mut records = std::mem::take(&mut self.records);
263 33138958 : records.sort_by_key(|rec| rec.key);
264 98 :
265 98 : let lsn_range = self.start_lsn..self.end_lsn;
266 98 : let new_layer = Arc::new(MockDeltaLayer {
267 98 : key_range: Key::MIN..Key::MAX,
268 98 : lsn_range: lsn_range.clone(),
269 98 : file_size: self.total_len,
270 98 : records,
271 98 : deleted: Mutex::new(false),
272 98 : });
273 98 : info!("flushed L0 layer {}", new_layer.short_id());
274 98 : self.live_layers.push(MockLayer::from(&new_layer));
275 98 :
276 98 : // reset L0
277 98 : self.start_lsn = self.end_lsn;
278 98 : self.total_len = 0;
279 98 : self.records = Vec::new();
280 98 :
281 98 : self.layers_created += 1;
282 98 : self.bytes_written += new_layer.file_size;
283 98 :
284 98 : self.time += 1;
285 98 : self.history.push(LayerTraceEvent {
286 98 : time_rel: self.time,
287 98 : op: LayerTraceOp::Flush,
288 98 : file: LayerTraceFile {
289 98 : filename: new_layer.short_id(),
290 98 : key_range: new_layer.key_range.clone(),
291 98 : lsn_range: new_layer.lsn_range.clone(),
292 98 : },
293 98 : });
294 98 :
295 98 : self.num_l0_flushes += 1;
296 98 : self.last_flush_lsn = self.end_lsn;
297 98 : }
298 :
299 : // Ingest `num_records' records to the timeline, with random keys
300 : // uniformly distributed in `key_range`
301 4794 : pub fn ingest_uniform(
302 4794 : &mut self,
303 4794 : num_records: u64,
304 4794 : len: u64,
305 4794 : key_range: &Range<Key>,
306 4794 : ) -> anyhow::Result<()> {
307 4794 : crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
308 4794 : let mut rng = rand::thread_rng();
309 2277600 : for _ in 0..num_records {
310 2277600 : self.ingest_record(rng.gen_range(key_range.clone()), len);
311 2277600 : self.wal_ingested += len;
312 2277600 : }
313 4794 : Ok(())
314 4794 : }
315 :
316 0 : pub fn stats(&self) -> anyhow::Result<String> {
317 0 : let mut s = String::new();
318 0 :
319 0 : writeln!(s, "STATISTICS:")?;
320 0 : writeln!(
321 0 : s,
322 0 : "WAL ingested: {:>10} MB",
323 0 : self.wal_ingested / (1024 * 1024)
324 0 : )?;
325 0 : writeln!(
326 0 : s,
327 0 : "size created: {:>10} MB",
328 0 : self.bytes_written / (1024 * 1024)
329 0 : )?;
330 0 : writeln!(
331 0 : s,
332 0 : "size deleted: {:>10} MB",
333 0 : self.bytes_deleted / (1024 * 1024)
334 0 : )?;
335 0 : writeln!(s, "files created: {:>10}", self.layers_created)?;
336 0 : writeln!(s, "files deleted: {:>10}", self.layers_deleted)?;
337 0 : writeln!(
338 0 : s,
339 0 : "write amp: {:>10.2}",
340 0 : self.bytes_written as f64 / self.wal_ingested as f64
341 0 : )?;
342 0 : writeln!(
343 0 : s,
344 0 : "storage amp: {:>10.2}",
345 0 : (self.bytes_written - self.bytes_deleted) as f64 / self.wal_ingested as f64
346 0 : )?;
347 :
348 0 : Ok(s)
349 0 : }
350 :
351 0 : pub fn draw_history<W: std::io::Write>(&self, output: W) -> anyhow::Result<()> {
352 0 : draw::draw_history(&self.history, output)
353 0 : }
354 : }
355 :
356 : impl Default for MockTimeline {
357 0 : fn default() -> Self {
358 0 : Self::new()
359 0 : }
360 : }
361 :
362 : #[derive(Clone)]
363 : pub enum MockLayer {
364 : Delta(Arc<MockDeltaLayer>),
365 : Image(Arc<MockImageLayer>),
366 : }
367 :
368 : impl interface::CompactionLayer<Key> for MockLayer {
369 23826 : fn key_range(&self) -> &Range<Key> {
370 23826 : match self {
371 23820 : MockLayer::Delta(this) => this.key_range(),
372 6 : MockLayer::Image(this) => this.key_range(),
373 : }
374 23826 : }
375 78672 : fn lsn_range(&self) -> &Range<Lsn> {
376 78672 : match self {
377 78658 : MockLayer::Delta(this) => this.lsn_range(),
378 14 : MockLayer::Image(this) => this.lsn_range(),
379 : }
380 78672 : }
381 416 : fn file_size(&self) -> u64 {
382 416 : match self {
383 416 : MockLayer::Delta(this) => this.file_size,
384 0 : MockLayer::Image(this) => this.file_size,
385 : }
386 416 : }
387 370 : fn short_id(&self) -> String {
388 370 : match self {
389 370 : MockLayer::Delta(this) => this.short_id(),
390 0 : MockLayer::Image(this) => this.short_id(),
391 : }
392 370 : }
393 :
394 22102 : fn is_delta(&self) -> bool {
395 22102 : match self {
396 22094 : MockLayer::Delta(_) => true,
397 8 : MockLayer::Image(_) => false,
398 : }
399 22102 : }
400 : }
401 :
402 : impl MockLayer {
403 12082 : fn is_deleted(&self) -> bool {
404 12082 : let guard = match self {
405 12082 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
406 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
407 : };
408 12082 : *guard
409 12082 : }
410 94 : fn mark_deleted(&self) {
411 94 : let mut deleted_guard = match self {
412 94 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
413 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
414 : };
415 94 : assert!(!*deleted_guard, "layer already deleted");
416 94 : *deleted_guard = true;
417 94 : }
418 : }
419 :
420 : impl From<&Arc<MockDeltaLayer>> for MockLayer {
421 98 : fn from(l: &Arc<MockDeltaLayer>) -> Self {
422 98 : MockLayer::Delta(l.clone())
423 98 : }
424 : }
425 :
426 : impl From<&Arc<MockImageLayer>> for MockLayer {
427 0 : fn from(l: &Arc<MockImageLayer>) -> Self {
428 0 : MockLayer::Image(l.clone())
429 0 : }
430 : }
431 :
432 : impl interface::CompactionJobExecutor for MockTimeline {
433 : type Key = Key;
434 : type Layer = MockLayer;
435 : type DeltaLayer = Arc<MockDeltaLayer>;
436 : type ImageLayer = Arc<MockImageLayer>;
437 : type RequestContext = MockRequestContext;
438 :
439 6 : fn get_shard_identity(&self) -> &ShardIdentity {
440 6 : static IDENTITY: ShardIdentity = ShardIdentity::unsharded();
441 6 : &IDENTITY
442 6 : }
443 :
444 2006 : async fn get_layers(
445 2006 : &mut self,
446 2006 : key_range: &Range<Self::Key>,
447 2006 : lsn_range: &Range<Lsn>,
448 2006 : _ctx: &Self::RequestContext,
449 2006 : ) -> anyhow::Result<Vec<Self::Layer>> {
450 2006 : // Clear any deleted layers from our vec
451 12082 : self.live_layers.retain(|l| !l.is_deleted());
452 2006 :
453 2006 : let layers: Vec<MockLayer> = self
454 2006 : .live_layers
455 2006 : .iter()
456 11988 : .filter(|l| {
457 11988 : overlaps_with(l.lsn_range(), lsn_range) && overlaps_with(l.key_range(), key_range)
458 11988 : })
459 2006 : .cloned()
460 2006 : .collect();
461 2006 :
462 2006 : Ok(layers)
463 2006 : }
464 :
465 6 : async fn get_keyspace(
466 6 : &mut self,
467 6 : key_range: &Range<Self::Key>,
468 6 : _lsn: Lsn,
469 6 : _ctx: &Self::RequestContext,
470 6 : ) -> anyhow::Result<interface::CompactionKeySpace<Key>> {
471 6 : // find it in the levels
472 6 : if self.old_keyspaces.is_empty() {
473 6 : Ok(crate::helpers::intersect_keyspace(
474 6 : &self.keyspace,
475 6 : key_range,
476 6 : ))
477 : } else {
478 : // not implemented
479 :
480 : // The mock implementation only allows requesting the
481 : // keyspace at the level's end LSN. That's all that the
482 : // current implementation needs.
483 0 : panic!("keyspace not available for requested lsn");
484 : }
485 6 : }
486 :
487 3216 : async fn downcast_delta_layer(
488 3216 : &self,
489 3216 : layer: &MockLayer,
490 3216 : ) -> anyhow::Result<Option<Arc<MockDeltaLayer>>> {
491 3216 : Ok(match layer {
492 3216 : MockLayer::Delta(l) => Some(l.clone()),
493 0 : MockLayer::Image(_) => None,
494 : })
495 3216 : }
496 :
497 0 : async fn create_image(
498 0 : &mut self,
499 0 : lsn: Lsn,
500 0 : key_range: &Range<Key>,
501 0 : ctx: &MockRequestContext,
502 0 : ) -> anyhow::Result<()> {
503 0 : let keyspace = self.get_keyspace(key_range, lsn, ctx).await?;
504 :
505 0 : let mut accum_size: u64 = 0;
506 0 : for r in keyspace {
507 0 : accum_size += r.end - r.start;
508 0 : }
509 :
510 0 : let new_layer = Arc::new(MockImageLayer {
511 0 : key_range: key_range.clone(),
512 0 : lsn_range: lsn..lsn,
513 0 : file_size: accum_size * PAGE_SZ,
514 0 : deleted: Mutex::new(false),
515 0 : });
516 0 : info!(
517 0 : "created image layer, size {}: {}",
518 0 : new_layer.file_size,
519 0 : new_layer.short_id()
520 : );
521 0 : self.live_layers.push(MockLayer::Image(new_layer.clone()));
522 0 :
523 0 : // update stats
524 0 : self.bytes_written += new_layer.file_size;
525 0 : self.layers_created += 1;
526 0 :
527 0 : self.time += 1;
528 0 : self.history.push(LayerTraceEvent {
529 0 : time_rel: self.time,
530 0 : op: LayerTraceOp::CreateImage,
531 0 : file: LayerTraceFile {
532 0 : filename: new_layer.short_id(),
533 0 : key_range: new_layer.key_range.clone(),
534 0 : lsn_range: new_layer.lsn_range.clone(),
535 0 : },
536 0 : });
537 0 :
538 0 : Ok(())
539 0 : }
540 :
541 98 : async fn create_delta(
542 98 : &mut self,
543 98 : lsn_range: &Range<Lsn>,
544 98 : key_range: &Range<Key>,
545 98 : input_layers: &[Arc<MockDeltaLayer>],
546 98 : ctx: &MockRequestContext,
547 98 : ) -> anyhow::Result<()> {
548 98 : let mut key_value_stream =
549 98 : std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
550 98 : let mut records: Vec<MockRecord> = Vec::new();
551 98 : let mut total_len = 2;
552 11045220 : while let Some(delta_entry) = key_value_stream.next().await {
553 11045122 : let delta_entry: MockRecord = delta_entry?;
554 11045122 : if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
555 1678094 : total_len += delta_entry.len;
556 1678094 : records.push(delta_entry);
557 9367028 : }
558 : }
559 98 : let total_records = records.len();
560 98 : let new_layer = Arc::new(MockDeltaLayer {
561 98 : key_range: key_range.clone(),
562 98 : lsn_range: lsn_range.clone(),
563 98 : file_size: total_len,
564 98 : records,
565 98 : deleted: Mutex::new(false),
566 98 : });
567 98 : info!(
568 0 : "created delta layer, recs {}, size {}: {}",
569 0 : total_records,
570 0 : total_len,
571 0 : new_layer.short_id()
572 : );
573 98 : self.live_layers.push(MockLayer::Delta(new_layer.clone()));
574 98 :
575 98 : // update stats
576 98 : self.bytes_written += total_len;
577 98 : self.layers_created += 1;
578 98 :
579 98 : self.time += 1;
580 98 : self.history.push(LayerTraceEvent {
581 98 : time_rel: self.time,
582 98 : op: LayerTraceOp::CreateDelta,
583 98 : file: LayerTraceFile {
584 98 : filename: new_layer.short_id(),
585 98 : key_range: new_layer.key_range.clone(),
586 98 : lsn_range: new_layer.lsn_range.clone(),
587 98 : },
588 98 : });
589 98 :
590 98 : Ok(())
591 98 : }
592 :
593 94 : async fn delete_layer(
594 94 : &mut self,
595 94 : layer: &Self::Layer,
596 94 : _ctx: &MockRequestContext,
597 94 : ) -> anyhow::Result<()> {
598 94 : let layer = std::pin::pin!(layer);
599 94 : info!("deleting layer: {}", layer.short_id());
600 94 : self.num_deleted_layers += 1;
601 94 : self.bytes_deleted += layer.file_size();
602 94 : layer.mark_deleted();
603 94 :
604 94 : self.time += 1;
605 94 : self.history.push(LayerTraceEvent {
606 94 : time_rel: self.time,
607 94 : op: LayerTraceOp::Delete,
608 94 : file: LayerTraceFile {
609 94 : filename: layer.short_id(),
610 94 : key_range: layer.key_range().clone(),
611 94 : lsn_range: layer.lsn_range().clone(),
612 94 : },
613 94 : });
614 94 :
615 94 : Ok(())
616 94 : }
617 : }
|