Line data Source code
1 : mod draw;
2 :
3 : use draw::{LayerTraceEvent, LayerTraceFile, LayerTraceOp};
4 :
5 : use futures::StreamExt;
6 : use pageserver_api::shard::ShardIdentity;
7 : use rand::Rng;
8 : use tracing::info;
9 :
10 : use utils::lsn::Lsn;
11 :
12 : use std::fmt::Write;
13 : use std::ops::Range;
14 : use std::sync::Arc;
15 : use std::sync::Mutex;
16 :
17 : use crate::helpers::PAGE_SZ;
18 : use crate::helpers::{merge_delta_keys, overlaps_with};
19 :
20 : use crate::interface;
21 : use crate::interface::CompactionLayer;
22 :
23 : //
24 : // Implementation for the CompactionExecutor interface
25 : //
26 : pub struct MockTimeline {
27 : // Parameters for the compaction algorithm
28 : pub target_file_size: u64,
29 : tiers_per_level: u64,
30 :
31 : num_l0_flushes: u64,
32 : last_compact_at_flush: u64,
33 : last_flush_lsn: Lsn,
34 :
35 : // In-memory layer
36 : records: Vec<MockRecord>,
37 : total_len: u64,
38 : start_lsn: Lsn,
39 : end_lsn: Lsn,
40 :
41 : // Current keyspace at `end_lsn`. This is updated on every ingested record.
42 : keyspace: KeySpace,
43 :
44 : // historic keyspaces
45 : old_keyspaces: Vec<(Lsn, KeySpace)>,
46 :
47 : // "on-disk" layers
48 : pub live_layers: Vec<MockLayer>,
49 :
50 : num_deleted_layers: u64,
51 :
52 : // Statistics
53 : wal_ingested: u64,
54 : bytes_written: u64,
55 : bytes_deleted: u64,
56 : layers_created: u64,
57 : layers_deleted: u64,
58 :
59 : // All the events - creation and deletion of files - are collected
60 : // in 'history'. It is used to draw the SVG animation at the end.
61 : time: u64,
62 : history: Vec<draw::LayerTraceEvent>,
63 : }
64 :
65 : type KeySpace = interface::CompactionKeySpace<Key>;
66 :
67 : pub struct MockRequestContext {}
68 : impl interface::CompactionRequestContext for MockRequestContext {}
69 :
70 : pub type Key = u64;
71 :
72 : impl interface::CompactionKey for Key {
73 : const MIN: Self = u64::MIN;
74 : const MAX: Self = u64::MAX;
75 :
76 3 : fn key_range_size(key_range: &Range<Self>, _shard_identity: &ShardIdentity) -> u32 {
77 3 : std::cmp::min(key_range.end - key_range.start, u32::MAX as u64) as u32
78 3 : }
79 :
80 1494 : fn next(&self) -> Self {
81 1494 : self + 1
82 1494 : }
83 0 : fn skip_some(&self) -> Self {
84 0 : // round up to next xx
85 0 : self + 100
86 0 : }
87 : }
88 :
89 : #[derive(Clone)]
90 : pub struct MockRecord {
91 : lsn: Lsn,
92 : key: Key,
93 : len: u64,
94 : }
95 :
96 : impl interface::CompactionDeltaEntry<'_, Key> for MockRecord {
97 39838118 : fn key(&self) -> Key {
98 39838118 : self.key
99 39838118 : }
100 38905759 : fn lsn(&self) -> Lsn {
101 38905759 : self.lsn
102 38905759 : }
103 839047 : fn size(&self) -> u64 {
104 839047 : self.len
105 839047 : }
106 : }
107 :
108 : pub struct MockDeltaLayer {
109 : pub key_range: Range<Key>,
110 : pub lsn_range: Range<Lsn>,
111 :
112 : pub file_size: u64,
113 :
114 : pub deleted: Mutex<bool>,
115 :
116 : pub records: Vec<MockRecord>,
117 : }
118 :
119 : impl interface::CompactionLayer<Key> for Arc<MockDeltaLayer> {
120 7316407 : fn key_range(&self) -> &Range<Key> {
121 7316407 : &self.key_range
122 7316407 : }
123 7343826 : fn lsn_range(&self) -> &Range<Lsn> {
124 7343826 : &self.lsn_range
125 7343826 : }
126 :
127 0 : fn file_size(&self) -> u64 {
128 0 : self.file_size
129 0 : }
130 :
131 381 : fn short_id(&self) -> String {
132 381 : format!(
133 381 : "{:016X}-{:016X}__{:08X}-{:08X}",
134 381 : self.key_range.start, self.key_range.end, self.lsn_range.start.0, self.lsn_range.end.0
135 381 : )
136 381 : }
137 :
138 0 : fn is_delta(&self) -> bool {
139 0 : true
140 0 : }
141 : }
142 :
143 : impl interface::CompactionDeltaLayer<MockTimeline> for Arc<MockDeltaLayer> {
144 : type DeltaEntry<'a> = MockRecord;
145 :
146 1608 : async fn load_keys<'a>(&self, _ctx: &MockRequestContext) -> anyhow::Result<Vec<MockRecord>> {
147 1608 : Ok(self.records.clone())
148 1608 : }
149 : }
150 :
151 : pub struct MockImageLayer {
152 : pub key_range: Range<Key>,
153 : pub lsn_range: Range<Lsn>,
154 :
155 : pub file_size: u64,
156 :
157 : pub deleted: Mutex<bool>,
158 : }
159 :
160 : impl interface::CompactionImageLayer<MockTimeline> for Arc<MockImageLayer> {}
161 :
162 : impl interface::CompactionLayer<Key> for Arc<MockImageLayer> {
163 3 : fn key_range(&self) -> &Range<Key> {
164 3 : &self.key_range
165 3 : }
166 7 : fn lsn_range(&self) -> &Range<Lsn> {
167 7 : &self.lsn_range
168 7 : }
169 :
170 0 : fn file_size(&self) -> u64 {
171 0 : self.file_size
172 0 : }
173 :
174 0 : fn short_id(&self) -> String {
175 0 : format!(
176 0 : "{:016X}-{:016X}__{:08X}",
177 0 : self.key_range.start, self.key_range.end, self.lsn_range.start.0,
178 0 : )
179 0 : }
180 :
181 0 : fn is_delta(&self) -> bool {
182 0 : false
183 0 : }
184 : }
185 :
186 : impl MockTimeline {
187 2 : pub fn new() -> Self {
188 2 : MockTimeline {
189 2 : target_file_size: 256 * 1024 * 1024,
190 2 : tiers_per_level: 4,
191 2 :
192 2 : num_l0_flushes: 0,
193 2 : last_compact_at_flush: 0,
194 2 : last_flush_lsn: Lsn(0),
195 2 :
196 2 : records: Vec::new(),
197 2 : total_len: 0,
198 2 : start_lsn: Lsn(1000),
199 2 : end_lsn: Lsn(1000),
200 2 : keyspace: KeySpace::new(),
201 2 :
202 2 : old_keyspaces: vec![],
203 2 :
204 2 : live_layers: vec![],
205 2 :
206 2 : num_deleted_layers: 0,
207 2 :
208 2 : wal_ingested: 0,
209 2 : bytes_written: 0,
210 2 : bytes_deleted: 0,
211 2 : layers_created: 0,
212 2 : layers_deleted: 0,
213 2 :
214 2 : time: 0,
215 2 : history: Vec::new(),
216 2 : }
217 2 : }
218 :
219 1000 : pub async fn compact(&mut self) -> anyhow::Result<()> {
220 1000 : let ctx = MockRequestContext {};
221 1000 :
222 1000 : crate::compact_tiered::compact_tiered(
223 1000 : self,
224 1000 : self.last_flush_lsn,
225 1000 : self.target_file_size,
226 1000 : self.tiers_per_level,
227 1000 : &ctx,
228 1000 : )
229 0 : .await?;
230 :
231 1000 : Ok(())
232 1000 : }
233 :
234 : // Ingest one record to the timeline
235 1138800 : pub fn ingest_record(&mut self, key: Key, len: u64) {
236 1138800 : self.records.push(MockRecord {
237 1138800 : lsn: self.end_lsn,
238 1138800 : key,
239 1138800 : len,
240 1138800 : });
241 1138800 : self.total_len += len;
242 1138800 : self.end_lsn += len;
243 1138800 :
244 1138800 : if self.total_len > self.target_file_size {
245 49 : self.flush_l0();
246 1138751 : }
247 1138800 : }
248 :
249 0 : pub async fn compact_if_needed(&mut self) -> anyhow::Result<()> {
250 0 : if self.num_l0_flushes - self.last_compact_at_flush >= self.tiers_per_level {
251 0 : self.compact().await?;
252 0 : self.last_compact_at_flush = self.num_l0_flushes;
253 0 : }
254 0 : Ok(())
255 0 : }
256 :
257 49 : pub fn flush_l0(&mut self) {
258 49 : if self.records.is_empty() {
259 0 : return;
260 49 : }
261 49 :
262 49 : let mut records = std::mem::take(&mut self.records);
263 15957630 : records.sort_by_key(|rec| rec.key);
264 49 :
265 49 : let lsn_range = self.start_lsn..self.end_lsn;
266 49 : let new_layer = Arc::new(MockDeltaLayer {
267 49 : key_range: Key::MIN..Key::MAX,
268 49 : lsn_range: lsn_range.clone(),
269 49 : file_size: self.total_len,
270 49 : records,
271 49 : deleted: Mutex::new(false),
272 49 : });
273 49 : info!("flushed L0 layer {}", new_layer.short_id());
274 49 : self.live_layers.push(MockLayer::from(&new_layer));
275 49 :
276 49 : // reset L0
277 49 : self.start_lsn = self.end_lsn;
278 49 : self.total_len = 0;
279 49 : self.records = Vec::new();
280 49 :
281 49 : self.layers_created += 1;
282 49 : self.bytes_written += new_layer.file_size;
283 49 :
284 49 : self.time += 1;
285 49 : self.history.push(LayerTraceEvent {
286 49 : time_rel: self.time,
287 49 : op: LayerTraceOp::Flush,
288 49 : file: LayerTraceFile {
289 49 : filename: new_layer.short_id(),
290 49 : key_range: new_layer.key_range.clone(),
291 49 : lsn_range: new_layer.lsn_range.clone(),
292 49 : },
293 49 : });
294 49 :
295 49 : self.num_l0_flushes += 1;
296 49 : self.last_flush_lsn = self.end_lsn;
297 49 : }
298 :
299 : // Ingest `num_records' records to the timeline, with random keys
300 : // uniformly distributed in `key_range`
301 2397 : pub fn ingest_uniform(
302 2397 : &mut self,
303 2397 : num_records: u64,
304 2397 : len: u64,
305 2397 : key_range: &Range<Key>,
306 2397 : ) -> anyhow::Result<()> {
307 2397 : crate::helpers::union_to_keyspace(&mut self.keyspace, vec![key_range.clone()]);
308 2397 : let mut rng = rand::thread_rng();
309 1138800 : for _ in 0..num_records {
310 1138800 : self.ingest_record(rng.gen_range(key_range.clone()), len);
311 1138800 : self.wal_ingested += len;
312 1138800 : }
313 2397 : Ok(())
314 2397 : }
315 :
316 0 : pub fn stats(&self) -> anyhow::Result<String> {
317 0 : let mut s = String::new();
318 0 :
319 0 : writeln!(s, "STATISTICS:")?;
320 0 : writeln!(
321 0 : s,
322 0 : "WAL ingested: {:>10} MB",
323 0 : self.wal_ingested / (1024 * 1024)
324 0 : )?;
325 0 : writeln!(
326 0 : s,
327 0 : "size created: {:>10} MB",
328 0 : self.bytes_written / (1024 * 1024)
329 0 : )?;
330 0 : writeln!(
331 0 : s,
332 0 : "size deleted: {:>10} MB",
333 0 : self.bytes_deleted / (1024 * 1024)
334 0 : )?;
335 0 : writeln!(s, "files created: {:>10}", self.layers_created)?;
336 0 : writeln!(s, "files deleted: {:>10}", self.layers_deleted)?;
337 0 : writeln!(
338 0 : s,
339 0 : "write amp: {:>10.2}",
340 0 : self.bytes_written as f64 / self.wal_ingested as f64
341 0 : )?;
342 0 : writeln!(
343 0 : s,
344 0 : "storage amp: {:>10.2}",
345 0 : (self.bytes_written - self.bytes_deleted) as f64 / self.wal_ingested as f64
346 0 : )?;
347 :
348 0 : Ok(s)
349 0 : }
350 :
351 0 : pub fn draw_history<W: std::io::Write>(&self, output: W) -> anyhow::Result<()> {
352 0 : draw::draw_history(&self.history, output)
353 0 : }
354 : }
355 :
356 : impl Default for MockTimeline {
357 0 : fn default() -> Self {
358 0 : Self::new()
359 0 : }
360 : }
361 :
362 : #[derive(Clone)]
363 : pub enum MockLayer {
364 : Delta(Arc<MockDeltaLayer>),
365 : Image(Arc<MockImageLayer>),
366 : }
367 :
368 : impl interface::CompactionLayer<Key> for MockLayer {
369 11913 : fn key_range(&self) -> &Range<Key> {
370 11913 : match self {
371 11910 : MockLayer::Delta(this) => this.key_range(),
372 3 : MockLayer::Image(this) => this.key_range(),
373 : }
374 11913 : }
375 39336 : fn lsn_range(&self) -> &Range<Lsn> {
376 39336 : match self {
377 39329 : MockLayer::Delta(this) => this.lsn_range(),
378 7 : MockLayer::Image(this) => this.lsn_range(),
379 : }
380 39336 : }
381 208 : fn file_size(&self) -> u64 {
382 208 : match self {
383 208 : MockLayer::Delta(this) => this.file_size,
384 0 : MockLayer::Image(this) => this.file_size,
385 : }
386 208 : }
387 185 : fn short_id(&self) -> String {
388 185 : match self {
389 185 : MockLayer::Delta(this) => this.short_id(),
390 0 : MockLayer::Image(this) => this.short_id(),
391 : }
392 185 : }
393 :
394 11051 : fn is_delta(&self) -> bool {
395 11051 : match self {
396 11047 : MockLayer::Delta(_) => true,
397 4 : MockLayer::Image(_) => false,
398 : }
399 11051 : }
400 : }
401 :
402 : impl MockLayer {
403 6041 : fn is_deleted(&self) -> bool {
404 6041 : let guard = match self {
405 6041 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
406 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
407 : };
408 6041 : *guard
409 6041 : }
410 47 : fn mark_deleted(&self) {
411 47 : let mut deleted_guard = match self {
412 47 : MockLayer::Delta(this) => this.deleted.lock().unwrap(),
413 0 : MockLayer::Image(this) => this.deleted.lock().unwrap(),
414 : };
415 47 : assert!(!*deleted_guard, "layer already deleted");
416 47 : *deleted_guard = true;
417 47 : }
418 : }
419 :
420 : impl From<&Arc<MockDeltaLayer>> for MockLayer {
421 49 : fn from(l: &Arc<MockDeltaLayer>) -> Self {
422 49 : MockLayer::Delta(l.clone())
423 49 : }
424 : }
425 :
426 : impl From<&Arc<MockImageLayer>> for MockLayer {
427 0 : fn from(l: &Arc<MockImageLayer>) -> Self {
428 0 : MockLayer::Image(l.clone())
429 0 : }
430 : }
431 :
432 : impl interface::CompactionJobExecutor for MockTimeline {
433 : type Key = Key;
434 : type Layer = MockLayer;
435 : type DeltaLayer = Arc<MockDeltaLayer>;
436 : type ImageLayer = Arc<MockImageLayer>;
437 : type RequestContext = MockRequestContext;
438 :
439 3 : fn get_shard_identity(&self) -> &ShardIdentity {
440 : static IDENTITY: ShardIdentity = ShardIdentity::unsharded();
441 3 : &IDENTITY
442 3 : }
443 :
444 1003 : async fn get_layers(
445 1003 : &mut self,
446 1003 : key_range: &Range<Self::Key>,
447 1003 : lsn_range: &Range<Lsn>,
448 1003 : _ctx: &Self::RequestContext,
449 1003 : ) -> anyhow::Result<Vec<Self::Layer>> {
450 1003 : // Clear any deleted layers from our vec
451 6041 : self.live_layers.retain(|l| !l.is_deleted());
452 1003 :
453 1003 : let layers: Vec<MockLayer> = self
454 1003 : .live_layers
455 1003 : .iter()
456 5994 : .filter(|l| {
457 5994 : overlaps_with(l.lsn_range(), lsn_range) && overlaps_with(l.key_range(), key_range)
458 5994 : })
459 1003 : .cloned()
460 1003 : .collect();
461 1003 :
462 1003 : Ok(layers)
463 1003 : }
464 :
465 3 : async fn get_keyspace(
466 3 : &mut self,
467 3 : key_range: &Range<Self::Key>,
468 3 : _lsn: Lsn,
469 3 : _ctx: &Self::RequestContext,
470 3 : ) -> anyhow::Result<interface::CompactionKeySpace<Key>> {
471 3 : // find it in the levels
472 3 : if self.old_keyspaces.is_empty() {
473 3 : Ok(crate::helpers::intersect_keyspace(
474 3 : &self.keyspace,
475 3 : key_range,
476 3 : ))
477 : } else {
478 : // not implemented
479 :
480 : // The mock implementation only allows requesting the
481 : // keyspace at the level's end LSN. That's all that the
482 : // current implementation needs.
483 0 : panic!("keyspace not available for requested lsn");
484 : }
485 3 : }
486 :
487 1608 : async fn downcast_delta_layer(
488 1608 : &self,
489 1608 : layer: &MockLayer,
490 1608 : ) -> anyhow::Result<Option<Arc<MockDeltaLayer>>> {
491 1608 : Ok(match layer {
492 1608 : MockLayer::Delta(l) => Some(l.clone()),
493 0 : MockLayer::Image(_) => None,
494 : })
495 1608 : }
496 :
497 0 : async fn create_image(
498 0 : &mut self,
499 0 : lsn: Lsn,
500 0 : key_range: &Range<Key>,
501 0 : ctx: &MockRequestContext,
502 0 : ) -> anyhow::Result<()> {
503 0 : let keyspace = self.get_keyspace(key_range, lsn, ctx).await?;
504 :
505 0 : let mut accum_size: u64 = 0;
506 0 : for r in keyspace {
507 0 : accum_size += r.end - r.start;
508 0 : }
509 :
510 0 : let new_layer = Arc::new(MockImageLayer {
511 0 : key_range: key_range.clone(),
512 0 : lsn_range: lsn..lsn,
513 0 : file_size: accum_size * PAGE_SZ,
514 0 : deleted: Mutex::new(false),
515 0 : });
516 0 : info!(
517 0 : "created image layer, size {}: {}",
518 0 : new_layer.file_size,
519 0 : new_layer.short_id()
520 : );
521 0 : self.live_layers.push(MockLayer::Image(new_layer.clone()));
522 0 :
523 0 : // update stats
524 0 : self.bytes_written += new_layer.file_size;
525 0 : self.layers_created += 1;
526 0 :
527 0 : self.time += 1;
528 0 : self.history.push(LayerTraceEvent {
529 0 : time_rel: self.time,
530 0 : op: LayerTraceOp::CreateImage,
531 0 : file: LayerTraceFile {
532 0 : filename: new_layer.short_id(),
533 0 : key_range: new_layer.key_range.clone(),
534 0 : lsn_range: new_layer.lsn_range.clone(),
535 0 : },
536 0 : });
537 0 :
538 0 : Ok(())
539 0 : }
540 :
541 49 : async fn create_delta(
542 49 : &mut self,
543 49 : lsn_range: &Range<Lsn>,
544 49 : key_range: &Range<Key>,
545 49 : input_layers: &[Arc<MockDeltaLayer>],
546 49 : ctx: &MockRequestContext,
547 49 : ) -> anyhow::Result<()> {
548 49 : let mut key_value_stream =
549 49 : std::pin::pin!(merge_delta_keys::<MockTimeline>(input_layers, ctx));
550 49 : let mut records: Vec<MockRecord> = Vec::new();
551 49 : let mut total_len = 2;
552 5522610 : while let Some(delta_entry) = key_value_stream.next().await {
553 5522561 : let delta_entry: MockRecord = delta_entry?;
554 5522561 : if key_range.contains(&delta_entry.key) && lsn_range.contains(&delta_entry.lsn) {
555 839047 : total_len += delta_entry.len;
556 839047 : records.push(delta_entry);
557 4683514 : }
558 : }
559 49 : let total_records = records.len();
560 49 : let new_layer = Arc::new(MockDeltaLayer {
561 49 : key_range: key_range.clone(),
562 49 : lsn_range: lsn_range.clone(),
563 49 : file_size: total_len,
564 49 : records,
565 49 : deleted: Mutex::new(false),
566 49 : });
567 49 : info!(
568 0 : "created delta layer, recs {}, size {}: {}",
569 0 : total_records,
570 0 : total_len,
571 0 : new_layer.short_id()
572 : );
573 49 : self.live_layers.push(MockLayer::Delta(new_layer.clone()));
574 49 :
575 49 : // update stats
576 49 : self.bytes_written += total_len;
577 49 : self.layers_created += 1;
578 49 :
579 49 : self.time += 1;
580 49 : self.history.push(LayerTraceEvent {
581 49 : time_rel: self.time,
582 49 : op: LayerTraceOp::CreateDelta,
583 49 : file: LayerTraceFile {
584 49 : filename: new_layer.short_id(),
585 49 : key_range: new_layer.key_range.clone(),
586 49 : lsn_range: new_layer.lsn_range.clone(),
587 49 : },
588 49 : });
589 49 :
590 49 : Ok(())
591 49 : }
592 :
593 47 : async fn delete_layer(
594 47 : &mut self,
595 47 : layer: &Self::Layer,
596 47 : _ctx: &MockRequestContext,
597 47 : ) -> anyhow::Result<()> {
598 47 : let layer = std::pin::pin!(layer);
599 47 : info!("deleting layer: {}", layer.short_id());
600 47 : self.num_deleted_layers += 1;
601 47 : self.bytes_deleted += layer.file_size();
602 47 : layer.mark_deleted();
603 47 :
604 47 : self.time += 1;
605 47 : self.history.push(LayerTraceEvent {
606 47 : time_rel: self.time,
607 47 : op: LayerTraceOp::Delete,
608 47 : file: LayerTraceFile {
609 47 : filename: layer.short_id(),
610 47 : key_range: layer.key_range().clone(),
611 47 : lsn_range: layer.lsn_range().clone(),
612 47 : },
613 47 : });
614 47 :
615 47 : Ok(())
616 47 : }
617 : }
|