在没有 TransactionScopeAsyncFlowOption.Enabled 的情况下启用 Async TransactionScope

Enable Async TransactionScope without TransactionScopeAsyncFlowOption.Enabled

以下是使用事务作用域的异步缓存和数据库更新。我不能使用 v 4.5.1 中引入的 TransactionScopeAsyncFlowOption.Enabled,因为我使用的 Apache Ignite.Net 缓存不支持它。我尝试通过捕获当前 Synchronization Context 然后显式使用 Synchronization Context Send 方法来完成交易来找到解决方法,但这不起作用,因为我仍然收到错误 Transaction scope must be disposed on same thread it was created

关于如何实现 Async Update 的任何建议。 Apache Ignite 支持的建议之一是使用类似的东西:

Task.WhenAll(cacheUpdate, databaseUpdate).Wait(),但这会使异步代码同步,因此不是最佳选择之一

public async Task Update()
{
    // Capture Current Synchronization Context
    var sc = SynchronizationContext.Current;

    TransactionOptions tranOptions = new TransactionOptions();
    tranOptions.IsolationLevel = System.Transactions.IsolationLevel.RepeatableRead;


    using (var ts = new TransactionScope())
    {
        // Do Cache Update Operation as Async
        Task cacheUpdate = // Update Cache Async

        // Do Database Update Operation as Async
        Task databaseUpdate = // Update Database Async

        await Task.WhenAll(cacheUpdate, databaseUpdate);

                sc.Send(new SendOrPostCallback(
                o =>
                {
                    ts.Complete();
                }), sc);        
    }
}

在对博客和文章进行大量搜索后,我发现了 Stephen Toub 的以下博客,有助于在完全相同的线程上实现异步方法的延续,从而避免事务范围问题。现在我不需要 TransactionScopeAsyncFlowOption.Enabled 来获取 TransactionScope

中的异步方法 运行

https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/

void Main()
{
    // Modified Async Scheduler for Continuations to work on Exactly same thread
    // Required in the case same Thread is required for Task Continuation post await
    Run(async () => await DemoAsync());

    "Main Complete".Dump();
}

static async Task DemoAsync()
{
    // Transcation Scope test (shall dispose 
    using (var ts = new TransactionScope())
    {            
        await Cache + Database Async update
        ts.Complete();
        "Transaction Scope Complete".Dump();
    }   
}

// Run Method to utilize the Single Thread Synchronization context, thus ensuring we can
// Control the threads / Synchronization context post await, cotinuation run of specific set of threads

public static void Run(Func<Task> func)
{
    // Fetch Current Synchronization context
    var prevCtx = SynchronizationContext.Current;

    try
    {
        // Create SingleThreadSynchronizationContext
        var syncCtx = new SingleThreadSynchronizationContext();

        // Set SingleThreadSynchronizationContext
        SynchronizationContext.SetSynchronizationContext(syncCtx);

        // Execute Func<Task> to fetch the task to be executed
        var t = func();

        // On Continuation complete the SingleThreadSynchronizationContext
        t.ContinueWith(
            delegate { syncCtx.Complete(); }, TaskScheduler.Default);

        // Ensure that SingleThreadSynchronizationContext run on a single thread
        // Execute a Task and its continuation on same thread
        syncCtx.RunOnCurrentThread();

        // Fetch Result if any
        t.GetAwaiter().GetResult();
    }
    // Reset the Previous Synchronization Context
    finally { SynchronizationContext.SetSynchronizationContext(prevCtx); }
}

// Overriden Synchronization context, using Blocking Collection Consumer / Producer model
// Ensure that same Synchronization context / Thread / set of threads are maintained
// In this case we main a single thread for continuation post await

private sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    // BlockingCollection Consumer Producer Model
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>>
      m_queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    // Override Post, which is called during Async continuation
    // Send is for Synchronous continuation
    public override void Post(SendOrPostCallback d, object state)
    {
        m_queue.Add(
            new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    // RunOnCurrentThread, does the job if fetching object from BlockingCollection and execute it
    public void RunOnCurrentThread()
    {
        KeyValuePair<SendOrPostCallback, object> workItem;
        while (m_queue.TryTake(out workItem, Timeout.Infinite))
            workItem.Key(workItem.Value);
    }

    // Compete the SynchronizationContext
    public void Complete() { m_queue.CompleteAdding(); }
}