Diesel のトランザクション中に async/await したい
ふとしです。
diesel - Rust は Rust の ORM です。書き心地が Arel によく似ていてよいです。
begin/commit/rollback を行うためにコネクションインスタンスにクロージャを取る transaction 関数がありますが async ではないため、トランザクションの中盤に await を挟むことができません。
そこで transaction 関数が行っていることを手書きします。
普通のトランザクション
だいたいこんな感じです。
let c = establish_connection();
c.transaction(|| {
let mut bar: BarsParamsSave = bars::table.find(99).for_update().first(&c)?;
bar.name = get_name_sync()?;
diesel::update(&bar).set(&bar).execute(&c)?;
Ok::<_, AppError>(())
})?;
Ok(())
この get_name_sync
を async にしたいと思いました。
transaction が何をやっているか見る
fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
where
F: FnOnce() -> Result<T, E>,
E: From<Error>,
{
let transaction_manager = self.transaction_manager();
transaction_manager.begin_transaction(self)?;
match f() {
Ok(value) => {
transaction_manager.commit_transaction(self)?;
Ok(value)
}
Err(e) => {
transaction_manager.rollback_transaction(self)?;
Err(e)
}
}
}
self
を渡しているのはトランザクションの深度を見るためです。
参考にして書く
関数を用意する
self
のかわりにコネクションオブジェクトへの参照を渡せるようにして、クロージャのかわりに async ブロックを渡して
async fn transaction<C, T, E, F>(c: &C, f: F) -> Result<T, E>
where
C: Connection,
F: Future<Output = Result<T, E>>,
E: From<diesel::result::Error>,
{
let transaction_manager = c.transaction_manager();
transaction_manager.begin_transaction(c)?;
match f.await {
Ok(value) => {
transaction_manager.commit_transaction(c)?;
Ok(value)
}
Err(e) => {
transaction_manager.rollback_transaction(c)?;
Err(e)
}
}
}
呼び出す。
let c = establish_connection();
transaction(&c, async {
let mut bar: BarsParamsSave = bars::table.find(99).for_update().first(&c)?;
bar.name = get_name().await?;
diesel::update(&bar).set(&bar).execute(&c)?;
Ok::<_, AppError>(())
})
.await?;
Ok(())
ブロックでは返り値の型を明示できないので、推論が効かない場合はターボフィッシュによる明示 Ok::<_, AppError>(())
が必要になります。
クロージャの引数を現在使ってるコネクションにすれば間違ったコネクションを使う事故が起こらず良さそうですがライフタイムがよくわからんことになったので諦めました。
async ブロックでアドホックに書く
関数を特に用意しない場合は async ブロックを使って commit/rollback をまとめられます。
let c = establish_connection();
let tm = c.transaction_manager();
match async {
tm.begin_transaction(&c)?;
let mut bar: BarsParamsSave = bars::table.find(99).for_update().first(&c)?;
bar.name = get_name().await?;
diesel::update(&bar).set(&bar).execute(&c)?;
Ok::<_, AppError>(())
}
.await
{
Ok(_) => tm.commit_transaction(&c)?,
Err(_) => tm.rollback_transaction(&c)?,
};
Ok(())
r2d2 の PooledConnection は async 関数に渡そうとすると Mutex などで囲んで渡さなければならないので、インラインで async 書く (あるいはこれようのマクロを書く) ほうが多くなりそうかなと思いました。
おわりに
diesel 便利です。