跳过正文

bytes

·
目录
rust crate - 这篇文章属于一个选集。
§ 4: 本文

crate bytes 提供了低开销的只读 连续内存 的共享和可修改访问,支持按照大端或小端方式来读取数据,特别适合网络 IO 等场景。

struct Bytes/BytesMut 提供了对于连续内存区域的高效读写和共享操作:

  • Bytes:提供了高效只读的 zero-copy 的内存操作,支持通过引用计数方式的共享内存区域操作,实现了 Send/Sync, 可以在多线程中高效共享数据。
  • BytesMut:提供了高效的可读写、自动扩充的内存区域操作。

Bytes 和 BytesMut 实现了 Deref<Target=[u8]>,可以当做高效的 &[u8]/&mut[u8] 来使用。

trait Buf/BufMut 提供了对内部 buffer 的大小端和连续读写方法,不要该 buffer 是连续存储的,自动更新内部读写指针:

  • Buf:支持 get_u16_le() 等大小端读取方式,get_XX() 操作自动更新内部 buffer 的读位置指针,从而支持连续读内存。
  • BufMut:支持 put_u16_le() 等大小端写入方式,put_XX() 操作自动更新内部 buffer 的写位置指针,从而支持连续写内存。

Bytes 实现了 Buf traitBytesMut 实现了 Buf/BufMut trait

struct Bytes
#

struct Bytes 为连续内存区域提供了只读的 zero copy(基于引用计数)的 slice/split/split_off/clone() 操作, 实现高效的共享读取:

  • 实现 bytes::Buf trait
  • 实现了各种 From<T> trait, T 可以是 &[u8]/&str/Bytes/BytesMut/String/Vec<u8>
  • 实现了 Deref<Target=[u8]>,可以当做高效的 &[u8] 来使用;
use bytes::Bytes;

let b = Bytes::new();
// b[..] 根据 Deref<Target=[u8]> 转换为 [u8] ,然后调用其 index() 方法
assert_eq!(&b[..], b"");

let b = Bytes::from_static(b"hello");
assert_eq!(&b[..], b"hello");

// b"hello" 为数组,&b"hello"[...] 类型为 &[u8]
let b = Bytes::from(&b"hello"[..]); 
assert_eq!(b.len(), 5);

let mut mem = Bytes::from("Hello world");

// slice() 没有内存拷贝, 只是增加引用计数,返回一个新的 Bytes 对象
let a = mem.slice(0..5);
assert_eq!(a, "Hello");

// clone() 没有内存拷贝, 只是增加引用计数,返回一个新的 Bytes 对象
let mem2 = mem.clone();

let bytes = Bytes::from(&b"012345678"[..]);
// Bytes 实现了 AsRef<[u8]> trait, as_ref() 没有内存拷贝
let as_slice = bytes.as_ref();
let subset = &as_slice[2..6];
// subset 使用 Bytes 生成的, slice_ref() 从一个 slice 生成 Bytes
let subslice = bytes.slice_ref(&subset);
// bytes/subset/subslice 都共享同一个内存区域,没有内存拷贝
assert_eq!(&subslice[..], b"2345");

// split_off 将 Bytes 拆为两个:self 包含 [0, at),返回的 Bytes 包含 [at, len)
let mut a = Bytes::from(&b"hello world"[..]);
let b = a.split_off(5);
assert_eq!(&a[..], b"hello");
assert_eq!(&b[..], b" world");

// split_to 将 Bytes 拆为两个:self 包含 [at, len), 返回 Bytes 包含 [0, at).
let b = mem.split_to(6); // 内存 zero copy
assert_eq!(mem, "world");
assert_eq!(b, "Hello ");

let mut buf = Bytes::from(&b"hello world"[..]);
buf.truncate(5);
assert_eq!(buf, b"hello"[..]);

let mut buf = Bytes::from(&b"hello world"[..]);
buf.clear();
assert!(buf.is_empty());

Bytes 内部包读取位置的指针:

  • 实现 AsRef<[u8]>, Borrow<[u8]>, Deref<Target=[u8]> trait,返回的 slice [u8] 是未读区域长度;

  • 实现 bytes::Buf trait。当调用 bytes::Buf 方法, 如 get_u8/get_u16() 时自动更新读指针 ,在连续调用时返回下一个 u8/u16 内容。

    • Buf::len():返回未读取的内容长度;
    • &Buf[..]:返回未读取的内容 slice;
use bytes::Buf;
use bytes::Bytes;

let mut buf = Bytes::from("hello world");
// 使用 bytes::Buf 的 get_XX() 方法读取时,自动更新内部 cursor 指针,这样连续调用时返回下一个内存区域内容。
assert_eq!(b'h', buf.get_u8());
assert_eq!(b'e', buf.get_u8());
assert_eq!(b'l', buf.get_u8());

// 返回未读取的数据长度
println!("remainining: {}", buf.remaining()); // 8

// &buf[..] 使用 Bytes 实现的 Deref<Target=[u8]>,返回的 slice [u8] 是未读取的内容,所以 len() 也是 8;
let buf2: &[u8] = &buf[..];
println!("len: {}, buf2: {:?}", buf.len(), buf2); // len: 8, buf2: [108, 111, 32, 119, 111, 114, 108, 100]

let mut rest = [0; 8];
// 将未读取的内容 copy 到传入的 slice
buf.copy_to_slice(&mut rest);
// bytes: after copy, len: 0, slice: []
assert_eq!(&rest[..], &b"lo world"[..]);

// &[u8], Box<T>, &mut T, VecDeque<u8>, Cursor<T>,Bytes,BytesMut 等都实现了 bytes::Buf
let mut buf = &b"hello world"[..];
assert_eq!(buf.remaining(), 11);
buf.get_u8();
assert_eq!(buf.remaining(), 10);

struct BytesMut
#

BytesMut 提供一个高效共享和读写的连续内存区域, 内部包含写指针,写入时自动扩充底层内存区域:

use bytes::{BytesMut, BufMut};

// BytesMut 实现了 BufMut trait,调用 BufMut Trait 的方法时,内部自动更新写 cursor,所以连续的写操作会依次更新内存区域。
let mut buf = BytesMut::with_capacity(64);
buf.put_u8(b'h');
buf.put_u8(b'e');
buf.put(&b"llo"[..]);

// 由于内部有读写 cursor,所以 &buf[..] 不需要指定 range,返回未读取的内容
assert_eq!(&buf[..], b"hello");

// 冻结 buffer 后可以共享,buf 是不可变的。
let a = buf.freeze();

// 基于引用计数的 clone,共享同一个连续内存区域。
let b = a.clone();
assert_eq!(&a[..], b"hello");
assert_eq!(&b[..], b"hello");

BytesMut 实现了 bytes::Bufbytes::BufMut traitAsMut<[u8]>, BorrowMut<[u8]>, DerefMut<Target=[u8]>

Buf/BufMut trait
#

Buf/BufMut trait 提供了大小段读写缓冲区的方法,而且内部维护读写指针,适合于对缓存的连续读写:

// 实现 Buf 的类型
impl Buf for &[u8]
impl Buf for VecDeque<u8>
impl Buf for Bytes
impl Buf for BytesMut
impl<T: Buf + ?Sized> Buf for &mut T
impl<T: Buf + ?Sized> Buf for Box<T>
impl<T: AsRef<[u8]>> Buf for Cursor<T>
impl<T, U> Buf for Chain<T, U> where T: Buf, U: Buf
impl<T: Buf> Buf for Take<T>

// 实现 BufMut 的类型
impl BufMut for Vec<u8>
impl BufMut for &mut [MaybeUninit<u8>]
impl BufMut for &mut [u8]
impl BufMut for BytesMut
impl<T: BufMut + ?Sized> BufMut for &mut T
impl<T: BufMut + ?Sized> BufMut for Box<T>
impl<T, U> BufMut for Chain<T, U>where T: BufMut, U: BufMut
impl<T: BufMut> BufMut for Limit<T>

BytesMut 自动扩缩容内部 buff,非常适合作为 Buf/BufMut 的实现类型:

use bytes::BufMut;
use bytes::Buf;
use bytes::BytesMut;

fn main() {
    let mut buf = BytesMut::with_capacity(64);
    buf.put_u8(b'h');
    buf.put_u8(b'e');
    buf.put(&b"llo"[..]);
    println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec())); // 5 和 hello

    // 读一个字节后, buf.len() 减少, buf[..] 返回剩余未读的内容
    buf.get_u8();
    println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec())); // 4 和 ello

    buf.put_u8(b'!');  // 继续上次写的位置写入
    buf.put_u8(b'!');
    println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec())); // 6 和 ello!!

    let bytes = buf.copy_to_bytes(5);
    // Buf trait 的 copy_to_bytes/copy_to_slice 都会读消耗缓存
    println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec())); // 1 和 !
}

使用 &mut [u8] 创建的 BufMut 风险
#

BufMut 自动增加底层内存区域,一般使用支持自动扩容的 Vec<u8>BytesBuf 类型来创建。

但如果从固定大小的 &mut[u8] 创建,put_XX() 操作不会自动扩充底层的 &mut[u8], 当写入的内容超过它的 length 时会 panic。

另外,put_XX() 操作会修改 &mut[8] 的内存起始地址, 导致 buf[..] 返回的是还剩的没有写入的的内存内容;

解决办法: 使用能自动扩容的 Vec<u8>bytes.BytesMut 类型来作为 BufMut 类型。

use bytes::BufMut;

fn main() {
    let mut v = vec![1,2]; // 长度和容量为 2;

    // buf 是固定 length 的 &mut [u8]
    let mut buf :&mut [u8] = &mut v;
    println!("{} {}", buf.remaining_mut(), buf2.remaining());

    buf.put_u8(b'h');
    println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec()));

    buf.put_u8(b'e');
    println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec()));

    buf.put(&b"llo"[..]); // panic
    println!("len: {}, content: {:#?}", buf.len(), String::from_utf8(buf[..].to_vec()));
}

// 执行失败:
// Exited with status 101

// Standard Error

//    Compiling playground v0.0.1 (/playground)
//     Finished dev [unoptimized + debuginfo] target(s) in 0.37s
//      Running `target/debug/playground`
// thread 'main' panicked at /playground/.cargo/registry/src/index.crates.io-6f17d22bba15001f/bytes-1.5.0/src/buf/buf_mut.rs:201:9:
// assertion failed: self.remaining_mut() >= src.remaining()
// note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

// Standard Output

// remaining_mut: 2
// len: 1, remaining_mut: 1, content: Ok(  // buf[..] 返回的是剩下的可写内容
//     "\u{2}",
// )
// len: 0, remaining_mut: 0, content: Ok(
//     "",
// )

tokio 的 AsyncReadExt/AsyncWriteExt 使用 Buf/BufMut
#

tokio::ioAsyncReadExt/AsyncWriteExt trait 使用这两个 trait:

pub trait AsyncReadExt: AsyncRead {}
    fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B> where Self: Unpin, B: BufMut + ?Sized,

pub trait AsyncWriteExt: AsyncWrite {}
    fn write_buf<'a, B>(&'a mut self, src: &'a mut B) -> WriteBuf<'a, Self, B> where Self: Sized + Unpin, B: Buf,

tokio::io::AsyncReadExt trait 提供了使用 BufMut 作为 buff 的 read_buf() 方法, 可以方便用来连续写:

fn read_buf<'a, B>(&'a mut self, buf: &'a mut B) -> ReadBuf<'a, Self, B  where Self: Unpin, B: BufMut + ?Sized

// 例子
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
use bytes::BytesMut;

#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = BytesMut::with_capacity(10);
    assert!(buffer.is_empty());
    assert!(buffer.capacity() >= 10);

    // 不需要返回读入的字节数, 因为 BytesMut 内部自动维护读写 cursor
    f.read_buf(&mut buffer).await?;
     // 返回读到的所有内容
    println!("The bytes: {:?}", &buffer[..]);
    Ok(())
}

// 如果使用普通的 slice buffer, 则需要返回读到的字节数
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
#[tokio::main]
async fn main() -> io::Result<()> {
    let mut f = File::open("foo.txt").await?;
    let mut buffer = [0; 10];
    // 最多读取 10 bytes 数据。
    let n = f.read(&mut buffer[..]).await?;
    println!("The bytes: {:?}", &buffer[..n]);
    Ok(())
}

BufMut 是自动增长且内部维护读写指针和 buff 内存区域,所以非常适合 loop 循环读写场景:

// https://tokio.rs/tokio/tutorial/framing
use tokio::io::AsyncReadExt;
use bytes::Buf;
use mini_redis::Result;

pub async fn read_frame(&mut self) -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // self.buffer 内部维护写指针,每次调用 read_buf() 来写 buffer 时都从上次写入的位置开始
        if 0 == self.stream.read_buf(&mut self.buffer).await? {
            if self.buffer.is_empty() {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        }
    }
}

如果使用基于 Vec 的不带写 cursor 的 buff 实现, 会复杂很多:

  1. 使用 Vec 来作为 buffer, 并用 0 值初始化;
  2. 手动维护 cursor 的位置, 当 cursor 达到 Vec 的 length 时, 需要扩容 Vec;
  3. 向 Vec 写数据时, read(&mut[buf]) 方法需要知道传入的的 buff 的 length;
  4. Vec 会一直在增长, 缺少缩容机制.
use tokio::net::TcpStream;

pub struct Connection {
    stream: TcpStream,
    buffer: Vec<u8>,
    cursor: usize,
}

impl Connection {
    pub fn new(stream: TcpStream) -> Connection {
        Connection {
            stream,
            buffer: vec![0; 4096], // len() 和 capacity() 均为 4096
            cursor: 0,
        }
    }
}

use mini_redis::{Frame, Result};

pub async fn read_frame(&mut self) -> Result<Option<Frame>>
{
    loop {
        if let Some(frame) = self.parse_frame()? {
            return Ok(Some(frame));
        }

        // 刚开始 self.buffer.len() 为 4096, self.cursor 为 0,resize() 会自动扩容 Vec 的容量.
        if self.buffer.len() == self.cursor {
            // 将 buff len 增加为指定的长度, 新增的元素用 0 填充。
            self.buffer.resize(self.cursor * 2, 0);
        }

        // 传入的 &mut[u8] 的 len 是 Vec 的 len - cursor, read() 确保读取的直接长度 n < length
        let n = self.stream.read(&mut self.buffer[self.cursor..]).await?;
        if 0 == n {
            if self.cursor == 0 {
                return Ok(None);
            } else {
                return Err("connection reset by peer".into());
            }
        } else {
            self.cursor += n;
        }
    }
}
rust crate - 这篇文章属于一个选集。
§ 4: 本文

相关文章

anyhow
·
anyhow crate 提供了自定义 Error 类型和 Result 类型,Error 类型自带 backtrace 和 context,支持用户友好的格式化信息输出。
chrono
·
chrono 提供了丰富的 Date/Time 类型和相关操作。
hyper
·
hyper 是高性能的异步 HTTP 1/2 底层库。
serde
·
Rust 主流的序列化/反序列化库。