如何包装 async/await 的多线程回调?
How to wrap a multithreaded callback for async/await?
我有 async/await 代码并想使用类似于 websocket 的 API。它需要一个回调来接收从另一个线程调用的新消息。
我可以在与连接启动相同的 async/await 上下文中执行此回调而不诉诸锁定吗?
我认为这就是 SynchronizationContext 的用途,但我无法判断它是否线程安全。如果我记录 thread-id,每个回调将在不同的线程上。如果我记录 Task.CurrentId 它为空。我认为相同的同步上下文在不同线程之间移动,所以这可能没问题,但我不知道如何确认。
// External api, the callbacks will be from multiple threads
public class Api
{
public static Connect(
Action<Connection> onConnect,
Action<Connection> onMessage)
{}
}
async Task<Connection> ConnectAsync(Action<Message> callback)
{
if (SynchronizationContext.Current == null)
{
SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
}
var syncContext = SynchronizationContext.Current;
var tcs = new TaskCompletionSource<Connection>();
// use post() to ensure callbacks from other threads are executed thread-safely
Action<Connection> onConnect = conn =>
{
syncContext.Post(o => tcs.SetResult(conn), null);
};
Action<Message> onMsg = msg =>
{
syncContext.Post(o => callback(msg), null);
};
// call the multi-threaded non async/await api supplying the callbacks
Api.Connect(onConnect, onMsg);
return await tcs.Task;
}
var connection = await ConnectAsync(
msg =>
{
/* is code here threadsafe with the send without extra locking? */
});
await connection.Send("Hello world);
感谢@Evk,他指出默认的 SynchronizationContext 实际上并没有同步任何东西,也没有按照您期望的方式实现 send/post。
https://github.com/StephenClearyArchive/AsyncEx.Context
解决方法是使用 Stephen Cleary 的异步库,该库将 SynchronizationContext 实现为单个线程中的消息泵,以便 post() 调用与其他等待调用在同一线程中被调用。
// External api, the callbacks will be from multiple threads
public class Api
{
public static Connect(
Action<Connection> onConnect,
Action<Connection> onMessage)
{}
}
async Task<Connection> ConnectAsync(Action<Message> callback)
{
var syncContext = SynchronizationContext.Current;
var tcs = new TaskCompletionSource<Connection>();
// use post() to ensure callbacks from other threads are executed thread-safely
Action<Connection> onConnect = conn =>
{
syncContext.Post(o => tcs.SetResult(conn), null);
};
Action<Message> onMsg = msg =>
{
syncContext.Post(o => callback(msg), null);
};
// call the multi-threaded non async/await api supplying the callbacks
Api.Connect(onConnect, onMsg);
return await tcs.Task;
}
//
// https://github.com/StephenClearyArchive/AsyncEx.Context
//
Nito.AsyncEx.AsyncContext.Run(async () =>
{
var connection = await ConnectAsync(
msg =>
{
/* this will execute in same thread as ConnectAsync and Send */
});
await connection.Send("Hello world);
... more async/await code
});
我有 async/await 代码并想使用类似于 websocket 的 API。它需要一个回调来接收从另一个线程调用的新消息。
我可以在与连接启动相同的 async/await 上下文中执行此回调而不诉诸锁定吗?
我认为这就是 SynchronizationContext 的用途,但我无法判断它是否线程安全。如果我记录 thread-id,每个回调将在不同的线程上。如果我记录 Task.CurrentId 它为空。我认为相同的同步上下文在不同线程之间移动,所以这可能没问题,但我不知道如何确认。
// External api, the callbacks will be from multiple threads
public class Api
{
public static Connect(
Action<Connection> onConnect,
Action<Connection> onMessage)
{}
}
async Task<Connection> ConnectAsync(Action<Message> callback)
{
if (SynchronizationContext.Current == null)
{
SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
}
var syncContext = SynchronizationContext.Current;
var tcs = new TaskCompletionSource<Connection>();
// use post() to ensure callbacks from other threads are executed thread-safely
Action<Connection> onConnect = conn =>
{
syncContext.Post(o => tcs.SetResult(conn), null);
};
Action<Message> onMsg = msg =>
{
syncContext.Post(o => callback(msg), null);
};
// call the multi-threaded non async/await api supplying the callbacks
Api.Connect(onConnect, onMsg);
return await tcs.Task;
}
var connection = await ConnectAsync(
msg =>
{
/* is code here threadsafe with the send without extra locking? */
});
await connection.Send("Hello world);
感谢@Evk,他指出默认的 SynchronizationContext 实际上并没有同步任何东西,也没有按照您期望的方式实现 send/post。
https://github.com/StephenClearyArchive/AsyncEx.Context
解决方法是使用 Stephen Cleary 的异步库,该库将 SynchronizationContext 实现为单个线程中的消息泵,以便 post() 调用与其他等待调用在同一线程中被调用。
// External api, the callbacks will be from multiple threads
public class Api
{
public static Connect(
Action<Connection> onConnect,
Action<Connection> onMessage)
{}
}
async Task<Connection> ConnectAsync(Action<Message> callback)
{
var syncContext = SynchronizationContext.Current;
var tcs = new TaskCompletionSource<Connection>();
// use post() to ensure callbacks from other threads are executed thread-safely
Action<Connection> onConnect = conn =>
{
syncContext.Post(o => tcs.SetResult(conn), null);
};
Action<Message> onMsg = msg =>
{
syncContext.Post(o => callback(msg), null);
};
// call the multi-threaded non async/await api supplying the callbacks
Api.Connect(onConnect, onMsg);
return await tcs.Task;
}
//
// https://github.com/StephenClearyArchive/AsyncEx.Context
//
Nito.AsyncEx.AsyncContext.Run(async () =>
{
var connection = await ConnectAsync(
msg =>
{
/* this will execute in same thread as ConnectAsync and Send */
});
await connection.Send("Hello world);
... more async/await code
});