Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, tenant::Timeline};
9 : use pageserver_api::value::Value;
10 :
11 : use super::layer::S3_UPLOAD_LIMIT;
12 : use super::{
13 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
14 : };
15 :
16 : pub(crate) enum BatchWriterResult {
17 : Produced(ResidentLayer),
18 : Discarded(PersistentLayerKey),
19 : }
20 :
21 : #[cfg(test)]
22 : impl BatchWriterResult {
23 48 : fn into_resident_layer(self) -> ResidentLayer {
24 48 : match self {
25 48 : BatchWriterResult::Produced(layer) => layer,
26 0 : BatchWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
27 : }
28 48 : }
29 :
30 32 : fn into_discarded_layer(self) -> PersistentLayerKey {
31 32 : match self {
32 0 : BatchWriterResult::Produced(_) => panic!("unexpected produced layer"),
33 32 : BatchWriterResult::Discarded(layer) => layer,
34 32 : }
35 32 : }
36 : }
37 :
38 : enum LayerWriterWrapper {
39 : Image(ImageLayerWriter),
40 : Delta(DeltaLayerWriter),
41 : }
42 :
43 : /// An layer writer that takes unfinished layers and finish them atomically.
44 : #[must_use]
45 : pub struct BatchLayerWriter {
46 : generated_layer_writers: Vec<(LayerWriterWrapper, PersistentLayerKey)>,
47 : conf: &'static PageServerConf,
48 : }
49 :
50 : impl BatchLayerWriter {
51 1360 : pub async fn new(conf: &'static PageServerConf) -> anyhow::Result<Self> {
52 1360 : Ok(Self {
53 1360 : generated_layer_writers: Vec::new(),
54 1360 : conf,
55 1360 : })
56 1360 : }
57 :
58 604 : pub fn add_unfinished_image_writer(
59 604 : &mut self,
60 604 : writer: ImageLayerWriter,
61 604 : key_range: Range<Key>,
62 604 : lsn: Lsn,
63 604 : ) {
64 604 : self.generated_layer_writers.push((
65 604 : LayerWriterWrapper::Image(writer),
66 604 : PersistentLayerKey {
67 604 : key_range,
68 604 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
69 604 : is_delta: false,
70 604 : },
71 604 : ));
72 604 : }
73 :
74 120 : pub fn add_unfinished_delta_writer(
75 120 : &mut self,
76 120 : writer: DeltaLayerWriter,
77 120 : key_range: Range<Key>,
78 120 : lsn_range: Range<Lsn>,
79 120 : ) {
80 120 : self.generated_layer_writers.push((
81 120 : LayerWriterWrapper::Delta(writer),
82 120 : PersistentLayerKey {
83 120 : key_range,
84 120 : lsn_range,
85 120 : is_delta: true,
86 120 : },
87 120 : ));
88 120 : }
89 :
90 1136 : pub(crate) async fn finish(
91 1136 : self,
92 1136 : tline: &Arc<Timeline>,
93 1136 : ctx: &RequestContext,
94 1136 : ) -> anyhow::Result<Vec<ResidentLayer>> {
95 1136 : let res = self
96 1136 : .finish_with_discard_fn(tline, ctx, |_| async { false })
97 1136 : .await?;
98 1136 : let mut output = Vec::new();
99 1620 : for r in res {
100 484 : if let BatchWriterResult::Produced(layer) = r {
101 484 : output.push(layer);
102 484 : }
103 : }
104 1136 : Ok(output)
105 1136 : }
106 :
107 1344 : pub(crate) async fn finish_with_discard_fn<D, F>(
108 1344 : self,
109 1344 : tline: &Arc<Timeline>,
110 1344 : ctx: &RequestContext,
111 1344 : discard_fn: D,
112 1344 : ) -> anyhow::Result<Vec<BatchWriterResult>>
113 1344 : where
114 1344 : D: Fn(&PersistentLayerKey) -> F,
115 1344 : F: Future<Output = bool>,
116 1344 : {
117 1344 : let Self {
118 1344 : generated_layer_writers,
119 1344 : ..
120 1344 : } = self;
121 1344 : let clean_up_layers = |generated_layers: Vec<BatchWriterResult>| {
122 0 : for produced_layer in generated_layers {
123 0 : if let BatchWriterResult::Produced(resident_layer) = produced_layer {
124 0 : let layer: Layer = resident_layer.into();
125 0 : layer.delete_on_drop();
126 0 : }
127 : }
128 0 : };
129 : // BEGIN: catch every error and do the recovery in the below section
130 1344 : let mut generated_layers: Vec<BatchWriterResult> = Vec::new();
131 2068 : for (inner, layer_key) in generated_layer_writers {
132 724 : if discard_fn(&layer_key).await {
133 76 : generated_layers.push(BatchWriterResult::Discarded(layer_key));
134 76 : } else {
135 648 : let res = match inner {
136 76 : LayerWriterWrapper::Delta(writer) => {
137 76 : writer.finish(layer_key.key_range.end, ctx).await
138 : }
139 572 : LayerWriterWrapper::Image(writer) => {
140 572 : writer
141 572 : .finish_with_end_key(layer_key.key_range.end, ctx)
142 572 : .await
143 : }
144 : };
145 648 : let layer = match res {
146 648 : Ok((desc, path)) => {
147 648 : match Layer::finish_creating(self.conf, tline, desc, &path) {
148 648 : Ok(layer) => layer,
149 0 : Err(e) => {
150 0 : tokio::fs::remove_file(&path).await.ok();
151 0 : clean_up_layers(generated_layers);
152 0 : return Err(e);
153 : }
154 : }
155 : }
156 0 : Err(e) => {
157 0 : // Image/DeltaLayerWriter::finish will clean up the temporary layer if anything goes wrong,
158 0 : // so we don't need to remove the layer we just failed to create by ourselves.
159 0 : clean_up_layers(generated_layers);
160 0 : return Err(e);
161 : }
162 : };
163 648 : generated_layers.push(BatchWriterResult::Produced(layer));
164 : }
165 : }
166 : // END: catch every error and do the recovery in the above section
167 1344 : Ok(generated_layers)
168 1344 : }
169 : }
170 :
171 : /// An image writer that takes images and produces multiple image layers.
172 : #[must_use]
173 : pub struct SplitImageLayerWriter {
174 : inner: ImageLayerWriter,
175 : target_layer_size: u64,
176 : lsn: Lsn,
177 : conf: &'static PageServerConf,
178 : timeline_id: TimelineId,
179 : tenant_shard_id: TenantShardId,
180 : batches: BatchLayerWriter,
181 : start_key: Key,
182 : }
183 :
184 : impl SplitImageLayerWriter {
185 100 : pub async fn new(
186 100 : conf: &'static PageServerConf,
187 100 : timeline_id: TimelineId,
188 100 : tenant_shard_id: TenantShardId,
189 100 : start_key: Key,
190 100 : lsn: Lsn,
191 100 : target_layer_size: u64,
192 100 : ctx: &RequestContext,
193 100 : ) -> anyhow::Result<Self> {
194 100 : Ok(Self {
195 100 : target_layer_size,
196 100 : inner: ImageLayerWriter::new(
197 100 : conf,
198 100 : timeline_id,
199 100 : tenant_shard_id,
200 100 : &(start_key..Key::MAX),
201 100 : lsn,
202 100 : ctx,
203 100 : )
204 100 : .await?,
205 100 : conf,
206 100 : timeline_id,
207 100 : tenant_shard_id,
208 100 : batches: BatchLayerWriter::new(conf).await?,
209 100 : lsn,
210 100 : start_key,
211 : })
212 100 : }
213 :
214 17180 : pub async fn put_image(
215 17180 : &mut self,
216 17180 : key: Key,
217 17180 : img: Bytes,
218 17180 : ctx: &RequestContext,
219 17180 : ) -> anyhow::Result<()> {
220 17180 : // The current estimation is an upper bound of the space that the key/image could take
221 17180 : // because we did not consider compression in this estimation. The resulting image layer
222 17180 : // could be smaller than the target size.
223 17180 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
224 17180 : if self.inner.num_keys() >= 1
225 17080 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
226 : {
227 28 : let next_image_writer = ImageLayerWriter::new(
228 28 : self.conf,
229 28 : self.timeline_id,
230 28 : self.tenant_shard_id,
231 28 : &(key..Key::MAX),
232 28 : self.lsn,
233 28 : ctx,
234 28 : )
235 28 : .await?;
236 28 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
237 28 : self.batches.add_unfinished_image_writer(
238 28 : prev_image_writer,
239 28 : self.start_key..key,
240 28 : self.lsn,
241 28 : );
242 28 : self.start_key = key;
243 17152 : }
244 17180 : self.inner.put_image(key, img, ctx).await
245 17180 : }
246 :
247 92 : pub(crate) async fn finish_with_discard_fn<D, F>(
248 92 : self,
249 92 : tline: &Arc<Timeline>,
250 92 : ctx: &RequestContext,
251 92 : end_key: Key,
252 92 : discard_fn: D,
253 92 : ) -> anyhow::Result<Vec<BatchWriterResult>>
254 92 : where
255 92 : D: Fn(&PersistentLayerKey) -> F,
256 92 : F: Future<Output = bool>,
257 92 : {
258 92 : let Self {
259 92 : mut batches, inner, ..
260 92 : } = self;
261 92 : if inner.num_keys() != 0 {
262 92 : batches.add_unfinished_image_writer(inner, self.start_key..end_key, self.lsn);
263 92 : }
264 92 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
265 92 : }
266 :
267 : #[cfg(test)]
268 8 : pub(crate) async fn finish(
269 8 : self,
270 8 : tline: &Arc<Timeline>,
271 8 : ctx: &RequestContext,
272 8 : end_key: Key,
273 8 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
274 12 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
275 8 : .await
276 8 : }
277 : }
278 :
279 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
280 : ///
281 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
282 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
283 : /// will split them into multiple files based on size.
284 : #[must_use]
285 : pub struct SplitDeltaLayerWriter {
286 : inner: Option<(Key, DeltaLayerWriter)>,
287 : target_layer_size: u64,
288 : conf: &'static PageServerConf,
289 : timeline_id: TimelineId,
290 : tenant_shard_id: TenantShardId,
291 : lsn_range: Range<Lsn>,
292 : last_key_written: Key,
293 : batches: BatchLayerWriter,
294 : }
295 :
296 : impl SplitDeltaLayerWriter {
297 124 : pub async fn new(
298 124 : conf: &'static PageServerConf,
299 124 : timeline_id: TimelineId,
300 124 : tenant_shard_id: TenantShardId,
301 124 : lsn_range: Range<Lsn>,
302 124 : target_layer_size: u64,
303 124 : ) -> anyhow::Result<Self> {
304 124 : Ok(Self {
305 124 : target_layer_size,
306 124 : inner: None,
307 124 : conf,
308 124 : timeline_id,
309 124 : tenant_shard_id,
310 124 : lsn_range,
311 124 : last_key_written: Key::MIN,
312 124 : batches: BatchLayerWriter::new(conf).await?,
313 : })
314 124 : }
315 :
316 24416 : pub async fn put_value(
317 24416 : &mut self,
318 24416 : key: Key,
319 24416 : lsn: Lsn,
320 24416 : val: Value,
321 24416 : ctx: &RequestContext,
322 24416 : ) -> anyhow::Result<()> {
323 24416 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
324 24416 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
325 24416 : //
326 24416 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
327 24416 : // strategy. https://github.com/neondatabase/neon/issues/8837
328 24416 :
329 24416 : if self.inner.is_none() {
330 100 : self.inner = Some((
331 100 : key,
332 100 : DeltaLayerWriter::new(
333 100 : self.conf,
334 100 : self.timeline_id,
335 100 : self.tenant_shard_id,
336 100 : key,
337 100 : self.lsn_range.clone(),
338 100 : ctx,
339 100 : )
340 100 : .await?,
341 : ));
342 24316 : }
343 24416 : let (_, inner) = self.inner.as_mut().unwrap();
344 24416 :
345 24416 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
346 24416 : if inner.num_keys() >= 1
347 24316 : && inner.estimated_size() + addition_size_estimation >= self.target_layer_size
348 : {
349 5992 : if key != self.last_key_written {
350 28 : let next_delta_writer = DeltaLayerWriter::new(
351 28 : self.conf,
352 28 : self.timeline_id,
353 28 : self.tenant_shard_id,
354 28 : key,
355 28 : self.lsn_range.clone(),
356 28 : ctx,
357 28 : )
358 28 : .await?;
359 28 : let (start_key, prev_delta_writer) =
360 28 : std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
361 28 : self.batches.add_unfinished_delta_writer(
362 28 : prev_delta_writer,
363 28 : start_key..key,
364 28 : self.lsn_range.clone(),
365 28 : );
366 5964 : } else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
367 : // We have to produce a very large file b/c a key is updated too often.
368 0 : anyhow::bail!(
369 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
370 0 : key,
371 0 : inner.estimated_size()
372 0 : );
373 5964 : }
374 18424 : }
375 24416 : self.last_key_written = key;
376 24416 : let (_, inner) = self.inner.as_mut().unwrap();
377 24416 : inner.put_value(key, lsn, val, ctx).await
378 24416 : }
379 :
380 116 : pub(crate) async fn finish_with_discard_fn<D, F>(
381 116 : self,
382 116 : tline: &Arc<Timeline>,
383 116 : ctx: &RequestContext,
384 116 : discard_fn: D,
385 116 : ) -> anyhow::Result<Vec<BatchWriterResult>>
386 116 : where
387 116 : D: Fn(&PersistentLayerKey) -> F,
388 116 : F: Future<Output = bool>,
389 116 : {
390 116 : let Self {
391 116 : mut batches, inner, ..
392 116 : } = self;
393 116 : if let Some((start_key, writer)) = inner {
394 92 : if writer.num_keys() != 0 {
395 92 : let end_key = self.last_key_written.next();
396 92 : batches.add_unfinished_delta_writer(
397 92 : writer,
398 92 : start_key..end_key,
399 92 : self.lsn_range.clone(),
400 92 : );
401 92 : }
402 24 : }
403 116 : batches.finish_with_discard_fn(tline, ctx, discard_fn).await
404 116 : }
405 :
406 : #[cfg(test)]
407 12 : pub(crate) async fn finish(
408 12 : self,
409 12 : tline: &Arc<Timeline>,
410 12 : ctx: &RequestContext,
411 12 : ) -> anyhow::Result<Vec<BatchWriterResult>> {
412 16 : self.finish_with_discard_fn(tline, ctx, |_| async { false })
413 12 : .await
414 12 : }
415 : }
416 :
417 : #[cfg(test)]
418 : mod tests {
419 : use itertools::Itertools;
420 : use rand::{RngCore, SeedableRng};
421 :
422 : use crate::{
423 : tenant::{
424 : harness::{TenantHarness, TIMELINE_ID},
425 : storage_layer::AsLayerDesc,
426 : },
427 : DEFAULT_PG_VERSION,
428 : };
429 :
430 : use super::*;
431 :
432 40104 : fn get_key(id: u32) -> Key {
433 40104 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
434 40104 : key.field6 = id;
435 40104 : key
436 40104 : }
437 :
438 16 : fn get_img(id: u32) -> Bytes {
439 16 : format!("{id:064}").into()
440 16 : }
441 :
442 40008 : fn get_large_img() -> Bytes {
443 40008 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
444 40008 : let mut data = vec![0; 8192];
445 40008 : rng.fill_bytes(&mut data);
446 40008 : data.into()
447 40008 : }
448 :
449 : #[tokio::test]
450 4 : async fn write_one_image() {
451 4 : let harness = TenantHarness::create("split_writer_write_one_image")
452 4 : .await
453 4 : .unwrap();
454 4 : let (tenant, ctx) = harness.load().await;
455 4 :
456 4 : let tline = tenant
457 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
458 4 : .await
459 4 : .unwrap();
460 4 :
461 4 : let mut image_writer = SplitImageLayerWriter::new(
462 4 : tenant.conf,
463 4 : tline.timeline_id,
464 4 : tenant.tenant_shard_id,
465 4 : get_key(0),
466 4 : Lsn(0x18),
467 4 : 4 * 1024 * 1024,
468 4 : &ctx,
469 4 : )
470 4 : .await
471 4 : .unwrap();
472 4 :
473 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
474 4 : tenant.conf,
475 4 : tline.timeline_id,
476 4 : tenant.tenant_shard_id,
477 4 : Lsn(0x18)..Lsn(0x20),
478 4 : 4 * 1024 * 1024,
479 4 : )
480 4 : .await
481 4 : .unwrap();
482 4 :
483 4 : image_writer
484 4 : .put_image(get_key(0), get_img(0), &ctx)
485 4 : .await
486 4 : .unwrap();
487 4 : let layers = image_writer
488 4 : .finish(&tline, &ctx, get_key(10))
489 4 : .await
490 4 : .unwrap();
491 4 : assert_eq!(layers.len(), 1);
492 4 :
493 4 : delta_writer
494 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
495 4 : .await
496 4 : .unwrap();
497 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
498 4 : assert_eq!(layers.len(), 1);
499 4 : assert_eq!(
500 4 : layers
501 4 : .into_iter()
502 4 : .next()
503 4 : .unwrap()
504 4 : .into_resident_layer()
505 4 : .layer_desc()
506 4 : .key(),
507 4 : PersistentLayerKey {
508 4 : key_range: get_key(0)..get_key(1),
509 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
510 4 : is_delta: true
511 4 : }
512 4 : );
513 4 : }
514 :
515 : #[tokio::test]
516 4 : async fn write_split() {
517 4 : // Test the split writer with retaining all the layers we have produced (discard=false)
518 4 : write_split_helper("split_writer_write_split", false).await;
519 4 : }
520 :
521 : #[tokio::test]
522 4 : async fn write_split_discard() {
523 4 : // Test the split writer with discarding all the layers we have produced (discard=true)
524 4 : write_split_helper("split_writer_write_split_discard", true).await;
525 4 : }
526 :
527 : /// Test the image+delta writer by writing a large number of images and deltas. If discard is
528 : /// set to true, all layers will be discarded.
529 8 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
530 8 : let harness = TenantHarness::create(harness_name).await.unwrap();
531 8 : let (tenant, ctx) = harness.load().await;
532 :
533 8 : let tline = tenant
534 8 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
535 8 : .await
536 8 : .unwrap();
537 :
538 8 : let mut image_writer = SplitImageLayerWriter::new(
539 8 : tenant.conf,
540 8 : tline.timeline_id,
541 8 : tenant.tenant_shard_id,
542 8 : get_key(0),
543 8 : Lsn(0x18),
544 8 : 4 * 1024 * 1024,
545 8 : &ctx,
546 8 : )
547 8 : .await
548 8 : .unwrap();
549 8 : let mut delta_writer = SplitDeltaLayerWriter::new(
550 8 : tenant.conf,
551 8 : tline.timeline_id,
552 8 : tenant.tenant_shard_id,
553 8 : Lsn(0x18)..Lsn(0x20),
554 8 : 4 * 1024 * 1024,
555 8 : )
556 8 : .await
557 8 : .unwrap();
558 : const N: usize = 2000;
559 16008 : for i in 0..N {
560 16000 : let i = i as u32;
561 16000 : image_writer
562 16000 : .put_image(get_key(i), get_large_img(), &ctx)
563 16000 : .await
564 16000 : .unwrap();
565 16000 : delta_writer
566 16000 : .put_value(get_key(i), Lsn(0x20), Value::Image(get_large_img()), &ctx)
567 16000 : .await
568 16000 : .unwrap();
569 : }
570 8 : let image_layers = image_writer
571 32 : .finish_with_discard_fn(&tline, &ctx, get_key(N as u32), |_| async { discard })
572 8 : .await
573 8 : .unwrap();
574 8 : let delta_layers = delta_writer
575 32 : .finish_with_discard_fn(&tline, &ctx, |_| async { discard })
576 8 : .await
577 8 : .unwrap();
578 8 : let image_layers = image_layers
579 8 : .into_iter()
580 32 : .map(|x| {
581 32 : if discard {
582 16 : x.into_discarded_layer()
583 : } else {
584 16 : x.into_resident_layer().layer_desc().key()
585 : }
586 32 : })
587 8 : .collect_vec();
588 8 : let delta_layers = delta_layers
589 8 : .into_iter()
590 32 : .map(|x| {
591 32 : if discard {
592 16 : x.into_discarded_layer()
593 : } else {
594 16 : x.into_resident_layer().layer_desc().key()
595 : }
596 32 : })
597 8 : .collect_vec();
598 8 : assert_eq!(image_layers.len(), N / 512 + 1);
599 8 : assert_eq!(delta_layers.len(), N / 512 + 1);
600 8 : assert_eq!(delta_layers.first().unwrap().key_range.start, get_key(0));
601 8 : assert_eq!(
602 8 : delta_layers.last().unwrap().key_range.end,
603 8 : get_key(N as u32)
604 8 : );
605 32 : for idx in 0..image_layers.len() {
606 32 : assert_ne!(image_layers[idx].key_range.start, Key::MIN);
607 32 : assert_ne!(image_layers[idx].key_range.end, Key::MAX);
608 32 : assert_ne!(delta_layers[idx].key_range.start, Key::MIN);
609 32 : assert_ne!(delta_layers[idx].key_range.end, Key::MAX);
610 32 : if idx > 0 {
611 24 : assert_eq!(
612 24 : image_layers[idx - 1].key_range.end,
613 24 : image_layers[idx].key_range.start
614 24 : );
615 24 : assert_eq!(
616 24 : delta_layers[idx - 1].key_range.end,
617 24 : delta_layers[idx].key_range.start
618 24 : );
619 8 : }
620 : }
621 8 : }
622 :
623 : #[tokio::test]
624 4 : async fn write_large_img() {
625 4 : let harness = TenantHarness::create("split_writer_write_large_img")
626 4 : .await
627 4 : .unwrap();
628 4 : let (tenant, ctx) = harness.load().await;
629 4 :
630 4 : let tline = tenant
631 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
632 4 : .await
633 4 : .unwrap();
634 4 :
635 4 : let mut image_writer = SplitImageLayerWriter::new(
636 4 : tenant.conf,
637 4 : tline.timeline_id,
638 4 : tenant.tenant_shard_id,
639 4 : get_key(0),
640 4 : Lsn(0x18),
641 4 : 4 * 1024,
642 4 : &ctx,
643 4 : )
644 4 : .await
645 4 : .unwrap();
646 4 :
647 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
648 4 : tenant.conf,
649 4 : tline.timeline_id,
650 4 : tenant.tenant_shard_id,
651 4 : Lsn(0x18)..Lsn(0x20),
652 4 : 4 * 1024,
653 4 : )
654 4 : .await
655 4 : .unwrap();
656 4 :
657 4 : image_writer
658 4 : .put_image(get_key(0), get_img(0), &ctx)
659 4 : .await
660 4 : .unwrap();
661 4 : image_writer
662 4 : .put_image(get_key(1), get_large_img(), &ctx)
663 4 : .await
664 4 : .unwrap();
665 4 : let layers = image_writer
666 4 : .finish(&tline, &ctx, get_key(10))
667 4 : .await
668 4 : .unwrap();
669 4 : assert_eq!(layers.len(), 2);
670 4 :
671 4 : delta_writer
672 4 : .put_value(get_key(0), Lsn(0x18), Value::Image(get_img(0)), &ctx)
673 4 : .await
674 4 : .unwrap();
675 4 : delta_writer
676 4 : .put_value(get_key(1), Lsn(0x1A), Value::Image(get_large_img()), &ctx)
677 4 : .await
678 4 : .unwrap();
679 4 : let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
680 4 : assert_eq!(layers.len(), 2);
681 4 : let mut layers_iter = layers.into_iter();
682 4 : assert_eq!(
683 4 : layers_iter
684 4 : .next()
685 4 : .unwrap()
686 4 : .into_resident_layer()
687 4 : .layer_desc()
688 4 : .key(),
689 4 : PersistentLayerKey {
690 4 : key_range: get_key(0)..get_key(1),
691 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
692 4 : is_delta: true
693 4 : }
694 4 : );
695 4 : assert_eq!(
696 4 : layers_iter
697 4 : .next()
698 4 : .unwrap()
699 4 : .into_resident_layer()
700 4 : .layer_desc()
701 4 : .key(),
702 4 : PersistentLayerKey {
703 4 : key_range: get_key(1)..get_key(2),
704 4 : lsn_range: Lsn(0x18)..Lsn(0x20),
705 4 : is_delta: true
706 4 : }
707 4 : );
708 4 : }
709 :
710 : #[tokio::test]
711 4 : async fn write_split_single_key() {
712 4 : let harness = TenantHarness::create("split_writer_write_split_single_key")
713 4 : .await
714 4 : .unwrap();
715 4 : let (tenant, ctx) = harness.load().await;
716 4 :
717 4 : let tline = tenant
718 4 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
719 4 : .await
720 4 : .unwrap();
721 4 :
722 4 : const N: usize = 2000;
723 4 : let mut delta_writer = SplitDeltaLayerWriter::new(
724 4 : tenant.conf,
725 4 : tline.timeline_id,
726 4 : tenant.tenant_shard_id,
727 4 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
728 4 : 4 * 1024 * 1024,
729 4 : )
730 4 : .await
731 4 : .unwrap();
732 4 :
733 8004 : for i in 0..N {
734 8000 : let i = i as u32;
735 8000 : delta_writer
736 8000 : .put_value(
737 8000 : get_key(0),
738 8000 : Lsn(i as u64 * 16 + 0x10),
739 8000 : Value::Image(get_large_img()),
740 8000 : &ctx,
741 8000 : )
742 8000 : .await
743 8000 : .unwrap();
744 4 : }
745 4 : let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
746 4 : assert_eq!(delta_layers.len(), 1);
747 4 : let delta_layer = delta_layers
748 4 : .into_iter()
749 4 : .next()
750 4 : .unwrap()
751 4 : .into_resident_layer();
752 4 : assert_eq!(
753 4 : delta_layer.layer_desc().key(),
754 4 : PersistentLayerKey {
755 4 : key_range: get_key(0)..get_key(1),
756 4 : lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
757 4 : is_delta: true
758 4 : }
759 4 : );
760 4 : }
761 : }
|