Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
9 :
10 : use super::layer::S3_UPLOAD_LIMIT;
11 : use super::{
12 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
13 : };
14 :
15 : pub(crate) enum BatchWriterResult {
16 : Produced(ResidentLayer),
17 : Discarded(PersistentLayerKey),
18 : }
19 :
20 : #[cfg(test)]
21 : impl BatchWriterResult {
22 24 : fn into_resident_layer(self) -> ResidentLayer {
23 24 : match self {
24 24 : BatchWriterResult::Produced(layer) => layer,
25 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
26 : }
27 24 : }
28 :
29 16 : fn into_discarded_layer(self) -> PersistentLayerKey {
30 16 : match self {
31 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
32 16 : BatchWriterResult::Discarded(layer) => layer,
33 16 : }
34 16 : }
35 : }
36 :
37 : enum LayerWriterWrapper {
38 : Image(ImageLayerWriter),
39 : Delta(DeltaLayerWriter),
40 : }
41 :
42 : /// An layer writer that takes unfinished layers and finish them atomically.
43 : #[must_use]
44 : pub struct BatchLayerWriter {
45 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
46 : conf: &'static PageServerConf,
47 : }
48 :
49 : impl BatchLayerWriter {
50 68 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
51 68 : Ok(Self {
52 68 : generated_layer_writers: Vec::new(),
53 68 : conf,
54 68 : })
55 68 : }
56 :
57 42 : pub fn add_unfinished_image_writer(
58 42 : &mut self,
59 42 : writer: ImageLayerWriter,
60 42 : key_range: Range<Key>,
61 42 : lsn: Lsn,
62 42 : ) {
63 42 : self.generated_layer_writers.push((
64 42 : LayerWriterWrapper::Image(writer),
65 42 : PersistentLayerKey {
66 42 : key_range,
67 42 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
68 42 : is_delta: false,
69 42 : },
70 42 : ));
71 42 : }
72 :
73 42 : pub fn add_unfinished_delta_writer(
74 42 : &mut self,
75 42 : writer: DeltaLayerWriter,
76 42 : key_range: Range<Key>,
77 42 : lsn_range: Range<Lsn>,
78 42 : ) {
79 42 : self.generated_layer_writers.push((
80 42 : LayerWriterWrapper::Delta(writer),
81 42 : PersistentLayerKey {
82 42 : key_range,
83 42 : lsn_range,
84 42 : is_delta: true,
85 42 : },
86 42 : ));
87 42 : }
88 :
89 60 : pub(crate) async fn finish_with_discard_fn<D, F>(
90 60 : self,
91 60 : tline: &Arc<Timeline>,
92 60 : ctx: &RequestContext,
93 60 : discard_fn: D,
94 60 : ) -> anyhow::Result<Vec<BatchWriterResult>>
95 60 : where
96 60 : D: Fn(&PersistentLayerKey) -> F,
97 60 : F: Future<Output = bool>,
98 60 : {
99 60 : let Self {
100 60 : generated_layer_writers,
101 60 : ..
102 60 : } = self;
103 60 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
104 0 : for produced_layer in generated_layers {
105 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
106 0 : let layer: Layer = resident_layer.into();
107 0 : layer.delete_on_drop();
108 0 : }
109 : }
110 0 : };
111 : // BEGIN: catch every error and do the recovery in the below section
112 60 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
113 144 : for (inner, layer_key) in generated_layer_writers {
114 84 : if discard_fn(&layer_key).await {
115 32 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
116 32 : } else {
117 52 : let res = match inner {
118 26 : LayerWriterWrapper::Delta(writer) => {
119 68 : writer.finish(layer_key.key_range.end, ctx).await
120 : }
121 26 : LayerWriterWrapper::Image(writer) => {
122 26 : writer
123 26 : .finish_with_end_key(layer_key.key_range.end, ctx)
124 52 : .await
125 : }
126 : };
127 52 : let layer = match res {
128 52 : Ok((desc, path)) => {
129 52 : match Layer::finish_creating(self.conf, tline, desc, &path) {
130 52 : Ok(layer) => layer,
131 0 : Err(e) => {
132 0 : tokio::fs::remove_file(&path).await.ok();
133 0 : clean_up_layers(generated_layers);
134 0 : return Err(e);
135 : }
136 : }
137 : }
138 0 : Err(e) => {
139 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
140 0 : // so we don't need to remove the layer we just failed to create by ourselves.
141 0 : clean_up_layers(generated_layers);
142 0 : return Err(e);
143 : }
144 : };
145 52 : generated_layers.push(BatchWriterResult::Produced(layer));
146 : }
147 : }
148 : // END: catch every error and do the recovery in the above section
149 60 : Ok(generated_layers)
150 60 : }
151 : }
152 :
153 : /// An image writer that takes images and produces multiple image layers.
154 : #[must_use]
155 : pub struct SplitImageLayerWriter {
156 : inner: ImageLayerWriter,
157 : target_layer_size: u64,
158 : lsn: Lsn,
159 : conf: &'static PageServerConf,
160 : timeline_id: TimelineId,
161 : tenant_shard_id: TenantShardId,
162 : batches: BatchLayerWriter,
163 : start_key: Key,
164 : }
165 :
166 : impl SplitImageLayerWriter {
167 32 : pub async fn new(
168 32 : conf: &'static PageServerConf,
169 32 : timeline_id: TimelineId,
170 32 : tenant_shard_id: TenantShardId,
171 32 : start_key: Key,
172 32 : lsn: Lsn,
173 32 : target_layer_size: u64,
174 32 : ctx: &RequestContext,
175 32 : ) -> anyhow::Result<Self> {
176 32 : Ok(Self {
177 32 : target_layer_size,
178 32 : inner: ImageLayerWriter::new(
179 32 : conf,
180 32 : timeline_id,
181 32 : tenant_shard_id,
182 32 : &(start_key..Key::MAX),
183 32 : lsn,
184 32 : ctx,
185 32 : )
186 16 : .await?,
187 32 : conf,
188 32 : timeline_id,
189 32 : tenant_shard_id,
190 32 : batches: BatchLayerWriter::new(conf).await?,
191 32 : lsn,
192 32 : start_key,
193 : })
194 32 : }
195 :
196 8414 : pub async fn put_image(
197 8414 : &mut self,
198 8414 : key: Key,
199 8414 : img: Bytes,
200 8414 : ctx: &RequestContext,
201 8414 : ) -> anyhow::Result<()> {
202 8414 : // The current estimation is an upper bound of the space that the key/image could take
203 8414 : // because we did not consider compression in this estimation. The resulting image layer
204 8414 : // could be smaller than the target size.
205 8414 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
206 8414 : if self.inner.num_keys() >= 1
207 8382 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
208 : {
209 14 : let next_image_writer = ImageLayerWriter::new(
210 14 : self.conf,
211 14 : self.timeline_id,
212 14 : self.tenant_shard_id,
213 14 : &(key..Key::MAX),
214 14 : self.lsn,
215 14 : ctx,
216 14 : )
217 7 : .await?;
218 14 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
219 14 : self.batches.add_unfinished_image_writer(
220 14 : prev_image_writer,
221 14 : self.start_key..key,
222 14 : self.lsn,
223 14 : );
224 14 : self.start_key = key;
225 8400 : }
226 8541 : self.inner.put_image(key, img, ctx).await
227 8414 : }
228 :
229 28 : pub(crate) async fn finish_with_discard_fn<D, F>(
230 28 : self,
231 28 : tline: &Arc<Timeline>,
232 28 : ctx: &RequestContext,
233 28 : end_key: Key,
234 28 : discard_fn: D,
235 28 : ) -> anyhow::Result<Vec<BatchWriterResult>>
236 28 : where
237 28 : D: Fn(&PersistentLayerKey) -> F,
238 28 : F: Future<Output = bool>,
239 28 : {
240 28 : let Self {
241 28 : mut batches, inner, ..
242 28 : } = self;
243 28 : if inner.num_keys() != 0 {
244 28 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
245 28 : }
246 52 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
247 28 : }
248 :
249 : #[cfg(test)]
250 4 : pub(crate) async fn finish(
251 4 : self,
252 4 : tline: &Arc<Timeline>,
253 4 : ctx: &RequestContext,
254 4 : end_key: Key,
255 4 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
256 6 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
257 12 : .await
258 4 : }
259 : }
260 :
261 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
262 : ///
263 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
264 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
265 : /// will split them into multiple files based on size.
266 : #[must_use]
267 : pub struct SplitDeltaLayerWriter {
268 : inner: Option<(Key, DeltaLayerWriter)>,
269 : target_layer_size: u64,
270 : conf: &'static PageServerConf,
271 : timeline_id: TimelineId,
272 : tenant_shard_id: TenantShardId,
273 : lsn_range: Range<Lsn>,
274 : last_key_written: Key,
275 : batches: BatchLayerWriter,
276 : }
277 :
278 : impl SplitDeltaLayerWriter {
279 36 : pub async fn new(
280 36 : conf: &'static PageServerConf,
281 36 : timeline_id: TimelineId,
282 36 : tenant_shard_id: TenantShardId,
283 36 : lsn_range: Range<Lsn>,
284 36 : target_layer_size: u64,
285 36 : ) -> anyhow::Result<Self> {
286 36 : Ok(Self {
287 36 : target_layer_size,
288 36 : inner: None,
289 36 : conf,
290 36 : timeline_id,
291 36 : tenant_shard_id,
292 36 : lsn_range,
293 36 : last_key_written: Key::MIN,
294 36 : batches: BatchLayerWriter::new(conf).await?,
295 : })
296 36 : }
297 :
298 12134 : pub async fn put_value(
299 12134 : &mut self,
300 12134 : key: Key,
301 12134 : lsn: Lsn,
302 12134 : val: Value,
303 12134 : ctx: &RequestContext,
304 12134 : ) -> anyhow::Result<()> {
305 12134 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
306 12134 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
307 12134 : //
308 12134 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
309 12134 : // strategy. https://github.com/neondatabase/neon/issues/8837
310 12134 :
311 12134 : if self.inner.is_none() {
312 32 : self.inner = Some((
313 32 : key,
314 32 : DeltaLayerWriter::new(
315 32 : self.conf,
316 32 : self.timeline_id,
317 32 : self.tenant_shard_id,
318 32 : key,
319 32 : self.lsn_range.clone(),
320 32 : ctx,
321 32 : )
322 16 : .await?,
323 : ));
324 12102 : }
325 12134 : let (_, inner) = self.inner.as_mut().unwrap();
326 12134 :
327 12134 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
328 12134 : if inner.num_keys() >= 1
329 12102 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
330 : {
331 2996 : if key != self.last_key_written {
332 14 : let next_delta_writer = DeltaLayerWriter::new(
333 14 : self.conf,
334 14 : self.timeline_id,
335 14 : self.tenant_shard_id,
336 14 : key,
337 14 : self.lsn_range.clone(),
338 14 : ctx,
339 14 : )
340 7 : .await?;
341 14 : let (start_key, prev_delta_writer) =
342 14 : std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
343 14 : self.batches.add_unfinished_delta_writer(
344 14 : prev_delta_writer,
345 14 : start_key..key,
346 14 : self.lsn_range.clone(),
347 14 : );
348 2982 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
349 : // We have to produce a very large file b/c a key is updated too often.
350 0 : anyhow::bail!(
351 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
352 0 : key,
353 0 : inner.estimated_size()
354 0 : );
355 2982 : }
356 9138 : }
357 12134 : self.last_key_written = key;
358 12134 : let (_, inner) = self.inner.as_mut().unwrap();
359 12134 : inner.put_value(key, lsn, val, ctx).await
360 12134 : }
361 :
362 32 : pub(crate) async fn finish_with_discard_fn<D, F>(
363 32 : self,
364 32 : tline: &Arc<Timeline>,
365 32 : ctx: &RequestContext,
366 32 : discard_fn: D,
367 32 : ) -> anyhow::Result<Vec<BatchWriterResult>>
368 32 : where
369 32 : D: Fn(&PersistentLayerKey) -> F,
370 32 : F: Future<Output = bool>,
371 32 : {
372 32 : let Self {
373 32 : mut batches, inner, ..
374 32 : } = self;
375 32 : if let Some((start_key, writer)) = inner {
376 28 : if writer.num_keys() != 0 {
377 28 : let end_key = self.last_key_written.next();
378 28 : batches.add_unfinished_delta_writer(
379 28 : writer,
380 28 : start_key..end_key,
381 28 : self.lsn_range.clone(),
382 28 : );
383 28 : }
384 4 : }
385 68 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
386 32 : }
387 :
388 : #[cfg(test)]
389 6 : pub(crate) async fn finish(
390 6 : self,
391 6 : tline: &Arc<Timeline>,
392 6 : ctx: &RequestContext,
393 6 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
394 8 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
395 23 : .await
396 6 : }
397 : }
398 :
399 : #[cfg(test)]
400 : mod tests {
401 : use itertools::Itertools;
402 : use rand::{RngCore, SeedableRng};
403 :
404 : use crate::{
405 : tenant::{
406 : harness::{TenantHarness, TIMELINE_ID},
407 : storage_layer::AsLayerDesc,
408 : },
409 : DEFAULT_PG_VERSION,
410 : };
411 :
412 : use super::*;
413 :
414 20052 : fn get_key(id: u32) -> Key {
415 20052 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
416 20052 : key.field6 = id;
417 20052 : key
418 20052 : }
419 :
420 8 : fn get_img(id: u32) -> Bytes {
421 8 : format!("{id:064}").into()
422 8 : }
423 :
424 20004 : fn get_large_img() -> Bytes {
425 20004 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
426 20004 : let mut data = vec![0; 8192];
427 20004 : rng.fill_bytes(&mut data);
428 20004 : data.into()
429 20004 : }
430 :
431 : #[tokio::test]
432 2 : async fn write_one_image() {
433 2 : let harness = TenantHarness::create("split_writer_write_one_image")
434 2 : .await
435 2 : .unwrap();
436 10 : let (tenant, ctx) = harness.load().await;
437 2 :
438 2 : let tline = tenant
439 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
440 4 : .await
441 2 : .unwrap();
442 2 :
443 2 : let mut image_writer = SplitImageLayerWriter::new(
444 2 : tenant.conf,
445 2 : tline.timeline_id,
446 2 : tenant.tenant_shard_id,
447 2 : get_key(0),
448 2 : Lsn(0x18),
449 2 : 4 * 1024 * 1024,
450 2 : &ctx,
451 2 : )
452 2 : .await
453 2 : .unwrap();
454 2 :
455 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
456 2 : tenant.conf,
457 2 : tline.timeline_id,
458 2 : tenant.tenant_shard_id,
459 2 : Lsn(0x18)..Lsn(0x20),
460 2 : 4 * 1024 * 1024,
461 2 : )
462 2 : .await
463 2 : .unwrap();
464 2 :
465 2 : image_writer
466 2 : .put_image(get_key(0), get_img(0), &ctx)
467 2 : .await
468 2 : .unwrap();
469 2 : let layers = image_writer
470 2 : .finish(&tline, &ctx, get_key(10))
471 4 : .await
472 2 : .unwrap();
473 2 : assert_eq!(layers.len(), 1);
474 2 :
475 2 : delta_writer
476 2 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
477 2 : .await
478 2 : .unwrap();
479 5 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
480 2 : assert_eq!(layers.len(), 1);
481 2 : assert_eq!(
482 2 : layers
483 2 : .into_iter()
484 2 : .next()
485 2 : .unwrap()
486 2 : .into_resident_layer()
487 2 : .layer_desc()
488 2 : .key(),
489 2 : PersistentLayerKey {
490 2 : key_range: get_key(0)..get_key(1),
491 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
492 2 : is_delta: true
493 2 : }
494 2 : );
495 2 : }
496 :
497 : #[tokio::test]
498 2 : async fn write_split() {
499 2 : // Test the split writer with retaining all the layers we have produced (discard=false)
500 4372 : write_split_helper("split_writer_write_split", false).await;
501 2 : }
502 :
503 : #[tokio::test]
504 2 : async fn write_split_discard() {
505 2 : // Test the split writer with discarding all the layers we have produced (discard=true)
506 4336 : write_split_helper("split_writer_write_split_discard", true).await;
507 2 : }
508 :
509 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
510 : /// set to true, all layers will be discarded.
511 4 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
512 4 : let harness = TenantHarness::create(harness_name).await.unwrap();
513 20 : let (tenant, ctx) = harness.load().await;
514 :
515 4 : let tline = tenant
516 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
517 8 : .await
518 4 : .unwrap();
519 :
520 4 : let mut image_writer = SplitImageLayerWriter::new(
521 4 : tenant.conf,
522 4 : tline.timeline_id,
523 4 : tenant.tenant_shard_id,
524 4 : get_key(0),
525 4 : Lsn(0x18),
526 4 : 4 * 1024 * 1024,
527 4 : &ctx,
528 4 : )
529 2 : .await
530 4 : .unwrap();
531 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
532 4 : tenant.conf,
533 4 : tline.timeline_id,
534 4 : tenant.tenant_shard_id,
535 4 : Lsn(0x18)..Lsn(0x20),
536 4 : 4 * 1024 * 1024,
537 4 : )
538 0 : .await
539 4 : .unwrap();
540 : const N: usize = 2000;
541 8004 : for i in 0..N {
542 8000 : let i = i as u32;
543 8000 : image_writer
544 8000 : .put_image(get_key(i), get_large_img(), &ctx)
545 8130 : .await
546 8000 : .unwrap();
547 8000 : delta_writer
548 8000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
549 512 : .await
550 8000 : .unwrap();
551 : }
552 4 : let image_layers = image_writer
553 16 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
554 16 : .await
555 4 : .unwrap();
556 4 : let delta_layers = delta_writer
557 16 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
558 20 : .await
559 4 : .unwrap();
560 4 : let image_layers = image_layers
561 4 : .into_iter()
562 16 : .map(|x| {
563 16 : if discard {
564 8 : x.into_discarded_layer()
565 : } else {
566 8 : x.into_resident_layer().layer_desc().key()
567 : }
568 16 : })
569 4 : .collect_vec();
570 4 : let delta_layers = delta_layers
571 4 : .into_iter()
572 16 : .map(|x| {
573 16 : if discard {
574 8 : x.into_discarded_layer()
575 : } else {
576 8 : x.into_resident_layer().layer_desc().key()
577 : }
578 16 : })
579 4 : .collect_vec();
580 4 : assert_eq!(image_layers.len(), N / 512 + 1);
581 4 : assert_eq!(delta_layers.len(), N / 512 + 1);
582 4 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
583 4 : assert_eq!(
584 4 : delta_layers.last().unwrap().key_range.end,
585 4 : get_key(N as u32)
586 4 : );
587 16 : for idx in 0..image_layers.len() {
588 16 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
589 16 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
590 16 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
591 16 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
592 16 : if idx > 0 {
593 12 : assert_eq!(
594 12 : image_layers[idx - 1].key_range.end,
595 12 : image_layers[idx].key_range.start
596 12 : );
597 12 : assert_eq!(
598 12 : delta_layers[idx - 1].key_range.end,
599 12 : delta_layers[idx].key_range.start
600 12 : );
601 4 : }
602 : }
603 4 : }
604 :
605 : #[tokio::test]
606 2 : async fn write_large_img() {
607 2 : let harness = TenantHarness::create("split_writer_write_large_img")
608 2 : .await
609 2 : .unwrap();
610 10 : let (tenant, ctx) = harness.load().await;
611 2 :
612 2 : let tline = tenant
613 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
614 4 : .await
615 2 : .unwrap();
616 2 :
617 2 : let mut image_writer = SplitImageLayerWriter::new(
618 2 : tenant.conf,
619 2 : tline.timeline_id,
620 2 : tenant.tenant_shard_id,
621 2 : get_key(0),
622 2 : Lsn(0x18),
623 2 : 4 * 1024,
624 2 : &ctx,
625 2 : )
626 2 : .await
627 2 : .unwrap();
628 2 :
629 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
630 2 : tenant.conf,
631 2 : tline.timeline_id,
632 2 : tenant.tenant_shard_id,
633 2 : Lsn(0x18)..Lsn(0x20),
634 2 : 4 * 1024,
635 2 : )
636 2 : .await
637 2 : .unwrap();
638 2 :
639 2 : image_writer
640 2 : .put_image(get_key(0), get_img(0), &ctx)
641 2 : .await
642 2 : .unwrap();
643 2 : image_writer
644 2 : .put_image(get_key(1), get_large_img(), &ctx)
645 3 : .await
646 2 : .unwrap();
647 2 : let layers = image_writer
648 2 : .finish(&tline, &ctx, get_key(10))
649 8 : .await
650 2 : .unwrap();
651 2 : assert_eq!(layers.len(), 2);
652 2 :
653 2 : delta_writer
654 2 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
655 2 : .await
656 2 : .unwrap();
657 2 : delta_writer
658 2 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
659 2 : .await
660 2 : .unwrap();
661 10 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
662 2 : assert_eq!(layers.len(), 2);
663 2 : let mut layers_iter = layers.into_iter();
664 2 : assert_eq!(
665 2 : layers_iter
666 2 : .next()
667 2 : .unwrap()
668 2 : .into_resident_layer()
669 2 : .layer_desc()
670 2 : .key(),
671 2 : PersistentLayerKey {
672 2 : key_range: get_key(0)..get_key(1),
673 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
674 2 : is_delta: true
675 2 : }
676 2 : );
677 2 : assert_eq!(
678 2 : layers_iter
679 2 : .next()
680 2 : .unwrap()
681 2 : .into_resident_layer()
682 2 : .layer_desc()
683 2 : .key(),
684 2 : PersistentLayerKey {
685 2 : key_range: get_key(1)..get_key(2),
686 2 : lsn_range: Lsn(0x18)..Lsn(0x20),
687 2 : is_delta: true
688 2 : }
689 2 : );
690 2 : }
691 :
692 : #[tokio::test]
693 2 : async fn write_split_single_key() {
694 2 : let harness = TenantHarness::create("split_writer_write_split_single_key")
695 2 : .await
696 2 : .unwrap();
697 10 : let (tenant, ctx) = harness.load().await;
698 2 :
699 2 : let tline = tenant
700 2 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
701 4 : .await
702 2 : .unwrap();
703 2 :
704 2 : const N: usize = 2000;
705 2 : let mut delta_writer = SplitDeltaLayerWriter::new(
706 2 : tenant.conf,
707 2 : tline.timeline_id,
708 2 : tenant.tenant_shard_id,
709 2 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
710 2 : 4 * 1024 * 1024,
711 2 : )
712 2 : .await
713 2 : .unwrap();
714 2 :
715 4002 : for i in 0..N {
716 4000 : let i = i as u32;
717 4000 : delta_writer
718 4000 : .put_value(
719 4000 : get_key(0),
720 4000 : Lsn(i as u64 * 16 + 0x10),
721 4000 : Value::Image(get_large_img()),
722 4000 : &ctx,
723 4000 : )
724 254 : .await
725 4000 : .unwrap();
726 2 : }
727 8 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
728 2 : assert_eq!(delta_layers.len(), 1);
729 2 : let delta_layer = delta_layers
730 2 : .into_iter()
731 2 : .next()
732 2 : .unwrap()
733 2 : .into_resident_layer();
734 2 : assert_eq!(
735 2 : delta_layer.layer_desc().key(),
736 2 : PersistentLayerKey {
737 2 : key_range: get_key(0)..get_key(1),
738 2 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
739 2 : is_delta: true
740 2 : }
741 2 : );
742 2 : }
743 : }
|