LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/aligned_buffer - buffer_mut.rs (source / functions) Coverage Total Hit
Test: 5445d246133daeceb0507e6cc0797ab7c1c70cb8.info Lines: 92.0 % 224 206
Test Date: 2025-03-12 18:05:02 Functions: 86.4 % 59 51

            Line data    Source code
       1              : use std::mem::MaybeUninit;
       2              : use std::ops::{Deref, DerefMut};
       3              : 
       4              : use super::alignment::{Alignment, ConstAlign};
       5              : use super::buffer::AlignedBuffer;
       6              : use super::raw::RawAlignedBuffer;
       7              : 
       8              : /// A mutable aligned buffer type.
       9              : #[derive(Debug)]
      10              : pub struct AlignedBufferMut<A: Alignment> {
      11              :     raw: RawAlignedBuffer<A>,
      12              : }
      13              : 
      14              : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
      15              :     /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
      16              :     ///
      17              :     /// The buffer will be able to hold at most `capacity` elements and will never resize.
      18              :     ///
      19              :     ///
      20              :     /// # Panics
      21              :     ///
      22              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
      23              :     /// * `align` must not be zero,
      24              :     ///
      25              :     /// * `align` must be a power of two,
      26              :     ///
      27              :     /// * `capacity`, when rounded up to the nearest multiple of `align`,
      28              :     ///    must not overflow isize (i.e., the rounded value must be
      29              :     ///    less than or equal to `isize::MAX`).
      30      1440478 :     pub fn with_capacity(capacity: usize) -> Self {
      31      1440478 :         AlignedBufferMut {
      32      1440478 :             raw: RawAlignedBuffer::with_capacity(capacity),
      33      1440478 :         }
      34      1440478 :     }
      35              : 
      36              :     /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
      37          612 :     pub fn with_capacity_zeroed(capacity: usize) -> Self {
      38              :         use bytes::BufMut;
      39          612 :         let mut buf = Self::with_capacity(capacity);
      40          612 :         buf.put_bytes(0, capacity);
      41          612 :         // SAFETY: `put_bytes` filled the entire buffer.
      42          612 :         unsafe { buf.set_len(capacity) };
      43          612 :         buf
      44          612 :     }
      45              : }
      46              : 
      47              : impl<A: Alignment> AlignedBufferMut<A> {
      48              :     /// Constructs a mutable aligned buffer from raw.
      49        13246 :     pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
      50        13246 :         AlignedBufferMut { raw }
      51        13246 :     }
      52              : 
      53              :     /// Returns the total number of bytes the buffer can hold.
      54              :     #[inline]
      55     53703381 :     pub fn capacity(&self) -> usize {
      56     53703381 :         self.raw.capacity()
      57     53703381 :     }
      58              : 
      59              :     /// Returns the alignment of the buffer.
      60              :     #[inline]
      61           44 :     pub fn align(&self) -> usize {
      62           44 :         self.raw.align()
      63           44 :     }
      64              : 
      65              :     /// Returns the number of bytes in the buffer, also referred to as its 'length'.
      66              :     #[inline]
      67     85424215 :     pub fn len(&self) -> usize {
      68     85424215 :         self.raw.len()
      69     85424215 :     }
      70              : 
      71              :     /// Force the length of the buffer to `new_len`.
      72              :     #[inline]
      73     11062586 :     unsafe fn set_len(&mut self, new_len: usize) {
      74     11062586 :         // SAFETY: the caller is unsafe
      75     11062586 :         unsafe { self.raw.set_len(new_len) }
      76     11062586 :     }
      77              : 
      78              :     #[inline]
      79      6570281 :     fn as_ptr(&self) -> *const u8 {
      80      6570281 :         self.raw.as_ptr()
      81      6570281 :     }
      82              : 
      83              :     #[inline]
      84     12941083 :     fn as_mut_ptr(&mut self) -> *mut u8 {
      85     12941083 :         self.raw.as_mut_ptr()
      86     12941083 :     }
      87              : 
      88              :     /// Extracts a slice containing the entire buffer.
      89              :     ///
      90              :     /// Equivalent to `&s[..]`.
      91              :     #[inline]
      92      8367887 :     fn as_slice(&self) -> &[u8] {
      93      8367887 :         self.raw.as_slice()
      94      8367887 :     }
      95              : 
      96              :     /// Extracts a mutable slice of the entire buffer.
      97              :     ///
      98              :     /// Equivalent to `&mut s[..]`.
      99            0 :     fn as_mut_slice(&mut self) -> &mut [u8] {
     100            0 :         self.raw.as_mut_slice()
     101            0 :     }
     102              : 
     103              :     /// Drops the all the contents of the buffer, setting its length to `0`.
     104              :     #[inline]
     105        13298 :     pub fn clear(&mut self) {
     106        13298 :         self.raw.clear()
     107        13298 :     }
     108              : 
     109              :     /// Reserves capacity for at least `additional` more bytes to be inserted
     110              :     /// in the given `IoBufferMut`. The collection may reserve more space to
     111              :     /// speculatively avoid frequent reallocations. After calling `reserve`,
     112              :     /// capacity will be greater than or equal to `self.len() + additional`.
     113              :     /// Does nothing if capacity is already sufficient.
     114              :     ///
     115              :     /// # Panics
     116              :     ///
     117              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
     118      9623144 :     pub fn reserve(&mut self, additional: usize) {
     119      9623144 :         self.raw.reserve(additional);
     120      9623144 :     }
     121              : 
     122              :     /// Shortens the buffer, keeping the first len bytes.
     123           20 :     pub fn truncate(&mut self, len: usize) {
     124           20 :         self.raw.truncate(len);
     125           20 :     }
     126              : 
     127              :     /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
     128          188 :     pub fn leak<'a>(self) -> &'a mut [u8] {
     129          188 :         self.raw.leak()
     130          188 :     }
     131              : 
     132        17826 :     pub fn freeze(self) -> AlignedBuffer<A> {
     133        17826 :         let len = self.len();
     134        17826 :         AlignedBuffer::from_raw(self.raw, 0..len)
     135        17826 :     }
     136              : 
     137              :     /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
     138              :     #[inline]
     139      9623100 :     pub fn extend_from_slice(&mut self, extend: &[u8]) {
     140      9623100 :         let cnt = extend.len();
     141      9623100 :         self.reserve(cnt);
     142      9623100 : 
     143      9623100 :         // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
     144      9623100 :         unsafe {
     145      9623100 :             let dst = self.spare_capacity_mut();
     146      9623100 :             // Reserved above
     147      9623100 :             debug_assert!(dst.len() >= cnt);
     148              : 
     149      9623100 :             core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
     150      9623100 :         }
     151      9623100 :         // SAFETY: We do have at least `cnt` bytes remaining before advance.
     152      9623100 :         unsafe {
     153      9623100 :             bytes::BufMut::advance_mut(self, cnt);
     154      9623100 :         }
     155      9623100 :     }
     156              : 
     157              :     /// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
     158              :     #[inline]
     159      9623100 :     fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
     160      9623100 :         // SAFETY: we guarantees that the `Self::capacity()` bytes from
     161      9623100 :         // `Self::as_mut_ptr()` are allocated.
     162      9623100 :         unsafe {
     163      9623100 :             let ptr = self.as_mut_ptr().add(self.len());
     164      9623100 :             let len = self.capacity() - self.len();
     165      9623100 : 
     166      9623100 :             core::slice::from_raw_parts_mut(ptr.cast(), len)
     167      9623100 :         }
     168      9623100 :     }
     169              : }
     170              : 
     171              : impl<A: Alignment> Deref for AlignedBufferMut<A> {
     172              :     type Target = [u8];
     173              : 
     174      8366479 :     fn deref(&self) -> &Self::Target {
     175      8366479 :         self.as_slice()
     176      8366479 :     }
     177              : }
     178              : 
     179              : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
     180            0 :     fn deref_mut(&mut self) -> &mut Self::Target {
     181            0 :         self.as_mut_slice()
     182            0 :     }
     183              : }
     184              : 
     185              : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
     186            0 :     fn as_ref(&self) -> &[u8] {
     187            0 :         self.as_slice()
     188            0 :     }
     189              : }
     190              : 
     191              : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
     192            0 :     fn as_mut(&mut self) -> &mut [u8] {
     193            0 :         self.as_mut_slice()
     194            0 :     }
     195              : }
     196              : 
     197              : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
     198         1408 :     fn eq(&self, other: &[u8]) -> bool {
     199         1408 :         self.as_slice().eq(other)
     200         1408 :     }
     201              : }
     202              : 
     203              : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
     204              : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
     205              :     #[inline]
     206      9624436 :     fn remaining_mut(&self) -> usize {
     207      9624436 :         // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
     208      9624436 :         // Thus, it can have at most `self.capacity` bytes.
     209      9624436 :         self.capacity() - self.len()
     210      9624436 :     }
     211              : 
     212              :     // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
     213              :     #[inline]
     214      9623764 :     unsafe fn advance_mut(&mut self, cnt: usize) {
     215      9623764 :         let len = self.len();
     216      9623764 :         let remaining = self.remaining_mut();
     217      9623764 : 
     218      9623764 :         if remaining < cnt {
     219            0 :             panic_advance(cnt, remaining);
     220      9623764 :         }
     221      9623764 : 
     222      9623764 :         // SAFETY: Addition will not overflow since the sum is at most the capacity.
     223      9623764 :         unsafe {
     224      9623764 :             self.set_len(len + cnt);
     225      9623764 :         }
     226      9623764 :     }
     227              : 
     228              :     #[inline]
     229          664 :     fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
     230          664 :         let cap = self.capacity();
     231          664 :         let len = self.len();
     232          664 : 
     233          664 :         // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
     234          664 :         // valid for `cap - len` bytes. The subtraction will not underflow since
     235          664 :         // `len <= cap`.
     236          664 :         unsafe {
     237          664 :             bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
     238          664 :         }
     239          664 :     }
     240              : }
     241              : 
     242              : /// Panic with a nice error message.
     243              : #[cold]
     244            0 : fn panic_advance(idx: usize, len: usize) -> ! {
     245            0 :     panic!(
     246            0 :         "advance out of bounds: the len is {} but advancing by {}",
     247            0 :         len, idx
     248            0 :     );
     249              : }
     250              : 
     251              : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
     252              : /// and the underlying pointer remains stable while io-uring is owning the buffer.
     253              : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
     254              : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
     255              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
     256      6570245 :     fn stable_ptr(&self) -> *const u8 {
     257      6570245 :         self.as_ptr()
     258      6570245 :     }
     259              : 
     260     14136231 :     fn bytes_init(&self) -> usize {
     261     14136231 :         self.len()
     262     14136231 :     }
     263              : 
     264      5129584 :     fn bytes_total(&self) -> usize {
     265      5129584 :         self.capacity()
     266      5129584 :     }
     267              : }
     268              : 
     269              : // SAFETY: See above.
     270              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
     271      3317319 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     272      3317319 :         self.as_mut_ptr()
     273      3317319 :     }
     274              : 
     275      1884470 :     unsafe fn set_init(&mut self, init_len: usize) {
     276      1884470 :         if self.len() < init_len {
     277              :             // SAFETY: caller function is unsafe
     278      1438210 :             unsafe {
     279      1438210 :                 self.set_len(init_len);
     280      1438210 :             }
     281       446260 :         }
     282      1884470 :     }
     283              : }
     284              : 
     285              : #[cfg(test)]
     286              : mod tests {
     287              : 
     288              :     use super::*;
     289              : 
     290              :     const ALIGN: usize = 4 * 1024;
     291              :     type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
     292              : 
     293              :     #[test]
     294            4 :     fn test_with_capacity() {
     295            4 :         let v = TestIoBufferMut::with_capacity(ALIGN * 4);
     296            4 :         assert_eq!(v.len(), 0);
     297            4 :         assert_eq!(v.capacity(), ALIGN * 4);
     298            4 :         assert_eq!(v.align(), ALIGN);
     299            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     300              : 
     301            4 :         let v = TestIoBufferMut::with_capacity(ALIGN / 2);
     302            4 :         assert_eq!(v.len(), 0);
     303            4 :         assert_eq!(v.capacity(), ALIGN / 2);
     304            4 :         assert_eq!(v.align(), ALIGN);
     305            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     306            4 :     }
     307              : 
     308              :     #[test]
     309            4 :     fn test_with_capacity_zeroed() {
     310            4 :         let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
     311            4 :         assert_eq!(v.len(), ALIGN);
     312            4 :         assert_eq!(v.capacity(), ALIGN);
     313            4 :         assert_eq!(v.align(), ALIGN);
     314            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     315            4 :         assert_eq!(&v[..], &[0; ALIGN])
     316            4 :     }
     317              : 
     318              :     #[test]
     319            4 :     fn test_reserve() {
     320              :         use bytes::BufMut;
     321            4 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     322            4 :         let capacity = v.capacity();
     323            4 :         v.reserve(capacity);
     324            4 :         assert_eq!(v.capacity(), capacity);
     325            4 :         let data = [b'a'; ALIGN];
     326            4 :         v.put(&data[..]);
     327            4 :         v.reserve(capacity);
     328            4 :         assert!(v.capacity() >= capacity * 2);
     329            4 :         assert_eq!(&v[..], &data[..]);
     330            4 :         let capacity = v.capacity();
     331            4 :         v.clear();
     332            4 :         v.reserve(capacity);
     333            4 :         assert_eq!(capacity, v.capacity());
     334            4 :     }
     335              : 
     336              :     #[test]
     337            4 :     fn test_bytes_put() {
     338              :         use bytes::BufMut;
     339            4 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     340            4 :         let x = [b'a'; ALIGN];
     341              : 
     342           12 :         for _ in 0..2 {
     343           40 :             for _ in 0..4 {
     344           32 :                 v.put(&x[..]);
     345           32 :             }
     346            8 :             assert_eq!(v.len(), ALIGN * 4);
     347            8 :             assert_eq!(v.capacity(), ALIGN * 4);
     348            8 :             assert_eq!(v.align(), ALIGN);
     349            8 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     350            8 :             v.clear()
     351              :         }
     352            4 :         assert_eq!(v.len(), 0);
     353            4 :         assert_eq!(v.capacity(), ALIGN * 4);
     354            4 :         assert_eq!(v.align(), ALIGN);
     355            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     356            4 :     }
     357              : 
     358              :     #[test]
     359              :     #[should_panic]
     360            4 :     fn test_bytes_put_panic() {
     361              :         use bytes::BufMut;
     362              :         const ALIGN: usize = 4 * 1024;
     363            4 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     364            4 :         let x = [b'a'; ALIGN];
     365           24 :         for _ in 0..5 {
     366           20 :             v.put_slice(&x[..]);
     367           20 :         }
     368            4 :     }
     369              : 
     370              :     #[test]
     371            4 :     fn test_io_buf_put_slice() {
     372              :         use tokio_epoll_uring::BoundedBufMut;
     373              :         const ALIGN: usize = 4 * 1024;
     374            4 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     375            4 :         let x = [b'a'; ALIGN];
     376              : 
     377           12 :         for _ in 0..2 {
     378            8 :             v.put_slice(&x[..]);
     379            8 :             assert_eq!(v.len(), ALIGN);
     380            8 :             assert_eq!(v.capacity(), ALIGN);
     381            8 :             assert_eq!(v.align(), ALIGN);
     382            8 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     383            8 :             v.clear()
     384              :         }
     385            4 :         assert_eq!(v.len(), 0);
     386            4 :         assert_eq!(v.capacity(), ALIGN);
     387            4 :         assert_eq!(v.align(), ALIGN);
     388            4 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     389            4 :     }
     390              : }
        

Generated by: LCOV version 2.1-beta