Line data Source code
1 : use std::{future::Future, ops::Range, sync::Arc};
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::key::{Key, KEY_SIZE};
5 : use utils::{id::TimelineId, lsn::Lsn, shard::TenantShardId};
6 :
7 : use crate::tenant::storage_layer::Layer;
8 : use crate::{config::PageServerConf, context::RequestContext, repository::Value, tenant::Timeline};
9 :
10 : use super::layer::S3_UPLOAD_LIMIT;
11 : use super::{
12 : DeltaLayerWriter, ImageLayerWriter, PersistentLayerDesc, PersistentLayerKey, ResidentLayer,
13 : };
14 :
15 : pub(crate) enum SplitWriterResult {
16 : Produced(ResidentLayer),
17 : Discarded(PersistentLayerKey),
18 : }
19 :
20 : #[cfg(test)]
21 : impl SplitWriterResult {
22 96 : fn into_resident_layer(self) -> ResidentLayer {
23 96 : match self {
24 96 : SplitWriterResult::Produced(layer) => layer,
25 0 : SplitWriterResult::Discarded(_) => panic!("unexpected discarded layer"),
26 : }
27 96 : }
28 :
29 0 : fn into_discarded_layer(self) -> PersistentLayerKey {
30 0 : match self {
31 0 : SplitWriterResult::Produced(_) => panic!("unexpected produced layer"),
32 0 : SplitWriterResult::Discarded(layer) => layer,
33 0 : }
34 0 : }
35 : }
36 :
37 : /// An image writer that takes images and produces multiple image layers. The interface does not
38 : /// guarantee atomicity (i.e., if the image layer generation fails, there might be leftover files
39 : /// to be cleaned up)
40 : #[must_use]
41 : pub struct SplitImageLayerWriter {
42 : inner: ImageLayerWriter,
43 : target_layer_size: u64,
44 : generated_layers: Vec<SplitWriterResult>,
45 : conf: &'static PageServerConf,
46 : timeline_id: TimelineId,
47 : tenant_shard_id: TenantShardId,
48 : lsn: Lsn,
49 : start_key: Key,
50 : }
51 :
52 : impl SplitImageLayerWriter {
53 96 : pub async fn new(
54 96 : conf: &'static PageServerConf,
55 96 : timeline_id: TimelineId,
56 96 : tenant_shard_id: TenantShardId,
57 96 : start_key: Key,
58 96 : lsn: Lsn,
59 96 : target_layer_size: u64,
60 96 : ctx: &RequestContext,
61 96 : ) -> anyhow::Result<Self> {
62 96 : Ok(Self {
63 96 : target_layer_size,
64 96 : inner: ImageLayerWriter::new(
65 96 : conf,
66 96 : timeline_id,
67 96 : tenant_shard_id,
68 96 : &(start_key..Key::MAX),
69 96 : lsn,
70 96 : ctx,
71 96 : )
72 48 : .await?,
73 96 : generated_layers: Vec::new(),
74 96 : conf,
75 96 : timeline_id,
76 96 : tenant_shard_id,
77 96 : lsn,
78 96 : start_key,
79 : })
80 96 : }
81 :
82 25242 : pub async fn put_image_with_discard_fn<D, F>(
83 25242 : &mut self,
84 25242 : key: Key,
85 25242 : img: Bytes,
86 25242 : tline: &Arc<Timeline>,
87 25242 : ctx: &RequestContext,
88 25242 : discard: D,
89 25242 : ) -> anyhow::Result<()>
90 25242 : where
91 25242 : D: FnOnce(&PersistentLayerKey) -> F,
92 25242 : F: Future<Output = bool>,
93 25242 : {
94 25242 : // The current estimation is an upper bound of the space that the key/image could take
95 25242 : // because we did not consider compression in this estimation. The resulting image layer
96 25242 : // could be smaller than the target size.
97 25242 : let addition_size_estimation = KEY_SIZE as u64 + img.len() as u64;
98 25242 : if self.inner.num_keys() >= 1
99 25146 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
100 : {
101 42 : let next_image_writer = ImageLayerWriter::new(
102 42 : self.conf,
103 42 : self.timeline_id,
104 42 : self.tenant_shard_id,
105 42 : &(key..Key::MAX),
106 42 : self.lsn,
107 42 : ctx,
108 42 : )
109 21 : .await?;
110 42 : let prev_image_writer = std::mem::replace(&mut self.inner, next_image_writer);
111 42 : let layer_key = PersistentLayerKey {
112 42 : key_range: self.start_key..key,
113 42 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
114 42 : is_delta: false,
115 42 : };
116 42 : self.start_key = key;
117 42 :
118 42 : if discard(&layer_key).await {
119 0 : drop(prev_image_writer);
120 0 : self.generated_layers
121 0 : .push(SplitWriterResult::Discarded(layer_key));
122 0 : } else {
123 42 : self.generated_layers.push(SplitWriterResult::Produced(
124 42 : prev_image_writer
125 42 : .finish_with_end_key(tline, key, ctx)
126 90 : .await?,
127 : ));
128 : }
129 25200 : }
130 25620 : self.inner.put_image(key, img, ctx).await
131 25242 : }
132 :
133 : #[cfg(test)]
134 18 : pub async fn put_image(
135 18 : &mut self,
136 18 : key: Key,
137 18 : img: Bytes,
138 18 : tline: &Arc<Timeline>,
139 18 : ctx: &RequestContext,
140 18 : ) -> anyhow::Result<()> {
141 18 : self.put_image_with_discard_fn(key, img, tline, ctx, |_| async { false })
142 33 : .await
143 18 : }
144 :
145 84 : pub(crate) async fn finish_with_discard_fn<D, F>(
146 84 : self,
147 84 : tline: &Arc<Timeline>,
148 84 : ctx: &RequestContext,
149 84 : end_key: Key,
150 84 : discard: D,
151 84 : ) -> anyhow::Result<Vec<SplitWriterResult>>
152 84 : where
153 84 : D: FnOnce(&PersistentLayerKey) -> F,
154 84 : F: Future<Output = bool>,
155 84 : {
156 84 : let Self {
157 84 : mut generated_layers,
158 84 : inner,
159 84 : ..
160 84 : } = self;
161 84 : if inner.num_keys() == 0 {
162 0 : return Ok(generated_layers);
163 84 : }
164 84 : let layer_key = PersistentLayerKey {
165 84 : key_range: self.start_key..end_key,
166 84 : lsn_range: PersistentLayerDesc::image_layer_lsn_range(self.lsn),
167 84 : is_delta: false,
168 84 : };
169 84 : if discard(&layer_key).await {
170 24 : generated_layers.push(SplitWriterResult::Discarded(layer_key));
171 24 : } else {
172 60 : generated_layers.push(SplitWriterResult::Produced(
173 120 : inner.finish_with_end_key(tline, end_key, ctx).await?,
174 : ));
175 : }
176 84 : Ok(generated_layers)
177 84 : }
178 :
179 : #[cfg(test)]
180 24 : pub(crate) async fn finish(
181 24 : self,
182 24 : tline: &Arc<Timeline>,
183 24 : ctx: &RequestContext,
184 24 : end_key: Key,
185 24 : ) -> anyhow::Result<Vec<SplitWriterResult>> {
186 24 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
187 48 : .await
188 24 : }
189 :
190 : /// When split writer fails, the caller should call this function and handle partially generated layers.
191 12 : pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
192 12 : Ok((self.generated_layers, self.inner))
193 12 : }
194 : }
195 :
196 : /// A delta writer that takes key-lsn-values and produces multiple delta layers. The interface does not
197 : /// guarantee atomicity (i.e., if the delta layer generation fails, there might be leftover files
198 : /// to be cleaned up).
199 : ///
200 : /// Note that if updates of a single key exceed the target size limit, all of the updates will be batched
201 : /// into a single file. This behavior might change in the future. For reference, the legacy compaction algorithm
202 : /// will split them into multiple files based on size.
203 : #[must_use]
204 : pub struct SplitDeltaLayerWriter {
205 : inner: DeltaLayerWriter,
206 : target_layer_size: u64,
207 : generated_layers: Vec<SplitWriterResult>,
208 : conf: &'static PageServerConf,
209 : timeline_id: TimelineId,
210 : tenant_shard_id: TenantShardId,
211 : lsn_range: Range<Lsn>,
212 : last_key_written: Key,
213 : start_key: Key,
214 : }
215 :
216 : impl SplitDeltaLayerWriter {
217 108 : pub async fn new(
218 108 : conf: &'static PageServerConf,
219 108 : timeline_id: TimelineId,
220 108 : tenant_shard_id: TenantShardId,
221 108 : start_key: Key,
222 108 : lsn_range: Range<Lsn>,
223 108 : target_layer_size: u64,
224 108 : ctx: &RequestContext,
225 108 : ) -> anyhow::Result<Self> {
226 108 : Ok(Self {
227 108 : target_layer_size,
228 108 : inner: DeltaLayerWriter::new(
229 108 : conf,
230 108 : timeline_id,
231 108 : tenant_shard_id,
232 108 : start_key,
233 108 : lsn_range.clone(),
234 108 : ctx,
235 108 : )
236 54 : .await?,
237 108 : generated_layers: Vec::new(),
238 108 : conf,
239 108 : timeline_id,
240 108 : tenant_shard_id,
241 108 : lsn_range,
242 108 : last_key_written: Key::MIN,
243 108 : start_key,
244 : })
245 108 : }
246 :
247 : /// Put value into the layer writer. In the case the writer decides to produce a layer, and the discard fn returns true, no layer will be written in the end.
248 36402 : pub async fn put_value_with_discard_fn<D, F>(
249 36402 : &mut self,
250 36402 : key: Key,
251 36402 : lsn: Lsn,
252 36402 : val: Value,
253 36402 : tline: &Arc<Timeline>,
254 36402 : ctx: &RequestContext,
255 36402 : discard: D,
256 36402 : ) -> anyhow::Result<()>
257 36402 : where
258 36402 : D: FnOnce(&PersistentLayerKey) -> F,
259 36402 : F: Future<Output = bool>,
260 36402 : {
261 36402 : // The current estimation is key size plus LSN size plus value size estimation. This is not an accurate
262 36402 : // number, and therefore the final layer size could be a little bit larger or smaller than the target.
263 36402 : //
264 36402 : // Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
265 36402 : // strategy. https://github.com/neondatabase/neon/issues/8837
266 36402 : let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
267 36402 : if self.inner.num_keys() >= 1
268 36306 : && self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
269 : {
270 8988 : if key != self.last_key_written {
271 42 : let next_delta_writer = DeltaLayerWriter::new(
272 42 : self.conf,
273 42 : self.timeline_id,
274 42 : self.tenant_shard_id,
275 42 : key,
276 42 : self.lsn_range.clone(),
277 42 : ctx,
278 42 : )
279 21 : .await?;
280 42 : let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
281 42 : let layer_key = PersistentLayerKey {
282 42 : key_range: self.start_key..key,
283 42 : lsn_range: self.lsn_range.clone(),
284 42 : is_delta: true,
285 42 : };
286 42 : self.start_key = key;
287 42 : if discard(&layer_key).await {
288 0 : drop(prev_delta_writer);
289 0 : self.generated_layers
290 0 : .push(SplitWriterResult::Discarded(layer_key));
291 0 : } else {
292 111 : let (desc, path) = prev_delta_writer.finish(key, ctx).await?;
293 42 : let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
294 42 : self.generated_layers
295 42 : .push(SplitWriterResult::Produced(delta_layer));
296 : }
297 8946 : } else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
298 : // We have to produce a very large file b/c a key is updated too often.
299 0 : anyhow::bail!(
300 0 : "a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
301 0 : key,
302 0 : self.inner.estimated_size()
303 0 : );
304 8946 : }
305 27414 : }
306 36402 : self.last_key_written = key;
307 36402 : self.inner.put_value(key, lsn, val, ctx).await
308 36402 : }
309 :
310 12018 : pub async fn put_value(
311 12018 : &mut self,
312 12018 : key: Key,
313 12018 : lsn: Lsn,
314 12018 : val: Value,
315 12018 : tline: &Arc<Timeline>,
316 12018 : ctx: &RequestContext,
317 12018 : ) -> anyhow::Result<()> {
318 12018 : self.put_value_with_discard_fn(key, lsn, val, tline, ctx, |_| async { false })
319 777 : .await
320 12018 : }
321 :
322 96 : pub(crate) async fn finish_with_discard_fn<D, F>(
323 96 : self,
324 96 : tline: &Arc<Timeline>,
325 96 : ctx: &RequestContext,
326 96 : end_key: Key,
327 96 : discard: D,
328 96 : ) -> anyhow::Result<Vec<SplitWriterResult>>
329 96 : where
330 96 : D: FnOnce(&PersistentLayerKey) -> F,
331 96 : F: Future<Output = bool>,
332 96 : {
333 96 : let Self {
334 96 : mut generated_layers,
335 96 : inner,
336 96 : ..
337 96 : } = self;
338 96 : if inner.num_keys() == 0 {
339 12 : return Ok(generated_layers);
340 84 : }
341 84 : let layer_key = PersistentLayerKey {
342 84 : key_range: self.start_key..end_key,
343 84 : lsn_range: self.lsn_range.clone(),
344 84 : is_delta: true,
345 84 : };
346 84 : if discard(&layer_key).await {
347 24 : generated_layers.push(SplitWriterResult::Discarded(layer_key));
348 24 : } else {
349 162 : let (desc, path) = inner.finish(end_key, ctx).await?;
350 60 : let delta_layer = Layer::finish_creating(self.conf, tline, desc, &path)?;
351 60 : generated_layers.push(SplitWriterResult::Produced(delta_layer));
352 : }
353 84 : Ok(generated_layers)
354 96 : }
355 :
356 : #[allow(dead_code)]
357 30 : pub(crate) async fn finish(
358 30 : self,
359 30 : tline: &Arc<Timeline>,
360 30 : ctx: &RequestContext,
361 30 : end_key: Key,
362 30 : ) -> anyhow::Result<Vec<SplitWriterResult>> {
363 30 : self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
364 84 : .await
365 30 : }
366 :
367 : /// When split writer fails, the caller should call this function and handle partially generated layers.
368 12 : pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
369 12 : Ok((self.generated_layers, self.inner))
370 12 : }
371 : }
372 :
373 : #[cfg(test)]
374 : mod tests {
375 : use itertools::Itertools;
376 : use rand::{RngCore, SeedableRng};
377 :
378 : use crate::{
379 : tenant::{
380 : harness::{TenantHarness, TIMELINE_ID},
381 : storage_layer::AsLayerDesc,
382 : },
383 : DEFAULT_PG_VERSION,
384 : };
385 :
386 : use super::*;
387 :
388 60144 : fn get_key(id: u32) -> Key {
389 60144 : let mut key = Key::from_hex("000000000033333333444444445500000000").unwrap();
390 60144 : key.field6 = id;
391 60144 : key
392 60144 : }
393 :
394 24 : fn get_img(id: u32) -> Bytes {
395 24 : format!("{id:064}").into()
396 24 : }
397 :
398 60012 : fn get_large_img() -> Bytes {
399 60012 : let mut rng = rand::rngs::SmallRng::seed_from_u64(42);
400 60012 : let mut data = vec![0; 8192];
401 60012 : rng.fill_bytes(&mut data);
402 60012 : data.into()
403 60012 : }
404 :
405 : #[tokio::test]
406 6 : async fn write_one_image() {
407 6 : let harness = TenantHarness::create("split_writer_write_one_image")
408 6 : .await
409 6 : .unwrap();
410 24 : let (tenant, ctx) = harness.load().await;
411 6 :
412 6 : let tline = tenant
413 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
414 12 : .await
415 6 : .unwrap();
416 6 :
417 6 : let mut image_writer = SplitImageLayerWriter::new(
418 6 : tenant.conf,
419 6 : tline.timeline_id,
420 6 : tenant.tenant_shard_id,
421 6 : get_key(0),
422 6 : Lsn(0x18),
423 6 : 4 * 1024 * 1024,
424 6 : &ctx,
425 6 : )
426 6 : .await
427 6 : .unwrap();
428 6 :
429 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
430 6 : tenant.conf,
431 6 : tline.timeline_id,
432 6 : tenant.tenant_shard_id,
433 6 : get_key(0),
434 6 : Lsn(0x18)..Lsn(0x20),
435 6 : 4 * 1024 * 1024,
436 6 : &ctx,
437 6 : )
438 6 : .await
439 6 : .unwrap();
440 6 :
441 6 : image_writer
442 6 : .put_image(get_key(0), get_img(0), &tline, &ctx)
443 6 : .await
444 6 : .unwrap();
445 6 : let layers = image_writer
446 6 : .finish(&tline, &ctx, get_key(10))
447 12 : .await
448 6 : .unwrap();
449 6 : assert_eq!(layers.len(), 1);
450 6 :
451 6 : delta_writer
452 6 : .put_value(
453 6 : get_key(0),
454 6 : Lsn(0x18),
455 6 : Value::Image(get_img(0)),
456 6 : &tline,
457 6 : &ctx,
458 6 : )
459 6 : .await
460 6 : .unwrap();
461 6 : let layers = delta_writer
462 6 : .finish(&tline, &ctx, get_key(10))
463 15 : .await
464 6 : .unwrap();
465 6 : assert_eq!(layers.len(), 1);
466 6 : }
467 :
468 : #[tokio::test]
469 6 : async fn write_split() {
470 13110 : write_split_helper("split_writer_write_split", false).await;
471 6 : }
472 :
473 : #[tokio::test]
474 6 : async fn write_split_discard() {
475 13110 : write_split_helper("split_writer_write_split_discard", false).await;
476 6 : }
477 :
478 12 : async fn write_split_helper(harness_name: &'static str, discard: bool) {
479 12 : let harness = TenantHarness::create(harness_name).await.unwrap();
480 48 : let (tenant, ctx) = harness.load().await;
481 :
482 12 : let tline = tenant
483 12 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
484 24 : .await
485 12 : .unwrap();
486 :
487 12 : let mut image_writer = SplitImageLayerWriter::new(
488 12 : tenant.conf,
489 12 : tline.timeline_id,
490 12 : tenant.tenant_shard_id,
491 12 : get_key(0),
492 12 : Lsn(0x18),
493 12 : 4 * 1024 * 1024,
494 12 : &ctx,
495 12 : )
496 6 : .await
497 12 : .unwrap();
498 12 : let mut delta_writer = SplitDeltaLayerWriter::new(
499 12 : tenant.conf,
500 12 : tline.timeline_id,
501 12 : tenant.tenant_shard_id,
502 12 : get_key(0),
503 12 : Lsn(0x18)..Lsn(0x20),
504 12 : 4 * 1024 * 1024,
505 12 : &ctx,
506 12 : )
507 6 : .await
508 12 : .unwrap();
509 : const N: usize = 2000;
510 24012 : for i in 0..N {
511 24000 : let i = i as u32;
512 24000 : image_writer
513 24000 : .put_image_with_discard_fn(get_key(i), get_large_img(), &tline, &ctx, |_| async {
514 36 : discard
515 24000 : })
516 24456 : .await
517 24000 : .unwrap();
518 24000 : delta_writer
519 24000 : .put_value_with_discard_fn(
520 24000 : get_key(i),
521 24000 : Lsn(0x20),
522 24000 : Value::Image(get_large_img()),
523 24000 : &tline,
524 24000 : &ctx,
525 24000 : |_| async { discard },
526 24000 : )
527 1626 : .await
528 24000 : .unwrap();
529 : }
530 12 : let image_layers = image_writer
531 12 : .finish(&tline, &ctx, get_key(N as u32))
532 24 : .await
533 12 : .unwrap();
534 12 : let delta_layers = delta_writer
535 12 : .finish(&tline, &ctx, get_key(N as u32))
536 30 : .await
537 12 : .unwrap();
538 12 : if discard {
539 0 : for layer in image_layers {
540 0 : layer.into_discarded_layer();
541 0 : }
542 0 : for layer in delta_layers {
543 0 : layer.into_discarded_layer();
544 0 : }
545 : } else {
546 12 : let image_layers = image_layers
547 12 : .into_iter()
548 48 : .map(|x| x.into_resident_layer())
549 12 : .collect_vec();
550 12 : let delta_layers = delta_layers
551 12 : .into_iter()
552 48 : .map(|x| x.into_resident_layer())
553 12 : .collect_vec();
554 12 : assert_eq!(image_layers.len(), N / 512 + 1);
555 12 : assert_eq!(delta_layers.len(), N / 512 + 1);
556 48 : for idx in 0..image_layers.len() {
557 48 : assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
558 48 : assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
559 48 : assert_ne!(delta_layers[idx].layer_desc().key_range.start, Key::MIN);
560 48 : assert_ne!(delta_layers[idx].layer_desc().key_range.end, Key::MAX);
561 48 : if idx > 0 {
562 36 : assert_eq!(
563 36 : image_layers[idx - 1].layer_desc().key_range.end,
564 36 : image_layers[idx].layer_desc().key_range.start
565 36 : );
566 36 : assert_eq!(
567 36 : delta_layers[idx - 1].layer_desc().key_range.end,
568 36 : delta_layers[idx].layer_desc().key_range.start
569 36 : );
570 12 : }
571 : }
572 : }
573 12 : }
574 :
575 : #[tokio::test]
576 6 : async fn write_large_img() {
577 6 : let harness = TenantHarness::create("split_writer_write_large_img")
578 6 : .await
579 6 : .unwrap();
580 24 : let (tenant, ctx) = harness.load().await;
581 6 :
582 6 : let tline = tenant
583 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
584 12 : .await
585 6 : .unwrap();
586 6 :
587 6 : let mut image_writer = SplitImageLayerWriter::new(
588 6 : tenant.conf,
589 6 : tline.timeline_id,
590 6 : tenant.tenant_shard_id,
591 6 : get_key(0),
592 6 : Lsn(0x18),
593 6 : 4 * 1024,
594 6 : &ctx,
595 6 : )
596 6 : .await
597 6 : .unwrap();
598 6 :
599 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
600 6 : tenant.conf,
601 6 : tline.timeline_id,
602 6 : tenant.tenant_shard_id,
603 6 : get_key(0),
604 6 : Lsn(0x18)..Lsn(0x20),
605 6 : 4 * 1024,
606 6 : &ctx,
607 6 : )
608 6 : .await
609 6 : .unwrap();
610 6 :
611 6 : image_writer
612 6 : .put_image(get_key(0), get_img(0), &tline, &ctx)
613 6 : .await
614 6 : .unwrap();
615 6 : image_writer
616 6 : .put_image(get_key(1), get_large_img(), &tline, &ctx)
617 21 : .await
618 6 : .unwrap();
619 6 : let layers = image_writer
620 6 : .finish(&tline, &ctx, get_key(10))
621 12 : .await
622 6 : .unwrap();
623 6 : assert_eq!(layers.len(), 2);
624 6 :
625 6 : delta_writer
626 6 : .put_value(
627 6 : get_key(0),
628 6 : Lsn(0x18),
629 6 : Value::Image(get_img(0)),
630 6 : &tline,
631 6 : &ctx,
632 6 : )
633 6 : .await
634 6 : .unwrap();
635 6 : delta_writer
636 6 : .put_value(
637 6 : get_key(1),
638 6 : Lsn(0x1A),
639 6 : Value::Image(get_large_img()),
640 6 : &tline,
641 6 : &ctx,
642 6 : )
643 18 : .await
644 6 : .unwrap();
645 6 : let layers = delta_writer
646 6 : .finish(&tline, &ctx, get_key(10))
647 15 : .await
648 6 : .unwrap();
649 6 : assert_eq!(layers.len(), 2);
650 6 : }
651 :
652 : #[tokio::test]
653 6 : async fn write_split_single_key() {
654 6 : let harness = TenantHarness::create("split_writer_write_split_single_key")
655 6 : .await
656 6 : .unwrap();
657 24 : let (tenant, ctx) = harness.load().await;
658 6 :
659 6 : let tline = tenant
660 6 : .create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
661 12 : .await
662 6 : .unwrap();
663 6 :
664 6 : const N: usize = 2000;
665 6 : let mut delta_writer = SplitDeltaLayerWriter::new(
666 6 : tenant.conf,
667 6 : tline.timeline_id,
668 6 : tenant.tenant_shard_id,
669 6 : get_key(0),
670 6 : Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
671 6 : 4 * 1024 * 1024,
672 6 : &ctx,
673 6 : )
674 6 : .await
675 6 : .unwrap();
676 6 :
677 12006 : for i in 0..N {
678 12000 : let i = i as u32;
679 12000 : delta_writer
680 12000 : .put_value(
681 12000 : get_key(0),
682 12000 : Lsn(i as u64 * 16 + 0x10),
683 12000 : Value::Image(get_large_img()),
684 12000 : &tline,
685 12000 : &ctx,
686 12000 : )
687 759 : .await
688 12000 : .unwrap();
689 6 : }
690 6 : let delta_layers = delta_writer
691 6 : .finish(&tline, &ctx, get_key(N as u32))
692 24 : .await
693 6 : .unwrap();
694 6 : assert_eq!(delta_layers.len(), 1);
695 6 : }
696 : }
|