Diesel のトランザクション中に async/await したい

ふとしです。

diesel - Rust は Rust の ORM です。書き心地が Arel によく似ていてよいです。

begin/commit/rollback を行うためにコネクションインスタンスにクロージャを取る transaction 関数がありますが async ではないため、トランザクションの中盤に await を挟むことができません。

そこで transaction 関数が行っていることを手書きします。

普通のトランザクション

だいたいこんな感じです。

let c = establish_connection();

c.transaction(|| {
    let mut bar: BarsParamsSave = bars::table.find(99).for_update().first(&c)?;
    bar.name = get_name_sync()?;
    diesel::update(&bar).set(&bar).execute(&c)?;

    Ok::<_, AppError>(())
})?;

Ok(())

この get_name_sync を async にしたいと思いました。

transaction が何をやっているか見る

fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
where
    F: FnOnce() -> Result<T, E>,
    E: From<Error>,
{
    let transaction_manager = self.transaction_manager();
    transaction_manager.begin_transaction(self)?;
    match f() {
        Ok(value) => {
            transaction_manager.commit_transaction(self)?;
            Ok(value)
        }
        Err(e) => {
            transaction_manager.rollback_transaction(self)?;
            Err(e)
        }
    }
}

self を渡しているのはトランザクションの深度を見るためです。

参考にして書く

関数を用意する

self のかわりにコネクションオブジェクトへの参照を渡せるようにして、クロージャのかわりに async ブロックを渡して

async fn transaction<C, T, E, F>(c: &C, f: F) -> Result<T, E>
where
    C: Connection,
    F: Future<Output = Result<T, E>>,
    E: From<diesel::result::Error>,
{
    let transaction_manager = c.transaction_manager();
    transaction_manager.begin_transaction(c)?;
    match f.await {
        Ok(value) => {
            transaction_manager.commit_transaction(c)?;
            Ok(value)
        }
        Err(e) => {
            transaction_manager.rollback_transaction(c)?;
            Err(e)
        }
    }
}

呼び出す。

let c = establish_connection();

transaction(&c, async {
    let mut bar: BarsParamsSave = bars::table.find(99).for_update().first(&c)?;
    bar.name = get_name().await?;
    diesel::update(&bar).set(&bar).execute(&c)?;

    Ok::<_, AppError>(())
})
.await?;

Ok(())

ブロックでは返り値の型を明示できないので、推論が効かない場合はターボフィッシュによる明示 Ok::<_, AppError>(()) が必要になります。

クロージャの引数を現在使ってるコネクションにすれば間違ったコネクションを使う事故が起こらず良さそうですがライフタイムがよくわからんことになったので諦めました。

async ブロックでアドホックに書く

関数を特に用意しない場合は async ブロックを使って commit/rollback をまとめられます。

let c = establish_connection();

let tm = c.transaction_manager();
match async {
    tm.begin_transaction(&c)?;

    let mut bar: BarsParamsSave = bars::table.find(99).for_update().first(&c)?;
    bar.name = get_name().await?;
    diesel::update(&bar).set(&bar).execute(&c)?;

    Ok::<_, AppError>(())
}
.await
{
    Ok(_) => tm.commit_transaction(&c)?,
    Err(_) => tm.rollback_transaction(&c)?,
};

Ok(())

r2d2 の PooledConnection は async 関数に渡そうとすると Mutex などで囲んで渡さなければならないので、インラインで async 書く (あるいはこれようのマクロを書く) ほうが多くなりそうかなと思いました。

おわりに

diesel 便利です。