Line data Source code
1 : use std::mem::MaybeUninit;
2 : use std::ops::{Deref, DerefMut};
3 :
4 : use super::alignment::{Alignment, ConstAlign};
5 : use super::buffer::AlignedBuffer;
6 : use super::raw::RawAlignedBuffer;
7 :
8 : /// A mutable aligned buffer type.
9 : #[derive(Debug)]
10 : pub struct AlignedBufferMut<A: Alignment> {
11 : raw: RawAlignedBuffer<A>,
12 : }
13 :
14 : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
15 : /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
16 : ///
17 : /// The buffer will be able to hold at most `capacity` elements and will never resize.
18 : ///
19 : ///
20 : /// # Panics
21 : ///
22 : /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
23 : /// * `align` must not be zero,
24 : ///
25 : /// * `align` must be a power of two,
26 : ///
27 : /// * `capacity`, when rounded up to the nearest multiple of `align`,
28 : /// must not overflow isize (i.e., the rounded value must be
29 : /// less than or equal to `isize::MAX`).
30 4862956 : pub fn with_capacity(capacity: usize) -> Self {
31 4862956 : AlignedBufferMut {
32 4862956 : raw: RawAlignedBuffer::with_capacity(capacity),
33 4862956 : }
34 4862956 : }
35 :
36 : /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
37 1872 : pub fn with_capacity_zeroed(capacity: usize) -> Self {
38 : use bytes::BufMut;
39 1872 : let mut buf = Self::with_capacity(capacity);
40 1872 : buf.put_bytes(0, capacity);
41 1872 : // SAFETY: `put_bytes` filled the entire buffer.
42 1872 : unsafe { buf.set_len(capacity) };
43 1872 : buf
44 1872 : }
45 : }
46 :
47 : impl<A: Alignment> AlignedBufferMut<A> {
48 : /// Constructs a mutable aligned buffer from raw.
49 141648 : pub(super) fn from_raw(raw: RawAlignedBuffer<A>) -> Self {
50 141648 : AlignedBufferMut { raw }
51 141648 : }
52 :
53 : /// Returns the total number of bytes the buffer can hold.
54 : #[inline]
55 559814405 : pub fn capacity(&self) -> usize {
56 559814405 : self.raw.capacity()
57 559814405 : }
58 :
59 : /// Returns the alignment of the buffer.
60 : #[inline]
61 132 : pub fn align(&self) -> usize {
62 132 : self.raw.align()
63 132 : }
64 :
65 : /// Returns the number of bytes in the buffer, also referred to as its 'length'.
66 : #[inline]
67 817421756 : pub fn len(&self) -> usize {
68 817421756 : self.raw.len()
69 817421756 : }
70 :
71 : /// Force the length of the buffer to `new_len`.
72 : #[inline]
73 113513620 : unsafe fn set_len(&mut self, new_len: usize) {
74 113513620 : // SAFETY: the caller is unsafe
75 113513620 : unsafe { self.raw.set_len(new_len) }
76 113513620 : }
77 :
78 : #[inline]
79 21526823 : fn as_ptr(&self) -> *const u8 {
80 21526823 : self.raw.as_ptr()
81 21526823 : }
82 :
83 : #[inline]
84 119373482 : fn as_mut_ptr(&mut self) -> *mut u8 {
85 119373482 : self.raw.as_mut_ptr()
86 119373482 : }
87 :
88 : /// Extracts a slice containing the entire buffer.
89 : ///
90 : /// Equivalent to `&s[..]`.
91 : #[inline]
92 32503891 : fn as_slice(&self) -> &[u8] {
93 32503891 : self.raw.as_slice()
94 32503891 : }
95 :
96 : /// Extracts a mutable slice of the entire buffer.
97 : ///
98 : /// Equivalent to `&mut s[..]`.
99 0 : fn as_mut_slice(&mut self) -> &mut [u8] {
100 0 : self.raw.as_mut_slice()
101 0 : }
102 :
103 : /// Drops the all the contents of the buffer, setting its length to `0`.
104 : #[inline]
105 141804 : pub fn clear(&mut self) {
106 141804 : self.raw.clear()
107 141804 : }
108 :
109 : /// Reserves capacity for at least `additional` more bytes to be inserted
110 : /// in the given `IoBufferMut`. The collection may reserve more space to
111 : /// speculatively avoid frequent reallocations. After calling `reserve`,
112 : /// capacity will be greater than or equal to `self.len() + additional`.
113 : /// Does nothing if capacity is already sufficient.
114 : ///
115 : /// # Panics
116 : ///
117 : /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
118 108077772 : pub fn reserve(&mut self, additional: usize) {
119 108077772 : self.raw.reserve(additional);
120 108077772 : }
121 :
122 : /// Shortens the buffer, keeping the first len bytes.
123 60 : pub fn truncate(&mut self, len: usize) {
124 60 : self.raw.truncate(len);
125 60 : }
126 :
127 : /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
128 600 : pub fn leak<'a>(self) -> &'a mut [u8] {
129 600 : self.raw.leak()
130 600 : }
131 :
132 265340 : pub fn freeze(self) -> AlignedBuffer<A> {
133 265340 : let len = self.len();
134 265340 : AlignedBuffer::from_raw(self.raw, 0..len)
135 265340 : }
136 :
137 : /// Clones and appends all elements in a slice to the buffer. Reserves additional capacity as needed.
138 : #[inline]
139 108077640 : pub fn extend_from_slice(&mut self, extend: &[u8]) {
140 108077640 : let cnt = extend.len();
141 108077640 : self.reserve(cnt);
142 108077640 :
143 108077640 : // SAFETY: we already reserved additional `cnt` bytes, safe to perform memcpy.
144 108077640 : unsafe {
145 108077640 : let dst = self.spare_capacity_mut();
146 108077640 : // Reserved above
147 108077640 : debug_assert!(dst.len() >= cnt);
148 :
149 108077640 : core::ptr::copy_nonoverlapping(extend.as_ptr(), dst.as_mut_ptr().cast(), cnt);
150 108077640 : }
151 108077640 : // SAFETY: We do have at least `cnt` bytes remaining before advance.
152 108077640 : unsafe {
153 108077640 : bytes::BufMut::advance_mut(self, cnt);
154 108077640 : }
155 108077640 : }
156 :
157 : /// Returns the remaining spare capacity of the vector as a slice of `MaybeUninit<u8>`.
158 : #[inline]
159 108077640 : fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<u8>] {
160 108077640 : // SAFETY: we guarantees that the `Self::capacity()` bytes from
161 108077640 : // `Self::as_mut_ptr()` are allocated.
162 108077640 : unsafe {
163 108077640 : let ptr = self.as_mut_ptr().add(self.len());
164 108077640 : let len = self.capacity() - self.len();
165 108077640 :
166 108077640 : core::slice::from_raw_parts_mut(ptr.cast(), len)
167 108077640 : }
168 108077640 : }
169 : }
170 :
171 : impl<A: Alignment> Deref for AlignedBufferMut<A> {
172 : type Target = [u8];
173 :
174 32499679 : fn deref(&self) -> &Self::Target {
175 32499679 : self.as_slice()
176 32499679 : }
177 : }
178 :
179 : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
180 0 : fn deref_mut(&mut self) -> &mut Self::Target {
181 0 : self.as_mut_slice()
182 0 : }
183 : }
184 :
185 : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
186 0 : fn as_ref(&self) -> &[u8] {
187 0 : self.as_slice()
188 0 : }
189 : }
190 :
191 : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
192 0 : fn as_mut(&mut self) -> &mut [u8] {
193 0 : self.as_mut_slice()
194 0 : }
195 : }
196 :
197 : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
198 4212 : fn eq(&self, other: &[u8]) -> bool {
199 4212 : self.as_slice().eq(other)
200 4212 : }
201 : }
202 :
203 : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
204 : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
205 : #[inline]
206 109487996 : fn remaining_mut(&self) -> usize {
207 109487996 : // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
208 109487996 : // Thus, it can have at most `self.capacity` bytes.
209 109487996 : self.capacity() - self.len()
210 109487996 : }
211 :
212 : // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
213 : #[inline]
214 108775912 : unsafe fn advance_mut(&mut self, cnt: usize) {
215 108775912 : let len = self.len();
216 108775912 : let remaining = self.remaining_mut();
217 108775912 :
218 108775912 : if remaining < cnt {
219 0 : panic_advance(cnt, remaining);
220 108775912 : }
221 108775912 :
222 108775912 : // SAFETY: Addition will not overflow since the sum is at most the capacity.
223 108775912 : unsafe {
224 108775912 : self.set_len(len + cnt);
225 108775912 : }
226 108775912 : }
227 :
228 : #[inline]
229 698272 : fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
230 698272 : let cap = self.capacity();
231 698272 : let len = self.len();
232 698272 :
233 698272 : // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
234 698272 : // valid for `cap - len` bytes. The subtraction will not underflow since
235 698272 : // `len <= cap`.
236 698272 : unsafe {
237 698272 : bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
238 698272 : }
239 698272 : }
240 : }
241 :
242 : /// Panic with a nice error message.
243 : #[cold]
244 0 : fn panic_advance(idx: usize, len: usize) -> ! {
245 0 : panic!(
246 0 : "advance out of bounds: the len is {} but advancing by {}",
247 0 : len, idx
248 0 : );
249 : }
250 :
251 : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
252 : /// and the underlying pointer remains stable while io-uring is owning the buffer.
253 : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
254 : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
255 : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
256 21526715 : fn stable_ptr(&self) -> *const u8 {
257 21526715 : self.as_ptr()
258 21526715 : }
259 :
260 46327974 : fn bytes_init(&self) -> usize {
261 46327974 : self.len()
262 46327974 : }
263 :
264 17075770 : fn bytes_total(&self) -> usize {
265 17075770 : self.capacity()
266 17075770 : }
267 : }
268 :
269 : // SAFETY: See above.
270 : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
271 10597570 : fn stable_mut_ptr(&mut self) -> *mut u8 {
272 10597570 : self.as_mut_ptr()
273 10597570 : }
274 :
275 6074616 : unsafe fn set_init(&mut self, init_len: usize) {
276 6074616 : if self.len() < init_len {
277 : // SAFETY: caller function is unsafe
278 4735836 : unsafe {
279 4735836 : self.set_len(init_len);
280 4735836 : }
281 1338780 : }
282 6074616 : }
283 : }
284 :
285 : impl<A: Alignment> std::io::Write for AlignedBufferMut<A> {
286 546720 : fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
287 546720 : self.extend_from_slice(buf);
288 546720 : Ok(buf.len())
289 546720 : }
290 :
291 0 : fn flush(&mut self) -> std::io::Result<()> {
292 0 : Ok(())
293 0 : }
294 : }
295 :
296 : #[cfg(test)]
297 : mod tests {
298 :
299 : use super::*;
300 :
301 : const ALIGN: usize = 4 * 1024;
302 : type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;
303 :
304 : #[test]
305 12 : fn test_with_capacity() {
306 12 : let v = TestIoBufferMut::with_capacity(ALIGN * 4);
307 12 : assert_eq!(v.len(), 0);
308 12 : assert_eq!(v.capacity(), ALIGN * 4);
309 12 : assert_eq!(v.align(), ALIGN);
310 12 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
311 :
312 12 : let v = TestIoBufferMut::with_capacity(ALIGN / 2);
313 12 : assert_eq!(v.len(), 0);
314 12 : assert_eq!(v.capacity(), ALIGN / 2);
315 12 : assert_eq!(v.align(), ALIGN);
316 12 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
317 12 : }
318 :
319 : #[test]
320 12 : fn test_with_capacity_zeroed() {
321 12 : let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
322 12 : assert_eq!(v.len(), ALIGN);
323 12 : assert_eq!(v.capacity(), ALIGN);
324 12 : assert_eq!(v.align(), ALIGN);
325 12 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
326 12 : assert_eq!(&v[..], &[0; ALIGN])
327 12 : }
328 :
329 : #[test]
330 12 : fn test_reserve() {
331 : use bytes::BufMut;
332 12 : let mut v = TestIoBufferMut::with_capacity(ALIGN);
333 12 : let capacity = v.capacity();
334 12 : v.reserve(capacity);
335 12 : assert_eq!(v.capacity(), capacity);
336 12 : let data = [b'a'; ALIGN];
337 12 : v.put(&data[..]);
338 12 : v.reserve(capacity);
339 12 : assert!(v.capacity() >= capacity * 2);
340 12 : assert_eq!(&v[..], &data[..]);
341 12 : let capacity = v.capacity();
342 12 : v.clear();
343 12 : v.reserve(capacity);
344 12 : assert_eq!(capacity, v.capacity());
345 12 : }
346 :
347 : #[test]
348 12 : fn test_bytes_put() {
349 : use bytes::BufMut;
350 12 : let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
351 12 : let x = [b'a'; ALIGN];
352 :
353 36 : for _ in 0..2 {
354 120 : for _ in 0..4 {
355 96 : v.put(&x[..]);
356 96 : }
357 24 : assert_eq!(v.len(), ALIGN * 4);
358 24 : assert_eq!(v.capacity(), ALIGN * 4);
359 24 : assert_eq!(v.align(), ALIGN);
360 24 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
361 24 : v.clear()
362 : }
363 12 : assert_eq!(v.len(), 0);
364 12 : assert_eq!(v.capacity(), ALIGN * 4);
365 12 : assert_eq!(v.align(), ALIGN);
366 12 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
367 12 : }
368 :
369 : #[test]
370 : #[should_panic]
371 12 : fn test_bytes_put_panic() {
372 : use bytes::BufMut;
373 : const ALIGN: usize = 4 * 1024;
374 12 : let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
375 12 : let x = [b'a'; ALIGN];
376 60 : for _ in 0..5 {
377 48 : v.put_slice(&x[..]);
378 48 : }
379 12 : }
380 :
381 : #[test]
382 12 : fn test_io_buf_put_slice() {
383 : use tokio_epoll_uring::BoundedBufMut;
384 : const ALIGN: usize = 4 * 1024;
385 12 : let mut v = TestIoBufferMut::with_capacity(ALIGN);
386 12 : let x = [b'a'; ALIGN];
387 :
388 36 : for _ in 0..2 {
389 24 : v.put_slice(&x[..]);
390 24 : assert_eq!(v.len(), ALIGN);
391 24 : assert_eq!(v.capacity(), ALIGN);
392 24 : assert_eq!(v.align(), ALIGN);
393 24 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
394 24 : v.clear()
395 : }
396 12 : assert_eq!(v.len(), 0);
397 12 : assert_eq!(v.capacity(), ALIGN);
398 12 : assert_eq!(v.align(), ALIGN);
399 12 : assert_eq!(v.as_ptr().align_offset(ALIGN), 0);
400 12 : }
401 : }
|