Line data Source code
1 : mod draw;
2 :
3 : use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
4 :
5 : use async_trait::async_trait;
6 : use futures::StreamExt;
7 : use rand::Rng;
8 : use tracing::info;
9 :
10 : use utils::lsn::Lsn;
11 :
12 : use std::fmt::Write;
13 : use std::ops::Range;
14 : use std::sync::Arc;
15 : use std::sync::Mutex;
16 :
17 : use crate::helpers::{merge_delta_keys, overlaps_with};
18 :
19 : use crate::interface;
20 : use crate::interface::CompactionLayer;
21 :
22 : //
23 : // Implementation for the CompactionExecutor interface
24 : //
25 : pub struct MockTimeline {
26 : // Parameters for the compaction algorithm
27 : pub target_file_size: u64,
28 : tiers_per_level: u64,
29 :
30 : num_l0_flushes: u64,
31 : last_compact_at_flush: u64,
32 : last_flush_lsn: Lsn,
33 :
34 : // In-memory layer
35 : records: Vec<MockRecord>,
36 : total_len: u64,
37 : start_lsn: Lsn,
38 : end_lsn: Lsn,
39 :
40 : // Current keyspace at `end_lsn`. This is updated on every ingested record.
41 : keyspace: KeySpace,
42 :
43 : // historic keyspaces
44 : old_keyspaces: Vec<(Lsn, KeySpace)>,
45 :
46 : // "on-disk" layers
47 : pub live_layers: Vec<MockLayer>,
48 :
49 : num_deleted_layers: u64,
50 :
51 : // Statistics
52 : wal_ingested: u64,
53 : bytes_written: u64,
54 : bytes_deleted: u64,
55 : layers_created: u64,
56 : layers_deleted: u64,
57 :
58 : // All the events - creation and deletion of files - are collected
59 : // in 'history'. It is used to draw the SVG animation at the end.
60 : time: u64,
61 : history: Vec<draw::LayerTraceEvent>,
62 : }
63 :
64 : type KeySpace = interface::CompactionKeySpace<Key>;
65 :
66 : pub struct MockRequestContext {}
67 : impl interface::CompactionRequestContext for MockRequestContext {}
68 :
69 : pub type Key = u64;
70 :
71 : impl interface::CompactionKey for Key {
72 : const MIN: Self = u64::MIN;
73 : const MAX: Self = u64::MAX;
74 :
75 0 : fn key_range_size(key_range: &Range<Self>) -> u32 {
76 0 : std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32
77 0 : }
78 :
79 0 : fn next(&self) -> Self {
80 0 : self + 1
81 0 : }
82 0 : fn skip_some(&self) -> Self {
83 0 : // round up to next xx
84 0 : self + 100
85 0 : }
86 : }
87 :
88 0 : #[derive(Clone)]
89 : pub struct MockRecord {
90 : lsn: Lsn,
91 : key: Key,
92 : len: u64,
93 : }
94 :
95 : impl interface::CompactionDeltaEntry<'_, Key> for MockRecord {
96 0 : fn key(&self) -> Key {
97 0 : self.key
98 0 : }
99 0 : fn lsn(&self) -> Lsn {
100 0 : self.lsn
101 0 : }
102 0 : fn size(&self) -> u64 {
103 0 : self.len
104 0 : }
105 : }
106 :
107 : pub struct MockDeltaLayer {
108 : pub key_range: Range<Key>,
109 : pub lsn_range: Range<Lsn>,
110 :
111 : pub file_size: u64,
112 :
113 : pub deleted: Mutex<bool>,
114 :
115 : pub records: Vec<MockRecord>,
116 : }
117 :
118 : impl interface::CompactionLayer<Key> for Arc<MockDeltaLayer> {
119 76 : fn key_range(&self) -> &Range<Key> {
120 76 : &self.key_range
121 76 : }
122 514 : fn lsn_range(&self) -> &Range<Lsn> {
123 514 : &self.lsn_range
124 514 : }
125 :
126 0 : fn file_size(&self) -> u64 {
127 0 : self.file_size
128 0 : }
129 :
130 2 : fn short_id(&self) -> String {
131 2 : format!(
132 2 : "{:016X}-{:016X}__{:08X}-{:08X}",
133 2 : self.key_range.start, self.key_range.end, self.lsn_range.start.0, self.lsn_range.end.0
134 2 : )
135 2 : }
136 :
137 0 : fn is_delta(&self) -> bool {
138 0 : true
139 0 : }
140 : }
141 :
142 : #[async_trait]
143 : impl interface::CompactionDeltaLayer<MockTimeline> for Arc<MockDeltaLayer> {
144 : type DeltaEntry<'a> = MockRecord;
145 :
146 0 : async fn load_keys<'a>(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
147 0 : Ok(self.records.clone())
148 0 : }
149 : }
150 :
151 : pub struct MockImageLayer {
152 : pub key_range: Range<Key>,
153 : pub lsn_range: Range<Lsn>,
154 :
155 : pub file_size: u64,
156 :
157 : pub deleted: Mutex<bool>,
158 : }
159 :
160 : impl interface::CompactionImageLayer<MockTimeline> for Arc<MockImageLayer> {}
161 :
162 : impl interface::CompactionLayer<Key> for Arc<MockImageLayer> {
163 4 : fn key_range(&self) -> &Range<Key> {
164 4 : &self.key_range
165 4 : }
166 14 : fn lsn_range(&self) -> &Range<Lsn> {
167 14 : &self.lsn_range
168 14 : }
169 :
170 0 : fn file_size(&self) -> u64 {
171 0 : self.file_size
172 0 : }
173 :
174 0 : fn short_id(&self) -> String {
175 0 : format!(
176 0 : "{:016X}-{:016X}__{:08X}",
177 0 : self.key_range.start, self.key_range.end, self.lsn_range.start.0,
178 0 : )
179 0 : }
180 :
181 0 : fn is_delta(&self) -> bool {
182 0 : false
183 0 : }
184 : }
185 :
186 : impl MockTimeline {
187 0 : pub fn new() -> Self {
188 0 : MockTimeline {
189 0 : target_file_size: 256 * 1024 * 1024,
190 0 : tiers_per_level: 4,
191 0 :
192 0 : num_l0_flushes: 0,
193 0 : last_compact_at_flush: 0,
194 0 : last_flush_lsn: Lsn(0),
195 0 :
196 0 : records: Vec::new(),
197 0 : total_len: 0,
198 0 : start_lsn: Lsn(1000),
199 0 : end_lsn: Lsn(1000),
200 0 : keyspace: KeySpace::new(),
201 0 :
202 0 : old_keyspaces: vec![],
203 0 :
204 0 : live_layers: vec![],
205 0 :
206 0 : num_deleted_layers: 0,
207 0 :
208 0 : wal_ingested: 0,
209 0 : bytes_written: 0,
210 0 : bytes_deleted: 0,
211 0 : layers_created: 0,
212 0 : layers_deleted: 0,
213 0 :
214 0 : time: 0,
215 0 : history: Vec::new(),
216 0 : }
217 0 : }
218 :
219 0 : pub async fn compact(&mut self) -> anyhow::Result<()> {
220 0 : let ctx = MockRequestContext {};
221 0 :
222 0 : crate::compact_tiered::compact_tiered(
223 0 : self,
224 0 : self.last_flush_lsn,
225 0 : self.target_file_size,
226 0 : self.tiers_per_level,
227 0 : &ctx,
228 0 : )
229 0 : .await?;
230 :
231 0 : Ok(())
232 0 : }
233 :
234 : // Ingest one record to the timeline
235 0 : pub fn ingest_record(&mut self, key: Key, len: u64) {
236 0 : self.records.push(MockRecord {
237 0 : lsn: self.end_lsn,
238 0 : key,
239 0 : len,
240 0 : });
241 0 : self.total_len += len;
242 0 : self.end_lsn += len;
243 0 :
244 0 : if self.total_len > self.target_file_size {
245 0 : self.flush_l0();
246 0 : }
247 0 : }
248 :
249 0 : pub async fn compact_if_needed(&mut self) -> anyhow::Result<()> {
250 0 : if self.num_l0_flushes - self.last_compact_at_flush >= self.tiers_per_level {
251 0 : self.compact().await?;
252 0 : self.last_compact_at_flush = self.num_l0_flushes;
253 0 : }
254 0 : Ok(())
255 0 : }
256 :
257 0 : pub fn flush_l0(&mut self) {
258 0 : if self.records.is_empty() {
259 0 : return;
260 0 : }
261 0 :
262 0 : let mut records = std::mem::take(&mut self.records);
263 0 : records.sort_by_key(|rec| rec.key);
264 0 :
265 0 : let lsn_range = self.start_lsn..self.end_lsn;
266 0 : let new_layer = Arc::new(MockDeltaLayer {
267 0 : key_range: Key::MIN..Key::MAX,
268 0 : lsn_range: lsn_range.clone(),
269 0 : file_size: self.total_len,
270 0 : records,
271 0 : deleted: Mutex::new(false),
272 0 : });
273 0 : info!("flushed L0 layer {}", new_layer.short_id());
274 0 : self.live_layers.push(MockLayer::from(&new_layer));
275 0 :
276 0 : // reset L0
277 0 : self.start_lsn = self.end_lsn;
278 0 : self.total_len = 0;
279 0 : self.records = Vec::new();
280 0 :
281 0 : self.layers_created += 1;
282 0 : self.bytes_written += new_layer.file_size;
283 0 :
284 0 : self.time += 1;
285 0 : self.history.push(LayerTraceEvent {
286 0 : time_rel: self.time,
287 0 : op: LayerTraceOp::Flush,
288 0 : file: LayerTraceFile {
289 0 : filename: new_layer.short_id(),
290 0 : key_range: new_layer.key_range.clone(),
291 0 : lsn_range: new_layer.lsn_range.clone(),
292 0 : },
293 0 : });
294 0 :
295 0 : self.num_l0_flushes += 1;
296 0 : self.last_flush_lsn = self.end_lsn;
297 0 : }
298 :
299 : // Ingest `num_records' records to the timeline, with random keys
300 : // uniformly distributed in `key_range`
301 0 : pub fn ingest_uniform(
302 0 : &mut self,
303 0 : num_records: u64,
304 0 : len: u64,
305 0 : key_range: &Range<Key>,
306 0 : ) -> anyhow::Result<()> {
307 0 : crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
308 0 : let mut rng = rand::thread_rng();
309 0 : for _ in 0..num_records {
310 0 : self.ingest_record(rng.gen_range(key_range.clone()), len);
311 0 : self.wal_ingested += len;
312 0 : }
313 0 : Ok(())
314 0 : }
315 :
316 0 : pub fn stats(&self) -> anyhow::Result<String> {
317 0 : let mut s = String::new();
318 0 :
319 0 : writeln!(s, "STATISTICS:")?;
320 0 : writeln!(
321 0 : s,
322 0 : "WAL ingested: {:>10} MB",
323 0 : self.wal_ingested / (1024 * 1024)
324 0 : )?;
325 0 : writeln!(
326 0 : s,
327 0 : "size created: {:>10} MB",
328 0 : self.bytes_written / (1024 * 1024)
329 0 : )?;
330 0 : writeln!(
331 0 : s,
332 0 : "size deleted: {:>10} MB",
333 0 : self.bytes_deleted / (1024 * 1024)
334 0 : )?;
335 0 : writeln!(s, "files created: {:>10}", self.layers_created)?;
336 0 : writeln!(s, "files deleted: {:>10}", self.layers_deleted)?;
337 0 : writeln!(
338 0 : s,
339 0 : "write amp: {:>10.2}",
340 0 : self.bytes_written as f64 / self.wal_ingested as f64
341 0 : )?;
342 0 : writeln!(
343 0 : s,
344 0 : "storage amp: {:>10.2}",
345 0 : (self.bytes_written - self.bytes_deleted) as f64 / self.wal_ingested as f64
346 0 : )?;
347 :
348 0 : Ok(s)
349 0 : }
350 :
351 0 : pub fn draw_history<W: std::io::Write>(&self, output: W) -> anyhow::Result<()> {
352 0 : draw::draw_history(&self.history, output)
353 0 : }
354 : }
355 :
356 : impl Default for MockTimeline {
357 0 : fn default() -> Self {
358 0 : Self::new()
359 0 : }
360 : }
361 :
362 106 : #[derive(Clone)]
363 : pub enum MockLayer {
364 : Delta(Arc<MockDeltaLayer>),
365 : Image(Arc<MockImageLayer>),
366 : }
367 :
368 : impl interface::CompactionLayer<Key> for MockLayer {
369 80 : fn key_range(&self) -> &Range<Key> {
370 80 : match self {
371 76 : MockLayer::Delta(this) => this.key_range(),
372 4 : MockLayer::Image(this) => this.key_range(),
373 : }
374 80 : }
375 528 : fn lsn_range(&self) -> &Range<Lsn> {
376 528 : match self {
377 514 : MockLayer::Delta(this) => this.lsn_range(),
378 14 : MockLayer::Image(this) => this.lsn_range(),
379 : }
380 528 : }
381 0 : fn file_size(&self) -> u64 {
382 0 : match self {
383 0 : MockLayer::Delta(this) => this.file_size(),
384 0 : MockLayer::Image(this) => this.file_size(),
385 : }
386 0 : }
387 2 : fn short_id(&self) -> String {
388 2 : match self {
389 2 : MockLayer::Delta(this) => this.short_id(),
390 0 : MockLayer::Image(this) => this.short_id(),
391 : }
392 2 : }
393 :
394 198 : fn is_delta(&self) -> bool {
395 198 : match self {
396 190 : MockLayer::Delta(_) => true,
397 8 : MockLayer::Image(_) => false,
398 : }
399 198 : }
400 : }
401 :
402 : impl MockLayer {
403 0 : fn is_deleted(&self) -> bool {
404 0 : let guard = match self {
405 0 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
406 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
407 : };
408 0 : *guard
409 0 : }
410 0 : fn mark_deleted(&self) {
411 0 : let mut deleted_guard = match self {
412 0 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
413 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
414 : };
415 0 : assert!(!*deleted_guard, "layer already deleted");
416 0 : *deleted_guard = true;
417 0 : }
418 : }
419 :
420 : impl From<&Arc<MockDeltaLayer>> for MockLayer {
421 0 : fn from(l: &Arc<MockDeltaLayer>) -> Self {
422 0 : MockLayer::Delta(l.clone())
423 0 : }
424 : }
425 :
426 : impl From<&Arc<MockImageLayer>> for MockLayer {
427 0 : fn from(l: &Arc<MockImageLayer>) -> Self {
428 0 : MockLayer::Image(l.clone())
429 0 : }
430 : }
431 :
432 : #[async_trait]
433 : impl interface::CompactionJobExecutor for MockTimeline {
434 : type Key = Key;
435 : type Layer = MockLayer;
436 : type DeltaLayer = Arc<MockDeltaLayer>;
437 : type ImageLayer = Arc<MockImageLayer>;
438 : type RequestContext = MockRequestContext;
439 :
440 0 : async fn get_layers(
441 0 : &mut self,
442 0 : key_range: &Range<Self::Key>,
443 0 : lsn_range: &Range<Lsn>,
444 0 : _ctx: &Self::RequestContext,
445 0 : ) -> anyhow::Result<Vec<Self::Layer>> {
446 : // Clear any deleted layers from our vec
447 0 : self.live_layers.retain(|l| !l.is_deleted());
448 0 :
449 0 : let layers: Vec<MockLayer> = self
450 0 : .live_layers
451 0 : .iter()
452 0 : .filter(|l| {
453 0 : overlaps_with(l.lsn_range(), lsn_range) && overlaps_with(l.key_range(), key_range)
454 0 : })
455 0 : .cloned()
456 0 : .collect();
457 0 :
458 0 : Ok(layers)
459 0 : }
460 :
461 0 : async fn get_keyspace(
462 0 : &mut self,
463 0 : key_range: &Range<Self::Key>,
464 0 : _lsn: Lsn,
465 0 : _ctx: &Self::RequestContext,
466 0 : ) -> anyhow::Result<interface::CompactionKeySpace<Key>> {
467 : // find it in the levels
468 0 : if self.old_keyspaces.is_empty() {
469 0 : Ok(crate::helpers::intersect_keyspace(
470 0 : &self.keyspace,
471 0 : key_range,
472 0 : ))
473 : } else {
474 : // not implemented
475 :
476 : // The mock implementation only allows requesting the
477 : // keyspace at the level's end LSN. That's all that the
478 : // current implementation needs.
479 0 : panic!("keyspace not available for requested lsn");
480 : }
481 0 : }
482 :
483 0 : async fn downcast_delta_layer(
484 0 : &self,
485 0 : layer: &MockLayer,
486 0 : ) -> anyhow::Result<Option<Arc<MockDeltaLayer>>> {
487 0 : Ok(match layer {
488 0 : MockLayer::Delta(l) => Some(l.clone()),
489 0 : MockLayer::Image(_) => None,
490 : })
491 0 : }
492 :
493 0 : async fn create_image(
494 0 : &mut self,
495 0 : lsn: Lsn,
496 0 : key_range: &Range<Key>,
497 0 : ctx: &MockRequestContext,
498 0 : ) -> anyhow::Result<()> {
499 0 : let keyspace = self.get_keyspace(key_range, lsn, ctx).await?;
500 :
501 0 : let mut accum_size: u64 = 0;
502 0 : for r in keyspace {
503 0 : accum_size += r.end - r.start;
504 0 : }
505 :
506 0 : let new_layer = Arc::new(MockImageLayer {
507 0 : key_range: key_range.clone(),
508 0 : lsn_range: lsn..lsn,
509 0 : file_size: accum_size * 8192,
510 0 : deleted: Mutex::new(false),
511 0 : });
512 0 : info!(
513 0 : "created image layer, size {}: {}",
514 0 : new_layer.file_size,
515 0 : new_layer.short_id()
516 0 : );
517 0 : self.live_layers.push(MockLayer::Image(new_layer.clone()));
518 0 :
519 0 : // update stats
520 0 : self.bytes_written += new_layer.file_size;
521 0 : self.layers_created += 1;
522 0 :
523 0 : self.time += 1;
524 0 : self.history.push(LayerTraceEvent {
525 0 : time_rel: self.time,
526 0 : op: LayerTraceOp::CreateImage,
527 0 : file: LayerTraceFile {
528 0 : filename: new_layer.short_id(),
529 0 : key_range: new_layer.key_range.clone(),
530 0 : lsn_range: new_layer.lsn_range.clone(),
531 0 : },
532 0 : });
533 0 :
534 0 : Ok(())
535 0 : }
536 :
537 0 : async fn create_delta(
538 0 : &mut self,
539 0 : lsn_range: &Range<Lsn>,
540 0 : key_range: &Range<Key>,
541 0 : input_layers: &[Arc<MockDeltaLayer>],
542 0 : ctx: &MockRequestContext,
543 0 : ) -> anyhow::Result<()> {
544 0 : let mut key_value_stream =
545 0 : std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
546 0 : let mut records: Vec<MockRecord> = Vec::new();
547 0 : let mut total_len = 2;
548 0 : while let Some(delta_entry) = key_value_stream.next().await {
549 0 : let delta_entry: MockRecord = delta_entry?;
550 0 : if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
551 0 : total_len += delta_entry.len;
552 0 : records.push(delta_entry);
553 0 : }
554 : }
555 0 : let total_records = records.len();
556 0 : let new_layer = Arc::new(MockDeltaLayer {
557 0 : key_range: key_range.clone(),
558 0 : lsn_range: lsn_range.clone(),
559 0 : file_size: total_len,
560 0 : records,
561 0 : deleted: Mutex::new(false),
562 0 : });
563 0 : info!(
564 0 : "created delta layer, recs {}, size {}: {}",
565 0 : total_records,
566 0 : total_len,
567 0 : new_layer.short_id()
568 0 : );
569 0 : self.live_layers.push(MockLayer::Delta(new_layer.clone()));
570 0 :
571 0 : // update stats
572 0 : self.bytes_written += total_len;
573 0 : self.layers_created += 1;
574 0 :
575 0 : self.time += 1;
576 0 : self.history.push(LayerTraceEvent {
577 0 : time_rel: self.time,
578 0 : op: LayerTraceOp::CreateDelta,
579 0 : file: LayerTraceFile {
580 0 : filename: new_layer.short_id(),
581 0 : key_range: new_layer.key_range.clone(),
582 0 : lsn_range: new_layer.lsn_range.clone(),
583 0 : },
584 0 : });
585 0 :
586 0 : Ok(())
587 0 : }
588 :
589 0 : async fn delete_layer(
590 0 : &mut self,
591 0 : layer: &Self::Layer,
592 0 : _ctx: &MockRequestContext,
593 0 : ) -> anyhow::Result<()> {
594 0 : let layer = std::pin::pin!(layer);
595 0 : info!("deleting layer: {}", layer.short_id());
596 0 : self.num_deleted_layers += 1;
597 0 : self.bytes_deleted += layer.file_size();
598 0 : layer.mark_deleted();
599 0 :
600 0 : self.time += 1;
601 0 : self.history.push(LayerTraceEvent {
602 0 : time_rel: self.time,
603 0 : op: LayerTraceOp::Delete,
604 0 : file: LayerTraceFile {
605 0 : filename: layer.short_id(),
606 0 : key_range: layer.key_range().clone(),
607 0 : lsn_range: layer.lsn_range().clone(),
608 0 : },
609 0 : });
610 0 :
611 0 : Ok(())
612 0 : }
613 : }
|