Line data Source code
1 : use std::{
2 : mem::MaybeUninit,
3 : ops::{Deref, DerefMut},
4 : };
5 :
6 : use super::{
7 : alignment::{Alignment, ConstAlign},
8 : buffer::AlignedBuffer,
9 : raw::RawAlignedBuffer,
10 : };
11 :
12 : /// A mutable aligned buffer type.
13 : #[derive(Debug)]
14 : pub struct AlignedBufferMut<A: Alignment> {
15 : raw: RawAlignedBuffer<A>,
16 : }
17 :
18 : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
19 : /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
20 : ///
21 : /// The buffer will be able to hold at most `capacity` elements and will never resize.
22 : ///
23 : ///
24 : /// # Panics
25 : ///
26 : /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
27 : /// * `align` must not be zero,
28 : ///
29 : /// * `align` must be a power of two,
30 : ///
31 : /// * `capacity`, when rounded up to the nearest multiple of `align`,
32 : /// must not overflow isize (i.e., the rounded value must be
33 : /// less than or equal to `isize::MAX`).
34 1440396 : pub fn with_capacity(capacity: usize) -> Self {
35 1440396 : AlignedBufferMut {
36 1440396 : raw: RawAlignedBuffer::with_capacity(capacity),
37 1440396 : }
38 1440396 : }
39 :
40 : /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
41 608 : pub fn with_capacity_zeroed(capacity: usize) -> Self {
42 : use bytes::BufMut;
43 608 : let mut buf = Self::with_capacity(capacity);
44 608 : buf.put_bytes(0, capacity);
45 608 : // SAFETY: `put_bytes` filled the entire buffer.
46 608 : unsafe { buf.set_len(capacity) };
47 608 : buf
48 608 : }
49 : }
50 :
51 : impl<A: Alignment> AlignedBufferMut<A> {
52 : /// Constructs a mutable aligned buffer from raw.
53 13246 : pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
54 13246 : AlignedBufferMut { raw }
55 13246 : }
56 :
57 : /// Returns the total number of bytes the buffer can hold.
58 : #[inline]
59 53703635 : pub fn capacity(&self) -> usize {
60 53703635 : self.raw.capacity()
61 53703635 : }
62 :
63 : /// Returns the alignment of the buffer.
64 : #[inline]
65 44 : pub fn align(&self) -> usize {
66 44 : self.raw.align()
67 44 : }
68 :
69 : /// Returns the number of bytes in the buffer, also referred to as its 'length'.
70 : #[inline]
71 85422780 : pub fn len(&self) -> usize {
72 85422780 : self.raw.len()
73 85422780 : }
74 :
75 : /// Force the length of the buffer to `new_len`.
76 : #[inline]
77 11062530 : unsafe fn set_len(&mut self, new_len: usize) {
78 11062530 : self.raw.set_len(new_len)
79 11062530 : }
80 :
81 : #[inline]
82 6569751 : fn as_ptr(&self) -> *const u8 {
83 6569751 : self.raw.as_ptr()
84 6569751 : }
85 :
86 : #[inline]
87 12940663 : fn as_mut_ptr(&mut self) -> *mut u8 {
88 12940663 : self.raw.as_mut_ptr()
89 12940663 : }
90 :
91 : /// Extracts a slice containing the entire buffer.
92 : ///
93 : /// Equivalent to `&s[..]`.
94 : #[inline]
95 8367853 : fn as_slice(&self) -> &[u8] {
96 8367853 : self.raw.as_slice()
97 8367853 : }
98 :
99 : /// Extracts a mutable slice of the entire buffer.
100 : ///
101 : /// Equivalent to `&mut s[..]`.
102 0 : fn as_mut_slice(&mut self) -> &mut [u8] {
103 0 : self.raw.as_mut_slice()
104 0 : }
105 :
106 : /// Drops the all the contents of the buffer, setting its length to `0`.
107 : #[inline]
108 13298 : pub fn clear(&mut self) {
109 13298 : self.raw.clear()
110 13298 : }
111 :
112 : /// Reserves capacity for at least `additional` more bytes to be inserted
113 : /// in the given `IoBufferMut`. The collection may reserve more space to
114 : /// speculatively avoid frequent reallocations. After calling `reserve`,
115 : /// capacity will be greater than or equal to `self.len() + additional`.
116 : /// Does nothing if capacity is already sufficient.
117 : ///
118 : /// # Panics
119 : ///
120 : /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
121 9623128 : pub fn reserve(&mut self, additional: usize) {
122 9623128 : self.raw.reserve(additional);
123 9623128 : }
124 :
125 : /// Shortens the buffer, keeping the first len bytes.
126 20 : pub fn truncate(&mut self, len: usize) {
127 20 : self.raw.truncate(len);
128 20 : }
129 :
130 : /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
131 184 : pub fn leak<'a>(self) -> &'a mut [u8] {
132 184 : self.raw.leak()
133 184 : }
134 :
135 17813 : pub fn freeze(self) -> AlignedBuffer<A> {
136 17813 : let len = self.len();
137 17813 : AlignedBuffer::from_raw(self.raw, 0..len)
138 17813 : }
139 :
140 : /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
141 : #[inline]
142 9623084 : pub fn extend_from_slice(&mut self, extend: &[u8]) {
143 9623084 : let cnt = extend.len();
144 9623084 : self.reserve(cnt);
145 9623084 :
146 9623084 : // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
147 9623084 : unsafe {
148 9623084 : let dst = self.spare_capacity_mut();
149 9623084 : // Reserved above
150 9623084 : debug_assert!(dst.len() >= cnt);
151 :
152 9623084 : core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
153 9623084 : }
154 9623084 : // SAFETY: We do have at least `cnt` bytes remaining before advance.
155 9623084 : unsafe {
156 9623084 : bytes::BufMut::advance_mut(self, cnt);
157 9623084 : }
158 9623084 : }
159 :
160 : /// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
161 : #[inline]
162 9623084 : fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
163 9623084 : // SAFETY: we guarantees that the `Self::capacity()` bytes from
164 9623084 : // `Self::as_mut_ptr()` are allocated.
165 9623084 : unsafe {
166 9623084 : let ptr = self.as_mut_ptr().add(self.len());
167 9623084 : let len = self.capacity() - self.len();
168 9623084 :
169 9623084 : core::slice::from_raw_parts_mut(ptr.cast(), len)
170 9623084 : }
171 9623084 : }
172 : }
173 :
174 : impl<A: Alignment> Deref for AlignedBufferMut<A> {
175 : type Target = [u8];
176 :
177 8366445 : fn deref(&self) -> &Self::Target {
178 8366445 : self.as_slice()
179 8366445 : }
180 : }
181 :
182 : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
183 0 : fn deref_mut(&mut self) -> &mut Self::Target {
184 0 : self.as_mut_slice()
185 0 : }
186 : }
187 :
188 : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
189 0 : fn as_ref(&self) -> &[u8] {
190 0 : self.as_slice()
191 0 : }
192 : }
193 :
194 : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
195 0 : fn as_mut(&mut self) -> &mut [u8] {
196 0 : self.as_mut_slice()
197 0 : }
198 : }
199 :
200 : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
201 1408 : fn eq(&self, other: &[u8]) -> bool {
202 1408 : self.as_slice().eq(other)
203 1408 : }
204 : }
205 :
206 : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
207 : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
208 : #[inline]
209 9624412 : fn remaining_mut(&self) -> usize {
210 9624412 : // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
211 9624412 : // Thus, it can have at most `self.capacity` bytes.
212 9624412 : self.capacity() - self.len()
213 9624412 : }
214 :
215 : // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
216 : #[inline]
217 9623744 : unsafe fn advance_mut(&mut self, cnt: usize) {
218 9623744 : let len = self.len();
219 9623744 : let remaining = self.remaining_mut();
220 9623744 :
221 9623744 : if remaining < cnt {
222 0 : panic_advance(cnt, remaining);
223 9623744 : }
224 9623744 :
225 9623744 : // Addition will not overflow since the sum is at most the capacity.
226 9623744 : self.set_len(len + cnt);
227 9623744 : }
228 :
229 : #[inline]
230 660 : fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
231 660 : let cap = self.capacity();
232 660 : let len = self.len();
233 660 :
234 660 : // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
235 660 : // valid for `cap - len` bytes. The subtraction will not underflow since
236 660 : // `len <= cap`.
237 660 : unsafe {
238 660 : bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
239 660 : }
240 660 : }
241 : }
242 :
/// Panics with a descriptive message for an out-of-bounds advance.
///
/// `idx` is the number of bytes the caller tried to advance by; `len` is the
/// amount that was actually available. The message prints `len` first, then `idx`.
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
    let (available, requested) = (len, idx);
    panic!(
        "advance out of bounds: the len is {} but advancing by {}",
        available, requested
    );
}
251 :
252 : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
253 : /// and the underlying pointer remains stable while io-uring is owning the buffer.
254 : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
255 : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
256 : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
257 6569715 : fn stable_ptr(&self) -> *const u8 {
258 6569715 : self.as_ptr()
259 6569715 : }
260 :
261 14135349 : fn bytes_init(&self) -> usize {
262 14135349 : self.len()
263 14135349 : }
264 :
265 5129778 : fn bytes_total(&self) -> usize {
266 5129778 : self.capacity()
267 5129778 : }
268 : }
269 :
270 : // SAFETY: See above.
271 : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
272 3316919 : fn stable_mut_ptr(&mut self) -> *mut u8 {
273 3316919 : self.as_mut_ptr()
274 3316919 : }
275 :
276 1884438 : unsafe fn set_init(&mut self, init_len: usize) {
277 1884438 : if self.len() < init_len {
278 1438178 : self.set_len(init_len);
279 1438178 : }
280 1884438 : }
281 : }
282 :
#[cfg(test)]
mod tests {

    use super::*;

    const ALIGN: usize = 4 * 1024;
    type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;

    /// Asserts the buffer's length and capacity, and that both the reported alignment
    /// and the actual pointer alignment equal `ALIGN`.
    #[track_caller]
    fn assert_layout(buf: &TestIoBufferMut, len: usize, capacity: usize) {
        assert_eq!(buf.len(), len);
        assert_eq!(buf.capacity(), capacity);
        assert_eq!(buf.align(), ALIGN);
        assert_eq!(buf.as_ptr().align_offset(ALIGN), 0);
    }

    /// A fresh buffer is empty, has exactly the requested capacity, and is aligned,
    /// both for capacities above and below the alignment.
    #[test]
    fn test_with_capacity() {
        for capacity in [ALIGN * 4, ALIGN / 2] {
            let v = TestIoBufferMut::with_capacity(capacity);
            assert_layout(&v, 0, capacity);
        }
    }

    /// A zeroed buffer is completely filled with zero bytes.
    #[test]
    fn test_with_capacity_zeroed() {
        let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
        assert_layout(&v, ALIGN, ALIGN);
        assert_eq!(&v[..], &[0; ALIGN]);
    }

    /// `reserve` is a no-op while enough spare capacity exists and grows the buffer
    /// (preserving its contents) once it does not.
    #[test]
    fn test_reserve() {
        use bytes::BufMut;
        let mut v = TestIoBufferMut::with_capacity(ALIGN);
        let initial = v.capacity();

        // Empty buffer: the requested room is already available.
        v.reserve(initial);
        assert_eq!(v.capacity(), initial);

        // Full buffer: reserving the same amount again has to grow it.
        let data = [b'a'; ALIGN];
        v.put(&data[..]);
        v.reserve(initial);
        assert!(v.capacity() >= initial * 2);
        assert_eq!(&v[..], &data[..]);

        // Cleared buffer: the grown capacity is enough, so nothing changes.
        let grown = v.capacity();
        v.clear();
        v.reserve(grown);
        assert_eq!(grown, v.capacity());
    }

    /// Filling the buffer through `bytes::BufMut::put` keeps capacity and alignment,
    /// and the buffer can be refilled after `clear`.
    #[test]
    fn test_bytes_put() {
        use bytes::BufMut;
        let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
        let x = [b'a'; ALIGN];

        for _round in 0..2 {
            (0..4).for_each(|_| v.put(&x[..]));
            assert_layout(&v, ALIGN * 4, ALIGN * 4);
            v.clear();
        }
        assert_layout(&v, 0, ALIGN * 4);
    }

    /// Writing more than the capacity through `bytes::BufMut` must panic.
    #[test]
    #[should_panic]
    fn test_bytes_put_panic() {
        use bytes::BufMut;
        let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
        let x = [b'a'; ALIGN];
        // The fifth chunk no longer fits into the four-chunk capacity.
        for _ in 0..5 {
            v.put_slice(&x[..]);
        }
    }

    /// Filling the buffer through `tokio_epoll_uring::BoundedBufMut::put_slice` keeps
    /// capacity and alignment, and the buffer can be refilled after `clear`.
    #[test]
    fn test_io_buf_put_slice() {
        use tokio_epoll_uring::BoundedBufMut;
        let mut v = TestIoBufferMut::with_capacity(ALIGN);
        let x = [b'a'; ALIGN];

        for _round in 0..2 {
            v.put_slice(&x[..]);
            assert_layout(&v, ALIGN, ALIGN);
            v.clear();
        }
        assert_layout(&v, 0, ALIGN);
    }
}
388 : }