跳过正文

sqlx

··6283 字
Rust Rust-Crate
目录
rust crate - 这篇文章属于一个选集。
§ 18: 本文

1 SQLx CLI
#

安装 sqlx-cli:

# supports all databases supported by SQLx
$ cargo install sqlx-cli

# only for postgres
$ cargo install sqlx-cli --no-default-features --features native-tls,postgres

# use vendored OpenSSL (build from source)
$ cargo install sqlx-cli --features openssl-vendored

# use Rustls rather than OpenSSL (be sure to add the features for the databases you intend to
# use!)
$ cargo install sqlx-cli --no-default-features --features rustls

sqlx 默认是在线模式,需要使用环境变量 DATABASE_URL 来连接数据库:

DATABASE_URL=postgres://postgres@localhost/my_database

创建数据库:

sqlx database create

创建和运行 migration:

# 创建一个 migrations/<timestamp>-<name>.sql 文件
sqlx migrate add <name>

# 比较 ./migrations 中文件和数据库的情况,执行没有执行过的 migrations sql 文件,同时校验已经执行过
# 的 sql 文件是否有变更。
sqlx migrate run

# 使用 --source 来指定 migrations 目录

# add 时加 -r 表示同时生成 revert sql 文件。后续所有 add 时不管是否加 -r,都会生成 revert sql 文件。
sqlx migrate add -r <name>
Creating migrations/20211001154420_<name>.up.sql
Creating migrations/20211001154420_<name>.down.sql

sqlx 还支持 offline 模式:

# 1. 保存 offline 使用的 query metadata 到当前目录的 .sqlx 文件
cargo sqlx prepare

# 2. 检查 .sqlx 中的 metadata 是否与数据库或 query 中一致(可选)
cargo sqlx prepare --check

强制使用 offline 模式:DATABASE_URL 环境变量的优先级比 .sqlx 高, 为了强制使用 offline 模式,可以添加环境变量 SQLX_OFFLINE=true

2 类型
#

以 sqlx::mysql::types module 为例,用于在 Rust 类型和 mysql 类型间实现转换。

Rust type MySQL/MariaDB type(s) bool TINYINT(1), BOOLEAN, BOOL (see below) i8 TINYINT i16 SMALLINT i32 INT i64 BIGINT u8 TINYINT UNSIGNED u16 SMALLINT UNSIGNED u32 INT UNSIGNED u64 BIGINT UNSIGNED f32 FLOAT f64 DOUBLE &str, String VARCHAR, CHAR, TEXT &[u8], Vec<u8> VARBINARY, BINARY, BLOB IpAddr VARCHAR, TEXT Ipv4Addr INET4 (MariaDB-only), VARCHAR, TEXT Ipv6Addr INET6 (MariaDB-only), VARCHAR, TEXT MySqlTime TIME (encode and decode full range) Duration TIME (for decoding positive values only)

chrono:Requires the chrono Cargo feature flag. Rust type MySQL/MariaDB type(s) chrono::DateTime<Utc> TIMESTAMP chrono::DateTime<Local> TIMESTAMP chrono::NaiveDateTime DATETIME chrono::NaiveDate DATE chrono::NaiveTime TIME (time-of-day only) chrono::TimeDelta TIME (decodes full range; see note for encoding)

uuid:Requires the uuid Cargo feature flag. Rust type MySQL/MariaDB type(s) uuid::Uuid BINARY(16), VARCHAR, CHAR, TEXT uuid::fmt::Hyphenated CHAR(36), UUID (MariaDB-only) uuid::fmt::Simple CHAR(32)

json:Requires the json Cargo feature flag. Rust type MySQL/MariaDB type(s) Json<T> JSON serde_json::JsonValue JSON &serde_json::value::RawValue JSON

Nullable:In addition, Option<T> is supported where T implements Type. An Option<T> represents a potentially NULL value from MySQL/MariaDB.

3 使用
#

创建单连接:

use sqlx::Connection;

let conn = SqliteConnection::connect("sqlite::memory:").await?;

使用连接池(建议):

let pool = MySqlPool::connect("mysql://user:pass@host/database").await?;

SQL 查询有两种方式:

  1. prepared query:先将 query 发送给 server 解析,返回解析后的 id,后续可以多次复用。后续使用时,需要传入参数列表,可以避免 SQL 注入;
  2. unprepared query:直接发送给服务器执行;

sqlx 支持这两种查询方式:

  1. &str 为 unprepared query;
  2. Query 或 QueryAs struct 为 prepared query;

一般建议使用更高层的 sqlx::query() 系列函数,然后调用它们实现的 Executor trait 方法:

  • execute(): 返回受影响的 row 数量,类型: sqlx::Result<DB::QueryResult> ,例如 For INSERT/UPDATE/DELETE without RETURNING.
  • fetch():返回多行记录的 stream;
  • fetch_one():必须返回一行记录;
  • fetch_optional():返回 0 或 1 行记录;
  • fetch_all():返回多行记录到 Vec 中;
sqlx::query("DELETE FROM table").execute(&mut conn).await?;
sqlx::query("DELETE FROM table").execute(&pool).await?;

fetch() 系列函数返回 Row<‘conn> trait,可以使用 get()/try_get() 来获取各列:

// provides `try_next`
use futures::TryStreamExt;
// provides `try_get`
use sqlx::Row;

let mut rows = sqlx::query("SELECT * FROM users WHERE email = ?")
    .bind(email)
    .fetch(&mut conn);

while let Some(row) = rows.try_next().await? {
    // map the row into a user-defined domain type
    let email: &str = row.try_get("email")?;
}

// 也可以shyong map 来转换记录:
let mut stream = sqlx::query("SELECT * FROM users")
    .map(|row: PgRow| {
        // map the row into a user-defined domain type
    })
    .fetch(&mut conn);

使用 query_as() 来自动将查询记录解析到实现 FromRow 的自定义类型:

#[derive(sqlx::FromRow)]
struct User { name: String, id: i64 }

let mut stream = sqlx::query_as::<_, User>("SELECT * FROM users WHERE email = ? OR name = ?")
    .bind(user_email)
    .bind(user_name)
    .fetch(&mut conn);

另外,还可以使用 query!()/query_as!() 宏来进行查询,这两个宏会生成包含结果字段的匿名对象(可以通过 field 直接访问,而不需要像上面那样通过 get(“field_name”) 来获得值),同时还在编译时对查询 SQL 语句进行验证。

4 连接
#

sqlx::ConnectOptions trait:

  • 方法 connect() 从 ConnectOptions 创建 Connection:
pub trait ConnectOptions:
    'static
    + Send
    + Sync
    + FromStr<Err = Error>
    + Debug
    + Clone {
    type Connection: Connection<Options = Self> + ?Sized;

    // Required methods
    fn from_url(url: &Url) -> Result<Self, Error>;
    fn connect(&self,) -> Pin<Box<dyn Future<Output = Result<Self::Connection, Error>> + Send + '_>>
       where Self::Connection: Sized;
    fn log_statements(self, level: LevelFilter) -> Self;
    fn log_slow_statements(self, level: LevelFilter, duration: Duration) -> Self;

    // Provided methods
    fn to_url_lossy(&self) -> Url { ... }
    fn disable_statement_logging(self) -> Self { ... }
}

impl ConnectOptions for AnyConnectOptions
impl ConnectOptions for MySqlConnectOptions
  type Connection = MySqlConnection
impl ConnectOptions for PgConnectOptions
impl ConnectOptions for SqliteConnectOptions

sqlx::Connection trait:

  • connect(url): 创建连接;
  • connect_with(connectionOptions): 使用指定的 ConnectOptions 创建连接;
pub trait Connection: Send {
    type Database: Database<Connection = Self>;
    type Options: ConnectOptions<Connection = Self>;

    // Required methods
    fn close(self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
    fn ping(&mut self, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
    fn begin( &mut self, ) -> Pin<Box<dyn Future<Output = Result<Transaction<'_, Self::Database>, Error>> + Send + '_>> where Self: Sized;
    fn shrink_buffers(&mut self);

    // Provided methods
    fn transaction<'a, F, R, E>( &'a mut self, callback: F,
    ) -> Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'a>>
       where F: for<'c> FnOnce(&'c mut Transaction<'_, Self::Database>) -> Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'c>> + 'a + for<'c> Send + for<'c> Sync,
             Self: Sized,
             R: Send,
             E: From<Error> + Send { ... }
    fn cached_statements_size(&self) -> usize where Self::Database: HasStatementCache { ... }
    fn clear_cached_statements( &mut self, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>
       where Self::Database: HasStatementCache { ... }
    fn connect( url: &str, ) -> Pin<Box<dyn Future<Output = Result<Self, Error>> + Send>>
       where Self: Sized { ... }
    fn connect_with( options: &Self::Options, ) -> Pin<Box<dyn Future<Output = Result<Self, Error>> + Send + '_>>
       where Self: Sized { ... }
}


impl Connection for AnyConnection
impl Connection for MySqlConnection
 type Database = MySql
 type Options = MySqlConnectOptions
impl Connection for PgConnection
impl Connection for SqliteConnection

Struct sqlx::mysql::MySqlConnectOptions 实现了 ConnectOptions trait,同时也提供了 mysql 相关的连接配置参数:

  • 可以直接调用对应方法来设置连接参数,也可以从字符串解析连接参数(实现了 FromStr trait);
  • 使用 connect() 方法创建一个数据库连接;
  • 或者使用 Pool 的 connect_with(opts) 函数来创建一个连接池;
pub struct MySqlConnectOptions { /* private fields */ }

// mysql://[host][/database][?properties]

use sqlx::{Connection, ConnectOptions};
use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlPool, MySqlSslMode};

// URL connection string
let conn = MySqlConnection::connect("mysql://root:password@localhost/db").await?;

// 手动构造连接参数
let conn = MySqlConnectOptions::new()
    .host("localhost")
    .username("root")
    .password("password")
    .database("db")
    .connect().await?; // 创建 Connection

// 从字符串解析连接参数
let mut opts: MySqlConnectOptions = "mysql://root:password@localhost/db".parse()?;

// 修改连接参数
opts = opts.log_statements(log::LevelFilter::Trace);

// 连接池使用连接参数
let pool = MySqlPool::connect_with(opts).await?;

Struct sqlx::MySqlConnection:

pub struct MySqlConnection { /* private fields */ }

impl<'c> Acquire<'c> for &'c mut MySqlConnection
impl AnyConnectionBackend for MySqlConnection

impl Connection for MySqlConnection
  type Database = MySql
  type Options = MySqlConnectOptions

impl Debug for MySqlConnection

impl<'c> Executor<'c> for &'c mut MySqlConnection

impl Migrate for MySqlConnection

5 Executor/Execute
#

各种数据库 连接和连接池 对象,如 MySqlConnection/MySqlPool,实现了 Trait sqlx::Executor,可以直接从 Connection/Pool 对象上发起 fetch/execute() 等查询:

  • 一般更建议使用 sqlx::query/query_as/query!/query_as!() 等函数或宏来进行查询,需要给它们传入 &Pool;
pub trait Executor<'c>:
    Sized
    + Send
    + Debug {
        type Database: Database;

        // Required methods
        fn fetch_many<'e, 'q, E>(self, query: E, ) -> Pin<Box<dyn Stream<Item = Result<Either + Send + 'e>>
       where 'q: 'e,
             'c: 'e,
             E: 'q + Execute<'q, Self::Database>;
        fn fetch_optional<'e, 'q, E>( self, query: E, ) -> Pin<Box<dyn Future<Output = Result<Option + Send + 'e>>
       where 'q: 'e,
             'c: 'e,
             E: 'q + Execute<'q, Self::Database>;
        fn prepare_with<'e, 'q>( self, sql: &'q str, parameters: &'e [<Self::Database as Database>::TypeInfo],
        ) -> Pin<Box<dyn Future<Output = Result + Send + 'e>>
       where 'q: 'e,
             'c: 'e;

        // Provided methods
        fn execute<'e, 'q, E>( self, query: E, ) -> Pin<Box<dyn Future<Output = Result + Send + 'e>>
       where 'q: 'e,
             'c: 'e,
             E: 'q + Execute<'q, Self::Database> { ... }

        // 返回的是 Stream, 可以迭代返回结果
        fn execute_many<'e, 'q, E>( self, query: E, ) -> Pin<Box<dyn Stream<Item = Result + Send + 'e>>
       where 'q: 'e,
             'c: 'e,
             E: 'q + Execute<'q, Self::Database> { ... }

        // 返回的是 Stream, 可以迭代返回结果
        fn fetch<'e, 'q, E>( self, query: E, ) -> Pin<Box<dyn Stream<Item = Result + Send + 'e>>
       where 'q: 'e,
             'c: 'e,
             E: 'q + Execute<'q, Self::Database> { ... }

        // 返回的是 Vec<Row>
        fn fetch_all<'e, 'q, E>( self, query: E, ) -> Pin<Box<dyn Future<Output = Result<Vec + Send + 'e>>
       where 'q: 'e,
             'c: 'e,
             E: 'q + Execute<'q, Self::Database> { ... }
        fn fetch_one<'e, 'q, E>( self, query: E, ) -> Pin<Box<dyn Future<Output = Result + Send + 'e>>
       where 'q: 'e,
             'c: 'e,
             E: 'q + Execute<'q, Self::Database> { ... }
        fn prepare<'e, 'q>( self, query: &'q str, ) -> Pin<Box<dyn Future<Output = Result + Send + 'e>>
       where 'q: 'e,
             'c: 'e { ... }
    }

// 各种数据库 Connection 以及连接池 Pool 均实现了 Executor trait:
impl<'c> Executor<'c> for &'c mut PgListener
impl<'c> Executor<'c> for &'c mut AnyConnection
impl<'c> Executor<'c> for &'c mut MySqlConnection
impl<'c> Executor<'c> for &'c mut PgConnection
impl<'c> Executor<'c> for &'c mut SqliteConnection
impl<'p, DB> Executor<'p> for &Pool<DB>
where
    DB: Database,
    &'c mut <DB as Database>::Connection: for<'c> Executor<'c, Database = DB>

sqlx::Executor 的各方法的 query 参数是 Execute 类型,&str/Query/QueryAs 等均实现了 Execute trait, 故可以作为 query 参数值。

  • &str 是非缓存的查询;而 Query/QueryAs 等是 prepared statement。
pub trait Execute<'q, DB>: Sized + Send
where
    DB: Database,
{
    // Required methods
    fn sql(&self) -> &'q str;
    fn statement(&self) -> Option<&<DB as Database>::Statement<'q>>;
    fn take_arguments(
        &mut self,
    ) -> Result<Option, Box<dyn Error + Sync + Send>>;
    fn persistent(&self) -> bool;
}

// &str 实现了 Execute
impl<'q, DB> Execute<'q, DB> for &'q str where DB: Database
impl<'q, DB> Execute<'q, DB> for (&'q str, Option) where DB: Database

// Query/Map/QueryAs 等也实现了 Execute trait
impl<'q, DB> Execute<'q, DB> for RawSql<'q> where DB: Database
impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A> where DB: Database, A: Send + IntoArguments<'q, DB>

impl<'q, DB, F, A> Execute<'q, DB> for Map<'q, DB, F, A> where F: Send, A: Send + IntoArguments<'q, DB>, DB: Database

impl<'q, DB, O, A> Execute<'q, DB> for QueryAs<'q, DB, O, A>
where
    O: Send,
    A: Send + 'q + IntoArguments<'q, DB>,
    DB: Database

impl<'q, DB, O, A> Execute<'q, DB> for QueryScalar<'q, DB, O, A>
where
    DB: Database,
    O: Send,
    A: Send + 'q + IntoArguments<'q, DB>

示例:

// 数据库 Connection 实现了 Executor,故可以直接调用 execute/fetch() 等方法。
// 使用 &str 作为 Execute
conn.execute("BEGIN").await?; // unprepared, simple query
// 使用 Query/QueryAs 作为 Execute
conn.execute(sqlx::query("DELETE FROM table")).await?; // prepared, cached query

// provides `try_next`
use futures::TryStreamExt;
// provides `try_get`
use sqlx::Row;

let mut rows = sqlx::query("SELECT * FROM users WHERE email = ?")
    .bind(email)
    .fetch(&mut conn); // 返回的是一个 Stream

// 迭代 Stream
while let Some(row) = rows.try_next().await? {
    // 按照 column name 或 index 来获取列值并转换为 Rust 类型
    let email: &str = row.try_get("email")?;
}

#[derive(Debug)]
struct Product {
    name: String,
    price: f64,
}
async fn get_products(pool: &PgPool, min_price: f64) -> Result<Vec<Product>, sqlx::Error> {
    let products = sqlx::query_as!(Product, r#"SELECT name, price FROM products WHERE price > $1"#,       min_price    )
        .fetch_all(pool) // 返回一个 Vec
        .await?;
    Ok(products)
}

// Connection 提供了事务能力
use sqlx::postgres::{PgConnection, PgRow};
use sqlx::Connection;
conn.transaction(|txn| Box::pin(async move {
    sqlx::query("select * from ..").fetch_all(&mut **txn).await
})).await

6 Row
#

Executor 的方法, 如 fetch()/fetch_one() 等返回的类型是 Row/Vec<Row> 等, 而各数据库类型则该 Row trait,如 MySqlRow:

pub trait Row: Unpin + Send + Sync + 'static {
    type Database: Database<Row = Self>;

    // Required methods
    fn columns(&self) -> &[<Self::Database as Database>::Column];
    fn try_get_raw<I>( &self, index: I, ) -> Result<<Self::Database as Database>::ValueRef<'_>, Error>
       where I: ColumnIndex<Self>;

    // Provided methods
    fn is_empty(&self) -> bool { ... }
    fn len(&self) -> usize { ... }
    fn column<I>(&self, index: I) -> &<Self::Database as Database>::Column
       where I: ColumnIndex<Self> { ... }
    fn try_column<I>( &self, index: I, ) -> Result<&<Self::Database as Database>::Column, Error>
       where I: ColumnIndex<Self> { ... }

    // 使用 index 获取 column 值
    fn get<'r, T, I>(&'r self, index: I) -> T
       where I: ColumnIndex<Self>, T: Decode<'r, Self::Database> + Type<Self::Database> { ... }
    fn get_unchecked<'r, T, I>(&'r self, index: I) -> T
       where I: ColumnIndex<Self>, T: Decode<'r, Self::Database> { ... }
    fn try_get<'r, T, I>(&'r self, index: I) -> Result<T, Error>
       where I: ColumnIndex<Self>, T: Decode<'r, Self::Database> + Type<Self::Database> { ... }
    fn try_get_unchecked<'r, T, I>(&'r self, index: I) -> Result<T, Error>
       where I: ColumnIndex<Self>, T: Decode<'r, Self::Database> { ... }
}

get() 使用的 index 是 ColumnIndex 类型, &str 和 usize 实现了 ColumnIndex, 即可以按照 column name或 index 值来获取结果 Row 中对应列的值:

pub trait ColumnIndex<T>: Debug where T: ?Sized,
{
    // Required method
    fn index(&self, container: &T) -> Result<usize, Error>;
}

// &str 和 usize 实现了 ColumnIndex
impl ColumnIndex<MySqlRow> for &str
impl ColumnIndex<MySqlRow> for usize
impl ColumnIndex<MySqlStatement<'_>> for &str
impl ColumnIndex<MySqlStatement<'_>> for usize

struct MySqlRow 实现了 Row trait:

pub struct MySqlRow { /* private fields */ }

impl ColumnIndex<MySqlRow> for &str
impl ColumnIndex<MySqlRow> for usize
impl Debug for MySqlRow
impl Row for MySqlRow
impl<'a> TryFrom<&'a MySqlRow> for AnyRow

示例:

// provides `try_next`
use futures::TryStreamExt;
// provides `try_get`
use sqlx::Row;

let mut rows = sqlx::query("SELECT * FROM users WHERE email = ?")
    .bind(email)
    .fetch(&mut conn); // 返回的是一个 Stream

// 迭代 Stream
while let Some(row) = rows.try_next().await? {
    // 按照 column name 或 index 来获取列值并转换为 Rust 类型
    let email: &str = row.try_get("email")?;
}

7 连接池
#

sqlx 一般使用连接池对象来执行 query 等操作。

创建连接池有两种方式:

  1. PgPool::connect(url): 使用缺省参数创建 PgPool 对象;
  2. PgPoolOptions::new().max_connnections(5).connect(url): 使用 PgPoolOptions 来设置连接池参数,然后创建 PgPool;

两种方式使用连接池 PgPool:

  1. PgConnection 和 PgPool 均实现了 Executor trait, 也可以直接执行 execute()/fetch() 等方法;
  2. query()/query_as()/query!()/query_as!() 返回的 Query/QueryAs 对象实现了 Executor trait,它的各方法,如execute()/fetch()/fetch_all() 等的参数是 &PgPool; 《– 建议的方式

使用 Pool::connect(url)/connect_with(connectionOptions) 来创建 Pool,然后使用 Pool::acquire() 来获得一个 PoolConnection 对象,当该对象被 drop 时,连接将返回到 Pool 中。

  • PoolOptions 配置的 Pool 自身,而 ConnectionOptions,如 MysqlConnectionOptions 配置的是 Pool 中的 Connnection,两者是不同的维度。
pub struct Pool<DB>(/* private fields */) where DB: Database;

// 方便直接使用的类型别名
pub type MySqlPool = Pool<Mysql>;
pub type PgPool = Pool<Postgres>;
pub type SqlitePool = Pool<Sqlite>;

// Pool 对象方法
impl<DB> Pool<DB> where DB: Database

//使用默认参数创建 Pool
pub async fn connect(url: &str) -> Result<Pool<DB>, Error>
// 使用 ConnecitonOptions 来自定义 Pool 中的 Connection
pub async fn connect_with( options: <<DB as Database>::Connection as Connection>::Options,) -> Result<Pool<DB>, Error>
pub fn connect_lazy(url: &str) -> Result<Pool<DB>, Error>
pub fn connect_lazy_with(options: <<DB as Database>::Connection as Connection>::Options,) -> Pool<DB>

// 从连接池中获得连接 PoolConnection
pub fn acquire(&self,) -> impl Future<Output = Result<PoolConnection<DB>, Error>> + 'static
pub fn try_acquire(&self) -> Option<PoolConnection<DB>>

// 创建事务
pub async fn begin(&self) -> Result<Transaction<'static, DB>, Error>
pub async fn try_begin(&self) -> Result<Option<Transaction<'static, DB>>, Error>

pub fn close(&self) -> impl Future<Output = ()>
pub fn is_closed(&self) -> bool
pub fn close_event(&self) -> CloseEvent

pub fn size(&self) -> u32
pub fn num_idle(&self) -> usize

// 设置连接参数
pub fn connect_options(&self,) -> Arc<<<DB as Database>::Connection as Connection>::Options>
pub fn set_connect_options(&self, connect_options: <<DB as Database>::Connection as Connection>::Options,)
pub fn options(&self) -> &PoolOptions<DB>

一般使用 Struct sqlx::pool::PoolOptions 或类型别名如 PgPoolOptions 来先设置连接池参数,如最大连接数,空闲时间等,然后使用 connect() 等方法来创建 Pool:

impl<DB> PoolOptions<DB> where DB: Database

// 类型别名
pub type PgPoolOptions = PoolOptions<Postgres>;

// PoolOptions 的方法
pub fn new() -> PoolOptions<DB>

pub fn max_connections(self, max: u32) -> PoolOptions<DB>
pub fn get_max_connections(&self) -> u32

pub fn min_connections(self, min: u32) -> PoolOptions<DB>
pub fn get_min_connections(&self) -> u32

pub fn acquire_time_level(self, level: LevelFilter) -> PoolOptions<DB>
pub fn acquire_slow_level(self, level: LevelFilter) -> PoolOptions<DB>
pub fn acquire_slow_threshold(self, threshold: Duration) -> PoolOptions<DB>
pub fn get_acquire_slow_threshold(&self) -> Duration
pub fn acquire_timeout(self, timeout: Duration) -> PoolOptions<DB>
pub fn get_acquire_timeout(&self) -> Duration

pub fn max_lifetime(self, lifetime: impl Into<Option<Duration>>,) -> PoolOptions<DB>
pub fn get_max_lifetime(&self) -> Option<Duration>

pub fn idle_timeout( self, timeout: impl Into<Option<Duration>>, ) -> PoolOptions<DB>
pub fn get_idle_timeout(&self) -> Option<Duration>

pub fn test_before_acquire(self, test: bool) -> PoolOptions<DB>
pub fn get_test_before_acquire(&self) -> bool

pub fn after_connect<F>(self, callback: F) -> PoolOptions<DB>
where
    F: for<'c> Fn(&'c mut <DB as Database>::Connection, PoolConnectionMetadata) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'c>> + 'static + for<'c> Send + for<'c> Sync

pub fn before_acquire<F>(self, callback: F) -> PoolOptions<DB>
where
    F: for<'c> Fn(&'c mut <DB as Database>::Connection, PoolConnectionMetadata) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + Send + 'c>> + 'static + for<'c> Send + for<'c> Sync

pub fn after_release<F>(self, callback: F) -> PoolOptions<DB>
where
    F: for<'c> Fn(&'c mut <DB as Database>::Connection, PoolConnectionMetadata) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + Send + 'c>> + 'static + for<'c> Send + for<'c> Sync

// 创建 Pool<DB> 对象
pub async fn connect(self, url: &str) -> Result<Pool<DB>, Error>
// 使用 ConnecitonOptions 来自定义 Pool 中的 Connection
pub async fn connect_with(self, options: <<DB as Database>::Connection as Connection>::Options,
) -> Result<Pool<DB>, Error>
pub fn connect_lazy(self, url: &str) -> Result<Pool<DB>, Error>
pub fn connect_lazy_with(self, options: <<DB as Database>::Connection as Connection>::Options,) -> Pool<DB>

Pool 类型实现了 Executor trait,故支持各种 execute()/fetch()/fetch_one() 等方法。

Pool 实现了 Send, Sync and Clone ,故可以在多线程或异步任务中使用:

示例:

use sqlx::Pool;
use sqlx::postgres::Postgres;
let pool = Pool::<Postgres>::connect("postgres://").await?;

// 使用类型别名
use sqlx::mssql::MssqlPool;
let pool = MssqlPool::connect("mssql://").await?;

// Pool 实现了 Executor trait,故可以作为 execute/fetch() 等方法的参数。
sqlx::query("DELETE FROM articles").execute(&pool).await?;


use sqlx::postgres::PgPoolOptions;

#[async_std::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://postgres:password@localhost/test").await?;

    let row: (i64,) = sqlx::query_as("SELECT $1")
        .bind(150_i64)
        .fetch_one(&pool).await?;

    assert_eq!(row.0, 150);

    Ok(())
}

use sqlx::{Executor, PgPool};

let pool = PgPool::connect("postgresql://...").await?; // 使用默认参数创建 Pool
let pool2 = pool.clone();
tokio::spawn(async move {
    let res: sqlx::Result<sqlx::Result<()>> = pool2.close_event().do_until(async {
        pool2.execute("SELECT pg_sleep('30 days')").await?;
        println!("Waited!");

        Ok(())
    }).await;

    match res {
        Ok(Ok(())) => println!("Wait succeeded"),
        Ok(Err(e)) => println!("Error from inside do_until: {e:?}"),
        Err(e) => println!("Error from do_until: {e:?}"),
    }
});

// This normally wouldn't return until the above statement completed and the connection was
// returned to the pool. However, thanks to `.do_until()`, the operation was cancelled as soon as
// we called `.close().await`.
pool.close().await;

8 query/query_as
#

sqlx 提供了如下函数,可以用来构造查询语句:

  1. query():执行一个带语句缓存的 SQL 查询;返回 struct Query 对象.
  2. query_as: 执行一个带语句缓存的 SQL 查询,同时将结果 Roes 映射到实现 FromRow 的 Rust 类型上, 返回 struct QueryAs 对象.
  3. query_as_with:和 query_as 类似,但是调用函数时需要传入绑定的参数列表;
  4. query_scalar:和 query 类似,但是只返回各 row 的第一列数据, 返回 struct QueryScalar 对象.
  5. query_scalar_with:和 query_scalar 类似,但调用函数时需要传入绑定的参数列表;
  6. query_with:和 query 类似,但调用函数时需要传入绑定的参数列表;
  7. raw_sql:在一个 raw SQL 中执行分号分割的一条或多条语句;

struct Query/QueryAs 对象常用方法:

  1. bind(): 设置绑定参数, 返回 Query, 故可以链式调用;
  2. map(): 传入一个闭包, 根据查询结果 Row 来返回映射的 Rust 类型;
  3. execute()/fetch()/fetch_all()/fetch_one()/fetch_optional() 执行查询, 参数是实现 Executor 的类型,如各种数据库连接池对象;
    • 注意, 这些方法和 Executor trait 的方法类似, 但是 struct Query/QueryAs 类型并没有实现 Executor trait;
// 返回代表 prepared statement 的 Query 对象
pub fn query<DB>(sql: &str) -> Query<'_, DB, <DB as Database>::Arguments<'_>> where DB: Database

// Query 类型
pub struct Query<'q, DB, A> where DB: Database, { /* private fields */ }

// Query 类型实现的方法
pub fn bind<T>(self, value: T) -> Query<'q, DB, <DB as Database>::Arguments<'q>>
  where T: 'q + Encode<'q, DB> + Type<DB>

pub fn try_bind<T>( &mut self, value: T,) -> Result<(), Box<dyn Error + Sync + Send>>
  where T: 'q + Encode<'q, DB> + Type<DB>

pub fn persistent(self, value: bool) -> Query<'q, DB, A>

pub fn map<F, O>( self, f: F,) -> Map<'q, DB, impl FnMut(<DB as Database>::Row) + Send, A>
  where
    F: FnMut(<DB as Database>::Row) -> O + Send,
    O: Unpin

pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, A>
  where F: FnMut(<DB as Database>::Row) -> Result<O, Error> + Send, O: Unpin

pub async fn execute<'e, 'c, E>(self, executor: E, ) -> Result<<DB as Database>::QueryResult, Error>
where
    'c: 'e,
    'q: 'e,
    A: 'e,
    E: Executor<'c, Database = DB>

pub fn fetch<'e, 'c, E>( self, executor: E,) -> Pin<Box<dyn Stream<Item = Result + Send + 'e>>
where
    'c: 'e,
    'q: 'e,
    A: 'e,
    E: Executor<'c, Database = DB>

pub async fn fetch_all<'e, 'c, E>( self, executor: E, ) -> Result<Vec<<DB as Database>::Row>, Error>
where
    'c: 'e,
    'q: 'e,
    A: 'e,
    E: Executor<'c, Database = DB>


pub async fn fetch_one<'e, 'c, E>( self, executor: E, ) -> Result<<DB as Database>::Row, Error>
where
    'c: 'e,
    'q: 'e,
    A: 'e,
    E: Executor<'c, Database = DB>

pub async fn fetch_optional<'e, 'c, E>( self, executor: E,) -> Result<Option<<DB as Database>::Row>, Error>
where
    'c: 'e,
    'q: 'e,
    A: 'e,
    E: Executor<'c, Database = DB>

示例:

// 不建议直接使用字符串替换,容易导致 SQL 注入漏洞
let user_input = "possibly untrustworthy input!";
let query = format!("SELECT * FROM articles WHERE content LIKE '%{user_input}%'");
let results = sqlx::query(&query).fetch_all(&mut conn).await?;

// 推荐使用 query + bind 方式,query 的查询语句会被解析后缓存(prepared statement)
let user_input = "Alice's Apples";
let results = sqlx::query(
    "SELECT * FROM articles WHERE title LIKE '%' || $1 || '%' OR content LIKE '%' || $1 || '%'")
    .bind(user_input)
    .fetch_all(&mut conn) // 返回 MySqlRow 类型对象, 实现了 Row trait, 可以通过 get(index)  获得列值.
    .await?;

// query_as() 可以指定要映射到的 Rust 类型, 该类型实现 FromRow.
// 注意: query_as!() 宏函数也可以指定映射到的 Rust 类型, 但是该类型不需要实现 FromRow
#[derive(sqlx::FromRow)]
struct User { name: String, id: i64 }
let mut stream = sqlx::query_as::<_, User>("SELECT * FROM users WHERE email = ? OR name = ?")
    .bind(user_email)
    .bind(user_name)
    .fetch(&mut conn);

// 使用 map() 对查询结果进行转换.
let mut stream = sqlx::query("SELECT * FROM users")
    .map(|row: PgRow| {
        // map the row into a user-defined domain type
    })
    .fetch(&mut conn);

在实际场景中, 更建议使用这些函数的 宏函数版本 , 因为 sqlx 在编译时会对宏函数版本进行编译检查.

9 编译时验证
#

通过使用宏函数,如 sqlx::query!/sqlx::query_as!(),可以在编译时对查询语句进行数据库验证,同时返回匿名的记录类型,这样可以使用 field 来访问数据各列:

let countries = sqlx::query!(
    "
SELECT country, COUNT(*) as count
FROM users
GROUP BY country
WHERE organization = ?
        ",
    organization // 给出所有参数值,而不需要调用 bind() 方法
)
    .fetch_all(&pool) // -> Vec<{ country: String, count: i64 }>
    .await?;

// countries[0].country
// countries[0].count


async fn list_todos(pool: &MySqlPool) -> anyhow::Result<()> {
    let recs = sqlx::query!(
        r#"
SELECT id, description, done
FROM todos
ORDER BY id
        "#
    )
        .fetch_all(pool)
        .await?;

    // NOTE: Booleans in MySQL are stored as `TINYINT(1)` / `i8`
    //       0 = false, non-0 = true
    for rec in recs {
        println!(
            "- [{}] {}: {}",
            if rec.done != 0 { "x" } else { " " },
            rec.id,
            &rec.description,
        );
    }

    Ok(())
}


pub async fn verify(self, db: impl PgExecutor<'_> + Send) -> Result<UserId> {
    self.validate()?;

    let maybe_user = sqlx::query!(
            r#"select user_id, password_hash from "user" where username = $1"#,
        self.username
    )
        .fetch_optional(db)
        .await?;

    if let Some(user) = maybe_user {
        let verified = crate::password::verify(self.password, user.password_hash).await?;

        if verified {
            return Ok(user.user_id);
        }
    }

    // Sleep a random amount of time to avoid leaking existence of a user in timing.
    let sleep_duration =
        rand::thread_rng().gen_range(Duration::from_millis(100)..=Duration::from_millis(500));
    tokio::time::sleep(sleep_duration).await;

    Err(Error::UnprocessableEntity(
        "invalid username/password".into(),
    ))
}

query!() 和 query() 的区别:

  1. 在宏参数中列出所有绑定参数值,而不需要调用 query() 返回的 Query 类型对象的 bind() 方法;
  2. 返回的结果用匿名记录类型来表示;
  3. 使用 DATABASE_URL 环境变量对应的数据库来进行编译时验证,数据库不要求有数据,但是类型和 schema必须正确;
    • 也可以使用 offline 验证模式;

query!() 返回的对象,可以使用的方法:

  1. 不需要返回值:使用 .execute(…).await,返回 sqlx::Result<DB::QueryResult> ,即影响的记录数量,如:For INSERT/UPDATE/DELETE without RETURNING.
  2. 0 或 1 返回值:使用 .fetch_optional(…).await,返回 sqlx::Result<Option<{adhoc struct}>>,即一条匿名类型,忽略剩余行;
  3. 1 条记录:使用 .fetch_one(…).await,返回 sqlx::Result<{adhoc struct}>,即一条匿名记录,如果没有查到记录,则返回 Err,多余一条的记录将被忽略;
  4. 至少 1 条记录:使用 .fetch(…),返回 impl Stream<Item = sqlx::Result<{adhoc struct}>>,后续调用 .try_next().await 来获取各行记录;
  5. 多条记录的 Vec:使用 .fetch_all(…),返回 sqlx::Result<Vec<{adhoc struct}>>;

query!() 的最大问题是不能自定义返回的匿名类型。

解决办法是使用可以指定返回类型的 query_as!() 宏:

// no traits are needed,不需要实现 FromRow trait
struct Country { country: String, count: i64 }

let countries = sqlx::query_as!(Country,
        "
SELECT country, COUNT(*) as count
FROM users
GROUP BY country
WHERE organization = ?
        ",
        organization
    )
    .fetch_all(&pool) // -> Vec<Country>
    .await?;

// countries[0].country
// countries[0].count

10 Migrator
#

Migrator 代表运行时迁移对象,可以使用 migrate!() 来进行创建,或使用 Migrator::new() 来在运行时创建:

对于每个应用的迁移,sqlx 会在数据库中记录对应的 version 和 checksum,后续可以指定 version 来回滚,或者校验 checksum 是否可以当前 migrations 目录中的定义的一致。

pub struct Migrator { /* private fields */ }

// 实现的方法
impl Migrator
pub async fn new<'s, S>(source: S) -> Result<Migrator, MigrateError> where S: MigrationSource<'s>

pub fn set_ignore_missing(&mut self, ignore_missing: bool) -> &Migrator

// 设置迁移时是否 lock 数据库,默认为 true
pub fn set_locking(&mut self, locking: bool) -> &Migrator

// 迭代获得所有一只的 Migration
pub fn iter(&self) -> Iter<'_, Migration>

pub fn version_exists(&self, version: i64) -> bool

// 运行 pending 的迁移,同时检查已经执行过的迁移是否和目录中的定义一致(用于发现历史迁移变化、不一
// 致的情况)。
pub async fn run<'a, A>(&self, migrator: A) -> Result<(), MigrateError>
where
    A: Acquire<'a>,
    ::Connection as Deref>::Target: Migrate

// 执行 down 迁移直到指定的 version
pub async fn undo<'a, A>(&self, migrator: A, target: i64, ) -> Result<(), MigrateError>
where
    A: Acquire<'a>,
    ::Connection as Deref>::Target: Migrate,

示例:

use std::path::Path;

// Read migrations from a local folder: ./migrations
let m = Migrator::new(Path::new("./migrations")).await?;
let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?;
m.run(&pool).await;
m.undo(&pool, 4).await;
rust crate - 这篇文章属于一个选集。
§ 18: 本文

相关文章

axum
··13854 字
Rust Rust-Crate
axum 是基于 hyper 实现的高性能异步 HTTP 1/2 Server 库。
clap
··5510 字
Rust Rust-Crate
clap 用于快速构建命令行程序,提供命令&参数定义、解析等功能。
config
··1978 字
Rust Rust-Crate
config 提供从文件或环境变量解析配置参数的功能。
diesel
··34358 字
Rust Rust-Crate
diesel 是高性能的 ORM 和 Query Builder,crates.io 使用它来操作数据库。