在 TPL 数据流循环中完成

Completion in TPL Dataflow Loops

我在确定如何检测循环 TPL 数据流中的完成时遇到问题。

我在数据流的一部分有一个反馈循环,它正在向远程服务器发出 GET 请求并处理数据响应(用更多数据流转换这些响应,然后提交结果)。

数据源将其结果分成 1000 条记录的页面,并且不会告诉我它有多少页可供我使用。我必须继续阅读,直到我得到不到一整页的数据。

通常页数是1页,经常是10页,偶尔有1000页。

一开始我有很多请求要获取。
我希望能够使用一个线程池来处理这个,这一切都很好,我可以对多个数据请求进行排队并同时请求它们。如果我偶然发现需要获取大量页面的实例,我想为此使用我的所有线程。我不想留下一个线程在其他线程完成时被搅动。

我遇到的问题是当我将此逻辑放入数据流时,例如:

//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));

//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
    var resp = await Fetch(req);

    if (resp.Results.Count == 1000)
        await fetch.SendAsync(QueueAnotherRequest(req));

    return resp;
}
, new ExecutionDataflowBlockOptions {  MaxDegreeOfParallelism = 10 });

//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));

request.LinkTo(fetch);
fetch.LinkTo(commit);

//when are we complete?

QueueRequests 生成 IEnumerable<DataRequest>。我立即将下 N 个页面请求排队,接受这意味着我发送的呼叫比我需要的略多。 DataRequest 实例共享一个 LastPage 计数器,以避免不必要地发出我们知道在最后一页之后的请求。这一切都很好。

问题:
如果我通过将更多请求反馈到 fetch 的输入缓冲区来循环,如我在本例中所示,那么我会遇到如何发出(甚至检测)完成信号的问题。我无法设置从请求中提取完成,因为一旦设置完成我就不能再反馈了。

我可以在获取时监视输入和输出缓冲区是否为空,但我认为当我设置完成时我冒着获取仍然忙于请求的风险,从而防止对额外页面的请求排队。

我可以通过某种方式知道 fetch 正忙(有输入或正忙于处理输入)。

我是否缺少解决此问题的 obvious/straightforward 方法?

非常感谢您提供的任何帮助!

现在我已经在获取块中添加了一个简单的忙碌状态计数器:-

int fetch_busy = 0;

TransformBlock<DataRequest, DataResponse>  fetch_activity=null;
fetch = new TransformBlock<DataRequest, ActivityResponse>(async req => 
    {
        try
        {
            Interlocked.Increment(ref fetch_busy);
            var resp = await Fetch(req);

            if (resp.Results.Count == 1000)
            {
                await fetch.SendAsync( QueueAnotherRequest(req) );
            }

            Interlocked.Decrement(ref fetch_busy);
            return resp;
        }
        catch (Exception ex)
        {
            Interlocked.Decrement(ref fetch_busy);
            throw ex;
        }
    }
    , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

然后我用它来表示完成如下:-

request.Completion.ContinueWith(async _ =>
    {
        while ( fetch.InputCount > 0 || fetch_busy > 0 )
        {
            await Task.Delay(100);
        }

        fetch.Complete();
    });

这看起来不太优雅,但我认为应该可行。

在 TPL 数据流中,您可以 link the blocks with DataflowLinkOptions with specifying the propagation of completion of the block:

request.LinkTo(fetch, new DataflowLinkOptions { PropagateCompletion = true });
fetch.LinkTo(commit, new DataflowLinkOptions { PropagateCompletion = true });

之后,您只需为 request 块调用 Complete() 方法,就大功告成了!

// the completion will be propagated to all the blocks
request.Complete();

你应该使用的最后一件事是最后一个块的Completion任务属性:

commit.Completion.ContinueWith(t =>
    {
        /* check the status of the task and correctness of the requests handling */
    });