LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/aligned_buffer - buffer_mut.rs (source / functions) Coverage Total Hit
Test: 1e20c4f2b28aa592527961bb32170ebbd2c9172f.info Lines: 94.3 % 209 197
Test Date: 2025-07-16 12:29:03 Functions: 85.7 % 63 54

            Line data    Source code
       1              : use std::mem::MaybeUninit;
       2              : use std::ops::{Deref, DerefMut};
       3              : 
       4              : use super::alignment::{Alignment, ConstAlign};
       5              : use super::buffer::AlignedBuffer;
       6              : use super::raw::RawAlignedBuffer;
       7              : 
       8              : /// A mutable aligned buffer type.
       9              : #[derive(Debug)]
      10              : pub struct AlignedBufferMut<A: Alignment> {
      11              :     raw: RawAlignedBuffer<A>,
      12              : }
      13              : 
      14              : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
      15              :     /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
      16              :     ///
      17              :     /// The buffer will be able to hold at most `capacity` elements and will never resize.
      18              :     ///
      19              :     ///
      20              :     /// # Panics
      21              :     ///
      22              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
      23              :     /// * `align` must not be zero,
      24              :     ///
      25              :     /// * `align` must be a power of two,
      26              :     ///
      27              :     /// * `capacity`, when rounded up to the nearest multiple of `align`,
      28              :     ///   must not overflow isize (i.e., the rounded value must be
      29              :     ///   less than or equal to `isize::MAX`).
      30       405605 :     pub fn with_capacity(capacity: usize) -> Self {
      31       405605 :         AlignedBufferMut {
      32       405605 :             raw: RawAlignedBuffer::with_capacity(capacity),
      33       405605 :         }
      34       405605 :     }
      35              : 
      36              :     /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
      37          560 :     pub fn with_capacity_zeroed(capacity: usize) -> Self {
      38              :         use bytes::BufMut;
      39          560 :         let mut buf = Self::with_capacity(capacity);
      40          560 :         buf.put_bytes(0, capacity);
      41              :         // SAFETY: `put_bytes` filled the entire buffer.
      42          560 :         unsafe { buf.set_len(capacity) };
      43          560 :         buf
      44          560 :     }
      45              : }
      46              : 
      47              : impl<A: Alignment> AlignedBufferMut<A> {
      48              :     /// Constructs a mutable aligned buffer from raw.
      49        11817 :     pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
      50        11817 :         AlignedBufferMut { raw }
      51        11817 :     }
      52              : 
      53              :     /// Returns the total number of bytes the buffer can hold.
      54              :     #[inline]
      55     46662716 :     pub fn capacity(&self) -> usize {
      56     46662716 :         self.raw.capacity()
      57     46662716 :     }
      58              : 
      59              :     /// Returns the alignment of the buffer.
      60              :     #[inline]
      61           11 :     pub fn align(&self) -> usize {
      62           11 :         self.raw.align()
      63           11 :     }
      64              : 
      65              :     /// Returns the number of bytes in the buffer, also referred to as its 'length'.
      66              :     #[inline]
      67     69305243 :     pub fn len(&self) -> usize {
      68     69305243 :         self.raw.len()
      69     69305243 :     }
      70              : 
      71              :     /// Force the length of the buffer to `new_len`.
      72              :     #[inline]
      73      9459961 :     unsafe fn set_len(&mut self, new_len: usize) {
      74              :         // SAFETY: the caller is unsafe
      75      9459961 :         unsafe { self.raw.set_len(new_len) }
      76      9459961 :     }
      77              : 
      78              :     #[inline]
      79      2321845 :     fn as_ptr(&self) -> *const u8 {
      80      2321845 :         self.raw.as_ptr()
      81      2321845 :     }
      82              : 
      83              :     #[inline]
      84     10342002 :     fn as_mut_ptr(&mut self) -> *mut u8 {
      85     10342002 :         self.raw.as_mut_ptr()
      86     10342002 :     }
      87              : 
      88              :     /// Extracts a slice containing the entire buffer.
      89              :     ///
      90              :     /// Equivalent to `&s[..]`.
      91              :     #[inline]
      92      2714466 :     fn as_slice(&self) -> &[u8] {
      93      2714466 :         self.raw.as_slice()
      94      2714466 :     }
      95              : 
      96              :     /// Extracts a mutable slice of the entire buffer.
      97              :     ///
      98              :     /// Equivalent to `&mut s[..]`.
      99          203 :     fn as_mut_slice(&mut self) -> &mut [u8] {
     100          203 :         self.raw.as_mut_slice()
     101          203 :     }
     102              : 
     103              :     /// Drops the all the contents of the buffer, setting its length to `0`.
     104              :     #[inline]
     105        11830 :     pub fn clear(&mut self) {
     106        11830 :         self.raw.clear()
     107        11830 :     }
     108              : 
     109              :     /// Reserves capacity for at least `additional` more bytes to be inserted
     110              :     /// in the given `IoBufferMut`. The collection may reserve more space to
     111              :     /// speculatively avoid frequent reallocations. After calling `reserve`,
     112              :     /// capacity will be greater than or equal to `self.len() + additional`.
     113              :     /// Does nothing if capacity is already sufficient.
     114              :     ///
     115              :     /// # Panics
     116              :     ///
     117              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
     118      9007222 :     pub fn reserve(&mut self, additional: usize) {
     119      9007222 :         self.raw.reserve(additional);
     120      9007222 :     }
     121              : 
     122              :     /// Shortens the buffer, keeping the first len bytes.
     123            5 :     pub fn truncate(&mut self, len: usize) {
     124            5 :         self.raw.truncate(len);
     125            5 :     }
     126              : 
     127              :     /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
     128           50 :     pub fn leak<'a>(self) -> &'a mut [u8] {
     129           50 :         self.raw.leak()
     130           50 :     }
     131              : 
     132        22424 :     pub fn freeze(self) -> AlignedBuffer<A> {
     133        22424 :         let len = self.len();
     134        22424 :         AlignedBuffer::from_raw(self.raw, 0..len)
     135        22424 :     }
     136              : 
     137              :     /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
     138              :     #[inline]
     139      9007211 :     pub fn extend_from_slice(&mut self, extend: &[u8]) {
     140      9007211 :         let cnt = extend.len();
     141      9007211 :         self.reserve(cnt);
     142              : 
     143              :         // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
     144              :         unsafe {
     145      9007211 :             let dst = self.spare_capacity_mut();
     146              :             // Reserved above
     147      9007211 :             debug_assert!(dst.len() >= cnt);
     148              : 
     149      9007211 :             core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
     150              :         }
     151              :         // SAFETY: We do have at least `cnt` bytes remaining before advance.
     152      9007211 :         unsafe {
     153      9007211 :             bytes::BufMut::advance_mut(self, cnt);
     154      9007211 :         }
     155      9007211 :     }
     156              : 
     157              :     /// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
     158              :     #[inline]
     159      9007211 :     fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
     160              :         // SAFETY: we guarantees that the `Self::capacity()` bytes from
     161              :         // `Self::as_mut_ptr()` are allocated.
     162      9007211 :         unsafe {
     163      9007211 :             let ptr = self.as_mut_ptr().add(self.len());
     164      9007211 :             let len = self.capacity() - self.len();
     165      9007211 : 
     166      9007211 :             core::slice::from_raw_parts_mut(ptr.cast(), len)
     167      9007211 :         }
     168      9007211 :     }
     169              : }
     170              : 
     171              : impl<A: Alignment> Deref for AlignedBufferMut<A> {
     172              :     type Target = [u8];
     173              : 
     174      2714115 :     fn deref(&self) -> &Self::Target {
     175      2714115 :         self.as_slice()
     176      2714115 :     }
     177              : }
     178              : 
     179              : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
     180          203 :     fn deref_mut(&mut self) -> &mut Self::Target {
     181          203 :         self.as_mut_slice()
     182          203 :     }
     183              : }
     184              : 
     185              : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
     186            0 :     fn as_ref(&self) -> &[u8] {
     187            0 :         self.as_slice()
     188            0 :     }
     189              : }
     190              : 
     191              : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
     192            0 :     fn as_mut(&mut self) -> &mut [u8] {
     193            0 :         self.as_mut_slice()
     194            0 :     }
     195              : }
     196              : 
     197              : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
     198          351 :     fn eq(&self, other: &[u8]) -> bool {
     199          351 :         self.as_slice().eq(other)
     200          351 :     }
     201              : }
     202              : 
     203              : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
     204              : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
     205              :     #[inline]
     206      9125832 :     fn remaining_mut(&self) -> usize {
     207              :         // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
     208              :         // Thus, it can have at most `self.capacity` bytes.
     209      9125832 :         self.capacity() - self.len()
     210      9125832 :     }
     211              : 
     212              :     // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
     213              :     #[inline]
     214      9065937 :     unsafe fn advance_mut(&mut self, cnt: usize) {
     215      9065937 :         let len = self.len();
     216      9065937 :         let remaining = self.remaining_mut();
     217              : 
     218      9065937 :         if remaining < cnt {
     219            0 :             panic_advance(cnt, remaining);
     220      9065937 :         }
     221              : 
     222              :         // SAFETY: Addition will not overflow since the sum is at most the capacity.
     223      9065937 :         unsafe {
     224      9065937 :             self.set_len(len + cnt);
     225      9065937 :         }
     226      9065937 :     }
     227              : 
     228              :     #[inline]
     229        58726 :     fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
     230        58726 :         let cap = self.capacity();
     231        58726 :         let len = self.len();
     232              : 
     233              :         // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
     234              :         // valid for `cap - len` bytes. The subtraction will not underflow since
     235              :         // `len <= cap`.
     236        58726 :         unsafe {
     237        58726 :             bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
     238        58726 :         }
     239        58726 :     }
     240              : }
     241              : 
     242              : /// Panic with a nice error message.
     243              : #[cold]
     244            0 : fn panic_advance(idx: usize, len: usize) -> ! {
     245            0 :     panic!("advance out of bounds: the len is {len} but advancing by {idx}");
     246              : }
     247              : 
     248              : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
     249              : /// and the underlying pointer remains stable while io-uring is owning the buffer.
     250              : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
     251              : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
     252              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
     253      2321836 :     fn stable_ptr(&self) -> *const u8 {
     254      2321836 :         self.as_ptr()
     255      2321836 :     }
     256              : 
     257      4784583 :     fn bytes_init(&self) -> usize {
     258      4784583 :         self.len()
     259      4784583 :     }
     260              : 
     261      1428486 :     fn bytes_total(&self) -> usize {
     262      1428486 :         self.capacity()
     263      1428486 :     }
     264              : }
     265              : 
     266              : // SAFETY: See above.
     267              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
     268      1276065 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     269      1276065 :         self.as_mut_ptr()
     270      1276065 :     }
     271              : 
     272       768665 :     unsafe fn set_init(&mut self, init_len: usize) {
     273       768665 :         if self.len() < init_len {
     274              :             // SAFETY: caller function is unsafe
     275       393464 :             unsafe {
     276       393464 :                 self.set_len(init_len);
     277       393464 :             }
     278       375201 :         }
     279       768665 :     }
     280              : }
     281              : 
     282              : impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
     283        46257 :     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
     284        46257 :         self.extend_from_slice(buf);
     285        46257 :         Ok(buf.len())
     286        46257 :     }
     287              : 
     288            0 :     fn flush(&mut self) -> std::io::Result<()> {
     289            0 :         Ok(())
     290            0 :     }
     291              : }
     292              : 
     293              : #[cfg(test)]
     294              : mod tests {
     295              : 
     296              :     use super::*;
     297              : 
     298              :     const ALIGN: usize = 4 * 1024;
     299              :     type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
     300              : 
     301              :     #[test]
     302            1 :     fn test_with_capacity() {
     303            1 :         let v = TestIoBufferMut::with_capacity(ALIGN * 4);
     304            1 :         assert_eq!(v.len(), 0);
     305            1 :         assert_eq!(v.capacity(), ALIGN * 4);
     306            1 :         assert_eq!(v.align(), ALIGN);
     307            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     308              : 
     309            1 :         let v = TestIoBufferMut::with_capacity(ALIGN / 2);
     310            1 :         assert_eq!(v.len(), 0);
     311            1 :         assert_eq!(v.capacity(), ALIGN / 2);
     312            1 :         assert_eq!(v.align(), ALIGN);
     313            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     314            1 :     }
     315              : 
     316              :     #[test]
     317            1 :     fn test_with_capacity_zeroed() {
     318            1 :         let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
     319            1 :         assert_eq!(v.len(), ALIGN);
     320            1 :         assert_eq!(v.capacity(), ALIGN);
     321            1 :         assert_eq!(v.align(), ALIGN);
     322            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     323            1 :         assert_eq!(&v[..], &[0; ALIGN])
     324            1 :     }
     325              : 
     326              :     #[test]
     327            1 :     fn test_reserve() {
     328              :         use bytes::BufMut;
     329            1 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     330            1 :         let capacity = v.capacity();
     331            1 :         v.reserve(capacity);
     332            1 :         assert_eq!(v.capacity(), capacity);
     333            1 :         let data = [b'a'; ALIGN];
     334            1 :         v.put(&data[..]);
     335            1 :         v.reserve(capacity);
     336            1 :         assert!(v.capacity() >= capacity * 2);
     337            1 :         assert_eq!(&v[..], &data[..]);
     338            1 :         let capacity = v.capacity();
     339            1 :         v.clear();
     340            1 :         v.reserve(capacity);
     341            1 :         assert_eq!(capacity, v.capacity());
     342            1 :     }
     343              : 
     344              :     #[test]
     345            1 :     fn test_bytes_put() {
     346              :         use bytes::BufMut;
     347            1 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     348            1 :         let x = [b'a'; ALIGN];
     349              : 
     350            3 :         for _ in 0..2 {
     351           10 :             for _ in 0..4 {
     352            8 :                 v.put(&x[..]);
     353            8 :             }
     354            2 :             assert_eq!(v.len(), ALIGN * 4);
     355            2 :             assert_eq!(v.capacity(), ALIGN * 4);
     356            2 :             assert_eq!(v.align(), ALIGN);
     357            2 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     358            2 :             v.clear()
     359              :         }
     360            1 :         assert_eq!(v.len(), 0);
     361            1 :         assert_eq!(v.capacity(), ALIGN * 4);
     362            1 :         assert_eq!(v.align(), ALIGN);
     363            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     364            1 :     }
     365              : 
     366              :     #[test]
     367              :     #[should_panic]
     368            1 :     fn test_bytes_put_panic() {
     369              :         use bytes::BufMut;
     370              :         const ALIGN: usize = 4 * 1024;
     371            1 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     372            1 :         let x = [b'a'; ALIGN];
     373            5 :         for _ in 0..5 {
     374            4 :             v.put_slice(&x[..]);
     375            4 :         }
     376            1 :     }
     377              : 
     378              :     #[test]
     379            1 :     fn test_io_buf_put_slice() {
     380              :         use tokio_epoll_uring::BoundedBufMut;
     381              :         const ALIGN: usize = 4 * 1024;
     382            1 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     383            1 :         let x = [b'a'; ALIGN];
     384              : 
     385            3 :         for _ in 0..2 {
     386            2 :             v.put_slice(&x[..]);
     387            2 :             assert_eq!(v.len(), ALIGN);
     388            2 :             assert_eq!(v.capacity(), ALIGN);
     389            2 :             assert_eq!(v.align(), ALIGN);
     390            2 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     391            2 :             v.clear()
     392              :         }
     393            1 :         assert_eq!(v.len(), 0);
     394            1 :         assert_eq!(v.capacity(), ALIGN);
     395            1 :         assert_eq!(v.align(), ALIGN);
     396            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     397            1 :     }
     398              : }
        

Generated by: LCOV version 2.1-beta