Line data Source code
1 : mod flush;
2 :
3 : use bytes::BufMut;
4 : pub(crate) use flush::FlushControl;
5 : use flush::FlushHandle;
6 : pub(crate) use flush::FlushTaskError;
7 : use flush::ShutdownRequest;
8 : use tokio_epoll_uring::IoBuf;
9 : use tokio_util::sync::CancellationToken;
10 : use tracing::trace;
11 :
12 : use super::io_buf_aligned::IoBufAligned;
13 : use super::io_buf_aligned::IoBufAlignedMut;
14 : use super::io_buf_ext::{FullSlice, IoBufExt};
15 : use crate::context::RequestContext;
16 : use crate::virtual_file::UsizeIsU64;
17 : use crate::virtual_file::{IoBuffer, IoBufferMut};
18 :
19 : pub(crate) trait CheapCloneForRead {
20 : /// Returns a cheap clone of the buffer.
21 : fn cheap_clone(&self) -> Self;
22 : }
23 :
24 : impl CheapCloneForRead for IoBuffer {
25 141648 : fn cheap_clone(&self) -> Self {
26 141648 : // Cheap clone over an `Arc`.
27 141648 : self.clone()
28 141648 : }
29 : }
30 :
31 : /// A trait for doing owned-buffer write IO.
32 : /// Think [`tokio::io::AsyncWrite`] but with owned buffers.
33 : /// The owned buffers need to be aligned due to Direct IO requirements.
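/// # Example
///
/// A minimal sketch of an in-memory implementer, modeled on the `RecorderWriter`
/// used in the tests at the bottom of this file (the `VecWriter` name and its
/// fields are illustrative, not part of the API):
///
/// ```ignore
/// struct VecWriter {
///     // Naive backing store; a real implementer would issue aligned writes to a file.
///     data: std::sync::Mutex<Vec<u8>>,
/// }
///
/// impl OwnedAsyncWriter for VecWriter {
///     async fn write_all_at<Buf: IoBufAligned + Send>(
///         &self,
///         buf: FullSlice<Buf>,
///         offset: u64,
///         _ctx: &RequestContext,
///     ) -> (FullSlice<Buf>, std::io::Result<()>) {
///         let mut data = self.data.lock().unwrap();
///         let src = &buf[..];
///         let end = offset as usize + src.len();
///         if data.len() < end {
///             data.resize(end, 0);
///         }
///         data[offset as usize..end].copy_from_slice(src);
///         // Ownership of the buffer is always handed back to the caller.
///         (buf, Ok(()))
///     }
///
///     async fn set_len(&self, len: u64, _ctx: &RequestContext) -> std::io::Result<()> {
///         self.data.lock().unwrap().resize(len as usize, 0);
///         Ok(())
///     }
/// }
/// ```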
34 : pub trait OwnedAsyncWriter {
35 : fn write_all_at<Buf: IoBufAligned + Send>(
36 : &self,
37 : buf: FullSlice<Buf>,
38 : offset: u64,
39 : ctx: &RequestContext,
40 : ) -> impl std::future::Future<Output = (FullSlice<Buf>, std::io::Result<()>)> + Send;
41 : fn set_len(
42 : &self,
43 : len: u64,
44 : ctx: &RequestContext,
45 : ) -> impl Future<Output = std::io::Result<()>> + Send;
46 : }
47 :
48 : /// A wrapper around an [`OwnedAsyncWriter`] that uses a [`Buffer`] to batch
49 : /// small writes into larger writes of size [`Buffer::cap`].
50 : ///
51 : /// The buffer is flushed if and only if it is full ([`Buffer::pending`] == [`Buffer::cap`]).
52 : /// This guarantees that writes to the filesystem happen
53 : /// - at offsets that are multiples of [`Buffer::cap`]
54 : /// - in lengths that are multiples of [`Buffer::cap`]
55 : ///
56 : /// The above property is useful for Direct IO, where the effectively
57 : /// dominating disk-sector/filesystem-block/memory-page size
58 : /// determines the requirements on
59 : /// - the alignment of the pointer passed to the read/write operation
60 : /// - the value of `count` (i.e., the length of the read/write operation)
61 : /// which must both be multiples of the dominating sector/block/page size.
62 : ///
63 : /// See [`BufferedWriter::shutdown`] / [`BufferedWriterShutdownMode`] for different
64 : /// ways of dealing with the special case that the buffer is not full by the time
65 : /// we are done writing.
66 : ///
67 : /// The first flush to the underlying `W` happens at offset `start_offset` (arg of [`BufferedWriter::new`]).
68 : /// The next flush is to offset `start_offset + Buffer::cap`, the one after that to `start_offset + 2 * Buffer::cap`, and so on.
69 : ///
70 : /// TODO: decouple buffer capacity from alignment requirement.
71 : /// Right now we assume [`Buffer::cap`] is the alignment requirement,
72 : /// but actually [`Buffer::cap`] should only determine how often we flush
73 : /// while writing, and a separate argument should be passed to specify the
74 : /// alignment requirement. This could be used by
75 : /// [`BufferedWriterShutdownMode::PadThenTruncate`] to avoid excessive
76 : /// padding of zeroes. For example, today, with a capacity of 64KiB, we
77 : /// would pad with up to 64KiB-1 bytes of zeroes, then truncate them off again.
78 : /// This is wasteful; e.g., if the alignment requirement is 4KiB, we would only
79 : /// need to pad & truncate up to 4KiB-1 bytes of zeroes.
80 : ///
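/// # Example
///
/// A sketch of the typical call sequence; `file`, `gate`, `cancel`, `ctx` and `cap`
/// are assumed to be set up by the caller, and error handling is elided:
///
/// ```ignore
/// let mut writer = BufferedWriter::new(
///     file,                               // some `OwnedAsyncWriter`
///     0,                                  // start_offset
///     || IoBufferMut::with_capacity(cap), // buffer factory; cap is also the alignment
///     gate.enter()?,
///     cancel.clone(),
///     ctx,
///     tracing::Span::none(),
/// );
/// writer.write_buffered_borrowed(b"small write", ctx).await?;
/// // Flush the remaining partial buffer, then truncate the file back to the
/// // exact number of bytes logically written.
/// let (len, file) = writer
///     .shutdown(BufferedWriterShutdownMode::PadThenTruncate, ctx)
///     .await?;
/// ```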
81 : // TODO(yuchen): For large writes, implementing buffer bypass for the aligned parts of the write could be beneficial to throughput,
82 : // since we would avoid copying the majority of the data into the internal buffer.
83 : // https://github.com/neondatabase/neon/issues/10101
84 : pub struct BufferedWriter<B: Buffer, W> {
85 : /// Clone of the buffer that was last submitted to the flush loop.
86 : /// `None` if no flush request has been submitted yet; `Some` forever after.
87 : pub(super) maybe_flushed: Option<FullSlice<B::IoBuf>>,
88 : /// New writes are accumulated here.
89 : /// `None` only during submission while we wait for flush loop to accept
90 : /// the full dirty buffer in exchange for a clean buffer.
91 : /// If that exchange fails with a [`FlushTaskError`], the write path
92 : /// bails and leaves this as `None`.
93 : /// Subsequent writes will panic if attempted.
94 : /// The read path continues to work without error because [`Self::maybe_flushed`]
95 : /// and [`Self::bytes_submitted`] are advanced before the flush loop exchange starts,
96 : /// so readers will never try to read from [`Self::mutable`] anyway, because it's past
97 : /// the [`Self::maybe_flushed`] point.
98 : mutable: Option<B>,
99 : /// A handle to the background flush task for writing data to disk.
100 : flush_handle: FlushHandle<B::IoBuf, W>,
101 : /// The number of bytes submitted to the background task.
102 : bytes_submitted: u64,
103 : }
104 :
105 : /// How [`BufferedWriter::shutdown`] should deal with pending (=not-yet-flushed) data.
106 : ///
107 : /// Cf. the relevant paragraph in the [`BufferedWriter`] comment for context on why we need to think about this.
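///
/// Worked example (sizes are illustrative): with a buffer capacity of 64KiB and
/// 10 pending bytes at shutdown, `ZeroPadToNextMultiple(4096)` appends
/// `10usize.next_multiple_of(4096) - 10 = 4086` zero bytes and leaves the file at
/// the padded length, whereas `PadThenTruncate` pads all the way to the buffer
/// capacity (`65536 - 10 = 65526` zero bytes), flushes, and then calls `set_len`
/// to shrink the file back to the exact number of bytes logically written.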
108 : pub enum BufferedWriterShutdownMode {
109 : /// Drop pending data, don't write back to file.
110 : DropTail,
111 : /// Pad the pending data with zeroes to the next multiple of the given value (cf. [`usize::next_multiple_of`]).
112 : ZeroPadToNextMultiple(usize),
113 : /// Fill the IO buffer with zeroes, flush to disk, then `ftruncate` the
114 : /// file to the exact number of bytes written to [`Self`].
115 : ///
116 : /// TODO: see in [`BufferedWriter`] comment about decoupling buffer capacity from alignment requirement.
117 : PadThenTruncate,
118 : }
119 :
120 : impl<B, Buf, W> BufferedWriter<B, W>
121 : where
122 : B: IoBufAlignedMut + Buffer<IoBuf = Buf> + Send + 'static,
123 : Buf: IoBufAligned + Send + Sync + CheapCloneForRead,
124 : W: OwnedAsyncWriter + Send + Sync + 'static + std::fmt::Debug,
125 : {
126 : /// Creates a new buffered writer.
127 : ///
128 : /// The `buf_new` function provides a way to initialize the owned buffers used by this writer.
129 20832 : pub fn new(
130 20832 : writer: W,
131 20832 : start_offset: u64,
132 20832 : buf_new: impl Fn() -> B,
133 20832 : gate_guard: utils::sync::gate::GateGuard,
134 20832 : cancel: CancellationToken,
135 20832 : ctx: &RequestContext,
136 20832 : flush_task_span: tracing::Span,
137 20832 : ) -> Self {
138 20832 : Self {
139 20832 : mutable: Some(buf_new()),
140 20832 : maybe_flushed: None,
141 20832 : flush_handle: FlushHandle::spawn_new(
142 20832 : writer,
143 20832 : buf_new(),
144 20832 : gate_guard,
145 20832 : cancel,
146 20832 : ctx.attached_child(),
147 20832 : flush_task_span,
148 20832 : ),
149 20832 : bytes_submitted: start_offset,
150 20832 : }
151 20832 : }
152 :
153 : /// Returns the number of bytes submitted to the background flush task.
154 3179205 : pub fn bytes_submitted(&self) -> u64 {
155 3179205 : self.bytes_submitted
156 3179205 : }
157 :
158 : /// Returns `None` if used after any of the write paths returned an error.
159 3179205 : pub fn inspect_mutable(&self) -> Option<&B> {
160 3179205 : self.mutable.as_ref()
161 3179205 : }
162 :
163 : /// Gets a reference to the maybe flushed read-only buffer.
164 : /// Returns `None` if the writer has not submitted any flush request.
165 3179229 : pub fn inspect_maybe_flushed(&self) -> Option<&FullSlice<Buf>> {
166 3179229 : self.maybe_flushed.as_ref()
167 3179229 : }
168 :
169 : #[cfg_attr(target_os = "macos", allow(dead_code))]
170 11256 : pub async fn shutdown(
171 11256 : mut self,
172 11256 : mode: BufferedWriterShutdownMode,
173 11256 : ctx: &RequestContext,
174 11256 : ) -> Result<(u64, W), FlushTaskError> {
175 11256 : let mut mutable = self.mutable.take().expect("must not use after an error");
176 11256 : let unpadded_pending = mutable.pending();
177 11256 : let final_len: u64;
178 11256 : let shutdown_req;
179 11256 : match mode {
180 : BufferedWriterShutdownMode::DropTail => {
181 12 : trace!(pending=%mutable.pending(), "dropping pending data");
182 12 : drop(mutable);
183 12 :
184 12 : final_len = self.bytes_submitted;
185 12 : shutdown_req = ShutdownRequest { set_len: None };
186 : }
187 11136 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(next_multiple) => {
188 11136 : let len = mutable.pending();
189 11136 : let cap = mutable.cap();
190 11136 : assert!(
191 11136 : len <= cap,
192 0 : "buffer impl ensures this, but let's check because the extend_with below would panic if we go beyond"
193 : );
194 11136 : let padded_len = len.next_multiple_of(next_multiple);
195 11136 : assert!(
196 11136 : padded_len <= cap,
197 0 : "caller specified a multiple that is larger than the buffer capacity"
198 : );
199 11136 : let count = padded_len - len;
200 11136 : mutable.extend_with(0, count);
201 11136 : trace!(count, "padding with zeros");
202 11136 : self.mutable = Some(mutable);
203 11136 :
204 11136 : final_len = self.bytes_submitted + padded_len.into_u64();
205 11136 : shutdown_req = ShutdownRequest { set_len: None };
206 : }
207 : BufferedWriterShutdownMode::PadThenTruncate => {
208 108 : let len = mutable.pending();
209 108 : let cap = mutable.cap();
210 108 : // TODO: see struct comment TODO on decoupling buffer capacity from alignment requirement.
211 108 : let alignment_requirement = cap;
212 108 : assert!(len <= cap, "buffer impl should ensure this");
213 108 : let padding_end_offset = len.next_multiple_of(alignment_requirement);
214 108 : assert!(
215 108 : padding_end_offset <= cap,
216 0 : "{padding_end_offset} <= {cap} ({alignment_requirement})"
217 : );
218 108 : let count = padding_end_offset - len;
219 108 : mutable.extend_with(0, count);
220 108 : trace!(count, "padding with zeros");
221 108 : self.mutable = Some(mutable);
222 108 :
223 108 : final_len = self.bytes_submitted + len.into_u64();
224 108 : shutdown_req = ShutdownRequest {
225 108 : // Avoid set_len call if we didn't need to pad anything.
226 108 : set_len: if count > 0 { Some(final_len) } else { None },
227 : };
228 : }
229 : };
230 11256 : let padded_pending = self.mutable.as_ref().map(|b| b.pending());
231 11256 : trace!(unpadded_pending, padded_pending, "padding done");
232 11256 : if self.mutable.is_some() {
233 11244 : self.flush(ctx).await?;
234 12 : }
235 : let Self {
236 : mutable: _,
237 : maybe_flushed: _,
238 11256 : mut flush_handle,
239 11256 : bytes_submitted: _,
240 11256 : } = self;
241 11256 : let writer = flush_handle.shutdown(shutdown_req).await?;
242 :
243 11256 : Ok((final_len, writer))
244 11256 : }
245 :
246 : #[cfg(test)]
247 60 : pub(crate) fn mutable(&self) -> &B {
248 60 : self.mutable.as_ref().expect("must not use after an error")
249 60 : }
250 :
251 : #[cfg_attr(target_os = "macos", allow(dead_code))]
252 78571572 : pub async fn write_buffered_borrowed(
253 78571572 : &mut self,
254 78571572 : chunk: &[u8],
255 78571572 : ctx: &RequestContext,
256 78571572 : ) -> Result<usize, FlushTaskError> {
257 78571572 : let (len, control) = self.write_buffered_borrowed_controlled(chunk, ctx).await?;
258 78571572 : if let Some(control) = control {
259 71028 : control.release().await;
260 78500544 : }
261 78571572 : Ok(len)
262 78571572 : }
263 :
264 : /// In addition to the number of bytes submitted in this write, also returns a handle that can control the flush behavior.
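///
/// # Example
///
/// A sketch mirroring [`Self::write_buffered_borrowed`] above; the caller decides
/// when to release the returned [`FlushControl`], if any:
///
/// ```ignore
/// let (nwritten, control) = writer.write_buffered_borrowed_controlled(chunk, ctx).await?;
/// if let Some(control) = control {
///     control.release().await;
/// }
/// ```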
265 107400936 : pub(crate) async fn write_buffered_borrowed_controlled(
266 107400936 : &mut self,
267 107400936 : mut chunk: &[u8],
268 107400936 : ctx: &RequestContext,
269 107400936 : ) -> Result<(usize, Option<FlushControl>), FlushTaskError> {
270 107400936 : let chunk_len = chunk.len();
271 107400936 : let mut control: Option<FlushControl> = None;
272 214931760 : while !chunk.is_empty() {
273 107530824 : let buf = self.mutable.as_mut().expect("must not use after an error");
274 107530824 : let need = buf.cap() - buf.pending();
275 107530824 : let have = chunk.len();
276 107530824 : let n = std::cmp::min(need, have);
277 107530824 : buf.extend_from_slice(&chunk[..n]);
278 107530824 : chunk = &chunk[n..];
279 107530824 : if buf.pending() >= buf.cap() {
280 130416 : assert_eq!(buf.pending(), buf.cap());
281 130416 : if let Some(control) = control.take() {
282 26148 : control.release().await;
283 104268 : }
284 130416 : control = self.flush(ctx).await?;
285 107400408 : }
286 : }
287 107400936 : Ok((chunk_len, control))
288 107400936 : }
289 :
290 : /// This function can only error if the flush task got cancelled.
291 : /// In that case, we leave [`Self::mutable`] intentionally as `None`.
292 : ///
293 : /// The read path continues to function correctly; it can read up to the
294 : /// point where it could read before, i.e., including what was in [`Self::mutable`]
295 : /// before the call to this function, because that's now stored in [`Self::maybe_flushed`].
296 : ///
297 : /// The write path becomes unavailable and will panic if used.
298 : /// The only correct way to retry writes is to discard the entire [`BufferedWriter`],
299 : /// which upper layers of the pageserver write path currently do not support.
300 : /// It is in fact quite hard to reason about what exactly happens in today's code.
301 : /// Best case, we accumulate junk in the EphemeralFile; worst case, data corruption.
302 : #[must_use = "caller must explicitly check the flush control"]
303 141660 : async fn flush(
304 141660 : &mut self,
305 141660 : _ctx: &RequestContext,
306 141660 : ) -> Result<Option<FlushControl>, FlushTaskError> {
307 141660 : let buf = self.mutable.take().expect("must not use after an error");
308 141660 : let buf_len = buf.pending();
309 141660 : if buf_len == 0 {
310 12 : self.mutable = Some(buf);
311 12 : return Ok(None);
312 141648 : }
313 141648 : // Prepare the buffer for read while flushing.
314 141648 : let slice = buf.flush();
315 141648 : // NB: this assignment also drops the reference to the old buffer, allowing us to re-own & make it mutable below.
316 141648 : self.maybe_flushed = Some(slice.cheap_clone());
317 141648 : let offset = self.bytes_submitted;
318 141648 : self.bytes_submitted += u64::try_from(buf_len).unwrap();
319 :
320 : // If we return/panic here or later, we'll leave mutable = None, breaking further
321 : // writers, but the read path should still work.
322 141648 : let (recycled, flush_control) = self.flush_handle.flush(slice, offset).await?;
323 :
324 : // The only other place that could hold a reference to the recycled buffer
325 : // is in `Self::maybe_flushed`, but we have already replaced it with the new buffer.
326 141648 : let recycled = Buffer::reuse_after_flush(recycled.into_raw_slice().into_inner());
327 141648 :
328 141648 : // We got back some recycled buffer, can open up for more writes again.
329 141648 : self.mutable = Some(recycled);
330 141648 :
331 141648 : Ok(Some(flush_control))
332 141660 : }
333 : }
334 :
335 : /// A [`Buffer`] is used by [`BufferedWriter`] to batch smaller writes into larger ones.
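///
/// The writer drives an implementation through the following cycle (a sketch;
/// `buf` stands for any `Buffer` impl such as [`IoBufferMut`], and `more_data`
/// is a placeholder):
///
/// ```ignore
/// while buf.pending() < buf.cap() {
///     buf.extend_from_slice(more_data); // accumulate small writes
/// }
/// let slice = buf.flush();              // freeze into a FullSlice for the write IO
/// // ... hand `slice` to the flush task and get it back once the write is done ...
/// let buf = Buffer::reuse_after_flush(slice.into_raw_slice().into_inner());
/// ```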
336 : pub trait Buffer {
337 : type IoBuf: IoBuf;
338 :
339 : /// Capacity of the buffer. Must not change over the lifetime of `self`.
340 : fn cap(&self) -> usize;
341 :
342 : /// Add data to the buffer.
343 : /// Panics if there is not enough room to accomodate `other`'s content, i.e.,
344 : /// panics if `other.len() > self.cap() - self.pending()`.
345 : fn extend_from_slice(&mut self, other: &[u8]);
346 :
347 : /// Add `count` bytes `val` into `self`.
348 : /// Panics if `count > self.cap() - self.pending()`.
349 : fn extend_with(&mut self, val: u8, count: usize);
350 :
351 : /// Number of bytes in the buffer.
352 : fn pending(&self) -> usize;
353 :
354 : /// Turns `self` into a [`FullSlice`] of the pending data
355 : /// so we can use [`tokio_epoll_uring`] to write it to disk.
356 : fn flush(self) -> FullSlice<Self::IoBuf>;
357 :
358 : /// After the write to disk is done and we have gotten back the slice,
359 : /// [`BufferedWriter`] uses this method to reuse the IO buffer.
360 : fn reuse_after_flush(iobuf: Self::IoBuf) -> Self;
361 : }
362 :
363 : impl Buffer for IoBufferMut {
364 : type IoBuf = IoBuffer;
365 :
366 322842332 : fn cap(&self) -> usize {
367 322842332 : self.capacity()
368 322842332 : }
369 :
370 107530824 : fn extend_from_slice(&mut self, other: &[u8]) {
371 107530824 : if self.len() + other.len() > self.cap() {
372 0 : panic!("Buffer capacity exceeded");
373 107530824 : }
374 107530824 :
375 107530824 : IoBufferMut::extend_from_slice(self, other);
376 107530824 : }
377 :
378 108200 : fn extend_with(&mut self, val: u8, count: usize) {
379 108200 : if self.len() + count > self.cap() {
380 0 : panic!("Buffer capacity exceeded");
381 108200 : }
382 108200 :
383 108200 : IoBufferMut::put_bytes(self, val, count);
384 108200 : }
385 :
386 218546673 : fn pending(&self) -> usize {
387 218546673 : self.len()
388 218546673 : }
389 :
390 162480 : fn flush(self) -> FullSlice<Self::IoBuf> {
391 162480 : self.freeze().slice_len()
392 162480 : }
393 :
394 : /// The caller should make sure that `iobuf` has only one strong reference before invoking this method.
395 141648 : fn reuse_after_flush(iobuf: Self::IoBuf) -> Self {
396 141648 : let mut recycled = iobuf
397 141648 : .into_mut()
398 141648 : .expect("buffer should only have one strong reference");
399 141648 : recycled.clear();
400 141648 : recycled
401 141648 : }
402 : }
403 :
404 : #[cfg(test)]
405 : mod tests {
406 : use std::sync::Mutex;
407 :
408 : use rstest::rstest;
409 :
410 : use super::*;
411 : use crate::context::{DownloadBehavior, RequestContext};
412 : use crate::task_mgr::TaskKind;
413 :
414 : #[derive(Debug, PartialEq, Eq)]
415 : enum Op {
416 : Write { buf: Vec<u8>, offset: u64 },
417 : SetLen { len: u64 },
418 : }
419 :
420 : #[derive(Default, Debug)]
421 : struct RecorderWriter {
422 : /// Records written bytes and their offsets.
423 : recording: Mutex<Vec<Op>>,
424 : }
425 :
426 : impl OwnedAsyncWriter for RecorderWriter {
427 156 : async fn write_all_at<Buf: IoBufAligned + Send>(
428 156 : &self,
429 156 : buf: FullSlice<Buf>,
430 156 : offset: u64,
431 156 : _: &RequestContext,
432 156 : ) -> (FullSlice<Buf>, std::io::Result<()>) {
433 156 : self.recording.lock().unwrap().push(Op::Write {
434 156 : buf: Vec::from(&buf[..]),
435 156 : offset,
436 156 : });
437 156 : (buf, Ok(()))
438 156 : }
439 12 : async fn set_len(&self, len: u64, _ctx: &RequestContext) -> std::io::Result<()> {
440 12 : self.recording.lock().unwrap().push(Op::SetLen { len });
441 12 : Ok(())
442 12 : }
443 : }
444 :
445 48 : fn test_ctx() -> RequestContext {
446 48 : RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error)
447 48 : }
448 :
449 36 : #[rstest]
450 : #[tokio::test]
451 : async fn test_write_all_borrowed_always_goes_through_buffer(
452 : #[values(
453 : BufferedWriterShutdownMode::DropTail,
454 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(2),
455 : BufferedWriterShutdownMode::PadThenTruncate
456 : )]
457 : mode: BufferedWriterShutdownMode,
458 : ) -> anyhow::Result<()> {
459 : let ctx = test_ctx();
460 : let ctx = &ctx;
461 : let recorder = RecorderWriter::default();
462 : let gate = utils::sync::gate::Gate::default();
463 : let cancel = CancellationToken::new();
464 : let cap = 4;
465 : let mut writer = BufferedWriter::<_, RecorderWriter>::new(
466 : recorder,
467 : 0,
468 72 : || IoBufferMut::with_capacity(cap),
469 : gate.enter()?,
470 : cancel,
471 : ctx,
472 : tracing::Span::none(),
473 : );
474 :
475 : writer.write_buffered_borrowed(b"abc", ctx).await?;
476 : writer.write_buffered_borrowed(b"", ctx).await?;
477 : writer.write_buffered_borrowed(b"d", ctx).await?;
478 : writer.write_buffered_borrowed(b"efg", ctx).await?;
479 : writer.write_buffered_borrowed(b"hijklm", ctx).await?;
480 :
481 : let mut expect = {
482 : [(0, b"abcd"), (4, b"efgh"), (8, b"ijkl")]
483 : .into_iter()
484 108 : .map(|(offset, v)| Op::Write {
485 108 : offset,
486 108 : buf: v[..].to_vec(),
487 108 : })
488 : .collect::<Vec<_>>()
489 : };
490 : let expect_next_offset = 12;
491 :
492 : match &mode {
493 : BufferedWriterShutdownMode::DropTail => (),
494 : // We test the case with padding to next multiple of 2 so that it's different
495 : // from the alignment requirement of 4 inferred from buffer capacity.
496 : // See TODOs in the `BufferedWriter` struct comment on decoupling buffer capacity from alignment requirement.
497 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(2) => {
498 : expect.push(Op::Write {
499 : offset: expect_next_offset,
500 : // it's legitimate for pad-to-next multiple 2 to be < alignment requirement 4 inferred from buffer capacity
501 : buf: b"m\0".to_vec(),
502 : });
503 : }
504 : BufferedWriterShutdownMode::ZeroPadToNextMultiple(_) => unimplemented!(),
505 : BufferedWriterShutdownMode::PadThenTruncate => {
506 : expect.push(Op::Write {
507 : offset: expect_next_offset,
508 : buf: b"m\0\0\0".to_vec(),
509 : });
510 : expect.push(Op::SetLen { len: 13 });
511 : }
512 : }
513 :
514 : let (_, recorder) = writer.shutdown(mode, ctx).await?;
515 : assert_eq!(&*recorder.recording.lock().unwrap(), &expect);
516 : Ok(())
517 : }
518 :
519 : #[tokio::test]
520 12 : async fn test_set_len_is_skipped_if_not_needed() -> anyhow::Result<()> {
521 12 : let ctx = test_ctx();
522 12 : let ctx = &ctx;
523 12 : let recorder = RecorderWriter::default();
524 12 : let gate = utils::sync::gate::Gate::default();
525 12 : let cancel = CancellationToken::new();
526 12 : let cap = 4;
527 12 : let mut writer = BufferedWriter::<_, RecorderWriter>::new(
528 12 : recorder,
529 12 : 0,
530 24 : || IoBufferMut::with_capacity(cap),
531 12 : gate.enter()?,
532 12 : cancel,
533 12 : ctx,
534 12 : tracing::Span::none(),
535 12 : );
536 12 :
537 12 : // write a multiple of `cap`
538 12 : writer.write_buffered_borrowed(b"abc", ctx).await?;
539 12 : writer.write_buffered_borrowed(b"defgh", ctx).await?;
540 12 :
541 12 : let (_, recorder) = writer
542 12 : .shutdown(BufferedWriterShutdownMode::PadThenTruncate, ctx)
543 12 : .await?;
544 12 :
545 12 : let expect = {
546 12 : [(0, b"abcd"), (4, b"efgh")]
547 12 : .into_iter()
548 24 : .map(|(offset, v)| Op::Write {
549 24 : offset,
550 24 : buf: v[..].to_vec(),
551 24 : })
552 12 : .collect::<Vec<_>>()
553 12 : };
554 12 :
555 12 : assert_eq!(
556 12 : &*recorder.recording.lock().unwrap(),
557 12 : &expect,
558 12 : "set_len should not be called if the buffer is already aligned"
559 12 : );
560 12 :
561 12 : Ok(())
562 12 : }
563 : }
|