Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
9 :
10 : use super::layer::S3_UPLOAD_LIMIT;
11 : use super::{
12 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
13 : };
14 :
15 : pub(crate) enum SplitWriterResult {
16 : Produced(ResidentLayer),
17 : Discarded(PersistentLayerKey),
18 : }
19 :
20 : #[cfg(test)]
21 : impl SplitWriterResult {
22 96 : fn into_resident_layer(self) -> ResidentLayer {
23 96 : match self {
24 96 : SplitWriterResult::Produced(layer) => layer,
25 0 : SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
26 : }
27 96 : }
28 :
29 0 : fn into_discarded_layer(self) -> PersistentLayerKey {
30 0 : match self {
31 0 : SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
32 0 : SplitWriterResult::Discarded(layer) => layer,
33 0 : }
34 0 : }
35 : }
36 :
37 : /// An image writer that takes images and produces multiple image layers.
38 : ///
39 : /// The interface does not guarantee atomicity (i.e., if the image layer generation
40 : /// fails, there might be leftover files to be cleaned up)
41 : #[must_use]
42 : pub struct SplitImageLayerWriter {
43 : inner: ImageLayerWriter,
44 : target_layer_size: u64,
45 : generated_layers: Vec<SplitWriterResult>,
46 : conf: &'static PageServerConf,
47 : timeline_id: TimelineId,
48 : tenant_shard_id: TenantShardId,
49 : lsn: Lsn,
50 : start_key: Key,
51 : }
52 :
53 : impl SplitImageLayerWriter {
54 96 : pub async fn new(
55 96 : conf: &'static PageServerConf,
56 96 : timeline_id: TimelineId,
57 96 : tenant_shard_id: TenantShardId,
58 96 : start_key: Key,
59 96 : lsn: Lsn,
60 96 : target_layer_size: u64,
61 96 : ctx: &RequestContext,
62 96 : ) -> anyhow::Result<Self> {
63 96 : Ok(Self {
64 96 : target_layer_size,
65 96 : inner: ImageLayerWriter::new(
66 96 : conf,
67 96 : timeline_id,
68 96 : tenant_shard_id,
69 96 : &(start_key..Key::MAX),
70 96 : lsn,
71 96 : ctx,
72 96 : )
73 48 : .await?,
74 96 : generated_layers: Vec::new(),
75 96 : conf,
76 96 : timeline_id,
77 96 : tenant_shard_id,
78 96 : lsn,
79 96 : start_key,
80 : })
81 96 : }
82 :
83 25242 : pub async fn put_image_with_discard_fn<D, F>(
84 25242 : &mut self,
85 25242 : key: Key,
86 25242 : img: Bytes,
87 25242 : tline: &Arc<Timeline>,
88 25242 : ctx: &RequestContext,
89 25242 : discard: D,
90 25242 : ) -> anyhow::Result<()>
91 25242 : where
92 25242 : D: FnOnce(&PersistentLayerKey) -> F,
93 25242 : F: Future<Output = bool>,
94 25242 : {
95 25242 : // The current estimation is an upper bound of the space that the key/image could take
96 25242 : // because we did not consider compression in this estimation. The resulting image layer
97 25242 : // could be smaller than the target size.
98 25242 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
99 25242 : if self.inner.num_keys() >= 1
100 25146 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
101 : {
102 42 : let next_image_writer = ImageLayerWriter::new(
103 42 : self.conf,
104 42 : self.timeline_id,
105 42 : self.tenant_shard_id,
106 42 : &(key..Key::MAX),
107 42 : self.lsn,
108 42 : ctx,
109 42 : )
110 21 : .await?;
111 42 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
112 42 : let layer_key = PersistentLayerKey {
113 42 : key_range: self.start_key..key,
114 42 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
115 42 : is_delta: false,
116 42 : };
117 42 : self.start_key = key;
118 42 :
119 42 : if discard(&layer_key).await {
120 0 : drop(prev_image_writer);
121 0 : self.generated_layers
122 0 : .push(SplitWriterResult::Discarded(layer_key));
123 0 : } else {
124 42 : self.generated_layers.push(SplitWriterResult::Produced(
125 42 : prev_image_writer
126 42 : .finish_with_end_key(tline, key, ctx)
127 90 : .await?,
128 : ));
129 : }
130 25200 : }
131 25620 : self.inner.put_image(key, img, ctx).await
132 25242 : }
133 :
134 : #[cfg(test)]
135 18 : pub async fn put_image(
136 18 : &mut self,
137 18 : key: Key,
138 18 : img: Bytes,
139 18 : tline: &Arc<Timeline>,
140 18 : ctx: &RequestContext,
141 18 : ) -> anyhow::Result<()> {
142 18 : self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
143 33 : .await
144 18 : }
145 :
146 84 : pub(crate) async fn finish_with_discard_fn<D, F>(
147 84 : self,
148 84 : tline: &Arc<Timeline>,
149 84 : ctx: &RequestContext,
150 84 : end_key: Key,
151 84 : discard: D,
152 84 : ) -> anyhow::Result<Vec<SplitWriterResult>>
153 84 : where
154 84 : D: FnOnce(&PersistentLayerKey) -> F,
155 84 : F: Future<Output = bool>,
156 84 : {
157 84 : let Self {
158 84 : mut generated_layers,
159 84 : inner,
160 84 : ..
161 84 : } = self;
162 84 : if inner.num_keys() == 0 {
163 0 : return Ok(generated_layers);
164 84 : }
165 84 : let layer_key = PersistentLayerKey {
166 84 : key_range: self.start_key..end_key,
167 84 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
168 84 : is_delta: false,
169 84 : };
170 84 : if discard(&layer_key).await {
171 24 : generated_layers.push(SplitWriterResult::Discarded(layer_key));
172 24 : } else {
173 60 : generated_layers.push(SplitWriterResult::Produced(
174 120 : inner.finish_with_end_key(tline, end_key, ctx).await?,
175 : ));
176 : }
177 84 : Ok(generated_layers)
178 84 : }
179 :
180 : #[cfg(test)]
181 24 : pub(crate) async fn finish(
182 24 : self,
183 24 : tline: &Arc<Timeline>,
184 24 : ctx: &RequestContext,
185 24 : end_key: Key,
186 24 : ) -> anyhow::Result<Vec<SplitWriterResult>> {
187 24 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
188 48 : .await
189 24 : }
190 :
191 : /// When split writer fails, the caller should call this function and handle partially generated layers.
192 12 : pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
193 12 : Ok((self.generated_layers, self.inner))
194 12 : }
195 : }
196 :
197 : /// A delta writer that takes key-lsn-values and produces multiple delta layers.
198 : ///
199 : /// The interface does not guarantee atomicity (i.e., if the delta layer generation fails,
200 : /// there might be leftover files to be cleaned up).
201 : ///
202 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
203 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
204 : /// will split them into multiple files based on size.
205 : #[must_use]
206 : pub struct SplitDeltaLayerWriter {
207 : inner: DeltaLayerWriter,
208 : target_layer_size: u64,
209 : generated_layers: Vec<SplitWriterResult>,
210 : conf: &'static PageServerConf,
211 : timeline_id: TimelineId,
212 : tenant_shard_id: TenantShardId,
213 : lsn_range: Range<Lsn>,
214 : last_key_written: Key,
215 : start_key: Key,
216 : }
217 :
218 : impl SplitDeltaLayerWriter {
219 108 : pub async fn new(
220 108 : conf: &'static PageServerConf,
221 108 : timeline_id: TimelineId,
222 108 : tenant_shard_id: TenantShardId,
223 108 : start_key: Key,
224 108 : lsn_range: Range<Lsn>,
225 108 : target_layer_size: u64,
226 108 : ctx: &RequestContext,
227 108 : ) -> anyhow::Result<Self> {
228 108 : Ok(Self {
229 108 : target_layer_size,
230 108 : inner: DeltaLayerWriter::new(
231 108 : conf,
232 108 : timeline_id,
233 108 : tenant_shard_id,
234 108 : start_key,
235 108 : lsn_range.clone(),
236 108 : ctx,
237 108 : )
238 54 : .await?,
239 108 : generated_layers: Vec::new(),
240 108 : conf,
241 108 : timeline_id,
242 108 : tenant_shard_id,
243 108 : lsn_range,
244 108 : last_key_written: Key::MIN,
245 108 : start_key,
246 : })
247 108 : }
248 :
249 : /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
250 36402 : pub async fn put_value_with_discard_fn<D, F>(
251 36402 : &mut self,
252 36402 : key: Key,
253 36402 : lsn: Lsn,
254 36402 : val: Value,
255 36402 : tline: &Arc<Timeline>,
256 36402 : ctx: &RequestContext,
257 36402 : discard: D,
258 36402 : ) -> anyhow::Result<()>
259 36402 : where
260 36402 : D: FnOnce(&PersistentLayerKey) -> F,
261 36402 : F: Future<Output = bool>,
262 36402 : {
263 36402 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
264 36402 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
265 36402 : //
266 36402 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
267 36402 : // strategy. https://github.com/neondatabase/neon/issues/8837
268 36402 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
269 36402 : if self.inner.num_keys() >= 1
270 36306 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
271 : {
272 8988 : if key != self.last_key_written {
273 42 : let next_delta_writer = DeltaLayerWriter::new(
274 42 : self.conf,
275 42 : self.timeline_id,
276 42 : self.tenant_shard_id,
277 42 : key,
278 42 : self.lsn_range.clone(),
279 42 : ctx,
280 42 : )
281 21 : .await?;
282 42 : let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
283 42 : let layer_key = PersistentLayerKey {
284 42 : key_range: self.start_key..key,
285 42 : lsn_range: self.lsn_range.clone(),
286 42 : is_delta: true,
287 42 : };
288 42 : self.start_key = key;
289 42 : if discard(&layer_key).await {
290 0 : drop(prev_delta_writer);
291 0 : self.generated_layers
292 0 : .push(SplitWriterResult::Discarded(layer_key));
293 0 : } else {
294 111 : let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
295 42 : let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
296 42 : self.generated_layers
297 42 : .push(SplitWriterResult::Produced(delta_layer));
298 : }
299 8946 : } else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
300 : // We have to produce a very large file b/c a key is updated too often.
301 0 : anyhow::bail!(
302 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
303 0 : key,
304 0 : self.inner.estimated_size()
305 0 : );
306 8946 : }
307 27414 : }
308 36402 : self.last_key_written = key;
309 36402 : self.inner.put_value(key, lsn, val, ctx).await
310 36402 : }
311 :
312 12018 : pub async fn put_value(
313 12018 : &mut self,
314 12018 : key: Key,
315 12018 : lsn: Lsn,
316 12018 : val: Value,
317 12018 : tline: &Arc<Timeline>,
318 12018 : ctx: &RequestContext,
319 12018 : ) -> anyhow::Result<()> {
320 12018 : self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
321 777 : .await
322 12018 : }
323 :
324 96 : pub(crate) async fn finish_with_discard_fn<D, F>(
325 96 : self,
326 96 : tline: &Arc<Timeline>,
327 96 : ctx: &RequestContext,
328 96 : end_key: Key,
329 96 : discard: D,
330 96 : ) -> anyhow::Result<Vec<SplitWriterResult>>
331 96 : where
332 96 : D: FnOnce(&PersistentLayerKey) -> F,
333 96 : F: Future<Output = bool>,
334 96 : {
335 96 : let Self {
336 96 : mut generated_layers,
337 96 : inner,
338 96 : ..
339 96 : } = self;
340 96 : if inner.num_keys() == 0 {
341 12 : return Ok(generated_layers);
342 84 : }
343 84 : let layer_key = PersistentLayerKey {
344 84 : key_range: self.start_key..end_key,
345 84 : lsn_range: self.lsn_range.clone(),
346 84 : is_delta: true,
347 84 : };
348 84 : if discard(&layer_key).await {
349 24 : generated_layers.push(SplitWriterResult::Discarded(layer_key));
350 24 : } else {
351 162 : let (desc, path) = inner.finish(end_key, ctx).await?;
352 60 : let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
353 60 : generated_layers.push(SplitWriterResult::Produced(delta_layer));
354 : }
355 84 : Ok(generated_layers)
356 96 : }
357 :
358 : #[cfg(test)]
359 30 : pub(crate) async fn finish(
360 30 : self,
361 30 : tline: &Arc<Timeline>,
362 30 : ctx: &RequestContext,
363 30 : end_key: Key,
364 30 : ) -> anyhow::Result<Vec<SplitWriterResult>> {
365 30 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
366 84 : .await
367 30 : }
368 :
369 : /// When split writer fails, the caller should call this function and handle partially generated layers.
370 12 : pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
371 12 : Ok((self.generated_layers, self.inner))
372 12 : }
373 : }
374 :
375 : #[cfg(test)]
376 : mod tests {
377 : use itertools::Itertools;
378 : use rand::{RngCore, SeedableRng};
379 :
380 : use crate::{
381 : tenant::{
382 : harness::{TenantHarness, TIMELINE_ID},
383 : storage_layer::AsLayerDesc,
384 : },
385 : DEFAULT_PG_VERSION,
386 : };
387 :
388 : use super::*;
389 :
390 60144 : fn get_key(id: u32) -> Key {
391 60144 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
392 60144 : key.field6 = id;
393 60144 : key
394 60144 : }
395 :
396 24 : fn get_img(id: u32) -> Bytes {
397 24 : format!("{id:064}").into()
398 24 : }
399 :
400 60012 : fn get_large_img() -> Bytes {
401 60012 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
402 60012 : let mut data = vec![0; 8192];
403 60012 : rng.fill_bytes(&mut data);
404 60012 : data.into()
405 60012 : }
406 :
407 : #[tokio::test]
408 6 : async fn write_one_image() {
409 6 : let harness = TenantHarness::create("split_writer_write_one_image")
410 6 : .await
411 6 : .unwrap();
412 24 : let (tenant, ctx) = harness.load().await;
413 6 :
414 6 : let tline = tenant
415 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
416 12 : .await
417 6 : .unwrap();
418 6 :
419 6 : let mut image_writer = SplitImageLayerWriter::new(
420 6 : tenant.conf,
421 6 : tline.timeline_id,
422 6 : tenant.tenant_shard_id,
423 6 : get_key(0),
424 6 : Lsn(0x18),
425 6 : 4 * 1024 * 1024,
426 6 : &ctx,
427 6 : )
428 6 : .await
429 6 : .unwrap();
430 6 :
431 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
432 6 : tenant.conf,
433 6 : tline.timeline_id,
434 6 : tenant.tenant_shard_id,
435 6 : get_key(0),
436 6 : Lsn(0x18)..Lsn(0x20),
437 6 : 4 * 1024 * 1024,
438 6 : &ctx,
439 6 : )
440 6 : .await
441 6 : .unwrap();
442 6 :
443 6 : image_writer
444 6 : .put_image(get_key(0), get_img(0), &tline, &ctx)
445 6 : .await
446 6 : .unwrap();
447 6 : let layers = image_writer
448 6 : .finish(&tline, &ctx, get_key(10))
449 12 : .await
450 6 : .unwrap();
451 6 : assert_eq!(layers.len(), 1);
452 6 :
453 6 : delta_writer
454 6 : .put_value(
455 6 : get_key(0),
456 6 : Lsn(0x18),
457 6 : Value::Image(get_img(0)),
458 6 : &tline,
459 6 : &ctx,
460 6 : )
461 6 : .await
462 6 : .unwrap();
463 6 : let layers = delta_writer
464 6 : .finish(&tline, &ctx, get_key(10))
465 15 : .await
466 6 : .unwrap();
467 6 : assert_eq!(layers.len(), 1);
468 6 : }
469 :
470 : #[tokio::test]
471 6 : async fn write_split() {
472 13110 : write_split_helper("split_writer_write_split", false).await;
473 6 : }
474 :
475 : #[tokio::test]
476 6 : async fn write_split_discard() {
477 13110 : write_split_helper("split_writer_write_split_discard", false).await;
478 6 : }
479 :
480 12 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
481 12 : let harness = TenantHarness::create(harness_name).await.unwrap();
482 48 : let (tenant, ctx) = harness.load().await;
483 :
484 12 : let tline = tenant
485 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
486 24 : .await
487 12 : .unwrap();
488 :
489 12 : let mut image_writer = SplitImageLayerWriter::new(
490 12 : tenant.conf,
491 12 : tline.timeline_id,
492 12 : tenant.tenant_shard_id,
493 12 : get_key(0),
494 12 : Lsn(0x18),
495 12 : 4 * 1024 * 1024,
496 12 : &ctx,
497 12 : )
498 6 : .await
499 12 : .unwrap();
500 12 : let mut delta_writer = SplitDeltaLayerWriter::new(
501 12 : tenant.conf,
502 12 : tline.timeline_id,
503 12 : tenant.tenant_shard_id,
504 12 : get_key(0),
505 12 : Lsn(0x18)..Lsn(0x20),
506 12 : 4 * 1024 * 1024,
507 12 : &ctx,
508 12 : )
509 6 : .await
510 12 : .unwrap();
511 : const N: usize = 2000;
512 24012 : for i in 0..N {
513 24000 : let i = i as u32;
514 24000 : image_writer
515 24000 : .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
516 36 : discard
517 24000 : })
518 24456 : .await
519 24000 : .unwrap();
520 24000 : delta_writer
521 24000 : .put_value_with_discard_fn(
522 24000 : get_key(i),
523 24000 : Lsn(0x20),
524 24000 : Value::Image(get_large_img()),
525 24000 : &tline,
526 24000 : &ctx,
527 24000 : |_| async { discard },
528 24000 : )
529 1626 : .await
530 24000 : .unwrap();
531 : }
532 12 : let image_layers = image_writer
533 12 : .finish(&tline, &ctx, get_key(N as u32))
534 24 : .await
535 12 : .unwrap();
536 12 : let delta_layers = delta_writer
537 12 : .finish(&tline, &ctx, get_key(N as u32))
538 30 : .await
539 12 : .unwrap();
540 12 : if discard {
541 0 : for layer in image_layers {
542 0 : layer.into_discarded_layer();
543 0 : }
544 0 : for layer in delta_layers {
545 0 : layer.into_discarded_layer();
546 0 : }
547 : } else {
548 12 : let image_layers = image_layers
549 12 : .into_iter()
550 48 : .map(|x| x.into_resident_layer())
551 12 : .collect_vec();
552 12 : let delta_layers = delta_layers
553 12 : .into_iter()
554 48 : .map(|x| x.into_resident_layer())
555 12 : .collect_vec();
556 12 : assert_eq!(image_layers.len(), N / 512 + 1);
557 12 : assert_eq!(delta_layers.len(), N / 512 + 1);
558 48 : for idx in 0..image_layers.len() {
559 48 : assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
560 48 : assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
561 48 : assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
562 48 : assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
563 48 : if idx > 0 {
564 36 : assert_eq!(
565 36 : image_layers[idx - 1].layer_desc().key_range.end,
566 36 : image_layers[idx].layer_desc().key_range.start
567 36 : );
568 36 : assert_eq!(
569 36 : delta_layers[idx - 1].layer_desc().key_range.end,
570 36 : delta_layers[idx].layer_desc().key_range.start
571 36 : );
572 12 : }
573 : }
574 : }
575 12 : }
576 :
577 : #[tokio::test]
578 6 : async fn write_large_img() {
579 6 : let harness = TenantHarness::create("split_writer_write_large_img")
580 6 : .await
581 6 : .unwrap();
582 24 : let (tenant, ctx) = harness.load().await;
583 6 :
584 6 : let tline = tenant
585 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
586 12 : .await
587 6 : .unwrap();
588 6 :
589 6 : let mut image_writer = SplitImageLayerWriter::new(
590 6 : tenant.conf,
591 6 : tline.timeline_id,
592 6 : tenant.tenant_shard_id,
593 6 : get_key(0),
594 6 : Lsn(0x18),
595 6 : 4 * 1024,
596 6 : &ctx,
597 6 : )
598 6 : .await
599 6 : .unwrap();
600 6 :
601 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
602 6 : tenant.conf,
603 6 : tline.timeline_id,
604 6 : tenant.tenant_shard_id,
605 6 : get_key(0),
606 6 : Lsn(0x18)..Lsn(0x20),
607 6 : 4 * 1024,
608 6 : &ctx,
609 6 : )
610 6 : .await
611 6 : .unwrap();
612 6 :
613 6 : image_writer
614 6 : .put_image(get_key(0), get_img(0), &tline, &ctx)
615 6 : .await
616 6 : .unwrap();
617 6 : image_writer
618 6 : .put_image(get_key(1), get_large_img(), &tline, &ctx)
619 21 : .await
620 6 : .unwrap();
621 6 : let layers = image_writer
622 6 : .finish(&tline, &ctx, get_key(10))
623 12 : .await
624 6 : .unwrap();
625 6 : assert_eq!(layers.len(), 2);
626 6 :
627 6 : delta_writer
628 6 : .put_value(
629 6 : get_key(0),
630 6 : Lsn(0x18),
631 6 : Value::Image(get_img(0)),
632 6 : &tline,
633 6 : &ctx,
634 6 : )
635 6 : .await
636 6 : .unwrap();
637 6 : delta_writer
638 6 : .put_value(
639 6 : get_key(1),
640 6 : Lsn(0x1A),
641 6 : Value::Image(get_large_img()),
642 6 : &tline,
643 6 : &ctx,
644 6 : )
645 18 : .await
646 6 : .unwrap();
647 6 : let layers = delta_writer
648 6 : .finish(&tline, &ctx, get_key(10))
649 15 : .await
650 6 : .unwrap();
651 6 : assert_eq!(layers.len(), 2);
652 6 : }
653 :
654 : #[tokio::test]
655 6 : async fn write_split_single_key() {
656 6 : let harness = TenantHarness::create("split_writer_write_split_single_key")
657 6 : .await
658 6 : .unwrap();
659 24 : let (tenant, ctx) = harness.load().await;
660 6 :
661 6 : let tline = tenant
662 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
663 12 : .await
664 6 : .unwrap();
665 6 :
666 6 : const N: usize = 2000;
667 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
668 6 : tenant.conf,
669 6 : tline.timeline_id,
670 6 : tenant.tenant_shard_id,
671 6 : get_key(0),
672 6 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
673 6 : 4 * 1024 * 1024,
674 6 : &ctx,
675 6 : )
676 6 : .await
677 6 : .unwrap();
678 6 :
679 12006 : for i in 0..N {
680 12000 : let i = i as u32;
681 12000 : delta_writer
682 12000 : .put_value(
683 12000 : get_key(0),
684 12000 : Lsn(i as u64 * 16 + 0x10),
685 12000 : Value::Image(get_large_img()),
686 12000 : &tline,
687 12000 : &ctx,
688 12000 : )
689 759 : .await
690 12000 : .unwrap();
691 6 : }
692 6 : let delta_layers = delta_writer
693 6 : .finish(&tline, &ctx, get_key(N as u32))
694 24 : .await
695 6 : .unwrap();
696 6 : assert_eq!(delta_layers.len(), 1);
697 6 : }
698 : }
|