How can I correctly use NpgsqlTransaction inside an IAsyncEnumerable emitter function?

I don't need to catch the exception, but I do need to roll back if one occurs:

public async IAsyncEnumerable<Item> Select()
{
    var t = await con.BeginTransactionAsync(token);
    try {
        var batchOfItems = new List<Item>(); //Buffer, so the one connection can be used while enumerating items
        using (var reader = await com.ExecuteReaderAsync(SQL, token)) 
        {
            while (await reader.ReadAsync(token))
            {
                var M = await Materializer(reader, token);
                batchOfItems.Add(M);
            }
        }

        foreach (var item in batchOfItems)
        {
            yield return item;
        }

        await t.CommitAsync();
    }
    catch
    {
        await t.RollbackAsync();
    }
    finally
    {
        await t.DisposeAsync();
    }
}

(This code is a simplified version of what I'm doing, for illustration purposes.)

It fails with the message:

cannot yield a value in the body of a try block with a catch clause


This is similar to "Yield return from a try/catch block", but the context is new.

It is also different from "Why can't yield return appear inside a try block with a catch?". In my case the context is more specific: I need the catch block to roll back and do nothing else. I already know the answer and posted this as a self-answered Q&A; as the answer shows, it is not an answer to "Why can't yield return appear inside a try block with a catch?".

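If you can check whether the transaction has already been committed, you can move the rollback into the finally block. With NpgsqlTransaction you can use IsCompleted: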

public async IAsyncEnumerable<Item> Select()
{
    var t = await con.BeginTransactionAsync(token);
    try {
        var batchOfItems = new List<Item>(); //Buffer, so the one connection can be used while enumerating items
        await using (var reader = await com.ExecuteReaderAsync(SQL, token)) 
        {
            while (await reader.ReadAsync(token))
            {
                var M = await Materializer(reader, token);
                batchOfItems.Add(M);
            }
        }

        foreach (var item in batchOfItems)
        {
            yield return item;
        }

        await t.CommitAsync();
    }
    finally
    {
        if (t.IsCompleted == false) //Implemented on NpgsqlTransaction, but not DbTransaction
            await t.RollbackAsync();
        await t.DisposeAsync();
    }
}

Note: the catch block has been removed, and two lines have been added at the start of the finally block.
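One consequence of doing the cleanup in finally is worth spelling out: the block also runs when the consumer stops enumerating early, because await foreach disposes the enumerator. The sketch below shows only that behaviour. It uses no database, and EarlyExitDemo, Numbers and RunAsync are hypothetical names invented for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EarlyExitDemo
{
    // Async iterator with the same shape as Select(): work in try, cleanup in finally.
    public static async IAsyncEnumerable<int> Numbers(List<string> log)
    {
        try
        {
            for (var i = 1; i <= 3; i++)
            {
                await Task.Yield();   // stands in for real asynchronous work
                yield return i;
            }
            log.Add("commit");        // only reached if the consumer reads every item
        }
        finally
        {
            log.Add("cleanup");       // reached on completion, exception, or early exit
        }
    }

    // Consumer that abandons the enumeration after the second item.
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        await foreach (var n in Numbers(log))
        {
            if (n == 2) break;        // leaving the loop disposes the enumerator
        }
        return log;
    }
}
```

In the Select() method above, this means a consumer that breaks out of the loop before the last item never reaches CommitAsync, so the finally block rolls the transaction back.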

The same approach also works for other DbTransaction implementations that do not have IsCompleted, provided you track completion yourself.
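For a DbTransaction without IsCompleted, one way to apply the same idea is a local flag that is set only after CommitAsync returns. This is a minimal sketch under stated assumptions, not the original author's code: RecordingTransaction is a hypothetical stand-in that only records which calls were made, and FlagDemo, rows and fail are invented so that the example runs without a database.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

// Hypothetical stand-in for a provider transaction; it only records calls.
public sealed class RecordingTransaction : DbTransaction
{
    public List<string> Calls { get; } = new List<string>();
    protected override DbConnection? DbConnection => null;
    public override IsolationLevel IsolationLevel => IsolationLevel.Unspecified;
    public override void Commit() => Calls.Add("commit");
    public override void Rollback() => Calls.Add("rollback");
}

public static class FlagDemo
{
    // 'rows' stands in for the buffered query results; 'fail' simulates an error.
    public static async IAsyncEnumerable<int> Select(
        DbTransaction t, IEnumerable<int> rows, bool fail)
    {
        var committed = false; // plays the role of IsCompleted
        try
        {
            foreach (var row in rows)
            {
                yield return row; // allowed: the try has no catch clause
            }

            if (fail) throw new InvalidOperationException("simulated failure");

            await t.CommitAsync();
            committed = true;     // set only after the commit has succeeded
        }
        finally
        {
            if (!committed)
                await t.RollbackAsync();
            await t.DisposeAsync();
        }
    }
}
```

Enumerating FlagDemo.Select(tx, rows, fail: false) to the end records a single commit call. With fail: true the exception still reaches the consumer, and a single rollback call is recorded first.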

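An alternative to creating an IAsyncEnumerable with a C# iterator is to use the third-party library AsyncEnumerator (package).

This library was the main resource for creating asynchronous enumerables before C# 8 arrived, and it may still be useful because, as far as I know, it is not subject to the limitations of the native yield. You can have try, catch and finally blocks in the body of the lambda passed to the AsyncEnumerable constructor, and call the yield.ReturnAsync method from any of these blocks.

Usage example: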

using Dasync.Collections;

//...

public IAsyncEnumerable<Item> Select()
{
    return new AsyncEnumerable<Item>(async yield => // This yield is a normal argument
    {
        await using var transaction = await con.BeginTransactionAsync(token);
        try
        {
            var batchOfItems = new List<Item>();
            await using (var reader = await com.ExecuteReaderAsync(SQL, token))
            {
                while (await reader.ReadAsync(token))
                {
                    var M = await Materializer(reader, token);
                    batchOfItems.Add(M);
                }
            }
            foreach (var item in batchOfItems)
            {
                await yield.ReturnAsync(item); // Instead of yield return item;
            }
            await transaction.CommitAsync();
        }
        catch
        {
            await transaction.RollbackAsync();
            throw; // Rethrow so the consumer still observes the failure
        }
    });
}

The yield in the example above is not the C# yield contextual keyword, just a parameter with the same name. You can give it another name if you prefer.

DbTransaction is considered the best way to manage transactions on SqlConnections, but TransactionScope is also valid and may help others in related scenarios:

public async IAsyncEnumerable<Item> Select()
{
    using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
    {
        con.EnlistTransaction(Transaction.Current); //it's better to open the connection here, then dispose, but this will work
        com = con.CreateCommand(); //Probably need a new command object so it has the transaction context
        var batchOfItems = new List<Item>(); //Buffer, so the one connection can be used while enumerating items
        
        await using (var reader = await com.ExecuteReaderAsync(SQL, token)) 
        {
            while (await reader.ReadAsync(token))
            {
                var M = await Materializer(reader, token);
                batchOfItems.Add(M);
            }
        }

        foreach (var item in batchOfItems)
        {
            yield return item;
        }

        scope.Complete(); //No async variant is available
        //No explicit rollback call is needed: disposing a scope that was not completed rolls the transaction back
    }
}