在两个线程上交错处理

Interleaving processes on two threads

我有一个可以用来访问一些表格数据的库。这个库是我访问数据的唯一途径。我使用的方法采用查询字符串和为每个结果行调用的回调。

目前,回调将每一行加载到一个列表中,然后 returns 列表。我想使用迭代器模式,但我对数据的唯一访问权限是通过此回调方法。

有没有办法在第二个线程上 运行 query/callback 并将该代码与迭代器代码交错? Psudocode:

IEnumerable<Row> QueryData(string queryString)
{
    var callerLock = create new sync lock;
    var callbackLock = create new sync lock;
    var rows = create new stack of rows with capacity 1;
    var qthread = create new thread with QueryCallback(queryString, callerLock, callbackLock, rows);

    start qthread;
    while (qthread is running)
    {
        signal callbackLock;
        wait for callerLock;

        if stack is empty
            break;

        var row = pop from rows;
        yield return row;
    }
}

void QueryCallback(string queryString, lock callerLock, lock callbackLock, Stack<Row> rows)
{
    DoQueryWithCallback(queryString, row =>
    {
        wait for callbackLock;
        push row to rows;
        signal callerLock;
    });

    signal callerLock;
}

我已尝试使用 .NET Framework 中的大部分可用锁来实现此功能,但 none 其中的锁有效。我记得尝试过 Semaphore、SemaphoreSlim、AutoResetEvent、ManualResetEvent 和 Mutex。

P.S.: DoQueryWithCallback 来自图书馆。它是一个原生库(ILSpy/Reflector/etc 无法反编译)。我想函数看起来像这样:

long DoQueryWithCallback(string queryString, Callback rowCallback)
{
    do some setup;

    Row row;
    while (next(out row))
            rowCallback(row);

    do some teardown;
}

如果我对伪代码的理解正确,您希望在后台线程上触发提取操作并在结果进入时使用迭代器生成结果,而不是等待整个提取操作完成再返回。我要更改的几件事:

  • 如果要保持获取行的顺序,请使用队列而不是堆栈
  • signalling/blocking 只需要走一条路——线程 yield 返回行需要等待获取线程将项目添加到队列中。无需阻塞获取线程

这是一个使用 TaskConcurrentQueueAutoResetEvent 的简单示例:

public IEnumerable<Row> GetRows(string query)
{
    using (var resetEvent = new AutoResetEvent(false))
    {
        var rows = new ConcurrentQueue<Row>();

        var queryTask = Task.Run(() => DoQueryWithCallback(query, r =>
        {
            rows.Enqueue(r);
            resetEvent.Set();
        }));
        queryTask.ContinueWith(t => resetEvent.Set()); // This ensures that queryTask.IsCompleted will be true in the while loop below

        while (resetEvent.WaitOne() && !queryTask.IsCompleted)
        {
            Row row;
            while (rows.TryDequeue(out row))
                yield return row;
        }
    }
}

编辑

实际上有更好的方法使用 BlockingCollection

public IEnumerable<Row> GetRows(string query)
{
    using (var rows = new BlockingCollection<Row>())
    {
        Task.Run(() =>
        {
            DoQueryWithCallback(query, r => rows.Add(r));
            rows.CompleteAdding();
        });

        while (!rows.IsCompleted)
            yield return rows.Take();
    }
}