LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/aligned_buffer - buffer_mut.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 93.5 % 231 216
Test Date: 2025-05-26 10:37:33 Functions: 85.7 % 63 54

            Line data    Source code
       1              : use std::mem::MaybeUninit;
       2              : use std::ops::{Deref, DerefMut};
       3              : 
       4              : use super::alignment::{Alignment, ConstAlign};
       5              : use super::buffer::AlignedBuffer;
       6              : use super::raw::RawAlignedBuffer;
       7              : 
       8              : /// A mutable aligned buffer type.
       9              : #[derive(Debug)]
      10              : pub struct AlignedBufferMut<A: Alignment> {
      11              :     raw: RawAlignedBuffer<A>,
      12              : }
      13              : 
      14              : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
      15              :     /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
      16              :     ///
      17              :     /// The buffer will be able to hold at most `capacity` elements and will never resize.
      18              :     ///
      19              :     ///
      20              :     /// # Panics
      21              :     ///
      22              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
      23              :     /// * `align` must not be zero,
      24              :     ///
      25              :     /// * `align` must be a power of two,
      26              :     ///
      27              :     /// * `capacity`, when rounded up to the nearest multiple of `align`,
      28              :     ///   must not overflow isize (i.e., the rounded value must be
      29              :     ///   less than or equal to `isize::MAX`).
      30       402637 :     pub fn with_capacity(capacity: usize) -> Self {
      31       402637 :         AlignedBufferMut {
      32       402637 :             raw: RawAlignedBuffer::with_capacity(capacity),
      33       402637 :         }
      34       402637 :     }
      35              : 
      36              :     /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
      37          560 :     pub fn with_capacity_zeroed(capacity: usize) -> Self {
      38              :         use bytes::BufMut;
      39          560 :         let mut buf = Self::with_capacity(capacity);
      40          560 :         buf.put_bytes(0, capacity);
      41          560 :         // SAFETY: `put_bytes` filled the entire buffer.
      42          560 :         unsafe { buf.set_len(capacity) };
      43          560 :         buf
      44          560 :     }
      45              : }
      46              : 
      47              : impl<A: Alignment> AlignedBufferMut<A> {
      48              :     /// Constructs a mutable aligned buffer from raw.
      49        11805 :     pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
      50        11805 :         AlignedBufferMut { raw }
      51        11805 :     }
      52              : 
      53              :     /// Returns the total number of bytes the buffer can hold.
      54              :     #[inline]
      55     46647392 :     pub fn capacity(&self) -> usize {
      56     46647392 :         self.raw.capacity()
      57     46647392 :     }
      58              : 
      59              :     /// Returns the alignment of the buffer.
      60              :     #[inline]
      61           11 :     pub fn align(&self) -> usize {
      62           11 :         self.raw.align()
      63           11 :     }
      64              : 
      65              :     /// Returns the number of bytes in the buffer, also referred to as its 'length'.
      66              :     #[inline]
      67     67964003 :     pub fn len(&self) -> usize {
      68     67964003 :         self.raw.len()
      69     67964003 :     }
      70              : 
      71              :     /// Force the length of the buffer to `new_len`.
      72              :     #[inline]
      73      9457326 :     unsafe fn set_len(&mut self, new_len: usize) {
      74      9457326 :         // SAFETY: the caller is unsafe
      75      9457326 :         unsafe { self.raw.set_len(new_len) }
      76      9457326 :     }
      77              : 
      78              :     #[inline]
      79      1785983 :     fn as_ptr(&self) -> *const u8 {
      80      1785983 :         self.raw.as_ptr()
      81      1785983 :     }
      82              : 
      83              :     #[inline]
      84      9813264 :     fn as_mut_ptr(&mut self) -> *mut u8 {
      85      9813264 :         self.raw.as_mut_ptr()
      86      9813264 :     }
      87              : 
      88              :     /// Extracts a slice containing the entire buffer.
      89              :     ///
      90              :     /// Equivalent to `&s[..]`.
      91              :     #[inline]
      92      2702902 :     fn as_slice(&self) -> &[u8] {
      93      2702902 :         self.raw.as_slice()
      94      2702902 :     }
      95              : 
      96              :     /// Extracts a mutable slice of the entire buffer.
      97              :     ///
      98              :     /// Equivalent to `&mut s[..]`.
      99          203 :     fn as_mut_slice(&mut self) -> &mut [u8] {
     100          203 :         self.raw.as_mut_slice()
     101          203 :     }
     102              : 
     103              :     /// Drops all the contents of the buffer, setting its length to `0`.
     104              :     #[inline]
     105        11818 :     pub fn clear(&mut self) {
     106        11818 :         self.raw.clear()
     107        11818 :     }
     108              : 
     109              :     /// Reserves capacity for at least `additional` more bytes to be inserted
     110              :     /// in the given `IoBufferMut`. The collection may reserve more space to
     111              :     /// speculatively avoid frequent reallocations. After calling `reserve`,
     112              :     /// capacity will be greater than or equal to `self.len() + additional`.
     113              :     /// Does nothing if capacity is already sufficient.
     114              :     ///
     115              :     /// # Panics
     116              :     ///
     117              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
     118      9006539 :     pub fn reserve(&mut self, additional: usize) {
     119      9006539 :         self.raw.reserve(additional);
     120      9006539 :     }
     121              : 
     122              :     /// Shortens the buffer, keeping the first len bytes.
     123            5 :     pub fn truncate(&mut self, len: usize) {
     124            5 :         self.raw.truncate(len);
     125            5 :     }
     126              : 
     127              :     /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
     128           50 :     pub fn leak<'a>(self) -> &'a mut [u8] {
     129           50 :         self.raw.leak()
     130           50 :     }
     131              : 
     132        22312 :     pub fn freeze(self) -> AlignedBuffer<A> {
     133        22312 :         let len = self.len();
     134        22312 :         AlignedBuffer::from_raw(self.raw, 0..len)
     135        22312 :     }
     136              : 
     137              :     /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
     138              :     #[inline]
     139      9006528 :     pub fn extend_from_slice(&mut self, extend: &[u8]) {
     140      9006528 :         let cnt = extend.len();
     141      9006528 :         self.reserve(cnt);
     142      9006528 : 
     143      9006528 :         // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
     144      9006528 :         unsafe {
     145      9006528 :             let dst = self.spare_capacity_mut();
     146      9006528 :             // Reserved above
     147      9006528 :             debug_assert!(dst.len() >= cnt);
     148              : 
     149      9006528 :             core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
     150      9006528 :         }
     151      9006528 :         // SAFETY: We do have at least `cnt` bytes remaining before advance.
     152      9006528 :         unsafe {
     153      9006528 :             bytes::BufMut::advance_mut(self, cnt);
     154      9006528 :         }
     155      9006528 :     }
     156              : 
     157              :     /// Returns the remaining spare capacity of the buffer as a slice of `MaybeUninit<u8>`.
     158              :     #[inline]
     159      9006528 :     fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
     160      9006528 :         // SAFETY: we guarantee that the `Self::capacity()` bytes from
     161      9006528 :         // `Self::as_mut_ptr()` are allocated.
     162      9006528 :         unsafe {
     163      9006528 :             let ptr = self.as_mut_ptr().add(self.len());
     164      9006528 :             let len = self.capacity() - self.len();
     165      9006528 : 
     166      9006528 :             core::slice::from_raw_parts_mut(ptr.cast(), len)
     167      9006528 :         }
     168      9006528 :     }
     169              : }
     170              : 
     171              : impl<A: Alignment> Deref for AlignedBufferMut<A> {
     172              :     type Target = [u8];
     173              : 
     174      2702551 :     fn deref(&self) -> &Self::Target {
     175      2702551 :         self.as_slice()
     176      2702551 :     }
     177              : }
     178              : 
     179              : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
     180          203 :     fn deref_mut(&mut self) -> &mut Self::Target {
     181          203 :         self.as_mut_slice()
     182          203 :     }
     183              : }
     184              : 
     185              : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
     186            0 :     fn as_ref(&self) -> &[u8] {
     187            0 :         self.as_slice()
     188            0 :     }
     189              : }
     190              : 
     191              : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
     192            0 :     fn as_mut(&mut self) -> &mut [u8] {
     193            0 :         self.as_mut_slice()
     194            0 :     }
     195              : }
     196              : 
     197              : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
     198          351 :     fn eq(&self, other: &[u8]) -> bool {
     199          351 :         self.as_slice().eq(other)
     200          351 :     }
     201              : }
     202              : 
     203              : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advanced past have been initialized.
     204              : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
     205              :     #[inline]
     206      9124858 :     fn remaining_mut(&self) -> usize {
     207      9124858 :         // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
     208      9124858 :         // Thus, it can have at most `self.capacity` bytes.
     209      9124858 :         self.capacity() - self.len()
     210      9124858 :     }
     211              : 
     212              :     // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
     213              :     #[inline]
     214      9065117 :     unsafe fn advance_mut(&mut self, cnt: usize) {
     215      9065117 :         let len = self.len();
     216      9065117 :         let remaining = self.remaining_mut();
     217      9065117 : 
     218      9065117 :         if remaining < cnt {
     219            0 :             panic_advance(cnt, remaining);
     220      9065117 :         }
     221      9065117 : 
     222      9065117 :         // SAFETY: Addition will not overflow since the sum is at most the capacity.
     223      9065117 :         unsafe {
     224      9065117 :             self.set_len(len + cnt);
     225      9065117 :         }
     226      9065117 :     }
     227              : 
     228              :     #[inline]
     229        58589 :     fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
     230        58589 :         let cap = self.capacity();
     231        58589 :         let len = self.len();
     232        58589 : 
     233        58589 :         // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
     234        58589 :         // valid for `cap - len` bytes. The subtraction will not underflow since
     235        58589 :         // `len <= cap`.
     236        58589 :         unsafe {
     237        58589 :             bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
     238        58589 :         }
     239        58589 :     }
     240              : }
     241              : 
     242              : /// Panic with a nice error message.
     243              : #[cold]
     244            0 : fn panic_advance(idx: usize, len: usize) -> ! {
     245            0 :     panic!(
     246            0 :         "advance out of bounds: the len is {} but advancing by {}",
     247            0 :         len, idx
     248            0 :     );
     249              : }
     250              : 
     251              : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
     252              : /// and the underlying pointer remains stable while io-uring is owning the buffer.
     253              : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
     254              : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
     255              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
     256      1785974 :     fn stable_ptr(&self) -> *const u8 {
     257      1785974 :         self.as_ptr()
     258      1785974 :     }
     259              : 
     260      3712660 :     fn bytes_init(&self) -> usize {
     261      3712660 :         self.len()
     262      3712660 :     }
     263              : 
     264      1417731 :     fn bytes_total(&self) -> usize {
     265      1417731 :         self.capacity()
     266      1417731 :     }
     267              : }
     268              : 
     269              : // SAFETY: See above.
     270              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
     271       748147 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     272       748147 :         self.as_mut_ptr()
     273       748147 :     }
     274              : 
     275       503415 :     unsafe fn set_init(&mut self, init_len: usize) {
     276       503415 :         if self.len() < init_len {
     277              :             // SAFETY: caller function is unsafe
     278       391649 :             unsafe {
     279       391649 :                 self.set_len(init_len);
     280       391649 :             }
     281       111766 :         }
     282       503415 :     }
     283              : }
     284              : 
     285              : impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
     286        45609 :     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
     287        45609 :         self.extend_from_slice(buf);
     288        45609 :         Ok(buf.len())
     289        45609 :     }
     290              : 
     291            0 :     fn flush(&mut self) -> std::io::Result<()> {
     292            0 :         Ok(())
     293            0 :     }
     294              : }
     295              : 
     296              : #[cfg(test)]
     297              : mod tests {
     298              : 
     299              :     use super::*;
     300              : 
     301              :     const ALIGN: usize = 4 * 1024;
     302              :     type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
     303              : 
     304              :     #[test]
     305            1 :     fn test_with_capacity() {
     306            1 :         let v = TestIoBufferMut::with_capacity(ALIGN * 4);
     307            1 :         assert_eq!(v.len(), 0);
     308            1 :         assert_eq!(v.capacity(), ALIGN * 4);
     309            1 :         assert_eq!(v.align(), ALIGN);
     310            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     311              : 
     312            1 :         let v = TestIoBufferMut::with_capacity(ALIGN / 2);
     313            1 :         assert_eq!(v.len(), 0);
     314            1 :         assert_eq!(v.capacity(), ALIGN / 2);
     315            1 :         assert_eq!(v.align(), ALIGN);
     316            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     317            1 :     }
     318              : 
     319              :     #[test]
     320            1 :     fn test_with_capacity_zeroed() {
     321            1 :         let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
     322            1 :         assert_eq!(v.len(), ALIGN);
     323            1 :         assert_eq!(v.capacity(), ALIGN);
     324            1 :         assert_eq!(v.align(), ALIGN);
     325            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     326            1 :         assert_eq!(&v[..], &[0; ALIGN])
     327            1 :     }
     328              : 
     329              :     #[test]
     330            1 :     fn test_reserve() {
     331              :         use bytes::BufMut;
     332            1 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     333            1 :         let capacity = v.capacity();
     334            1 :         v.reserve(capacity);
     335            1 :         assert_eq!(v.capacity(), capacity);
     336            1 :         let data = [b'a'; ALIGN];
     337            1 :         v.put(&data[..]);
     338            1 :         v.reserve(capacity);
     339            1 :         assert!(v.capacity() >= capacity * 2);
     340            1 :         assert_eq!(&v[..], &data[..]);
     341            1 :         let capacity = v.capacity();
     342            1 :         v.clear();
     343            1 :         v.reserve(capacity);
     344            1 :         assert_eq!(capacity, v.capacity());
     345            1 :     }
     346              : 
     347              :     #[test]
     348            1 :     fn test_bytes_put() {
     349              :         use bytes::BufMut;
     350            1 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     351            1 :         let x = [b'a'; ALIGN];
     352              : 
     353            3 :         for _ in 0..2 {
     354           10 :             for _ in 0..4 {
     355            8 :                 v.put(&x[..]);
     356            8 :             }
     357            2 :             assert_eq!(v.len(), ALIGN * 4);
     358            2 :             assert_eq!(v.capacity(), ALIGN * 4);
     359            2 :             assert_eq!(v.align(), ALIGN);
     360            2 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     361            2 :             v.clear()
     362              :         }
     363            1 :         assert_eq!(v.len(), 0);
     364            1 :         assert_eq!(v.capacity(), ALIGN * 4);
     365            1 :         assert_eq!(v.align(), ALIGN);
     366            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     367            1 :     }
     368              : 
     369              :     #[test]
     370              :     #[should_panic]
     371            1 :     fn test_bytes_put_panic() {
     372              :         use bytes::BufMut;
     373              :         const ALIGN: usize = 4 * 1024;
     374            1 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     375            1 :         let x = [b'a'; ALIGN];
     376            5 :         for _ in 0..5 {
     377            4 :             v.put_slice(&x[..]);
     378            4 :         }
     379            1 :     }
     380              : 
     381              :     #[test]
     382            1 :     fn test_io_buf_put_slice() {
     383              :         use tokio_epoll_uring::BoundedBufMut;
     384              :         const ALIGN: usize = 4 * 1024;
     385            1 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     386            1 :         let x = [b'a'; ALIGN];
     387              : 
     388            3 :         for _ in 0..2 {
     389            2 :             v.put_slice(&x[..]);
     390            2 :             assert_eq!(v.len(), ALIGN);
     391            2 :             assert_eq!(v.capacity(), ALIGN);
     392            2 :             assert_eq!(v.align(), ALIGN);
     393            2 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     394            2 :             v.clear()
     395              :         }
     396            1 :         assert_eq!(v.len(), 0);
     397            1 :         assert_eq!(v.capacity(), ALIGN);
     398            1 :         assert_eq!(v.align(), ALIGN);
     399            1 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     400            1 :     }
     401              : }
        

Generated by: LCOV version 2.1-beta