crate bytes 提供了低开销的只读=连续内存= 的共享和可修改访问,支持按照 le/ne 方式来读取数据,特别适合网络IO 场景。
struct Bytes/BytesMut:
- Bytes:提供了高效
只读的
zero-copy 的内存区域操作,支持通过引用计数方式对共享内存区域的 clone/slice/split/slice() 操作。 支持 Send/Sync, 可以在多线程中高效共享数据。 - BytesMut:提供了高效的
可读写、自动扩充
的内存区域操作。
Bytes 和 BytesMut 实现了 Deref<Target=[u8]>,所以可以当做高效的 &[u8]/&mut[u8] 来使用。
Buf/BufMut trait:
- Buf trait:支持 get_u16_le() 等大小端读取,同时 get_XX 操作自动更新内部 buffer 的读 cursor 指针,从而实现多次连续读内存。
- BufMut trait:支持 put_u16_le() 等大小端写入,同时 put_XX 操作自动更新内部 buffer 的写 cursor 指针,从而实现多次连续写内存。
struct Bytes 实现了 Buf trait,struct BytesMut 同时实现了 Buf 和 BufMut trait。
通过 Buf、BufMut trait 方法读写内存时 才会
自动更新内部读写指针,但是如果没有使用这些方法,如使用 &[u8] 的 slice 方法,则不会感知读写指针。
1 struct Bytes #
struct Bytes 为底层连续内存区域提供了只读的 zero copy(基于引用计数)的 slice/split/clone() 操作, 主要用于高效共享读取访问:
- 实现了 bytes::Buf trait
- 实现 AsRef<[u8]>, Borrow<[u8]>, Deref<Target=[u8]> trait,所以
Bytes 可以当作 [u8] 来使用
; - 实现了各种 From<T> trait, T 可以是 &[u8]/&str/Bytes/BytesMut/String/Vec<u8>
use bytes::Bytes;
let b = Bytes::new();
// b[..] 自动调用 Deref<Target=[u8]>, 转换为 [u8] 后,调用其 index() 操作
assert_eq!(&b[..], b"");
let b = Bytes::from_static(b"hello");
assert_eq!(&b[..], b"hello");
let b = Bytes::from(&b"hello"[..]); // &[u8]
assert_eq!(b.len(), 5);
let mut mem = Bytes::from("Hello world");
// 没有内存拷贝, 只是增加引用计数,返回一个新的 Bytes 对象.
let a = mem.slice(0..5);
assert_eq!(a, "Hello");
// 没有内存拷贝, 只是增加引用计数. 返回一个新的 Bytes 对象
let mem2 = mem.clone();
// Returns a slice of self that is equivalent to the given subset.
let bytes = Bytes::from(&b"012345678"[..]);
let as_slice = bytes.as_ref();
let subset = &as_slice[2..6];
// subset 使用 Bytes 生成的, 然后又使用它生成一个 Bytes
let subslice = bytes.slice_ref(&subset);
// bytes/subset/subslice 都共享同一个内存区域,没有内存拷贝
assert_eq!(&subslice[..], b"2345");
// split_off 将 Bytes 拆为两个, self 包含 [0, at),返回 Bytes 包含 [at, len)
let mut a = Bytes::from(&b"hello world"[..]);
let b = a.split_off(5);
assert_eq!(&a[..], b"hello");
assert_eq!(&b[..], b" world");
// split_to 将 Bytes 拆为两个,self 包含 [at, len), 返回 Bytes 包含 [0, at).
let b = mem.split_to(6); // 内存 zero copy
assert_eq!(mem, "world");
assert_eq!(b, "Hello ");
let mut buf = Bytes::from(&b"hello world"[..]);
buf.truncate(5);
assert_eq!(buf, b"hello"[..]);
let mut buf = Bytes::from(&b"hello world"[..]);
buf.clear();
assert!(buf.is_empty());
struct Bytes 实现了 bytes::Buf trait,当调用 bytes::Buf 方法, 例如 get_u8/get_u16() 时, 会自动更新内部的 cursor
,从而在连续调用时依次返回下一个 u8/u16 内存:
- bytes::Buf.len() 返回
未读取的
内容长度; - &bytes::Buf[..] 返回
未读取的
内容 slice;
注:Bytes 本身没有维护读指针,它是在实现 bytes::Buf 的方法内部维护的,所以如果没有使用 bytes::Buf trait 方法来读取数据,如直接 &buf[…] 则读取的是当前未读取的所有数据(和读指针没有关系)。
use bytes::Buf;
use bytes::Bytes;
let mut buf = Bytes::from("hello world");
// 使用 bytes::Buf 的 get_XX() 方法读取时,自动更新内部 cursor 指针,
// 这样连续调用时返回下一个内存
// 区域内容。
assert_eq!(b'h', buf.get_u8());
assert_eq!(b'e', buf.get_u8());
assert_eq!(b'l', buf.get_u8());
buf.remaining(); // 返回未读取的数据长度
// buf.len() 返回未读取的内容长度 8, &buf[..] 返回未读取的内容
println!("{}, {:#?}", buf.len(), &buf[..]);
let mut rest = [0; 8];
// 将未读取的内容 copy 到传入的 slice
buf.copy_to_slice(&mut rest);
// bytes: after copy, len: 0, slice: []
assert_eq!(&rest[..], &b"lo world"[..]);
// &[u8], Box<T>, &mut T, VecDeque<u8>, Cursor<T>, Bytes, BytesMut
// 等都实现了 bytes::Buf
let mut buf = &b"hello world"[..];
assert_eq!(buf.remaining(), 11);
buf.get_u8();
assert_eq!(buf.remaining(), 10);
2 struct BytesMut #
struct BytesMut 提供了一个可以高效共享和读写的连续内存区域, 向 BytesMut 写入时自动扩充底层内存区域(但是如果是从固定大小的 &mut[u8] 创建的 BytesMut 则超过容量时会 panic):
use bytes::{BytesMut, BufMut};
// BytesMut 实现了 BufMut trait, 调用 BufMut Trait 的方法时,
// 内部自动更新写 cursor,所以连续的写操作会依次更新内存区域。
let mut buf = BytesMut::with_capacity(64);
buf.put_u8(b'h');
buf.put_u8(b'e');
buf.put(&b"llo"[..]);
// 由于内部有读写 cursor,所以 &buf[..] 不需要指定 range,返回未读取的内容
assert_eq!(&buf[..], b"hello");
// 冻结 buffer,这样可以共享。buf 是不可变的。
let a = buf.freeze();
// 基于引用计数的 clone,共享同一个连续内存区域。
let b = a.clone();
assert_eq!(&a[..], b"hello");
assert_eq!(&b[..], b"hello");
struct BytesMut 同时实现了 bytes::Buf 和 bytes::BufMut trait, 可以 AsMut<[u8]>, BorrowMut<[u8]>, DerefMut<Target=[u8]>;
- 读 cursor, 如使用 Buf trait 的 get_XX() 方法, 从数据区域的 self.ptr 开始,可以继续读的有效内容为 self.len, 读取一段长度 cnt 后, self.ptr 增加 cnt, self.len 减少 cnt;
- 写 cursor, 如使用 BufMut trait 的 put_XX() 方法, 从数据区域的self.ptr+self.len 开始, 可以继续写的空间是无限大( u32::MAX - self.len), 写一段长度 cnt 后, self.len 增加 cnt, self.capacity 减少 cnt;
- 写内容时, 会看当前实际分配的 Vec 的 capacity-len 是否满足 reserve 的需求, 同时看当前Vec 前面已经读的内存区域长度是否满足 reserve 的需求: 如果满足, 不需要 increase Vec 长度, 而是将 Vec 当前读取 cursor 到 len 的
内容移动到 Vec 开始
的位置,这时不需要扩容 Vec。如果不满足,则扩容 Vec 容量。
由于 BufMut 是自动增长且内部维护读写 cursor 和 buff 内存区域,所以非常适合循环读写场景:
// https://tokio.rs/tokio/tutorial/framing
use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;
pub async fn read_frame(&mut self) -> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// self.buffer 内部维护读写指针,每次调用 read_buf() 来写
// buffer 时都从上次写入的位置开始
if 0 == self.stream.read_buf(&mut self.buffer).await? {
if self.buffer.is_empty() {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
}
}
}
如果使用 Vec 的不带写 cursor 的 buffer 实现, 会复杂很多:
- 使用 Vec 来作为 buffer, 并用 0 值初始化;
- 手动维护 cursor 的位置, 当 cursor 打到 Vec len 时, 需要扩容 Vec;
- 向 Vec buff 写数据时, read(&mut[buf]) 方法需要知道传入的的 buff 的 length;
- Vec 会一直在增长, 缺少缩容的机制.( ).
use tokio::net::TcpStream;
pub struct Connection {
stream: TcpStream,
buffer: Vec<u8>,
cursor: usize,
}
impl Connection {
pub fn new(stream: TcpStream) -> Connection {
Connection {
stream,
buffer: vec![0; 4096], // len() 和 capacity() 均为 4096
cursor: 0,
}
}
}
use mini_redis::{Frame, Result};
pub async fn read_frame(&mut self) -> Result<Option<Frame>>
{
loop {
if let Some(frame) = self.parse_frame()? {
return Ok(Some(frame));
}
// 刚开始 self.buffer.len() 为 4096, self.cursor 为 0,
// resize() 会自动扩容 Vec 的容量.
if self.buffer.len() == self.cursor {
// 将 buff len 增加为指定的长度, 新增的元素用 0 填充。
self.buffer.resize(self.cursor * 2, 0);
}
// 传入的 &mut[u8] 的 len 是 Vec 的 len - cursor, read()
// 确保读取的直接长度 n < length
let n = self.stream.read(&mut self.buffer[self.cursor..]).await?;
if 0 == n {
if self.cursor == 0 {
return Ok(None);
} else {
return Err("connection reset by peer".into());
}
} else {
self.cursor += n;
}
}
}
3 Buf/BufMut trait #
Buf 和 BufMut 都是 trait, 内部维护读写 cursor:
- BytesMut 和 &mut [u8] 同时实现了 Buf 和 BufMut trait;
- &[u8]/VecDeque<u8>/Bytes/BytesMut 实现了 Buf;
- &mut [u8]/Vec<u8>/BytesMut 实现了 BufMut;
// 实现了 Buf 的类型
impl Buf for &[u8]
impl<T: Buf + ?Sized> Buf for &mut T
impl<T: Buf + ?Sized> Buf for Box<T>
impl Buf for VecDeque<u8>
impl<T: AsRef<[u8]>> Buf for Cursor<T>
impl Buf for Bytes
impl Buf for BytesMut
impl<T, U> Buf for Chain<T, U> where T: Buf, U: Buf
impl<T: Buf> Buf for Take<T>
// 实现了 BufMut 的类型
impl BufMut for Vec<u8>
impl BufMut for &mut [MaybeUninit<u8>]
impl<T: BufMut + ?Sized> BufMut for &mut T
impl<T: BufMut + ?Sized> BufMut for Box<T>
impl BufMut for &mut [u8]
impl BufMut for BytesMut
impl<T, U> BufMut for Chain<T, U>where T: BufMut, U: BufMut
impl<T: BufMut> BufMut for Limit<T>
tokio::io 的 AsyncReadExt 的 read_buff(),AsyncWriteExt 的 write_buf() 使用这两个 trait:
pub trait AsyncReadExt: AsyncRead {}
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where
Self: Unpin,
B: BufMut + ?Sized,
pub trait AsyncWriteExt: AsyncWrite {}
fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B>
where
Self: Sized + Unpin,
B: Buf,
使用 &mut[u8] 作为 BufMut trait 的实现时 有风险
:
- put_XX() 操作不会自动扩充底层的 &mut[u8], 当写入的内容超过它的 length 时会 panic;
- put_XX() 操作会修改 &mut[8] 的内存起始地址, 导致 buf[..] 返回的是
还剩的没有写入的
的内存内容;
解决办法: 使用能自动扩容的 Vec 作为 buff 或者使用自动扩容的 bytes::BytesMut;
use bytes::BufMut;
fn main() {
let mut v = vec![1,2]; // 长度和容量为 2;
let mut buf :&mut [u8] = &mut v;
println!("{} {}", buf.remaining_mut(), buf2.remaining());
buf.put_u8(b'h');
println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec()));
buf.put_u8(b'e');
println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec()));
buf.put(&b"llo"[..]); // panic
println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec()));
}
// 执行失败:
// Exited with status 101
// Standard Error
// Compiling playground v0.0.1 (/playground)
// Finished dev [unoptimized + debuginfo] target(s) in 0.37s
// Running `target/debug/playground`
// thread 'main' panicked at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/bytes-1.5.0/src/buf/buf_mut.rs:201:9:
// assertion failed: self.remaining_mut() >= src.remaining()
// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
// Standard Output
// remaining_mut: 2
// len: 1, remaining_mut: 1, content: Ok( // buf[..] 返回的是剩下的可写内容
// "\u{2}",
// )
// len: 0, remaining_mut: 0, content: Ok(
// "",
// )
BytesMut 实现了 Buf/BufMut, 可以同时读写, 是建议的使用 bytes Crate 的类型
:
- 使用 Buf/BufMut trait 方法才会更新内部读写 cursor, 使用继承自 [u8] 的方法=并不会更新内部 cursor= ;
- BytesMut Deref<target=[u8]>, 所以 BytesMut 支持 index slice 操作, 该操作会感知内部读指针(返回当前未读的所有内容),但是不会更新它。
创建可扩容
use bytes::BufMut;
use bytes::Buf;
use bytes::BytesMut;
fn main() {
let mut buf = BytesMut::with_capacity(64);
buf.put_u8(b'h');
buf.put_u8(b'e');
buf.put(&b"llo"[..]);
println!("len: {}, content: {:#?}", buf.len(),
String::from_utf8(buf[..].to_vec())); // 5 和 hello
// 读一个字节后, buf.len() 减少, buf[..] 返回剩余未读的内容
buf.get_u8();
println!("len: {}, content: {:#?}", buf.len(),
String::from_utf8(buf[..].to_vec())); // 4 和 ello
buf.put_u8(b'!'); // 继续上次写的位置写入
buf.put_u8(b'!');
println!("len: {}, content: {:#?}", buf.len(),
String::from_utf8(buf[..].to_vec())); // 6 和 ello!!
let bytes = buf.copy_to_bytes(5);
// Buf trait 的 copy_to_bytes/copy_to_slice 都会读消耗缓存
println!("len: {}, content: {:#?}", buf.len(),
String::from_utf8(buf[..].to_vec())); // 1 和 !
}
tokio::io::AsyncReadExt trait
提供了使用 BufMut 作为 buff 的 read_buf() 方法
,内部可以自动管理读写 cursor:
fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B>
where
Self: Unpin,
B: BufMut + ?Sized,
// 例子
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
use bytes::BytesMut;
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = BytesMut::with_capacity(10);
assert!(buffer.is_empty());
assert!(buffer.capacity() >= 10);
// 不需要返回读入的字节数, 因为 BytesMut 内部自动维护读写 cursor
f.read_buf(&mut buffer).await?;
println!("The bytes: {:?}", &buffer[..]); // 返回读到的所有内容
Ok(())
}
// 如果使用普通的 slice buffer, 则需要返回读到的字节数
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
let mut f = File::open("foo.txt").await?;
let mut buffer = [0; 10];
// 最多读取 10 bytes 数据。
let n = f.read(&mut buffer[..]).await?;
println!("The bytes: {:?}", &buffer[..n]);
Ok(())
}