LCOV - code coverage report
Current view: top level - pageserver/src/virtual_file/owned_buffers_io/aligned_buffer - buffer_mut.rs (source / functions) Coverage Total Hit
Test: 2620485e474b48c32427149a5d91ef8fc2cd649e.info Lines: 90.9 % 231 210
Test Date: 2025-05-01 22:50:11 Functions: 85.2 % 61 52

            Line data    Source code
       1              : use std::mem::MaybeUninit;
       2              : use std::ops::{Deref, DerefMut};
       3              : 
       4              : use super::alignment::{Alignment, ConstAlign};
       5              : use super::buffer::AlignedBuffer;
       6              : use super::raw::RawAlignedBuffer;
       7              : 
       8              : /// A mutable aligned buffer type.
       9              : #[derive(Debug)]
      10              : pub struct AlignedBufferMut<A: Alignment> {
      11              :     raw: RawAlignedBuffer<A>,
      12              : }
      13              : 
      14              : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
      15              :     /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
      16              :     ///
      17              :     /// The buffer will be able to hold at most `capacity` elements and will never resize.
      18              :     ///
      19              :     ///
      20              :     /// # Panics
      21              :     ///
      22              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
      23              :     /// * `align` must not be zero,
      24              :     ///
      25              :     /// * `align` must be a power of two,
      26              :     ///
      27              :     /// * `capacity`, when rounded up to the nearest multiple of `align`,
      28              :     ///   must not overflow isize (i.e., the rounded value must be
      29              :     ///   less than or equal to `isize::MAX`).
      30      4863527 :     pub fn with_capacity(capacity: usize) -> Self {
      31      4863527 :         AlignedBufferMut {
      32      4863527 :             raw: RawAlignedBuffer::with_capacity(capacity),
      33      4863527 :         }
      34      4863527 :     }
      35              : 
      36              :     /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
      37         1872 :     pub fn with_capacity_zeroed(capacity: usize) -> Self {
      38              :         use bytes::BufMut;
      39         1872 :         let mut buf = Self::with_capacity(capacity);
      40         1872 :         buf.put_bytes(0, capacity);
      41         1872 :         // SAFETY: `put_bytes` filled the entire buffer.
      42         1872 :         unsafe { buf.set_len(capacity) };
      43         1872 :         buf
      44         1872 :     }
      45              : }
      46              : 
      47              : impl<A: Alignment> AlignedBufferMut<A> {
      48              :     /// Constructs a mutable aligned buffer from raw.
      49       141660 :     pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
      50       141660 :         AlignedBufferMut { raw }
      51       141660 :     }
      52              : 
      53              :     /// Returns the total number of bytes the buffer can hold.
      54              :     #[inline]
      55    559817578 :     pub fn capacity(&self) -> usize {
      56    559817578 :         self.raw.capacity()
      57    559817578 :     }
      58              : 
      59              :     /// Returns the alignment of the buffer.
      60              :     #[inline]
      61          132 :     pub fn align(&self) -> usize {
      62          132 :         self.raw.align()
      63          132 :     }
      64              : 
      65              :     /// Returns the number of bytes in the buffer, also referred to as its 'length'.
      66              :     #[inline]
      67    817432674 :     pub fn len(&self) -> usize {
      68    817432674 :         self.raw.len()
      69    817432674 :     }
      70              : 
      71              :     /// Force the length of the buffer to `new_len`.
      72              :     #[inline]
      73    113515287 :     unsafe fn set_len(&mut self, new_len: usize) {
      74    113515287 :         // SAFETY: the caller is unsafe
      75    113515287 :         unsafe { self.raw.set_len(new_len) }
      76    113515287 :     }
      77              : 
      78              :     #[inline]
      79     21527992 :     fn as_ptr(&self) -> *const u8 {
      80     21527992 :         self.raw.as_ptr()
      81     21527992 :     }
      82              : 
      83              :     #[inline]
      84    119376003 :     fn as_mut_ptr(&mut self) -> *mut u8 {
      85    119376003 :         self.raw.as_mut_ptr()
      86    119376003 :     }
      87              : 
      88              :     /// Extracts a slice containing the entire buffer.
      89              :     ///
      90              :     /// Equivalent to `&s[..]`.
      91              :     #[inline]
      92     32504699 :     fn as_slice(&self) -> &[u8] {
      93     32504699 :         self.raw.as_slice()
      94     32504699 :     }
      95              : 
      96              :     /// Extracts a mutable slice of the entire buffer.
      97              :     ///
      98              :     /// Equivalent to `&mut s[..]`.
      99            0 :     fn as_mut_slice(&mut self) -> &mut [u8] {
     100            0 :         self.raw.as_mut_slice()
     101            0 :     }
     102              : 
     103              :     /// Drops the all the contents of the buffer, setting its length to `0`.
     104              :     #[inline]
     105       141816 :     pub fn clear(&mut self) {
     106       141816 :         self.raw.clear()
     107       141816 :     }
     108              : 
     109              :     /// Reserves capacity for at least `additional` more bytes to be inserted
     110              :     /// in the given `IoBufferMut`. The collection may reserve more space to
     111              :     /// speculatively avoid frequent reallocations. After calling `reserve`,
     112              :     /// capacity will be greater than or equal to `self.len() + additional`.
     113              :     /// Does nothing if capacity is already sufficient.
     114              :     ///
     115              :     /// # Panics
     116              :     ///
     117              :     /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
     118    108078564 :     pub fn reserve(&mut self, additional: usize) {
     119    108078564 :         self.raw.reserve(additional);
     120    108078564 :     }
     121              : 
     122              :     /// Shortens the buffer, keeping the first len bytes.
     123           60 :     pub fn truncate(&mut self, len: usize) {
     124           60 :         self.raw.truncate(len);
     125           60 :     }
     126              : 
     127              :     /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
     128          600 :     pub fn leak<'a>(self) -> &'a mut [u8] {
     129          600 :         self.raw.leak()
     130          600 :     }
     131              : 
     132       265468 :     pub fn freeze(self) -> AlignedBuffer<A> {
     133       265468 :         let len = self.len();
     134       265468 :         AlignedBuffer::from_raw(self.raw, 0..len)
     135       265468 :     }
     136              : 
     137              :     /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
     138              :     #[inline]
     139    108078432 :     pub fn extend_from_slice(&mut self, extend: &[u8]) {
     140    108078432 :         let cnt = extend.len();
     141    108078432 :         self.reserve(cnt);
     142    108078432 : 
     143    108078432 :         // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
     144    108078432 :         unsafe {
     145    108078432 :             let dst = self.spare_capacity_mut();
     146    108078432 :             // Reserved above
     147    108078432 :             debug_assert!(dst.len() >= cnt);
     148              : 
     149    108078432 :             core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
     150    108078432 :         }
     151    108078432 :         // SAFETY: We do have at least `cnt` bytes remaining before advance.
     152    108078432 :         unsafe {
     153    108078432 :             bytes::BufMut::advance_mut(self, cnt);
     154    108078432 :         }
     155    108078432 :     }
     156              : 
     157              :     /// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
     158              :     #[inline]
     159    108078432 :     fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
     160    108078432 :         // SAFETY: we guarantees that the `Self::capacity()` bytes from
     161    108078432 :         // `Self::as_mut_ptr()` are allocated.
     162    108078432 :         unsafe {
     163    108078432 :             let ptr = self.as_mut_ptr().add(self.len());
     164    108078432 :             let len = self.capacity() - self.len();
     165    108078432 : 
     166    108078432 :             core::slice::from_raw_parts_mut(ptr.cast(), len)
     167    108078432 :         }
     168    108078432 :     }
     169              : }
     170              : 
     171              : impl<A: Alignment> Deref for AlignedBufferMut<A> {
     172              :     type Target = [u8];
     173              : 
     174     32500487 :     fn deref(&self) -> &Self::Target {
     175     32500487 :         self.as_slice()
     176     32500487 :     }
     177              : }
     178              : 
     179              : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
     180            0 :     fn deref_mut(&mut self) -> &mut Self::Target {
     181            0 :         self.as_mut_slice()
     182            0 :     }
     183              : }
     184              : 
     185              : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
     186            0 :     fn as_ref(&self) -> &[u8] {
     187            0 :         self.as_slice()
     188            0 :     }
     189              : }
     190              : 
     191              : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
     192            0 :     fn as_mut(&mut self) -> &mut [u8] {
     193            0 :         self.as_mut_slice()
     194            0 :     }
     195              : }
     196              : 
     197              : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
     198         4212 :     fn eq(&self, other: &[u8]) -> bool {
     199         4212 :         self.as_slice().eq(other)
     200         4212 :     }
     201              : }
     202              : 
     203              : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
     204              : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
     205              :     #[inline]
     206    109489693 :     fn remaining_mut(&self) -> usize {
     207    109489693 :         // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
     208    109489693 :         // Thus, it can have at most `self.capacity` bytes.
     209    109489693 :         self.capacity() - self.len()
     210    109489693 :     }
     211              : 
     212              :     // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
     213              :     #[inline]
     214    108777137 :     unsafe fn advance_mut(&mut self, cnt: usize) {
     215    108777137 :         let len = self.len();
     216    108777137 :         let remaining = self.remaining_mut();
     217    108777137 : 
     218    108777137 :         if remaining < cnt {
     219            0 :             panic_advance(cnt, remaining);
     220    108777137 :         }
     221    108777137 : 
     222    108777137 :         // SAFETY: Addition will not overflow since the sum is at most the capacity.
     223    108777137 :         unsafe {
     224    108777137 :             self.set_len(len + cnt);
     225    108777137 :         }
     226    108777137 :     }
     227              : 
     228              :     #[inline]
     229       698705 :     fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
     230       698705 :         let cap = self.capacity();
     231       698705 :         let len = self.len();
     232       698705 : 
     233       698705 :         // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
     234       698705 :         // valid for `cap - len` bytes. The subtraction will not underflow since
     235       698705 :         // `len <= cap`.
     236       698705 :         unsafe {
     237       698705 :             bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
     238       698705 :         }
     239       698705 :     }
     240              : }
     241              : 
     242              : /// Panic with a nice error message.
     243              : #[cold]
     244            0 : fn panic_advance(idx: usize, len: usize) -> ! {
     245            0 :     panic!(
     246            0 :         "advance out of bounds: the len is {} but advancing by {}",
     247            0 :         len, idx
     248            0 :     );
     249              : }
     250              : 
     251              : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
     252              : /// and the underlying pointer remains stable while io-uring is owning the buffer.
     253              : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
     254              : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
     255              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
     256     21527884 :     fn stable_ptr(&self) -> *const u8 {
     257     21527884 :         self.as_ptr()
     258     21527884 :     }
     259              : 
     260     46330829 :     fn bytes_init(&self) -> usize {
     261     46330829 :         self.len()
     262     46330829 :     }
     263              : 
     264     17075715 :     fn bytes_total(&self) -> usize {
     265     17075715 :         self.capacity()
     266     17075715 :     }
     267              : }
     268              : 
     269              : // SAFETY: See above.
     270              : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
     271     10598866 :     fn stable_mut_ptr(&mut self) -> *mut u8 {
     272     10598866 :         self.as_mut_ptr()
     273     10598866 :     }
     274              : 
     275      6075058 :     unsafe fn set_init(&mut self, init_len: usize) {
     276      6075058 :         if self.len() < init_len {
     277              :             // SAFETY: caller function is unsafe
     278      4736278 :             unsafe {
     279      4736278 :                 self.set_len(init_len);
     280      4736278 :             }
     281      1338780 :         }
     282      6075058 :     }
     283              : }
     284              : 
     285              : impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
     286       547308 :     fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
     287       547308 :         self.extend_from_slice(buf);
     288       547308 :         Ok(buf.len())
     289       547308 :     }
     290              : 
     291            0 :     fn flush(&mut self) -> std::io::Result<()> {
     292            0 :         Ok(())
     293            0 :     }
     294              : }
     295              : 
     296              : #[cfg(test)]
     297              : mod tests {
     298              : 
     299              :     use super::*;
     300              : 
     301              :     const ALIGN: usize = 4 * 1024;
     302              :     type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
     303              : 
     304              :     #[test]
     305           12 :     fn test_with_capacity() {
     306           12 :         let v = TestIoBufferMut::with_capacity(ALIGN * 4);
     307           12 :         assert_eq!(v.len(), 0);
     308           12 :         assert_eq!(v.capacity(), ALIGN * 4);
     309           12 :         assert_eq!(v.align(), ALIGN);
     310           12 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     311              : 
     312           12 :         let v = TestIoBufferMut::with_capacity(ALIGN / 2);
     313           12 :         assert_eq!(v.len(), 0);
     314           12 :         assert_eq!(v.capacity(), ALIGN / 2);
     315           12 :         assert_eq!(v.align(), ALIGN);
     316           12 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     317           12 :     }
     318              : 
     319              :     #[test]
     320           12 :     fn test_with_capacity_zeroed() {
     321           12 :         let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
     322           12 :         assert_eq!(v.len(), ALIGN);
     323           12 :         assert_eq!(v.capacity(), ALIGN);
     324           12 :         assert_eq!(v.align(), ALIGN);
     325           12 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     326           12 :         assert_eq!(&v[..], &[0; ALIGN])
     327           12 :     }
     328              : 
     329              :     #[test]
     330           12 :     fn test_reserve() {
     331              :         use bytes::BufMut;
     332           12 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     333           12 :         let capacity = v.capacity();
     334           12 :         v.reserve(capacity);
     335           12 :         assert_eq!(v.capacity(), capacity);
     336           12 :         let data = [b'a'; ALIGN];
     337           12 :         v.put(&data[..]);
     338           12 :         v.reserve(capacity);
     339           12 :         assert!(v.capacity() >= capacity * 2);
     340           12 :         assert_eq!(&v[..], &data[..]);
     341           12 :         let capacity = v.capacity();
     342           12 :         v.clear();
     343           12 :         v.reserve(capacity);
     344           12 :         assert_eq!(capacity, v.capacity());
     345           12 :     }
     346              : 
     347              :     #[test]
     348           12 :     fn test_bytes_put() {
     349              :         use bytes::BufMut;
     350           12 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     351           12 :         let x = [b'a'; ALIGN];
     352              : 
     353           36 :         for _ in 0..2 {
     354          120 :             for _ in 0..4 {
     355           96 :                 v.put(&x[..]);
     356           96 :             }
     357           24 :             assert_eq!(v.len(), ALIGN * 4);
     358           24 :             assert_eq!(v.capacity(), ALIGN * 4);
     359           24 :             assert_eq!(v.align(), ALIGN);
     360           24 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     361           24 :             v.clear()
     362              :         }
     363           12 :         assert_eq!(v.len(), 0);
     364           12 :         assert_eq!(v.capacity(), ALIGN * 4);
     365           12 :         assert_eq!(v.align(), ALIGN);
     366           12 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     367           12 :     }
     368              : 
     369              :     #[test]
     370              :     #[should_panic]
     371           12 :     fn test_bytes_put_panic() {
     372              :         use bytes::BufMut;
     373              :         const ALIGN: usize = 4 * 1024;
     374           12 :         let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
     375           12 :         let x = [b'a'; ALIGN];
     376           60 :         for _ in 0..5 {
     377           48 :             v.put_slice(&x[..]);
     378           48 :         }
     379           12 :     }
     380              : 
     381              :     #[test]
     382           12 :     fn test_io_buf_put_slice() {
     383              :         use tokio_epoll_uring::BoundedBufMut;
     384              :         const ALIGN: usize = 4 * 1024;
     385           12 :         let mut v = TestIoBufferMut::with_capacity(ALIGN);
     386           12 :         let x = [b'a'; ALIGN];
     387              : 
     388           36 :         for _ in 0..2 {
     389           24 :             v.put_slice(&x[..]);
     390           24 :             assert_eq!(v.len(), ALIGN);
     391           24 :             assert_eq!(v.capacity(), ALIGN);
     392           24 :             assert_eq!(v.align(), ALIGN);
     393           24 :             assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     394           24 :             v.clear()
     395              :         }
     396           12 :         assert_eq!(v.len(), 0);
     397           12 :         assert_eq!(v.capacity(), ALIGN);
     398           12 :         assert_eq!(v.align(), ALIGN);
     399           12 :         assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
     400           12 :     }
     401              : }
        

Generated by: LCOV version 2.1-beta