LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/aligned_buffer - buffer_mut.rs (source / functions) Coverage Total Hit
Test: 249f165943bd2c492f96a3f7d250276e4addca1a.info Lines: 90.5 % 190 172
Test Date: 2024-11-20 18:39:52 Functions: 85.5 % 55 47

            Line data    Source code
       1              : use std::ops::{Deref, DerefMut};
       2              : 
       3              : use super::{
       4              :     alignment::{Alignment, ConstAlign},
       5              :     buffer::AlignedBuffer,
       6              :     raw::RawAlignedBuffer,
       7              : };
       8              : 
       9              : /// A mutable aligned buffer type.
      10              : #[derive(Debug)]
      11              : pub struct AlignedBufferMut<A: Alignment> {
      12              :     raw: RawAlignedBuffer<A>,
      13              : }
      14              : 
      15              : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
      16              :     /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
      17              :     ///
      18              :     /// The buffer will be able to hold at most `capacity` elements and will never resize.
      19              :     ///
      20              :     ///
      21              :     /// # Panics
      22              :     ///
      23              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
      24              :     /// * `align` must not be zero,
      25              :     ///
      26              :     /// * `align` must be a power of two,
      27              :     ///
      28              :     /// * `capacity`, when rounded up to the nearest multiple of `align`,
      29              :     ///    must not overflow isize (i.e., the rounded value must be
      30              :     ///    less than or equal to `isize::MAX`).
      31       996463 :     pub fn with_capacity(capacity: usize) -> Self {
      32       996463 :         AlignedBufferMut {
      33       996463 :             raw: RawAlignedBuffer::with_capacity(capacity),
      34       996463 :         }
      35       996463 :     }
      36              : 
      37              :     /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
      38          300 :     pub fn with_capacity_zeroed(capacity: usize) -> Self {
      39              :         use bytes::BufMut;
      40          300 :         let mut buf = Self::with_capacity(capacity);
      41          300 :         buf.put_bytes(0, capacity);
      42          300 :         // SAFETY: `put_bytes` filled the entire buffer.
      43          300 :         unsafe { buf.set_len(capacity) };
      44          300 :         buf
      45          300 :     }
      46              : }
      47              : 
      48              : impl<A: Alignment> AlignedBufferMut<A> {
      49              :     /// Returns the total number of bytes the buffer can hold.
      50              :     #[inline]
      51    116788143 :     pub fn capacity(&self) -> usize {
      52    116788143 :         self.raw.capacity()
      53    116788143 :     }
      54              : 
      55              :     /// Returns the alignment of the buffer.
      56              :     #[inline]
      57           18 :     pub fn align(&self) -> usize {
      58           18 :         self.raw.align()
      59           18 :     }
      60              : 
      61              :     /// Returns the number of bytes in the buffer, also referred to as its 'length'.
      62              :     #[inline]
      63    161575343 :     pub fn len(&self) -> usize {
      64    161575343 :         self.raw.len()
      65    161575343 :     }
      66              : 
      67              :     /// Force the length of the buffer to `new_len`.
      68              :     #[inline]
      69     38593715 :     unsafe fn set_len(&mut self, new_len: usize) {
      70     38593715 :         self.raw.set_len(new_len)
      71     38593715 :     }
      72              : 
      73              :     #[inline]
      74      4221240 :     fn as_ptr(&self) -> *const u8 {
      75      4221240 :         self.raw.as_ptr()
      76      4221240 :     }
      77              : 
      78              :     #[inline]
      79     39581933 :     fn as_mut_ptr(&mut self) -> *mut u8 {
      80     39581933 :         self.raw.as_mut_ptr()
      81     39581933 :     }
      82              : 
      83              :     /// Extracts a slice containing the entire buffer.
      84              :     ///
      85              :     /// Equivalent to `&s[..]`.
      86              :     #[inline]
      87      3833604 :     fn as_slice(&self) -> &[u8] {
      88      3833604 :         self.raw.as_slice()
      89      3833604 :     }
      90              : 
      91              :     /// Extracts a mutable slice of the entire buffer.
      92              :     ///
      93              :     /// Equivalent to `&mut s[..]`.
      94            0 :     fn as_mut_slice(&mut self) -> &mut [u8] {
      95            0 :         self.raw.as_mut_slice()
      96            0 :     }
      97              : 
      98              :     /// Drops the all the contents of the buffer, setting its length to `0`.
      99              :     #[inline]
     100           26 :     pub fn clear(&mut self) {
     101           26 :         self.raw.clear()
     102           26 :     }
     103              : 
     104              :     /// Reserves capacity for at least `additional` more bytes to be inserted
     105              :     /// in the given `IoBufferMut`. The collection may reserve more space to
     106              :     /// speculatively avoid frequent reallocations. After calling `reserve`,
     107              :     /// capacity will be greater than or equal to `self.len() + additional`.
     108              :     /// Does nothing if capacity is already sufficient.
     109              :     ///
     110              :     /// # Panics
     111              :     ///
     112              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
     113           22 :     pub fn reserve(&mut self, additional: usize) {
     114           22 :         self.raw.reserve(additional);
     115           22 :     }
     116              : 
     117              :     /// Shortens the buffer, keeping the first len bytes.
     118           10 :     pub fn truncate(&mut self, len: usize) {
     119           10 :         self.raw.truncate(len);
     120           10 :     }
     121              : 
     122              :     /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
     123           88 :     pub fn leak<'a>(self) -> &'a mut [u8] {
     124           88 :         self.raw.leak()
     125           88 :     }
     126              : 
     127          968 :     pub fn freeze(self) -> AlignedBuffer<A> {
     128          968 :         let len = self.len();
     129          968 :         AlignedBuffer::from_raw(self.raw, 0..len)
     130          968 :     }
     131              : }
     132              : 
     133              : impl<A: Alignment> Deref for AlignedBufferMut<A> {
     134              :     type Target = [u8];
     135              : 
     136      3636978 :     fn deref(&self) -> &Self::Target {
     137      3636978 :         self.as_slice()
     138      3636978 :     }
     139              : }
     140              : 
     141              : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
     142            0 :     fn deref_mut(&mut self) -> &mut Self::Target {
     143            0 :         self.as_mut_slice()
     144            0 :     }
     145              : }
     146              : 
     147              : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
     148            0 :     fn as_ref(&self) -> &[u8] {
     149            0 :         self.as_slice()
     150            0 :     }
     151              : }
     152              : 
     153              : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
     154            0 :     fn as_mut(&mut self) -> &mut [u8] {
     155            0 :         self.as_mut_slice()
     156            0 :     }
     157              : }
     158              : 
     159              : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
     160       196626 :     fn eq(&self, other: &[u8]) -> bool {
     161       196626 :         self.as_slice().eq(other)
     162       196626 :     }
     163              : }
     164              : 
     165              : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
     166              : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
     167              :     #[inline]
     168     75382876 :     fn remaining_mut(&self) -> usize {
     169     75382876 :         // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
     170     75382876 :         // Thus, it can have at most `self.capacity` bytes.
     171     75382876 :         self.capacity() - self.len()
     172     75382876 :     }
     173              : 
     174              :     // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
     175              :     #[inline]
     176     37691436 :     unsafe fn advance_mut(&mut self, cnt: usize) {
     177     37691436 :         let len = self.len();
     178     37691436 :         let remaining = self.remaining_mut();
     179     37691436 : 
     180     37691436 :         if remaining < cnt {
     181            0 :             panic_advance(cnt, remaining);
     182     37691436 :         }
     183     37691436 : 
     184     37691436 :         // Addition will not overflow since the sum is at most the capacity.
     185     37691436 :         self.set_len(len + cnt);
     186     37691436 :     }
     187              : 
     188              :     #[inline]
     189     37691436 :     fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
     190     37691436 :         let cap = self.capacity();
     191     37691436 :         let len = self.len();
     192     37691436 : 
     193     37691436 :         // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
     194     37691436 :         // valid for `cap - len` bytes. The subtraction will not underflow since
     195     37691436 :         // `len <= cap`.
     196     37691436 :         unsafe {
     197     37691436 :             bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
     198     37691436 :         }
     199     37691436 :     }
     200              : }
     201              : 
     202              : /// Panic with a nice error message.
     203              : #[cold]
     204            0 : fn panic_advance(idx: usize, len: usize) -> ! {
     205            0 :     panic!(
     206            0 :         "advance out of bounds: the len is {} but advancing by {}",
     207            0 :         len, idx
     208            0 :     );
     209              : }
     210              : 
     211              : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
     212              : /// and the underlying pointer remains stable while io-uring is owning the buffer.
     213              : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
     214              : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
     215              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
     216      4221222 :     fn stable_ptr(&self) -> *const u8 {
     217      4221222 :         self.as_ptr()
     218      4221222 :     }
     219              : 
     220      9181320 :     fn bytes_init(&self) -> usize {
     221      9181320 :         self.len()
     222      9181320 :     }
     223              : 
     224      3501388 :     fn bytes_total(&self) -> usize {
     225      3501388 :         self.capacity()
     226      3501388 :     }
     227              : }
     228              : 
     229              : // SAFETY: See above.
     230              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
     231      1890497 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     232      1890497 :         self.as_mut_ptr()
     233      1890497 :     }
     234              : 
     235      1128594 :     unsafe fn set_init(&mut self, init_len: usize) {
     236      1128594 :         if self.len() < init_len {
     237       901979 :             self.set_len(init_len);
     238       901979 :         }
     239      1128594 :     }
     240              : }
     241              : 
     242              : #[cfg(test)]
     243              : mod tests {
     244              : 
     245              :     use super::*;
     246              : 
     247              :     const ALIGN: usize = 4 * 1024;
     248              :     type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
     249              : 
     250              :     #[test]
     251            2 :     fn test_with_capacity() {
     252            2 :         let v = TestIoBufferMut::with_capacity(ALIGN * 4);
     253            2 :         assert_eq!(v.len(), 0);
     254            2 :         assert_eq!(v.capacity(), ALIGN * 4);
     255            2 :         assert_eq!(v.align(), ALIGN);
     256            2 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     257              : 
     258            2 :         let v = TestIoBufferMut::with_capacity(ALIGN / 2);
     259            2 :         assert_eq!(v.len(), 0);
     260            2 :         assert_eq!(v.capacity(), ALIGN / 2);
     261            2 :         assert_eq!(v.align(), ALIGN);
     262            2 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     263            2 :     }
     264              : 
     265              :     #[test]
     266            2 :     fn test_with_capacity_zeroed() {
     267            2 :         let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
     268            2 :         assert_eq!(v.len(), ALIGN);
     269            2 :         assert_eq!(v.capacity(), ALIGN);
     270            2 :         assert_eq!(v.align(), ALIGN);
     271            2 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     272            2 :         assert_eq!(&v[..], &[0; ALIGN])
     273            2 :     }
     274              : 
     275              :     #[test]
     276            2 :     fn test_reserve() {
     277              :         use bytes::BufMut;
     278            2 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     279            2 :         let capacity = v.capacity();
     280            2 :         v.reserve(capacity);
     281            2 :         assert_eq!(v.capacity(), capacity);
     282            2 :         let data = [b'a'; ALIGN];
     283            2 :         v.put(&data[..]);
     284            2 :         v.reserve(capacity);
     285            2 :         assert!(v.capacity() >= capacity * 2);
     286            2 :         assert_eq!(&v[..], &data[..]);
     287            2 :         let capacity = v.capacity();
     288            2 :         v.clear();
     289            2 :         v.reserve(capacity);
     290            2 :         assert_eq!(capacity, v.capacity());
     291            2 :     }
     292              : 
     293              :     #[test]
     294            2 :     fn test_bytes_put() {
     295              :         use bytes::BufMut;
     296            2 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     297            2 :         let x = [b'a'; ALIGN];
     298              : 
     299            6 :         for _ in 0..2 {
     300           20 :             for _ in 0..4 {
     301           16 :                 v.put(&x[..]);
     302           16 :             }
     303            4 :             assert_eq!(v.len(), ALIGN * 4);
     304            4 :             assert_eq!(v.capacity(), ALIGN * 4);
     305            4 :             assert_eq!(v.align(), ALIGN);
     306            4 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     307            4 :             v.clear()
     308              :         }
     309            2 :         assert_eq!(v.len(), 0);
     310            2 :         assert_eq!(v.capacity(), ALIGN * 4);
     311            2 :         assert_eq!(v.align(), ALIGN);
     312            2 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     313            2 :     }
     314              : 
     315              :     #[test]
     316              :     #[should_panic]
     317            2 :     fn test_bytes_put_panic() {
     318              :         use bytes::BufMut;
     319              :         const ALIGN: usize = 4 * 1024;
     320            2 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     321            2 :         let x = [b'a'; ALIGN];
     322           12 :         for _ in 0..5 {
     323           10 :             v.put_slice(&x[..]);
     324           10 :         }
     325            2 :     }
     326              : 
     327              :     #[test]
     328            2 :     fn test_io_buf_put_slice() {
     329              :         use tokio_epoll_uring::BoundedBufMut;
     330              :         const ALIGN: usize = 4 * 1024;
     331            2 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     332            2 :         let x = [b'a'; ALIGN];
     333              : 
     334            6 :         for _ in 0..2 {
     335            4 :             v.put_slice(&x[..]);
     336            4 :             assert_eq!(v.len(), ALIGN);
     337            4 :             assert_eq!(v.capacity(), ALIGN);
     338            4 :             assert_eq!(v.align(), ALIGN);
     339            4 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     340            4 :             v.clear()
     341              :         }
     342            2 :         assert_eq!(v.len(), 0);
     343            2 :         assert_eq!(v.capacity(), ALIGN);
     344            2 :         assert_eq!(v.align(), ALIGN);
     345            2 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     346            2 :     }
     347              : }
        

Generated by: LCOV version 2.1-beta