如何将此 WebSocket 模式转换为我可以的任务 await/cancel/continuewith/

How to convert this WebSocket pattern to a Task I can await/cancel/continuewith/

我有第 3 方 WebSocket,我需要在其中以一种方法发送请求并以另一种方法给出响应的情况下使用它。我如何将这种类型的模式转换为更典型的 async/await TPL 任务,该任务将支持取消(通过令牌)、继续以及所有其他好东西。

这就是我到目前为止的想法,尽管我不确定它是否有效。我要到星期一才能测试。

所以这是我的问题:

  1. 这行得通吗?

  2. 我一直在阅读 TaskCompletionSource。有没有更好的方法可以使用 TaskCompletionSource?

  3. 我真的不喜欢锁,因为我知道它有可能阻塞很长时间,但我不确定如何做得更好,因为如果我不这样做锁定对 AsyncGetPositions 的第二次调用可以清除任何已返回的头寸。

  4. 即使使用 lock,我也知道如果超时或取消会造成问题,所以也许我只是删除取消令牌。我唯一能想到的另一件事是创建多个客户端,这些客户端都经过身份验证并准备好处理请求,然后像管理这些类型的请求的线程池一样管理它们,但我不会很快这样做除此之外... idk.

    private object GetPositionsLock = new object();
    private IEnumerable<Position> Positions { get; } = new List<Position>();
    private Task PositionsReturned { get; set; }
    public async Task<List<Position>> AsyncGetPositions(CancellationToken token)
    {
        try
        {
            lock (GetPositionsLock)
            {
                Positions.Clear();
                IbWebSocket.reqPositions();
                PositionsReturned = new Task(null, token, TaskCreationOptions.None);
                PositionsReturned.GetAwaiter().GetResult();
                return token.IsCancellationRequested ? null : Positions.ToList().Where(x => x.Shares != 0).ToList();
            }
        }
        catch (TimeoutException ex)
        {
            //LogTimeout
            throw;
        }
        catch (Exception ex)
        {
            //LogError
            throw;
        }
    }
    
    ///  <summary>
    ///         Provides a position to the reqPositions() method.  When the last position has been received positionEnd() is called.
    ///  </summary>
    ///  <param name="contract"></param>
    ///  <param name="value"></param>
    ///  <param name="account"></param>
    public void position(string account, Contract contract, double value)
    {
        try
        {
            Positions.Concat(new Position(account, contract, value));
        }
        catch (Exception ex)
        {
            //LogError
            throw;
        }
    }
    
    /// <summary>
    ///     Indicates all the positions have been transmitted.
    /// </summary>
    public void positionEnd()
    {
        PositionsReturned = Task.CompletedTask;
    }
    

Will this work?

没有。你shouldn't use the Task constructor, use lock with async code, or mix blocking with asynchronous code.

I've been reading about TaskCompletionSource. Is there a better way to do any of this possibly with TaskCompletionSource?

是的,这是用于此场景的类型。

I really don't like the lock because I know it has the potential to block for a long time but but I'm not sure how to do it any better because if I don't lock a second call to AsyncGetPositions could clear any positions already returned.

我建议先 让这个工作,然后 然后 处理额外的重入要求。其中每一个都已经够难了。


您要做的是拥有一个 TaskCompletionSource<T> 并在调用 positionEnd 时完成它。为简单起见,从没有重入问题和没有 CancellationToken 的情况开始。一旦你完全理解TaskCompletionSource<T>,那么你可以增加复杂性:

private List<Position> Positions { get; } = new();
private TaskCompletionSource<List<Position>> PositionsReturned { get; set; }
public Task<List<Position>> AsyncGetPositions()
{
  Positions.Clear();
  PositionsReturned = new();
  IbWebSocket.reqPositions();
  return PositionsReturned.Task;
}

public void position(string account, Contract contract, double value)
{
  Positions.Add(new Position(account, contract, value));
}

public void positionEnd()
{
  PositionsReturned.TrySetResult(Positions);
}