LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/aligned_buffer - buffer_mut.rs (source / functions) Coverage Total Hit
Test: 07bee600374ccd486c69370d0972d9035964fe68.info Lines: 91.8 % 219 201
Test Date: 2025-02-20 13:11:02 Functions: 86.4 % 59 51

            Line data    Source code
       1              : use std::{
       2              :     mem::MaybeUninit,
       3              :     ops::{Deref, DerefMut},
       4              : };
       5              : 
       6              : use super::{
       7              :     alignment::{Alignment, ConstAlign},
       8              :     buffer::AlignedBuffer,
       9              :     raw::RawAlignedBuffer,
      10              : };
      11              : 
      12              : /// A mutable aligned buffer type.
      13              : #[derive(Debug)]
      14              : pub struct AlignedBufferMut<A: Alignment> {
      15              :     raw: RawAlignedBuffer<A>,
      16              : }
      17              : 
      18              : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
      19              :     /// Constructs a new, empty `AlignedBufferMut` with at least the specified capacity and alignment.
      20              :     ///
      21              :     /// The buffer can hold at least `capacity` bytes. Writes through `bytes::BufMut` never grow it; only an explicit `reserve` or `extend_from_slice` does.
      22              :     ///
      23              :     ///
      24              :     /// # Panics
      25              :     ///
      26              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if any of the following alignment requirements is not met:
      27              :     /// * `align` must not be zero,
      28              :     ///
      29              :     /// * `align` must be a power of two,
      30              :     ///
      31              :     /// * `capacity`, when rounded up to the nearest multiple of `align`,
      32              :     ///    must not overflow isize (i.e., the rounded value must be
      33              :     ///    less than or equal to `isize::MAX`).
      34      1440396 :     pub fn with_capacity(capacity: usize) -> Self {
      35      1440396 :         AlignedBufferMut {
      36      1440396 :             raw: RawAlignedBuffer::with_capacity(capacity),
      37      1440396 :         }
      38      1440396 :     }
      39              : 
      40              :     /// Constructs a new `AlignedBufferMut` with at least the specified capacity and alignment, filled with zeros.
      41          608 :     pub fn with_capacity_zeroed(capacity: usize) -> Self {
      42              :         use bytes::BufMut;
      43          608 :         let mut buf = Self::with_capacity(capacity);
      44          608 :         buf.put_bytes(0, capacity);
      45          608 :         // SAFETY: `put_bytes` filled the entire buffer.
      46          608 :         unsafe { buf.set_len(capacity) };
      47          608 :         buf
      48          608 :     }
      49              : }
      50              : 
      51              : impl<A: Alignment> AlignedBufferMut<A> {
      52              :     /// Constructs a mutable aligned buffer from raw.
      53        13246 :     pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
      54        13246 :         AlignedBufferMut { raw }
      55        13246 :     }
      56              : 
      57              :     /// Returns the total number of bytes the buffer can hold.
      58              :     #[inline]
      59     53703635 :     pub fn capacity(&self) -> usize {
      60     53703635 :         self.raw.capacity()
      61     53703635 :     }
      62              : 
      63              :     /// Returns the alignment of the buffer.
      64              :     #[inline]
      65           44 :     pub fn align(&self) -> usize {
      66           44 :         self.raw.align()
      67           44 :     }
      68              : 
      69              :     /// Returns the number of bytes in the buffer, also referred to as its 'length'.
      70              :     #[inline]
      71     85422780 :     pub fn len(&self) -> usize {
      72     85422780 :         self.raw.len()
      73     85422780 :     }
      74              : 
      75              :     /// Force the length of the buffer to `new_len`.
      76              :     #[inline]
      77     11062530 :     unsafe fn set_len(&mut self, new_len: usize) {
      78     11062530 :         self.raw.set_len(new_len)
      79     11062530 :     }
      80              : 
      81              :     #[inline]
      82      6569751 :     fn as_ptr(&self) -> *const u8 {
      83      6569751 :         self.raw.as_ptr()
      84      6569751 :     }
      85              : 
      86              :     #[inline]
      87     12940663 :     fn as_mut_ptr(&mut self) -> *mut u8 {
      88     12940663 :         self.raw.as_mut_ptr()
      89     12940663 :     }
      90              : 
      91              :     /// Extracts a slice containing the entire buffer.
      92              :     ///
      93              :     /// Equivalent to `&s[..]`.
      94              :     #[inline]
      95      8367853 :     fn as_slice(&self) -> &[u8] {
      96      8367853 :         self.raw.as_slice()
      97      8367853 :     }
      98              : 
      99              :     /// Extracts a mutable slice of the entire buffer.
     100              :     ///
     101              :     /// Equivalent to `&mut s[..]`.
     102            0 :     fn as_mut_slice(&mut self) -> &mut [u8] {
     103            0 :         self.raw.as_mut_slice()
     104            0 :     }
     105              : 
     106              :     /// Drops all the contents of the buffer, setting its length to `0`.
     107              :     #[inline]
     108        13298 :     pub fn clear(&mut self) {
     109        13298 :         self.raw.clear()
     110        13298 :     }
     111              : 
     112              :     /// Reserves capacity for at least `additional` more bytes to be inserted
     113              :     /// in the given `AlignedBufferMut`. The buffer may reserve more space to
     114              :     /// speculatively avoid frequent reallocations. After calling `reserve`,
     115              :     /// capacity will be greater than or equal to `self.len() + additional`.
     116              :     /// Does nothing if capacity is already sufficient.
     117              :     ///
     118              :     /// # Panics
     119              :     ///
     120              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
     121      9623128 :     pub fn reserve(&mut self, additional: usize) {
     122      9623128 :         self.raw.reserve(additional);
     123      9623128 :     }
     124              : 
     125              :     /// Shortens the buffer, keeping the first len bytes.
     126           20 :     pub fn truncate(&mut self, len: usize) {
     127           20 :         self.raw.truncate(len);
     128           20 :     }
     129              : 
     130              :     /// Consumes and leaks the `AlignedBufferMut`, returning a mutable reference to the contents, `&'a mut [u8]`.
     131          184 :     pub fn leak<'a>(self) -> &'a mut [u8] {
     132          184 :         self.raw.leak()
     133          184 :     }
     134              : 
     135        17813 :     pub fn freeze(self) -> AlignedBuffer<A> {
     136        17813 :         let len = self.len();
     137        17813 :         AlignedBuffer::from_raw(self.raw, 0..len)
     138        17813 :     }
     139              : 
     140              :     /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
     141              :     #[inline]
     142      9623084 :     pub fn extend_from_slice(&mut self, extend: &[u8]) {
     143      9623084 :         let cnt = extend.len();
     144      9623084 :         self.reserve(cnt);
     145      9623084 : 
     146      9623084 :         // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
     147      9623084 :         unsafe {
     148      9623084 :             let dst = self.spare_capacity_mut();
     149      9623084 :             // Reserved above
     150      9623084 :             debug_assert!(dst.len() >= cnt);
     151              : 
     152      9623084 :             core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
     153      9623084 :         }
     154      9623084 :         // SAFETY: We do have at least `cnt` bytes remaining before advance.
     155      9623084 :         unsafe {
     156      9623084 :             bytes::BufMut::advance_mut(self, cnt);
     157      9623084 :         }
     158      9623084 :     }
     159              : 
     160              :     /// Returns the remaining spare capacity of the buffer as a slice of `MaybeUninit<u8>`.
     161              :     #[inline]
     162      9623084 :     fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
     163      9623084 :         // SAFETY: we guarantee that the `Self::capacity()` bytes from
     164      9623084 :         // `Self::as_mut_ptr()` are allocated.
     165      9623084 :         unsafe {
     166      9623084 :             let ptr = self.as_mut_ptr().add(self.len());
     167      9623084 :             let len = self.capacity() - self.len();
     168      9623084 : 
     169      9623084 :             core::slice::from_raw_parts_mut(ptr.cast(), len)
     170      9623084 :         }
     171      9623084 :     }
     172              : }
     173              : 
     174              : impl<A: Alignment> Deref for AlignedBufferMut<A> {
     175              :     type Target = [u8];
     176              : 
     177      8366445 :     fn deref(&self) -> &Self::Target {
     178      8366445 :         self.as_slice()
     179      8366445 :     }
     180              : }
     181              : 
     182              : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
     183            0 :     fn deref_mut(&mut self) -> &mut Self::Target {
     184            0 :         self.as_mut_slice()
     185            0 :     }
     186              : }
     187              : 
     188              : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
     189            0 :     fn as_ref(&self) -> &[u8] {
     190            0 :         self.as_slice()
     191            0 :     }
     192              : }
     193              : 
     194              : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
     195            0 :     fn as_mut(&mut self) -> &mut [u8] {
     196            0 :         self.as_mut_slice()
     197            0 :     }
     198              : }
     199              : 
     200              : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
     201         1408 :     fn eq(&self, other: &[u8]) -> bool {
     202         1408 :         self.as_slice().eq(other)
     203         1408 :     }
     204              : }
     205              : 
     206              : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advanced past have been initialized.
     207              : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
     208              :     #[inline]
     209      9624412 :     fn remaining_mut(&self) -> usize {
     210      9624412 :         // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `AlignedBufferMut` through this trait.
     211      9624412 :         // Thus, it can have at most `self.capacity` bytes.
     212      9624412 :         self.capacity() - self.len()
     213      9624412 :     }
     214              : 
     215              :     // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
     216              :     #[inline]
     217      9623744 :     unsafe fn advance_mut(&mut self, cnt: usize) {
     218      9623744 :         let len = self.len();
     219      9623744 :         let remaining = self.remaining_mut();
     220      9623744 : 
     221      9623744 :         if remaining < cnt {
     222            0 :             panic_advance(cnt, remaining);
     223      9623744 :         }
     224      9623744 : 
     225      9623744 :         // Addition will not overflow since the sum is at most the capacity.
     226      9623744 :         self.set_len(len + cnt);
     227      9623744 :     }
     228              : 
     229              :     #[inline]
     230          660 :     fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
     231          660 :         let cap = self.capacity();
     232          660 :         let len = self.len();
     233          660 : 
     234          660 :         // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
     235          660 :         // valid for `cap - len` bytes. The subtraction will not underflow since
     236          660 :         // `len <= cap`.
     237          660 :         unsafe {
     238          660 :             bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
     239          660 :         }
     240          660 :     }
     241              : }
     242              : 
     243              : /// Panic with a nice error message.
     244              : #[cold]
     245            0 : fn panic_advance(idx: usize, len: usize) -> ! {
     246            0 :     panic!(
     247            0 :         "advance out of bounds: the len is {} but advancing by {}",
     248            0 :         len, idx
     249            0 :     );
     250              : }
     251              : 
     252              : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
     253              : /// and the underlying pointer remains stable while io-uring is owning the buffer.
     254              : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
     255              : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
     256              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
     257      6569715 :     fn stable_ptr(&self) -> *const u8 {
     258      6569715 :         self.as_ptr()
     259      6569715 :     }
     260              : 
     261     14135349 :     fn bytes_init(&self) -> usize {
     262     14135349 :         self.len()
     263     14135349 :     }
     264              : 
     265      5129778 :     fn bytes_total(&self) -> usize {
     266      5129778 :         self.capacity()
     267      5129778 :     }
     268              : }
     269              : 
     270              : // SAFETY: See above.
     271              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
     272      3316919 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     273      3316919 :         self.as_mut_ptr()
     274      3316919 :     }
     275              : 
     276      1884438 :     unsafe fn set_init(&mut self, init_len: usize) {
     277      1884438 :         if self.len() < init_len {
     278      1438178 :             self.set_len(init_len);
     279      1438178 :         }
     280      1884438 :     }
     281              : }
     282              : 
     283              : #[cfg(test)]
     284              : mod tests {
     285              : 
     286              :     use super::*;
     287              : 
     288              :     const ALIGN: usize = 4 * 1024;
     289              :     type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
     290              : 
     291              :     #[test]
     292            4 :     fn test_with_capacity() {
     293            4 :         let v = TestIoBufferMut::with_capacity(ALIGN * 4);
     294            4 :         assert_eq!(v.len(), 0);
     295            4 :         assert_eq!(v.capacity(), ALIGN * 4);
     296            4 :         assert_eq!(v.align(), ALIGN);
     297            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     298              : 
     299            4 :         let v = TestIoBufferMut::with_capacity(ALIGN / 2);
     300            4 :         assert_eq!(v.len(), 0);
     301            4 :         assert_eq!(v.capacity(), ALIGN / 2);
     302            4 :         assert_eq!(v.align(), ALIGN);
     303            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     304            4 :     }
     305              : 
     306              :     #[test]
     307            4 :     fn test_with_capacity_zeroed() {
     308            4 :         let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
     309            4 :         assert_eq!(v.len(), ALIGN);
     310            4 :         assert_eq!(v.capacity(), ALIGN);
     311            4 :         assert_eq!(v.align(), ALIGN);
     312            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     313            4 :         assert_eq!(&v[..], &[0; ALIGN])
     314            4 :     }
     315              : 
     316              :     #[test]
     317            4 :     fn test_reserve() {
     318              :         use bytes::BufMut;
     319            4 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     320            4 :         let capacity = v.capacity();
     321            4 :         v.reserve(capacity);
     322            4 :         assert_eq!(v.capacity(), capacity);
     323            4 :         let data = [b'a'; ALIGN];
     324            4 :         v.put(&data[..]);
     325            4 :         v.reserve(capacity);
     326            4 :         assert!(v.capacity() >= capacity * 2);
     327            4 :         assert_eq!(&v[..], &data[..]);
     328            4 :         let capacity = v.capacity();
     329            4 :         v.clear();
     330            4 :         v.reserve(capacity);
     331            4 :         assert_eq!(capacity, v.capacity());
     332            4 :     }
     333              : 
     334              :     #[test]
     335            4 :     fn test_bytes_put() {
     336              :         use bytes::BufMut;
     337            4 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     338            4 :         let x = [b'a'; ALIGN];
     339              : 
     340           12 :         for _ in 0..2 {
     341           40 :             for _ in 0..4 {
     342           32 :                 v.put(&x[..]);
     343           32 :             }
     344            8 :             assert_eq!(v.len(), ALIGN * 4);
     345            8 :             assert_eq!(v.capacity(), ALIGN * 4);
     346            8 :             assert_eq!(v.align(), ALIGN);
     347            8 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     348            8 :             v.clear()
     349              :         }
     350            4 :         assert_eq!(v.len(), 0);
     351            4 :         assert_eq!(v.capacity(), ALIGN * 4);
     352            4 :         assert_eq!(v.align(), ALIGN);
     353            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     354            4 :     }
     355              : 
     356              :     #[test]
     357              :     #[should_panic]
     358            4 :     fn test_bytes_put_panic() {
     359              :         use bytes::BufMut;
     360              :         const ALIGN: usize = 4 * 1024;
     361            4 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     362            4 :         let x = [b'a'; ALIGN];
     363           24 :         for _ in 0..5 {
     364           20 :             v.put_slice(&x[..]);
     365           20 :         }
     366            4 :     }
     367              : 
     368              :     #[test]
     369            4 :     fn test_io_buf_put_slice() {
     370              :         use tokio_epoll_uring::BoundedBufMut;
     371              :         const ALIGN: usize = 4 * 1024;
     372            4 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     373            4 :         let x = [b'a'; ALIGN];
     374              : 
     375           12 :         for _ in 0..2 {
     376            8 :             v.put_slice(&x[..]);
     377            8 :             assert_eq!(v.len(), ALIGN);
     378            8 :             assert_eq!(v.capacity(), ALIGN);
     379            8 :             assert_eq!(v.align(), ALIGN);
     380            8 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     381            8 :             v.clear()
     382              :         }
     383            4 :         assert_eq!(v.len(), 0);
     384            4 :         assert_eq!(v.capacity(), ALIGN);
     385            4 :         assert_eq!(v.align(), ALIGN);
     386            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     387            4 :     }
     388              : }
        

Generated by: LCOV version 2.1-beta