Line data Source code
1 : use std::{
2 : mem::MaybeUninit,
3 : ops::{Deref, DerefMut},
4 : };
5 :
6 : use super::{
7 : alignment::{Alignment, ConstAlign},
8 : buffer::AlignedBuffer,
9 : raw::RawAlignedBuffer,
10 : };
11 :
12 : /// A mutable aligned buffer type.
13 : #[derive(Debug)]
14 : pub struct AlignedBufferMut<A: Alignment> {
15 : raw: RawAlignedBuffer<A>,
16 : }
17 :
18 : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
19 : /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
20 : ///
21 : /// The buffer will be able to hold at most `capacity` elements and will never resize.
22 : ///
23 : ///
24 : /// # Panics
25 : ///
26 : /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
27 : /// * `align` must not be zero,
28 : ///
29 : /// * `align` must be a power of two,
30 : ///
31 : /// * `capacity`, when rounded up to the nearest multiple of `align`,
32 : /// must not overflow isize (i.e., the rounded value must be
33 : /// less than or equal to `isize::MAX`).
34 804879 : pub fn with_capacity(capacity: usize) -> Self {
35 804879 : AlignedBufferMut {
36 804879 : raw: RawAlignedBuffer::with_capacity(capacity),
37 804879 : }
38 804879 : }
39 :
40 : /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
41 304 : pub fn with_capacity_zeroed(capacity: usize) -> Self {
42 : use bytes::BufMut;
43 304 : let mut buf = Self::with_capacity(capacity);
44 304 : buf.put_bytes(0, capacity);
45 304 : // SAFETY: `put_bytes` filled the entire buffer.
46 304 : unsafe { buf.set_len(capacity) };
47 304 : buf
48 304 : }
49 : }
50 :
51 : impl<A: Alignment> AlignedBufferMut<A> {
52 : /// Constructs a mutable aligned buffer from raw.
53 6619 : pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
54 6619 : AlignedBufferMut { raw }
55 6619 : }
56 :
57 : /// Returns the total number of bytes the buffer can hold.
58 : #[inline]
59 26811621 : pub fn capacity(&self) -> usize {
60 26811621 : self.raw.capacity()
61 26811621 : }
62 :
63 : /// Returns the alignment of the buffer.
64 : #[inline]
65 22 : pub fn align(&self) -> usize {
66 22 : self.raw.align()
67 22 : }
68 :
69 : /// Returns the number of bytes in the buffer, also referred to as its 'length'.
70 : #[inline]
71 42634472 : pub fn len(&self) -> usize {
72 42634472 : self.raw.len()
73 42634472 : }
74 :
75 : /// Force the length of the buffer to `new_len`.
76 : #[inline]
77 5519655 : unsafe fn set_len(&mut self, new_len: usize) {
78 5519655 : self.raw.set_len(new_len)
79 5519655 : }
80 :
81 : #[inline]
82 3253050 : fn as_ptr(&self) -> *const u8 {
83 3253050 : self.raw.as_ptr()
84 3253050 : }
85 :
86 : #[inline]
87 6458195 : fn as_mut_ptr(&mut self) -> *mut u8 {
88 6458195 : self.raw.as_mut_ptr()
89 6458195 : }
90 :
91 : /// Extracts a slice containing the entire buffer.
92 : ///
93 : /// Equivalent to `&s[..]`.
94 : #[inline]
95 4138554 : fn as_slice(&self) -> &[u8] {
96 4138554 : self.raw.as_slice()
97 4138554 : }
98 :
99 : /// Extracts a mutable slice of the entire buffer.
100 : ///
101 : /// Equivalent to `&mut s[..]`.
102 0 : fn as_mut_slice(&mut self) -> &mut [u8] {
103 0 : self.raw.as_mut_slice()
104 0 : }
105 :
106 : /// Drops the all the contents of the buffer, setting its length to `0`.
107 : #[inline]
108 6645 : pub fn clear(&mut self) {
109 6645 : self.raw.clear()
110 6645 : }
111 :
112 : /// Reserves capacity for at least `additional` more bytes to be inserted
113 : /// in the given `IoBufferMut`. The collection may reserve more space to
114 : /// speculatively avoid frequent reallocations. After calling `reserve`,
115 : /// capacity will be greater than or equal to `self.len() + additional`.
116 : /// Does nothing if capacity is already sufficient.
117 : ///
118 : /// # Panics
119 : ///
120 : /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
121 4811514 : pub fn reserve(&mut self, additional: usize) {
122 4811514 : self.raw.reserve(additional);
123 4811514 : }
124 :
125 : /// Shortens the buffer, keeping the first len bytes.
126 10 : pub fn truncate(&mut self, len: usize) {
127 10 : self.raw.truncate(len);
128 10 : }
129 :
130 : /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
131 92 : pub fn leak<'a>(self) -> &'a mut [u8] {
132 92 : self.raw.leak()
133 92 : }
134 :
135 8872 : pub fn freeze(self) -> AlignedBuffer<A> {
136 8872 : let len = self.len();
137 8872 : AlignedBuffer::from_raw(self.raw, 0..len)
138 8872 : }
139 :
140 : /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
141 : #[inline]
142 4811492 : pub fn extend_from_slice(&mut self, extend: &[u8]) {
143 4811492 : let cnt = extend.len();
144 4811492 : self.reserve(cnt);
145 4811492 :
146 4811492 : // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
147 4811492 : unsafe {
148 4811492 : let dst = self.spare_capacity_mut();
149 4811492 : // Reserved above
150 4811492 : debug_assert!(dst.len() >= cnt);
151 :
152 4811492 : core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
153 4811492 : }
154 4811492 : // SAFETY: We do have at least `cnt` bytes remaining before advance.
155 4811492 : unsafe {
156 4811492 : bytes::BufMut::advance_mut(self, cnt);
157 4811492 : }
158 4811492 : }
159 :
160 : /// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
161 : #[inline]
162 4811492 : fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
163 4811492 : // SAFETY: we guarantees that the `Self::capacity()` bytes from
164 4811492 : // `Self::as_mut_ptr()` are allocated.
165 4811492 : unsafe {
166 4811492 : let ptr = self.as_mut_ptr().add(self.len());
167 4811492 : let len = self.capacity() - self.len();
168 4811492 :
169 4811492 : core::slice::from_raw_parts_mut(ptr.cast(), len)
170 4811492 : }
171 4811492 : }
172 : }
173 :
174 : impl<A: Alignment> Deref for AlignedBufferMut<A> {
175 : type Target = [u8];
176 :
177 4137850 : fn deref(&self) -> &Self::Target {
178 4137850 : self.as_slice()
179 4137850 : }
180 : }
181 :
182 : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
183 0 : fn deref_mut(&mut self) -> &mut Self::Target {
184 0 : self.as_mut_slice()
185 0 : }
186 : }
187 :
188 : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
189 0 : fn as_ref(&self) -> &[u8] {
190 0 : self.as_slice()
191 0 : }
192 : }
193 :
194 : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
195 0 : fn as_mut(&mut self) -> &mut [u8] {
196 0 : self.as_mut_slice()
197 0 : }
198 : }
199 :
200 : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
201 704 : fn eq(&self, other: &[u8]) -> bool {
202 704 : self.as_slice().eq(other)
203 704 : }
204 : }
205 :
206 : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
207 : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
208 : #[inline]
209 4812156 : fn remaining_mut(&self) -> usize {
210 4812156 : // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
211 4812156 : // Thus, it can have at most `self.capacity` bytes.
212 4812156 : self.capacity() - self.len()
213 4812156 : }
214 :
215 : // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
216 : #[inline]
217 4811822 : unsafe fn advance_mut(&mut self, cnt: usize) {
218 4811822 : let len = self.len();
219 4811822 : let remaining = self.remaining_mut();
220 4811822 :
221 4811822 : if remaining < cnt {
222 0 : panic_advance(cnt, remaining);
223 4811822 : }
224 4811822 :
225 4811822 : // Addition will not overflow since the sum is at most the capacity.
226 4811822 : self.set_len(len + cnt);
227 4811822 : }
228 :
229 : #[inline]
230 330 : fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
231 330 : let cap = self.capacity();
232 330 : let len = self.len();
233 330 :
234 330 : // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
235 330 : // valid for `cap - len` bytes. The subtraction will not underflow since
236 330 : // `len <= cap`.
237 330 : unsafe {
238 330 : bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
239 330 : }
240 330 : }
241 : }
242 :
243 : /// Panic with a nice error message.
244 : #[cold]
245 0 : fn panic_advance(idx: usize, len: usize) -> ! {
246 0 : panic!(
247 0 : "advance out of bounds: the len is {} but advancing by {}",
248 0 : len, idx
249 0 : );
250 : }
251 :
252 : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
253 : /// and the underlying pointer remains stable while io-uring is owning the buffer.
254 : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
255 : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
256 : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
257 3253032 : fn stable_ptr(&self) -> *const u8 {
258 3253032 : self.as_ptr()
259 3253032 : }
260 :
261 6999674 : fn bytes_init(&self) -> usize {
262 6999674 : self.len()
263 6999674 : }
264 :
265 2532864 : fn bytes_total(&self) -> usize {
266 2532864 : self.capacity()
267 2532864 : }
268 : }
269 :
270 : // SAFETY: See above.
271 : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
272 1646373 : fn stable_mut_ptr(&mut self) -> *mut u8 {
273 1646373 : self.as_mut_ptr()
274 1646373 : }
275 :
276 934105 : unsafe fn set_init(&mut self, init_len: usize) {
277 934105 : if self.len() < init_len {
278 707529 : self.set_len(init_len);
279 707529 : }
280 934105 : }
281 : }
282 :
283 : #[cfg(test)]
284 : mod tests {
285 :
286 : use super::*;
287 :
288 : const ALIGN: usize = 4 * 1024;
289 : type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
290 :
291 : #[test]
292 2 : fn test_with_capacity() {
293 2 : let v = TestIoBufferMut::with_capacity(ALIGN * 4);
294 2 : assert_eq!(v.len(), 0);
295 2 : assert_eq!(v.capacity(), ALIGN * 4);
296 2 : assert_eq!(v.align(), ALIGN);
297 2 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
298 :
299 2 : let v = TestIoBufferMut::with_capacity(ALIGN / 2);
300 2 : assert_eq!(v.len(), 0);
301 2 : assert_eq!(v.capacity(), ALIGN / 2);
302 2 : assert_eq!(v.align(), ALIGN);
303 2 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
304 2 : }
305 :
306 : #[test]
307 2 : fn test_with_capacity_zeroed() {
308 2 : let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
309 2 : assert_eq!(v.len(), ALIGN);
310 2 : assert_eq!(v.capacity(), ALIGN);
311 2 : assert_eq!(v.align(), ALIGN);
312 2 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
313 2 : assert_eq!(&v[..], &[0; ALIGN])
314 2 : }
315 :
316 : #[test]
317 2 : fn test_reserve() {
318 : use bytes::BufMut;
319 2 : let mut v = TestIoBufferMut::with_capacity(ALIGN);
320 2 : let capacity = v.capacity();
321 2 : v.reserve(capacity);
322 2 : assert_eq!(v.capacity(), capacity);
323 2 : let data = [b'a'; ALIGN];
324 2 : v.put(&data[..]);
325 2 : v.reserve(capacity);
326 2 : assert!(v.capacity() >= capacity * 2);
327 2 : assert_eq!(&v[..], &data[..]);
328 2 : let capacity = v.capacity();
329 2 : v.clear();
330 2 : v.reserve(capacity);
331 2 : assert_eq!(capacity, v.capacity());
332 2 : }
333 :
334 : #[test]
335 2 : fn test_bytes_put() {
336 : use bytes::BufMut;
337 2 : let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
338 2 : let x = [b'a'; ALIGN];
339 :
340 6 : for _ in 0..2 {
341 20 : for _ in 0..4 {
342 16 : v.put(&x[..]);
343 16 : }
344 4 : assert_eq!(v.len(), ALIGN * 4);
345 4 : assert_eq!(v.capacity(), ALIGN * 4);
346 4 : assert_eq!(v.align(), ALIGN);
347 4 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
348 4 : v.clear()
349 : }
350 2 : assert_eq!(v.len(), 0);
351 2 : assert_eq!(v.capacity(), ALIGN * 4);
352 2 : assert_eq!(v.align(), ALIGN);
353 2 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
354 2 : }
355 :
356 : #[test]
357 : #[should_panic]
358 2 : fn test_bytes_put_panic() {
359 : use bytes::BufMut;
360 : const ALIGN: usize = 4 * 1024;
361 2 : let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
362 2 : let x = [b'a'; ALIGN];
363 12 : for _ in 0..5 {
364 10 : v.put_slice(&x[..]);
365 10 : }
366 2 : }
367 :
368 : #[test]
369 2 : fn test_io_buf_put_slice() {
370 : use tokio_epoll_uring::BoundedBufMut;
371 : const ALIGN: usize = 4 * 1024;
372 2 : let mut v = TestIoBufferMut::with_capacity(ALIGN);
373 2 : let x = [b'a'; ALIGN];
374 :
375 6 : for _ in 0..2 {
376 4 : v.put_slice(&x[..]);
377 4 : assert_eq!(v.len(), ALIGN);
378 4 : assert_eq!(v.capacity(), ALIGN);
379 4 : assert_eq!(v.align(), ALIGN);
380 4 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
381 4 : v.clear()
382 : }
383 2 : assert_eq!(v.len(), 0);
384 2 : assert_eq!(v.capacity(), ALIGN);
385 2 : assert_eq!(v.align(), ALIGN);
386 2 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
387 2 : }
388 : }
|