Line data Source code
1 : use std::ops::{Deref, DerefMut};
2 :
3 : use super::{
4 : alignment::{Alignment, ConstAlign},
5 : buffer::AlignedBuffer,
6 : raw::RawAlignedBuffer,
7 : };
8 :
9 : /// A mutable aligned buffer type.
10 : #[derive(Debug)]
11 : pub struct AlignedBufferMut<A: Alignment> {
12 : raw: RawAlignedBuffer<A>,
13 : }
14 :
15 : impl<const A: usize> AlignedBufferMut<ConstAlign<A>> {
16 : /// Constructs a new, empty `IoBufferMut` with at least the specified capacity and alignment.
17 : ///
18 : /// The buffer will be able to hold at most `capacity` elements and will never resize.
19 : ///
20 : ///
21 : /// # Panics
22 : ///
23 : /// Panics if the new capacity exceeds `isize::MAX` _bytes_, or if the following alignment requirement is not met:
24 : /// * `align` must not be zero,
25 : ///
26 : /// * `align` must be a power of two,
27 : ///
28 : /// * `capacity`, when rounded up to the nearest multiple of `align`,
29 : /// must not overflow isize (i.e., the rounded value must be
30 : /// less than or equal to `isize::MAX`).
31 996495 : pub fn with_capacity(capacity: usize) -> Self {
32 996495 : AlignedBufferMut {
33 996495 : raw: RawAlignedBuffer::with_capacity(capacity),
34 996495 : }
35 996495 : }
36 :
37 : /// Constructs a new `IoBufferMut` with at least the specified capacity and alignment, filled with zeros.
38 300 : pub fn with_capacity_zeroed(capacity: usize) -> Self {
39 : use bytes::BufMut;
40 300 : let mut buf = Self::with_capacity(capacity);
41 300 : buf.put_bytes(0, capacity);
42 300 : // SAFETY: `put_bytes` filled the entire buffer.
43 300 : unsafe { buf.set_len(capacity) };
44 300 : buf
45 300 : }
46 : }
47 :
48 : impl<A: Alignment> AlignedBufferMut<A> {
49 : /// Returns the total number of bytes the buffer can hold.
50 : #[inline]
51 116789190 : pub fn capacity(&self) -> usize {
52 116789190 : self.raw.capacity()
53 116789190 : }
54 :
55 : /// Returns the alignment of the buffer.
56 : #[inline]
57 18 : pub fn align(&self) -> usize {
58 18 : self.raw.align()
59 18 : }
60 :
61 : /// Returns the number of bytes in the buffer, also referred to as its 'length'.
62 : #[inline]
63 161574830 : pub fn len(&self) -> usize {
64 161574830 : self.raw.len()
65 161574830 : }
66 :
67 : /// Force the length of the buffer to `new_len`.
68 : #[inline]
69 38593803 : unsafe fn set_len(&mut self, new_len: usize) {
70 38593803 : self.raw.set_len(new_len)
71 38593803 : }
72 :
73 : #[inline]
74 4221117 : fn as_ptr(&self) -> *const u8 {
75 4221117 : self.raw.as_ptr()
76 4221117 : }
77 :
78 : #[inline]
79 39582205 : fn as_mut_ptr(&mut self) -> *mut u8 {
80 39582205 : self.raw.as_mut_ptr()
81 39582205 : }
82 :
83 : /// Extracts a slice containing the entire buffer.
84 : ///
85 : /// Equivalent to `&s[..]`.
86 : #[inline]
87 3834395 : fn as_slice(&self) -> &[u8] {
88 3834395 : self.raw.as_slice()
89 3834395 : }
90 :
91 : /// Extracts a mutable slice of the entire buffer.
92 : ///
93 : /// Equivalent to `&mut s[..]`.
94 0 : fn as_mut_slice(&mut self) -> &mut [u8] {
95 0 : self.raw.as_mut_slice()
96 0 : }
97 :
98 : /// Drops the all the contents of the buffer, setting its length to `0`.
99 : #[inline]
100 26 : pub fn clear(&mut self) {
101 26 : self.raw.clear()
102 26 : }
103 :
104 : /// Reserves capacity for at least `additional` more bytes to be inserted
105 : /// in the given `IoBufferMut`. The collection may reserve more space to
106 : /// speculatively avoid frequent reallocations. After calling `reserve`,
107 : /// capacity will be greater than or equal to `self.len() + additional`.
108 : /// Does nothing if capacity is already sufficient.
109 : ///
110 : /// # Panics
111 : ///
112 : /// Panics if the new capacity exceeds `isize::MAX` _bytes_.
113 22 : pub fn reserve(&mut self, additional: usize) {
114 22 : self.raw.reserve(additional);
115 22 : }
116 :
117 : /// Shortens the buffer, keeping the first len bytes.
118 10 : pub fn truncate(&mut self, len: usize) {
119 10 : self.raw.truncate(len);
120 10 : }
121 :
122 : /// Consumes and leaks the `IoBufferMut`, returning a mutable reference to the contents, &'a mut [u8].
123 88 : pub fn leak<'a>(self) -> &'a mut [u8] {
124 88 : self.raw.leak()
125 88 : }
126 :
127 968 : pub fn freeze(self) -> AlignedBuffer<A> {
128 968 : let len = self.len();
129 968 : AlignedBuffer::from_raw(self.raw, 0..len)
130 968 : }
131 : }
132 :
133 : impl<A: Alignment> Deref for AlignedBufferMut<A> {
134 : type Target = [u8];
135 :
136 3637769 : fn deref(&self) -> &Self::Target {
137 3637769 : self.as_slice()
138 3637769 : }
139 : }
140 :
141 : impl<A: Alignment> DerefMut for AlignedBufferMut<A> {
142 0 : fn deref_mut(&mut self) -> &mut Self::Target {
143 0 : self.as_mut_slice()
144 0 : }
145 : }
146 :
147 : impl<A: Alignment> AsRef<[u8]> for AlignedBufferMut<A> {
148 0 : fn as_ref(&self) -> &[u8] {
149 0 : self.as_slice()
150 0 : }
151 : }
152 :
153 : impl<A: Alignment> AsMut<[u8]> for AlignedBufferMut<A> {
154 0 : fn as_mut(&mut self) -> &mut [u8] {
155 0 : self.as_mut_slice()
156 0 : }
157 : }
158 :
159 : impl<A: Alignment> PartialEq<[u8]> for AlignedBufferMut<A> {
160 196626 : fn eq(&self, other: &[u8]) -> bool {
161 196626 : self.as_slice().eq(other)
162 196626 : }
163 : }
164 :
165 : /// SAFETY: When advancing the internal cursor, the caller needs to make sure the bytes advcanced past have been initialized.
166 : unsafe impl<A: Alignment> bytes::BufMut for AlignedBufferMut<A> {
167 : #[inline]
168 75382876 : fn remaining_mut(&self) -> usize {
169 75382876 : // Although a `Vec` can have at most isize::MAX bytes, we never want to grow `IoBufferMut`.
170 75382876 : // Thus, it can have at most `self.capacity` bytes.
171 75382876 : self.capacity() - self.len()
172 75382876 : }
173 :
174 : // SAFETY: Caller needs to make sure the bytes being advanced past have been initialized.
175 : #[inline]
176 37691436 : unsafe fn advance_mut(&mut self, cnt: usize) {
177 37691436 : let len = self.len();
178 37691436 : let remaining = self.remaining_mut();
179 37691436 :
180 37691436 : if remaining < cnt {
181 0 : panic_advance(cnt, remaining);
182 37691436 : }
183 37691436 :
184 37691436 : // Addition will not overflow since the sum is at most the capacity.
185 37691436 : self.set_len(len + cnt);
186 37691436 : }
187 :
188 : #[inline]
189 37691436 : fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
190 37691436 : let cap = self.capacity();
191 37691436 : let len = self.len();
192 37691436 :
193 37691436 : // SAFETY: Since `self.ptr` is valid for `cap` bytes, `self.ptr.add(len)` must be
194 37691436 : // valid for `cap - len` bytes. The subtraction will not underflow since
195 37691436 : // `len <= cap`.
196 37691436 : unsafe {
197 37691436 : bytes::buf::UninitSlice::from_raw_parts_mut(self.as_mut_ptr().add(len), cap - len)
198 37691436 : }
199 37691436 : }
200 : }
201 :
/// Panics with a descriptive message for an out-of-bounds advance.
///
/// `idx` is the number of bytes the caller tried to advance by; `len` is the value
/// reported as "the len" in the message (the caller in this file passes the remaining
/// writable space).
#[cold]
fn panic_advance(idx: usize, len: usize) -> ! {
    panic!("advance out of bounds: the len is {} but advancing by {}", len, idx);
}
210 :
211 : /// Safety: [`AlignedBufferMut`] has exclusive ownership of the io buffer,
212 : /// and the underlying pointer remains stable while io-uring is owning the buffer.
213 : /// The tokio-epoll-uring crate itself will not resize the buffer and will respect
214 : /// [`tokio_epoll_uring::IoBuf::bytes_total`].
215 : unsafe impl<A: Alignment> tokio_epoll_uring::IoBuf for AlignedBufferMut<A> {
216 4221099 : fn stable_ptr(&self) -> *const u8 {
217 4221099 : self.as_ptr()
218 4221099 : }
219 :
220 9181180 : fn bytes_init(&self) -> usize {
221 9181180 : self.len()
222 9181180 : }
223 :
224 3501770 : fn bytes_total(&self) -> usize {
225 3501770 : self.capacity()
226 3501770 : }
227 : }
228 :
229 : // SAFETY: See above.
230 : unsafe impl<A: Alignment> tokio_epoll_uring::IoBufMut for AlignedBufferMut<A> {
231 1890769 : fn stable_mut_ptr(&mut self) -> *mut u8 {
232 1890769 : self.as_mut_ptr()
233 1890769 : }
234 :
235 1128737 : unsafe fn set_init(&mut self, init_len: usize) {
236 1128737 : if self.len() < init_len {
237 902067 : self.set_len(init_len);
238 902067 : }
239 1128737 : }
240 : }
241 :
#[cfg(test)]
mod tests {

    use super::*;

    const ALIGN: usize = 4 * 1024;
    type TestIoBufferMut = AlignedBufferMut<ConstAlign<ALIGN>>;

    /// Checks length, capacity, reported alignment and the actual pointer alignment,
    /// in that order.
    #[track_caller]
    fn assert_shape(buf: &TestIoBufferMut, expected_len: usize, expected_capacity: usize) {
        assert_eq!(buf.len(), expected_len);
        assert_eq!(buf.capacity(), expected_capacity);
        assert_eq!(buf.align(), ALIGN);
        assert_eq!(buf.as_ptr().align_offset(ALIGN), 0);
    }

    /// A fresh buffer is empty, has exactly the requested capacity, and is aligned,
    /// both for capacities above and below the alignment.
    #[test]
    fn test_with_capacity() {
        for requested in [ALIGN * 4, ALIGN / 2] {
            let v = TestIoBufferMut::with_capacity(requested);
            assert_shape(&v, 0, requested);
        }
    }

    /// A zeroed buffer is completely filled with zero bytes.
    #[test]
    fn test_with_capacity_zeroed() {
        let v = TestIoBufferMut::with_capacity_zeroed(ALIGN);
        assert_shape(&v, ALIGN, ALIGN);
        assert_eq!(&v[..], &[0; ALIGN])
    }

    /// `reserve` only grows the buffer when the spare capacity is insufficient, and
    /// keeps the existing contents.
    #[test]
    fn test_reserve() {
        use bytes::BufMut;
        let mut v = TestIoBufferMut::with_capacity(ALIGN);
        let initial = v.capacity();

        // Empty buffer: the requested space is already available.
        v.reserve(initial);
        assert_eq!(v.capacity(), initial);

        // Full buffer: reserving the same amount again must grow it.
        let payload = [b'a'; ALIGN];
        v.put(&payload[..]);
        v.reserve(initial);
        assert!(v.capacity() >= initial * 2);
        assert_eq!(&v[..], &payload[..]);

        // Cleared buffer: the grown capacity is sufficient, nothing changes.
        let grown = v.capacity();
        v.clear();
        v.reserve(grown);
        assert_eq!(grown, v.capacity());
    }

    /// Filling the buffer through `bytes::BufMut` and clearing it can be repeated
    /// without changing capacity or alignment.
    #[test]
    fn test_bytes_put() {
        use bytes::BufMut;
        let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
        let x = [b'a'; ALIGN];

        for _ in 0..2 {
            for _ in 0..4 {
                v.put(&x[..]);
            }
            assert_shape(&v, ALIGN * 4, ALIGN * 4);
            v.clear();
        }
        assert_shape(&v, 0, ALIGN * 4);
    }

    /// Writing more than the capacity through `bytes::BufMut` panics instead of growing.
    #[test]
    #[should_panic]
    fn test_bytes_put_panic() {
        use bytes::BufMut;
        let mut v = TestIoBufferMut::with_capacity(ALIGN * 4);
        let x = [b'a'; ALIGN];
        for _ in 0..5 {
            v.put_slice(&x[..]);
        }
    }

    /// Filling the buffer through `tokio_epoll_uring::BoundedBufMut` and clearing it can
    /// be repeated without changing capacity or alignment.
    #[test]
    fn test_io_buf_put_slice() {
        use tokio_epoll_uring::BoundedBufMut;
        let mut v = TestIoBufferMut::with_capacity(ALIGN);
        let x = [b'a'; ALIGN];

        for _ in 0..2 {
            v.put_slice(&x[..]);
            assert_shape(&v, ALIGN, ALIGN);
            v.clear();
        }
        assert_shape(&v, 0, ALIGN);
    }
}
|