
sqlx

rust crate - This article is part of a series.
§ 19: this article

SQLx CLI

Install sqlx-cli:

# supports all databases supported by SQLx
$ cargo install sqlx-cli

# only for postgres
$ cargo install sqlx-cli --no-default-features --features native-tls,postgres

# use vendored OpenSSL (build from source)
$ cargo install sqlx-cli --features openssl-vendored

# use Rustls rather than OpenSSL (be sure to add the features for the databases you intend to use!)
$ cargo install sqlx-cli --no-default-features --features rustls

By default sqlx works in online mode, connecting to the database given by the DATABASE_URL environment variable:

DATABASE_URL=postgres://postgres@localhost/my_database

Create the database:

sqlx database create

Create and run migrations (use --source to select a migrations directory other than the default ./migrations):

# create a migrations/<timestamp>_<name>.sql file
sqlx migrate add <name>

# Compare the files in ./migrations with the database: run the migration SQL files that have not been applied yet, and verify that the already-applied ones have not changed.
sqlx migrate run

# Passing -r to add also generates a revert (down) SQL file. After that, every later add generates a revert file too, whether or not -r is passed.
sqlx migrate add -r <name>
Creating migrations/20211001154420_<name>.up.sql
Creating migrations/20211001154420_<name>.down.sql
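The naming scheme above (a numeric version, an underscore, a name, then an optional .up/.down marker before .sql) can be made concrete with a small std-only parser. This is a hypothetical helper written for illustration, not code from sqlx or sqlx-cli; it only assumes the file names shown by the CLI output above.

```rust
/// Kind of migration file, inferred from its suffix (hypothetical helper, not sqlx API).
#[derive(Debug, PartialEq)]
enum MigrationKind {
    Simple, // <version>_<name>.sql, no revert file
    Up,     // <version>_<name>.up.sql
    Down,   // <version>_<name>.down.sql
}

#[derive(Debug, PartialEq)]
struct MigrationFile {
    version: i64,
    description: String,
    kind: MigrationKind,
}

/// Parses names like "20211001154420_create_users.up.sql".
/// Returns None if the name does not follow the <version>_<name>[.up|.down].sql pattern.
fn parse_migration_filename(name: &str) -> Option<MigrationFile> {
    let stem = name.strip_suffix(".sql")?;
    // Strip an optional .up / .down marker to learn the kind.
    let (stem, kind) = if let Some(s) = stem.strip_suffix(".up") {
        (s, MigrationKind::Up)
    } else if let Some(s) = stem.strip_suffix(".down") {
        (s, MigrationKind::Down)
    } else {
        (stem, MigrationKind::Simple)
    };
    // The version is the numeric prefix before the first underscore.
    let (version, description) = stem.split_once('_')?;
    let version: i64 = version.parse().ok()?;
    Some(MigrationFile { version, description: description.to_string(), kind })
}
```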

sqlx also supports offline mode:

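# 1. Save the query metadata used in offline mode to the .sqlx directory in the current directory
cargo sqlx prepare

# 2. (Optional) Check that the metadata in .sqlx is still consistent with the database and the queries
cargo sqlx prepare --check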

Forcing offline mode: the DATABASE_URL environment variable takes precedence over the .sqlx data, so to force offline mode set the environment variable SQLX_OFFLINE=true.
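To make the precedence rule concrete, here is a toy function (hypothetical, not part of sqlx) that decides where the query macros would take their metadata from. It assumes exactly the rule described above: SQLX_OFFLINE=true forces offline mode, otherwise a set DATABASE_URL means online mode. Falling back to .sqlx when DATABASE_URL is unset is an additional assumption, and the sqlx docs should be checked for other accepted SQLX_OFFLINE values.

```rust
/// Where the query macros take their metadata from (toy model, not sqlx API).
#[derive(Debug, PartialEq)]
enum MacroMode {
    Online,  // connect to DATABASE_URL at compile time
    Offline, // read the metadata saved in .sqlx by `cargo sqlx prepare`
}

/// `sqlx_offline` and `database_url` stand for the values of the SQLX_OFFLINE
/// and DATABASE_URL environment variables (None = not set).
fn resolve_mode(sqlx_offline: Option<&str>, database_url: Option<&str>) -> MacroMode {
    // SQLX_OFFLINE=true overrides DATABASE_URL.
    let force_offline = sqlx_offline.map_or(false, |v| v.eq_ignore_ascii_case("true"));
    match (force_offline, database_url) {
        (false, Some(_)) => MacroMode::Online,
        // Forced offline, or no DATABASE_URL at all: use the .sqlx data.
        _ => MacroMode::Offline,
    }
}
```

In a tool you would feed it `std::env::var("SQLX_OFFLINE").ok().as_deref()` and the same for DATABASE_URL.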

Adding dependencies

sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "uuid", "chrono", "migrate"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
chrono = { version = "0.4", features = ["serde"] }
dotenvy = "0.15"
  • runtime-tokio-rustls: use the Tokio async runtime, with rustls TLS encryption for secure database connections
  • postgres: PostgreSQL database driver
  • uuid: support for UUID data types (used here for unique IDs)
  • chrono: date and time handling for timestamps
  • migrate: database migration support for creating/updating tables
  • the uuid crate: generates unique identifiers for users, articles, comments
  • the chrono crate: handles created_at, updated_at timestamps in the data
  • dotenvy: loads environment variables from .env files

Types

Taking the sqlx::mysql::types module as an example: it implements conversions between Rust types and MySQL types.

Core types (Rust type: MySQL/MariaDB type(s)):
  • bool: TINYINT(1), BOOLEAN, BOOL (see the note in the sqlx docs)
  • i8: TINYINT
  • i16: SMALLINT
  • i32: INT
  • i64: BIGINT
  • u8: TINYINT UNSIGNED
  • u16: SMALLINT UNSIGNED
  • u32: INT UNSIGNED
  • u64: BIGINT UNSIGNED
  • f32: FLOAT
  • f64: DOUBLE
  • &str, String: VARCHAR, CHAR, TEXT
  • &[u8], Vec<u8>: VARBINARY, BINARY, BLOB
  • IpAddr: VARCHAR, TEXT
  • Ipv4Addr: INET4 (MariaDB-only), VARCHAR, TEXT
  • Ipv6Addr: INET6 (MariaDB-only), VARCHAR, TEXT
  • MySqlTime: TIME (encode and decode full range)
  • Duration: TIME (for decoding positive values only)

chrono (requires the chrono Cargo feature flag):
  • chrono::DateTime<Utc>: TIMESTAMP
  • chrono::DateTime<Local>: TIMESTAMP
  • chrono::NaiveDateTime: DATETIME
  • chrono::NaiveDate: DATE
  • chrono::NaiveTime: TIME (time-of-day only)
  • chrono::TimeDelta: TIME (decodes full range; see the sqlx docs for encoding)

uuid (requires the uuid Cargo feature flag):
  • uuid::Uuid: BINARY(16), VARCHAR, CHAR, TEXT
  • uuid::fmt::Hyphenated: CHAR(36), UUID (MariaDB-only)
  • uuid::fmt::Simple: CHAR(32)

json (requires the json Cargo feature flag):
  • Json<T>: JSON
  • serde_json::JsonValue: JSON
  • &serde_json::value::RawValue: JSON

Nullable: in addition, Option<T> is supported where T implements Type. An Option<T> represents a potentially NULL value from MySQL/MariaDB.
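Two of the rules above, bool stored as TINYINT(1) and Option<T> standing for a nullable column, can be sketched in plain Rust. The WireValue type and both decode functions are hypothetical and only model the mapping; sqlx's real decoding goes through its Decode and Type traits. The sketch assumes, as in sqlx, that decoding a NULL into a non-Option type is an error.

```rust
/// A raw MySQL column value as it might arrive from the server (toy model).
#[derive(Debug, Clone, Copy)]
enum WireValue {
    Null,
    TinyInt(i8),
}

/// Decodes a non-nullable TINYINT(1) column as bool: 0 is false, anything else is true.
/// A NULL here is an error, which is why nullable columns need Option<T>.
fn decode_bool(v: WireValue) -> Result<bool, String> {
    match v {
        WireValue::TinyInt(n) => Ok(n != 0),
        WireValue::Null => Err("unexpected NULL: decode into Option<bool> instead".to_string()),
    }
}

/// Decodes a nullable column: NULL becomes None, anything else goes through decode_bool.
fn decode_opt_bool(v: WireValue) -> Result<Option<bool>, String> {
    match v {
        WireValue::Null => Ok(None),
        other => decode_bool(other).map(Some),
    }
}
```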

Usage

Create a single connection:

use sqlx::{Connection, SqliteConnection};

let mut conn = SqliteConnection::connect("sqlite::memory:").await?;

Use a connection pool (recommended):

use sqlx::MySqlPool;

let pool = MySqlPool::connect("mysql://user:pass@host/database").await?;

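There are two ways to run SQL:

  1. prepared query: the query is first sent to the server to be parsed, and the server returns an id for the parsed statement, which can then be reused many times. Each execution passes the parameter list separately, which prevents SQL injection;
  2. unprepared query: the SQL text is sent directly to the server and executed;

sqlx supports both:

  1. &str: an unprepared query;
  2. the Query/QueryAs structs: prepared queries;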
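The practical difference between the two styles is where user input ends up. This std-only sketch (the Prepared type and interpolate function are hypothetical, not sqlx API) shows that string interpolation lets input rewrite the SQL text, while binding keeps the SQL text fixed and carries the value separately, which is what a prepared query with parameters does.

```rust
/// Hypothetical stand-in for a prepared query: SQL text and bound values are kept apart.
struct Prepared {
    sql: &'static str,
    args: Vec<String>,
}

impl Prepared {
    fn new(sql: &'static str) -> Self {
        Prepared { sql, args: Vec::new() }
    }

    /// Like Query::bind(): records the value without touching the SQL text.
    fn bind(mut self, value: &str) -> Self {
        self.args.push(value.to_string());
        self
    }
}

/// Unprepared style: user input is pasted into the SQL string itself.
fn interpolate(name: &str) -> String {
    format!("SELECT * FROM users WHERE name = '{name}'")
}
```

With the input x' OR '1'='1, interpolation yields a WHERE clause that matches every row, while the prepared version still has a single placeholder and one bound value.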

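The recommended approach is to use the higher-level sqlx::query() family of functions and then call one of the following to get results:

  • execute(): returns the number of affected rows, as sqlx::Result<DB::QueryResult>; for example for INSERT/UPDATE/DELETE without RETURNING;
  • fetch(): returns a stream of rows;
  • fetch_one(): returns exactly one row (an error if there is none);
  • fetch_optional(): returns zero or one row;
  • fetch_all(): collects all rows into a Vec;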
sqlx::query("DELETE FROM table").execute(&mut conn).await?;
sqlx::query("DELETE FROM table").execute(&pool).await?;

The fetch() family yields values implementing the Row trait; use get()/try_get() to read individual columns:

// provides `try_next`
use futures::TryStreamExt;
// provides `try_get`
use sqlx::Row;

let mut rows = sqlx::query("SELECT * FROM users WHERE email = ?")
    .bind(email)
    .fetch(&mut conn);

while let Some(row) = rows.try_next().await? {
    // map the row into a user-defined domain type
    let email: &str = row.try_get("email")?;
}

// map() can also be used to transform each row:
let mut stream = sqlx::query("SELECT * FROM users")
    .map(|row: PgRow| {
        // map the row into a user-defined domain type
    })
    .fetch(&mut conn);

Use query_as() to decode rows automatically into a custom type that implements the FromRow trait:

#[derive(sqlx::FromRow)]
struct User { name: String, id: i64 }

let mut stream = sqlx::query_as::<_, User>("SELECT * FROM users WHERE email = ? OR name = ?")
    .bind(user_email)
    .bind(user_name)
    .fetch(&mut conn);

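You can also use the query!()/query_as!() macros, which verify the SQL at compile time. query!() generates an anonymous record type with the result columns as fields (accessed directly, rather than through get("field_name") as above), while query_as!() fills a type you name.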

Connections

The sqlx::ConnectOptions trait: its connect() method creates a Connection from the options:

pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {
    type Connection: Connection<Options = Self> + ?Sized;

    // Required methods
    fn from_url(url: &Url) -> Result<Self, Error>;
    fn connect(&self,) -> Pin<Box<dyn Future<Output = Result<Self::Connection, Error>> + Send>> where Self::Connection: Sized;
    fn log_statements(self, level: LevelFilter) -> Self;
    fn log_slow_statements(self, level: LevelFilter, duration: Duration) -> Self;

    // Provided methods
    fn to_url_lossy(&self) -> Url { ... }
    fn disable_statement_logging(self) -> Self { ... }
}

impl ConnectOptions for AnyConnectOptions
impl ConnectOptions for MySqlConnectOptions
impl ConnectOptions for PgConnectOptions
impl ConnectOptions for SqliteConnectOptions

sqlx::Connection trait:

  • connect(url): opens a connection with the options parsed from the URL;
  • connect_with(options): opens a connection with the given ConnectOptions;
pub trait Connection: Send {
    type Database: Database<Connection = Self>;
    type Options: ConnectOptions<Connection = Self>;

    // Required methods
    fn close(self) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
    fn ping(&mut self, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>;
    fn begin( &mut self) -> Pin<Box<dyn Future<Output = Result<Transaction<Self::Database>, Error>> + Send>> where Self: Sized;
    fn shrink_buffers(&mut self);

    // Provided methods
    fn transaction<'a, F, R, E>( &'a mut self, callback: F, ) -> Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'a>>
       where F: for<'c> FnOnce(&'c mut Transaction<'_, Self::Database>) -> Pin<Box<dyn Future<Output = Result<R, E>> + Send + 'c>> + 'a + for<'c> Send + for<'c> Sync, Self: Sized, R: Send, E: From<Error> + Send { ... }
    fn cached_statements_size(&self) -> usize where Self::Database: HasStatementCache { ... }
    fn clear_cached_statements( &mut self, ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + '_>>
       where Self::Database: HasStatementCache { ... }
    fn connect( url: &str) -> Pin<Box<dyn Future<Output = Result<Self, Error>> + Send>> where Self: Sized { ... }
    fn connect_with( options: &Self::Options) -> Pin<Box<dyn Future<Output = Result<Self, Error>> + Send>> where Self: Sized { ... }
}


impl Connection for AnyConnection
impl Connection for MySqlConnection
 type Database = MySql
 type Options = MySqlConnectOptions
impl Connection for PgConnection
impl Connection for SqliteConnection

The struct sqlx::mysql::MySqlConnectOptions implements the ConnectOptions trait and also exposes MySQL-specific connection settings:

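  • settings can be applied by calling the corresponding builder methods, or parsed from a string (it implements the FromStr trait);
  • connect() creates a single database connection;
  • Pool's connect_with(opts) creates a connection pool;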
pub struct MySqlConnectOptions { /* private fields */ }

// mysql://[host][/database][?properties]

use sqlx::{Connection, ConnectOptions};
use sqlx::mysql::{MySqlConnectOptions, MySqlConnection, MySqlPool, MySqlSslMode};

// URL connection string
let conn = MySqlConnection::connect("mysql://root:password@localhost/db").await?;

// Build the connection options by hand
let conn = MySqlConnectOptions::new()
    .host("localhost")
    .username("root")
    .password("password")
    .database("db")
    .connect().await?; // creates a Connection

// Parse the connection options from a string
let mut opts: MySqlConnectOptions = "mysql://root:password@localhost/db".parse()?;
// Modify the options
opts = opts.log_statements(log::LevelFilter::Trace);
// Use the options for a connection pool
let pool = MySqlPool::connect_with(opts).await?;

Struct sqlx::MySqlConnection

pub struct MySqlConnection { /* private fields */ }

impl<'c> Acquire<'c> for &'c mut MySqlConnection
impl AnyConnectionBackend for MySqlConnection

impl Connection for MySqlConnection
  type Database = MySql
  type Options = MySqlConnectOptions

impl Debug for MySqlConnection

impl<'c> Executor<'c> for &'c mut MySqlConnection

impl Migrate for MySqlConnection

Executor/Execute

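Database connection and pool types such as MySqlConnection/MySqlPool implement the sqlx::Executor trait (through &mut MySqlConnection and &MySqlPool), so fetch()/execute() and similar methods can be called on them: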

pub trait Executor<'c>: Sized + Send + Debug {
    type Database: Database;

    // Required methods
    fn fetch_many<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Stream<Item = Result<Either<<Self::Database as Database>::QueryResult, <Self::Database as Database>::Row>, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>;

    fn fetch_optional<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Future<Output = Result<Option<<Self::Database as Database>::Row>, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>;

    fn prepare_with<'e, 'q>(self, sql: &'q str, parameters: &'e [<Self::Database as Database>::TypeInfo]) -> Pin<Box<dyn Future<Output = Result<<Self::Database as Database>::Statement<'q>, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e;

    // Provided methods
    fn execute<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Future<Output = Result<<Self::Database as Database>::QueryResult, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>

    // Returns a Stream; iterate over it to get each result
    fn execute_many<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Stream<Item = Result<<Self::Database as Database>::QueryResult, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>

    // Returns a Stream; iterate over it to get each row
    fn fetch<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Stream<Item = Result<<Self::Database as Database>::Row, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>

    // Returns Vec<Row>
    fn fetch_all<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Future<Output = Result<Vec<<Self::Database as Database>::Row>, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>

    fn fetch_one<'e, 'q, E>(self, query: E) -> Pin<Box<dyn Future<Output = Result<<Self::Database as Database>::Row, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e, E: 'q + Execute<'q, Self::Database>

    fn prepare<'e, 'q>(self, query: &'q str) -> Pin<Box<dyn Future<Output = Result<<Self::Database as Database>::Statement<'q>, Error>> + Send + 'e>> where 'q: 'e, 'c: 'e
}

The &mut borrow of each database Connection, and the & borrow of a Pool, implement the Executor trait:

impl<'c> Executor<'c> for &'c mut PgListener
impl<'c> Executor<'c> for &'c mut AnyConnection
impl<'c> Executor<'c> for &'c mut MySqlConnection
impl<'c> Executor<'c> for &'c mut PgConnection
impl<'c> Executor<'c> for &'c mut SqliteConnection
impl<'p, DB> Executor<'p> for &Pool<DB> where DB: Database, for<'c> &'c mut <DB as Database>::Connection: Executor<'c, Database = DB>
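Why `&mut conn` but `&pool`? The pattern of implementing a by-value trait for references can be sketched with toy types. ToyExecutor, ToyConn and ToyPool are hypothetical and far simpler than sqlx's async Executor: a connection needs exclusive access, while a shared pool reference is Copy and can be passed again and again.

```rust
use std::cell::RefCell;

/// Toy stand-in for sqlx::Executor: implemented for references, consumed by value.
trait ToyExecutor {
    fn execute(self, sql: &str) -> String;
}

struct ToyConn {
    id: u32,
}

struct ToyPool {
    conns: RefCell<Vec<ToyConn>>,
}

// Mirrors `impl Executor<'c> for &'c mut PgConnection`: needs exclusive access.
impl ToyExecutor for &mut ToyConn {
    fn execute(self, sql: &str) -> String {
        format!("conn {} ran: {sql}", self.id)
    }
}

// Mirrors `impl Executor<'p> for &Pool<DB>`: a shared reference is enough,
// because the pool picks a connection internally for each call.
impl ToyExecutor for &ToyPool {
    fn execute(self, sql: &str) -> String {
        let mut conns = self.conns.borrow_mut();
        let conn = conns.first_mut().expect("pool is empty");
        conn.execute(sql)
    }
}

/// Generic over any executor, like helper functions that take `impl Executor`.
fn delete_all<E: ToyExecutor>(executor: E) -> String {
    executor.execute("DELETE FROM articles")
}
```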

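The query parameter of every sqlx::Executor method is a type implementing the Execute trait. &str, Query, QueryAs and others implement Execute, so any of them can be passed as the query.

  • A &str is an uncached (unprepared) query, while the Query/QueryAs returned by sqlx::query()/query_as() are prepared statements.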
pub trait Execute<'q, DB>: Sized + Send where DB: Database,
{
    // Required methods
    fn sql(&self) -> &'q str;
    fn statement(&self) -> Option<&<DB as Database>::Statement<'q>>;
    fn take_arguments(&mut self) -> Result<Option<<DB as Database>::Arguments<'q>>, Box<dyn Error + Sync + Send>>;
    fn persistent(&self) -> bool;
}

// &str implements Execute
impl<'q, DB> Execute<'q, DB> for &'q str where DB: Database
impl<'q, DB> Execute<'q, DB> for (&'q str, Option<<DB as Database>::Arguments<'q>>) where DB: Database

// Query/Map/QueryAs etc. also implement Execute
impl<'q, DB> Execute<'q, DB> for RawSql<'q> where DB: Database
impl<'q, DB, A> Execute<'q, DB> for Query<'q, DB, A> where DB: Database, A: Send + IntoArguments<'q, DB>
impl<'q, DB, F, A> Execute<'q, DB> for Map<'q, DB, F, A> where F: Send, A: Send + IntoArguments<'q, DB>, DB: Database
impl<'q, DB, O, A> Execute<'q, DB> for QueryAs<'q, DB, O, A> where O: Send, A: Send + 'q + IntoArguments<'q, DB>, DB: Database
impl<'q, DB, O, A> Execute<'q, DB> for QueryScalar<'q, DB, O, A> where DB: Database, O: Send, A: Send + 'q + IntoArguments<'q, DB>

Example:

// Database connections implement Executor (through &mut), so execute()/fetch() etc. can be called on them directly.
use sqlx::Executor; // brings execute() into scope

// Using a &str as the Execute argument
conn.execute("BEGIN").await?; // unprepared, simple query
// Using a Query/QueryAs as the Execute argument
conn.execute(sqlx::query("DELETE FROM table")).await?; // prepared, cached query

// Connection also provides transactions
use sqlx::postgres::{PgConnection, PgRow};
use sqlx::Connection;
conn.transaction(|txn| Box::pin(async move {
    sqlx::query("select * from ..").fetch_all(&mut **txn).await
})).await
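sqlx documents that a Transaction which goes out of scope without commit() or rollback() is rolled back, and that transaction() commits when the callback returns Ok and rolls back on Err. The toy guard below (ToyTx and with_tx are hypothetical, in-memory only) sketches that RAII pattern; unlike a real database it makes uncommitted rows visible to other readers.

```rust
use std::cell::RefCell;

/// Toy transaction guard. Writes go straight into `table`; if the guard is
/// dropped without commit(), Drop undoes them (an implicit rollback).
struct ToyTx<'a> {
    table: &'a RefCell<Vec<String>>,
    start_len: usize, // table length when the transaction began
    committed: bool,
}

impl<'a> ToyTx<'a> {
    fn begin(table: &'a RefCell<Vec<String>>) -> Self {
        let start_len = table.borrow().len();
        ToyTx { table, start_len, committed: false }
    }

    fn insert(&mut self, row: &str) {
        self.table.borrow_mut().push(row.to_string());
    }

    /// Consumes the guard; Drop then sees `committed == true` and keeps the rows.
    fn commit(mut self) {
        self.committed = true;
    }
}

impl Drop for ToyTx<'_> {
    fn drop(&mut self) {
        if !self.committed {
            // Rollback: discard everything written since begin().
            self.table.borrow_mut().truncate(self.start_len);
        }
    }
}

/// Mirrors Connection::transaction(): commit on Ok, roll back on Err.
fn with_tx<T, E>(
    table: &RefCell<Vec<String>>,
    f: impl FnOnce(&mut ToyTx<'_>) -> Result<T, E>,
) -> Result<T, E> {
    let mut tx = ToyTx::begin(table);
    let out = f(&mut tx)?; // on Err, `tx` is dropped here and rolls back
    tx.commit();
    Ok(out)
}
```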

Row

Executor methods such as fetch()/fetch_one() return rows (Row values, or a Vec<Row> from fetch_all()); each database implements the Row trait, e.g. MySqlRow:

pub trait Row: Unpin + Send + Sync + 'static {
    type Database: Database<Row = Self>;

    // Required methods
    fn columns(&self) -> &[<Self::Database as Database>::Column];
    fn try_get_raw<I>( &self, index: I,) -> Result<<Self::Database as Database>::ValueRef<'_>, Error>  where I: ColumnIndex<Self>;

    // Provided methods
    fn is_empty(&self) -> bool
    fn len(&self) -> usize

    fn column<I>(&self, index: I) -> &<Self::Database as Database>::Column where I: ColumnIndex<Self>
    fn try_column<I>( &self, index: I, ) -> Result<&<Self::Database as Database>::Column, Error> where I: ColumnIndex<Self>

    // Get a column value by index (column name or position)
    fn get<'r, T, I>(&'r self, index: I) -> T where I: ColumnIndex<Self>, T: Decode<'r, Self::Database> + Type<Self::Database>
    fn get_unchecked<'r, T, I>(&'r self, index: I) -> T where I: ColumnIndex<Self>, T: Decode<'r, Self::Database>
    fn try_get<'r, T, I>(&'r self, index: I) -> Result<T, Error> where I: ColumnIndex<Self>, T: Decode<'r, Self::Database> + Type<Self::Database>
    fn try_get_unchecked<'r, T, I>(&'r self, index: I) -> Result<T, Error> where I: ColumnIndex<Self>, T: Decode<'r, Self::Database>
}

The index passed to get() is of type ColumnIndex. Both &str and usize implement ColumnIndex, so a column of the result Row can be looked up by column name or by position:

pub trait ColumnIndex<T>: Debug where T: ?Sized,
{
    // Required method
    fn index(&self, container: &T) -> Result<usize, Error>;
}

// &str and usize implement ColumnIndex
impl ColumnIndex<MySqlRow> for &str
impl ColumnIndex<MySqlRow> for usize
impl ColumnIndex<MySqlStatement<'_>> for &str
impl ColumnIndex<MySqlStatement<'_>> for usize
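The ColumnIndex idea, one trait implemented for both &str (by name) and usize (by position), can be sketched with a toy row. ToyRow and ToyColumnIndex are hypothetical; the real trait's index() has the same shape but returns sqlx::Error, and the real try_get() additionally decodes the value into a Rust type.

```rust
/// Toy row: column names plus raw string values (not sqlx's MySqlRow).
struct ToyRow {
    columns: Vec<&'static str>,
    values: Vec<String>,
}

/// Same idea as sqlx's ColumnIndex: turn "some kind of index" into a position.
trait ToyColumnIndex {
    fn index(&self, row: &ToyRow) -> Result<usize, String>;
}

// Look up by position, with a bounds check.
impl ToyColumnIndex for usize {
    fn index(&self, row: &ToyRow) -> Result<usize, String> {
        if *self < row.columns.len() {
            Ok(*self)
        } else {
            Err(format!("column index {} out of bounds", self))
        }
    }
}

// Look up by column name.
impl ToyColumnIndex for &str {
    fn index(&self, row: &ToyRow) -> Result<usize, String> {
        row.columns
            .iter()
            .position(|c| c == self)
            .ok_or_else(|| format!("no column named {}", self))
    }
}

impl ToyRow {
    /// Like Row::try_get(), minus the Decode step: returns the raw value.
    fn try_get<I: ToyColumnIndex>(&self, index: I) -> Result<&str, String> {
        let i = index.index(self)?;
        Ok(self.values[i].as_str())
    }
}
```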

The struct MySqlRow implements the Row trait:

pub struct MySqlRow { /* private fields */ }

impl ColumnIndex<MySqlRow> for &str
impl ColumnIndex<MySqlRow> for usize
impl Debug for MySqlRow
impl Row for MySqlRow
impl<'a> TryFrom<&'a MySqlRow> for AnyRow

Example:

// provides `try_next`
use futures::TryStreamExt;
// provides `try_get`
use sqlx::Row;

let mut rows = sqlx::query("SELECT * FROM users WHERE email = ?")
    .bind(email)
    .fetch(&mut conn); // returns a Stream

// Iterate over the Stream
while let Some(row) = rows.try_next().await? {
    // Look up the column by name or position and convert it to a Rust type
    let email: &str = row.try_get("email")?;
}

Connection pools

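sqlx normally runs queries and other operations through a connection pool.

There are two ways to create a pool:

  1. PgPool::connect(url): creates a PgPool with default options;
  2. PgPoolOptions::new().max_connections(5).connect(url): configures the pool with PgPoolOptions, then creates the PgPool;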

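There are two ways to use a PgPool:

  1. PgConnection and PgPool implement the Executor trait (as &mut PgConnection and &PgPool), so execute()/fetch() etc. can be called on them directly;
  2. The Query/QueryAs objects returned by query()/query_as()/query!()/query_as!() have methods such as execute()/fetch()/fetch_all() that take &PgPool or &mut PgConnection as the executor argument (the recommended way);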

Create a Pool with Pool::connect(url)/connect_with(options), then call Pool::acquire() to get a PoolConnection; when that object is dropped, the connection is returned to the Pool.

  • PoolOptions configures the Pool itself, while ConnectOptions (e.g. MySqlConnectOptions) configures the individual Connections in the Pool; the two are independent.
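The acquire/drop behaviour described above is the RAII guard pattern. Below is a minimal single-threaded sketch; IdlePool and PooledConn are hypothetical names, and sqlx's real Pool is async, thread-safe, and also handles timeouts, health checks and connection lifetimes.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Toy pool holding idle connection ids (single-threaded for brevity).
struct IdlePool {
    idle: Rc<RefCell<Vec<u32>>>,
}

/// Toy PoolConnection: gives its connection back to the pool when dropped.
struct PooledConn {
    id: u32,
    idle: Rc<RefCell<Vec<u32>>>,
}

impl IdlePool {
    fn new(size: u32) -> Self {
        IdlePool { idle: Rc::new(RefCell::new((0..size).collect())) }
    }

    /// Like Pool::try_acquire(): None when no idle connection is available.
    fn try_acquire(&self) -> Option<PooledConn> {
        let id = self.idle.borrow_mut().pop()?;
        Some(PooledConn { id, idle: Rc::clone(&self.idle) })
    }

    fn num_idle(&self) -> usize {
        self.idle.borrow().len()
    }
}

impl Drop for PooledConn {
    fn drop(&mut self) {
        // Return the connection to the pool instead of closing it.
        self.idle.borrow_mut().push(self.id);
    }
}
```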
// Pool is a generic struct, not a trait
pub struct Pool<DB>(/* private fields */) where DB: Database;

// Convenience type aliases
pub type MySqlPool = Pool<MySql>;
pub type PgPool = Pool<Postgres>;
pub type SqlitePool = Pool<Sqlite>;

// Methods on Pool
impl<DB> Pool<DB> where DB: Database

// Create a Pool with default options
pub async fn connect(url: &str) -> Result<Pool<DB>, Error>
// Use ConnectOptions to customize the Connections in the Pool
pub async fn connect_with( options: <<DB as Database>::Connection as Connection>::Options,) -> Result<Pool<DB>, Error>
pub fn connect_lazy(url: &str) -> Result<Pool<DB>, Error>
pub fn connect_lazy_with(options: <<DB as Database>::Connection as Connection>::Options,) -> Pool<DB>

// Get a PoolConnection from the pool; when it is dropped, the connection is returned to the pool
pub fn acquire(&self,) -> impl Future<Output = Result<PoolConnection<DB>, Error>> + 'static
pub fn try_acquire(&self) -> Option<PoolConnection<DB>>

// Start a transaction
pub async fn begin(&self) -> Result<Transaction<'static, DB>, Error>
pub async fn try_begin(&self) -> Result<Option<Transaction<'static, DB>>, Error>

pub fn close(&self) -> impl Future<Output = ()>
pub fn is_closed(&self) -> bool
pub fn close_event(&self) -> CloseEvent

pub fn size(&self) -> u32
pub fn num_idle(&self) -> usize

// Get and set the connection options
pub fn connect_options(&self,) -> Arc<<<DB as Database>::Connection as Connection>::Options>
pub fn set_connect_options(&self, connect_options: <<DB as Database>::Connection as Connection>::Options,)

// Get the pool options
pub fn options(&self) -> &PoolOptions<DB>

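Usually the struct sqlx::pool::PoolOptions, or a type alias such as PgPoolOptions, is used to configure the pool first (maximum connections, idle timeout, and so on), and then connect() or a similar method creates the Pool: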

impl<DB> PoolOptions<DB> where DB: Database

// Type alias
pub type PgPoolOptions = PoolOptions<Postgres>;

// Methods on PoolOptions
pub fn new() -> PoolOptions<DB>

pub fn max_connections(self, max: u32) -> PoolOptions<DB>
pub fn get_max_connections(&self) -> u32

pub fn min_connections(self, min: u32) -> PoolOptions<DB>
pub fn get_min_connections(&self) -> u32

pub fn acquire_time_level(self, level: LevelFilter) -> PoolOptions<DB>
pub fn acquire_slow_level(self, level: LevelFilter) -> PoolOptions<DB>
pub fn acquire_slow_threshold(self, threshold: Duration) -> PoolOptions<DB>
pub fn get_acquire_slow_threshold(&self) -> Duration
pub fn acquire_timeout(self, timeout: Duration) -> PoolOptions<DB>
pub fn get_acquire_timeout(&self) -> Duration

pub fn max_lifetime(self, lifetime: impl Into<Option<Duration>>,) -> PoolOptions<DB>
pub fn get_max_lifetime(&self) -> Option<Duration>

pub fn idle_timeout( self, timeout: impl Into<Option<Duration>>, ) -> PoolOptions<DB>
pub fn get_idle_timeout(&self) -> Option<Duration>

pub fn test_before_acquire(self, test: bool) -> PoolOptions<DB>
pub fn get_test_before_acquire(&self) -> bool

pub fn after_connect<F>(self, callback: F) -> PoolOptions<DB> where F: for<'c> Fn(&'c mut <DB as Database>::Connection, PoolConnectionMetadata) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send + 'c>> + 'static + for<'c> Send + for<'c> Sync

pub fn before_acquire<F>(self, callback: F) -> PoolOptions<DB> where F: for<'c> Fn(&'c mut <DB as Database>::Connection, PoolConnectionMetadata) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + Send + 'c>> + 'static + for<'c> Send + for<'c> Sync

pub fn after_release<F>(self, callback: F) -> PoolOptions<DB>  where F: for<'c> Fn(&'c mut <DB as Database>::Connection, PoolConnectionMetadata) -> Pin<Box<dyn Future<Output = Result<bool, Error>> + Send + 'c>> + 'static + for<'c> Send + for<'c> Sync

// Create the Pool<DB>
pub async fn connect(self, url: &str) -> Result<Pool<DB>, Error>
// Use ConnectOptions to customize the Connections in the Pool
pub async fn connect_with(self, options: <<DB as Database>::Connection as Connection>::Options, ) -> Result<Pool<DB>, Error>
pub fn connect_lazy(self, url: &str) -> Result<Pool<DB>, Error>
pub fn connect_lazy_with(self, options: <<DB as Database>::Connection as Connection>::Options,) -> Pool<DB>

The Pool type implements the Executor trait (through &Pool), so it supports execute()/fetch()/fetch_one() etc. It also implements Send, Sync and Clone, so it can be used across threads and async tasks:

Example:

use sqlx::Pool;
use sqlx::postgres::Postgres;

// Pool is a generic struct, not a trait
let pool = Pool::<Postgres>::connect("postgres://").await?;

// Using a type alias
use sqlx::mysql::MySqlPool;
let pool = MySqlPool::connect("mysql://").await?;

use sqlx::{Executor, PgPool};
// Create a Pool with default options
let pool = PgPool::connect("postgresql://...").await?;
// Pool is Send + Sync + Clone, so a clone can be moved into another task or thread
let pool2 = pool.clone();
tokio::spawn(async move {
    let res: sqlx::Result<sqlx::Result<()>> = pool2.close_event().do_until(async {
        // &Pool implements Executor, so execute() can be called on it directly
        pool2.execute("SELECT pg_sleep('30 days')").await?;
        println!("Waited!");
        Ok(())
    }).await;

    match res {
        Ok(Ok(())) => println!("Wait succeeded"),
        Ok(Err(e)) => println!("Error from inside do_until: {e:?}"),
        Err(e) => println!("Error from do_until: {e:?}"),
    }
});

// This normally wouldn't return until the above statement completed and the connection was
// returned to the pool. However, thanks to `.do_until()`, the operation was cancelled as soon as
// we called `.close().await`.
pool.close().await;

// &Pool implements Executor, so the pool can be passed to execute()/fetch() etc.
sqlx::query("DELETE FROM articles").execute(&pool).await?;
use sqlx::postgres::PgPoolOptions;
#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    let pool = PgPoolOptions::new()
        .max_connections(5)
        .connect("postgres://postgres:password@localhost/test").await?;
    let row: (i64,) = sqlx::query_as("SELECT $1")
        .bind(150_i64)
        .fetch_one(&pool).await?;
    assert_eq!(row.0, 150);
    Ok(())
}

query/query_as

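sqlx provides the following functions for building queries:

  1. query(): runs a SQL query with statement caching; returns a Query struct.
  2. query_as(): runs a SQL query with statement caching and maps the resulting rows onto a Rust type that implements FromRow; returns a QueryAs struct.
  3. query_as_with(): like query_as(), but the bound arguments are passed in when the function is called;
  4. query_scalar(): like query(), but returns only the first column of each row; returns a QueryScalar struct.
  5. query_scalar_with(): like query_scalar(), but the bound arguments are passed in when the function is called;
  6. query_with(): like query(), but the bound arguments are passed in when the function is called;
  7. raw_sql(): executes one or more semicolon-separated statements in a raw SQL string;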

Methods of Query/QueryAs:

  1. bind(): binds a parameter value and returns the Query, so calls can be chained;
  2. map(): takes a closure that maps each result Row to a Rust type;
  3. execute()/fetch()/fetch_all()/fetch_one()/fetch_optional(): take an argument whose type implements the Executor trait
  • the &mut borrow of any database Connection, and the & borrow of a Pool, implement Executor;
  • although these methods share names with the Executor trait's methods, Query/QueryAs do not implement Executor themselves;
// Returns a Query object representing a prepared statement
pub fn query<DB>(sql: &str) -> Query<'_, DB, <DB as Database>::Arguments<'_>> where DB: Database

// The Query type
pub struct Query<'q, DB, A> where DB: Database, { /* private fields */ }

// Methods on Query
pub fn bind<T>(self, value: T) -> Query<'q, DB, <DB as Database>::Arguments<'q>> where T: 'q + Encode<'q, DB> + Type<DB>

pub fn try_bind<T>( &mut self, value: T,) -> Result<(), Box<dyn Error + Sync + Send>> where T: 'q + Encode<'q, DB> + Type<DB>

pub fn persistent(self, value: bool) -> Query<'q, DB, A>

pub fn map<F, O>( self, f: F,) -> Map<'q, DB, impl FnMut(<DB as Database>::Row) -> Result<O, Error> + Send, A> where F: FnMut(<DB as Database>::Row) -> O + Send, O: Unpin

pub fn try_map<F, O>(self, f: F) -> Map<'q, DB, F, A> where F: FnMut(<DB as Database>::Row) -> Result<O, Error> + Send, O: Unpin

pub async fn execute<'e, 'c, E>(self, executor: E, ) -> Result<<DB as Database>::QueryResult, Error> where 'c: 'e, 'q: 'e, A: 'e, E: Executor<'c, Database = DB>

// fetch/fetch_all/fetch_one/fetch_optional yield rows of type DB::Row (which implements Row)

pub fn fetch<'e, 'c, E>( self, executor: E,) -> Pin<Box<dyn Stream<Item = Result<<DB as Database>::Row, Error>> + Send + 'e>> where 'c: 'e, 'q: 'e, A: 'e, E: Executor<'c, Database = DB>

pub async fn fetch_all<'e, 'c, E>( self, executor: E, ) -> Result<Vec<<DB as Database>::Row>, Error> where 'c: 'e, 'q: 'e, A: 'e, E: Executor<'c, Database = DB>

pub async fn fetch_one<'e, 'c, E>( self, executor: E, ) -> Result<<DB as Database>::Row, Error> where 'c: 'e, 'q: 'e, A: 'e, E: Executor<'c, Database = DB>

pub async fn fetch_optional<'e, 'c, E>( self, executor: E,) -> Result<Option<<DB as Database>::Row>, Error> where 'c: 'e, 'q: 'e, A: 'e, E: Executor<'c, Database = DB>

Example:

// Not recommended: interpolating strings directly into SQL easily leads to SQL injection
let user_input = "possibly untrustworthy input!";
let query = format!("SELECT * FROM articles WHERE content LIKE '%{user_input}%'");
// &mut Connection implements Executor
let results = sqlx::query(&query).fetch_all(&mut conn).await?;

// Recommended: query + bind; the statement is parsed once and cached (prepared statement)
let user_input = "Alice's Apples";
let results = sqlx::query(
    "SELECT * FROM articles WHERE title LIKE '%' || $1 || '%' OR content LIKE '%' || $1 || '%'")
    .bind(user_input)
    .fetch_all(&mut conn) // returns Vec<PgRow>; PgRow implements Row, so columns can be read with get(index)
    .await?;

// query_as() takes the Rust type to map rows into; that type must implement FromRow.
// Note: the query_as!() macro also takes a target type, but that type does not need to implement FromRow
#[derive(sqlx::FromRow)]
struct User { name: String, id: i64 }
let mut stream = sqlx::query_as::<_, User>("SELECT * FROM users WHERE email = ? OR name = ?")
    .bind(user_email)
    .bind(user_name)
    .fetch(&mut conn);

// Use map() to transform each result row.
let mut stream = sqlx::query("SELECT * FROM users")
    .map(|row: PgRow| {
        // map the row into a user-defined domain type
    })
    .fetch(&mut conn);


#[derive(Debug)]
struct Product {
    name: String,
    price: f64,
}
async fn get_products(pool: &PgPool, min_price: f64) -> Result<Vec<Product>, sqlx::Error> {
    let products = sqlx::query_as!(Product, r#"SELECT name, price FROM products WHERE price > $1"#, min_price)
        // &Pool implements Executor
        .fetch_all(pool) // returns a Vec<Product>
        .await?;
    Ok(products)
}

In real code, prefer the macro versions of these functions, such as query!()/query_as!(), because sqlx checks them at compile time.

Compile-time verification

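With the macros, such as sqlx::query!()/query_as!(), the query is validated against the database at compile time, and query!() returns an anonymous record type whose columns are accessed as fields: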

let countries = sqlx::query!(
    "
SELECT country, COUNT(*) as count
FROM users
WHERE organization = ?
GROUP BY country
        ",
    organization // pass every parameter value here; no bind() calls needed
)
    .fetch_all(&pool) // -> Vec<{ country: String, count: i64 }>
    .await?;

// countries[0].country
// countries[0].count


async fn list_todos(pool: &MySqlPool) -> anyhow::Result<()> {
    let recs = sqlx::query!(
        r#"
SELECT id, description, done
FROM todos
ORDER BY id
        "#
    )
        .fetch_all(pool)
        .await?;

    // NOTE: Booleans in MySQL are stored as `TINYINT(1)` / `i8`
    //       0 = false, non-0 = true
    for rec in recs {
        println!(
            "- [{}] {}: {}",
            if rec.done != 0 { "x" } else { " " },
            rec.id,
            &rec.description,
        );
    }

    Ok(())
}


pub async fn verify(self, db: impl PgExecutor<'_> + Send) -> Result<UserId> {
    self.validate()?;

    let maybe_user = sqlx::query!(
            r#"select user_id, password_hash from "user" where username = $1"#,
        self.username
    )
        .fetch_optional(db)
        .await?;

    if let Some(user) = maybe_user {
        let verified = crate::password::verify(self.password, user.password_hash).await?;

        if verified {
            return Ok(user.user_id);
        }
    }

    // Sleep a random amount of time to avoid leaking existence of a user in timing.
    let sleep_duration = rand::thread_rng().gen_range(Duration::from_millis(100)..=Duration::from_millis(500));
    tokio::time::sleep(sleep_duration).await;

    Err(Error::UnprocessableEntity( "invalid username/password".into(), ))
}

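How the query!() macro differs from the query() function:

  1. All bind parameter values are listed as macro arguments, instead of calling bind() on the Query returned by query();
  2. Results are represented by an anonymous record type;
  3. The database named by the DATABASE_URL environment variable is used for compile-time verification; it does not need to contain any data, but its schema and types must be correct;
    • offline verification mode can be used instead;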

Methods on the value returned by query!():

  1. No rows needed: .execute(…).await returns sqlx::Result<DB::QueryResult>, i.e. the number of affected rows; e.g. for INSERT/UPDATE/DELETE without RETURNING.
  2. Zero or one row: .fetch_optional(…).await returns sqlx::Result<Option<{adhoc struct}>>, i.e. at most one anonymous record; extra rows are ignored;
  3. Exactly one row: .fetch_one(…).await returns sqlx::Result<{adhoc struct}>, i.e. one anonymous record; it returns Err if no row is found, and extra rows are ignored;
  4. A stream of rows: .fetch(…) returns impl Stream<Item = sqlx::Result<{adhoc struct}>>; call .try_next().await to get each row;
  5. All rows in a Vec: .fetch_all(…) returns sqlx::Result<Vec<{adhoc struct}>>;
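The row-count rules in this list can be summarised as plain functions over the rows a query returned. This is a toy model: there is no database, RowNotFound stands in for sqlx::Error::RowNotFound, and the real methods are async and return sqlx::Result.

```rust
/// Stand-in for sqlx::Error::RowNotFound.
#[derive(Debug, PartialEq)]
struct RowNotFound;

// `rows` is everything the query produced, in order (toy model: no database).

/// fetch_one: the first row; an error if there are none; extra rows are ignored.
fn fetch_one<T: Clone>(rows: &[T]) -> Result<T, RowNotFound> {
    rows.first().cloned().ok_or(RowNotFound)
}

/// fetch_optional: the first row if any; zero rows is not an error.
fn fetch_optional<T: Clone>(rows: &[T]) -> Option<T> {
    rows.first().cloned()
}

/// fetch_all: every row, collected into a Vec.
fn fetch_all<T: Clone>(rows: &[T]) -> Vec<T> {
    rows.to_vec()
}
```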

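A limitation of query!() is that the type of the returned anonymous record cannot be customized.

The solution is the query_as!() macro, which lets you specify the output type:

// no traits are needed; the type does not have to implement FromRow either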
struct Country { country: String, count: i64 }

let countries = sqlx::query_as!(Country,
        "
SELECT country, COUNT(*) as count
FROM users
WHERE organization = ?
GROUP BY country
        ",
        organization
    )
    .fetch_all(&pool) // -> Vec<Country>
    .await?;

// countries[0].country
// countries[0].count

Migrator

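Migrator represents the set of migrations to run at runtime. It can be created with the migrate!() macro, or with Migrator::new() at runtime.

  • When created with migrate!(), the contents of the migrations directory are embedded into the binary at compile time.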

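For each migration, sqlx records its version and checksum in the database; later you can roll back to a given version, or verify that the checksums still match the definitions in the current migrations directory.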
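A toy model of the checksum check performed by run(): applied migrations are remembered as version to checksum, a changed file for an applied version is an error, and unapplied versions are applied in version order. It is a sketch under assumptions: checksum() uses std's DefaultHasher as a stand-in for the cryptographic hash sqlx stores, and no SQL is actually executed.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

/// Stand-in checksum (sqlx stores a cryptographic hash instead).
fn checksum(sql: &str) -> u64 {
    let mut h = DefaultHasher::new();
    sql.hash(&mut h);
    h.finish()
}

/// Toy `run`: `local` is (version, sql) from ./migrations, `applied` maps
/// version -> checksum as recorded in the database. Returns the versions applied now.
fn run(local: &[(i64, &str)], applied: &mut BTreeMap<i64, u64>) -> Result<Vec<i64>, String> {
    // Walk migrations in version order, as a migrator would.
    let mut sorted = local.to_vec();
    sorted.sort_by_key(|&(version, _)| version);
    let mut newly_applied = Vec::new();
    for (version, sql) in sorted {
        match applied.get(&version) {
            // Already applied and unchanged: nothing to do.
            Some(&recorded) if recorded == checksum(sql) => {}
            // Already applied but the file changed since: refuse to continue.
            Some(_) => {
                return Err(format!("migration {version} was applied but has been modified"));
            }
            // Pending: "apply" it (a real migrator runs the SQL here) and record its checksum.
            None => {
                applied.insert(version, checksum(sql));
                newly_applied.push(version);
            }
        }
    }
    Ok(newly_applied)
}
```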

pub struct Migrator { /* private fields */ }

// Methods
impl Migrator

pub async fn new<'s, S>(source: S) -> Result<Migrator, MigrateError> where S: MigrationSource<'s>

pub fn set_ignore_missing(&mut self, ignore_missing: bool) -> &Migrator

// Whether to lock the database while migrating (default: true)
pub fn set_locking(&mut self, locking: bool) -> &Migrator

// Iterate over all Migrations
pub fn iter(&self) -> Iter<'_, Migration>

pub fn version_exists(&self, version: i64) -> bool

// Run pending migrations, and check that already-applied migrations still match their definitions in the directory (to detect changed or inconsistent history).
pub async fn run<'a, A>(&self, migrator: A) -> Result<(), MigrateError> where A: Acquire<'a>, <A::Connection as Deref>::Target: Migrate

// Run down migrations until the given target version
pub async fn undo<'a, A>(&self, migrator: A, target: i64) -> Result<(), MigrateError> where A: Acquire<'a>, <A::Connection as Deref>::Target: Migrate

Example:

use std::path::Path;
use sqlx::migrate::Migrator;
use sqlx::sqlite::SqlitePoolOptions;

// Read migrations from a local folder: ./migrations
let m = Migrator::new(Path::new("./migrations")).await?;
let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?;
m.run(&pool).await?;
m.undo(&pool, 4).await?;
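A toy model of what m.undo(&pool, 4) above would do, assuming (as in current sqlx sources, but verify against your version) that the target is exclusive: every applied migration with a version greater than target is reverted, newest first, and target itself stays applied. The function is hypothetical and only tracks version numbers.

```rust
/// Toy `undo`: `applied` holds applied versions; revert every version
/// greater than `target`, newest first. Returns the reverted versions.
fn undo(applied: &mut Vec<i64>, target: i64) -> Vec<i64> {
    applied.sort_unstable();
    let mut reverted = Vec::new();
    while let Some(&last) = applied.last() {
        if last <= target {
            break;
        }
        // (a real migrator would run the .down.sql for `last` here)
        applied.pop();
        reverted.push(last);
    }
    reverted
}
```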

Other Rust SQL projects

  1. SeaORM: an ORM built on top of sqlx;

  2. diesel: A safe, extensible ORM and Query Builder for Rust;

According to the benchmark discussed at https://github.com/skerkour/kerkour.com/issues/1, sqlx was 7-70x slower than diesel, which is attributed to its use of async.

SeaORM, which is also built on sqlx, likewise benchmarks slower than Diesel: https://github.com/diesel-rs/metrics/

The shuttle project uses sqlx: https://docs.shuttle.rs/resources/shuttle-shared-db

crates.io uses axum + diesel.

Projects using SeaORM (including zed): https://github.com/SeaQL/sea-orm?tab=readme-ov-file#whos-using-seaorm

References

No More Unchecked Sqlx Queries

http://www.matildasmeds.com/posts/no-more-unchecked-sqlx-queries/

A checked query is a query that can be statically validated at compile time. SQLx compares column and table names to the database schema, validates the syntax, and verifies data types without running any code.

How to recognize unchecked queries in the wild?

As a general rule, queries created using the sqlx::query() and sqlx::query_as() functions are unchecked, while those created with the macros sqlx::query! and sqlx::query_as! (and several others) are checked. If you see QueryBuilder being used, that query is not checked. Check the documentation for other functions and macros.

Personally I like to use sqlx::query_as! macro with an appropriate struct, except maybe for very simple queries, when I would use the sqlx::query!() macro. In our project, we mainly use these two, and steer clear from functions that don’t check the query.

// This is checked, but not DRY
let query = if let Some(is_guest) = is_guest_option {
    sqlx::query!("SELECT id FROM users WHERE is_guest = ?", is_guest)
} else {
    sqlx::query!("SELECT id FROM users")
};


let res = query
    .fetch_all(&pool)
    .await;

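What would be a better alternative? Well, we can use what I like to call the “Option pattern”. It will be easier to manage, especially when the queries grow in size.

  • The underlying idea: a None argument is bound as NULL, so the IS NULL branch makes that condition true and the filter is skipped;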
// Postgres version
let ids = sqlx::query_as!(
    Uuid,
    "SELECT id FROM users \
     WHERE ($1::timestamptz IS NULL OR updated_at < $1) \
       AND ($2::timestamptz IS NULL OR updated_at > $2) \
       AND ($3::boolean IS NULL OR is_guest = $3)",
    updated_before_option,
    updated_after_option,
    is_guest_option,
    )
    .fetch_all(&pool)
    .await;

// MySQL version
let ids = sqlx::query_as!(
    Uuid,
    "SELECT id FROM users \
     WHERE (? IS NULL OR updated_at < ?) \
       AND (? IS NULL OR updated_at > ?) \
       AND (? IS NULL OR is_guest = ?)",
    updated_before_option,
    updated_before_option,
    updated_after_option,
    updated_after_option,
    is_guest_option,
    is_guest_option,
    )
    .fetch_all(&pool)
    .await;
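The same "None means no filter" logic, written as an in-memory Rust filter, may help when reading the SQL above: each ($n IS NULL OR ...) clause corresponds to Option::map_or(true, ...). The User struct and filter_ids function are hypothetical and only illustrate the predicate, not how sqlx binds parameters.

```rust
struct User {
    id: u32,
    is_guest: bool,
    updated_at: i64, // e.g. a Unix timestamp
}

/// In-memory equivalent of
/// ($1 IS NULL OR updated_at < $1) AND ($2 IS NULL OR updated_at > $2) AND ($3 IS NULL OR is_guest = $3):
/// a None filter plays the role of NULL and therefore matches every row.
fn filter_ids(
    users: &[User],
    updated_before: Option<i64>,
    updated_after: Option<i64>,
    is_guest: Option<bool>,
) -> Vec<u32> {
    users
        .iter()
        .filter(|u| updated_before.map_or(true, |t| u.updated_at < t))
        .filter(|u| updated_after.map_or(true, |t| u.updated_at > t))
        .filter(|u| is_guest.map_or(true, |g| u.is_guest == g))
        .map(|u| u.id)
        .collect()
}
```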

When IN does not compile, use ANY instead

SQLx has an important limitation when using the IN operator with list parameters. The IN operator works with a single item, subqueries, or hard-coded values. However, it cannot be included in a checked query if it takes a list as a parameter. This can be confusing when encountered for the first time, so it is useful to mention in this post as well.

The solution is to use the ANY operator (PostgreSQL), which works seamlessly with lists and individual items alike.

Here’s an example of a checked query using IN that will not compile:

// Does not compile
let records = sqlx::query!("SELECT name, email, created_at \
    FROM users WHERE id IN ($1)",
    ids,
    )
    .fetch_all(&pool)
    .await;

To address this issue, we can use ANY operator instead:

// This compiles
let records = sqlx::query!("SELECT name, email, created_at \
    FROM users WHERE id = ANY($1)",
    ids,
    )
    .fetch_all(&pool)
    .await;

And there, we have a checked query! Here are some further reflections on the rather subtle difference between IN and ANY.
