Line data Source code
1 : use std::future::Future;
2 : use std::ops::Range;
3 : use std::sync::Arc;
4 :
5 : use bytes::Bytes;
6 : use pageserver_api::key::{KEY_SIZE, Key};
7 : use pageserver_api::value::Value;
8 : use tokio_util::sync::CancellationToken;
9 : use utils::id::TimelineId;
10 : use utils::lsn::Lsn;
11 : use utils::shard::TenantShardId;
12 :
13 : use super::layer::S3_UPLOAD_LIMIT;
14 : use super::{
15 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
16 : };
17 : use crate::config::PageServerConf;
18 : use crate::context::RequestContext;
19 : use crate::tenant::Timeline;
20 : use crate::tenant::storage_layer::Layer;
21 :
22 : pub(crate) enum BatchWriterResult {
23 : Produced(ResidentLayer),
24 : Discarded(PersistentLayerKey),
25 : }
26 :
27 : #[cfg(test)]
28 : impl BatchWriterResult {
29 144 : fn into_resident_layer(self) -> ResidentLayer {
30 144 : match self {
31 144 : BatchWriterResult::Produced(layer) => layer,
32 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
33 : }
34 144 : }
35 :
36 96 : fn into_discarded_layer(self) -> PersistentLayerKey {
37 96 : match self {
38 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
39 96 : BatchWriterResult::Discarded(layer) => layer,
40 96 : }
41 96 : }
42 : }
43 :
44 : enum LayerWriterWrapper {
45 : Image(ImageLayerWriter),
46 : Delta(DeltaLayerWriter),
47 : }
48 :
49 : /// An layer writer that takes unfinished layers and finish them atomically.
50 : #[must_use]
51 : pub struct BatchLayerWriter {
52 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
53 : conf: &'static PageServerConf,
54 : }
55 :
56 : impl BatchLayerWriter {
57 4176 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
58 4176 : Ok(Self {
59 4176 : generated_layer_writers: Vec::new(),
60 4176 : conf,
61 4176 : })
62 4176 : }
63 :
64 1884 : pub fn add_unfinished_image_writer(
65 1884 : &mut self,
66 1884 : writer: ImageLayerWriter,
67 1884 : key_range: Range<Key>,
68 1884 : lsn: Lsn,
69 1884 : ) {
70 1884 : self.generated_layer_writers.push((
71 1884 : LayerWriterWrapper::Image(writer),
72 1884 : PersistentLayerKey {
73 1884 : key_range,
74 1884 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
75 1884 : is_delta: false,
76 1884 : },
77 1884 : ));
78 1884 : }
79 :
80 360 : pub fn add_unfinished_delta_writer(
81 360 : &mut self,
82 360 : writer: DeltaLayerWriter,
83 360 : key_range: Range<Key>,
84 360 : lsn_range: Range<Lsn>,
85 360 : ) {
86 360 : self.generated_layer_writers.push((
87 360 : LayerWriterWrapper::Delta(writer),
88 360 : PersistentLayerKey {
89 360 : key_range,
90 360 : lsn_range,
91 360 : is_delta: true,
92 360 : },
93 360 : ));
94 360 : }
95 :
96 3480 : pub(crate) async fn finish(
97 3480 : self,
98 3480 : tline: &Arc<Timeline>,
99 3480 : ctx: &RequestContext,
100 3480 : ) -> anyhow::Result<Vec<ResidentLayer>> {
101 3480 : let res = self
102 3480 : .finish_with_discard_fn(tline, ctx, |_| async { false })
103 3480 : .await?;
104 3480 : let mut output = Vec::new();
105 5004 : for r in res {
106 1524 : if let BatchWriterResult::Produced(layer) = r {
107 1524 : output.push(layer);
108 1524 : }
109 : }
110 3480 : Ok(output)
111 3480 : }
112 :
113 4104 : pub(crate) async fn finish_with_discard_fn<D, F>(
114 4104 : self,
115 4104 : tline: &Arc<Timeline>,
116 4104 : ctx: &RequestContext,
117 4104 : discard_fn: D,
118 4104 : ) -> anyhow::Result<Vec<BatchWriterResult>>
119 4104 : where
120 4104 : D: Fn(&PersistentLayerKey) -> F,
121 4104 : F: Future<Output = bool>,
122 4104 : {
123 4104 : let Self {
124 4104 : generated_layer_writers,
125 4104 : ..
126 4104 : } = self;
127 4104 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
128 0 : for produced_layer in generated_layers {
129 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
130 0 : let layer: Layer = resident_layer.into();
131 0 : layer.delete_on_drop();
132 0 : }
133 : }
134 0 : };
135 : // BEGIN: catch every error and do the recovery in the below section
136 4104 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
137 6348 : for (inner, layer_key) in generated_layer_writers {
138 2244 : if discard_fn(&layer_key).await {
139 228 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
140 228 : } else {
141 2016 : let res = match inner {
142 228 : LayerWriterWrapper::Delta(writer) => {
143 228 : writer.finish(layer_key.key_range.end, ctx).await
144 : }
145 1788 : LayerWriterWrapper::Image(writer) => {
146 1788 : writer
147 1788 : .finish_with_end_key(layer_key.key_range.end, ctx)
148 1788 : .await
149 : }
150 : };
151 2016 : let layer = match res {
152 2016 : Ok((desc, path)) => {
153 2016 : match Layer::finish_creating(self.conf, tline, desc, &path) {
154 2016 : Ok(layer) => layer,
155 0 : Err(e) => {
156 0 : tokio::fs::remove_file(&path).await.ok();
157 0 : clean_up_layers(generated_layers);
158 0 : return Err(e);
159 : }
160 : }
161 : }
162 0 : Err(e) => {
163 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
164 0 : // so we don't need to remove the layer we just failed to create by ourselves.
165 0 : clean_up_layers(generated_layers);
166 0 : return Err(e);
167 : }
168 : };
169 2016 : generated_layers.push(BatchWriterResult::Produced(layer));
170 : }
171 : }
172 : // END: catch every error and do the recovery in the above section
173 4104 : Ok(generated_layers)
174 4104 : }
175 :
176 0 : pub fn pending_layer_num(&self) -> usize {
177 0 : self.generated_layer_writers.len()
178 0 : }
179 : }
180 :
181 : /// An image writer that takes images and produces multiple image layers.
182 : #[must_use]
183 : pub struct SplitImageLayerWriter<'a> {
184 : inner: ImageLayerWriter,
185 : target_layer_size: u64,
186 : lsn: Lsn,
187 : conf: &'static PageServerConf,
188 : timeline_id: TimelineId,
189 : tenant_shard_id: TenantShardId,
190 : batches: BatchLayerWriter,
191 : start_key: Key,
192 : gate: &'a utils::sync::gate::Gate,
193 : cancel: CancellationToken,
194 : }
195 :
196 : impl<'a> SplitImageLayerWriter<'a> {
197 : #[allow(clippy::too_many_arguments)]
198 312 : pub async fn new(
199 312 : conf: &'static PageServerConf,
200 312 : timeline_id: TimelineId,
201 312 : tenant_shard_id: TenantShardId,
202 312 : start_key: Key,
203 312 : lsn: Lsn,
204 312 : target_layer_size: u64,
205 312 : gate: &'a utils::sync::gate::Gate,
206 312 : cancel: CancellationToken,
207 312 : ctx: &RequestContext,
208 312 : ) -> anyhow::Result<Self> {
209 312 : Ok(Self {
210 312 : target_layer_size,
211 312 : inner: ImageLayerWriter::new(
212 312 : conf,
213 312 : timeline_id,
214 312 : tenant_shard_id,
215 312 : &(start_key..Key::MAX),
216 312 : lsn,
217 312 : gate,
218 312 : cancel.clone(),
219 312 : ctx,
220 312 : )
221 312 : .await?,
222 312 : conf,
223 312 : timeline_id,
224 312 : tenant_shard_id,
225 312 : batches: BatchLayerWriter::new(conf).await?,
226 312 : lsn,
227 312 : start_key,
228 312 : gate,
229 312 : cancel,
230 : })
231 312 : }
232 :
233 51636 : pub async fn put_image(
234 51636 : &mut self,
235 51636 : key: Key,
236 51636 : img: Bytes,
237 51636 : ctx: &RequestContext,
238 51636 : ) -> anyhow::Result<()> {
239 51636 : // The current estimation is an upper bound of the space that the key/image could take
240 51636 : // because we did not consider compression in this estimation. The resulting image layer
241 51636 : // could be smaller than the target size.
242 51636 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
243 51636 : if self.inner.num_keys() >= 1
244 51324 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
245 : {
246 84 : let next_image_writer = ImageLayerWriter::new(
247 84 : self.conf,
248 84 : self.timeline_id,
249 84 : self.tenant_shard_id,
250 84 : &(key..Key::MAX),
251 84 : self.lsn,
252 84 : self.gate,
253 84 : self.cancel.clone(),
254 84 : ctx,
255 84 : )
256 84 : .await?;
257 84 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
258 84 : self.batches.add_unfinished_image_writer(
259 84 : prev_image_writer,
260 84 : self.start_key..key,
261 84 : self.lsn,
262 84 : );
263 84 : self.start_key = key;
264 51552 : }
265 51636 : self.inner.put_image(key, img, ctx).await
266 51636 : }
267 :
268 276 : pub(crate) async fn finish_with_discard_fn<D, F>(
269 276 : self,
270 276 : tline: &Arc<Timeline>,
271 276 : ctx: &RequestContext,
272 276 : end_key: Key,
273 276 : discard_fn: D,
274 276 : ) -> anyhow::Result<Vec<BatchWriterResult>>
275 276 : where
276 276 : D: Fn(&PersistentLayerKey) -> F,
277 276 : F: Future<Output = bool>,
278 276 : {
279 276 : let Self {
280 276 : mut batches, inner, ..
281 276 : } = self;
282 276 : if inner.num_keys() != 0 {
283 276 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
284 276 : }
285 276 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
286 276 : }
287 :
288 : #[cfg(test)]
289 24 : pub(crate) async fn finish(
290 24 : self,
291 24 : tline: &Arc<Timeline>,
292 24 : ctx: &RequestContext,
293 24 : end_key: Key,
294 24 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
295 36 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
296 24 : .await
297 24 : }
298 : }
299 :
300 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
301 : ///
302 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
303 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
304 : /// will split them into multiple files based on size.
305 : #[must_use]
306 : pub struct SplitDeltaLayerWriter<'a> {
307 : inner: Option<(Key, DeltaLayerWriter)>,
308 : target_layer_size: u64,
309 : conf: &'static PageServerConf,
310 : timeline_id: TimelineId,
311 : tenant_shard_id: TenantShardId,
312 : lsn_range: Range<Lsn>,
313 : last_key_written: Key,
314 : batches: BatchLayerWriter,
315 : gate: &'a utils::sync::gate::Gate,
316 : cancel: CancellationToken,
317 : }
318 :
319 : impl<'a> SplitDeltaLayerWriter<'a> {
320 384 : pub async fn new(
321 384 : conf: &'static PageServerConf,
322 384 : timeline_id: TimelineId,
323 384 : tenant_shard_id: TenantShardId,
324 384 : lsn_range: Range<Lsn>,
325 384 : target_layer_size: u64,
326 384 : gate: &'a utils::sync::gate::Gate,
327 384 : cancel: CancellationToken,
328 384 : ) -> anyhow::Result<Self> {
329 384 : Ok(Self {
330 384 : target_layer_size,
331 384 : inner: None,
332 384 : conf,
333 384 : timeline_id,
334 384 : tenant_shard_id,
335 384 : lsn_range,
336 384 : last_key_written: Key::MIN,
337 384 : batches: BatchLayerWriter::new(conf).await?,
338 384 : gate,
339 384 : cancel,
340 : })
341 384 : }
342 :
343 73248 : pub async fn put_value(
344 73248 : &mut self,
345 73248 : key: Key,
346 73248 : lsn: Lsn,
347 73248 : val: Value,
348 73248 : ctx: &RequestContext,
349 73248 : ) -> anyhow::Result<()> {
350 73248 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
351 73248 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
352 73248 : //
353 73248 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
354 73248 : // strategy. https://github.com/neondatabase/neon/issues/8837
355 73248 :
356 73248 : if self.inner.is_none() {
357 300 : self.inner = Some((
358 300 : key,
359 300 : DeltaLayerWriter::new(
360 300 : self.conf,
361 300 : self.timeline_id,
362 300 : self.tenant_shard_id,
363 300 : key,
364 300 : self.lsn_range.clone(),
365 300 : self.gate,
366 300 : self.cancel.clone(),
367 300 : ctx,
368 300 : )
369 300 : .await?,
370 : ));
371 72948 : }
372 73248 : let (_, inner) = self.inner.as_mut().unwrap();
373 73248 :
374 73248 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
375 73248 : if inner.num_keys() >= 1
376 72948 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
377 : {
378 17976 : if key != self.last_key_written {
379 84 : let next_delta_writer = DeltaLayerWriter::new(
380 84 : self.conf,
381 84 : self.timeline_id,
382 84 : self.tenant_shard_id,
383 84 : key,
384 84 : self.lsn_range.clone(),
385 84 : self.gate,
386 84 : self.cancel.clone(),
387 84 : ctx,
388 84 : )
389 84 : .await?;
390 84 : let (start_key, prev_delta_writer) =
391 84 : self.inner.replace((key, next_delta_writer)).unwrap();
392 84 : self.batches.add_unfinished_delta_writer(
393 84 : prev_delta_writer,
394 84 : start_key..key,
395 84 : self.lsn_range.clone(),
396 84 : );
397 17892 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
398 : // We have to produce a very large file b/c a key is updated too often.
399 0 : anyhow::bail!(
400 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
401 0 : key,
402 0 : inner.estimated_size()
403 0 : );
404 17892 : }
405 55272 : }
406 73248 : self.last_key_written = key;
407 73248 : let (_, inner) = self.inner.as_mut().unwrap();
408 73248 : inner.put_value(key, lsn, val, ctx).await
409 73248 : }
410 :
411 348 : pub(crate) async fn finish_with_discard_fn<D, F>(
412 348 : self,
413 348 : tline: &Arc<Timeline>,
414 348 : ctx: &RequestContext,
415 348 : discard_fn: D,
416 348 : ) -> anyhow::Result<Vec<BatchWriterResult>>
417 348 : where
418 348 : D: Fn(&PersistentLayerKey) -> F,
419 348 : F: Future<Output = bool>,
420 348 : {
421 348 : let Self {
422 348 : mut batches, inner, ..
423 348 : } = self;
424 348 : if let Some((start_key, writer)) = inner {
425 276 : if writer.num_keys() != 0 {
426 276 : let end_key = self.last_key_written.next();
427 276 : batches.add_unfinished_delta_writer(
428 276 : writer,
429 276 : start_key..end_key,
430 276 : self.lsn_range.clone(),
431 276 : );
432 276 : }
433 72 : }
434 348 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
435 348 : }
436 :
437 : #[cfg(test)]
438 36 : pub(crate) async fn finish(
439 36 : self,
440 36 : tline: &Arc<Timeline>,
441 36 : ctx: &RequestContext,
442 36 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
443 48 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
444 36 : .await
445 36 : }
446 : }
447 :
448 : #[cfg(test)]
449 : mod tests {
450 : use itertools::Itertools;
451 : use rand::{RngCore, SeedableRng};
452 :
453 : use super::*;
454 : use crate::DEFAULT_PG_VERSION;
455 : use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
456 : use crate::tenant::storage_layer::AsLayerDesc;
457 :
458 120312 : fn get_key(id: u32) -> Key {
459 120312 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
460 120312 : key.field6 = id;
461 120312 : key
462 120312 : }
463 :
464 48 : fn get_img(id: u32) -> Bytes {
465 48 : format!("{id:064}").into()
466 48 : }
467 :
468 120024 : fn get_large_img() -> Bytes {
469 120024 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
470 120024 : let mut data = vec![0; 8192];
471 120024 : rng.fill_bytes(&mut data);
472 120024 : data.into()
473 120024 : }
474 :
475 : #[tokio::test]
476 12 : async fn write_one_image() {
477 12 : let harness = TenantHarness::create("split_writer_write_one_image")
478 12 : .await
479 12 : .unwrap();
480 12 : let (tenant, ctx) = harness.load().await;
481 12 :
482 12 : let tline = tenant
483 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
484 12 : .await
485 12 : .unwrap();
486 12 :
487 12 : let mut image_writer = SplitImageLayerWriter::new(
488 12 : tenant.conf,
489 12 : tline.timeline_id,
490 12 : tenant.tenant_shard_id,
491 12 : get_key(0),
492 12 : Lsn(0x18),
493 12 : 4 * 1024 * 1024,
494 12 : &tline.gate,
495 12 : tline.cancel.clone(),
496 12 : &ctx,
497 12 : )
498 12 : .await
499 12 : .unwrap();
500 12 :
501 12 : let mut delta_writer = SplitDeltaLayerWriter::new(
502 12 : tenant.conf,
503 12 : tline.timeline_id,
504 12 : tenant.tenant_shard_id,
505 12 : Lsn(0x18)..Lsn(0x20),
506 12 : 4 * 1024 * 1024,
507 12 : &tline.gate,
508 12 : tline.cancel.clone(),
509 12 : )
510 12 : .await
511 12 : .unwrap();
512 12 :
513 12 : image_writer
514 12 : .put_image(get_key(0), get_img(0), &ctx)
515 12 : .await
516 12 : .unwrap();
517 12 : let layers = image_writer
518 12 : .finish(&tline, &ctx, get_key(10))
519 12 : .await
520 12 : .unwrap();
521 12 : assert_eq!(layers.len(), 1);
522 12 :
523 12 : delta_writer
524 12 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
525 12 : .await
526 12 : .unwrap();
527 12 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
528 12 : assert_eq!(layers.len(), 1);
529 12 : assert_eq!(
530 12 : layers
531 12 : .into_iter()
532 12 : .next()
533 12 : .unwrap()
534 12 : .into_resident_layer()
535 12 : .layer_desc()
536 12 : .key(),
537 12 : PersistentLayerKey {
538 12 : key_range: get_key(0)..get_key(1),
539 12 : lsn_range: Lsn(0x18)..Lsn(0x20),
540 12 : is_delta: true
541 12 : }
542 12 : );
543 12 : }
544 :
545 : #[tokio::test]
546 12 : async fn write_split() {
547 12 : // Test the split writer with retaining all the layers we have produced (discard=false)
548 12 : write_split_helper("split_writer_write_split", false).await;
549 12 : }
550 :
551 : #[tokio::test]
552 12 : async fn write_split_discard() {
553 12 : // Test the split writer with discarding all the layers we have produced (discard=true)
554 12 : write_split_helper("split_writer_write_split_discard", true).await;
555 12 : }
556 :
557 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
558 : /// set to true, all layers will be discarded.
559 24 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
560 24 : let harness = TenantHarness::create(harness_name).await.unwrap();
561 24 : let (tenant, ctx) = harness.load().await;
562 :
563 24 : let tline = tenant
564 24 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
565 24 : .await
566 24 : .unwrap();
567 :
568 24 : let mut image_writer = SplitImageLayerWriter::new(
569 24 : tenant.conf,
570 24 : tline.timeline_id,
571 24 : tenant.tenant_shard_id,
572 24 : get_key(0),
573 24 : Lsn(0x18),
574 24 : 4 * 1024 * 1024,
575 24 : &tline.gate,
576 24 : tline.cancel.clone(),
577 24 : &ctx,
578 24 : )
579 24 : .await
580 24 : .unwrap();
581 24 : let mut delta_writer = SplitDeltaLayerWriter::new(
582 24 : tenant.conf,
583 24 : tline.timeline_id,
584 24 : tenant.tenant_shard_id,
585 24 : Lsn(0x18)..Lsn(0x20),
586 24 : 4 * 1024 * 1024,
587 24 : &tline.gate,
588 24 : tline.cancel.clone(),
589 24 : )
590 24 : .await
591 24 : .unwrap();
592 : const N: usize = 2000;
593 48024 : for i in 0..N {
594 48000 : let i = i as u32;
595 48000 : image_writer
596 48000 : .put_image(get_key(i), get_large_img(), &ctx)
597 48000 : .await
598 48000 : .unwrap();
599 48000 : delta_writer
600 48000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
601 48000 : .await
602 48000 : .unwrap();
603 : }
604 24 : let image_layers = image_writer
605 96 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
606 24 : .await
607 24 : .unwrap();
608 24 : let delta_layers = delta_writer
609 96 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
610 24 : .await
611 24 : .unwrap();
612 24 : let image_layers = image_layers
613 24 : .into_iter()
614 96 : .map(|x| {
615 96 : if discard {
616 48 : x.into_discarded_layer()
617 : } else {
618 48 : x.into_resident_layer().layer_desc().key()
619 : }
620 96 : })
621 24 : .collect_vec();
622 24 : let delta_layers = delta_layers
623 24 : .into_iter()
624 96 : .map(|x| {
625 96 : if discard {
626 48 : x.into_discarded_layer()
627 : } else {
628 48 : x.into_resident_layer().layer_desc().key()
629 : }
630 96 : })
631 24 : .collect_vec();
632 24 : assert_eq!(image_layers.len(), N / 512 + 1);
633 24 : assert_eq!(delta_layers.len(), N / 512 + 1);
634 24 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
635 24 : assert_eq!(
636 24 : delta_layers.last().unwrap().key_range.end,
637 24 : get_key(N as u32)
638 24 : );
639 96 : for idx in 0..image_layers.len() {
640 96 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
641 96 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
642 96 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
643 96 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
644 96 : if idx > 0 {
645 72 : assert_eq!(
646 72 : image_layers[idx - 1].key_range.end,
647 72 : image_layers[idx].key_range.start
648 72 : );
649 72 : assert_eq!(
650 72 : delta_layers[idx - 1].key_range.end,
651 72 : delta_layers[idx].key_range.start
652 72 : );
653 24 : }
654 : }
655 24 : }
656 :
657 : #[tokio::test]
658 12 : async fn write_large_img() {
659 12 : let harness = TenantHarness::create("split_writer_write_large_img")
660 12 : .await
661 12 : .unwrap();
662 12 : let (tenant, ctx) = harness.load().await;
663 12 :
664 12 : let tline = tenant
665 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
666 12 : .await
667 12 : .unwrap();
668 12 :
669 12 : let mut image_writer = SplitImageLayerWriter::new(
670 12 : tenant.conf,
671 12 : tline.timeline_id,
672 12 : tenant.tenant_shard_id,
673 12 : get_key(0),
674 12 : Lsn(0x18),
675 12 : 4 * 1024,
676 12 : &tline.gate,
677 12 : tline.cancel.clone(),
678 12 : &ctx,
679 12 : )
680 12 : .await
681 12 : .unwrap();
682 12 :
683 12 : let mut delta_writer = SplitDeltaLayerWriter::new(
684 12 : tenant.conf,
685 12 : tline.timeline_id,
686 12 : tenant.tenant_shard_id,
687 12 : Lsn(0x18)..Lsn(0x20),
688 12 : 4 * 1024,
689 12 : &tline.gate,
690 12 : tline.cancel.clone(),
691 12 : )
692 12 : .await
693 12 : .unwrap();
694 12 :
695 12 : image_writer
696 12 : .put_image(get_key(0), get_img(0), &ctx)
697 12 : .await
698 12 : .unwrap();
699 12 : image_writer
700 12 : .put_image(get_key(1), get_large_img(), &ctx)
701 12 : .await
702 12 : .unwrap();
703 12 : let layers = image_writer
704 12 : .finish(&tline, &ctx, get_key(10))
705 12 : .await
706 12 : .unwrap();
707 12 : assert_eq!(layers.len(), 2);
708 12 :
709 12 : delta_writer
710 12 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
711 12 : .await
712 12 : .unwrap();
713 12 : delta_writer
714 12 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
715 12 : .await
716 12 : .unwrap();
717 12 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
718 12 : assert_eq!(layers.len(), 2);
719 12 : let mut layers_iter = layers.into_iter();
720 12 : assert_eq!(
721 12 : layers_iter
722 12 : .next()
723 12 : .unwrap()
724 12 : .into_resident_layer()
725 12 : .layer_desc()
726 12 : .key(),
727 12 : PersistentLayerKey {
728 12 : key_range: get_key(0)..get_key(1),
729 12 : lsn_range: Lsn(0x18)..Lsn(0x20),
730 12 : is_delta: true
731 12 : }
732 12 : );
733 12 : assert_eq!(
734 12 : layers_iter
735 12 : .next()
736 12 : .unwrap()
737 12 : .into_resident_layer()
738 12 : .layer_desc()
739 12 : .key(),
740 12 : PersistentLayerKey {
741 12 : key_range: get_key(1)..get_key(2),
742 12 : lsn_range: Lsn(0x18)..Lsn(0x20),
743 12 : is_delta: true
744 12 : }
745 12 : );
746 12 : }
747 :
748 : #[tokio::test]
749 12 : async fn write_split_single_key() {
750 12 : let harness = TenantHarness::create("split_writer_write_split_single_key")
751 12 : .await
752 12 : .unwrap();
753 12 : let (tenant, ctx) = harness.load().await;
754 12 :
755 12 : let tline = tenant
756 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
757 12 : .await
758 12 : .unwrap();
759 12 :
760 12 : const N: usize = 2000;
761 12 : let mut delta_writer = SplitDeltaLayerWriter::new(
762 12 : tenant.conf,
763 12 : tline.timeline_id,
764 12 : tenant.tenant_shard_id,
765 12 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
766 12 : 4 * 1024 * 1024,
767 12 : &tline.gate,
768 12 : tline.cancel.clone(),
769 12 : )
770 12 : .await
771 12 : .unwrap();
772 12 :
773 24012 : for i in 0..N {
774 24000 : let i = i as u32;
775 24000 : delta_writer
776 24000 : .put_value(
777 24000 : get_key(0),
778 24000 : Lsn(i as u64 * 16 + 0x10),
779 24000 : Value::Image(get_large_img()),
780 24000 : &ctx,
781 24000 : )
782 24000 : .await
783 24000 : .unwrap();
784 12 : }
785 12 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
786 12 : assert_eq!(delta_layers.len(), 1);
787 12 : let delta_layer = delta_layers
788 12 : .into_iter()
789 12 : .next()
790 12 : .unwrap()
791 12 : .into_resident_layer();
792 12 : assert_eq!(
793 12 : delta_layer.layer_desc().key(),
794 12 : PersistentLayerKey {
795 12 : key_range: get_key(0)..get_key(1),
796 12 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
797 12 : is_delta: true
798 12 : }
799 12 : );
800 12 : }
801 : }
|