在没有 TransactionScopeAsyncFlowOption.Enabled 的情况下启用 Async TransactionScope
Enable Async TransactionScope without TransactionScopeAsyncFlowOption.Enabled
以下是在事务作用域(TransactionScope)中以异步方式更新缓存和数据库的代码。我不能使用 v 4.5.1 中引入的 TransactionScopeAsyncFlowOption.Enabled
,因为我使用的 Apache Ignite.Net 缓存不支持它。我尝试的变通办法是先捕获当前的 Synchronization Context
,然后显式调用 Synchronization Context 的 Send
方法来完成事务,但这不起作用,我仍然收到错误:Transaction scope must be disposed on same thread it was created。
对于如何实现异步的 Update
,有什么建议吗?Apache Ignite 支持团队给出的建议之一是使用类似下面的写法:
Task.WhenAll(cacheUpdate, databaseUpdate).Wait()
,但这会把异步代码变成同步阻塞调用,因此并不是理想的选择。
// Attempted workaround from the question: run the cache and database updates concurrently
// inside a TransactionScope, then complete the scope through SynchronizationContext.Send.
// The surrounding text reports that this still fails with
// "Transaction scope must be disposed on same thread it was created".
public async Task Update()
{
// Capture the synchronization context that is current when the method starts.
// NOTE(review): SynchronizationContext.Current can be null when no context is installed;
// sc.Send below would then throw - confirm against the calling environment.
var sc = SynchronizationContext.Current;
// NOTE(review): tranOptions is configured here but never passed to the TransactionScope
// constructor below, so the RepeatableRead setting is not applied by this code.
TransactionOptions tranOptions = new TransactionOptions();
tranOptions.IsolationLevel = System.Transactions.IsolationLevel.RepeatableRead;
using (var ts = new TransactionScope())
{
// Do Cache Update Operation as Async
// (placeholder: the right-hand side of the assignment is omitted in this snippet)
Task cacheUpdate = // Update Cache Async
// Do Database Update Operation as Async
// (placeholder: the right-hand side of the assignment is omitted in this snippet)
Task databaseUpdate = // Update Database Async
// Wait asynchronously for both updates to finish.
await Task.WhenAll(cacheUpdate, databaseUpdate);
// Ask the captured context to run ts.Complete() synchronously. The context itself is
// passed as the state argument, which the callback ignores.
sc.Send(new SendOrPostCallback(
o =>
{
ts.Complete();
}), sc);
}
}
在大量查阅博客和文章之后,我找到了 Stephen Toub 的下面这篇博客。它介绍了如何让异步方法的延续(continuation)在完全相同的线程上执行,从而避免了事务作用域的线程问题。现在我不需要 TransactionScopeAsyncFlowOption.Enabled
也能让异步方法在 TransactionScope
中运行:
https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/
// Script entry point. DemoAsync is driven through Run so that the code after each await
// resumes on the thread that called Run instead of on an arbitrary thread.
void Main()
{
    // Wrap the demo in a task factory; Run invokes it and blocks until the task finishes.
    Func<Task> demo = async () => await DemoAsync();
    Run(demo);

    "Main Complete".Dump();
}
// Demonstrates awaiting inside a TransactionScope and then completing the scope.
// Main invokes this method through Run.
static async Task DemoAsync()
{
// Transaction scope test: the scope is disposed when the using block exits.
using (var ts = new TransactionScope())
{
// Placeholder (not valid C#): stands in for awaiting the combined cache + database update.
await Cache + Database Async update
// Mark the transaction as successful before the scope is disposed.
ts.Complete();
"Transaction Scope Complete".Dump();
}
}
/// <summary>
/// Runs an asynchronous delegate to completion on the calling thread. A
/// SingleThreadSynchronizationContext is installed for the duration of the call, so
/// continuations posted to it are executed by this thread's message pump.
/// </summary>
/// <param name="func">Factory that starts the asynchronous work and returns its task.</param>
/// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
/// <exception cref="InvalidOperationException"><paramref name="func"/> returned a null task.</exception>
public static void Run(Func<Task> func)
{
    if (func == null)
    {
        throw new ArgumentNullException("func");
    }

    // Remember the ambient context so it can be restored no matter how this method exits.
    var prevCtx = SynchronizationContext.Current;
    try
    {
        // Install the single-threaded context before invoking func, so awaits inside it capture it.
        var syncCtx = new SingleThreadSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(syncCtx);

        // Start the asynchronous work; it runs synchronously up to its first incomplete await.
        var t = func();
        if (t == null)
        {
            // Fail with a clear message instead of a NullReferenceException on ContinueWith below.
            throw new InvalidOperationException("No task provided.");
        }

        // When the task finishes, stop accepting work so the pump below can exit.
        // TaskScheduler.Default keeps this continuation off the pumped context.
        t.ContinueWith(
            delegate { syncCtx.Complete(); }, TaskScheduler.Default);

        // Pump posted callbacks on this thread until Complete() has been called
        // and the queue is empty.
        syncCtx.RunOnCurrentThread();

        // Observe the outcome: rethrows the task's exception, if any.
        t.GetAwaiter().GetResult();
    }
    finally
    {
        // Restore the context that was current when Run was called.
        SynchronizationContext.SetSynchronizationContext(prevCtx);
    }
}
/// <summary>
/// A SynchronizationContext that queues posted callbacks in a BlockingCollection
/// (producer/consumer) and executes them on whichever thread calls
/// <see cref="RunOnCurrentThread"/>.
/// </summary>
/// <remarks>
/// Only Post is overridden; Send is left to the base class implementation.
/// </remarks>
private sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    // Pending (callback, state) pairs; producers are Post callers, the consumer is RunOnCurrentThread.
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>>
        m_queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    /// <summary>Queues a callback for later execution by <see cref="RunOnCurrentThread"/>.</summary>
    /// <param name="d">The callback to queue.</param>
    /// <param name="state">The argument passed to <paramref name="d"/> when it runs.</param>
    /// <exception cref="ArgumentNullException"><paramref name="d"/> is null.</exception>
    public override void Post(SendOrPostCallback d, object state)
    {
        // Reject null here; otherwise the failure would surface later, inside the pump loop.
        if (d == null)
        {
            throw new ArgumentNullException("d");
        }

        m_queue.Add(
            new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    /// <summary>
    /// Executes queued callbacks on the calling thread, blocking while the queue is empty,
    /// until <see cref="Complete"/> has been called and the queue has been drained.
    /// </summary>
    public void RunOnCurrentThread()
    {
        KeyValuePair<SendOrPostCallback, object> workItem;
        // TryTake with an infinite timeout returns false only when no more items can arrive.
        while (m_queue.TryTake(out workItem, Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    /// <summary>Marks the queue as complete for adding so the pump loop can finish.</summary>
    public void Complete()
    {
        m_queue.CompleteAdding();
    }
}
以下是在事务作用域(TransactionScope)中以异步方式更新缓存和数据库的代码。我不能使用 v 4.5.1 中引入的 TransactionScopeAsyncFlowOption.Enabled
,因为我使用的 Apache Ignite.Net 缓存不支持它。我尝试的变通办法是先捕获当前的 Synchronization Context
,然后显式调用 Synchronization Context 的 Send
方法来完成事务,但这不起作用,我仍然收到错误:Transaction scope must be disposed on same thread it was created。
对于如何实现异步的 Update
,有什么建议吗?Apache Ignite 支持团队给出的建议之一是使用类似下面的写法:
Task.WhenAll(cacheUpdate, databaseUpdate).Wait()
,但这会把异步代码变成同步阻塞调用,因此并不是理想的选择。
// Attempted workaround from the question: run the cache and database updates concurrently
// inside a TransactionScope, then complete the scope through SynchronizationContext.Send.
// The surrounding text reports that this still fails with
// "Transaction scope must be disposed on same thread it was created".
public async Task Update()
{
// Capture the synchronization context that is current when the method starts.
// NOTE(review): SynchronizationContext.Current can be null when no context is installed;
// sc.Send below would then throw - confirm against the calling environment.
var sc = SynchronizationContext.Current;
// NOTE(review): tranOptions is configured here but never passed to the TransactionScope
// constructor below, so the RepeatableRead setting is not applied by this code.
TransactionOptions tranOptions = new TransactionOptions();
tranOptions.IsolationLevel = System.Transactions.IsolationLevel.RepeatableRead;
using (var ts = new TransactionScope())
{
// Do Cache Update Operation as Async
// (placeholder: the right-hand side of the assignment is omitted in this snippet)
Task cacheUpdate = // Update Cache Async
// Do Database Update Operation as Async
// (placeholder: the right-hand side of the assignment is omitted in this snippet)
Task databaseUpdate = // Update Database Async
// Wait asynchronously for both updates to finish.
await Task.WhenAll(cacheUpdate, databaseUpdate);
// Ask the captured context to run ts.Complete() synchronously. The context itself is
// passed as the state argument, which the callback ignores.
sc.Send(new SendOrPostCallback(
o =>
{
ts.Complete();
}), sc);
}
}
在大量查阅博客和文章之后,我找到了 Stephen Toub 的下面这篇博客。它介绍了如何让异步方法的延续(continuation)在完全相同的线程上执行,从而避免了事务作用域的线程问题。现在我不需要 TransactionScopeAsyncFlowOption.Enabled
也能让异步方法在 TransactionScope 中运行:
https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/
// Script entry point. DemoAsync is driven through Run so that the code after each await
// resumes on the thread that called Run instead of on an arbitrary thread.
void Main()
{
    // Wrap the demo in a task factory; Run invokes it and blocks until the task finishes.
    Func<Task> demo = async () => await DemoAsync();
    Run(demo);

    "Main Complete".Dump();
}
// Demonstrates awaiting inside a TransactionScope and then completing the scope.
// Main invokes this method through Run.
static async Task DemoAsync()
{
// Transaction scope test: the scope is disposed when the using block exits.
using (var ts = new TransactionScope())
{
// Placeholder (not valid C#): stands in for awaiting the combined cache + database update.
await Cache + Database Async update
// Mark the transaction as successful before the scope is disposed.
ts.Complete();
"Transaction Scope Complete".Dump();
}
}
/// <summary>
/// Runs an asynchronous delegate to completion on the calling thread. A
/// SingleThreadSynchronizationContext is installed for the duration of the call, so
/// continuations posted to it are executed by this thread's message pump.
/// </summary>
/// <param name="func">Factory that starts the asynchronous work and returns its task.</param>
/// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
/// <exception cref="InvalidOperationException"><paramref name="func"/> returned a null task.</exception>
public static void Run(Func<Task> func)
{
    if (func == null)
    {
        throw new ArgumentNullException("func");
    }

    // Remember the ambient context so it can be restored no matter how this method exits.
    var prevCtx = SynchronizationContext.Current;
    try
    {
        // Install the single-threaded context before invoking func, so awaits inside it capture it.
        var syncCtx = new SingleThreadSynchronizationContext();
        SynchronizationContext.SetSynchronizationContext(syncCtx);

        // Start the asynchronous work; it runs synchronously up to its first incomplete await.
        var t = func();
        if (t == null)
        {
            // Fail with a clear message instead of a NullReferenceException on ContinueWith below.
            throw new InvalidOperationException("No task provided.");
        }

        // When the task finishes, stop accepting work so the pump below can exit.
        // TaskScheduler.Default keeps this continuation off the pumped context.
        t.ContinueWith(
            delegate { syncCtx.Complete(); }, TaskScheduler.Default);

        // Pump posted callbacks on this thread until Complete() has been called
        // and the queue is empty.
        syncCtx.RunOnCurrentThread();

        // Observe the outcome: rethrows the task's exception, if any.
        t.GetAwaiter().GetResult();
    }
    finally
    {
        // Restore the context that was current when Run was called.
        SynchronizationContext.SetSynchronizationContext(prevCtx);
    }
}
/// <summary>
/// A SynchronizationContext that queues posted callbacks in a BlockingCollection
/// (producer/consumer) and executes them on whichever thread calls
/// <see cref="RunOnCurrentThread"/>.
/// </summary>
/// <remarks>
/// Only Post is overridden; Send is left to the base class implementation.
/// </remarks>
private sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    // Pending (callback, state) pairs; producers are Post callers, the consumer is RunOnCurrentThread.
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>>
        m_queue = new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    /// <summary>Queues a callback for later execution by <see cref="RunOnCurrentThread"/>.</summary>
    /// <param name="d">The callback to queue.</param>
    /// <param name="state">The argument passed to <paramref name="d"/> when it runs.</param>
    /// <exception cref="ArgumentNullException"><paramref name="d"/> is null.</exception>
    public override void Post(SendOrPostCallback d, object state)
    {
        // Reject null here; otherwise the failure would surface later, inside the pump loop.
        if (d == null)
        {
            throw new ArgumentNullException("d");
        }

        m_queue.Add(
            new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    /// <summary>
    /// Executes queued callbacks on the calling thread, blocking while the queue is empty,
    /// until <see cref="Complete"/> has been called and the queue has been drained.
    /// </summary>
    public void RunOnCurrentThread()
    {
        KeyValuePair<SendOrPostCallback, object> workItem;
        // TryTake with an infinite timeout returns false only when no more items can arrive.
        while (m_queue.TryTake(out workItem, Timeout.Infinite))
        {
            workItem.Key(workItem.Value);
        }
    }

    /// <summary>Marks the queue as complete for adding so the pump loop can finish.</summary>
    public void Complete()
    {
        m_queue.CompleteAdding();
    }
}